Springboot+Quartz+Dynamic-datasource

发布时间 2023-09-05 15:11:10作者: YancyMauno

在使用dynamic-datasource多数据源切换场景下,实现Quartz任务持久化配置和API动态调度

1. pom依赖

暂未找到版本对应关系,若有版本不一致异常,请自行尝试升降版本。

    <dependencies>
        <!-- 动态数据源 -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>3.5.0</version>
        </dependency>
        <!-- quartz依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
            <version>2.5.4</version>
        </dependency>
    </dependencies>

2. 数据源和Quartz参数配置

点击查看代码
#主数据源
spring.datasource.dynamic.primary=main

db.primary.host=127.0.0.1
db.primary.username=root
db.primary.password=123456
db.primary.database=main
spring.datasource.dynamic.datasource.main.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.main.url=jdbc:mysql://${db.primary.host}/${db.primary.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true&serverTimezone=GMT%2B8
spring.datasource.dynamic.datasource.main.username=${db.primary.username}
spring.datasource.dynamic.datasource.main.password=${db.primary.password}

#次级数据源
db.slave.host=127.0.0.1
db.slave.username=root
db.slave.password=123456
db.slave.database=slave
spring.datasource.dynamic.datasource.slave.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.slave.url=jdbc:mysql://${db.slave.host}/${db.slave.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true&serverTimezone=GMT%2B8
spring.datasource.dynamic.datasource.slave.username=${db.slave.username}
spring.datasource.dynamic.datasource.slave.password=${db.slave.password}

# quartz持久化数据源
db.quartz.host=127.0.0.1
db.quartz.username=root
db.quartz.password=123456
db.quartz.database=quartz
spring.datasource.dynamic.datasource.quartz.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.quartz.url=jdbc:mysql://${db.quartz.host}/${db.quartz.database}?useUnicode=true&characterEncoding=utf8&useSSL=false&autoReconnect=true&serverTimezone=GMT%2B8
spring.datasource.dynamic.datasource.quartz.username=${db.quartz.username}
spring.datasource.dynamic.datasource.quartz.password=${db.quartz.password}

#quartz 配置
spring.quartz.properties.timeZoneId = Asia/Shanghai
spring.quartz.properties.schedulerName=demoScheduler
spring.quartz.properties.quartzDataSourceName=quartz
spring.quartz.properties.schedulerContextKey=SERVICEX-APPLICATION-CONTEXT-KEY
spring.quartz.job-store-type=jdbc
spring.quartz.jdbc.initialize-schema=never
spring.quartz.auto-startup=false
spring.quartz.startup-delay=0
spring.quartz.overwrite-existing-jobs=true
spring.quartz.wait-for-jobs-to-complete-on-shutdown=true
org.quartz.scheduler.instanceName=demo
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.instanceIdGenerator.class=org.quartz.simpl.SimpleInstanceIdGenerator
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=false
org.quartz.jobStore.clusterCheckinInterval=10000
org.quartz.jobStore.useProperties=false
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=10
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true

3. Job信息实体类

点击查看代码
package com.xx.xx.model;
import lombok.Data;

@Data
public class JobInfo {
    private String jobName;
    private String jobClassPath;//JobBean执行类的包路径如:com.xx.xx.DemoJob
    private String jobGroup;
    private String description;
    private String cron;
    private String timeZoneId;//设置成默认值Asia/Shanghai,并对前端调用隐藏
    private String status;//设置对前端调用隐藏,仅在查询Job时回显状态
}

4. JobService服务层接口

点击查看代码
package com.xx.xx.service;

import com.xx.xx.model.JobInfo;
import org.quartz.SchedulerException;
import org.springframework.web.bind.annotation.RequestBody;
import java.text.ParseException;
import java.util.List;

/**
 * @author : Mao
 * 该接口按需定义定时任务全生命周期方法
 * 包括  启动Scheduler、暂停Scheduler、关闭Scheduler、获取所有执行中的Jobs
 * 新增Job、更新Job、删除Job、暂停Job、恢复Job、暂停所有Jobs、恢复所有Jobs、立即执行一个Job(主要是用于执行一次任务的场景)
 * 注意: standbyScheduler将暂停所有Jobs(包括新添加的),pauseAllJobs将暂停所有已添加的Jobs(新添加的不受影响)
 */
public interface JobService {

    /**
     * 启动Scheduler
     */
    void startScheduler() throws SchedulerException;

