场景描述:
数据迁移,应用会部署多个实例,具体几个不确定,每个应用为提高处理效率要求使用多线程处理,读取的数据来源为同一个DB2数据库中的同一个表,表数据量1亿,读取到的数据写入到NoSQL存储中;
设计分析:
需要支持水平扩展,实例数量可以随时调整,每个应用的线程数也可以根据部署实例所在机器性能调整,读取的又是同一个大表的数据,需要考虑的问题有:
1、多实例间处理数据不能重复,线程间处理数据不能重复;
2、如果数据迁移由外部请求触发,需要考虑负载均衡导致的请求倾斜问题,因此要求各实例地位对等(不区分主从);
设计方案一:
(示意图:数据分段示意.png)第一步,在Redis中设计一把数据分块锁,各个实例竞争这把锁;拿到锁的实例执行数据分块操作,没有拿到锁的实例则跳过分块。拿到锁的实例按设定的分块大小常量,使用ROW_NUMBER()分页查询数据(仅查询主键),将得到的主键集合放入ConcurrentHashMap中,key为分段序号;同时为每个key创建对应的分段锁,并累加锁总数。该map、各分段锁和锁总数均存放于Redis中;完成后释放分块锁;
第二步,根据锁总数遍历尝试竞争某个分段锁,拿到任何一个分段锁即退出竞争;
第三步,根据拿到的分段锁序号,读取key为该序号的主键集合,按主键总量和线程数取模将主键分发给各线程做数据查询(也可以平均分配);使用CountDownLatch计数器等待所有线程结束后,删除Redis中ConcurrentHashMap里该key的数据,并释放分段锁;
第四步,检查Redis中ConcurrentHashMap对象的size是否为0,不为0则重复第二步,直到为0退出;
附:所有的锁都设置过期时间,以防应用异常无法释放锁;
单机版:
代码:
SegmentDataWithLock
/**
 * Single-process model of design scheme one.
 *
 * <p>Splits an array of primary keys into fixed-size segments named {@code "seg0"},
 * {@code "seg1"}, ... and registers one {@link ReentrantLock} per segment. Worker threads
 * lock a segment, migrate its keys and then remove both the data and the lock.
 */
public class SegmentDataWithLock {
    // Segment name -> lock guarding that segment.
    private Map<String, ReentrantLock> lockMap = new ConcurrentHashMap<String, ReentrantLock>();
    // Segment name -> primary keys of that segment. Worker threads read and remove entries
    // concurrently while holding *different* segment locks, so a plain HashMap would be a
    // data race; a concurrent map is required.
    private Map<String, String[]> segmentMap = new ConcurrentHashMap<String, String[]>();
    // Prefix of every segment name.
    private final static String SEGMENT_PIX = "seg";
    // Number of keys per segment (the last segment may be shorter).
    private final int segmentSize = 8;
    // Number of segments produced by the most recent segmentation.
    private int segmentCnt = 0;

    /**
     * Returns the lock registered for {@code segment}, optionally creating one.
     *
     * @param segment    segment name, e.g. {@code "seg0"}
     * @param setDefault when {@code true} and no lock exists yet, create and register one
     * @return the segment's lock, or {@code null} if none exists and {@code setDefault} is false
     */
    public Lock checkGetLock(String segment, boolean setDefault) {
        ReentrantLock reentrantLock = lockMap.get(segment);
        if (reentrantLock != null || !setDefault) {
            return reentrantLock;
        }
        // Re-check under the monitor so two threads cannot both create a lock for one segment.
        synchronized (this) {
            reentrantLock = lockMap.get(segment);
            if (reentrantLock == null) {
                reentrantLock = new ReentrantLock();
                System.out.println("lock for " + segment + " not exists! so create a lock: " + reentrantLock);
                lockMap.put(segment, reentrantLock);
            }
            return reentrantLock;
        }
    }

    /**
     * Splits {@code arr} into segments of {@link #segmentSize} keys and registers a data
     * entry and a fresh lock for each segment ({@code "seg" + index}).
     *
     * @param arr primary keys to segment; each stored segment is an independent copy
     */
    public void blokFullWithSegmentation(String[] arr) {
        List<List<String>> segList = new ArrayList<List<String>>();
        delivedEachSubList(Arrays.asList(arr), segList, segmentSize);
        for (int i = 0; i < segList.size(); i++) {
            String segment = SEGMENT_PIX + i;
            // toArray copies the sub-list view, so stored segments do not alias the caller's array.
            segmentMap.put(segment, segList.get(i).toArray(new String[0]));
            lockMap.put(segment, new ReentrantLock());
        }
        this.segmentCnt = segList.size();
    }

