java线程原理

发布时间 2023-09-04 22:45:10作者: 无以铭川

Java线程原理

单词 thread 原意是 线, 在计算机科学引申为 一串执行过程组成的线, 线程是可以由调度程序独立管理的最小编程指令序列. 在一般情况下, 线程属于进程, 同一进程的线程可以同时执行, 共享内存等资源.

具体到 Java 的线程, 即 java.lang.Thread 类, 并不能与操作系统的线程等价. Java 线程由 JVM 实现并调度, 进程切换由 JVM 在用户空间完成, 相比之下开销更小, 同步机制也由 JVM 实现.

为什么需要线程

线程的需求源于系统对同时执行多个任务的需求. 同时, 多线程也能利用上多核CPU进行并行计算.

在操作系统中, 多进程也能完成同时执行多个任务, 但相比进程, 线程更轻量级, 线程之间可以共享内存, 线程的切换比进程切换的开销更小.

使用

线程在调用 start 之后, 该线程进入 RUNNABLE 状态, 在获得的CPU资源后, run 方法将会执行.

线程如何调度本身并不由开发者决定, 对于开发者来说, 线程的使用重点是关注线程需要执行的任务, 即 run 方法中所执行的内容. 在默认情况下, run 方法会构造方法执行传入的 Runnable 类型的任务对象.

下面是一个线程最为基本的使用例子:

public class TestThread {
  public static void main(String[] args) throws InterruptedException {
    var thread =
        new Thread(
            () -> {
              System.out.println("Hello World!");
            });
    thread.start();
    System.out.println("thread started");
    thread.join();
    System.out.println("thread finished");
  }
}
thread started
Hello World!
thread finished

然而实际大多数情况不会这么简单(虽然也不是不能用), 不过本文的主题是线程的实现, 如何使用不是主题, 以后有机会再写写看.

Java源码概要

下文中涉及的源码基于 JDK17.

对于开发者比较重要的一些字段和方法

public class Thread implements Runnable {
    // 线程名称
    private volatile String name;
    // 优先级
    private int priority;
    // 是否守护线程
    private boolean daemon = false;
    // 中断状态
    private volatile boolean interrupted;
    // 运行的任务
    private Runnable target;
    // 线程的所属组
    private ThreadGroup group;
    // 该线程的上下文类加载器, 用于SPI
    private ClassLoader contextClassLoader;
    // 当前线程的 ThreadLocal 变量集合, 该字段由 ThreadLocal 类维护
    ThreadLocal.ThreadLocalMap threadLocals = null;
    // 当前线程的 InheritableThreadLocal 变量集合, 该字段由 InheritableThreadLocal 类维护
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
	// 线程ID
    private final long tid;
    // 线程状态, 可以通过jdk.internal.misc.VM.toThreadState(threadStatus)获取Thread.State类表示的状态
    private volatile int threadStatus;
    
    // 启动线程
    public synchronized void start();
    // 线程运行执行的代码
    public void run();
    // 发出中断信号
    public void interrupt();
    // 线程是否存活, 线程 start 以后, 还未终止就是存活
    public final native boolean isAlive();
    // 另当前线程等待目标线程运行结束
    public final void join();
    
    // 获取当前线程
    public static native Thread currentThread();
    // 获取当前线程的中断状态并重设
    public static boolean interrupted();
}  

Hotspot中的线程

Java的线程需要JVM的线程实现, hotspot中定义了一些线程类型, 其类型关系如下

- Thread
  - JavaThread
    - various subclasses eg CompilerThread, ServiceThread
  - NonJavaThread
    - NamedThread
      - VMThread
      - ConcurrentGCThread
      - WorkerThread
        - GangWorker
    - WatcherThread
    - JfrThreadSampler
    - LogAsyncWriter

一个JVM线程的实例必定为 JavaThreadNonJavaThread.

而在Java中, 通过直接 new Thread 或其自定义子类所创建的Java线程对象, 在调用 start 之后创建是 JavaThread 类型的 JVM 线程.

线程创建和执行及结束的全流程

创建

Java线程对象的创建只需调用 Thread 类或子类的构造方法.

