延迟队列服务提供对外接口

发布时间 2023-07-05 16:33:30作者: 佛系粥米

延迟队列微服务:

        redis:list-执行时间<=当前时间     zset-当前时间<执行时间<当前时间+5分钟

        添加任务:【以防任务数量过大在,一旦服务器挂掉,内存所有的数据都消失了,所以要做数据持久化】添加任务到数据库、符合条件的任务添加到redis【list,zset】

        取消任务:删除数据库任务信息表中的任务,更新对应的任务日志表,删除redis中对应的任务

        拉取任务:按照任务类型和优先级从redis的list中拉取任务pop,更新数据库日志信息【删除任务,设置任务日志的status】,返回一个Task

        定时任务1:从redis中的zset找到<=当前时间的任务,[分布式锁结局多个部署项目定时处理问题],从zset中删除,移动到list中

        定时任务2:从数据库中将小于当前时间+5分钟即未来5分钟内要发生的任务,从数据库中同步到redis中,【后面拉取任务会涉及删除数据库中的任务】

 

延迟队列服务提供对外接口:

package com.heima.apis.schedule;

import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.schedule.dtos.Task;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

@FeignClient("leadnews-schedule")
public interface IScheduleClient {

    /**
     * 添加延迟任务
     * @param task
     * @return
     */
    @PostMapping("/api/v1/task/add")
    public ResponseResult addTask(@RequestBody Task task);

    /**
     * 取消任务
     * @param taskId
     * @return
     */
    @GetMapping("/api/v1/task/{taskId}")
    public ResponseResult  cancelTask(@PathVariable("taskId") Long taskId);

    /**
     * 按照任务的类型和优先级拉取任务,是从redis中pop的
     * @return
     */
    @GetMapping("/api/v1/task/{type}/{priority}")
    public ResponseResult  poll(@PathVariable("type") int type, @PathVariable("priority") int priority);
}
package com.heima.schedule.feign;

import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
public class ScheduleClient implements IScheduleClient {


    @Autowired
    private TaskService taskService;

    /**
     * 添加延迟任务
     *
     * @param task
     * @return
     */
    @PostMapping("/api/v1/task/add")
    public ResponseResult addTask(@RequestBody Task task) {

        return ResponseResult.okResult(taskService.addTask(task));
    }

    /**
     * 取消任务
     *
     * @param taskId
     * @return
     */
    @GetMapping("/api/v1/task/{taskId}")
    public ResponseResult  cancelTask(@PathVariable("taskId") Long taskId){
        return ResponseResult.okResult(taskService.cancelTask(taskId));
    }

    /**
     * 按照任务的类型和优先级拉取任务,是从redis中pop的
     *
     * @param type
     * @param priority
     * @return
     */
    @GetMapping("/api/v1/task/{type}/{priority}")
    public ResponseResult  poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
        return ResponseResult.okResult(taskService.poll(type, priority));
    }
}