Rabbitmq介绍,安装,基于queue实现消费者生产者,基本使用,消息安全,持久化,闲置消费,发布订阅,发布订阅高级routing按关键字,模糊匹配

发布时间 2023-05-06 23:21:02作者: clever-cat

内容详细

Rabbmit介绍

消息队列

中间件概念很大,准确一些叫消息队列中间件

消息队列中间件

使用redis当作消息队列来用,blpop阻塞式弹出,实现队列,先进先出

MQ,消息队列,MessageQueue是什么?

消息队列就是基础数据结构中先进先出(队列)的一种数据机制,类比于生活中,买东西,需要排队,先排队的人先买消费,就是典型的先进先出(队列)

MQ解决的问题

应用解耦

流量削峰

消息分发(发布订阅)

异步消息

IPC进程间通信也可以通过消息队列

Rabbmit安装

windows安装:https://www.rabbitmq.com/install-windows-manual.html

依赖于erlang解释器

rabbmit软件

cemtos

yum -y install erlang
yum -y install rabbitmq-server

docker安装(安装简单便捷)

docker pull rabbitmq:management
docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

安装完毕后,访问15672端口可以来到rabbitmq的图像化管理界面(官方提供),手动点点操作

基于queue实现生产者消费者

import threading
import queue

message = queue.Queue(10)


def product(i):
    while True:
        message.put(i)


def consumer(j):
    while True:
        print(message.get())


if __name__ == '__main__':
    t = threading.Thread(target=product, args=[123, ])
    t1 = threading.Thread(target=consumer, args=[11])
    t.start()
    t1.start()
    for i in range(10):
        t = threading.Thread(target=product, args=[i, ])
    
        t.start()
    for j in range(10):
        t1 = threading.Thread(target=consumer, args=[j])
        t1.start()

基本使用

发送者

import pika

###第一步,连接
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.101',port=5672))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101',credentials=credentials))

# 第二步:连接channal
channel = connection.channel()
# 第三步:创建一个队列,名字叫hello
channel.queue_declare(queue='hello')
# 第三步:向hello队列中,发送Hello World
# routing_key 队列名字
#body 发送的内容

"""
queue ,routing_key
如果queue为空或则会创建一个队列
channel.queue_declare(queue='hello')
这个控制向哪个队列发送消息
channel.basic_publish(exchange='', routing_key='hello',
                      body="123")
"""

channel.basic_publish(exchange='', routing_key='hello', body='Hello World4441!')
print("  Sent 'Hello World!'")
connection.close()

消费者

import pika

# 连接
credentials = pika.PlainCredentials("admin", "admin")

connecnt = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
# 假设队列不存在,如果存在不会有任何作用,不存在会创建队列
channle = connecnt.channel()
channle.queue_declare(queue='hello')


# 回调函数

def callback(ch, method, properties, body):
    print(body,'消费正在消费消息')
    # 停止消费循环
    channle.stop_consuming()

# queue从那个队列去取
# on_message_callback获取数据后的回调函数
# auto_ack 自定确认,获取到数据就确认删除,不管是否出错都会删除
channle.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channle.start_consuming()  # 程序会夯在这里,等待从消息队列中取消息

消费安全

消费完,确认后,在删除消息

import pika

# 连接
credentials = pika.PlainCredentials("admin", "admin")

connecnt = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
# 假设队列不存在,如果存在不会有任何作用,不存在会创建队列
channle = connecnt.channel()
channle.queue_declare(queue='hello')


# 回调函数

def callback(ch, method, properties, body):
    print(body, '消费正在消费消息')
    # 停止消费循环
    # channle.stop_consuming()

    # raise Exception(123)
    # 使用完毕,手动确认
    ch.basic_ack(delivery_tag=method.delivery_tag)


# queue从那个队列去取
# on_message_callback获取数据后的回调函数
# auto_ack 自定确认,获取到数据就确认删除,不管是否出错都会删除
channle.basic_consume(queue='hello', on_message_callback=callback, auto_ack=False)
channle.start_consuming()  # 程序会夯在这里,等待从消息队列中取消息

image-20230506173911461

image-20230506180204058

消息安全之durable持久化

生产者

import pika

# connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.200', port=5672))
# 连接
credentials = pika.PlainCredentials('admin', 'admin')
connect = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))

# 连接channal
channel = connect.channel()
# 创建队列,名字叫hello
# durable声明队列持久化
channel.queue_declare(queue='test',durable=True)
# 向队列中发送HELLO WORLD
# routing_key 队列名字
# body 发送的内容
# exchange交换机名字

# 这个发消息控制队列名
"""
queue ,routing_key
如果queue为空或则会创建一个队列
channel.queue_declare(queue='hello')
这个控制向哪个队列发送消息
channel.basic_publish(exchange='', routing_key='hello',
                      body="123")
"""
# properties=pika.BasicProperties(delivery_mode=2) 消息持久化
channel.basic_publish(exchange='', routing_key='test',
                      body="1231123",
                      properties=pika.BasicProperties(delivery_mode=2)

                      )
print('发送了HELLO')
connect.close()
"""
队列和消息都做持久化,以外的关闭后,在启动消息队列和消息都会在

"""

