Asyncio 协程异步笔记

发布时间 2023-09-19 17:44:57作者: 车天健

协程 & asyncio & 异步

1. 协程 (coroutine)

协程不是计算机提供,而是程序员人为创造。

协程(coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块互相切换运行。例如:

def func1():
    print(1)
    ...
    print(2)
    
def func2():
    print(3)
    ...
    print(4)
    
func1()
func2()


实现协程有这么几种方法:

  • greenlet,早期模块。
  • yield 关键字。
  • asyncio 装饰器(python 3.4)
  • asyncawait 关键字(python 3.5)


1.1 greenlet 实现协程

pip3 install greenlet
from greenlet import greenlet

def func1():
    print(1)  # 第 1 步:输出 1
    gr2.switch()  # 第 3 步:切换到 func2 函数
    print(2)  # 第 6 步:输出 2
    gr2.switch()  # 第 7 步 切换到 func2 函数,从上一次执行的位置继续向后执行
    
def func2():
    print(3)  # 第 4 步:输出 3
    gr1.switch()  # 第 5 步:切换到 func1 函数,从上一次执行的位置继续向后执行
    print(4)  # 第 8 步:输出 4
    
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch()  # 第 1 步:去执行 func1 函数


1.2 yield 关键字

def func1():
    yield 1
    yield from func2()
    yield 2
    
def func2():
    yield 3
    yield 4
    
f1 = func1()
for item in f1:
    print(item)

伪实现,仅能实现协程的功能。



1.3 asyncio

在 python 3.4 及之后的版本。

import asyncio

@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(2)  # 遇到 IO 耗时操作,自动化切换到 tasks 中其它任务
    print(2)
    
@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2)  # 遇到 IO 耗时操作,自动化切换到 tasks 中其它任务
    print(4)
    
tasks = [
    asyncio.ensure_future(func1())
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

注意:遇到 IO 阻塞自动切换。



1.4 aynsc & await 关键字

在 python 3.5 及之后的版本。

import asyncio

async def func1():
    print(1)
    # 网络 IO 请求:下载一张图片
    await asyncio.sleep(2)  # 遇到 IO 耗时操作,自动化切换到 tasks 中的其它任务。
    print(2)
    
async def func2():
    print(3)
        # 网络 IO 请求:下载一张图片
    await asyncio.sleep(2)  # 遇到 IO 耗时操作,自动化切换到 tasks 中的其它任务。
    print(4)
    
tasks = [
    asyncio.ensure_future(func1())
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))    


2. 协程的意义

在一个线程中如果遇到 IO 等待时间,线程不会傻等,而是利用空闲时间再去干点其它事情。



案例:下载三张图片(网络 IO):

  • 普通方式(同步)

    pip3 install requests
    
    import requests
    
    def download_image(url):
        print("开始下载:", url)
        response = requests.get(url)
        print("下载完成")
    
        file_name = url.rsplit("_")[-1]
        with open(file_name, mode="wb") as file_object:
            file_object.write(response.content)
    
    url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg",
                "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg",
                "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg", 
    ]
    
    for item in url_list:
        download_image(item)
    


  • 协程方式(异步)

    pip3 install aiohttp
    
    import aiohttp
    import asyncio
    
    async def fetch(session, url):
        print("发送请求:", url)
        async with session.get(url, verify_ssl=False) as response:
            content = await response.content.read()
            file_name = url.rsplit("_")[-1]
            with open(file_name, mode="wb") as file_object:
                file_object.write(content)
                
    async def main():
        async with aiohttp.ClientSession() as session:
            url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg",
                "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg",
                "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg", 
    ]
            tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
            await asyncio.wait(tasks)
            
            
    if __name__ == "__main__":
        aynscio.run(main())
    


3. 异步编程

3.1 事件循环(Event Loop)

理解成一个死循环,去检测并执行某些代码。

task_list = [task1, task2, task3, ...]

