springboot + redis stream做轻量级消息队列

发布时间 2023-08-03 14:27:48作者: 一贯可乐

背景

前面我们探讨了使用redis stream做消息中间件的可行性,结论是在保证数据并发量不大, 数据敏感性不高, 且不方便使用重量级MQ,kafka的情况下可以使用.
探讨过程:https://www.cnblogs.com/qds1401744017/p/17598613.html

java springboot项目如何操作redis stream呢

1. 环境准备

  • 安装redis
    linux版本: https://redis.io/download/
    windows版本: https://github.com/tporadowski/redis/releases

  • 修改配置打开指定ip的远程访问
    a. 修改bind配置
    windows版本是redis安装目录下redis.windows-service.conf
    linux版本是redis.conf
    修改bind这一项,注释掉的话就是默认全IP可以访问(生产环境有风险)

    bind 127.0.0.1
    

    b. 关闭保护模式,把protected-mode默认的yes改成no

    protected-mode no
    
    #Reids默认开启保护模式(protected-mode yes),开启之后只有本机可以连接,其他机器无法连接。
    保护模式开启的两个条件
    没有使用bind
    没有设置密码(即:没有设置requirepass)
    

    c. 添加密码

    requirepass 你的密码
    

    d. 防火墙允许6379端口

  • 建立stream和对应的消费者组
    打开redis-cli.exe客户端

    #新增流
    XADD dcir  * data 1
    XADD formation  * data 1
    XADD preCharge  * data 1
    XADD division  * data 1
    #新增消费者组,从末尾开始消费
    XGROUP CREATE dcir dcir-group-1 $
    XGROUP CREATE formation formation-group-1 $ 
    XGROUP CREATE preCharge preCharge-group-1 $ 
    XGROUP CREATE division division-group-1 $ 
    

2. 版本和依赖

<!-- springboot 版本 -->
<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.5.4</version>
	<relativePath/> <!-- lookup parent from repository -->
<!-- jdk版本 -->
</parent>
<properties>
	<java.version>1.8</java.version>
</properties>
<!-- redis依赖 -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-pool2</artifactId>
</dependency>

3. 消息生产者

  • application.yml配置添加redis相关
spring:
  redis:
    host: 10.168.204.80
    database: 0
    port: 6379
    password: 123456
    timeout: 1000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
server:
  port: 8087
redisstream:
  stream: dcir
  • RedisStreamConfig.java
    读取application.yml中的stream配置
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "redisstream")
public class RedisStreamConfig {
    private String stream;
}
  • RedisPushService.java
    调用springboot-data-redis的redisTemplate发送消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.connection.stream.StringRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Collections;

@Service
@Slf4j
public class RedisPushService {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedisStreamConfig redisStreamConfig;

    public void push(String msg){
        // 创建消息记录, 以及指定stream
        StringRecord stringRecord = StreamRecords.string(Collections.singletonMap("data", msg)).withStreamKey(redisStreamConfig.getStream());
        // 将消息添加至消息队列中
        this.stringRedisTemplate.opsForStream().add(stringRecord);
        log.info("{}已发送消息:{}",redisStreamConfig.getStream(),msg);
    }
}

4.消费者

  • application.yml添加redis相关配置
spring:
  redis:
    database: 0
    host: 10.168.204.80
    port: 6379
    password: 123456
    timeout: 5000
    jedis:
      pool:
        max-idle: 10
        max-active: 50
        max-wait: 1000
        min-idle: 1
redisstream:
  dcirgroup: dcir-group-1
  dcirconsumer: dcir-consumer-1
  formationgroup: formation-group-1
  formationconsumer: formation-consumer-1
  divisiongroup: division-group-1
  divisionconsumer: division-consumer-1
  prechargegroup: precharge-group-1
  prechargeconsumer: precharge-consumer-1
  • RedisStreamConfig.java
    读取application.yml的配置
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "redisstream")
public class RedisStreamConfig {
    static final String DCIR = "dcir";
    static final String PRECHARGE = "preCharge";
    static final String FORMATION = "formation";
    static final String DIVISION = "division";
    private String stream;
    private String group;
    private String consumer;
    private String dcirgroup;
    private String formationgroup;
    private String divisiongroup;
    private String prechargegroup;
    private String dcirconsumer;
    private String formationconsumer;
    private String divisionconsumer;
    private String prechargeconsumer;
}
  • RedisStreamConsumerConfig.java
    把消费者和Listener绑定
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import java.time.Duration;
import java.util.concurrent.ExecutorService;

@Configuration
@Slf4j
public class RedisStreamConsumerConfig {

    @Autowired
    ExecutorService executorService;

    @Autowired
    RedisStreamConfig redisStreamConfig;

    /**
     * 主要做的是将OrderStreamListener监听绑定消费者,用于接收消息
     *
     * @param connectionFactory
     * @param streamListener
     * @return
     */
    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> dcirConsumerListener(
            RedisConnectionFactory connectionFactory,
            DcirStreamListener streamListener) {
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
                streamContainer(redisStreamConfig.DCIR, connectionFactory, streamListener);
        container.start();
        return container;
    }
    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> divisionConsumerListener(
            RedisConnectionFactory connectionFactory,
            DivisionStreamListener streamListener) {
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
                streamContainer(redisStreamConfig.DIVISION, connectionFactory, streamListener);
        container.start();
        return container;
    }
    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> formationConsumerListener(
            RedisConnectionFactory connectionFactory,
            FormationStreamListener streamListener) {
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
                streamContainer(redisStreamConfig.FORMATION, connectionFactory, streamListener);
        container.start();
        return container;
    }
    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> preChargeConsumerListener(
            RedisConnectionFactory connectionFactory,
            PrechargeStreamListener streamListener) {
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
                streamContainer(redisStreamConfig.PRECHARGE, connectionFactory, streamListener);
        container.start();
        return container;
    }







