【Java 线程池】【三】ThreadPoolExecutor提交任务流程,execute源码分析

发布时间 2023-04-11 21:09:33作者: 酷酷-

1  前言

上一节我们看了线程池的状态和数量的计数以及提供的拒绝策略,这节我们就要看线程池的运行原理,那么在了解原理之前不知道大家对线程池有没有这样的疑问:
比如线程池的线程数量是怎么进行增长的?随着任务的不断提交,线程池中的线程数量什么时候线程数量达到corePoolSize?
什么时候线程数达到maximumPoolSize?线程数量达到了maximumPoolSize之后会怎么样?
什么时候任务才会进入到阻塞队列?阻塞队列满了之后会怎么样?
什么时候触发线程拒绝策略的执行?那么这节我们就来结合ThreadPoolExecutor线程池的execute提交任务方法,里面的源码流程揭开这其中的奥秘。

2  execute方法

先看一下向线程池提交任务的方法execute方法源码:

public void execute(Runnable command) {
    // 如果提交任务为null,直接抛出异常,没啥好说的,就是不允许提交空任务
    if (command == null)
        throw new NullPointerException();
    // 这里获取线程池的控制变量ctl (该变量同时存储了线程池状态、线程池大小)
    int c = ctl.get();
    // workerCountOf(c)位运算方法得到当前线程池的线程数量,这里上一章讲过这个方法
    if (workerCountOf(c) < corePoolSize) {
          // 如果当前线程数量 < corePoolSize,也就是线程数还没达到设置的核心线程数
          // 这里就调用addWorder方法,创建一个新的线程,然后将command任务交给这个新线程去运行
        if (addWorker(command, true))
            // 走到这里说明addWorder返回true,说明创建新线程成功,任务提交成功,直接返回
            return;
        // 走到这里说明addWorker失败了,说明创建新线程失败
        c = ctl.get();
    }
    // 走到这里有两种情况:
    // (1) 当前线程数量 >= corePoolSize
    // (2) 当前线程数量 < corePoolSize 但是addWorker创建新线程失败了,几乎不会发生,可以忽略
    // isRunning(c) 判断当前线程池是否处于RUNNING状态,也就是是否还正常运行
    // 如果正常运行,调用workQueue.offer(command)方法将任务放入阻塞队列
    // 注意:阻塞队列在上一篇非常详细的讲解过了,这里的offer方法在容量满的时候不会阻塞调用线程
    if (isRunning(c) && workQueue.offer(command)) {
        // 如果任务放入阻塞队列成功,重新获取线程池控制状态ctl
        int recheck = ctl.get();
        // !isRunning(recheck) ==true 表示当前线程池关闭了,可能就在这个时候有别人关闭了线程池
        // 需要调用remove方法将刚放入队列的任务取出,执行reject拒绝策略
        // 表示线程池关闭了,不能提交新任务了,执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 走到这里说明当前线程池肯定是RUNNING状态
        // workerCountOf计算一下当前线程数,如果是0,说明没有线程,需要调用addWorker方法创建新线程
        else if (workerCountOf(recheck) == 0)
            // 新创建线程,由于command任务放入阻塞队列了,所以此时传给新线程任务为null
            // 新线程自己会从阻塞队列取任务来执行
            addWorker(null, false);
    }
    // 走到这里说明:阻塞队列任务提交失败了,说明阻塞队列满了
    // 这个时候再次去创建新线程,只要当前线程数 < maximumPoolSize就可以创建成功
    // 如果创建成功,将command任务交给新线程运行
    else if (!addWorker(command, false))
        // 走到这里说明新线程创建失败,阻塞队列也满了
        // 没办法,此时线程池饱和了,只能执行拒绝策略了
        reject(command);
}

这里大体说明一下addWorder方法的参数和意义:
(1)command表示创建新线程,如果创建成功将command任务交给新线程运行
(2)true的意思表示是 当前线程数量 小于 corePoolSize才允许创建新线程;false表示当前线程数量 < maximumPoolSize才允许创建新线程
我们来画个图理解一下:

根据上图,我们总结一下外部线程A向执行线程池的execute方法,向线程池提交任务的时候,会经历的流程:
(1)首先计算当前线程池的线程数,如果 当前线程数 < corePoolSize (核心线程数),则会新创建线程,然后将任务交给这个新线程执行,创建成功直接返回,创建新线程失败则继续下一步
(2)当前线程 >= corePoolSize 的时候,判断当前线程池状态是否是RUNNING,如果是则优先会尝试将任务方进入阻塞队列。
如果放入阻塞队列成功,会进行一些线程池判断,比如如果再次判断线程池状态是否是RUNNING。
如果不是说明线程池关闭了,直接将刚放入队列的task取出,执行拒绝策略。比如再次判断当前线程是否为0,如果是0则需要创建线程出来执行任务。
(3)当前线程数 >= corePoolSize,并且当前任务放入阻塞队列失败的时候,则会再次执行addWorker方法,尝试再去创建线程。
只要当前线程数 < maximumPoolSize 并且 线程池为RUNNING才会创建成功,否则创建新线程失败,说明当前线程池饱和了,执行拒绝策略。

那我们来看下上面的疑问:

