ThreadPoolExecutor源码分析

发布时间 2023-04-04 20:11:18作者: _mcj

1.ctl说明

ctl是线程池的状态控制,他是一个原子整数,主要记录当前线程池状态和当前线程池的工作数量。其实一个32位的整数,其中前三位记录的是当前线程池的状态,后29为记录的是当前工作的线程数量,也就是线程池目前支持的最大工作线程数量为0001 1111 1111 1111 1111 1111 1111 1111 = 536870911个。
其具体的代码如下:

    // 当前线程池的运行状态和当前线程池的工作线程数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 位数,Integer.SIZE的值为32所以减3等于29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 工作线程的容量,1左移29位为:0010 0000 0000 0000 0000 0000 0000 0000,然后减一值为:0001 1111 1111 1111 1111 1111 1111 1111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    //这五个为线程池的运行状态
    //RUNNING状态 1110 0000 0000 0000 0000 0000 0000 0000
    private static final int RUNNING    = -1 << COUNT_BITS;
    //SHUTDOWN状态 0000 0000 0000 0000 0000 0000 0000 0000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // STOP状态 0010 0000 0000 0000 0000 0000 0000 0000
    private static final int STOP       =  1 << COUNT_BITS;
    // TIDYING状态 0100 0000 0000 0000 0000 0000 0000 0000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // TERMINATED状态 0110 0000 0000 0000 0000 0000 0000 0000
    private static final int TERMINATED =  3 << COUNT_BITS;

    /**
     * 计算当前线程池的运行状态
     * ~CAPACITY值为:1110 0000 0000 0000 0000 0000 0000 0000
     * 按位与的话就是取前三位的值,后面全为0,即为当前线程池的运行状态
     */
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    /**
     * 计算当前线程池的工作线程数量
     * CAPACITY值为:0001 1111 1111 1111 1111 1111 1111 1111
     * 按位与的话就是取后29位的值,前面全为0,即为当前线程池的工作线程数量
     */
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 计算ctl的值,按位或,同为0的时候才为0,只要有一个为1就为1
    private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池的四种状态说明:

状态 值(前三位) 说明
RUNNING 111 此状态的线程池能接收新提交的任务,并且也能处理阻塞队列中的任务
SHUTDOWN 000 不能接收新提交的任务,但是可以继续处理阻塞队列中已保存的任务
STOP 001 不能接收新的任务,也不能处理队列中的任务,会中断正在处理任务的线程
TIDYING 010 所有的任务都已经终止了,workerCount(有效线程数)为0
TERMINATED 011 terminated()方法执行完毕后进入该状态

这五个线程池状态的大小比较为:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
这个排序在后面的源码中用到的会比较多

线程池五种状态的转换图:

2.源码分析

execute源码:
从线程池的运行过程来看,可以首先从execute的源码来看起:


public void execute(Runnable command) {
	// 判断如果执行的任务为空,则直接抛出空指针异常
	if (command == null)
		throw new NullPointerException();
	// 获取当前线程池的状态和工作线程数量
	int c = ctl.get();
	// 当前线程池的工作线程数量小于核心线程数
	if (workerCountOf(c) < corePoolSize) {
		// 添加工作线程,以当前任务作为第一个要执行的任务,true表示以核心线程数量作为绑定,成功则直接返回,失败继续向下执行
		if (addWorker(command, true))
			return;
		// 重新获取线程池的状态和工作线程数量,防止线程池的状态发生改变
		c = ctl.get();
	}
	// 判断当前线程池是运行状态,并且阻塞队列未满,未满时会将新的加到队尾
	if (isRunning(c) && workQueue.offer(command)) {
		// 再次获取线程池的运行状态和工作线程数量,防止线程池状态发生改变
		int recheck = ctl.get();
		// 如果线程池的状态不是运行状态并且将刚才加入的任务从阻塞队列中移除成功
		if (! isRunning(recheck) && remove(command))
			// 将当前任务拒绝
			reject(command);
		// 工作线程数量为0
		else if (workerCountOf(recheck) == 0)
			// 添加一个工作线程,无任务,false表示以最大线程数量作为绑定
			addWorker(null, false);
	}
	// 当前任务作为第一个要执行的任务,false以最大线程数量作为绑定时添加工作线程时失败
	else if (!addWorker(command, false))
		// 将当前任务拒绝
		reject(command);
}

