您现在的位置是:主页 > news > 淮北论坛网/好口碑的关键词优化
淮北论坛网/好口碑的关键词优化
admin2025/4/26 12:21:37【news】
简介淮北论坛网,好口碑的关键词优化,北京大兴区网站建设,卓创源码网日志,几乎每个程序都需要有的功能,对于很多比较大型的,多人合作的程序,使用专业的日志解决方案,比如 fluentd,是个不错的选择。 fluentd 就是重了点,你需要搭建 fluentd 服务,然后不…
日志,几乎每个程序都需要有的功能,对于很多比较大型的,多人合作的程序,使用专业的日志解决方案,比如 fluentd,是个不错的选择。
fluentd 就是重了点,你需要搭建 fluentd 服务,然后不同的应用再通过相应的方式将日志信息传导 fluentd 服务中,当然重的好处是强大,它可以兼容多个语言,只有你的 client 实现好就行,还可以在日志传输管道中加入各种 hook,比如某个带关键字的日志要执行某种操作等等。
我的程序比较轻,之前都是运维同学搭建好了 fluentd+ES 一套日志管理系统,现在要自己弄,有点麻烦,所以决定使用其他方式来实现日志的管理。
先列一下我简单的需求:
-
1. 日志可以存入文件(最基本要求)
-
2. 日志可以存入 MongoDB(方便搜索分析)
-
3. 报警日志可以主动告警(方便我及时修复)
日志存文件
首先来实现前两个功能,利用 Python 自带的 logging 便可以实现将日志内容存入文件的功能,代码如下:
import time
import logging
import logging.handlersLOG_FILENAME = 'main.log'
logger = logging.getLogger()def set_logger():logger.setLevel(logging.INFO)formatter = logging.Formatter('%(asctime)s - %(process)d-%(threadName)s - ''%(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')console_handler = logging.StreamHandler()console_handler.setFormatter(formatter)logger.addHandler(console_handler)# log output to filefile_handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, maxBytes=10485760, backupCount=5, encoding="utf-8")logger.addHandler(file_handler)set_logger()
logging 模块标准的写法,利用 logging 的 handler 功能实现格式化,同样利用 handler 功能,将日志存入到本地文件中。
日志存 MongoDB
使用 log4mongo 库,可以让你的 logging 无缝将日志存入到 MongoDB 中,log4mongo 提供了符合 logging 调用格式的 Handler,直接使用则可,代码如下:
import time
import logging
import logging.handlers
from log4mongo.handlers import MongoHandler
from logging import *LOG_FILENAME = 'main.log'
logger = logging.getLogger()def set_logger(mongodb=False):logger.setLevel(logging.INFO)formatter = logging.Formatter('%(asctime)s - %(process)d-%(threadName)s - ''%(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')console_handler = logging.StreamHandler()console_handler.setFormatter(formatter)logger.addHandler(console_handler)if mongodb:# log output to mongodbdb_name = 'bestpitcher_log'mon_handler = MongoHandler(host=mongodb_config['host'],port=int(mongodb_config['port']),database_name=db_name,# username=mongodb_config['user'],# password=mongodb_config['password'],# authentication_db=db_name)mon_handler.setLevel(logging.INFO)logger.addHandler(mon_handler)else:# log output to filefile_handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, maxBytes=10485760, backupCount=5, encoding="utf-8")logger.addHandler(file_handler)set_logger(mongodb=True)
实例化 MongoHandler,获得相应的 handler,然后添加到 logger 中,便实现了将日志写入 MongoDB 的效果,如下图:
日志报警
项目代码在阿里云上运行,阿里云提供了对日志文件进行监控并通过钉钉报警的功能,跟其他组同事交流,他不希望不是自己组里的项目也使用他这套,会显得很乱,至于会不会乱,不纠结,既然人家不想我这样搞,那就自己搞。
简单调用,使用飞书的 WebHook 机器人可以非常轻松的实现日志推送报警的功能。
在开始编写前,要理清飞书机器人的概念,飞书中其实有两种机器人,如果你通过【飞书机器人】去搜索,就会有点懵。
飞书中,每个群组可以设置一个 WebHook 机器人,这个使用个人版飞书便可以直接使用,非常方便,我们的日志监控就利用 WebHook 机器人,其添加方式如下:
1. 先创建一个群,然后点击设置,然后点击【群机器人】,然后点击【添加机器人】
2. 简单配置 WebHook 机器人
从配置就可以看出,WebHook 机器人的工作原理,通过 HTTP 请求机器人的 webhook 地址,请求数据的格式符合 webhook 文档定义的格式变可以请求成功了。
为了安全,我这里还开启了【签名校验】,即如果你通过中间人攻击抓我的包,包中的内容是加密的,而我的后端程序会使用这个签名校验秘钥对加密内容进行解析,获得真实数据,与 WebHook 交互代码如下:
import base64
import hashlib
import hmac
from datetime import datetimeimport requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util import Retry
import hashlib
import base64
from Crypto.Cipher import AESfrom configs import *timestamp = int(datetime.now().timestamp())class AESCipher(object):def __init__(self, key):self.bs = AES.block_sizeself.key = hashlib.sha256(AESCipher.str_to_bytes(key)).digest()@staticmethoddef str_to_bytes(data):u_type = type(b"".decode('utf8'))if isinstance(data, u_type):return data.encode('utf8')return data@staticmethoddef _unpad(s):return s[:-ord(s[len(s) - 1:])]def decrypt(self, enc):iv = enc[:AES.block_size]cipher = AES.new(self.key, AES.MODE_CBC, iv)return self._unpad(cipher.decrypt(enc[AES.block_size:]))def decrypt_string(self, enc):enc = base64.b64decode(enc)return self.decrypt(enc).decode('utf8')class BaseBot:def __init__(self):self.session = requests.Session()# 设置重试self.session.mount('https://', HTTPAdapter(max_retries=Retry(total=5, method_whitelist=frozenset(['GET', 'POST']))))def gen_sign(self, secret):# 拼接时间戳以及签名校验string_to_sign = '{}\n{}'.format(timestamp, secret)# 使用 HMAC-SHA256 进行加密hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()# 对结果进行 base64 编码sign = base64.b64encode(hmac_code).decode('utf-8')return signclass BaseMsgBot(BaseBot):def __init__(self):super(BaseMsgBot, self).__init__()def send_base_msg(self, msg):"""发送基本的信息:return:"""sign = self.gen_sign(WEBHOOK_SECRET)params = {"timestamp": timestamp,"sign": sign,"msg_type": "text","content": {"text": msg}}resp = requests.post(WEBHOOK_URL, json=params)resp.raise_for_status()result = resp.json()if result.get("code") and result.get("code") != 0:print(f"发送失败:{result['msg']}")returnprint("消息发送成功")if __name__ == '__main__':BaseMsgBot().send_base_msg('懒编程YYDS!')
效果如下:
WebHook 机器人是不与我们的后端程序交互的,即无法实现,我发一段指令给他,他执行相应动作这样的效果,但对于单纯的日志监控,WebHook 够用了。
飞书中另外一种机器人是需要通过创建机器人应用的方式创建,这种机器人不在群组里,而是在工作台中,比如下图我创建了自己的应用机器人。
要创建这种机器人,需要企业版飞书,因为机器人获取消息、发送消息的功能需要申请相应的权限,当然,还有国内惯例,通过 APPSECRET 换取 2 小时后会过期的 access_token,这个我也弄了,因为我喜欢通过飞书机器人控制程序的一下动作,比如从日志机器人中发现了严重报错,但日志机器人无法控制程序,而我人在外面,此时可以通过应用机器人执行一些动作。
飞书 WebHook 机器人对接完了,那怎么与 logging 结合在一起使用呢?
因为我已有的项目中已经大量的使用 logger 了,我不希望去逐行修改使用 logger 的方式,而是希望通过某种对 logger 无感的方式来实现日志传递到 WebHook 的效果。
简单阅读 logging 文档,发现没有 Hook 机制,没办法,只能看 logging 源码走继承重写的解决方案了。
这里可以总结一下我对库修改的方式,如果一个库,没有我想要的功能(通过文档判断),我就会去看它的源代码,然后尝试将核心类通过继承的方式弄出来,然后再在继承出的子类中添加自己的逻辑。
简单分析,会发现 logger 下,使用的 info、warning、error 等方法,都会调用_log 方法,_log 方法会进一步执行相应的动作,这些动作我不关心,因为我会通过 super 方法直接使用。
琢磨一下自己的需求,对于 info 基本的日志,当然不需要日志报警,简单记录到 MongoDB 中就好了,对于 error 级别日志,报错了嘛,当然希望主动告诉我,但有时 info 基本,我也希望它主动告诉我,基于上述分析,写出如下代码:
import logging
import logging.handlers
from logging import *LOG_FILENAME = 'main.log'
LOG_LEVEL = ERRORclass MyLogger(Logger):def __init__(self, name, level=NOTSET):super(MyLogger, self).__init__(name=name, level=level)def _log(self,level,msg,args,exc_info=None,extra=None,stack_info=False,robot=False) -> None:""":param level::param msg::param args::param exc_info::param extra::param stack_info::param robot: 是否要通过飞书机器人将日志发送到飞书上:return:"""super(MyLogger, self)._log(level, msg, args, exc_info, extra, stack_info)if robot or level >= LOG_LEVEL:msg_bot.send_base_msg(msg)def __reduce__(self):return getLogger, ()logger = MyLogger('bestpitcher_log', WARNING)
上述代码中,实现 MyLogger 基础于 Logger,然后重写其中的_log 方法,_log 方法中,第一件事便是通过 super 调用父类中_log 方法的逻辑,然后再添加自己的逻辑,即发送信息到飞书 webhook 的逻辑。
_log 方法中,我添加了 robot 参数,如果打印日志时,设置了 robot,则发送到 webhook,此外还有默认日志级别,这里是 ERROR 级别,即 error 日志,就算 robot 为 False,也会主动发送日志到 webhook 中。
简单测试使用一下:
from logger import loggerdef test_logger():logger.info('[info] 这条日志只会记录在MongoDB中')# exc_info 获得报错时的调用链logger.error('[error] 这条日志会发送到WebHook机器人上', exc_info=True)logger.warning('[warning] 这条日志也会发送到WebHook', robot=True)if __name__ == '__main__':test_logger()
WebHook 效果:
MongoDB 效果:
项目 Github 地址:https://github.com/ayuLiao/simple-logger