FutureTask源码分析

发布时间 2023-04-24 08:47:51作者: 无虑的小猪

1、Callable 与 FutureTask介绍

1.1、Callable

  创建线程有两种方式,一种是继承Thread类,一种是实现Runnable接口重写run方法。其实Thread也实现了Runable接口。

  在Runable接口中,仅有一个无参无返回结果的run方法。Runable接口详情:

1 @FunctionalInterface
2 public interface Runnable {
3     public abstract void run();
4 }

  Callable接口的功能与Runable接口类似,唯一不同的是Callable接口可以返回线程执行的结果并抛出异常,在Runable接口中,仅有一个无参有结果的call方法,可抛出异常。Callable接口详情:

1 @FunctionalInterface
2 public interface Callable<V> {
3     V call() throws Exception;
4 }

1.2、FutureTask

  Thread构造函数详情: 

  

  在Thread中,没有Callable类型的入参构造函数,只有Runable类型。需要一个实现了Runable接口的对象封装Callable,这个对象是FutureTask。有关FutureTask的详细介绍,在后文会做详细介绍。

2、Callable与FutureTask 的使用

  示例代码如下:

 1 import java.util.concurrent.Callable;
 2 import java.util.concurrent.FutureTask;
 3 import java.util.concurrent.TimeUnit;
 4 public class TestCallable {
 5 
 6     public static void main(String[] args) throws Exception {
 7         // 创建FutureTask
 8         FutureTask task = new FutureTask<>(new Callable<String>() {
 9             @Override
10             public String call() throws Exception {
11                 // sleep  5 秒
12                 TimeUnit.SECONDS.sleep(5);
13                 // 返回线程执行结果
14                 return Thread.currentThread().getName() + " == CumCallable == ";
15             }
16         });
17         // 启动线程
18         new Thread(task).start();
19         // 阻塞等待
20         Object o = task.get();
21         // 主线程执行
22         System.out.println(Thread.currentThread().getName() +  " == main ==");
23         System.out.println(o);
24     }
25 }

3、Callable与FutureTask源码分析

  FutureTask类图关系如下;

  

  FutureTask实现了Runable、Future接口。

1、Future

  Runable接口这里不做介绍了,主要看Future接口详情如下:

 1 // 操作线程任务
 2 public interface Future<V> {
 3 
 4     /**
 5      * 试图取消当前执行的任务
 6      *     如果任务已经执行完成、或已经被取消、或不能被取消,返回false
 7      *     如果任务还未启动,就已经被取消,该任务永远不会被运行         
 8      *     如果任务已经启动,mayInterruptIfRunning参数决定正在执行的线程是否被中断停止该任务
 9      */
10     boolean cancel(boolean mayInterruptIfRunning);
11 
12     /**
13      * 在任务正常执行完成前,判断其是否被取消
14      */
15     boolean isCancelled();
16 
17     /**
18      * 判断任务是否执行完成
19      */
20     boolean isDone();
21 
22     /**
23      * 一直阻塞等待,直到获取执行结果 
24      */
25     V get() throws InterruptedException, ExecutionException;
26 
27     /**
28      * 阻塞等待timeout时间,获取执行结果,阻塞时间timeout已到达,抛出异常
29      */
30     V get(long timeout, TimeUnit unit)
31         throws InterruptedException, ExecutionException, TimeoutException;
32 }

  Future接口提供了操作线程的方法,如取消当前执行的任务、获取执行的结果、判断是否执行完成等。Future对线程的任务做了增强处理。

2、RunnableFuture

  RunnableFuture继承Runable、Future接口,并重写了run方法。FutureTask实现此方法,并在run方法中调用了call方法,获取执行结果。
1 public interface RunnableFuture<V> extends Runnable, Future<V> {
2     // 线程执行未取消,通过此方法设置线程执行结果
3     void run();
4 }

3、FutureTask

  FutureTask是基于 CAS + state + WaitNode节点链表 实现的。CAS保证多线程场景下的原子性;state线程状态,控制代码的返回及执行流程;WaitNode节点链表记录挂起的线程。

1、属性

1.1、执行任务线程状态

  

1.2、线程执行结果、等待队列头 

  callable:构造函数中传入的Callable对象,用于执行call()方法;

  outcome:正常执行完,outcome记录执行结果;执行出现异常,outcome记录异常;

  runner:当前正在执行的线程;

  waiters:等待队列的头节点,单向链表的头。

  

1.3、WaitNode

  waitNode是一个节点,存储在单向链表中,用于记录等待执行的线程,便于当前线程执行完后,唤醒等待的线程。

