基于Redis的简易延时队列

发布时间 2023-12-09 16:48:56作者: 熊熊会发光哦

基于Redis的简易延时队列

一、背景

在实际的业务场景中,经常会遇到需要延时处理的业务,比如订单超时未支付,需要取消订单,或者是用户注册后,需要在一段时间内激活账号,否则账号失效等等。这些业务场景都可以通过延时队列来实现。
最近在实际业务当中就遇到了这样的一个场景,需要实现一个延时队列,用来处理订单超时未支付的业务。在网上找了一些资料,发现大部分都是使用了mq来实现,比如rabbitmq,rocketmq等等,但是这些mq都是需要安装的,而且还需要配置,对于此项目来说不想增加额外的依赖,所以就想到了使用redis来实现一个简易的延时队列。

二、实现思路

1. 业务场景

订单超时未支付,需要取消订单,这个业务场景可以分为两个步骤来实现:

  1. 用户下单后,将订单信息存入数据库,并将订单信息存入延时队列中,设置延时时间为30分钟。
  2. 30分钟后,从延时队列中取出订单信息,判断订单是否已支付,如果未支付,则取消订单。
  3. 如果用户在30分钟内支付了订单,则将订单从延时队列中删除。

2. 实现思路

  1. 使用redis的zset来实现延时队列,zset的score用来存储订单的超时时间,value用来存储订单信息。
  2. 使用redis的set来存储已支付的订单,set中的value为订单id。

三、实现代码

1. 使用了两个注解类分别标记生产者类、生产者方法,消费者方法

/**
 * @program: 
 * @description: redis延时队列生产者类注解,标记生产者类,用来扫描生产者类中的生产者方法,将生产者方法注册到redis延时队列中
 * @author: jiangchengxuan
 * @created: 2023/12/09 10:32
 */
@Component
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisMessageQueue {}
/**
 * @program: 
 * @description: 
 * 带有此注解的方法,方法的入参首先会被转换为json字符串,然后存入redis的zset中,score为当前时间+延时时间,value为json字符串
 * 当延时时间到达后,会从redis的zset中取出value,然后将value转换为入参类型,调用此方法,执行业务逻辑
 * 此注解只能标记在方法上,且方法必须为public,且只能有一个参数
 * 此注解标记的方法,必须在redis延时队列生产者类中,否则不会生效
 * @author: jiangchengxuan
 * @created:  2023/12/09 10:37
 */
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisMessageQueueMethod {
    String threadName() default "redis消息队列默认线程";
    String queueKey();   // 队列key值
    int threadNum() default 1;      //默认线程数量
    int threadSleepTime() default 500;  //默认线程休眠时间默认500ms
}

2. 生产者类具体实现

/**
 * @program: 
 * @description:  生产者类具体实现
 * @author: jiangchengxuan
 * @created: 2023/12/09 10:44
 */
@Slf4j
@Component
public class DelayQueueWorkerConfig implements InitializingBean {
    private volatile boolean monitorStarted = false;

    private volatile boolean monitorShutDowned = false;

    private ExecutorService executorService;

    // 需要监控的延时队列
    @Autowired
    protected IDelayQueue<String> monitorQueue;

    @Autowired
    private ApplicationContext applicationContext;


    @Override
    public void afterPropertiesSet(){
        //spring工具类,可以获取指定注解的类
        Map<String, Object> allNeedClass = applicationContext.getBeansWithAnnotation(RedisMessageQueue.class);
        for (Map.Entry<String, Object> entry : allNeedClass.entrySet()) {
            Object bean = entry.getValue();
            Method[] methods = bean.getClass().getMethods();
            for (Method method : methods) {
                Annotation[] annotations = method.getDeclaredAnnotations();
                for (Annotation annotation : annotations) {
                    if (annotation instanceof RedisMessageQueueMethod) {
                        RedisMessageQueueMethod queueMethod = (RedisMessageQueueMethod) annotation;
                        //找的需要使用消息队列的方法后,
                        initExecuteQueue(queueMethod, method, bean);
                    }
                    }
                }
            }
        }


