python-asyncio

发布时间 2023-05-20 20:56:37作者: 贝壳里的星海

python -asyncio

协程是在用户态实现的上下文切换技术

相比线程切换,协程上下文切换代价更小。
协程是单线程的,不存在多线程互斥访问共享变量,不用锁、信号量等机制
协程非常灵活,当出现I/O阻塞时,就去切换任务,I/O完成再唤醒,这就是所谓的 异步I/O,

实现化协程对象的方法有:

  • yield关键字
  • yield from关键字
  • asyncawait 关键字

yield

def func1():
    yield 1
    yield from func2()
    yield 2


def func2():
    yield 3
    yield 4


f1 = func1()
for item in f1:
    print(item)

asyncio即Asynchronous I/O是python一个用来处理并发(concurrent)事件的包,是很多python异步架构的基础,多用于处理高并发网络请求方面的问题。
此处使用的是Python 3.5之后出现的async/await来实现协程

通过 async/await 语法来声明协程是编写 asyncio 应用的推荐方式。

asyncio中几个重要概念

1.事件循环

管理所有的事件,在整个程序运行过程中不断循环执行并追踪事件发生的顺序将它们放在队列中,空闲时调用相应的事件处理者来处理这些事件。

2.Future

Future对象表示尚未完成的计算,还未完成的结果

3.Task

是Future的子类,作用是在运行某个任务的同时可以并发的运行多个任务。

asyncio.Task用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数创建:

asyncio 完整流程

1、定义/创建协程对象
2、将协程转为task任务
3、定义事件循环对象容器
4、将task任务扔进事件循环对象中触发
asyncio.async()
loop.create_task() 或 asyncio.ensure_future()

异步代码

import time
import asyncio

# 定义异步函数
async def hello():
    await asyncio.sleep(1)
    print(f'Hello World: {time.time()}')

def addtack(loop):
    for i in range(5):
        loop.create_task(hello())

if __name__ == '__main__':
    t1 = time.time()
    loop = asyncio.get_event_loop()      # 初始化时间
    addtack(loop)                        # 创建任务
    loop.run_until_complete(hello())     # 等待任务执行结束
    t2 = time.time()
    print(f"all time {t2 - t1}")

# -------------------------------------------------------
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
all time 1.0133047103881836

定义和运行

使用async def语句定义一个协程函数,但这个函数不可直接运行

# 普通函数
def function():
    return 1
   
# 异步函数
async def asynchronous():
    return 1
# 异步函数不同于普通函数不可能被直接调用
async def aaa():
    print('hello')

print(aaa())

# 输出----------------------------------
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

如何运行一个协程呢,有三种方式:

asyncio.run()

1、使用函数,可直接运行

import asyncio

async def aaa():
    print('hello')

asyncio.run(aaa())
# 输出-------------------------
hello

await()

2、使用await()进行异步等待

  • await语法等待另一个协程,这将挂起当前协程,直到另一个协程返回结果。

asyncio.sleep 也是一个协程,所以 await asyncio.sleep(x) 就是等待另一个协程

import asyncio
async def aaa():
    print('hello')

async def main():
    await aaa()
    
asyncio.run(main())

asyncio.create_task()

使用asyncio.create_task() 函数来创建一个任务,放入事件循环中

import asyncio

async def aaa():
    print('hello')

async def main():
    asyncio.create_task(aaa())
asyncio.run(main())
import asyncio

async def asfunc():
    await asyncio.sleep(1)
    print('hello')

aaa=asfunc()
loop = asyncio.get_event_loop()
task = loop.create_task(aaa)
loop.run_until_complete(task)

创建Task

loop.create_task()

import asyncio

async def helperfunc():
    await asyncio.sleep(1)
    print('beike tian')

coro = helperfunc()

loop = asyncio.get_event_loop()
print("loop:",loop)
task = loop.create_task(coro)
print('task:', task)

loop.run_until_complete(task)
print('task:', task)
loop.close()

# 输出 ----------------------------------------
loop: <ProactorEventLoop running=False closed=False debug=False>
task: <Task pending name='Task-1' coro=<helperfunc() running at E:\TzxNote\Note\lcodeNoteCards\edge_det.py:3>>
beike tian
task: <Task finished name='Task-1' coro=<helperfunc() done, defined at E:\TzxNote\Note\lcodeNoteCards\edge_det.py:3> result=None>

run_until_complete 是一个阻塞(blocking)调用,直到协程运行结束,它才返回

import asyncio

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))

获取协程返回

有2种方案可以获取返回值。

方法一:通过task.result()

可通过调用 task.result() 方法来获取协程的返回值,但是只有运行完毕后才能获取,若没有运行完毕,result()方法不会阻塞去等待结果,而是抛出 asyncio.InvalidStateError 错误

import asyncio

async def helperfunc():
    await asyncio.sleep(1)
    print('beike tian')
    return  "小贝壳"

coro = helperfunc()

loop = asyncio.get_event_loop()
task = loop.create_task(coro)
print('task:', task)
try:
    print('task.result:', task.result())
except asyncio.InvalidStateError:
    print('task状态未完成,捕获了 InvalidStateError 异常')

