参考
* https://www.jianshu.com/p/8a9af2e7e1b4
* https://www.kancloud.cn/noahs/python/956687
程序
import threading
import time
import queue
def producer(name, q, data): # 生产者,从data里面取数据,塞入队列q,如果q已满,则等待
for i in data:
q.put(i)
print('Producer {} put {}; '.format(name, i))
time.sleep(1)
q.put(None) # 将None放入queue作为标记给生产者用
def consumer(name, q): # 消费者,从q里拿出数据并处理
while True: # 通过while的方式推动生产者不断尝试从queue中取数据
get_result = q.get() # 从队列里挨个取出数据
q.task_done() # 发送当前q中对应元素已被取出的消息,与后面的q.join()配合形成阻塞,保证q结束再执行其他程序,否则后面的q.join()将永远阻塞(后面程序无法执行)
if get_result is None: # 判断生产者是否已经结束,即将data的所有数据都加入queue中
q.put(None) # 该步很重要,当producer()放入的None被某个consumer()抽取后,其他consumer()就没有结束标志了。缺点是最后queue中始终留有结束标志
print("All data have been tooken out!")
break
print('consumer {} get {}; '.format(name, get_result)) # 输出当前从queue中得到的数据
time.sleep(0.1)
def main():
data = list(range(10)) # 待处理的原始数据
consumer_names = ['甲', '乙', '丙'] # 3个消费者
q = queue.Queue(maxsize=5) # 生成一个最大容量为5的queue
threads = [] # 线程列表
p = threading.Thread(target=producer, args=('生产者', q, data)) # 生成一个生产者线程对象,该生产者名为A
threads.append(p) # 将该线程加入线程列表
for consumer_name in consumer_names: # 为每一个消费者分配一个线程
t = threading.Thread(target=consumer, args=(consumer_name, q)) # 消费者都从q里面拿数据
threads.append(t) # 将五个消费者线程加入线程列表
for i in threads: # 并列启动所有线程
i.setDaemon(True) # 保证子线程在主线程退出时,无论出于什么状况都强制退出
i.start()
for i in threads: # 将所有线程阻塞(即,不执行完,就不执行后面程序)
i.join()
# 判断q里面是否还有剩余的对象没有处理(包括None),有则挨个拿出,否则q不为空后面的q.join()将一直阻塞(后面程序无法执行)
if not q.empty():
for i in range(q.qsize()):
q.get()
q.task_done() # 不能删除,作用于前一个q.task_done()相同,
q.join() # 保证q阻塞,接受前面所有的q.task_done()发来的信息,否则程序一直停在该处不往下执行。(必须保证前面任何一处出现q.get()后都有q.task_done())
print("Program is over!")
if __name__ == '__main__':
main()
输出
Producer 生产者 put 0;
consumer 甲 get 0;
Producer 生产者 put 1;
consumer 乙 get 1;
Producer 生产者 put 2;
consumer 丙 get 2;
Producer 生产者 put 3;
consumer 甲 get 3;
Producer 生产者 put 4;
consumer 乙 get 4;
Producer 生产者 put 5;
consumer 丙 get 5;
Producer 生产者 put 6;
consumer 甲 get 6;
Producer 生产者 put 7;
consumer 乙 get 7;
Producer 生产者 put 8;
consumer 丙 get 8;
Producer 生产者 put 9;
consumer 甲 get 9;
All data have been tooken out!
All data have been tooken out!
All data have been tooken out!
Program is over!