美文网首页
elastic-job-master 运行实例

elastic-job-master 运行实例

作者: 爱笑lo | 来源:发表于2017-07-21 10:06 被阅读0次

    1.环境搭建

    使用的官方的example-job-example中的elastic-job-example-lite-java
    这里粘上github的地址:https://github.com/dangdangdotcom/elastic-job
    下载后解压导入elastic-job-example maven工程
    没装maven插件的需要装个maven插件,下载个最新的eclipse都配着有
    项目导进去了,好像扯得有点偏,我们回到环境的搭建上
    需要配置:
    (1)zookeeper,这个网上的教程很详细
    (2)mysql,还需要建个库命名为:elastic_job_log,后面会用到

    (3)elastic-job-lite-console-2.1.2.tar.gz,这个百度搜一下吧,应该能下到。

    2.环境的启动

    (1)zookeeper的配置这里就不说了,打开目录进入bin


    运行zkServer.cmd;
    (2)把下载的这个解压elastic-job-lite-console-2.1.2.tar.gz,进入目录bin文件,运行start.bat启动服务后,用浏览器进入http://localhost:8899 看是否启动成功,默认的用户名:root,密码:root;
    进入注册中心添加zookeeper

    进入事件追踪数据添加数据库(数据这里要开启的,名字,地址也要对,不然连不上的)

    到这里我们的环境也就配好了,现在进入最后一步。

    3.修改代码并运行

    添加aliyun仓库

    http://maven.aliyun.com/nexus/content/groups/public/

    然后修改一下代码
    修改com.dangdang.ddframe.job.example.JavaMain.java

    package com.dangdang.ddframe.job.example;
    import com.dangdang.ddframe.job.config.JobCoreConfiguration;
    import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
    import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
    import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
    import com.dangdang.ddframe.job.event.JobEventConfiguration;
    import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
    import com.dangdang.ddframe.job.example.job.dataflow.JavaDataflowJob;
    import com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob;
    import com.dangdang.ddframe.job.lite.api.JobScheduler;
    import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
    import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
    import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
    import org.apache.commons.dbcp.BasicDataSource;
    import javax.sql.DataSource;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.attribute.PosixFilePermissions;
    public final class JavaMain {
    // zookeeper config
    private static final int EMBED_ZOOKEEPER_PORT = 2181;
    private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT;
    private static final String DIGEST_STR = "admin.admin";
    private static final String JOB_NAMESPACE = "elastic-job-example-lite-java";
    // MySQL config
    private static final String EVENT_RDB_STORAGE_DRIVER = "com.mysql.jdbc.Driver";
    private static final String EVENT_RDB_STORAGE_URL = "jdbc:mysql://localhost:3306/elastic_job_log";
    private static final String EVENT_RDB_STORAGE_USERNAME = "root";
    private static final String EVENT_RDB_STORAGE_PASSWORD = "root";
    public static void main(final String[] args) throws IOException {
    // 连接到注册中心
    CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
    // 数据源配置
    JobEventConfiguration jobEventConfig =
    new JobEventRdbConfiguration(setUpEventTraceDataSource());
    // 设置简单的任务
    setUpSimpleJob(regCenter, jobEventConfig);
    //setUpDataflowJob(regCenter, jobEventConfig);
    //setUpScriptJob(regCenter, jobEventConfig);
    }
    private static CoordinatorRegistryCenter setUpRegistryCenter() {
    ZookeeperConfiguration zkConfig =
    new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
    zkConfig.setDigest(DIGEST_STR); // 设置digest
    CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
    result.init();
    return result;
    }
    private static DataSource setUpEventTraceDataSource() {
    BasicDataSource result = new BasicDataSource();
    result.setDriverClassName(EVENT_RDB_STORAGE_DRIVER);
    result.setUrl(EVENT_RDB_STORAGE_URL);
    result.setUsername(EVENT_RDB_STORAGE_USERNAME);
    result.setPassword(EVENT_RDB_STORAGE_PASSWORD);
    return result;
    }
    private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
    JobCoreConfiguration coreConfig = JobCoreConfiguration
    .newBuilder("javaSimpleJob", "0 0/2 * * * ?", 3)
    .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
    // 全局参数
    .jobParameter("jobParameter=jobParameter")
    // 自定义的异常处理类
    .jobProperties("job_exception_handler",
    "com.dangdang.ddframe.job.example.TestHandler")
    // 自定义的线程池
    .jobProperties("executor_service_handler", "com.dangdang.ddframe.job.lite.spring.fixture.handler.SimpleExecutorServiceHandler")
    .build();
    SimpleJobConfiguration simpleJobConfig =
    new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
    LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration
    .newBuilder(simpleJobConfig)
    .monitorPort(9888)  // 启动监听  监听的端口为9888
    .build();
    new JobScheduler(regCenter,        // 注册中心
    liteJobConfiguration,      // job的配置
    jobEventConfig,              // 数据源的配置
    new MyElasticJobListener()  // 任务的监听器
    ).init();                            // 启动任务
    }
    private static void setUpDataflowJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
    JobCoreConfiguration coreConfig = JobCoreConfiguration
    .newBuilder("javaDataflowElasticJobfalse", "0 0/3 * * * ?", 3)
    .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
    .build();
    DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(coreConfig,
    JavaDataflowJob.class.getCanonicalName(),
    false);
    new JobScheduler(regCenter,
    LiteJobConfiguration.newBuilder(dataflowJobConfig).build(),
    jobEventConfig).init();
    }
    private static void setUpScriptJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) throws IOException {
    JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("scriptElasticJob", "0 0/2 * * * ?", 3).build();
    ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(coreConfig, buildScriptCommandLine());
    new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(scriptJobConfig).build(), jobEventConfig).init();
    }
    private static String buildScriptCommandLine() throws IOException {
    if (System.getProperties().getProperty("os.name").contains("Windows")) {
    return Paths.get(JavaMain.class.getResource("/script/demo.bat").getPath().substring(1))
    .toString();
    }
    Path result = Paths.get(JavaMain.class.getResource("/script/demo.sh").getPath());
    Files.setPosixFilePermissions(result, PosixFilePermissions.fromString("rwxr-xr-x"));
    return result.toString();
    }
    }
    

    修改 com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob

    packagecom.dangdang.ddframe.job.example.job.simple;
    importcom.dangdang.ddframe.job.api.ShardingContext;
    importcom.dangdang.ddframe.job.api.simple.SimpleJob;
    importcom.dangdang.ddframe.job.example.fixture.entity.Foo;
    import com.dangdang.ddframe.job.example.fixture.repository.FooRepository;
    importcom.dangdang.ddframe.job.example.fixture.repository.FooRepositoryFactory;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.List;
    public class JavaSimpleJob implementsSimpleJob {
    private FooRepository fooRepository =FooRepositoryFactory.getFooRepository();
    @Override
    public void execute(final ShardingContext shardingContext) {
    System.out.println(String.format("Item: %s | Time: %s | Thread: %s| %s",
    shardingContext.getShardingItem(),new SimpleDateFormat("HH:mm:ss").format(new Date()),Thread.currentThread().getId(), "SIMPLE"));
    System.out.println("----------------------------" +shardingContext.getShardingParameter());
    try {
    Thread.sleep(10000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("------------------------------------------------------------------------------------"+ shardingContext.getShardingParameter());
    List data =fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
    for (Foo each : data) {
    fooRepository.setCompleted(each.getId());
    }
    }
    }
    

    创建类 com.dangdang.ddframe.job.example.MyElasticJobListener

    package com.dangdang.ddframe.job.example;
    importcom.dangdang.ddframe.job.executor.ShardingContexts;
    importcom.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
    publicclass MyElasticJobListener implements ElasticJobListener {
    @Override
    publicvoid afterJobExecuted(ShardingContexts shardingContexts) {
    System.out.println("-----------------afterJobExecuted---------");
    }
    @Override
    publicvoid beforeJobExecuted(ShardingContexts shardingContexts) {
    System.out.println("-----------------beforeJobExecuted---------");
    }
    }
    

    运行JavaMain.java

    4.运行结果分析

    如图三个分片都给一个实例运行

    运行成功。
    我们来试一下开两个JavaMain,看看它是怎么分配的

    运行实例变成了两个

    作业维度分片状态中的分片也改变了,把分片1分给了另一个实例。


    在历史状态中可以看到,0,2片分给一个实例,1分给一个实例,他们的状态如图。可以看到分片1是先运行中,然后等待运行,时间是一样的,可能是后台响应的问题吧。
    看到这里也就告一段落了,第一次写这样的文档,还请大家指教,有什么不懂的下面留言,有时间我就会答复。

    070721更新

    刚才遇到一个朋友导入项目之后项目出错,看了一下是log.,或者.set方法出错。试着重装了一下lombok,就可以了。原来是它的lombok装到另一个eclipse里面了。。。。
    双击.jar文件,选取你用的eclipse,安装重启后clean一下工程即可。

    捕获.PNG

    相关文章

      网友评论

          本文标题:elastic-job-master 运行实例

          本文链接:https://www.haomeiwen.com/subject/lmvskxtx.html