Python多进程处理(读、写)numpy矩阵

发布时间 2023-05-31 17:06:23作者: burlingame

转载:(15条消息) Python多进程处理(读、写)numpy矩阵_multiprocessing.rawarray_Hayreen的博客-CSDN博客

前言

由于需要使用python处理一个380*380的numpy矩阵,经过计算后对其中的每个元素进行赋值,单进程处理大约需要4小时,要处理几百个矩阵,时间上有些耗不起,研究了一下python的多进程(multiprocessing),坑点在于numpy array需要在多个子进程之间共享,总结如下。不对或不妥之处望指教。

另,据说python目前已加入multiprocessing.shared_memory 模块,由于使用的某些包不支持python3.8及以上版本,所以未作研究。目前代码基于Python3.7,在windows环境下编写。

多进程共享变量
multiprocessing支持的共享对象有:Value、Array、RawValue 与 RawArray。网上资料显示Value、Array支持加入进程锁,RawValue 与 RawArray则不支持。值的注意的是,不支持加入进程锁不代表不能在不同进程上共同编辑该共享变量。如对于本人遇到的问题而言,不同进程对共享array的不同位置各自进行修改,不存在内存上的冲突(同一个array的不同位置对应不同的内存地址),所以本例使用RawArray进行进程间的变量共享。

#定义一个double类型的RawArray,d表示double类型,100为其长度
import multiprocessing
X = multiprocessing.RawArray('d', 100)

This RawArray is an 1D array, or a chunk of memory that will be used to hold the data matrix. To make it easier to manipulate its data, we can wrap it as an numpy array by using the frombuffer function. The following code shows how a RawArray is wrapped as an numpy array and how numpy. copyto can be used to fill the RawArray with existing data.

X_shape = (16, 1000000)
# Randomly generate some data
data = np.random.randn(*X_shape)
X = multiprocessing.RawArray('d', X_shape[0] * X_shape[1])
# Wrap X as an numpy array so we can easily manipulates its data.
X_np = np.frombuffer(X, dtype=np.float64).reshape(X_shape)
# Copy data to our shared array.
np.copyto(X_np, data)
但是RawArray仅支持一维数组,因此传入numpy数组时,需要先将其展成一维,多个子进程的处理对象为一维的RawArray。在多进程运行结束后,再使用np.frombuffer()将其reshape为二维numpy矩阵。示例代码如上所示。

多进程处理共享的一维Array (RawArray为例)
对于子进程需要面对的RawArray而言,不能直接传入,需要通过pool的initializer进行传入,不然据说会报错(RuntimeError: c_double_Array_x objects should only be shared between processes through inheritance)。

也就是说,需要在一开始定义进程池的时候,首先将共享Array通过initializer函数传入,实现子进程的初始化。

#定义一个initializer函数
def init_pool(array):
    global glob_array    #定义全局变量
    glob_array = array
 
def func(array_SHAPE, alpha, i):
    for j in range(array_SHAPE[0]):  
        glob_array[i*array_SHAPE[0]+j] = i**2+j*alpha 
        # 子进程内直接读取全局变量glob_array,无需传入
        # glob_array为展平的矩阵(即multiprocessing.RawArray, 多进程不支持二维矩阵)
        # 此处需要对齐一维的glob_array与原始的二维array的对应位置关系。
if __name__ == '__main__'
    array = np.randn(30,30)
    func_partial = partial(func, array_SHAPE = array.shape, alpha = 3.1416) 
    #由于下面要使用的多线程的map函数仅支持输入一个参数,因此此处使用了python内置的偏函数
    #偏函数functools.partial()可以为某函数先行传入变量,第一个参数为函数名,其余参数为对应的原函数的输入变量。
    #此例中func_partial(i) 等同于 func(array_SHAPE, alpha, i)
    array_shared = multiprocessing.RawArray('d', array.ravel())  
    # 由于对矩阵元素的写入是有位置坐标的(各进程可以各自改动对应矩阵位置(即内存地址)处的值,
    # 故无需加进程锁
    p = multiprocessing.Pool(processes=6, initializer=init_pool, 
                             initargs=(array_shared,))
    # 定义一个进程池,指定进程数量(processes),初始化函数(initializer)以及初始化函数中的输入(initargs)
    p.map(func_partial, range(array.shape[1]))
    # 此处使用进程池map()函数对numpy矩阵维度1(即,行)进行迭代
    # 此时可视为map函数向子进程中分配不同的行,各个子进程在分配的不同行中各自处理整行的数据。
    p.close()
    p.join()

 

主进程操作共享变量转化为二维numpy矩阵

