ThreadPoolExecutor

发布时间 2023-12-28 20:39:20作者: dongxp
title: ThreadPoolExecutor
date: 2022-08-20 20:47:02
permalink: /pages/674c3d/
details: 源码阅读
categories:
  - JAVA
  - JDK
tags:
  - 源码阅读
author:
  name: dongxinping
  link: https://gitee.com/dxpchina

ThreadPoolExecutor

线程池

1. 实例化

1.1 最终构造器

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

1.2 解读参数

  • corePoolSize 核心线程数,保持常活跃.
    • 当设置了allowCoreThreadTimeOut, 核心线程也会被释放
  • maximumPoolSize 当前线程池允许的最大线程数
  • keepAliveTime 线程数大于核心数(corePoolSize)时启用,多余的空闲线程在终止前等待新任务的最长时间
  • unit 时间单位,为 keepAliveTime 服务
  • workQueue 通过 execute 投递的任务将会放在这个队列中,慢慢消费(通过采用有界队列)
  • threadFactory 线程池创建新线程的工厂类
  • handler 当且仅当 maximumPoolSize 饱和和 workQueue 达到最大值时启用,定义如何处理新投递的任务

1.3 核心属性解读

		private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;  //29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池的状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 获取状态码
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
   // 获取正在运行的线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }
  • ctl 两种含义, 高三位表示线程池的状态, 低 29 位表示活跃的线程数量

2.任务投递关键方法

2.1 execute

往线程池中提交任务,也是整个线程池最关键的方法

2.1.1 源码

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
  // 线程数是否小于核心数
        if (workerCountOf(c) < corePoolSize) {
          // 创建核心线程并运行
            if (addWorker(command, true))
                return;  // 投递成功直接结束
            c = ctl.get();
        }
  // 线程池运行中, 尝试添加到缓存队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
          // 二次判断线程池是否运行, 若没有则移除任务,并触发拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
          // 缓存队列添加成功了, 判断工作线程是不是空的, 是就创建一个非核心线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
  // 队列添加失败(一般是队列满了), 尝试创建新的非核心线程来运行
        else if (!addWorker(command, false))
          // 执行拒绝策略, 调用 rejectedExecution 方法
            reject(command);
    }

2.1.2 流程图

graph TB A(execute中投递任务) Z(投递完成) B{判断核心数是否满了} A--->B D[尝试创建核心线程来运行] B--没有-->D B-->是-->F E{是否成功} D--->E E--是-->Z F[将任务丢进任务队列] E--否-->F G{是否成功} F--->G I[当工作线程为0,创建一个非核心线程] G--是-->I I-.->Z H[尝试创建非核心线程运行] G==否==>H J{创建是否成功} H-->J J--是-->Z K[执行拒绝策略] J--否-->K

由流程看出: 任务优先度是 coreThread -> workQueue -> maximumPoolSize ->

RejectedExecutionHandler(过程中出现异常或者线程池非运行状态都会运行)

2.2 addWorker

两个参数, Runnable firstTask, boolean core

创建新的线程的入口方法

  • firstTask 创建新线程是投的的任务,可为null
  • core 是否是核心线程
  1. 判断线程池是否将要关闭了,是直接返回 false
  2. 判断工作线程数是否大于等于核心线程数或者总的线程数, 是直接false
  3. 工作线程数数加1
  4. 调用 threadFactory 创建线程,并将其放到 works 线程池中
  5. 放入成功则直接启动线程, 否则线程数减1

3. 拒绝策略

默认提供了4种

  • AbortPolicy 直接抛出异常, 默认的就是中,通过6个参数的构造函数可看到
  • CallerRunsPolicy 让投递这个任务的线程自己去执行这个任务
  • DiscardPolicy 直接丢弃刚刚投递进来的任务, 什么也不做
  • DiscardOldestPolicy 从队列中放弃最早投进去的任务,将新任务加进去

4. 几种常见的队列

注意使用无边界的队列,容易内存溢出

  • PriorityBlockingQueue优先级阻塞队列, 无边界. 队列会根据优先级排序
  • LinkedBlockingQueue 链表,通过构造器限定队列大小, 默认 Integer.MAX
  • SynchronousQueue 可认为队列大小为1, 放入必须有人take
  • ArrayBlockingQueue 基于数组的队列, 内部维护了一个putIndex,达到环形数组的效果