Rabbitmq

发布时间 2024-01-11 19:03:11作者: Meeeoww

一 消息队列介绍

1.1 介绍

消息队列就是基础数据结构中的“先进先出”的一种数据机构。想一下,生活中买东西,需要排队,先排的人先买消费,就是典型的“先进先出”

1.2 MQ解决什么问题

MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具。

应用解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。

当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障。提升系统的可用性

流量削峰

举个栗子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。

使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这事有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。

消息分发

多个服务队数据感兴趣,只需要监听同一类消息即可处理。

例如A产生数据,B对数据感兴趣。如果没有消息的队列A每次处理完需要调用一下B服务。过了一段时间C对数据也感性,A就需要改代码,调用B服务,调用C服务。只要有服务需要,A服务都要改动代码。很不方便。

有了消息队列后,A只管发送一次消息,B对消息感兴趣,只需要监听消息。C感兴趣,C也去监听消息。A服务作为基础服务完全不需要有改动

异步消息

有些服务间调用是异步的,例如A调用B,B需要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询api查询。或者A提供一个callback api,B执行完之后调用api通知A服务。这两种方式都不是很优雅

使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。

这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息

1.3 常见消息队列及比较

# rabbitmq和kafka
-编程语言不同:erlang,java
-对客户端支持都一样,都支持
-处理数据能力(吞吐量):rabbitmq低于kafak(处理大数据类的性能高十万级)
-可靠性:rabbitmq更高
-图形化界面:rabbitmq有自己的管理界面,kafak第三方的

结论:

Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于kafka,改成了主从结构,在事务性可靠性方面做了优化。广泛来说,电商、金融等对事务性要求很高的,可以考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka

# rabbitmq  消息模式
	- 普通模式:生产者消费者
    	-消息确认
        -持久化 :queue,消息
    - worker模式: 生产者 多个消费者--》轮询
    	-闲置消费
        
    -发布订阅之 fanout模式
    	-发布者---》所有订阅者都能收到
    -发布订阅之 direct模式
    	-消费者只消费某一类消息
        -发布指定: routing_key
        -接收
    -发布订阅之 topic模式   模糊匹配模式
    	###  # 表示任意字符
        ###  * 表示一个单词
    	
        
 # 交换机的四种类型
	-topic
    -fanout
    -direct
    -headers

二 Rabbitmq安装

官网:https://www.rabbitmq.com/getstarted.html

2.1 服务端原生安装

# 1 centos 安装
# 安装配置epel源
# 安装erlang
yum -y install erlang
# 安装RabbitMQ
yum -y install rabbitmq-server

# 2 win 安装
	-官网先下载erlang,版本跟rabbimq版本有对应关系
	-rabbimq 官网下载安装包:https://github.com/rabbitmq/rabbitmq-server/releases
 
# 创建用户
上面两种方式启动后,会启动一个服务。访问本地的15672端口,也会打开它的图形化界面。但是它上面没有用户,需要自己创建用户,使用命令来创建用户。
rabbitmqctl add_user lqz 123
# 设置用户为administrator角色
rabbitmqctl set_user_tags lqz administrator
# 设置权限
rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"
# 然后重启rabbiMQ服务
systemctl reatart rabbitmq-server
# 然后可以使用刚才的用户远程连接rabbitmq server了

2.2 服务端Docker安装

# docker拉取镜像
docker pull rabbitmq:management
# 启动成容器
# -e加环境变量:设置用户名和密码都为admin
# -p端口映射;15672 端口是web界面的端口(它自己的图形化界面),5672端口是rabbitmq服务端口
docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management


# web 管理页面的端口是 15672
# 服务端口是5672,使用python连接时候,指定这个端口。此时python就是一个客户端,rabbitmq是服务端


上面都是运维来搭建的,我们要做的是,服务已经搭建好了,我们要连接rabbitmq服务,使用它

2.3 客户端安装

pip3 install pika

三 基于Queue实现生产者消费者模型

import Queue # 线程队列
import threading

# 生成一个队列
message = Queue.Queue(10)

def producer(i):
    while True:
        # 往队列中放值
        message.put(i)

def consumer(i):
    while True:
        # 从队列中取值
        msg = message.get()

for i in range(12):# 12个生产者
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):# 10个消费者
    t = threading.Thread(target=consumer, args=(i,))
    t.start()

四 基本使用(生产者消费者模型)

img

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

安装模块:pip3 install pika

生产者


# pip install pika

import pika

#### 连接 rabbitmq 开始####
### 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101','5672'))

## 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101','5672',credentials=credentials))
#### 连接 rabbitmq  结束####

# 连接到管道上,给管道命名 为hello
channel = connection.channel()
channel.queue_declare(queue='hello')