消费者(消费者不需要修改代码)

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue='lqz1')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    # ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='lqz1',on_message_callback=callback,auto_ack=False)

channel.start_consuming()

闲置消费

正常情况如果有多个消费者,是按照顺序第一个消息给第一个第二个给第二个,第三个继续给第一个,只有等第三个消费后,第四个才能消费,如果第一个消耗时间比较久,那么后续都要等待

生产者

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列),durable=True支持持久化,队列必须是新的才可以
channel.queue_declare(queue='lqz123',durable=True)

channel.basic_publish(exchange='',
                      routing_key='lqz123', # 消息队列名称
                      body='111',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent,消息也持久化
                      )
                      )
connection.close()

消费者(有多个,谁没有任务谁就去执行)

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
# channel.queue_declare(queue='lqz123')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
channel.basic_consume(queue='lqz123',on_message_callback=callback,auto_ack=False)

channel.start_consuming()

发布订阅

发布者(fanout模式)

import pika

credentials = pika.PlainCredentials("admin", "admin")

connect = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))

chanle = connect.channel()

chanle.exchange_declare(exchange='f1', exchange_type='fanout')

chanle.basic_qos(prefetch_count=1)

chanle.basic_publish(exchange='f1', routing_key='',
                     body='你好啊1111',
                     properties=pika.BasicProperties(delivery_mode=2)

                     )

chanle.close()

订阅者

import pika

credentials = pika.PlainCredentials("admin", "admin")

connect = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))

chanle = connect.channel()

chanle.exchange_declare(exchange='f1', exchange_type='fanout')
result = chanle.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

chanle.queue_bind(exchange='f1', queue=queue_name)


def task(ch, method, properties, body):
    print('消费者接到了任务', body.decode('utf8'))

    ch.basic_ack(delivery_tag=method.delivery_tag)


chanle.basic_consume(queue=queue_name, on_message_callback=task, auto_ack=False)

chanle.start_consuming()
# 消费者断开后队列自动删除

发布订阅高级Routing(按关键字精准匹配)

发布者

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m2',exchange_type='direct')

channel.basic_publish(exchange='m2',
                      routing_key='bnb', # 多个关键字,指定routing_key
                      body='lqz nb')

connection.close()

订阅者

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='bnb')


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

订阅者2

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')



def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

发布订阅高级Topic(按关键字模糊匹配)

发布者

import pika

credentials = pika.PlainCredentials("admin", "admin")
connect = pika.BlockingConnection(parameters=pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
channel = connect.channel()

channel.exchange_declare(exchange='f3', exchange_type='topic')

channel.basic_publish(exchange='f3', routing_key='lqz.hand', body='一个*或#可收到消息',
                      properties=pika.BasicProperties(delivery_mode=2)
                      )
# channel.basic_publish(exchange='m3',
#                       # routing_key='lqz.handsome', #都能收到
#                       routing_key='lqz.handsome.xx', #只有lqz.#能收到
#                       body='lqz nb')

connect.close()

发布者2

import pika

credentials = pika.PlainCredentials("admin", "admin")
connect = pika.BlockingConnection(parameters=pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
channel = connect.channel()

channel.exchange_declare(exchange='f3', exchange_type='topic')

# channel.basic_publish(exchange='f3', routing_key='lqz.xx.xx', body='一个#可以收到消息',
#                       properties=pika.BasicProperties(delivery_mode=2)
#                       )
channel.basic_publish(exchange='f3', routing_key='zzz.xx', body='一个#可以收到消息',
                      properties=pika.BasicProperties(delivery_mode=2)
                      )

connect.close()

订阅者1

*只能加一个单词例如:lqz.123 lqz.*可以匹配

#可以加任意单词字符例如 lqz.xx.qq.zz lqz.#可以匹配

一个匹配只能使用一种而且只能有一个

import pika

credentials = pika.PlainCredentials("admin", "admin")
connect = pika.BlockingConnection(parameters=pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
channel = connect.channel()

channel.exchange_declare(exchange='f3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True, durable=True)
queue_name = result.method.queue
channel.queue_bind(queue=queue_name, exchange='f3', routing_key='lqz.xx.*')
channel.queue_bind(queue=queue_name, exchange='f3', routing_key='zzz.*')


# channel.queue_bind(queue=queue_name, exchange='f3', routing_key='lqz.xx.*')
# 一个匹配里面只能使用一个*或一个#
# 可以监听多个


def task(ch, method, pro, body):
    print('*消费消费者', body.decode('utf-8'))


channel.basic_consume(queue=queue_name, on_message_callback=task, auto_ack=True)

channel.start_consuming()

订阅者2

import pika

credentials = pika.PlainCredentials("admin", "admin")
connect = pika.BlockingConnection(parameters=pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
channel = connect.channel()

channel.exchange_declare(exchange='f3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True, durable=True)
queue_name = result.method.queue
channel.queue_bind(queue=queue_name, exchange='f3', routing_key='lqz.#')

def task(ch, method, pro, body):
    print('#消费消费者', body.decode('utf-8'))


channel.basic_consume(queue=queue_name, on_message_callback=task, auto_ack=True)

channel.start_consuming()