asyncio:python3未来并发编程主流、充满野心的模块

发布时间 2023-06-02 10:44:11作者: tomato-haha

https://www.cnblogs.com/traditional/p/11828780.html


楔子

asyncio 是 Python 在 3.5 版本中正式引入的标准库,这是 Python 未来并发编程的主流,非常重要的一个模块。有一个 Web 框架叫 sanic,就是基于 asyncio,使用 sanic 可以达到匹配 Go 语言的并发量(有点夸张了,还是有差距的,但至少在一个量级)。

asyncio 模块提供了使用协程构建并发应用的工具,threading 模块通过应用线程实现并发,multiprocessing 使用系统进程实现并发。asyncio 使用一种单线程、单进程模式实现并发,应用的各个部分会彼此合作,在最优的时刻显式的切换任务。大多数情况下,会在程序阻塞等待读写数据时发生这种上下文切换,不过 asyncio 也支持调度代码在将来的某个特定时间运行,从而支持一个协程等待另一个协程完成,以处理系统信号和识别其他一些事件(这些事件可能导致应用改变其工作内容)。

asyncio中,有几个非常重要的概念。

  • coroutine 对象(协程对象):调用一个使用 async def 定义的函数会返回一个协程对象,协程对象无法直接执行,需要将其注册到事件循环中,由事件循环调用。
  • Future 对象(未来对象):在 asyncio 中,如何才能得到异步调用的结果呢?先设计一个对象,异步调用执行完的时候,就把结果放在它里面,这种对象称之为未来对象。未来对象有一个 result 方法,可以获取未来对象的内部结果。还有个 set_result 方法,是用于设置 result 的。set_result 设置的是什么,调用 result 得到的就是什么。Future 对象可以看作下面的 Task 对象的容器。
  • Task 对象(任务):一个协程就是一个原生可以挂起的函数,任务则是对象协程的进一步封装,里面可以包含协程在执行时的各种状态,关于 Task 和 Future 两者之前的关系我们后面会说。
  • event loop(事件循环):程序开启一个无限循环,可以把一些协程注册到事件循环中,当满足事件发生的时候,就会执行相应的协程。
  • async / await 关键字:Python 3.5 开始引入的用于定义协程函数的关键字,async def 定义一个协程函数,调用协程函数会创建协程对象;在一个协程中可以驱动另一个协程,而驱动的方式就是使用 await 关键字。

使用其它并发模型的大多数程序都采用线性方式编写,而且依赖于语言运行时系统或操作系统的底层线程、进程管理来适当地改变上下文。基于 asyncio 的应用要求应用代码显式地处理上下文切换,要正确地使用相关技术,这取决于是否能正确理解一些相关联的概念。

asyncio 提供的框架以一个事件循环(event loop)为中心,这是一个首类对象,负责高效地处理I / O 事件、系统事件、和应用上下文切换。目前已经提供了多个循环实现来高效地利用操作系统的功能。尽管通常会自动选择一个合理的默认实现,但也完全可以在应用中选择某个特定的事件循环实现。

与事件循环交互的应用要显式地注册将运行的代码,让事件循环在资源可用时向应用代码发出必要的调用。

例如:一个网络服务器打开套接字,然后注册为当这些套接字上出现输入事件时服务器要得到的通知。

事件循环在建立一个新的进入链接或者在数据可读取时都会提醒服务器代码,当前上下文中没有更多工作可做时,应用代码要再次短时间地交出控制权。

例如:如果一个套接字没有更多的数据可以接收,那么服务器会把控制权交给事件循环。

所以,就是把事件注册到事件循环中,不断地循环这些事件,可以处理了那么就去处理,如果卡住了,那么把控制权交给事件循环,继续执行其他可执行的任务。

像传统的 twisted、gevent、以及 tornado,都是采用了事件循环的方式,这种模式只适用于高 I / O,低 CPU 的场景,一旦出现了耗时的复杂运算,那么所有任务都会被卡住。

将控制权交给事件循环的机制依赖于协程(coroutine),这是一些特殊的函数,可以将控制返回给调用者而不丢失其状态。

协程与生成器非常类似,实际上,在 python3.5 版本之前还未对协程提供原生支持时,可以用生成器来实现协程。asyncio 还为协议(protocol)和传输(transport)提供了一个基于类的抽象层,可以使用回调编写代码而不是直接编写协程。在基于类的模型和协程模型时,可以通过重新进入事件循环显式地改变上下文,以取代 Python 多线程实现中隐式的上下文改变。

创建一个协程并执行

那么我们看看如何创建一个协程,协程是一个专门设计用来实现并发操作的语言构造。在早期版本,是使用yield来模拟协程,但它本质上是一个生成器,但是从 Python3.5 开始,Python 已经支持原生协程。通过 async def 定义一个协程函数,调用协程函数会创建一个协程(对象),协程中可以使用 await 关键字驱动另一个协程执行。

# 使用 async def 可以直接定义一个协程函数
async def coroutine():
    print("in coroutine")

# 得到一个协程对象
c = coroutine()
print(c)  # <coroutine object coroutine at 0x000002903CB322C0>
print(type(c))  # <class 'coroutine'>

# 协程是没有办法直接执行的, 我们需要扔到事件循环中
import asyncio
loop = asyncio.get_event_loop()  # 创建一个事件循环
loop.run_until_complete(c)
"""
in coroutine
"""
loop.close()  # 关闭事件循环

run_until_complete 方法会启动协程,也可以同时启动多个协程,当所有的协程都运行关闭时,会停止循环。

值得一提的是,从 Python3.7 开始,async 和 await 已经是关键字了,我们之前说的关键字其实是保留关键字,意思是你可以使用 async 和 await 作为变量名,但是在 Python3.7之后,就不可以了。另外在 Python3.7 中还提供了另一种运行协程的方法。

import asyncio

async def coroutine():
    print("in coroutine")

c = coroutine()
# asyncio.run 内部包含了创建事件循环、执行协程、关闭事件循环等一套逻辑
asyncio.run(c)
"""
in coroutine
"""

如果协程有返回值呢?

import asyncio

async def coroutine():
    print("in coroutine")
    return "result"

loop = asyncio.get_event_loop()
c = coroutine()
result = loop.run_until_complete(c)
print(result)
"""
in coroutine
result
"""
loop.close()


# 对于 asyncio.run 而言也是一样的
c = coroutine()
result = asyncio.run(c)
print(result)
"""
in coroutine
result
"""

但是注意了,如果一个协程已经运行完毕了,那么就不能再扔到事件循环中了,举个栗子:

import asyncio

async def coroutine():
    return "result"

loop = asyncio.get_event_loop()
try:
    c = coroutine()
    result = loop.run_until_complete(c)
    print(result)  # result
    # 再次运行
    loop.run_until_complete(c)
except RuntimeError as e:
    print(e)  # cannot reuse already awaited coroutine
finally:
    loop.close()

如果一个协程已经被扔到事件循环中执行完毕了,那么它就已经是 awaited 的协程了,关于 await 我们后面会说。这个时候要是把 awaited 的协程再扔到事件循环中,那么就会报错。

多个协程合作

一个协程还可以驱动另一个协程执行,并等待其返回结果,从而可以更容易地将一个任务分解为多个可重用的部分。

import asyncio

async def worker():
    print("worker....")
    # 使用 await 方法会驱动协程 consumer() 执行, 并得到其返回值
    # 这里类似于函数调用一样, 但是协程需要加上一个 await
    res = await consumer()
    print(res)


async def consumer():
    return "i am consumer"


asyncio.run(worker())
"""
worker....
i am consumer
"""

在这里,使用 await 关键字,而不是向循环中增加新的协程。因为控制流已经在循环管理的一个协程中,所以没必要告诉事件循环来管理这些协程。另外,协程可以并发运行,但前提是多个协程。这个协程卡住了,可以切换到另一个协程。但是就卡住的协程本身来说,该卡多长时间还是多长时间,不可能说跳过卡住的部分执行下面的代码。

另外我们还可以通过装饰器来模拟协程,协程函数是 asyncio 设计中的关键部分,它们提供了一个语言构造,可以停止程序某一部分的执行,保留这个调用的状态,并在以后重新进入这个状态,这些动作都是并发框架很重要的功能。Python3.5中引入了一些新的语言特性,可以使用 async def 以原生方式定义这些协程函数,以及使用 await 交出控制,asyncio 的例子应用了这些新特性。但是早期版本,可以使用 asyncio.coroutine 装饰器将函数装饰成一个协程函数,并使用 yield from 来达到同样其它协程执行的效果。

import asyncio

@asyncio.coroutine
def worker():
    print("worker....")
    res = yield from consumer()
    print(res)


@asyncio.coroutine
def consumer():
    return "i am consumer"


asyncio.run(worker())
"""
worker....
i am consumer
"""

我们看到使用生成器依旧可以达到这样的效果,然而尽管使用生成器可以达到同样的效果,但还是推荐使用 async 和 await,原因如下:

  • 生成器既可以做生成器,又可以包装为协程,那么它到底是协程还是生成器呢?这会使得代码出现混乱
  • 生成器既然叫生成器,那么就应该做自己
  • 基于async的原生协程比使用yield装饰器的协程要快,大概快10-20%

并且在 Python3.8 中,已经警告了,不建议使用这种方法,定义一个协程函数应该使用 async def。

Task 与 Future

对于协程来说,是没有办法直接放到事件循环里面运行的,需要的是 Task 对象(任务)。而我们刚才之所以直接将协程扔进去,是因为 asyncio 内部会有检测机制,如果是协程的话,会自动将协程包装成一个 Task 对象。

import asyncio

async def coroutine():
    print(123)


loop = asyncio.get_event_loop()
# 如何创建一个任务呢?
task = loop.create_task(coroutine())
loop.run_until_complete(task)
"""
123
"""

因此一个协程是一个可以原生挂起的函数,而一个 Task 对象则是对协程的进一步封装,里面包含了协程的各种执行状态。

而 Future 被称之为未来对象,我觉得可以把它看成是 Task 对象的容器,因为 Task 继承自 Future。任务执行完毕的时候会通过 set_result 设置返回值,然后外部可以调用 result 获取 set_result 设置的返回值。而 Task 对象内部是没有这两个方法的,它们属于 Future 对象,但是 Task 是 Future 的子类,所以它也可以使用。我们可以通过 asyncio 直接创建一个 Future 对象,但是创建 Task 对象则需要一个协程对象。

