线程池execute 和 submit 的区别

发布时间 2023-04-07 16:52:05作者: 下半夜的风

1. executesubmit 的区别

前面说了还需要介绍多线程中使用 executesubmit 的区别(这两个方法都是线程池 ThreadPoolExecutor 的方法)。

1.1 方法来源不同

execute 方法是线程池的顶层接口 Executor 定义的,在 ThreadPoolExecutor 中实现:

void execute(Runnable command);

submit()是在ExecutorService接口中定义的,并定义了三种重载方式:

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

AbstractExecutorService类中有它们的具体实现,而ThreadPoolExecutor继承了AbstractExecutorService类。所以 ThreadPoolExecutor 也有这三个方法。

1.2 接收参数不同

从上面的方法来源中可以看出,二者接收参数类型不同:

  1. execute()方法只能接收实现Runnable接口类型的任务
  2. submit()方法则既可以接收Runnable类型的任务,也可以接收Callable类型的任务

1.3 返回值不同

由于 RunnableCallable 的区别就是,Runnable 无返回值,Callable 有返回值。

所以 executesubmit 的返回值也不同。

  1. execute()的返回值是void,线程提交后不能得到线程的返回值

  2. submit()的返回值是Future,通过Future的get()方法可以获取到线程执行的返回值,get()方法是同步的,执行get()方法时,如果线程还没执行完,会同步等待,直到线程执行完成

    虽然submit()方法可以提交Runnable类型的参数,但执行Future方法的get()时,线程执行完会返回null,不会有实际的返回值,这是因为Runable本来就没有返回值
    

1.4 异常处理机制不同

  1. submit 提交任务,任务内有异常也不会打印异常信息,而是调用get()方法时,打印出任务执行异常信息
  2. execute 提交任务时,任务内有异常会直接打印出来

后面源码分析中会体现这个不同点!

2. executesubmit 源码分析

2.1 submit 源码分析

submit 方法是 ExecutorService 接口定义,由 AbstractExecutorService 抽象类实现,有三个重载方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

可以看一下上面 submit 的三个重载方法,方法体很相似,都调用了一个方法 newTaskFor(...) ,那么就来看看这个方法,可以看到它有两个重载方法:

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

解释一下上面两个重载方法吧:

  1. 第一个newTaskFor(Runnable runnable, T value):可以看到它应该是将 submit 方法传进来的 Runnable 转化成了 Callable,并给一个返回值
  2. 第二个newTaskFor(Callable<T> callable):就是submit直接传进了一个 Callable,包装成 FutureTask 返回。

上面代码中可以看一下 RunnableFutureFutureTask 的关系:

先看一下 RunnableFuture:
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
RunnableFuture 实现了 Runnable 和 Future,它的子类就是 FutureTask
    
public class FutureTask<V> implements RunnableFuture<V> {
    // ...
}

到这里就明白了吧,当 submit 传入的参数是 Runnable 的时候,就需要 FutureTask的构造方法将 Runnable 转化成 Callable

下面看一下 FutureTask 的两个构造函数:

// 传入Runnable则是执行了一共方法,看一下这个方法,具体转化逻辑就有了
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

// 传入Callable直接赋值给类的成员变量
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

下面看一下当 submit 传入 Runnable 的时候,其实到这里就是调用了 FutureTask(Runnable runnable, V result) 构造函数,看一下这个构造函数中将 Runable 转化成了 Callable,看一下 Executors.callable(runnable, result) 方法:

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
看看这里有创建了一个类,就是 RunnableAdapter,下面再看一下这个内部类:
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

看看这个内部类,实现了 Callable 接口,并在 call 方法中 调用了 task 的 run 方法,就是相当于任务代码直接在 call 方法中了。

这里必须说一下线程池中的线程是怎么执行的,这里就不说全部了,直说与这里相关的一部分

  1. 看到上面submit 方法最终也是调用了 execute 方法,经过上main源码分析的一系列转换,submit最终调用了ThreadPoolExecutorexecute 方法
  2. execute 方法里面有一个很关键的方法是 addWorker(command, true)
  3. 进入 addWorker 方法,可以看到里面 new 了一个 WorkerWorker 里面有一个属性是 Thread,后面直接调用了它的 start 方法启动了线程
  4. 可以看一下 Worker 类,它实现了 Runnable,这里就要看看Workerrun 方法了,调用了 runWorker
  5. runWorker方法中,有一行是task.run(),调用 submit 时最终这个run方法就是RunnableFuture中的 run() 方法。具体实现在 FutureTask

下面就看一下 FutureTask 中实现的 run 方法:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //从这里可以看到,调用了call()方法
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 看看,这里是将异常放在了一个属性中,所以 submit执行的时候不会抛出异常,只有在调用 get 方法时才会抛出异常
                setException(ex);
            }
            if (ran)
                //这里将返回值设置到了outcome,执行完后可以通过get()获取
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

看一下上面两个主要的方法(setException(ex)set(result)):

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

可以看到两个方法一个是将异常放进了 outcome ,一个是将 call 方法的返回值放进了 outcome。不管是异常还是线程执行的返回值,都会在get 方法中获取到,下面看一下get 方法,方法在 FutureTask 类中:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

看看get 方法调用了 report 方法,取到了上面setException(ex)set(result)方法 放进 outcome 的值并返回。

这里如果线程抛出了异常,这个线程会被从线程哈希表中移除,取消强引用,让 GC 回收,并且重新创建一个新的线程。

到这里 submit 方法的源码就分析完了!!!

2.2 execute 源码分析

此方法是线程顶层接口 Executor 定义的,在 ThreadPoolExecutor 有其实现,直接看实现:

其实 submit 方法最终调用了 execute 也是这一段,不同的是最后调用的线程的 run 方法是不同实现类实现的

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

这里主要就是看 submitexecute 的区别点,所以线程池的看具体源码就不看了,我之前写的有一篇线程池源码的笔记很详细:线程池源码

上面有个很重要的方法,是将线程加入队列并执行的,就是 addWorker 方法,这里就不copy addWorker 的代码了,只需要知道里面创建了一个 Worker 对象即可。Worker 对象中有一个属性是 Thread,后面获取到了这个 Thread ,并执行了 start 方法。

然而在 Worker 本身是实现了 Runnable 的,所以后面执行的 start 方法,实际是执行了 Worker 中的 run 方法:

public void run() {
    runWorker(this);
}

看看 Workerrun 方法调用的 runWorker 方法:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 这个地方是重点,run 方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

其实这里的源码就和 submit 一样了,只是上面 task.run() 调用的是不同实现类的 run 方法。

  • execute 方法传进来的最终是调用的 Runnable 或其子类的run 方法
  • submit 方法进来的最终是调用了 FutureTaskrun 方法

基于上面的区别,再去看 FutureTaskrun 方法的源码,就可以知道一下结论:

  1. execute 是没返回值的,submit 有返回值
  2. execute 提交任务时,任务内有异常会直接打印出来;用 submit 提交任务,任务内有异常也不会打印异常信息,而是调用get()方法时,打印出任务执行异常信息