美文网首页定时任务调度
TBSchedule使用简介

TBSchedule使用简介

作者: 雪韵天际 | 来源:发表于2018-09-21 12:24 被阅读0次

    TBSchedule是什么

    TBSchedule是一个支持分布式的调度框架,让批量任务或者不断变化的任务能够被动态的分配到多个主机的JVM中,在不同的线程组中并行执行,所有的任务能够被不重复,不遗漏的快速处理。基于ZooKeeper的纯Java实现,由Alibaba开源。    

    TBSchedule能干什么

    TBSchedule可以将调度作业从业务系统中分离出来,降低或者是消除和业务系统的耦合度,进行高效异步任务处理。在互联网和电商领域TBSchedule的使用非常广泛,目前被应用于阿里巴巴、淘宝、支付宝、京东、聚美、汽车之家、国美等很多互联网企业的流程调度系统。

    TBSchedule如何使用

    1、下载TBSchdule源码

    下载地址:TBSchedule源码

    官网目前打不开了,可从本人的百度网盘下载。链接:TBSchedule源码 密码:vuzg

    2、部署zookeeper

    下载zookeeper并安装部署。ZK下载地址

    3、部署管理控制台

    把下载的源码的console文件夹中ScheduleConsole.war文件部署到tomcat容器中,并启动tomcat服务。

    图1.管理控制台war包

    访问http://localhost:8080/ScheduleConsole/schedule/config.jsp地址进行链接ZK的基础信息配置,如下图:

    图2.配置ZK连接信息

    4、编写客户端代码

    引入jar包

    <dependency>

        <groupId>com.taobao.pamirs.schedule</groupId>

        <artifactId>tbschedule</artifactId>

        <version>3.3.3.2</version>

    </dependency>

    配置zk连接信息

    <bean id="scheduleManagerFactory" class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"

          init-method="init">

        <property name="zkConfig">

                <entry key="zkConnectString" value="localhost:2181" />

                <entry key="rootPath" value="/tb-schedule/test" />

                <entry key="zkSessionTimeout" value="60000" />

                <entry key="userName" value="admin" />

                <entry key="password" value="123456" />

                <entry key="isCheckParentPath" value="true" />

    </bean>

    实现IScheduleTaskDealSingle接口及selectTasks()方法和execute()方法。

    package com.zhl.tbSchedule;

    import com.alibaba.fastjson.JSONObject;

    import com.taobao.pamirs.schedule.IScheduleTaskDealSingle;

    import com.taobao.pamirs.schedule.TaskItemDefine;

    import com.zhl.tbSchedule.business.dao.TBOrderMapper;

    import com.zhl.tbSchedule.business.dao.TBOrderMapperCopy;

    import com.zhl.tbSchedule.business.domain.TBOrder;

    import lombok.extern.slf4j.Slf4j;

    import org.apache.commons.lang.StringUtils;

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Component;

    import java.util.ArrayList;

    import java.util.Comparator;

    import java.util.Date;

    import java.util.List;

    @Slf4j

    @Component("iScheduleTaskDealSingleTest")

    public class IScheduleTaskDealSingleTestimplements IScheduleTaskDealSingle {

    @Autowired

        public TBOrderMappertbOrderMapper;

        @Autowired

        public TBOrderMapperCopytbOrderMapperCopy;

        @Override

        public ComparatorgetComparator() {

    return null;

        }

    @Override

        public ListselectTasks(String taskParameter, String ownSign, int taskQueueNum,

                                        List taskItemList, int eachFetchDataNum)throws Exception {

    System.out.println(Thread.currentThread().getName() +" _IScheduleTaskDealSingleTest start to selectTasks..........");

            if (taskItemList ==null || taskItemList.size() <0) {

    return null;

            }

    System.out.println("IScheduleTaskDealSingleTest config params,taskParameter:{" + taskParameter +"},ownSina:{"

                    + ownSign +"},taskQueueNum:{" + taskQueueNum +"},taskItemList:{" + JSONObject.toJSONString(taskItemList)

    +"}, eachFetchDataNum:{" + eachFetchDataNum +"}");

            List models =new ArrayList();

            String billingNumber ="";

            for (TaskItemDefine taskItemDefine : taskItemList) {

    billingNumber += taskItemDefine.getTaskItemId() +"";

            }

    if (StringUtils.isNotBlank(billingNumber)) {

    //billingNumber = billingNumber.substring(0,billingNumber.length() - 1);

                models =tbOrderMapper.selectByBillNumber(billingNumber, eachFetchDataNum);

            }

    System.out.println("IScheduleTaskDealSingleTest selectTasks result..........models.size:" + models.size());

            return models;

        }

    @Override

        public boolean execute(TBOrder model, String ownSign)throws Exception {

    System.out.println(Thread.currentThread().getName() +" _IScheduleTaskDealSingleTest执行开始啦.........." +new Date());

            // System.out.println(model);

            tbOrderMapperCopy.insertTBOrder(model);

            tbOrderMapper.updateStatus(model.getBillNumber());

    return true;

        }

    }

    selectTasks方法参数说明:

    taskParameter:对应控制台自定义参数,可自定义传入做逻辑上的操作

    taskQueueNum:对应控制台任务项数量

    taskItemList:集合中TaskItemDefine的id值对应任务项值,多线程处理时,根据任务项协调数据一致性和完整性

    eachFetchDataNum:对应控制台每次获取数量,由于子计时单元开始后,会不断的去取数据进行处理,直到取不到数据子计时才停止,等待下一个子计时开始。可以限制每次取数,防止一次性数据记录过大,内存不足。

    ownSign:环境参数,可用于区分生产、测试、开发环境

    5、配置任务管理和调度策略

    图3.任务管理配置

    配置参数说明:

    任务名称:策略调度的标示,一旦创建保存,不可更改

    任务处理的SpringBean:注册到spring的任务bean,如iScheduleTaskDealSingleTest

    心跳频率/假定服务死亡时间/处理模式/没有数据时休眠时长/执行结束时间:一般保持默认即可

    线程数:处理该任务的线程数(一个线程组的线程数量),在没有划分多任务项的情况下,多线程是没有意义的,且线程数量大于任务项也是没有意义的(线程数小于等于任务项),注意如果开启多线程,必须对数据做任务项过滤

    单线程组最大任务项:配置单JVM处理的最大任务项数量,多任务项情况下,可按需限制,一般默认,多执行机会均衡分配

    每次获取数量:子计时单元开始,线程会不断的去获取数据(selectTasks方法每次获取的限制)并处理数据,直到获取不到数据子计时才结束(方法内不用就可以随意配置)

    每次执行数量:每次execute方法执行的数据量,只在bean实现IScheduleTaskDealMulti才生效

    每次处理完休眠时间:子计时单元开始,只要有数据,就会不停的获取不停的处理,这个时间设置后,子计时单元开始每次获取执行后,不管还有没有数据,都先歇会儿再获取处理

    自定义参数:可自定义控制任务逻辑操作

    任务项:这项很重要,在多线程情况下,划分任务项是有意义的,但是要注意必须通过任务项参数,协调待处理数据

    图4.调度策略配置

    配置参数说明:

    策略名称:策略标示,可任意填写

    任务类型:一般保持默认Schedule

    任务名称:对应任务栏被调度任务名称

    任务参数:一般不用,保持默认

    单JVM最大线程组数量:单个JVM允许开启的线程组数

    最大线程组数量:多处理机情况下的线程总数限制(总线程为2,任务项线程为4是没有意义的)

    IP地址:127.0.0.1或者localhost会在所有机器上运行,注意多处理机若没有根据任务子项划分数据处理,会导致多处理机重复处理数据,谨慎配置

    配置完成后启动客户端即可进行任务调度。

    6、分布式高可用高效率保障

    1)调度机的高可用有保障

    多调度机向注册中心注册后,共享调度任务,且同一调度任务仅由一台调度机执行调度,当前调度机异常宕机后,其余的调度机会接上

    2)执行机的高可用有保障

    多执行机向注册中心注册后,配置执行机单线程(多机总线程为1)执行任务,调度机会随机启动一台执行机执行,当前执行异常机宕机后,调度机会会新调度一台执行机。

    3)执行机的并行高效保障

    配置执行机多线程且划分多任务子项后,各任务子项均衡分配到所有执行机,各执行机均执行,多线程数据一致性协调由任务项参数区分。

    4)弹性扩展失效转移保障

    运行中的执行机宕机,或新增执行机,调度机将在下次任务执行前重新分配任务项,不影响正常执行机任务(崩溃的执行机当前任务处理失效);运行中的调度机宕机或动态新增调度机,不影响执行机当前任务,调度机宕机后动态切换。

    相关文章

      网友评论

        本文标题:TBSchedule使用简介

        本文链接:https://www.haomeiwen.com/subject/rajanftx.html