import asyncio

def mark_done(future, result):
    print("setting result")
    future.set_result(result)


async def main(loop):
    future = asyncio.Future()
    print("scheduling mark_done")
    # loop.call_later 我们后面会说, 总之这一步是不会阻塞的, 会在两秒钟之后执行
    loop.call_later(2, mark_done, future, "the result")
    print("~~~~~~~~~~~~~~")

    # await future 会等待这个 future 完成, 但什么时候完成呢?
    # 当这个 future 执行 set_result 的时候就代表它完成了, 然后 await future 会返回 set_result 设置的值, 相当于 future.result()
    # 其实我们 await 一个协程也是一样, 也是当协程对应的任务执行完毕、将返回值进行 set_result 的时候
    # 然后我们知道 await 协程 得到的就是当前定义的协程函数的返回值, 其实准确来说,应该是协程对应的 Task 对象的 result()
    # 只不过 result() 得到的就是 set_result 设置进去的值, 而 set_result 设置进去的正式当前定义的协程函数的返回值
    # 尽管是一样的, 但是这个逻辑还是要理清
    res = await future
    print("res =", res)


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
scheduling mark_done
setting result
res = the result
"""

所以一个协程对应一个任务,而任务继承自未来对象。当调用未来对象内部的 set_result 的时候,代表这个任务执行完毕了。我们 await 协程 的时候可以拿到的返回值,就是 set_result 时设置的值,本质上就是协程函数的返回值,相当于 future.result()、或者 task.result()。

除了做法与协程类似,future 还可以绑定回调,回调的顺序按照其注册的顺序调用。

import asyncio
import functools


def callback(future, n):
    print(f"future result: {future.result()} n:{n}")


async def register_callback(future):
    print("register callback on futures")
    # 设置一个回调,只能传递函数名,触发回调的时候,会自动将future本身作为第一个参数传递给回调函数
    # 回调什么时候执行,还是那句话,当 future 执行set_result的时候执行
    future.add_done_callback(functools.partial(callback, n=1))
    future.add_done_callback(functools.partial(callback, n=2))


async def main(future):
    # 等待回调注册完成
    await register_callback(future)
    print("setting result of future")
    future.set_result("the result")


event_loop = asyncio.get_event_loop()
# 可以直接创建一个未来对象
future = asyncio.Future()
event_loop.run_until_complete(main(future))
"""
register callback on futures
setting result of future
future result: the result n:1
future result: the result n:2
"""

执行任务

任务是与事件循环交互的主要途径之一,任务可以包装协程,并跟踪协程何时完成。另外 Task 继承自 Future,所以它是可以等待的。每个任务都有一个结果,是通过调用 future 内部 set_result 方法设置的,设置的值就是协程的返回值,并且可以通过调用 result() 获取这些结果,当然这些上面说过了,这里不再赘述了。

import asyncio


# 启动一个任务,可以使用 create_task 函数创建一个 Task 对象
# 只要循环还在运行而且协程没有返回, create_task 得到的任务便会作为事件循环管理的并发操作的一部分运行
async def task_func():
    print("in task func")
    return "the result"


async def main(loop):
    print("creating task")
    # 除了使用loop.create_task,我们还可以使用asyncio.ensure_future
    # 对于传入一个协程的话,asyncio.ensure_future还是调用了loop.create_task
    task = loop.create_task(task_func())
    print(f"wait for {task}")
    return_value = await task
    print(f"task completed {task}")
    print(f"return value {return_value}")



loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
creating task
wait for <Task pending name='Task-2' coro=<task_func() running at D:/satori/2.py:6>>
in task func
task completed <Task finished name='Task-2' coro=<task_func() done, defined at D:/satori/2.py:6> result='the result'>
return value the result
"""
# 在我们还没有await驱动任务执行的时候, 是Task pending
# 当await之后, 已处于finished状态, 我们看到了 result, 这个就是调用 set_result 设置进去的

通过 create_task 可以创建任务,那么也可以在任务完成前取消操作。

import asyncio


async def task_func():
    print("in task func")
    return "the result"


async def main(loop):
    print("creating task")
    task = loop.create_task(task_func())

    print("canceling task")
    # 任务创建之后,可以调用cancel函数取消
    task.cancel()
    print(f"canceled task: {task}")

    try:
        # 任务取消之后再await则会引发CancelledError
        await task
    except asyncio.CancelledError:
        print("caught error from canceled task")
    else:
        print(f"task result: {task.result()}")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
"""
creating task
canceling task
canceled task: <Task cancelling name='Task-2' coro=<task_func() running at D:/satori/2.py:4>>
caught error from canceled task
"""

调用常规函数

事件循环中有三个函数,分别是 call_soon、call_later、call_at,我们来看看它们的用法。

call_soon

可以使用这个函数给协程绑定一个回调,从名字也能看出来是立即执行,只不过是遇到阻塞立即执行。

import asyncio
from functools import partial

'''
除了管理协程和I/P回调,asyncio事件循环还可以根据循环中保存的一个定时器值来调度常规函数调用。
'''
# 如果回调的时间不重要,那么可以使用call_soon调度下一次循环迭代的调用


def callback(*args, **kwargs):
    print("callback:", args, kwargs)


