ThreadPoolExecutor源码学习

发布时间 2023-12-27 21:55:40作者: 东方来客

Java构建线程的方式

  1. 集成Thread
  2. 实现Runnable
  3. 实现CallAble
  4. 线程池方式
    1. Java提供了Executors创建(不推荐,不方便进行控制)
    2. 推荐手动创建线程池ThreadPoolExecutor。

ThreadPoolExecutor参数

  1. int corePoolSize 核心线程数
  2. int maximumPoolSize 最大线程数,最大减核心是非核心线程。
  3. long keepAliveTime 最大空闲时间
  4. TimeUnit unit 时间单位
  5. BlockingQueue workQueue 阻塞队列
  6. ThreadFactory threadFactory, 线程工厂
  7. RejectedExecutionHandler handler 拒绝策略

线程池的执行流程

image

拒绝策略在线程池无法接受新任务时会被执行,例如当线程池已满并且任务队列也已满时。

线程池属性

// ctl的高3位代表线程池状态,低29位代表线程池线程个数。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 即29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 2的29次方减1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;


// 运行状态保存在高三位中
// -1的补码(绝对值取反加1)高三位为111,表示正常接受任务
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 代表线程池不接受新任务,但仍会处理阻塞队列和正在进行的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 不接收新任务,也不会处理阻塞队列中的任务,中断正在执行的任务。
private static final int STOP       =  1 << COUNT_BITS;
// 010 代表线程池即将关闭到达TERMINATED状态
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 线程池最终状态,已关闭。
private static final int TERMINATED =  3 << COUNT_BITS;


// 得到线程池状态
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
// 获得线程池中线程数量
private static int workerCountOf(int c)  { return c & COUNT_MASK; }

线程池状态

image

execute 执行过程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. 如果当前运行的线程少于核心线程数(corePoolSize),
     * 则尝试使用给定的任务启动一个新线程。通过调用addWorker原子性
     * 地检查runState和workerCount,防止在不应该添加线程时出现误报,
     * 从而返回false。
     *
     * 2. 如果任务可以成功排队,那么我们仍然需要再次检查是否应该添加线程
     * (因为自上次检查以来现有线程已经死亡),或者线程池是否在
     * 进入此方法后关闭。因此,我们重新检查状态,
     * 在stopped状态必要时则回滚排队,如果线程为空则启动一个新线程。 
     *
     * 3. 如果无法排队任务, 那么我们尝试添加一个新线程。
     *  如果失败, 我们会知道线程池已关闭或饱和,因此执行reject。
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 创建核心线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 线程池如果正在运行,将任务放到阻塞队列,在执行signal信号后运行
    if (isRunning(c) && workQueue.offer(command)) { // 此为创建核心线程失败时
        int recheck = ctl.get();
        // 不在运行时将任务移除掉并执行拒绝策略
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 并发条件下此时任务可能为空
        else if (workerCountOf(recheck) == 0) 
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
    }

addWorker()

不断尝试获取线程池的控制状态并检查是否可以添加新的工作线程。

private boolean addWorker(Runnable firstTask, boolean core) {
    // 尝试获取线程池的控制状态并检查是否可以添加新的工作线程。
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))
            // 如果线程池处于SHUTDOWN状态并且满足以下条件之一:已经处于STOP状态、有待执行的任务或工作队列为空,则返回false
            return false;
        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                // 如果工作线程数量达到核心线程数或最大线程数的限制,则返回false
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry; // 返回到retry处
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                // 如果CAS操作失败是由于workerCount更改,则继续内部循环
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    // 尝试创建新的工作线程,并在必要时将其添加到线程池中。
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    // 如果线程池正在运行或者处于非STOP状态且没有待执行的任务,
                    // 则检查线程状态是否为NEW,
                    // 否则抛出IllegalThreadStateException异常
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    // 将工作线程添加到集合中
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        // 更新最大线程池大小
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动工作线程
                t.start();
                // 标记线程为已启动
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // 返回工作线程的状态
    return workerStarted;
}

线程的运行 runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 循环获取任务,task不为空执行task,否则从阻塞队列中获取任务。
        while (task != null || (task = getTask()) != null) {
            w.lock();
            
            // 检查线程池是否正在停止或线程是否被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||  // 即将关闭或已关闭,即TIDYING或者TERMINATED
                 (Thread.interrupted() &&             // 测试是否被中断
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())                  // 获取中断状态
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    // 执行任务
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                // 增加已完成任务的数量。
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}