日志专题手册
- 作者:
- Vinay Sajip
本页面包含多个与日志相关的专题,历史证明它们是很有用的。教程和参考信息的链接另见 其他资源。
在多模块中使用日志
无论对 logging.getLogger('someLogger')
进行多少次调用,都会返回同一个 logger 对象的引用。不仅在同一个模块内如此,只要是在同一个 Python 解释器进程中,跨模块调用也是一样。同样是引用同一个对象,应用程序也可以在一个模块中定义和配置一个父 logger,而在另一个单独的模块中创建(但不配置)子 logger,对于子 logger 的所有调用都会传给父 logger。以下是主模块:
- import logging
- import auxiliary_module
- # 创建 'spam_application' 日志记录器
- logger = logging.getLogger('spam_application')
- logger.setLevel(logging.DEBUG)
- # 创建可记录调试消息的文件处理器
- fh = logging.FileHandler('spam.log')
- fh.setLevel(logging.DEBUG)
- # 创建具有更高日志层级的控制台处理器
- ch = logging.StreamHandler()
- ch.setLevel(logging.ERROR)
- # 创建格式化器并将其添加到处理器
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- fh.setFormatter(formatter)
- ch.setFormatter(formatter)
- # 将处理器添加到日志记录器
- logger.addHandler(fh)
- logger.addHandler(ch)
- logger.info('creating an instance of auxiliary_module.Auxiliary')
- a = auxiliary_module.Auxiliary()
- logger.info('created an instance of auxiliary_module.Auxiliary')
- logger.info('calling auxiliary_module.Auxiliary.do_something')
- a.do_something()
- logger.info('finished auxiliary_module.Auxiliary.do_something')
- logger.info('calling auxiliary_module.some_function()')
- auxiliary_module.some_function()
- logger.info('done with auxiliary_module.some_function()')
以下是辅助模块:
- import logging
- # 创建日志记录器
- module_logger = logging.getLogger('spam_application.auxiliary')
- class Auxiliary:
- def __init__(self):
- self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
- self.logger.info('creating an instance of Auxiliary')
- def do_something(self):
- self.logger.info('doing something')
- a = 1 + 1
- self.logger.info('done doing something')
- def some_function():
- module_logger.info('received a call to "some_function"')
输出结果会像这样:
- 2005-03-23 23:47:11,663 - spam_application - INFO -
- creating an instance of auxiliary_module.Auxiliary
- 2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
- creating an instance of Auxiliary
- 2005-03-23 23:47:11,665 - spam_application - INFO -
- created an instance of auxiliary_module.Auxiliary
- 2005-03-23 23:47:11,668 - spam_application - INFO -
- calling auxiliary_module.Auxiliary.do_something
- 2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
- doing something
- 2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
- done doing something
- 2005-03-23 23:47:11,670 - spam_application - INFO -
- finished auxiliary_module.Auxiliary.do_something
- 2005-03-23 23:47:11,671 - spam_application - INFO -
- calling auxiliary_module.some_function()
- 2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
- received a call to 'some_function'
- 2005-03-23 23:47:11,673 - spam_application - INFO -
- done with auxiliary_module.some_function()
在多个线程中记录日志
多线程记录日志并不需要特殊处理,以下示例演示了在主线程(起始线程)和其他线程中记录日志的过程:
- import logging
- import threading
- import time
- def worker(arg):
- while not arg['stop']:
- logging.debug('Hi from myfunc')
- time.sleep(0.5)
- def main():
- logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
- info = {'stop': False}
- thread = threading.Thread(target=worker, args=(info,))
- thread.start()
- while True:
- try:
- logging.debug('Hello from main')
- time.sleep(0.75)
- except KeyboardInterrupt:
- info['stop'] = True
- break
- thread.join()
- if __name__ == '__main__':
- main()
脚本会运行输出类似下面的内容:
- 0 Thread-1 Hi from myfunc
- 3 MainThread Hello from main
- 505 Thread-1 Hi from myfunc
- 755 MainThread Hello from main
- 1007 Thread-1 Hi from myfunc
- 1507 MainThread Hello from main
- 1508 Thread-1 Hi from myfunc
- 2010 Thread-1 Hi from myfunc
- 2258 MainThread Hello from main
- 2512 Thread-1 Hi from myfunc
- 3009 MainThread Hello from main
- 3013 Thread-1 Hi from myfunc
- 3515 Thread-1 Hi from myfunc
- 3761 MainThread Hello from main
- 4017 Thread-1 Hi from myfunc
- 4513 MainThread Hello from main
- 4518 Thread-1 Hi from myfunc
以上如期显示了不同线程的日志是交替输出的。当然更多的线程也会如此。
多个 handler 和多种 formatter
日志是个普通的 Python 对象。 addHandler()
方法可加入不限数量的日志 handler。有时候,应用程序需把严重错误信息记入文本文件,而将一般错误或其他级别的信息输出到控制台。若要进行这样的设定,只需多配置几个日志 handler 即可,应用程序的日志调用代码可以保持不变。下面对之前的分模块日志示例略做修改:
- import logging
- logger = logging.getLogger('simple_example')
- logger.setLevel(logging.DEBUG)
- # 创建可记录调试消息的文件处理器
- fh = logging.FileHandler('spam.log')
- fh.setLevel(logging.DEBUG)
- # 创建具有更高日志层级的控制台处理器
- ch = logging.StreamHandler()
- ch.setLevel(logging.ERROR)
- # 创建格式化器并将其添加到处理器
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- ch.setFormatter(formatter)
- fh.setFormatter(formatter)
- # 将处理器添加到日志记录器
- logger.addHandler(ch)
- logger.addHandler(fh)
- # '应用程序' 代码
- logger.debug('debug message')
- logger.info('info message')
- logger.warning('warn message')
- logger.error('error message')
- logger.critical('critical message')
需要注意的是,“应用程序”内的代码并不关心是否存在多个日志 handler。示例中所做的改变,只是新加入并配置了一个名为 fh 的 handler。
在编写和测试应用程序时,若能创建日志 handler 对不同严重级别的日志信息进行过滤,这将十分有用。调试时无需用多条 print
语句,而是采用 logger.debug
:print 语句以后还得注释或删掉,而 logger.debug 语句可以原样留在源码中保持静默。当需要再次调试时,只要改变日志对象或 handler 的严重级别即可。
在多个地方记录日志
假定要根据不同的情况将日志以不同的格式写入控制台和文件。比如把 DEBUG 以上级别的日志信息写于文件,并且把 INFO 以上的日志信息输出到控制台。再假设日志文件需要包含时间戳,控制台信息则不需要。以下演示了做法:
- import logging
- # 设置日志记录到文件 —— 参阅前一节了解详情
- logging.basicConfig(level=logging.DEBUG,
- format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
- datefmt='%m-%d %H:%M',
- filename='tmpmyapp.log',
- filemode='w')
- # 定义一个将 INFO 或更高层级消息写到 sys.stderr 的处理器
- console = logging.StreamHandler()
- console.setLevel(logging.INFO)
- # 设置一个适用于控制台的更简单格式
- formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
- # 告诉处理器使用此格式
- console.setFormatter(formatter)
- # 将处理器添加到根日志记录器
- logging.getLogger('').addHandler(console)
- # 现在我们可以写入根记录器或任何其他记录器。 首先是根记录器...
- logging.info('Jackdaws love my big sphinx of quartz.')
- # 现在,定义几个可以代表你的应用程序中不同组成部分的
- # 其他日志记录器
- logger1 = logging.getLogger('myapp.area1')
- logger2 = logging.getLogger('myapp.area2')
- logger1.debug('Quick zephyrs blow, vexing daft Jim.')
- logger1.info('How quickly daft jumping zebras vex.')
- logger2.warning('Jail zesty vixen who grabbed pay from quack.')
- logger2.error('The five boxing wizards jump quickly.')
当运行后,你会看到控制台如下所示
- root : INFO Jackdaws love my big sphinx of quartz.
- myapp.area1 : INFO How quickly daft jumping zebras vex.
- myapp.area2 : WARNING Jail zesty vixen who grabbed pay from quack.
- myapp.area2 : ERROR The five boxing wizards jump quickly.
而日志文件将如下所示:
- 10-22 22:19 root INFO Jackdaws love my big sphinx of quartz.
- 10-22 22:19 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
- 10-22 22:19 myapp.area1 INFO How quickly daft jumping zebras vex.
- 10-22 22:19 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
- 10-22 22:19 myapp.area2 ERROR The five boxing wizards jump quickly.
如您所见,DEBUG 级别的日志信息只出现在了文件中,而其他信息则两个地方都会输出。
上述示例只用到了控制台和文件 handler,当然还可以自由组合任意数量的日志 handler。
请注意上面选择的日志文件名 tmpmyapp.log
表示在 POSIX 系统上使用临时文件的标准位置。 在 Windows 上,你可能需要为日志选择不同的目录名称 —— 只要确保该目录存在并且你有在其中创建和更新文件的权限。
自定义处理级别
有时,你想要做的可能略微不同于处理器中标准的级别处理方式,即某个界限以上的所有级别都会被处理器所处理。 要做到这一点,你需要使用过滤器。 让我们来看一个假设你想要执行如下安排的场景:
将严重级别为
INFO
和WARNING
的消息发送到sys.stdout
将严重级别为
ERROR
及以上的消息发送到sys.stderr
将严重级别为
DEBUG
及以上的消息发送到文件app.log
假定你使用以下 JSON 来配置日志记录:
- { "version": 1, "disable_existing_loggers": false, "formatters": { "simple": { "format": "%(levelname)-8s - %(message)s" } }, "handlers": { "stdout": { "class": "logging.StreamHandler", "level": "INFO", "formatter": "simple", "stream": "ext://sys.stdout" }, "stderr": { "class": "logging.StreamHandler", "level": "ERROR", "formatter": "simple", "stream": "ext://sys.stderr" }, "file": { "class": "logging.FileHandler", "formatter": "simple", "filename": "app.log", "mode": "w" } }, "root": { "level": "DEBUG", "handlers": [ "stderr", "stdout", "file" ] }
- }
这个配置 几乎 能做到我们想要的,但是除了 sys.stdout
在 INFO
和 WARNING
消息之外会只显示严重程度 ERROR
及以上的消息。 为了防止这种情况,我们可以设置一个排除掉这些消息的过滤器并将其添加到相应的处理器中。 这可以通过添加一个平行于 formatters
和 handlers
的 filters
节来配置:
- { "filters": { "warnings_and_below": { "()" : "__main__.filter_maker", "level": "WARNING" } }
- }
并修改 stdout
处理器上的节来添加它:
- { "stdout": { "class": "logging.StreamHandler", "level": "INFO", "formatter": "simple", "stream": "ext://sys.stdout", "filters": ["warnings_and_below"] }
- }
过滤器就是一个函数,因此我们可以定义 filter_maker
(工厂函数) 如下:
- def filter_maker(level):
- level = getattr(logging, level)
- def filter(record):
- return record.levelno <= level
- return filter
此函数将传入的字符串参数转换为数字级别,并返回一个仅在传入等于或低于指定数字级别的级别时返回 True
的函数。 请注意在这个示例中我是将 filter_maker
定义在一个从命令行运行的测试脚本 main.py
中,因此其所属模块将为 __main__
—— 即在过滤器配置中写作 __main__.filter_maker
。 如果你在不同的模块中定义它则需要加以修改。
在添加该过滤器后,我们就可以运行 main.py
,完整代码如下:
- import json
- import logging
- import logging.config
- CONFIG = '''
- {
- "version": 1,
- "disable_existing_loggers": false,
- "formatters": {
- "simple": {
- "format": "%(levelname)-8s - %(message)s"
- }
- },
- "filters": {
- "warnings_and_below": {
- "()" : "__main__.filter_maker",
- "level": "WARNING"
- }
- },
- "handlers": {
- "stdout": {
- "class": "logging.StreamHandler",
- "level": "INFO",
- "formatter": "simple",
- "stream": "ext://sys.stdout",
- "filters": ["warnings_and_below"]
- },
- "stderr": {
- "class": "logging.StreamHandler",
- "level": "ERROR",
- "formatter": "simple",
- "stream": "ext://sys.stderr"
- },
- "file": {
- "class": "logging.FileHandler",
- "formatter": "simple",
- "filename": "app.log",
- "mode": "w"
- }
- },
- "root": {
- "level": "DEBUG",
- "handlers": [
- "stderr",
- "stdout",
- "file"
- ]
- }
- }
- '''
- def filter_maker(level):
- level = getattr(logging, level)
- def filter(record):
- return record.levelno <= level
- return filter
- logging.config.dictConfig(json.loads(CONFIG))
- logging.debug('A DEBUG message')
- logging.info('An INFO message')
- logging.warning('A WARNING message')
- logging.error('An ERROR message')
- logging.critical('A CRITICAL message')
使用这样的命令运行它之后:
- python main.py 2>stderr.log >stdout.log
我们可以看到结果是符合预期的:
- $ more *.log
- ::::::::::::::
- app.log
- ::::::::::::::
- DEBUG - A DEBUG message
- INFO - An INFO message
- WARNING - A WARNING message
- ERROR - An ERROR message
- CRITICAL - A CRITICAL message
- ::::::::::::::
- stderr.log
- ::::::::::::::
- ERROR - An ERROR message
- CRITICAL - A CRITICAL message
- ::::::::::::::
- stdout.log
- ::::::::::::::
- INFO - An INFO message
- WARNING - A WARNING message
日志配置服务器示例
以下是一个用到了日志配置服务器的模块示例:
- import logging
- import logging.config
- import time
- import os
- # 读取初始配置文件
- logging.config.fileConfig('logging.conf')
- # 在 9999 端口上创建并启动监听器
- t = logging.config.listen(9999)
- t.start()
- logger = logging.getLogger('simpleExample')
- try:
- # 循环遍历日志记录调用以查看
- # 新配置进行的修改,直到按下 Ctrl+C
- while True:
- logger.debug('debug message')
- logger.info('info message')
- logger.warning('warn message')
- logger.error('error message')
- logger.critical('critical message')
- time.sleep(5)
- except KeyboardInterrupt:
- # 清理
- logging.config.stopListening()
- t.join()
以下脚本将接受文件名作为参数,然后将此文件发送到服务器,前面加上文件的二进制编码长度,做为新的日志配置:
- #!/usr/bin/env python
- import socket, sys, struct
- with open(sys.argv[1], 'rb') as f:
- data_to_send = f.read()
- HOST = 'localhost'
- PORT = 9999
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- print('connecting...')
- s.connect((HOST, PORT))
- print('sending config...')
- s.send(struct.pack('>L', len(data_to_send)))
- s.send(data_to_send)
- s.close()
- print('complete')
处理日志 handler 的阻塞
有时你必须让日志记录处理程序的运行不会阻塞你要记录日志的线程。 这在 Web 应用程序中是很常见,当然在其他场景中也可能发生。
有一种原因往往会让程序表现迟钝,这就是 SMTPHandler
:由于很多因素是开发人员无法控制的(例如邮件或网络基础设施的性能不佳),发送电子邮件可能需要很长时间。不过几乎所有网络 handler 都可能会发生阻塞:即使是 SocketHandler
操作也可能在后台执行 DNS 查询,而这种查询实在太慢了(并且 DNS 查询还可能在很底层的套接字库代码中,位于 Python 层之下,超出了可控范围)。
有一种解决方案是分成两部分实现。第一部分,针对那些对性能有要求的关键线程,只为日志对象连接一个 QueueHandler
。日志对象只需简单地写入队列即可,可为队列设置足够大的容量,或者可以在初始化时不设置容量上限。尽管为以防万一,可能需要在代码中捕获 queue.Full
异常,不过队列写入操作通常会很快得以处理。如果要开发库代码,包含性能要求较高的线程,为了让使用该库的开发人员受益,请务必在开发文档中进行标明(包括建议仅连接 QueueHandlers
)。
解决方案的另一部分就是 QueueListener
,它被设计为 QueueHandler
的对应部分。QueueListener
非常简单:传入一个队列和一些 handler,并启动一个内部线程,用于侦听 QueueHandlers
(或其他 LogRecords
源)发送的 LogRecord 队列。LogRecords
会从队列中移除并传给 handler 处理。
QueueListener
作为单独的类,好处就是可以用同一个实例为多个 QueueHandlers
服务。这比把现有 handler 类线程化更加资源友好,后者会每个 handler 会占用一个线程,却没有特别的好处。
以下是这两个类的运用示例(省略了 import 语句):
- que = queue.Queue(-1) # 对大小没有限制
- queue_handler = QueueHandler(que)
- handler = logging.StreamHandler()
- listener = QueueListener(que, handler)
- root = logging.getLogger()
- root.addHandler(queue_handler)
- formatter = logging.Formatter('%(threadName)s: %(message)s')
- handler.setFormatter(formatter)
- listener.start()
- # 日志输出将显示生成事件的线程(主线程)
- # 而不是监控内部队列的内部线程。这也正是
- # 你所希望的。
- root.warning('Look out!')
- listener.stop()
在运行后会产生:
- MainThread: Look out!
备注
虽然前面的讨论没有专门提及异步代码,但需要注意当在异步代码中记录日志时,网络甚至文件处理器都可能会导致问题(阻塞事件循环)因为某些日志记录是在 asyncio
内部完成的。 如果在应用程序中使用了任何异步代码,最好的做法是使用上面的日志记录方式,这样任何阻塞式代码都将只在 QueueListener
线程中运行。
在 3.5 版本发生变更: 在 Python 3.5 之前,QueueListener
总会把由队列接收到的每条信息都传递给已初始化的每个处理程序。(因为这里假定级别过滤操作已在写入队列时完成了。)从 3.5 版开始,可以修改这种处理方式,只要将关键字参数 respect_handler_level=True
传给侦听器的构造函数即可。这样侦听器将会把每条信息的级别与 handler 的级别进行比较,只在适配时才会将信息传给 handler 。
通过网络收发日志事件
假定现在要通过网络发送日志事件,并在接收端进行处理。有一种简单的方案,就是在发送端的根日志对象连接一个 SocketHandler
实例:
- import logging, logging.handlers
- rootLogger = logging.getLogger('')
- rootLogger.setLevel(logging.DEBUG)
- socketHandler = logging.handlers.SocketHandler('localhost',
- logging.handlers.DEFAULT_TCP_LOGGING_PORT)
- # 不必设置格式化器,因为套接字处理器会将事件以未格式化的
- # pickle 形式发送
- rootLogger.addHandler(socketHandler)
- # 现在我们可以写入根记录器或任何其他记录器。 首先是根记录器...
- logging.info('Jackdaws love my big sphinx of quartz.')
- # 现在定义几个可以代表你的应用程序中不同组成部分的
- # 其他日志记录器:
- logger1 = logging.getLogger('myapp.area1')
- logger2 = logging.getLogger('myapp.area2')
- logger1.debug('Quick zephyrs blow, vexing daft Jim.')
- logger1.info('How quickly daft jumping zebras vex.')
- logger2.warning('Jail zesty vixen who grabbed pay from quack.')
- logger2.error('The five boxing wizards jump quickly.')
在接收端,可以用 socketserver
模块设置一个接收器。简要示例如下:
- import pickle
- import logging
- import logging.handlers
- import socketserver
- import struct
- class LogRecordStreamHandler(socketserver.StreamRequestHandler): """Handler for a streaming logging request.
- This basically logs the record using whatever logging policy is
- configured locally.
- """
- def handle(self): """
- Handle multiple requests - each expected to be a 4-byte length,
- followed by the LogRecord in pickle format. Logs the record
- according to whatever policy is configured locally.
- """
- while True:
- chunk = self.connection.recv(4)
- if len(chunk) < 4:
- break
- slen = struct.unpack('>L', chunk)[0]
- chunk = self.connection.recv(slen)
- while len(chunk) < slen:
- chunk = chunk + self.connection.recv(slen - len(chunk))
- obj = self.unPickle(chunk)
- record = logging.makeLogRecord(obj)
- self.handleLogRecord(record)
- def unPickle(self, data):
- return pickle.loads(data)
- def handleLogRecord(self, record):
- # 如果指定了名称,我们将使用指定的记录器而不是
- # record 原本使用的。
- if self.server.logname is not None:
- name = self.server.logname
- else:
- name = record.name
- logger = logging.getLogger(name)
- # 注意每条记录都会被写入。 这是因为 Logger.handle
- # 通常会在记录器层级过滤之后被调用。 如果你希望
- # 进行过滤,请在客户端结束时进行以避免浪费循环
- # 并节省网络带宽!
- logger.handle(record)
- class LogRecordSocketReceiver(socketserver.ThreadingTCPServer): """
- Simple TCP socket-based logging receiver suitable for testing.
- """
- allow_reuse_address = True
- def __init__(self, host='localhost',
- port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
- handler=LogRecordStreamHandler):
- socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
- self.abort = 0
- self.timeout = 1
- self.logname = None
- def serve_until_stopped(self):
- import select
- abort = 0
- while not abort:
- rd, wr, ex = select.select([self.socket.fileno()],
- [], [],
- self.timeout)
- if rd:
- self.handle_request()
- abort = self.abort
- def main():
- logging.basicConfig(
- format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
- tcpserver = LogRecordSocketReceiver()
- print('About to start TCP server...')
- tcpserver.serve_until_stopped()
- if __name__ == '__main__':
- main()
先运行服务端,再运行客户端。客户端控制台不会显示什么信息;在服务端应该会看到如下内容:
- About to start TCP server...
- 59 root INFO Jackdaws love my big sphinx of quartz.
- 59 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
- 69 myapp.area1 INFO How quickly daft jumping zebras vex.
- 69 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
- 69 myapp.area2 ERROR The five boxing wizards jump quickly.
请注意在某些情况下 pickle 会存在一些安全问题。 如果这些问题对你有影响,你可以换用自己的替代序列化方案,只要重写 makePickle()
方法并在其中实现你的替代方案,并调整上述脚本以使用这个替代方案。
在生产中运行日志套接字侦听器
要在生产环境中运行日志记录监听器,你可能需要使用一个进程管理工具如 Supervisor [http://supervisord.org/]。 这个 Gist [https://gist.github.com/vsajip/4b227eeec43817465ca835ca66f75e2b] 提供了使用 Supervisor 来运行上述功能的基本框架文件。 它由以下文件组成:
文件 | 目的 |
---|---|
prepare.sh
| 用于准备针对测试的环境的 Bash 脚本 |
supervisor.conf
| Supervisor 配置文件,其中有用于侦听器和多进程 Web 应用程序的条目 |
ensure_app.sh
| 用于确保 Supervisor 在使用上述配置运行的 Bash 脚本 |
log_listener.py
| 接收日志事件并将其记录到文件中的套接字监听器 |
main.py
| 一个通过连接到监听器的套接字来执行日志记录的简单 Web 应用程序 |
webapp.json
| 一个针对 Web 应用程序的 JSON 配置文件 |
client.py
| 使用 Web 应用程序的 Python 脚本 |
该 Web 应用程序使用了 Gunicorn [https://gunicorn.org/],这个流行的 Web 应用服务器可启动多个工作进程来处理请求。 这个示例设置演示了多个工作进程是如何写入相同的日志文件而不会相互冲突的 —- 它们都通过套接字监听器进程操作。
要测试这些文件,请在 POSIX 环境中执行以下操作:
使用 Download ZIP 按钮将 此 Gist [https://gist.github.com/vsajip/4b227eeec43817465ca835ca66f75e2b] 下载为 ZIP 归档文件。
将上述文件从归档解压缩到一个初始目录中。
在初始目录中,运行
bash prepare.sh
完成准备工作。 这将创建一个run
子目录来包含 Supervisor 相关文件和日志文件,以及一个venv
子目录来包含安装了bottle
,gunicorn
和supervisor
的虚拟环境。运行
bash ensure_app.sh
以确保 Supervisor 正在使用上述配置运行。运行
venv/bin/python client.py
来使用 Web 应用程序,这将使得记录被写入到日志中。检查
run
子目录中的日志文件。 你应当看到匹配模式为app.log*
的文件中最新的日志记录行。 它们不会有任何特定的顺序,因为它们是由不同的工作进程以不确定的方式并发地处理的。你可以通过运行
venv/bin/supervisorctl -c supervisor.conf shutdown
来关闭监听器和 Web 应用程序。
你可能需要在配置的端口与你的测试环境中其他程序发生意外冲突的情况下调整配置文件。
在自己的输出日志中添加上下文信息
有时,除了调用日志对象时传入的参数之外,还希望日志输出中能包含上下文信息。 比如在网络应用程序中,可能需要在日志中记录某客户端的信息(如远程客户端的用户名或 IP 地址)。 这虽然可以用 extra 参数实现,但传递起来并不总是很方便。 虽然为每个网络连接都创建 Logger
实例貌似不错,但并不是个好主意,因为这些实例不会被垃圾回收。 虽然在实践中不是问题,但当 Logger
实例的数量取决于应用程序要采用的日志粒度时,如果 Logger
实例的数量实际上是无限的,则有可能难以管理。
利用 LoggerAdapter 传递上下文信息
要传递上下文信息和日志事件信息,有一种简单方案是利用 LoggerAdapter
类。这个类设计得类似 Logger
,所以可以直接调用 debug()
、info()
、 warning()
、 error()
、exception()
、 critical()
和 log()
。这些方法的签名与 Logger
对应的方法相同,所以这两类实例可以交换使用。
当你创建一个 LoggerAdapter
的实例时,你会传入一个 Logger
的实例和一个包含了上下文信息的字典对象。当你调用一个 LoggerAdapter
实例的方法时,它会把调用委托给内部的 Logger
的实例,并为其整理相关的上下文信息。这是 LoggerAdapter
的一个代码片段:
- def debug(self, msg, /, *args, **kwargs): """
- 在添加来自这个适配器实例的上下文信息之后,
- 将调试调用委托给下层的日志记录器。
- """
- msg, kwargs = self.process(msg, kwargs)
- self.logger.debug(msg, *args, **kwargs)
LoggerAdapter
的 process()
方法是将上下文信息添加到日志的输出中。 它传入日志消息和日志调用的关键字参数,并传回(隐式的)这些修改后的内容去调用底层的日志记录器。此方法的默认参数只是一个消息字段,但留有一个 'extra' 的字段作为关键字参数传给构造器。当然,如果你在调用适配器时传入了一个 'extra' 字段的参数,它会被静默覆盖。
使用 'extra' 的优点是这些键值对会被传入 LogRecord
实例的 dict 中,让你通过 Formatter
的实例直接使用定制的字符串,实例能找到这个字典类对象的键。 如果你需要一个其他的方法,比如说,想要在消息字符串前后增加上下文信息,你只需要创建一个 LoggerAdapter
的子类,并覆盖它的 process()
方法来做你想做的事情,以下是一个简单的示例:
- class CustomAdapter(logging.LoggerAdapter): """
- This example adapter expects the passed in dict-like object to have a
- 'connid' key, whose value in brackets is prepended to the log message.
- """
- def process(self, msg, kwargs):
- return '[%s] %s' % (self.extra['connid'], msg), kwargs
你可以这样使用:
- logger = logging.getLogger(__name__)
- adapter = CustomAdapter(logger, {'connid': some_conn_id})
然后,你记录在适配器中的任何事件消息前将添加 some_conn_id
的值。
使用除字典之外的其它对象传递上下文信息
你不需要将一个实际的字典传递给 LoggerAdapter
-你可以传入一个实现了 __getitem__
和 __iter__
的类的实例,这样它就像是一个字典。这对于你想动态生成值(而字典中的值往往是常量)将很有帮助。
使用过滤器传递上下文信息
你也可以使用一个用户定义的类 Filter
在日志输出中添加上下文信息。Filter
的实例是被允许修改传入的 LogRecords
,包括添加其他的属性,然后可以使用合适的格式化字符串输出,或者可以使用一个自定义的类 Formatter
。
例如,在一个web应用程序中,正在处理的请求(或者至少是请求的一部分),可以存储在一个线程本地 (threading.local
) 变量中,然后从 Filter
中去访问。请求中的信息,如IP地址和用户名将被存储在 LogRecord
中,使用上例 LoggerAdapter
中的 'ip' 和 'user' 属性名。在这种情况下,可以使用相同的格式化字符串来得到上例中类似的输出结果。这是一段示例代码:
- import logging
- from random import choice
- class ContextFilter(logging.Filter): """
- This is a filter which injects contextual information into the log.
- Rather than use actual contextual information, we just use random
- data in this demo.
- """
- USERS = ['jim', 'fred', 'sheila']
- IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']
- def filter(self, record):
- record.ip = choice(ContextFilter.IPS)
- record.user = choice(ContextFilter.USERS)
- return True
- if __name__ == '__main__':
- levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
- logging.basicConfig(level=logging.DEBUG,
- format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
- a1 = logging.getLogger('a.b.c')
- a2 = logging.getLogger('d.e.f')
- f = ContextFilter()
- a1.addFilter(f)
- a2.addFilter(f)
- a1.debug('A debug message')
- a1.info('An info message with %s', 'some parameters')
- for x in range(10):
- lvl = choice(levels)
- lvlname = logging.getLevelName(lvl)
- a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')
在运行时,产生如下内容:
- 2010-09-06 22:38:15,292 a.b.c DEBUG IP: 123.231.231.123 User: fred A debug message
- 2010-09-06 22:38:15,300 a.b.c INFO IP: 192.168.0.1 User: sheila An info message with some parameters
- 2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
- 2010-09-06 22:38:15,300 d.e.f ERROR IP: 127.0.0.1 User: jim A message at ERROR level with 2 parameters
- 2010-09-06 22:38:15,300 d.e.f DEBUG IP: 127.0.0.1 User: sheila A message at DEBUG level with 2 parameters
- 2010-09-06 22:38:15,300 d.e.f ERROR IP: 123.231.231.123 User: fred A message at ERROR level with 2 parameters
- 2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1 User: jim A message at CRITICAL level with 2 parameters
- 2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
- 2010-09-06 22:38:15,300 d.e.f DEBUG IP: 192.168.0.1 User: jim A message at DEBUG level with 2 parameters
- 2010-09-06 22:38:15,301 d.e.f ERROR IP: 127.0.0.1 User: sheila A message at ERROR level with 2 parameters
- 2010-09-06 22:38:15,301 d.e.f DEBUG IP: 123.231.231.123 User: fred A message at DEBUG level with 2 parameters
- 2010-09-06 22:38:15,301 d.e.f INFO IP: 123.231.231.123 User: fred A message at INFO level with 2 parameters
contextvars
的使用
自 Python 3.7 起,contextvars
模块提供了同时适用于 threading
和 asyncio
处理需求的上下文本地存储。 因此这种存储类型通常要比线程本地存储更好。 下面的例子演示了在多线程环境中日志如何用上下文信息来填充内容,例如 Web 应用程序所处理的请求属性。
出于说明的目的,比方说你有几个不同的 Web 应用程序,彼此都保持独立状态但运行在同一个 Python 进程中并且它们共同使用了某个库。 这些应用程序要如何拥有各自的日志记录,其中来自这个库的日志消息(以及其他请求处理代码)会发到对应的应用程序的日志文件,同时在日志中包括额外的上下文信息如客户端 IP、HTTP 请求方法和客户端用户名呢?
让我们假定这个库可以通过以下代码来模拟:
- # webapplib.py
- import logging
- import time
- logger = logging.getLogger(__name__)
- def useful():
- # 一条从库中记录的代表性事件
- logger.debug('Hello from webapplib!')
- # 休眠一下以便其他线程能够运行
- time.sleep(0.01)
我们可以通过两个简单的类 Request
和 WebApp
来模拟多个 Web 应用程序。 它们模拟了真正的多线程 Web 应用程序是如何工作的 —— 每个请求均由单独的线程来处理:
- # main.py
- import argparse
- from contextvars import ContextVar
- import logging
- import os
- from random import choice
- import threading
- import webapplib
- logger = logging.getLogger(__name__)
- root = logging.getLogger()
- root.setLevel(logging.DEBUG)
- class Request: """
- A simple dummy request class which just holds dummy HTTP request method,
- client IP address and client username
- """
- def __init__(self, method, ip, user):
- self.method = method
- self.ip = ip
- self.user = user
- # 将在模拟中使用的一组假请求 —— 我们将从这个列表随机选取。
- # 请注意所有 GET 请求都来自 192.168.2.XXX 地址,
- # 而 POST 请求都来自 192.16.3.XXX 地址。
- # 在这些样例请求中有三个用户。
- REQUESTS = [
- Request('GET', '192.168.2.20', 'jim'),
- Request('POST', '192.168.3.20', 'fred'),
- Request('GET', '192.168.2.21', 'sheila'),
- Request('POST', '192.168.3.21', 'jim'),
- Request('GET', '192.168.2.22', 'fred'),
- Request('POST', '192.168.3.22', 'sheila'),
- ]
- # 请注意格式字符串包括了对请求上下文信息的引用
- # 如 HTTP 方法,客户端 IP 和用户名
- formatter = logging.Formatter('%(threadName)-11s %(appName)s %(name)-9s %(user)-6s %(ip)s %(method)-4s %(message)s')
- # 创建我们的上下文变量。 它们将在开始处理请求时被填充,
- # 并将在处理时发生的日志记录中被使用。
- ctx_request = ContextVar('request')
- ctx_appname = ContextVar('appname')
- class InjectingFilter(logging.Filter): """
- A filter which injects context-specific information into logs and ensures
- that only information for a specific webapp is included in its log
- """
- def __init__(self, app):
- self.app = app
- def filter(self, record):
- request = ctx_request.get()
- record.method = request.method
- record.ip = request.ip
- record.user = request.user
- record.appName = appName = ctx_appname.get()
- return appName == self.app.name
- class WebApp: """
- A dummy web application class which has its own handler and filter for a
- webapp-specific log.
- """
- def __init__(self, name):
- self.name = name
- handler = logging.FileHandler(name + '.log', 'w')
- f = InjectingFilter(self)
- handler.setFormatter(formatter)
- handler.addFilter(f)
- root.addHandler(handler)
- self.num_requests = 0
- def process_request(self, request): """
- This is the dummy method for processing a request. It's called on a
- different thread for every request. We store the context information into
- the context vars before doing anything else.
- """
- ctx_request.set(request)
- ctx_appname.set(self.name)
- self.num_requests += 1
- logger.debug('Request processing started')
- webapplib.useful()
- logger.debug('Request processing finished')
- def main():
- fn = os.path.splitext(os.path.basename(__file__))[0]
- adhf = argparse.ArgumentDefaultsHelpFormatter
- ap = argparse.ArgumentParser(formatter_class=adhf, prog=fn,
- description='Simulate a couple of web '
- 'applications handling some '
- 'requests, showing how request '
- 'context can be used to '
- 'populate logs')
- aa = ap.add_argument
- aa('--count', '-c', type=int, default=100, help='How many requests to simulate')
- options = ap.parse_args()
- # 创建假 Web 应用并将其放在列表中以便我们
- # 用于随机选取
- app1 = WebApp('app1')
- app2 = WebApp('app2')
- apps = [app1, app2]
- threads = []
- # 添加一个将捕获所有事件的通用处理器
- handler = logging.FileHandler('app.log', 'w')
- handler.setFormatter(formatter)
- root.addHandler(handler)
- # 生成调用来处理请求
- for i in range(options.count):
- try:
- # Pick an app at random and a request for it to process
- app = choice(apps)
- request = choice(REQUESTS)
- # Process the request in its own thread
- t = threading.Thread(target=app.process_request, args=(request,))
- threads.append(t)
- t.start()
- except KeyboardInterrupt:
- break
- # 等待线程终结
- for t in threads:
- t.join()
- for app in apps:
- print('%s processed %s requests' % (app.name, app.num_requests))
- if __name__ == '__main__':
- main()
如果你运行上面的代码,你将会发现约有半数请求是发给 app1.log
而其余的则是发给 app2.log
,并且所有请求都会被记录至 app.log
。 每个 Web 应用专属的日志将只包含该 Web 应用的日志条目,请求信息也将以一致的方式显示在日志里(即每个模拟请求中的信息将总是在一个日志行中一起显示)。 如下面的 shell 输出所示:
- ~/logging-contextual-webapp$ python main.py
- app1 processed 51 requests
- app2 processed 49 requests
- ~/logging-contextual-webapp$ wc -l *.log 153 app1.log 147 app2.log 300 app.log 600 total
- ~/logging-contextual-webapp$ head -3 app1.log
- Thread-3 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
- Thread-3 (process_request) app1 webapplib jim 192.168.3.21 POST Hello from webapplib!
- Thread-5 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
- ~/logging-contextual-webapp$ head -3 app2.log
- Thread-1 (process_request) app2 __main__ sheila 192.168.2.21 GET Request processing started
- Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET Hello from webapplib!
- Thread-2 (process_request) app2 __main__ jim 192.168.2.20 GET Request processing started
- ~/logging-contextual-webapp$ head app.log
- Thread-1 (process_request) app2 __main__ sheila 192.168.2.21 GET Request processing started
- Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET Hello from webapplib!
- Thread-2 (process_request) app2 __main__ jim 192.168.2.20 GET Request processing started
- Thread-3 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
- Thread-2 (process_request) app2 webapplib jim 192.168.2.20 GET Hello from webapplib!
- Thread-3 (process_request) app1 webapplib jim 192.168.3.21 POST Hello from webapplib!
- Thread-4 (process_request) app2 __main__ fred 192.168.2.22 GET Request processing started
- Thread-5 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
- Thread-4 (process_request) app2 webapplib fred 192.168.2.22 GET Hello from webapplib!
- Thread-6 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
- ~/logging-contextual-webapp$ grep app1 app1.log | wc -l
- 153
- ~/logging-contextual-webapp$ grep app2 app2.log | wc -l
- 147
- ~/logging-contextual-webapp$ grep app1 app.log | wc -l
- 153
- ~/logging-contextual-webapp$ grep app2 app.log | wc -l
- 147
在处理器中传递上下文信息
每个 Handler
都有自己的过滤器链。 如果你想向一个 LogRecord
添加上下文信息而不使其泄露给其它处理器,你可以使用一个返回新 LogRecord
而不是原地修改它的过滤器,如下面的脚本所示:
- import copy
- import logging
- def filter(record: logging.LogRecord):
- record = copy.copy(record)
- record.user = 'jim'
- return record
- if __name__ == '__main__':
- logger = logging.getLogger()
- logger.setLevel(logging.INFO)
- handler = logging.StreamHandler()
- formatter = logging.Formatter('%(message)s from %(user)-8s')
- handler.setFormatter(formatter)
- handler.addFilter(filter)
- logger.addHandler(handler)
- logger.info('A log message')
从多个进程记录至单个文件
尽管 logging 是线程安全的,将单个进程中的多个线程日志记录至单个文件也 是 受支持的,但将 多个进程 中的日志记录至单个文件则 不是 受支持的,因为在 Python 中并没有在多个进程中实现对单个文件访问的序列化的标准方案。 如果你需要将多个进程中的日志记录至单个文件,有一个方案是让所有进程都将日志记录至一个 SocketHandler
,然后用一个实现了套接字服务器的单独进程一边从套接字中读取一边将日志记录至文件。 (如果愿意的话,你可以在一个现有进程中专门开一个线程来执行此项功能。) 这一部分 文档对此方式有更详细的介绍,并包含一个可用的套接字接收器,你自己的应用可以在此基础上进行适配。
你也可以编写你自己的处理器,让其使用 multiprocessing
模块中的 Lock
类来顺序访问你的多个进程中的文件。 标准库的 FileHandler
及其子类均未使用 multiprocessing
。
或者,你也可以使用 Queue
和 QueueHandler
将所有的日志事件发送至你的多进程应用的一个进程中。 以下示例脚本演示了如何执行此操作。 在示例中,一个单独的监听进程负责监听其他进程的日志事件,并根据自己的配置记录。 尽管示例只演示了这种方法(例如你可能希望使用单独的监听线程而非监听进程 —— 它们的实现是类似的),但你也可以在应用程序的监听进程和其他进程使用不同的配置,它可以作为满足你特定需求的一个基础:
- # 你将在自己的代码中需要这些导入
- import logging
- import logging.handlers
- import multiprocessing
- # 以下两行导入仅针对本演示
- from random import choice, random
- import time
- #
- # 因为你会希望为监听进程和工作进程定义日志记录配置,
- # 这些进程函数将接受一个可调用对象作为 configurer 形参
- # 用于为进程配置日志记录。 这些函数还将接受一个队列,
- # 供它们在通信中使用。
- #
- # 实际上,你可以根据你的需要任意配置监听进程,但请注意在
- # 该简单示例中监听进程没有对收到的记录应用层级或过滤逻辑。
- # 在实践中,你可能会希望在工作进程中执行此逻辑,以避免发送
- # 将会在进程间被过滤掉的事件。
- #
- # 轮转文件的尺寸被设置为很小以便你能方便地查看结果。
- def listener_configurer():
- root = logging.getLogger()
- h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
- f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
- h.setFormatter(f)
- root.addHandler(h)
- # 这是监听进程的最高层级循环:等待队列中的日志记录事件
- # (LogRecords) 并处理它们,当在接受 LogRecord 时收到 None
- # 则退出。
- def listener_process(queue, configurer):
- configurer()
- while True:
- try:
- record = queue.get()
- if record is None: # 我们发送该值以通知监听进程退出。
- break
- logger = logging.getLogger(record.name)
- logger.handle(record) # 未应用层级或过滤逻辑 —— 直接做! except Exception:
- import sys, traceback
- print('Whoops! Problem:', file=sys.stderr)
- traceback.print_exc(file=sys.stderr)
- # 用于在本演示中随机选取的数组
- LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
- logging.ERROR, logging.CRITICAL]
- LOGGERS = ['a.b.c', 'd.e.f']
- MESSAGES = [
- 'Random message #1',
- 'Random message #2',
- 'Random message #3',
- ]
- # 工作进程配置在工作进程开始运行时完成。
- # 请注意在 Windows 上不能依赖 fork 语义,因此每个进程
- # 将在启动时运行日志记录配置代码。
- def worker_configurer(queue):
- h = logging.handlers.QueueHandler(queue) # 只需要一个处理器
- root = logging.getLogger()
- root.addHandler(h)
- # 发送所有消息,用于演示;未应用其他层级或过滤逻辑。
- root.setLevel(logging.DEBUG)
- # 这是工作进程的最高层级循环,它将在结束前以随机间隔
- # 记录十个事件。
- # 打印消息只是让你知道它正在做一些事情!
- def worker_process(queue, configurer):
- configurer(queue)
- name = multiprocessing.current_process().name
- print('Worker started: %s' % name)
- for i in range(10):
- time.sleep(random())
- logger = logging.getLogger(choice(LOGGERS))
- level = choice(LEVELS)
- message = choice(MESSAGES)
- logger.log(level, message)
- print('Worker finished: %s' % name)
- # 以下是演示整合各个组件的地方。 创建队列,创建并启动
- # 监听进程,创建十个工作进程并启动它们,等待它们结束,
- # 然后向队列发送 None 以通知监听进程退出。
- def main():
- queue = multiprocessing.Queue(-1)
- listener = multiprocessing.Process(target=listener_process,
- args=(queue, listener_configurer))
- listener.start()
- workers = []
- for i in range(10):
- worker = multiprocessing.Process(target=worker_process,
- args=(queue, worker_configurer))
- workers.append(worker)
- worker.start()
- for w in workers:
- w.join()
- queue.put_nowait(None)
- listener.join()
- if __name__ == '__main__':
- main()
上面脚本的一个变种,仍然在主进程中记录日志,但使用一个单独的线程:
- import logging
- import logging.config
- import logging.handlers
- from multiprocessing import Process, Queue
- import random
- import threading
- import time
- def logger_thread(q):
- while True:
- record = q.get()
- if record is None:
- break
- logger = logging.getLogger(record.name)
- logger.handle(record)
- def worker_process(q):
- qh = logging.handlers.QueueHandler(q)
- root = logging.getLogger()
- root.setLevel(logging.DEBUG)
- root.addHandler(qh)
- levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
- logging.CRITICAL]
- loggers = ['foo', 'foo.bar', 'foo.bar.baz',
- 'spam', 'spam.ham', 'spam.ham.eggs']
- for i in range(100):
- lvl = random.choice(levels)
- logger = logging.getLogger(random.choice(loggers))
- logger.log(lvl, 'Message no. %d', i)
- if __name__ == '__main__':
- q = Queue()
- d = {
- 'version': 1,
- 'formatters': {
- 'detailed': {
- 'class': 'logging.Formatter',
- 'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
- }
- },
- 'handlers': {
- 'console': {
- 'class': 'logging.StreamHandler',
- 'level': 'INFO',
- },
- 'file': {
- 'class': 'logging.FileHandler',
- 'filename': 'mplog.log',
- 'mode': 'w',
- 'formatter': 'detailed',
- },
- 'foofile': {
- 'class': 'logging.FileHandler',
- 'filename': 'mplog-foo.log',
- 'mode': 'w',
- 'formatter': 'detailed',
- },
- 'errors': {
- 'class': 'logging.FileHandler',
- 'filename': 'mplog-errors.log',
- 'mode': 'w',
- 'level': 'ERROR',
- 'formatter': 'detailed',
- },
- },
- 'loggers': {
- 'foo': {
- 'handlers': ['foofile']
- }
- },
- 'root': {
- 'level': 'DEBUG',
- 'handlers': ['console', 'file', 'errors']
- },
- }
- workers = []
- for i in range(5):
- wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
- workers.append(wp)
- wp.start()
- logging.config.dictConfig(d)
- lp = threading.Thread(target=logger_thread, args=(q,))
- lp.start()
- # 在这里,主进程可以执行某些对它自己有用的工作
- # 当其完成后,即可等待工作进程终结...
- for wp in workers:
- wp.join()
- # 现在再通知日志记录线程结束
- q.put(None)
- lp.join()
这段变种的代码展示了如何使用特定的日志记录配置 - 例如 foo
记录器使用了特殊的处理程序,将 foo
子系统中所有的事件记录至一个文件 mplog-foo.log
。在主进程(即使是在工作进程中产生的日志事件)的日志记录机制中将直接使用恰当的配置。
concurrent.futures.ProcessPoolExecutor 的用法
若要利用 concurrent.futures.ProcessPoolExecutor
启动工作进程,创建队列的方式应稍有不同。不能是:
- queue = multiprocessing.Queue(-1)
而应是:
- queue = multiprocessing.Manager().Queue(-1) # 同样适用于上面的例子
然后就可以将以下工作进程的创建过程:
- workers = []
- for i in range(10):
- worker = multiprocessing.Process(target=worker_process,
- args=(queue, worker_configurer))
- workers.append(worker)
- worker.start()
- for w in workers:
- w.join()
改为 (记得要先导入 concurrent.futures
):
- with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
- for i in range(10):
- executor.submit(worker_process, queue, worker_configurer)
使用 Gunicorn 和 uWSGI 来部署 Web 应用程序
当使用 Gunicorn [https://gunicorn.org/] 或 uWSGI [https://uwsgi-docs.readthedocs.io/en/latest/] (或其他类似工具) 来部署 Web 应用时,会创建多个工作进程来处理客户端请求。 在这种环境下,要避免在你的 Web 应用中直接创建基于文件的处理器。 而应改为使用一个 SocketHandler
将来自 Web 应用的日志发送到在单独进程中运行的监听器。 这可以通过使用一个进程管理工具例如 Supervisor 来进行设置 —— 请参阅 Running a logging socket listener in production 了解详情。
轮换日志文件
有时您会希望让日志文件增长到一定大小,然后打开一个新的接着记录日志。 您可能希望只保留一定数量的日志文件,当创建文件达到指定数量后将会轮换文件,从而使文件数量和文件大小都保持在一定范围之内。 对于这种使用模式,日志包提供了一个 RotatingFileHandler
:
- import glob
- import logging
- import logging.handlers
- LOG_FILENAME = 'logging_rotatingfile_example.out'
- # 使用我们想要的输出层级设置特定的日志记录器
- my_logger = logging.getLogger('MyLogger')
- my_logger.setLevel(logging.DEBUG)
- # 将日志消息处理器添加到日志记录器
- handler = logging.handlers.RotatingFileHandler(
- LOG_FILENAME, maxBytes=20, backupCount=5)
- my_logger.addHandler(handler)
- # 记录一些消息
- for i in range(20):
- my_logger.debug('i = %d' % i)
- # 查看创建了哪些文件
- logfiles = glob.glob('%s*' % LOG_FILENAME)
- for filename in logfiles:
- print(filename)
结果应该是6个单独的文件,每个文件都包含了应用程序的部分历史日志:
- logging_rotatingfile_example.out
- logging_rotatingfile_example.out.1
- logging_rotatingfile_example.out.2
- logging_rotatingfile_example.out.3
- logging_rotatingfile_example.out.4
- logging_rotatingfile_example.out.5
最新的文件始终是 logging_rotatingfile_example.out
,每次到达大小限制时,都会使用后缀 .1
重命名。每个现有的备份文件都会被重命名并增加其后缀(例如 .1
变为 .2
),而 .6
文件会被删除掉。
显然,这个例子将日志长度设置得太小,这是一个极端的例子。 你可能希望将 maxBytes 设置为一个合适的值。
使用其他日志格式化方式
当日志模块被添加至 Python 标准库时,只有一种格式化消息内容的方法即 %-formatting。 在那之后,Python 又增加了两种格式化方法: string.Template
(在 Python 2.4 中新增) 和 str.format()
(在 Python 2.6 中新增)。
日志(从 3.2 开始)为这两种格式化方式提供了更多支持。Formatter
类可以添加一个额外的可选关键字参数 style
。它的默认值是 '%'
,其他的值 '{'
和 '$'
也支持,对应了其他两种格式化样式。其保持了向后兼容(如您所愿),但通过显示指定样式参数,你可以指定格式化字符串的方式是使用 str.format()
或 string.Template
。 这里是一个控制台会话的示例,展示了这些方式:
- >>> import logging
- >>> root = logging.getLogger()
- >>> root.setLevel(logging.DEBUG)
- >>> handler = logging.StreamHandler()
- >>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
- ... style='{')
- >>> handler.setFormatter(bf)
- >>> root.addHandler(handler)
- >>> logger = logging.getLogger('foo.bar')
- >>> logger.debug('This is a DEBUG message')
- 2010-10-28 15:11:55,341 foo.bar DEBUG This is a DEBUG message
- >>> logger.critical('This is a CRITICAL message')
- 2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
- >>> df = logging.Formatter('$asctime $name ${levelname} $message',
- ... style='$')
- >>> handler.setFormatter(df)
- >>> logger.debug('This is a DEBUG message')
- 2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
- >>> logger.critical('This is a CRITICAL message')
- 2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
- >>>
请注意最终输出到日志的消息格式完全独立于单条日志消息的构造方式。 它仍然可以使用 %-formatting,如下所示:
- >>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
- 2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
- >>>
日志调用(logger.debug()
、logger.info()
等)接受的位置参数只会用于日志信息本身,而关键字参数仅用于日志调用的可选处理参数(如关键字参数 exc_info
表示应记录跟踪信息, extra
则标识了需要加入日志的额外上下文信息)。所以不能直接用 str.format()
或 string.Template
语法进行日志调用,因为日志包在内部使用 %-f 格式来合并格式串和参数变量。在保持向下兼容性时,这一点不会改变,因为已有代码中的所有日志调用都会使用%-f 格式串。
还有一种方法可以构建自己的日志信息,就是利用 {}- 和 $- 格式。回想一下,任意对象都可用为日志信息的格式串,日志包将会调用该对象的 str()
方法,以获取最终的格式串。不妨看下一下两个类:
- class BraceMessage:
- def __init__(self, fmt, /, *args, **kwargs):
- self.fmt = fmt
- self.args = args
- self.kwargs = kwargs
- def __str__(self):
- return self.fmt.format(*self.args, **self.kwargs)
- class DollarMessage:
- def __init__(self, fmt, /, **kwargs):
- self.fmt = fmt
- self.kwargs = kwargs
- def __str__(self):
- from string import Template
- return Template(self.fmt).substitute(**self.kwargs)
上述两个类均可代替格式串,使得能用 {}- 或 $-formatting 构建最终的“日志信息”部分,这些信息将出现在格式化后的日志输出中,替换 %(message)s 或“{message}”或“$message”。每次写入日志时都要使用类名,有点不大实用,但如果用上 _ 之类的别名就相当合适了(双下划线 —- 不要与 混淆,单下划线用作 gettext.gettext()
或相关函数的同义词/别名 )。
Python 并没有上述两个类,当然复制粘贴到自己的代码中也很容易。用法可如下所示(假定在名为 wherever
的模块中声明):
- >>> from wherever import BraceMessage as __
- >>> print(__('Message with {0} {name}', 2, name='placeholders'))
- Message with 2 placeholders
- >>> class Point: pass
- ...
- >>> p = Point()
- >>> p.x = 0.5
- >>> p.y = 0.5
- >>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
- ... point=p))
- Message with coordinates: (0.50, 0.50)
- >>> from wherever import DollarMessage as __
- >>> print(__('Message with $num $what', num=2, what='placeholders'))
- Message with 2 placeholders
- >>>
上述示例用了 print()
演示格式化输出的过程,实际记录日志时当然会用类似 logger.debug()
的方法来应用。
需要注意的是使用这种方式不会对性能造成明显影响:实际的格式化工作不是在日志记录调用时发生的,而是在(如果)处理器即将把日志消息输出到日志时发生的。 因此,唯一可能令人困惑的不寻常之处在于包裹在格式字符串和参数外面的圆括号,而不仅仅是格式字符串。 这是因为 __ 标记只是对 XXXMessage
类的构造器的调用的语法糖。
只要愿意,上述类似的效果即可用 LoggerAdapter
实现,如下例所示:
- import logging
- class Message:
- def __init__(self, fmt, args):
- self.fmt = fmt
- self.args = args
- def __str__(self):
- return self.fmt.format(*self.args)
- class StyleAdapter(logging.LoggerAdapter):
- def log(self, level, msg, /, *args, stacklevel=1, **kwargs):
- if self.isEnabledFor(level):
- msg, kwargs = self.process(msg, kwargs)
- self.logger.log(level, Message(msg, args), **kwargs,
- stacklevel=stacklevel+1)
- logger = StyleAdapter(logging.getLogger(__name__))
- def main():
- logger.debug('Hello, {}', 'world!')
- if __name__ == '__main__':
- logging.basicConfig(level=logging.DEBUG)
- main()
在用 Python 3.8 以上版本运行时上述脚本应该会将消息 Hello, world!
写入日志。
自定义 LogRecord
每条日志事件都由一个 LogRecord
实例表示。当某事件要记入日志并且没有被某级别过滤掉时,就会创建一个 LogRecord
对象,并将有关事件的信息填入,传给该日志对象的 handler(及其祖先,直至对象禁止向上传播为止)。在 Python 3.2 之前,只有两个地方会进行事件的创建:
Logger.makeRecord()
,在事件正常记入日志的过程中调用。这会直接调用LogRecord
来创建一个实例。makeLogRecord()
,调用时会带上一个字典参数,其中存放着要加入 LogRecord 的属性。这通常在通过网络接收到合适的字典时调用(如通过SocketHandler
以 pickle 形式,或通过HTTPHandler
以 JSON 形式)。
于是这意味着若要对 LogRecord
进行定制,必须进行下述某种操作。
创建
Logger
自定义子类,重写Logger.makeRecord()
,并在实例化所需日志对象之前用setLoggerClass()
进行设置。
比如说在有多个不同库要完成不同操作的场景下,第一种方式会有点笨拙。 每次都要尝试设置自己的 Logger
子类,而起作用的是最后一次尝试。
第二种方式在多数情况下效果都比较良好,但不允许你使用特殊化的 LogRecord
子类。 库开发者可以为他们的日志记录器设置合适的过滤器,但他们应当要记得每次引入新的日志记录器时都需如此(他们只需通过添加新的包或模块并执行以下操作即可):
- logger = logging.getLogger(__name__)
或许这样要顾及太多事情。开发人员还可以将过滤器附加到其顶级日志对象的 NullHandler
中,但如果应用程序开发人员将 handler 附加到较底层库的日志对象,则不会调用该过滤器 —- 所以 handler 输出的内容不会符合库开发人员的预期。
在 Python 3.2 以上版本中,LogRecord
的创建是通过工厂对象完成的,工厂对象可以指定。工厂对象只是一个可调用对象,可以用 setLogRecordFactory()
进行设置,并用 getLogRecordFactory()
进行查询。工厂对象的调用参数与 LogRecord
的构造函数相同,因为 LogRecord
是工厂对象的默认设置。
这种方式可以让自定义工厂对象完全控制 LogRecord 的创建过程。比如可以返回一个子类,或者在创建的日志对象中加入一些额外的属性,使用方式如下所示:
- old_factory = logging.getLogRecordFactory()
- def record_factory(*args, **kwargs):
- record = old_factory(*args, **kwargs)
- record.custom_attribute = 0xdecafbad
- return record
- logging.setLogRecordFactory(record_factory)
这种模式允许不同的库将多个工厂对象链在一起,只要不会覆盖彼此的属性或标准属性,就不会出现意外。但应记住,工厂链中的每个节点都会增加日志操作的运行开销,本技术仅在采用 Filter
无法达到目标时才应使用。
子类化 QueueHandler 和 QueueListener - ZeroMQ 示例
子类 QueueHandler
你可以使用 QueueHandler
子类将消息发送给其他类型的队列 ,比如 ZeroMQ 'publish' 套接字。 在以下示例中,套接字将单独创建并传给处理器 (作为它的 'queue'):
- import zmq # 使用 pyzmq,这是 ZeroMQ 的 Python 绑定
- import json # 用于可移植地对记录进行序列化
- ctx = zmq.Context()
- sock = zmq.Socket(ctx, zmq.PUB) # 或 zmq.PUSH,或其他适当的值
- sock.bind('tcp://*:5556') # 或任何值
- class ZeroMQSocketHandler(QueueHandler):
- def enqueue(self, record):
- self.queue.send_json(record.__dict__)
- handler = ZeroMQSocketHandler(sock)
当然还有其他方案,比如通过 hander 传入所需数据,以创建 socket:
- class ZeroMQSocketHandler(QueueHandler):
- def __init__(self, uri, socktype=zmq.PUB, ctx=None):
- self.ctx = ctx or zmq.Context()
- socket = zmq.Socket(self.ctx, socktype)
- socket.bind(uri)
- super().__init__(socket)
- def enqueue(self, record):
- self.queue.send_json(record.__dict__)
- def close(self):
- self.queue.close()
子类 QueueListener
你还可以子类化 QueueListener
来从其他类型的队列中获取消息,比如从 ZeroMQ 'subscribe' 套接字。 下面是一个例子:
- class ZeroMQSocketListener(QueueListener):
- def __init__(self, uri, /, *handlers, **kwargs):
- self.ctx = kwargs.get('ctx') or zmq.Context()
- socket = zmq.Socket(self.ctx, zmq.SUB)
- socket.setsockopt_string(zmq.SUBSCRIBE, '') # 全部预订
- socket.connect(uri)
- super().__init__(socket, *handlers, **kwargs)
- def dequeue(self):
- msg = self.queue.recv_json()
- return logging.makeLogRecord(msg)
子类化 QueueHandler 和 QueueListener - pynng
示例
通过与上一节类似的方式,我们可以使用 pynng [https://pypi.org/project/pynng/] 来实现监听器和处理器,这个包是针对 NNG [https://nng.nanomsg.org/] 的 Python 绑定,它被确定为 ZeroMQ 的精神后继者。 以下代码片段被用作演示 — 你可以在安装了 pynng
的环境中测试它们。 为增加变化,我们先编写监听器。
子类 QueueListener
- # listener.py
- import json
- import logging
- import logging.handlers
- import pynng
- DEFAULT_ADDR = "tcp://localhost:13232"
- interrupted = False
- class NNGSocketListener(logging.handlers.QueueListener):
- def __init__(self, uri, /, *handlers, **kwargs):
- # 设置超时以允许中断,并打开一个
- # 订阅方套接字
- socket = pynng.Sub0(listen=uri, recv_timeout=500)
- # b'' 订阅将匹配所有主题
- topics = kwargs.pop('topics', None) or b''
- socket.subscribe(topics)
- # 我们将套接字视为一个队列
- super().__init__(socket, *handlers, **kwargs)
- def dequeue(self, block):
- data = None
- # 在未被打断且未从套接字接收数据时
- # 保持循环
- while not interrupted:
- try:
- data = self.queue.recv(block=block)
- break
- except pynng.Timeout:
- pass
- except pynng.Closed: # 会在你按下 Ctrl-C 时发生
- break
- if data is None:
- return None
- # 获取从发布方发送的日志记录事件
- event = json.loads(data.decode('utf-8'))
- return logging.makeLogRecord(event)
- def enqueue_sentinel(self):
- # 在本实现中未被使用,因为套接字并不是
- # 真正的队列
- pass
- logging.getLogger('pynng').propagate = False
- listener = NNGSocketListener(DEFAULT_ADDR, logging.StreamHandler(), topics=b'')
- listener.start()
- print('Press Ctrl-C to stop.')
- try:
- while True:
- pass
- except KeyboardInterrupt:
- interrupted = True
- finally:
- listener.stop()