async def main(loop):
    print("register callback")
    # 接收一个回调函数,和参数
    loop.call_soon(callback, "mashiro", 16)
    print("********")
    # 另外call_soon不支持使用关键字参数来向回调传递参数
    # 所以如果想使用关键字参数,需要使用偏函数转换一下
    # 其实不仅是这里的call_sonn,以及后面要介绍的call_later和call_at都不支持使用关键字参数来向回调传递参数
    # 因此如果不想使用偏函数来包装的话,就直接使用位置参数就可以了
    wrapped = partial(callback, **{"name": "satori", "age": 16})
    loop.call_soon(wrapped, "mahsiro", 16)
    print("—————————")

    await asyncio.sleep(0.6)


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
register callback
********
—————————
callback: ('mashiro', 16) {}
callback: ('mahsiro', 16) {'name': 'satori', 'age': 16}
"""
# 另外我们发现我们在调用了call_soon之后没有立刻执行,而是先进性了print
# 这是因为只有在遇到阻塞才会立刻执行,所以当遇到await asyncio.sleep的时候会去执行
# 另外这里的阻塞,不能是time.sleep,必须是可以awaitable的阻塞

call_later

同样是给一个协程绑定一个回调,但是从名字也能看出来这需要指定一个时间,表示多长时间之后调用。

import asyncio

'''
要将回调推迟到将来的某个时间调用,可以使用call_later。这个方法的第一个参数是延迟时间(单位为秒),第二个参数是回调。
'''


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(0.2, callback, "call_later", "0.2s")
    loop.call_later(0.1, callback, "call_later", "0.1s")
    loop.call_soon(callback, "call_soon", "None")
    print("-----------")
    await asyncio.sleep(0.6)


event_loop = asyncio.get_event_loop()
try:
    print("entering event loop")
    event_loop.run_until_complete(main(event_loop))
finally:
    print("closing event loop")
    event_loop.close()

'''
entering event loop
register callback
-----------
call_soon None
call_later 0.1s
call_later 0.2s
closing event loop
'''

我们注意一下 main 里面的第二个 print,我们看到无论是 call_soon 还是 call_later 都是在第二个print 结束之后才调用,说明 call_later 和 call_soon 一样,都是在遇到异步 io 阻塞、比如 asyncio.sleep 之后才会执行。

但是值得一提的是,对于 call_later 来说,计时是从注册回调的那一刻就已经开始了。可如果假设执行 call_later 注册的回调需要 3s,但是asyncio.sleep异步阻塞只有 2s,该怎么办呢?那么不好意思,程序会继续往下走,因为asyncio.sleep 结束之后,还需要 1s 才会执行 call_later 指定的回调。所以程序向下执行,直到出现下一个异步 io 阻塞,如果不是异步 io 阻塞的话,那么 call_later 指定的回调也是不会执行的。

因此:执行回调,是指在遇见异步 io 阻塞的时候才会执行。call_soon 是只要遇见异步 io 就会执行,即使遇见异步 io,call_later 已经等待完毕,执行的先后顺序依旧是 call_soon 先执行。我们来验证一下:

import asyncio
import time


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(0.2, callback, "call_later", "0.2s")
    loop.call_later(0.1, callback, "call_later", "0.1s")
    loop.call_soon(callback, "call_soon", "None")
    # time.sleep不是异步io, 它是一个同步io
    time.sleep(1)
    # 当time.sleep(1)之后call_later和call_soon肯定都会执行,因为call_later里面指定的是 0.2 和 0.1, 比1小
    await asyncio.sleep(0.6)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
register callback
call_soon None
call_later 0.1s
call_later 0.2s
'''

再来看看 call_later:

import asyncio
import time


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(2, callback, "call_later", "2s")
    print("call_later注册完毕")
    # 这里执行完毕,call_later还没有开始
    await asyncio.sleep(1.5)
    # 1.5 + 1肯定比2大,所以time.sleep(1)之后call_later里面的指定的时间肯定已经过了
    time.sleep(1)
    print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
    print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
    print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
    print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
    await asyncio.sleep(0.1)
    print("完了,我上面出现了异步io阻塞,我要比call_later指定的回调后执行了")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
register callback
call_later注册完毕
就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
call_later 2s
完了,我上面出现了异步io阻塞,我要比call_later指定的回调后执行了
'''

再来看个栗子:

import asyncio


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(2, callback, "call_later", "2s")
    print("call_later注册完毕")
    await asyncio.sleep(1)
    print("call_later指定的回调能执行吗")
    print("call_later指定的回调能执行吗")
    print("call_later指定的回调能执行吗")
    print("不能")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
register callback
call_later注册完毕
call_later指定的回调能执行吗
call_later指定的回调能执行吗
call_later指定的回调能执行吗
不能
'''

我们看到 call_later 指定的回调没有执行程序就退出了,这是因为 main 里面的代码全部执行完之后 call_later 指定的时间还没有到,所以直接退出了。

import asyncio
import time


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(2, callback, "call_later", "2s")
    print("call_later注册完毕")
    await asyncio.sleep(1)
    time.sleep(1)
    print("call_later指定的回调能执行吗")
    print("call_later指定的回调能执行吗")
    print("call_later指定的回调能执行吗")
    print("能,因为时间已经到了")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
register callback
call_later注册完毕
call_later指定的回调能执行吗
call_later指定的回调能执行吗
call_later指定的回调能执行吗
能,因为时间已经到了
call_later 2s
'''

当代码全部执行完之后,call_later 指定的时间已经到了,所以会在最后执行它。

call_at

除了 call_soon 瞬间执行,和 call_later 延迟执行之外,还有一个call_at 在指定之间内执行。实现这个目的的循环依赖于一个单调时钟,而不是墙上的时钟时间,以确保 now 时间绝对不会逆转。要为一个调度回调选择时间,必须使用循环的time方法从这个时钟的内部开始。

import asyncio
import time


def callback(cb, loop):
    print(f"callback {cb} invoked at {loop.time()}")


async def main(loop):
    now = loop.time()
    print("clock time:", time.time())
    print("loop time:", now)
    print("register callback")
    # 表示在当前时间(time = loop.time())之后的0.2s执行,个人觉得类似于 call_later
    loop.call_at(now + 0.2, callback, "call_at", loop)
    time.sleep(1)
    print("是先打印我呢?还是先执行call_at或者call_sonn呢")
    await asyncio.sleep(1)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
clock time: 1573291054.068545
loop time: 160079.265
register callback
是先打印我呢?还是先执行call_at或者call_sonn呢
callback call_at invoked at 160079.453
'''
# 所以这和call_later也是类似的,都是在遇到io阻塞之后才会执行

以上三者的执行顺序

首先在遇到异步 io 阻塞的时候,call_soon 是立刻执行,call_later 和 call_at 是需要等指定过了才会执行,如果时间没到,那么执行顺序肯定是call_soon最先,这没问题。但是,如果当遇到一个异步io阻塞的时候,call_later 和 call_at 所指定的时间都过了,那么这个三者的执行顺序是怎么样的呢?

import asyncio


def callback(cb):
    print(f"callback {cb}")


async def main(loop):
    now = loop.time()
    loop.call_at(now + 0.2, callback, "call_at")
    loop.call_later(0.2, callback, "call_later")
    loop.call_soon(callback, "call_soon")

    #await asyncio.sleep(1)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
callback call_soon
'''

首先我们发现,如果没有异步 io 阻塞,那么最终只有 call_soon 会执行。

import asyncio
import time


def callback(cb):
    print(f"callback {cb}")


async def main(loop):
    now = loop.time()
    loop.call_at(now + 0.3, callback, "call_at")
    loop.call_later(0.2, callback, "call_later")
    loop.call_soon(callback, "call_soon")
    time.sleep(1)
    await asyncio.sleep(1)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
callback call_soon
callback call_later
callback call_at
'''
# 遇到异步io, 那么 call_soon 仍然最先执行
# 至于 call_later 和 call_at, 则是两者指定的时间哪个先到, 先执行哪个

多个task并发执行

首先我们来看一个例子。

import asyncio
import time


async def foo():
    await asyncio.sleep(1)


async def main():
    # 三者是一样的
    await foo()
    await asyncio.create_task(foo())
    await asyncio.ensure_future(foo())


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")  # 总用时:3.0022929

在一个协程中可以使用 await 关键字驱动另一个协程执行,这里我们驱动了三个协程执行,这没问题。但问题是我们发现总用时为 3s,这是为什么?不是说遇见异步 io 会自动切换么?那么整体用时应该还是 1s 才对啊。确实理论上是这样的,但是观察我们的代码是怎么写的,我们三个 await 是分开写的,而且 await 协程 是能得到当前协程的返回值的,如果这个协程都还没有执行完毕、对应的 Task 对象都还没结束、Future 对象还没有 set_result,我们又怎么能拿到呢?还是那句话,异步是在多个协程之间进行切换,至于当前的协程阻塞了只会切换到另一个协程里面去执行,但是对于当前协程来说,该阻塞多长还是阻塞多长,不可能说这一步阻塞还没过去,就直接调到下一行代码去执行,这是不可能的。

因此三个await,必须等第一个 await 完毕之后,才会执行下一个 await。至于我们刚才的call_soon、call_later、call_at,可以看做是另一个协程,在遇到了 asyncio.sleep 之后就切换过去了。但是对于协程本身来说,该asyncio.sleep 多少秒还是多少秒, 只有sleep结束了,才会执行 await asyncio.sleep 下面的代码。还是那句话,切换是指多个协程之间切换,而我们上面代码是两个 await,这两个 await 本身来说相当于还是串行,就是 main 协程里面的两行代码,只有第一个await结束了,才会执行第二个await。

那么问题来了,我们如何才能让这两个协程并发的执行呢?

asyncio.wait

首先是 asyncio.wait

import asyncio
import time


async def task_func():
    await asyncio.sleep(1)


async def main():
    # 将多个协程或者任务放在一个列表里面,传给 asyncio.wait
    # 里面还可以再传其他参数:
    # timeout:超时时间, 如果在这个时间段内任务没有执行完毕, 那么没完成的任务直接取消
    # return_when:FIRST_COMPLETED, 第一个任务完成时结束; FIRST_EXCEPTION, 第一次出现异常时结束; ALL_COMPLETED, 所有任务都完成时结束。
    # 默认是ALL_COMPLETED
    await asyncio.wait([task_func(), task_func()])


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")  # 总用时:1.0012839

我们看到此时就只用了 1s, 因为两个任务(协程被包装成任务)是一起执行的。

那么如何获取任务的返回值呢?

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(1)
    return f"task{n}"


async def main():
    # 这个 wait 函数有两个返回值, 一个是执行状态为完成的Task对象, 一个是未完成的Task对象
    finished, pending = await asyncio.wait([task_func(_) for _ in range(1, 5)])
    # 我们说过一旦任务完成,就会通过 future 内部的 set_result方法设置返回值
    # 然后我们通过 future.result() 就能拿到返回值, 而 Task 是 Future 的子类, 可以直接通过 Task 对象调用
    print(f"results: {[task.result() for task in finished]}")


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")  
"""
results: ['task2', 'task3', 'task1', 'task4']
总用时:1.0015823
"""

但是我们发现执行的顺序貌似不是我们添加的顺序,因此 wait 返回的 future 的顺序是无序的,如果希望有序,那么需要使用另一个函数。

可能有人觉得这个 pending 是不是有点脱裤子放屁的感觉的,asyncio.wait 会等到所有的任务都完成,而 pending 又表示没有完成的任务,这不矛盾吗?答案是不矛盾,因为我们 asyncio.wait 内部可以接收一个超时时间,时间一到,没有执行完的任务会直接被取消掉。

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(n)
    return f"task{n}"


async def main():
    finished, pending = await asyncio.wait([task_func(_) for _ in range(1, 5)], timeout=3.9)
    print(f"results: {[future.result() for future in finished]}")
    print(len(pending))  # 1


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")
"""
results: ['task3', 'task1', 'task2']
1
总用时:3.9053138
"""

我们看到有一个未完成的任务。

asyncio.gather

asyncio.gather 可以保证返回的结果有序。

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(1)
    return f"task{n}"


async def main():

    # gather只有一个返回值,直接返回已完成的任务的返回值,注意是返回值,不是任务,也就是说返回的是future.result() 或者 task.result()
    # 但是传递的时候就不要传递列表,而是需要传递一个个的task,因此我们这里要将列表打散
    finished = await asyncio.gather(*[task_func(_) for _ in range(1, 5)])
    print(f"results: {[res for res in finished]}")


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")
"""
results: ['task1', 'task2', 'task3', 'task4']
总用时:1.0012363
"""

使用 gather 是可以保证顺序的,顺序就是我们添加任务的顺序。

但这里还有一个问题,如果我们执行的任务里面报错了该怎么办?我们来看一下:

import time
import asyncio


async def f1():
    1 / 0
    return "f1"

async def f2():
    time.sleep(1)
    print("我是 f2")
    return "f2"

async def f3():
    time.sleep(1)
    print("我是 f3")
    return "f3"

async def main():
    finished = await asyncio.gather(f1(), f2(), f3())
    print(finished)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
"""
我是 f2
Traceback (most recent call last):
  File "D:/satori/1.py", line 24, in <module>
    loop.run_until_complete(main())
  File "C:\python38\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "D:/satori/1.py", line 20, in main
    finished = await asyncio.gather(f1(), f2(), f3())
  File "D:/satori/1.py", line 6, in f1
    1 / 0
