Python Coroutines: From yield/send to yield from to async/await

Published 2023-05-27 23:36:03 Author: 砚台是黑的

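Coroutines in Python have gone through roughly three stages:

  1. Originally, generators repurposed as coroutines via yield/send
  2. The introduction of @asyncio.coroutine and yield from
  3. The async/await keywords, introduced in Python 3.5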

I. Generators as coroutines: yield/send

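Once the yield keyword appears in an ordinary function, that function is no longer an ordinary function but a generator function: calling it returns a generator.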

import random

def my_gen(alist):
    while len(alist) > 0:
        c = random.randint(0, len(alist) - 1)
        yield alist.pop(c)

a = ["aa", "bb", "cc"]
c = my_gen(a)
print(c)

# Output: <generator object my_gen at 0x7f7ffc52e580>

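In the code above, c is a generator. A generator is a kind of iterator, so it can be iterated with for. The most distinctive feature of a generator function is that it can accept a value passed in from outside, compute a result based on that value, and return it.

All of this is made possible by the generator's send() method.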

def gen():
    value = 0
    while True:
        recv = yield value
        if recv == "e":
            break
        value = "got: %s" % recv

g = gen()
print(g.send(None))
print(g.send("Hello"))
print(g.send(123456))
print(g.send("e"))

Output:

0
got: Hello
got: 123456
Traceback (most recent call last):
  File "/home/xxx/Repos/Gen/ambitious_perf_test/sil_test/coroutine/yield_example_v2.py", line 13, in <module>
    print(g.send("e"))
StopIteration

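The most important part of the generator function above, and the easiest to misunderstand, is the line recv = yield value. Misreading the order in which the loop body executes leads to a completely wrong picture of how the generator behaves.

The statement recv = yield value actually consists of three steps:

  1. Hand value out of the function (return it to the caller)
  2. Pause and wait for next() or send() to resume execution
  3. Assign recv = MockGetValue(), where MockGetValue is a hypothetical function that receives the value passed in by send()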
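
The three steps can be made visible by logging what happens on each side of the yield. The following is only a minimal sketch: the name traced_gen and the events list are illustrative and are not part of the original example.

```python
events = []  # hypothetical log, used only to record the order of execution


def traced_gen():
    value = 0
    while True:
        events.append("step 1+2: yield %r and pause" % (value,))
        recv = yield value
        events.append("step 3: recv = %r" % (recv,))
        value = "got: %s" % recv


g = traced_gen()
first = next(g)                    # runs up to the first yield; step 3 has not run yet
events_after_start = list(events)  # snapshot taken while the generator is paused
second = g.send("Hello")           # resumes: step 3, then steps 1 and 2 of the next pass

print(events_after_start)
print(events)
```

The snapshot taken after next(g) contains only the "step 1+2" entry, which confirms that the assignment to recv happens on the next resume, not at the moment of the yield.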

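Execution flow:

  1. g.send(None) or next(g) starts the generator function and runs it up to the first yield. This is the crucial point, and it is where many people get confused. Breaking recv = yield value into the three steps above, only steps 1 and 2 run at this point: the function returns value and pauses, and step 3, the assignment to recv, has not happened yet. So yield value outputs the initial value 0. Note in particular that a generator can only be started with send(None); sending any other value raises TypeError: can't send non-None value to a just-started generator.
  2. g.send("Hello") passes in "Hello" and resumes from where execution paused, which means step 3 runs and the value is assigned to recv. The new value is then computed, control returns to the top of the while loop and reaches yield value, and steps 1 and 2 run again: the function returns value and pauses. This time yield value outputs got: Hello and waits for the next send().
  3. g.send(123456) repeats step 2 and outputs got: 123456.
  4. g.send("e") makes the generator execute break and leave the loop. The function then runs to completion, so the call raises StopIteration.

As shown above, after the first send(None) starts the generator (steps 1 -> 2; the first returned value is usually of no use), every later send() makes the generator run the steps in the order 3 -> 1 -> 2: it first receives a value, then does something with it, then returns a value and pauses again.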
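
The same receive, compute, return cycle is the basis of the classic running-average coroutine. The sketch below is an illustration under assumed names (averager is not from the original article); it also shows the error raised when a generator is started with a non-None value.

```python
def averager():
    """Yield the running average of all values received through send()."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average   # 1: hand back the average, 2: pause, 3: receive
        total += value
        count += 1
        average = total / count


# Sending a non-None value before the generator has started is an error.
try:
    averager().send(10)
    priming_error = None
except TypeError as exc:
    priming_error = str(exc)

avg = averager()
next(avg)       # prime the generator; the first yielded value (None) is discarded
results = [avg.send(v) for v in (10, 20, 30)]
avg.close()     # stop the generator explicitly once we are done with it

print(priming_error)
print(results)
```

Calling close() ends the infinite loop from the outside, so the caller does not have to send a sentinel value and catch StopIteration as in the "e" example.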

II. yield from

Consider this code:

def g1():
    yield range(5)
def g2():
    yield from range(5)
    
it1 = g1()
it2 = g2()
for x in it1:
    print(x)

for x in it2:
    print(x)

Output:

range(0, 5)
0
1
2
3
4

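This shows that yield returns the range iterable itself as a single value, whereas yield from yields its items one by one.

For plain iteration, yield from iterable is essentially shorthand for for item in iterable: yield item.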
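
The equivalence holds only for plain iteration. When the operand is itself a generator, yield from additionally forwards send() to it and evaluates to its return value, which a for loop does not do. The sketch below compares the two; echo, delegate_with_for and delegate_with_yield_from are names made up for this illustration.

```python
def echo():
    """Subgenerator: echoes what it receives and returns a count when done."""
    count = 0
    received = yield "ready"
    while received is not None:
        count += 1
        received = yield "echo: %s" % received
    return count


def delegate_with_for():
    # A plain loop forwards only yielded values; send() values never reach echo().
    for item in echo():
        yield item


def delegate_with_yield_from():
    # yield from forwards send() to echo() and evaluates to echo()'s return value.
    count = yield from echo()
    yield "echo handled %d messages" % count


d = delegate_with_yield_from()
out = [next(d), d.send("a"), d.send("b"), d.send(None)]

f = delegate_with_for()
next(f)                 # "ready"
try:
    f.send("a")         # "a" is dropped; echo() is resumed with None and finishes
    for_loop_result = "continued"
except StopIteration:
    for_loop_result = "stopped"

print(out)
print(for_loop_result)
```

This forwarding behavior is what later made yield from usable as the basis of asyncio coroutines.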

Now for an example. Suppose we have already written a function that generates the Fibonacci sequence:

def fab(max_arg):
    n, a, b = 0, 0, 1
    while n < max_arg:
        yield b
        a, b = b, a + b
        n = n + 1
        
f = fab(5)

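fab is not an ordinary function but a generator function. Calling fab(5) therefore does not execute the function body; it returns a generator object (every generator is an iterator, and every iterator is an iterable).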
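
Both claims can be checked directly. In this sketch the calls list is a hypothetical addition to fab, used only to record when the body actually starts running; the abstract base classes come from the standard collections.abc module.

```python
from collections.abc import Generator, Iterable, Iterator

calls = []  # hypothetical log showing when the function body runs


def fab(max_arg):
    calls.append("body started")
    n, a, b = 0, 0, 1
    while n < max_arg:
        yield b
        a, b = b, a + b
        n = n + 1


f = fab(5)
ran_on_call = bool(calls)   # creating the generator runs none of the body
first = next(f)             # the body runs only now, up to the first yield
ran_on_next = bool(calls)

# A generator is an iterator, and an iterator is an iterable.
checks = (isinstance(f, Generator), isinstance(f, Iterator), isinstance(f, Iterable))
rest = list(f)              # the remaining values

print(ran_on_call, ran_on_next, checks, first, rest)
```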

Now suppose we want to build a function on top of fab() that logs a message when the call starts and when it ends:

def fab(max_arg):
    n, a, b = 0, 0, 1
    while n < max_arg:
        yield b
        a, b = b, a + b
        n = n + 1

def f_wrapper(fun_iterable):
    print("start")
    for item in fun_iterable:
        yield item
    print("end")

wrap = f_wrapper(fab(5))
for i in wrap:
    print(i, end = "\n")

Output:

start
1
1
2
3
5
end

Now use yield from in place of the for loop:

import logging

def fab(max_arg):
    n, a, b = 0, 0, 1
    while n < max_arg:
        yield b
        a, b = b, a + b
        n = n + 1

def f_wrapper(fun_iterable):
    print("start")
    # Note: the operand here must be an iterable
    yield from fun_iterable
    print("end")
    
    
wrap = f_wrapper(fab(5))
for i in wrap:
    print(i, end = "\n")

Output:

start
1
1
2
3
5
end

To stress it once more: yield from must be followed by an iterable (an object that can be used in a for loop).
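
As a quick check of that rule, the sketch below delegates to a list, a string and a generator expression, and then to an int, which is not iterable. The helper names chain and bad are illustrative only.

```python
def chain(*iterables):
    """Yield every item of every argument in order (similar to itertools.chain)."""
    for it in iterables:
        yield from it


combined = list(chain([1, 2], "ab", (x * x for x in range(3))))


def bad():
    yield from 42   # an int cannot be used in a for loop, so this fails at run time


try:
    next(bad())
    error = None
except TypeError as exc:
    error = str(exc)

print(combined)
print(error)
```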

III. asyncio.coroutine and yield from

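yield from came into its own in the asyncio module. Previously we had to switch between coroutines manually (that is, by explicitly writing yield from); now, once a function is declared as a coroutine, the event loop schedules the coroutines for us.

First, the example code. Note that the @asyncio.coroutine decorator was deprecated in Python 3.8 and removed in Python 3.11, so this example runs only on older versions: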

import asyncio, random


@asyncio.coroutine
def smart_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_secs = random.uniform(0, 0.2)
        # yield from is usually followed by a time-consuming operation
        yield from asyncio.sleep(sleep_secs)
        print("Smart one think {} secs to get {}".format(sleep_secs, b))
        a, b = b, a + b
        index += 1
    

@asyncio.coroutine
def stupid_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_secs = random.uniform(0, 0.4)
        yield from asyncio.sleep(sleep_secs)
        print("Stupid one think {} secs to get {}".format(sleep_secs, b))
        a, b = b, a + b
        index += 1
    

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    tasks = [
        smart_fib(10),
        stupid_fib(10)
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    print("All fib finished")
    loop.close()

Output:

/home/xxxx/Repos/Gen/ambitious_perf_test/sil_test/coroutine/asyncio_example_v1.py:5: DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
  def smart_fib(n):
/home/xxx/Repos/Gen/ambitious_perf_test/sil_test/coroutine/asyncio_example_v1.py:19: DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
  def stupid_fib(n):
Smart one think 0.11119502452643744 secs to get 1
Stupid one think 0.24183311729147616 secs to get 1
Smart one think 0.19385159869767554 secs to get 1
Smart one think 0.13225003215862918 secs to get 2
Stupid one think 0.204761573184748 secs to get 1
Smart one think 0.19205387499034643 secs to get 3
Smart one think 0.15200341615909563 secs to get 5
Stupid one think 0.3641219961931771 secs to get 2
Stupid one think 0.08332659000582675 secs to get 3
Smart one think 0.18344675102866925 secs to get 8
Smart one think 0.16137987374343296 secs to get 13
Smart one think 0.07562679157963843 secs to get 21
Stupid one think 0.39276603725693926 secs to get 5
Smart one think 0.08858547929100301 secs to get 34
Smart one think 0.05110305053990954 secs to get 55
Stupid one think 0.2222061304473443 secs to get 8
Stupid one think 0.028620222010319066 secs to get 13
Stupid one think 0.27643344112646384 secs to get 21
Stupid one think 0.20055738409780832 secs to get 34
Stupid one think 0.1776127450552333 secs to get 55
All fib finished

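The yield from syntax lets us conveniently call another generator.

In this example, the asyncio.sleep() that follows yield from is itself a coroutine (it also uses yield from internally), so the thread does not wait for asyncio.sleep(); the current coroutine is suspended and the event loop moves on to its next iteration. When asyncio.sleep() returns, the coroutine resumes at yield from with the return value (None here) and continues with the next statement.

asyncio is a module that implements asynchronous I/O on top of an event loop. With yield from, the current coroutine is suspended and control over asyncio.sleep is handed to the event loop; the event loop then decides when to wake it up, after which execution continues with the code that follows.

Scheduling among coroutines is decided entirely by the event loop.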
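
To make the idea of an event loop concrete, here is a toy round-robin scheduler built from plain generators. It is a deliberately simplified sketch and not how asyncio is implemented (the real loop also handles timers, I/O readiness and callbacks); worker and run_all are hypothetical names.

```python
from collections import deque


def worker(name, steps, log):
    for i in range(steps):
        log.append("%s step %d" % (name, i))
        yield   # hand control back to the scheduler


def run_all(coroutines):
    """A toy round-robin loop: resume each generator in turn until all finish."""
    ready = deque(coroutines)
    while ready:
        current = ready.popleft()
        try:
            next(current)           # run until the next yield
        except StopIteration:
            continue                # finished: do not reschedule
        ready.append(current)       # still alive: go to the back of the queue


log = []
run_all([worker("A", 2, log), worker("B", 3, log)])
print(log)
```

Each worker decides only when to give up control; which worker runs next is decided by run_all, which is the division of labor described above.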

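In yield from asyncio.sleep(sleep_secs), time.sleep(1) cannot be used instead: time.sleep() returns None, which is not iterable, and as noted earlier, yield from must be followed by an iterable (such as a generator or an iterator). time.sleep() would also block the whole thread, event loop included, while it sleeps.

So it raises an error:

yield from time.sleep(sleep_secs)
TypeError: 'NoneType' object is not iterable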

IV. async and await

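Once asyncio.coroutine and yield from are understood, the async and await keywords introduced in Python 3.5 are easy to grasp: they can be seen as drop-in replacements for asyncio.coroutine and yield from. From a language-design point of view, async/await makes coroutines appear to exist independently of generators and hides the details inside the asyncio module, so the syntax is clearer and more explicit.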
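
As an illustration of that substitution, the two Fibonacci coroutines from section III might be written with async/await roughly as follows. This is a sketch, not the author's code: the shared fib function, its parameters, and the use of asyncio.run and asyncio.gather are choices made for this example, and the delays are shortened.

```python
import asyncio
import random


async def fib(name, n, max_delay, log):
    a, b = 0, 1
    for _ in range(n):
        # await plays the role of "yield from" in the decorator-based version
        await asyncio.sleep(random.uniform(0, max_delay))
        log.append((name, b))
        a, b = b, a + b
    return name


async def main():
    log = []
    # gather runs both coroutines concurrently and collects their return values
    finished = await asyncio.gather(
        fib("smart", 5, 0.02, log),
        fib("stupid", 5, 0.04, log),
    )
    return finished, log


finished, log = asyncio.run(main())
print(finished)
print(log)
```

The entries of the two coroutines are interleaved in log in an order that depends on the random delays, while each coroutine's own values still appear in Fibonacci order.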

The new async keyword turns any ordinary function into a coroutine:

import time
import random
import asyncio


async def my_gen(alist):
    while len(alist) > 0:
        c = random.randint(0, len(alist) - 1)
        print(alist.pop(c))
        
a = ["aa", "bb", "cc"]
c = my_gen(a)
print(c)

Output:

<coroutine object my_gen at 0x7f787bcaaac0>
sys:1: RuntimeWarning: coroutine 'my_gen' was never awaited

In the program above, putting async in front of the function definition turns it into a coroutine.
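
The RuntimeWarning appears because calling a coroutine function, like calling a generator function, only creates an object and does not run the body. The sketch below drives a coroutine by hand with send(None) to show the family resemblance to generators; add is a made-up example function, and in real code the event loop does this for us.

```python
import asyncio


async def add(x, y):
    return x + y


coro = add(1, 2)
try:
    coro.send(None)         # start the coroutine, just like priming a generator
    result = None
except StopIteration as stop:
    result = stop.value     # the return value travels in StopIteration.value

# The usual way to run a coroutine: let the event loop drive it.
same = asyncio.run(add(1, 2))

print(result, same)
```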

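However, async does not have this effect on a generator function: it cannot turn a generator into a coroutine. An async def function whose body contains yield becomes an async generator function instead.

Take the same code and replace print with yield: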

import time
import asyncio
import random


async def my_gen(alist):
    while len(alist) > 0:
        c = random.randint(0, len(alist) - 1)
        yield alist.pop(c)


a = ["ss", "dd", "gg"]
c = my_gen(a)
print(c)

The output is:

<async_generator object my_gen at 0x7f7f2170ad30>

This is an async generator object, not a coroutine object.
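
An async generator is still useful, but it is consumed with async for rather than awaited. A minimal sketch, with countdown as a hypothetical example:

```python
import asyncio


async def countdown(n):
    """An async generator: async def plus yield."""
    while n > 0:
        yield n
        await asyncio.sleep(0)   # give other tasks a chance to run
        n -= 1


async def main():
    # Async generators are consumed with "async for" (here in a comprehension).
    return [x async for x in countdown(3)]


values = asyncio.run(main())
print(values)
```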

To get an actual coroutine, the code should look like this:

import time
import asyncio
import random


async def my_gen(alist):
    while len(alist) > 0:
        c = random.randint(0, len(alist) -1)
        print(alist.pop(c))
        await asyncio.sleep(1)
        
strlist = ["ss", "dd", "gg"]
intlist = [1, 2, 3, 4, 5]
c1 = my_gen(strlist)
c2 = my_gen(intlist)
print(c1)
print(c2)

Output:

<coroutine object my_gen at 0x7f4955753b40>
<coroutine object my_gen at 0x7f4955753c40>

To run coroutines, an event loop is required.

Append the following to the code above:

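async def main():
    # gather schedules both coroutines on the event loop and waits for them to finish
    # (passing bare coroutines to asyncio.wait() is no longer allowed on Python 3.11+)
    await asyncio.gather(c1, c2)


if __name__ == "__main__":
    # asyncio.run (Python 3.7+) creates the event loop, runs main() and closes the loop
    asyncio.run(main())
    print("All fib finished.")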

The complete code is as follows:

import time
import asyncio
import random


async def my_gen(alist):
    while len(alist) > 0:
        c = random.randint(0, len(alist) -1)
        print(alist.pop(c))
        # await suspends the current coroutine (not the thread) and releases control to the event loop
        await asyncio.sleep(0.0001)

strlist = ["ss", "dd", "gg"]
intlist = [1, 2, 3, 4, 5]
c1 = my_gen(strlist)
c2 = my_gen(intlist)


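async def main():
    # gather schedules both coroutines on the event loop and waits for them to finish
    # (passing bare coroutines to asyncio.wait() is no longer allowed on Python 3.11+)
    await asyncio.gather(c1, c2)


if __name__ == "__main__":
    # asyncio.run (Python 3.7+) creates the event loop, runs main() and closes the loop
    asyncio.run(main())
    print("All fib finished.")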

Output (items are popped at random, so the order differs from run to run):

2
gg
1
dd
4
ss
5
3
All fib finished.
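
Why the example awaits asyncio.sleep rather than calling time.sleep can be seen by timing both. The sketch below is illustrative only: polite, rude and timed are made-up names, and the exact timings depend on the machine.

```python
import asyncio
import time


async def polite(delay):
    await asyncio.sleep(delay)   # suspends only this coroutine


async def rude(delay):
    time.sleep(delay)            # blocks the whole thread, event loop included


async def timed(factory, delay):
    start = time.perf_counter()
    await asyncio.gather(factory(delay), factory(delay))
    return time.perf_counter() - start


concurrent_elapsed = asyncio.run(timed(polite, 0.1))
blocking_elapsed = asyncio.run(timed(rude, 0.1))
print("asyncio.sleep: %.2fs, time.sleep: %.2fs" % (concurrent_elapsed, blocking_elapsed))
```

With asyncio.sleep the two waits overlap, so the total is close to one delay; with time.sleep they run back to back, so the total is about two delays.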

V. Appendix

https://blog.csdn.net/soonfly/article/details/78361819