var thread = new Thread(runnable);		

启动

线程在调用 start 方法后启动, 源码如下:

public synchronized void start() {
    // threadStatus 为0时, 对应 Thread.State 为 NEW 的状态
    // 一个线程不能多次 start, 否则抛出异常
    if (threadStatus != 0)
        throw new IllegalThreadStateException();
    
    // 将线程加入线程组
    group.add(this);
	
    boolean started = false;
    try {
        // JNI调用, 由JVM启动线程
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
        }
    }
}

JVM 创建的调用 main 方法的线程以及属于 system 线程组的线程不会调用 start 方法.

线程组是一个线程集合, 用于批量管理线程. 线程组有继承关系, 线程组的最终父级是 system 线程组.

以下代码可以打印 system 线程组和子线程组的线程信息:

Thread.currentThread().getThreadGroup().getParent().list();
java.lang.ThreadGroup[name=system,maxpri=10]
    Thread[Reference Handler,10,system]
    Thread[Finalizer,8,system]
    Thread[Signal Dispatcher,9,system]
    Thread[Attach Listener,5,system]
    Thread[Notification Thread,9,system]
    java.lang.ThreadGroup[name=main,maxpri=10]
        Thread[main,5,main]
        Thread[Monitor Ctrl-Break,5,main]
    java.lang.ThreadGroup[name=InnocuousThreadGroup,maxpri=10]
        Thread[Common-Cleaner,8,InnocuousThreadGroup]

可以看到, 主线程属于main线程组, 而由主线程所创建的线程在默认情况下和主线程属于同一个线程组.

start 方法中, 真正的启动线程的在 start0 中.

本地方法 start0 以及 Thread 类中的其他本地方法是由静态方法 Thread.registerNatives() 在类加载的时候注册到JNI环境中的, 在JDK中源码(Thread.c文件)如下:

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield0",           "()V",        (void *)&JVM_Yield},
    {"sleep0",           "(J)V",       (void *)&JVM_Sleep},
    {"currentCarrierThread", "()" THD, (void *)&JVM_CurrentCarrierThread},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"setCurrentThread", "(" THD ")V", (void *)&JVM_SetCurrentThread},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",       "()[" THD,    (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"getStackTrace0",   "()" OBJ,     (void *)&JVM_GetStackTrace},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
    {"scopedValueCache", "()[" OBJ,    (void *)&JVM_ScopedValueCache},
    {"setScopedValueCache", "([" OBJ ")V",(void *)&JVM_SetScopedValueCache},
    {"getNextThreadIdOffset", "()J",   (void *)&JVM_GetNextThreadIdOffset},
    {"findScopedValueBindings", "()" OBJ, (void *)&JVM_FindScopedValueBindings},
    {"ensureMaterializedForStackWalk",
                         "(" OBJ ")V", (void*)&JVM_EnsureMaterializedForStackWalk_func},
};

JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

start0 对应的就是 JVM_StartThread 的函数指针, 函数源码在 jvm.cpp 文件中.

