Python协程详解(二)

时间:2022-09-17 23:35:10

上一章,我们介绍了Python的协程,并讲到用yield达到协程的效果,这一章,我们来介绍yield from的结构和作用

我们先来对比下yield和yield from的用法

def first_gen():
    for c in "AB":
        yield c
    for i in range(0, 3):
        yield i


print(list(first_gen()))


def second_gen():
    yield from "AB"
    yield from range(0, 3)


print(list(second_gen()))

  

运行结果:

['A', 'B', 0, 1, 2]
['A', 'B', 0, 1, 2]

  

我们可以看到,两个方法都可以达到一样的效果,但是second_gen()方法比first_gen()方法来的简练,在second_gen()中使用yield from subgen()时,second_gen()是陷入阻塞的,真正在交互的是调用second_gen()的那一方,也就是调用方和subgen()在交互,second_gen()的阻塞状态会一直持续到subgen()调用完毕,才会接着执行yield from后续的代码

再来看下面两个例子:

第一个例子:

def chain(*iterables):
    for it in iterables:
        yield from it


s = "ABC"
t = tuple(range(0, 3))
print(list(chain(s, t)))

  

运行结果:

['A', 'B', 'C', 0, 1, 2]

  

第二个例子:

from collections import Iterable


def flatten(items, ignore_types=(str, bytes)):
    for x in items:
        if isinstance(x, Iterable) and not isinstance(x, ignore_types):
            yield from flatten(x)  # 这里递归调用,如果x是可迭代对象,继续分解
        else:
            yield x


items = [1, 2, [3, 4, [5, 6], 7], 8]

# Produces 1 2 3 4 5 6 7 8
for x in flatten(items):
    print(x)

items = ['Dave', 'Paula', ['Thomas', 'Lewis']]
for x in flatten(items):
    print(x)

  

运行结果:

1
2
3
4
5
6
7
8
Dave
Paula
Thomas
Lewis

  

yield from item 表达式对item对象所做的第一件事,就是调用iter(item),从中获取迭代器,因此,item可以是任何一个可迭代的对象,在某些时候,yield from可以代替for循环,使得我们的代码更加的精炼,yield from是委派生成器,主要功能还是和yield一样,打开双向通道,把最外层的调用方和实际在传输值的生成器连接起来,这样,二者就可以发送和产出值

下面一个例子,让我们用yield from计算平均值并返回统计报告,假设我们有若干男孩和女孩,我们分别拥有这些男孩和女孩的体重和身高,现在我们要分别计算男孩女孩的体重、身高的平均值

from collections import namedtuple

Result = namedtuple('Result', 'count average')


def averager():  # <1>
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield  # <2>
        if term is None:  # <3>
            break
        total += term
        count += 1
        average = total / count
    return Result(count=count, average=average)  # <4>


def grouper(results, key):  # <5>
    while True:  # <6>
        results[key] = yield from averager()  # <7>


def main(data):
    results = {}
    for key, values in data.items():
        group = grouper(results, key)
        next(group)  # <8>
        for value in values:
            group.send(value)
        group.send(None)  # <9>
    report(results)


def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(";")
        print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))


data = {
    'girls;kg':
        [40.9, 38.5, 44.3],
    'girls;m':
        [1.6, 1.51, 1.4],
    'boys;kg':
        [39.0, 40.8],
    'boys;m':
        [1.38, 1.5],
}
if __name__ == '__main__':
    main(data)

      

运行结果:

 3 boys  averaging 41.00kg
 3 boys  averaging 1.40m
 3 girls averaging 41.23kg
 3 girls averaging 1.50m

  

<1>中的yield标明averager是一个生成器,这里我们在grouper()方法中调用yield from averager(),所以averager是一个子生成器

<2>main()函数中的代码发送值绑定到term上

