Design of Task

Published 2023-06-28 15:46:07 Author: 做时间的好朋友

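1. The Task concept

  • A Task represents a unit of work.
  • In the program it is defined as a batch of actions that must run asynchronously.
  • It is also delayed: it runs only after a specified delay has elapsed (given in milliseconds in the code).
  • It therefore implements both the Delayed and Runnable interfaces.
  • Because it is only an abstraction, with run() left to concrete subclasses, it is declared as an abstract class.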
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public abstract class Task implements Delayed, Runnable {
    private final String id;
    private final long start;

    /**
     * @param id                  ID of the scheduled task
     * @param delayInMilliseconds delay before execution, in milliseconds
     */
    public Task(String id, long delayInMilliseconds) {
        this.id = id;
        this.start = System.currentTimeMillis() + delayInMilliseconds;
    }

    public String getId() {
        return id;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = this.start - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

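    @Override
    public int compareTo(Delayed o) {
        // Long.compare avoids the int overflow of casting (this.start - other.start).
        // Note: this still assumes the queue only ever holds Task instances.
        return Long.compare(this.start, ((Task) o).start);
    }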

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null) return false;
        if (!(o instanceof Task t)) {
            return false;
        }
        return this.id.equals(t.getId());
    }

    @Override
    public int hashCode() {
        return this.id.hashCode();
    }
}
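
One detail in compareTo deserves care: narrowing a long difference to int can flip the sign once two start times are more than Integer.MAX_VALUE milliseconds (about 24.8 days) apart, which would misorder the queue. The sketch below contrasts the two approaches. CompareOverflowDemo and its method names are hypothetical and are not part of the original project.

```java
// Illustrative only: the class and method names are assumptions made for this sketch.
final class CompareOverflowDemo {

    /** Comparison by casting the long difference to int (can overflow). */
    static int castCompare(long a, long b) {
        return (int) (a - b);
    }

    /** Overflow-safe comparison of two timestamps. */
    static int safeCompare(long a, long b) {
        return Long.compare(a, b);
    }

    public static void main(String[] args) {
        long now = 0L;
        long later = now + Integer.MAX_VALUE + 1L; // 2^31 ms later, roughly 24.9 days
        // "later" should compare greater than "now"
        System.out.println(castCompare(later, now) > 0); // false: the cast wrapped to a negative int
        System.out.println(safeCompare(later, now) > 0); // true
    }
}
```

For delays of minutes or hours the two give the same ordering, so the difference only shows up with very long delays.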

2. A Task implementation: OrderUnPaidTask (automatically cancels an order when payment times out)

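import java.util.Date;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Project-specific imports (mappers, entities, SpringContextUtil, RedisCache, Constants) are omitted here.
public class OrderUnPaidTask extends Task {
    /**
     * Default delay: 30 minutes, in milliseconds
     */
    private static final long DELAY_TIME = 30 * 60 * 1000;

    private static final Logger log = LoggerFactory.getLogger(OrderUnPaidTask.class);
    /**
     * Order ID
     */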
    private final Long orderId;

    public OrderUnPaidTask(Long orderId, long delayInMilliseconds) {
        super("OrderUnPaidTask-" + orderId, delayInMilliseconds);
        this.orderId = orderId;
    }

    public OrderUnPaidTask(Long orderId) {
        super("OrderUnPaidTask-" + orderId, DELAY_TIME);
        this.orderId = orderId;
    }

    @Override
    public void run() {
        log.info("Start processing delayed task --- order unpaid after timeout --- {}", this.orderId);

        NewBeeMallOrderMapper newBeeMallOrderMapper = SpringContextUtil.getBean(NewBeeMallOrderMapper.class);
        NewBeeMallOrderItemMapper newBeeMallOrderItemMapper = SpringContextUtil.getBean(NewBeeMallOrderItemMapper.class);
        NewBeeMallGoodsMapper newBeeMallGoodsMapper = SpringContextUtil.getBean(NewBeeMallGoodsMapper.class);
        NewBeeMallCouponService newBeeMallCouponService = SpringContextUtil.getBean(NewBeeMallCouponService.class);

        NewBeeMallOrder order = newBeeMallOrderMapper.selectByPrimaryKey(orderId);
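        if (order == null) {
            // The order no longer exists, so there is nothing to cancel
            log.info("Finished processing delayed task --- order unpaid after timeout --- {}", this.orderId);
            return;
        }
        if (order.getOrderStatus() != NewBeeMallOrderStatusEnum.ORDER_PRE_PAY.getOrderStatus()) {
            // The order is no longer awaiting payment (e.g. already paid or closed)
            log.info("Finished processing delayed task --- order unpaid after timeout --- {}", this.orderId);
            return;
        }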

        // Mark the order as closed because payment timed out
        order.setOrderStatus((byte) NewBeeMallOrderStatusEnum.ORDER_CLOSED_BY_EXPIRED.getOrderStatus());
        order.setUpdateTime(new Date());
        if (newBeeMallOrderMapper.updateByPrimaryKey(order) <= 0) {
            throw new RuntimeException("Order update failed: the data is stale");
        }

        // Restore the stock of each item in the order
        List<NewBeeMallOrderItem> newBeeMallOrderItems = newBeeMallOrderItemMapper.selectByOrderId(orderId);
        for (NewBeeMallOrderItem orderItem : newBeeMallOrderItems) {
            if (orderItem.getSeckillId() != null) {
                Long seckillId = orderItem.getSeckillId();
                NewBeeMallSeckillMapper newBeeMallSeckillMapper = SpringContextUtil.getBean(NewBeeMallSeckillMapper.class);
                RedisCache redisCache = SpringContextUtil.getBean(RedisCache.class);
                if (!newBeeMallSeckillMapper.addStock(seckillId)) {
                    throw new RuntimeException("Failed to restore seckill goods stock");
                }
                redisCache.increment(Constants.SECKILL_GOODS_STOCK_KEY + seckillId);
            } else {
                Long goodsId = orderItem.getGoodsId();
                Integer goodsCount = orderItem.getGoodsCount();
                if (!newBeeMallGoodsMapper.addStock(goodsId, goodsCount)) {
                    throw new RuntimeException("Failed to restore goods stock");
                }
            }
        }

        // Return the coupon used by the order
        newBeeMallCouponService.releaseCoupon(orderId);
        log.info("Finished processing delayed task --- order unpaid after timeout --- {}", this.orderId);
    }
}

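3. The Task service class, which holds and schedules tasks

When the Spring bean is loaded, the @PostConstruct method starts a thread that consumes a delay queue. The queue stores tasks; the thread takes each task once its delay has expired and then calls task.run().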

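import java.util.concurrent.DelayQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.stereotype.Component;

@Component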
public class TaskService {
    private final DelayQueue<Task> delayQueue = new DelayQueue<>();

    @PostConstruct
    private void init() {
        // Scheduled thread pool with a single thread, chosen so that periodic tasks can be supported later
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build());
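        executorService.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Blocks until the delay of the head task has expired
                    Task task = delayQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop consuming
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // A failing task must not kill the consumer thread
                    e.printStackTrace();
                }
            }
        });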
    }

    public void addTask(Task task) {
        if (delayQueue.contains(task)) {
            return;
        }
        delayQueue.add(task);
    }

    public void removeTask(Task task) {
        delayQueue.remove(task);
    }

}
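
To see the take-and-run behaviour in isolation, here is a minimal, self-contained sketch without Spring. DemoTask and DelayQueueDemo are hypothetical names introduced for illustration; only DelayQueue, Delayed and TimeUnit come from the JDK. Tasks are added out of order and come back in order of expiry, because take() blocks until the head of the queue is due.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Task: a delayed element that records its id when run.
final class DemoTask implements Delayed, Runnable {
    private final String id;
    private final long start;
    private final List<String> sink;

    DemoTask(String id, long delayInMilliseconds, List<String> sink) {
        this.id = id;
        this.start = System.currentTimeMillis() + delayInMilliseconds;
        this.sink = sink;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(start - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(start, ((DemoTask) o).start);
    }

    @Override
    public void run() {
        sink.add(id);
    }
}

final class DelayQueueDemo {
    /** Adds three tasks out of order, then takes and runs each one as it expires. */
    static List<String> runDemo() {
        List<String> executed = new ArrayList<>();
        DelayQueue<DemoTask> queue = new DelayQueue<>();
        queue.add(new DemoTask("a", 300, executed));
        queue.add(new DemoTask("b", 100, executed));
        queue.add(new DemoTask("c", 200, executed));
        try {
            for (int i = 0; i < 3; i++) {
                queue.take().run(); // blocks until the earliest task is due
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a task", e);
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [b, c, a]
    }
}
```

The loop here runs on the calling thread for simplicity. TaskService does the same work on a dedicated daemon thread so that callers of addTask never block.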

4. Adding a Task

taskService.addTask(new OrderUnPaidTask(order.getOrderId(), 0));
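
Note that this call passes a delay of 0, so the task is due immediately; the one-argument constructor applies the 30-minute default instead. Because equals and hashCode in Task depend only on the task id, a freshly constructed task with the same id can be used to find or remove a pending one, for example when an order is paid before the timeout. The sketch below shows that lookup with a hypothetical IdTask stand-in. It is an illustration under those assumptions, not the project's code.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Task: identity is the id only, mirroring Task.equals/hashCode.
final class IdTask implements Delayed {
    private final String id;
    private final long start;

    IdTask(String id, long delayInMilliseconds) {
        this.id = id;
        this.start = System.currentTimeMillis() + delayInMilliseconds;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(start - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(start, ((IdTask) o).start);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof IdTask && id.equals(((IdTask) o).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

final class CancelByIdDemo {
    /** Queues a 30-minute task, then removes it using a different instance with the same id. */
    static boolean cancel() {
        DelayQueue<IdTask> queue = new DelayQueue<>();
        queue.add(new IdTask("OrderUnPaidTask-1", 30 * 60 * 1000L));
        // The delay of the probe object is irrelevant: only the id takes part in equals()
        boolean found = queue.contains(new IdTask("OrderUnPaidTask-1", 0));
        boolean removed = queue.remove(new IdTask("OrderUnPaidTask-1", 0));
        return found && removed && queue.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(cancel()); // true
    }
}
```

With the real classes, the equivalent would presumably be a call such as taskService.removeTask(new OrderUnPaidTask(orderId)) from the payment path, although the original post does not show where removeTask is invoked.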