    /**
     * Appends consecutive chunks of {@code mList} to {@code segList}. Every chunk holds
     * {@code segmentSize} elements except possibly the last. An empty list yields no chunks.
     * The chunks are {@link List#subList} views of {@code mList}.
     *
     * @param mList       source list
     * @param segList     receives the chunks, in order
     * @param segmentSize maximum chunk size; must be positive
     * @throws IllegalArgumentException if {@code segmentSize <= 0}
     */
    public static void delivedEachSubList(List<String> mList, List<List<String>> segList, int segmentSize) {
        if (segmentSize <= 0) {
            throw new IllegalArgumentException("segmentSize must be positive: " + segmentSize);
        }
        int total = mList.size();
        int from = 0;
        while (from < total) {
            // Computed as from + min(...) so the end index never exceeds total (no int overflow).
            int to = from + Math.min(segmentSize, total - from);
            segList.add(mList.subList(from, to));
            from = to;
        }
    }

    /**
     * Forgets a segment: removes both its keys and its lock.
     *
     * @param segment segment name, e.g. {@code "seg0"}
     */
    public void removeDataAndLockOfSegment(String segment) {
        segmentMap.remove(segment);
        lockMap.remove(segment);
    }

    public Map<String, ReentrantLock> getLockMap() {
        return lockMap;
    }

    public Map<String, String[]> getSegmentMap() {
        return segmentMap;
    }

    public int getSegmentCnt() {
        return segmentCnt;
    }
}
TransferData
/**
 * Simulated migration driver for design scheme one (segment locks).
 *
 * <p>Keys are segmented by {@link SegmentDataWithLock}. Workers call {@link #transfer(String)}
 * for each segment name; migrated keys are collected in {@link #getNewDataContainer()}.
 */
public class TransferData {
    private SegmentDataWithLock segmentDataWithLock ;
    // Appended to by several worker threads at once, each holding a different segment lock,
    // so it must be synchronized; a plain ArrayList would lose or corrupt updates.
    private List<String> newDataContainer = java.util.Collections.synchronizedList(new ArrayList<String>());

    public TransferData(){
        segmentDataWithLock = new SegmentDataWithLock();
    }

    /**
     * Splits the source keys into segments.
     * @param data primary keys to migrate
     */
    public void segmentData(String[] data){
        segmentDataWithLock.blokFullWithSegmentation(data);
    }