源码解释:

  • execute的代码逻辑首先会比较当前线程池的工作线程数量,如果工作线程数量小于核心线程数的话,则会调用addWorker()添加一个,然后返回,如果添加失败的话,则会重新获取线程池的状态和工作线程数量。
  • 接着重新判断线程池的状态是否为运行状态,运行状态的话,则会看阻塞队列是否未满,未满的话则会向阻塞队列中添加一个
    • 会重新获取当前线程池的状态,判断当前线程池的状态是否为运行状态,如果不是运行状态的话,则会从阻塞队列中将添加的任务去掉,并将当前任务拒绝掉
    • 如果不满足上面的条件则会判断当前工作线程数量是否为0,如果当前没有工作线程的话,则会使用addWorker以最大工作线程为上限创建一个新的
  • 如果当线程池的状态不是运行状态,或者则色队列已满时,并且使用addWorker以最大工作线程为上限添加新的工作线程失败的话,则会执行拒绝策略。

execute流程图:
image

addWorker源码:
从上面的execute的源码可以看到,其中比较重要的就是addWorker()这个方法了。其具体的源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
	retry:
	for (;;) {
		// 获取当前线程池的状态和工作线程数量
		int c = ctl.get();
		// 获取当前线程池的状态
		int rs = runStateOf(c);
		/**
		 * 这里的条件可以转化为下面的这些条件
		 * 判断当前线程池的状态是否大于SHUTDOWN,也就是为STOP,TIDYING,TERMINATED这三个状态。
		 * 或者当前线程池的状态为SHUTDOWN并且firstTask != null
		 * 或者当前线程池的状态为SHUTDOWN并且workQueue.isEmpty()
		 * rs > SHUTDOWN || (rs == SHUTDOWN && firstTask != null) || (rs == SHUTDOWN && workQueue.isEmpty())
		 * 当为上面的这些条件时则会返回false,worker添加失败
		 */
		if (rs >= SHUTDOWN &&
			! (rs == SHUTDOWN &&
			   firstTask == null &&
			   ! workQueue.isEmpty()))
			return false;

		for (;;) {
			// 获取当前线程池的工作线程数量
			int wc = workerCountOf(c);
			// 判断如果当前线程池的工作线程数量大于最大线程池的数量或者(大于当前绑定的核心线程数量或最大线程数量),此时也是返回false
			if (wc >= CAPACITY ||
				wc >= (core ? corePoolSize : maximumPoolSize))
				return false;
			// 符合条件,worker添加成功,则会结束自旋,执行下面的逻辑
			if (compareAndIncrementWorkerCount(c))
				// 工作线程添加成功,结束自旋
				break retry;
			// 重新获取线程池的状态和工作线程数量
			c = ctl.get(); 
			// 判断当前的线程池状态是否与刚开始自旋时的状态,如果不是的话则重新进行外部自旋,如果还是相等的话则会继续进行内自旋
			if (runStateOf(c) != rs)
				// 当前线程池的状态与刚开始自旋时的状态不一致,则重新进行外部自旋
				continue retry;
		}
	}

	boolean workerStarted = false;
	boolean workerAdded = false;
	Worker w = null;
	try {
		// 创建一个新的线程执行当前任务,在初始化Worker时通过线程工厂创建线程:getThreadFactory().newThread(this);
		w = new Worker(firstTask);
		// 获取上面创建的线程
		final Thread t = w.thread;
		// 如果线程不为空,表示创建成功。
		if (t != null) {
			final ReentrantLock mainLock = this.mainLock;
			// 尝试获取锁,获取锁后再执行后面的逻辑,防止在向其中加入线程时出现一些并发的问题
			mainLock.lock();
			try {
				// 获取线程池的运行状态
				int rs = runStateOf(ctl.get());
				// 判断当前线程池是RUNNING状态或者线程池为SHUTDOWN状态,但是其firstTask为null(此时就是当前线程并没有要执行的任务)
				if (rs < SHUTDOWN ||
					(rs == SHUTDOWN && firstTask == null)) {
					// 检测线程是否处于存活状态,也就是当前新创建的线程已经执行过start()方法
					if (t.isAlive())
						throw new IllegalThreadStateException();
					// 将当前工作线程添加到当前工作线程集合中
					workers.add(w);
					// 获取当前工作线程的数量
					int s = workers.size();
					// 比较当前最大工作线程数量与当前工作线程数量大小,以最大的为准
					if (s > largestPoolSize)
						largestPoolSize = s;
					// 将添加工作线程标记设置为成功
					workerAdded = true;
				}
			} finally {
				// 加入成功或者失败都将锁释放掉
				mainLock.unlock();
			}
			// 如果添加工作线程成功,则将当前工作线程启动,并将线程启动标志设置为true
			if (workerAdded) {
				t.start();
				workerStarted = true;
			}
		}
	} finally {
		// 如果工作线程启动标志还是false状态,则将之前的一些操作进行回滚
		if (! workerStarted)
			addWorkerFailed(w);
	}
	// 返回工作线程的启动标志
	return workerStarted;
}

