- 子类
QueueHandler
- 基于字典进行日志配置的示例
- 利用 rotator 和 namer 自定义日志轮换操作
- 更加详细的多道处理示例
- 在发送给 SysLogHandler 的信息中插入一个 BOM。
- 结构化日志的实现代码
- 利用
dictConfig()
自定义 handler - 生效于整个应用程序的格式化样式
- 利用
dictConfig()
定义过滤器 - 异常信息的自定义格式化
- 语音播报日志信息
- 缓冲日志消息并有条件地输出它们
- 将日志消息发送至电子邮件,附带缓存支持
- 通过配置使用UTC (GMT) 格式化时间
- 使用上下文管理器的可选的日志记录
- 命令行日志应用起步
- Qt GUI 日志示例
- 将日志记录到带有 RFC5424 支持的 syslog
- 如何将日志记录器作为输出流
- 理应避免的用法
- 其他资源
子类 QueueHandler
- # sender.py
- import json
- import logging
- import logging.handlers
- import time
- import random
- import pynng
- DEFAULT_ADDR = "tcp://localhost:13232"
- class NNGSocketHandler(logging.handlers.QueueHandler):
- def __init__(self, uri):
- socket = pynng.Pub0(dial=uri, send_timeout=500)
- super().__init__(socket)
- def enqueue(self, record):
- # Send the record as UTF-8 encoded JSON
- d = dict(record.__dict__)
- data = json.dumps(d)
- self.queue.send(data.encode('utf-8'))
- def close(self):
- self.queue.close()
- logging.getLogger('pynng').propagate = False
- handler = NNGSocketHandler(DEFAULT_ADDR)
- # 确保进程 ID 在输出内容中
- logging.basicConfig(level=logging.DEBUG,
- handlers=[logging.StreamHandler(), handler],
- format='%(levelname)-8s %(name)10s %(process)6s %(message)s')
- levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
- logging.CRITICAL)
- logger_names = ('myapp', 'myapp.lib1', 'myapp.lib2')
- msgno = 1
- while True:
- # 随机地选择日志记录器和层级并记录日志
- level = random.choice(levels)
- logger = logging.getLogger(random.choice(logger_names))
- logger.log(level, 'Message no. %5d' % msgno)
- msgno += 1
- delay = random.random() * 2 + 0.5
- time.sleep(delay)
你可以在不同的命令行 shell 中运行上面两个代码片段。 如果我们在一个 shell 中运行监听器并在两个不同的 shell 中运行发送器,我们将看到如下的结果。 在第一个发送器 shell 中:
- $ python sender.py
- DEBUG myapp 613 Message no. 1
- WARNING myapp.lib2 613 Message no. 2
- CRITICAL myapp.lib2 613 Message no. 3
- WARNING myapp.lib2 613 Message no. 4
- CRITICAL myapp.lib1 613 Message no. 5
- DEBUG myapp 613 Message no. 6
- CRITICAL myapp.lib1 613 Message no. 7
- INFO myapp.lib1 613 Message no. 8
- (下略)
在第二个发送器 shell 中:
- $ python sender.py
- INFO myapp.lib2 657 Message no. 1
- CRITICAL myapp.lib2 657 Message no. 2
- CRITICAL myapp 657 Message no. 3
- CRITICAL myapp.lib1 657 Message no. 4
- INFO myapp.lib1 657 Message no. 5
- WARNING myapp.lib2 657 Message no. 6
- CRITICAL myapp 657 Message no. 7
- DEBUG myapp.lib1 657 Message no. 8
- (下略)
在监听器 shell 中:
- $ python listener.py
- Press Ctrl-C to stop.
- DEBUG myapp 613 Message no. 1
- WARNING myapp.lib2 613 Message no. 2
- INFO myapp.lib2 657 Message no. 1
- CRITICAL myapp.lib2 613 Message no. 3
- CRITICAL myapp.lib2 657 Message no. 2
- CRITICAL myapp 657 Message no. 3
- WARNING myapp.lib2 613 Message no. 4
- CRITICAL myapp.lib1 613 Message no. 5
- CRITICAL myapp.lib1 657 Message no. 4
- INFO myapp.lib1 657 Message no. 5
- DEBUG myapp 613 Message no. 6
- WARNING myapp.lib2 657 Message no. 6
- CRITICAL myapp 657 Message no. 7
- CRITICAL myapp.lib1 613 Message no. 7
- INFO myapp.lib1 613 Message no. 8
- DEBUG myapp.lib1 657 Message no. 8
- (下略)
如你所见,来自两个发送器进程的日志记录会在监听器的输出中交错出现。
基于字典进行日志配置的示例
以下是日志配置字典的一个示例——它取自 Django 项目的文档<[https://docs.djangoproject.com/en/stable/topics/logging/#configuring-logging](https://docs.djangoproject.com/en/stable/topics/logging/#configuring-logging)>
_。此字典将被传给 dictConfig()
以使配置生效:
- LOGGING = {
- 'version': 1,
- 'disable_existing_loggers': False,
- 'formatters': {
- 'verbose': {
- 'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
- 'style': '{',
- },
- 'simple': {
- 'format': '{levelname} {message}',
- 'style': '{',
- },
- },
- 'filters': {
- 'special': {
- '()': 'project.logging.SpecialFilter',
- 'foo': 'bar',
- },
- },
- 'handlers': {
- 'console': {
- 'level': 'INFO',
- 'class': 'logging.StreamHandler',
- 'formatter': 'simple',
- },
- 'mail_admins': {
- 'level': 'ERROR',
- 'class': 'django.utils.log.AdminEmailHandler',
- 'filters': ['special']
- }
- },
- 'loggers': {
- 'django': {
- 'handlers': ['console'],
- 'propagate': True,
- },
- 'django.request': {
- 'handlers': ['mail_admins'],
- 'level': 'ERROR',
- 'propagate': False,
- },
- 'myproject.custom': {
- 'handlers': ['console', 'mail_admins'],
- 'level': 'INFO',
- 'filters': ['special']
- }
- }
- }
有关本配置的更多信息,请参阅 Django 文档的 有关章节 [https://docs.djangoproject.com/en/stable/topics/logging/#configuring-logging] 。
利用 rotator 和 namer 自定义日志轮换操作
下面的可运行代码给出了你可以怎样定义命名器和轮换器的例子,其中演示了日志文件的 gzip 压缩过程:
- import gzip
- import logging
- import logging.handlers
- import os
- import shutil
- def namer(name):
- return name + ".gz"
- def rotator(source, dest):
- with open(source, 'rb') as f_in:
- with gzip.open(dest, 'wb') as f_out:
- shutil.copyfileobj(f_in, f_out)
- os.remove(source)
- rh = logging.handlers.RotatingFileHandler('rotated.log', maxBytes=128, backupCount=5)
- rh.rotator = rotator
- rh.namer = namer
- root = logging.getLogger()
- root.setLevel(logging.INFO)
- root.addHandler(rh)
- f = logging.Formatter('%(asctime)s %(message)s')
- rh.setFormatter(f)
- for i in range(1000):
- root.info(f'Message no. {i + 1}')
运行此脚本后,你将看到六个新文件,其中五个是已压缩的:
- $ ls rotated.log*
- rotated.log rotated.log.2.gz rotated.log.4.gz
- rotated.log.1.gz rotated.log.3.gz rotated.log.5.gz
- $ zcat rotated.log.1.gz
- 2023-01-20 02:28:17,767 Message no. 996
- 2023-01-20 02:28:17,767 Message no. 997
- 2023-01-20 02:28:17,767 Message no. 998
更加详细的多道处理示例
以下可运行的示例显示了如何利用配置文件在多进程中应用日志。这些配置相当简单,但足以说明如何在真实的多进程场景中实现较为复杂的配置。
上述示例中,主进程产生一个侦听器进程和一些工作进程。每个主进程、侦听器进程和工作进程都有三种独立的日志配置(工作进程共享同一套配置)。大家可以看到主进程的日志记录过程、工作线程向 QueueHandler 写入日志的过程,以及侦听器实现 QueueListener 和较为复杂的日志配置,如何将由队列接收到的事件分发给配置指定的 handler。请注意,这些配置纯粹用于演示,但应该能调整代码以适用于自己的场景。
以下是代码——但愿文档字符串和注释能有助于理解其工作原理:
- import logging
- import logging.config
- import logging.handlers
- from multiprocessing import Process, Queue, Event, current_process
- import os
- import random
- import time
- class MyHandler: """
- A simple handler for logging events. It runs in the listener process and
- dispatches events to loggers based on the name in the received record,
- which then get dispatched, by the logging system, to the handlers
- configured for those loggers.
- """
- def handle(self, record):
- if record.name == "root":
- logger = logging.getLogger()
- else:
- logger = logging.getLogger(record.name)
- if logger.isEnabledFor(record.levelno):
- # 进程名称经过变换以演示是由监听器来执行
- # 记录日志到文件和控制台
- record.processName = '%s (for %s)' % (current_process().name, record.processName)
- logger.handle(record)
- def listener_process(q, stop_event, config): """
- This could be done in the main process, but is just done in a separate
- process for illustrative purposes.
- This initialises logging according to the specified configuration,
- starts the listener and waits for the main process to signal completion
- via the event. The listener is then stopped, and the process exits.
- """
- logging.config.dictConfig(config)
- listener = logging.handlers.QueueListener(q, MyHandler())
- listener.start()
- if os.name == 'posix':
- # 在 POSIX 系统上,setup 日志记录器将会在
- # 父进程中完成配置,但应当在 dictConfig 调用
- # 之后即已被禁用。
- # 在 Windows 上,由于不会使用 fork,setup 日志记录器
- # 将不会在子进程中退出,因此它将被创建并且显示消息
- # —— 对应 "if posix" 子句。
- logger = logging.getLogger('setup')
- logger.critical('Should not appear, because of disabled logger ...')
- stop_event.wait()
- listener.stop()
- def worker_process(config): """
- A number of these are spawned for the purpose of illustration. In
- practice, they could be a heterogeneous bunch of processes rather than
- ones which are identical to each other.
- This initialises logging according to the specified configuration,
- and logs a hundred messages with random levels to randomly selected
- loggers.
- A small sleep is added to allow other processes a chance to run. This
- is not strictly needed, but it mixes the output from the different
- processes a bit more than if it's left out.
- """
- logging.config.dictConfig(config)
- levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
- logging.CRITICAL]
- loggers = ['foo', 'foo.bar', 'foo.bar.baz',
- 'spam', 'spam.ham', 'spam.ham.eggs']
- if os.name == 'posix':
- # 在 POSIX 系统上,setup 日志记录器将会在
- # 父进程中完成配置,但应当在 dictConfig 调用
- # 之后已被禁用。
- # 在 Windows 上,由于不会使用 fork,setup 日志记录器
- # 将不会在子进程中退出,因此它将被创建并且显示消息
- # —— 对应 "if posix" 子句。
- logger = logging.getLogger('setup')
- logger.critical('Should not appear, because of disabled logger ...')
- for i in range(100):
- lvl = random.choice(levels)
- logger = logging.getLogger(random.choice(loggers))
- logger.log(lvl, 'Message no. %d', i)
- time.sleep(0.01)
- def main():
- q = Queue()
- # 主进程将获得一个打印到控制台的简单配置。
- config_initial = {
- 'version': 1,
- 'handlers': {
- 'console': {
- 'class': 'logging.StreamHandler',
- 'level': 'INFO'
- }
- },
- 'root': {
- 'handlers': ['console'],
- 'level': 'DEBUG'
- }
- }
- # 工作进程配置就是一个附加到根日志记录器的 QueueHandler,
- # 它允许所有消息被发送至队列 。
- # 我们禁用现有的日志记录器以禁用在父进程中使用的 "setup"
- # 日志记录器。 这在 POSIX 中是必需的因为日志记录器将会在
- # fork() 之后出现在子进程中。
- config_worker = {
- 'version': 1,
- 'disable_existing_loggers': True,
- 'handlers': {
- 'queue': {
- 'class': 'logging.handlers.QueueHandler',
- 'queue': q
- }
- },
- 'root': {
- 'handlers': ['queue'],
- 'level': 'DEBUG'
- }
- }
- # 监听器进程配置显示可以使用日志记录配置的
- # 完整适应性以便以你希望的方式将事件分发给
- # 处理器。
- # 我们禁用现有的日志记录器以禁用在父进程中使用的
- # "setup" 日志记录器。 这在 POSIX 中是必需的因为
- # 日志记录器将会在 fork() 之后出现在子进程中。
- config_listener = {
- 'version': 1,
- 'disable_existing_loggers': True,
- 'formatters': {
- 'detailed': {
- 'class': 'logging.Formatter',
- 'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
- },
- 'simple': {
- 'class': 'logging.Formatter',
- 'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
- }
- },
- 'handlers': {
- 'console': {
- 'class': 'logging.StreamHandler',
- 'formatter': 'simple',
- 'level': 'INFO'
- },
- 'file': {
- 'class': 'logging.FileHandler',
- 'filename': 'mplog.log',
- 'mode': 'w',
- 'formatter': 'detailed'
- },
- 'foofile': {
- 'class': 'logging.FileHandler',
- 'filename': 'mplog-foo.log',
- 'mode': 'w',
- 'formatter': 'detailed'
- },
- 'errors': {
- 'class': 'logging.FileHandler',
- 'filename': 'mplog-errors.log',
- 'mode': 'w',
- 'formatter': 'detailed',
- 'level': 'ERROR'
- }
- },
- 'loggers': {
- 'foo': {
- 'handlers': ['foofile']
- }
- },
- 'root': {
- 'handlers': ['console', 'file', 'errors'],
- 'level': 'DEBUG'
- }
- }
- # 记录一些初始事件,以便显示父进程中的日志记录
- # 工作正常。
- logging.config.dictConfig(config_initial)
- logger = logging.getLogger('setup')
- logger.info('About to create workers ...')
- workers = []
- for i in range(5):
- wp = Process(target=worker_process, name='worker %d' % (i + 1),
- args=(config_worker,))
- workers.append(wp)
- wp.start()
- logger.info('Started worker: %s', wp.name)
- logger.info('About to create listener ...')
- stop_event = Event()
- lp = Process(target=listener_process, name='listener',
- args=(q, stop_event, config_listener))
- lp.start()
- logger.info('Started listener')
- # 我们现在要等待工作进程完成其工作。
- for wp in workers:
- wp.join()
- # 工作进程全部结束,现在可以停止监听。
- # 父进程中的日志记录仍然正常进行。
- logger.info('Telling listener to stop ...')
- stop_event.set()
- lp.join()
- logger.info('All done.')
- if __name__ == '__main__':
- main()
在发送给 SysLogHandler 的信息中插入一个 BOM。
RFC 5424 [https://datatracker.ietf.org/doc/html/rfc5424.html] 要求,Unicode 信息应采用字节流形式发送到系统 syslog 守护程序,字节流结构如下所示:可选的纯 ASCII部分,后跟 UTF-8 字节序标记(BOM),然后是采用 UTF-8 编码的 Unicode。(参见 相关规范 [https://datatracker.ietf.org/doc/html/rfc5424.html#section-6] 。)
在 Python 3.1 的 SysLogHandler
中,已加入了在日志信息中插入 BOM 的代码,但不幸的是,代码并不正确,BOM 出现在了日志信息的开头,因此在它之前就不允许出现纯 ASCII 内容了。
由于无法正常工作, Python 3.2.4 以上版本已删除了出错的插入 BOM 代码。但已有版本的代码不会被替换,若要生成与 RFC 5424 [https://datatracker.ietf.org/doc/html/rfc5424.html] 兼容的日志信息,包括一个 BOM 符,前面有可选的纯 ASCII 字节流,后面为 UTF-8 编码的任意 Unicode,那么 需要执行以下操作:
- 为
SysLogHandler
实例串上一个Formatter
实例,格式串可如下:
- 'ASCII section\ufeffUnicode section'
用 UTF-8 编码时,Unicode 码位 U+FEFF 将会编码为 UTF-8 BOM——字节串 b'\xef\xbb\xbf'
。
用任意占位符替换 ASCII 部分,但要保证替换之后的数据一定是 ASCII 码(这样在 UTF-8 编码后就会维持不变)。
用任意占位符替换 Unicode 部分;如果替换后的数据包含超出 ASCII 范围的字符,没问题——他们将用 UTF-8 进行编码。
SysLogHandler
将 对格式化后的日志信息进行 UTF-8 编码。如果遵循上述规则,应能生成符合 RFC 5424 [https://datatracker.ietf.org/doc/html/rfc5424.html] 的日志信息。否则,日志记录过程可能不会有什么反馈,但日志信息将不与 RFC 5424 兼容,syslog 守护程序可能会有出错反应。
结构化日志的实现代码
大多数日志信息是供人阅读的,所以机器解析起来并不容易,但某些时候可能希望以结构化的格式输出,以 能够 被程序解析(无需用到复杂的正则表达式)。这可以直接用 logging 包实现。实现方式有很多,以下是一种比较简单的方案,利用 JSON 以机器可解析的方式对事件信息进行序列化:
- import json
- import logging
- class StructuredMessage:
- def __init__(self, message, /, **kwargs):
- self.message = message
- self.kwargs = kwargs
- def __str__(self):
- return '%s >>> %s' % (self.message, json.dumps(self.kwargs))
- _ = StructuredMessage # 可选项,用于提升可读性
- logging.basicConfig(level=logging.INFO, format='%(message)s')
- logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))
上述代码运行后的结果是:
- message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}
请注意,根据 Python 版本的不同,各项数据的输出顺序可能会不一样。
若需进行更为定制化的处理,可以使用自定义 JSON 编码对象,下面给出完整示例:
- import json
- import logging
- class Encoder(json.JSONEncoder):
- def default(self, o):
- if isinstance(o, set):
- return tuple(o)
- elif isinstance(o, str):
- return o.encode('unicode_escape').decode('ascii')
- return super().default(o)
- class StructuredMessage:
- def __init__(self, message, /, **kwargs):
- self.message = message
- self.kwargs = kwargs
- def __str__(self):
- s = Encoder().encode(self.kwargs)
- return '%s >>> %s' % (self.message, s)
- _ = StructuredMessage # 可选项,用于提升可读性
- def main():
- logging.basicConfig(level=logging.INFO, format='%(message)s')
- logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))
- if __name__ == '__main__':
- main()
上述代码运行后的结果是:
- message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}
请注意,根据 Python 版本的不同,各项数据的输出顺序可能会不一样。
利用 dictConfig()
自定义 handler
有时需要以特定方式自定义日志 handler,如果采用 dictConfig()
,可能无需生成子类就可以做到。比如要设置日志文件的所有权。在 POSIX 上,可以利用 shutil.chown()
轻松完成,但 stdlib 中的文件 handler 并不提供内置支持。于是可以用普通函数自定义 handler 的创建,例如:
- def owned_file_handler(filename, mode='a', encoding=None, owner=None):
- if owner:
- if not os.path.exists(filename):
- open(filename, 'a').close()
- shutil.chown(filename, *owner)
- return logging.FileHandler(filename, mode, encoding)
然后,你可以在传给 dictConfig()
的日志配置中指定通过调用此函数来创建日志处理程序:
- LOGGING = {
- 'version': 1,
- 'disable_existing_loggers': False,
- 'formatters': {
- 'default': {
- 'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
- },
- },
- 'handlers': {
- 'file':{
- # 下面的值将从该字典中弹出并被用来
- # 创建处理器,设置处理器的层级及其
- # 格式化器:
- '()': owned_file_handler,
- 'level':'DEBUG',
- 'formatter': 'default',
- # 下面的值将以关键字参数形式传给
- # 处理器调用方的可调用对象。
- 'owner': ['pulse', 'pulse'],
- 'filename': 'chowntest.log',
- 'mode': 'w',
- 'encoding': 'utf-8',
- },
- },
- 'root': {
- 'handlers': ['file'],
- 'level': 'DEBUG',
- },
- }
出于演示目的,以下示例设置用户和用户组为 pulse
。代码置于一个可运行的脚本文件 chowntest.py
中:
- import logging, logging.config, os, shutil
- def owned_file_handler(filename, mode='a', encoding=None, owner=None):
- if owner:
- if not os.path.exists(filename):
- open(filename, 'a').close()
- shutil.chown(filename, *owner)
- return logging.FileHandler(filename, mode, encoding)
- LOGGING = {
- 'version': 1,
- 'disable_existing_loggers': False,
- 'formatters': {
- 'default': {
- 'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
- },
- },
- 'handlers': {
- 'file':{
- # 下面的值将从此字典中被弹出并被用来
- # 创建处理器、设置处理器的层级
- # 及其格式化器。
- '()': owned_file_handler,
- 'level':'DEBUG',
- 'formatter': 'default',
- # 下面的值将以关键字参数形式传给处理器的
- # 创建方可调用对象。
- 'owner': ['pulse', 'pulse'],
- 'filename': 'chowntest.log',
- 'mode': 'w',
- 'encoding': 'utf-8',
- },
- },
- 'root': {
- 'handlers': ['file'],
- 'level': 'DEBUG',
- },
- }
- logging.config.dictConfig(LOGGING)
- logger = logging.getLogger('mylogger')
- logger.debug('A debug message')
可能需要 root
权限才能运行:
- $ sudo python3.3 chowntest.py
- $ cat chowntest.log
- 2013-11-05 09:34:51,128 DEBUG mylogger A debug message
- $ ls -l chowntest.log
- -rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log
请注意此示例用的是 Python 3.3,因为 shutil.chown()
是从此版本开始出现的。 此方式应当适用于任何支持 dictConfig()
的 Python 版本 —— 例如 Python 2.7, 3.2 或更新的版本。 对于 3.3 之前的版本,你应当使用 os.chown()
之类的函数来实现实际的所有权修改。
实际应用中,handler 的创建函数可能位于项目的工具模块中。以下配置:
- '()': owned_file_handler,
应使用:
- '()': 'ext://project.util.owned_file_handler',
这里的 project.util
可以换成函数所在包的实际名称。 在上述的可用脚本中,应该可以使用 'ext://__main__.owned_file_handler'
。 在这里,实际的可调用对象是由 dictConfig()
从 ext://
说明中解析出来的。
上述示例还指明了其他的文件修改类型的实现方案 —— 比如同样利用 os.chmod()
设置 POSIX 访问权限位。
当然,以上做法也可以扩展到 FileHandler
之外的其他类型的 handler ——比如某个轮换文件 handler,或类型完全不同的其他 handler。
生效于整个应用程序的格式化样式
在 Python 3.2 中,Formatter
增加了一个 style
关键字形参,它默认为 %
以便向下兼容,但是允许采用 {
或 $
来支持 str.format()
和 string.Template
所支持的格式化方式。 请注意此形参控制着用用于最终输出到日志的日志消息格式,并且与单独日志消息的构造方式完全无关。
日志调用 (debug()
, info()
等) 只接受包含实际日志消息自身的位置参数,而关键字参数仅用于确定如何处理日志调用的选项 (例如 exc_info
关键字参数表示应将回溯信息记入日志,而 extra
关键字参数则指定要添加到日志的额外上下文信息)。 所以你不能直接使用 str.format()
或 string.Template
语法来直接执行日志调用,因为 logging 包在内部是使用 % 格式符来合并格式字符串和可变参数的。 这一点不应被改变以保持向下兼容性,因为现有代码中所有的日志调用都将使用 % 格式化字符串。
有人建议将格式化样式与特定的日志对象进行关联,但其实也会遇到向下兼容的问题,因为已有代码可能用到了某日志对象并采用了 %-f 格式串。
为了让第三方库和自编代码都能够交互使用日志功能,需要决定在单次日志记录调用级别采用什么格式。于是就出现了其他几种格式化样式方案。
LogRecord 工厂的用法
在 Python 3.2 中,伴随着 Formatter
的上述变化,logging 包增加了允许用户使用 setLogRecordFactory()
函数来。设置自己的 LogRecord
子类的功能。 你可以使用此功能来设置自己的 LogRecord
子类,它会通过重写 getMessage()
方法来完成适当的操作。 msg % args
格式化是在此方法的基类实现中进行的,你可以在那里用你自己的格式化操作来替换;但是,你应当注意要支持全部的格式化样式并允许将 %-formatting 作为默认样式,以确保与其他代码进行配合。 还应当注意调用 str(self.msg)
,正如基类实现所做的一样。
更多信息请参阅 setLogRecordFactory()
和 LogRecord
的参考文档。
自定义信息对象的使用
另一种方案可能更为简单,可以利用 {}- 和 $- 格式构建自己的日志消息。大家或许还记得(来自 使用任意对象作为消息),可以用任意对象作为日志信息的格式串,日志包将调用该对象上 str()
获取实际的格式串。看下以下两个类:
- class BraceMessage:
- def __init__(self, fmt, /, *args, **kwargs):
- self.fmt = fmt
- self.args = args
- self.kwargs = kwargs
- def __str__(self):
- return self.fmt.format(*self.args, **self.kwargs)
- class DollarMessage:
- def __init__(self, fmt, /, **kwargs):
- self.fmt = fmt
- self.kwargs = kwargs
- def __str__(self):
- from string import Template
- return Template(self.fmt).substitute(**self.kwargs)
以上两个类均都可用于替代格式串,以便用 {}- 或 $-formatting 构建实际的“日志信息”部分,此部分将出现在格式化后的日志输出中,替换 %(message)s 、“{message}”或“$message”。每次要写入日志时都使用类名,如果觉得使用不便,可以采用 M
或 _
之类的别名(如果将 _
用于本地化操作,则可用 __
)。
下面给出示例。 首先用 str.format()
进行格式化:
- >>> __ = BraceMessage
- >>> print(__('Message with {0} {1}', 2, 'placeholders'))
- Message with 2 placeholders
- >>> class Point: pass
- ...
- >>> p = Point()
- >>> p.x = 0.5
- >>> p.y = 0.5
- >>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
- Message with coordinates: (0.50, 0.50)
然后,用 string.Template
格式化:
- >>> __ = DollarMessage
- >>> print(__('Message with $num $what', num=2, what='placeholders'))
- Message with 2 placeholders
- >>>
需要注意的是使用这种方式不会对性能造成明显影响:实际的格式化工作不是在日志记录调用时发生的,而是在(如果)处理器即将把日志消息输出到日志时发生的。 因此,唯一可能令人困惑的不寻常之处在于包裹在格式字符串和参数外面的圆括号,而不仅仅是格式字符串。 这是因为 __ 符号只是对上面显示的 XXXMessage
类的构造器的调用的语法糖。
利用 dictConfig()
定义过滤器
用 dictConfig()
可以 对日志过滤器进行设置,尽管乍一看做法并不明显(所以才需要本秘籍)。 由于 Filter
是标准库中唯一的日志过滤器类,不太可能满足众多的要求(它只是作为基类存在),通常需要定义自己的 Filter
子类,并重写 filter()
方法。为此,请在过滤器的配置字典中设置 ()
键,指定要用于创建过滤器的可调用对象(最明显可用的就是给出一个类,但也可以提供任何一个可调用对象,只要能返回 Filter
实例即可)。下面是一个完整的例子:
- import logging
- import logging.config
- import sys
- class MyFilter(logging.Filter):
- def __init__(self, param=None):
- self.param = param
- def filter(self, record):
- if self.param is None:
- allow = True
- else:
- allow = self.param not in record.msg
- if allow:
- record.msg = 'changed: ' + record.msg
- return allow
- LOGGING = {
- 'version': 1,
- 'filters': {
- 'myfilter': {
- '()': MyFilter,
- 'param': 'noshow',
- }
- },
- 'handlers': {
- 'console': {
- 'class': 'logging.StreamHandler',
- 'filters': ['myfilter']
- }
- },
- 'root': {
- 'level': 'DEBUG',
- 'handlers': ['console']
- },
- }
- if __name__ == '__main__':
- logging.config.dictConfig(LOGGING)
- logging.debug('hello')
- logging.debug('hello - noshow')
以上示例展示了将配置数据传给构造实例的可调用对象,形式是关键字参数。运行后将会输出:
- changed: hello
这说明过滤器按照配置的参数生效了。
需要额外注意的地方:
如果在配置中无法直接引用可调用对象(比如位于不同的模块中,并且不能在配置字典所在的位置直接导入),则可以采用
ext://…
的形式,正如 访问外部对象 所述。例如,在上述示例中可以使用文本'ext://__main__.MyFilter'
而不是MyFilter
对象。与过滤器一样,上述技术还可用于配置自定义 handler 和格式化对象。有关如何在日志配置中使用用户自定义对象的更多信息,请参阅 用户定义对象,以及上述 利用 dictConfig() 自定义 handler 的其他指南。
异常信息的自定义格式化
有时可能需要设置自定义的异常信息格式——考虑到会用到参数,假定要让每条日志事件只占一行,即便存在异常信息也一样。这可以用自定义格式化类来实现,如下所示:
- import logging
- class OneLineExceptionFormatter(logging.Formatter):
- def formatException(self, exc_info): """
- Format an exception so that it prints on a single line.
- """
- result = super().formatException(exc_info)
- return repr(result) # 或格式化为任何你想要的单行内容
- def format(self, record):
- s = super().format(record)
- if record.exc_text:
- s = s.replace('\n', '') + '|'
- return s
- def configure_logging():
- fh = logging.FileHandler('output.txt', 'w')
- f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
- '%d/%m/%Y %H:%M:%S')
- fh.setFormatter(f)
- root = logging.getLogger()
- root.setLevel(logging.DEBUG)
- root.addHandler(fh)
- def main():
- configure_logging()
- logging.info('Sample message')
- try:
- x = 1 / 0
- except ZeroDivisionError as e:
- logging.exception('ZeroDivisionError: %s', e)
- if __name__ == '__main__':
- main()
运行后将会生成只有两行信息的文件:
- 28/01/2015 07:21:23|INFO|Sample message|
- 28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n File "logtest7.py", line 30, in main\n x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|
虽然上述处理方式很简单,但也给出了根据喜好对异常信息进行格式化输出的方案。或许 traceback
模块能满足更专门的需求。
语音播报日志信息
有时可能需要以声音的形式呈现日志消息。如果系统自带了文本转语音 (TTS)功能,即便没与 Python 关联也很容易做到。大多数 TTS 系统都有一个可运行的命令行程序,在 handler 中可以用 subprocess
进行调用。这里假定 TTS 命令行程序不会与用户交互,或需要很长时间才会执行完毕,写入日志的信息也不会多到影响用户查看,并且可以接受每次播报一条信息,以下示例实现了等一条信息播完再处理下一条,可能会导致其他 handler 的等待。这个简短示例仅供演示,假定 espeak
TTS 包已就绪:
- import logging
- import subprocess
- import sys
- class TTSHandler(logging.Handler):
- def emit(self, record):
- msg = self.format(record)
- # 以女性的英语语音慢速地说话
- cmd = ['espeak', '-s150', '-ven+f3', msg]
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- # 等待程序结束
- p.communicate()
- def configure_logging():
- h = TTSHandler()
- root = logging.getLogger()
- root.addHandler(h)
- # 默认格式化器简单地返回消息
- root.setLevel(logging.DEBUG)
- def main():
- logging.info('Hello')
- logging.debug('Goodbye')
- if __name__ == '__main__':
- configure_logging()
- sys.exit(main())
运行后将会以女声播报“Hello”和“Goodbye”。
当然,上述方案也适用于其他 TTS 系统,甚至可以通过利用命令行运行的外部程序来处理消息。
缓冲日志消息并有条件地输出它们
在某些情况下,你可能希望在临时区域中记录日志消息,并且只在发生某种特定的情况下才输出它们。 例如,你可能希望起始在函数中记录调试事件,如果函数执行完成且没有错误,你不希望输出收集的调试信息以避免造成日志混乱,但如果出现错误,那么你希望所有调试以及错误消息被输出。
下面是一个示例,展示如何在你的日志记录函数上使用装饰器以实现这一功能。该示例使用 logging.handlers.MemoryHandler
,它允许缓冲已记录的事件直到某些条件发生,缓冲的事件才会被刷新(flushed
) - 传递给另一个处理程序( target
handler)进行处理。 默认情况下, MemoryHandler
在其缓冲区被填满时被刷新,或者看到一个级别大于或等于指定阈值的事件。 如果想要自定义刷新行为,你可以通过更专业的 MemoryHandler
子类来使用这个秘诀。
这个示例脚本有一个简单的函数 foo
,它只是在所有的日志级别中循环运行,写到 sys.stderr
,说明它要记录在哪个级别上,然后在这个级别上实际记录一个消息。你可以给 foo
传递一个参数,如果为 true ,它将在ERROR和CRITICAL级别记录,否则,它只在DEBUG、INFO和WARNING级别记录。
脚本只是使用了一个装饰器来装饰 foo
,这个装饰器将记录执行所需的条件。装饰器使用一个记录器作为参数,并在调用被装饰的函数期间附加一个内存处理程序。装饰器可以使用目标处理程序、记录级别和缓冲区的容量(缓冲记录的数量)来附加参数。这些参数分别默认为写入 sys.stderr
的 StreamHandler
, logging.ERROR
和 100
。
以下是脚本:
- import logging
- from logging.handlers import MemoryHandler
- import sys
- logger = logging.getLogger(__name__)
- logger.addHandler(logging.NullHandler())
- def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
- if target_handler is None:
- target_handler = logging.StreamHandler()
- if flush_level is None:
- flush_level = logging.ERROR
- if capacity is None:
- capacity = 100
- handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)
- def decorator(fn):
- def wrapper(*args, **kwargs):
- logger.addHandler(handler)
- try:
- return fn(*args, **kwargs)
- except Exception:
- logger.exception('call failed')
- raise
- finally:
- super(MemoryHandler, handler).flush()
- logger.removeHandler(handler)
- return wrapper
- return decorator
- def write_line(s):
- sys.stderr.write('%s\n' % s)
- def foo(fail=False):
- write_line('about to log at DEBUG ...')
- logger.debug('Actually logged at DEBUG')
- write_line('about to log at INFO ...')
- logger.info('Actually logged at INFO')
- write_line('about to log at WARNING ...')
- logger.warning('Actually logged at WARNING')
- if fail:
- write_line('about to log at ERROR ...')
- logger.error('Actually logged at ERROR')
- write_line('about to log at CRITICAL ...')
- logger.critical('Actually logged at CRITICAL')
- return fail
- decorated_foo = log_if_errors(logger)(foo)
- if __name__ == '__main__':
- logger.setLevel(logging.DEBUG)
- write_line('Calling undecorated foo with False')
- assert not foo(False)
- write_line('Calling undecorated foo with True')
- assert foo(True)
- write_line('Calling decorated foo with False')
- assert not decorated_foo(False)
- write_line('Calling decorated foo with True')
- assert decorated_foo(True)
运行此脚本时,应看到以下输出:
- Calling undecorated foo with False
- about to log at DEBUG ...
- about to log at INFO ...
- about to log at WARNING ...
- Calling undecorated foo with True
- about to log at DEBUG ...
- about to log at INFO ...
- about to log at WARNING ...
- about to log at ERROR ...
- about to log at CRITICAL ...
- Calling decorated foo with False
- about to log at DEBUG ...
- about to log at INFO ...
- about to log at WARNING ...
- Calling decorated foo with True
- about to log at DEBUG ...
- about to log at INFO ...
- about to log at WARNING ...
- about to log at ERROR ...
- Actually logged at DEBUG
- Actually logged at INFO
- Actually logged at WARNING
- Actually logged at ERROR
- about to log at CRITICAL ...
- Actually logged at CRITICAL
如你所见,实际日志记录输出仅在消息等级为ERROR或更高的事件时发生,但在这种情况下,任何之前较低消息等级的事件还会被记录。
你当然可以使用传统的装饰方法:
- @log_if_errors(logger)
- def foo(fail=False):
- …
将日志消息发送至电子邮件,附带缓存支持
为演示如何通过电子邮件发送日志消息,让每封电子邮件发送指定数量的日志消息,你可以子类化 BufferingHandler
。 对于下面的例子,你可以继续调整以适合你自己的特定需求,它提供了简单的测试代码来允许你附带命令行参数运行该脚本来指定你需要通过 SMTP 发送的内容。 (请附带 -h
参数运行已下载的脚本来查看必须的和可选的参数。)
- import logging
- import logging.handlers
- import smtplib
- class BufferingSMTPHandler(logging.handlers.BufferingHandler):
- def __init__(self, mailhost, port, username, password, fromaddr, toaddrs,
- subject, capacity):
- logging.handlers.BufferingHandler.__init__(self, capacity)
- self.mailhost = mailhost
- self.mailport = port
- self.username = username
- self.password = password
- self.fromaddr = fromaddr
- if isinstance(toaddrs, str):
- toaddrs = [toaddrs]
- self.toaddrs = toaddrs
- self.subject = subject
- self.setFormatter(logging.Formatter("%(asctime)s %(levelname)-5s %(message)s"))
- def flush(self):
- if len(self.buffer) > 0:
- try:
- smtp = smtplib.SMTP(self.mailhost, self.mailport)
- smtp.starttls()
- smtp.login(self.username, self.password)
- msg = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n" % (self.fromaddr, ','.join(self.toaddrs), self.subject)
- for record in self.buffer:
- s = self.format(record)
- msg = msg + s + "\r\n"
- smtp.sendmail(self.fromaddr, self.toaddrs, msg)
- smtp.quit()
- except Exception:
- if logging.raiseExceptions:
- raise
- self.buffer = []
- if __name__ == '__main__':
- import argparse
- ap = argparse.ArgumentParser()
- aa = ap.add_argument
- aa('host', metavar='HOST', help='SMTP server')
- aa('--port', '-p', type=int, default=587, help='SMTP port')
- aa('user', metavar='USER', help='SMTP username')
- aa('password', metavar='PASSWORD', help='SMTP password')
- aa('to', metavar='TO', help='Addressee for emails')
- aa('sender', metavar='SENDER', help='Sender email address')
- aa('--subject', '-s',
- default='Test Logging email from Python logging module (buffering)',
- help='Subject of email')
- options = ap.parse_args()
- logger = logging.getLogger()
- logger.setLevel(logging.DEBUG)
- h = BufferingSMTPHandler(options.host, options.port, options.user,
- options.password, options.sender,
- options.to, options.subject, 10)
- logger.addHandler(h)
- for i in range(102):
- logger.info("Info index = %d", i)
- h.flush()
- h.close()
如果你运行此脚本并且你的 SMTP 服务器已正确设置,你将发现它会向你指定的地址发出十一封电子邮件。 前十封邮件每封各有十条日志消息,第十一封将有两条消息。 如脚本所指定的总计为 102 条消息。
通过配置使用UTC (GMT) 格式化时间
有时你会想要使用 UTC 时间格式,这可以用 UTCFormatter
这样的类来实现,如下所示:
- import logging
- import time
- class UTCFormatter(logging.Formatter):
- converter = time.gmtime
然后你可以在你的代码中使用 UTCFormatter
,而不是 Formatter
。 如果你想通过配置来实现这一功能,你可以使用 dictConfig()
API 来完成,该方法在以下完整示例中展示:
- import logging
- import logging.config
- import time
- class UTCFormatter(logging.Formatter):
- converter = time.gmtime
- LOGGING = {
- 'version': 1,
- 'disable_existing_loggers': False,
- 'formatters': {
- 'utc': {
- '()': UTCFormatter,
- 'format': '%(asctime)s %(message)s',
- },
- 'local': {
- 'format': '%(asctime)s %(message)s',
- }
- },
- 'handlers': {
- 'console1': {
- 'class': 'logging.StreamHandler',
- 'formatter': 'utc',
- },
- 'console2': {
- 'class': 'logging.StreamHandler',
- 'formatter': 'local',
- },
- },
- 'root': {
- 'handlers': ['console1', 'console2'],
- }
- }
- if __name__ == '__main__':
- logging.config.dictConfig(LOGGING)
- logging.warning('The local time is %s', time.asctime())
脚本会运行输出类似下面的内容:
- 2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
- 2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015
展示了如何将时间格式化为本地时间和UTC两种形式,其中每种形式对应一个日志处理器 。
使用上下文管理器的可选的日志记录
有时候,我们需要暂时更改日志配置,并在执行某些操作后将其还原。为此,上下文管理器是实现保存和恢复日志上下文的最明显的方式。这是一个关于上下文管理器的简单例子,它允许你在上下文管理器的作用域内更改日志记录等级以及增加日志处理器:
- import logging
- import sys
- class LoggingContext:
- def __init__(self, logger, level=None, handler=None, close=True):
- self.logger = logger
- self.level = level
- self.handler = handler
- self.close = close
- def __enter__(self):
- if self.level is not None:
- self.old_level = self.logger.level
- self.logger.setLevel(self.level)
- if self.handler:
- self.logger.addHandler(self.handler)
- def __exit__(self, et, ev, tb):
- if self.level is not None:
- self.logger.setLevel(self.old_level)
- if self.handler:
- self.logger.removeHandler(self.handler)
- if self.handler and self.close:
- self.handler.close()
- # 隐式地返回 None => 不捕获异常
如果指定上下文管理器的日志记录等级属性,则在上下文管理器的with语句所涵盖的代码中,日志记录器的记录等级将临时设置为上下文管理器所配置的日志记录等级。 如果指定上下文管理的日志处理器属性,则该句柄在进入上下文管理器的上下文时添加到记录器中,并在退出时被删除。 如果你再也不需要该日志处理器时,你可以让上下文管理器在退出上下文管理器的上下文时关闭它。
为了说明它是如何工作的,我们可以在上面添加以下代码块:
- if __name__ == '__main__':
- logger = logging.getLogger('foo')
- logger.addHandler(logging.StreamHandler())
- logger.setLevel(logging.INFO)
- logger.info('1. This should appear just once on stderr.')
- logger.debug('2. This should not appear.')
- with LoggingContext(logger, level=logging.DEBUG):
- logger.debug('3. This should appear once on stderr.')
- logger.debug('4. This should not appear.')
- h = logging.StreamHandler(sys.stdout)
- with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
- logger.debug('5. This should appear twice - once on stderr and once on stdout.')
- logger.info('6. This should appear just once on stderr.')
- logger.debug('7. This should not appear.')
我们最初设置日志记录器的消息等级为 INFO
,因此消息#1出现,消息#2没有出现。在接下来的 with
代码块中我们暂时将消息等级变更为 DEBUG
,从而消息 #3 出现。在这一代码块退出后,日志记录器的消息等级恢复为 INFO
,从而消息 #4 没有出现。在下一个 with
代码块中,我们再一次将设置消息等级设置为 DEBUG
,同时添加一个将消息写入 sys.stdout
的日志处理器。因此,消息#5在控制台出现两次 (分别通过 stderr
和 stdout
)。在 with
语句完成后,状态与之前一样,因此消息 #6 出现(类似消息 #1),而消息 #7 没有出现(类似消息 #2)。
如果我们运行生成的脚本,结果如下:
- $ python logctx.py
- 1. This should appear just once on stderr.
- 3. This should appear once on stderr.
- 5. This should appear twice - once on stderr and once on stdout.
- 5. This should appear twice - once on stderr and once on stdout.
- 6. This should appear just once on stderr.
我们将 stderr
标准错误重定向到 devnull
,我再次运行生成的脚步,唯一被写入 stdout
标准输出的消息,即我们所能看见的消息,如下:
- $ python logctx.py 2>devnull
- 5. This should appear twice - once on stderr and once on stdout.
再一次,将 stdout
标准输出重定向到 devnull
,我获得如下结果:
- $ python logctx.py >devnull
- 1. This should appear just once on stderr.
- 3. This should appear once on stderr.
- 5. This should appear twice - once on stderr and once on stdout.
- 6. This should appear just once on stderr.
在这种情况下,与预期一致,打印到 stdout
标准输出的消息#5不会出现。
当然,这里描述的方法可以被推广,例如临时附加日志记录过滤器。 请注意,上面的代码适用于Python 2以及Python 3。
命令行日志应用起步
下面的示例提供了如下功能:
根据命令行参数确定日志级别
在单独的文件中分发多条子命令,同一级别的子命令均以一致的方式记录。
最简单的配置用法
假定有一个命令行应用程序,用于停止、启动或重新启动某些服务。为了便于演示,不妨将 app.py
作为应用程序的主代码文件,并在 start.py
、 stop.py
和 restart.py
中实现单独的命令。再假定要通过命令行参数控制应用程序的日志粒度,默认为 logging.INFO
。以下是 app.py
的一个示例:
- import argparse
- import importlib
- import logging
- import os
- import sys
- def main(args=None):
- scriptname = os.path.basename(__file__)
- parser = argparse.ArgumentParser(scriptname)
- levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
- parser.add_argument('--log-level', default='INFO', choices=levels)
- subparsers = parser.add_subparsers(dest='command',
- help='Available commands:')
- start_cmd = subparsers.add_parser('start', help='Start a service')
- start_cmd.add_argument('name', metavar='NAME',
- help='Name of service to start')
- stop_cmd = subparsers.add_parser('stop',
- help='Stop one or more services')
- stop_cmd.add_argument('names', metavar='NAME', nargs='+',
- help='Name of service to stop')
- restart_cmd = subparsers.add_parser('restart',
- help='Restart one or more services')
- restart_cmd.add_argument('names', metavar='NAME', nargs='+',
- help='Name of service to restart')
- options = parser.parse_args()
- # 分发命令的代码可以全都放在此文件中。 只是出于演示目的,
- # 我们将在单独的模块中实现每个命令。
- try:
- mod = importlib.import_module(options.command)
- cmd = getattr(mod, 'command')
- except (ImportError, AttributeError):
- print('Unable to find the code for command \'%s\'' % options.command)
- return 1
- # 这里可以做得更为灵活并从文件或目录加载配置
- logging.basicConfig(level=options.log_level,
- format='%(levelname)s %(name)s %(message)s')
- cmd(options)
- if __name__ == '__main__':
- sys.exit(main())
start
、stop
和 restart
命令可以在单独的模块中实现,启动命令的代码可如下:
- # start.py
- import logging
- logger = logging.getLogger(__name__)
- def command(options):
- logger.debug('About to start %s', options.name)
- # 在此进行实际的命令处理 ...
- logger.info('Started the \'%s\' service.', options.name)
然后是停止命令的代码:
- # stop.py
- import logging
- logger = logging.getLogger(__name__)
- def command(options):
- n = len(options.names)
- if n == 1:
- plural = ''
- services = '\'%s\'' % options.names[0]
- else:
- plural = 's'
- services = ', '.join('\'%s\'' % name for name in options.names)
- i = services.rfind(', ')
- services = services[:i] + ' and ' + services[i + 2:]
- logger.debug('About to stop %s', services)
- # 在此进行实际的命令处理 ...
- logger.info('Stopped the %s service%s.', services, plural)
重启命令类似:
- # restart.py
- import logging
- logger = logging.getLogger(__name__)
- def command(options):
- n = len(options.names)
- if n == 1:
- plural = ''
- services = '\'%s\'' % options.names[0]
- else:
- plural = 's'
- services = ', '.join('\'%s\'' % name for name in options.names)
- i = services.rfind(', ')
- services = services[:i] + ' and ' + services[i + 2:]
- logger.debug('About to restart %s', services)
- # 在此进行实际的命令处理 ...
- logger.info('Restarted the %s service%s.', services, plural)
如果以默认日志级别运行该程序,会得到以下结果:
- $ python app.py start foo
- INFO start Started the 'foo' service.
- $ python app.py stop foo bar
- INFO stop Stopped the 'foo' and 'bar' services.
- $ python app.py restart foo bar baz
- INFO restart Restarted the 'foo', 'bar' and 'baz' services.
第一个单词是日志级别,第二个单词是日志事件所在的模块或包的名称。
如果修改了日志级别,发送给日志的信息就能得以改变。如要显示更多信息,则可:
- $ python app.py --log-level DEBUG start foo
- DEBUG start About to start foo
- INFO start Started the 'foo' service.
- $ python app.py --log-level DEBUG stop foo bar
- DEBUG stop About to stop 'foo' and 'bar'
- INFO stop Stopped the 'foo' and 'bar' services.
- $ python app.py --log-level DEBUG restart foo bar baz
- DEBUG restart About to restart 'foo', 'bar' and 'baz'
- INFO restart Restarted the 'foo', 'bar' and 'baz' services.
若要显示的信息少一些,则:
- $ python app.py --log-level WARNING start foo
- $ python app.py --log-level WARNING stop foo bar
- $ python app.py --log-level WARNING restart foo bar baz
这里的命令不会向控制台输出任何信息,因为没有记录 WARNING
以上级别的日志。
Qt GUI 日志示例
一个时常被提出的问题是 GUI 应用程序要如何记录日志。 Qt [https://www.qt.io/] 框架是一个流行的跨平台 UI 框架,它具有使用 PySide2 [https://pypi.org/project/PySide2/] 或 PyQt5 [https://pypi.org/project/PyQt5/] 库的 Python 绑定。
下面的例子演示了将日志写入 Qt GUI 程序的过程。这里引入了一个简单的 QtHandler
类,参数是一个可调用对象,其应为嵌入主线程某个“槽位”中运行的,因为GUI 的更新由主线程完成。这里还创建了一个工作线程,以便演示由 UI(通过人工点击日志按钮)和后台工作线程(此处只是记录级别和时间间隔均随机生成的日志信息)将日志写入 GUI 的过程。
该工作线程是用 Qt 的 QThread
类实现的,而不是 threading
模块,因为某些情况下只能采用 `QThread
,它与其他 Qt
组件的集成性更好一些。
此代码应当适用于最新的 PySide6
, PyQt6
, PySide2
或 PyQt5
发布版。 你也可以将此做法适配到更早的 Qt 版本。 请参阅代码片段中的注释来获取更详细的信息。
- import datetime
- import logging
- import random
- import sys
- import time
- # 处理不同 Qt 包之间的微小差异
- try:
- from PySide6 import QtCore, QtGui, QtWidgets
- Signal = QtCore.Signal
- Slot = QtCore.Slot
- except ImportError:
- try:
- from PyQt6 import QtCore, QtGui, QtWidgets
- Signal = QtCore.pyqtSignal
- Slot = QtCore.pyqtSlot
- except ImportError:
- try:
- from PySide2 import QtCore, QtGui, QtWidgets
- Signal = QtCore.Signal
- Slot = QtCore.Slot
- except ImportError:
- from PyQt5 import QtCore, QtGui, QtWidgets
- Signal = QtCore.pyqtSignal
- Slot = QtCore.pyqtSlot
- logger = logging.getLogger(__name__)
- #
- # 信号需要被包含在 QObject 或其子类中以便能够正确地
- # 初始化
- #
- class Signaller(QtCore.QObject):
- signal = Signal(str, logging.LogRecord)
- #
- # 输出到 Qt GUI 应当仅发生在主线程中。 因此,本处理器
- # 被设计为接受一个已经设置好运行主线程的槽位函数。
- # 在本示例中,该函数接受一个已格式化的日志消息字符串
- # 参数,以及生成它的日志记录。 已格式化的字符串只是
- # 出于方便考虑 —— 你也可以在槽位函数本身以任意方式
- # 格式化字符串供输出。
- #
- # 你可以指定槽位函数执行任何你想要的 GUI 更新。 处理器
- # 并不知道或关心特定的 UI 元素。
- #
- class QtHandler(logging.Handler):
- def __init__(self, slotfunc, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.signaller = Signaller()
- self.signaller.signal.connect(slotfunc)
- def emit(self, record):
- s = self.format(record)
- self.signaller.signal.emit(s, record)
- #
- # 本示例使用 QThreads,这意味着在 Python 层级中的线程
- # 将为像 "Dummy-1" 的名称。 下面的函数将获得当前线程的
- # Qt 名称。
- #
- def ctname():
- return QtCore.QThread.currentThread().objectName()
- #
- # 用于生成随机的日志记录层级。
- #
- LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
- logging.CRITICAL)
- #
- # 这个工作类代表在一个独立于主线程的线程中完成的工作。
- # 该线程开始执行工作的方式是通过按下一个连接到工作类中
- # 槽位的按钮。
- #
- # 因为 LogRecord 中默认的 threadName 值没有什么用处。
- # 我们增加了一个包含通过上述方式计算的 QThread 的
- # qThreadName,并在一个“额外”字典中传递该值并使用它
- # 将 LogRecord 更新为 QThread 名称。
- #
- # 这个示例工作类将顺序地输出消息,并以数秒的随机延时
- # 进行间隔。
- #
- class Worker(QtCore.QObject):
- @Slot()
- def start(self):
- extra = {'qThreadName': ctname() }
- logger.debug('Started work', extra=extra)
- i = 1
- # 让线程运行直到被中断。 这允许合理并且清晰的
- # 线程终结。
- while not QtCore.QThread.currentThread().isInterruptionRequested():
- delay = 0.5 + random.random() * 2
- time.sleep(delay)
- try:
- if random.random() < 0.1:
- raise ValueError('Exception raised: %d' % i)
- else:
- level = random.choice(LEVELS)
- logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
- except ValueError as e:
- logger.exception('Failed: %s', e, extra=extra)
- i += 1
- #
- # 为本专题指南示例实现一个简单的 UI。 其中包含:
- #
- # * 一个只读的文本编辑窗口用以显示已格式化的日志消息
- # * 一个按钮用以开始工作并在单独线程中记录日志内容
- # * 一个按钮用以以记录来自主线程的日志内容
- # * 一个按钮用以清空日志窗口
- #
- class Window(QtWidgets.QWidget):
- COLORS = {
- logging.DEBUG: 'black',
- logging.INFO: 'blue',
- logging.WARNING: 'orange',
- logging.ERROR: 'red',
- logging.CRITICAL: 'purple',
- }
- def __init__(self, app):
- super().__init__()
- self.app = app
- self.textedit = te = QtWidgets.QPlainTextEdit(self)
- # 设置系统平台所使用的默认等宽字体
- f = QtGui.QFont('nosuchfont')
- if hasattr(f, 'Monospace'):
- f.setStyleHint(f.Monospace)
- else:
- f.setStyleHint(f.StyleHint.Monospace) # 针对 Qt6
- te.setFont(f)
- te.setReadOnly(True)
- PB = QtWidgets.QPushButton
- self.work_button = PB('Start background work', self)
- self.log_button = PB('Log a message at a random level', self)
- self.clear_button = PB('Clear log window', self)
- self.handler = h = QtHandler(self.update_status)
- # 记得在格式字符串中使用 qThreadName 而非 threadName。
- fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
- formatter = logging.Formatter(fs)
- h.setFormatter(formatter)
- logger.addHandler(h)
- # 设置当退出时终结 QThread
- app.aboutToQuit.connect(self.force_quit)
- # 对所有控件进行布局
- layout = QtWidgets.QVBoxLayout(self)
- layout.addWidget(te)
- layout.addWidget(self.work_button)
- layout.addWidget(self.log_button)
- layout.addWidget(self.clear_button)
- self.setFixedSize(900, 400)
- # 连接非工作槽位和信号
- self.log_button.clicked.connect(self.manual_update)
- self.clear_button.clicked.connect(self.clear_display)
- # 启动一个新工作线程并为其连接槽位
- self.start_thread()
- self.work_button.clicked.connect(self.worker.start)
- # 一旦启动,该按钮应当被禁用
- self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))
- def start_thread(self):
- self.worker = Worker()
- self.worker_thread = QtCore.QThread()
- self.worker.setObjectName('Worker')
- self.worker_thread.setObjectName('WorkerThread') # 针对 qThreadName
- self.worker.moveToThread(self.worker_thread)
- # 这将在工作线程中启动一个事件循环
- self.worker_thread.start()
- def kill_thread(self):
- # 告知工作线程停止运行,然后告知它退出并等待
- # 后续发生的事件
- self.worker_thread.requestInterruption()
- if self.worker_thread.isRunning():
- self.worker_thread.quit()
- self.worker_thread.wait()
- else:
- print('worker has already exited.')
- def force_quit(self):
- # 当窗口被关闭时使用
- if self.worker_thread.isRunning():
- self.kill_thread()
- # 下面的函数更新 UI 并在主线程中运行因为槽位是在
- # 那里设置的
- @Slot(str, logging.LogRecord)
- def update_status(self, status, record):
- color = self.COLORS.get(record.levelno, 'black')
- s = '<pre><font color="%s">%s</font></pre>' % (color, status)
- self.textedit.appendHtml(s)
- @Slot()
- def manual_update(self):
- # 此函数使用传入的已格式化消息,但也会使用来自
- # 记录的信息根据其严重程度(层级)以适当的颜色
- # 格式化消息。
- level = random.choice(LEVELS)
- extra = {'qThreadName': ctname() }
- logger.log(level, 'Manually logged!', extra=extra)
- @Slot()
- def clear_display(self):
- self.textedit.clear()
- def main():
- QtCore.QThread.currentThread().setObjectName('MainThread')
- logging.getLogger().setLevel(logging.DEBUG)
- app = QtWidgets.QApplication(sys.argv)
- example = Window(app)
- example.show()
- if hasattr(app, 'exec'):
- rc = app.exec()
- else:
- rc = app.exec_()
- sys.exit(rc)
- if __name__=='__main__':
- main()
将日志记录到带有 RFC5424 支持的 syslog
虽然 RFC 5424 [https://datatracker.ietf.org/doc/html/rfc5424.html] 在 2009 年就已发布,但大多数 syslog 服务器都默认被配置为使用更旧的 RFC 3164 [https://datatracker.ietf.org/doc/html/rfc3164.html],它发布于 2001 年。 当 logging
在 2003 年被加入 Python 时,它支持了当时(唯一存在的)较早版本的协议。 自从 RFC5424 发布后,因为它还未被广泛部署到 syslog 服务器上,因此 SysLogHandler
的功能也没有被更新。
RFC 5424 包括一些有用的特性例如对结构化数据的支持等,如果你想要能够将日志记录到带有该协议支持的 syslog 服务器上,你可以使用一个看起来像是这样的子类化处理器来实现:
- import datetime
- import logging.handlers
- import re
- import socket
- import time
- class SysLogHandler5424(logging.handlers.SysLogHandler):
- tz_offset = re.compile(r'([+-]\d{2})(\d{2})$')
- escaped = re.compile(r'([\]"\\])')
- def __init__(self, *args, **kwargs):
- self.msgid = kwargs.pop('msgid', None)
- self.appname = kwargs.pop('appname', None)
- super().__init__(*args, **kwargs)
- def format(self, record):
- version = 1
- asctime = datetime.datetime.fromtimestamp(record.created).isoformat()
- m = self.tz_offset.match(time.strftime('%z'))
- has_offset = False
- if m and time.timezone:
- hrs, mins = m.groups()
- if int(hrs) or int(mins):
- has_offset = True
- if not has_offset:
- asctime += 'Z'
- else:
- asctime += f'{hrs}:{mins}'
- try:
- hostname = socket.gethostname()
- except Exception:
- hostname = '-'
- appname = self.appname or '-'
- procid = record.process
- msgid = '-'
- msg = super().format(record)
- sdata = '-'
- if hasattr(record, 'structured_data'):
- sd = record.structured_data
- # 这应当是一个字典,其中的键为 SD-ID 而值则为
- # 将 PARAM-NAME 映射到 PARAM-VALUE 的字典
- # (请参阅 RFC了解其含义)
- # 这里没有错误检查 —— 它只是作为演示,你可以
- # 调整此代码以在生产环境中使用
- parts = []
- def replacer(m):
- g = m.groups()
- return '\\' + g[0]
- for sdid, dv in sd.items():
- part = f'[{sdid}'
- for k, v in dv.items():
- s = str(v)
- s = self.escaped.sub(replacer, s)
- part += f' {k}="{s}"'
- part += ']'
- parts.append(text-part)
- sdata = ''.join(text-parts)
- return f'{version} {asctime} {hostname} {appname} {procid} {msgid} {sdata} {msg}'
你需要熟悉 RFC 5424 才能完全理解上面的代码,你还可能会有稍加变化的的需求(例如你要如何将结构化数据记入日志)。 不管怎样,上面的代码应当根据你的特定需求来灵活调整。 通过上面的处理器,你可以使用类似这样的代码来传入结构化数据:
- sd = {
- 'foo@12345': {'bar': 'baz', 'baz': 'bozz', 'fizz': r'buzz'},
- 'foo@54321': {'rab': 'baz', 'zab': 'bozz', 'zzif': r'buzz'}
- }
- extra = {'structured_data': sd}
- i = 1
- logger.debug('Message %d', i, extra=extra)
如何将日志记录器作为输出流
有时,你需要通过接口访问某个预期要写入到文件型对象第三方 API,但你希望将 API 的输出重定向到一个日志记录器。 你可以使用一个以文件类 API 来包装日志记录器的类。 下面是一个演示这样的类的简短脚本:
- import logging
- class LoggerWriter:
- def __init__(self, logger, level):
- self.logger = logger
- self.level = level
- def write(self, message):
- if message != '\n': # 避免打印空白行,如果你希望如此
- self.logger.log(self.level, message)
- def flush(self):
- # 实际上不做任何事,但对文件型对象来说应当提供
- # —— 因此根据你的情况作为可选项
- pass
- def close(self):
- # 实际上不做任何事,但对文件型对象来说应当提供
- # —— 因此根据你的情况作为可选项。 你可能会希望
- # 设置一个旗标以便后续的写入调用引发异常
- pass
- def main():
- logging.basicConfig(level=logging.DEBUG)
- logger = logging.getLogger('demo')
- info_fp = LoggerWriter(logger, logging.INFO)
- debug_fp = LoggerWriter(logger, logging.DEBUG)
- print('An INFO message', file=info_fp)
- print('A DEBUG message', file=debug_fp)
- if __name__ == "__main__":
- main()
当此脚本运行时,它将打印
- INFO:demo:An INFO message
- DEBUG:demo:A DEBUG message
你还可以使用 LoggerWriter
通过下面这样的做法来重定向 sys.stdout
和 sys.stderr
:
- import sys
- sys.stdout = LoggerWriter(logger, logging.INFO)
- sys.stderr = LoggerWriter(logger, logging.WARNING)
你应当在根据需要配置日志记录 之后 再这样做。 在上面的例子中,basicConfig()
调用执行了此操作(在 sys.stderr
被一个 LoggerWriter
实例覆盖 之前 使用它的值)。 然后,你将得到这样的结果:
- >>> print('Foo')
- INFO:demo:Foo
- >>> print('Bar', file=sys.stderr)
- WARNING:demo:Bar
- >>>
当然,上面的例子是按照 basicConfig()
所使用的格式来显示输出的,但你也可以在配置日志记录时使用其他的格式。
请注意当使用上面的预置方案时,你将在一定程度上受到你所拦截的写入调用的缓冲和顺序的控制。 例如,在使用上述 LoggerWriter
的定义的情况下,如果你使用代码段
- sys.stderr = LoggerWriter(logger, logging.WARNING)
- 1 / 0
则运行该脚本的结果为
- WARNING:demo:Traceback (most recent call last):
- WARNING:demo: File "homerunner/cookbook-loggerwriter/test.py", line 53, in <module>
- WARNING:demo:
- WARNING:demo:main()
- WARNING:demo: File "homerunner/cookbook-loggerwriter/test.py", line 49, in main
- WARNING:demo:
- WARNING:demo:1 / 0
- WARNING:demo:ZeroDivisionError
- WARNING:demo::
- WARNING:demo:division by zero
如你所见,这个输出并不很理想。 那是因为下层的写入 sys.stderr
的代码会执行多次写入,每次都将产生一条单独的日志记录行(例如,上面的最后三行)。 要避免这个问题,你需要使用缓冲并且只在看到换行符时才输出日志记录行。 让我们使用一个更好些的 LoggerWriter
实现:
- class BufferingLoggerWriter(LoggerWriter):
- def __init__(self, logger, level):
- super().__init__(logger, level)
- self.buffer = ''
- def write(self, message):
- if '\n' not in message:
- self.buffer += message
- else:
- parts = message.split('\n')
- if self.buffer:
- s = self.buffer + parts.pop(0)
- self.logger.log(self.level, s)
- self.buffer = parts.pop()
- for part in parts:
- self.logger.log(self.level, part)
这段代码对内容进行了缓冲直至遇到换行符,然后将完整的行写入日志记录。 通过这种方式,你将得到更好的输出:
- WARNING:demo:Traceback (most recent call last):
- WARNING:demo: File "homerunner/cookbook-loggerwriter/main.py", line 55, in <module>
- WARNING:demo: main()
- WARNING:demo: File "homerunner/cookbook-loggerwriter/main.py", line 52, in main
- WARNING:demo: 1/0
- WARNING:demo:ZeroDivisionError: division by zero
理应避免的用法
前几节虽介绍了几种方案,描述了可能需要处理的操作,但值得一提的是,有些用法是 没有好处 的,大多数情况下应该避免使用。下面几节的顺序不分先后。
多次打开同一个日志文件
因会导致 "文件被其他进程占用 "错误,所以在 Windows 中一般无法多次打开同一个文件。但在 POSIX 平台中,多次打开同一个文件不会报任何错误。这种操作可能是意外发生的,比如:
多次添加指向同一文件的 handler(比如通过复制/粘贴,或忘记修改)。
打开两个貌似不同(文件名不一样)的文件,但一个是另一个的符号链接,所以其实是同一个文件。
进程 fork,然后父进程和子进程都有对同一文件的引用。 例如,这可能是通过使用
multiprocessing
模块实现的。
在大多数情况下,多次打开同一个文件 貌似 一切正常,但实际会导致很多问题。
由于多个线程或进程会尝试写入同一个文件,日志输出可能会出现乱码。尽管日志对象可以防止多个线程同时使用同一个 handler 实例,但如果两个不同的线程使用不同的 handler 实例同时写入文件,而这两个 handler 又恰好指向同一个文件,那么就失去了这种防护。
尝试删除文件(例如在轮换日志文件时)会静默地失败,因为存在另一个指向它的引用。 这可能导致混乱并浪费调试时间 —— 日志条目会出现在意想不到的地方,或者完全丢失。 或者会有应当移除的文件仍然保持存在,文件还会在已经设置了基于文件大小的轮换的情况下仍然增长到预料之外的大小。
请用 从多个进程记录至单个文件 中介绍的技术来避免上述问题。
将日志对象用作属性或传递参数
虽然特殊情况下可能有必要如此,但一般来说没有意义,因为日志是单实例对象。代码总是可以通过 logging.getLogger(name)
用名称访问一个已有的日志对象实例,因此将实例作为参数来传递,或作为属性留存,都是毫无意义的。请注意,在其他语言中,如 Java 和 C#,日志对象通常是静态类属性。但在 Python 中是没有意义的,因为软件拆分的单位是模块(而不是类)。
为库中的日志记录器添加 NullHandler
以外的处理器
通过添加 handler、formatter 和 filter 来配置日志,这是应用程序开发人员的责任,而不是库开发人员该做的。如果正在维护一个库,请确保不要向任何日志对象添加 NullHandler
实例以外的 handler。
创建大量的日志对象
日志是单实例对象,在代码执行过程中不会被释放,因此创建大量的日志对象会占用很多内存,而这些内存又不能被释放。与其为每个文件或网络连接创建一个日志,还不如利用 已有机制 将上下文信息传给自定义日志对象,并将创建的日志对象限制在应用程序内的指定区域(通常是模块,但偶尔会再精细些)使用。
其他资源
参见
- 模块
logging
日志记录模块的 API 参考。
日志记录模块的配置 API 。
- 日志记录模块附带的有用处理器。