Producers and Consumers Model

发布时间 2023-03-29 20:44:48作者: MrSphere

生产者消费者模型要点

主要使用 JoinableQueue, Process 类

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。
通知进程是使用共享的信号和条件变量来实现的。

参数介绍:

maxsize是队列中允许最大项数,省略则无大小限制。

方法介绍:

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。
如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。
阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

-*- coding: utf-8 -*-

import os
import random
import time
from multiprocessing import JoinableQueue, Process


def producers(name, q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        food = '%s-%s' % (name, i)
        q.put(food)
        print(f"{os.getpid()} produce {food}")
    # what is the q.join() meaning? producer call the function to make sure all the food was token by consumers.
    q.join()


def consumers(q):
    while True:
        food = q.get()
        time.sleep(random.randint(1, 3))
        print(f"{os.getpid()} eat {food}")
        # what is the q.task_done() meaning? consumers eat all the food that it get from q(combine with the q.join).
        q.task_done()


if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producers, args=('hamburgers', q,))
    p2 = Process(target=producers, args=('apple', q,))
    p3 = Process(target=producers, args=('banana', q,))
    c1 = Process(target=consumers, args=(q,), daemon=True)
    c2 = Process(target=consumers, args=(q,), daemon=True)

    # consumer process will be shut down when the main process was done. because it is daemon subprocess
    # when the main process will be done ? when the 3 subprocess of producers finished ,all the food was made.
    # why the consumer will consume all the time?
    process_tuple = (p1, p2, p3, c1, c2)

    for i in process_tuple:
        i.start()

    for i in process_tuple[:3]:
        i.join()

    print('main process end')