【Java 线程池】【七】ScheduledThreadPoolExecutor提交任务原理

发布时间 2023-04-13 06:41:18作者: 酷酷-

1  前言

前面我们主要看了ThreadPoolExecutor线程池。包括线程池内部有哪些核心的参数、每个参数的含义,通过向线程池提交任务的execute方法的内部逻辑以及执行流程是什么,通过FutureTask获取任务执行结果,以及阻塞、唤醒调用线程和线程池内部的工作者Worker的工作原理,线程池的预热、关闭、其它核心方法等等。那么本节我们开始来研究另外一个线程池ScheduledThreadPoolExecutor,这个具有ThreadPoolExecutor的一切功能,但是同时它又具有任务调度的功能,包括延迟任务调度,定时任务调度等等。
还有我们最开始介绍线程池的时候的例子,我们回忆下:

public class ScheduledThreadPoolTest {
    // 创建一个调度线程池
    public static ScheduledExecutorService createThreadPool() {
        ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(
                // 线程池的核心线程数,配置为3
                3,
                // 创建线程的工厂,test-schedule-pool为线程池的名称
                new DefaultThreadFactory("test-schedule-pool"),
                // 任务拒绝策略,线程资源不足的时候,策略是“使用调用线程池的线程来执行”
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        return executor;
    }

    static class DefaultThreadFactory implements ThreadFactory {
        private String name;
        private final AtomicInteger nextId = new AtomicInteger();
        DefaultThreadFactory(String name) {
            this.name = name;
        }
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(null, r, name + "-" + nextId.incrementAndGet());
        }
    }


    public static void main(String[] args) {
        // 获取调度线程池
        ScheduledExecutorService executor = createThreadPool();
        // 打印当前时间
        System.out.println("当前时间:" + new Date());
        // 立即执行的任务
        Runnable nowTask = () -> {
            System.out.println("当前时间:" + new Date() + "------执行【立即】任务");
        };
        // 延迟执行的任务
        Runnable delayTask = () -> {
            System.out.println("当前时间:" + new Date() + "------执行【延迟】任务");
        };
        // 周期性执行的任务
        Runnable periodTask = () -> {
            System.out.println("当前时间:" + new Date() + "------执行【周期性】任务");
        };

        // 提交立即任务,有线程空闲立即执行
        executor.execute(nowTask);
        // 提交一次性延迟任务,延迟2秒执行
        executor.schedule(delayTask, 2, TimeUnit.SECONDS);
        // 提交周期性的延迟任务,10秒后每3秒执行一次
        executor.scheduleWithFixedDelay(periodTask, 10, 3, TimeUnit.SECONDS);
    }
}

2  ScheduledThreadPoolExecutor 源码

2.1  接口及构造方法

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor线程池,同时实现了ScheduleExecutorService接口,具有任务定时调度的功能。

// 继承了ThreadPoolExecutor线程池,具有ThreadPoolExecutor的一切功能
// 同时具有ThreadPoolExecutor所有的核心参数,核心线程数、最大线程数、线程空闲存活时间、阻塞队列、拒绝策略等等
// 实现了ScheduleExecutorService接口,具有任务调度的功能
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
}

再看下ScheduledThreadPoolExecutor的构造方法:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    // 内部通过调用父类ThreadPoolExecutor的构造方法来构造线程池
    // 注意:这里设置了最大线程数maximumPoolSize为Integer.MAX_VALUE
    // 注意:这里设置了阻塞队列为内部实现的延迟队列DelayedWorkQueue (这个我们后面再研究)
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

再看下其它的构造方法,内部也是通过调用父类ThreadPoolExecutor的构造方法来构建线程池:

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    // 内部调用父类ThreadPoolExecutor来构建线程池
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // 内部调用父类ThreadPoolExecutor的构造方法来构建线程池
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

其实ScheduledThreadPoolExecutor内部也是通过ThreadPoolExecutor来构建线程池的,只不过基于ThreadPoolExecutor之上再进行拓展了,实现了一些任务调度的功能。

2.2  execute 提交立即执行任务

我们来看下ScheduledThreadPoolExecutor提交任务的方法:

public void execute(Runnable command) {
    // 内部调用了schedule方法去提交任务
    // schedule 方法有三个参数:
    // 第一个参数,就是要运行的任务
    // 第二个参数,就是要延迟运行的时间,由于是execute要求是提交之后又线程空闲立即执行,所以这里延迟时间是0
    // 第三个参数,延迟时间的时间单位,这里是纳秒
    schedule(command, 0, NANOSECONDS);
}

2.2.1  schedule 提交一个调度任务

// schedule方法有三个参数
// 第一个参数,表示要运行的任务
// 第二个参数,表示任务要延迟执行的时间,如果要求立即执行,可以传0
// 第三个参数,表示延迟执行的时间单位,毫秒、秒、分、小时等等
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    // 这里的逻辑其实就是将传入的任务command封装成一个ScheduledFutureTask对象
    // FutureTask之前我们深入分析过了,这里ScheduleFutureTask具有FutureTask的一切功能,同时还具有延迟时间属性
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    // 这里就是核心的逻辑了,通过这个方法向线程池提交任务
    delayedExecute(t);
    return t;
}

