multiprocessing.sharedctypes 模块

multiprocessing.sharedctypes 模块提供了一些函数,用于分配来自共享内存的、可被子进程继承的 ctypes 对象。

备注

虽然可以将指针存储在共享内存中,但请记住它所引用的是特定进程地址空间中的位置。 而且,指针很可能在第二个进程的上下文中无效,尝试从第二个进程对指针进行解引用可能会导致崩溃。

  • multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)
  • 从共享内存中申请并返回一个 ctypes 数组。

typecode_or_type 指明了返回的数组中的元素类型: 它可能是一个 ctypes 类型或者 array 模块中使用的类型字符。 如果 size_or_initializer 是一个整数,那就会当做数组的长度,并且整个数组的内存会初始化为0。否则,如果 size_or_initializer 会被当成一个序列用于初始化数组中的每一个元素,并且会根据元素个数自动判断数组的长度。

注意对元素的访问、赋值操作可能是非原子操作 - 使用 Array() , 从而借助其中的锁保证操作的原子性。

  • multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)
  • 从共享内存中申请并返回一个 ctypes 对象。

typecode_or_type 指明了返回的对象类型: 它可能是一个 ctypes 类型或者 array 模块中每个类型对应的单字符长度的字符串。 *args 会透传给这个类的构造函数。

注意对 value 的访问、赋值操作可能是非原子操作 - 使用 Value() ,从而借助其中的锁保证操作的原子性。

请注意 ctypes.c_char 的数组具有 value 和 raw 属性,允许被用来保存和提取字符串 - 请查看 ctypes 文档。

  • multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)
  • 返回一个纯 ctypes 数组, 或者在此之上经过同步器包装过的进程安全的对象,这取决于 lock 参数的值,除此之外,和 RawArray() 一样。

如果 lock 为 True (默认值) 则将创建一个新的锁对象用于同步对值的访问。 如果 lock 为一个 LockRLock 对象则该对象将被用于同步对值的访问。 如果 lock 为 False 则对所返回对象的访问将不会自动得到锁的保护,也就是说它将不是“进程安全的”。

注意 lock 只能是命名参数。

  • multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)
  • 返回一个纯 ctypes 数组, 或者在此之上经过同步器包装过的进程安全的对象,这取决于 lock 参数的值,除此之外,和 RawArray() 一样。

如果 lock 为 True (默认值) 则将创建一个新的锁对象用于同步对值的访问。 如果 lock 为一个 LockRLock 对象则该对象将被用于同步对值的访问。 如果 lock 为 False 则对所返回对象的访问将不会自动得到锁的保护,也就是说它将不是“进程安全的”。

注意 lock 只能是命名参数。

  • multiprocessing.sharedctypes.copy(obj)
  • 从共享内存中申请一片空间将 ctypes 对象 obj 过来,然后返回一个新的 ctypes 对象。
  • multiprocessing.sharedctypes.synchronized(obj[, lock])
  • 将一个 ctypes 对象包装为进程安全的对象并返回,使用 lock 同步对于它的操作。如果 lock 是 None (默认值) ,则会自动创建一个 multiprocessing.RLock 对象。

同步器包装后的对象会在原有对象基础上额外增加两个方法: get_obj() 返回被包装的对象, get_lock() 返回内部用于同步的锁。

需要注意的是,访问包装后的ctypes对象会比直接访问原来的纯 ctypes 对象慢得多。

在 3.5 版本发生变更: 同步器包装后的对象支持 context manager 协议。

下面的表格对比了创建普通ctypes对象和基于共享内存上创建共享ctypes对象的语法。(表格中的 MyStructctypes.Structure 的子类)