ZeroDivisionError: division by zero
我是 f3
"""

首先 f1 中出现了除零异常,如果一个任务出现了异常,那么会导致整体异常。但是一个任务出现了异常,并不代表其它的任务就不执行了,从结果上看 f2 和 f3 都已经执行完毕了。但是问题来了,如果我不希望一个任务失败而导致整体异常,该怎么做呢?

import time
import asyncio


async def f1():
    1 / 0
    return "f1"

async def f2():
    time.sleep(1)
    print("我是 f2")
    return "f2"

async def f3():
    time.sleep(1)
    print("我是 f3")
    return "f3"

async def main():
    # asyncio.gather 内部有一个参数 return_exceptions, 默认是 False
    # 如果设置为 True 的话, 那么在失败的时候会将异常返回
    finished = await asyncio.gather(f1(), f2(), f3(), return_exceptions=True)
    print(finished)
    print(finished[0], type(finished[0]))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
"""
我是 f2
我是 f3
[ZeroDivisionError('division by zero'), 'f2', 'f3']
division by zero <class 'ZeroDivisionError'>
"""

我们看到当失败的时候,返回值就是对应的异常。

除此之外,asyncio.gather 还可以进行分组,举个栗子:

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(1)
    return f"task{n}"


async def main():

    group1 = asyncio.gather(*[task_func(_) for _ in range(1, 5)])
    group2 = asyncio.gather(*[task_func(_) for _ in range(1, 5)])
    finished = await asyncio.gather(group1, group2)
    print(f"results: {[res for res in finished]}")


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")
"""
results: [['task1', 'task2', 'task3', 'task4'], ['task1', 'task2', 'task3', 'task4']]
总用时:1.0011522
"""

我们看到此时的 finished 就是一个包含列表的列表,里面的列表就是每一个组的结果。

asyncio.as_completed

我们看到这个 wait 类似于 concurrent.futures 里面的 submit,gather 类似于 map,而 concurrent.futures 里面还有一个 as_completed,那么同理 asyncio 里面也有一个 as_completed。另外个人觉得 asyncio 借鉴了 concurrent.futures 里的不少理念,而且 wai t里面还有一个 return_when,这个里面的参数,内部就是从 concurrent.futures 包里面导入的。

那这个函数是用来干什么的呢?从名字也能看出来,是哪个先完成哪个就先返回。as_completed 函数调用后返回一个生成器,会管理指定的一个协程列表,并生成它们的结果,每个协程结束运行时一次生成一个结果。与 wait 类似,as_completed 不能保证顺序,从名字也能看出来,哪个先完成哪个先返回。

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(n)
    return f"task{n}"


async def main():
    # 同样需要传递一个列表, 里面同样可以指定超时时间
    completed = asyncio.as_completed([task_func(2), task_func(1), task_func(3), task_func(4)])
    # 遍历每一个task,进行await,哪个先完成,就先返回
    for task in completed:
        res = await task
        print(res)


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")
"""
task1
task2
task3
task4
总用时:4.0048034999999995
"""

同步原语

尽管asyncio应用通常作为单线程的进程运行,不过仍被构建为并发应用。由于I/O以及其他外部事件的延迟和中断,每个协程或任务可能按照一种不可预知的顺序执行,为了支持安全的并发执行,asyncio 包含了 threading 和 multiprocessing 模块中一些底层原语的实现。

Lock 可以用来保护对一个共享资源的访问,只有锁的持有者可以使用这个资源。如果有多个请求要得到这个锁,那么其将会阻塞,以保证一次只有一个持有者。

import asyncio


def unlock(lock):
    print("回调释放锁,不然其他协程获取不到。")
    print("但我是1秒后被调用,锁又在只能通过调用我才能释放,所以很遗憾,其他协程要想执行,至少要1秒后了")
    lock.release()


async def coro1(lock):
    print("coro1在等待锁")
    # 使用async with语句很方便,是一个上下文。
    # 我们知道在多线程中,也可以使用with,相当于开始的lock.acquire和结尾lock.release
    # 那么在协程中,也有await lock.acquire和lock.release,以及专业写法async with
    async with lock:
        print("coro1获得了锁")
        print("coro1释放了锁")


async def coro2(lock):
    print("coro2在等待锁")
    # 使用await lock.acquire()和lock.release()这种方式也是一样的
    await lock.acquire()
    print("coro2获得了锁")
    print("coro2释放了锁")
    # 注意release是不需要await的
    lock.release()


async def main(loop):
    # 创建共享锁
    lock = asyncio.Lock()

    print("在开始协程之前创建一把锁")
    await lock.acquire()  # 这里先把锁给锁上
    print("锁是否被获取:", lock.locked())

    # 执行回调将锁释放,不然协程无法获取锁
    loop.call_later(1, unlock, lock)

    # 运行想要使用锁的协程
    print("等待所有协程")
    await asyncio.wait([coro1(lock), coro2(lock)])



loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
在开始协程之前创建一把锁
锁是否被获取: True
等待所有协程
coro2在等待锁
coro1在等待锁
回调释放锁,不然其他协程获取不到。
但我是1秒后被调用,锁又在只能通过调用我才能释放,所以很遗憾,其他协程要想执行,至少要1秒后了
coro2获得了锁
coro2释放了锁
coro1获得了锁
coro1释放了锁
"""

事件

和线程一样,协程里面也有事件的概念。asyncio.Event 基于 threading.Event,它允许多个消费者等待某个事件发生。Event 对象可以使用 set、wait、clear。

  • set:设置标志位,调用is_set可以查看标志位是否被设置。一个刚创建的Event对象默认是没有设置的
  • wait:等待,在没有调用set的情况下,会阻塞。如果设置了set,wait则不会阻塞
  • clear:清空标志位
import asyncio


def set_event(event):
    print("设置标志位,因为协程会卡住,只有设置了标志位才会往下走")
    print("但我是一秒后才被调用,所以协程想往下走起码也要等到1秒后了")
    event.set()


async def coro1(event):
    print("coro1在这里卡住了,快设置标志位啊")
    await event.wait()
    print(f"coro1飞起来了,不信你看现在标志位,是否设置标志位:{event.is_set()}")


async def coro2(event):
    print("coro2在这里卡住了,快设置标志位啊")
    await event.wait()
    print(f"coro2飞起来了,不信你看现在标志位,是否设置标志位:{event.is_set()}")


async def main(loop):
    # 创建共享事件
    event = asyncio.Event()
    # 现在设置标志位了吗?
    print("是否设置标志位:", event.is_set())

    # 执行回调将标志位设置,不然协程卡住了
    loop.call_later(1, set_event, event)

    # 运行卡住的的协程
    print("等待所有协程")
    await asyncio.wait([coro1(event), coro2(event)])


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
是否设置标志位: False
等待所有协程
coro2在这里卡住了,快设置标志位啊
coro1在这里卡住了,快设置标志位啊
设置标志位,因为协程会卡住,只有设置了标志位才会往下走
但我是一秒后才被调用,所以协程想往下走起码也要等到1秒后了
coro2飞起来了,不信你看现在标志位,是否设置标志位:True
coro1飞起来了,不信你看现在标志位,是否设置标志位:True
"""

队列

asyncio.Queue 为协程提供了一个先进先出的数据结构,这与线程的 queue.Queue 或者进程里面的 Queue 很类似。

import asyncio
import time


async def consumer(q: asyncio.Queue, n):
    print(f"消费者{n}号 开始")
    while True:
        await asyncio.sleep(2)
        item = await q.get()
        # 由于我们要开启多个消费者, 为了让其停下来, 我们添加None作为停下来的信号
        if item is None:
            # task_done是什么意思? 队列有一个属性, 叫做unfinished_tasks
            # 每当我们往队列里面put一个元素的时候, 这个值就会加1,
            q.task_done()
            # 并且队列还有一个join方法, 调用之后会一直阻塞, 什么时候不阻塞呢? 当 unfinished_tasks 为 0 的时候。
            # 但是我们每put一个元素的时候, unfinished_tasks都会加 1
            # 而 get 一个元素的时候, unfinished_tasks 不会自动减 1
            # get方法不会自动帮我们做这件事,需要手动调用task_done方法实现
            break
        print(f"消费者{n}号: 消费元素{item}")
        q.task_done()


async def producer(q: asyncio.Queue, consumer_num):
    print(f"生产者 开始")
    for i in range(1, 10):
        await q.put(i)
        print(f"生产者: 生产元素{i}, 并放在了队列里")
    # 为了让消费者停下来, 我就把None添加进去吧
    # 开启几个消费者, 就添加几个None
    for i in range(consumer_num):
        await q.put(None)

    # 等待所有消费者执行完毕
    # 只要unfinished_tasks不为0,那么q.join就会卡住,直到消费者全部消费完为止
    await q.join()
    print("生产者生产的东西全被消费者消费了")


async def main(consumer_num):
    q = asyncio.Queue()
    consumers = [consumer(q, i) for i in range(consumer_num)]
    await asyncio.wait(consumers + [producer(q, consumer_num)])


start = time.perf_counter()
asyncio.run(main(3))
print(f"总用时:{time.perf_counter() - start}")
"""
消费者1号 开始
生产者 开始
生产者: 生产元素1, 并放在了队列里
生产者: 生产元素2, 并放在了队列里
生产者: 生产元素3, 并放在了队列里
生产者: 生产元素4, 并放在了队列里
生产者: 生产元素5, 并放在了队列里
生产者: 生产元素6, 并放在了队列里
生产者: 生产元素7, 并放在了队列里
生产者: 生产元素8, 并放在了队列里
生产者: 生产元素9, 并放在了队列里
消费者0号 开始
消费者2号 开始
消费者1号: 消费元素1
消费者0号: 消费元素2
消费者2号: 消费元素3
消费者1号: 消费元素4
消费者0号: 消费元素5
消费者2号: 消费元素6
消费者1号: 消费元素7
消费者0号: 消费元素8
消费者2号: 消费元素9
生产者生产的东西全被消费者消费了
总用时:7.989401599999999
"""

我们对队列进行循环,然后 await 的时候,实际上有一个更加 pythonic 的写法,也就是 async for。

import asyncio
import time
from tornado.queues import Queue
from tornado import gen


# 注意 asyncio 中的 Queue 不支持 async for,我们需要使用 tornado 中的 Queue
async def consumer(q: Queue, n):
    print(f"消费者{n}号 开始")
    async for item in q:
        await gen.sleep(2)
        if item is None:
            q.task_done()
            break
        print(f"消费者{n}号: 消费元素{item}")
        q.task_done()


async def producer(q: Queue, consumer_num):
    print(f"生产者 开始")
    for i in range(1, 10):
        await q.put(i)
        print(f"生产者: 生产元素{i},并放在了队列里")
    for i in range(consumer_num):
        await q.put(None)

    await q.join()
    print("生产者生产的东西全被消费者消费了")


async def main(consumer_num):
    q = Queue()
    consumers = [consumer(q, i) for i in range(consumer_num)]
    await asyncio.wait(consumers + [producer(q, consumer_num)])


start = time.perf_counter()
asyncio.run(main(3))
print(f"总用时:{time.perf_counter() - start}")
"""
消费者1号 开始
生产者 开始
生产者: 生产元素1,并放在了队列里
生产者: 生产元素2,并放在了队列里
生产者: 生产元素3,并放在了队列里
生产者: 生产元素4,并放在了队列里
生产者: 生产元素5,并放在了队列里
生产者: 生产元素6,并放在了队列里
生产者: 生产元素7,并放在了队列里
生产者: 生产元素8,并放在了队列里
生产者: 生产元素9,并放在了队列里
消费者0号 开始
消费者2号 开始
消费者0号: 消费元素2
消费者2号: 消费元素3
消费者1号: 消费元素1
消费者0号: 消费元素4
消费者2号: 消费元素5
消费者1号: 消费元素6
消费者0号: 消费元素7
消费者2号: 消费元素8
消费者1号: 消费元素9
生产者生产的东西全被消费者消费了
总用时:8.0046028
"""

协程与线程结合

如果出现了一个同步耗时的任务,我们可以将其扔到线程池里面去运行。对于协程来说,仍然是单线程的,我们是可以将耗时的任务单独开启一个线程来执行的。

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def foo(n):
    time.sleep(n)
    print(f"foo睡了{n}秒")


async def bar():
    await asyncio.sleep(3)
    return "bar"


async def main():
    # 线程池最多装两个任务
    executor = ThreadPoolExecutor(max_workers=2)
    # loop.run_in_executor 表示扔到线程池里面运行, 这个过程是瞬间返回的
    loop.run_in_executor(executor, foo, 3)
    loop.run_in_executor(executor, foo, 2)
    print("瞬间返回")
    res = await bar()
    print(res)


loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
print(f"总用时:{time.perf_counter() - start}")
"""
瞬间返回
foo睡了2秒
foo睡了3秒
bar
总用时:3.0015592
"""

所以 run_in_executor 相当于将耗时的任务单独丢到一个线程中执行,但是它不会创建任务,而是返回一个 future,因此它不会等待,而是会立刻向下执行。如果我们希望等待它完成之后再执行下面的逻辑呢?

import asyncio
import time


def foo(n):
    time.sleep(n)
    return f"foo睡了{n}秒"


async def bar():
    await asyncio.sleep(1)
    return "bar"


async def main():
    # 这里不创建线程池也是可以的, 传递一个 None 的话会默认创建
    # 可以调用 await, 会等待耗时任务完成, 同时拿到返回值
    print(await loop.run_in_executor(None, foo, 3))
    print("瞬间返回吗?")
    res = await bar()
    print(res)


loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
print(f"总用时:{time.perf_counter() - start}")
"""
foo睡了3秒
瞬间返回吗?
bar
总用时:4.0045044
"""

同步 3s,加上异步 1s,所以总共 4s。因此即便扔到线程池中,也是可以等待它完成的,但是这样做没有什么意义,因为你既然要等待的话,那干脆还不如直接调用,有啥必要扔到线程池中呢?

但我之所以说这一点,主要是想表明 run_in_executor 仍然会返回一个 future,因此这就意味着耗时的任务执行完毕之前,循环是不可以关闭的。

import asyncio
import time


def foo(n):
    time.sleep(n)
    return f"foo睡了{n}秒"


async def bar():
    await asyncio.sleep(1)
    return "bar"


async def main():
    loop.run_in_executor(None, foo, 3)
    res = await bar()
    print(res)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
"""
bar
exception calling callback for <Future at 0x152bd9ccf10 state=finished returned str>
Traceback (most recent call last):
  File "C:\python38\lib\concurrent\futures\_base.py", line 328, in _invoke_callbacks
    callback(self)
  File "C:\python38\lib\asyncio\futures.py", line 374, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "C:\python38\lib\asyncio\base_events.py", line 764, in call_soon_threadsafe
    self._check_closed()
  File "C:\python38\lib\asyncio\base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
