springboot quartz 定时任务

发布时间 2023-08-10 18:14:22作者: wczhw

定时任务实现方式

  1. quartz 定时调用http 请求
  2. quartz 定时调用 OpenFeign
  3. quartz 定时调用普通定时任务

springboot 集成quartz

pom.xml 添加配置

   <dependency>
	   <groupId>org.springframework.boot</groupId>
	   <artifactId>spring-boot-starter-quartz</artifactId>
   </dependency>
	<dependency>
		<groupId>org.redisson</groupId>
		<artifactId>redisson</artifactId>
	</dependency>

数据库设计

-- Scheduled-dispatch definition table: one row per scheduled task.
-- `code` is used as the Quartz job/trigger name and `dispatch_group` as the Quartz group
-- (see JobUtil), `class_name` is the fully qualified job class loaded via Class.forName.
-- `counts` is a numeric counter with default 0 because the job code increments it with
-- `counts` = `counts` + 1; a NULL or text column would not count reliably.
CREATE TABLE `wxapp_dispatch` (
  `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'id',
  `name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '任务名称',
  `sys_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '系统名称',
  `user_id` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '登录用户id',
  `cron` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'cron表达式',
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '请求地址',
  `type` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '请求类型',
  `code` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '代码',
  `header` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '头部参数',
  `param` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '请求参数',
  `weight` int DEFAULT NULL COMMENT '权重',
  `request_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '请求类型',
  `open` tinyint(1) DEFAULT NULL COMMENT '是否开启',
  `counts` bigint DEFAULT 0 COMMENT '执行次数',
  `last_time` datetime DEFAULT NULL COMMENT '最后一次执行时间',
  `stop_time` datetime DEFAULT NULL COMMENT '停止时间',
  `dispatch_group` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'group',
  `class_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '类名',
  `dispatch_status` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '调度状态',
  `remark` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '备注',
  `status` tinyint(1) DEFAULT NULL COMMENT '状态;true 存在 false 不存在',
  `create_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '创建人',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '更新人',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='定时调度表';

-- Dispatch log table: rows are inserted by the scheduler listener when Quartz reports an error.
-- The AUTO_INCREMENT seed that a schema dump had baked into this statement is intentionally
-- omitted so that a freshly created table starts counting from 1.
-- NOTE(review): `run_time` is stored as text; confirm the unit/format with the writer of this column.
CREATE TABLE `wxapp_dispatch_logs` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
  `dispatch_code` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '调度id',
  `run_time` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT '运行时间',
  `type` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '失败类型',
  `logs` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '日志',
  `status` tinyint(1) DEFAULT NULL COMMENT '状态;true 存在 false 不存在',
  `create_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '创建人',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '更新人',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='调度日志';

quartz 配置

package com.wxapp.dispatch.service.config;

import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;

import java.io.IOException;
import java.util.Properties;

/**
 * Spring configuration that exposes a Quartz {@link SchedulerFactoryBean} together with the
 * job factory and the Quartz properties it is built from.
 */
@Configuration
public class SchedulerConfig {

    /**
     * Builds the job factory handed to the scheduler factory bean.
     *
     * @param applicationContext the context the factory is bound to
     * @return a {@link SpringBeanJobFactory} carrying that context
     */
    @Bean
    public JobFactory jobFactory(ApplicationContext applicationContext) {
        SpringBeanJobFactory springJobFactory = new SpringBeanJobFactory();
        springJobFactory.setApplicationContext(applicationContext);
        return springJobFactory;
    }

    /**
     * Builds the scheduler factory bean from the job factory and {@link #quartzProperties()}.
     *
     * @param jobFactory factory used to instantiate jobs
     * @return the configured, auto-starting scheduler factory
     * @throws IOException if the Quartz properties cannot be assembled
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory) throws IOException {
        SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
        schedulerFactory.setQuartzProperties(quartzProperties());
        schedulerFactory.setJobFactory(jobFactory);
        // Existing job definitions with the same key are replaced when the scheduler starts.
        schedulerFactory.setOverwriteExistingJobs(true);
        // Startup delay after initialization; the value passed here is 1
        // (an earlier comment spoke of 5 seconds, which did not match the code).
        schedulerFactory.setStartupDelay(1);
        schedulerFactory.setAutoStartup(true);
        return schedulerFactory;
    }

    /**
     * Assembles the Quartz settings: a daemon scheduler thread, a {@code SimpleThreadPool} of
     * 20 daemon threads at priority 5, an in-memory {@code RAMJobStore} and a 60000 ms
     * misfire threshold.
     *
     * @return the Quartz properties
     * @throws IOException if the properties factory fails to initialize
     */
    @Bean
    public Properties quartzProperties() throws IOException {
        Properties settings = new Properties();
        // scheduler identity
        settings.setProperty("org.quartz.scheduler.instanceName", "DefaultQuartzScheduler");
        settings.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        settings.setProperty("org.quartz.scheduler.makeSchedulerThreadDaemon", "true");
        // worker thread pool
        settings.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        settings.setProperty("org.quartz.threadPool.makeThreadsDaemons", "true");
        settings.setProperty("org.quartz.threadPool.threadCount", "20");
        settings.setProperty("org.quartz.threadPool.threadPriority", "5");
        // job store
        settings.setProperty("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
        settings.setProperty("org.quartz.jobStore.misfireThreshold", "60000");

        PropertiesFactoryBean holder = new PropertiesFactoryBean();
        holder.setProperties(settings);
        holder.afterPropertiesSet();
        return holder.getObject();
    }
}

quartz util

package com.wxapp.dispatch.service.util;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.wxapp.common.base.util.jackson.JacksonUtil;
import com.wxapp.common.util.check.CheckUtil;
import com.wxapp.common.util.copy.CopyUtil;
import com.wxapp.dispatch.api.pojo.entity.DispatchDO;
import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.api.pojo.vo.dispatch.DispatchVo;
import com.wxapp.dispatch.service.listener.TimerSchedulerLister;
import com.wxapp.dispatch.service.mapper.DispatchLogsMapper;
import com.wxapp.dispatch.service.mapper.DispatchMapper;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import java.lang.reflect.InvocationTargetException;
import java.util.*;

/**
 * Creates, replaces, removes and inspects Quartz jobs built from dispatch records.
 *
 * <p>Implements {@link CommandLineRunner}: on application start-up {@link #run(String...)}
 * loads every dispatch record whose {@code status} and {@code open} flags are true and
 * schedules it.
 *
 * <p>All operations go through the scheduler returned by
 * {@link StdSchedulerFactory#getDefaultScheduler()}.
 */
@Component
public class JobUtil implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(JobUtil.class);
    private final DispatchMapper dispatchMapper;
    private final DispatchLogsMapper dispatchLogsMapper;
    /** Single listener instance, so it can be registered with the scheduler exactly once. */
    private final SchedulerListener schedulerListener;

    public JobUtil(DispatchMapper dispatchMapper, DispatchLogsMapper dispatchLogsMapper) {
        this.dispatchMapper = dispatchMapper;
        this.dispatchLogsMapper = dispatchLogsMapper;
        this.schedulerListener = new TimerSchedulerLister(dispatchLogsMapper);
    }

    /**
     * Schedules all enabled dispatch records, ordered by ascending weight.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if scheduling fails
     */
    @Override
    public void run(String... args) throws Exception {
        var query = new LambdaQueryWrapper<DispatchDO>();
        query.eq(DispatchDO::getStatus, true).eq(DispatchDO::getOpen, true)
                .orderByAsc(DispatchDO::getWeight);
        var dispatchDOS = dispatchMapper.selectList(query);
        if (CheckUtil.isNotNullList(dispatchDOS)) {
            addOrUpdateJob(CopyUtil.copyListNew(dispatchDOS, DispatchVm::new, null));
        }
    }

    /**
     * Adds the given dispatches to the scheduler, replacing jobs that already exist under the
     * same key, and starts the scheduler.
     *
     * @param dispatchVm dispatch definitions; a null or empty list is a no-op
     * @throws SchedulerException if the scheduler rejects the jobs
     * @throws IllegalStateException if a job or trigger cannot be built (unknown class,
     *         missing no-arg constructor, invalid cron expression, ...)
     */
    public void addOrUpdateJob(List<DispatchVm> dispatchVm) throws SchedulerException {
        if (CheckUtil.isNullList(dispatchVm)) {
            return;
        }
        Scheduler scheduler = getScheduler();
        Map<JobDetail, Set<? extends Trigger>> triggersAndJobs = new HashMap<>(dispatchVm.size());
        for (DispatchVm st : dispatchVm) {
            try {
                JobDetail jobDetail = buildJobDetail(st);
                CronTrigger trigger = buildTrigger(scheduler, st);
                triggersAndJobs.put(jobDetail, Set.of(trigger));
            } catch (Exception e) {
                // Unchecked, as before, so existing callers keep seeing a RuntimeException.
                throw new IllegalStateException("Failed to build job for dispatch code " + st.getCode(), e);
            }
        }
        scheduler.scheduleJobs(triggersAndJobs, true);
        scheduler.start();
    }

    /** Builds the job detail (job class, data map, identity) for one dispatch. */
    private JobDetail buildJobDetail(DispatchVm st) throws ReflectiveOperationException {
        // asSubclass rejects classes that are not Quartz jobs instead of relying on a raw type.
        Class<? extends Job> jobClass = Class.forName(st.getClassName()).asSubclass(Job.class);
        // Fail fast when the class cannot be created through a no-arg constructor.
        jobClass.getDeclaredConstructor().newInstance();
        return JobBuilder.newJob(jobClass)
                .usingJobData(dealData(st))
                .withIdentity(st.getCode(), st.getDispatchGroup())
                .build();
    }

    /** Builds the cron trigger for one dispatch, reusing an existing cron trigger's builder if present. */
    private CronTrigger buildTrigger(Scheduler scheduler, DispatchVm st) throws SchedulerException {
        TriggerKey triggerKey = TriggerKey.triggerKey(st.getCode(), st.getDispatchGroup());
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(st.getCron());
        Trigger existing = scheduler.getTrigger(triggerKey);
        TriggerBuilder<CronTrigger> triggerBuilder;
        if (existing instanceof CronTrigger) {
            triggerBuilder = ((CronTrigger) existing).getTriggerBuilder()
                    .withIdentity(triggerKey).withSchedule(scheduleBuilder).endAt(st.getStopTime());
        } else {
            triggerBuilder = TriggerBuilder.newTrigger()
                    .withIdentity(st.getCode(), st.getDispatchGroup())
                    .withSchedule(scheduleBuilder).endAt(st.getStopTime());
        }
        // Boolean.TRUE.equals avoids a NullPointerException when the open flag is unset.
        if (Boolean.TRUE.equals(st.getOpen())) {
            triggerBuilder.startNow();
        }
        return triggerBuilder.build();
    }

    /**
     * Deletes the job identified by the dispatch's code and group from the scheduler.
     *
     * @param dispatchVm dispatch whose job is removed
     */
    public void removeOrStopJob(DispatchVm dispatchVm) {
        try {
            getScheduler().deleteJob(JobKey.jobKey(dispatchVm.getCode(), dispatchVm.getDispatchGroup()));
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Triggers the job identified by the dispatch's code and group immediately.
     *
     * @param dispatchVm dispatch whose job is triggered
     */
    public void runAJobNow(DispatchVm dispatchVm) {
        try {
            getScheduler().triggerJob(JobKey.jobKey(dispatchVm.getCode(), dispatchVm.getDispatchGroup()));
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Lists every job known to the scheduler, one entry per trigger.
     *
     * @return job descriptions carrying code, name, group and (for cron triggers) the cron expression
     */
    public List<DispatchVo> getAllJob() {
        try {
            Scheduler scheduler = getScheduler();
            List<DispatchVo> jobList = new ArrayList<>();
            for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.anyJobGroup())) {
                List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
                if (triggers == null) {
                    continue;
                }
                for (Trigger trigger : triggers) {
                    DispatchVo job = describe(jobKey, trigger);
                    // Kept for backward compatibility: the job key name was historically exposed as "name".
                    job.setName(jobKey.getName());
                    jobList.add(job);
                }
            }
            return jobList;
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Lists the jobs that are executing right now.
     *
     * @return job descriptions carrying code, group and (for cron triggers) the cron expression
     */
    public List<DispatchVo> getRunningJob() {
        try {
            List<DispatchVo> jobList = new ArrayList<>();
            for (JobExecutionContext executing : getScheduler().getCurrentlyExecutingJobs()) {
                jobList.add(describe(executing.getJobDetail().getKey(), executing.getTrigger()));
            }
            return jobList;
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }

    /** Maps a job key and one of its triggers to a view object (code, group, cron). */
    private DispatchVo describe(JobKey jobKey, Trigger trigger) {
        DispatchVo job = new DispatchVo();
        // The job key name is the dispatch code (see withIdentity in buildJobDetail).
        job.setCode(jobKey.getName());
        job.setDispatchGroup(jobKey.getGroup());
        if (trigger instanceof CronTrigger) {
            job.setCron(((CronTrigger) trigger).getCronExpression());
        }
        return job;
    }

    /**
     * Pauses all jobs.
     */
    public void pauseAllJobs() {
        try {
            getScheduler().pauseAll();
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Resumes all jobs.
     */
    public void resumeAllJobs() {
        try {
            getScheduler().resumeAll();
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Converts the dispatch into the job data map handed to the job at execution time.
     *
     * <p>The whole dispatch is always copied, not only when {@code param} is set: the job needs
     * url, request type, user id and so on even for requests that carry no parameters.
     *
     * @param dispatchVm dispatch to convert
     * @return the populated job data map (empty only if {@code dispatchVm} is null)
     */
    private JobDataMap dealData(DispatchVm dispatchVm) {
        JobDataMap jobDataMap = new JobDataMap();
        if (CheckUtil.isNotNullObject(dispatchVm)) {
            jobDataMap.putAll(JacksonUtil.parseObject(dispatchVm, Map.class));
        }
        log.info("调度参数{}", JacksonUtil.toJSONStringPretty(jobDataMap));
        return jobDataMap;
    }

    /**
     * Returns the default Quartz scheduler, making sure the error-logging listener is
     * registered exactly once (it used to be added again on every call).
     *
     * @return the scheduler, never null
     * @throws IllegalStateException if the scheduler cannot be obtained
     */
    private Scheduler getScheduler() {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            ListenerManager listenerManager = scheduler.getListenerManager();
            if (!listenerManager.getSchedulerListeners().contains(schedulerListener)) {
                listenerManager.addSchedulerListener(schedulerListener);
            }
            return scheduler;
        } catch (SchedulerException e) {
            throw new IllegalStateException("Unable to obtain the default Quartz scheduler", e);
        }
    }
}

quartz 定时任务增删改查

  1. controller
package com.wxapp.dispatch.service.controller;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.wxapp.common.constant.base.IBaseController;
import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.dispatch.api.pojo.vm.dispatch.AddVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.PageVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.UpdateVm;
import com.wxapp.dispatch.api.pojo.vo.dispatch.DetailVo;
import com.wxapp.dispatch.api.pojo.vo.dispatch.PageVo;
import com.wxapp.dispatch.service.service.DispatchService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.quartz.SchedulerException;
import org.springframework.web.bind.annotation.*;

/**
 * REST endpoints for managing scheduled dispatch tasks; every call is delegated to
 * {@link DispatchService}.
 *
 * @Datetime 2023/5/26 17:58
 */
@RestController
@Api(tags = "调度管理")
@RequestMapping("dispatch")
public class DispatchController extends IBaseController {

    private final DispatchService dispatchService;

    public DispatchController(DispatchService dispatchService) {
        this.dispatchService = dispatchService;
    }

    /** Returns one page of dispatch tasks matching the posted query. */
    @ApiOperation("定时任务分页")
    @PostMapping("page")
    public ResultVo<Page<PageVo>> page(@RequestBody PageVm vm) {
        return dispatchService.page(vm);
    }

    /** Returns the details of the dispatch task with the given id. */
    @ApiOperation("详情")
    @GetMapping("detail/{id}")
    public ResultVo<DetailVo> detail(@PathVariable String id) {
        return dispatchService.detail(id);
    }

    /** Creates a dispatch task from the posted definition. */
    @ApiOperation("添加")
    @PostMapping("add")
    public ResultVo<String> add(@RequestBody AddVm vm) throws SchedulerException {
        return dispatchService.add(vm);
    }

    /** Updates an existing dispatch task. */
    @ApiOperation("更新")
    @PutMapping("update")
    public ResultVo<String> update(@RequestBody UpdateVm vm) throws SchedulerException {
        return dispatchService.update(vm);
    }

    /** Deletes the dispatch task with the given id. */
    @ApiOperation("删除定时任务")
    @DeleteMapping("delete/{id}")
    public ResultVo<String> delete(@PathVariable String id) {
        return dispatchService.delete(id);
    }

    /** Pauses ({@code isPause} true) or resumes ({@code isPause} false) the task with the given id. */
    @ApiOperation("暂停或恢复执行定时任务")
    @PutMapping("pauseOrRes/{id}")
    public ResultVo<String> pauseOrRes(@PathVariable String id, @RequestBody Boolean isPause) throws SchedulerException {
        return dispatchService.pauseOrRes(id, isPause);
    }
}
  2. service
package com.wxapp.dispatch.service.service;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.dispatch.api.pojo.vm.dispatch.AddVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.PageVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.UpdateVm;
import com.wxapp.dispatch.api.pojo.vo.dispatch.DetailVo;
import com.wxapp.dispatch.api.pojo.vo.dispatch.PageVo;
import org.quartz.SchedulerException;

/**
 * Operations for managing scheduled dispatch tasks.
 *
 * @Datetime 2023/5/26 18:01
 */
public interface DispatchService {

    /** Returns one page of dispatch tasks matching the query. */
    ResultVo<Page<PageVo>> page(PageVm vm);

    /** Returns the details of the dispatch task with the given id. */
    ResultVo<DetailVo> detail(String id);

    /**
     * Creates a dispatch task.
     *
     * @throws SchedulerException if the scheduler rejects the job
     */
    ResultVo<String> add(AddVm vm) throws SchedulerException;

    /**
     * Updates a dispatch task.
     *
     * @throws SchedulerException if the scheduler rejects the job
     */
    ResultVo<String> update(UpdateVm vm) throws SchedulerException;

    /** Deletes the dispatch task with the given id. */
    ResultVo<String> delete(String id);

    /**
     * Pauses or resumes the dispatch task with the given id.
     *
     * @param isPause true to pause, false to resume
     * @throws SchedulerException if the scheduler rejects the job
     */
    ResultVo<String> pauseOrRes(String id, Boolean isPause) throws SchedulerException;
}

  3. serviceimpl
package com.wxapp.dispatch.service.service.impl;

import com.baomidou.mybatisplus.core.enums.SqlKeyword;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.wxapp.common.constant.base.IBaseService;
import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.common.util.bean.SpringUtils;
import com.wxapp.common.util.condition.ConditionUtil;
import com.wxapp.common.util.copy.CopyUtil;
import com.wxapp.common.util.random.RandomUtil;
import com.wxapp.dispatch.api.pojo.entity.DispatchDO;
import com.wxapp.dispatch.api.enums.DispatchStatusEnum;
import com.wxapp.dispatch.api.pojo.vm.dispatch.AddVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.PageVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.UpdateVm;
import com.wxapp.dispatch.api.pojo.vo.dispatch.DetailVo;
import com.wxapp.dispatch.api.pojo.vo.dispatch.PageVo;
import com.wxapp.dispatch.service.mapper.DispatchMapper;
import com.wxapp.dispatch.service.service.DispatchService;
import com.wxapp.dispatch.service.util.JobUtil;
import org.quartz.SchedulerException;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Default {@link DispatchService}: persists dispatch records through {@link DispatchMapper}
 * and keeps the Quartz scheduler in step through {@link JobUtil}.
 *
 * @Datetime 2023/5/26 18:01
 */
@Service
public class DispatchServiceImpl extends IBaseService implements DispatchService {
    /** Error message returned when the requested dispatch record does not exist. */
    private static final String DISPATCH_NOT_FOUND = "定时任务不存在";

    private final DispatchMapper dispatchMapper;

    private final JobUtil jobUtil;

    public DispatchServiceImpl(DispatchMapper dispatchMapper, JobUtil jobUtil) {
        this.dispatchMapper = dispatchMapper;
        this.jobUtil = jobUtil;
    }

    /**
     * Stores a new dispatch. The record is created closed ({@code open=false}) with status
     * PAUSE and a random UUID as code; {@code className} is resolved from the Spring bean name
     * given in the request to the bean's class name.
     *
     * @param vm the dispatch definition
     * @return success, or an error result if anything fails
     */
    @Override
    public ResultVo<String> add(AddVm vm) throws SchedulerException {
        try {
            var dispatch = CopyUtil.copyObjectNew(vm, DispatchDO::new, (s, r) -> {
                r.setCode(RandomUtil.getUUID());
                r.setOpen(false);
                r.setDispatchStatus(DispatchStatusEnum.PAUSE);
            });
            Object classBean = SpringUtils.getBean(vm.getClassName());
            dispatch.setClassName(classBean.getClass().getName());
            dispatchMapper.insert(dispatch);
            if (Boolean.TRUE.equals(dispatch.getOpen())) {
                jobUtil.addOrUpdateJob(List.of(CopyUtil.copyObjectNew(dispatch, DispatchVm::new, null)));
            }
        } catch (Exception e) {
            // NOTE(review): every failure (bean lookup, insert, scheduling) is reported with this
            // one message and the cause is not logged; kept as-is for backward compatibility.
            return error("没有找到类信息");
        }
        return success();
    }

    /** Loads the dispatch with the given id and converts it to its detail view. */
    @Override
    public ResultVo<DetailVo> detail(String id) {
        var dispatchDO = dispatchMapper.selectById(id);
        return success(CopyUtil.copyObjectNew(dispatchDO, DetailVo::new, null));
    }

    /** Pages dispatches, matching {@code name} with LIKE and ordering by {@code lastTime} descending. */
    @Override
    public ResultVo<Page<PageVo>> page(PageVm vm) {
        Map<SFunction<DispatchDO, ?>, SqlKeyword> condition = new HashMap<>();
        condition.put(DispatchDO::getName, SqlKeyword.LIKE);
        condition.put(DispatchDO::getLastTime, SqlKeyword.DESC);
        var page = ConditionUtil.getPage(vm, DispatchDO.class);
        var conditions = ConditionUtil.getSCondition(DispatchDO::new, vm, PageVo.class, condition);
        var doPage = dispatchMapper.selectPage(page, conditions);
        return success(CopyUtil.copyPage(doPage, PageVo::new, null));
    }

    /**
     * Updates a dispatch and re-synchronizes the scheduler with the stored {@code open} flag:
     * an open dispatch is (re)scheduled, a closed one is removed so that editing a paused task
     * does not silently start it.
     *
     * @param vm the new values, identified by {@code vm.getId()}
     * @return success, or an error result if the record does not exist
     */
    @Override
    public ResultVo<String> update(UpdateVm vm) throws SchedulerException {
        var dispatchDO = dispatchMapper.selectById(vm.getId());
        if (dispatchDO == null) {
            return error(DISPATCH_NOT_FOUND);
        }
        CopyUtil.copyObject(vm, dispatchDO, (s, r) -> {
            r.setClassName(SpringUtils.getBean(s.getClassName()).getClass().getName());
        });
        dispatchMapper.updateById(dispatchDO);
        syncScheduler(dispatchDO);
        return success();
    }

    /**
     * Soft-deletes a dispatch ({@code status=false}) and removes its job from the scheduler.
     *
     * @return success, or an error result if the record does not exist
     */
    @Override
    public ResultVo<String> delete(String id) {
        var dispatchDO = dispatchMapper.selectById(id);
        if (dispatchDO == null) {
            return error(DISPATCH_NOT_FOUND);
        }
        dispatchDO.setStatus(false);
        dispatchMapper.updateById(dispatchDO);
        jobUtil.removeOrStopJob(CopyUtil.copyObjectNew(dispatchDO, DispatchVm::new, null));
        return success();
    }

    /**
     * Pauses or resumes a dispatch: stores the new open flag and status, then removes or
     * (re)schedules the job accordingly.
     *
     * @param id      id of the dispatch
     * @param isPause true to pause, false to resume; must not be null
     * @return success, or an error result for a null flag or a missing record
     */
    @Override
    public ResultVo<String> pauseOrRes(String id, Boolean isPause) throws SchedulerException {
        if (isPause == null) {
            // Guard against unboxing a missing flag.
            return error("isPause 不能为空");
        }
        var dispatchDO = dispatchMapper.selectById(id);
        if (dispatchDO == null) {
            return error(DISPATCH_NOT_FOUND);
        }
        dispatchDO.setOpen(!isPause);
        dispatchDO.setDispatchStatus(isPause ? DispatchStatusEnum.PAUSE : DispatchStatusEnum.RUN);
        dispatchMapper.updateById(dispatchDO);
        syncScheduler(dispatchDO);
        return success();
    }

    /** Schedules the dispatch when it is open, otherwise removes its job from the scheduler. */
    private void syncScheduler(DispatchDO dispatchDO) throws SchedulerException {
        var dispatchVm = CopyUtil.copyObjectNew(dispatchDO, DispatchVm::new, null);
        if (Boolean.TRUE.equals(dispatchDO.getOpen())) {
            jobUtil.addOrUpdateJob(List.of(dispatchVm));
        } else {
            jobUtil.removeOrStopJob(dispatchVm);
        }
    }
}

SchedulerListener 定时任务监听

package com.wxapp.dispatch.service.listener;

import com.wxapp.common.base.util.jackson.JacksonUtil;
import com.wxapp.common.util.bean.SpringUtils;
import com.wxapp.dispatch.api.pojo.entity.DispatchLogsDO;
import com.wxapp.dispatch.service.mapper.DispatchLogsMapper;
import com.wxapp.dispatch.service.util.JobUtil;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Quartz {@link SchedulerListener} that logs scheduling events and stores scheduler errors
 * in the dispatch log table. Callbacks without a body are intentionally no-ops.
 */
public class TimerSchedulerLister implements SchedulerListener {
    private static final Logger log = LoggerFactory.getLogger(TimerSchedulerLister.class);

    private final DispatchLogsMapper dispatchLogsMapper;

    /** Name and group of the most recently scheduled trigger. */
    private final Map<String, String> lastScheduled = new ConcurrentHashMap<>();

    public TimerSchedulerLister(DispatchLogsMapper dispatchLogsMapper) {
        this.dispatchLogsMapper = dispatchLogsMapper;
    }

    /** Remembers and logs the key of the trigger that was just scheduled. */
    @Override
    public void jobScheduled(Trigger trigger) {
        lastScheduled.put("name", trigger.getKey().getName());
        lastScheduled.put("group", trigger.getKey().getGroup());
        log.info("调用时参数:{}", JacksonUtil.toJSONString(lastScheduled));
    }

    @Override
    public void jobUnscheduled(TriggerKey triggerKey) {
    }

    @Override
    public void triggerFinalized(Trigger trigger) {
    }

    @Override
    public void triggerPaused(TriggerKey triggerKey) {
        log.info("暂停:{}", triggerKey.getName());
    }

    @Override
    public void triggersPaused(String triggerGroup) {
    }

    @Override
    public void triggerResumed(TriggerKey triggerKey) {
        log.info("取消暂停:{}", triggerKey.getName());
    }

    @Override
    public void triggersResumed(String triggerGroup) {
        log.info("取消暂停:{}", triggerGroup);
    }

    @Override
    public void jobAdded(JobDetail jobDetail) {
    }

    @Override
    public void jobDeleted(JobKey jobKey) {
    }

    @Override
    public void jobPaused(JobKey jobKey) {
    }

    @Override
    public void jobsPaused(String jobGroup) {
    }

    @Override
    public void jobResumed(JobKey jobKey) {
    }

    @Override
    public void jobsResumed(String jobGroup) {
    }

    /**
     * Logs the error and stores it as a dispatch log row.
     *
     * <p>The row is attributed to the most recently scheduled trigger, whose name is stored
     * under "name" (the previously read key "code" was never written, so the dispatch code was
     * always null).
     * NOTE(review): the trigger name is assumed to equal the dispatch code — confirm against
     * the code that builds the triggers.
     *
     * @param msg   error description supplied by Quartz
     * @param cause the underlying scheduler exception, may be null
     */
    @Override
    public void schedulerError(String msg, SchedulerException cause) {
        log.error("定时任务保存:{}", msg, cause);
        DispatchLogsDO dispatchLogsDO = new DispatchLogsDO();
        dispatchLogsDO.setDispatchCode(lastScheduled.get("name"));
        dispatchLogsDO.setLogs(cause == null ? msg : cause.getMessage());
        dispatchLogsMapper.insert(dispatchLogsDO);
    }

    @Override
    public void schedulerInStandbyMode() {
    }

    @Override
    public void schedulerStarted() {
    }

    @Override
    public void schedulerStarting() {
    }

    @Override
    public void schedulerShutdown() {
    }

    @Override
    public void schedulerShuttingdown() {
    }

    @Override
    public void schedulingDataCleared() {
    }
}

job 实现

我自己的目录结构如下
image

  1. BaseJob
package com.wxapp.dispatch.service.job;

import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.wxapp.common.base.util.jackson.JacksonUtil;
import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.common.util.bean.SpringUtils;
import com.wxapp.common.util.check.CheckUtil;
import com.wxapp.common.util.redisson.RedissonUtil;
import com.wxapp.dispatch.api.feign.LoginFeign;
import com.wxapp.dispatch.api.pojo.entity.DispatchDO;
import com.wxapp.dispatch.api.pojo.po.vm.PLoginVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.service.job.openfegin.OpenFeginJob;
import com.wxapp.dispatch.service.mapper.DispatchMapper;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.time.LocalDateTime;
import java.util.Date;

/**
 * Base class of all dispatch jobs: takes a lock keyed by the job key, optionally logs in on
 * behalf of the configured user, delegates the real work to
 * {@link #toDealJob(JobExecutionContext, DispatchVm)} and finally records the run.
 */
public abstract class BaseJob extends QuartzJobBean {
    private static final Logger log = LoggerFactory.getLogger(BaseJob.class);

    /**
     * Runs one execution of the job.
     *
     * <p>If the lock for this job key cannot be obtained the run is skipped. Exceptions thrown
     * while running the job are logged and not rethrown. The lock is always released, even
     * when recording the run fails.
     *
     * @param context Quartz execution context; its merged job data map is parsed into a {@link DispatchVm}
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        final String jobKey = context.getJobDetail().getKey().toString();
        // Lock per job key so the same job does not run concurrently.
        if (!RedissonUtil.tryLock(jobKey)) {
            log.error("*****************调度任务{}正在执行*********************", jobKey);
            return;
        }
        log.info("**********执行定时任务调用开始******************");
        var startTime = System.currentTimeMillis();
        try {
            // Rebuild the dispatch definition from the job data map.
            var dispatchVm = JacksonUtil.parseObject(JacksonUtil.toJSONString(context.getMergedJobDataMap()), DispatchVm.class);
            // Obtain a token when a user id is configured.
            if (CheckUtil.isNotNullString(dispatchVm.getUserId())) {
                ResultVo<PLoginVm> resultVo;
                String token = "";
                var loginFeign = SpringUtils.getBean(LoginFeign.class);
                if (context.getNextFireTime() != null) {
                    // NOTE(review): this evaluates to (now - nextFireTime), which is negative while the
                    // next fire time lies in the future; if the argument is meant as a token lifetime
                    // the operands look inverted — confirm against LoginFeign.remoteLogin.
                    resultVo = loginFeign.remoteLogin(dispatchVm.getUserId(), (new Date().getTime() - context.getNextFireTime().getTime()) / 1000 - 1);
                } else {
                    resultVo = loginFeign.remoteLogin(dispatchVm.getUserId(), null);
                }
                if (resultVo != null && resultVo.getData() != null) {
                    token = resultVo.getData().getToken();
                }
                dispatchVm.setToken(token);
                // NOTE(review): this writes the token to the log; consider masking it.
                log.info("登录获取的token" + token);
            }
            log.info("**********执行结果:" + JacksonUtil.toJSONString(toDealJob(context, dispatchVm)) + "******************");
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            log.error(e.getMessage(), e);
        } finally {
            try {
                saveDispatch(context.getJobDetail().getKey().getName());
            } finally {
                // Release the lock even if recording the run throws.
                RedissonUtil.unLock(jobKey);
            }
            log.info("**********执行定时任务{}结束******************", jobKey);
            log.info("************************执行时间:{}ms ******************************", System.currentTimeMillis() - startTime);
        }
    }

    /**
     * Records a run: sets {@code last_time} to now and increments {@code counts} for the
     * dispatch with the given code. {@code IFNULL} makes the increment work for rows whose
     * counter is still NULL.
     *
     * <p>NOTE(review): within this class the method is only called directly on {@code this},
     * so the {@code @Async} annotation is presumably not in effect for that call — confirm.
     *
     * @param code dispatch code (the job key name)
     */
    @Async
    protected void saveDispatch(String code) {
        var wrapper = new LambdaUpdateWrapper<DispatchDO>();
        wrapper.eq(DispatchDO::getCode, code)
                .set(DispatchDO::getLastTime, LocalDateTime.now())
                .setSql("`counts`=IFNULL(`counts`,0)+1");
        SpringUtils.getBean(DispatchMapper.class).update(null, wrapper);
    }

    /**
     * Performs the actual work of the job.
     *
     * @param context    Quartz execution context
     * @param dispatchVm dispatch definition, with the token set when a user id is configured
     * @return the result, which is logged as JSON
     */
    protected abstract Object toDealJob(JobExecutionContext context, DispatchVm dispatchVm);

}

  2. OpenFeginJob quartz 调用 OpenFeign
package com.wxapp.dispatch.service.job.openfegin;

import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.common.util.bean.SpringUtils;
import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.service.job.BaseJob;
import com.wxapp.dispatch.service.service.DynamicClientService;
import lombok.NoArgsConstructor;
import org.quartz.JobExecutionContext;
import org.springframework.stereotype.Component;

/**
 * Job that forwards the dispatch to another service through {@link DynamicClientService},
 * choosing the call by the dispatch's request type.
 */
@Component(value = "OpenFeginJob")
@NoArgsConstructor
public class OpenFeginJob extends BaseJob {
    private DynamicClientService dynamicClientService;

    public OpenFeginJob(DynamicClientService dynamicClientService) {
        this.dynamicClientService = dynamicClientService;
    }

    /**
     * Executes the remote call described by the dispatch.
     *
     * @param context    Quartz execution context (unused)
     * @param dispatchVm dispatch carrying system name, url, token, params and request type;
     *                   a missing request type is treated as GET
     * @return the result returned by the remote call
     */
    @Override
    public Object toDealJob(JobExecutionContext context, DispatchVm dispatchVm) {
        DynamicClientService client = resolveClient();
        var sysName = dispatchVm.getSysName().getValue();
        var requestType = dispatchVm.getType();
        String method = requestType == null ? null : requestType.getValue();
        ResultVo<Object> res;
        switch (method == null ? "GET" : method) {
            case "POST":
                res = client.executePostApi(sysName, dispatchVm.getUrl(), dispatchVm.getToken(), dispatchVm.getParam());
                break;
            case "DELETE":
                res = client.executeDeleteApi(sysName, dispatchVm.getUrl(), dispatchVm.getToken(), dispatchVm.getParam());
                break;
            case "PUT":
                res = client.executePutApi(sysName, dispatchVm.getUrl(), dispatchVm.getToken(), dispatchVm.getParam());
                break;
            case "GET":
            default:
                res = client.executeGetApi(sysName, dispatchVm.getUrl(), dispatchVm.getToken(), dispatchVm.getParam());
                break;
        }
        return res;
    }

    /**
     * Returns the client, looking it up from the Spring context when this instance was created
     * through the no-arg constructor and therefore has no client injected.
     */
    private DynamicClientService resolveClient() {
        if (dynamicClientService == null) {
            dynamicClientService = SpringUtils.getBean(DynamicClientService.class);
        }
        return dynamicClientService;
    }
}
  3. HttpRequestJob quartz http 调用
package com.wxapp.dispatch.service.job.http;

import com.wxapp.common.base.util.jackson.JacksonUtil;
import com.wxapp.common.constant.vo.HttpRequestVo;
import com.wxapp.common.util.http.HttpRequestUtil;
import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.service.job.BaseJob;
import lombok.NoArgsConstructor;
import org.quartz.JobExecutionContext;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;

import java.util.Map;


/**
 * Job that performs a scheduled dispatch as a plain HTTP call via {@link HttpRequestUtil}.
 */
@Component(value = "HttpJob")
@NoArgsConstructor
public class HttpRequestJob extends BaseJob {
    /**
     * Builds an {@link HttpRequestVo} from the dispatch definition and sends it.
     *
     * <p>The parsed params are handed to the request both as its data and as its URL-append
     * map. The response is considered successful when its {@code "code"} entry equals
     * {@code "200"}.
     *
     * @param context    Quartz execution context (not used here)
     * @param dispatchVm dispatch definition supplying url, params, headers and request type
     * @return the parsed response map returned by {@link HttpRequestUtil#request}
     */
    @Override
    public Object toDealJob(JobExecutionContext context, DispatchVm dispatchVm) {
        Map<String, Object> data = dispatchVm.getParam() == null ? null : JacksonUtil.parseObject(dispatchVm.getParam(), Map.class);
        // The method is left null here and filled in by the switch below.
        HttpRequestVo<Map<String, Object>> request = new HttpRequestVo<>(dispatchVm.getUrl(), null, data, dispatchVm.getMapHearder(), data, "200", "code", "定时任务调用失败");
        // Any value other than POST / DELETE / PUT is treated as GET.
        switch (dispatchVm.getRequestType().getValue()) {
            case "POST":
                request.setMethod(HttpMethod.POST);
                break;
            case "DELETE":
                request.setMethod(HttpMethod.DELETE);
                break;
            case "PUT":
                request.setMethod(HttpMethod.PUT);
                break;
            case "GET":
            default:
                request.setMethod(HttpMethod.GET);
                break;
        }
        return HttpRequestUtil.request(request);
    }
}

  1. CommonTextJob 普通的quartz 调用
package com.wxapp.dispatch.service.job.common;

import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.service.job.BaseJob;
import lombok.NoArgsConstructor;
import org.quartz.JobExecutionContext;
import org.springframework.stereotype.Component;

/**
 * Stub job for ordinary scheduled work; the body is a placeholder to be filled with business logic.
 *
 * Created 2023/6/2 9:28
 */
@Component(value = "CommonTextJob")
@NoArgsConstructor
public class CommonTextJob extends BaseJob {
    @Override
    protected Object toDealJob(JobExecutionContext context, DispatchVm dispatchVm) {
	// Business logic goes here; nothing is implemented yet, so no result is produced.
        return null;
    }
}

OpenFeign 调用是用代理模式开发的

  1. OpenFeign service
package com.wxapp.dispatch.service.service;

import com.wxapp.common.constant.vo.ResultVo;

/**
 * Calls an endpoint on a service chosen at runtime by its Feign name.
 *
 * <p>Every method takes the same arguments: the Feign/service name, the endpoint url,
 * a token and the request params, and returns the callee's {@link ResultVo}.
 *
 * Created 2023/5/25 16:18
 */
public interface DynamicClientService {

    /** Sends a POST request. */
    ResultVo<Object> executePostApi(String feignName, String url, String token, Object params);

    /** Sends a GET request. */
    ResultVo<Object> executeGetApi(String feignName, String url, String token, Object params);

    /** Sends a DELETE request. */
    ResultVo<Object> executeDeleteApi(String feignName, String url, String token, Object params);

    /** Sends a PUT request. */
    ResultVo<Object> executePutApi(String feignName, String url, String token, Object params);
}

  1. OpenFeign serviceImpl
package com.wxapp.dispatch.service.service.impl;

import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.dispatch.api.feign.DynamicFeign;
import com.wxapp.dispatch.service.service.DynamicClientService;
import com.wxapp.dispatch.service.service.FeignClientFactoryService;
import org.springframework.stereotype.Service;

/**
 * {@link DynamicClientService} implementation that obtains a {@link DynamicFeign} client for
 * the requested service name on every call and delegates to it.
 */
@Service
public class DynamicClientServiceImpl implements DynamicClientService {

    private final FeignClientFactoryService<DynamicFeign> feignFeignClientFactory;

    public DynamicClientServiceImpl(FeignClientFactoryService<DynamicFeign> feignFeignClientFactory) {
        this.feignFeignClientFactory = feignFeignClientFactory;
    }

    @Override
    public ResultVo<Object> executePostApi(String feignName, String url, String token, Object params) {
        return clientFor(feignName).executePostApi(url, token, params);
    }

    @Override
    public ResultVo<Object> executeGetApi(String feignName, String url, String token, Object params) {
        return clientFor(feignName).executeGetApi(url, token, params);
    }

    @Override
    public ResultVo<Object> executeDeleteApi(String feignName, String url, String token, Object params) {
        return clientFor(feignName).executeDeleteApi(url, token, params);
    }

    @Override
    public ResultVo<Object> executePutApi(String feignName, String url, String token, Object params) {
        return clientFor(feignName).executePutApi(url, token, params);
    }

    /** Asks the factory for a {@link DynamicFeign} client bound to the given service name. */
    private DynamicFeign clientFor(String feignName) {
        return feignFeignClientFactory.getFeignClient(DynamicFeign.class, feignName);
    }

}

  1. openFeign 调用接口
package com.wxapp.dispatch.api.feign;

import com.wxapp.common.constant.constant.LoginConstant;
import com.wxapp.common.constant.vo.ResultVo;
import org.springframework.cloud.openfeign.SpringQueryMap;
import org.springframework.web.bind.annotation.*;

/**
 * Feign contract with one method per HTTP verb. The target path is supplied at call time
 * through the {@code url} path variable, and the token is sent in the header named by
 * {@link LoginConstant#INTERNAL_TOKEN_KEY}.
 *
 * <p>The interface carries no client annotation of its own; the target service is chosen
 * when a client is built for it.
 *
 * <p>NOTE(review): {@code url} is bound as a single path variable; confirm how paths that
 * contain several segments are encoded by the Feign setup in use.
 */
public interface DynamicFeign {

    /** POST to {@code url}; {@code params} is sent as the request body. */
    @PostMapping("{url}")
    ResultVo<Object> executePostApi(@PathVariable("url") String url, @RequestHeader(LoginConstant.INTERNAL_TOKEN_KEY) String token, @RequestBody Object params);

    /** GET {@code url}; {@code params} is expanded into query parameters. */
    @GetMapping("{url}")
    ResultVo<Object> executeGetApi(@PathVariable("url") String url, @RequestHeader(LoginConstant.INTERNAL_TOKEN_KEY) String token, @SpringQueryMap Object params);

    /** DELETE {@code url}; {@code params} is sent as the request body. */
    @DeleteMapping("{url}")
    ResultVo<Object> executeDeleteApi(@PathVariable("url") String url, @RequestHeader(LoginConstant.INTERNAL_TOKEN_KEY) String token, @RequestBody Object params);

    /** PUT to {@code url}; {@code params} is sent as the request body. */
    @PutMapping("{url}")
    ResultVo<Object> executePutApi(@PathVariable("url") String url, @RequestHeader(LoginConstant.INTERNAL_TOKEN_KEY) String token, @RequestBody Object params);
}

FeignClientFactory 设计

  1. FeignClientFactoryService
package com.wxapp.dispatch.service.service;

/**
 * Factory that produces a Feign client of a given interface type for a given service id.
 *
 * Created 2023/5/25 16:13
 *
 * @param <T> the Feign client interface type
 */
public interface FeignClientFactoryService<T> {

    /**
     * Returns a client of {@code type} bound to {@code serviceId}.
     *
     * @param type      the Feign client interface
     * @param serviceId name of the target service
     * @return a client instance for that service
     */
    T getFeignClient(final Class<T> type, String serviceId);
}

  1. FeignClientFactoryServiceImpl
package com.wxapp.dispatch.service.service.impl;

import com.wxapp.dispatch.service.service.FeignClientFactoryService;
import org.springframework.cloud.openfeign.FeignClientBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

@Service
public class FeignClientFactoryServiceImpl<T> implements FeignClientFactoryService<T> {

    private FeignClientBuilder feignClientBuilder;

    public FeignClientFactoryServiceImpl(ApplicationContext appContext) {
        this.feignClientBuilder = new FeignClientBuilder(appContext);
    }

    @Override
    public T getFeignClient(final Class<T> type, String serviceId) {
        return this.feignClientBuilder.forType(type, serviceId).build();
    }
}

http 调用工具类

package com.wxapp.common.util.http;

import com.wxapp.common.base.util.jackson.JacksonUtil;
import com.wxapp.common.constant.vo.HttpRequestVo;
import com.wxapp.common.core.filter.except.ThrowMyException;
import com.wxapp.common.util.check.CheckUtil;
import org.springframework.http.HttpEntity;
import org.springframework.web.client.RestTemplate;

import java.lang.reflect.Field;
import java.util.Map;

/**
 * HTTP request helper built on a shared {@link RestTemplate}.
 *
 * Created 2022/4/23 11:56
 */
public class HttpRequestUtil {

    private static final RestTemplate restTemplate = new RestTemplate();

    /**
     * Sends the request described by {@code requestVo} with its configured HTTP method and
     * returns the JSON response parsed into a map.
     *
     * <p>Query placeholders are appended to the URL for the keys of the URL-append map and
     * for the keys (or public fields) of the request data. The URL-append map is the only
     * source of values for those placeholders.
     *
     * <p>NOTE(review): when the data contributes a placeholder that the URL-append map has no
     * value for, the placeholder is left without a value at expansion time; confirm callers
     * always supply matching entries.
     *
     * @param requestVo request description (url, method, data, headers, success criteria)
     * @return the response body parsed as a map
     * @throws RuntimeException wrapping any failure, including the {@link ThrowMyException}
     *                          raised when the response does not carry the expected status
     */
    public static <T> Map request(HttpRequestVo<T> requestVo) {
        try {
            var url = urlAppendValue(requestVo.getUrl(), requestVo.getUrlAppend());
            url = urlAppendValue(url, requestVo.getData());
            String exchange;
            HttpEntity<T> httpEntity = new HttpEntity<>(requestVo.getData(), requestVo.getHeader());
            if (CheckUtil.isNotNullObject(requestVo.getUrlAppend())) {
                // The raw Map argument erases the call's return type, hence the cast.
                exchange = (String) restTemplate.exchange(url, requestVo.getMethod(), httpEntity, String.class, requestVo.getUrlAppend()).getBody();
            } else {
                exchange = restTemplate.exchange(url, requestVo.getMethod(), httpEntity, String.class).getBody();
            }
            var map = JacksonUtil.parseObject(exchange, Map.class);
            if (valiResult(map, requestVo)) {
                // Caught by the handler below and rethrown wrapped, like every other failure.
                throw new ThrowMyException(requestVo.getErrMassage());
            }
            return map;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Appends a {@code name={name}} query placeholder to {@code url} for every key of a map,
     * or for every public field of any other object.
     *
     * <p>Names whose placeholder is already present in {@code url} are skipped, so applying
     * this method twice with overlapping sources does not duplicate parameters. The first
     * parameter is joined with {@code ?}, or with {@code &} when the URL already has a query.
     * The URL is returned unchanged when there is nothing to append.
     *
     * @param url       base URL, possibly already carrying a query string
     * @param urlAppend map or object that supplies the parameter names; may be null
     * @return the URL with the placeholders appended
     */
    private static <T> String urlAppendValue(String url, T urlAppend) {
        if (!CheckUtil.isNotNullObject(urlAppend)) {
            return url;
        }
        StringBuilder query = new StringBuilder();
        if (urlAppend instanceof Map) {
            for (Object key : ((Map<?, ?>) urlAppend).keySet()) {
                appendPlaceholder(query, url, String.valueOf(key));
            }
        } else {
            for (Field field : urlAppend.getClass().getFields()) {
                appendPlaceholder(query, url, field.getName());
            }
        }
        if (query.length() == 0) {
            // Empty map, no public fields, or every name already present.
            return url;
        }
        return url + (url.contains("?") ? "&" : "?") + query;
    }

    /** Adds {@code name={name}} to {@code query} unless {@code url} already contains that placeholder. */
    private static void appendPlaceholder(StringBuilder query, String url, String name) {
        String placeholder = "{" + name + "}";
        if (url.contains(placeholder)) {
            return;
        }
        if (query.length() > 0) {
            query.append('&');
        }
        query.append(name).append('=').append(placeholder);
    }

    /**
     * Tells whether the response should be treated as a failure.
     *
     * <p>The status value is compared by its string form, so a numeric status in the response
     * (for example {@code 200}) matches a success code given as {@code "200"}.
     *
     * @param map       parsed response
     * @param requestVo request carrying the status field name and the success code
     * @return {@code true} when the status is missing, blank, or differs from the success code
     */
    public static <T> Boolean valiResult(Map map, HttpRequestVo<T> requestVo) {
        Object rawCode = map.get(requestVo.getStatusName());
        String code = rawCode == null ? null : String.valueOf(rawCode);
        return CheckUtil.isNullString(code) || !code.equals(requestVo.getSuccCode());
    }
}

http 请求工具类封装

package com.wxapp.common.constant.vo;

import com.wxapp.common.base.util.jackson.JacksonUtil;
import lombok.Data;
import org.springframework.http.HttpMethod;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

import java.util.Map;

/**
 * Describes an outbound request to a third-party endpoint together with the rule used to
 * decide whether its response counts as a success.
 *
 * Created 2023/4/7 11:17
 *
 * @param <T> type of the request data
 */
@Data
public class HttpRequestVo<T> {
    /** Request URL. */
    private String url;
    /** Request data. */
    private T Data;
    /** Request headers. */
    private MultiValueMap<String, String> header;
    /** Parameters appended to the URL: key is the parameter name, value is its value. */
    private Map urlAppend;
    /** Status value that marks a successful response. */
    private String succCode;
    /** Name of the response field that holds the status value. */
    private String statusName;
    /** Message used when the request fails. */
    private String errMassage;
    /** HTTP method. */
    private HttpMethod method;

    public HttpRequestVo() {
    }

    /**
     * Creates a fully populated request.
     *
     * @param header    header source; converted to JSON and parsed into a multi-value map when present
     * @param urlAppend URL parameter source; parsed into a map when present
     * @throws RuntimeException wrapping any failure while parsing {@code urlAppend} or {@code header}
     */
    public HttpRequestVo(String url, HttpMethod method, T data, Object header, Object urlAppend, String succCode, String statusName, String errMassage) {
        this.url = url;
        this.method = method;
        this.Data = data;
        this.succCode = succCode;
        this.statusName = statusName;
        this.errMassage = errMassage;
        if (isPresent(urlAppend)) {
            try {
                this.urlAppend = JacksonUtil.parseObject(urlAppend, Map.class);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (isPresent(header)) {
            try {
                String headerJson = JacksonUtil.toJSONString(header);
                this.header = JacksonUtil.parseObject(headerJson, MultiValueMap.class);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /** Returns the stored headers, or a new empty map when none have been set. */
    public MultiValueMap getHeader() {
        if (this.header != null) {
            return this.header;
        }
        return new LinkedMultiValueMap<>();
    }

    /** Parses the given JSON text into the headers; null or empty input leaves them untouched. */
    public void setHeader(String header) throws Exception {
        if (header != null && !header.isEmpty()) {
            this.header = JacksonUtil.parseObject(header, MultiValueMap.class);
        }
    }

    public void setUrlAppend(Map urlAppend) {
        this.urlAppend = urlAppend;
    }

    /** Parses the given JSON text into the URL parameters; null or empty input leaves them untouched. */
    public void setUrlAppend(String urlAppend) throws Exception {
        if (urlAppend != null && !urlAppend.isEmpty()) {
            this.urlAppend = JacksonUtil.parseObject(urlAppend, Map.class);
        }
    }

    /** True when the value is non-null and its string form is not empty. */
    private static boolean isPresent(Object value) {
        return value != null && !"".equals(String.valueOf(value));
    }

}