Rabbit MQ 消息批量发送接收

发布时间 2023-06-30 15:37:40作者: FromZeroToOne

主要通过消息合并实现,采用线程池异步发撒

@Configuration
public class BatchMqConfig {

    //测试批量
    public static final String BATCH_QUEUE_NAME="batch.queue";

    @Bean
    public Queue batchQueue(){
        return new Queue(BATCH_QUEUE_NAME);
    }

    @Bean("batchQueueTaskScheduler")
    public TaskScheduler batchQueueTaskScheduler(){
        TaskScheduler taskScheduler=new ThreadPoolTaskScheduler();
        return taskScheduler;
    }

    //批量处理rabbitTemplate
    @Bean("batchQueueRabbitTemplate")
    public BatchingRabbitTemplate batchQueueRabbitTemplate(ConnectionFactory connectionFactory,
                                                           @Qualifier("batchQueueTaskScheduler") TaskScheduler taskScheduler){

        //!!!重点: 所谓批量, 就是spring 将多条message重新组成一条message, 发送到mq, 从mq接受到这条message后,在重新解析成多条message

        //一次批量的数量
        int batchSize=1;
        // 缓存大小限制,单位字节,
        // simpleBatchingStrategy的策略,是判断message数量是否超过batchSize限制或者message的大小是否超过缓存限制,
        // 缓存限制,主要用于限制"组装后的一条消息的大小"
        // 如果主要通过数量来做批量("打包"成一条消息), 缓存设置大点
        // 详细逻辑请看simpleBatchingStrategy#addToBatch()
        int bufferLimit=1024; //1 K
        long timeout=10000;

        BatchingStrategy batchingStrategy=new SimpleBatchingStrategy(batchSize,bufferLimit,timeout);
        return new BatchingRabbitTemplate(connectionFactory,batchingStrategy,taskScheduler);
    }

    @Bean("batchQueueRabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory batchQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //设置批量
        factory.setBatchListener(true);
        factory.setConsumerBatchEnabled(true);//设置BatchMessageListener生效
        factory.setBatchSize(5);//设置监听器一次批量处理的消息数量
        return factory;
    }
}
kv