"""

我们看到循环关闭之后,耗时的任务还没有完成,而主线程默认会等待所有子线程执行结束。于是当耗时任务执行完毕之后,返回一个 future,在解析这个 future 的时候发现循环已经关闭了,因此就报错了。

所以我们不能用 asyncio.run,因为它的内部自动包含了 loop.close() 逻辑,如果当耗时任务用时比较长的时候,那么关闭循环之后会报错,但是报错也是在任务执行完毕、返回 future 之后才会报错。

另外,我们有多个同步耗时任务需要扔到线程池中的话,那么最好事先创建一个线程池。因为如果不指定,那么每一个耗时的任务都会创建一个线程池。

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time


def foo(n):
    time.sleep(n)
    return f"foo睡了{n}秒"


async def bar():
    await asyncio.sleep(1)
    return "bar"


async def main():
    executor = ThreadPoolExecutor(max_workers=2)
    # 我们可以在调用 run_in_executor 传入 executor, 也可以通过 set_default_executor 进行设置
    # 这样的话, 后面就会调用 run_in_executor 的时候就会使用我们创建的线程池
    loop.set_default_executor(executor)
    loop.run_in_executor(None, foo, 3)
    loop.run_in_executor(None, foo, 3)
    loop.run_in_executor(None, foo, 3)
    res = await bar()
    print(res)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

关于 async with 和 async for

如果让你定义一个类支持一个 with 和 for 语句,我相信肯定没有问题,但是 async with 和 async for 呢?我们要怎么实现呢?

async with

我们知道自定义一个类支持 with 语句,需要实现 __enter____exit__ 这两个魔法方法,那么如果想支持 async with,则需要实现 __aenter____aexit__

import asyncio


class Open:

    def __init__(self, file, mode="r", encoding="utf-8"):
        self.file = file
        self.mode = mode
        self.encoding = encoding
        self.__fd = None

    # 要使用 async def 定义
    async def __aenter__(self):
        self.__fd = open(file=self.file, mode=self.mode, encoding=self.encoding)
        return self.__fd

    # 同样使用 async def 定义
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.__fd.close()


# 既然是async就必须要创建协程,扔到事件循环里面运行
async def main():
    async with Open("白色相簿.txt") as f:
        print(f.read())


asyncio.run(main())
"""
为什么你那么熟练啊
"""

可以看到我们自己实现了一个 async with,但是注意这个不是异步的,我们还是调用了底层的 open 函数。当然还可以使用 contextlib:

import asyncio
import contextlib


@contextlib.asynccontextmanager
async def foo():
    print("xxx")
    l = list()
    yield l 
    print(l)


async def main():
    async with foo() as l:
        l.append(1)
        l.append(2)
        l.append(3)


asyncio.run(main())
"""
xxx
[1, 2, 3]
"""

async for

我们知道自定义一个类支持 for 语句,需要实现 __iter____next__ 这两个魔法方法,那么如果想支持 async for,则需要实现 __aiter____anext__

import asyncio


class A:

    def __init__(self):
        self.l = [1, 2, 3, 4]
        self.__index = 0

    # 注意:定义 __aiter__ 是不需要 async 的
    def __aiter__(self):
        return self

    # 但是定义 __anext__ 需要 async
    async def __anext__(self):
        try:
            res = self.l[self.__index]
            self.__index += 1
            return res
        except IndexError:
            # 捕获异常,协程则要raise一个StopAsyncIteration
            raise StopAsyncIteration


async def main():
    async for _ in A():
        print(_)


asyncio.run(main())
"""
1
2
3
4
"""

另外我们知道,可以对 for 循环可以作用于生成器,那么 async for 则也可以作用于异步生成器中。

import asyncio


# 具体版本记不清了,不知是3.5还是3.6,记得那时候引入async和await的时候,python是不允许async和yield两个关键字同时出现的
# 但是现在至少python3.7是允许的,这种方式叫做异步生成器。
# 但是注意:如果async里面出现了yield,那么就不可以有return xxx了。
async def foo():
    yield 123
    yield 456
    yield 789
    print("xxx")


async def main():
    async for _ in foo():
        print(_)


asyncio.run(main())
"""
123
456
789
xxx
"""

await

很多人可能对 Python 中的 await 这个关键字很懵逼,到底什么对象才可以被 await 呢?

从抽象基类的源码中我们可以看到一个对象如果想被 await,就必须要实现 __await__ 这个魔法方法。

import asyncio


class A:

    def __await__(self):
        return "xxx"


async def main():
    res = await A()
    print(res)

try:
    asyncio.run(main())
except Exception as e:
    print(e)  # __await__() returned non-iterator of type 'str'

但是它报错了,意思是必须返回一个迭代器。

import asyncio


class A:

    def __await__(self):
        return "xxx".__iter__()


async def main():
    res = await A()
    print(res)

try:
    asyncio.run(main())
except Exception as e:
    print(e)  # Task got bad yield: 'x'

说明要返回一个迭代器,然后 yield,但是这里提示我们 Task got bad yield: 'x'。我们来分析一下这句话,bad yield: 'x',肯定是告诉我们 yield 出了一个不好的值,这个不好的值被 Task 获取了,也就是不应该给 Task 一个 'x'。咦,Task,这是啥?我们首先想到了 asyncio 里面 task,而 task 对应的类正是 Task,这是不是说明我们返回一个Task对象就是可以了。

import asyncio


async def foo():
    return "我是foo"


class A:

    def __await__(self):
        # 同样需要调用__iter__
        return asyncio.ensure_future(foo()).__iter__()


async def main():
    res = await A()
    print(res)

try:
    asyncio.run(main())
except Exception as e:
    print(e)  
"""
我是foo
"""
# 可以看到成功执行, 其实兜了这么大圈子,完全没必要
# await 后面肯定是一个 task(或者future), 而我们 await 的是A(), 那么A()的 __await__ 方法返回的肯定是一个task(或者future)
# await后面如果是coroutine, 会自动包装成 task
# 但是用我们自己的类返回的话, 那么我们必须在 __await__ 中手动的使用 ensure_future 或者 create_task 进行包装, 再返回其迭代器
# asyncio.ensure_future(foo()).__iter__() 还可以换成 asyncio.ensure_future(foo()).__await__(),后者同样会返回迭代器
# 其实最简单的做法是直接返回:foo().__await__() 即可

手动实现异步

尽管我们实现了 async for、async with、await 等方法,但是它们并没有达到异步的效果,比如之前的 async for 底层还是使用了 open。再比如网络请求,对于像 requests 这种库,是属于同步的,因此在协程函数中使用requests.get 是没有用的。正如在协程函数中使用 time.sleep 一样,如果想切换,就必须使用 asyncio.sleep,因为这本质上还是一个协程。所以如果想在获取网络请求的时候,使用 requests 来达到异步的效果是行不通的,只能通过底层的 socket 重新设计,比如aiohttp。再比如那些数据库驱动,也是同样的道理。

因此 Python 中的协程比较尴尬的就是,你学完了不知道该怎么用,有时发现用了异步,耗时反而还比同步的要高。

目前来说,我们想实现异步,最好的方式就是通过线程池。

我们有两个文件:

1.txt, 内容如下

地灵殿的主人,她独居在旧地狱的地灵殿,完全放弃和他人沟通。
旧地狱里满是受人厌恶的妖怪,但其中仍然有一种离群索居、被归为遭人厌恶的类别,那就是“觉”。
她们和人类与妖怪相处不来,逃入了旧地狱,结果连旧地狱的妖怪都讨厌她们,地灵殿没有访客,也不会有人跑到地灵殿来找她。

2.txt, 内容如下

有着粉紫色的短卷发,黑褐色眼瞳的少女。
头上戴着黑色的发箍,发箍上有黄色的心形装饰,连接这她的第三只眼。
与妹妹恋不同,觉的第三只眼是红色的。
穿着淡蓝色的上衣,袖口与披肩都是淡粉色。下身穿着由白色渐变为粉色的短裙,鞋子则是粉色的拖鞋。

下面来通过线程池的方式进行读取,如果有耗时的任务,那么不妨扔到线程池中。

import asyncio
import time


class Reader:

    def __init__(self, file):
        self.fd = open(file, "r", encoding="utf-8")

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.fd.close()

    def __read(self):
        # 假设读取一个文件需要两秒钟
        time.sleep(2)
        return self.fd.read()

    async def read(self):
        # 如果我们需要其返回值, 那么需要进行await
        # 并且不传线程池, 会默认创建一个
        return await loop.run_in_executor(None, self.__read)


async def read(name):
    async with Reader(name) as f:
        print(await f.read())


async def main():
    await asyncio.wait([read("1.txt"), read("2.txt")])


loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
end = time.perf_counter()
print(f"总用时: {time.perf_counter() - start}")
"""
地灵殿的主人,她独居在旧地狱的地灵殿,完全放弃和他人沟通。
旧地狱里满是受人厌恶的妖怪,但其中仍然有一种离群索居、被归为遭人厌恶的类别,那就是“觉”。
她们和人类与妖怪相处不来,逃入了旧地狱,结果连旧地狱的妖怪都讨厌她们,地灵殿没有访客,也不会有人跑到地灵殿来找她。
有着粉紫色的短卷发,黑褐色眼瞳的少女。
头上戴着黑色的发箍,发箍上有黄色的心形装饰,连接这她的第三只眼。
与妹妹恋不同,觉的第三只眼是红色的。
穿着淡蓝色的上衣,袖口与披肩都是淡粉色。下身穿着由白色渐变为粉色的短裙,鞋子则是粉色的拖鞋。
总用时: 2.0039085
"""

我们这里没有传递线程池,那么会默认创建一个,当然生产环境中我们最好手动创建一个,并且指定能容纳的线程数量。

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time


class Reader:
    __thread_pool = ThreadPoolExecutor(max_workers=2)

    def __init__(self, file):
        self.fd = open(file, "r", encoding="utf-8")

    def __aiter__(self):
        return self

    def __read(self):
        # 假设读一行需要 1s
        try:
            time.sleep(1)
            return self.fd.__next__()
        except StopIteration:
            raise StopAsyncIteration

    async def __anext__(self):
        return await loop.run_in_executor(self.__thread_pool, self.__read)


async def read(name):
    async for line in Reader(name):
        # 由于文件结尾含有换行,我们将其去掉,因为print自带换行
        print(line[: -1])


async def main():
    await asyncio.wait([read("1.txt"), read("2.txt")])


loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
end = time.perf_counter()
print(f"总用时: {time.perf_counter() - start}")
"""
地灵殿的主人,她独居在旧地狱的地灵殿,完全放弃和他人沟通。
有着粉紫色的短卷发,黑褐色眼瞳的少女。
旧地狱里满是受人厌恶的妖怪,但其中仍然有一种离群索居、被归为遭人厌恶的类别,那就是“觉”。
头上戴着黑色的发箍,发箍上有黄色的心形装饰,连接这她的第三只眼。
与妹妹恋不同,觉的第三只眼是红色的。
她们和人类与妖怪相处不来,逃入了旧地狱,结果连旧地狱的妖怪都讨厌她们,地灵殿没有访客,也不会有人跑到地灵殿来找她
穿着淡蓝色的上衣,袖口与披肩都是淡粉色。下身穿着由白色渐变为粉色的短裙,鞋子则是粉色的拖鞋
总用时: 5.0068648
"""

可以看到是并发交错打印的,两个任务分别运行在不同的线程中。

asyncio 提供的信号机制

任务在事件循环中运行的时候,可以被强制停止,并且在停止之后还可以继续运行,我们举个栗子:

from pprint import pprint
import asyncio


async def f(n):
    await asyncio.sleep(n)
    print(f"sleep {n}")


loop = asyncio.get_event_loop()
tasks = [loop.create_task(f(n)) for n in range(1, 5)]
# 查看当前的所有任务, 注意: 这里查看的是任务, 如果不使用 loop.create_task 创建任务的话, 那么得到的是空集合
pprint(asyncio.all_tasks(loop=loop))
"""
{<Task pending name='Task-1' coro=<f() running at C:/Users/satori/Desktop/yousa/1.py:5>>,
 <Task pending name='Task-2' coro=<f() running at C:/Users/satori/Desktop/yousa/1.py:5>>,
 <Task pending name='Task-3' coro=<f() running at C:/Users/satori/Desktop/yousa/1.py:5>>,
 <Task pending name='Task-4' coro=<f() running at C:/Users/satori/Desktop/yousa/1.py:5>>}