    /**
     * Migrates one segment if no other thread is currently doing so.
     *
     * <p>Uses {@code tryLock()} rather than {@code lock()}: if another worker already holds the
     * segment, this call returns immediately and the caller moves on to the next segment. With a
     * blocking {@code lock()}, all workers iterating segments in the same order would queue on each
     * segment in turn, and the migration would run effectively serially.
     *
     * @param key segment name, e.g. {@code "seg0"}
     */
    public void transfer(String key) {
        Lock lock = segmentDataWithLock.checkGetLock(key, false);
        if (lock == null) {
            // Segment already migrated and its lock removed.
            System.out.println(key+" lock is null"+": "+Thread.currentThread().getName());
            return;
        }
        if (!lock.tryLock()) {
            // Another worker is migrating this segment right now.
            return;
        }
        try {
            Map<String, String[]> segmentMap = segmentDataWithLock.getSegmentMap();
            System.out.println(System.currentTimeMillis()+": "+Thread.currentThread().getName()+": transfer key("+key+")-> start, lock:"+ lock.toString());
            String[] keys = segmentMap.get(key);
            boolean hasData = keys != null && keys.length > 0;
            if (hasData) {
                System.out.println(Thread.currentThread().getName()+":"+Arrays.asList(keys));
                newDataContainer.addAll(Arrays.asList(keys));
            }
            // Remove data and lock while still holding the lock, so late arrivals find nothing to do.
            segmentDataWithLock.removeDataAndLockOfSegment(key);
            if (hasData) {
                try {
                    // Simulated write latency; only meaningful when a segment was actually migrated.
                    Thread.sleep(300L);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the owning thread instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println(System.currentTimeMillis()+": "+Thread.currentThread().getName()+": transfer key("+key+")-> complete " );
        } finally {
            lock.unlock();
        }
    }

    /** @return {@code true} once every segment's data has been removed */
    public boolean isFinish(){
        return segmentDataWithLock.getSegmentMap().isEmpty() ;
    }

    /** @return the (synchronized) list of migrated keys */
    public List<String> getNewDataContainer() {
        return newDataContainer;
    }

    /** @return number of segments produced by {@link #segmentData(String[])} */
    public int getDataSegmentCnt() {
        return segmentDataWithLock.getSegmentCnt();
    }
}
Worker
/**
 * Worker for scheme one: walks every segment name {@code "seg0".."segN-1"} and asks the shared
 * {@link TransferData} to migrate it, then signals completion on the latch.
 */
class Worker implements Runnable{
    private final CountDownLatch downLatch;
    private final TransferData instance;

    Worker(TransferData instance, CountDownLatch downLatch){
        this.downLatch = downLatch;
        this.instance = instance;
    }

    @Override
    public void run() {
        try {
            int segmentCnt = instance.getDataSegmentCnt();
            for (int i = 0; i < segmentCnt; i++) {
                instance.transfer("seg" + i);
            }
        } finally {
            // Always count down, otherwise an exception in transfer() would hang await() forever.
            downLatch.countDown();
        }
    }
}
TransferDataTest
/**
 * Demo for scheme one: builds 1000 test keys, segments them and lets
 * {@link #handlerThreads} threads migrate them concurrently, then prints the result and
 * the elapsed time.
 */
public class TransferDataTest {
    // Number of concurrent worker threads.
    private static final int handlerThreads = 10;

    public static void main(String[] args) {
        final long begin = System.currentTimeMillis();
        final TransferData transfer = new TransferData();
        final String[] source = buildSource(1000);

        // Before migration.
        System.out.println("老数据:"+ Arrays.asList(source));
        transfer.segmentData(source);

        // One latch slot per worker; main waits until every worker has finished.
        final CountDownLatch latch = new CountDownLatch(handlerThreads);
        int launched = 0;
        while (launched < handlerThreads) {
            Thread worker = new Thread(new Worker(transfer, latch));
            worker.start();
            launched++;
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // After migration.
        System.out.println("Finished:"+transfer.isFinish());
        System.out.println("新数据:"+transfer.getNewDataContainer());
        long elapsed = System.currentTimeMillis() - begin;
        System.out.println("耗时:"+elapsed+"ms");
    }

    // Produces "TestData0" .. "TestData<count-1>".
    private static String[] buildSource(int count) {
        String[] keys = new String[count];
        for (int idx = 0; idx < keys.length; idx++) {
            keys[idx] = "TestData".concat(String.valueOf(idx));
        }
        return keys;
    }
}
设计方案二:
将方案一中存放分段数据的ConcurrentHashMap改为ConcurrentLinkedQueue:每个线程通过poll()原子地取出一个分段,同一分段不会被两个线程同时取到,这样即可避免使用分段锁
单机版
SegmentDataWithQueue
/**
 * Segment store for scheme two: keys are split into fixed-size segments held in a
 * lock-free {@link ConcurrentLinkedQueue}; each {@code poll()} hands a segment to exactly one
 * caller, so no per-segment locks are needed.
 */
public class SegmentDataWithQueue {
    // Queue of pending segments.
    private ConcurrentLinkedQueue<List<String>> dataQueue = new ConcurrentLinkedQueue<>();
    // Number of keys per segment (the last segment may be shorter).
    private final int segmentSize = 8;

    /**
     * Splits {@code arr} into segments of {@link #segmentSize} keys and enqueues them in order.
     * Each queued segment is an independent copy, so later changes to {@code arr} do not
     * affect queued work.
     *
     * @param arr primary keys to segment
     */
    public void blockFullWithSegmentation(String[] arr) {
        List<String> srcList = Arrays.asList(arr);
        int total = srcList.size();
        int from = 0;
        while (from < total) {
            // from + min(...) keeps the end index within bounds without int overflow.
            int to = from + Math.min(segmentSize, total - from);
            // Copy the sub-list: a bare subList view would alias (and track) the caller's array.
            dataQueue.offer(new ArrayList<String>(srcList.subList(from, to)));
            from = to;
        }
    }

    /** @return the next pending segment, or {@code null} if the queue is empty */
    public List<String> getNodeFromDataQueue() {
        return dataQueue.poll();
    }

    /** @return {@code true} if no segments are pending */
    public boolean isEmptyQueue(){
        return dataQueue.isEmpty();
    }
}
TransferDat
/**
 * Simulated migration driver for scheme two (queue-based). Each worker repeatedly polls a
 * segment from the shared queue and migrates it until the queue is drained.
 */
public class TransferDat {
    private SegmentDataWithQueue segmentDataWithQueue;
    // Appended to concurrently by every worker thread, so it must be synchronized;
    // a plain ArrayList would lose or corrupt updates.
    private List<String> newDataContainer = java.util.Collections.synchronizedList(new ArrayList<String>());

    public TransferDat() {
        segmentDataWithQueue = new SegmentDataWithQueue();
    }

    /**
     * Splits the source keys into queued segments.
     *
     * @param data primary keys to migrate
     */
    public void segmentData(String[] data) {
        segmentDataWithQueue.blockFullWithSegmentation(data);
    }

    /**
     * Drains the shared queue, migrating each polled segment. Returns when the queue is empty
     * or the calling thread is interrupted (the interrupt flag is then restored).
     */
    public void transfer() {
        try {
            List<String> dataNodeList;
            // poll() is atomic: each segment is handed to exactly one thread; null means drained.
            while ((dataNodeList = segmentDataWithQueue.getNodeFromDataQueue()) != null) {
                if (!dataNodeList.isEmpty()) {
                    System.out.println(System.currentTimeMillis() + ": " + Thread.currentThread().getName() +
                            ": transfer data :" + dataNodeList.toString());
                    newDataContainer.addAll(dataNodeList);
                    // Simulated write latency.
                    Thread.sleep(300L);
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the owning thread instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        System.out.println(System.currentTimeMillis() + ": " + Thread.currentThread().getName() + ": transfer complete ");
    }

    /** @return {@code true} once no segments remain queued */
    public boolean isFinish() {
        return segmentDataWithQueue.isEmptyQueue();
    }

    /** @return the (synchronized) list of migrated keys */
    public List<String> getNewDataContainer() {
        return newDataContainer;
    }
}
Worker
/**
 * Worker for scheme two: drains the shared {@link TransferDat} queue, then signals
 * completion on the latch.
 */
public class Worker implements Runnable {
    private final CountDownLatch downLatch;
    private final TransferDat transferDat;

    public Worker(TransferDat transferDat, CountDownLatch downLatch) {
        this.downLatch = downLatch;
        this.transferDat = transferDat;
    }

    @Override
    public void run() {
        try {
            transferDat.transfer();
        } finally {
            // Always count down, otherwise an exception in transfer() would hang await() forever.
            downLatch.countDown();
        }
    }
}
TransferDatTest
/**
 * Demo for scheme two: builds 1000 test keys, queues them in segments and lets a fixed
 * thread pool of {@link #handlerThreads} workers drain the queue concurrently.
 */
public class TransferDatTest {
    // Number of concurrent workers / pool size.
    private static final int handlerThreads = 10;

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int arrLength = 1000;
        String[] data = new String[arrLength];
        for (int i = 0; i < arrLength; i++) {
            data[i] = "TestData".concat(String.valueOf(i));
        }
        // Before migration.
        System.out.println("老数据:" + Arrays.asList(data));
        TransferDat transferDat = new TransferDat();
        // Split into queued segments.
        transferDat.segmentData(data);
        // One latch slot per worker; main waits until every worker has finished.
        CountDownLatch downLatch = new CountDownLatch(handlerThreads);
        ExecutorService es = Executors.newFixedThreadPool(handlerThreads);
        try {
            for (int i = 0; i < handlerThreads; i++) {
                // Submit the Runnable itself; wrapping it in a never-started Thread object is misleading.
                es.submit(new Worker(transferDat, downLatch));
            }
            downLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Release pool threads even if waiting was interrupted.
            es.shutdown();
        }
        // After migration.
        System.out.println("Finished:" + transferDat.isFinish());
        System.out.println("新数据:" + transferDat.getNewDataContainer());
        System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");
    }
}
方案三:
利用DB2数据库自带的锁机制,例如:
select * From RRTEST where pkID='20070223ORD01267732' for update with RS
网友评论