饮冰十年-人工智能-FastAPI-01- 深入理解 Python 协程

发布时间 2023-12-24 22:38:49作者: 逍遥小天狼

Python 协程是一种强大的异步编程工具,可以有效地处理并发任务,提高程序性能。在这篇博客中,我们将深入探讨协程的概念、用法以及如何在 Python 中使用它们。

一、什么是协程

协程定义

协程(Coroutine)是一种特殊的函数,它可以在执行中暂停并在稍后的时间点继续执行。这种能力使得我们能够编写更为高效、可读性更强的异步代码。与传统的多线程或多进程相比,协程更加轻量级,避免了线程切换的开销。

协程不是计算机提供的,是程序员人为创造的。简而言之:通过一个线程实现代码块相互切换执行。

协程 vs 生成器

协程和生成器(Generator)在 Python 中都使用yield关键字,但它们有着不同的用途。生成器主要用于迭代,而协程则用于异步编程。协程的核心思想是在 yield 的基础上,增加了从外部发送数据的能力,从而实现了双向通信。

二、如何实现协程

  • greenlet,早期模块
  • yield 关键字
  • asyncio装饰器(py3.4)
  • async、await关键字(py3.5)
# pip install greenlet
"""
greenlet是一个在同一个线程中实现微线程的库,可以用它来实现协程。
下面是一个使用greenlet实现的简单协程的示例:
"""

from greenlet import greenlet


def coroutine_one():
    print("Start Coroutine One")
    gr2.switch()  # 切换到协程二
    print("End Coroutine One")
    gr2.switch()  # 再次切换到协程二,形成循环


def coroutine_two():
    print("Start Coroutine Two")
    gr1.switch()  # 切换到协程一
    print("End Coroutine Two")


# 创建两个 greenlet 对象,分别代表两个协程
gr1 = greenlet(coroutine_one)
gr2 = greenlet(coroutine_two)


def main():
    gr1.switch()  # 切换到协程一


if __name__ == "__main__":
    main()
greenlet实现的简单协程
"""
yield 实现比较牵强,为了实现协程而实现,没有什么意义
"""

from greenlet import greenlet


def coroutine_one():
    yield "Start Coroutine One"
    yield from coroutine_two()  # 切换到协程二
    yield "End Coroutine One"


def coroutine_two():
    yield "Start Coroutine Two"
    yield "End Coroutine Two"


gr1 = coroutine_one()
for item in gr1:
    print(item)
yield 实现协程
"""
python 3.4 之后引入
可以看到上面有个中划线,代表要废弃
特点:遇到IO阻塞自动切换
"""
import asyncio


@asyncio.coroutine
def coroutine_one():
    print("Start Coroutine One")
    # 遇到IO耗时操作,自动切换到tasks中的其他任务
    yield from asyncio.sleep(2)
    print("End Coroutine One")


@asyncio.coroutine
def coroutine_tow():
    print("Start Coroutine Two")
    yield from asyncio.sleep(1)
    print("End Coroutine Two")


task = [
    asyncio.ensure_future(coroutine_one()),
    asyncio.ensure_future(coroutine_tow())
]

# 协程函数执行有点特殊
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task))
asyncio.coroutine 实现
"""
python 3.5 之后引入
对装饰器进行修改
特点:遇到IO阻塞自动切换
"""
import asyncio
import time


async def coroutine_one():
    print("Start Coroutine One")
    # 遇到IO耗时操作,自动切换到tasks中的其他任务
    await asyncio.sleep(2)
    print("End Coroutine One")


async def coroutine_tow():
    print("Start Coroutine Two")
    await asyncio.sleep(1)
    print("End Coroutine Two")


task = [
    asyncio.ensure_future(coroutine_one()),
    asyncio.ensure_future(coroutine_tow())
]

start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task))
end_time = time.time()
total_time = end_time - start_time
print(f"耗时是: {total_time} 秒")
async 关键字实现协程(重点)

如何选择?使用 async 和 await 

在定义协程函数时,需要使用 async def 语法。而在协程内部,使用 await 关键字来等待异步操作完成。这种语法使得异步代码更加清晰和易读。

三、协程的优势

