【Java 线程池】【二】ThreadPoolExecutor 参数详解及拒绝策略

发布时间 2023-04-10 15:41:18作者: 酷酷-

1  前言

上一节我们对线程池以及它的基本使用做了介绍,后面我们就开始分析线程池内部的原理了,内部的核心流程是怎么设计的,那么这节我们先来分析一下ThreadPoolExecutor这个线程池的有哪些属性,以及这些属性代表什么意思。

2  线程池状态和线程数量的表示

首先来看下ThreadPoolExecutor内部有哪些属性:

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 使用一个Int类型的32位同时表示: 线程池状态、线程数量
    // 其中高3位表示线程池状态,也就是0~2位表示线程池状态
    // 低29位也就是3~31位表示线程数量,最多能容纳2^29 - 1 约为500多万个线程
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Integer.SIZE = 32, COUNT_BITS = 32 - 3 = 29
    // 这里的意思就是使用29位表示线程数量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 线程池的容量为 CAPACITY = 2^29 - 1 约为500多万,(1 << COUNT_BITS也就是1左移29位也就是2的29次方)
    // 这里CAPACITY对应的32二进制位为 000 11111111111111111111111111111(高3位全为0,低29位全为1)
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    
    // runState is stored in the high-order bits
    // -1 左移29位得到 111 00000000000000000000000000000
    // 其中高3位也就是111表示当前线程池为运行状态RUNNING
    private static final int RUNNING = -1 << COUNT_BITS;
    // 0 左移动29位得到 000 00000000000000000000000000000
    // 其中高3位为000表示当前线程池为状态为SHUTDOWN
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    // 1 左移动29位得到 001 00000000000000000000000000000
    // 其中高3位为001表示当前线程池为状态为STOP
    private static final int STOP = 1 << COUNT_BITS;
    // 2 左移动29位得到 010 00000000000000000000000000000
    // 其中高3位为010表示当前线程池为状态为TIDYING
    private static final int TIDYING = 2 << COUNT_BITS;
    // 3 左移动29位得到 011 00000000000000000000000000000
    // 其中高3位为011表示当前线程池为状态为TERMINATED
    private static final int TERMINATED = 3 << COUNT_BITS;
}

上面的高3位表示线程池状态,低29表示线程池大小,如下图:

 对于高3位的值,也就是线程池的状态可以通过runStateOf方法计算得到:

private static int runStateOf(int c)     {
    // 1. 这里的c就是传入ctl的值
    // 2. ~CAPACITY 的32位值是 111 00000000000000000000000000000(高3位为1,低29位为0)
    // 这样c & ~ CAPACITY操作就能保留c的高三位值
    // 由于使用ctl的高3位表示线程池状态,这样也就得到了线程池的状态
    return c & ~CAPACITY;
}

同理,对于计算当前线程池的数量,只需要计算出ctl的低29的值即可:

private static int workerCountOf(int c)  {
    // CAPACITY的32位为 000 11111111111111111111111111111(高3位全为0,低29位全为1)
    // c与CAPACITY进行与运算,会得到低29位结果,也就是得到当前线程数量
    return c & CAPACITY;
}

同样,当线程池状态、线程池大小更新之后,如何更新ctl的值;只需要 (线程池状态 | 线程池大小),这两个进行或操作就可以了:

private static int ctlOf(int rs, int wc) {
    // rc表示线程池状态(RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED)
    // wc表示当前线程池大小
    // rc | wc 进行或操作就可以得到ctl的值(相当于得到一个新的数组,高3位就是rc的值,低29位是wc的值)
    return rs | wc;
}

使用一个ctl变量同时表示线程池状态、线程池大小,同时为了得到线程池状态、线程池大小提供了一些位运算的方法来得到。

3  线程池状态种类和意义

线程池的RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED这5个状态分别表示:

  • RUNNING:表示运行中,处于此状态的线程池,能接收新的任务(用户能提交新任务),对于已经接收的任务,会继续处理,这个是线程池正常的情况。
  • SHUTDOWN:不再接收新的任务,对于已经接收的任务会继续处理完毕
  • STOP:不再接收新的任务,对积压在阻塞队列的任务也不再处理,对于正在运行的任务的线程,会进行中断。
  • TIDYING:不再接收新的任务,所有的任务都会被终止,释放线程池中的线程池,直到线程数量为0
  • TERMINATED:终止状态,不接受、不运行、线程池被关闭了

4  线程池的其它属性