# 往管道中发布消息
channel.basic_publish(exchange='',routing_key='hello',body='xxx')
print(" 生产者放入队列中: 'Hello World!'")

connection.close()

消费者

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', '5672', credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(f" 收到了: {body}")


# 会从 hello 队列中取数据消费,执行 callback函数
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

# 程序卡在这,消费者会一直监听队列
channel.start_consuming()

五 消息安全之ack

# 1 消费者消费了消息后,如果不给消息队列,发送消费完成的通知,消息处于unacked状态,只要消费者关闭,消息会被重新放回队列中---》准备好的消息中

# 2 消费者给队列回复ack(不好,有可能消息没有被消费成功,消息队列就删了)
	-方式一:auto_ack=True
        channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
        
     -方式二:正常消费完成,再通知队列,让它删除
    	def callback(ch, method, properties, body):
            print(f" 收到了: {body}")
            ch.basic_ack(delivery_tag=method.delivery_tag)
        
 # 3 一个生产者,两个消费者案例

生产者


# pip install pika

import pika

#### 连接 rabbitmq 开始####
### 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101','5672'))

## 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101','5672',credentials=credentials))
#### 连接 rabbitmq  结束####

# 连接到管道上,给管道命名 为hello
channel = connection.channel()
channel.queue_declare(queue='hello')

# 往管道中发布消息
channel.basic_publish(exchange='',routing_key='hello',body='xxx')
print(" 生产者放入队列中: 'Hello World!'")

connection.close()

消费者1

import pika

#### 连接 rabbitmq 开始####
## 有密码
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', '5672', credentials=credentials))
#### 连接 rabbitmq  结束####

# 连接到管道上,给管道命名 为hello
channel = connection.channel()
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(f" 收到了: {body}")
    # 假设没错误,正常顺利执行,消息被处理完成,给确认
    raise Exception('消费过程中出异常了')
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 会从 hello 队列中取数据,消费---》执行 callback函数
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=False)

# 程序卡在这,消费者会一直监听队列
channel.start_consuming()

消费者2

import pika

#### 连接 rabbitmq 开始####
## 有密码
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', '5672', credentials=credentials))
#### 连接 rabbitmq  结束####

# 连接到管道上,给管道命名 为hello
channel = connection.channel()
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(f" 收到了: {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 会从 hello 队列中取数据,消费---》执行 callback函数
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=False)

# 程序卡在这,消费者会一直监听队列
channel.start_consuming()

六 消息安全之durable持久化

如果rabbitmq服务停了,队列因为没有做持久化数据就会丢失,对队列和消息都要做持久化队列和数据都不会丢失

生产者

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列),durable=True支持持久化,队列必须是新的才可以
channel.queue_declare(queue='lqz1',durable=True)

channel.basic_publish(exchange='',
                      routing_key='lqz1', # 消息队列名称
                      body='lqz is handsome',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent,消息也持久化
                      )
                      )
connection.close()

消费者

import pika

#### 连接 rabbitmq 开始####
## 有密码
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', '5672', credentials=credentials))
#### 连接 rabbitmq  结束####

# 连接到管道上,给管道命名 为lqz1
channel = connection.channel()
channel.queue_declare(queue='lqz1', durable=True)


def callback(ch, method, properties, body):
    print(f" 收到了: {body}")
    # 假设没错误,正常顺利执行,消息被处理完成,给确认
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 会从 lqz1 队列中取数据,消费---》执行 callback函数
channel.basic_consume(queue='lqz1', on_message_callback=callback, auto_ack=False)

# 程序卡在这,消费者会一直监听队列
channel.start_consuming()

七 闲置消费

正常情况如果有多个消费者,是按照顺序第一个消息给第一个消费者,第二个消息给第二个消费者,
但是可能第一个消息的消费者处理消息很耗时,一直没结束,就可以让第二个消费者优先获得闲置的消息.
channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来

生产者

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列),durable=True支持持久化,队列必须是新的才可以
channel.queue_declare(queue='lqz1',durable=True)

channel.basic_publish(exchange='',
                      routing_key='lqz1', # 消息队列名称
                      body='lqz is handsome',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent,消息也持久化
                      )
                      )


connection.close()

消费者1

import pika
import time

#### 连接 rabbitmq 开始####
## 有密码
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', '5672', credentials=credentials))
#### 连接 rabbitmq  结束####

# 连接到管道上,给管道命名 为lqz1
channel = connection.channel()
channel.queue_declare(queue='lqz1', durable=True)


def callback(ch, method, properties, body):
    print(f" 收到了: {body}")
    # 假设没错误,正常顺利执行,消息被处理完成,给确认
    time.sleep(10)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
# 会从 lqz1队列中取数据,消费---》执行 callback函数
channel.basic_consume(queue='lqz1', on_message_callback=callback, auto_ack=False)