JVM_StartThread 函数所做的事情主要是:

  • 创建 JavaThread 并初始化

    JVM_StartThread 所创建的C++线程是 JavaThread (C++) 类的实例, 会持有一个函数指针 entry_point, 指向 jvm.cpp 文件中的 thread_entry 函数.

    native_thread = new JavaThread(&thread_entry, sz);
    
    static void thread_entry(JavaThread* thread, TRAPS) {
      HandleMark hm(THREAD);
      Handle obj(THREAD, thread->threadObj());
      JavaValue result(T_VOID);
      // invoke_virtual 调用Java平台线程的run方法
      JavaCalls::call_virtual(&result,
                              obj,
                              vmClasses::Thread_klass(),
                              vmSymbols::run_method_name(),
                              vmSymbols::void_method_signature(),
                              THREAD);
    }
    

    JavaThread 的实例化函数会调用 os::create_thread 创建操作系统线程, 其在 windows 平台的实现在 os_windows.cpp 文件中.

    JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) : JavaThread() {
      _jni_attach_state = _not_attaching_via_jni;
      // 设置线程要执行的函数
      set_entry_point(entry_point);
      os::ThreadType thr_type = os::java_thread;
      // CompilerThread 是JIT编译器的线程
      thr_type = entry_point == &CompilerThread::thread_entry ? os::compiler_thread :
                                                                os::java_thread;
      // 创建os线程
      os::create_thread(this, thr_type, stack_sz);
    }
    
    // os_windows.cpp
    bool os::create_thread(Thread* thread, ThreadType thr_type,
                           size_t stack_size) {
      unsigned thread_id;
      // 创建和初始化OS线程
      OSThread* osthread = new (std::nothrow) OSThread();
      if (osthread == nullptr) {
        return false;
      }
      osthread->set_state(ALLOCATED);
      // jdk 中断事件支持(windows平台)
      HANDLE interrupt_event = CreateEvent(nullptr, true, false, nullptr);
      if (interrupt_event == nullptr) {
        delete osthread;
        return false;
      }
      osthread->set_interrupt_event(interrupt_event);
      // Java 平台Thread -> JVM JavaThread -> OS Thread
      thread->set_osthread(osthread);
    
      
      if (stack_size == 0) {
        switch (thr_type) {
        case os::java_thread:
          // -Xss可以设置最小栈大小
          if (JavaThread::stack_size_at_create() > 0) {
            stack_size = JavaThread::stack_size_at_create();
          }
          break;
        case os::compiler_thread:
          if (CompilerThreadStackSize > 0) {
            stack_size = (size_t)(CompilerThreadStackSize * K);
            break;
          }
        case os::vm_thread:
        case os::gc_thread:
        case os::asynclog_thread:
        case os::watcher_thread:
          if (VMThreadStackSize > 0) stack_size = (size_t)(VMThreadStackSize * K);
          break;
        }
      }
      
      // windows的线程flag
      const unsigned initflag = CREATE_SUSPENDED | STACK_SIZE_PARAM_IS_A_RESERVATION;
      // 线程句柄
      HANDLE thread_handle;
      int limit = 3;
      do {
        thread_handle =
          // windows创建线程的系统调用
          (HANDLE)_beginthreadex(nullptr,
                                 (unsigned)stack_size,
                                 // 操作系统线程将执行 thread_native_entry
                                 &os::win32::thread_native_entry,
                                 // thread_native_entry 的参数
                                 thread,
                                 // 创建线程的flag, 此处是挂起并预留栈空间
                                 initflag,
                                 &thread_id);
      } while (thread_handle == nullptr && errno == EAGAIN && limit-- > 0);
    
      ResourceMark rm;
      char buf[64];
      if (thread_handle != nullptr) {
        log_info(os, thread)("Thread \"%s\" started (tid: %u, attributes: %s)",
                             thread->name(), thread_id,
                             describe_beginthreadex_attributes(buf, sizeof(buf), stack_size, initflag));
      } else {
        log_warning(os, thread)("Failed to start thread \"%s\" - _beginthreadex failed (%s) for attributes: %s.",
                                thread->name(), os::errno_name(errno), describe_beginthreadex_attributes(buf, sizeof(buf), stack_size, initflag));
        log_info(os, thread)("Number of threads approx. running in the VM: %d", Threads::number_of_threads());
        LogStream st(Log(os, thread)::info());
        os::print_memory_info(&st);
      }
      // 创建失败处理
      if (thread_handle == nullptr) {
        thread->set_osthread(nullptr);
        delete osthread;
        return false;
      }
        
      // 创建成功设置OS Thread字段
      osthread->set_thread_handle(thread_handle);
      osthread->set_thread_id(thread_id);
      osthread->set_state(INITIALIZED);
      return true;
    }
    
  • 调用 Thread::start 启动线程

    Thread::start 设置 Java 的 Thread 类的 threadState 字段的值为 RUNNABLE 对应的值, 并通过 os::start_thread 运行操作系统线程.

    void Thread::start(Thread* thread) {
      if (thread->is_Java_thread()) {
        // 设置Java平台线程对象的状态
        java_lang_Thread::set_thread_status(JavaThread::cast(thread)->threadObj(),
                                            JavaThreadStatus::RUNNABLE);
      }
      // 启动OS线程
      os::start_thread(thread);
    }
    
    void os::start_thread(Thread* thread) {
      OSThread* osthread = thread->osthread();
      osthread->set_state(RUNNABLE);
      // 启动线程, 不同平台实现不同
      pd_start_thread(thread);
    }
    

