08-asyncio -- 异步IO

发布时间 2023-12-14 23:32:53作者: yaowy

写在前面

async -- 异步
io --- IO
asyncio -- 异步IO

1. Python 协程介绍

协程,又称之为微线程,本质上还是 python 运行的单进程单线程程序。和线程不同,协程不涉及到系统级的上下文切换,而是在单个线程内进行锁执行代码块的切换。因此协程并没有提高计算速度,而是在代码执行的等待时间,去做别的任务,典型的运用场景就是网络通讯。

在 python3.7 以后,由 asyncio 包实现协程,虽然有更古老的实现办法,但是我们没必要去做深入的了解。asyncio 包的运行核心就是event loop(事件循环)。event loop 就像是程序运行时的大脑,他面对着很多的任务,并从中选择可以执行的任务进行执行。由于协程本质上还是在单进程单线程内执行程序,因此,同时执行的任务只能有一个,这个的只有一个指的是任务的代码执行只有一个,IO操作是可以有多个的。在切换任务时不存在系统级的上下文切换,都是在用户态内进行的执行代码块的切换。

在python asyncio 包中,需要每个任务(可等待对象)主动显式得告诉 event loop 自己已经执行完了,需要将程序的控制权交还给 event loop, event loop 才会继续调用其他的可执行的任务执行。因此,在 python 的asyncio 协程中,可以明确知道任务什么时候结束运算,不存在竞争冒险这样的问题。

2. Python asyncio 包使用

2.1 程序入口

若想运行协程,需要做出如下的操作:

  • 进入async 模式,创建 event loop, 并交由 event loop 控制整个程序的运行
  • 将所有可等待对象托管给 event loop, 并等待 event loop 进行调用执行。

在Python中,若想进行 async 模式并让 event loop 控制整个执行状态,基本全部用入口函数 asyncio.run(). asyncio.run() 方法定义如下所示

def run(main, *, debug=None):
    """Execute the coroutine and return the result.

    This function runs the passed coroutine, taking care of
    managing the asyncio event loop and finalizing asynchronous
    generators.

    This function cannot be called when another asyncio event loop is
    running in the same thread.

    If debug is True, the event loop will be run in debug mode.

    This function always creates a new event loop and closes it at the end.
    It should be used as a main entry point for asyncio programs, and should
    ideally only be called once.

    Example:

        async def main():
            await asyncio.sleep(1)
            print('hello')

        asyncio.run(main())
    """

asyncio.run() 主要做了两件事:

  1. 建立 event loop, 用于调用执行可等待对象。在一个线程内只能有一个 event loop;
  2. event loop 建立后,将执行协程对象 main,待执行完成后返回执行结果。协程对象main是由程序中最高层级入口点函数main() 生成的 coroutine object。其他可能带对象,均由入口点函数main() 及其调用函数托管注册给 event loop 中,并交由 event loop 调用执行,以期孜孜不倦完成整个程序功能。

如下所示:

import asyncio


async def coroutine_func():
    print('hello')
    await asyncio.sleep(1)
    print('world')


coroutine_obj = coroutine_func()
asyncio.run(coroutine_obj)

2.2 可等待对象

如果一个对象可以使用 await 关键字修饰,那么他就是可等待对象(awaitable object)在asyncio包中,许多 API 都是接收可等待对象作为参数。

主要有三种可等待对象:coroutine、task、future

coroutine

coroutine 有两层相关定义:coroutine function【协程函数】、 coroutine object【协程对象】。

  • coroutine function:以 async 关键字修饰的 function。在 coroutine function 中定义 coroutine 的执行过程。
  • coroutine object:对 coroutine function 进行调用,得到 coroutine object。和生成器对象相似,在调用 coroutine function时,并不会执行 coroutine function 中的代码,而是得到 coroutine object。如下所示:
import asyncio


async def coroutine_func():
    print('hello')
    await asyncio.sleep(1)
    print('world')


coroutine_obj = coroutine_func()
print(type(coroutine_obj))

在 event loop 执行可等待对象(task, coroutine)时,若遇到 await coroutine, 不会将程序的控制权交还给 event loop,而是直接执行 await 的 coroutine对象,直到
await 的 coroutine 执行过程中遇到过不去的坎儿,如 await 新的task或者Future, await asyncio.sleep() 等,才会将控制权还给 event loop, 以便于 event loop 调用新的 task并执行。当await 的 coroutine的执行结束后,会返回代表 coroutine 执行结果的 Future 对象给 await 他的 task 或者 coroutine。在一个 task 或 coroutine object 中, await 一个 coroutine function,其执行过程如下:

  1. 调用 coroutine function 并生成对应的 coroutine object。因此,await 可以直接修饰 coroutine object;
  2. coroutine object 将直接被调用运行,并直到此 coroutine object 执行过程中遇到无法向下执行的事件(比如 await 一个 Task 或者 Future 对象,不能是 await coroutine object, 否则 await 的 coroutine object 也会继续直接被调用执行)时;或者此 coroutine object 执行结束时,返回代表当前 coroutine object 执行结果的 Future 对象。
  3. 与此同时,在此 coroutine object 执行过程中,await它的 Task 或 coroutine object 并不会交还控制权给event loop,而是等待此 coroutine object 执行返回Future对象;
  4. 若返回的 Future 对象代表的 coroutine object 已经执行完成,则当前 Task 会保存 Future 对象,以便于获取 coroutine function 的返回值,并继续向下执行;
  5. 若 Future 对象代表的 coroutine object 因遇到过不去的坎而未完成执行(如创建了 Task 对象,并await 了此 Task 对象),则会将 Future 对象注册到 event loop 中,以表明当前 await 它的 Task 或 coroutine object 需要等到此 Future 对象所代表的的 coroutine object 执行完成后才能继续,从而建立依赖关系,告诉 event loop 此 Task 无法继续执行, event loop 可以调用其他 Task 进行运行。 当 Future 对象代表的 coroutine object 异步执行完成后, event loop 将择机再次安排当前 Task 运行,并把依赖的 Future 对象中里面真正的返回值保存起来,并继续向下执行。