# 程序卡在这,消费者会一直监听队列
channel.start_consuming()

消费者2

import time

import pika

#### 连接 rabbitmq 开始####
## 有密码
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', '5672', credentials=credentials))
#### 连接 rabbitmq  结束####

# 连接到管道上,给管道命名 为lqz1
channel = connection.channel()
channel.queue_declare(queue='lqz1', durable=True)


def callback(ch, method, properties, body):
    print(f" 收到了: {body}")
    # 假设没错误,正常顺利执行,消息被处理完成,给确认
    time.sleep(2)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
# 会从 lqz1队列中取数据,消费---》执行 callback函数
channel.basic_consume(queue='lqz1', on_message_callback=callback, auto_ack=False)

# 程序卡在这,消费者会一直监听队列
channel.start_consuming()

八 发布订阅-fanout模式

img

发布订阅是extange(路由)生产者生产一个消息,如果有两个消费者,那么这两个消费者都可以收到同样的这个消息

# 使用
### 发布者
# 声明一个路由exchange叫m1,路由模式是fanout模式
channel.exchange_declare(exchange='m1',exchange_type='fanout')

# 发布消息
channel.basic_publish(exchange='m1',
                      routing_key='',  # 不需要写发给哪个队列
                      body='lqz nb')


### 订阅者
channel.exchange_declare(exchange='m1',exchange_type='fanout')

# 随机生成一个队列,会自动生成一个随即名字的队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m1',queue=queue_name)

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True)


# 应用场景:
例如:中国气象局提供“天气预报”送入交换机,网易、新浪、百度、搜狐等门户接入通过队列绑定到该交换机,自动获取气象局推送的气象数据。
例如:买了视频网站的会员,只要网站上映新的电影,就会给你发送通知。

发布者

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101',credentials=credentials))
channel = connection.channel()

# 声明一个交换机 exchange 为m1,   exchange_type 路由模式为fanout
channel.exchange_declare(exchange='m1',exchange_type='fanout')


channel.basic_publish(exchange='m1',
                      routing_key='',
                      body='lqz asss')


connection.close()

发布者订阅者模式

订阅者

  • 开启几个py文件,就是几个订阅者,就会生成几个队列
import pika
import time

#### 连接 rabbitmq 开始####
## 有密码
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', '5672', credentials=credentials))
#### 连接 rabbitmq  结束####


channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m1',exchange_type='fanout')

# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m1',queue=queue_name)

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

# 程序卡在这,消费者会一直监听队列
channel.start_consuming()

订阅者2

import pika
import time

#### 连接 rabbitmq 开始####
## 有密码
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', '5672', credentials=credentials))
#### 连接 rabbitmq  结束####


channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m1',exchange_type='fanout')

result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m1',queue=queue_name)

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

# 程序卡在这,消费者会一直监听队列
channel.start_consuming()


九 发布订阅高级之Routing(按关键字匹配)- direct模式

img

# 按关键字匹配,direct

# 生产者
# 声明一个exchange叫m2,exchange_type是'direct'
channel.exchange_declare(exchange='m2',exchange_type='direct')
channel.basic_publish(exchange='m2',
                      routing_key='nb',  # 要指定消费者需要监听的key是什么
                      body='lqz nb')

# 消费者
# 让exchange和queque进行绑定,绑定中写关键字参数routing_key
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')

# 应用场景:
对于不同级别日志来说,对于 error 级别的日志信息可能是我们需要特别关注的,会被单单独的消费者进行处理,此时交换机分发消息是有条件的进行分发,这个就是根据 Routing Key 进行不同的消息分发。

路由模式是一种精准的匹配,只有设置了 Routing Key 消息才能进行分发。

路由匹配应用:不同日志级别

发布者

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101',credentials=credentials))
channel = connection.channel()

# 声明一个交换机 exchange    exchange_type 路由模式  直连
channel.exchange_declare(exchange='m2',exchange_type='direct')


channel.basic_publish(exchange='m2',
                      routing_key='nb',# 多个关键字,指定routing_key
                      body='lqz nb nbsdfsdfdfasdf')


connection.close()

订阅者1

import pika
import time

#### 连接 rabbitmq 开始####
## 有密码
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', '5672', credentials=credentials))
#### 连接 rabbitmq  结束####


channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m2',exchange_type='direct')

result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='bnb')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

# 程序卡在这,消费者会一直监听队列
channel.start_consuming()

订阅者2

import pika
import time

#### 连接 rabbitmq 开始####
## 有密码
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', '5672', credentials=credentials))
#### 连接 rabbitmq  结束####


channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m2',exchange_type='direct')

result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

# 程序卡在这,消费者会一直监听队列
channel.start_consuming()

九 发布订阅高级之Topic(按关键字模糊匹配)