<3>当子生成器发现客户端代码发送一个None时,跳出循环,如果不那么做,averager()方法永远不会返回我们要的统计量,也就是Result对象,我们可以通过可以关闭委托生成器,但Result对象依旧无法返回

<4>当调用方传入一个None值,子生成器跳出循环,并返回Result对象

<5>yield from averager()表达式拿到上一个步骤返回的Result对象,赋值给左边的变量

<6>这个循环每次迭代时会新建一个averager生成器的实例,每个实例都是作为协程使用的生成器对象

<7>grouper会在yield from处暂停,等待调用方和子生成器交互完毕后,子生成器返回Result对象,grouper把返回的结果绑定到result[key]上

<8>我们依旧用next()方法激活一个委派生成器,然后我们开始不断往委派生成器中输送值

<9>当我们将值输送完毕后,传入None方法,使得自生成器内部跳出循环,并计算Result对象返回给grouper委托生成器存储起来

 

让我们用协程模拟一个出租车运营场景,系统会创建几辆出租车,每辆车都会拉客,每辆车拉客的次数不同,而每次拉客都有乘客上车和乘客下车这一动作,出租车继续四处拉客,当到达各自拉客的次数后,出租车回家。

出租车运营程序:

import random
import collections
import queue

DEFAULT_NUMBER_OF_TAXIS = 3
DEFAULT_END_TIME = 180
SEARCH_DURATION = 5
TRIP_DURATION = 20
DEPARTURE_INTERVAL = 5

Event = collections.namedtuple('Event', ["time", "proc", "action"])


def taxi_process(ident, trips, start_time=0):
    time = yield Event(start_time, ident, '出库')
    for i in range(trips):  
        time = yield Event(time, ident, '开始拉客')
        time = yield Event(time, ident, '乘客下车')
    yield Event(time, ident, '车入库')


class Simulator:
    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()
        self.procs = dict(procs_map)

    def run(self, end_time):
        for _, proc in sorted(self.procs.items()):
            first_event = next(proc)
            self.events.put(first_event)
        sim_time = 0
        while sim_time < end_time:
            if self.events.empty():
                print('*** 事件结束 ***')
                break
            current_event = self.events.get()
            sim_time, proc_id, previous_action = current_event
            print('taxi:', proc_id, proc_id * '   ', current_event)
            active_proc = self.procs[proc_id]
            next_time = sim_time + compute_duration(previous_action)
            try:
                next_event = active_proc.send(next_time)
            except StopIteration:
                del self.procs[proc_id]
            else:
                self.events.put(next_event)
        else:
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))


def compute_duration(previous_action):
    if previous_action in ['出库', '乘客下车']:
        interval = SEARCH_DURATION
    elif previous_action == '开始拉客':
        interval = TRIP_DURATION
    elif previous_action == '车入库':
        interval = 1
    else:
        raise ValueError('Unknown previous_action: %s' % previous_action)
    return int(random.expovariate(1 / interval)) + 1


def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS):
    taxis = {i: taxi_process(i, (i + 1) * 2, i * DEPARTURE_INTERVAL)
             for i in range(num_taxis)}
    print("出租车数量:", len(taxis))
    sim = Simulator(taxis)
    sim.run(end_time)


if __name__ == '__main__':
    main()

      

执行结果:

