二、Quartz原理及流程

发布时间 2023-09-03 20:41:32作者: Stitches

参考

https://www.zhihu.com/question/41918492/answer/490367825

线程模型

Quartz 的线程模型如上图所示,其中 RegularSchedulerThread 为常规调度线程、MisfireSchedulerThread 为错失触发调度线程、JobThreadPool 为任务执行线程池。

常规调度线程轮询存储的所有 trigger,如果有需要触发的 trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该 trigger 关联的任务。Misfire 线程是扫描所有的 trigger,查看是否有 misfired trigger,如果有的话根据 misfire 的策略分别处理。

其中,常规调度线程的执行流程如下图所示:

  1. 常规调度线程会不断循环判断,直到任务线程池中有一个空闲线程,接着它会从 Trigger 集合中取出接下来 N 秒内即将触发的任务,然后等待着任务触发执行;最后触发器触发任务会被分派到线程池中的一个线程异步执行。
// 常规调度线程池
public class SimpleThreadPool implements ThreadPool {
    // 线程池总线程数
    private int count = -1;
    private int prio = Thread.NORM_PRIORITY;
    // 线程池当前状态
    private boolean isShutdown = false;
    private boolean handoffPending = false;
    private final Object nextRunnableLock = new Object();


    // 存放所有工作线程引用
    private List<WorkerThread> workers;
    // 存放所有空闲线程
    private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
    // 存放所有工作线程
    private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

    // 在下一个空闲线程中执行当前任务,如果线程池被要求关闭,会立刻给当前任务分配一个额外线程去执行
    public boolean runInThread(Runnable runnable) {
        if (runnable == null) {
            return false;
        }
        // 如果多线程同时调用,需要加锁
        synchronized (nextRunnableLock) {

            handoffPending = true;

            // Wait until a worker thread is available
            while ((availWorkers.size() < 1) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                }
            }

            if (!isShutdown) {
                WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
                busyWorkers.add(wt);
                wt.run(runnable);
            } else {
                // If the thread pool is going down, execute the Runnable
                // within a new additional worker thread (no thread from the pool).
                WorkerThread wt = new WorkerThread(this, threadGroup,
                        "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
                busyWorkers.add(wt);
                workers.add(wt);
                wt.start();
            }
            nextRunnableLock.notifyAll();
            handoffPending = false;
        }
        return true;
    }

    // 关闭线程池中所有正在执行任务的线程
    public void shutdown(boolean waitForJobsToComplete) {

        synchronized (nextRunnableLock) {
            getLog().debug("Shutting down threadpool...");

            isShutdown = true;

            if(workers == null) // case where the pool wasn't even initialize()ed
                return;

            // signal each worker thread to shut down
            Iterator<WorkerThread> workerThreads = workers.iterator();
            while(workerThreads.hasNext()) {
                WorkerThread wt = workerThreads.next();
                wt.shutdown();
                availWorkers.remove(wt);
            }

            // Give waiting (wait(1000)) worker threads a chance to shut down.
            // Active worker threads will shut down after finishing their
            // current job.
            nextRunnableLock.notifyAll();

            if (waitForJobsToComplete == true) {

                boolean interrupted = false;
                try {
                    // wait for hand-off in runInThread to complete...
                    while(handoffPending) {
                        try {
                            nextRunnableLock.wait(100);
                        } catch(InterruptedException _) {
                            interrupted = true;
                        }
                    }

                    // Wait until all worker threads are shut down
                    while (busyWorkers.size() > 0) {
                        WorkerThread wt = (WorkerThread) busyWorkers.getFirst();
                        try {
                            getLog().debug(
                                    "Waiting for thread " + wt.getName()
                                            + " to shut down");

                            // note: with waiting infinite time the
                            // application may appear to 'hang'.
                            nextRunnableLock.wait(2000);
                        } catch (InterruptedException _) {
                            interrupted = true;
                        }
                    }

                    workerThreads = workers.iterator();
                    while(workerThreads.hasNext()) {
                        WorkerThread wt = (WorkerThread) workerThreads.next();
                        try {
                            wt.join();
                            workerThreads.remove();
                        } catch (InterruptedException _) {
                            interrupted = true;
                        }
                    }
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }

                getLog().debug("No executing jobs remaining, all threads stopped.");
            }
            getLog().debug("Shutdown of threadpool complete.");
        }
    }


    // 工作线程定义为内部类
    class WorkerThread extends Thread {
        // 工作线程执行时的互斥锁
        private final Object lock = new Object();

        // 工作线程暂停的标记
        private AtomicBoolean run = new AtomicBoolean(true);

        private SimpleThreadPool tp;

        // 具体执行的任务,构造函数传入
        private Runnable runnable = null;
        
        private boolean runOnce = false;

        /**
            * <p>
            * Create a worker thread and start it. Waiting for the next Runnable,
            * executing it, and waiting for the next Runnable, until the shutdown
            * flag is set.
            * </p>
            */
        WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
                        int prio, boolean isDaemon) {

            this(tp, threadGroup, name, prio, isDaemon, null);
        }

        /**
            * <p>
            * Create a worker thread, start it, execute the runnable and terminate
            * the thread (one time execution).
            * </p>
            */
        WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
                        int prio, boolean isDaemon, Runnable runnable) {

            super(threadGroup, name);
            this.tp = tp;
            this.runnable = runnable;
            if(runnable != null)
                runOnce = true;
            setPriority(prio);
            setDaemon(isDaemon);
        }

        // 标记执行终止或结束
        void shutdown() {
            run.set(false);
        }

        public void run(Runnable newRunnable) {
            synchronized(lock) {
                if(runnable != null) {
                    throw new IllegalStateException("Already running a Runnable!");
                }

                runnable = newRunnable;
                lock.notifyAll();
            }
        }


        @Override
        public void run() {
            boolean ran = false;
            
            while (run.get()) {
                try {
                    synchronized(lock) {
                        while (runnable == null && run.get()) {
                            lock.wait(500);
                        }

                        if (runnable != null) {
                            ran = true;
                            runnable.run();         // 任务执行
                        }
                    }
                } catch (InterruptedException unblock) {
                    // do nothing (loop will terminate if shutdown() was called
                    try {
                        getLog().error("Worker thread was interrupt()'ed.", unblock);
                    } catch(Exception e) {
                        // ignore to help with a tomcat glitch
                    }
                } catch (Throwable exceptionInRunnable) {
                    try {
                        getLog().error("Error while executing the Runnable: ",
                            exceptionInRunnable);
                    } catch(Exception e) {
                        // ignore to help with a tomcat glitch
                    }
                } finally {
                    synchronized(lock) {
                        runnable = null;
                    }
                    // repair the thread in case the runnable mucked it up...
                    if(getPriority() != tp.getThreadPriority()) {
                        setPriority(tp.getThreadPriority());
                    }

                    if (runOnce) {
                            run.set(false);
                        clearFromBusyWorkersList(this);
                    } else if(ran) {
                        ran = false;
                        makeAvailable(this);
                    }

                }
            }

            //if (log.isDebugEnabled())
            try {
                getLog().debug("WorkerThread is shut down.");
            } catch(Exception e) {
                // ignore to help with a tomcat glitch
            }
        }
    }
}

错失调度线程的执行流程如下图:

  1. 错失调度线程首先会扫描所有的 trigger 集合判断是否有错失未触发的 misfired triggers
  2. 如果存在错失未触发的集合,循环遍历其中的每个触发器,对于每一个触发器根据所配置的错失触发原则 misfirePolicy 选择对应的处理方法处理;

Quartz 启动流程