【Java 线程池】【九】Timer定时器、ScheduleThreadPoolExecutor延迟调度、时间轮算法对比

发布时间 2023-04-14 06:49:23作者: 酷酷-

1  前言

这节我们来讲点别的东西,专门来分析一下定时任务、延迟任务的实现。

2  对比分析

前面我们讲解的ScheduledThreadPoolExecutor,这个线程池可以进行延迟任务、定时任务的调度,底层依赖的是DelayedWorkQueue这个阻塞队列。DelayedWorkQueue这个延迟阻塞队列内部的原理:
(1)内部使用数组来存储数据,底层依赖延迟时间、sequenceNumber作为优先级进行比较,将底层的数组构建成一个小顶堆
(2)每次插入元素、取出元素之后都会再进行一次堆调整,重新调整成小顶堆
既然是小顶堆,那应该之后每次插入和删除元素之后,进行堆调整耗费,每次堆调整的时间复杂度是O(logN),N代表堆中数据的大小。也就是说插入N个元素大概耗时N(logN)。
而DelayQueue延迟队列底层采用的是PriorityQueue 优先级队列来进行存储数据的。那PriorityQueue队列底层又是通过什么来存储的呢?数组、链表、hash表?PriorityQueue优先级队列肯定是设计了排序规则,它底层到底采用了什么排序算法,时间复杂度又是多少呢?我们就来看看。

public class PriorityQueue<E> extends AbstractQueue<E>
    implements java.io.Serializable {
    // 底层也是采用数组的方式来进行存储数据的
    transient Object[] queue;
    // 初始化默认的容量,默认数组的长度是11
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    // 队列的大小
    private int size = 0;
    
    // 插入元素的方法
    public boolean add(E e) {
        return offer(e);
    }
    // 实际插入元素的方法
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        // 修改次数+1(关联并发修改异常,ConcurrentModificationException)
        modCount++;
        // 获取当前元素的个数
        int i = size;
        // 如果元素达到数组容量上限,进行扩容
        if (i >= queue.length)
            // 扩容
            grow(i + 1);
        // 大小+1
        size = i + 1;
        // 如果数组内没存有元素,插入元素位于数组头部
        if (i == 0)
            queue[0] = e;
        else
            // 看到这里是不是很熟悉,这里就是跟之前DelayedWorkQueue一样的堆调整算法
            siftUp(i, e);
        return true;
    }
    // 插入元素之后,进行堆调整
    private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
    }
    // 这里就是实际进行堆调整的逻辑,原理跟之前讲解的DelayedWorkQueue一模一样的
    private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }

其实这个PriorityQueue优先级队列跟DelayedWorkQueue很多地方都太像了,很重要的一点就是这两个队列底层都是采用堆排序,每一次调整的时间复杂度是O(logN):
(1)PriorityQueue允许用户自定义排序逻辑,传入一个Comparator接口的实现;
然而DelayedWorkQueue延迟阻塞队列内部已经写死了排序规则,就是先根据延迟时间、然后再根据sequenceNumber
(2)两个队列底层都是使用了数组来存储数据,而且都是采用了堆排序、堆调整的思想构建小顶堆,时间复杂度都是O(logN)
(3)区别在于DelayedWorkQueue是阻塞队列保证并发安全,每次操作需要进行加锁,而PriorityQueue不需要进行加锁
之前我们ScheduledThreadPoolExecutor内部的流程图:

我们再来看下Timer定时器,跟ScheduledThreadPoolExecutor原理差不多。
它底层也是采用一个阻塞队列来存储任务,同时这个队列具有延迟时间优先级的功能。不一样的是Timer定时器内部只有一个线程,负责去任务、执行任务。
Timer、ScheduledThreadPoolExecutor这两个延迟、定时调度器的特点,他们有什么缺点呢?
(1)两个调度器底层都是使用堆排序、堆调整的方式。并不适合大规模的任务调度,时间复杂度上消耗比较高。
试想一下如果要进行十万、百万节的任务调度,那么总耗时O(NlogN),这个N的基数太大,会导致性能上的损耗比较高。
(2)取任务和执行任务的线程是同一个;对于ScheduledThreadPoolExecutor线程池来说,线程负责从阻塞队列里面取任务出来,同时取到任务之后负责执行。
如果线程数量太少,所有线程卡在执行任务过程中,会导致很多已经到了执行时间的任务没法如期执行。
(3)Timer的缺点更加明显,只有一个线程来负责所有任务的提取和执行,假如某一个任务执行时间太长,会导致后面的所有任务全部推迟执行
那如果要解决上面的问题,应该怎么做呢?
比如将执行任务的线程和取任务的线程分开:

如上图所示,将取任务的操作和执行任务的操作单独分开,分别独立一个线程池。如果任务量大的话可以配置任务执行线程池的线程数量大一点。
上面说的Timer、ScheduledThreadPoolExecutor底层采用堆排序的方式来进行任务优先级排列,每一次插入元素、取出元素之后都要进行堆调整,每次调整的的时间复杂度是O(logN)。
接下来我们讲解一种定时调度的算法,叫做时间轮算法,能做到O(1)的时间复杂度存、取。

如上图所示:
(1)毫秒级别的时间轮,每毫秒移动一个刻度,在毫秒级别有1000个格子,每次指针指向的格子就说明这个格子上所有定时任务到了执行时间,取出这个格子的所有任务,交给执行线程池执行
(2)通过数组的方式,做到O(1)时间复杂度插入,每次指针移动一次时间复杂度也是O(1)
上面的那个时间轮只能做到ms级别的延迟或者定时任务,那么如果我要调度一个5.5秒的延迟任务呢?
那就在新增一个秒级别的时间轮就可以了,如下图所示:

如上图:
(1)秒级别的时间轮,每秒移动一个刻度,取出刻度指向格子中所有的任务。
(2)发现5秒刻度的任务列表中,有一个还有500ms才能执行,那就把这个任务进行降级一个时间轮,丢到ms级别的时间轮里面。
样的道理,分钟级别、小时级别、天级别的定时任务都可以通过不断增加时间轮来解决,这样就能做到每次存取任务的时间复杂度都是O(1)级别的了。

3  小结

好了,这节我们主要看了下三者Timer定时器、ScheduleThreadPoolExecutor延迟调度、时间轮算法内部复杂度上的比较,特别是这个时间轮算法,很多中间件采用来这种算法来进行任务调度,其中就包括了大名鼎鼎的Kafka,有理解不对的地方欢迎指正哈。