源码解释:

  • 这边首先会进行自旋判断是否满足下面的三个条件:
    case1:线程池的状态为STOP,TIDYING,TERMINATED这三个状态。
    case2:当前线程池的状态为SHUTDOWN并且firstTask不为null。
    case3:当前线程池的状态为SHUTDOWN并且阻塞队列为空。
    当满足这三种条件的任何一种时则会返回false,结束自旋,否则继续执行
  • 随后进入内部自旋,在内部自选中,首先判断当前线程池的最大的线程容量CAPACITY是否大于最大容量或者设置的(核心线程数或者最大线程数量),如果大于则会返回false,结束自旋,否则继续向下执行
  • 如果当前线程容量小于(核心线程数量或者最大线程数量)则会添加一个worker工作线程,如果添加成功则跳出自旋继续执行自旋后的逻辑,添加失败则继续向下执行
  • 再次判断线程池的状态是否为刚开始进入自旋时的线程池的状态,如果一样则继续执行内部自旋,如果不一致则跳到外部自旋重新执行
  • 自旋后的逻辑,首先会根据当前任务创建一个工作线程,创建成功之后,则会对当前线程尝试加锁,防止并发产生一些问题
  • 接着判断如果为一下两种情况则会继续执行:
    case1:线程池的状态为RUNNING状态
    case2:线程池状态为SHUTDOWN状态,并且当前任务为null
    如果为上面两种情况任一情况都会接着继续向下执行。
  • 接着判断当前任务创建的线程是否处于alive状态,处于该状态的话说名该线程已经start了,则抛出异常
  • 将新创建的工作线程添加到工作线程集合中,然后判断工作线程集合的数量是否大于最大工作线程数量,大于的话则更新最大工作线程数量,随后更新标记
  • 添加成功之后进行解锁
  • 判断是否添加成功,添加成功则启动线程,更新工作线程开始状态
  • 最后判断一下工作线程是否成功启动,如果没有启动的话则添加工作线程失败,将前面的操作回滚。然后返回工作线程是否启动标志。

addWorker流程图:
image
说明:其中红色的条件为或的关系,只要满足其中一条条件即可。

Worker的构造函数源码:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        public void run() {
            runWorker(this);
        }
        ...
}

源码分析:
从上面的源码能够看出其构造函数通过调用线程工厂来创建新的线程,用来执行任务。最主要的是run()方法中执行的runWorker()方法,这个方法是运行工作线程的逻辑。

