SpringBoot2集成RabbitMQ(注解+回调)

发布时间 2023-09-20 15:47:09作者: 飘杨......

一、概述

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。
整体上看其实就是一个生产者消费者模型。只是这个模型更加抽象及精细化了。
RabbitMQ大体可以分为三层,其中第二层又可以细分为两层:
  1.生产者
  2.Broker
    a.交换机(exchange)
    b.队列
  3.消费者
大体上如下图所示:

   其中P代表生产者、X代表交换机、红色的长条代表队列,C代表消费者。

  运行过程大致描述:

    P生产的消息先放入交换机,交换机通过路由键找到绑定的队列,这样交换机的数据就直接到队列中了。而C作为消费者会监听是否有消息(可以主动去拿,可以被动接受)

    生产消费过程:P->exchange->queue->c

 


 

二、代码示例

  1.在pom.xml中引入RabbitMQ

  <!--        集成rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

  2.配置application.yml

  rabbitmq:
    host: rabbitmq主机ip
    port: 5672
    username: 用户名
    password: 密码
    publisher-returns: true #开启发送失败退回
    publisher-confirm-type: correlated

  3.创建配置文件(简单配置)RabbitConfig.java

public class RabbitConfig {
    /**
     * 交换机名称(自定义的,想起什么就起身名字)
     */
    public static final String EXCHANGE = "topic.exchange";
    /**
     * 队列名称(自定义的,想起什么就起身名字)
     */
    public static final String QUEUE_A = "topic_test_queue";

    /**
     * 路由键(自定义的,想起什么就起身名字)
     */
    public static final String ROUTINGKEY_A = "key.#";
}

  4.编写生产者

/**
 * 消息生产者
 *
 * @author Tony
 * @version 2023
 * @date 2023/9/20 11:40
 */

@Slf4j
@Component
public class RabbitMqProducer implements RabbitTemplate.ConfirmCallback {
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMqProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 发送消息
     *
     * @param content 消息内容
     */
    public void sendMessage(String content) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(
                RabbitConfig.EXCHANGE,//交换机
                RabbitConfig.ROUTINGKEY_A,//路由键
                content,
                correlationData
        );

    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("接收到了RabbitMQ的回调id:{}", correlationData);
        if (ack) {
            log.info("消息成功消费");
        } else {
            log.info("消息消费失败:{}", cause);
        }
    }
}

  5.编写消费者

/**
 * 队列消费者
 *
 * @author Tony
 * @version 2023
 * @date 2023/9/20 11:41
 */
@Slf4j
@Component
@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value =RabbitConfig.EXCHANGE,durable = "false",type = "topic"),//指明交换机及交换机的类型以及是否持久化
        value = @Queue(value = RabbitConfig.QUEUE_A,durable = "false"),//指明交换机绑定的队列
        key = RabbitConfig.ROUTINGKEY_A))//指明交换机和队列之间的桥梁路由键
public class RabbitMqConsumer {

    /**
     * 接收String类型的消息
     * @param message
     */
    @RabbitHandler
    public void onStringHandle(String message) {
        log.info("RabbitMQ=>这是String类型的消息:{}",message);
    }

    /**
     * 接收Byte数组类型的消息
     * @param message
     */
    @RabbitHandler
    public void onByteHandle(byte[] message) {
        log.info("RabbitMQ=>这是byte[]类型的消息:{}",message);
    }
}

  6.编写一个RabbitMQController.java进行发送消息的测试

@RestController
@RequestMapping("/api/v1/pub/mq/")
public class RabbitMqController {
    @Autowired
    RabbitMqProducer rabbitMqProducer;

    @GetMapping("send")
    public String sendMsg() {
        String msg = "" + new Random().nextInt(1000) + "个消息," + UUID.randomUUID().toString();
        rabbitMqProducer.sendMessage(msg);
        return msg;

    }
}

  7.运行效果