    /**
     * 暂停Scheduler
     */
    void standbyScheduler() throws SchedulerException;

    /**
     * 关闭Scheduler
     */
    void shutdownScheduler() throws SchedulerException;

    /**
     * 获取所有执行中的Jobs
     *
     * @return 任务信息
     */
    List<JobInfo> getAllJobs() throws SchedulerException;

    /**
     * 新增一个Job
     *
     * @param jobInfo 任务信息
     */
    void addJob(@RequestBody JobInfo jobInfo) throws SchedulerException, ClassNotFoundException, ParseException;

    /**
     * 更新一个Job
     *
     * @param jobInfo 任务信息
     */
    void updateJob(@RequestBody JobInfo jobInfo) throws SchedulerException, ParseException, ClassNotFoundException;

    /**
     * 删除一个Job
     *
     * @param jobName  任务名称
     * @param jobGroup 任务组
     */
    void removeJob(String jobName, String jobGroup) throws SchedulerException;

    /**
     * 暂停一个Job
     *
     * @param jobName  任务名称
     * @param jobGroup 任务组
     */
    void pauseJob(String jobName, String jobGroup) throws SchedulerException;

    /**
     * 恢复一个Job
     *
     * @param jobName  任务名称
     * @param jobGroup 任务组
     */
    void resumeJob(String jobName, String jobGroup) throws SchedulerException;

    /**
     * 暂停所有Jobs
     */
    void pauseAllJobs() throws SchedulerException;

    /**
     * 恢复所有Jobs
     */
    void resumeAllJobs() throws SchedulerException;

    /**
     * 立即执行一个Job(主要是用于执行一次任务的场景)
     *
     * @param jobName  任务名称
     * @param jobGroup 任务组
     */
    void execJob(String jobName, String jobGroup) throws SchedulerException;

}

5. JobServiceImpl实现类

点击查看代码
package com.xx.xx.service.impl;

import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.xx.xx.model.JobInfo;
import com.xx.xx.service.JobService;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.text.ParseException;
import java.util.*;

@Service
public class JobServiceImpl implements JobService {

