项目中用到了elastic-job,自己用的时候也是根据别人的例子复制下来,直接模仿用的,一直也没去理解它的工作原理,一知半解的,各个参数意思也不是很懂,然后今天突然想了解下它的工作原理,看了官网,以及网上的一些文章,在这里做个记录。
elastic-job是什么?
elastic-job是什么呢?下面是摘自官网的话:
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能(PS:我在这里只说Elastic-Job-Lite,因为Job-Cloud我没去研究)。
简单的说Elastic-Job-Lite就是一个分布式定时任务。
怎么用elastic-job
使用elastic-job很简单,在elastic-job官网上也有例子。
- 添加maven依赖
<!-- 引入elastic-job-lite核心模块 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<!-- 使用springframework自定义命名空间时引入 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elasticjob.version}</version>
</dependency>
2.写定时任务类
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int item = shardingContext.getShardingItem();
// System.out.println(item);
switch (item) {
case 0:
// do something by sharding item 0
System.out.println(String.format("-----ThreadId:%s,当前分片项:%s",Thread.currentThread().getId(),item));
break;
case 1:
// do something by sharding item 1
System.out.println(String.format("-----ThreadId:%s,当前分片项:%s",Thread.currentThread().getId(),item));
break;
case 2:
// do something by sharding item 2
System.out.println(String.format("-----ThreadId:%s,当前分片项:%s",Thread.currentThread().getId(),item));
break;
}
}
}
job有好几种作业类型,这里只是简单的作业,实现SimpleJob 即可
3.配置作业
与spring容器配合作业,把bean依赖注入
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<!--配置作业注册中心 -->
<reg:zookeeper id="jobRegesterCenter" server-lists="192.168.3.98:2181" namespace="lfp-elastic-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
<!-- 配置作业-->
<job:simple id="testJob" class="cn.laikyy.lfp.job.TestJob" registry-center-ref="jobRegesterCenter" cron="0 50 23 * * ?"
disabled="false" overwrite="true" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" description="测试" />
</beans>
OK,配置好后我们在elastic-job-lite-console控制台去触发执行,看看输出结果。
输出结果:
-----ThreadId:81,当前分片项:0
-----ThreadId:82,当前分片项:1
-----ThreadId:83,当前分片项:2
好现在我们来分析一下输出结果:
首先对于testJob我们配了sharding-total-count="3",意思是分3片,但是我们现在只有一台机器执行,所以输出结果是当前机器用了三个线程去跑这个定时任务,每个线程对应一个分片。
好,那么如果我们用两台机器去执行那么会怎么样呢?由于elastic-job实例是根据ip来的,所以我这边需要通过虚拟机或另一台来执行,那么结果会怎么样呢?
实例1:
-----ThreadId:81,当前分片项:0
实例2:
-----ThreadId:92,当前分片项:1
-----ThreadId:93,当前分片项:2
我们可以看到,3片分到两台机器,有一台执行了两个分片项,一台执行一个分片项;当然如果三台实例去执行,那么结果会是每台执行一个分片项;如果四台机器去分3个任务,那么最终执行也是3台,每台执行一个分片项,还有一台空闲。
当然当某一台挂了,那么调度中心就会自动把挂了的那台的任务分给空闲的实例。如果没有空闲的实例那么就会把挂了的那台的任务分给其他的一台上去。
现实场景怎么用?
好,我们大致了解了如何使用elastic-job,那如何应用到我们实际的项目里呢?比方说余额宝每天定时给用户计算收益。如果两台机器,我们可以把用户id是奇数的给一台机器去执行,偶数的给另一台执行。像这样
@Override
public void execute(ShardingContext shardingContext) {
int item = shardingContext.getShardingItem();
switch (item) {
case 0:
//1 查询用户id是奇数的用户
//2 计算的收益
break;
case 1:
//1 查询用户id是偶数的用户
//2 计算的收益
break;
}
sql查询:
SELECT * FROM lfp_user WHERE mod(id,2)=0; //查询id为偶数的用户
SELECT * FROM lfp_user WHERE mod(id,2)=1; //查询id为奇数的用户
这样的话,就把压力平均分摊到两台服务器上去了,而且也能更快执行完成。
但是如果发现这段时间多了很多人,以前只要两台机器在1个小时就能跑完的,现在5个小时也跑不完,怎么办呢?加机器?加机器肯定是必须的,但是我们发现代码里写死了,分2片,我们总不能去改成3、4、5,万一以后还有更多呢,所以我们可以
对sql进一步优化。我们把分片数和当前分片项传到sql,这样sql可以动态去查询对应分片后的用户了
改进后sql:
//动态查询该分片下要执行的用户
SELECT * FROM lfp_user WHERE mod(id,#{shardingTotalCount})=#{shardingItem};
改进后代码:
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int shardingTotalCount = shardingContext.getShardingTotalCount();
int shardingItem = shardingContext.getShardingItem();
//1 查询改分片下要执行的用户,带参数shardingTotalCount和shardingItem
//2 计算这些用户收益
}
}
改进后我们的代码也简洁了,当shardingTotalCount 有5片,并且5台机器
机器0:查询用户id能整除5的,如:5、10、15、20、、、、
机器1:查询用户id除以5余1的,如:1、6、11、16、、、、
机器2:查询用户id除以5余2的,如:2、7、12、17、、、、
机器3:查询用户id除以5于余3的,如:3、8、13、18、、、、
机器4:查询用户id除以5余4的,如:4、9、14、19、、、、
同理,当6台、7台等,这样我们就可以把服务器压力分摊到每一台上去了,如果临时压力增大,我们还可以继续加机器来缓解,当不需要这么多时我们可以减少机器实现分布式部署。
这里只是记录一下自己使用elastic-job,具体的实现与原理我也还没有深入去了解,这段时间我也要深入了解下它的实现,这篇文章就写到这。祝大家周末愉快。
网友评论