基于multiprocessing map实现python并行化(全局变量共享 map机制实用向分析 常见问题 pandas存储数据)

发布时间 2023-05-31 17:36:40作者: burlingame

转载:(15条消息) 基于multiprocessing map实现python并行化(全局变量共享 map机制实用向分析 常见问题 pandas存储数据)_goto_past的博客-CSDN博客

基于multiprocessing map实现python并行化
之前从来没考虑python可以并行化,最近有一个项目需要计算100*100 次的遗传算法适应度,每次计算都要用到700000+的数据,每次计算不并行的话得用几十分钟,根本顶不住,因此调研并学习了一下并行化处理,还是很有效的,现在每次计算基本控制在2分钟以内。

首先看一个并行化实现for循环的博客,适合刚刚接触python并行化的伙伴。
[python]map方法与并行执行

我的并行化启蒙就是源自上面博客中的代码,果然Talk is cheap, show us the code!

之后在此基础上一步步解决了共享变量等问题,map相较于Process的一个好处是更容易获得返回值,这也是为啥之后见到的多是Process的教程我依然坚持用map。

话不多说,直接上代码,个人心得见注释。

"""
基于multiprocessing map
实现python并行化 的一些问题分享
环境:python 3.8.5 其他库也要进行相应的配套更新(很重要 不同的python版本并行机制有一定差异)
"""

import time
from tqdm import tqdm
import pickle
import numpy as np
import pandas as pd
import multiprocessing as mp
from multiprocessing import Array
# from read_data import get_distance_hav
from multiprocessing.dummy import freeze_support, Manager
from math import sin, asin, cos, radians, fabs, sqrt

"""
在引用自己定义的外部函数时要注意:
被引用文件的全局代码段会在引用时被执行! 造成大量的时间和内存浪费。
之前在引用'get_distance_hav'函数时没有注意,导致多进程开展前进行了大量的读取和运算。
"""

EARTH_RADIUS = 6371  # 地球平均半径,6371km
#plan_index = np.load('plan_index.npy', allow_pickle=True).item()  # 读入外部数据,作为全局变量,子进程会只读
plan_index = range(10)
W1, W2, W3 = 1, 1, 1

"""
multiprocessing map机制:
map函数对可迭代的参数列表里的每个参数执行所传入的func,并且创建Pool规定的进程数来并行实现前面的过程。
理论上来说,每个进程执行 "len(参数列表))/cpu_num" 次func。

***函数外部的代码(该注释上为例),每个进程都会且只会执行一次(进程第一次创建时),之后只会重复执行func的内容***
在函数外部声明的全局变量,每个进程都可以读到(使用global,原理上是子进程复制了该全局变量到自己的内存,子进程可以修改该值,但子进程不会影响该全局变量在主进程中的值。 另外,根据上面星号标注的内容所说,一旦子进程中更改过读进来的全局变量的值,子进程之后所有func的执行都以更改后的值为准,并不会恢复到声明时所赋的值。

(例如前面的EARTH_RADIUS,一旦子进程中修改该值为0,该子进程之后对func所有的执行读入EARTH_RADIUS就已经为0了)

对此有疑惑的可以见下一份代码示例
"""


# 下面是两个计算距离的功能函数 对本文没有帮助 可跳过
def hav(theta):
    s = sin(theta / 2)
    return s * s


def get_distance_hav(lat0, lng0, lat1, lng1):
    """用haversine公式计算球面两点间的距离。"""
    # do calculation
    return distance


def get_d(index1, index2, ns):
    # 这里同样可以读入ns里面的变量
    # 与本文关系不大 省略
    return 0


# 进程执行的func:
def find_min(args):
    index1, ns = args  # 读入了两个变量,需要计算的index下标,以及Manager Namespace
    global plan_index  # 读入全局变量 注意不要修改,否则子进程中下次func执行将会是修改后的值,而非最开始读入的值
    for i in range(1, len(ns.df)):  # ns.df 即为传入的df
        last, next = max(0, index1 - i), min(index1 + i, len(ns.df) - 1)
        d1 = get_d(index1, last, ns)
        d2 = get_d(index1, next, ns)
        d = min(d1, d2)
        index2 = (last if d == d1 else next)
        if d != np.inf:
            break
    v= d*W1+W2+W3
    #v = d * (W1 * ns.df[index1]['Q'] + W2 * ns.df.iloc[index1]['A'] + W3 * ns.df.iloc[index1]['T'])
    return v


if __name__ == '__main__':
    t1 = time.time()
    # with open('plan_able.pkl', 'rb') as f:
    #     plan_able = pickle.load(f)
    plan_able = list(range(10))
    # df = pd.read_pickle('TJ_UGS.pkl')

    code = np.zeros(385066)
    planed = np.random.choice(range(0, 385066), 120000)
    code[planed] = 1
    code = code.astype(int)

    # 通过Manger实现参数共享,节省内存占用。
    # Manger也可以实现不同进程对变量修改(需要锁),但本代码中 外部数据都是只读,没有用到。
    mgr = Manager()
    ns = mgr.Namespace()
    # ns.df = df
    ns.df = list(range(10))
    ns.code = code  # 为Namespace()存入df和code两个两边,func可访问

    p = mp.Pool(8)
    result = p.map(find_min, tqdm(list(zip(plan_able, [ns] * len(plan_able)))))
    # 由于map机制,要为每个共享的参数复制len份

    p.close()
    p.join()

    print(code)
    print(sum(result))
    t2 = time.time()
    print('time used:', t2 - t1)

下面这份代码示例主要用于说明map子进程中变量更改的一些机制
在函数外部声明的全局变量,每个进程都可以读到(使用global,原理上是子进程复制了该全局变量到自己的内存,子进程可以修改该值,但子进程不会影响该全局变量在主进程中的值。) 一旦子进程中更改过读进来的全局变量的值,子进程之后所有func的执行都以更改后的值为准,并不会恢复到声明时所赋的值。

import time, os
from tqdm import tqdm
import pickle
import numpy as np
import pandas as pd
import os
import multiprocessing as mp
from multiprocessing.dummy import freeze_support, Manager

pid = os.getpid()  #用于辨别父进程和两个子进程
ppid = os.getppid()
print('pid:', pid, 'ppid:', ppid)

EARTH_RADIUS = 6371  # 地球平均半径,6371km
plan_index = np.load('plan_index.npy', allow_pickle=True).item()
W1, W2, W3 = 1, 1, 1


def find_min(arg):
    global EARTH_RADIUS
    if EARTH_RADIUS == 6371:  #在运行时,只会输出两次,因为子进程在之后修改了自己的EARTH_RADIUS为0
        print('************************************')
    pid = os.getpid()
    EARTH_RADIUS = 0
    print('in pid:{}, value:{}'.format(pid, EARTH_RADIUS))


if __name__ == '__main__':
    print('main before change:', EARTH_RADIUS)
    p = mp.Pool(2)
    p.map(find_min, list(range(1000)))  #range的值不能太小,否则map只会分配给一个进程执行
    p.close() 
    p.join()
    # find_min(0)

    print('main after change:', EARTH_RADIUS)  # 仍为 6371

另外,推荐一篇主动实现sharedmem (共享内存)的博文,亲测有效:
知乎 Python3.8 多进程之共享内存