    Scheduler scheduler;
    @Value("${spring.quartz.properties.schedulerName}")
    private String schedulerName;
    @Value("${spring.quartz.properties.quartzDataSourceName}")
    private String quartzDataSourceName;
    @Value("${spring.quartz.properties.schedulerContextKey}")
    private String schedulerContextKey;
    @Value("${spring.quartz.properties.timeZoneId}")
    private String timeZoneId;

    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("application.properties"));
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

	//关键Bean配置quartz参数和数据源,注意DynamicRoutingDataSource的引用位置。
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(Properties quartzProperties, DynamicRoutingDataSource dataSource) throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
	//properties文件中的配置
        factory.setQuartzProperties(quartzProperties);
	//其他自定义配置需手动在此设置
        factory.setSchedulerName(schedulerName);
        factory.setDataSource(dataSource.getDataSource(quartzDataSourceName));
        factory.setApplicationContextSchedulerContextKey(schedulerContextKey);
        return factory;
    }

	//scheduler初始化完成,返回实例
    @Bean
    public Scheduler scheduler(SchedulerFactoryBean schedulerFactoryBean) throws IOException, SchedulerException {
        scheduler = schedulerFactoryBean.getScheduler();
        return scheduler;
    }

    @Override
    public void startScheduler() throws SchedulerException {
        scheduler.start();
    }

    @Override
    public void standbyScheduler() throws SchedulerException {
        scheduler.standby();
    }

    @Override
    public void shutdownScheduler() throws SchedulerException {
        scheduler.shutdown();
    }

    @Override
    public List<JobInfo> getAllJobs() throws SchedulerException {
        //Job list
        List<JobInfo> jobs = new ArrayList<>();
        List<String> jobGroups = scheduler.getJobGroupNames();
        Set<JobKey> jobKeySet = new HashSet<>();
        for (String jobGroup : jobGroups) {
            jobKeySet = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(jobGroup));
        }
        for (JobKey jobKey : jobKeySet) {
            List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
            for (Trigger trigger : triggers) {
                JobInfo jobInfo = new JobInfo();
                jobInfo.setJobName(jobKey.getName());
                jobInfo.setJobGroup(jobKey.getGroup());
                jobInfo.setJobClassPath(scheduler.getJobDetail(jobKey).getJobClass().getName());
                jobInfo.setDescription(scheduler.getJobDetail(jobKey).getDescription());
                jobInfo.setStatus(scheduler.getTriggerState(trigger.getKey()).name());
                if (trigger instanceof CronTrigger) {
                    jobInfo.setCron(((CronTrigger) trigger).getCronExpression());
                    jobInfo.setTimeZoneId(((CronTrigger) trigger).getTimeZone().getID());
                }
                jobs.add(jobInfo);
            }

        }
        return jobs;
    }

    @Override
    public void addJob(JobInfo jobInfo) throws SchedulerException, ClassNotFoundException, ParseException {
        //查询是否已有相同任务 jobKey可以唯一确定一个任务
        JobKey jobKey = JobKey.jobKey(jobInfo.getJobName(), jobInfo.getJobGroup());
        JobDetail jobDetail = scheduler.getJobDetail(jobKey);
        if (Objects.nonNull(jobDetail)) {
            throw new SchedulerException("Job already exist.");
        }
	//和执行具体业务的JobBean类分离
        Class<? extends Job> jobClass = null;
        if (!Job.class.isAssignableFrom(Class.forName(jobInfo.getJobClassPath()))) {
            throw new ClassNotFoundException("Class is not a Job.");
        } else {
            jobClass = (Class<? extends Job>) Class.forName(jobInfo.getJobClassPath());
        }

        //任务详情
        jobDetail = JobBuilder.newJob(jobClass)
                .withDescription(jobInfo.getDescription())  //任务描述
                .withIdentity(jobKey) //指定任务
                .build();

        //根据cron,TimeZone时区,指定执行计划
        CronScheduleBuilder builder = CronScheduleBuilder.cronSchedule(jobInfo.getCron())
                .inTimeZone(TimeZone.getTimeZone(timeZoneId));

        //触发器
        CronTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(jobInfo.getJobName(), jobInfo.getJobGroup())
                .startNow()
                .withSchedule(builder)
                .build();
        //添加任务
        scheduler.scheduleJob(jobDetail, trigger);
    }

    @Override
    public void updateJob(JobInfo jobInfo) throws SchedulerException, ClassNotFoundException {
        JobKey jobKey = JobKey.jobKey(jobInfo.getJobName(), jobInfo.getJobGroup());
        if (!CronExpression.isValidExpression(jobInfo.getCron()) || !scheduler.checkExists(jobKey)) {
            throw new JobExecutionException("Job Not Exist");
        }
        TriggerKey triggerKey = TriggerKey.triggerKey(jobInfo.getJobName(), jobInfo.getJobGroup());

        Class<? extends Job> jobClass = null;
        if (!Job.class.isAssignableFrom(Class.forName(jobInfo.getJobClassPath()))) {
            throw new ClassNotFoundException("Class is not a Job.");
        } else {
            jobClass = (Class<? extends Job>) Class.forName(jobInfo.getJobClassPath());
        }
        //任务详情
        JobDetail jobDetail = JobBuilder.newJob(jobClass)
                .withDescription(jobInfo.getDescription())  //任务描述
                .withIdentity(jobKey) //指定任务
                .build();

        //根据cron,TimeZone时区,指定执行计划
        CronScheduleBuilder builder = CronScheduleBuilder.cronSchedule(jobInfo.getCron())
                .inTimeZone(TimeZone.getTimeZone(timeZoneId));

        //触发器
        CronTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(jobInfo.getJobName(), jobInfo.getJobGroup())
                .startNow()
                .withSchedule(builder)
                .build();

        scheduler.rescheduleJob(triggerKey, trigger);
    }

    @Override
    public void removeJob(String jobName, String jobGroup) throws SchedulerException {
        //获取任务触发器
        TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
        //停止触发器
        scheduler.pauseTrigger(triggerKey);
        //移除触发器
        scheduler.unscheduleJob(triggerKey);
        //删除任务
        scheduler.deleteJob(JobKey.jobKey(jobName, jobGroup));
    }

    @Override
    public void resumeJob(String jobName, String jobGroup) throws SchedulerException {
        //根据jobName,jobGroup获取jobKey 恢复任务
        scheduler.resumeJob(JobKey.jobKey(jobName, jobGroup));
    }

    @Override
    public void pauseAllJobs() throws SchedulerException {
        scheduler.pauseAll();
    }

    @Override
    public void resumeAllJobs() throws SchedulerException {
        scheduler.resumeAll();
    }

    @Override
    public void pauseJob(String jobName, String jobGroup) throws SchedulerException {
        //根据jobName,jobGroup获取jobKey 暂停任务
        scheduler.pauseJob(JobKey.jobKey(jobName, jobGroup));
    }

    @Override
    public void execJob(String jobName, String jobGroup) throws SchedulerException {
        //根据jobName,jobGroup获取jobKey 立即执行任务
        scheduler.triggerJob(JobKey.jobKey(jobName, jobGroup));
    }
}