while True:
	executables, completes = [...], [...]  # 在 task_list 中检查所有任务,将可执行和已完成返回
	
	for executable in executables:
		execute executable
		
	for complete in completes:
		remove complete from task_list
		
	if task_list == []:  # 如果 task_list 中的任务都已完成,则终止循环
		break


import asyncio

# 去生成或获取一个事件循环
loop = asyncio.get_event_loop()

# 将任务放到任务列表
loop.run_until_complete(asyncio.wait(tasks))


3.2 快速上手

协程函数(coroutine function):定义函数时 async def (加上 async 关键字)。

协程对象(coroutine object):执行协程函数得到的协程对象。

async def func():
    pass

result = func()

注意到 result = func() 中 call 了 func(),但并不会执行 func() 内部代码,只是得到了 func() 的协程对象。



若要执行协程函数内部代码,需要事件循环去处理协程函数得到的协程对象。

async def func():
    print("come here.")
    
result = func()

loop = async.get_event_loop()
loop.run_until_complete(result)


到了 python 3.7 之后,还有更简便的写法:

async def func():
    print("come here.")
    
result = func()

# loop = async.get_event_loop()
# loop.run_until_complete(result)
async.run(result)  # python 3.7


3.3 await 关键字

await 一般要加上 可等待的对象(协程对象、Future 对象、Task 对象),可以简单理解为 IO 等待(但实际上并没有这么简单)。



示例 1:

import asyncio

async def func():
    print("come here.")
    response = await asyncio.sleep(2)  # 没有什么意义,假设这是一个 IO 等待(例如网络请求)
    print("terminate", response)
    
asyncio.run(func())

在事件循环内部,执行协程对象 func() 时会先执行 print("come here."),接下来会进入 IO 等待,此时事件循环会跳出 func() 函数去执行其它任务,一旦 response 得到返回值(即结束 IO 等待),事件循环会在下一次循环中检测到 IO 等待已经结束,此刻才会继续执行 func() 后面的代码(即 print("terminate", response))。



示例 2(协程对象之间可以嵌套):

async def others():
    print("start")
    await asyncio.sleep(2)
    print("end")
    return "返回值"

async def func():
    print("执行协程函数内部代码")
    
    # 遇到 IO 操作挂起当前协程(任务),等 IO 操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其它协程(任务)。
    response = await others()
    print("IO 请求结束,结果为:", response)
    

asyncio.run(func())


示例 3:

async def others():
    print("start")
    await asyncio.sleep(2)
    print("end")
    return "返回值"

async def func():
    print("执行协程函数内部代码")
    
    # 遇到 IO 操作挂起当前协程(任务),等 IO 操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其它协程(任务)。
    response_1 = await others()
    print("IO 请求结束,结果为:", response_1)
    
    response_2 = await others()
    print("IO 请求结束,结果为:", response_2)
    

asyncio.run(func())

await 关键字的含义就是,等待对象的值得到返回结果之后再继续向下运行。



3.4 Task 对象

Tasks are used to schedule coroutines concurrently.

When a coroutine is wrapped into a Task with functions like asyncio.create_task() the coroutine is automatically scheduled to run soon.

简单来说,它可以在事件循环中添加多个任务。

Tasks 用于并发调度协程,通过 asyncio.create_task(协程对象) 的方式创建 Task 对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外的,还可以用低层级的 loop.create_task()ensure_future() 函数。不建议手动实例化 Task 对象。



示例 1(这种代码写得比较少):

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main 开始")
    
    # 创建协程,将协程封装到一个 Task 对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(fucn())
    
    # 当执行某协程遇到 IO 操作时,会自动化切换执行其它任务。
    # 此处的 await 时等待相对应的协程全都执行完毕并获取结果。
    result_1 = await task1
    result_2 = await task2
    print(result_1, result_2)
    

asyncio.run(main())