    /**
     * 初始化执行造作
     * @param queueAnnotations 注解
     * @param method 方法
     * @param bean 对象
     */
    void initExecuteQueue(RedisMessageQueueMethod queueAnnotations ,Method method,Object bean) {
        String threadName = queueAnnotations.threadName();
        int threadNum = queueAnnotations.threadNum();
        int threadSheepTime = queueAnnotations.threadSleepTime();
        String queueKey = queueAnnotations.queueKey();
        //获取所有消息队列名称
        executorService = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                Thread.currentThread().setName(threadName + "[" + num + "]");
                //如果没有设置队列queuekey或者已经暂停则不执行
                while (!monitorShutDowned) {
                    String value = null;
                    try {
                        value = monitorQueue.get(queueKey);
                        // 获取数据时进行删除操作,删除成功,则进行处理,业务逻辑处理失败则继续添加回队列但是时间设置最大以达到保存现场的目的,防止并发获取重复数据
                        if (StringUtils.isNotEmpty(value)) {
                            if (log.isDebugEnabled()) {
                                log.debug("Monitor Thread[" + Thread.currentThread().getName() + "], get from queue,value = {}", value);
                            }
                            boolean success = (Boolean) method.invoke(bean, value);
                            // 失败重试
                            if (!success) {
                                success =  (Boolean) method.invoke(bean, value);;
                                if (!success) {
                                    log.warn("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = {}", value);
                                    monitorQueue.add(TimeUnit.DAYS,365, value, queueKey);
                                }
                            } else {
                                if (log.isDebugEnabled()) {
                                    log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:execute successfully!values = {}", value);
                                }
                            }
                        } else {
                            if (log.isDebugEnabled()) {
                                log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:monitorThreadRunning = {}", monitorStarted);
                            }
                            Thread.sleep(threadSheepTime);
                        }
                    } catch (Exception e) {
                        log.error("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = " + value, e);
                    }
                }
                log.info("Monitor Thread[" + Thread.currentThread().getName() + "] Completed...");
            });
        }
        log.info("thread pool is started...");
    }

}
/**
 * @program: 
 * @description: 
 * 延时队列接口实现类,
 * 使用redis的zset实现延时队列,
 * @author: jiangchengxuan
 * @created:  2023/12/09 23:34
 */
public interface IDelayQueue <E> {
    /**
     * 向延时队列中添加数据
     *
     * @param score 分数
     * @param data  数据
     * @return true 成功 false 失败
     */
    boolean add(long score, E data,String queueKey);


    /**
     * 向延时队列中添加数据
     *
     * @param timeUnit 时间单位
     * @param time     延后时间
     * @param data     数据
     * @param queueKey
     * @return true 成功 false 失败
     */
    boolean add(TimeUnit timeUnit, long time, E data, String queueKey);

    /**
     * 从延时队列中获取数据
     * @param queueKey 队列key
     * @return 数据
     */
    String get(String queueKey);

    /**
     * 删除数据
     *
     * @param key
     * @param data 数据
     * @return
     */
    public<T> boolean rem(String key, T data) ;
}
/**
 * @program: 
 * @description:  redis操作类,封装了redis的操作方法,使用时直接注入即可使用,不需要关心redis的操作细节,使用时只需要关心业务逻辑即可
 * @author: jiangchengxuan
 * @created: 2023/12/09 23:35
 */
@Service
public class RedisDelayQueue implements IDelayQueue<String> {

    @Autowired
    private RedisService redisService;


    @Override
    public boolean add(long score, String data,String queueKey) {
        return redisService.opsForZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, data, score);
    }

    @Override
    public boolean add(TimeUnit timeUnit, long time, String data, String queueKey) {
        switch (timeUnit) {
            case SECONDS:
                return add(LocalDateTime.now().plusSeconds(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data, queueKey);
            case MINUTES:
                return add(LocalDateTime.now().plusMinutes(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            case HOURS:
                return add(LocalDateTime.now().plusHours(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            case DAYS:
                return add(LocalDateTime.now().plusDays(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            default:
                return false;
        }
    }


    @Override
    public String get(String queueKey) {
        long now = System.currentTimeMillis();
        long min = Long.MIN_VALUE;
        Set<String> res = redisService.rangeByScoreZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, min, now, 0, 10);
        if (!CollectionUtils.isEmpty(res)) {
            for (String data : res){
                // 删除成功,则进行处理,防止并发获取重复数据
                if (rem(queueKey, data)){
                    return data;
                }
            }
        }
        return null;
    }


    @Override
    public<T> boolean rem(String key, T data) {
        return redisService.remZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+key, data);
    }
}
  1. 使用
@RedisMessageQueue
public class SomethingClass
{
    @Autowired
    private IDelayQueue<String> messageQueue;

    /**
     * 生产者,向队列中添加数据,30秒后消费者进行消费
     */
    public void test(){
        messageQueue.add(TimeUnit.SECONDS,30L,"这是参数数据","new_queue");
    }
    
    /**
     * 消费者,如果按此配置的话,会启动一个线程,线程名称为:测试线程名称,线程数量为1,线程休眠时间为10毫秒
     * 注意:queueKey需要与生产者中的queueKey保持一致才能进行消费
     * @param data 
     */
    @Override
    @RedisMessageQueueMethod(threadName = "测试线程名称",queueKey = "new_queue",threadNum = 1,threadSleepTime = 10)
    public void testMethod(String data) {
        //do something
    }

}