ctypes 使用类型的共享ctypes 使用 typecode 的共享 ctypes
c_double(2.4) RawValue(c_double, 2.4) RawValue('d', 2.4)
MyStruct(4, 6) RawValue(MyStruct, 4, 6)
(c_short 7)() RawArray(c_short, 7) RawArray('h', 7)
(c_int 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray('i', (9, 2, 8))

下面是一个在子进程中修改多个ctypes对象的例子。

  1. from multiprocessing import Process, Lock
  2. from multiprocessing.sharedctypes import Value, Array
  3. from ctypes import Structure, c_double
  4.  
  5. class Point(Structure):
  6. fields = [('x', c_double), ('y', c_double)]
  7.  
  8. def modify(n, x, s, A):
  9. n.value **= 2
  10. x.value **= 2
  11. s.value = s.value.upper()
  12. for a in A:
  13. a.x **= 2
  14. a.y **= 2
  15.  
  16. if __name__ == '__main__':
  17. lock = Lock()
  18.  
  19. n = Value('i', 7)
  20. x = Value(c_double, 1.0/3.0, lock=False)
  21. s = Array('c', b'hello world', lock=lock)
  22. A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
  23.  
  24. p = Process(target=modify, args=(n, x, s, A))
  25. p.start()
  26. p.join()
  27.  
  28. print(n.value)
  29. print(x.value)
  30. print(s.value)
  31. print([(a.x, a.y) for a in A])

输出如下

  1. 49
  2. 0.1111111111111111
  3. HELLO WORLD
  4. [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

管理器

管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理 共享对象 的服务。其他进程可以通过代理访问这些共享对象。

  • multiprocessing.Manager()
  • 返回一个已启动的 SyncManager 管理器对象,这个对象可以用于在不同进程中共享数据。返回的管理器对象对应了一个已经启动的子进程,并且拥有一系列方法可以用于创建共享对象、返回对应的代理。

当管理器被垃圾回收或者父进程退出时,管理器进程会立即退出。管理器类定义在 multiprocessing.managers 模块:

  • class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)
  • 创建一个 BaseManager 对象。

一旦创建,应该及时调用 start() 或者 get_server().serve_forever() 以确保管理器对象对应的管理进程已经启动。

address 是管理器服务进程监听的地址。如果 address 是 None ,则允许和任意主机的请求建立连接。

authkey 是认证标识,用于检查连接服务进程的请求合法性。如果 authkey 是 None, 则会使用 current_process().authkey , 否则,就使用 authkey , 需要保证它必须是 byte 类型的字符串。

serializer 必须为 'pickle' (使用 pickle 序列化) 或 'xmlrpclib' (使用 xmlrpc.client 序列化)。

ctx 是一个上下文对象,或者为 None (使用当前上下文)。 参见 get_context() 函数。

shutdown_timeout 是用于等待直到 shutdown() 方法中的管理器所使用的进程结束的超时秒数。 如果关闭超时,进程将被终结。 如果终结进程的操作也超时,进程将被杀掉。

在 3.11 版本发生变更: 添加了 shutdown_timeout 形参。

  • start([initializer[, initargs]])
  • 为管理器开启一个子进程,如果 initializer 不是 None , 子进程在启动时将会调用 initializer(*initargs)

  • get_server()

  • 返回一个 Server 对象,它是管理器在后台控制的真实的服务。 Server 对象拥有 serve_forever() 方法。
  1. >>> from multiprocessing.managers import BaseManager
  2. >>> manager = BaseManager(address=('', 50000), authkey=b'abc')
  3. >>> server = manager.get_server()
  4. >>> server.serve_forever()

Server 额外拥有一个 address 属性。

  • connect()
  • 将本地管理器对象连接到一个远程管理器进程:
  1. >>> from multiprocessing.managers import BaseManager
  2. >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
  3. >>> m.connect()
  • shutdown()
  • 停止管理器的进程。这个方法只能用于已经使用 start() 启动的服务进程。

它可以被多次调用。

  • register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
  • 一个 classmethod,可以将一个类型或者可调用对象注册到管理器类。

typeid 是一种 "类型标识符",用于唯一表示某种共享对象类型,必须是一个字符串。

callable 是一个用来为此类型标识符创建对象的可调用对象。如果一个管理器实例将使用 connect() 方法连接到服务器,或者 create_method 参数为 False,那么这里可留下 None

proxytype 是 BaseProxy 的子类,可以根据 typeid 为共享对象创建一个代理,如果是 None , 则会自动创建一个代理类。

exposed 是一个函数名组成的序列,用来指明只有这些方法可以使用 BaseProxy._callmethod() 代理。(如果 exposed 是 None, 则会在 proxytype.exposed 存在的情况下转而使用它) 当暴露的方法列表没有指定的时候,共享对象的所有 “公共方法” 都会被代理。(这里的“公共方法”是指所有拥有 __call__() 方法并且不是以 '_' 开头的属性)

methodto_typeid 是一个映射,用来指定那些应该返回代理对象的暴露方法所返回的类型。(如果 method_to_typeid 是 None, 则 proxytype._method_to_typeid\ 会在存在的情况下被使用)如果方法名称不在这个映射中或者映射是 None ,则方法返回的对象会是一个值拷贝。

create_method 指明,是否要创建一个以 typeid 命名并返回一个代理对象的方法,这个函数会被服务进程用于创建共享对象,默认为 True

BaseManager 实例也有一个只读属性。

  • address
  • 管理器所用的地址。

在 3.3 版本发生变更: 管理器对象支持上下文管理协议 - 查看 上下文管理器类型__enter__() 启动服务进程(如果它还没有启动)并且返回管理器对象, __exit__() 会调用 shutdown()

在之前的版本中,如果管理器服务进程没有启动, __enter__() 不会负责启动它。

它拥有一系列方法,可以为大部分常用数据类型创建并返回 代理对象 代理,用于进程间同步。甚至包括共享列表和字典。

  • Barrier(text-parties[, action[, timeout]])
  • 创建一个共享的 threading.Barrier 对象并返回它的代理。

Added in version 3.3.

如果提供了 lock 参数,那它必须是 threading.Lockthreading.RLock 的代理对象。

在 3.3 版本发生变更: 新增了 wait_for() 方法。

  • Event()
  • 创建一个共享的 threading.Event 对象并返回它的代理。

  • Lock()

  • 创建一个共享的 threading.Lock 对象并返回它的代理。

  • Namespace()

  • 创建一个共享的 Namespace 对象并返回它的代理。

  • Queue([maxsize])

  • 创建一个共享的 queue.Queue 对象并返回它的代理。

  • RLock()

  • 创建一个共享的 threading.RLock 对象并返回它的代理。

  • Semaphore([value])

  • 创建一个共享的 threading.Semaphore 对象并返回它的代理。

  • Array(typecode, sequence)

  • 创建一个数组并返回它的代理。

  • Value(typecode, value)

  • 创建一个具有可写 value 属性的对象并返回它的代理。

  • dict()

  • dict(mapping)
  • dict(sequence)
  • 创建一个共享的 dict 对象并返回它的代理。

  • list()

  • list(sequence)
  • 创建一个共享的 list 对象并返回它的代理。

在 3.6 版本发生变更: 共享对象能够嵌套。例如, 共享的容器对象如共享列表,可以包含另一个共享对象,他们全都会在 SyncManager 中进行管理和同步。

  • class multiprocessing.managers.Namespace
  • 一个可以注册到 SyncManager 的类型。

命名空间对象没有公共方法,但是拥有可写的属性。直接print会显示所有属性的值。

值得一提的是,当对命名空间对象使用代理的时候,访问所有名称以 '_' 开头的属性都只是代理器上的属性,而不是命名空间对象的属性。

  1. >>> mp_context = multiprocessing.get_context('spawn')
  2. >>> manager = mp_context.Manager()
  3. >>> Global = manager.Namespace()
  4. >>> Global.x = 10
  5. >>> Global.y = 'hello'
  6. >>> Global._z = 12.3 # 这是该代理的一个属性
  7. >>> print(Global)
  8. Namespace(x=10, y='hello')

自定义管理器

要创建一个自定义的管理器,需要新建一个 BaseManager 的子类,然后使用这个管理器类上的 register() 类方法将新类型或者可调用方法注册上去。例如:

  1. from multiprocessing.managers import BaseManager
  2.  
  3. class MathsClass:
  4. def add(self, x, y):
  5. return x + y
  6. def mul(self, x, y):
  7. return x * y
  8.  
  9. class MyManager(BaseManager):
  10. pass
  11.  
  12. MyManager.register('Maths', MathsClass)
  13.  
  14. if __name__ == '__main__':
  15. with MyManager() as manager:
  16. maths = manager.Maths()
  17. print(maths.add(4, 3)) # 打印 7
  18. print(maths.mul(7, 8)) # 打印 56

使用远程管理器

可以将管理器服务运行在一台机器上,然后使用客户端从其他机器上访问。(假设它们的防火墙允许)

运行下面的代码可以启动一个服务,此付包含了一个共享队列,允许远程客户端访问:

  1. >>> from multiprocessing.managers import BaseManager
  2. >>> from queue import Queue
  3. >>> queue = Queue()
  4. >>> class QueueManager(BaseManager): pass
  5. >>> QueueManager.register('get_queue', callable=lambda:queue)
  6. >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
  7. >>> s = m.get_server()
  8. >>> s.serve_forever()

远程客户端可以通过下面的方式访问服务:

  1. >>> from multiprocessing.managers import BaseManager
  2. >>> class QueueManager(BaseManager): pass
  3. >>> QueueManager.register('get_queue')
  4. >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
  5. >>> m.connect()
  6. >>> queue = m.get_queue()
  7. >>> queue.put('hello')

也可以通过下面的方式:

  1. >>> from multiprocessing.managers import BaseManager
  2. >>> class QueueManager(BaseManager): pass
  3. >>> QueueManager.register('get_queue')
  4. >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
  5. >>> m.connect()
  6. >>> queue = m.get_queue()
  7. >>> queue.get()
  8. 'hello'

本地进程也可以访问这个队列,利用上面的客户端代码通过远程方式访问:

  1. >>> from multiprocessing import Process, Queue
  2. >>> from multiprocessing.managers import BaseManager
  3. >>> class Worker(Process):
  4. ... def __init__(self, q):
  5. ... self.q = q
  6. ... super().__init__()
  7. ... def run(self):
  8. ... self.q.put('local hello')
  9. ...
  10. >>> queue = Queue()
  11. >>> w = Worker(queue)
  12. >>> w.start()
  13. >>> class QueueManager(BaseManager): pass
  14. ...
  15. >>> QueueManager.register('get_queue', callable=lambda: queue)
  16. >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
  17. >>> s = m.get_server()
  18. >>> s.serve_forever()

代理对象

代理是一个 指向 其他共享对象的对象,这个对象(很可能)在另外一个进程中。共享对象也可以说是代理 指涉 的对象。多个代理对象可能指向同一个指涉对象。

代理对象代理了指涉对象的一系列方法调用(虽然并不是指涉对象的每个方法都有必要被代理)。通过这种方式,代理的使用方法可以和它的指涉对象一样:

  1. >>> mp_context = multiprocessing.get_context('spawn')
  2. >>> manager = mp_context.Manager()
  3. >>> l = manager.list([i*i for i in range(10)])
  4. >>> print(l)
  5. [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  6. >>> print(repr(l))
  7. <ListProxy object, typeid 'list' at 0x...>
  8. >>> l[4]
  9. 16
  10. >>> l[2:5]
  11. [4, 9, 16]

注意,对代理使用 str() 函数会返回指涉对象的字符串表示,但是 repr() 却会返回代理本身的内部字符串表示。

被代理的对象很重要的一点是必须可以被序列化,这样才能允许他们在进程间传递。因此,指涉对象可以包含 代理对象 。这允许管理器中列表、字典或者其他 代理对象 对象之间的嵌套。

  1. >>> a = manager.list()
  2. >>> b = manager.list()
  3. >>> a.append(b) # a 的引用对象现在包含 b 的引用对象
  4. >>> print(a, b)
  5. [<ListProxy object, typeid 'list' at ...>] []
  6. >>> b.append('hello')
  7. >>> print(a[0], b)
  8. ['hello'] ['hello']

类似地,字典和列表代理也可以相互嵌套:

  1. >>> l_outer = manager.list([ manager.dict() for i in range(2) ])
  2. >>> d_first_inner = l_outer[0]
  3. >>> d_first_inner['a'] = 1
  4. >>> d_first_inner['b'] = 2
  5. >>> l_outer[1]['c'] = 3
  6. >>> l_outer[1]['z'] = 26
  7. >>> print(l_outer[0])
  8. {'a': 1, 'b': 2}
  9. >>> print(l_outer[1])
  10. {'c': 3, 'z': 26}

如果指涉对象包含了普通 listdict 对象,对这些内部可变对象的修改不会通过管理器传播,因为代理无法得知被包含的值什么时候被修改了。但是把存放在容器代理中的值本身是会通过管理器传播的(会触发代理对象中的 __setitem__ )从而有效修改这些对象,所以可以把修改过的值重新赋值给容器代理:

  1. # 创建一个代理列表并添加一个可变对象(字典)
  2. lproxy = manager.list()
  3. lproxy.append({})
  4. # 现在改变该字典
  5. d = lproxy[0]
  6. d['a'] = 1
  7. d['b'] = 2
  8. # 这时,对 d 的改变尚未同步,但通过更新该字典,
  9. # 代理将得到改变的通知
  10. lproxy[0] = d

在大多是使用情形下,这种实现方式并不比嵌套 代理对象 方便,但是依然演示了对于同步的一种控制级别。

备注

multiprocessing 中的代理类并没有提供任何对于代理值比较的支持。所以,我们会得到如下结果:

  1. >>> manager.list([1,2,3]) == [1,2,3]
  2. False

当需要比较值的时候,应该替换为使用指涉对象的拷贝。

  • class multiprocessing.managers.BaseProxy
  • 代理对象是 BaseProxy 派生类的实例。

    • _callmethod(methodname[, args[, kwds]])
    • 调用指涉对象的方法并返回结果。

如果 proxy 是一个代理且其指涉的是 obj , 那么下面的表达式:

  1. proxy._callmethod(methodname, args, kwds)

相当于求取以下表达式的值:

  1. getattr(obj, methodname)(*args, **kwds)

于管理器进程。

返回结果会是一个值拷贝或者一个新的共享对象的代理 - 见函数 BaseManager.register() 中关于参数 method_to_typeid 的文档。

如果这个调用熬出了异常,则这个异常会被 _callmethod() 透传出来。如果是管理器进程本身抛出的一些其他异常,则会被 _callmethod() 转换为 RemoteError 异常重新抛出。

特别注意,如果 methodname 没有 暴露 出来,将会引发一个异常。

_callmethod() 的一个使用示例:

  1. >>> l = manager.list(range(10))
  2. >>> l._callmethod('__len__')
  3. 10
  4. >>> l._callmethod('__getitem__', (slice(2, 7),)) # 等价于 l[2:7]
  5. [2, 3, 4, 5, 6]
  6. >>> l._callmethod('__getitem__', (20,)) # 等价于 l[20]
  7. Traceback (most recent call last):
  8. ...
  9. IndexError: list index out of range
  • _getvalue()
  • 返回指涉对象的一份拷贝。

如果指涉对象无法序列化,则会抛出一个异常。

  • repr()
  • 返回代理对象的内部字符串表示。

  • str()

  • 返回指涉对象的内部字符串表示。

清理

代理对象使用了一个弱引用回调函数,当它被垃圾回收时,会将自己从拥有此指涉对象的管理器上反注册,

当共享对象没有被任何代理器引用时,会被管理器进程删除。

进程池

可以创建一个进程池,它将使用 Pool 类执行提交给它的任务。

  • class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
  • 一个进程池对象,它控制可以提交作业的工作进程池。它支持带有超时和回调的异步结果,以及一个并行的 map 实现。

processes 是要使用的工作进程数量。 如果 processes 为 None 则使用 os.process_cpu_count() 所返回的数值。

如果 initializer 不为 None,则每个工作进程将会在启动时调用 initializer(*initargs)

maxtasksperchild 是一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量,为了释放未使用的资源。默认的 maxtasksperchild 是 None,意味着工作进程寿与池齐。

context 可被用于指定启动的工作进程的上下文。通常一个进程池是使用函数 multiprocessing.Pool() 或者一个上下文对象的 Pool() 方法创建的。在这两种情况下, context 都是适当设置的。

注意,进程池对象的方法只有创建它的进程能够调用。

警告

multiprocessing.pool 对象具有需要正确管理的内部资源 (像任何其他资源一样),具体方式是将进程池用作上下文管理器,或者手动调用 close()terminate()。 未做此类操作将导致进程在终结阶段挂起。

请注意依赖垃圾回收器来销毁进程池是 不正确的 做法,因为 CPython 并不保证进程池终结器会被调用(请参阅 object.__del__() 来了解详情)。

在 3.2 版本发生变更: 增加了 maxtasksperchild 形参。

在 3.4 版本发生变更: 增加了 context 形参。

在 3.13 版本发生变更: 在默认情况下 processes 将使用 os.process_cpu_count(),而不是 os.cpu_count()

备注

通常来说,Pool 中的 Worker 进程的生命周期和进程池的工作队列一样长。一些其他系统中(如 Apache, mod_wsgi 等)也可以发现另一种模式,他们会让工作进程在完成一些任务后退出,清理、释放资源,然后启动一个新的进程代替旧的工作进程。 Pool 的 maxtasksperchild 参数给用户提供了这种能力。

  • apply(func[, args[, kwds]])
  • 使用 args 参数以及 kwds 命名参数调用 func , 它会返回结果前阻塞。这种情况下,apply_async() 更适合并行化工作。另外 func 只会在一个进程池中的一个工作进程中执行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]])

  • apply() 方法的一个变种,返回一个 AsyncResult 对象。

