4.消息的性质和集群

发布时间 2023-06-04 17:49:28作者: 22-10-21

7.消息的性质

7.1.消息可靠性

消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的

如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。

img

① 代表消息从生产者发送到Exchange;

② 代表消息从Exchange路由到Queue;

③ 代表消息在Queue中存储;

④ 代表消费者监听Queue并消费消息;

7.1.1.确保消息发送到交换机

可能因为网络或者Broker的问题导致①失败,而此时应该让生产者知道消息是否正确发送到了Broker的exchange中;

有两种解决方案:

第一种是开启Confirm(确认)模式;(异步)

第二种是开启Transaction(事务)模式;(性能低,实际项目中很少用)

7.1.2.确保消息路由到正确的队列

可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败。

有两种解决方案:

第一种是使用return模式,可以实现消息无法路由的时候返回给生产者;

第二种是使用备份交换机(alternate-exchange),将路由的消息会发送到这个备用交换机上;

这两个方案是二选一

7.1.3.确保消息在队列正确地存储

可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;

解决方案:

1.队列持久化

QueueBuilder.durable(QUEUE).build();

2.交换机持久化

ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

3.消息持久化

MessageProperties messageProperties = new MessageProperties();
//设置消息持久化,默认是持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); 

4.集群,镜像队列,高可用

7.1.4.确保消息从队列正确地投递到消费

采用消息消费时的手动ack确认机制来保证;

如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。

为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement);

#开启手动ack消息消费确认
spring:
	rabbitmq:
		listener:
			simple:
				acknowledge-mode=manual

消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息;

如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);

7.2.消息幂等性

消息消费时的幂等性(消息不被重复消费)

同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了

幂等性是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响

接口幂等性是指:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的;

比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的;

避免消息的重复消费问题:全局唯一ID + Redis

生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId, 1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃;

8.集群

RabbitMQ的集群分两种模式:默认集群模式镜像集群模式

在RabbitMQ集群中所有的节点(一个节点就是一个RabbitMQ的broker服务器)被归为两类:磁盘节点内存节点

磁盘节点:会把集群的所有信息(比如交换机、绑定、队列等信息)持久化到磁盘中

内存节点:只会将这些信息保存到内存中,如果该节点宕机或重启

内存节点的数据会全部丢失,而磁盘节点的数据不会丢失

8.1.默认集群模式

默认集群模式也叫普通集群模式内置集群模式

RabbitMQ默认集群模式,只会把交换机、队列、虚拟主机等元数据信息在各个节点同步,而具体队列中的消息内容不会在各个节点中同步

元数据

  1. 队列元数据:队列名称和属性(是否可持久化,是否自动删除)
  2. 交换器元数据:交换器名称、类型和属性
  3. 绑定元数据:交换器和队列的绑定列表
  4. vhost元数据:vhost内的相关属性,如安全属性等;

img

当用户访问其中任何一个RabbitMQ节点时,查询到的queue/user/exchange/vhost等信息都是相同的;

但集群中队列的具体信息数据只在队列的拥有者节点保存,其他节点只知道队列的元数据和指向该节点的指针,所以其他节点接收到不属于该节点队列的消息时会将该消息传递给该队列的拥有者节点上;

8.2.搭建默认集群

先准备好三台RabbitMQ,并且都已经启动

8.2.1.构建集群

让第二和第三台mq加入第一台mq中

rabbitmqctl stop_app # 只停止当前mq
rabbitmqctl reset	#重启mq
rabbitmqctl join_cluster 第一台mq主机名 --ram #加入第一台mq,--ram:表示让第一台mq成为一个内存节点,如果不带参数默认为disk磁盘节点;
rabbitmqctl start_app #启动当前mq

8.2.2.添加权限

在任意一台mq中执行

#列出用户
rabbitmqctl list_users
# 添加用户
rabbitmqctl add_user admin 123456
#查看权限
rabbitmqctl list_permissions
#设置权限
rabbitmqctl set_permissions admin ".*" ".*" ".*"
#设置角色
rabbitmqctl set_user_tags admin administrator
#启动web控制台插件
rabbitmq-plugins enable rabbitmq_management 

8.2.3.查看集群状态

rabbitmqctl cluster_status

8.2.4.连接集群

spring:
  #配置rabbitmq
  rabbitmq:
  #host:只能连接单个mq
  #addresses:可以连接多个
  #连接集群,使用逗号分隔
    addresses: 第一台mq的ip地址:端口,第二台mq的ip地址:端口,第三台mq的ip地址:端口
    username: admin
    password: 123456
    virtual-host: chenchen

8.3.镜像集群模式

镜像模式是基于默认集群模式加上一定的配置得来的

镜像模式是把所有的队列数据完全同步,包括元数据信息和消息数据信息,当然这对性能肯定会有一定影响,当对数据可靠性要求较高时,可以使用镜像模式

8.4.搭建镜像集群

它是在普通集群模式基础之上搭建而成的;

镜像队列配置命令

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
  1. -p Vhost:可选参数,针对指定vhost下的queue进行设置
  2. Name::policy的名称
  3. Pattern::queue的匹配模式(正则表达式)
  4. Definition:镜像定义,包括三个部分ha-mode,、ha-params ha-sync-mode;(json格式)
    1. {“ha-mode”:”exactly”,”ha-params”:2}
    2. ha-mode:指明镜像队列的模式,有效值为all/exactly/nodes
      1. all:表示在集群中所有的节点上进行镜像
      2. exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
      3. nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
    3. ha-paramsha-mode模式需要用到的参数
    4. ha-sync-mode:进行队列中消息的同步方式,有效值为automaticmanual
      1. automatic:自动
      2. manual:手动
  5. priority:可选参数,policy的优先级

例1:配置所有名字开头为policy_的队列进行镜像,镜像数量为2(在任意节点执行如下命令)

rabbitmqctl set_policy -p chenchen test 1"^policy_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

  • 虚拟主机为:chenchen
  • 名字为:test1
  • 匹配的队列:以policy_开头
  • 镜像定义:指定以自动的方式同步消息,在两个节点上进行镜像

例2:所有节点所有队列上进行镜像(在任意节点执行如下命令)

rabbitmqctl set_policy test2 "^" '{"ha-mode":"all"}'

例3:针对某个虚拟主机进行镜像

rabbitmqctl set_policy -p chenchen test3 "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'