Elastic-job 介绍与使用

作者: 偶像本人 | 来源:发表于2018-09-29 13:59 被阅读2706次

    目标

    把定时任务通过集群的方式进行管理调度,并采用分布式部署,保证系统的高可用,提高了容错。那么如何保证定时任务只在集群的某一个节点上执行,或者一个任务如何拆分为多个独立的任务项,由分布式的机器去分别执行, 众多的定时任务如何统一管理,现在有很多成熟的分布式定时任务框架,都能很好的实现上述的功能。

    基本概念

    elastic-job 是由当当网基于quartz 二次开发之后的分布式调度解决方案 , 由两个相对独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成 。
    elastic-job主要的设计理念是无中心化的分布式定时调度框架,思路来源于Quartz的基于数据库的高可用方案。但数据库没有分布式协调功能,所以在高可用方案的基础上增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。

    1. 分片

    任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

    例如:有一个遍历数据库某张表的作业,现有2台服务器。
    为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。
    作业遍历数据的逻辑可以为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。
    如果分成10片,则服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9。
    作业遍历数据的逻辑可以为:服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据

    2. 分片项与业务处理解耦

    Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。以上面例子分成10片为例,框架只负责决定服务器分配到哪些分片项,由作业分配策略决定,但是每个分片处理哪一部分数据,比如第一个分片处理id以0-4结尾的数据,是由开发者去决定和处理的。

    3. 中心化

    xxl-job是中心化设计,在xxl-job中,所有定时任务的执行是在调度中心判断作业到了执行的时间,然后通知业务系统去执行,即是作业节点并不知道自己应该什么时候执行定时任务,只能通过调度中心去决定作业的执行。缺点是部署麻烦。

    4. 去中心化

    elastic-job是去中心化设计,作业调度中心节点,各个作业节点是自治的,作业框架的程序在到达相应时间点时各自触发调度,缺点是可能会存在各个作业服务器的时间不一致的问题。

    使用

    1. 引入maven依赖

            <dependency>
                <groupId>com.dangdang</groupId>
                <artifactId>elastic-job-lite-spring</artifactId>
                <version>2.1.5</version>
            </dependency>
    
            <dependency>
                <artifactId>elastic-job-lite-core</artifactId>
                <groupId>com.dangdang</groupId>
                <version>2.1.5</version>
            </dependency>
    
    

    2. 配置注册中心

        <reg:zookeeper id="regCenter" server-lists="192.168.3.191:2181" namespace="elastic-job-zookeeper" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" />
    
    
    image.png

    3. 事件追踪(可选)

        <bean id="elasticJobLog" class="com.alibaba.druid.pool.DruidDataSource" init-method="init"  destroy-method="close">
            <!--<property name="driverClassName" value="${event.rdb.driver}"/>-->
            <property name="url" value="${event.rdb.url}"/>
            <property name="username" value="${event.rdb.username}"/>
            <property name="password" value="${event.rdb.password}"/>
        </bean>
    

    4.作业开发

    package com.isuwang.soa.crm.dbc.action.shareCrm
    
    import java.util.{Date, Optional}
    ...
    /**
      * 每晚定时统计获取纷享CRM当日更新的回访记录同步至快塑系统
      */
    @Transactional(value = "crm", rollbackFor = Array(classOf[Exception]))
    class FxxkUpdateRecordAction() extends Action[Unit] with SimpleJob{
    
      override def preCheck: Unit = {}
    
      override def action: Unit = {
        val beginTime = System.currentTimeMillis()
        rangeConditions.append(rangeCondition)
        searchQuery.rangeConditions(rangeConditions)
    
        getDatas(fXRecords)
    
        // 同步
        fXRecords.foreach(x => {
          val tripList = getTableColumnValue(x)
          tripList.foreach(trip => {
            try {
              executeUpdate(trip._4, trip, x)
            } catch {
              case e: Throwable => {
                logger.error(e.getMessage, e)
                logger.info("=====>纷享更新回访记录失败内容id:{}", x._id)
              }
            }
          })
        })
        logger.info(s"====>${getClass.getName}耗时:{}秒", (System.currentTimeMillis() - beginTime) / 1000)
      }
    ...
      override def execute(shardingContext: ShardingContext): Unit = action
    }
    
        <job:simple id="FxxkUpdateRecordAction" class="com.isuwang.soa.crm.dbc.action.shareCrm.FxxkUpdateRecordAction" registry-center-ref="regCenter" sharding-total-count="1" cron="0 40 * * * ? "  failover="true" description="每晚定时统计获取纷享CRM当日更新的回访记录同步至快塑系统" overwrite="true"  event-trace-rdb-data-source="elasticJobLog" />
    
    

    failover:是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
    description:作业描述
    overwrite:本地配置是否可覆盖注册中心配置,如果可覆盖,每次启动作业都以本地配置为准
    event-trace-rdb-data-source:作业事件追踪的数据源Bean引用

    实现原理

    1. 作业启动

    image.png

    2. 作业执行

    image.png

    相关文章

      网友评论

        本文标题:Elastic-job 介绍与使用

        本文链接:https://www.haomeiwen.com/subject/libeoftx.html