import asyncio
import time


async def say_after(delay, say_what):
    await asyncio.sleep(delay=delay)
    print(say_what)


async def main():
    print(f"start at {time.strftime('%X')}")
    await say_after(1, 'hello')
    await say_after(2, 'world')
    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

在此补充执行过程。

在上述例子中,两个say_after执行总耗时是3秒,并没有发挥协程的作用。这是因为,遇到await关键字后,就会立即继续执行相应的coroutine object,后面的代码就必须等到前面的coroutine object执行结束后才能执行。

注意,在await coroutine时,将直接调用执行该 coroutine。此时,程序控制权在 await 的 coroutine 中,并没有回到 event loop 中,event loop就无实现并发执行。而若想真正实现并发执行 coroutine,则是需要将 coroutine object 封装到 task中,等待便于event loop 进行并发调度执行 task,以并发调度执行 coroutine。下面看 Task 介绍。

Task

Task 用于并发调度执行 coroutine,他是线程非安全的。Task 类继承自 Future 类。 coroutine 可以也应该被封装为 Task,被 event loop 调度运行。如果 Task 中封装的 coroutine 需要 await Future 对象(这里的 Future 对象泛指一步操作的结果,在 asyncio 中 Future 对象往往代表着 coroutine object 的执行结果),那么将建立起 Task 中 coroutine 对象对于 await 的 Future 对象的依赖关系,挂起此 coroutine 的执行、将程序执行控制权交回给 event loop,并等待此 Future 对象的完成。当 Future 对象完成后, event loop 将择机再次调用 Task 中封装的 coroutine 继续执行。

event loop 使用协作调度:event loop 同一时刻只能运行一个 Task。当Task 需要 await Future 对象完成,那么 event loop 将与运行其他 Task、回调、或者执行 IO操作。

可以使用高层级的 asyncio.create_task(), 低层级的 loop.create_task()、ebsure_future() 方法创建 Task,但不鼓励手动直接实例化 Task 对象。

在 Task 被创建时, event loop 就保存着 Task 的弱引用,即已经被注册到 event loop 中。当在 Task 中 await Task 时,将告诉 event loop 当前 Task 需要等到 await 的 Task 执行完成后,才能继续执行。同时,当前 Task 立即将程序控制权交还给 event loop,以便于 event loop 调用执行新的 Task。当 event loop 再次安排当前 Task 运行时,会吧 await 的 Task 中的返回值保存起来,并继续向下执行。

import asyncio
import time


async def say_after(delay, say_what):
    print('++', say_what)
    await asyncio.sleep(delay=delay)
    print(say_what)


async def main():
    task1 = asyncio.create_task(say_after(4, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    await asyncio.sleep(5)
    print(f"start at {time.strftime('%X')}")
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

在上述程序中,create_task函数把 coroutine object 封装成为task,并注册到event loop中,告诉event loop这个task已经可以开始执行了, 但此时还未开始执行,程序的执行权还在 mian 这个 coroutine 这里。等到遇到了 await asyncio.sleep(5) 这行代码,main coroutine 交出了程序执行权,event loop 调用 task1 和 task2 开始执行。
后续的 await task1 这里则必须接收到结果, mian coroutine 才能继续向下执行。

Tuture

Future 是一种特殊的 低层级 可等待对象,代表异步操作的 最终结果。在cououtine中 await Future 对象,就意味着 coroutine 将保持等待,直到该 Future 对象在其他地方操作完毕 才能继续向下执行。

在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。

通常情况下 不允许在应用代码中手动创建 Future 对象。

在asyncio中,Future 对象往往代表着 coroutine object 的执行结果。如 直接await coroutine object时,Future对象将保存 coroutine object 的最终结果;当 await Task对象时,Future对象将代表 Task中封装的 coroutine object 的异步执行结果。

由上述介绍可知,所有控制权的返回都是显式的。event loop并没有办法强行从某个task或coroutine object中拿回控制权,而是必须由task 或coroutine object主动把控制权交回给event loop。交回的方式有两种:await关键字交回,coroutine object的函数运行完毕后返回。也正因为如此,task中不可包含死循环,否则event loop将会卡死。

并发执行 Task

那么,假如我们需要await很多个task时,怎么处理呢?在asyncio包中,提供了gather函数用于并发执行多个可等待对象,主要特点如下:

  • gather函数用于并发执行coros_or_futures序列中的可等待对象;
  • gather函数的返回值是future对象,代表了coros_or_futures序列中可等待对象的执行结果;
  • gather函数返回的future对象也是可await的;
  • coros_or_futures序列中的可等待对象若是coroutine,那么coroutine将会被封装为Task进行调用执行;
  • coros_or_futures序列中的可等待对象并不会按照传入的顺序进行调用。

gather函数入参是由若干个coroutine object、task、future(gather的结果可以继续gather)组成的序列。gather函数会把非task的参数封装成task对象,注册到event loop中,然后返回future对象。当await gather函数返回的future对象时,就是在告诉event loop当前正在执行的task需要等到gaither里面所有的task都完成,才可以继续。同时,会把这些task的return值放到一个list中,然后返回回来,list中值的顺序和task的顺序一致。

import asyncio


async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f


async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)


asyncio.run(main())

gather函数会把coroutine直接封装成为task,就不用手动调用create_task把coroutine封装成task。