2.2.2  delayedExecute 提交任务

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 如果线程池关闭了,不能再提交任务,直接执行拒绝策略
    if (isShutdown())
        reject(task);
    else {
        // 获取阻塞队列,也就是延迟队列DelayedWorkQueue
        // 将任务放入到延迟阻塞队列中
        super.getQueue().add(task);
        // 再次进行校验,如果线程池关闭了
        // 则刚提交的任务需要从队列移除,然后执行cancel方法取消这个任务
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 如果当前线程数 < corePoolSize,就创建出来一个新线程
            ensurePrestart();
    }
}
// 这里就是进行线程预热
// 核心逻辑就是如果当前线程数量小于corePoolSize或者为0;启动一个新线程
// 毕竟任务提交到阻塞队列了之后,需要线程从队列中不断取出任务来执行
void ensurePrestart() {
    // 计算当前线程数量
    int wc = workerCountOf(ctl.get());
    // 如果当前线程数量 < corePoolSize,则调用addWorker方法创建出来一个工作者,也就是创建一个线程
    if (wc < corePoolSize)
        // 注意这里创建新worker的时候,传入的firstTask为null
        // 也就是新线程创建后没有给与要运行的任务,让它自己从阻塞队列取任务出来运行
        addWorker(null, true);
    // 同样,如果当前线程数为0,也需要创建出新线程出来
    else if (wc == 0)
        addWorker(null, false);
}

ScheduledThreadPoolExecutor线程池的execute方法源码看完了,我们再看下其它的方法:

2.3  submit 方法

public Future<?> submit(Runnable task) {
    // 内部也是通过schedule方法向提交一个任务,上面讲解过了
    // 这里task是任务,0是延迟时间,说明要求立即执行,NANOSECONDS是时间单位
    return schedule(task, 0, NANOSECONDS);
}

2.4  scheduleAtFixedRate 提交一个延迟定时任务

// 这个方法是提交一个延迟的定时任务,具有定时执行的周期
// 第一个参数:command就是要运行的任务
// 第二个参数:initialDelay就是第一次执行任务延迟多久
// 第三个参数:定时运行任务的周期,如果配置为0那么就只会执行一次,如果不是0,那么就每隔period执行一次任务
// 第四个参数:时间单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
     // 这里构建一个ScheduledFutureTask任务出来,具有FutureTask的特性,同时具有延迟任务、周期任务的属性
     // 这里的period就是定时任务的周期,initialDelay是延迟属性,第一次执行延迟多少时间,后面就是每隔period执行一次
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 这里内部也是通过delayedExecute方法向线程池提交定时(只不过这里是定时任务)
    delayedExecute(t);
    return t;
}

上面的execute、schedule、submit、scheduleAtFixRate方法,我们画个图理解一下:

上面就是向ScheduledThreadPoolExecutor线程池提交任务的流程了:
(1)execute、submit方法提交任务时候,底层都是调用schedule方法提交任务,只不过传入的延迟时间是0而已。
(2)schedule、scheduleAtFixRate方法底层都是需要将传入的任务封装成一个ScheduleFututeTask任务,这个任务具有延迟属性、定时周期属性等等
(3)最终底层都是调用delayedExecute方法提交任务,首先是将任务提交到阻塞队列里面,然后判断当前线程数量小于corePoolSize或者为0,则需要创建线程出来。

2.5  ScheduledThreadPoolExecutor 与 ThreadPoolExecutor 提交任务的区别

ThreadPoolExecutor提交任务流程
(1)首先判断当前线程数量小于corePoolSize,则创建新线程,将任务交给这个新线程执行
(2)判断如果当前小程数大于等于corePoolSize,则将任务尝试放入阻塞队列中
(3)如果步骤(2)放入阻塞队列失败,则可能阻塞队列满了,再次创建创建新线程
(4)如果此时线程数量小于maximumPoolSize,则可以创建新线程,任务交给新线程运行
(5)如果线程数量达到了maximumPoolSize,则不再运行创建新线程,执行线程拒绝策略
ScheduleThreadPoolExecutor提交任务流程
(1)将任务构建成ScheduleFutureTask对象,赋予延迟时间、周期执行时间等属性
(2)将任务放入DelayedWorkQueue延迟阻塞队列
(3)判断当前线程数量为0,或者当前线程数量 < corePoolSize则创建新线程
对比不同点:
(1)ThreadPoolExecutor提交任务的时候,在线程数量小于corePoolSize的时候创建新线程,并且将任务交给新线程执行,不会直接进入阻塞队列。
但是ScheduledThreadPoolExecutor线程池提交任务的时候,统统先放入阻塞队列,后面再用线程从中取出来执行
(2)ThreadPoolExecutor线程池的线程数量在阻塞队列满了的时候,是可以增长到maximumPoolSize大小的。
但是ScheduledThreadPoolExecutor创建线程入口方法是ensurePrestart(),这里只有线程数量小于corePoolSize的时候才会创建线程,也就是说线程池最大只能增长到corePoolSize大小。

2.6  提交任务小结

最后我们画个图,整体理解一下ScheduledThreadPoolExecutor线程池提交任务,执行任务:

上面这个图就是ScheduledThreadPoolExecutor线程池提交任务,执行任务的流程图了,可能你会想,比如我们的任务为什么要封装成ScheduledFutureTask,以及它是怎么具有延迟属性、周期属性的,以及DelayedWorkQueue是怎么实现延迟阻塞功能的,哈哈,这个我们下节会深入看ScheduleFutureTask这个类,以及DelayedWorkQueue是怎么实现延迟队列的。

3  小结

这节我们主要看了ScheduledThreadPoolExecutor线程池提交任务、执行任务的源码,当然还没有看完,我们下节继续,有理解不对的地方欢迎指正哈。