"""
try:
    loop.run_until_complete(asyncio.gather(*tasks))
except KeyboardInterrupt:
    # 一会我们需要在命令行中执行
    print("用户按下了 Ctrl + C")

pprint(asyncio.all_tasks(loop=loop))
r"""
sleep 1
sleep 2
用户按下了 Ctrl + C
{<Task pending name='Task-3' coro=<f() running at 1.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001D75827
2E80>()]> cb=[gather.<locals>._done_callback() at C:\python38\lib\asyncio\tasks.py:751]>,
 <Task pending name='Task-4' coro=<f() running at 1.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001D75827
2EB0>()]> cb=[gather.<locals>._done_callback() at C:\python38\lib\asyncio\tasks.py:751]>}
"""
tasks = asyncio.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
"""
sleep 3
sleep 4
"""
loop.close()

我们来解释一下上面都做了写什么,首先我们创建了 4 个任务,然后执行。当执行完两个任务之后,我们按下了 Ctrl + C 进行引发 KeyboardInterrupt,这个时候被异常语句捕获。此时其它任务处于中断状态,然后我们继续打印所有任务,发现只剩下两个了,因为有两个已经执行完毕。

之后继续执行剩余的任务,会从中断的地方重新执行。但如果我们不想执行呢?我们希望一个失败了,其它的也不执行的话,那么还可以将任务取消掉。

import asyncio


async def f(n):
    await asyncio.sleep(n)
    1 / n
    print(f"sleep {n}")


loop = asyncio.get_event_loop()
# 创建任务的时候还可以为任务起一个名字, 否则会有一个默认的名字: Task-{n}
tasks = [loop.create_task(f(n), name=f"任务_{n}") for n in range(0, 5)]

try:
    loop.run_until_complete(asyncio.gather(*tasks))
except ZeroDivisionError:
    # 显然执行第一个任务就会报错, 因为 n == 0
    print("出现了除零错误")

# 剩下的任务, 显然第一个任务已经失败, 那么还剩下 4 个任务
tasks = asyncio.all_tasks(loop=loop)
for t in tasks:
    t.cancel()

# 我们将任务取消掉之后, 需要再通过 asyncio.gather 放入到事件循环中运行一次, 进行一些资源的释放
# 因为如果任务取消之后不管了, 那么会抛出警告: Task was destroyed but it is pending!
# 并且是需要通过 asyncio.gather, 并且指定 return_exceptions=True
# 因为被取消的任务如果还被扔到事件循环中进行调度的话, 那么会抛出 asyncio.exceptions.CancelledError
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()
"""
出现了除零错误
"""

如果你细心的话,应该会发现这里和我们之前说的有一个矛盾的地方。我们之前说一个任务出现了异常,并不会立刻报错,而是会等到其它任务都执行完毕之后再整体报错。所以按照之前的逻辑,我们在捕获异常之后任务应该不存在了才对(任务执行完毕、或者报错就算结束了),但是请注意一下之前的代码。如果一个任务报错,而其它任务正好处于异步 IO 阻塞的时候,会立即停止,但如果不是异步 IO 阻塞,那么仍会继续执行。我们举个栗子:

import asyncio


async def f1():
    1 / 0

async def f2():
    await asyncio.sleep(3)
    print("我是 f2")

async def f3():
    import time
    time.sleep(5)
    print("我是 f3")

async def f4():
    await asyncio.sleep(8)
    print("我是 f4")


loop = asyncio.get_event_loop()
try:
    # 我们传入协程会自动包装成任务
    loop.run_until_complete(asyncio.gather(f1(), f2(), f3(), f4()))
except ZeroDivisionError:
    print("出现除零错误")

tasks = asyncio.all_tasks(loop=loop)
for t in tasks:
    print(t.get_name())

"""
我是 f3
我是 f2
出现除零错误
Task-4
"""

我们解释一下,因为里面有很多可以说的地方。首先我们看到 "我是 f3" 是最先打印的,因为 asyncio 的事件循环本质上还是一个单线程。所以我们使用 loop = asyncio.get_event_loop() 之后,如果在协程内部想要获取事件循环的话,除了作为参数进行传递之外,还可以使用 loop = asyncio.get_running_loop() 来进行获取。因为一个线程只会有一个事件循环,一旦创建就作为全局变量保存起来,后面拿到的都是这个事件循环。即使你还使用 loop = asyncio.get_event_loop(),那么 asyncio 会先检测是否创建了事件循环,如果创建那么直接返回已经创建的事件循环。因此当事件循环创建之后,更推荐使用 get_running_loop,而不是 get_event_loop。

那为什么先打印 "我是 f3" 呢,首先 f1 在报错的时候,f2、f4 虽然处于异步 IO 阻塞状态,但 f3 的内部并没有出现异步 IO 阻塞,所以它必须要等到 f3 内部出现 IO 阻塞才行(准确的说是所有任务都出现)。既然没有出现异步 IO,那么会一直执行 f3,因为只有遇见 IO 阻塞的时候才会切换,如果 f3 内部一直没有出现的话,那么会一直执行 f3。从这里我们也看出,这种架构只适用于高 IO、低 CPU 的场景,因为它始终是单线程。所以最终 f3 会执行完毕,从而打印 "我是 f3",但是当 f3 执行完毕之后,f2 内部的异步 IO 阻塞已经结束,因此此时便不处于异步 IO 阻塞状态,所以它也不会结束,而是会打印 "我是 f2"。最后只剩下 f4 了,但不幸的是它此时处于异步 IO 阻塞,因此会被中断。

于是我们在收集任务的时候,只剩下了 f4,通过 get_name 打印任务名也证明了这一点。当然这里我们最终没有将任务给取消掉,也没有继续运行,那么任务就会一直处于中断状态。这样也是不会抛警告的,只不过资源会一直占用,因此对于那些处于 pending 的任务,还是做一些善后处理比较好。

我们上面演示了如何通过 KeyboardInterrupt 来停止事件循环,也就是按 Ctrl + C。在 asyncio 内部,引发的 KeyboardInterrupt 有效解除了 loop.run_until_complete 的调用阻塞。

但 KeyboardInterrupt 实际上对应 SIGINT 信号(一个数值,等于 2),在网络服务中更常见的进程终止信号实际上是 SIGTERM(一个数值,等于 15),这也是在 Linux 中使用 kill 命令时的默认信号。

asyncio 对进程信号的处理也提供了内置的支持,但通常涉及的信号的处理时,都会非常的复杂(不局限于 asyncio)。

import asyncio


async def main():
    while True:
        print("正在处理中·······")
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
task = loop.create_task(main())
try:
    loop.run_until_complete(task)
except KeyboardInterrupt:
    print("Got signal: SIGINT, shutting down")

# 良好的习惯, 最后在收集一次任务, 将其取消掉, 扔到事件循环中清理并释放资源
tasks = asyncio.all_tasks(loop)
for t in tasks:
    t.cancel()
loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
loop.close()
"""
正在处理中·······
正在处理中·······
正在处理中·······
正在处理中·······
Got signal: SIGINT, shutting down
"""

看起来没有任何问题,那如何通过信号的方式呢?asyncio 也提供了相应的 API 来保证我们做到这一点。

import asyncio
from signal import SIGINT, SIGTERM


async def main():
    while True:
        print("正在处理中·······")
        await asyncio.sleep(3)


def handler(sig):
    if sig == SIGINT:
        print(f"获取到信号 SIGINT, 但是秋梨膏, 我是什么也不会做的")
    elif sig == SIGTERM:
        print(f"获取到信号 SIGTERM, 但我也什么都不会做的, 你还是使用 kill -9 吧, 它最厉害")


loop = asyncio.get_event_loop()
# 将原来的信号给移除
loop.remove_signal_handler(SIGINT)
# 然后再重新添加回去, 只不过这里换成了我们自己的逻辑, 此时再按下 Ctrl + C 的时候, 就会执行 handler, 后面的 SIGINT 是参数
loop.add_signal_handler(SIGINT, handler, SIGINT)
loop.remove_signal_handler(SIGTERM)
loop.add_signal_handler(SIGTERM, handler, SIGTERM)

# 执行任务
task = loop.create_task(main())
loop.run_until_complete(task)

由于 Windows 上不支持这种操作,所以我们在 Linux 上演示,并且我们 Linux 上的是 Python3.6,对应的 asyncio 没有 all_tasks 这个属性,所以我们只演示信号的处理,但你也可以做一些其它操作比如资源的释放、任务的取消等等。

即使我们的 handler 里面报错了也是没有关系的,程序不会中断。

所以这便是信号机制,asyncio 对信号默认有相应的处理逻辑,比如 Ctrl + C 会引发 KeyboardInterrupt,但是我们可以将它和信号的关联给移除掉。然后重新添加,但是此时程序的处理逻辑就由我们来指定,比如清理资源、取消任务等等。

实现 TCP 服务端、客户端

asyncio 还可以作为 TCP 服务端提供相应的服务,作为客户端发送相应的请求,我们来看一下。

import asyncio
from asyncio import StreamReader, StreamWriter


async def echo(reader: StreamReader, writer: StreamWriter):
    """
    下面开启了一个服务, 每当一个连接过来的时候, 就会基于 echo 创建一个协程
    并且自动传递两个参数: reader 用户读取用户发来的数据, writer 可以将数据返回给用户
    :param reader:
    :param writer:
    :return:
    """
    print("来了一个连接")
    try:
        # 不停地接收数据, 直到接收完毕, 得到的是字节
        while data := await reader.read(1024):
            # 转成大写, writer.write 表示将数据写入到缓冲区中
            writer.write(data.upper())
            # 刷新缓冲区
            await writer.drain()
            # 关闭 writer
            writer.close()
        print("连接结束")
    except Exception:
        print("用户强行取消连接")

async def main():
    # 开始一个服务, 第一个函数是一个回调函数, 当连接过来的时候就会根据此函数创建一个协程
    # 后面是绑定的 ip 和 端口
    server = await asyncio.start_server(echo, "0.0.0.0", 9999)
    # 然后开启无线循环
    async with server:
        await server.serve_forever()


asyncio.run(main())

上面便是服务端,然后我们来编写客户端的代码。

import asyncio

async def main():
    # open_connection 可以理解为 s = socket.socket(); s.connect()
    # 同样返回一个 reader 和一个 writer
    reader, writer = await asyncio.open_connection("localhost", 9999)
    # 调用 writer 写入数据
    writer.write(b"komeiji satori")
    # 刷新缓冲区
    await writer.drain()
    # 读取数据
    data = await reader.read()
    print(data)  # b'KOMEIJI SATORI'
    # 关闭 writer
    writer.close()


asyncio.run(main())

注意:以上都是 TCP 请求,asyncio 不支持 HTTP 请求,所以下面我们就通过 TCP 请求的方式来下载网上的一张图片。

import asyncio

async def main():
    reader, writer = await asyncio.open_connection("i0.hdslb.com", 80)
    # 发送请求, HTTP 请求也是基于 TCP, 所以我们完全可以通过 TCP 的方式发送 HTTP 请求
    # 格式如下: GET {路径} HTTP/1.1\r\nHost: {域名}\r\nConnection: close\r\n\r\n
    # 当然你也可以加入一些请求头什么的, 具体可以参数 HTTP 请求报文格式
    writer.write(b"GET /bfs/album/c63463e6a086a15064d548cad662a0e6f0b31741.jpg@518w.jpg HTTP/1.1\r\n"
                 b"Host: i0.hdslb.com\r\n"
                 b"Connection: close\r\n\r\n")
    # 图片的地址: https://i0.hdslb.com/bfs/album/c63463e6a086a15064d548cad662a0e6f0b31741.jpg@518w.jpg
    await writer.drain()
    # 读取数据
    data = await reader.read()
    writer.close()
    return data


loop = asyncio.get_event_loop()
# 这个 data 就是我们发送请求所获取的字节流
data = loop.run_until_complete(main())
# 这个 data 里面除了包含图片信息之外, 还包含 HTTP 响应头, 我们可以通过 \r\n 进行分隔
# data_list 中的最后一个元素就是图片对应的字节流
data_list = data.split(b"\r\n")
for item in data_list[: -1]:
    print(item)
"""
b'HTTP/1.1 200 OK'
b'Content-Type: image/jpeg'
b'Content-Length: 115785'
b'Connection: close'
b'Server: Tengine/2.1.1'
b'ETag: e6e189dc1f329f1626671d457bc77c2da00b695c'
b'Date: Tue, 24 Nov 2020 09:08:29 GMT'
b'Last-Modified: Mon, 16 Dec 2019 18:29:24 GMT'
b'Expires: Wed, 24 Nov 2021 09:08:29 GMT'
b'Age: 1'
b'Cache-Control: max-age=31536000'
b'Accept-Ranges: bytes'
b'o-height: 733'
b'o-width: 518'
b'Access-Control-Allow-Origin: *'
b'Access-Control-Allow-Methods: GET, POST, OPTIONS'
b'Access-Control-Allow-Credentials: true'
b'Access-Control-Allow-Headers: Origin,No-Cache,X-Requested-With,If-Modified-Since,Pragma,Last-Modified,Cache-Control,Expires,Content-Type,Access-Control-Allow-Credentials,DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Cache-Webcdn'
b'Vary: Origin,Accept-Encoding'
b'Access-Control-Expose-Headers: Content-Length,X-Cache-Webcdn'
b'X-Hash: /bfs/album/c63463e6a086a15064d548cad662a0e6f0b31741.jpg'
b'X-Cache-Status: HIT from KS-CLOUD-HUZ-MP-02-01'
b'X-Cache-Status: MISS from KS-CLOUD-TJ-CT-23-21'
b'X-Cache-Webcdn: KS'
b'X-Cdn-Request-ID: 2ddd1afae88b75a59ffd9b409c9e50c1'
b''
"""
# HTTP 响应头最后面是两个 \r\n, 因此使用 \r\n 分隔会多出一个 b''
# 怎么样, 是不是很简单呢? 如果我们使用 requests 的话
import requests
res = requests.get("https://i0.hdslb.com/bfs/album/c63463e6a086a15064d548cad662a0e6f0b31741.jpg@518w.jpg")
# 这个 content 就是图片的字节流, 不包含请求头, 所以它和 data_list 中的最后一个元素是相等的
content = res.content
print(data_list[-1] == content)  # True

# 我们将其写入到文件中
with open("1.png", "wb") as f:
    f.write(data_list[-1])

下载成功,当然我们通过 socket 也是可以实现的(同样是 TCP 请求),只不过 asyncio 是异步下载。我们来测试一下 asyncio 和 requests 之间的速度如何:

import time
import asyncio
import requests

async def download_async():
    reader, writer = await asyncio.open_connection("i0.hdslb.com", 80)
    writer.write(b"GET /bfs/album/c63463e6a086a15064d548cad662a0e6f0b31741.jpg@518w.jpg HTTP/1.1\r\n"
                 b"Host: i0.hdslb.com\r\n"
                 b"Connection: close\r\n\r\n")
    await writer.drain()
    # 读取数据
    data = (await reader.read()).split(b"\r\n")[-1]
    writer.close()



def download_sync():
    for i in range(10):
        res = requests.get("https://i0.hdslb.com/bfs/album/c63463e6a086a15064d548cad662a0e6f0b31741.jpg@518w.jpg")
        data = res.content


async def main():
    # 10 个请求并发发送
    tasks = [download_async() for _ in range(10)]
    start_time = time.perf_counter()
    await asyncio.gather(*tasks)
    end_time = time.perf_counter()
    print(f"异步用时: {end_time - start_time} 秒")

    start_time = time.perf_counter()
    download_sync()
    end_time = time.perf_counter()
    print(f"同步用时: {end_time - start_time} 秒")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
异步用时: 0.35589609999999994 秒
同步用时: 3.3828416 秒
"""