2、构造函数

  FutureTask的构造函数入参支持Callable,也支持Runable。构造函数完成对callable、state属性的初始化操作。详情如下:  

  

  入参为Runable的构造函数是如何转换成Callable的呢?Executors#callable() 详情如下:

1 // Runable 转为 Callable,并返回入参 result
2 public static <T> Callable<T> callable(Runnable task, T result) {
3     // 非空判断
4     if (task == null)
5         throw new NullPointerException();
6     // 使用适配器,将Runable转换为Callable
7     return new RunnableAdapter<T>(task, result);
8 }

  RunnableAdapter采用了适配器模式,将Runable转换为Callable。我们来看看是怎么转换的。

  RunableAdapter 与 Runable 组合。RunableAdapter实现了Callable接口,重写了call方法,在该方法中实际执行的是Runable的run方法,并返回传入的result结果。

  

3、run() - 入口方法

1、流程图

 

2、run()源码分析

  FutureTask实现了Runable接口,从写了run()方法,run()作为线程执行的入口方法,先来看看具体做了哪些操作。

  FutureTask#run() 详情如下:

 1 public void run() {
 2     // 当前线程不为新建状态  或者 当前线程cas获取锁资源失败,返回
 3     if (state != NEW ||
 4         !UNSAFE.compareAndSwapObject(this, runnerOffset,
 5                                      null, Thread.currentThread()))
 6         return;
 7     try {
 8         // callable成员变量赋给局部变量
 9         Callable<V> c = callable;
10         // callable不为空,当前线程状态为 新建
11         if (c != null && state == NEW) {
12             // 执行结果
13             V result;
14             // 执行结束标识
15             boolean ran;
16             try {
17                 // 执行实现的call方法,并获取返回结果
18                 result = c.call();
19                 // 执行结束标识设置为true -> 正常执行
20                 ran = true;
21             } catch (Throwable ex) {
22                 // 执行异常,返回null
23                 result = null;
24                 // 执行结束标识设置为true -> 执行出现异常
25                 ran = false;
26                 // 设置异常信息
27                 setException(ex);
28             }
29             // 正常执行结束,存储执行结果
30             if (ran)
31                 set(result);
32         }
33     } finally {
34         // 线程释放锁资源
35         runner = null;
36         // 中断线程的处理
37         int s = state;
38         if (s >= INTERRUPTING)
39             handlePossibleCancellationInterrupt(s);
40     }
41 }

  在run()方法中,实际执行的是callable的call()方法,并用outcome成员变量存储线程执行的结果,若出现异常,outcome则存储异常信息。

1、正常执行结束

  线程正常执行完,将线程的执行结果设置到FutureTask的outCome属性中,FutureTask#set()方法。

 1 // 线程执行结果设置到outcome属性中
 2 protected void set(V v) {
 3     // 修改当前线程状态   NEW(新建) -> COMPLETING(运行中)
 4     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
 5         // 线程执行结果赋值到成员变量outcome
 6         outcome = v;
 7         // 修改线程状态  COMPLETING(运行中) -> NORMAL(正常执行结束)
 8         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); 
 9         // 唤醒因get()操作挂起的线程
10         finishCompletion();
11     }
12 }

  线程执行执行结束,线程状态经过了 NEW -> COMPLETING -> NORMAL 的变化,将结果赋值给outcome,唤醒因get()操作挂起的线程。

2、异常的处理

  线程执行过程中异常的处理,FutureTask#setException() 详情如下:

 1 // 设置异常信息
 2 protected void setException(Throwable t) {
 3     // cas操作,修改当前执行的线程状态  NEW(新建) -> COMPLETING(运行中)
 4     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
 5         // 线程执行结果设置为异常
 6         outcome = t;
 7         // 修改当前执行的线程状态  COMPLETING(运行中) -> EXCEPTIONAL(异常)
 8         UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 
 9         // 唤醒因get()操作挂起的线程
10         finishCompletion();
11     }
12 }

  线程执行执行结束,线程状态经过了 NEW -> COMPLETING -> EXCEPTIONAL 的变化,将异常赋值给outcome,唤醒因get()操作挂起线程。