多进程运行结束后,可以将共享变量取出,再次reshape为二维numpy矩阵。

    new_array = np.frombuffer(array_shared, np.double).reshape(array_SHAPE) 
    # 需要注意,此处要使用array_shared,而不是glob_array.
    # glob_array为子进程中的全局变量,在主进程中并未被定义,但主进程中的array_shared与子进程中的glob_radioScore_array指向同一内存地址.

效果实测
实验室主机CPU为8核16线程,分别开启了6个和8个进程,6个进程相比单进程快了一倍(4小时 vs 2小时)。具体效果可能与具体任务有关,也可能还有改进空间,望大家指点~

更新:之前vscode调试模式下8进程花费1小时40分钟,直接在power shell中运行脚本则只需40分钟,有点不解。。。

 win10资源监视器。左图为8进程,右图为单进程

 

参考:

1. https://research.wmz.ninja/articles/2018/03/on-sharing-large-arrays-when-using-pythons-multiprocessing.html 作者: Mianzhi Wang

2. https://www.7forz.com/3408/#comment-952 作者: 7forz

 

1,正确的代码

import multiprocessing
import numpy as np
from functools import partial

# X = multiprocessing.RawArray('d', 100)
#
# X_shape = (16, 1000000)
# # Randomly generate some data
# data = np.random.randn(*X_shape)
# X = multiprocessing.RawArray('d', X_shape[0] * X_shape[1])
# # Wrap X as an numpy array so we can easily manipulates its data.
# X_np = np.frombuffer(X, dtype=np.float64).reshape(X_shape)
# # Copy data to our shared array.
# np.copyto(X_np, data)


# 定义一个initializer函数
def init_pool(array):
    global glob_array  # 定义全局变量
    glob_array = array


def func(i,array_SHAPE, alpha):
    print(array_SHAPE, alpha,i)
    for j in range(array_SHAPE):
        glob_array[i * array_SHAPE + j] = i ** 2 + j * alpha
        # 子进程内直接读取全局变量glob_array,无需传入
        # glob_array为展平的矩阵(即multiprocessing.RawArray, 多进程不支持二维矩阵)
        # 此处需要对齐一维的glob_array与原始的二维array的对应位置关系。

if __name__ == '__main__':
    X_shape = (30000, 30000)
    array = np.random.randn(*X_shape)
    func_partial = partial(func, array_SHAPE=array.shape[0], alpha=3.1416)
    # func_partial = partial(func, array_SHAPE=X_shape[0], alpha=3.1416)
    #由于下面要使用的多线程的map函数仅支持输入一个参数,因此此处使用了python内置的偏函数
    #偏函数functools.partial()可以为某函数先行传入变量,第一个参数为函数名,其余参数为对应的原函数的输入变量。
    #此例中func_partial(i) 等同于 func(array_SHAPE, alpha, i)
    array_shared = multiprocessing.RawArray('d', array.ravel())
    # 由于对矩阵元素的写入是有位置坐标的(各进程可以各自改动对应矩阵位置(即内存地址)处的值,
    # 故无需加进程锁
    p = multiprocessing.Pool(processes=6, initializer=init_pool,
                             initargs=(array_shared,))
    # 定义一个进程池,指定进程数量(processes),初始化函数(initializer)以及初始化函数中的输入(initargs)
    p.map(func_partial, range(array.shape[1]))
    #p.map(func_partial, list(range(X_shape[1])))
    # p.map(func_partial, array.shape[1])
    # p.map(func_partial, range(16))
    # 此处使用进程池map()函数对numpy矩阵维度1(即,行)进行迭代
    # 此时可视为map函数向子进程中分配不同的行,各个子进程在分配的不同行中各自处理整行的数据。
    p.close()
    p.join()

 

  def func(i,array_SHAPE, alpha): 如果写成 def func(array_SHAPE, alpha,i): 那么会报错如下所示:

Traceback (most recent call last):
  File "C:\yzm1\PycharmProjects\onnx_pro\test14.py", line 46, in <module>
    p.map(func_partial, range(array.shape[1]))
  File "C:\Users\13269\anaconda3\lib\multiprocessing\pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\13269\anaconda3\lib\multiprocessing\pool.py", line 774, in get
    raise self._value
TypeError: func() got multiple values for argument 'array_SHAPE'

报上述错误是因为:

这里有个容易忽略的点,如果不指定参数位置的话,那么默认从第一个参数开始固定,所以使用它的时候最好指定参数。但是被指定的参数必须在函数的最后部分,比如 func01(a,b,c),我们可以固定 b 和 c,可以固定 c,但是绝对不能只固定 b!否则会报错。