协程在异步编程中有着诸多优势。首先,协程避免了线程切换的开销,因此在高并发场景下性能更好。其次,协程可以更方便地处理异步操作的结果,提高代码的可读性。最重要的是,协程支持多个任务并发执行,使得编写复杂异步逻辑更为简单。

实际应用场景

协程在网络编程、Web 开发、爬虫等领域得到了广泛应用。在异步框架如 FastAPI、Tornado 中,协程帮助我们处理大量并发请求,提高系统的吞吐量。此外,协程也是编写高效爬虫的利器,可以并发地请求多个页面。

import time

import requests
from io import BytesIO
from PIL import Image


def download_images(url, name):
    print(f"图片{name} 开始下载!")
    # time.sleep(1)
    response = requests.get(url)

    if response.status_code == 200:
        print(f"图片{name} 获取成功!")
        # 使用 BytesIO 将图片内容转换为二进制流
        image_data = BytesIO(response.content)
        # time.sleep(2)
        # 使用 PIL 库打开图片
        image = Image.open(image_data)

        # 保存图片到本地
        image.save(f'image_{name}.jpg')

        print(f"图片{name} 保存成功.")


if __name__ == "__main__":
    image_urls = [
        'https://img1.tuguaishou.com/ips_templ_preview/69/76/1b/lg_5811401_1701160656_6565a6d092aee.jpg!w440?auth_key=1701387000-0-0-ed2a2dbae71d8fa6e08da8dca5662d7d',
        'https://img1.tuguaishou.com/ips_templ_small/ec/b1/ee/sm_5850428_1701141850_65655d5a10191.jpg!w440?auth_key=1701387000-0-0-61b975a896823b40c461a4a7d6c204af',
        'https://img2.tuguaishou.com/ips_templ_small/2a/9d/a5/sm_5850288_1701281313_65677e21d7bb2.jpg!w440?auth_key=1701387000-0-0-850ab5453894a2313ef7690125379b34'
    ]

    start_time = time.time()
    for i, item in enumerate(image_urls):
        download_images(item, i)
    end_time = time.time()
    total_time = end_time - start_time
    print(f"耗时是: {total_time} 秒")
普通下载
# pip install aiohttp

import time

import aiohttp
import asyncio
import os

from io import BytesIO
from PIL import Image


async def download_image(session, url, name, save_path='.'):
    print(f"图片{name} 开始下载!")
    try:
        async with session.get(url, verify_ssl=False) as response:
            await asyncio.sleep(1)
            response.raise_for_status()  # 检查是否有请求错误

            print(f"图片{name} 获取成功!")
            # 使用 BytesIO 将图片内容转换为二进制流
            image_data = BytesIO(await response.read())
            await asyncio.sleep(2)
            # 使用 PIL 库打开图片
            image = Image.open(image_data)

            # 检查保存路径是否存在,不存在则创建
            save_folder = os.path.join(save_path, 'images')
            os.makedirs(save_folder, exist_ok=True)

            # 保存图片到本地 异步调整
            image.save(os.path.join(save_folder, f'image_{name}.jpg'))

            print(f"图片{name} 保存成功.")
    except aiohttp.ClientError as e:
        print(f"图片{name} 下载失败: {e}")


