RabbitMq

发布时间 2023-11-09 21:41:17作者: fly_birdY

为啥要用rabbitmq

1,松耦合结构(解耦,异步处理,缓冲能力,伸缩性,扩展性)

2,性能是万级的

rabbitmq的生产者:
rabbitmq的生产者如何保证数据安全问题:

1,发送者确认

2,失败者通知
rabbitmq的消费者
rabbitmq的消费者如何保证数据安全问题:

1,手动消费确认
rabbitmq与springboot的集成:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

package touchbase.infrastructure.common.config.rabbitmq;


import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 1,使用顺序
*/
@Configuration
public class RabbitMqConfig {
@Value("${touchbase.rabbitmq.host}")
private String host;
@Value("${touchbase.rabbitmq.port}")
private String port;
@Value("${touchbase.rabbitmq.username}")
private String username;
@Value("${touchbase.rabbitmq.password}")
private String password;
@Value("${touchbase.rabbitmq.virtual-host}")
private String virtualhost;

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host + ":" + port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
return connectionFactory;
}
}

生产者:

package touchbase.infrastructure.common.config.rabbitmq;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
public class RabbitMqProducerConfig {

@Autowired
private ConnectionFactory connectionFactory;

@PostConstruct
public void init() {
// 如果交换器,队列未创建,可以通过RabbitAdmin进行创建
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
DirectExchange directExchange = new DirectExchange("cmpn.strategy");
rabbitAdmin.declareExchange(directExchange);

Queue weChatQueue = new Queue("touch.channel.wechatassis");
rabbitAdmin.declareQueue(weChatQueue);
rabbitAdmin.declareBinding(BindingBuilder.bind(weChatQueue).to(directExchange)
.with("touch.channel.wechatassis"));
}

//使用template
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
//失败通知(在不可路由的情况下)
template.setMandatory(true);
//发送方确认
template.setConfirmCallback(confirmCallback());
//失败回调
template.setReturnCallback(returnCallback());
return template;
}

/*使用了RabbitMq系统的缺省的交换器(direct交换器:完全匹配)*/
//申明队列
@Bean
public Queue weChatAssisQueue() {
return new Queue("touch.channel.wechatassis");
}

/*生产者发送确认*/
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
return new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("发送者确认发送给mq成功");
} else {
//处理失败的消息
System.out.println("发送者发送给给mq失败,考虑重发:" + cause);
}

}
};
}

/*失败者通知*/
@Bean
public RabbitTemplate.ReturnCallback returnCallback() {
return new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message,
int replyCode,
String replyText,
String exchange,
String routingKey) {
System.out.println("无法路由的消息,需要考虑另外处理");
System.out.println("returned replyText:" + replyText);
System.out.println("returned exchange:" + exchange);
System.out.println("returned routingKey:" + routingKey);
String msgJson = new String(message.getBody());
System.out.println("Returned Message:" + msgJson);
}
};
}
}

消费者:
package touchbase.infrastructure.common.config.rabbitmq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConsumerConfig {

@Autowired
private ConnectionFactory connectionFactory;

/**
* @Description: SimpleMessageListenerContainer的工厂類
* @Author rty
* @Date: 2023/10/21 23:42
* @Version 1.0
*/
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置当前的消费者数量
factory.setConcurrentConsumers(1);
//设置当前的消费者数量上限
factory.setMaxConcurrentConsumers(5);
//设置是否重回队列
factory.setDefaultRequeueRejected(true);
return factory;
}
}

package touchbase.adapter.message;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import touchbase.application.dto.common.FlowDataDTO;
import touchbase.application.service.channeltask.ChannelTaskService;

import java.io.IOException;
import java.util.List;

/**
* @Description: 渠道任务的消费
* @Author rty
* @Date: 2023/10/21 15:36
* @Version 1.0
*/
@Component
@Slf4j
public class ChannelTaskConsumer {

public static final String WECHATASSIS = "WECHATASSIS";

@Autowired
private ChannelTaskService channelTaskService;

@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory", queues = "touch.channel.wechatassis")
@RabbitHandler
public void dealWeChatChannelTaskConsumer(List<Message> messages, Channel channel) throws IOException {
long deliveryTag = 0l;
try {
for (Message message : messages) {
String flowDataString = new String(message.getBody());
log.info("企微专员任务流转", flowDataString);
deliveryTag = message.getMessageProperties().getDeliveryTag();
channelTaskService.dealChannelChannelTask(JSONObject.parseArray(flowDataString, FlowDataDTO.class),
WECHATASSIS);
}
} catch (Exception e) {
log.error("企微专员任务流转失败,{}", e);
//basicReject一次只能拒绝接收一个消息,basicNack方法可以支持一次0个或多个消息的拒收
} finally {
channel.basicAck(deliveryTag, false);
}
}
}