asyncio——协程

发布时间 2023-09-13 01:46:18作者: 江雪。

asyncio————协程

实现协程有这么几种方法:

  • greenlet 早期模块。

  • yield 关键字。

  • asyncio装饰器(python3.4以后)

  • async,await关键字(python3.5之后)【推荐】

greenle 实现协程

pip install greenle
  • from greenlet import greenlet
    
    def func1():
        print(1)      #第1步:输出 1
        gr2.switch()  #第3步:切换到 func2 函数
        print(2)      #第6步:输出 2
        gr2.switch()  #第7步:切换到 func2 函数,从上一次执行的位置继续向后执行
    
    def func2():
        print(3)      #第4步:输出 3
        gr1.switch()  #第5步:切换到 func1 函数,从上一次执行的位置继续向后执行
        print(4)      #第8步:输出 4
    
    grl = greenlet(func1)
    gr2 = greenlet(func2)
    gr1.switch()     #第1步:去执行 funcl 函数
    

yield关键字

  • def funcl():
        yield 1
        yield from func2()
        yield 2
    
    def func2():
        yield 3
        yield 4
    
    f1 = funcl()
    for item in f1:
        print(item)
    

asyncio模块

在python3.4及之后的版本。
  • import asyncio
    
    @asyncio.coroutine
    def func1():
        print(1)
        yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
        print(2)
    
    @asyncio.coroutine
    def func2():
        print(3)
        yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
        print(4)
    tasks =[
        asyncio.ensure_future(func1()),
        asyncio.ensure_future(func2())
    ]
    1oop = asyncio.get_event_loop()
    loop.run_urtil_complete(asyncio.wait(tasks))
    

    注意:遇到IO堵塞自动切换。

async & await 关键字

在python3.5及以后的版本。
  • import asyncio
    async def func1():
        print(1)
        #网络IO请求:下载一张图片
        await asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
        print(2)
    
    async def func2():
        print(3)
        #网络IO请求:下载一张图片
        await asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
        print(4)
    
    tasks =[
        asyncio.ensure_future(func1()),
        asyncio.ensure_future(func2())
    ]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    

协程的意义

在一个线程中如果遇到IO等待的时间,线程不会傻傻等,利用空闲的时间再去干点其他的事。