runWorker源码:
从上面的Worker的构造函数源码可以看出,其中最重要的就是runWorker这一部分的代码,具体的源码如下:

/** 运行工作线程       */
final void runWorker(Worker w) {
	// 获取当前线程
	Thread wt = Thread.currentThread();
	// 该工作线程要执行的任务
	Runnable task = w.firstTask;
	w.firstTask = null;
	w.unlock(); // allow interrupts
	boolean completedAbruptly = true;
	try {
		// 当前要执行的任务不为空或者从任务队列中获取到的要执行的任务不为空,进行自旋
		while (task != null || (task = getTask()) != null) {
			// 尝试获取锁,获取锁后执行后面的逻辑
			w.lock();
			/**
			 * 满足下面的两种条件则会执行wt.interrupt(),当前线程打上中断标记
			 * case1:当前线程池的状态大于等于STOP状态,也就是为STOP,TIDYING,TERMINATED这三个状态,并且当前线程没有被标记中断
			 * case2:当前线程是中断状态(清除中断标志)并且线程池的状态为STOP,TIDYING,TERMINATED状态,并且当前线程没有被标记中断
			 */
			if ((runStateAtLeast(ctl.get(), STOP) ||
				 (Thread.interrupted() &&
				  runStateAtLeast(ctl.get(), STOP))) &&
				!wt.isInterrupted())
				wt.interrupt();
			try {
				// 前置处理
				beforeExecute(wt, task);
				Throwable thrown = null;
				try {
					// 真正执行任务的地方
					task.run();
				} catch (RuntimeException x) {
					thrown = x; throw x;
				} catch (Error x) {
					thrown = x; throw x;
				} catch (Throwable x) {
					thrown = x; throw new Error(x);
				} finally {
					// 后置处理
					afterExecute(task, thrown);
				}
			} finally {
				// 任务执行接触,任务置空,能够被GC,完成任务数量加一,解锁,从任务等待队列中获取下一个要执行的任务
				task = null;
				w.completedTasks++;
				w.unlock();
			}
		}
		completedAbruptly = false;
	} finally {
		processWorkerExit(w, completedAbruptly);
	}
}

源码分析:

  • 首先先解锁,使其能够可中断
  • 判断当前要执行的任务是否为空,为空的话则从阻塞队列中获取下一个要执行的任务,此处自旋
  • 自旋中的逻辑,首先加锁,防止并发的影响,随后判断如果为下面两种情况:
    case1:当前线程池的状态为STOP,TIDYING,TERMINATED这三个状态并且当前线程被标记中断
    case2:当前线程是中断状态(清除中断标记)并且线程池的状态为STOP,TIDYING,TERMINATED状态,并且当前线程没有被标记中断
    当为上面两种情况的任何一种情况是则会执行wt.interrupt()为当前线程打上中断标记
  • 然后进行真正的运动线程,在运行前会及逆行相应的前置处理,运行后会进行相应的后置处理
  • 运行之后任务置空,已完成任务嘉义,解锁

runworker流程图:
image
说明:其中红色部分表示或关系的判断,只要满足其中一个条件即可。

PS:interrupt(),interrupted(),isInterrupted()的区别:

调用interrupt()方法仅仅是在当前线程中打了一个停止的标记,并不是真正的停止线程

interrupted()测试当前线程是否已经是中断状态,执行后具有清除中断状态flag的功能

isInterrupted()测试线程Thread对象是否已经是中断状态,但不清除中断状态flag 。

getTask源码:
从runWorker的源码来看,其中比较重要的就是其从阻塞队列中获取下次要执行的任务getTask()的代码,其中具体的一些设置的存活时间限制也是在该方法中进行判断的,具体代码如下:

