RabbitMQ发布确认及备份交换机

发布时间 2024-01-11 19:07:23作者: 雨中遐想

RabbitMQ发布确认及备份交换机

可以通过设置RabbitMQ的发布确认和失败回退功能来确认消息是否成功发布。

也可以为交换机设置备份交换机,来接收不可路由的消息。

demo结构

配置及实现

application.yml

server:
  port: 8080

spring:
  rabbitmq:
    host: rabbitmq
    port: 5672
    username: admin
    password: admin
    publisher-confirm-type: correlated #设置发布者确认,调用RabbitTemplate.ConfirmCallback.confirm方法
    publisher-returns: true  # 交换机无法路由消息时回退给生产者,调用RabbitTemplate.ReturnCallback.returnedMessage方法

交换机及队列配置

package com.zjw.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 发布确认SpringBoot
 */
@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    /**
     * 创建发布确认交换机
     * @return 交换机
     */
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        ExchangeBuilder builder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                .durable(true)
                //设置交换机的备份交换机
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
        return builder.build();
    }

    /**
     * 创建备份交换机
     * @return
    交换机
     */
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    /**
     * 创建发布确认队列
     * @return 队列
     */
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    /**
     * 创建备份队列
     * @return 队列
     */
    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    /**
     * 创建告警队列
     * @return 队列
     */
    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    /**
     * 绑定发布确认队列和交换机
     * @param queue 队列
     * @param exchange 交换机
     * @return 绑定关系
     */
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }

    /**
     * 绑定备份队列和备份交换机
     * @param queue 队列
     * @param exchange 交换机
     * @return 绑定关系
     */
    @Bean
    public Binding backupBinding(@Qualifier("backupQueue") Queue queue,
                                @Qualifier("backupExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * 绑定告警队列和备份交换机
     * @param queue 队列
     * @param exchange 交换机
     * @return 绑定关系
     */
    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
                                 @Qualifier("backupExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }
}

生产者配置

package com.zjw.controller;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 发布确认生产者
 */
@AllArgsConstructor
@RestController
@RequestMapping("/confirm")
@Slf4j
public class SendConfirmMsgController {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    private final RabbitTemplate rabbitTemplate;


    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        //指定消息id为1
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message+routingKey, correlationData1);

        CorrelationData correlationData2 = new CorrelationData("2");
        routingKey = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message+routingKey, correlationData2);

        log.info("发送消息内容:{}", message);
    }
}

这里我使用了不同的routingKey来实现将消息发送到不同的队列及备份交换机中。

交换机回调方法

package com.zjw.config;

import jakarta.annotation.PostConstruct;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * 交换机的回调接口
 */
@AllArgsConstructor
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 交换机不管是否收到消息的一个回调方法。
     * @param correlationData  回调的关联数据。
     * @param ack ACK 为 true,nack 为 false
     * @param cause 可选原因,用于 nack,如果可用,否则为 null。
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机收到id为{}的消息",id);
        } else {
            log.info("交换机未确认收到id为{}的消息,由于原因{}", id, cause);
        }
    }

    /**
     * Returned message callback.
     * 如果通过延迟队列发送的消息,由于消息是在延迟交换机中,还没有到达延迟队列,也会被交换机调用退回方法,不过等待消息到达延迟时间后会再发送到队列中
     * 如果交换机是发布确认,但是设置了备份交换机,消息会被发送到备份交换机中,不会被退回。如果没有备份交换机,消息会被退回。
     * @param returned the returned message and metadata.
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.warn("消息:{},被交换机退回:{},退回原因:{},路由Key:{}",
                new String(returned.getMessage().getBody()),
                returned.getExchange(),
                returned.getReplyText(),
                returned.getRoutingKey());
    }
}

这里有个问题,由于rabbitTemplate设置的是单例的,如果向其他交换机发送消息的时候也会被调用到回调方法,使用的时候需要注意下。

消费者

我这里开启了两个消费者,一个用来消费confirm.queue,一个用来消费warning.queue

package com.zjw.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 确认队列消费者,消费确认队列消息
 */
@Component
@Slf4j
public class ConfirmConsumer {

    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message){
        String msg = new String(message.getBody());
        log.info("接受到队列confirm.queue消息:{}", msg);
    }
}
package com.zjw.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 告警消费者,消费告警队列消息
 */
@Component
@Slf4j
public class WarningConsumer {

    public static final String WARNING_QUEUE_NAME = "warning.queue";

    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveMsg(Message message){
        String msg = new String(message.getBody());
        log.info("告警消费者到队列warning.queue消息:{}", msg);
    }
}

测试

通过浏览器访问 http://localhost:8080/confirm/sendMessage/hello ,发送消息

生产者日志

2024-01-11T18:57:19.643+08:00  INFO 19184 --- [io-8080-exec-10] c.z.controller.SendConfirmMsgController  : 发送消息内容:hello
2024-01-11T18:57:19.652+08:00  INFO 19184 --- [ectionFactory19] com.zjw.config.MyCallBack                : 交换机收到id为2的消息
2024-01-11T18:57:19.652+08:00  INFO 19184 --- [ectionFactory18] com.zjw.config.MyCallBack                : 交换机收到id为1的消息

发现调用了confirm的回调方法。

至于returnedMessage回调为什么没有被调用,即使没有key2routingKey,是因为我这里设置了备份的交换机,消息被发送到了备份的交换机backup.exchange,进而被送到了backup.queuewarning.queue队列。

消费者日志

2024-01-11T18:57:19.647+08:00  INFO 3368 --- [ntContainer#0-1] com.zjw.consumer.ConfirmConsumer         : 接受到队列confirm.queue消息:hellokey1
2024-01-11T18:57:19.647+08:00  INFO 3368 --- [ntContainer#4-1] com.zjw.consumer.WarningConsumer         : 告警消费者到队列warning.queue消息:hellokey2

可以看到消息1进入了confirm.queue,消息2进入了warning.queue。也可以通过管理界面看到backup.queue也有我们的消息2.