摘要:如何构建具备作业分片和弹性扩缩容的定时任务系统是每个大型业务系统在设计时需要考虑的重要问题?
对于构建一般的业务系统来说,使用Quartz或者Spring Task即可基本满足我们的单体服用应用需要。然而随着线上业务量的不断发展,这两种定时任务已经日渐无法满足我们的需求。一般,使用这两种定时任务框架都会遇到如下的两个痛点问题:
(1)如果业务工程采用集群化的部署,可能会多次重复执行定时任务而导致系统的业务逻辑错误,并产生系统故障。
(2)Quartz的集群方案具备HA功能,可以实现定时任务的分发,但是通过增加机器节点数量的方式并不能提高每次定时任务的执行效率,无法实现任务的弹性分片。
一线互联网大厂都有他们自己为其业务定制化研发的分布式定时任务系统,业务研发工程师可以通过在其Web Console的界面上进行简单的任务配置即可使得大型业务系统实现定时任务的调度、分发、分片、监控和扩缩容等功能。那么,业界是否有开源的组件框架同样具备这些功能呢?答案是肯定的!本文将向大家介绍一款开源的分布式定时任务调度框架—Elastic-Job的功能和原理,同时通过一个简单的案例阐述如何在Spring Boot工程完成Elastic-Job的集成。
一、Elastic-Job的简要介绍
Elastic-Job是当当开源的一款分布式弹性定时任务调度框架,它是从ddframe项目中dd-job的作业模块中分离出来的。Elastic-Job本身是基于开源定时任务框架Quartz和分布式应用程序协调服务Zookeeper及其客户端Curator进行二次开发的。
Elastic-Job在2.X版本以后主要分为Elastic-Job-Lite和Elastic-Job-Cloud两个子项目。其中,Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。而Elastic-Job-Cloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。本文叙述主要以Elastic-Job-Lite这个轻量级分布式任务调度框架为主。如下为Elastic-Job分布式协调框架的系统架构图:
Elasitc-Job架构图.jpg从Elastic-Job的架构图上基本就可以看出,其以Jar的形式为业务工程(诸如,Spring Boot工程)的快速集成提供了简便的方式。同时,其提供的定时任务分片、弹性扩缩容、失效转移、作业监控和支持多种作业模式等强大的功能,使业务开发人员无需在这些方面花费较大多的精力,而可以更加专注于平台的业务开发。其主要的功能如下:
(1)定时任务:基于成熟的定时任务作业框架Quartz cron表达式执行定时任务;
(2)作业注册中心:基于Zookeeper和其客户端Curator实现的全局作业注册控制中心;作业注册中心仅用于作业任务注册和监控信息的暂存;
(3)定时任务分片:可以将原本一个较大任务分片成为多小的子任务项分别在多个服务器上同时执行,提高总任务的执行处理效率;
(4)弹性扩容缩容:运行中定时任务所在的服务器崩溃,或新增加n台作业服务器,作业框架将在下次任务执行前重新进行任务调度分发,不影响当前任务的处理与执行;
(5)支持多种任务模式:分别支持Simple、Dataflow和Script类型的定时任务。具体的任务模式后面会详细介绍;
(6)失效转移:运行中的定时任务所在的服务器崩溃不会导致重新分片,会在下次定时任务启动时重新分发和调度;
(7)运行时定时任务状态收集:监控任务运行时的状态,统计最近一段时间任务处理成功和失败的数量,记录作业上次运行开始时间,结束时间和下次运行时间;
(8)支持配置定时任务停止、恢复和禁用:用于操作定时任务的启停,并可以禁止某任务的执行;
(9)Spring支持:Elastic-Job-Lite项目完美支持spring的容器,自定义命名空间,支持占位符
(10)运维平台:提供运维界面,方便开发和运维人员管理生产环境上已经发布的定时任务和注册中心;
二、Elastic-Job的应用场景分析与特点介绍
从第一节内容的介绍中,读者可能还是对Elastic-Job这款分布式弹性定时任务调度框架能够在哪些场景下应用以及其能够为业务系统带来的优点仍然不甚了解。接下来,本节将主要为读者对Elastic-Job的典型应用场景进行分析和介绍。
这里,先举一个典型的业务应用场景,比如在每天的凌晨之前需要对线上用户每日账户余额的增减情况进行汇总统计产生报表。如果数据库中的用户数据较少,那么放在一个普通的Quartz Job中跑这个汇总统计任务一点都不会有任何问题,只要设置适当的Quartz cron定时任务表达式,循环遍历所用用户数据,并对每个用户调用账户余额汇总统计的Api服务即可(这里假设采用微服务构建系统,用户管理为单一服务,账户资产为另外一个服务)。
但是,随着业务系统的发展,用户量越来越大,那么我们很有可能面临在第二天零点之前无法处理完所有用户每日账户余额增减的汇总统计。为了解决单节点部署处理效率较低、定时任务跑超时的问题,那么我们可能采用定时任务Job集群部署,这里需要注意的是多节点部署较容易造成任务的重复执行,在这个例子中,就可能重复累计每个用户账户余额的增减值了。为了解决定时任务重复执行的问题,我们可以对Job加锁来保证每一个时刻仅有同一个任务在执行,但这样也会增加系统的复杂度。
1、Elastic-Job定时任务水平切分
为了根本上解决上述业务的问题,这里就可以引入Elastic-Job。作为业务系统研发人员,可以考虑采用对集群内每一台机器分别处理一部分用户业务的方案来提高整体的处理执行效率。这样,无论用户量有多少,均可以采用增加机器节点数,水平切分总体任务为多个子任务项并调度至不同节点来分别执行,从而最大限度的利用资源。例如,我们想让3台虚拟机跑这个定时任务Job,那么我们可以将总任务分成3个定时分片任务,Elastic-Job分布式弹性定时任务调度框架通过Zookeeper进行协调,最终会让3台虚拟机分别执行0、1、2的三个分片任务,比如server0-->0任务、server1-->1任务、server2-->2任务,其中当server0执行时,可以只针对“id%3==0”的用户进行业务处理;当server1和server2执行时,则分别针对“id%3==1”、 “id%3==2”的用户进行处理。示意图如下:
Elastic-Job定时任务水平切分.png2、Elastic-Job弹性扩缩容
如果某一天线上生产环境出现故障,集群(server0/ server1/ server2)中有一台机器崩溃(假设server2崩溃),那么跑在server2上的一个定时任务在下一个周期执行时会动态分发至server0或者server1这两个正常的节点,此时就会server0->0任务,server1->1/2任务。以此类推,如果server0或server1中再有一台机器出现故障,则三个分片定时任务将会同时在最后一个正常机器节点上执行。示意图如下:
Elastic-Job弹性扩缩容.png三、Elastic-Job在Spring Boot中的实践
本节将主要详细介绍在SpringBoot工程中如何集成Elastic-Job这款这款分布式弹性定时任务调度框架组件,并使用其对定时任务的动态分片和弹性扩缩进行一个示例展示。
1、版本环境
Spring Boot 1.4.1.RELEASE、JDK 1.8、Zookeeper3.4.6、Elastic-Job-Lite 2.1.5。
2、添加Elastic-Job的pom依赖
因为当当开源了Elasitc-job-lite的源码,我们可以通过maven仓库来获得jar包依赖。当访问 http://mvnrepository.com/artifact/com.dangdang/elastic-job-lite选择自己项目需要的版本(在本次集成中选择的版本为2.1.5),点击进入后复制maven内容到pom.xml内即可。Pom文件的配置引入如下图所示:
<elastic-job.version>2.1.5</elastic-job.version>
......
......
......
<!-- elastic-job-core-->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<!-- elastic-job-lite-spring -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
</dependency>
3、Elastic-Job的定时任务编写
引入Elastic-Job的pom配置后,就可以编写第一个分片定时任务了。这里有必要说明下的是,Elastic-job总共提供了三种类型的定时任务:Simple类型定时任务、Dataflow类型定时任务和Script类型定时任务。其中,Script类型作业意为脚本类型作业,支持shell,python和perl等所有类型脚本,应用得不太多。SimpleJob需要实现SimpleJob接口,是未经过任何封装的定时任务简单实现,与quartz原生接口相似,下面示例中用的就是该类型的Job。Dataflow类型的定时任务主要用于处理数据流,需实现DataflowJob接口。该接口可以提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
package com.test.elasticjob
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MyTestSimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info(String.format("------Thread ID: %s, 任务总片数: %s, 当前分片项: %s",
Thread.currentThread().getId(),shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
}
}
4、Spring Boot工程中的Spring配置
在Spring Boot工程中需要在Configuration配置类中将Elastic-Job的Zookeeper作业注册中心、Elastic-Job的数据库日志和上面写的第一个分片定时任务配置进来。具体的配置类和Xml配置文件分别如下(test-elasticjob-cloudbill.properties配置文件中配置了分片定时任务的具体参数和zookeeper注册中心的配置,其中分片定时任务的参数诸如Cron时间表达式、总共分片数量和分片项参数健值对):
@Configuration
@PropertySource("classpath:op-elasticjob-cloudbill.properties")
@ImportResource("classpath:spring-elasticjob.xml")
public class ElasticJobConfig {
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<!--配置作业注册中心 -->
<reg:zookeeper id="regCenter" server-lists="${regcenter.zookeeper.serverList}"
namespace="${regcenter.zookeeper.namespace}"
base-sleep-time-milliseconds="${regcenter.zookeeper.base.sleep.time.milliseconds}"
max-sleep-time-milliseconds="${regcenter.zookeeper.max.sleep.time.milliseconds}"
max-retries="${regcenter.zookeeper.max.retries}" />
<!-- Elastic-Job在数据库中的日志 -->
<bean id="elasticJobLog" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://{ip}:{port}/res_hw_cloud_bill"/>
<property name="username" value="root"/>
<property name="password" value="bcrdb"/>
</bean>
<!-- 第一个Elastic-Job分片定时任务 -->
<job:simple id="mySimpleJob" class="com.test.cloudbill.elastic.jobs.MyTestSimpleJob" registry-center-ref="regCenter"
cron="${simplejob.cron}"
sharding-total-count="${simplejob.shardingTotalCount}"
sharding-item-parameters="${simplejob.shardingItemParameters}"
description="我的第一个简单作业" overwrite="true" event-trace-rdb-data-source="elasticJobLog"
job-exception-handler="com.test.cloudbill.elastic.jobs.CustomJobExceptionHandler"/>
</beans>
5、实际环境中的运行Elastic-Job
下面这一节主要来看下在实际环境中运行集成上面示例代码的Elastic-Job的SpringBoot工程的几个主要场景。需要说明的是,在运行工程之前,需要先在另外一台机器上部署上面提到的Zookeeper注册中心。并将部署Zookeeper机器的ip地址和2181端口设置到上面的test-elasticjob-cloudbill.properties配置文件中。
(1)场景1:单台服务器部署工程
当只有一台服务器部署带有定义好两个Elastic-Job分片任务的工程时,两个分片任务会同时调度到这台服务器上,这个跟普通的quartz定时任务没有啥区别。具体可以参考如下的日志输出:
(2)场景2:新增一台部署工程服务器
当在另外一台服务器上部署同样带有定义好两个Elastic-Job分片任务的工程时,Elastic-Job分布式弹性定时任务调度框架会将其中一个分片任务,弹性调度至新增的服务器节点上。此时,两个分片任务就会分别同时运行在两台不同的服务器上,这样极大地提高了整体业务处理的执行效率。两台服务器上的日志输出输出为如下(服务器1的日志和服务器2的日志):
elastic_job增加一个节点后机器2的截图.png
(3)场景3:kill掉一台服务器上tomcat的进程
现在将其中一台服务器上面的tomcat进程kill掉,这台服务器上原先跑的分片定时任务会在下一个执行周期执行前重新调度分发至当前运行正常的服务器上运行,起到分片任务动态缩容的效果。具体可以参考如下的日志输出:
elastic_job双节点机器kill掉一个节点后的截图.png
四、总结
本文主要介绍了一款开源的分布式弹性定时任务调度框架—Elastic-Job的功能特点和框架结构,以及如何在SpringBoot这样的单体应用工程中集成起来,最后根据实际环境中的几种运行场景来的阐述其分片定时任务动态扩缩的功效。由于文章篇幅的原因,本文无法对Elastic-Job框架其他更多的功能点进行介绍,作者将在后面文章中继续进行阐述。同时,限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。
在此顺便为自己打个Call,有兴趣的朋友可以关注下我的个人公众号:“匠心独运的博客”,对于Java并发、Spring和数据库的一些细节、问题的文章将会在这个公众号上发布,欢迎交流与讨论。二维码如下图所示:
匠心独运的博客_二维码.jpg
网友评论