Quartz Source Code Analysis (1): Implementing Time-Based Scheduling by Thread Waiting (Repost)

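Quartz is one of the most widely used Java job scheduling frameworks. Its core components are Scheduler, Trigger, and JobDetail. The Scheduler is given a thread, QuartzSchedulerThread, which is started when the Scheduler is initialized and then waits for the Scheduler to be started. Once that happens, it fetches the next Trigger due to fire from the JobStore, blocks the thread until that Trigger's fire time, executes the JobDetail associated with the Trigger, and finally cleans up. The sequence diagram for Scheduler initialization, start, and trigger execution is shown below:

[Figure: sequence diagram of Scheduler initialization, start, and trigger execution]
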
The core of all this is how QuartzSchedulerThread runs. Its run method is analyzed below:

public void run() {
        boolean lastAcquireFailed = false;
        
        while (!halted) {
            try {
                // check if we're supposed to pause...
                synchronized (pauseLock) {
                    while (paused && !halted) {
                        try {
                            // wait until togglePause(false) is called...
                            pauseLock.wait(100L);
                        } catch (InterruptedException ignore) {
                        }
                    }
    
                    if (halted) {
                        break;
                    }
                }
           ......
      }
} 
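
The pause handshake in this excerpt can be reproduced outside Quartz. The following is a minimal sketch, not Quartz code: the class name PauseGate and the methods halt() and awaitResume() are hypothetical, and only the field names pauseLock, paused, and halted and the 100 ms timed wait are taken from the excerpt. It assumes the thread starts out paused, which matches the description of a thread that is launched at initialization and then waits for start.

```java
/** Hypothetical stand-in for the pause/halt handshake; not Quartz code. */
final class PauseGate {
    private final Object pauseLock = new Object();
    private boolean paused = true;   // assumption: the worker begins paused until "start"
    private boolean halted = false;

    /** Called by the controlling thread: false resumes the worker, true pauses it. */
    void togglePause(boolean pause) {
        synchronized (pauseLock) {
            paused = pause;
            pauseLock.notifyAll();   // wake the waiter so it re-checks the flags at once
        }
    }

    /** Asks the worker to stop for good. */
    void halt() {
        synchronized (pauseLock) {
            halted = true;
            pauseLock.notifyAll();
        }
    }

    /** Blocks while paused; returns true if work may proceed, false if halted. */
    boolean awaitResume() {
        synchronized (pauseLock) {
            while (paused && !halted) {
                try {
                    // Timed wait: even without a notify, the flags are re-checked every 100 ms.
                    pauseLock.wait(100L);
                } catch (InterruptedException ignore) {
                    // Same policy as the excerpt: ignore and keep waiting.
                }
            }
            return !halted;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseGate gate = new PauseGate();
        Thread worker = new Thread(() -> {
            if (gate.awaitResume()) {
                System.out.println("resumed, scheduling loop may begin");
            }
        });
        worker.start();
        Thread.sleep(50);          // the worker is parked in awaitResume() meanwhile
        gate.togglePause(false);   // plays the role of the scheduler's start
        worker.join();
    }
}
```

The timed wait is a safety net: a missed notification delays the worker by at most one wait period instead of blocking it forever.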

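The Quartz excerpt above is the very beginning of run(). It is easy to see that the thread is waiting here for the Scheduler to start; in fact, Quartz implements time-based scheduling through thread wait or sleep. Continuing with the code: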
Trigger trigger = null;
long now = System.currentTimeMillis();
signaled = false;
try {
    trigger = qsRsrcs.getJobStore().acquireNextTrigger(
            ctxt, now + idleWaitTime);
    lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
    if(!lastAcquireFailed) {
        qs.notifySchedulerListenersError(
            "An error occured while scanning for the next trigger to fire.",
            jpe);
    }
    lastAcquireFailed = true;
} catch (RuntimeException e) {
    if(!lastAcquireFailed) {
        getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                +e.getMessage(), e);
    }
    lastAcquireFailed = true;
} 
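
To make the acquire step concrete, here is a simplified sketch of an in-memory store that hands out the earliest trigger due within a time bound. It is not the RAMJobStore implementation: InMemoryTriggerStore and DemoTrigger are hypothetical names, the noLaterThan parameter is an assumed reading of the now + idleWaitTime argument in the excerpt, and misfire handling and trigger states are left out.

```java
import java.util.Comparator;
import java.util.TreeSet;

/** Hypothetical in-memory trigger store; a sketch, not Quartz's RAMJobStore. */
final class InMemoryTriggerStore {

    /** Hypothetical minimal trigger: a name plus its next fire time in epoch millis. */
    static final class DemoTrigger {
        final String name;
        final long nextFireTime;

        DemoTrigger(String name, long nextFireTime) {
            this.name = name;
            this.nextFireTime = nextFireTime;
        }
    }

    // Earliest fire time first; the name breaks ties so equal times do not collide in the set.
    private static final Comparator<DemoTrigger> BY_FIRE_TIME = (a, b) ->
            a.nextFireTime != b.nextFireTime
                    ? Long.compare(a.nextFireTime, b.nextFireTime)
                    : a.name.compareTo(b.name);

    private final TreeSet<DemoTrigger> waiting = new TreeSet<>(BY_FIRE_TIME);

    synchronized void store(DemoTrigger trigger) {
        waiting.add(trigger);
    }

    /** Removes and returns the earliest trigger due at or before noLaterThan, or null if none. */
    synchronized DemoTrigger acquireNextTrigger(long noLaterThan) {
        if (waiting.isEmpty()) {
            return null;
        }
        DemoTrigger first = waiting.first();
        if (first.nextFireTime > noLaterThan) {
            return null;   // nothing is due inside the look-ahead window
        }
        waiting.remove(first);
        return first;
    }

    /** Puts an acquired trigger back, e.g. when the wait for it was abandoned. */
    synchronized void releaseAcquiredTrigger(DemoTrigger trigger) {
        waiting.add(trigger);
    }

    public static void main(String[] args) {
        InMemoryTriggerStore store = new InMemoryTriggerStore();
        long now = System.currentTimeMillis();
        store.store(new DemoTrigger("nightly", now + 60_000));
        store.store(new DemoTrigger("soon", now + 5_000));
        DemoTrigger next = store.acquireNextTrigger(now + 30_000);
        System.out.println(next == null ? "nothing due" : next.name);
    }
}
```

Keeping the triggers in a sorted set means the acquire step only has to inspect the first element, which is why a look-ahead bound is enough to decide whether anything is due.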


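This code fetches the next trigger to fire from the JobStore. In most setups the JobStore is RAMJobStore, which keeps triggers and related data in memory; if jobs need to be persisted, a persistent JobStore must be used instead. Continuing with the code: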

now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
long spinInterval = 10;
int numPauses = (int) (timeUntilTrigger / spinInterval);
while (numPauses >= 0 && !signaled) {
    try {
        Thread.sleep(spinInterval);
    } catch (InterruptedException ignore) {
    }
    now = System.currentTimeMillis();
    timeUntilTrigger = triggerTime - now;
    numPauses = (int) (timeUntilTrigger / spinInterval);
}
if (signaled) {
    try {
        qsRsrcs.getJobStore().releaseAcquiredTrigger(
                ctxt, trigger);
    } catch (JobPersistenceException jpe) {
        qs.notifySchedulerListenersError(
                "An error occured while releasing trigger '"
                        + trigger.getFullName() + "'",
                jpe);
        // db connection must have failed... keep
        // retrying until it's up...
        releaseTriggerRetryLoop(trigger);
    } catch (RuntimeException e) {
        getLog().error(
            "releaseTriggerRetryLoop: RuntimeException "
            +e.getMessage(), e);
        // db connection must have failed... keep
        // retrying until it's up...
        releaseTriggerRetryLoop(trigger);
    }
    signaled = false;
    continue;
} 
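
The sliced sleep in this excerpt can be sketched in isolation. The class SlicedSleeper and its methods are hypothetical names, and the loop condition is a deliberate simplification: it compares the clock with the fire time directly instead of recomputing a numPauses counter as the excerpt does. Only the 10 ms slice length and the signaled flag are borrowed from the excerpt.

```java
/** Hypothetical helper that sleeps in short slices until a deadline or a signal. */
final class SlicedSleeper {
    private static final long SPIN_INTERVAL_MS = 10;

    // volatile so that a signal set by another thread is seen by the sleeping thread
    private volatile boolean signaled = false;

    /** Called from another thread when the schedule changed and the wait should stop. */
    void signal() {
        signaled = true;
    }

    /**
     * Sleeps until triggerTimeMillis (epoch millis) has been reached.
     * Returns true if the fire time arrived, false if a signal cut the wait short.
     */
    boolean sleepUntil(long triggerTimeMillis) {
        while (!signaled) {
            long remaining = triggerTimeMillis - System.currentTimeMillis();
            if (remaining <= 0) {
                return true;   // fire time reached
            }
            try {
                // Short slices keep the thread responsive to a signal.
                Thread.sleep(Math.min(SPIN_INTERVAL_MS, remaining));
            } catch (InterruptedException ignore) {
                // Treated as an early wake-up; the loop re-checks the clock.
            }
        }
        signaled = false;      // consume the signal so the next wait starts clean
        return false;
    }

    public static void main(String[] args) {
        SlicedSleeper sleeper = new SlicedSleeper();
        long fireAt = System.currentTimeMillis() + 200;
        boolean reached = sleeper.sleepUntil(fireAt);
        System.out.println(reached ? "fire the trigger" : "schedule changed, re-acquire");
    }
}
```

Sleeping in slices costs a few extra wake-ups, but it bounds how long a schedule change can go unnoticed to roughly one slice.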


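This code computes the difference between the next trigger's fire time and the current system time, then holds the thread in a loop of short sleeps until the trigger's fire time arrives. If signaled is set during the wait, the acquired trigger is released back to the JobStore and the main loop starts over. Continuing with the code: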

// JobRunShell here is org.quartz.core.JobRunShell
JobRunShell shell = null;
try {
    shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
    shell.initialize(qs, bndle);
} catch (SchedulerException se) {
    try {
        qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
                trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
    } catch (SchedulerException se2) {
        qs.notifySchedulerListenersError(
                "An error occured while placing job's triggers in error state '"
                        + trigger.getFullName() + "'", se2);
        // db connection must have failed... keep retrying
        // until it's up...
        errorTriggerRetryLoop(bndle);
    }
    continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
    try {
        getLog().error("ThreadPool.runInThread() return false!");
        qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
                trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
    } catch (SchedulerException se2) {
        qs.notifySchedulerListenersError(
                "An error occured while placing job's triggers in error state '"
                        + trigger.getFullName() + "'", se2);
        // db connection must have failed... keep retrying
        // until it's up...
        releaseTriggerRetryLoop(trigger);
    }
} 


This code wraps the trigger and, using JobRunShell as the carrier, executes the JobDetail associated with the trigger in the thread pool.
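
The hand-off to the thread pool can be illustrated with the standard java.util.concurrent classes. This is a sketch under stated assumptions rather than Quartz's own thread pool or JobRunShell: ShellRunner, Shell, and the string states are hypothetical, and a rejected submission stands in for the case where runInThread returns false.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/** Hypothetical dispatcher that runs wrapped jobs on a pool; not Quartz code. */
final class ShellRunner {

    /** Hypothetical wrapper around a job that records how the run ended. */
    static final class Shell implements Runnable {
        private final Runnable job;
        volatile String state = "ACQUIRED";

        Shell(Runnable job) {
            this.job = job;
        }

        @Override
        public void run() {
            try {
                job.run();
                state = "COMPLETE";
            } catch (RuntimeException e) {
                state = "ERROR";   // a failing job must not kill the pool thread silently
            }
        }
    }

    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    /** Returns false instead of throwing when the pool refuses the task. */
    boolean runInThread(Runnable shell) {
        try {
            pool.execute(shell);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    /** Dispatches the shell; if the pool refuses it, the shell is marked as failed. */
    void fire(Shell shell) {
        if (!runInThread(shell)) {
            shell.state = "ERROR";
        }
    }

    /** Stops accepting work and waits briefly for running jobs to finish. */
    void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ShellRunner runner = new ShellRunner();
        Shell shell = new Shell(() -> System.out.println("job executed"));
        runner.fire(shell);
        runner.shutdown();
        System.out.println("final state: " + shell.state);
    }
}
```

Wrapping the job keeps the scheduling thread free: it only dispatches the wrapper and moves on to the next trigger, while outcome handling happens on the pool thread.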

The rest of run() is cleanup and is not covered in detail here.



This article comes from a CSDN blog; please cite the source when reposting: http://blog.csdn.net/cutesource/archive/2009/12/08/4965520.aspx