基于SpringBoot整合Redisson的延迟队列

发布时间 2023-06-28 16:46:58作者: 一池寒潭

需求:

         1.订单下单超过30分钟以后,如果还未支付,则自动转为取消支付状态

    2.订单收货超过七天以后,如果还未评价,则自动转为好评

    3.等类似需求

实现步骤:

        1.  引入redisson依赖

       <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.11.5</version>
        </dependency>

 

        <!--加载hutool-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.3</version>
        </dependency>

         2. 创建延时队列到期事件处理方法,消费延时队列  

/**
 * @author dong
 */
public interface DelayQueueConsumer {

    /**
     * 执行延迟消息
     *
     * @param delayMessage delayMessage
     */
    void execute(DelayMessage delayMessage);

}

      3. 具体的延时队列消费实现

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author dong
 */
@Component
@Slf4j
public class OrderAutoCancelDelayQueueConsumer implements DelayQueueConsumer {

    @Override
    public void execute(DelayMessage delayMessage) {
        //处理自己过期的后的业务
        log.info("====OrderAutoCancelConsumer=====delayMessage={}", delayMessage);
    }
}

    4. 初始化队列

  注:SpringUtil.getBean,若是启动时报空指针,启动类加上@ComponentScan(basePackages={"com.ctw.utils","com.ctw.*"}),扫描自己SpringUtil所在的位置,若是用的hutool则改成@ComponentScan(basePackages={"cn.hutool.extra.spring"})

import com.ctw.utils.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * @author dong
 */
@Slf4j
@Component
public class RedissonDelayQueueClient implements InitializingBean {

    @Resource
    private RedissonClient redissonClient;

    private final Map<String, RDelayedQueue<DelayMessage>> delayQueueMap = new ConcurrentHashMap<>(16);

    public void addDelayMessage(DelayMessage delayMessage) {
        log.info("delayMessage={}", delayMessage);
        if (delayQueueMap.get(delayMessage.getQueueName()) == null) {
            log.warn("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());
            return;
        }
        delayMessage.setCreateTime("123456");
        RDelayedQueue<DelayMessage> rDelayedQueue = delayQueueMap.get(delayMessage.getQueueName());
        rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit() == null ? TimeUnit.SECONDS : delayMessage.getTimeUnit());
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumer,并且service名称为 ${queueName}Consumer
        RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();

        for (RedisDelayQueueEnum queueEnum : queueEnums) {
            DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanId());
            if (delayQueueConsumer == null) {
                throw new RuntimeException("queueName=" + queueEnum.getBeanId() + ",delayQueueConsumer=null,请检查配置...");
            }
            // Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,
            // 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。
            RBlockingQueue<DelayMessage> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode());
            RDelayedQueue<DelayMessage> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
            delayQueueMap.put(queueEnum.getCode(), rDelayedQueue);
            // 订阅新元素的到来,调用的是takeAsync(),异步执行
            rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);
        }
    }
}

   5. 延迟队列业务实体

import com.alibaba.fastjson.JSON;
import lombok.Data;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/**
 * @author dong
 */
@Data
public class DelayMessage implements Serializable {

    private String queueName;

    private Long delayTime;

    private TimeUnit timeUnit;

    private String msgBody;

    private String createTime;

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}

6. 延迟队列Enum

  注:如果没有业务不要产生过多的队列Enum,否则会找不到类而抛出异常

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum RedisDelayQueueEnum {

    ORDER_PAYMENT_TIMEOUT("ORDER_PAYMENT_TIMEOUT","订单支付超时,自动取消订单", "orderAutoCancelDelayQueueConsumer");
//    ORDER_TIMEOUT_NOT_EVALUATED("ORDER_TIMEOUT_NOT_EVALUATED", "订单超时未评价,系统默认好评", "orderTimeoutNotEvaluated");

    /**
     * 延迟队列 Redis Key
     */
    private String code;

    /**
     * 中文描述
     */
    private String name;

    /**
     * 延迟队列具体业务实现的 Bean
     * 可通过 Spring 的上下文获取
     */
    private String beanId;

}

7. 此时只需要再下单成功的方法里面新增以下逻辑即可