ThreadPoolExecutor

发布时间: 2023-10-26 17:16:43  作者: anpeiyong

 

概述

/**
 * Abridged outline of {@code java.util.concurrent.ThreadPoolExecutor}: the {@code Worker}
 * inner class and the main pool fields.
 *
 * <p>This is an overview, not the complete class. Several members referenced here
 * (for example {@code getThreadFactory()}, {@code ctlOf}, {@code RUNNING}) are not declared
 * in this outline, and {@code runWorker} is left with an empty body.
 */
public class ThreadPoolExecutor extends AbstractExecutorService {

        /**
         * One pool worker: it is the {@code Runnable} given to its own thread and, by extending
         * {@code AbstractQueuedSynchronizer}, also an exclusive lock built on the AQS state.
         *
         * <p>State values used below: -1 right after construction, 0 when unlocked,
         * 1 when locked. {@code tryAcquire} only succeeds from state 0, so a second
         * acquire by the current owner would not succeed either.
         */
        private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
            // Thread obtained from the thread factory for this worker; the code in addWorker
            // checks it for null before using it.
            final Thread thread;
            // Initial task handed to this worker; may be null.
            Runnable firstTask;

            /**
             * Creates a worker for the given first task and asks the thread factory for a
             * thread that will run this worker.
             *
             * @param firstTask the first task to run, or null
             */
            Worker(Runnable firstTask) {
                // State -1 makes the compareAndSetState(0, 1) in tryAcquire fail until the
                // first unlock() resets the state to 0.
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }

            /** Delegates the worker's main loop to the enclosing pool's runWorker. */
            public void run() {
                runWorker(this);
            }

            // Lock helpers: thin wrappers over the AQS release/acquire entry points.
            public void unlock()      { release(1); }
            public void lock()        { acquire(1); }

            /**
             * Releases the lock unconditionally: clears the owner and resets the state to 0.
             *
             * @param unused ignored
             * @return always true
             */
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }

            /**
             * Tries to take the lock by CAS-ing the state from 0 to 1; on success records the
             * current thread as owner.
             *
             * @param unused ignored
             * @return true if the lock was acquired, false otherwise
             */
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
        }


        // Main pool control word, initialised from ctlOf(RUNNING, 0). Other listings in this
        // document read it through runStateOf(c) and workerCountOf(c); ctlOf and RUNNING are
        // not shown in this outline.
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        // The queue used for holding tasks and handing off to worker threads.
        private final BlockingQueue<Runnable> workQueue;
        // Set containing all worker threads in pool. Accessed only when holding mainLock.
        private final HashSet<Worker> workers = new HashSet<Worker>();
        // Tracks largest attained pool size. Accessed only under mainLock.
        private int largestPoolSize;
        // Handler called when saturated or shutdown in execute, i.e. the rejection policy
        // invoked when the pool is saturated or shut down at submission time.
        private volatile RejectedExecutionHandler handler;
        // Timeout in nanoseconds for idle threads waiting for work.
        // (Author's note: the idle lifetime of threads in excess of the core size.)
        private volatile long keepAliveTime;
        // Core pool size is the minimum number of workers to keep alive (and not allow to time out etc) unless allowCoreThreadTimeOut is set, in which case the minimum is zero.
        private volatile int corePoolSize;
        // Maximum pool size.
        private volatile int maximumPoolSize;
        // The default rejected execution handler
        private static final RejectedExecutionHandler defaultHandler = new java.util.concurrent.ThreadPoolExecutor.AbortPolicy();

        /**
         * Main worker run loop.
         *
         * @param w the worker whose loop is run
         */
        final void runWorker(Worker w) {
            // Body intentionally omitted in this outline.
        }

    }

  

链路

execute(Runnable command)

