python实现生产者和消费者

发布时间 2023-05-09 11:31:44作者: shadown404

1 基础说明

python版本:Python 3.6.8

rabbitmq版本: rabbitmq-server-3.6.12-1.el7.noarch.rpm

 

2 python代码示例

(1)生产者

 1 import pika
 2 import json
 3 import datetime
 4 
 5 """
 6 /usr/bin/pip3 install -Iv pika -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com;
 7 /usr/bin/python3 ./rabbimq_pro_v1.py > ./num_pro.txt
 8 """
 9 
10 today_time = datetime.datetime.now()
11 credentials = pika.PlainCredentials('usermq', 'xxxxxxx')  #mq用户名和密码
12 
13 #虚拟队列需要指定参数 virtual_host,如果是默认的可以不填
14 connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'x.x.x.x' ,port = 5672 ,virtual_host = '/' ,credentials = credentials))
15 channel=connection.channel()
16 
17 #声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建; durable = True 代表exchange持久化存储,False 非持久化存储
18 channel.exchange_declare(exchange = 'python-celnumber_v1', durable = True, exchange_type='direct')
19 for i in range(200000): #写入20w的消息
20     message=json.dumps({"number":"num-%s"%i,"key00000":1234})
21 
22     #指定 routing_key, delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
23     channel.basic_publish(exchange = 'python-celnumber_v1', routing_key = 'OrderId', body = message, properties=pika.BasicProperties(delivery_mode = 2))
24     print(message)
25 connection.close()
26 print("耗时为 {0} 秒".format((datetime.datetime.now() - today_time).total_seconds()))

 

(2)消费者

 1 import pika
 2 
 3 """
 4 /usr/bin/pip3 install -Iv pika -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com;
 5 /usr/bin/python3 ./rabbitmq_cus_v1.py >> ./rabbitmq_cus.txt
 6 """
 7 
 8 credentials = pika.PlainCredentials('usermq', 'xxxxxx')
 9 connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'x.x.x.x', port = 5672, virtual_host = '/', credentials = credentials))
10 channel = connection.channel()
11 # 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
12 result = channel.queue_declare('python_cus_v1', exclusive=True)
13 # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
14 channel.exchange_declare(exchange = 'python-celnumber_v1', durable = True, exchange_type='direct')
15 # 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去
16 channel.queue_bind(exchange = 'python-celnumber_v1', queue = result.method.queue, routing_key='OrderId')
17 # 定义一个回调函数来处理消息队列中的消息,这里是打印出来
18 def callback(ch, method, properties, body):
19     ch.basic_ack(delivery_tag = method.delivery_tag)
20     print(body.decode())
21 
22 
23 #channel.basic_qos(prefetch_count=1)
24 # 告诉rabbitmq,用callback来接受消息
25 channel.basic_consume(result.method.queue, callback, auto_ack = False)
26 # 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
27 channel.start_consuming()

 

(3)操作说明

先运行消费者:/usr/bin/python3 ./rabbitmq_cus_v1.py >> ./rabbitmq_cus.txt

再运行生产者:/usr/bin/python3 ./rabbimq_pro_v1.py > ./num_pro.txt