(1)随着任务的不断提交,线程池中的线程数量什么时候线程数量达到corePoolSize?
当前线程数 < corePoolSize时候,提交新任务的时候会不断创建新的线程出来,然后将任务交给这个新的线程执行,直到当前线程数等于 corePoolSize 为止。
(2)什么时候任务才会进入到阻塞队列?
当线程数量 >= corePoolSize的时候,这个时候提交任务,会优先将任务尝试放入阻塞队列
(3)阻塞队列满了之后会怎么样?
当线程数量 >= corePoolSize,任务会优先进入阻塞队列,这个时候不断的提交任务到阻塞队列,阻塞队列满了之后,会尝试再去创建新的线程出来。
(4)什么时候线程数达到maximumPoolSize?
当线程数量 >= corePoolSize,阻塞队列满了之后,就会尝试创建新的线程,这个时候线程数量逐渐增多,会达到maximumPoolSize数量
(5)线程数量达到了maximumPoolSize之后会怎么样?
当线程数量达到了maximumPoolSize之后,提交任务会尝试放进阻塞队列,如果放入成功则直接返回。如果放入失败,则此时已不能再创建新的线程,线程池已经饱和了,就会执行拒绝策略。
(6)什么时候触发线程拒绝策略的执行?
看了execute方法的的代码,我们知道提交任务的时候,可能触发拒绝策略执行的场景有:

  • 提交任务的时候线程池的状态不是RUNNING,会直接触发拒绝策略执行
  • 提交任务的时候线程池还是RUNNING状态,但是此时阻塞队列满了,线程数量也达到了maximumPoolSize,也会触发拒绝策略执行

3  addWorker方法

上面看完了execute方法之后,我们再继续看下addWorker方法,这个是创建工作者,也就是创建一个新线程的方法,内部是个什么逻辑。

// addWorker方法由两个参数
// firstTask 表示新创建线程的时候,给这个线程的第一个任务是什么,
// 如果不是null,则新线程创建好后会立马执行这个任务
// 如果传入是null,新线程自己会从阻塞对了取任务出来执行

// core 是核心的意思,表示创建线程的数量限制是 核心线程数(corePoolSize)
// 当core == turu,只有当前线程数 < corePoolSize才允许创建新线程
// 当core == false,当前线程数 < maximumPoolSize 才允许创建新线程
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 无限循环,会不断进行重试
    for (;;) {
        // 获取当前线程池的控制变量(该变量存储了线程池状态、线程数量)
        int c = ctl.get();
        // 计算得到当前线程池状态
        int rs = runStateOf(c);

        // 这里返回false,也就是不允许创建新线程的情况有
        // 1. 状态非RUNNING,也就是线程池关闭了,并且workQueue非空
        // 2. 状态非RUNNING,并且firstTask 非空
        // 3. 状态非RUNNING,并且非SHUTDOWN,可能为STOP、TIDYING、TERMINATED
        // 以上集中情况说明线程池关闭了,不允许再创建新线程出来,直接返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // 无限循环,里面进行cas操作,直到成功
        for (;;) {
            // 获取当前线程数量
            int wc = workerCountOf(c);
            // 如果线程数量大于等于CAPACITY,则线程数量达到了线程池能记录的上线,不允许创建了
            // 因为线程数量使用ctl的低29位表示,超过2^29次方ctl变量表示不了了
            if (wc >= CAPACITY ||
                // 这就是上面core参数的控制逻辑了
                // 当core == true,当前线程舒朗 < corePoolSize才允许创建新线程
                // 当core == false,当前线程数量 < maxumumPoolSize才允许创建新线程
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 走到这里说明线程数量未达到上限,允许创建新线程
            // 则执行cas操作,将当前线程数量+1
            if (compareAndIncrementWorkerCount(c))
                // 如果cas成功,跳出此次循环
                break retry;
            // 走到这里说明cas失败,重试
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 这里就是创建一个新的工作者Worker出来,将firstTask作为这个工作者的第一个任务
        // 这个工作者内部包含一个线程
        w = new Worker(firstTask);
        // 获取工作者内部的工作线程
        final Thread t = w.thread;
        // 如果工作线程非空,下面的操作可能是将Worker放入HashSet容器了,加锁,释放锁、线程池状态校验之类的
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 进行加锁,互斥锁
            mainLock.lock();
            try {
                // 获取当前线程池状态
                int rs = runStateOf(ctl.get());
                // 1.如果线程池状态为RUNNING,表示线程池状态成功,可以继续
                // 2. 如果线程池为SHUTDOWN,表示线程池关闭中,不接受新任务,但是会继续执行已经提交的任务
                // 并且 firstTask == null 表示不是新任务,此时允许创建新线程,新线程会从阻塞队列取任务来执行
                // 满足上面的1、2两种情况可以创建新线程
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 满足上面的校验之后,就会将新创建的工作者放入容器中,就是放入HashSet中
                    workers.add(w);
                    // 当前线程池工作者数量
                    int s = workers.size();
                    // 更新一下largestestPoolSize,也就是线程池达到过的最大数量是多少
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 更新一下标志变量,表示创建成功
                    workerAdded = true;
                }
            } finally {
                // 释放锁
                mainLock.unlock();
            }
            // 启动工作者的内部线程,设置启动成功标识为true
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果启动失败了,执行失败操作,肯定是将worker从HashSet容器移除,线程数量减少一个
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回是否创建成功、启动成功标识
    return workerStarted;
}

我们还是画个图来理解一下:

3  小结

好了,这节主要看了execute方法内部的执行流程以及addWorker方法内部原理,有理解不对的地方欢迎指正哈。