分布式定时任务(二)

作者: lyndon_nfc | 来源:发表于2016-09-19 22:03 被阅读3585次

    Quartz应用和集群原理分析:

    使用的环境版本:spring4.x+quartz2.2.x

    ****1.1 如何在spring中集成quartz集群****

    1.1.1 基于maven项目,需要在pom.xml引入的j依赖为:

    <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz</artifactId>
    </dependency>
    

    1.1.2 Quartz集群的基本配置信息:命名为quartz.properties

    #调度标识名 集群中每一个实例都必须使用相同的名称
    org.quartz.scheduler.instanceName: DefaultQuartzScheduler
    #远程管理相关的配置,全部关闭
    org.quartz.scheduler.rmi.export: false
    org.quartz.scheduler.rmi.proxy: false
    org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
    ThreadPool 实现的类名
    org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
    #线程数量
    org.quartz.threadPool.threadCount: 10
    #线程优先级 
    org.quartz.threadPool.threadPriority: 5
    #自创建父线程
    org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
    #容许的最大作业延
    org.quartz.jobStore.misfireThreshold: 60000
    #ID设置为自动获取 每一个必须不同 
    org.quartz.scheduler.instanceId: AUTO
    #数据保存方式为持久化
    org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
    #加入集群
    org.quartz.jobStore.isClustered: true
    #调度实例失效的检查时间间隔
    org.quartz.jobStore.clusterCheckinInterval: 10000
    

    1.1.3 在项目中加入Quartz的初始化信息: 命名spring-quartz.xml

    <?xml version="1.0" encoding="UTF-8"?>
      <beans       xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
        xmlns:mvc="http://www.springframework.org/schema/mvc"
        xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"    xsi:schemaLocation="http://www.springframework.org/schema/beans 
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc.xsd 
           http://www.springframework.org/schema/aop 
           http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/context 
           http://www.springframework.org/schema/context/spring-context.xsd">
           <bean id="quartzScheduler"        class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
                <!-- 自定义的bean注入类,解决job里面无法注入spring的service的问题 -->
                <property name="jobFactory">
                    <bean class="com.fc.sales.control.statistics.job.SpringBeanJobFactory" />
                </property>
                <!-- quartz的数据源 -->
                <property name="dataSource" ref="quartz" />
                <!-- quartz的基本配置信息引入 -->
                <property name="configLocation" value="classpath:quartz.properties"/>
    <!-- 调度标识名 -->
                <property name="schedulerName" value="DefaultQuartzScheduler" />
                <!--必须的,QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动 -->
                <property name="startupDelay" value="30" />
                <!-- 通过applicationContextSchedulerContextKey属性配置spring上下文 -->
                <property name="applicationContextSchedulerContextKey" value="applicationContextKey" />
                <!--可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 -->
                <property name="overwriteExistingJobs" value="true" />
                <!-- 设置自动启动 -->
                <property name="autoStartup" value="true" />
                <!-- 注册触发器 -->
                <property  name="triggers">
                    <list>
                        <ref bean="orderSyncScannerTrigger" />
                    </list>
                </property>
                <!-- 注册jobDetail -->
                <property name="jobDetails">
                    <list>
                        <ref bean="orderSyncDetail" />
                    </list>
                </property>
            </bean>  
            <!--配置调度具体执行的方法-->  
            <bean id="orderSyncDetail"  
                class="org.springframework.scheduling.quartz.JobDetailFactoryBean">  
               <property name="jobClass" value="com.fc.sales.control.statistics.job.OrderSyncJob"/>
                <property name="durability" value="true" />    
                <property name="requestsRecovery" value="true" /> 
            </bean> 
            <!--配置调度执行的触发的时间-->  
      <bean id="orderSyncScannerTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">  
                <property name="jobDetail" ref="orderSyncDetail" />  
                <property name="cronExpression">  
                    <!-- 每天上午00:30点执行任务调度 -->  
                    <value>0 30 00 * * ?</value>  
                </property>  
            </bean>
    
    • 1.1.4 在web.xml启动项中加入spring-quartz.xml文件*
     <servlet>
            <servlet-name>dispatcherServlet</servlet-name>
            <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
            <init-param>
                <param-name>contextConfigLocation</param-name>
                <param-value>classpath:/spring/spring-quartz.xml</param-value>
            </init-param>
            <load-on-startup>1</load-on-startup>
      </servlet>
    
    • 1.1.5 附上对应的解决无法注入的jobFactory的代码*:
    
    import org.quartz.spi.TriggerFiredBundle;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
    import org.springframework.scheduling.quartz.AdaptableJobFactory;
    import org.springframework.stereotype.Component;
    
    /**
     * Created by lyndon on 16/9/13.
     */
    @Component
    public class jobFactory extends AdaptableJobFactory {
    
        //这个对象Spring会帮我们自动注入进来,也属于Spring技术范畴.
        @Autowired
        private AutowireCapableBeanFactory capableBeanFactory;
    
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            //调用父类的方法
            Object jobInstance = super.createJobInstance(bundle);
            //进行注入,这属于Spring的技术,不清楚的可以查看Spring的API.
            capableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
    
    

    ****1.2 quartz框架实现分布式定时任务的原理****;
    Quartz集群中每个节点都是一个单独的Quartz应用,它又管理着其他的节点。这个集群需要每个节点单独的启动或停止;和我们的应用服务器集群不同,独立的Quratz节点之间是不需要 通信的。不同节点之间是通过数据库表来感知另一个应用。只有使用持久的JobStore才能完成Quartz集群。

    untitled21.jpg
    • 1.2.1 既然Quartz分布式集群是利用数据库锁机制来实现集群环境下的并发控制,我们就需要了解Quratz的数据库表:可以去官方现在对于版本的sql文件导入。
    untitled22.png
    • 1.2.2 Quartz线程模型:
      Quartz中有两类线程:Scheduler调度线程和任务执行线程。

    • 任务执行线程: Quartz不会在主线程(QuartzSchedulerThread)中处理用户job。Quratz是将线程管理的职责委托给ThreadPool,一般的设置使用SimpleThreadPool,SimpleThreadPool创建一定数量的工作线程(WorkerThread),当然这样就意味所有的线程都是异步操作的,所以我们在工作线程的job里面实现业务的时候是没必要重新去创建一个新的线程的,在Quartz创建工作线程的时候已经完成了异步任务的创建。

    • Scheduler调度线程:QuartzScheduler被创建的时候会创建一个QuratzSchedulerThread实例。

    • 1.2.3 Quartz源码分析:

    • QuartzSchedulerThreand包含有决定何时下一个Job将被触发的处理循环,主要的逻辑在其的run()方法中,如下图:

    untitled23.png

    由此可知,QuartzSchedulerThread不断的在获取trigger,触发trigger,释放trigger。
    那么具体又是如何获取trigger的呢,可以从上面的源码中可以发现:qsRsrcs.getJobStore()返回对象是JobStore ,具体的集群配置参考1.1.2. org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
    JobStoreTx继承自JobStoreSupport,而JobStoreSupport的acquireNextTrigger,triggerFired,releaseAcquiredTrigger方法负责具体trigger相关操作,都必须获得TRIGGER-ACCESS锁。核心逻辑在executeInNonManagedTxLock方法中。

    untitled24.png

    由上代码可知Quartz集群基于数据库锁的同步操作流程如下图所示:

    untitled25.png

    相关文章

      网友评论

        本文标题:分布式定时任务(二)

        本文链接:https://www.haomeiwen.com/subject/scvnettx.html