/** 从任务队列中获取要执行的任务   */
private Runnable getTask() {
	// 超时标记,是否对线程进行超时处理
	boolean timedOut = false;

	for (;;) {
		int c = ctl.get();
		// 获取当前线程池的运行状态
		int rs = runStateOf(c);

		/**
		 * 满足下面的条件则会将当前工作线程数量减一,返回null
		 * case1: 线程池的状态为STOP,TIDYING,TERMINATED
		 * case2: 线程池的状态为SHUTDOWN并且阻塞队列中的任务为空
		 */
		if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
			decrementWorkerCount();
			return null;
		}
		// 获取当前线程池的工作线程数
		int wc = workerCountOf(c);

		// 标记是否进行超时处理,允许核心线程超时设置为true或者当前工作线程数大于核心线程数
		boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
		
		/**
		 * 当符合下面四种情况中的一个时,则会将当前工作线程数量减一,返回null
		 * case1: 当前工作线程数量大于最大线程数量
		 * case2: 当前工作线程数量大于最大线程数量并且阻塞队列为空
		 * case3: 超时控制并且上次从阻塞队列中获取任务发生了超时并且当前工作线程数量大于1
		 * case4: 超时控制并且上次从阻塞队列中获取任务发生了超时并且阻塞队列为空
		 */
		if ((wc > maximumPoolSize || (timed && timedOut))
			&& (wc > 1 || workQueue.isEmpty())) {
			if (compareAndDecrementWorkerCount(c))
				return null;
			// 工作线程数量减一失败,则重新进行循环重试
			continue;
		}

		try {
			/**
			 * poll(keepAliveTime, TimeUnit.NANOSECONDS): 获取阻塞队列中排在首位的对象,如果不能立即取出,则可以等待keepAliveTime设置的时间,取不到则返回null
			 * take(): 获取阻塞队列中排在首位的对象,如果阻塞队列为空,阻断进入等待状态,直到阻塞队列有新的任务加入取到为止
			 *
			 */
			Runnable r = timed ?
				workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
				workQueue.take();
			if (r != null)
				return r;
			// r为null,获取阻塞队列超时
			timedOut = true;
		} catch (InterruptedException retry) {
			timedOut = false;
		}
	}
}

源码分析:

  • 首先会进入自旋,然后判断是否满足下面的两种情况:
    case1:线程池的状态为STOP,TIDYING,TERMINATED
    case2:线程池的状态为SHUTDOWN并且阻塞队列中的任务为空
    满足上面两种情况其中一种情况则会将工作线程的数量减一,返回null。
  • 随后会获取当前线程池的工作线程数量,获取是否进行超时处理的标记
  • 接着会判断是否满足下面的几种情况:
    case1:当前工作线程数量大于最大线程数量
    case2:当前巩固走线程数量大于最大线程数量并且阻塞队列为空
    case3:超时控制并且上次从阻塞队列中获取任务发生了超时并且当前工作线程数量大于1
    case4:超时控制并且上次从阻塞队列中获取任务发生了超时并且阻塞队列为空
    只要满足上面其中一种情况都会将当前工作线程数量减一,减一成功则会返回null,减一失败则会重新进行自旋。
  • 接着根据是否超时标志,来分别通过poll()和take()方法来获取任务
  • 如果获取到任务则返回该任务,如果没获取到则设置从阻塞队列中获取任务超时标记

getTask流程图:

image

3.总结

从上面的分析可以看出其中比较重要的几个方法分别是,execute,addWorker,runWorker,getTask这几个方法,这几个方法概括threadPoolExecute线程池的大致流程,首先会判断当前线程池的工作线程是否小于核心线程数,小于的话则会直接创建一个线程来执行该任务,如果不小于的话则会将当前任务加入到阻塞队列中,如果阻塞队列已满,则会判断当前工作线程数量是否大于最大线程数量,如果小于的话,则会创建个临时线程来执行这个任务,临时线程的存活时间就是参数种设置的存活时间,如果不小于最大线程数量的话,则会执行拒绝策略将该任务给拒绝掉。另外,需要注意的是如果设置了允许核心线程超时的话,那么核心线程也会有相应的存活时间。