可以看到 asyncio 比同步的 requests 要快很多,当然这种测试没太大意义,应该让 aiohttp 和 requests 做对比,但这都无所谓了。另外有可能存在网络问题导致测量不准确,你可以多执行几次,总之异步肯定是要比同步快上好几倍的。

并发请求网络

正如上面演示的那样,asyncio 是可以向 web 服务发送请求的,但是它不支持 http 请求,只支持 tcp 请求。如果使用 asyncio 的话,我们需要手动填写一大堆的 HTTP 报文,所以建议使用aiohttp。

aiohttp 非常简单,如果你会用 requests,那么使用 aiohttp 也不在话下,我们简单介绍一下吧。还是那句话,aiohttp 和 requests 非常类似,只不过前者异步、后者同步。

首先在 requests 中我们可以创建一个 session,然后通过 session 调用 get 等方法发送请求,在 aiohttp 与之类似。aiohttp 官方建议:session 只需要创建一次即可,然后用同一个 session 发送请求,不要为每一个请求单独创建一个 session,这是非常不明智的。

import aiohttp


async def foo():
    async with aiohttp.ClientSession() as session:
        async with session.get("url") as response:
            # 也可以 await response.text() , 但是要注意编码
            # 也可以 await response.read() , 得到原始的字节流, 可以通过 chardet 检测编码, 然后转换
            return await response.json()

