quartz是什么意思(天王表quartz是什么意思)

1.调度工厂有两个实现类。在上一节中,我们使用了StanderScheduleFactory。事实上,DirectScheduleFactory更易于快速使用。

1.调度工厂

Quartz框架设计以及原理

有两个实现类。在上一节中,我们使用了StanderScheduleFactory。事实上,DirectScheduleFactory更易于快速使用。

开始:

// 直接默认启动Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();// 或者手动注入配置Properties properties = new Properties();properties.setProperty("org.quartz.scheduler.instanceName", "MyScheduler");properties.setProperty("org.quartz.threadPool.threadCount", "4");properties.setProperty("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");StdSchedulerFactory factory = new StdSchedulerFactory(properties);Scheduler scheduler = factory.getScheduler();// 3. 系统传参,指定配置文件位置 : org.quartz.properties=quartz.properties//4. 直接不用传参 ,在Classpath下面建立一个 quartz.properties文件就行了.

你做了什么?配置是如何加载的?其实就是这里。org . quartz . impl . STD scheduler factory # getDefaultScheduler

他在里面实例化了一堆属性,我真的觉得没写。代码太简单了。他会有一个默认的实现,所以他不怕因为没有写配置而开始出错。或者,您可以只构造方法并将其传递到配置中。

public static Scheduler getDefaultScheduler() throws SchedulerException { StdSchedulerFactory fact = new StdSchedulerFactory(); return fact.getScheduler();}

为了组织。quartz . impl . stdschedulerfactory # instantiate()方法对比,调度器的核心方法代码有几千行,不一一展示。主要目的是初始化一堆东西,打包一堆东西。

主要是以下几个。

JobStore js = null;ThreadPool tp = null;QuartzScheduler qs = null;DBConnectionManager dbMgr = null;

这四个都有相同的特征,

第一 properties注入全部采用的是 set方法反射注入 ,org.quartz.impl.StdSchedulerFactory#setBeanProps 这个方法里, 会set注入进去.第二 全部初始化都是 initialize方法

2.功课存储

该接口由希望为org . quartz . core . quartz scheduler提供作业和触发存储机制的类实现& # 39;的用途。

作业和触发器的存储应该以它们的名称和组的组合作为唯一性的关键。

解释很清楚:向QuartzScheduler提供一个作业和触发器的存储。同时,作业和触发器的键应该是唯一的。

Org.quartz.spi.JobStore显然是存储我们工作的东西。如果我们不告诉他使用哪一个,默认实现是org . quartz . simple . ramjobstore

一个基于内存的存储,简单的包装一堆集合。主要的一点是,他将包装作业,并将其称为JobWrapper。

主要支持三种存储配置:

RAM (直接JVM进程的)terracotta (Terracotta是一款由美国Terracotta公司开发的著名开源Java集群平台。)JDBC (MySQL 之类的 ....)

3.线程池

希望为org . quartz . core . quartz scheduler提供线程池的类实现的接口& # 39;的用途。

理想情况下,线程池的实现实例应该只供Quartz使用。最重要的是,当方法blockForAvailableThreads()返回值为1或更大时,在稍后(或许多时候)调用方法runInThread(Runnable)时,池中必须至少有一个可用线程。如果这个假设不成立,可能会导致额外的JobStore查询和更新,如果使用了集群功能,可能会导致更大的负载不平衡。

4.季度计划资源

这是整个石英的核心资源,所有的资源都在这里。

5.石英时间表

整个框架的核心....

这是Quartz的核心,它是Scheduler接口的间接实现,包含调度作业、注册JobListener实例等方法。

是quartz的核心,包括调用job和注册listener的方法。

构造函数,所以核心资源QuartzSchedulerResources也给了QuartzScheduler。

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval){....}

季度计划资源

包含所有资源(JobStore、ThreadPool等。)是创建QuartzScheduler实例所必需的。

资源管理器,包括作业和线程池

空闲等待时间

当调度程序发现当前没有要触发的触发器时,它应该等待多长时间直到再次检查...

只是等待实施。这是一个无限循环。如果等待时间太短,就会转空很快,就是这个意思。默认值为30秒。

dbRetryInterval