    /**
     * @param mystream          从哪个流接收数据
     * @param connectionFactory
     * @param streamListener    绑定的监听类
     * @return
     */
    private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(String mystream, RedisConnectionFactory connectionFactory, StreamListener<String, ObjectRecord<String, String>> streamListener) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        .pollTimeout(Duration.ofSeconds(5)) // 拉取消息超时时间
                        .batchSize(10) // 批量抓取消息
                        .targetType(String.class) // 传递的数据类型
                        .executor(executorService)
                        .build();
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
                .create(connectionFactory, options);
        //指定消费最新的消息
        StreamOffset<String> offset = StreamOffset.create(mystream, ReadOffset.lastConsumed());
        //创建消费者
        StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = null;
        try {
            streamReadRequest = buildStreamReadRequest(offset, streamListener);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
        //指定消费者对象
        container.register(streamReadRequest, streamListener);
        return container;
    }

    private StreamMessageListenerContainer.StreamReadRequest<String> buildStreamReadRequest(StreamOffset<String> offset, StreamListener<String, ObjectRecord<String, String>> streamListener) throws Exception {
        Consumer consumer = null;
        if(streamListener instanceof DcirStreamListener){
            consumer = Consumer.from(redisStreamConfig.getDcirgroup(), redisStreamConfig.getDcirconsumer());
        }else if(streamListener instanceof DivisionStreamListener){
            consumer = Consumer.from(redisStreamConfig.getDivisiongroup(), redisStreamConfig.getDivisionconsumer());
        }else if(streamListener instanceof FormationStreamListener){
            consumer = Consumer.from(redisStreamConfig.getFormationgroup(), redisStreamConfig.getFormationconsumer());
        }else if(streamListener instanceof PrechargeStreamListener){
            consumer = Consumer.from(redisStreamConfig.getPrechargegroup(), redisStreamConfig.getPrechargeconsumer());
        }else{
            throw new Exception("无法识别的stream key");
        }
        StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer.StreamReadRequest.builder(offset)
                .errorHandler((error) -> {
                    error.printStackTrace();
                    log.error(error.getMessage());
                })
                .cancelOnError(e -> false)
                .consumer(consumer)
                //关闭自动ack确认
                .autoAcknowledge(false)
                .build();
        return streamReadRequest;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        template.setConnectionFactory(factory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

}
  • 其中一个消费者listener:dcirStreamListener
import com.alibaba.fastjson.JSONObject;
import com.qds.k2h.domain.*;
import com.qds.k2h.service.DcirDataService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class DcirStreamListener implements StreamListener<String, ObjectRecord<String, String>> {
    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Autowired
    RedisStreamConfig redisStreamConfig;

    @Autowired
    DcirDataService dcirDataService;

    @Override
    protected void finalize() throws Throwable {
        super.finalize();
    }

    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        try{
            // 消息ID
            RecordId messageId = message.getId();

            // 消息的key和value
            String string = message.getValue();
            log.info("dcir获取到数据。messageId={}, stream={}, body={}", messageId, message.getStream(), string);
            DcirData data = JSONObject.parseObject(string, DcirData.class);
                data.setDeviceSn(data.getLineNo() + "_" + data.getCabNo() + "_" + data.getCellNo() + "_" + data.getChannelNo());
                data.setCellTemp(data.getCellT1() + "," + data.getCellT2() + "," + data.getCellT3() + "," + data.getCellT4() + "," + data.getCellT5() + "," + data.getCellT6());
                if (Boolean.TRUE.equals(data.getStepStop())) {
                    try {
                        dcirDataService.insertMysql(data);
                        log.info("已插入截止数据: {}",data);
                        if (data.getRunState() == 4) {
                            DcirQuery dcirQuery = new DcirQuery();
                            dcirQuery.setBatchNo(data.getBatchNo());
                            dcirQuery.setBatteryNo(data.getBatteryNo());
                            dcirQuery.setDeviceSn(data.getDeviceSn());
                            dcirQuery.setRecipeName(data.getRecipeName());
                            dcirQuery.setStartTime(data.getCurrentTime());
                            dcirQuery.setTrayNo(data.getTrayNo());
                            dcirDataService.insertForQuery(dcirQuery);
                        }
                        if (data.getRunState() == 0 || data.getRunState() == 3) {
                            DcirQuery dcirQuery = new DcirQuery();
                            dcirQuery.setDeviceSn(data.getDeviceSn());
                            dcirQuery.setBatchNo(data.getBatchNo());
                            dcirQuery.setEndTime(data.getCurrentTime());
                            dcirDataService.updateForQuery(dcirQuery);
                        }
                    } catch (Exception e) {
                        log.error("{}--{}dcir插入mysql失败", data, e.getMessage());
                    }
                } else {
                    try {
                        dcirDataService.insertIntoTd(data);
                    } catch (Exception e) {
                        log.error(e.getMessage() + "错误数据:{}",data);
                        throw new Exception(e);
                    }
                }
            // 通过RedisTemplate手动确认消息
            this.stringRedisTemplate.opsForStream().acknowledge(redisStreamConfig.getDcirgroup(), message);
        }catch (Exception e){
            // 处理异常
            e.printStackTrace();
        }
    }
}