async def main():
    image_urls = [
        'https://img1.tuguaishou.com/ips_templ_preview/69/76/1b/lg_5811401_1701160656_6565a6d092aee.jpg!w440?auth_key=1701387000-0-0-ed2a2dbae71d8fa6e08da8dca5662d7d',
        'https://img1.tuguaishou.com/ips_templ_small/ec/b1/ee/sm_5850428_1701141850_65655d5a10191.jpg!w440?auth_key=1701387000-0-0-61b975a896823b40c461a4a7d6c204af',
        'https://img2.tuguaishou.com/ips_templ_small/2a/9d/a5/sm_5850288_1701281313_65677e21d7bb2.jpg!w440?auth_key=1701387000-0-0-850ab5453894a2313ef7690125379b34'
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [download_image(session, url, i) for i, url in enumerate(image_urls)]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    start_time = time.time()
    asyncio.run(main())
    end_time = time.time()
    total_time = end_time - start_time
    print(f"耗时是: {total_time} 秒")
协程下载
import time
import requests
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from PIL import Image
import os


def download_image(url, name, save_path='.'):
    print(f"图片{name} 开始下载!")
    # time.sleep(1)
    try:
        response = requests.get(url)
        response.raise_for_status()  # 检查是否有请求错误

        print(f"图片{name} 获取成功!")
        image_data = BytesIO(response.content)
        # time.sleep(2)
        image = Image.open(image_data)

        save_folder = os.path.join(save_path, 'images')
        os.makedirs(save_folder, exist_ok=True)

        image.save(os.path.join(save_folder, f'image_{name}.jpg'))

        print(f"图片{name} 保存成功.")
    except requests.RequestException as e:
        print(f"图片{name} 下载失败: {e}")


def main():
    image_urls = [
        'https://img1.tuguaishou.com/ips_templ_preview/69/76/1b/lg_5811401_1701160656_6565a6d092aee.jpg!w440?auth_key=1701387000-0-0-ed2a2dbae71d8fa6e08da8dca5662d7d',
        'https://img1.tuguaishou.com/ips_templ_small/ec/b1/ee/sm_5850428_1701141850_65655d5a10191.jpg!w440?auth_key=1701387000-0-0-61b975a896823b40c461a4a7d6c204af',
        'https://img2.tuguaishou.com/ips_templ_small/2a/9d/a5/sm_5850288_1701281313_65677e21d7bb2.jpg!w440?auth_key=1701387000-0-0-850ab5453894a2313ef7690125379b34'
    ]

    with ThreadPoolExecutor(max_workers=3) as executor:
        for i, url in enumerate(image_urls):
            executor.submit(download_image, url, i)


if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    total_time = end_time - start_time
    print(f"耗时是: {total_time} 秒")
多线程下载

四、异步编程

事件循环

事件循环是一种用于管理异步任务执行的机制,通常在异步编程中使用。事件循环会不断地检查事件队列,执行已准备好的任务,以此来实现非阻塞的异步执行。
任务列表 = [任务1,任务2,任务3...]
while True:
可执行任务列表,已完成任务列表 = 去 任务列表 中检查所有任务,筛选出可执行任务和已完成任务
for 就绪任务 in 可执行任务列表:
执行就绪任务
for 已完成任务 in 已完成任务列表:
在任务列表中移除已完成任务

如果任务列表中的任务都已完成,则终止循环
import asyncio

# 去生成或获取一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到“任务列表”
loop.run_util_complate(任务)
伪代码
import asyncio


class EventLoop:
    def __init__(self):
        self.tasks = []  # 存储异步任务的队列

    def add_task(self, task):
        self.tasks.append(task)

    async def run_until_complete(self):
        while self.tasks:
            task = self.tasks.pop(0)  # 从队列中取出一个任务
            result = await task()  # 使用 await 执行异步任务
            if result is not None:
                self.add_task(result)  # 如果任务返回了新的任务,添加到队列中


# 示例:定义两个简单的异步任务
async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)  # 模拟异步操作,这里是暂停2秒
    print("Task 1 completed")
    return task2  # 返回另一个任务


async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 completed")


# 创建事件循环
event_loop = EventLoop()

# 将异步任务添加到事件循环
event_loop.add_task(task1)

# 运行事件循环,直到所有任务完成
asyncio.run(event_loop.run_until_complete())
一个简单的demo

Python 中的协程通过 async def 关键字定义协程函数,协程函数执行时会返回一个协程对象。协程函数的调用并不会立即执行,而是返回一个协程对象。要执行协程,需要将协程对象添加到事件循环中并等待其执行。在上面的例子中,task1task2 就是协程对象,它们是通过调用协程函数 task1()task2() 创建的。

await 是在协程函数内部用于等待异步操作完成的关键字。在 Python 中,await 通常与异步协程函数一起使用,用于挂起当前协程的执行,等待一个异步操作的结果。