img

## 发布者
channel.exchange_declare(exchange='m3',exchange_type='topic')
# 发布消息
channel.basic_publish(exchange='m3',
                      # routing_key='lqz.nb',  # lqz.#,lqz.* 两个都能匹配
                      routing_key='lqz.nb.bb',  # 这个只有lqz.#的消费者可以匹配
                      body='lqz bnb')


# 订阅者
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='lqz.#')

"""
*只能加一个单词
#可以加任意单词字符
"""

# 应用场景

发布者

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m3',exchange_type='topic')

channel.basic_publish(exchange='m3',
                      # routing_key='lqz.handsome', #都能收到
                      routing_key='lqz.handsome.xx', #只有lqz.#能收到
                      body='lqz nb')

connection.close()

订阅者1

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m3',exchange_type='topic')

# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='lqz.#')



def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

订阅者2

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='topic' , 模糊匹配
channel.exchange_declare(exchange='m3',exchange_type='topic')

# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='lqz.*')


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

rpc介绍

https://www.cnblogs.com/liuqingzheng/p/16271923.html
    
# RPC (Remote Procedure Call)是指远程过程调用,也就是说两台服务器 A,B 一个应用部署在 A 服务器上,想要调用 B 服务器上应用提供的函数或方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据

# 用来做服务间通信的
	-方式一: 使用 restful调用 同步调用
    -方式二:借助于消息队列 异步通信(微服务案例就是这种方案)
    -方式三:rpc通信:远程过程调用
    
# 为什么要用 RPC?
就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完成的需求,比如比如不同的系统间的通讯,甚至不同的组织间的通讯。由于计算能力需要横向扩展,需要在多台机器组成的集群上部署应用


# 主流rpc框架
	grpc:跨语言---》python调用go服务   https://zhuanlan.zhihu.com/p/425725192
    			   go调用python服务
    dubbo:java用的多

功能 Hessian Montan rpcx gRPC Thrift Dubbo Dubbox Spring Cloud
开发语言 跨语言 Java Go 跨语言 跨语言 Java Java Java
分布式(服务治理) × × ×
多序列化框架支持 hessian √(支持Hessian2、Json,可扩展) × 只支持protobuf) ×(thrift格式)
多种注册中心 × × ×
管理中心 × × ×
跨编程语言 ×(支持php client和C server) × × × ×
支持REST × × × × × ×
关注度
上手难度
运维成本
开源机构 Caucho Weibo Apache Google Apache Alibaba Dangdang Apache

python实现rpc

# SimpleXMLRPCServer 自带的
# ZeroRPC

内置的

from xmlrpc.server import SimpleXMLRPCServer


# 通信使用xml格式
class RPCServer(object):

    def __init__(self):
        super(RPCServer, self).__init__()

    def add(self, a, b):
        print('来了')
        return a + b


# SimpleXMLRPCServer
server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True)
server.register_introspection_functions()
server.register_instance(RPCServer())
server.serve_forever()

import time
from xmlrpc.client import ServerProxy


# SimpleXMLRPCServer
def xmlrpc_client():
    print('xmlrpc client')
    c = ServerProxy('http://localhost:4242')

    res = c.add(3, 4)
    print(res)


if __name__ == '__main__':
    xmlrpc_client()

zeroRpc

import zerorpc


class RPCServer(object):

    def __init__(self):
        super(RPCServer, self).__init__()
        print(self)

    def add(self, a, b):
        print(a, b)
        return a + b + 10


# zerorpc
s = zerorpc.Server(RPCServer())
s.bind('tcp://0.0.0.0:4243')
s.run()

import zerorpc
import time
# zerorpc
def zerorpc_client():
    print('zerorpc client')
    c = zerorpc.Client()
    c.connect('tcp://127.0.0.1:4243')
    res=c.add(2,3)
    print(res)


if __name__ == '__main__':
    zerorpc_client()

十 基于rabbitmq实现rpc

服务端

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101',credentials=credentials))
channel = connection.channel()

# 声明一个队列rpc_queue
channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

客户端

import pika
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials("admin", "admin")
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', credentials=credentials))
        self.channel = self.connection.channel()

        # 随机生成一个消息队列(用于接收结果)
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # 监听消息队列中是否有值返回,如果有值则执行 on_response 函数(一旦有结果,则执行on_response)
        self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # 客户端 给 服务端 发送一个任务:  任务id = corr_id / 任务内容 = '30' / 用于接收结果的队列名称
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',  # 服务端接收任务的队列名称
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,  # 用于接收结果的队列
                                       correlation_id=self.corr_id,  # 任务ID
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()

        return self.response


fibonacci_rpc = FibonacciRpcClient()

response = fibonacci_rpc.call(9)
print('返回结果:', response)