Python多进程并行操作-multiprocessing-Managers

发布时间 2023-05-22 17:16:13作者: burlingame

转载:Python多进程并行操作-multiprocessing-Managers - 知乎 (zhihu.com)

Manager提供了一种方法创建数据,数据能够在不同进程之间共享,包括跨网络的运行在不同机器上的进程。manager对象控制有共享对象的服务进程。其他进程通过代理后也能操作共享对象。

multiprocessing.Manager()

返回一个开始的SyncManager对象,能够在不同进程之间共享对象。返回的manager对象对应了新生成的子进程,能够创建共享独享,返回对应的代理器。

Manger 进程会在垃圾回收或者父进程推出时关闭,manager类在multiprocessing.managers模块中定义。

class multiprocessing.managers.BaseManager([address[, authkey]])

创建一个BaseManager对象

在对象创建后需要调用start()或get_server().server_forever()保证开启的manager进程对应的manager对象。

参数

  • address:manager进程监听新连接的地址。如果地址是None,会选择一个随机地址。
  • authkey:是授权密钥,用来检查连接进程到服务进程的有效性。如果authkey是None,就会使用current_process().authkey。如果使用了authkey,就必须是字节码形式。

方法

  • start([initializer[, initargs]])
    启动子进程开始manager。如果initializer不是None,当开始时,子进程会调用initializer(*initargs)。
  • get_server()
    返回Server()对象,表示真实的在Manager控制下的服务。Server对象支持server_forever()方法
from multiprocessing.managers import BaseManager
manager = BaseManager(address='',50000), authkey=b'abc')
server = manager.get_server()
server.server_forever()

server额外有address属性

  • connect()
    连接本地的manager对象到远程manager进程
from multiprocessing.managers import BaseManager
m = BaseManager(address=('127.0.0.1', 5000), authkey=b'abc')
m.connect()
  • shutdown()
    停止manager使用的进程,只能用在使用start()启动服务进程的情况下。
    能够用好多次。
  • register(typeid[,callable[,proxytype[,exposed[,method_to_typeid[,create_method]]]]])
    一个用于注册一种类型或调用manager类的类方法。

    typeid:一种类型标识,用于标志特定共享独享的类型。必须是string类型。
    callable:用于新建这一类型标识的对象。如果manager实例使用connect()方法连接到服务器,或者 create_method参数为False,就会认为是None。
    proxytype:BaseProxy的子类,用于给这个typeid的共享对象创建代理器。如果是None,会自动创建一个代理类。
    exposed:用于给定一个方法名的序列给这个typeid。如果expose是None,如果存在的话,会使用 proxytype._exposed_。如果没有特定的expose列表,就能使用所有共享独享的共有方法。(这里说的共有方法意味着所有有 __call__()方法的属性,且名字不以'_'开头。
    method_to_typeid:这是一个会返回代理的映射,指定exposed方法的返回类型。它映射了方法名称到typeid 字符串。如果 method_to_typeid是None,就会使用proxytype._method_to_typeid_替代。如果方法名不是映射到关键字,或者如果映射是None,方法返回的对象会复制值。
    create_method:决定方法是否要用typeid作为名字建立,用来告诉服务器进程创建一个新的共享对象,返回代理器。默认为True。

    BaseManager实例有只读特性:address,给manager使用

Class multiprocessing.managers.SyncManager

BaseManager的子类,用于不同进程的同步。multiprocessing.Manager()返回的就是这种类型的对象。

它的方法给一些常用数据类型的创建和返回Proxy对象,用于在不同进程间的同步。主要是共享列表和字典。

Barrier(parties[, action[, timeout]])

新建一个共享的threading.Barrier对象,返回一个proxy

BoundedSemaphore([value])

创建一个共享的threading.BoundedSemaphore对象,返回一个proxy

Condition([lock])

创建一个共享的threading.Condition对象,返回一个proxy。

如果提供了lock,需要给threading.Lock或threading.RLock一个proxy

Event()

创建一个共享的threading.Event对象,返回proxy

Lock()

创建共享的threading.Lock对象,返回proxy

Namespace()

创建共享的Namespace对象,返回proxy

Queue()

创建共享queue.Queue对象,返回proxy

RLock

创建threading.RLock对象,返回proxy

Semaphore([value])

创建threading.Semaphore对象,返回proxy

Array(typecode,sequence)

创建向量,返回proxy

Value(typecode, value)

创建带有可写value属性的对象,返回proxy

dict()、dict(mappting)、dict(sequence)

创建共享字典对象,返回proxy

list()、list(sequence)

创建共享list对象,返回proxy

class multiprocessing.managers.Namespace

可以注册SyncManager的类型。

命名空间对象,没有公用方法,但是有可以写的属性。表示有属性的值。

但是,当给namespace对象使用proxy时,以'_'开头的属性会成为proxy的属性,而不是引用属性。

manager = multiprocessing.Manager()
Global = manager.Namespace()
Global.x=10
Global.y='hello'
Global._z=12.3. # 这是proxy的属性
print(Global)
# 结果是
Namespace(x=10,y='hello')

自定义managers

为创建自己的manager,可以新建BaseManager的子类,使用register()类方法注册一种新的类型或调用manager类。例如

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

使用远程manager

在一台机器上运行manager服务,从其他机器上以客户端接入是可以的。(假设防火墙允许)

如下例子,创建一个带有共享的queue的服务端,客户端可远程接入:

from multiprocessing.managers import BaseManager
from queue import Queue
queue = Queue()
class QueueManager(BaseManager): pass
QueueManager.register('get_queue', callable=lambda:queue)
m = QueueManager(address=('', 50000), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()

另一个客户端:

from multiprocessing.managers import BaseManager
class QueueManager(BaseManager): pass
QueueManager.register('get_queue')
m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
m.connect()
queue = m.get_queue()
queue.get()

本地进程也可以接入queue,使用上面的客户端来远程接入

from multiprocessing import Process, Queue
from multiprocessing.managers import BaseManager
class Worker(Process):
    def __init__(self, q):
        self.q = q
        super().__init__()
    def run(self):
        self.q.put('local hello')

queue = Queue()
w = Worker(queue)
w.start()
class QueueManager(BaseManager): pass

QueueManager.register('get_queue', callable=lambda: queue)
m = QueueManager(address=('', 50000), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()