主要特点和使用方式如下:

  1. 等待异步操作: 在协程函数内,使用 await 关键字可以挂起当前协程的执行,让出控制权,等待一个异步操作完成。这通常是异步 I/O 操作、定时器或其他异步任务。

  2. 协程函数内使用: await 只能在协程函数内使用,而不能在普通函数或全局作用域中使用。一个函数如果包含了 await,必须使用 async def 定义为协程函数。

  3. 异步对象: await 后面通常跟随一个返回协程对象的表达式,例如异步函数调用、协程函数调用或者异步操作。这个表达式的结果应该是一个协程对象,否则会引发 TypeError

  4. 非阻塞等待: await 的关键作用是进行非阻塞等待,即在等待异步操作的同时,事件循环可以继续执行其他协程,不会阻塞整个程序。

import asyncio


# 通过async关键字的协程函数
async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)  # 模拟异步操作,这里是暂停2秒
    print("Task 1 completed")


# python 3.7 之后,可以通过run去执行
asyncio.run(task1())  # 调用协程函数获取到协程对象
一个简单的demo

Task对象:

Task 对象是 asyncio 模块中的核心概念之一,用于管理异步操作的执行。它是对协程的封装,允许将协程添加到事件循环中执行,并方便地管理协程的状态和结果。Task 对象是 Future 的子类,因此继承了 Future 的所有特性。

  • 创建 Task 对象: 使用 asyncio.create_task() 函数或 asyncio.ensure_future() 函数创建 Task 对象,将协程转化为可调度的任务。

  • 运行 Task: 将 Task 对象添加到事件循环中执行,可以使用 asyncio.run() 函数或者事件循环的 run_until_complete() 方法。

  • 取消 Task: 通过调用 Task 对象的 cancel() 方法取消任务。取消操作将引发 CancelledError 异常,可以在协程中捕获以执行清理操作。

  • 等待 Task 完成: 使用 await 关键字或 asyncio.gather() 函数等待任务的完成。如果任务已经完成,await 将立即返回结果。

  • 获取 Task 的结果: 通过 Task 对象的 result() 方法获取任务的执行结果。这将阻塞等待任务的完成。

  • 异常处理: 使用 try 和 except 块捕获任务中的异常。未捕获的异常将导致任务状态变为 exception。

import asyncio


async def my_coroutine():
    # 协程逻辑
    pass


async def main():
    # 创建 Task 对象
    my_task = asyncio.create_task(my_coroutine())

    # 或者使用 ensure_future
    my_task = asyncio.ensure_future(my_coroutine())

    # 运行 Task
    await my_task

    # 取消 Task
    my_task.cancel()

    # 等待 Task 完成
    await my_task

    # 获取 Task 的结果
    result = my_task.result()

    # 异常处理
    try:
        await my_task
    except Exception as e:
        print(f"Task raised an exception: {e}")


# 运行事件循环
asyncio.run(main())
View Code

Feture对象:

Future 对象是 asyncio 中表示异步操作结果的占位符。它是一个可等待对象,用于在异步操作完成时存储结果。Future 对象通常由事件循环自动创建。

  • 创建 Future 对象: 通常由事件循环在执行异步操作时自动创建。可以使用 asyncio.Future() 显式地创建一个 Future 对象。

  • 设置 Future 的结果: 使用 set_result() 方法为 Future 对象设置结果。一旦结果被设置,与该 Future 相关的所有等待者将被唤醒。

  • 异步操作完成时获取结果: 使用 await 关键字或 add_done_callback() 方法等待异步操作完成,并获取 Future 的结果。

  • 异常处理: 使用 set_exception() 方法设置 Future 的异常。在等待 Future 完成时,可以捕获异常。

import asyncio

async def main():
    # 创建 Future 对象
    my_future = asyncio.Future()

    # 设置 Future 的结果
    my_future.set_result("Hello, World!")

    # 异步操作完成时获取结果
    result = await my_future

    # 异常处理
    # my_future.set_exception(ValueError("An error occurred!"))

    # 通过 add_done_callback() 方法添加回调函数
    def callback(future):
        result = future.result()
        print(f"Future completed with result: {result}")

    my_future.add_done_callback(callback)

# 运行事件循环
asyncio.run(main())
View Code

这两个对象是异步编程中的关键工具,它们简化了异步操作的管理和处理。通过合理使用 Task 和 Future 对象,可以更高效地编写和维护异步代码。