如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback 。

如果指定了 error_callback , 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时, 会将抛出的异常对象作为参数传递给 error_callback 执行。

回调函数应该立即执行完成,否则会阻塞负责处理结果的线程。

  • map(func, iterable[, chunksize])
  • 内置 map() 函数的并行版本 (但它只支持一个 iterable 参数,对于多个可迭代对象请参阅 starmap())。 它会保持阻塞直到获得结果。

这个方法会将可迭代对象分割为许多块,然后提交给进程池。 可以将 chunksize 设置为一个正整数来指定每个块的(近似)大小。

注意对于很长的迭代对象,可能消耗很多内存。可以考虑使用 imap()imap_unordered() 并且显式指定 chunksize 以提升效率。

  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
  • map() 方法的一个变种,返回一个 AsyncResult 对象。

如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback 。

如果指定了 error_callback , 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时, 会将抛出的异常对象作为参数传递给 error_callback 执行。

回调函数应该立即执行完成,否则会阻塞负责处理结果的线程。

  • imap(func, iterable[, chunksize])
  • map() 的延迟执行版本。

chunksize 参数的作用和 map() 方法的一样。对于很长的迭代器,给 chunksize 设置一个很大的值会比默认值 1 极大 地加快执行速度。

同样,如果 chunksize 是 1 , 那么 imap() 方法所返回的迭代器的 next() 方法拥有一个可选的 timeout 参数: 如果无法在 timeout 秒内执行得到结果,则 next(timeout) 会抛出 multiprocessing.TimeoutError 异常。

  • imap_unordered(func, iterable[, chunksize])
  • imap() 相同,只不过通过迭代器返回的结果是任意的。(当进程池中只有一个工作进程的时候,返回结果的顺序才能认为是"有序"的)

  • starmap(func, iterable[, chunksize])

  • map() 类似,不过 iterable 中的每一项会被解包再作为函数参数。

