elastic-job-lite 快速入门

作者: 二月_春风 | 来源:发表于2017-07-30 23:50 被阅读1619次

    最近打算研究一下elastic-job,同事都没有这方面的使用经验,自己打算看官方文档跟踪源码一步步完善一系列的使用博客,并打算使用到工作中去。算是自己调研,自己推一个框架,希望完成。

    简介

    Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
    Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。

    官方主页
    Elastic-Job-Lite官方文档
    Elastic-Job-Cloud官方文档

    使用的是官方最新的release版本2.1.5,下载最新的源码,

    git clone https://github.com/dangdangdotcom/elastic-job.git
    

    下载之后的文件目录,

    elastic-job-example目录中包括Elastic-Job-Lite和Elastic-Job-Cloud的几个demo。本篇主要讲解elastic-job-example-lite的快速入门,elastic-job-example-lite-java项目。

    快速入门

    关于Elastic-Job-Lite的一些简介,可以看官网文档

    Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供最轻量级的分布式任务的协调服务,外部依赖仅Zookeeper。

    环境准备
    maven(不会的自己百度)
    Zookeeper(最好是3.4.6版本以上,网上的安装教程太多)
    jdk1.7+

    将elastic-job-example导入到idea中,

    修改JavaMain的部分代码:

    public final class JavaMain {
        
        private static final int EMBED_ZOOKEEPER_PORT = 2181;
        
        private static final String ZOOKEEPER_CONNECTION_STRING = "127.0.0.1:" + EMBED_ZOOKEEPER_PORT;
        
        private static final String JOB_NAMESPACE = "elastic-job-example-lite-java";
        
        // switch to MySQL by yourself
        private static final String EVENT_RDB_STORAGE_DRIVER = "com.mysql.jdbc.Driver";
        private static final String EVENT_RDB_STORAGE_URL = "jdbc:mysql://localhost:3306/elastic_job_log";
    
        private static final String EVENT_RDB_STORAGE_USERNAME = "root";
    
        private static final String EVENT_RDB_STORAGE_PASSWORD = "root";
    
        //h2数据库
    //    private static final String EVENT_RDB_STORAGE_DRIVER = "org.h2.Driver";
    //
    //    private static final String EVENT_RDB_STORAGE_URL = "jdbc:h2:mem:job_event_storage";
    //
    //    private static final String EVENT_RDB_STORAGE_USERNAME = "sa";
    //
    //    private static final String EVENT_RDB_STORAGE_PASSWORD = "";
        
        // CHECKSTYLE:OFF
        public static void main(final String[] args) throws IOException {
        // CHECKSTYLE:ON
            EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);
            CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
            JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(setUpEventTraceDataSource());
            setUpSimpleJob(regCenter, jobEventConfig);
            setUpDataflowJob(regCenter, jobEventConfig);
            setUpScriptJob(regCenter, jobEventConfig);
        }
        
        private static CoordinatorRegistryCenter setUpRegistryCenter() {
            ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
            CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
            result.init();
            return result;
        }
        
        private static DataSource setUpEventTraceDataSource() {
            BasicDataSource result = new BasicDataSource();
            result.setDriverClassName(EVENT_RDB_STORAGE_DRIVER);
            result.setUrl(EVENT_RDB_STORAGE_URL);
            result.setUsername(EVENT_RDB_STORAGE_USERNAME);
            result.setPassword(EVENT_RDB_STORAGE_PASSWORD);
            return result;
        }
        
        private static DataSource setUpEventTraceDataSource() {
            BasicDataSource result = new BasicDataSource();
            result.setDriverClassName(EVENT_RDB_STORAGE_DRIVER);
            result.setUrl(EVENT_RDB_STORAGE_URL);
            result.setUsername(EVENT_RDB_STORAGE_USERNAME);
            result.setPassword(EVENT_RDB_STORAGE_PASSWORD);
            return result;
        }
    
        //Simple类型作业,需要实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。
        private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
            JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaSimpleJob", "0/5 * * * * ?", 3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
            SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
            new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig).init();
        }
    
        //Dataflow类型作业,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
        private static void setUpDataflowJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
            JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaDataflowElasticJob", "0/10 * * * * ?", 3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
            DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(coreConfig, JavaDataflowJob.class.getCanonicalName(), true);
            new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(dataflowJobConfig).build(), jobEventConfig).init();
        }
    
        //Script类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,
        private static void setUpScriptJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) throws IOException {
            JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("scriptElasticJob", "0/15 * * * * ?", 3).build();
            ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(coreConfig, buildScriptCommandLine());
            new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(scriptJobConfig).build(), jobEventConfig).init();
        }
        
        private static String buildScriptCommandLine() throws IOException {
            if (System.getProperties().getProperty("os.name").contains("Windows")) {
                return Paths.get(JavaMain.class.getResource("/script/demo.bat").getPath().substring(1)).toString();
            }
            Path result = Paths.get(JavaMain.class.getResource("/script/demo.sh").getPath());
            Files.setPosixFilePermissions(result, PosixFilePermissions.fromString("rwxr-xr-x"));
            return result.toString();
        }
    }
    
    

    官网默认选用的是h2数据库(内存数据库),切换到mysql(自己新建elastic_job_log数据库)。

    编译elastic-job-lite控制台

    到下载的源码路径下elastic-job/elastic-job-lite-console编译,

    mvn install
    

    编译之后在target下面的到elastic-job-lite-console-2.1.5.tar.gz包,解压运行,


    tar -zxvf elastic-job-lite-console-2.1.5.tar.gz
    cd elastic-job-lite-console-2.1.5/bin
    ➜  ll
    total 16
    -rwxr-xr-x  1 naeshihiroshi  staff   459B  7 25 21:19 start.bat
    -rwxr-xr-x  1 naeshihiroshi  staff   582B  7 25 21:19 start.sh
    ➜  ./start.sh
    [INFO ] 2017-07-26 10:55:32,306 --main-- [com.dangdang.ddframe.job.restful.RestfulServer] Elastic Job: Start RESTful server
    [INFO ] 2017-07-26 10:55:32,330 --main-- [org.eclipse.jetty.server.Server] jetty-8.1.19.v20160209
    [INFO ] 2017-07-26 10:55:32,652 --main-- [org.eclipse.jetty.server.AbstractConnector] Started SelectChannelConnector@0.0.0.0:8899
    

    根据不同的环境执行不同的脚本,

    配置注册中心:

    配置注册中心

    执行上面的JavaMain方法,启动了三个作业,可以在控制台上显示,

    JavaMain方法中执行了三个定时任务,分别是

    a.Simple类型作业
    意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

    b.Dataflow类型作业
    Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

    c. Script类型作业
    Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。

    这一篇博客先讲到这里。

    相关文章

      网友评论

        本文标题:elastic-job-lite 快速入门

        本文链接:https://www.haomeiwen.com/subject/fzsnkxtx.html