弃用了,就交给别人管理了。

主要实现:靠自己....

Quartz框架设计以及原理

6.管理类

管理ConnectionProviders的集合,并提供对其连接的透明访问。

管理ConnectionProviders并提供透明的连接访问。

事实上,他是

job store-& gt;DBConnectionManager-& gt;ConnectionProviders,这可以说是一种桥接模式。有一个管理器管理连接提供者,然后JobStore可以通过他获得连接。

7.作业监听器和触发器监听器

第一点是这些都是ListenerManager管理的,他的实现类是ListenerManagerImpl。

让我们看看JobListener。

由希望在JobDetail执行时得到通知的类实现的接口。通常,使用调度程序的应用程序不会使用这种机制。

执行一个JobDetail。将通知您通常使用Scheduler的应用程序将不使用此侦听器。

public interface JobListener { String getName(); // Job调用前 void jobToBeExecuted(JobExecutionContext context); // veto 否决的意思 , 由 org.quartz.TriggerListener#vetoJobExecution 决定 void jobExecutionVetoed(JobExecutionContext context); // Job调用后 void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException);}public interface TriggerListener { String getName(); // 触发Trigger void triggerFired(Trigger trigger, JobExecutionContext context); // 禁止执行JobExecution boolean vetoJobExecution(Trigger trigger, JobExecutionContext context); // triggerMisfired void triggerMisfired(Trigger trigger); void triggerComplete(Trigger trigger, JobExecutionContext context, CompletedExecutionInstruction triggerInstructionCode);}

执行过程如下。

触发器触发-& gt;

vetoJobExecution?false-& gt;jobToBeExecuted-& gt;调用作业接口中的方法->:jobwascexecuted

?真-& gt;作业执行

?

很容易实现两个接口,

org.quartz.jobListener.j1.class=com.example.springquartz.MyJobListener// 比如属性设置可以这么做, 只要实现set方法就可以// org.quartz.jobListener.l1.name=MyJobListenerorg.quartz.triggerListener.t1.class=com.example.springquartz.MyTriggerListener

8.调度程序

这是Quartz调度程序的主界面。

这是QuartzScheduler的主接口,唯一对外暴露的接口,可以操作所有的资源对象。主要原因是它包含Quartz调度器,所以可以操作其他对象。

我们通常是StdSchedule。

主要实现:

Quartz框架设计以及原理

9.核心操作流程。

QuartzSchedulerResources把所有东西都放进去。

org . quartz . core . quartz scheduler # quartz scheduler是执行任务的核心,也就是心脏。然后交给->: QuartzSchedulerThread轮询处理

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval) throws SchedulerException { this.resources = resources; if (resources.getJobStore() instanceof JobListener) { addInternalJobListener((JobListener)resources.getJobStore()); } // 核心处理线程 - > 这个就是整个心脏 this.schedThread = new QuartzSchedulerThread(this, resources); ThreadExecutor schedThreadExecutor = resources.getThreadExecutor(); // 心脏启动 schedThreadExecutor.execute(this.schedThread); if (idleWaitTime > 0) { this.schedThread.setIdleWaitTime(idleWaitTime); } jobMgr = new ExecutingJobsManager(); addInternalJobListener(jobMgr); errLogger = new ErrorLogger(); addInternalSchedulerListener(errLogger); signaler = new SchedulerSignalerImpl(this, this.schedThread); getLog().info("Quartz Scheduler v." + getVersion() + " created.");}

org . quartz . core . quartz scheduler thread # run-& gt;这里描述的是quartzschedulerthread的主处理循环。

不知道是做什么的,主要用来处理作业,封装作业。

@Overridepublic void run() { int acquiresFailed = 0; while (!halted.get()) { try { // 因为提前开启了 , 所以需要等待真正启动了- > 调用start, 才能继续执行. paused暂停.在 // org.quartz.core.QuartzScheduler#start执行. // check if we're supposed to pause... synchronized (sigLock) { while (paused && !halted.get()) { try { // wait until togglePause(false) is called... sigLock.wait(1000L); } catch (InterruptedException ignore) { } // reset failure counter when paused, so that we don't // wait again after unpausing acquiresFailed = 0; } if (halted.get()) { break; } } // wait a bit, if reading from job store is consistently // failing (e.g. DB is down or restarting).. if (acquiresFailed > 1) { try { long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed); Thread.sleep(delay); } catch (Exception ignore) { } } // 这里是获取正在等待的ThreadPool中的线程 . 如果有执行,你要知道`QuartzSchedulerResources` 把所有东西都放进去了 int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... // 获取当前时刻的triggers List<OperableTrigger> triggers; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try { triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); acquiresFailed = 0; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers"); } catch (JobPersistenceException jpe) { if (acquiresFailed == 0) { // 打印日志 qs.notifySchedulerListenersError( "An error occurred while scanning for the next triggers to fire.", jpe); } if (acquiresFailed < Integer.MAX_VALUE) acquiresFailed++; continue; } catch (RuntimeException e) { if (acquiresFailed == 0) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e); } if (acquiresFailed < Integer.MAX_VALUE) acquiresFailed++; continue; } // 这里就是如果有triggers继续执行 if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; // 这个处理比较灵性.. 死循环, 你差多久, 我就 wait多久 while(timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // wait // we could have blocked a long while // on 'synchronize', so we must recompute now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) { } } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers // 防止其他发生 if(triggers.isEmpty()) continue; // 初始化一个 TriggerFiredResult // set triggers to 'executing' List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>(); boolean goAhead = true; synchronized(sigLock) { goAhead = !halted.get(); } if(goAhead) { try { List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occurred while firing triggers '" + triggers + "'", se); //QTZ-179 : a problem occurred interacting with the triggers from the db //we release them and loop again for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } // 循环执行 for (int i = 0; i < bndles.size(); i++) { // TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) { getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // it's possible to get 'null' if the triggers was paused, // blocked, or other similar occurrences that prevent it being // fired at this time... or if the scheduler was shutdown (halted) if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // 初始化任务 JobRunShell shell = null; try { // 初始化流程 shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; } // 这里就是任务执行了. ... 这里就涉及到 -> `org.quartz.simpl.SimpleThreadPool.WorkerThread#run()` 这里了 if (qsRsrcs.getThreadPool().runInThread(shell) == false) { // this case should never happen, as it is indicative of the // scheduler being shutdown or a bug in the thread pool or // a thread pool being used concurrently - which the docs // say not to do... getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); } } continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if threadPool.blockForAvailableThreads() follows contract continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; synchronized(sigLock) { try { if(!halted.get()) { // QTZ-336 A job might have been completed in the mean time and we might have // missed the scheduled changed signal by not waiting for the notify() yet // Check that before waiting for too long in case this very job needs to be // scheduled very soon if (!isScheduleChanged()) { sigLock.wait(timeUntilContinue); } } } catch (InterruptedException ignore) { } } } catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } } // while (!halted) // drop references to scheduler stuff to aid garbage collection... qs = null; qsRsrcs = null;}

是始作俑者。我们知道我们必须执行一个线程。开始开始,所以这并不奇怪。上面的线程肯定是守护线程,都是,因为我们启动后不阻塞,只是GG和JVM退出。

org . quartz . core . quartz scheduler # start-& gt;给你。核心是schedThread.togglePause(false),

public void start() throws SchedulerException { if (shuttingDown|| closed) { throw new SchedulerException( "The Scheduler cannot be restarted after shutdown() has been called."); } // QTZ-212 : calling new schedulerStarting() method on the listeners // right after entering start() notifySchedulerListenersStarting(); if (initialStart == null) { initialStart = new Date(); this.resources.getJobStore().schedulerStarted(); startPlugins(); } else { resources.getJobStore().schedulerResumed(); } // 这里设置的目的就是将它继续执行 -> `org.quartz.core.QuartzSchedulerThread#run`252取消暂停. schedThread.togglePause(false); getLog().info( "Scheduler " + resources.getUniqueIdentifier() + " started."); notifySchedulerListenersStarted();}

调用qssrcs。getthreadpool()。runinthread (shell)->:调用org。quartz . simple . simple thread pool # runinthread->:那么在这个下面

public boolean runInThread(Runnable runnable) { if (runnable == null) { return false; } synchronized (nextRunnableLock) { handoffPending = true; // Wait until a worker thread is available , 这里就是等 , 等有可用的线程,这里我不理解为啥不使用队列... 其实可以使用队列的. while ((availWorkers.size() < 1) && !isShutdown) { try { nextRunnableLock.wait(500); } catch (InterruptedException ignore) { } } if (!isShutdown) { // 等到了, 拿着第一个可用线程执行 WorkerThread wt = (WorkerThread)availWorkers.removeFirst(); busyWorkers.add(wt); wt.run(runnable); } else { // If the thread pool is going down, execute the Runnable // within a new additional worker thread (no thread from the pool). WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable); busyWorkers.add(wt); workers.add(wt); wt.start(); } // 释放锁 nextRunnableLock.notifyAll(); handoffPending = false; } // 返回OK , 这里其实是异步的 , 因为处理流程交给了子线程, 他只是返回了他已经处理了, 但是有一个问题就是线程处理时间过长, 就会影响周期性.(所以建议设置多点,但是也不好,线程一直处于不断运行阶段) return true;}

调用wt . run(runnable);-& gt;调用组织。quartz . simple . simple thread pool . worker thread # run(Java . lang . runnable)来执行run方法。

public void run(Runnable newRunnable) { synchronized(lock) { if(runnable != null) { throw new IllegalStateException("Already running a Runnable!"); } runnable = newRunnable; lock.notifyAll(); }}

然后执行等待线程,等待-->:等待下面的执行。..

org . quartz . simple . simple thread pool . worker thread # run()每个工作线程在初始化的时候都一直在这里等待。等等等等等等等等。

@Overridepublic void run() { boolean ran = false; // 转 .... while (run.get()) { try { // 同步等待. 500ms , 直到有 runnable ... 很可怜, 其实基本大多都是这么实现的 eventloop synchronized(lock) { while (runnable == null && run.get()) { lock.wait(500); } if (runnable != null) { ran = true; // 启动任务, 这里直接将Runnable对象启动, 而不是new thread(runnable).start,而是直接调用的方式. 让这个线程去处理. runnable.run(); } } } catch (InterruptedException unblock) { // do nothing (loop will terminate if shutdown() was called try { getLog().error("Worker thread was interrupt()'ed.", unblock); } catch(Exception e) { // ignore to help with a tomcat glitch } } catch (Throwable exceptionInRunnable) { try { getLog().error("Error while executing the Runnable: ", exceptionInRunnable); } catch(Exception e) { // ignore to help with a tomcat glitch } } finally { synchronized(lock) { runnable = null; } // repair the thread in case the runnable mucked it up... if(getPriority() != tp.getThreadPriority()) { setPriority(tp.getThreadPriority()); } if (runOnce) { run.set(false); clearFromBusyWorkersList(this); } else if(ran) { ran = false; makeAvailable(this); } } } //if (log.isDebugEnabled()) try { getLog().debug("WorkerThread is shut down."); } catch(Exception e) { // ignore to help with a tomcat glitch }}

等待使用

public class Demo { public static final Object OBJECT = new Object(); public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); while (true) { synchronized (OBJECT) { System.out.println("time : " + (System.currentTimeMillis() - start) + "ms."); OBJECT.wait(1000L); } } }}

输出

time : 0ms.time : 1000ms.time : 2001ms.time : 3002ms.time : 4002ms.time : 5003ms.

10.摘要

石英框架分析一波,主要类似于NIO的思路。主线程不断地监视和轮询工作线程,同时也监视触发器,看是否可以触发。是的,找一个空闲的工作线程来处理,所以是NIO的主意。

但是,有几个问题。如果初始化线程(工作线程)太多,这些线程在初始化阶段就会被启动,所以会消耗CPU资源。

如果初始化线程(工作线程)太少,很容易阻塞,所以..................

他的设计框架还是很不错的。可以说是一个设计模式,整体设计完全解耦。挺好看的。其实学习框架是为了完善自己的设计思路,不断学习和发现新的世界。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。

作者:美站资讯,如若转载,请注明出处:https://www.meizw.com/n/145853.html

发表回复

登录后才能评论