ThreadPoolExecutor

发布时间 2023-07-26 15:18:31 作者: anpeiyong

任务类型

1、java.lang.Runnable

/**
 * A task that takes no arguments and produces no result.
 */
@FunctionalInterface
public interface Runnable {

    /**
     * Performs the work of this task.
     * <p>
     * When an object implementing {@code Runnable} is used to create a
     * thread, starting that thread causes this method to be invoked in the
     * separately executing thread. The method is free to take any action
     * whatsoever.
     *
     * @see java.lang.Thread#run()
     */
    void run();
}

2、java.util.concurrent.Callable

/**
 * A task that yields a value of type {@code V} and is allowed to fail with
 * a checked exception.
 *
 * @param <V> the type of the value produced by {@link #call()}
 */
@FunctionalInterface
public interface Callable<V> {

    /**
     * Produces the result of this task.
     *
     * @return the computed result
     * @throws Exception if the result cannot be computed
     */
    V call() throws Exception;
}

  

任务提交

方式

1、java.util.concurrent.ThreadPoolExecutor#execute

  // Signature only: the method body is left empty in this listing.
  public void execute(Runnable command) {}

2、java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)

/**
 * Wraps {@code task} via {@code newTaskFor(task, null)}, passes the wrapper
 * to {@code execute}, and returns that same wrapper to the caller.
 *
 * @param task the task to submit
 * @return the wrapper that was handed to {@code execute}
 * @throws NullPointerException if {@code task} is null
 */
public Future<?> submit(Runnable task) {
    if (task == null) {
        throw new NullPointerException();
    }
    final RunnableFuture<Void> wrapped = newTaskFor(task, null);
    execute(wrapped);
    return wrapped;
}

3、java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, T)  

/**
 * Wraps {@code task} together with {@code result} via {@code newTaskFor},
 * passes the wrapper to {@code execute}, and returns that same wrapper.
 *
 * @param task   the task to submit
 * @param result the value forwarded to {@code newTaskFor} alongside the task
 * @param <T>    the type of {@code result}
 * @return the wrapper that was handed to {@code execute}
 * @throws NullPointerException if {@code task} is null
 */
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) {
        throw new NullPointerException();
    }
    final RunnableFuture<T> wrapped = newTaskFor(task, result);
    execute(wrapped);
    return wrapped;
}

4、java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>)

 /**
  * Wraps {@code task} via {@code newTaskFor(task)}, passes the wrapper to
  * {@code execute}, and returns that same wrapper to the caller.
  *
  * @param task the task to submit
  * @param <T>  the type of the task's result
  * @return the wrapper that was handed to {@code execute}
  * @throws NullPointerException if {@code task} is null
  */
 public <T> Future<T> submit(Callable<T> task) {
     if (task == null) {
         throw new NullPointerException();
     }
     final RunnableFuture<T> wrapped = newTaskFor(task);
     execute(wrapped);
     return wrapped;
 }

  

任务提交拒绝策略

1、java.util.concurrent.ThreadPoolExecutor#execute

  

// command the task to execute
// @throws RejectedExecutionException at discretion of {@code 
// RejectedExecutionHandler}, if the task cannot be accepted for execution

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);                                         -- 拒绝
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);                                            -- 拒绝
    }

  

2、java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);  -- 将Runnable包装
        execute(ftask);  -- java.util.concurrent.ThreadPoolExecutor#execute
        return ftask;
    }


/**
 * Builds the wrapper for a {@code Runnable} submission: a new
 * {@code FutureTask} constructed from {@code runnable} and {@code value}.
 *
 * @param runnable the task being wrapped
 * @param value    the value passed to the {@code FutureTask} constructor
 * @param <T>      the type of {@code value}
 * @return the newly created {@code FutureTask}
 */
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    final FutureTask<T> wrapper = new FutureTask<>(runnable, value);
    return wrapper;
}

  

3、java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, T)

/**
 * Wraps {@code task} together with {@code result} via {@code newTaskFor},
 * passes the wrapper to {@code execute}, and returns that same wrapper.
 *
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) {
        throw new NullPointerException();
    }
    // Wrap the Runnable.
    final RunnableFuture<T> wrapped = newTaskFor(task, result);
    // java.util.concurrent.ThreadPoolExecutor#execute
    execute(wrapped);
    return wrapped;
}


 /**
  * Builds the wrapper for a {@code Runnable} submission: a new
  * {@code FutureTask} constructed from {@code runnable} and {@code value}.
  *
  * @param runnable the task being wrapped
  * @param value    the value passed to the {@code FutureTask} constructor
  * @param <T>      the type of {@code value}
  * @return the newly created {@code FutureTask}
  */
 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
     final FutureTask<T> wrapper = new FutureTask<>(runnable, value);
     return wrapper;
 }

  

4、java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>)

/**
 * Wraps {@code task} via {@code newTaskFor(task)}, passes the wrapper to
 * {@code execute}, and returns that same wrapper to the caller.
 *
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) {
        throw new NullPointerException();
    }
    // Wrap the Callable.
    final RunnableFuture<T> wrapped = newTaskFor(task);
    // java.util.concurrent.ThreadPoolExecutor#execute
    execute(wrapped);
    return wrapped;
}


/**
 * Builds the wrapper for a {@code Callable} submission: a new
 * {@code FutureTask} constructed from {@code callable}.
 *
 * @param callable the task being wrapped
 * @param <T>      the type of the callable's result
 * @return the newly created {@code FutureTask}
 */
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    final FutureTask<T> wrapper = new FutureTask<>(callable);
    return wrapper;
}

  

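  【execute、submit 提交任务时,若任务被拒绝,可能会抛出 RejectedExecutionException(由 RejectedExecutionHandler 决定);若任务为 null,则抛出 NullPointerException;调用方需要手动 try...catch...】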

 

任务线程异常

java.util.concurrent.ThreadPoolExecutor#execute

// Abridged outline of ThreadPoolExecutor showing how a worker runs a task.
// Lines consisting of "..." (and "run...", "throw...") are placeholders for
// code elided from this listing, not real statements.
public class ThreadPoolExecutor extends AbstractExecutorService {

    // Worker is itself a Runnable; its run() simply hands the worker to runWorker.
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{

        public void run() {
            runWorker(this);
        }

    }


     // Outline of the per-worker loop body: the beforeExecute hook, then the
     // task is run, an exception raised by the task is thrown onward, and the
     // afterExecute hook receives the task together with what was thrown.
     final void runWorker(Worker w) {

        ...    
        beforeExecute(wt, task);
        run...
        throw...
        ...
        afterExecute(task, thrown);
        ...

    }
}

  

  通过 execute 提交的任务在执行过程中抛出未捕获的异常时,该异常将由 java.lang.Thread.UncaughtExceptionHandler 进行处理

 

// When a thread is about to terminate due to an uncaught exception
// the Java Virtual Machine will query the thread for its
// UncaughtExceptionHandler using getUncaughtExceptionHandler 
// and will invoke the handler's uncaughtException method, 
// passing the thread and the exception as arguments.

/**
 * Callback that receives a thread together with the uncaught exception that
 * is terminating it.
 */
@FunctionalInterface
public interface UncaughtExceptionHandler {

    /**
     * Called when thread {@code t} terminates because of the uncaught
     * exception {@code e}.
     * <p>
     * Anything thrown from this method is ignored by the Java Virtual
     * Machine.
     *
     * @param t the thread that is terminating
     * @param e the exception that was not caught
     */
    void uncaughtException(Thread t, Throwable e);
}

  

/**
 * Forwards an uncaught exception to the handler returned by
 * {@code getUncaughtExceptionHandler()}, passing this thread and the
 * exception. Intended to be invoked only by the JVM.
 */
private void dispatchUncaughtException(Throwable e) {
    final Thread.UncaughtExceptionHandler handler = getUncaughtExceptionHandler();
    handler.uncaughtException(this, e);
}

  当任务线程因未捕获的异常而即将终止时,JVM 会调用 dispatchUncaughtException,将异常分发给该线程的 UncaughtExceptionHandler

 

  用户可以通过实现 java.lang.Thread.UncaughtExceptionHandler 接口来自定义线程异常处理器,并通过 java.lang.Thread#setUncaughtExceptionHandler 将其设置到线程上

/**
 * Stores {@code eh} as this thread's uncaught-exception handler, after
 * first calling {@code checkAccess()}.
 *
 * @param eh the handler to store
 */
public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
    checkAccess();
    this.uncaughtExceptionHandler = eh;
}

  

java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)/submit(java.lang.Runnable, T)/submit(java.util.concurrent.Callable<T>)

  submit方式,将Runnable、Callable包装成java.util.concurrent.FutureTask,任务执行体就是FutureTask的run方法

/**
 * A type that is both a {@link Runnable} and a {@link Future}.
 *
 * @param <V> the result type of the {@code Future} side
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {

    /**
     * Re-declares {@link Runnable#run()} on this interface.
     */
    @Override
    void run();
}


// Abridged outline of FutureTask showing how an exception thrown by the
// wrapped task is captured. Lines consisting of "..." / "...." are
// placeholders for code elided from this listing.
public class FutureTask<V> implements RunnableFuture<V> {

    // The result to return or exception to throw from get() 
    private Object outcome;

    // Runs the wrapped callable; anything it throws is caught here and
    // handed to setException instead of escaping from run().
    public void run() {

        ...
        try {
                    result = c.call();   // execute the task
                    ...
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);      // handle the task's exception
                }
        ....

    }


    // Records t as the outcome, but only if the state can be moved from NEW
    // to COMPLETING; then sets the state to EXCEPTIONAL and calls
    // finishCompletion().
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;   // store the exception in outcome
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }


    


}

  

  当调用 java.util.concurrent.FutureTask#get() 时,任务执行过程中抛出的异常会被包装成 ExecutionException 抛出,调用方由此获取到任务线程的异常

/**
 * Reads the current state, waits via {@code awaitDone(false, 0L)} when that
 * state is at or below {@code COMPLETING}, and then delegates to
 * {@code report} with the resulting state.
 *
 * @return whatever {@code report} returns for the final state
 * @throws InterruptedException declared by this method; see {@code awaitDone}
 * @throws ExecutionException   declared by this method; see {@code report}
 */
public V get() throws InterruptedException, ExecutionException {
    final int observed = state;
    final int settled = (observed <= COMPLETING) ? awaitDone(false, 0L) : observed;
    // Hands the final state to report, which returns the value or throws.
    return report(settled);
}


/**
 * Translates a completed state into the value to return or the exception
 * to throw.
 *
 * @param s completed state value
 * @return {@code outcome} cast to {@code V} when {@code s == NORMAL}
 * @throws CancellationException when {@code s >= CANCELLED}
 * @throws ExecutionException    wrapping {@code outcome} in every other case
 */
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    // Read outcome first, before any state comparison, as the original does.
    final Object stored = outcome;
    if (s == NORMAL) {
        return (V) stored;
    } else if (s >= CANCELLED) {
        throw new CancellationException();
    } else {
        // outcome holds the task's exception here; rethrow it wrapped.
        throw new ExecutionException((Throwable) stored);
    }
}