运行和退出

操作系统线程运行的是 thread_native_entry 函数, 该函数的参数是前一步所创建的 C++ 线程对象.

thread_native_entry 是JVM线程执行的函数入口, 该函数内部调用了线程类的 call_run 函数, 该函数会按顺序调用线程的 pre_run, run , post_run 函数.

对于 JavaThread 的实现来说:

  • pre_run 函数为空什么也不做

  • run 函数按顺序执行一下过程:

    • 初始化 TLAB
    • 调用 thread_main_inner 方法, thread_main_inner 方法中将会调用 JavaThread 所持有的 entry_point 函数指针. entry_pointthread_entry 的函数指针, 该函数将会通过 invoke_virtual 指令调用Java平台的线程类对象的 run 方法.
    void JavaThread::run() {
      // 初始化TLAB
      initialize_tlab();
      // ... 其它还没搞懂的代码
      // 执行构造 JavaThread 时的函数指针 entry_point
      thread_main_inner();
    }
    
    void JavaThread::thread_main_inner() {
      assert(JavaThread::current() == this, "sanity check");
      assert(_threadObj.peek() != nullptr, "just checking");
    
      if (!this->has_pending_exception()) {
        {
          ResourceMark rm(this);
          this->set_native_thread_name(this->name());
        }
        HandleMark hm(this);
        // 执行构造 JavaThread 时传入的函数指针
        this->entry_point()(this, this);
      }
    
      DTRACE_THREAD_PROBE(stop, this);
    }
    
  • post_run 函数

    • 调用 JavaThreadexit 函数, 如果有异常, exit 函数将会对退出的异常进行处理, 在正常退出的情况下, 将会通过 invoke_virtual 指令调用Java平台的线程类对象的 exit 方法
    • 设置Java平台线程的状态为终止
    • 通知所有 join 了当前线程的线程
    • 其它清理工作
    void JavaThread::post_run() {
      this->exit(false);
      this->unregister_thread_stack_with_NMT();
      // jstack会显示线程SMR信息
      this->smr_delete();
    }
    
    void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
      /* 其它代码*/
      if (!destroy_vm) {
        if (uncaught_exception.not_null()) {
          EXCEPTION_MARK;
          
          // 调用Java线程的dispatchUncaughtException方法处理未捕获的异常
          Klass* thread_klass = vmClasses::Thread_klass();
          JavaValue result(T_VOID);
          JavaCalls::call_virtual(&result,
                                  threadObj, thread_klass,
                                  vmSymbols::dispatchUncaughtException_name(),
                                  vmSymbols::throwable_void_signature(),
                                  uncaught_exception,
                                  THREAD);
          if (HAS_PENDING_EXCEPTION) {
            ResourceMark rm(this);
            jio_fprintf(defaultStream::error_stream(),
                        "\nException: %s thrown from the UncaughtExceptionHandler"
                        " in thread \"%s\"\n",
                        pending_exception()->klass()->external_name(),
                        name());
            CLEAR_PENDING_EXCEPTION;
          }
        }
          
        if (!is_Compiler_thread()) {
          int count = 3;
          while (java_lang_Thread::threadGroup(threadObj()) != NULL && (count-- > 0)) {
            EXCEPTION_MARK;
            // 调用Java线程的exit方法
            JavaValue result(T_VOID);
            Klass* thread_klass = vmClasses::Thread_klass();
            JavaCalls::call_virtual(&result,
                                    threadObj, thread_klass,
                                    vmSymbols::exit_method_name(),
                                    vmSymbols::void_method_signature(),
                                    THREAD);
            CLEAR_PENDING_EXCEPTION;
          }
        }
          
        if (JvmtiExport::should_post_thread_life()) {
          JvmtiExport::post_thread_end(this);
        }
      } else {
        assert(!is_terminated() && !is_exiting(), "must not be exiting");
      }
      /* 其它代码 */
        
      // notify所有通过 join 阻塞的线程
      ensure_join(this);
      /* 其它代码 */
    }
    
    private void exit() {
        if (threadLocals != null && TerminatingThreadLocal.REGISTRY.isPresent()) {
            TerminatingThreadLocal.threadTerminated();
        }
        if (group != null) {
            group.threadTerminated(this);
            group = null;
        }
        /* Aggressively null out all reference fields: see bug 4006245 */
        target = null;
        /* Speed the release of some of these resources */
        threadLocals = null;
        inheritableThreadLocals = null;
        inheritedAccessControlContext = null;
        blocker = null;
        uncaughtExceptionHandler = null;
    }
    