四、同步方法==》异步方法

如果你有一个普通的同步方法(阻塞式方法),而你的系统中都是异步形式的,你可以使用 loop.run_in_executor 方法将同步方法包装为异步方法。

import asyncio


def sync_method(arg1, arg2):
    # 同步逻辑
    return f"Result from sync_method with args: {arg1}, {arg2}"


async def async_method(arg1, arg2):
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, sync_method, arg1, arg2)
    # 异步逻辑使用 result
    print(result)


async def main():
    # 调用异步方法
    await async_method("value1", "value2")


# 运行事件循环
asyncio.run(main())
一个小demo
import functools
import time

import requests
import asyncio
import os

from io import BytesIO
from PIL import Image


async def download_image(url, name, save_path='.'):
    print(f"图片{name} 开始下载!")
    try:
        loop = asyncio.get_event_loop()
        feature = loop.run_in_executor(None, requests.get, url)
        response = await feature

        if response.status_code == 200:
            print(f"图片{name} 获取成功!")
            # 使用 BytesIO 将图片内容转换为二进制流
            image_data = BytesIO(response.content)
            await asyncio.sleep(2)
            # 使用 PIL 库打开图片
            image = Image.open(image_data)

            # 检查保存路径是否存在,不存在则创建
            save_folder = os.path.join(save_path, 'images')
            os.makedirs(save_folder, exist_ok=True)

            # 保存图片到本地 异步调整
            image.save(os.path.join(save_folder, f'image_{name}.jpg'))

        print(f"图片{name} 保存成功.")
    except  Exception as e:
        print(f"图片{name} 下载失败: {e}")


async def main():
    image_urls = [
        'http://img.tuguaishou.com/ips_templ_preview/8a/b9/67/lg_3796260_1614914426_6041a37a2ef0e.jpg!w440?auth_key=1897344000-0-0-80c9604a8c2fc51a9fa5df0bb98191b3',
        'http://img1.tuguaishou.com/ips_templ_small/ec/b1/ee/sm_5850428_1701141850_65655d5a10191.jpg!w440?auth_key=1701387000-0-0-61b975a896823b40c461a4a7d6c204af',
        'http://img2.tuguaishou.com/ips_templ_small/2a/9d/a5/sm_5850288_1701281313_65677e21d7bb2.jpg!w440?auth_key=1701387000-0-0-850ab5453894a2313ef7690125379b34'
    ]

    tasks = [download_image(url, i) for i, url in enumerate(image_urls)]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    start_time = time.time()
    asyncio.run(main())
    end_time = time.time()
    total_time = end_time - start_time
    print(f"耗时是: {total_time} 秒")
将上面下载图片的方式同步和异步结合

uvloop:

uvloop 是一个针对 Python 的 asyncio 事件循环的高性能实现。它基于 libuv,这是一个高性能的事件循环库,被广泛用于构建高性能的异步应用程序。uvloop 的目标是通过替换 Python 标准库中的事件循环实现,提供更好的性能。

以下是关于 uvloop 的一些主要特点和优势:

  1. 更高的性能: uvloop 在性能方面通常比标准的 asyncio 事件循环更快。这得益于其基于 libuv 的底层实现,libuv 是一个专为异步 I/O 设计的跨平台库。

  2. 更低的延迟: uvloop 通过使用更高效的事件循环实现,可以显著减少异步任务的启动和执行之间的延迟。

  3. 更好的并发性: uvloop 的实现针对高并发场景进行了优化,使得在大量并发连接的情况下表现更好。

  4. 兼容性: uvloop 设计成与标准的 asyncio 事件循环兼容,因此你可以在现有的 asyncio 代码中无缝使用它。只需安装 uvloop 并在代码中导入即可。

  5. 易于安装: uvloop 通过 PyPI(Python Package Index)提供,并且可以使用常见的包管理工具(例如 pip)轻松安装。pip install uvloop

import asyncio
import uvloop

# 使用 uvloop 替代默认的 asyncio 事件循环
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

async def main():
    # 你的异步代码

if __name__ == "__main__":
    asyncio.run(main())
View Code