整体逻辑大致如此,一些请求头、cookie 之类的可以在 ClientSession 中设置,当然也可以在调用的 get、post、delete 等方法中设置。

返回的 response 可以获取很多内容,比如:

import aiohttp
import asyncio


async def foo():
    async with aiohttp.ClientSession() as session:
        async with session.get("url") as response:
            # 服务端返回的内容的请求头信息
            print(response.headers)
            # 像服务端发送请求所携带的请求信息
            print(response.request_info)
            # 服务端返回的 cookie
            print(response.cookies)

整体套路和 requests 非常类似,包括其它请求也是同理,非常简单这里不细说了。对着官网看半小时,你就知道该怎么用了,至于还有返回的 response 还有哪些属性,IDE 的智能提示就会告诉你,而且属性都是见名知意。

import aiohttp


async def foo(session: aiohttp.ClientSession, 
              url: str):
    """
    我们说不要给每一个请求都创建一个 session, 而是这些请求共用一个 session
    因为 aiohttp 底层已经将请求异步化了, 所以一个请求阻塞会自动切换到另一个请求
    :param session: 
    :param url: 
    :return: 
    """
    async with session.get(url) as response:
        return await response.json()


async def main():
    async with aiohttp.ClientSession() as session:
        # 假设我们有 100 个 url, 那么这 100 个请求都会共用一个 session
        for url in ["url1", "url2", "url3"]:
            print(await foo(session, url))

但是有些时候服务器的并发量有限制,不能支持我们同时发送 100 个请求,这个时候该怎么做呢?

import aiohttp
from aiohttp.connector import TCPConnector


async def foo(session: aiohttp.ClientSession,
              url: str):
    """
    我们说不要给每一个请求都创建一个 session, 而是这些请求共用一个 session
    因为 aiohttp 底层已经将请求异步化了, 所以一个请求阻塞会自动切换到另一个请求
    :param session:
    :param url:
    :return:
    """
    async with session.get(url) as response:
        return await response.json()


async def main():
    # 指定 connector 连接器, 将请求的最大数量限制为 10 (默认为100)
    # 可以类比线程池, 这里是一个 TCP 连接池, 一个 session 发送一个请求就建议一个 TCP 连接, 同时最多支持 10 个 TCP 连接
    # 如果已经发出去、但还没有回来的请求达到了 10 个, 说明 TCP 连接数已经达到了 10 个, 那么下面的请求就不会再发了
    # 当请求回来之后, 这个 TCP 连接就结束了, 然后才能发送下一个请求
    async with aiohttp.ClientSession(connector=TCPConnector(limit=10)) as session:
        for url in ["url1", "url2", "url3"]:
            print(await foo(session, url))

然后再重点说一下文件上传,很多服务器要求通过 multipart/form 的方式,而这是 requests 所不支持的,但是我们可以通过 requests_toolbelt 来解决这一点,这在 aiohttp 中要怎么做呢?

from aiohttp import MultipartWriter, BytesPayload, ClientSession
from multidict import MultiDict


async def foo():
    # 构造 multipart/form-data 表单数据
    multi_writer = MultipartWriter()
    multi_writer.append_payload(
        BytesPayload(
            b'{"content": "content"}',  # 将内容转成 bytes
            content_type="application/json; charset=utf8",  # 服务端要求的类型
            headers=MultiDict({"a": "b", "c": "d"})  # 一些请求头信息, 服务端如果不需要可以不传递
        )
        # 然后我们还需要为这个 BytesPayload 对象指定一个名字, 可以通过 set_content_disposition 进行设置
        # 第一个参数一般填写 "form-data" 即可, 然后名字通过 name 指定, 说实话个人觉得这里没有 requests_toolbelt 方便
        # requests_toolbelt 的 MultipartEncoder 的第一个参数 fields, 我们可以传递一个字典
        # 而字典里面的 key, 就相当于这里的 name; 而 value 是一个元组, 元组里面元素则对应 BytesPayload 里面的参数接收的内容
    ).set_content_disposition("form-data", name="metadata")
    # 如果还需要传递其它内容, 那么需要再写一个 writer.append_payload, 所以个人觉得用起来比较麻烦
    # 如果是 requests_toolbelt.MultipartEncoder, 那么只需要多加一个键值对即可
    # 比如我们上传文件:
    multi_writer.append_payload(
        BytesPayload(
            bytes("文件内容", encoding="utf-8"),
            # 类型, 比如: 我们上传的是 wav 文件
            # 事实上如果上传的是文件内容, 那么这个参数可以不用, 因为服务端基本上会根据传递的内容进行推断
            content_type="audio/wav",
            # 也可以指定文件名, 因为我们这里是文件读取之后的字节流, 没有文件名等元信息
            # 但很多服务端基本上不需要, 很多时候我们直接指定为 "file" 即可, 当然写文件名本身也是一样的
            filename="1.wav"
        )
        # 这里通过 set_content_disposition 将 name 也指定为 "file", 一般也都是 "file"
        # 如果是 requests_toolbelt.MultipartEncoder 的话
        # 等价于: "file": ("1.wav", bytes("文件内容", encoding="utf-8"), "audio/wav")
        # 如果还需要头部信息, 那么通过 BytesPayload 里面的参数 headers 指定即可, 对比 requests_toolbelt.MultipartEncoder 的话
        # 等价于: "file": ("1.wav", bytes("文件内容", encoding="utf-8"), "audio/wav", {"h1": "xxx", "h2": "yyy"})
    ).set_content_disposition("form-data", name="file")

    async with ClientSession() as session:
        async with session.post("url",
                                data=multi_writer,
                                # 还需要在请求中设置 headers, 指定 Content-Type, 不然服务端不一定能够正确解析我们传递的内容
                                # 如果请求头还需要其它内容的话, 也直接写在里面即可
                                # 比如有些服务要求请求头中必须包含某个 AppKey 之类的
                                headers={"Content-Type": "multipart/form-data; boundary=" + multi_writer.boundary}
                                ) as response:
            print(await response.json())

估计很多人在使用 aiohttp 上传 multipart/form-data 格式的内容时,会踩很多坑,不知道该怎么弄,至少我在当时郁闷了一会。尽管使用 requests_toolbelt 可以很轻松解决问题,但是整个项目都是异步的,所以还是希望使用 aiohttp 的方式解决问题。经过一番努力,算是找到了解决办法,而且对比一下发现 aiohttp 支持的功能还要更强大一些。

aiohttp 就介绍到这里,还是那句话用过一段时间 requests,上手 aiohttp 也是分分钟的事,其它的一些内容也都是老生常谈,不过这仅仅针对于请求,也就是 aiohttp 充当的角色是客户端。aiohttp 还可以用来做 web,充当服务端的角色,也支持 WebSocket 等等,有兴趣可以了解一下。我个人用 aiohttp 主要是发送请求,没有怎么关注提供响应方面的内容,因为很少用,如果想做异步的 web 服务,肯定选 FastAPI,tornado、sanic 等框架,很少有拿 aiohttp 做 web 的。

小结

使用 Python 进行异步编程也是一个趋势,异步框架种类也丰富多彩,像 Tornado、Quart、FastAPI、Sanic 等等。Tornado 算是老牌框架了,在 Python 还没有提供原生协程的时候它就已经出现了,而 Sanic 则是号称能达到匹配 Go 语言的并发量(还是无法媲美的,但至少在一个数量级),Quart 语法类似于 Flask,FastAPI 则是文档最全、功能最丰富、也是我个人目前在用的一个框架。如果你要选择异步框架的话,首推 FastAPI 和 Sanic,但不管是哪种异步框架,它们都使用了 Python 中的协程,而如果想要运行协程,那么就必须要事件循环,而事件循环的创建必然离不开 asyncio(早期的 tornado 是自己实现了一套,asyncio 出来之后就不再用自己的那一套了)。所以理解 Python 中的协程以及使用掌握 asyncio 是我们进行异步编程的基础。