比如可迭代对象 [(1,2), (3, 4)] 会转化为等价于 [func(1,2), func(3,4)] 的调用。

Added in version 3.3.

  • starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
  • 相当于 starmap()map_async() 的结合,迭代 iterable 的每一项,解包作为 func 的参数并执行,返回用于获取结果的对象。

Added in version 3.3.

  • close()
  • 阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出。

  • terminate()

  • 不必等待未完成的任务,立即停止工作进程。当进程池对象被垃圾回收时,会立即调用 terminate()

  • join()

  • 等待工作进程结束。调用 join() 前必须先调用 close() 或者 terminate()

在 3.3 版本发生变更: 进程池对象现在支持上下文管理器协议 - 参见 上下文管理器类型__enter__() 返回进程池对象, __exit__() 会调用 terminate()

  • class multiprocessing.pool.AsyncResult
  • Pool.apply_async()Pool.map_async() 返回对象所属的类。

    • get([timeout])
    • 用于获取执行结果。如果 timeout 不是 None 并且在 timeout 秒内仍然没有执行完得到结果,则抛出 multiprocessing.TimeoutError 异常。如果远程调用发生异常,这个异常会通过 get() 重新抛出。

    • wait([timeout])

    • 阻塞,直到返回结果,或者 timeout 秒后超时。

    • ready()

    • 返回执行状态,是否已经完成。

    • successful()

    • 判断调用是否已经完成并且未引发异常。 如果还未获得结果则将引发 ValueError

在 3.7 版本发生变更: 如果没有执行完,会抛出 ValueError 异常而不是 AssertionError

下面的例子演示了进程池的用法:

  1. from multiprocessing import Pool
  2. import time
  3.  
  4. def f(x):
  5. return x*x
  6.  
  7. if __name__ == '__main__':
  8. with Pool(processes=4) as pool: # 启动 4 个工作进程
  9. result = pool.apply_async(f, (10,)) # 在单个进程中异步地对 "f(10)" 求值
  10. print(result.get(timeout=1)) # 打印 "100" 除非你的计算机 非常
  11.  
  12. print(pool.map(f, range(10))) # 打印 "[0, 1, 4,..., 81]"
  13.  
  14. it = pool.imap(f, range(10))
  15. print(next(it)) # 打印 "0"
  16. print(next(it)) # 打印 "1"
  17. print(it.next(timeout=1)) # 打印 "4" 除非你的计算机 非常
  18.  
  19. result = pool.apply_async(time.sleep, (10,))
  20. print(result.get(timeout=1)) # 引发 multiprocessing.TimeoutError

监听器及客户端

通常情况下,进程间通过队列或者 Pipe() 返回的 Connection 传递消息。

不过,multiprocessing.connection 模块其实提供了一些更灵活的特性。最基础的用法是通过它抽象出来的高级API来操作socket或者Windows命名管道。也提供一些高级用法,如通过 hmac 模块来支持 摘要认证,以及同时监听多个管道连接。

  • multiprocessing.connection.deliver_challenge(connection, authkey)
  • 发送一个随机生成的消息到另一端,并等待回复。

