Redis基于Stream实现消息队列

发布时间 2023-06-16 00:11:36作者: 不忘初心2021

先上效果图

 需要使用redis5.0以上版本,使用了redis5.0新增的数据类型Stream,使用block表示阻塞等待,直到有新的数据添加

这里不需要再redis新增Stream和消息组

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.java</groupId>
    <artifactId>redis-study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.3</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>


</project>

  配置文件

server.port=8080

spring.redis.database=0
spring.redis.host=192.168.0.101
spring.redis.port=6379
spring.redis.password=123

  代码结构

 

package com.java;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @Description:
 * @Author: qiuxie
 * @Create: 2023/6/15 18:38
 */
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        template.setConnectionFactory(factory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

}

  

package com.java.service.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @Description:  消费者
 * @Author: qiuxie
 * @Create: 2023/6/15 18:30
 */
@Component
public class RedisConsumer  implements StreamListener<String, MapRecord<String,String,String>> {

    private final Logger log= LoggerFactory.getLogger(RedisConsumer.class);

    public static final String streamName= "qiuxieStream";

    public static final String groupName= "qiuxieGroup";

    public final String consumerName= "emailConsumer";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        log.warn("【消费者】Stream名称:{},消息内容:{}",streamName,message.getValue());

        //获取生产者消息
        Map<String, String> msgMap = message.getValue();
        log.info("msgMap:{}",msgMap);

        //业务逻辑 略

        //ps:通过 msgMap.get("key")  拿到需要参数执行业务逻辑

        StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream();
        //消息应答
        streamOperations.acknowledge( streamName,groupName,message.getId() );

    }
}

  

package com.java.service.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import java.time.Duration;
import java.util.Collections;

/**
 * @Description:
 * @Author: qiuxie
 * @Create: 2023/6/15 18:34
 */
@Configuration
public class RedisStreamConfig {

    @Autowired
    private RedisConsumer emailConsumer;

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    private final Logger log= LoggerFactory.getLogger(RedisStreamConfig.class);

    @Bean
    public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                //block读取超时时间
                .pollTimeout(Duration.ofSeconds(3))
                //count 数量(一次只获取一条消息)
                .batchSize(1000)
                //序列化规则
                .serializer( stringRedisSerializer )
                .build();
    }

    /**
     * 开启监听器接收消息
     */
    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory,
                                                                                                          StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){

        StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory,
                streamMessageListenerContainerOptions);

        //如果 流不存在 创建 stream 流
        if( !redisTemplate.hasKey(RedisConsumer.streamName)){
            redisTemplate.opsForStream().add(RedisConsumer.streamName, Collections.singletonMap("", ""));
            log.info("初始化stream:{} success", RedisConsumer.streamName);
        }

        //创建消费者组
        try {
            redisTemplate.opsForStream().createGroup(RedisConsumer.streamName, RedisConsumer.groupName);
        } catch (Exception e) {
            log.info("消费者组:{} 已存在", RedisConsumer.groupName);
        }

        //注册消费者 消费者名称,从哪条消息开始消费,消费者类
        // > 表示没消费过的消息
        // $ 表示最新的消息
        listenerContainer.receive(
                Consumer.from(RedisConsumer.groupName, emailConsumer.consumerName),
                StreamOffset.create(RedisConsumer.streamName, ReadOffset.lastConsumed()),
                emailConsumer
        );
        listenerContainer.start();
        return listenerContainer;
    }

}

  

package com.java.controller.front;

import com.java.service.impl.RedisConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Description: 生产者
 * @Author: qiuxie
 * @Create: 2023/6/15 17:02
 */
@RestController
public class RedisProducerController {

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;


    @GetMapping("/generateData")
    public String generateData(String message) {
        StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
        RecordId recordId = streamOperations.add(ObjectRecord.create(RedisConsumer.streamName, message));
        System.out.println("Published message with record ID: " + recordId);
        return "Success";
    }

}

  

package com.java;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @Description:  设置年轻代和老年代的比例  1:4
 * E:S0:S1  8:1:1
 * @Author: Yourheart
 * @Create: 2023/4/21 11:27
 */
@SpringBootApplication()
public class MysqlServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(MysqlServiceApplication.class, args);
    }

}

  使用postman测试,127.0.0.1:8080/generateData?message=aaaaa