Python concurrent.futures module (reposted)

Published: 2023-07-13 17:15:58  Author: evescn


concurrent.futures: the Python standard-library module that wraps process pools and thread pools behind a high-level interface

https://www.cnblogs.com/linhaifeng/articles/7428877.html#_label13

1 Introduction

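  • concurrent.futures: provides a high-level interface for running callables asynchronously
  • ThreadPoolExecutor: a thread pool; runs submitted calls asynchronously in worker threads
  • ProcessPoolExecutor: a process pool; runs submitted calls asynchronously in worker processes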
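Both pool classes implement the same abstract Executor interface, which is why the examples in this post can swap one for the other. A minimal check (the pow call is just a convenient built-in task):

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

# Both pool types share the abstract Executor base class,
# so submit(), map() and shutdown() behave the same way on each.
print(issubclass(ThreadPoolExecutor, Executor))   # True
print(issubclass(ProcessPoolExecutor, Executor))  # True

# Executors are also context managers: leaving the "with" block
# calls shutdown(wait=True) automatically.
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(pow, 2, 10)
print(future.result())  # 1024
```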

2 Basic methods

submit(fn, *args, **kwargs)

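Submits a task asynchronously: schedules fn(*args, **kwargs) to run in the pool and immediately returns a Future object. (In the short snippets in this section, task stands for the function defined in the full examples below.)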

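from concurrent.futures import ProcessPoolExecutor

pool = ProcessPoolExecutor(4)  # pool with 4 worker processes
for i in range(13):
    pool.submit(task, i)       # returns a Future at once; does not wait for task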
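submit() does not wait for the task: it returns a concurrent.futures.Future right away, which you can poll with done() or block on with result(). A small sketch with a thread pool, where slow_square is a hypothetical stand-in for task:

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

def slow_square(n):
    # Hypothetical stand-in for "task": sleep briefly, then square n.
    time.sleep(0.2)
    return n ** 2

pool = ThreadPoolExecutor(max_workers=4)
future = pool.submit(slow_square, 3)   # returns immediately
print(isinstance(future, Future))      # True
print(future.done())                   # probably False while the task is still sleeping
print(future.result())                 # blocks until the task finishes: 9
pool.shutdown(wait=True)
```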

map(func, *iterables, timeout=None, chunksize=1)

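Replaces the pattern of calling submit in a for loop: func is applied to every item of the iterables, and an iterator over the results is returned.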

from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=3)

# for i in range(11):
#     future = pool.submit(task, i)

results = pool.map(task, range(11))  # map replaces the for + submit loop
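Executor.map() yields results in the order of the inputs, not in completion order, just like the built-in map(), and it also accepts several iterables. A sketch in which add is a hypothetical task with a random delay so that calls finish out of order:

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    # Hypothetical task: random delay so tasks finish out of order.
    time.sleep(random.random() / 10)
    return a + b

with ThreadPoolExecutor(max_workers=3) as pool:
    # Two iterables are consumed pairwise, like built-in map(add, xs, ys).
    results = list(pool.map(add, range(5), range(10, 15)))

print(results)  # [10, 12, 14, 16, 18]
```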

shutdown(wait=True)

Equivalent to calling pool.close() followed by pool.join() on a multiprocessing.Pool.

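  • wait=True: waits until all tasks in the pool have finished and their resources have been released before returning
  • wait=False: returns immediately without waiting for the tasks in the pool to finish
  • Regardless of wait, the Python program as a whole will not exit until all pending tasks have completed (unless cancel_futures=True is passed, Python 3.9+)
  • submit and map must be called before shutdown; calling them afterwards raises RuntimeError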

from concurrent.futures import ProcessPoolExecutor

pool = ProcessPoolExecutor(4)
for i in range(13):
    pool.submit(task, i)

pool.shutdown(wait=True)  # block until all 13 tasks are done
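Two of the shutdown rules above are easy to confirm with a thread pool: a with block performs shutdown(wait=True) on exit, and submitting after shutdown is rejected with RuntimeError. A minimal sketch; work is a hypothetical task:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def work(n):
    # Hypothetical task that takes a moment.
    time.sleep(0.1)
    return n

# Leaving the with block calls pool.shutdown(wait=True),
# so every future is finished by the time we reach the next line.
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(work, i) for i in range(4)]
print(all(f.done() for f in futures))  # True

# submit() after shutdown() is an error.
rejected = False
try:
    pool.submit(work, 99)
except RuntimeError as exc:
    rejected = True
    print("RuntimeError:", exc)
```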

result(timeout=None)

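Returns the value returned by the task, blocking until the task has finished. If timeout (in seconds) is given and the result is not ready in time, it raises TimeoutError.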

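from concurrent.futures import ProcessPoolExecutor

pool = ProcessPoolExecutor(4)
futures = [pool.submit(task, i) for i in range(13)]
# Collect results only after everything is submitted: calling result()
# right after each submit would wait for that task and run them one by one.
results = [future.result() for future in futures]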
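result() does more than hand back the value: if the task raised, the same exception is re-raised in the caller, and with a timeout it raises TimeoutError when the result is not ready in time. A sketch with two hypothetical tasks, one failing and one slow:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def fail():
    # Hypothetical task that raises inside the worker thread.
    raise ValueError("bad input")

def slow():
    # Hypothetical task that takes about one second.
    time.sleep(1)
    return "done"

reraised = None
timed_out = False

with ThreadPoolExecutor(max_workers=2) as pool:
    f_fail = pool.submit(fail)
    f_slow = pool.submit(slow)

    try:
        f_fail.result()             # the worker's ValueError is raised here
    except ValueError as exc:
        reraised = exc

    try:
        f_slow.result(timeout=0.1)  # not finished yet after 0.1 s
    except TimeoutError:
        timed_out = True

    final = f_slow.result()         # no timeout: wait until it is done

print(reraised, timed_out, final)  # bad input True done
```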

add_done_callback(fn)

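Registers a callback: when the task finishes (or is cancelled), fn is called with the Future as its only argument. Together with asynchronous submission, this lets each result be processed as soon as its task completes, instead of waiting on results one by one.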

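from concurrent.futures import ProcessPoolExecutor

pool = ProcessPoolExecutor(4)
for i in range(13):
    future = pool.submit(task, i)
    future.add_done_callback(handle)  # handle(future) is called when the task finishes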
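The callback receives the Future itself, not the return value, so it has to call result() to get at the outcome. If the future has already finished when add_done_callback() is called, the callback runs immediately in the calling thread. A sketch in which square and handle are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor

collected = []

def square(n):
    # Hypothetical task.
    return n * n

def handle(future):
    # Called with the finished Future as its only argument.
    collected.append(future.result())

with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(5):
        pool.submit(square, i).add_done_callback(handle)

print(sorted(collected))  # [0, 1, 4, 9, 16]
```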

ProcessPoolExecutor

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        res=executor.submit(task,i)
        futures.append(res)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

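Process pools and thread pools are used in exactly the same way. Whether a program should use a process pool or a thread pool depends on whether it is CPU-bound or I/O-bound: in standard CPython builds the GIL keeps threads from running Python code in parallel, so CPU-bound work benefits from processes, while I/O-bound work (network, disk) is usually served well by threads.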

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        res=executor.submit(task,i)
        futures.append(res)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())
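In the thread-pool version every line prints the same PID, because all worker threads live inside one process; the process-pool version prints up to three different PIDs. To tell threads apart, report the thread name instead. A sketch, where whoami is a hypothetical helper:

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor

def whoami(_):
    # Report both the process ID and the worker thread's name.
    return os.getpid(), threading.current_thread().name

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="worker") as pool:
    info = list(pool.map(whoami, range(6)))

pids = {pid for pid, _ in info}
names = {name for _, name in info}
print(pids == {os.getpid()})  # True: one process
print(names)                  # e.g. {'worker_0', 'worker_1', 'worker_2'}
```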


Using map

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(11))  # map replaces for + submit
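In this example the iterator returned by executor.map() is never consumed, so the return values are discarded (the tasks still run, because map submits them all up front). Consuming the iterator is also where errors surface: if a call raised, that exception is raised when its value is retrieved. A sketch with a hypothetical task that fails for one input:

```python
from concurrent.futures import ThreadPoolExecutor

def inverse(n):
    # Hypothetical task: fails with ZeroDivisionError for n == 0.
    return 1 / n

received = []
with ThreadPoolExecutor(max_workers=3) as pool:
    results = pool.map(inverse, [1, 2, 0, 4])
    try:
        for value in results:
            received.append(value)
    except ZeroDivisionError:
        print("failed after", received)  # failed after [1.0, 0.5]
```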

Callback functions

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

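def get_page(url):
    print('<process %s> get %s' % (os.getpid(), url))
    response = requests.get(url, timeout=10)  # timeout avoids hanging forever on a slow site
    if response.status_code == 200:
        return {'url': url, 'text': response.text}
    return None  # non-200 responses produce no result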

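def parse_page(res):
    res = res.result()  # the callback receives a Future; result() gives get_page's return value
    if res is None:     # get_page returned nothing (non-200 response)
        return
    print('<process %s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
    with open('db.txt', 'a') as f:
        f.write(parse_res)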


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # Equivalent with multiprocessing.Pool (its callback receives the return value, not a Future):
    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=parse_page)
    # p.close()
    # p.join()

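    p = ProcessPoolExecutor(3)
    for url in urls:
        # parse_page receives a Future object; it must call .result() to get get_page's return value.
        # The callback runs in the main process, not in the worker process.
        p.submit(get_page, url).add_done_callback(parse_page)
    p.shutdown(wait=True)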
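The callback pattern can be exercised without network access by swapping requests for a stub. Because get_page returns None when the status code is not 200, the callback should check for that before indexing the result. A sketch using a thread pool, where fake_get_page is a hypothetical stand-in for get_page and example.com URLs are placeholders:

```python
from concurrent.futures import ThreadPoolExecutor

def fake_get_page(url):
    # Hypothetical stand-in for get_page: no network, "404" for one URL.
    if url.endswith("/missing"):
        return None
    return {"url": url, "text": "x" * len(url)}

lines = []

def parse_page(future):
    # The callback receives the Future; result() unwraps the dict (or None).
    res = future.result()
    if res is None:
        return
    lines.append("url:<%s> size:[%s]" % (res["url"], len(res["text"])))

urls = ["https://example.com", "https://example.com/missing"]
with ThreadPoolExecutor(3) as pool:
    for url in urls:
        pool.submit(fake_get_page, url).add_done_callback(parse_page)

print(lines)  # ['url:<https://example.com> size:[19]']
```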

Reposted from

https://www.cnblogs.com/linhaifeng/articles/7428877.html#_label13