主要通过消息合并实现,采用线程池异步发撒
@Configuration
public class BatchMqConfig {
//测试批量
public static final String BATCH_QUEUE_NAME="batch.queue";
@Bean
public Queue batchQueue(){
return new Queue(BATCH_QUEUE_NAME);
}
@Bean("batchQueueTaskScheduler")
public TaskScheduler batchQueueTaskScheduler(){
TaskScheduler taskScheduler=new ThreadPoolTaskScheduler();
return taskScheduler;
}
//批量处理rabbitTemplate
@Bean("batchQueueRabbitTemplate")
public BatchingRabbitTemplate batchQueueRabbitTemplate(ConnectionFactory connectionFactory,
@Qualifier("batchQueueTaskScheduler") TaskScheduler taskScheduler){
//!!!重点: 所谓批量, 就是spring 将多条message重新组成一条message, 发送到mq, 从mq接受到这条message后,在重新解析成多条message
//一次批量的数量
int batchSize=1;
// 缓存大小限制,单位字节,
// simpleBatchingStrategy的策略,是判断message数量是否超过batchSize限制或者message的大小是否超过缓存限制,
// 缓存限制,主要用于限制"组装后的一条消息的大小"
// 如果主要通过数量来做批量("打包"成一条消息), 缓存设置大点
// 详细逻辑请看simpleBatchingStrategy#addToBatch()
int bufferLimit=1024; //1 K
long timeout=10000;
BatchingStrategy batchingStrategy=new SimpleBatchingStrategy(batchSize,bufferLimit,timeout);
return new BatchingRabbitTemplate(connectionFactory,batchingStrategy,taskScheduler);
}
@Bean("batchQueueRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory batchQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//设置批量
factory.setBatchListener(true);
factory.setConsumerBatchEnabled(true);//设置BatchMessageListener生效
factory.setBatchSize(5);//设置监听器一次批量处理的消息数量
return factory;
}
}
kv