Python 队列queue与多线程组合(生产者+消费者模式)

发布时间 2023-09-13 16:12:50作者: 北京涛子

参考

* https://www.jianshu.com/p/8a9af2e7e1b4
* https://www.kancloud.cn/noahs/python/956687

程序

import threading
import time
import queue


def producer(name, q, data):  # 生产者,从data里面取数据,塞入队列q,如果q已满,则等待
    for i in data:
        q.put(i)
        print('Producer {} put {}; '.format(name, i))
        time.sleep(1)
    q.put(None)  # 将None放入queue作为标记给生产者用


def consumer(name, q):  # 消费者,从q里拿出数据并处理
    while True:  # 通过while的方式推动生产者不断尝试从queue中取数据
        get_result = q.get()  # 从队列里挨个取出数据
        q.task_done()  # 发送当前q中对应元素已被取出的消息,与后面的q.join()配合形成阻塞,保证q结束再执行其他程序,否则后面的q.join()将永远阻塞(后面程序无法执行)

        if get_result is None:  # 判断生产者是否已经结束,即将data的所有数据都加入queue中
            q.put(None)  # 该步很重要,当producer()放入的None被某个consumer()抽取后,其他consumer()就没有结束标志了。缺点是最后queue中始终留有结束标志
            print("All data have been tooken out!")
            break

        print('consumer {} get {}; '.format(name, get_result))  # 输出当前从queue中得到的数据
        time.sleep(0.1)


def main():
    data = list(range(10))  # 待处理的原始数据

    consumer_names = ['甲', '乙', '丙']  # 3个消费者

    q = queue.Queue(maxsize=5)  # 生成一个最大容量为5的queue

    threads = []  # 线程列表
    p = threading.Thread(target=producer, args=('生产者', q, data))  # 生成一个生产者线程对象,该生产者名为A
    threads.append(p)  # 将该线程加入线程列表

    for consumer_name in consumer_names:  # 为每一个消费者分配一个线程
        t = threading.Thread(target=consumer, args=(consumer_name, q))  # 消费者都从q里面拿数据
        threads.append(t)  # 将五个消费者线程加入线程列表

    for i in threads:  # 并列启动所有线程
        i.setDaemon(True)  # 保证子线程在主线程退出时,无论出于什么状况都强制退出
        i.start()

    for i in threads:  # 将所有线程阻塞(即,不执行完,就不执行后面程序)
        i.join()

    # 判断q里面是否还有剩余的对象没有处理(包括None),有则挨个拿出,否则q不为空后面的q.join()将一直阻塞(后面程序无法执行)
    if not q.empty():
        for i in range(q.qsize()):
            q.get()
            q.task_done()  # 不能删除,作用于前一个q.task_done()相同,

    q.join()  # 保证q阻塞,接受前面所有的q.task_done()发来的信息,否则程序一直停在该处不往下执行。(必须保证前面任何一处出现q.get()后都有q.task_done())

    print("Program is over!")


if __name__ == '__main__':
    main()

输出

Producer 生产者 put 0; 
consumer 甲 get 0; 
Producer 生产者 put 1; 
consumer 乙 get 1; 
Producer 生产者 put 2; 
consumer 丙 get 2; 
Producer 生产者 put 3; 
consumer 甲 get 3; 
Producer 生产者 put 4; 
consumer 乙 get 4; 
Producer 生产者 put 5; 
consumer 丙 get 5; 
Producer 生产者 put 6; 
consumer 甲 get 6; 
Producer 生产者 put 7; 
consumer 乙 get 7; 
Producer 生产者 put 8; 
consumer 丙 get 8; 
Producer 生产者 put 9; 
consumer 甲 get 9; 
All data have been tooken out!
All data have been tooken out!
All data have been tooken out!
Program is over!