loop.run_until_complete(task)
print('task:', task)
print('task.result:', task.result())
loop.close()

# ----------------------------
task: <Task pending name='Task-1' coro=<helperfunc() running at E:\TzxNote\Note\lcodeNoteCards\edge_det.py:3>>
task状态未完成,捕获了 InvalidStateError 异常
beike tian
task: <Task finished name='Task-1' coro=<helperfunc() done, defined at E:\TzxNote\Note\lcodeNoteCards\edge_det.py:3> result='小贝壳'>
task.result: 小贝壳

方法二、asyncio.gather

import asyncio

async def func1(i):
    print(f"协程函数{i}马上开始执行。")
    await asyncio.sleep(2)
    return i

async def main():
    tasks = []
    for i in range(1, 5):
        tasks.append(func1(i))

    results = await asyncio.gather(*tasks)
    for result in results:
        print(f"执行结果: {result}")

if __name__ == '__main__':
    asyncio.run(main())

取消任务

import asyncio
import time

async def helperfunc(nums):
    await asyncio.sleep(1)
    print(f'beike tian {nums}')
    return  "小贝壳"


if __name__ == "__main__":
    task1 = helperfunc(2)
    task2 = helperfunc(3)
    task3 = helperfunc(3)

    tasks = [task1, task2, task3]
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        all_tasks = asyncio.Task.all_tasks()
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())
        loop.stop()
        # stop 调用之后,需要调用 run_forever,不然会报错
        loop.run_forever()
    finally:
        loop.close()

方法三:通过add_done_callback()回调

通过 Future 的 add_done_callback() 方法来添加回调函数,当任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。

import asyncio

async def helperfunc():
    await asyncio.sleep(1)
    print('beike tian')
    return  "小贝壳"

def my_callback(future):
    print('返回值:', future.result())

coro = helperfunc()

loop = asyncio.get_event_loop()
task = loop.create_task(coro)
task.add_done_callback(my_callback)

loop.run_until_complete(task)
loop.close()

# -----------------------------------------
beike tian
返回值: 小贝壳

多任务控制

通过asyncio.wait()可以控制多任务

asyncio.wait()是一个协程,不会阻塞,立即返回,返回的是协程对象。传入的参数是future或协程构成的可迭代对象。最后将返回值传给run_until_complete()加入事件循环

asyncio.gatherasyncio.wait 功能相似。

方法1、asyncio.wait

import asyncio

async def coroutine_example(name):
    print('正在执行name:', name)
    await asyncio.sleep(1)
    print('执行完毕name:', name)

loop = asyncio.get_event_loop()

tasks = [coroutine_example('Zarten_' + str(i)) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)
loop.close()

方法2、asyncio.gather

import asyncio
async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)
    
loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

# 或者借助列表
coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))

# -------------------------------------
Waiting 3
Waiting 1
<等待三秒钟>
Done

多任务返回

方法1、需要通过loop.create_task()创建task对象

import asyncio

async def coroutine_example(name):
    print('正在执行name:', name)
    await asyncio.sleep(1)
    print('执行完毕name:', name)
    return '返回值:' + name

loop = asyncio.get_event_loop()

tasks = [loop.create_task(coroutine_example('Zarten_' + str(i))) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)

for task in tasks:
    print(task.result())

loop.close()

方法2、回调add_done_callback()

import asyncio

def my_callback(future):
    print('返回值:', future.result())

async def coroutine_example(name):
    print('正在执行name:', name)
    await asyncio.sleep(1)
    print('执行完毕name:', name)
    return '返回值:' + name

loop = asyncio.get_event_loop()

tasks = []
for i in range(3):
    task = loop.create_task(coroutine_example('Zarten_' + str(i)))
    task.add_done_callback(my_callback)
    tasks.append(task)

wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)

loop.close()

其他知识点

run_until_complete 实现的原理

run_forever 会一直运行,直到 stop 被调用

loop.run_forever() 会让线程一直运行。loop.run_until_complete() 借助了 run_forever 方法。
在 run_until_complete 的实现中,调用了 future.add_done_callback(_run_until_complete_cb)。
async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')
    loop.stop()

wait 与 gather 中的区别

gather 比 wait 更加高层。gather 可以将任务分组,一般优先使用 gather。在某些定制化任务需求的时候,会使用 wait。

from functools import partial
import asyncio
import time


async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [do_some_work(i) for i in range(10)]

    group1 = [do_some_work(i)  for i in range(2)]
    group2 = [do_some_work(i)  for i in range(2)]

    loop.run_until_complete(asyncio.gather(*group1, *group2))
    print(time.time() - start_time)

参考文献

https://docs.python.org/zh-cn/3/library/asyncio-task.html#running-an-asyncio-program

https://www.cnblogs.com/Red-Sun/p/16934843.html

https://zhuanlan.zhihu.com/p/137698989

https://www.cnblogs.com/MrReboot/p/16413332.html

https://zhuanlan.zhihu.com/p/59621713 动态添加协程

https://zhuanlan.zhihu.com/p/137698989 gather 和 wait 区别