一、概述
RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。
整体上看其实就是一个生产者消费者模型。只是这个模型更加抽象及精细化了。
RabbitMQ大体可以分为三层,其中第二层又可以细分为两层:
1.生产者
2.Broker
a.交换机(exchange)
b.队列
3.消费者
大体上如下图所示:
其中P代表生产者、X代表交换机、红色的长条代表队列,C代表消费者。
运行过程大致描述:
P生产的消息先放入交换机,交换机通过路由键找到绑定的队列,这样交换机的数据就直接到队列中了。而C作为消费者会监听是否有消息(可以主动去拿,可以被动接受)
生产消费过程:P->exchange->queue->c
二、代码示例
1.在pom.xml中引入RabbitMQ
<!-- 集成rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.配置application.yml
rabbitmq: host: rabbitmq主机ip port: 5672 username: 用户名 password: 密码 publisher-returns: true #开启发送失败退回 publisher-confirm-type: correlated
3.创建配置文件(简单配置)RabbitConfig.java
public class RabbitConfig { /** * 交换机名称(自定义的,想起什么就起身名字) */ public static final String EXCHANGE = "topic.exchange"; /** * 队列名称(自定义的,想起什么就起身名字) */ public static final String QUEUE_A = "topic_test_queue"; /** * 路由键(自定义的,想起什么就起身名字) */ public static final String ROUTINGKEY_A = "key.#"; }
4.编写生产者
/** * 消息生产者 * * @author Tony * @version 2023 * @date 2023/9/20 11:40 */ @Slf4j @Component public class RabbitMqProducer implements RabbitTemplate.ConfirmCallback { private RabbitTemplate rabbitTemplate; @Autowired public RabbitMqProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); } /** * 发送消息 * * @param content 消息内容 */ public void sendMessage(String content) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend( RabbitConfig.EXCHANGE,//交换机 RabbitConfig.ROUTINGKEY_A,//路由键 content, correlationData ); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("接收到了RabbitMQ的回调id:{}", correlationData); if (ack) { log.info("消息成功消费"); } else { log.info("消息消费失败:{}", cause); } } }
5.编写消费者
/** * 队列消费者 * * @author Tony * @version 2023 * @date 2023/9/20 11:41 */ @Slf4j @Component @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value =RabbitConfig.EXCHANGE,durable = "false",type = "topic"),//指明交换机及交换机的类型以及是否持久化 value = @Queue(value = RabbitConfig.QUEUE_A,durable = "false"),//指明交换机绑定的队列 key = RabbitConfig.ROUTINGKEY_A))//指明交换机和队列之间的桥梁路由键 public class RabbitMqConsumer { /** * 接收String类型的消息 * @param message */ @RabbitHandler public void onStringHandle(String message) { log.info("RabbitMQ=>这是String类型的消息:{}",message); } /** * 接收Byte数组类型的消息 * @param message */ @RabbitHandler public void onByteHandle(byte[] message) { log.info("RabbitMQ=>这是byte[]类型的消息:{}",message); } }
6.编写一个RabbitMQController.java进行发送消息的测试
@RestController @RequestMapping("/api/v1/pub/mq/") public class RabbitMqController { @Autowired RabbitMqProducer rabbitMqProducer; @GetMapping("send") public String sendMsg() { String msg = "第" + new Random().nextInt(1000) + "个消息," + UUID.randomUUID().toString(); rabbitMqProducer.sendMessage(msg); return msg; } }
7.运行效果
- 注解 SpringBoot2 SpringBoot RabbitMQ注解springboot2 springboot rabbitmq 注解springboot2 springboot mapper springboot2 springboot2 springboot springboot2 springboot swagger3 swagger springboot2 springboot mybatis springboot2 springboot lettuce redis springboot2 springboot spring3 spring springboot2 springboot后台 管理系统 springboot2 springcache springboot