quartz 2.2.x 源码学习 基本执行流程分析

Lucianoesu 2018-10-27

quartz 官网中给出了一些基本概念,请先阅读官网相关概念。

http://www.quartz-scheduler.org/documentation/quartz-2.2.x/tutorials/

下面对最简单的一个任务调度工作进行分析,下面的代码每隔三秒中不断重复执行任务SimpleJob。

public class JobStart {

public static void main(String[] args) throws SchedulerException {

SchedulerFactory sf = new StdSchedulerFactory();

Scheduler sched = sf.getScheduler();

sched.clear();

String instanceId = sched.getSchedulerInstanceId();

JobDetail detail = JobBuilder.newJob(SimpleJob.class)

.withIdentity("Ins Id " + instanceId, instanceId)

.requestRecovery()

.build();

Trigger trigger = TriggerBuilder.newTrigger()

.withIdentity("Ins Id " + instanceId,instanceId)

.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).repeatForever()).build();

sched.scheduleJob(detail, trigger);

sched.start();

}

}

@DisallowConcurrentExecution

public class SimpleJob implements Job{

private static Logger _log = LoggerFactory.getLogger(SimpleJob.class);

@Override

public void execute(JobExecutionContext context) throws JobExecutionException {

if(context.isRecovering()){

_log.info("is recovering");

}else{

_log.info("not recovering");

}

try {

_log.info("do time consumed job start");

Thread.sleep(5000);

_log.info("do time consumed job end");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

quartz 执行任务可以总结为创建JobDetail描述待执行任务,创建Trigger描述什么时候去触发任务。最后将JobDetail与Trigger注册在调度器中,调度器启动后遍开始对任务调度。

因此了解其基本过程需要明白 创建调度器完成哪些工作?调度工作什么时候开始?任务如何被执行?

示例代码中,首先获取一个调度器。

Scheduler sched = sf.getScheduler();

1

进入该方法内部

org.quartz.impl.StdSchedulerFactory.getScheduler()

if (cfg == null) {

initialize();

}

SchedulerRepository schedRep = SchedulerRepository.getInstance();

Scheduler sched = schedRep.lookup(getSchedulerName());

if (sched != null) {

if (sched.isShutdown()) {

schedRep.remove(getSchedulerName());

} else {

return sched;

}

}

sched = instantiate();

return sched;

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

获取调度器对象时,首先查找缓存,如果有就直接拿,若不存在则调用instantiate()方法来创建调度器对象。

进入instantiate方法内部,定义了许多变量。

org.quartz.impl.StdSchedulerFactory.instantiate()

private Scheduler instantiate() throws SchedulerException {

if (cfg == null) {

initialize();

}

if (initException != null) {

throw initException;

}

JobStore js = null;

ThreadPool tp = null;

QuartzScheduler qs = null;

DBConnectionManager dbMgr = null;

String instanceIdGeneratorClass = null;

Properties tProps = null;

......

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

ThreadPool tp为线程池对象,在该方法内部可以看到该线程池的创建 是通过从属性文件quartz.properties中拿到org.quartz.threadPool所定义的线城池类来创建。本文使用的是 org.quartz.simpl.SimpleThreadPool。而最终任务通过线程池来管理。

public static final String PROP_THREAD_POOL_PREFIX = "org.quartz.threadPool";

String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

if (tpClass == null) {

initException = new SchedulerException(

"ThreadPool class not specified. ");

throw initException;

}

try {

tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();

} catch (Exception e) {

initException = new SchedulerException("ThreadPool class '"

+ tpClass + "' could not be instantiated.", e);

throw initException;

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

后续代码中会创建QuartzScheduler对象,最终来启动调动线程。

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

1

在该构造器内部,会创建QuartzSchedulerThread对象,而QuartzSchedulerThread负责对我们定义的Job进行调度。当创建完后

提交给执行器对象schedThreadExecutor.execute(this.schedThread);准备执行。

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)

throws SchedulerException {

this.resources = resources;

if (resources.getJobStore() instanceof JobListener) {

addInternalJobListener((JobListener)resources.getJobStore());

}

this.schedThread = new QuartzSchedulerThread(this, resources);

ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();

schedThreadExecutor.execute(this.schedThread);

if (idleWaitTime > 0) {

this.schedThread.setIdleWaitTime(idleWaitTime);

}

1

2

3

4

5

6

7

8

9

10

11

12

13

当sf.getScheduler();执行完毕后,线程池,执行器,调度器,负责调度Job的线程都已创建完毕。而任务调度线程QuartzSchedulerThread也准备执行。

可以看到sf.getScheduler();调用完毕后,任务调度线程处于暂停状态,并不断检查状态等待恢复并执行任务。

org.quartz.core.QuartzSchedulerThread.run()

while (!halted.get()) {

try {

// check if we're supposed to pause...

synchronized (sigLock) {

while (paused && !halted.get()) {

try {

// wait until togglePause(false) is called...

sigLock.wait(1000L);

} catch (InterruptedException ignore) {

}

}

if (halted.get()) {

break;

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

在 QuartzSchedulerThread 构造方法中会设置两个变量初始值。

paused = true;

halted = new AtomicBoolean(false);

1

2

当我们获取到调度器对象,并指定trigger与detail后,调用start方法。

当调用执行器的 org.quartz.core.QuartzScheduler.start()方法在该方法内部会调用方法 schedThread.togglePause(false);将paused变量设置为false,并唤醒QuartzSchedulerThread线程。随QuartzSchedulerThread线程开始执行调度任务。

org.quartz.core.QuartzScheduler.start()

void togglePause(boolean pause) {

synchronized (sigLock) {

paused = pause;

if (paused) {

signalSchedulingChange(0);

} else {

sigLock.notifyAll();

}

}

1

2

3

4

5

6

7

8

9

10

11

12

至此任务调度工作准备开始,Job等待被调用。

QuartzSchedulerThread的run方法中检测到变量状态被修改,开始调度工作。

public void run() {

boolean lastAcquireFailed = false;

while (!halted.get()) {

try {

// check if we're supposed to pause...

synchronized (sigLock) {

while (paused && !halted.get()) {

try {

// wait until togglePause(false) is called...

sigLock.wait(1000L);

} catch (InterruptedException ignore) {

}

}

if (halted.get()) {

break;

}

}

int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();

if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

List<OperableTrigger> triggers = null;

long now = System.currentTimeMillis();

clearSignaledSchedulingChange();

... 省略

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

run方法中的执行过程可粗略归纳为

1.查找将要触发的Trigger

2.检测条件,等待执行

3.执行任务。

4.释放Trigger

在执行时,任务被包装为JobRunShell来运行。

当qsRsrcs.getThreadPool().runInThread(shell)被调用时,我们自己的Job被提交,并等待线程池调度执行。

JobRunShell shell = null;

JobRunShell shell = null;

try {

shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);

shell.initialize(qs);

} catch (SchedulerException se) {

qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);

continue;

}

if (qsRsrcs.getThreadPool().runInThread(shell) == false) {

// this case should never happen, as it is indicative of the

// scheduler being shutdown or a bug in the thread pool or

// a thread pool being used concurrently - which the docs

// say not to do...

getLog().error("ThreadPool.runInThread() return false!");

qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

在 qsRsrcs.getThreadPool().runInThread(shell)方法中会创建一个线程来运行我们定义的Job,而Job的信息则是通过JobDetail来获取。经过上述过程就形成了quartz 进行任务调度的基本过程。

最后总结,启动时的三个主要方法,及工作概述如下。

Scheduler sched = sf.getScheduler();

完成了线程池创建,调度线程创建,调度线程初始处理暂停状态。

sched.scheduleJob(detail, trigger);

指定JobDetail与Trigger。

sched.start();

修改状态未,使调度线程开始运行。任务以JobRunShell的形式被执行。

quartz 2.2.x 源码学习 基本执行流程分析

工艺流程图图

相关推荐