美文网首页分布式系统
elasticjob-业务分片

elasticjob-业务分片

作者: 松松木tell | 来源:发表于2019-10-29 16:18 被阅读0次
  • 方式1:机器分片和业务自定义分片对应

package com.example.demo.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @Description:此用例是把机器分片和业务自定义分片进行对应
 * @Date: 2019/10/24
 * @Author: zhangjinsong
 */
public abstract class AbstractDataFlowJobBean<T> implements DataflowJob<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDataFlowJobBean.class);

    private static final String[] BUSINESS_SHARDING = new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
            "a", "b", "c", "d", "e", "f"};

    /**
     * 拉取业务数据
     *
     * @param shardingContext
     * @return java.util.List<T>
     * @date: 2019/10/25
     * @author: zhangjinsong
     */
    protected abstract List<T> doFetchData(ShardingContext shardingContext);

    /**
     * 执行业务
     *
     * @param task
     * @return void
     * @date: 2019/10/25
     * @author: zhangjinsong
     */
    protected abstract void doProcessData(T task);

    @Override
    public List<T> fetchData(ShardingContext shardingContext) {
        int businessTotalCount = BUSINESS_SHARDING.length;
        int shardingTotalCount = shardingContext.getShardingTotalCount();
        int shardingItem = shardingContext.getShardingItem();
        if (shardingItem < businessTotalCount) {
            int blockSize = businessTotalCount / shardingTotalCount;
            int remainder = businessTotalCount % shardingTotalCount;
            int fromIndex = shardingItem * blockSize;
            List<String> list = new ArrayList<>();
            Collections.addAll(list, Arrays.copyOfRange(BUSINESS_SHARDING, fromIndex, fromIndex + blockSize));
            if (remainder != 0 && shardingItem < remainder) {
                list.add(BUSINESS_SHARDING[businessTotalCount - remainder + shardingItem]);
            }
            List<T> task = doFetchData(shardingContext).parallelStream().filter(t -> validate(t, list)).collect(Collectors.toList());
            return task;
        } else {
            LOGGER.error("任务分片多于业务分片,部分机器获取不到任务");
        }
        return Collections.emptyList();
    }


    @Override
    public void processData(ShardingContext shardingContext, List<T> list) {
        list.parallelStream().forEach(this::doProcessData);
    }

    /**
     * 比较业务数据是否是当前节点执行
     *
     * @param t
     * @param shardingStrings
     * @return boolean
     * @date: 2019/10/25
     * @author: zhangjinsong
     */
    protected boolean validate(T t, List<String> shardingStrings) {
        String hashCode = String.valueOf(t.hashCode());
        for (String shardingString : shardingStrings) {
            if (hashCode.endsWith(shardingString)) {
                return true;
            }
        }

        return false;
    }
}

  • 方式2:hashcode取余

package com.example.demo.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;

import java.util.List;
import java.util.stream.Collectors;

public abstract class AbstractDataFlowJobBeanTwo<T> implements DataflowJob<T> {

    protected abstract List<T> doFetchData(ShardingContext shardingContext);

    protected abstract void doProcessData(T task);

    @Override
    public List<T> fetchData(ShardingContext shardingContext) {
        int shardingTotalCount = shardingContext.getShardingTotalCount();
        int shardingItem = shardingContext.getShardingItem();
        List<T> task = doFetchData(shardingContext).parallelStream().filter(
                t -> validate(t, shardingItem, shardingTotalCount)).collect(Collectors.toList());
        return task;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<T> list) {
        list.parallelStream().forEach(this::doProcessData);
    }

    protected boolean validate(T t, int shardingItem, int shardingTotalCount) {
        int remainder = Math.abs(t.hashCode()) % shardingTotalCount;
        if (remainder == shardingItem) {
            return true;
        }
        return false;
    }
}

public class MyJobDemoTwo extends AbstractDataFlowJobBeanTwo<String> {
    @Override
    protected List<String> doFetchData(ShardingContext shardingContext) {
        UserData user = new UserData();
        List<SystemUser> listUser = user.getUserList();
        return listUser.stream().map(c -> c.getId().toString()).collect(Collectors.toList());
    }

    @Override
    protected void doProcessData(String task) {
        System.out.println("执行方法成功用户ID:"+task);
    }

}
    <job:dataflow id="myJobDemo" class="com.example.demo.job.simple.MyJobDemoTwo" registry-center-ref="regCenter"
                  sharding-total-count="12" cron="0 0/2 * * * ?" description="zjs-MyJobDemo" overwrite="true"/>

相关文章

  • elasticjob-业务分片

    方式1:机器分片和业务自定义分片对应 方式2:hashcode取余

  • 分库分表

    1.纵向分片(垂直分片) 和微服务的思想一样,根据业务场景将不同的数据存入不同的数据库2.横向分片(水平分片) ...

  • Sharding-Jdbc的分片算法及分表分库

    上一篇 << >> 分片算法 分片算法目前需要业务方开发者自行实现,目前支持通过等号(doEqualShardin...

  • 分库分表的故事

    数据分片 一般单表的数据阈值在1TB之内,当超过1TB的时候,就需要进行数据分片了 垂直分片 按照业务进行拆分,不...

  • ElasticJob分片机制

    ElasticJob是一个弹性的分布式任务调度框架,这里的分布式就是采用分片的来进行任务调度和业务执行的解耦,分片...

  • mycat分片(枚举分片)

    分片枚举 通过在配置文件中配置可能的枚举 id,自己配置分片,本规则适用于特定的场景,比如有些业务需要按照省份或区...

  • ShardingSphere官网及总结

    中文文档 分片 分片算法 精确分片算法 范围分片算法 复合分片算法 Hint分片算法 分片策略 标准分片策略 复合...

  • 日知录1-数据分片模型和路由算法

    分片模型: 先将数据映射到分片;再将分片映射到机器;都是多对一的关系。 分片策略:哈希分片和范围分片。 哈希分片通...

  • 分库分表第五篇之强制分片路由

    一.强制分片路由 在一些应用场景中,分片条件并不存在于SQL,而存在于外部业务逻辑。因此需要提供一种通过外部指定分...

  • MongoDB 集群

    分片群集 MongoDB分片群集由以下组件组成: 分片:每个分片包含分片数据。每个分片都可以部署为副本集。 mon...

网友评论

    本文标题:elasticjob-业务分片

    本文链接:https://www.haomeiwen.com/subject/gtdgvctx.html