【5.0】RabbitMQ使用之闲置消费

发布时间 2023-09-10 19:41:41作者: Chimengmeng

【一】闲置消费介绍

  • 正常情况如果有多个消费者,是按照顺序第一个消息给第一个消费者,第二个消息给第二个消费者

  • 但是可能第一个消息的消费者处理消息很耗时,一直没结束,就可以让第二个消费者优先获得闲置的消息

  • 传统情况下,如果有多个消费者,消息会按顺序依次发送给每个消费者。
    • 但是,如果第一个消费者处理消息的时间很长,那么其他消费者就会等待,无法继续进行任务处理。
  • 为了解决这个问题,可以引入闲置消费机制。
    • 闲置消费允许消费者在其他消费者正在处理消息时获得闲置消息,从而保证消息的高效处理。

【二】实现

【1】生产者

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))

# 有密码
# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

# 【2】创建通道
channel = connection.channel()

# 【3】声明一个队列(创建一个队列),durable=True支持持久化,队列必须是新的才可以
channel.queue_declare(queue='dream123',durable=True)

# 【4】发布消息
channel.basic_publish(exchange='',
                      routing_key='dream123', # 消息队列名称
                      body='111',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent,消息也持久化
                      )
                      )

# 【5】关闭连接
connection.close()

【2】消费者

import pika

# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

# 【2】创建通道
channel = connection.channel()

# 【3】声明一个队列(创建一个队列)
# channel.queue_declare(queue='dream123')

# 【4】定义回调函数
def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 【5】消费消息
#####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
# 将prefetch_count设置为1,表示同时只向一个消费者发送一条消息,当消费者处理完成后,再发送下一条消息
channel.basic_qos(prefetch_count=1) 

channel.basic_consume(queue='dream123',on_message_callback=callback,auto_ack=False)

channel.start_consuming()
  • 通过调用basic_qos(prefetch_count=1)设置了每次向消费者发送一条消息,并定义了一个回调函数(callback)。
    • 在回调函数中,消费者接收到消息后进行处理,并通过basic_ack(delivery_tag=method.delivery_tag)通知服务器消息已被处理。
  • 最后,通过调用basic_consume()方法开始消费消息。