案例:去下载三张图片(网络IO)。

  • 普通方式

    """pip3 insta1l requests"""
    
    import requests
    
    def download_image(url):
        print("开始下载:",ur1)
        # 发送网络请求,下载图片
        response = requests.get(ur1)
        print("下载完成")
        #图片保存到本地文件
        file_name = url.rsplit('_')[-1]
        with open(file_name,mode='wb') as file_object:
            file_object.write(response.content)
    if __name__== '__main__':
        ur1_list =[
        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar_chsEe12AX06A0OH_AAFOCMs8nzu621.jpg',
        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar_chcCSV2BBICAUntfAADiJFd6800429.jpg',
        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar_chcCP12BFCMAIO83AAGg7vK0sGY193.jpg'
        ]  
        for item in url_list:
            download_image(item)
    
  • 协程方式

    """
    下载图片使用第三方模块aiohttp,请提前安装: pip3 insta11 aiohttp
    """  
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import aiohttp
    import asyncio
    
    async def fetch(session, ur1):
        print("发送请求:",ur1)
        async with session.get(ur1, verify_ss1=False) as response:
            content = await response.content.read()
            file_name = url.rsplit('_')[-1]
            with open(file_name, mode='wb') as file_object:
                file_object.write(content)
    
    async def main():
        async with aiohttp.Clientsession() as session:
            ur1_1ist =[
                'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar_ChsEe12AX06A0OH_AAocMs8nzu621.jpg',
                'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar_chcCSV2BBICAUntfAADjJFd6800429.jpg',
                'https://www3.autoimg.cn/newsdfs/g26/MOB/3C/65/120x90_0_autohomecar_chcCP12BFCmAIO83AAGq7vK0SGY193.jpg'
            ]
            tasks = [asyncio.create_task(fetch(session,ur1)) for url in url_list]
    
            await asyncio.wait(tasks
    
    if __name__ == '__main__':
        asynico.run(main())
    

异步编程

事件循环

  • 理解成一个死循环,去检测并执行某些代码。

    #伪代码
    
    任务列表 =[ 任务1,任务2,任务3,... ]
    
    while True:
        可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
    
        for 就绪任务 in 可执行的任务列表:
            执行已就绪的任务
    
        for 已完成的任务 in 已完成的任务列表:
            在任务列表中移除 已完成的任务
    
        如果 任务列表 中的任务都已完成,则终止循环 
    
  • import asyncio
    
    # 去生成或获取一个事件循环
    loop = asyncio.get_event_1oop()
    
    # 将任务放到'任务列表'
    loop.run_until_complete(任务)
    

快速上手

  • 协程函数,定义函数的时候,async def 函数名。

  • 协程对象, 执行 协程函数()得到的协程对象。

    async def func():
        pass
    
    result = func()
    

    执行协程函数创建协程对象,函数内部代码不会执行。

    如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理。

    import asyncio
    
    async def func():
        print("快来!")
    
    result = func()
    
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(result)
    asyncio.run(result)  # python3.7
    

await

await + 可等待的对象(协程对象,Future对象,Task对象->>>IO等待)

  • 示例1:

    import asyncio
    
    async def func():
        print("快来!")
        response = await asyncio.sleep(2)
        print("结束",response)
    
    asyncio.run(func())
    
    #  执行结果:输出--> 快来   
    #      两秒后输出--> 结束
    
  • 示例2:

    import asyncio
    
    async def others() :
        print("start")
        await asyncio.sleep(2)
        print("end")
        return "返回值"
    
    async def func() :
        print("执行协程函数内部代码")
        # 遇到I0操作挂起当前协程(任务),等I0操作完成之后再继续往下执行,当前协程挂起时,事件循环可以去执行其他协程(任务)
        response = await others()
        print("IO请求结束,结果为:",response)
    
    asyncio.run(func())
    
    #  执行结果:输出--> 执行协程函数内部代码  start   
    #      两秒后输出--> end  IO请求结束,结果为:返回值
    
  • 示例3:

    import asyncio
    
    async def others():
        print("start")
        await asyncio.sleep(2)
        print("end")
        return "返回值"
    
    async def func():
        print("执行协程函数内部代码")
        # 遇到I0操作挂起当前协程(任务),等I0操作完成之后再继续往下执行,当前协程挂起时,事件循环可以去执行其他协程(任务)
        response1 = await others()
        print("IO请求结束,结果为:",response1)
        response2 = await others()
        print("IO请求结束,结果为:",response2)
    
    asyncio.run(func())
    
    #  执行结果:输出--> 执行协程函数内部代码  start   
    #      两秒后输出--> end  IO请求结束,结果为:返回值  start
    #      两秒后输出--> end  IO请求结束,结果为:返回值
    
  • 综合以上三个示例可以得出结论:await 就是等待对应的值得到结果之后再继续往下走。

Task对象

  • 在事件循环中添加多个任务的。

  • 官方文档:Tasks用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行,除了使用asyncio.create_task()函数以外,还可以用低层级的loop.create_task()或ensure_future()函数,不建议手动实例化Task对象。

注意:asyncio.create_task()函数在Python3.7中被加入,在Python3.7之前,可以改用低层级的asyncio.ensure_future()函数

  • 示例1:

    import asyncio
    
    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return "返回值"
    
    async def main() :
        print("main开始")
    
        # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)
        task1 = asyncio.create_task(func())
        # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)
        task2 = asyncio.create_task(func())
    
        print("main结束")
    
        # 当执行某协程遇到I0操作时,会自动化切换执行其他任务
        # 此处的await时等待相对应的协程全部执行完毕并获取结果
        ret1 = await task1
        ret2 = await task2
        print(ret1, ret2)
    
    asyncio.run(main())
    
    #  执行结果:输出--> main开始 main结束 1 1
    #      两秒后输出--> 2 2 返回值 返回值   
    
  • 示例2:

    import asyncio
    
    async def func() :
        print(1)
        await asyncio.sleep(2)
        print(2)
        return "返回值"
    
    async def main():
        print("main开始")
        task_list = [
            asyncio.create_task(func()),
            asyncio.create_task(func())
        ]
        print("main结束")
        # 当执行某协程遇到I0操作时,会自动化切换执行其他任务,
        # 此处的await时等待相对应的协程全部执行完毕并获取结果
        done, pending = await asyncio.wait(task_list, timeout=None)
        print(done)
    
    asyncio.run(main())
    

异步迭代器

迭代器

  • 在他内部实现了iter方法和next方法的对象

可迭代对象

  • 在他的类里面实现了iter方法并且返回一个迭代器

什么是异步迭代器

  • 实现了__aiter__()和__anext__()方法的对象。__anext__()必须返回一个awaitable对象。async for会
    处理异步迭代器的__anext__()方法所返回的可等待对象,直到其引发一个StopAsyncIteration异常。由
    PEP 492 引入。

什么是异步可迭代对象

  • 可在async for 语句中被使用的对象,必须通过他的__aiter__()方法返回一个asynchronous iterator。
    (异步迭代器)由PEP 492 引入。

  • 示例:

    import asyncio
    
    class Reader(object):
        """自定义异步迭代器 (同时也是异步可迭代对象) """
        def __init__(self):
            self.count = 0
    
        async def readline(self):
            # await asyncio.sleep(1)
            self.count += 1
            if self.count == 100:
              return None
            return self.count
    
        def __aiter__(self):
          return self
    
        async def __anext__(self):
            val = await self.readline()
            if val == None:
              raise StopAsynciteration
            return val
    
    async def func():
        obj = Reader()
        async for item in obj:
            print(item)
    
    asyncio.run(func())
    

    async for 必须写在协程函数里面,不然会报错

异步上下文管理器

  • 此种对象通过定义__aenter__()和__aexit__()方法来对async with语句中的环境进行控制。
    由PEP 492 引入。

  • 示例:

    import asyncio
    
    class AsyncContextManager(object):
        def __init__(self):
            self.conn = conn
    
        async def do_something(self):
            # 异步操作数据库
            return 666
    
        async def __aenter__(self):
            # 异步链接数据库
            self.conn = await asyncio.sleep(1)
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            # 异步关闭数据库链接
            await asyncio.sleep(1)
    
    async def func():
        async with AsyncContextManager() as f:
            result = await f.do_something()
            print(result)
    
    asyncio.run(func())
    

异步操作redis

  • 在使用python代码操作redis时,链接/操作/断开都是网络IO。

pip install aioredis

  • 示例:

    import asyncio
    import aioredis
    
    async def execute(address,password):
        print("开始执行",address)
    
        # 网络IO操作:先去连接 47.93.4.197:6379,遇到IO则自动切换任务,去连接47.93,4.198:6379
        redis = await aioredis.create_redis_pool(address, password=password)
    
        # 网络IO操作:遇到I0会自动切换任务
        await redis.hmset_dict('car', keyl=1, key2=2, key3=3)
    
        # 网络IO操作: 遇到I0会自动切换任务
        result = await redis.hgetall('car', encoding='utf-8')
        print(result)
    
        redis.close()
        # 网络IO操作: 遇到IO会自动切换任务
        await redis.wait_closed()
    
        print("结束", address)
    
      
    task_list = [
        execute("redis://47.93.4.197:6379", "root!2345"),
        execute("redis://47.93.4.198:6379", "root!2345")
    ]
    
    asyncio.run(asyncio.wait(task_list))
    

异步操作mysql

pip install aiomysql

  • 示例:

    import asyncio
    import aiomysql
    
    async def execute(host, password):
        print("开始", host)
        # 网络IO操作: 先去连接 47,93,40.197,遇到IO则自动切换任务,去连接47.93,40.198:6379
        conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
    
        # 网络IO操作: 遇到IO会自动切换任务
        cur = await conn.cursor()
    
        # 网络IO操作:遇到IO会自动切换任务
        await cur.execute("SELECT Host,User FROM user")
    
        # 网络IO操作: 遇到IO会自动切换任务
        result = await cur.fetchall()
        print(result)
    
        # 网络TO操作: 遇到IO会自动切换任务
        await cur.close()
        conn.close()
        print("结束", host)
    
    task_list = [
        execute("47.93.40.197", "root!2345"),
        execute("47.93,40.197", "root!2345")
        ]
    asyncio.run(asyncio.wait(task_list))