示例 2(这种代码应用得比较多):

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main 开始")
    
    # 创建协程任务列表
	task_list = [
        asyncio.create_task(func(), name="n1"),  # 给 task 命名,会在返回集中显示
        asyncio.create_task(func(), name="n2")
    ]
    
	# 不能直接把 task_list 以列表的形式加在 await 之后
    # 注意 await 关键字只接受 coroutine object, task object, future object
    # 此处 done 是一个集合,为 task_list 的返回值
    # pending 在 timeout 不为 None 时有意义,timeout 规定了最长等待时间,
    # 如果超过 timeout,那么还未完成的任务将添加到 pending 中。
	done, _ = await asyncio.wait(task_list, timeout=1)
    print(done)
    

asyncio.run(main())


示例 3:

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

# 创建协程任务列表
task_list = [
    asyncio.create_task(func(), name="n1"),  # 给 task 命名,会在返回集中显示
    asyncio.create_task(func(), name="n2")
]

done, pending = asyncio.run(asyncio.wait(task_list))
print(done)


注意到以上代码会导致程序报错。原因是:asyncio.create_task() 会将协程对象立即添加到事件循环中,但是,事件循环是在 asyncio.run() 中被创造,因此此时并不存在事件循环。应该如此修改:

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

# 创建协程对象列表
task_list = [
    func(), 
    func()
]

# 此时 asyncio 会在创建事件循环之后,在内部将 task_list 中的协程对象添加到事件循环中
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)


3.5 Future 对象

Future 类是 Task 类的父类,即 Task 类继承自 Future 类,Task 对象内部 await 结果的处理基于 Future 对象而来。

A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation.



示例 1:

import asyncio

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    
    # 创建一个任务(Future 对象),这个任务什么都不干。
    future = loop.create_future()
    
    # 等待任务最终结果(Future 对象),没有结果则会一直等下去。
    await future
    
asyncio.run(main())

在上述代码中,由于创建的 Future 对象什么也不干,因此 await future 将一直卡住,无法获得返回结果,所以上述代码是没有实际意义的。但注意,如果某一个时刻突然给 future 赋值,那么 future 立刻可以获得返回结果,并且跳出 await



示例 2(没什么意义,用于理解 Future 对象的作用,即帮助我们等待结果):

async def set_after(future):
    await asyncio.sleep(2)
    future.set_result("666")
    
async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    
    # 创建一个任务(Future 对象),没有绑定任何行为,则这个任务永远不知道什么时候结束。
    future = loop.create_future()
    
    # 创建一个任务(Task 对象),绑定了 set_after 函数,函数内部在 2s 之后会给 future 赋值。
    # 即手动设置 future 任务的最终结果,那么 future 就可以结束了。
    await loop.create_task(set_after(future))
    
    # 等待 Future 对象获取最终结果,否则一直等待下去。
    data = await future
    print(data)
    
asyncio.run(main())


3.6 concurrent 中的 Future 对象

首先注意到,concurrent 中的 Future 对象(concurrent.futures.Future)和 asyncio 中的 Future 对象没有关系。concurrent 中的 Future 对象是当使用线程池、进程池实现异步操作时使用到的对象。

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value):
    time.sleep(1)
   	return value
    
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)

# 或创建进程池
# pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
    # 让 pool 拿出一个线程去执行 func 函数
    future = pool.submit(func, i)
    print(future)

实际中可能会存在两种 Future 对象交叉使用。例如:crm 项目中 80% 都基于协程异步编程 + MySQL,但 MySQL 不支持异步,因此在 MySQL 中使用进程池、线程池做异步编程。



示例 1:

import time
import asyncio
import concurrent.futures

def func1():
    # 某个耗时操作
    time.sleep(2)
    return "complete"