3、唤醒等待队列中的挂起线程

  FutureTask#finishCompletion() 详情如下:

 1 // 移除并唤醒所有等待执行的线程
 2 private void finishCompletion() {
 3     // q执行waiters链表的头节点
 4     for (WaitNode q; (q = waiters) != null;) {
 5         // cas操作,将waiters设置为null,担心外部线程使用 cancel 取消当前任务触发  finishCompletion()
 6         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
 7              // 自旋
 8              for (;;) {
 9                 // 获取当前Node封装的 thread
10                 Thread t = q.thread;
11                 // 当前线程不为  null
12                 if (t != null) {
13                     // 将当前等待节点的线程设置为null
14                     q.thread = null;
15                     // 唤醒当前节点因 get 操作阻塞的线程
16                     LockSupport.unpark(t);
17                 }
18                 // 获取下一个WaitNode
19                 WaitNode next = q.next;
20                 // 等待节点链表中没有待唤醒的线程,结束循环
21                 if (next == null)
22                     break;
23                     
24                 // 将下一个待唤醒的等待节点从  WaitNode链表中移除,便于GC
25                 q.next = null;
26                 q = next;
27             }
28             // 等待节点链表中没有待唤醒的线程,结束循环
29             break;
30         }
31     }
32     
33     // JDK提供修改当前线程执行状态的拓展方法,默认不实现
34     done();
35     
36     // 执行完毕,callable
37     callable = null;
38 }
39 
40 // JDK提供修改当前线程执行状态的拓展方法,默认不实现
41 protected void done() { }

4、处于中断过程中的线程处理

  FutureTask实现了Future接口,Future提供了线程取消的方法cancel。若当前线程被取消,FutureTask执行cancel方法过程中,线程状态有一段时间是 INTERRUPTING (中断处理中) 的状态,在线程执行结束之前,FutureTask在 handlePossibleCancellationInterrupt 方法中 INTERRUPTING (中断处理中)状态 的线程做了特殊处理。

  FutureTask#handlePossibleCancellationInterrupt() 详情如下:

1 private void handlePossibleCancellationInterrupt(int s) {
2     // 线程状态 INTERRUPTING (中断处理中) 
3     if (s == INTERRUPTING)
4         while (state == INTERRUPTING)
5             // 让出CPU,等待线程状态变为  INTERRUPTED  (已中断)
6             Thread.yield();
7 }

4、get() - 获取线程执行结果

1、流程图

2、get()源码分析

  获取线程执行结果,FutureTask#get() 详情如下:

1 public V get() throws InterruptedException, ExecutionException {
2     // 当前线程执行状态
3     int s = state;
4     // 当前线程处于  NEW(新建)、COMPLETING (运行中) ,阻塞挂起外部的get()获取结果的线程
5     if (s <= COMPLETING)
6         s = awaitDone(false, 0L);
7     // 线程被唤醒后,返回执行结果
8     return report(s);
9 }

  当前线程状态为 NEW或COMPLETING ,挂起线程;等待线程正常执行结束、执行异常、线程取消调用finishCompletion()方法唤醒这些挂起的线程,再通过report方法,返回执行结果。

1、挂起新建、运行中的线程

  挂起线程 FutureTask#awaitDone() 详情如下:

 1 private int awaitDone(boolean timed, long nanos) throws InterruptedException {
 2     // 是否带超时的阻塞,0不带超时
 3     final long deadline = timed ? System.nanoTime() + nanos : 0L;
 4     // 引用当前线程 封装成waitNode对象
 5     WaitNode q = null;
 6     // 当前线程 waitNode对象是否  已经添加进等待链表中,默认未添加
 7     boolean queued = false;
 8     // 自旋
 9     for (;;) {
10         // 4.1、线程唤醒,说明线程是使用中断的方式唤醒,若interrupted() 返回为true 之后会将中断标记重置为false
11         if (Thread.interrupted()) {
12             // 当前线程node出队
13             removeWaiter(q);
14             // get方法抛出  中断异常
15             throw new InterruptedException();
16         }
17         
18         // 4.2、当前线程被其他线程 使用unpark()方式
19         
20         // 获取当前线程执行状态
21         int s = state;
22         // 当前线程执行完成,将当前任务线程设置为null,并返回任务状态
23         // 状态是终态: NORMAL 、 EXCEPTIONAL 、 CANCELLED
24         if (s > COMPLETING) {
25             // 当前线程已创建WaitNode对象,将其中的thread设置为null,helpGC
26             if (q != null)
27                 q.thread = null;
28             return s;
29         }
30         
31         // 当前线程处于  运行中,让出CPU,进行下一次的抢占
32         else if (s == COMPLETING) 
33             Thread.yield();
34             
35         // 1、第一次自旋,是新线程,当前等待节点为null,为当前线程创建WaitNode对象
36         else if (q == null)
37             // 创建等待节点
38             q = new WaitNode();
39             
40         // 2、第二次自旋,线程已创建WaitNode 对象, WaitNode未入队
41         else if (!queued)
42             // 当前线程node节点指向 原队列的头节点  waiters:一直指向队列的头
43             // cas设置waiters引用指向当前线程Node节点,若成功,queued为true;若失败,可能其他线程先一步入队了
44             queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
45                                                  q.next = waiters, q);
46         
47         // 3、第三次自旋,挂起线程操作        
48         // 如果设置了超时时间,get操作的线程会被parkNanos,并过了超时时间的话,从 waiters 链表中删除当前 wait
49         
50         else if (timed) {
51             // 获取剩余阻塞时间
52             nanos = deadline - System.nanoTime();
53             // 剩余阻塞时间不足,将WaitNode从等待链表中移除,返回响应状态
54             if (nanos <= 0L) {
55                 removeWaiter(q);
56                 return state;
57             }
58             // 没有过超时时间,线程进入 TIMED_WAITING 状态
59             LockSupport.parkNanos(this, nanos);
60         }
61         // 未设置超时时间,get操作的线程会被park,进入 WAITING 状态
62         // 除非有其他线程唤醒 或 将当前线程中断
63         else
64             LockSupport.park(this);
65     }
66 }

    awaitDone使用了自旋,根据每次自旋变量值的不同,走不同的分支,执行流程图如下:

  