如果收到的回复与使用 authkey 作为键生成的信息摘要匹配成功,就会发送一个欢迎信息给管道另一端。否则抛出 AuthenticationError 异常。

  • multiprocessing.connection.answer_challenge(connection, authkey)
  • 接收一条信息,使用 authkey 作为键计算信息摘要,然后将摘要发送回去。

如果没有收到欢迎消息,就抛出 AuthenticationError 异常。

  • multiprocessing.connection.Client(address[, family[, authkey]])
  • 尝试使用 address 地址上的监听器建立一个连接,返回 Connection

连接的类型取决于 family 参数,但是通常可以省略,因为可以通过 address 的格式推导出来。(查看 地址格式 )

如果给出了 authkey 并且不为 None,则它应为一个字节串并且会被用作基于 HMAC 认证的密钥。 如果 authkey 为 None 则不会执行认证。 如果认证失败则会引发 AuthenticationError。 参见 认证密码

  • class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])
  • 可以监听连接请求,是对于绑定套接字或者 Windows 命名管道的封装。

address 是监听器对象中的绑定套接字或命名管道使用的地址。

备注

如果使用 '0.0.0.0' 作为监听地址,那么在Windows上这个地址无法建立连接。想要建立一个可连接的端点,应该使用 '127.0.0.1' 。

family 是套接字(或者命名管道)使用的类型。它可以是以下一种: 'AF_INET' ( TCP 套接字类型), 'AF_UNIX' ( Unix 域套接字) 或者 'AF_PIPE' ( Windows 命名管道)。其中只有第一个保证各平台可用。如果 family 是 None ,那么 family 会根据 address 的格式自动推导出来。如果 address 也是 None , 则取默认值。默认值为可用类型中速度最快的。见 地址格式 。注意,如果 family 是 'AF_UNIX' 而address是 None ,套接字会在一个 tempfile.mkstemp() 创建的私有临时目录中创建。

如果监听器对象使用了套接字,backlog (默认值为1) 会在套接字绑定后传递给它的 listen() 方法。

如果给出了 authkey 并且不为 None,则它应为一个字节串并且会被用作基于 HMAC 认证的密钥。 如果 authkey 为 None 则不会执行认证。 如果认证失败则会引发 AuthenticationError。 参见 认证密码

  • accept()
  • 接受一个连接并返回一个 Connection 对象,其连接到的监听器对象已绑定套接字或者命名管道。如果已经尝试过认证并且失败了,则会抛出 AuthenticationError 异常。

  • close()

  • 关闭监听器对象上的绑定套接字或者命名管道。此函数会在监听器被垃圾回收后自动调用。不过仍然建议显式调用函数关闭。

监听器对象拥有下列只读属性:

  • address
  • 监听器对象使用的地址。

  • last_accepted

  • 最后一个连接所使用的地址。如果没有的话就是 None

在 3.3 版本发生变更: 监听器对象现在支持了上下文管理协议 - 见 上下文管理器类型__enter__() 返回一个监听器对象, __exit__() 会调用 close()

  • multiprocessing.connection.wait(object_list, timeout=None)
  • 一直等待直到 object_list 中某个对象处于就绪状态。返回 object_list 中处于就绪状态的对象。如果 timeout 是一个浮点型,该方法会最多阻塞这么多秒。如果 timeout 是 None ,则会允许阻塞的事件没有限制。timeout为负数的情况下和为0的情况相同。

对于 POSIX 和 Windows,满足下列条件的对象可以出现在 object_list 中

当一个连接或者套接字对象拥有有效的数据可被读取的时候,或者另一端关闭后,这个对象就处于就绪状态。

POSIX: wait(object_list, timeout)select.select(object_list, [], [], timeout) 几乎相同。 差别在于,如果 select.select() 被信号中断,它会引发 OSError 并附带错误号 EINTR,而 wait() 则不会。

Windows: object_list 中的条目必须是一个可等待的整数句柄 (根据 Win32 函数 WaitForMultipleObjects() 文档所使用的定义) 或者一个具有 fileno() 方法的对象,该方法返回一个套接字句柄或管道句柄。 (注意管道句柄和套接字句柄 不是 可等待的句柄。)

Added in version 3.3.

示例

下面的服务代码创建了一个使用 'secret password' 作为认证密码的监听器。它会等待连接然后发送一些数据给客户端:

  1. from multiprocessing.connection import Listener
  2. from array import array
  3.  
  4. address = ('localhost', 6000) # 协议簇缩减为 'AF_INET'
  5.  
  6. with Listener(address, authkey=b'secret password') as listener:
  7. with listener.accept() as conn:
  8. print('connection accepted from', listener.last_accepted)
  9.  
  10. conn.send([2.25, None, 'junk', float])
  11.  
  12. conn.send_bytes(b'hello')
  13.  
  14. conn.send_bytes(array('i', [42, 1729]))

下面的代码连接到服务然后从服务器上j接收一些数据:

  1. from multiprocessing.connection import Client
  2. from array import array
  3.  
  4. address = ('localhost', 6000)
  5.  
  6. with Client(address, authkey=b'secret password') as conn:
  7. print(conn.recv()) # => [2.25, None, 'junk', float]
  8.  
  9. print(conn.recv_bytes()) # => 'hello'
  10.  
  11. arr = array('i', [0, 0, 0, 0, 0])
  12. print(conn.recv_bytes_into(arr)) # => 8
  13. print(arr) # => array('i', [42, 1729, 0, 0, 0])

下面的代码使用了 wait() ,以便在同时等待多个进程发来消息。

  1. from multiprocessing import Process, Pipe, current_process
  2. from multiprocessing.connection import wait
  3.  
  4. def foo(w):
  5. for i in range(10):
  6. w.send((i, current_process().name))
  7. w.close()
  8.  
  9. if __name__ == '__main__':
  10. readers = []
  11.  
  12. for i in range(4):
  13. r, w = Pipe(duplex=False)
  14. readers.append(r)
  15. p = Process(target=foo, args=(w,))
  16. p.start()
  17. # 现在我们关闭管道的可写端以确定
  18. # p 是拥有其所对应句柄的唯一进程。
  19. # 这将确保当 p 关闭可写端的句柄时,
  20. # wait() 将立即报告可读端已经就绪。
  21. w.close()
  22.  
  23. while readers:
  24. for r in wait(readers):
  25. try:
  26. msg = r.recv()
  27. except EOFError:
  28. readers.remove(r)
  29. else:
  30. print(msg)

地址格式

  • 'AF_INET' 地址是 (hostname, port) 形式的元组类型,其中 hostname 是一个字符串,port 是整数。

  • 'AF_UNIX' 地址是文件系统上文件名的字符串。

  • 'AF_PIPE' 地址是一个 r'\.\pipe\PipeName' 形式的字符串。 要使用 Client() 来连接到远程计算机上一个名为 ServerName 的命名管道则应当改用 r'\ServerName\pipe\PipeName' 形式的地址。

