concurrent.futures 模块

发布时间 2023-03-31 16:40:18作者: MrSphere

应用场景

# 什么时候用池
池的功能是限制启动的进程数或线程数
# 什么时候应该限制
当并发的任务数量远远超过了计算机的承受能力时,
即无法一次性开启过多的进程数或线程数时,
就应该用池的概念,将开启的进程数或线程数限制在计算机的可承受范围内

# 提交任务的两种形式
1. 同步:提交完任务后就在原地等待,直到任务运行完毕后拿到任务的返回值,再继续运行下一行代码
2. 异步:提交完任务后不在原地等待,直接运行下一行代码,任务的返回值

线程池

'''
线程池由concurrent.futures 下的ThreadPoolExecutor提供
submit(fn,*args,**kwarg s)将函数fn提交给线程池,
map(fn,*iterables,timeout=None,chunksize =1) 启动多线程,让函数fn分别使用后面的可迭代参数
	map的结果是生成器类型的数据,映射函数名字对应的结果保存在map object中
shutdown(wait = True) 关闭线程池,已经关闭的线程池不能再添加线程,可以用上下文管理器,自动关闭
'''

'''
使用submit() 函数提交后会返回future 对象
cancel() 可以取消该线程,如果线程正则运行,不可取消并返回Fasle,否则取消,并返回True
cancelled() 返回线程是否被取消
running() 返回线程是否正在运行
done() 返回线程是否完成,包括取消和正常完成
result()获取该线程的返回值,会阻塞线程,timeout 是阻塞时间,阻塞时间内获取不到就不要了,报错
add_done_callback(fn) 线程结束后执行fn回调函数

# -*- coding: utf-8 -*-
import os
import random
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def task(n):
    print(f"{os.getpid()} is run..")
    time.sleep(n)
    # print('f')
    return n ** 2

# if __name__ == '__main__':
#     pool = ProcessPoolExecutor(max_workers=4)
#     start = time.perf_counter()
#     res_list = [pool.submit(task, random.randint(1, 5)).result() for i in range(5)]
#     pool.shutdown(wait=True)
#
#     # res = [res.result() for res in res_list]
#     end = time.perf_counter()
#     print(res_list)  # [1, 4, 4, 25, 9]
#     print(f'main... {end - start}')  # main... 13.29411249999248 1+2+2+5+3


if __name__ == '__main__':
    pool = ProcessPoolExecutor(max_workers=4)
    start = time.perf_counter()
    res_list = [pool.submit(task, random.randint(1, 5)) for i in range(5)]
    pool.shutdown(wait=True)

    res = [res.result() for res in res_list]
    end = time.perf_counter()
    print(res)  # [9, 25, 4, 25, 4]
    print(f'main... {end - start}')  # main... 5.186168200001703 5
	

# if __name__ == '__main__':
#     pool = ProcessPoolExecutor(max_workers=4)
#     start = time.perf_counter()
#     for i in range(10):
#         # pool.submit(task,args = random.randint(1,5))
#		  # main...0.021992599999066442,多进程执行结果
#         res =pool.submit(task,2)
#		  main...0.018832400004612282 604 is run..14160 is run..
#         # res = pool.submit(task, i)
#         # 产生res对象,对象有result方法
#         # pool 没有 join 方法,使用了res.result() 获取结果,会使多进程或者多线程阻塞
#         # print(res.result()) # main...20.27270380000118
#     pool.shutdown(wait=True)
# 	  #shutdown 关闭 进程池的入口,等待全部运行完 main...9.246424099997967
#     end = time.perf_counter()
#     # processes = [pool.submit(task,args = range(1,5)) for i in range(10)]
#     print(f'main...{end- start}') # main...45.25680309999734