6. DemoJob执行具体业务的JobBean类

点击查看代码
package com.xx.xx.jobs;

import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.util.List;
import java.util.StringJoiner;

@Slf4j
@DisallowConcurrentExecution
public class DemoJob extends QuartzJobBean {

    //注入其他service要从Spring上下文获取,否则为null,因为当前Job类的实例化在JobServiceImpl中,未托管到Spring
    //@Autowired
    //xxService xxService = SpringContextUtil.getBean(xx.class);

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        StringJoiner joiner = new StringJoiner(" | ")
                .add("---Executed Job---")
                .add(jobExecutionContext.getTrigger().getKey().getName())
                .add("Hello, the first demo job.")
                .add(new Date());
        log.info(String.valueOf(joiner));
    }
}

7. JobController动态管理任务

点击查看代码
package com.xx.xx.api;

import com.xx.xx.model.JobInfo;
import com.xx.power.service.JobService;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/job")
public class JobController {

    @Autowired
    JobService jobService;

    @GetMapping("/startScheduler")
    public String startScheduler() throws SchedulerException {
        jobService.startScheduler();
        return "Operation Succeeded.";
    }

    @GetMapping("/standbyScheduler")
    public String standbyScheduler() throws SchedulerException {
        jobService.standbyScheduler();
        return "Operation Succeeded.";
    }

    @GetMapping("/shutdownScheduler")
    public String shutdownScheduler() throws SchedulerException {
        jobService.shutdownScheduler();
        return "Operation Succeeded.";
    }

    @GetMapping("/getAllJobs")
    public List<JobInfo> getAllJobs() throws SchedulerException {
        return jobService.getAllJobs();
    }

    @PostMapping("/addJob")
    public String addJob(@RequestBody JobInfo jobInfo) throws SchedulerException, ClassNotFoundException, ParseException {
        jobService.addJob(jobInfo);
        return "Operation Succeeded.";
    }

    @PostMapping("/updateJob")
    public String updateJob(@RequestBody JobInfo jobInfo) throws SchedulerException, ParseException, ClassNotFoundException {
        jobService.updateJob(jobInfo);
        return "Operation Succeeded.";
    }

    @PostMapping("/removeJob")
    public String removeJob(String jobName, String jobGroup) throws SchedulerException {
        jobService.removeJob(jobName, jobGroup);
        return "Operation Succeeded.";
    }

    @PostMapping("/pauseJob")
    public String pauseJob(String jobName, String jobGroup) throws SchedulerException {
        jobService.pauseJob(jobName, jobGroup);
        return "Operation Succeeded.";
    }

    @PostMapping("/resumeJob")
    public String resumeJob(String jobName, String jobGroup) throws SchedulerException {
        jobService.resumeJob(jobName, jobGroup);
        return "Operation Succeeded.";
    }

    @GetMapping("/pauseAllJobs")
    public String pauseAllJobs() throws SchedulerException {
        jobService.pauseAllJobs();
        return "Operation Succeeded.";
    }

    @GetMapping("/resumeAllJobs")
    public String resumeAllJobs() throws SchedulerException {
        jobService.resumeAllJobs();
        return "Operation Succeeded.";
    }

    @PostMapping("/execJob")
    public String execJob(String jobName, String jobGroup) throws SchedulerException {
        jobService.execJob(jobName, jobGroup);
        return "Operation Succeeded.";
    }
}

8. 启动后访问API: /job/addJob路径,RequestBody输入JobInfo参数

{
"jobName":"testJob",
"jobClassPath":"com.xx.xx.jobs.DemoJob",
"jobGroup":"testJobGroup",
"description":"This is a test job.",
"cron":"0/5 * * * * ? "
}

9. 查询任务状态,调用API:/job/getAllJob,或自行从日志文件和日志表中查询