// java.util.concurrent.ThreadPoolExecutor.execute
    /**
     * Executes the given task sometime in the future. The task may execute in a new thread or
     * in an existing pooled thread.
     *
     * <p>The submission proceeds in up to three steps:
     * <ol>
     *   <li>If fewer than {@code corePoolSize} workers exist, try to start a new worker with
     *       {@code command} as its first task.</li>
     *   <li>Otherwise, if the pool is running and the queue accepts the task, re-read the
     *       control state: if the pool is no longer running and the task can still be removed
     *       from the queue, reject it; else if no workers exist, start one without a first
     *       task.</li>
     *   <li>If the task could not be queued, try to start a worker bounded by
     *       {@code maximumPoolSize}; if that fails, reject the task.</li>
     * </ol>
     *
     * @param command the task to execute; must not be null
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        // Fail fast on null. Without this guard a null command submitted while the pool is
        // below corePoolSize would reach addWorker(null, true), start a worker with no first
        // task, and return as if the submission had succeeded.
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // Step 1: fewer than corePoolSize workers -> try to start a new worker with the given
        // command as its first task.
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            // addWorker failed; re-read the control state before continuing.
            c = ctl.get();
        }
        // Step 2: pool is running and the queue accepted the task.
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // The pool may have stopped running since the first check: undo the enqueue and
            // reject, but only if the task could actually be removed.
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // No workers exist: start one with no first task.
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // Step 3: could not queue -> try a worker bounded by maximumPoolSize, else reject.
        else if (!addWorker(command, false))
            reject(command);
    }

    // java.util.concurrent.ThreadPoolExecutor.addWorker
    /**
     * Checks if a new worker can be added with respect to current pool state and the given bound (either core or maximum).
     * If so, the worker count is incremented, a new {@code Worker} is created, registered in
     * {@code workers} and its thread started.
     *
     * @param firstTask the task the new worker should run first, or null
     * @param core if true, {@code corePoolSize} is used as the bound, otherwise
     *             {@code maximumPoolSize}
     * @return true if the worker's thread was started, false otherwise
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // Phase 1: reserve a slot by CAS-incrementing the worker count.
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // Refuse when the run state is SHUTDOWN or beyond, except for the single case
            // "state is exactly SHUTDOWN, no first task, and the queue is not empty".
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // Refuse when the count has reached CAPACITY or the selected bound.
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // Slot reserved: leave both loops.
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // Run state changed since the outer check: redo the outer loop.
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // Phase 2: create the worker, register it under mainLock, then start its thread.
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);                                          // wrap the submitted Runnable in a Worker (its constructor obtains a new thread from the thread factory)
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);                                           // register the new worker in the workers set
                        int s = workers.size();
                        // Track the largest pool size reached so far.
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();                                                     // worker was registered -> start its thread
                    workerStarted = true;
                }
            }
        } finally {
            // Thread never started (null thread, failed recheck, or exception): hand the
            // worker to addWorkerFailed (not shown in this listing).
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    // java.util.concurrent.ThreadPoolExecutor.Worker.run
    /**
     * Worker's {@code Runnable} entry point: delegates the main loop to {@code runWorker},
     * passing this worker.
     */
    public void run() {
        runWorker(this);
    }

    // java.util.concurrent.ThreadPoolExecutor.runWorker

    /**
     * Main worker run loop.  Repeatedly gets tasks from queue and executes them
     *
     * <p>Runs the worker's first task (if any), then keeps taking tasks from
     * {@code getTask()} until it returns null. Each task runs between {@code w.lock()} and
     * {@code w.unlock()}, wrapped by the {@code beforeExecute}/{@code afterExecute} hooks.
     * {@code processWorkerExit} is always called at the end, with a flag telling whether the
     * loop ended because of an exception.
     *
     * @param w the worker whose loop is run
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts                                     // releases the worker's AQS exclusive lock (state goes to 0)
        // Stays true unless the loop below finishes normally.
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();                                                   // acquire the worker's AQS exclusive lock
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);                                    // hook invoked before the task runs
                    Throwable thrown = null;
                    try {
                        task.run();                                             // run the task
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        // Any other Throwable is wrapped in an Error before being rethrown.
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);                             // hook invoked after the task runs (thrown is null on success)
                    }
                } finally {
                    // Runs whether or not the task threw.
                    task = null;
                    w.completedTasks++;
                    w.unlock();                                                 // release the worker's AQS exclusive lock
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

  

Future<?> submit(Runnable task)

// java.util.concurrent.AbstractExecutorService.submit(java.lang.Runnable)
    /**
     * Submits a {@code Runnable} for execution and returns a handle to it.
     *
     * <p>The task is wrapped through {@code newTaskFor(task, null)}, the wrapper is passed to
     * {@code execute}, and the same wrapper is returned to the caller.
     *
     * @param task the task to submit
     * @return the {@code RunnableFuture} created for the task
     * @throws NullPointerException if {@code task} is null
     */
    public Future<?> submit(Runnable task) {
        if (task == null) {
            throw new NullPointerException();
        }
        final RunnableFuture<Void> wrapped = newTaskFor(task, null);
        execute(wrapped);
        return wrapped;
    }

    // java.util.concurrent.AbstractExecutorService.newTaskFor(java.lang.Runnable, T)
    /**
     * Wraps a {@code Runnable} and a result value in a new {@code FutureTask}.
     *
     * @param runnable the task to wrap
     * @param value the value passed to the {@code FutureTask} constructor as its result
     * @param <T> the type of the result value
     * @return a new {@code FutureTask} built from {@code runnable} and {@code value}
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    // java.util.concurrent.ThreadPoolExecutor.execute
    // 后续调用链路与 execute(Runnable command) 的链路一致

  

<T> Future<T> submit(Callable<T> task)

// java.util.concurrent.AbstractExecutorService.submit(java.util.concurrent.Callable<T>)
    /**
     * Submits a {@code Callable} for execution and returns a handle to its result.
     *
     * <p>The task is wrapped through {@code newTaskFor(task)}, the wrapper is passed to
     * {@code execute}, and the same wrapper is returned to the caller.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the {@code RunnableFuture} created for the task
     * @throws NullPointerException if {@code task} is null
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) {
            throw new NullPointerException();
        }
        final RunnableFuture<T> wrapped = newTaskFor(task);
        execute(wrapped);
        return wrapped;
    }

    // java.util.concurrent.AbstractExecutorService.newTaskFor(java.util.concurrent.Callable<T>)
    /**
     * Wraps a {@code Callable} in a new {@code FutureTask}.
     *
     * @param callable the task to wrap
     * @param <T> the type of the callable's result
     * @return a new {@code FutureTask} built from {@code callable}
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    // java.util.concurrent.ThreadPoolExecutor.execute
    // 后续调用链路与 execute(Runnable command) 的链路一致