我们继续看下线程池内部的其它属性:

// 核心线程数
private volatile int corePoolSize;
// 最大线程池数
private volatile int maximumPoolSize;
// 创建线程的工厂
private volatile ThreadFactory threadFactory;
// 阻塞队列,用户存放任务的容器
private final BlockingQueue<Runnable> workQueue;
// 线程容器,Worker就是工作者,一个Worker内部封装了一个Thread线程
// 这里使用HashSet进行存储,表示工作者容器,线程容器
private final HashSet<Worker> workers = new HashSet<Worker>();
// 互斥锁,由于上面的HashSet不是并发安全的,所以操作的时候肯定需要上锁
private final ReentrantLock mainLock = new ReentrantLock();
// 等待条件
private final Condition termination = mainLock.newCondition();
// 当前线程数 > corePoolSize时候,这部分多出的线程在空闲多久之后会被销毁掉
private volatile long keepAliveTime;
// 是否允许核心线程超时
// 如果是false,核心线程不会被销毁
// 如果是true,核心线程在keepAliveTime时间后依然空闲,则会被销毁掉
private volatile boolean allowCoreThreadTimeOut;
// 一个标记变量,标记曾经线程池的最大大小
// 举例:corePoolSize = 5, maximumPoolSize = 10
// 线程池最大的时候有过8个线程,那么largestPoolSize = 8,只是标记一些曾经的最大值
private int largestPoolSize;
// 一个统计变量,线程池已经完成的任务数量
private long completedTaskCount;
// 线程池拒绝策略
// 当线程池饱和(阻塞队列满了,存不进新的任务;同时线程池的数量达到了maximumPoolSize无法再增加新线程)的时候提交新任务会触发此策略的执行
// 当线程池状态为SHUTDOWN,表示我要关闭了,不再接收新任务;此时向线程池提交新任务也会触发此策略的执行
private volatile RejectedExecutionHandler handler;

// 默认的拒绝策略,直接抛出异常(Abort是中止的意思)
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

5  线程池拒绝策略

我们说下这个拒绝策略,首先看下执行拒绝策略的方法入口:

final void reject(Runnable command) {
    // reject方法是线程池执行拒绝策略的入口
    // 这里的handler就是拒绝策略
    // rejectedExecution方法就是拒绝策略内部的逻辑
    handler.rejectedExecution(command, this);
}

然后看下ThreadPoolExecutor线程池提供了哪些拒绝策略:

5.1  AbortPolicy中止策略

public static class AbortPolicy implements RejectedExecutionHandler {
    
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 这里直接抛出一个异常,也就是线程池饱和或者线程池SHUTDOWN的时候直接抛出异常
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

5.2  DiscardPolicy 抛弃最新的任务

public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }
    // r是最新提交的任务
    // 看这里什么都不干,就是将最新提交的任务忽略了,不会执行它,相当于将它丢弃了
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

5.3  DiscardOldestPolicy 丢弃阻塞队列中最久的任务

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 如果当前线程池未关闭,也就是正常运行的
        if (!e.isShutdown()) {
            // 从阻塞队列中poll出一个任务,按照先进先出的方式
            // 也就是当前阻塞队列头部的任务,是这个阻塞队列中最先进去的,最老的意思
            e.getQueue().poll();
            // 抛弃最老的任务之后,调用e.execute(r)将当前任务交给线程池
            e.execute(r);
        }
    }

5.4  CallerRunsPolicy 使用调用者线程运行任务策略

public static class CallerRunsPolicy implements RejectedExecutionHandler {
  
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 如果当前线程池状态不是SHUTDOWN,也就是正常的
        if (!e.isShutdown()) {
            // 直接执行任务的run方法
            // 注意,这个时候不是使用线程池里面的线程,是直接使用你当前的线程去执行
            r.run();
        }
    }
}

上面就是ThreadPoolExecutor线程池提供的4种拒绝策略了,默认是使用AbortPolicy策略。这四种策略都实现了RejectedExecutionHandler接口,如果你需要定制自己的执行策略,直接实现RejectedExecutionHandler接口就好了,然后在构建线程池的时候将你的拒绝策略传入即可。

对于上面的线程池的其它核心参数,我们后面会一一来说的哈。

6  小结

本节我们先分析线程池的控制变量ctl(同时表示线程池状态、线程池大小)、还有有哪些属性、哪些拒绝策略,我们一步一步来,后面会根据源码实际串这些参数的哈,有理解不对的地方欢迎指正哈。