package com.example.demo.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Description: This example maps the job's machine-level sharding items onto custom business sharding keys.
 * @Date: 2019/10/24
 * @Author: zhangjinsong
 */
public abstract class AbstractDataFlowJobBean<T> implements DataflowJob<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDataFlowJobBean.class);

    /**
     * The 16 business sharding keys; every record is routed to one of them by the last hex digit of its hashCode.
     */
    private static final String[] BUSINESS_SHARDING = new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
            "a", "b", "c", "d", "e", "f"};
    /**
     * Fetches the candidate business data for this execution.
     *
     * @param shardingContext sharding context provided by the framework
     * @return the candidate task list, before it is filtered by business sharding key
     * @date: 2019/10/25
     * @author: zhangjinsong
     */
    protected abstract List<T> doFetchData(ShardingContext shardingContext);
    /**
     * Processes a single task.
     *
     * @param task the task to process
     * @date: 2019/10/25
     * @author: zhangjinsong
     */
    protected abstract void doProcessData(T task);
    @Override
    public List<T> fetchData(ShardingContext shardingContext) {
        int businessTotalCount = BUSINESS_SHARDING.length;
        int shardingTotalCount = shardingContext.getShardingTotalCount();
        int shardingItem = shardingContext.getShardingItem();
        if (shardingItem < businessTotalCount) {
            // Split the business sharding keys evenly across the job sharding items.
            int blockSize = businessTotalCount / shardingTotalCount;
            int remainder = businessTotalCount % shardingTotalCount;
            int fromIndex = shardingItem * blockSize;
            List<String> list = new ArrayList<>();
            Collections.addAll(list, Arrays.copyOfRange(BUSINESS_SHARDING, fromIndex, fromIndex + blockSize));
            // The first `remainder` sharding items each take one of the leftover business keys,
            // so all 16 keys are covered even when 16 is not divisible by the shard count.
            if (remainder != 0 && shardingItem < remainder) {
                list.add(BUSINESS_SHARDING[businessTotalCount - remainder + shardingItem]);
            }
            // Keep only the records whose business sharding key belongs to this node.
            return doFetchData(shardingContext).parallelStream()
                    .filter(t -> validate(t, list))
                    .collect(Collectors.toList());
        } else {
            LOGGER.error("There are more job sharding items than business sharding keys; this sharding item will not fetch any tasks");
        }
        return Collections.emptyList();
    }
    @Override
    public void processData(ShardingContext shardingContext, List<T> list) {
        // Process the fetched tasks in parallel.
        list.parallelStream().forEach(this::doProcessData);
    }
    /**
     * Checks whether a business record should be executed by the current node.
     *
     * @param t               the business record
     * @param shardingStrings the business sharding keys owned by the current node
     * @return true if the record's sharding key belongs to this node
     * @date: 2019/10/25
     * @author: zhangjinsong
     */
    protected boolean validate(T t, List<String> shardingStrings) {
        // Use the hexadecimal form of the hashCode so that every key "0"-"f" can actually match;
        // a decimal string never ends with "a"-"f", which would leave those six keys permanently idle.
        String hashCode = Integer.toHexString(t.hashCode());
        for (String shardingString : shardingStrings) {
            if (hashCode.endsWith(shardingString)) {
                return true;
            }
        }
        return false;
    }
}
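
To make the split above concrete: with sharding-total-count = 12 (as in the XML configuration at the end of this post), blockSize = 16 / 12 = 1 and remainder = 16 % 12 = 4, so sharding items 0-3 own two business keys each ("0"/"c", "1"/"d", "2"/"e", "3"/"f") and items 4-11 own one key each. The small standalone sketch below (hypothetical class ShardingPlanPreview, not part of the original demo) reproduces the same arithmetic and prints the ownership table for any shard count.

package com.example.demo.job;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the original demo: prints the business-key
// ownership produced by AbstractDataFlowJobBean#fetchData for a given shard count.
public class ShardingPlanPreview {

    private static final String[] BUSINESS_SHARDING = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
            "a", "b", "c", "d", "e", "f"};

    public static void main(String[] args) {
        int shardingTotalCount = 12; // matches sharding-total-count in the XML config below
        int businessTotalCount = BUSINESS_SHARDING.length;
        int blockSize = businessTotalCount / shardingTotalCount;
        int remainder = businessTotalCount % shardingTotalCount;
        for (int shardingItem = 0; shardingItem < shardingTotalCount; shardingItem++) {
            int fromIndex = shardingItem * blockSize;
            List<String> keys = new ArrayList<>();
            Collections.addAll(keys, Arrays.copyOfRange(BUSINESS_SHARDING, fromIndex, fromIndex + blockSize));
            if (remainder != 0 && shardingItem < remainder) {
                keys.add(BUSINESS_SHARDING[businessTotalCount - remainder + shardingItem]);
            }
            System.out.println("sharding item " + shardingItem + " owns business keys " + keys);
        }
    }
}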
package com.example.demo.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import java.util.List;
import java.util.stream.Collectors;
/**
 * A simpler variant: instead of mapping to 16 business keys, each record is routed
 * directly to a job sharding item by hashCode modulo the total shard count.
 */
public abstract class AbstractDataFlowJobBeanTwo<T> implements DataflowJob<T> {

    protected abstract List<T> doFetchData(ShardingContext shardingContext);

    protected abstract void doProcessData(T task);
    @Override
    public List<T> fetchData(ShardingContext shardingContext) {
        int shardingTotalCount = shardingContext.getShardingTotalCount();
        int shardingItem = shardingContext.getShardingItem();
        // Keep only the records that the modulo routing assigns to this sharding item.
        return doFetchData(shardingContext).parallelStream()
                .filter(t -> validate(t, shardingItem, shardingTotalCount))
                .collect(Collectors.toList());
    }

    @Override
    public void processData(ShardingContext shardingContext, List<T> list) {
        list.parallelStream().forEach(this::doProcessData);
    }
    protected boolean validate(T t, int shardingItem, int shardingTotalCount) {
        // Route each record to exactly one sharding item by (absolute) hashCode modulo the shard count.
        return Math.abs(t.hashCode()) % shardingTotalCount == shardingItem;
    }
}
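
Before the concrete job below, a quick illustration of the modulo routing used by AbstractDataFlowJobBeanTwo (hypothetical class and record IDs, assuming sharding-total-count = 12): each record lands on exactly one sharding item, so no record is fetched twice and none is skipped as long as every item is running.

package com.example.demo.job;

// Hypothetical illustration, not part of the original demo: shows which sharding
// item each record would be routed to by hashCode modulo the total shard count.
public class ModuloRoutingPreview {

    public static void main(String[] args) {
        int shardingTotalCount = 12;
        for (String id : new String[]{"1001", "1002", "1003", "1004"}) {
            int owner = Math.abs(id.hashCode()) % shardingTotalCount;
            System.out.println("record " + id + " is fetched and processed by sharding item " + owner);
        }
    }
}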
package com.example.demo.job.simple;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.example.demo.job.AbstractDataFlowJobBeanTwo;
import java.util.List;
import java.util.stream.Collectors;

public class MyJobDemoTwo extends AbstractDataFlowJobBeanTwo<String> {

    @Override
    protected List<String> doFetchData(ShardingContext shardingContext) {
        // UserData and SystemUser are the demo project's own data-access and entity classes.
        UserData user = new UserData();
        List<SystemUser> listUser = user.getUserList();
        return listUser.stream().map(c -> c.getId().toString()).collect(Collectors.toList());
    }

    @Override
    protected void doProcessData(String task) {
        System.out.println("Processed user ID: " + task);
    }
}
<job:dataflow id="myJobDemo" class="com.example.demo.job.simple.MyJobDemoTwo" registry-center-ref="regCenter"
sharding-total-count="12" cron="0 0/2 * * * ?" description="zjs-MyJobDemo" overwrite="true"/>
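
For completeness, readers who prefer Elastic-Job's Java API over the Spring namespace could bootstrap the same job roughly as sketched below (hypothetical class JobBootstrap, not part of the original demo). This is only a sketch: the ZooKeeper address and namespace are placeholders, the streaming-process flag is set to false to match the XML default, and the class and method names should be verified against the elastic-job-lite version on your classpath.

package com.example.demo.job;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.example.demo.job.simple.MyJobDemoTwo;

public class JobBootstrap {

    public static void main(String[] args) {
        // Registry center backed by ZooKeeper (server address and namespace are placeholders).
        CoordinatorRegistryCenter regCenter =
                new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "elastic-job-demo"));
        regCenter.init();

        // Same job name, cron, and sharding-total-count as the XML configuration above.
        JobCoreConfiguration coreConfig =
                JobCoreConfiguration.newBuilder("myJobDemo", "0 0/2 * * * ?", 12).build();
        // The last flag of DataflowJobConfiguration enables/disables streaming processing.
        DataflowJobConfiguration dataflowConfig =
                new DataflowJobConfiguration(coreConfig, MyJobDemoTwo.class.getCanonicalName(), false);

        new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(dataflowConfig).overwrite(true).build()).init();
    }
}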