注意,使用两个反斜线开头的字符串默认被当做 'AF_PIPE' 地址而不是 'AF_UNIX' 地址。

认证密码

当使用 Connection.recv 接收数据时,数据会自动被反序列化。不幸的是,对于一个不可信的数据源发来的数据,反序列化是存在安全风险的。所以 ListenerClient() 之间使用 hmac 模块进行摘要认证。

认证密钥是一个 byte 类型的字符串,可以认为是和密码一样的东西,连接建立好后,双方都会要求另一方证明知道认证密钥。(这个证明过程不会通过连接发送密钥)

如果要求认证但是没有指定认证密钥,则会使用 current_process().authkey 的返回值 (参见 Process)。 这个值将被当前进程所创建的任何 Process 对象自动继承。 这意味着 (在默认情况下) 一个包含多进程的程序中的所有进程会在相互间建立连接的时候共享单个认证密钥。

os.urandom() 也可以用来生成合适的认证密钥。

日志记录

当前模块也提供了一些对 logging 的支持。注意, logging 模块本身并没有使用进程间共享的锁,所以来自于多个进程的日志可能(具体取决于使用的日志 handler 类型)相互覆盖或者混杂。

  • multiprocessing.get_logger()
  • 返回 multiprocessing 使用的 logger,必要的话会创建一个新的。

当首次创建时日志记录器级别为 logging.NOTSET 并且没有默认处理器。 发送到这个日志记录器的消息默认将不会传播到根日志记录器。

注意在 Windows 上,子进程只会继承父进程 logger 的日志级别 - 对于logger的其他自定义项不会继承。

  • multiprocessing.log_to_stderr(level=None)
  • 此函数会调用 get_logger() 但是会在返回的 logger 上增加一个 handler,将所有输出都使用 '[%(levelname)s/%(processName)s] %(message)s' 的格式发送到 sys.stderr 。你可以通过传递一个 level 参数来修改记录器的 levelname

下面是一个在交互式解释器中打开日志功能的例子:

  1. >>> import multiprocessing, logging
  2. >>> logger = multiprocessing.log_to_stderr()
  3. >>> logger.setLevel(logging.INFO)
  4. >>> logger.warning('doomed')
  5. [WARNING/MainProcess] doomed
  6. >>> m = multiprocessing.Manager()
  7. [INFO/SyncManager-...] child process calling self.run()
  8. [INFO/SyncManager-...] created temp directory ...pymp-...
  9. [INFO/SyncManager-...] manager serving at '...listener-...'
  10. >>> del m
  11. [INFO/MainProcess] sending shutdown message to manager
  12. [INFO/SyncManager-...] manager exiting with exitcode 0

要查看日志等级的完整列表,见 logging 模块。

multiprocessing.dummy 模块

multiprocessing.dummy 复制了 multiprocessing 的 API,不过是在 threading 模块之上包装了一层。

特别地,multiprocessing.dummy 所提供的 Pool 函数会返回一个 ThreadPool 的实例,该类是 Pool 的子类,它支持所有相同的方法调用但会使用一个工作线程池而非工作进程池。

  • class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])
  • 一个线程池对象,用来控制可向其提交任务的工作线程池。 ThreadPool 实例与 Pool 实例是完全接口兼容的,并且它们的资源也必须被正确地管理,或者是将线程池作为上下文管理器来使用,或者是通过手动调用 close()terminate()

processes 是要使用的工作线程数量。 如果 processes 为 None 则使用 os.process_cpu_count() 所返回的数值。

如果 initializer 不为 None,则每个工作进程将会在启动时调用 initializer(*initargs)

不同于 Pool,maxtasksperchild 和 context 不可被提供。

备注

ThreadPool 具有与 Pool 相同的接口,它围绕一个进程池进行设计并且先于 concurrent.futures 模块的引入。 因此,它继承了一些对于基于线程的池来说没有意义的操作,并且它具有自己的用于表示异步任务状态的类型 AsyncResult,该类型不为任何其他库所知。

用户通常应该倾向于使用 concurrent.futures.ThreadPoolExecutor,它拥有从一开始就围绕线程进行设计的更简单接口,并且返回与许多其他库相兼容的 concurrent.futures.Future 实例,包括 asyncio 库。

编程指导

使用 multiprocessing 时,应遵循一些指导原则和习惯用法。

所有start方法

下面这些适用于所有start方法。

避免共享状态

应该尽可能避免在进程间传递大量数据,越少越好。

最好坚持使用队列或者管道进行进程间通信,而不是底层的同步原语。

可序列化

保证所代理的方法的参数是可以序列化的。

代理的线程安全性

不要在多线程中同时使用一个代理对象,除非你用锁保护它。

(而在不同进程中使用 相同 的代理对象却没有问题。)

使用 Join 避免僵尸进程

在 POSIX 上当一个进程结束但没有被合并则它将变成僵尸进程。 这样的进程应该不会很多因为每次启动新进程(或 active_children() 被调用)时所有尚未被合并的已完成进程都将被合并。 而且调用一个已结束进程的 Process.is_alive 也会合并这个进程。 虽然如此但显式地合并你所启动的所有进程仍然是个好习惯。

继承优于序列化、反序列化

当使用 spawn 或者 forkserver 的启动方式时,multiprocessing 中的许多类型都必须是可序列化的,这样子进程才能使用它们。但是通常我们都应该避免使用管道和队列发送共享对象到另外一个进程,而是重新组织代码,对于其他进程创建出来的共享对象,让那些需要访问这些对象的子进程可以直接将这些对象从父进程继承过来。

避免杀死进程

通过 Process.terminate 停止一个进程很容易导致这个进程正在使用的共享资源(如锁、信号量、管道和队列)损坏或者变得不可用,无法在其他进程中继续使用。

所以,最好只对那些从来不使用共享资源的进程调用 Process.terminate

Join 使用队列的进程

记住,往队列放入数据的进程会一直等待直到队列中所有项被"feeder" 线程传给底层管道。(子进程可以调用队列的 Queue.cancel_join_thread 方法禁止这种行为)

这意味着,任何使用队列的时候,你都要确保在进程join之前,所有存放到队列中的项将会被其他进程、线程完全消费。否则不能保证这个写过队列的进程可以正常终止。记住非精灵进程会自动 join 。

下面是一个会导致死锁的例子:

  1. from multiprocessing import Process, Queue
  2.  
  3. def f(q):
  4. q.put('X' * 1000000)
  5.  
  6. if name == 'main':
  7. queue = Queue()
  8. p = Process(target=f, args=(queue,))
  9. p.start()
  10. p.join() # 这将死锁
  11. obj = queue.get()