出租车数量: 3
taxi: 0  Event(time=0, proc=0, action='出库')
taxi: 1     Event(time=5, proc=1, action='出库')
taxi: 0  Event(time=6, proc=0, action='开始拉客')
taxi: 0  Event(time=7, proc=0, action='乘客下车')
taxi: 0  Event(time=9, proc=0, action='开始拉客')
taxi: 2        Event(time=10, proc=2, action='出库')
taxi: 2        Event(time=12, proc=2, action='开始拉客')
taxi: 1     Event(time=24, proc=1, action='开始拉客')
taxi: 1     Event(time=26, proc=1, action='乘客下车')
taxi: 1     Event(time=28, proc=1, action='开始拉客')
taxi: 0  Event(time=33, proc=0, action='乘客下车')
taxi: 0  Event(time=36, proc=0, action='车入库')
taxi: 2        Event(time=39, proc=2, action='乘客下车')
taxi: 1     Event(time=41, proc=1, action='乘客下车')
taxi: 2        Event(time=41, proc=2, action='开始拉客')
taxi: 1     Event(time=45, proc=1, action='开始拉客')
taxi: 1     Event(time=51, proc=1, action='乘客下车')
taxi: 1     Event(time=52, proc=1, action='开始拉客')
taxi: 1     Event(time=55, proc=1, action='乘客下车')
taxi: 1     Event(time=61, proc=1, action='车入库')
taxi: 2        Event(time=67, proc=2, action='乘客下车')
taxi: 2        Event(time=70, proc=2, action='开始拉客')
taxi: 2        Event(time=75, proc=2, action='乘客下车')
taxi: 2        Event(time=80, proc=2, action='开始拉客')
taxi: 2        Event(time=87, proc=2, action='乘客下车')
taxi: 2        Event(time=90, proc=2, action='开始拉客')
taxi: 2        Event(time=96, proc=2, action='乘客下车')
taxi: 2        Event(time=97, proc=2, action='开始拉客')
taxi: 2        Event(time=98, proc=2, action='乘客下车')
taxi: 2        Event(time=106, proc=2, action='车入库')
*** 事件结束 ***

      

利用协程,我们可以看到每辆出租车各自的出库、拉客、乘客下车、再拉客,直到车入库虽然是顺序执行,但车与车之间的行为却是并行,这里我们分析一下主要的代码块

首先是我们的taxi_process()方法:

def taxi_process(ident, trips, start_time=0):
    time = yield Event(start_time, ident, '出库')
    for i in range(trips):  # <1>
        time = yield Event(time, ident, '开始拉客')
        time = yield Event(time, ident, '乘客下车')
    yield Event(time, ident, '车入库')

  

这个方法会返回一个协程,这个协程控制着车的出库时间,循环拉客的次数以及入库时间,<1>处的trips就代表这辆车可以拉几次乘客

再来是Simulator模块

class Simulator:
    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()  # <1>
        self.procs = dict(procs_map)  

    def run(self, end_time):
        for _, proc in sorted(self.procs.items()):
            first_event = next(proc)  # <2>
            self.events.put(first_event) 
        sim_time = 0
        while sim_time < end_time:
            if self.events.empty():  
                print('*** 事件结束 ***')
                break
            current_event = self.events.get()  
            sim_time, proc_id, previous_action = current_event  # <3>
            print('taxi:', proc_id, proc_id * '   ', current_event)
            active_proc = self.procs[proc_id]
            next_time = sim_time + compute_duration(previous_action) 
            try:
                next_event = active_proc.send(next_time) 
            except StopIteration:
                del self.procs[proc_id] 
            else:
                self.events.put(next_event)  # <4>
        else:
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))  # <5>

  

我们在<1>处生成一个队列,这个队列是后续协程在模拟出租车运营时需要用到的,之后我们copy一个字典,这个字典key是每辆车的id,value是每辆车的生成器

在<2>处,我们激活每辆车的生成器,把每辆车生成器返回的初始状态存入事先准备好的队列中

在<3>处,我们会从队列循环取出每辆车当前的状态,并从先前copy好的字典里取出每辆车的生成器,计算该车下一次行动的时间,并传入给生成器,如果生成器抛出StopIteration异常,则代表该生成器已结束,则在异常捕捉处从字典里删去该车的记录

在我们往生成器传入值的时候,如果生成器没有报错,则会返回车的下一个状态,而我们把最新的状态存入队列中,如果循环,一直到队列为空为止,则跳出循环,又或者我们设定的初始时间大于终止时间,则跳出循环,进入到<5>打印队列中海油多少个事件