2、移除链表中的WaitNode

  将当前Node节点从链表中移除,FutureTask#removeWaiter() 详情如下:
 1 // 试图将超时 或 已中断的等待节点 从链表中移除,同时将被删除节点的上一节点指向被删除节点的下一节点,避免gc无法回收
 2 private void removeWaiter(WaitNode node) {
 3     // 要移除的WaitNode不为null
 4     if (node != null) {
 5         // 要移除的WaitNode中线程设置为null
 6         node.thread = null;
 7         retry:
 8         // 自旋
 9         for (;;) {
10             // pred :前驱节点
11             // q:初始为头节点,代表当前正在遍历的节点
12             // s:后继节点
13             for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
14                 // 当前遍历的节点的后继节点赋值给 s
15                 s = q.next;
16                 // 1、当前遍历的节点线程不为null,将当前遍历节点赋值给 它的前驱节点pred,开始下一次自旋
17                 if (q.thread != null)
18                     pred = q;
19                     
20                 // 当前遍历的节点不为头节点,并且当前遍历节点的线程为null,将当前遍历节点的前驱节点指向它的后继节点
21                 else if (pred != null) {
22                     pred.next = s;
23                     // 当前遍历线程节点的前驱节点线程也为null,说明它的前驱节点也要被删除,重新开始自旋
24                     if (pred.thread == null) 
25                         continue retry;
26                 }
27 
28                 // q.thread = null,  pred == null  ==> 头节点
29                 // 当前遍历节点为头节点,cas操作,将队列头节点waiters的引用指向当前节点的后继节点,
30                 // 即当前节点的下一节点作为头节点
31                 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
32                                                       q, s))
33                     continue retry;
34             }
35             break;
36         }
37     }
38 }

  移除等待队列中执行完成、被取消的WaitNode节点,若WaitNode是头节点,将队列头节点的引用指向当前头节点的后继节点;若不为队列头节点,将当前WaitNode的前驱节点指向它的后继节点,流程图如下 :

  

   为了便于理解removeWaiters的流程,下面给出等待队列的大体变化过程:

  

3、返回执行结果

  根据线程状态返回执行结果,FutureTask#report() 详情如下:
 1 private V report(int s) throws ExecutionException {
 2     // 获取线程状态
 3     Object x = outcome;
 4     // 正常执行结束
 5     if (s == NORMAL)
 6         // 返回执行结果
 7         return (V)x;
 8     // 已取消 | (已中断) ,抛CancellationException异常
 9     if (s >= CANCELLED)
10         throw new CancellationException();
11     // 异常,抛ExecutionException异常
12     throw new ExecutionException((Throwable)x);
13 }

  通过当前线程的状态,判断是抛出异常,还是返回线程执行结果。

5、cancel() - 获取线程执行结果

5.1、流程图

  

5.2、源码分析

 1 public boolean cancel(boolean mayInterruptIfRunning) {
 2     // 当前正执行的线程状态不为 NEW(新建) 状态 ,并且线程状态变更失败 ,返回false
 3     if (!(state == NEW &&
 4           UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
 5               mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
 6         return false;
 7     try {   
 8         // 允许中断正在运行线程
 9         if (mayInterruptIfRunning) {
10             try {
11                 // 获取当前正在执行的线程
12                 Thread t = runner;
13                 // 执行interrupt()中断方法
14                 if (t != null)
15                     t.interrupt();
16             } finally {
17                 // 修改线程状态为已中断
18                 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
19             }
20         }
21     } finally {
22         // 唤醒因get()而挂起线程
23         finishCompletion();
24     }
25     // 返回取消中断线程结果
26     return true;
27 }