交换最后两行可以修复这个问题(或者直接删掉 p.join())。

显式传递资源给子进程

在 POSIX 上使用 fork 启动方法,子进程将能够访问使用全局资源在父进程中创建的共享资源。 但是,更好的做法是将对象作为子进程构造器的参数来传入。

除了(部分原因)让代码兼容 Windows 以及其他的进程启动方式外,这种形式还保证了在子进程生命期这个对象是不会被父进程垃圾回收的。如果父进程中的某些对象被垃圾回收会导致资源释放,这就变得很重要。

所以对于实例:

  1. from multiprocessing import Process, Lock
  2.  
  3. def f():
  4. 使用锁进行一些操作
  5.  
  6. if name == 'main':
  7. lock = Lock()
  8. for i in range(10):
  9. Process(target=f).start()

应当重写成这样:

  1. from multiprocessing import Process, Lock
  2.  
  3. def f(l):
  4. 使用 "l" 进行一些操作
  5.  
  6. if name == 'main':
  7. lock = Lock()
  8. for i in range(10):
  9. Process(target=f, args=(lock,)).start()

谨防将 sys.stdin 数据替换为 “类似文件的对象”

multiprocessing 原本会无条件地这样调用:

  1. os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap() 方法中 —— 这会导致与"进程中的进程"相关的一些问题。这已经被修改成了:

  1. sys.stdin.close()
  2. sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

它解决于进程相互冲突导致文件描述符损坏错误的基础性问题,但是又对使用带输出缓冲的“文件型对象”替代 sys.stdin() 的应用程序引入了潜在的危险。 这种危险在于如果有多个进程在此文件型对象上调用 close(),可能导致相同的数据被多次刷写到对象,造成数据损坏。

如果你写入文件型对象并实现了自己的缓存,可以在每次追加缓存数据时记录当前进程id,从而将其变成 fork 安全的,当发现进程id变化后舍弃之前的缓存,例如:

  1. @property
  2. def cache(self):
  3. pid = os.getpid()
  4. if pid != self._pid:
  5. self._pid = pid
  6. self._cache = []
  7. return self._cache