async def main():
    loop = asyncio.get_running_loop()
    
    # 1. Run in the default loop's executor (default to ThreadPoolExecutor)
    # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去
    # 执行 func1 函数,并返回一个 concurrent.futures.Future 对象
    # 第二步:调用 asyncio.wrap_future 将 concurrent.future.Future 对象
    # 包装为 asyncio.Future 对象。
    # 因为 concurrent.futures.Future 对象不支持 await 语法,所以需要包装为 
    # asyncio.Future 对象才能使用。
   	future = loop.run_in_executor(None, func1)  # 返回一个 Future
    # 上面这一步内部会调用 asyncio.wrap_future 将返回的 concurrent.futures.Future 
    # 对象转换为 asyncio.Future 对象
    # 默认 None 意味着创建线程池,若想使用进程池请参考以下注释代码
    result = await future
    print("default thread pool", result)
    
    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
        # result = await loop.run_in_executor(pool, func1)
        # print("custom thread pool", result)
    
    # 3. Run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
        # result = await loop.run_in_executor(pool, func1)
        # print("custom process pool", result)

asyncio.run(main())


案例:asyncio + 不支持异步的模块(爬虫)

import asyncio
import requests

async def download_image(url):
    # 发送网络请求,下载图片(遇到网络下载图片的 IO 请求,自动化切换到其它任务)
    print("开始下载:", url)
    
    loop = asyncio.get_event_loop()
    
    # requests 模块默认不支持异步操作,所以就用线程池配合实现了
    future = loop.run_in_executor(None, requests.get, url)
    
    response = await future
    print("下载完成")
    
    # 图片保存到本地文件
    file_name = url.rsplit("_")[-1]
    with open(file_name, mode="wb") as file_object:
        file_object.write(response.content)
        
if __name__ == "__main__":
    url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg",
            "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg",
            "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg", 
]
    tasks = [download_image(url) for url in url_list]
   	loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

耗费资源更大,不得已而为之。



3.7 异步迭代器

  • 什么是异步迭代器?

    实现了 __aiter__()__anext__() 方法的对象。__anext__() 必须返回一个 awaitable 对象。async for 会处理异步迭代器的 __anext__() 方法所返回的可等待对象,直到其引发一个 StopAsyncIteration 异常。由 PEP 492 引入。

  • 什么是异步可迭代对象?

    可在 async for 语句中被使用的对象。必须通过它的 __aiter__() 方法返回一个 asynchronous iterator。由 PEP 492 引入。



示例:

import asyncio

class Reader(object):
    """
    自定义异步迭代器(同时也是异步可迭代对象)
    """
    
    def __init__(self):
        self.count = 0
    
    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        val = await self.readline()
        if val is None:
            raise StopAsyncIteration
        return val
    
# 以下代码会报错,因为 async for 必须写在协程函数内。
# obj = Reader()
# async for item in obj:
    # print(item)
    
async def func():
    obj = Reader()
    async for item in obj:
        print(item)
        
asyncio.run(func())


3.8 异步上下文管理器

  • 什么是异步上下文管理器?

    此种对象通过定义 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。由 PEP 492 引入。



示例:

import asyncio

class AsyncContextManager(object):
    def __init__(self):
        self.conn = conn
    
    async def do_something(self):
        # 异步操作数据库
        return 666
    
    async def __aenter__(self):
        # 异步连接数据库
        self.conn = await asyncio.sleep(1)  # 可以换成连接数据库代码
        return self
    
    async def __aexit__(self, exc_type, exc, tb):
        # 异步关闭数据库链接
        await asyncio.sleep(1)
        
        
# 以下代码会报错,因为 async with 必须写在协程函数内。
# obj = AsyncContextManager()
# async with obj:
    # result = await obj.do_something()
    # print(result)

# 或者
# async with AsyncContextManager() as f:
    # result = await f.do_something()
    # print(result)
    
async def func():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)
        
asyncio.run(func())


4. uvloop

uvloopasyncio 事件循环的替代方案,可以提高事件循环效率,性能接近于 go 语言。

pip3 install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Your asyncio code here.

# 内部的事件循环自动会变为 uvloop
asyncio.run()

注意:asgi 是支持异步的 wsgi 网关接口(e.g. uvicorn,内部使用的就是 uvloop)。