线程 join 的原理

Thread 类的 join 可以让执行调用时的线程阻塞等待调用该方法的线程完成.

var thread = new Thread(() -> System.out.println("running"));
thread.start();
thread.join(); // 阻塞直到线程执行完成(前文提到的 `post_run` 函数的退出通知完成)
System.out.println("finished");

join 方法源码:

源码注释:

最多等待 millis 毫秒到线程终止, 如果时间为0则永远等待.

通过以 this.isAlive 为条件循环调用 this.wait 实现, 当线程终止时, 将调用 this.notifyAll 方法.

建议应用程序不要在 Thread 实例上使用 wait, notifynotifyAll.

public final synchronized void join(final long millis) throws InterruptedException {
    if (millis > 0) {
        if (isAlive()) {
            final long startTime = System.nanoTime();
            long delay = millis;
            do {
                wait(delay); // this.wait(millis) 锁住的就是当前 thread 对象的监视器
            } while (isAlive() && (delay = millis -
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);
        } // else 如果线程已经终止, 就直接返回
    } else if (millis == 0) {
        while (isAlive()) {
            wait(0);
        } // else 如果线程已经终止, 就直接返回
    } else {
        throw new IllegalArgumentException("timeout value is negative");
    }
}

可以看到, join 本质上就是调用了当前线程从 Object 类继承的 wait 方法, 如果被唤醒, 如果线程还未结束(带超时时间的则会也判断是否超时), 将继续调用 wait 阻塞, 直至线程完成, 而线程完成的时候, 会由 JVM 对该线程的监视器发出通知唤醒.

那么在看完上面的源码后回答一个问题: 对当前线程 Thread.currentThread().join() 会发生什么? 答案是永远阻塞, 除非调用该线程的 interrupt 方法.

public class TestJoin {
  public static void main(String[] args) throws InterruptedException {
    var mainThread = Thread.currentThread();
    var thread =
        new Thread(
            () -> {
              sleep(1000);
              System.out.println("before notify");
              synchronized (mainThread) {
                mainThread.notify();
              }
              sleep(1000);
              System.out.println("notified");
              System.out.println("before interrupt");
              mainThread.interrupt();
            });
    thread.start();
    try {
      mainThread.join();
    } catch (InterruptedException e) {
      System.out.println("interrupt");
    }
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

如果你在 join 方法中的 wait(0) 和 那一行以及上面这段代码的 mainThread.notify() 打上断点, 并Debug 执行上面这段代码, 可以发现首次 wait(0) 之后, 再执行 notify, mainThread 又会继续运行并再次调用 wait(0), 而在 interrupt 之后, wait(0) 就会抛出 InterruptedException, 在被捕获之后执行异常处理.

线程 interrupt 的原理

Java线程的 interrupt 方法可以中断线程, 这种中断并非中止, 中断只能中断线程的特点操作. 源码如下:

源码注释(机翻警告):

中断该线程。
除非当前线程正在中断自身(这始终是允许的),否则将调用该线程的checkAccess方法,这可能会导致抛出SecurityException 。
如果此线程在调用Object类的wait() 、 wait(long)或wait(long, int)方法或join() 、 join(long) 、 join(long, int)时被阻止该类的 、 sleep(long)或sleep(long, int)方法,则其中断状态将被清除,并且将收到InterruptedException 。
如果该线程在InterruptibleChannel上的 I/O 操作中被阻塞,则通道将被关闭,线程的中断状态将被设置,并且线程将收到java.nio.channels.ClosedByInterruptException 。
如果该线程在java.nio.channels.Selector中被阻塞,则该线程的中断状态将被设置,并且它将立即从选择操作返回,可能返回一个非零值,就像调用选择器的wakeup方法一样。
如果前面的条件都不成立,则该线程的中断状态将被设置。
中断不活动的线程不需要产生任何效果。

public void interrupt() {
    if (this != Thread.currentThread()) {
        checkAccess();

        // 线程可能阻塞在调用 I/O
        synchronized (blockerLock) {
            // 不为空时, 代表线程阻塞在 I/O
            Interruptible b = blocker;
            if (b != null) {
                interrupted = true;
                interrupt0();  // inform VM of interrupt
                b.interrupt(this);
                return;
            }
        }
    }
    interrupted = true;
    // inform VM of interrupt
    interrupt0();
}

interrupt 可以中断的线程操作有:

  1. 会抛出 InterruptedException 的方法, 包括 wait, join, sleep 等等.

    这些方法在被中断后, 将会抛出 InterruptedException, 并且其中断标识即 Thread 类的 interrupted 字段将被设置为 false.

  2. 会抛出 ClosedByInterruptException 的方法, 即 InterruptibleChannel 的各种子类的 I/O 操作.

    这些方法在被中断后, 将会抛出 ClosedByInterruptException, 并且其中断标识将设置为 true.

    private static void testChannelInterrupt() throws InterruptedException {
      var thread =
          new Thread(
              () -> {
                try {
                  var serverSocketChannel = ServerSocketChannel.open();
                  serverSocketChannel.bind(new InetSocketAddress(8080));
                  serverSocketChannel.accept();
                } catch (ClosedByInterruptException e) {
                  // channel interrupt true
                  System.out.println("channel interrupt " + Thread.currentThread().isInterrupted());
                } catch (IOException e) {
                  throw new RuntimeException(e);
                }
              });
      thread.start();
      Thread.sleep(1000);
      thread.interrupt();
      Thread.sleep(1000);
    }
    
  3. java.nio.channels.Selector 的阻塞操作. 在中断后, 中断标识将为 true, 但并不会抛出异常, 而是直接返回. 与 Channel 的区别在于对 Interruptible 实现的区别.

    // AbstractSelector 类阻塞I/O操作会调用该方法
    protected final void begin() {
        if (interruptor == null) {
            // 线程中断时的操作
            interruptor = new Interruptible() {
                    public void interrupt(Thread ignore) {
                        AbstractSelector.this.wakeup();
                    }};
        }
        AbstractInterruptibleChannel.blockedOn(interruptor);
        Thread me = Thread.currentThread();
        if (me.isInterrupted())
            interruptor.interrupt(me);
    }
    
    

​ 关于 Interruptible 下文会说明.

java.io 的 I/O 操作并不会会被 interrupt 中断.

如果在目标线程调用这些方法之前调用 interrupt 线程, 那么这些操作同样会被中断并产生对应的结果.

private static void testInterruptBeforeSleep() {
    // false
    System.out.println(Thread.currentThread().isInterrupted());
    // true
    Thread.currentThread().interrupt();
    System.out.println(Thread.currentThread().isInterrupted());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // interrupted false
      System.out.println("interrupted " + Thread.currentThread().isInterrupted());
    }
}

可中断IO的实现

前文提到的第 2 和 第 3 种可中断的操作, 本质都是可中断I/O, 其实现借助于 InterruptibleThread 类的 blocker 字段.

/* The object in which this thread is blocked in an interruptible I/O
 * operation, if any.  The blocker's interrupt method should be invoked
 * after setting this thread's interrupt status.
 */
private volatile Interruptible blocker;
private final Object blockerLock = new Object();

/* Set the blocker field; invoked via jdk.internal.access.SharedSecrets
 * from java.nio code
 */
static void blockedOn(Interruptible b) {
    Thread me = Thread.currentThread();
    synchronized (me.blockerLock) {
        me.blocker = b;
    }
}

在调用可中断的阻塞I/O操作前, 调用方将通过 blockedOn 方法设置线程的 blocker 字段.

Interruptible 只有一个 interrupt 方法, 本质是一个回调, 通过设置中断时的回调, 在调用线程的 interrupt 方法的时候就可以调用该回调中断 I/O 操作.

在调用 blockedOn 前, 可能线程就已经被中断了, 因此, 在 blockedOn 完成后都需要检查线程中断状态, 如果已经中断就直接调用 Interruptibleinterrupt 方法.

例如 AbstractInterruptibleChannelbegin 方法源码:

protected final void begin() {
    if (interruptor == null) {
        interruptor = new Interruptible() {
                public void interrupt(Thread target) {
                    synchronized (closeLock) {
                        if (closed)
                            return;
                        closed = true;
                        interrupted = target;
                        try {
                            // 线程 interrupt 的时候关闭 Channel
                            AbstractInterruptibleChannel.this.implCloseChannel();
                        } catch (IOException x) { }
                    }
                }};
    }
    blockedOn(interruptor); // 设置线程的 blocker
    Thread me = Thread.currentThread();
    if (me.isInterrupted()) // 检查中断状态
        interruptor.interrupt(me); // 调用 Interruptible 的 interrupt 方法
}

因此, 也可以自己来实现一个 Interruptible:

public class TestInterrupt {
  public static void main(String[] args) throws InterruptedException, IOException {
    SharedSecrets.getJavaLangAccess()
        .blockedOn(t -> System.out.println("Interruptible: " + t.isInterrupted()));
    var mainThread = Thread.currentThread();
    new Thread(mainThread::interrupt).start();
    while (!Thread.interrupted()) {
      System.out.println("main thread is not interrupted");
    }
  }
}

这段代码需要增加 --add-exports=java.base/jdk.internal.access=ALL-UNNAMED --add-exports=java.base/sun.nio.ch=ALL-UNNAMED 才可以编译运行.

在输出若干次 main thread is not interrupted 后将输出 Interruptible: true.

interrupt0 的实现

本地方法 interrupt0 对应的 CPP 函数指针将调用 JavaThreadinterrupt 函数.

void JavaThread::interrupt() {
  // All callers should have 'this' thread protected by a
  // ThreadsListHandle so that it cannot terminate and deallocate
  // itself.
  debug_only(check_for_dangling_thread_pointer(this);)

  // For Windows _interrupt_event
  WINDOWS_ONLY(osthread()->set_interrupted(true);)

  // For Thread.sleep
  _SleepEvent->unpark();

  // For JSR166 LockSupport.park
  parker()->unpark();

  // For ObjectMonitor and JvmtiRawMonitor
  _ParkEvent->unpark();
}

_SleepEvent_ParkEvent 都是 ParkEvent 类型, parker 函数则会返回一个 Parker 类的对象, 他们源码都有比较多的说明.

ParkEventPacker 都设计了一对 park - unpark 操作, park 可以让线程挂起直到 unpark.

以此实现了 Java 平台的 sleep, wait, join 以及 juc 中的一些可中断方法.

private static void testJucInterrupt() throws InterruptedException {
  var reentrantLock = new ReentrantLock();
  reentrantLock.lock();
  var thread =
      new Thread(
          () -> {
            try {
              System.out.println("thread interrupt " + Thread.currentThread().isInterrupted());
              reentrantLock.lockInterruptibly();
            } catch (Exception e) {
              System.out.println("lock interrupt");
            }
          });
  thread.start();
  thread.interrupt();
  Thread.sleep(1000);
}

另外, Thread 类的 interrupted 会静态方法读取线程的中断状态并重设中断状态.

public static boolean interrupted() {
    Thread t = currentThread();
    boolean interrupted = t.interrupted;
    // We may have been interrupted the moment after we read the field,
    // so only clear the field if we saw that it was set and will return
    // true; otherwise we could lose an interrupt.
    if (interrupted) {
        t.interrupted = false;
        clearInterruptEvent();
    }
    return interrupted;
}