需要更多信息,请查看 bpo-5155 [https://bugs.python.org/issue?@action=redirect&bpo=5155], bpo-5313 [https://bugs.python.org/issue?@action=redirect&bpo=5313] 以及 bpo-5331 [https://bugs.python.org/issue?@action=redirect&bpo=5331]

spawnforkserver 启动方式

还有一些没有被应用到 fork 启动方法的额外限制。

更依赖序列化

Process.__init__() 的所有参数都必须可序列化。同样的,当你继承 Process 时,需要保证当调用 Process.start 方法时,实例可以被序列化。

全局变量

记住,如果子进程中的代码尝试访问一个全局变量,它所看到的值(如果有)可能和父进程中执行 Process.start 那一刻的值不一样。

当全局变量只是模块级别的常量时,是不会有问题的。

安全导入主模块

确保新的 Python 解释器可以安全地导入主模块,而不会导致意想不到的副作用(如启动新进程)。

例如,使用 spawnforkserver 启动方式执行下面的模块,会引发 RuntimeError 异常而失败。

  1. from multiprocessing import Process
  2.  
  3. def foo():
  4. print('hello')
  5.  
  6. p = Process(target=foo)
  7. p.start()

应该通过下面的方法使用 if __name__ == '__main__': ,从而保护程序"入口点":

  1. from multiprocessing import Process, freezesupport, setstartmethod
  2. def foo():
  3. print('hello')
  4. if _name == '__main':
  5. freeze_support()
  6. set_start_method('spawn')
  7. p = Process(target=foo)
  8. p.start()

(如果程序将正常运行而不是冻结,则可以省略 freeze_support() 行)

这允许新启动的 Python 解释器安全导入模块然后运行模块中的 foo() 函数。

如果主模块中创建了进程池或者管理器,这个规则也适用。

例子

创建和使用自定义管理器、代理的示例:

  1. from multiprocessing import freeze_support
  2. from multiprocessing.managers import BaseManager, BaseProxy
  3. import operator
  4.  
  5. ##
  6.  
  7. class Foo:
  8. def f(self):
  9. print('you called Foo.f()')
  10. def g(self):
  11. print('you called Foo.g()')
  12. def _h(self):
  13. print('you called Foo._h()')
  14.  
  15. # 一个简单的生成器函数
  16. def baz():
  17. for i in range(10):
  18. yield i*i
  19.  
  20. # 针对生成器对象的代理类型
  21. class GeneratorProxy(BaseProxy):
  22. exposed = ['__next__']
  23. def __iter__(self):
  24. return self
  25. def __next__(self):
  26. return self._callmethod('__next__')
  27.  
  28. # 返回 operator 模块的函数
  29. def get_operator_module():
  30. return operator
  31.  
  32. ##
  33.  
  34. class MyManager(BaseManager):
  35. pass
  36.  
  37. # 注册 Foo 类;使 `f()` 和 `g()` 可通过代理访问
  38. MyManager.register('Foo1', Foo)
  39.  
  40. # 注册 Foo 类;使 `g()` 和 `_h()` 可通过代理访问
  41. MyManager.register('Foo2', Foo, exposed=('g', '_h'))
  42.  
  43. # 注册生成器函数 baz;使用 `GeneratorProxy` 来作为代理
  44. MyManager.register('baz', baz, proxytype=GeneratorProxy)
  45.  
  46. # 注册 get_operator_module();使公有函数可通过代理访问
  47. MyManager.register('operator', get_operator_module)
  48.  
  49. ##
  50.  
  51. def test():
  52. manager = MyManager()
  53. manager.start()
  54.  
  55. print('-' * 20)
  56.  
  57. f1 = manager.Foo1()
  58. f1.f()
  59. f1.g()
  60. assert not hasattr(f1, '_h')
  61. assert sorted(f1.exposed) == sorted(['f', 'g'])
  62.  
  63. print('-' * 20)
  64.  
  65. f2 = manager.Foo2()
  66. f2.g()
  67. f2._h()
  68. assert not hasattr(f2, 'f')
  69. assert sorted(f2.exposed) == sorted(['g', '_h'])
  70.  
  71. print('-' * 20)
  72.  
  73. it = manager.baz()
  74. for i in it:
  75. print('<%d>' % i, end=' ')
  76. print()
  77.  
  78. print('-' * 20)
  79.  
  80. op = manager.operator()
  81. print('op.add(23, 45) =', op.add(23, 45))
  82. print('op.pow(2, 94) =', op.pow(2, 94))
  83. print('op.exposed =', op.exposed)
  84.  
  85. ##
  86.  
  87. if __name__ == '__main__':
  88. freeze_support()
  89. test()

使用 Pool:

  1. import multiprocessing
  2. import time
  3. import random
  4. import sys
  5.  
  6. #
  7. # 供测试代码使用的函数
  8. #
  9.  
  10. def calculate(func, args):
  11. result = func(*args)
  12. return '%s says that %s%s = %s' % (
  13. multiprocessing.current_process().name,
  14. func.__name__, args, result
  15. )
  16.  
  17. def calculatestar(args):
  18. return calculate(*args)
  19.  
  20. def mul(a, b):
  21. time.sleep(0.5 * random.random())
  22. return a * b
  23.  
  24. def plus(a, b):
  25. time.sleep(0.5 * random.random())
  26. return a + b
  27.  
  28. def f(x):
  29. return 1.0 / (x - 5.0)
  30.  
  31. def pow3(x):
  32. return x ** 3
  33.  
  34. def noop(x):
  35. pass
  36.  
  37. #
  38. # 测试代码
  39. #
  40.  
  41. def test():
  42. PROCESSES = 4
  43. print('Creating pool with %d processes\n' % PROCESSES)
  44.  
  45. with multiprocessing.Pool(PROCESSES) as pool:
  46. #
  47. # 测试
  48. #
  49.  
  50. TASKS = [(mul, (i, 7)) for i in range(10)] + \
  51. [(plus, (i, 8)) for i in range(10)]
  52.  
  53. results = [pool.apply_async(calculate, t) for t in TASKS]
  54. imap_it = pool.imap(calculatestar, TASKS)
  55. imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
  56.  
  57. print('Ordered results using pool.apply_async():')
  58. for r in results:
  59. print('\t', r.get())
  60. print()
  61.  
  62. print('Ordered results using pool.imap():')
  63. for x in imap_it:
  64. print('\t', x)
  65. print()
  66.  
  67. print('Unordered results using pool.imap_unordered():')
  68. for x in imap_unordered_it:
  69. print('\t', x)
  70. print()
  71.  
  72. print('Ordered results using pool.map() --- will block till complete:')
  73. for x in pool.map(calculatestar, TASKS):
  74. print('\t', x)
  75. print()
  76.  
  77. #
  78. # Test error handling
  79. #
  80.  
  81. print('Testing error handling:')
  82.  
  83. try:
  84. print(pool.apply(f, (5,)))
  85. except ZeroDivisionError:
  86. print('\tGot ZeroDivisionError as expected from pool.apply()')
  87. else:
  88. raise AssertionError('expected ZeroDivisionError')
  89.  
  90. try:
  91. print(pool.map(f, list(range(10))))
  92. except ZeroDivisionError:
  93. print('\tGot ZeroDivisionError as expected from pool.map()')
  94. else:
  95. raise AssertionError('expected ZeroDivisionError')
  96.  
  97. try:
  98. print(list(pool.imap(f, list(range(10)))))
  99. except ZeroDivisionError:
  100. print('\tGot ZeroDivisionError as expected from list(pool.imap())')
  101. else:
  102. raise AssertionError('expected ZeroDivisionError')
  103.  
  104. it = pool.imap(f, list(range(10)))
  105. for i in range(10):
  106. try:
  107. x = next(it)
  108. except ZeroDivisionError:
  109. if i == 5:
  110. pass
  111. except StopIteration:
  112. break
  113. else:
  114. if i == 5:
  115. raise AssertionError('expected ZeroDivisionError')
  116.  
  117. assert i == 9
  118. print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
  119. print()
  120.  
  121. #
  122. # 测试超时
  123. #
  124.  
  125. print('Testing ApplyResult.get() with timeout:', end=' ')
  126. res = pool.apply_async(calculate, TASKS[0])
  127. while 1:
  128. sys.stdout.flush()
  129. try:
  130. sys.stdout.write('\n\t%s' % res.get(0.02))
  131. break
  132. except multiprocessing.TimeoutError:
  133. sys.stdout.write('.')
  134. print()
  135. print()
  136.  
  137. print('Testing IMapIterator.next() with timeout:', end=' ')
  138. it = pool.imap(calculatestar, TASKS)
  139. while 1:
  140. sys.stdout.flush()
  141. try:
  142. sys.stdout.write('\n\t%s' % it.next(0.02))
  143. except StopIteration:
  144. break
  145. except multiprocessing.TimeoutError:
  146. sys.stdout.write('.')
  147. print()
  148. print()
  149.  
  150.  
  151. if __name__ == '__main__':
  152. multiprocessing.freeze_support()
  153. test()

一个演示如何使用队列来向一组工作进程提供任务并收集结果的例子:

  1. import time
  2. import random
  3.  
  4. from multiprocessing import Process, Queue, current_process, freeze_support
  5.  
  6. #
  7. # 由工作进程运行的函数
  8. #
  9.  
  10. def worker(input, output):
  11. for func, args in iter(input.get, 'STOP'):
  12. result = calculate(func, args)
  13. output.put(result)
  14.  
  15. #
  16. # 用于计算结果的函数
  17. #
  18.  
  19. def calculate(func, args):
  20. result = func(*args)
  21. return '%s says that %s%s = %s' % \
  22. (current_process().name, func.__name__, args, result)
  23.  
  24. #
  25. # 被任务引用的函数
  26. #
  27.  
  28. def mul(a, b):
  29. time.sleep(0.5*random.random())
  30. return a * b
  31.  
  32. def plus(a, b):
  33. time.sleep(0.5*random.random())
  34. return a + b
  35.  
  36. #
  37. #
  38. #
  39.  
  40. def test():
  41. NUMBER_OF_PROCESSES = 4
  42. TASKS1 = [(mul, (i, 7)) for i in range(20)]
  43. TASKS2 = [(plus, (i, 8)) for i in range(10)]
  44.  
  45. # 创建队列
  46. task_queue = Queue()
  47. done_queue = Queue()
  48.  
  49. # 提交任务
  50. for task in TASKS1:
  51. task_queue.put(task)
  52.  
  53. # 启动工作进程
  54. for i in range(NUMBER_OF_PROCESSES):
  55. Process(target=worker, args=(task_queue, done_queue)).start()
  56.  
  57. # 获取并打印结果
  58. print('Unordered results:')
  59. for i in range(len(TASKS1)):
  60. print('\t', done_queue.get())
  61.  
  62. # 使用 `put()` 添加更多任务
  63. for task in TASKS2:
  64. task_queue.put(task)
  65.  
  66. # 获取并打印更多结果
  67. for i in range(len(TASKS2)):
  68. print('\t', done_queue.get())
  69.  
  70. # 通知子进程停止运行
  71. for i in range(NUMBER_OF_PROCESSES):
  72. task_queue.put('STOP')
  73.  
  74.  
  75. if __name__ == '__main__':
  76. freeze_support()
  77. test()