multiprocessing —- 基于进程的并行

源代码 Lib/multiprocessing/ [https://github.com/python/cpython/tree/3.13/Lib/multiprocessing/]


Availability: not Android, not iOS, not WASI.

此模块在 移动平台WebAssembly 平台 上不受支持。

概述

multiprocessing 是一个支持使用与 threading 模块类似的 API 来产生进程的包。 multiprocessing 包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。 因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。 它在 POSIX 和 Windows 上均可运行。

multiprocessing 模块还引入了在 threading 模块中没有的API。一个主要的例子就是 Pool 对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。下面的例子演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用了 Pool

  1. from multiprocessing import Pool
  2.  
  3. def f(x):
  4. return x*x
  5.  
  6. if __name__ == '__main__':
  7. with Pool(5) as p:
  8. print(p.map(f, [1, 2, 3]))

将在标准输出中打印

  1. [1, 4, 9]

参见

concurrent.futures.ProcessPoolExecutor 提供了一个更高层级的接口用来将任务推送到后台进程而不会阻塞调用方进程的执行。 与直接使用 Pool 接口相比,concurrent.futures API 能更好地允许将工作单元发往无需等待结果的下层进程池。

Process

multiprocessing 中,通过创建一个 Process 对象然后调用它的 start() 方法来生成进程。 Processthreading.Thread API 相同。 一个简单的多进程程序示例是:

  1. from multiprocessing import Process
  2.  
  3. def f(name):
  4. print('hello', name)
  5.  
  6. if __name__ == '__main__':
  7. p = Process(target=f, args=('bob',))
  8. p.start()
  9. p.join()

要显示所涉及的各个进程ID,这是一个扩展示例:

  1. from multiprocessing import Process
  2. import os
  3.  
  4. def info(title):
  5. print(title)
  6. print('module name:', __name__)
  7. print('parent process:', os.getppid())
  8. print('process id:', os.getpid())
  9.  
  10. def f(name):
  11. info('function f')
  12. print('hello', name)
  13.  
  14. if __name__ == '__main__':
  15. info('main line')
  16. p = Process(target=f, args=('bob',))
  17. p.start()
  18. p.join()

关于为什么 if __name__ == '__main__' 部分是必需的解释,请参见 编程指导

上下文和启动方法

根据不同的平台, multiprocessing 支持三种启动进程的方法。这些 启动方法

spawn

父进程会启动一个新的 Python 解释器进程。 子进程将只继承那些运行进程对象的 run() 方法所必须的资源。 特别地,来自父进程的非必需文件描述符和句柄将不会被继承。 使用此方法启动进程相比使用 forkforkserver 要慢上许多。

在 POSIX 和 Windows 平台上可用。 默认在 Windows 和 macOS 上。

fork

父进程使用 os.fork() 来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。

在 POSIX 系统上可用。 目前在除 macOS 之外的 POSIX 上为默认值。

备注

在 Python 3.14 上默认的启动方法将不再为 fork。 需要 fork 的代码应当显式地通过 get_context()set_start_method() 来指定。

在 3.12 版本发生变更: 如果 Python 能够检测到你的进程有多个线程,那么在该启动方法内部调用 os.fork() 函数将引发 DeprecationWarning。 请使用其他启动方法。 进一步的解释参见 os.fork() 文档。

forkserver

当程序启动并选择 forkserver 启动方法时,将产生一个服务器进程。 从那时起,每当需要一个新进程时,父进程就会连接到该服务器并请求它分叉一个新进程。 分叉服务器进程是单线程的,除非因系统库或预加载导入的附带影响改变了这一点,因此使用 os.fork() 通常是安全的。 没有不必要的资源被继承。

在支持通过 Unix 管道传递文件描述符的 POSIX 平台上可用,例如 Linux。

在 3.4 版本发生变更: 在所有 POSIX 平台上添加了 spawn,并为某些 POSIX 平台添加了 forkserver。 在 Windows 上子进程将不再继承父进程的所有可继承句柄。

在 3.8 版本发生变更: 在 macOS 上,现在 spawn 启动方法是默认值。 由于 fork 启动方法可能因 macOS 系统库也许会启动线程导致子进程崩溃因而应当被视为是不安全的。 参见 bpo-33725 [https://bugs.python.org/issue?@action=redirect&bpo=33725]。

在 POSIX 上使用 spawnforkserver 启动方法将同时启动一个 资源追踪器 进程,负责追踪当前程序的进程产生的已取消连接的命名系统资源(如命名信号量或 SharedMemory 对象)。 当所有进程退出后资源追踪器会负责取消链接任何仍然被追踪的对象。 在通常情况下应该是没有此种对象的,但是如果一个子进程是被某个信号杀掉的则可能存在一些“泄露”的资源。 (泄露的信号量或共享的内存段不会被自动取消链接直到下一次重启。 对于这两种对象来说会有问题因为系统只允许有限数量的命名信号量,而共享的内存段在主内存中占用一些空间。)

要选择一个启动方法,你应该在主模块的 if __name__ == '__main__' 子句中调用 set_start_method() 。例如:

  1. import multiprocessing as mp
  2.  
  3. def foo(q):
  4. q.put('hello')
  5.  
  6. if __name__ == '__main__':
  7. mp.set_start_method('spawn')
  8. q = mp.Queue()
  9. p = mp.Process(target=foo, args=(q,))
  10. p.start()
  11. print(q.get())
  12. p.join()

在程序中 set_start_method() 不应该被多次调用。

或者,你可以使用 get_context() 来获取上下文对象。上下文对象与 multiprocessing 模块具有相同的API,并允许在同一程序中使用多种启动方法。:

  1. import multiprocessing as mp
  2.  
  3. def foo(q):
  4. q.put('hello')
  5.  
  6. if __name__ == '__main__':
  7. ctx = mp.get_context('spawn')
  8. q = ctx.Queue()
  9. p = ctx.Process(target=foo, args=(q,))
  10. p.start()
  11. print(q.get())
  12. p.join()

请注意,对象在不同上下文创建的进程间可能并不兼容。 特别是,使用 fork 上下文创建的锁不能传递给使用 spawnforkserver 启动方法启动的进程。

想要使用特定启动方法的库应该使用 get_context() 以避免干扰库用户的选择。

警告

'spawn''forkserver' 启动方法在 POSIX 系统上通常不能与“已冻结”可执行程序一同使用(例如由 PyInstallercx_Freeze 等软件包产生的二进制文件)。 如果代码没有使用线程则可以使用 'fork' 启动方法。

在进程之间交换对象

multiprocessing 支持进程之间的两种通信通道:

队列

Queue 类是一个近似 queue.Queue 的克隆。 例如:

  1. from multiprocessing import Process, Queue
  2.  
  3. def f(q):
  4. q.put([42, None, 'hello'])
  5.  
  6. if name == 'main':
  7. q = Queue()
  8. p = Process(target=f, args=(q,))
  9. p.start()
  10. print(q.get()) # 打印 "[42, None, 'hello']"
  11. p.join()

队列是线程和进程安全的。 任何放入 multiprocessing 队列的对象都将被序列化。

管道

Pipe() 函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:

  1. from multiprocessing import Process, Pipe
  2.  
  3. def f(conn):
  4. conn.send([42, None, 'hello'])
  5. conn.close()
  6.  
  7. if name == 'main':
  8. parent_conn, child_conn = Pipe()
  9. p = Process(target=f, args=(child_conn,))
  10. p.start()
  11. print(parent_conn.recv()) # 打印 "[42, None, 'hello']"
  12. p.join()

返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send()recv() 方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。

send() 方法将序列化对象而 recv() 将重新创建对象。

进程间同步

multiprocessing 包含来自 threading 的所有同步原语的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

  1. from multiprocessing import Process, Lock
  2.  
  3. def f(l, i):
  4. l.acquire()
  5. try:
  6. print('hello world', i)
  7. finally:
  8. l.release()
  9.  
  10. if __name__ == '__main__':
  11. lock = Lock()
  12.  
  13. for num in range(10):
  14. Process(target=f, args=(lock, num)).start()

不使用锁的情况下,来自于多进程的输出很容易产生混淆。

进程间共享状态

如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。

但是,如果你真的需要使用一些共享数据,那么 multiprocessing 提供了两种方法。

共享内存

可以使用 ValueArray 将数据存储在共享内存映射中。例如,以下代码:

  1. from multiprocessing import Process, Value, Array
  2.  
  3. def f(n, a):
  4. n.value = 3.1415927
  5. for i in range(len(a)):
  6. a[i] = -a[i]
  7.  
  8. if name == 'main':
  9. num = Value('d', 0.0)
  10. arr = Array('i', range(10))
  11.  
  12. p = Process(target=f, args=(num, arr))
  13. p.start()
  14. p.join()
  15.  
  16. print(num.value)
  17. print(arr[:])

将打印

  1. 3.1415927
  2. [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建 numarr 时使用的 'd''i' 参数是 array 模块使用的类型的 typecode : 'd' 表示双精度浮点数, 'i' 表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意ctypes对象。

服务进程

Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。

Manager() 返回的管理器支持类型: listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray 。例如

  1. from multiprocessing import Process, Manager
  2.  
  3. def f(d, l):
  4. d[1] = '1'
  5. d['2'] = 2
  6. d[0.25] = None
  7. l.reverse()
  8.  
  9. if name == 'main':
  10. with Manager() as manager:
  11. d = manager.dict()
  12. l = manager.list(range(10))
  13.  
  14. p = Process(target=f, args=(d, l))
  15. p.start()
  16. p.join()
  17.  
  18. print(d)
  19. print(l)

将打印

  1. {0.25: None, 1: '1', '2': 2}
  2. [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。

使用工作进程

Pool 类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。

例如:

  1. from multiprocessing import Pool, TimeoutError
  2. import time
  3. import os
  4.  
  5. def f(x):
  6. return x*x
  7.  
  8. if __name__ == '__main__':
  9. # 启动 4 个工作进程
  10. with Pool(processes=4) as pool:
  11.  
  12. # 打印 "[0, 1, 4,..., 81]"
  13. print(pool.map(f, range(10)))
  14.  
  15. # 以任意顺序打印同样的数字
  16. for i in pool.imap_unordered(f, range(10)):
  17. print(i)
  18.  
  19. # 异步地对 "f(20)" 求值
  20. res = pool.apply_async(f, (20,)) # runs in only one process
  21. print(res.get(timeout=1)) # prints "400"
  22.  
  23. # 异步地对 "os.getpid()" 求值
  24. res = pool.apply_async(os.getpid, ()) # 仅在 一个进程中运行
  25. print(res.get(timeout=1)) # 打印进程的 PID
  26.  
  27. # 异步地进行多次求值 可能 会使用更多进程
  28. multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
  29. print([res.get(timeout=1) for res in multiple_results])
  30.  
  31. # 让一个工作进程休眠 10 秒
  32. res = pool.apply_async(time.sleep, (10,))
  33. try:
  34. print(res.get(timeout=1))
  35. except TimeoutError:
  36. print("We lacked patience and got a multiprocessing.TimeoutError")
  37.  
  38. print("For the moment, the pool remains available for more work")
  39.  
  40. # 退出 'with' 代码块将停止进程池
  41. print("Now the pool is closed and no longer available")

请注意,进程池的方法只能由创建它的进程使用。

备注

这个包中的功能要求子进程可以导入 __main__ 模块。虽然这在 编程指导 中有描述,但还是需要提前说明一下。这意味着一些示例在交互式解释器中不起作用,比如 multiprocessing.pool.Pool 示例。例如:

  1. >>> from multiprocessing import Pool
  2. >>> p = Pool(5)
  3. >>> def f(x):
  4. ... return x*x
  5. ...
  6. >>> with p:
  7. ... p.map(f, [1,2,3])
  8. Process PoolWorker-1:
  9. Process PoolWorker-2:
  10. Process PoolWorker-3:
  11. Traceback (most recent call last):
  12. Traceback (most recent call last):
  13. Traceback (most recent call last):
  14. AttributeError: Can't get attribute 'f' on <module '__main__' (<class 'frozenimportlib.BuiltinImporter'>)>
  15. AttributeError: Can't get attribute 'f' on <module '__main__' (<class 'frozenimportlib.BuiltinImporter'>)>
  16. AttributeError: Can't get attribute 'f' on <module '__main__' (<class 'frozenimportlib.BuiltinImporter'>)>

(如果尝试执行上面的代码,它会以一种半随机的方式将三个完整的堆栈内容交替输出,然后你只能以某种方式停止父进程。)

参考

multiprocessing 包主要复制了 threading 模块的API。

Process 和异常

  • class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  • 进程对象表示在单独进程中运行的活动。 Process 类拥有和 threading.Thread 等价的大部分方法。

构造器被调用时应当总是传入关键字参数。 group 应当始终为 None;它的存在仅是为了与 threading.Thread 兼容。 target 是由 run() 方法来唤起的可调用对象。 它的默认值为 None,表示不调用任何东西。 name 是进程名称(请参阅 name 了解详情)。 args 是针对目标调用的参数元组。 kwargs 是针对目标调用的关键字参数字典。 如果提供,则仅限关键字参数 daemon 会将进程的 daemon 旗标设为 TrueFalse。 如果为 None (默认值),则该旗标将从创建方进程继承。

在默认情况下,不会将任何参数传递给 target。 args 参数默认值为 (),可被用来指定要传递给 target 的参数列表或元组。

如果子类重写构造函数,它必须确保它在对进程执行任何其他操作之前调用基类构造函数( Process.__init__() )。

在 3.3 版本发生变更: 增加了 daemon 形参。

  • run()
  • 表示进程活动的方法。

你可以在子类中重写此方法。标准 run() 方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别从 args 和 kwargs 参数中获取顺序和关键字参数。

使用列表或元组作为传给 Process 的 args 参数可以达成同样的效果。

示例:

  1. >>> from multiprocessing import Process
  2. >>> p = Process(target=print, args=[1])
  3. >>> p.run()
  4. 1
  5. >>> p = Process(target=print, args=(1,))
  6. >>> p.run()
  7. 1
  • start()
  • 启动进程活动。

这个方法每个进程对象最多只能调用一次。它会将对象的 run() 方法安排在一个单独的进程中调用。

  • join([timeout])
  • 如果可选参数 timeout 是 None (默认值),则该方法将阻塞,直到调用 join() 方法的进程终止。如果 timeout 是一个正数,它最多会阻塞 timeout 秒。请注意,如果进程终止或方法超时,则该方法返回 None 。检查进程的 exitcode 以确定它是否终止。

一个进程可以被 join 多次。

进程无法join自身,因为这会导致死锁。尝试在启动进程之前join进程是错误的。

  • name
  • 进程的名称。该名称是一个字符串,仅用于识别目的。它没有语义。可以为多个进程指定相同的名称。

初始名称由构造器设定。 如果没有为构造器提供显式名称,则会构造一个形式为 'Process-N1:N2:…:Nk' 的名称,其中每个 Nk 是其父亲的第 N 个孩子。

  • is_alive()
  • 返回进程是否还活着。

粗略地说,从 start() 方法返回到子进程终止之前,进程对象仍处于活动状态。

  • daemon
  • 进程的守护标志,一个布尔值。这必须在 start() 被调用之前设置。

初始值继承自创建进程。

当进程退出时,它会尝试终止其所有守护进程子进程。

请注意,不允许在守护进程中创建子进程。这是因为当守护进程由于父进程退出而中断时,其子进程会变成孤儿进程。 另外,这些 不是 Unix 守护进程或服务,它们是正常进程,如果非守护进程已经退出,它们将被终止(并且不被合并)。

除了 threading.Thread API ,Process 对象还支持以下属性和方法:

  • pid
  • 返回进程ID。在生成该进程之前,这将是 None

  • exitcode

  • 子进程的退出代码。如果该进程尚未终止则为 None

如果子进程的 run() 方法正常返回,退出代码将是 0 。 如果它通过 sys.exit() 终止,并有一个整数参数 N ,退出代码将是 N 。

如果子进程由于在 run() 内的未捕获异常而终止,退出代码将是 1 。 如果它是由信号 N 终止的,退出代码将是负值 -N 。

  • authkey
  • 进程的身份验证密钥(字节字符串)。

multiprocessing 初始化时,主进程使用 os.urandom() 分配一个随机字符串。

当创建 Process 对象时,它将继承其父进程的身份验证密钥,尽管可以通过将 authkey 设置为另一个字节字符串来更改。

参见 认证密码

  • sentinel
  • 系统对象的数字句柄,当进程结束时将变为 "ready" 。

如果你想使用 multiprocessing.connection.wait() 来一次等待多个事件则可以使用此值。 在其他情况下调用 join() 更为简单。

在 Windows 上,这是一个可以与 WaitForSingleObjectWaitForMultipleObjects API 调用族一起使用的 OS 句柄。 在 POSIX 上,这是一个可以与来自 select 模块的原语一起使用的文件描述符。

Added in version 3.3.

  • terminate()
  • 终结进程。 在 POSIX 上这是使用 SIGTERM 信号来完成的;在 Windows 上则会使用 TerminateProcess()。 请注意 exit 处理器和 finally 子句等将不会被执行。

请注意,进程的后代进程将不会被终止 —— 它们将简单地变成孤立的。

警告

如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能无法被其他进程使用。类似地,如果进程已获得锁或信号量等,则终止它可能导致其他进程死锁。

  • kill()
  • terminate() 相同但在 POSIX 上将使用 SIGKILL 信号。

Added in version 3.7.

  • close()
  • 关闭 Process 对象,释放与之关联的所有资源。如果底层进程仍在运行,则会引发 ValueError 。一旦 close() 成功返回, Process 对象的大多数其他方法和属性将引发 ValueError

Added in version 3.7.

注意 start()join()is_alive()terminate()exitcode 方法只能由创建进程对象的进程调用。

Process 一些方法的示例用法:

  1. >>> import multiprocessing, time, signal
  2. >>> mp_context = multiprocessing.get_context('spawn')
  3. >>> p = mp_context.Process(target=time.sleep, args=(1000,))
  4. >>> print(p, p.is_alive())
  5. <...Process ... initial> False
  6. >>> p.start()
  7. >>> print(p, p.is_alive())
  8. <...Process ... started> True
  9. >>> p.terminate()
  10. >>> time.sleep(0.1)
  11. >>> print(p, p.is_alive())
  12. <...Process ... stopped exitcode=-SIGTERM> False
  13. >>> p.exitcode == -signal.SIGTERM
  14. True
  • exception multiprocessing.ProcessError
  • 所有 multiprocessing 异常的基类。
  • exception multiprocessing.BufferTooShort
  • 当所提供的缓冲区对象太小而无法读取消息时由 Connection.recv_bytes_into() 引发的异常。

如果 e 是一个 BufferTooShort 实例,那么 e.args[0] 将把消息作为字节字符串给出。

  • exception multiprocessing.AuthenticationError
  • 出现身份验证错误时引发。
  • exception multiprocessing.TimeoutError
  • 有超时的方法超时时引发。

管道和队列

使用多进程时,一般使用消息机制实现进程间通信,尽可能避免使用同步原语,例如锁。

消息机制包含: Pipe() (可以用于在两个进程间传递消息),以及队列(能够在多个生产者和消费者之间通信)。

Queue, SimpleQueue 以及 JoinableQueue 都是多生产者,多消费者,并且实现了 FIFO 的队列类型,其表现与标准库中的 queue.Queue 类相似。 不同之处在于 Queue 缺少标准库的 queue.Queue 从 Python 2.5 开始引入的 task_done()join() 方法。

如果你使用了 JoinableQueue ,那么你 必须 对每个已经移出队列的任务调用 JoinableQueue.task_done()。 不然的话用于统计未完成任务的信号量最终会溢出并抛出异常。

与其他 Python 队列实现的区别之一,在于 multiprocessing 队列会使用 pickle 来序列化所有被放入的对象。 由获取方法所返回的对象是重新创建的对象,它不会与原始对象共享内存。

另外还可以通过使用一个管理器对象创建一个共享队列,详见 管理器

备注

multiprocessing 使用了普通的 queue.Emptyqueue.Full 异常去表示超时。 你需要从 queue 中导入它们,因为它们并不在 multiprocessing 的命名空间中。

备注

当一个对象被放入一个队列中时,这个对象首先会被一个后台线程用 pickle 序列化,并将序列化后的数据通过一个底层管道的管道传递到队列中。 这种做法会有点让人惊讶,但一般不会出现什么问题。 如果它们确实妨碍了你,你可以使用一个由管理器 manager 创建的队列替换它。

  • 将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法 empty() 才会返回 False 。而 get_nowait() 可以不抛出 queue.Empty 直接返回。

  • 如果有多个进程同时将对象放入队列,那么在队列的另一端接受到的对象可能是无序的。但是由同一个进程放入的多个对象的顺序在另一端输出时总是一样的。

警告

如果一个进程在尝试使用 Queue 期间被 Process.terminate()os.kill() 调用终止了,那么队列中的数据很可能被破坏。 这可能导致其他进程在尝试使用该队列时发生异常。

警告

正如刚才提到的,如果一个子进程将一些对象放进队列中 (并且它没有用 JoinableQueue.cancel_join_thread 方法),那么这个进程在所有缓冲区的对象被刷新进管道之前,是不会终止的。

这意味着,除非你确定所有放入队列中的对象都已经被消费了,否则如果你试图等待这个进程,你可能会陷入死锁中。相似地,如果该子进程不是后台进程,那么父进程可能在试图等待所有非后台进程退出时挂起。

注意用管理器创建的队列不存在这个问题,详见 编程指导

例子 展示了如何使用队列实现进程间通信。

  • multiprocessing.Pipe([duplex])
  • 返回一对 Connection 对象 (conn1, conn2) , 分别表示管道的两端。

如果 duplex 被置为 True (默认值),那么该管道是双向的。如果 duplex 被置为 False ,那么该管道是单向的,即 conn1 只能用于接收消息,而 conn2 仅能用于发送消息。

The send() 方法将使用 pickle 来序列化对象而 recv() 将重新创建对象。

  • class multiprocessing.Queue([maxsize])
  • 返回一个使用一个管道和少量锁和信号量实现的共享队列实例。当一个进程将一个对象放进队列中时,一个写入线程会启动并将对象从缓冲区写入管道中。

一旦超时,将抛出标准库 queue 模块中常见的异常 queue.Emptyqueue.Full

除了 task_done()join() 之外,Queue 实现了标准库类 queue.Queue 中所有的方法。

  • qsize()
  • 返回队列的大致长度。由于多线程或者多进程的上下文,这个数字是不可靠的。

请注意这可能会在未实现 sem_getvalue() 的平台如 macOS 上引发 NotImplementedError

  • empty()
  • 如果队列是空的,返回 True ,反之返回 False 。 由于多线程或多进程的环境,该状态是不可靠的。

在已关闭的队列上可能会引发 OSError。 (但不保证如此)

  • full()
  • 如果队列是满的,返回 True ,反之返回 False 。 由于多线程或多进程的环境,该状态是不可靠的。

  • put(obj[, block[, timeout]])

  • 将 obj 放入队列。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full 异常。反之 (block 是 False 时),仅当有可用缓冲槽时才放入对象,否则抛出 queue.Full 异常 (在这种情形下 timeout 参数会被忽略)。

在 3.8 版本发生变更: 如果队列已经关闭,会抛出 ValueError 而不是 AssertionError

  • put_nowait(obj)
  • 相当于 put(obj, False)

  • get([block[, timeout]])

  • 从队列中取出并返回对象。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出 queue.Empty 异常。反之 (block 是 False 时),仅当有可用对象能够取出时返回,否则抛出 queue.Empty 异常 (在这种情形下 timeout 参数会被忽略)。

在 3.8 版本发生变更: 如果队列已经关闭,会抛出 ValueError 而不是 OSError

  • get_nowait()
  • 相当于 get(False)

multiprocessing.Queue 类有一些在 queue.Queue 类中没有出现的方法。这些方法在大多数情形下并不是必须的。

  • close()
  • 指示当前进程将不会再往队列中放入对象。一旦所有缓冲区中的数据被写入管道之后,后台的线程会退出。这个方法在队列被gc回收时会自动调用。

  • join_thread()

  • 等待后台线程。这个方法仅在调用了 close() 方法之后可用。这会阻塞当前进程,直到后台线程退出,确保所有缓冲区中的数据都被写入管道中。

默认情况下,如果一个不是队列创建者的进程试图退出,它会尝试等待这个队列的后台线程。这个进程可以使用 cancel_join_thread()join_thread() 方法什么都不做直接跳过。

  • cancel_join_thread()
  • 防止 join_thread() 方法阻塞当前进程。具体而言,这防止进程退出时自动等待后台线程退出。详见 join_thread()

这个方法更好的名字可能是 allow_exit_without_flush()。 这可能会导致已排入队列的数据丢失,几乎可以肯定你将不需要用到这个方法。 实际上它仅适用于当你需要当前进程立即退出而不必等待将已排入的队列更新到下层管道,并且你不担心丢失数据的时候。

备注

该类的功能依赖于宿主操作系统具有可用的共享信号量实现。否则该类将被禁用,任何试图实例化一个 Queue 对象的操作都会抛出 ImportError 异常,更多信息详见 bpo-3770 [https://bugs.python.org/issue?@action=redirect&bpo=3770] 。后续说明的任何专用队列对象亦如此。

  • class multiprocessing.SimpleQueue
  • 这是一个简化的 Queue 类的实现,很像带锁的 Pipe

    • close()
    • 关闭队列:释放内部资源。

队列在被关闭后就不可再被使用。 例如不可再调用 get(), put()empty() 等方法。

Added in version 3.9.

  • empty()
  • 如果队列为空返回 True ,否则返回 False

如果 SimpleQueue 已关闭则总是会引发 OSError

  • get()
  • 从队列中移出并返回一个对象。

  • put(item)

  • 将 item 放入队列。
  • class multiprocessing.JoinableQueue([maxsize])
  • JoinableQueue 类是 Queue 的子类,额外添加了 task_done()join() 方法。

    • task_done()
    • 指出之前进入队列的任务已经完成。由队列的消费者进程使用。对于每次调用 get() 获取的任务,执行完成后调用 task_done() 告诉队列该任务已经处理完成。

如果 join() 方法正在阻塞之中,该方法会在所有对象都被处理完的时候返回 (即对之前使用 put() 放进队列中的所有对象都已经返回了对应的 task_done() ) 。

如果被调用的次数多于放入队列中的项目数量,将引发 ValueError 异常 。

  • join()
  • 阻塞至队列中所有的元素都被接收和处理完毕。

当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者进程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。

杂项

  • multiprocessing.active_children()
  • 返回当前进程存活的子进程的列表。

调用该方法有“等待”已经结束的进程的副作用。

  • multiprocessing.cpu_count()
  • 返回系统的CPU数量。

该数值不等于当前进程可使用的 CPU 数量。 可用的 CPU 数量可以通过 os.process_cpu_count() (或 len(os.sched_getaffinity(0))) 获得。

当 CPU 的数量无法确定时,会引发 NotImplementedError

参见

os.cpu_count() os.process_cpu_count()

在 3.13 版本发生变更: 返回值也可使用 -X cpu_count 旗标或 PYTHON_CPU_COUNT 来覆盖因为这只是一个针对 os cpu count API 的包装器。

  • multiprocessing.current_process()
  • 返回与当前进程相对应的 Process 对象。

threading.current_thread() 相同。

  • multiprocessing.parent_process()
  • 返回父进程 Process 对象,和父进程调用 current_process() 返回的对象一样。如果一个进程已经是主进程, parent_process 会返回 None.

Added in version 3.8.

  • multiprocessing.freeze_support()
  • 为使用了 multiprocessing 的程序,提供冻结以产生 Windows 可执行文件的支持。(在 py2exe, PyInstaller 和 cx_Freeze 上测试通过)

需要在 main 模块的 if __name__ == '__main__' 该行之后马上调用该函数。例如:

  1. from multiprocessing import Process, freeze_support
  2.  
  3. def f():
  4. print('hello world!')
  5.  
  6. if __name__ == '__main__':
  7. freeze_support()
  8. Process(target=f).start()

如果没有调用 freeze_support() 在尝试运行被冻结的可执行文件时会抛出 RuntimeError 异常。

freeze_support() 的调用在非 Windows 平台上是无效的。如果该模块在 Windows 平台的 Python 解释器中正常运行 (该程序没有被冻结), 调用 freeze_support() 也是无效的。

  • multiprocessing.get_all_start_methods()
  • 返回由受支持的启动方法组成的列表,其中第一项将为默认值。 可用的启动方法有 'fork', 'spawn''forkserver'。 并非所有的平台都支持所有的方法。 参见 上下文和启动方法

Added in version 3.4.

  • multiprocessing.get_context(method=None)
  • 返回一个 Context 对象。该对象具有和 multiprocessing 模块相同的API。

如果 method 为 None 则将返回默认的上下文。 否则 method 应为 'fork', 'spawn', 'forkserver'。 如果指定的启动方法不可用则将引发 ValueError。 参见 上下文和启动方法

Added in version 3.4.

  • multiprocessing.get_start_method(allow_none=False)
  • 返回启动进程时使用的启动方法名。

如果启动方法已经固定,并且 allow_none 被设置成 False ,那么启动方法将被固定为默认的启动方法,并且返回其方法名。如果启动方法没有设定,并且 allow_none 被设置成 True ,那么将返回 None

返回值可以为 'fork', 'spawn', 'forkserver'None。 参见 上下文和启动方法

Added in version 3.4.

在 3.8 版本发生变更: 对于 macOS,spawn 启动方式是默认方式。 因为 fork 可能导致subprocess崩溃,被认为是不安全的,查看 bpo-33725 [https://bugs.python.org/issue?@action=redirect&bpo=33725] 。

  • multiprocessing.set_executable(executable)
  • 设置在启动子进程时使用的 Python 解释器路径。 ( 默认使用 sys.executable ) 嵌入式编程人员可能需要这样做:
  1. set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

以使他们可以创建子进程。

在 3.4 版本发生变更: 当使用 'spawn' 启动方法时在 POSIX 上受到支持。

在 3.11 版本发生变更: 接受一个 path-like object

  • multiprocessing.set_forkserver_preload(module_names)
  • 为 forkserver 主进程设置一个可尝试导入的模块名称列表以使得它们已导入的状态被分叉进程所继承。 当执行操作时引发的任何 ImportError 会被静默地忽略。 这可被用作一种性能增强措施以避免在每个进程中的重复操作。

要让此方法发挥作用,它必须在 forkserver 进程执行之前被调用(在创建 Pool 或启动 Process 之前)。

仅在使用 'forkserver' 启动方法时是有意义的。 参见 上下文和启动方法

Added in version 3.4.

  • multiprocessing.set_start_method(method, force=False)
  • 设置应当被用于启动子进程的方法。 method 方法可以为 'fork', 'spawn''forkserver'。 如果启动方法已经设置且 force 不为 True 则会引发 RuntimeError。 如果 method 为 None 而 force 为 True 则启动方法会被设为 None。 如果 method 为 None 而 force 为 False 则上下文会被设为默认的上下文。

注意这最多只能调用一次,并且需要藏在 main 模块中,由 if __name__ == '__main__' 保护着。

参见 上下文和启动方法

Added in version 3.4.

备注

multiprocessing 并没有包含类似 threading.active_count() , threading.enumerate() , threading.settrace() , threading.setprofile(), threading.Timer , 或者 threading.local 的方法和类。

连接对象(Connection)

Connection 对象允许收发可以序列化的对象或字符串。它们可以看作面向消息的连接套接字。

通常使用 Pipe 创建 Connection 对象。详见 : 监听器及客户端.

  • class multiprocessing.connection.Connection
    • send(obj)
    • 将一个对象发送到连接的另一端,可以用 recv() 读取。

发送的对象必须是可以序列化的,过大的对象 ( 接近 32MiB+ ,这个值取决于操作系统 ) 有可能引发 ValueError 异常。

  • recv()
  • 返回一个由另一端使用 send() 发送的对象。该方法会一直阻塞直到接收到对象。 如果对端关闭了连接或者没有东西可接收,将抛出 EOFError 异常。

  • fileno()

  • 返回由连接对象使用的描述符或者句柄。

  • close()

  • 关闭连接对象。

当连接对象被垃圾回收时会自动调用。

  • poll([timeout])
  • 返回连接对象中是否有可以读取的数据。

如果未指定 timeout ,此方法会马上返回。如果 timeout 是一个数字,则指定了最大阻塞的秒数。如果 timeout 是 None,那么将一直等待,不会超时。

注意通过使用 multiprocessing.connection.wait() 可以一次轮询多个连接对象。

  • send_bytes(buffer[, offset[, size]])
  • 从一个 bytes-like object 对象中取出字节数组并作为一条完整消息发送。

如果由 offset 给定了在 buffer 中读取数据的位置。 如果给定了 size ,那么将会从缓冲区中读取多个字节。 过大的缓冲区 ( 接近 32MiB+ ,此值依赖于操作系统 ) 有可能引发 ValueError 异常。

  • recv_bytes([maxlength])
  • 以字符串形式返回一条从连接对象另一端发送过来的字节数据。此方法在接收到数据前将一直阻塞。 如果连接对象被对端关闭或者没有数据可读取,将抛出 EOFError 异常。

如果给定了 maxlength 并且消息长于 maxlength 那么将抛出 OSError 并且该连接对象将不再可读。

在 3.3 版本发生变更: 曾经该函数抛出 IOError ,现在这是 OSError 的别名。

  • recv_bytes_into(buffer[, offset])
  • 将一条完整的字节数据消息读入 buffer 中并返回消息的字节数。 此方法在接收到数据前将一直阻塞。 如果连接对象被对端关闭或者没有数据可读取,将抛出 EOFError 异常。

buffer must be a writable bytes-like object. If offset is given then the message will be written into the buffer from that position. Offset must be a non-negative integer less than the length of buffer (in bytes).

如果缓冲区太小,则将引发 BufferTooShort 异常,并且完整的消息将会存放在异常实例 ee.args[0] 中。

在 3.3 版本发生变更: 现在连接对象自身可以通过 Connection.send()Connection.recv() 在进程之间传递。

连接对象现在支持上下文管理协议 — 参见 上下文管理器类型__enter__() 返回连接对象,而 __exit__() 将调用 close()

例如:

  1. >>> from multiprocessing import Pipe
  2. >>> a, b = Pipe()
  3. >>> a.send([1, 'hello', None])
  4. >>> b.recv()
  5. [1, 'hello', None]
  6. >>> b.send_bytes(b'thank you')
  7. >>> a.recv_bytes()
  8. b'thank you'
  9. >>> import array
  10. >>> arr1 = array.array('i', range(5))
  11. >>> arr2 = array.array('i', [0] * 10)
  12. >>> a.send_bytes(arr1)
  13. >>> count = b.recv_bytes_into(arr2)
  14. >>> assert count == len(arr1) * arr1.itemsize
  15. >>> arr2
  16. array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

Connection.recv() 方法会自动解封它收到的数据,除非你能够信任发送消息的进程,否则此处可能有安全风险。

因此, 除非连接对象是由 Pipe() 产生的,否则你应该仅在使用了某种认证手段之后才使用 recv()send() 方法。 参考 认证密码

警告

如果一个进程在试图读写管道时被终止了,那么管道中的数据很可能是不完整的,因为此时可能无法确定消息的边界。

同步原语

通常来说同步原语在多进程环境中并不像它们在多线程环境中那么必要。参考 threading 模块的文档。

注意可以使用管理器对象创建同步原语,参考 管理器

  • class multiprocessing.Barrier(text-parties[, action[, timeout]])
  • 类似 threading.Barrier 的栅栏对象。

Added in version 3.3.

一个小小的不同在于,它的 acquire 方法的第一个参数名是和 Lock.acquire() 一样的 block 。

备注

在 macOS 平台上, 该对象于 Semaphore 不同在于 sem_getvalue() 方法并没有在该平台上实现。

指定的 lock 参数应该是 multiprocessing 模块中的 Lock 或者 RLock 对象。

在 3.3 版本发生变更: 新增了 wait_for() 方法。

  • class multiprocessing.Lock
  • 原始锁(非递归锁)对象,类似于 threading.Lock 。一旦一个进程或者线程拿到了锁,后续的任何其他进程或线程的其他请求都会被阻塞直到锁被释放。任何进程或线程都可以释放锁。除非另有说明,否则 multiprocessing.Lock 用于进程或者线程的概念和行为都和 threading.Lock 一致。

注意 Lock 实际上是一个工厂函数。它返回由默认上下文初始化的 multiprocessing.synchronize.Lock 对象。

Lock supports the context manager protocol and thus may be used in with statements.

  • acquire(block=True, timeout=None)
  • 可以阻塞或非阻塞地获得锁。

如果 block 参数被设为 True ( 默认值 ) , 对该方法的调用在锁处于释放状态之前都会阻塞,然后将锁设置为锁住状态并返回 True 。需要注意的是第一个参数名与 threading.Lock.acquire() 的不同。

如果 block 参数被设置成 False ,方法的调用将不会阻塞。 如果锁当前处于锁住状态,将返回 False ; 否则将锁设置成锁住状态,并返回 True

当 timeout 是一个正浮点数时,会在等待锁的过程中最多阻塞等待 timeout 秒,当 timeout 是负数时,效果和 timeout 为0时一样,当 timeout 是 None (默认值)时,等待时间是无限长。需要注意的是,对于 timeout 参数是负数和 None 的情况, 其行为与 threading.Lock.acquire() 是不一样的。当 block 参数 为 False 时, timeout 并没有实际用处,会直接忽略。否则,函数会在拿到锁后返回 True 或者 超时没拿到锁后返回 False

  • release()
  • 释放锁,可以在任何进程、线程使用,并不限于锁的拥有者。

当尝试释放一个没有被持有的锁时,会抛出 ValueError 异常,除此之外其行为与 threading.Lock.release() 一样。

  • class multiprocessing.RLock
  • 递归锁对象: 类似于 threading.RLock 。递归锁必须由持有线程、进程亲自释放。如果某个进程或者线程拿到了递归锁,这个进程或者线程可以再次拿到这个锁而不需要等待。但是这个进程或者线程的拿锁操作和释放锁操作的次数必须相同。

注意 RLock 是一个工厂函数,调用后返回一个使用默认 context 初始化的 multiprocessing.synchronize.RLock 实例。

RLock 支持 context manager 协议,因此可在 with 语句内使用。

  • acquire(block=True, timeout=None)
  • 可以阻塞或非阻塞地获得锁。

当 block 参数设置为 True 时,会一直阻塞直到锁处于空闲状态(没有被任何进程、线程拥有),除非当前进程或线程已经拥有了这把锁。然后当前进程/线程会持有这把锁(在锁没有其他持有者的情况下),锁内的递归等级加一,并返回 True . 注意, 这个函数第一个参数的行为和 threading.RLock.acquire() 的实现有几个不同点,包括参数名本身。

当 block 参数是 False , 将不会阻塞,如果此时锁被其他进程或者线程持有,当前进程、线程获取锁操作失败,锁的递归等级也不会改变,函数返回 False , 如果当前锁已经处于释放状态,则当前进程、线程则会拿到锁,并且锁内的递归等级加一,函数返回 True

timeout 参数的使用方法及行为与 Lock.acquire() 一样。但是要注意 timeout 的其中一些行为和 threading.RLock.acquire() 中实现的行为是不同的。

  • release()
  • 释放锁,使锁内的递归等级减一。如果释放后锁内的递归等级降低为0,则会重置锁的状态为释放状态(即没有被任何进程、线程持有),重置后如果有有其他进程和线程在等待这把锁,他们中的一个会获得这个锁而继续运行。如果释放后锁内的递归等级还没到达0,则这个锁仍将保持未释放状态且当前进程和线程仍然是持有者。

只有当前进程或线程是锁的持有者时,才允许调用这个方法。如果当前进程或线程不是这个锁的拥有者,或者这个锁处于已释放的状态(即没有任何拥有者),调用这个方法会抛出 AssertionError 异常。注意这里抛出的异常类型和 threading.RLock.release() 中实现的行为不一样。

  • class multiprocessing.Semaphore([value])
  • 一种信号量对象: 类似于 threading.Semaphore.

一个小小的不同在于,它的 acquire 方法的第一个参数名是和 Lock.acquire() 一样的 block 。

备注

在 macOS 上,不支持 sem_timedwait ,所以,调用 acquire() 时如果使用 timeout 参数,会通过循环sleep来模拟这个函数的行为。

备注

这个包的某些功能依赖于宿主机系统的共享信号量的实现,如果系统没有这个特性, multiprocessing.synchronize 会被禁用,尝试导入这个模块会引发 ImportError 异常,详细信息请查看 bpo-3770 [https://bugs.python.org/issue?@action=redirect&bpo=3770] 。

共享 ctypes 对象

在共享内存上创建可被子进程继承的共享对象时是可行的。

  • multiprocessing.Value(typecode_or_type, *args, lock=True)
  • 返回一个从共享内存上创建的 ctypes 对象。默认情况下返回的对象实际上是经过了同步器包装过的。可以通过 Value 的 value 属性访问这个对象本身。

typecode_or_type 指明了返回的对象类型: 它可能是一个 ctypes 类型或者 array 模块中每个类型对应的单字符长度的字符串。 *args 会透传给这个类的构造函数。

如果 lock 参数是 True (默认值), 将会新建一个递归锁用于同步对于此值的访问操作。 如果 lock 是 Lock 或者 RLock 对象,那么这个传入的锁将会用于同步对这个值的访问操作,如果 lock 是 False , 那么对这个对象的访问将没有锁保护,也就是说这个变量不是进程安全的。

诸如 += 这类的操作会引发独立的读操作和写操作,也就是说这类操作符并不具有原子性。所以,如果你想让递增共享变量的操作具有原子性,仅仅以这样的方式并不能达到要求:

  1. counter.value += 1

共享对象内部关联的锁是递归锁(默认情况下就是)的情况下, 你可以采用这种方式

  1. with counter.get_lock():
  2. counter.value += 1

注意 lock 只能是命名参数。

  • multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
  • 从共享内存中申请并返回一个具有ctypes类型的数组对象。默认情况下返回值实际上是被同步器包装过的数组对象。

typecode_or_type 指明了返回的数组中的元素类型: 它可能是一个 ctypes 类型或者 array 模块中每个类型对应的单字符长度的字符串。 如果 size_or_initializer 是一个整数,那就会当做数组的长度,并且整个数组的内存会初始化为0。否则,如果 size_or_initializer 会被当成一个序列用于初始化数组中的每一个元素,并且会根据元素个数自动判断数组的长度。

如果 lock 为 True (默认值) 则将创建一个新的锁对象用于同步对值的访问。 如果 lock 为一个 LockRLock 对象则该对象将被用于同步对值的访问。 如果 lock 为 False 则对返回对象的访问将不会自动得到锁的保护,也就是说它不是“进程安全的”。

请注意 lock 是一个仅限关键字参数。

请注意 ctypes.c_char 的数组具有 value 和 raw 属性,允许被用来保存和提取字符串。