Analysis and design: multiple instances and multiple threads reading the same table in a distributed environment

Author: 梦想又照进现实 | Published 2019-05-20 22:53

    Scenario:

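    Data migration. The application is deployed as several instances, and the exact number is not fixed. To improve throughput, each instance must process data with multiple threads. Every instance reads from the same table in the same DB2 database, which holds 100 million rows, and writes what it reads into a NoSQL store.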

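    Design analysis:

    The design must scale horizontally. The number of instances can change at any time, and each instance's thread count can be tuned to the machine it runs on, yet all of them read the same large table. The problems to solve are:
    1. No data may be processed twice, either by two instances or by two threads within one instance.
    2. If migration is triggered by external requests, load balancing can skew the requests toward some instances, so every instance must have equal standing (no instance plays a special role).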

    Design option 1:

    [Figure: data segmentation diagram]

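    Step 1: Redis holds a partitioning lock that all instances compete for. The instance that gets it partitions the data, and the instances that do not get it leave this step. The winner pages through the table with ROW_NUMBER, using the configured segment size as the page size and querying only the primary keys. Each page goes into a ConcurrentHashMap keyed by segment number. For every key it also creates a lock for that key and increments a total lock count. The map, the segment locks, and the lock count are all stored in Redis. Finally it releases the partitioning lock.
    Step 2: Using the lock count, iterate over the segment locks and try to acquire one. Stop competing as soon as any segment lock is acquired.
    Step 3: Using the number of the acquired segment lock, read the primary-key set stored under that key. Distribute the keys to the threads by a modulo over the key count and thread count, and have each thread query its share of the rows. An even split also works here. A CountDownLatch waits until every thread has finished. Then delete that key's entry from the ConcurrentHashMap in Redis and release the segment lock.
    Step 4: Check whether the ConcurrentHashMap in Redis is empty. If it is not, repeat from step 2 until it is, then exit.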
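    The four steps can be simulated in one JVM. The following is a minimal sketch that assumes two in-memory ConcurrentHashMaps as stand-ins for Redis: one maps segment number to primary keys, and the other maps segment number to lock owner. `putIfAbsent` plays the role of an atomic "set if not exists". The class and method names are hypothetical, and step 3 is reduced to a single-threaded placeholder. Each thread stands for one instance. The demo checks that every key is written exactly once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SegmentClaimSketch {

    // Hypothetical in-memory stand-ins for what the design keeps in Redis:
    // segment number -> primary keys, and segment number -> lock owner.
    static final Map<String, List<String>> segments = new ConcurrentHashMap<>();
    static final Map<String, String> locks = new ConcurrentHashMap<>();

    // Step 1 (done once, by the instance holding the partitioning lock): page keys into numbered segments.
    static int buildSegments(List<String> keys, int segmentSize) {
        int count = 0;
        for (int from = 0; from < keys.size(); from += segmentSize) {
            int to = Math.min(from + segmentSize, keys.size());
            segments.put("seg" + count, new ArrayList<>(keys.subList(from, to)));
            count++;
        }
        return count;
    }

    // Steps 2-4, run by every instance until no segment is left.
    static void runInstance(int segmentCount, List<String> sink) {
        String owner = UUID.randomUUID().toString();
        while (!segments.isEmpty()) {
            boolean claimed = false;
            for (int i = 0; i < segmentCount && !claimed; i++) {
                String key = "seg" + i;
                // putIfAbsent acts as an atomic "set if not exists": exactly one caller gets null back.
                if (!segments.containsKey(key) || locks.putIfAbsent(key, owner) != null) {
                    continue;
                }
                claimed = true;  // step 2: stop competing once any segment lock is held
                try {
                    List<String> data = segments.get(key);
                    if (data != null) {
                        sink.addAll(data);     // step 3 placeholder: query rows, write to NoSQL
                        segments.remove(key);  // delete the segment only after it is processed
                    }
                } finally {
                    locks.remove(key, owner);  // release only a lock this instance owns
                }
            }
            if (!claimed) {
                Thread.yield();  // remaining segments are all held by others; check again (step 4)
            }
        }
    }

    // Runs instanceCount simulated instances over keyCount keys and returns everything written.
    static List<String> runDemo(int instanceCount, int keyCount, int segmentSize) {
        segments.clear();
        locks.clear();
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < keyCount; i++) {
            keys.add("pk" + i);
        }
        int segmentCount = buildSegments(keys, segmentSize);
        List<String> sink = Collections.synchronizedList(new ArrayList<>());
        List<Thread> instances = new ArrayList<>();
        for (int n = 0; n < instanceCount; n++) {
            Thread t = new Thread(() -> runInstance(segmentCount, sink));
            instances.add(t);
            t.start();
        }
        for (Thread t : instances) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String> written = runDemo(4, 100, 8);
        System.out.println(written.size() + " written, " + new HashSet<>(written).size() + " distinct");
    }
}
```

    A holder deletes its segment before releasing the lock. Any instance that acquires the same lock later therefore finds no data and moves on. This is what keeps the work free of duplicates.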

    [Figure: segment locks]
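    Step 3 spreads one segment's keys over the threads by modulo and then waits on a CountDownLatch. The following is a minimal sketch that assumes a fixed thread pool and hypothetical method names, with the database query and NoSQL write replaced by a placeholder. Key i goes to thread i % threadCount, and the segment is considered done only after the latch reaches zero.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ModuloDispatchSketch {

    // Key at index i goes to worker (i % threadCount), so every key lands in exactly one bucket.
    static List<List<String>> dispatchByModulo(List<String> keys, int threadCount) {
        List<List<String>> buckets = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < keys.size(); i++) {
            buckets.get(i % threadCount).add(keys.get(i));
        }
        return buckets;
    }

    // Processes one claimed segment with threadCount threads and waits for all of them.
    static List<String> processSegment(List<String> keys, int threadCount) {
        List<String> written = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(threadCount);
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        try {
            for (List<String> bucket : dispatchByModulo(keys, threadCount)) {
                pool.execute(() -> {
                    try {
                        written.addAll(bucket);  // placeholder: query these rows and write them to NoSQL
                    } finally {
                        latch.countDown();
                    }
                });
            }
            // Only after every thread is done may the segment be deleted and its lock released.
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> segment = List.of("k0", "k1", "k2", "k3", "k4", "k5", "k6");
        System.out.println(dispatchByModulo(segment, 3));       // [[k0, k3, k6], [k1, k4], [k2, k5]]
        System.out.println(processSegment(segment, 3).size());  // 7
    }
}
```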

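    Note: every lock is given an expiration time. This prevents a lock from staying held forever if the application holding it fails.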
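    With Redis, an expiring lock is usually taken with `SET key owner NX PX ttl`. It is released by deleting the key only if it still holds the caller's owner token, which is commonly done in a small Lua script. The following is an in-memory sketch of those two rules. It assumes an injected clock so that expiry can be shown without sleeping. The class name and owner tokens are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class ExpiringLockSketch {

    // One lock entry: the owner token and the moment the lock stops being valid.
    private static final class Entry {
        final String owner;
        final long expiresAtMillis;

        Entry(String owner, long expiresAtMillis) {
            this.owner = owner;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> locks = new ConcurrentHashMap<>();
    private final LongSupplier clock;  // injected so expiry can be demonstrated without sleeping

    public ExpiringLockSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Mirrors "SET key owner NX PX ttl": succeeds if the key is free or the previous holder's TTL has run out.
    public boolean tryAcquire(String key, String owner, long ttlMillis) {
        long now = clock.getAsLong();
        Entry mine = new Entry(owner, now + ttlMillis);
        Entry current = locks.compute(key, (k, old) ->
                (old == null || old.expiresAtMillis <= now) ? mine : old);
        return current == mine;
    }

    // Mirrors a compare-and-delete release: only a live lock held by this owner is removed,
    // so an instance whose lock already expired cannot delete a lock someone else now holds.
    public boolean release(String key, String owner) {
        long now = clock.getAsLong();
        boolean[] released = {false};
        locks.computeIfPresent(key, (k, old) -> {
            if (old.owner.equals(owner) && old.expiresAtMillis > now) {
                released[0] = true;
                return null;  // returning null removes the mapping
            }
            return old;
        });
        return released[0];
    }

    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(0);
        ExpiringLockSketch lock = new ExpiringLockSketch(now::get);
        System.out.println(lock.tryAcquire("seg0", "instance-A", 30_000));  // true
        System.out.println(lock.tryAcquire("seg0", "instance-B", 30_000));  // false: still held by A
        now.set(30_000);  // instance A "crashed" and its TTL ran out
        System.out.println(lock.tryAcquire("seg0", "instance-B", 30_000));  // true
        System.out.println(lock.release("seg0", "instance-A"));             // false: no longer A's lock
        System.out.println(lock.release("seg0", "instance-B"));             // true
    }
}
```

    The TTL should be longer than the time needed to process one segment. If it is not, a lock can expire while its holder is still working, and a second instance can then pick up the same segment.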

    Single-machine version:

    Code:
    SegmentDataWithLock

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class SegmentDataWithLock {

        // Segment name -> segment lock
        private final Map<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();

        // Segment name -> primary keys of that segment (read and removed by many threads, so it must be concurrent)
        private final Map<String, String[]> segmentMap = new ConcurrentHashMap<>();

        // Segment name prefix
        private static final String SEGMENT_PIX = "seg";

        // Number of keys per segment
        private final int segmentSize = 8;

        // Total number of segments
        private volatile int segmentCnt = 0;

        /**
         * Looks up the lock of a segment; if none exists and setDefault is true, creates one.
         * @param segment segment name, e.g. "seg0"
         * @param setDefault whether to create a lock when none exists
         * @return the lock, or null if it does not exist and setDefault is false
         */
        public Lock checkGetLock(String segment, boolean setDefault) {
            if (!setDefault) {
                return lockMap.get(segment);
            }
            // computeIfAbsent is atomic on ConcurrentHashMap, replacing hand-written double-checked locking
            return lockMap.computeIfAbsent(segment, k -> {
                System.out.println("lock for " + k + " does not exist, creating one");
                return new ReentrantLock();
            });
        }

        /**
         * Splits the key array into segments and creates one lock per segment.
         */
        public void blokFullWithSegmentation(String[] arr) {
            List<List<String>> segList = new ArrayList<>();
            delivedEachSubList(Arrays.asList(arr), segList, segmentSize);

            // Store each segment's keys and its lock under the same segment name
            for (int i = 0; i < segList.size(); i++) {
                String segment = SEGMENT_PIX + i;
                segmentMap.put(segment, segList.get(i).toArray(new String[0]));
                lockMap.put(segment, new ReentrantLock());
            }
            this.segmentCnt = segList.size();
        }

        /**
         * Cuts mList into consecutive sub-lists of at most segmentSize elements; the last one may be shorter.
         */
        public static void delivedEachSubList(List<String> mList, List<List<String>> segList, int segmentSize) {
            for (int from = 0; from < mList.size(); from += segmentSize) {
                segList.add(mList.subList(from, Math.min(from + segmentSize, mList.size())));
            }
        }

        public void removeDataAndLockOfSegment(String segment) {
            segmentMap.remove(segment);
            lockMap.remove(segment);
        }

        public Map<String, ReentrantLock> getLockMap() {
            return lockMap;
        }

        public Map<String, String[]> getSegmentMap() {
            return segmentMap;
        }

        public int getSegmentCnt() {
            return segmentCnt;
        }
    }
    

    TransferData

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.locks.Lock;

    public class TransferData {

        private final SegmentDataWithLock segmentDataWithLock;

        // Written by threads holding *different* segment locks, so the list itself must be synchronized
        private final List<String> newDataContainer = Collections.synchronizedList(new ArrayList<>());

        public TransferData() {
            segmentDataWithLock = new SegmentDataWithLock();
        }

        /**
         * Splits the data into segments.
         * @param data primary keys to migrate
         */
        public void segmentData(String[] data) {
            segmentDataWithLock.blokFullWithSegmentation(data);
        }

        /**
         * Migrates one segment, unless another thread is already working on it.
         * @param key segment name, e.g. "seg0"
         */
        public void transfer(String key) {
            Lock lock = segmentDataWithLock.checkGetLock(key, false);
            if (lock == null) {
                // The lock has been removed: another thread already finished this segment
                return;
            }
            // tryLock instead of lock: skip a segment that is being processed rather than queueing behind it
            if (!lock.tryLock()) {
                return;
            }
            try {
                String[] keys = segmentDataWithLock.getSegmentMap().get(key);
                if (keys != null && keys.length > 0) {
                    System.out.println(System.currentTimeMillis() + ": " + Thread.currentThread().getName()
                            + ": transfer key(" + key + ") -> " + Arrays.asList(keys));
                    Thread.sleep(300L);  // simulates querying the rows and writing them to the NoSQL store
                    newDataContainer.addAll(Arrays.asList(keys));
                }
                // Delete the segment's data and lock only after it has been fully processed
                segmentDataWithLock.removeDataAndLockOfSegment(key);
            } catch (InterruptedException e) {
                // Leave the segment in place so it can be retried; restore the interrupt flag
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }

        public boolean isFinish() {
            return segmentDataWithLock.getSegmentMap().isEmpty();
        }

        public List<String> getNewDataContainer() {
            return newDataContainer;
        }

        public int getDataSegmentCnt() {
            return segmentDataWithLock.getSegmentCnt();
        }
    }
    

    Worker

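    import java.util.concurrent.CountDownLatch;

    class Worker implements Runnable {
        private final CountDownLatch downLatch;
        private final TransferData instance;

        Worker(TransferData instance, CountDownLatch downLatch) {
            this.downLatch = downLatch;
            this.instance = instance;
        }

        @Override
        public void run() {
            try {
                // Try every segment once; segments locked by other threads are skipped
                for (int i = 0; i < instance.getDataSegmentCnt(); i++) {
                    instance.transfer("seg" + i);
                }
            } finally {
                downLatch.countDown();  // always count down, even if a transfer throws
            }
        }
    }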
    

    TransferDataTest

    import java.util.Arrays;
    import java.util.concurrent.CountDownLatch;

    public class TransferDataTest {

        private static final int handlerThreads = 10;

        public static void main(String[] args) {
            long start = System.currentTimeMillis();
            int arrLength = 1000;
            TransferData instance = new TransferData();
            String[] data = new String[arrLength];
            for (int i = 0; i < arrLength; i++) {
                data[i] = "TestData".concat(String.valueOf(i));
            }
            // Before migration
            System.out.println("Old data: " + Arrays.asList(data));

            // Split the data into segments
            instance.segmentData(data);

            // Start the migration and count finished workers
            CountDownLatch countDownLatch = new CountDownLatch(handlerThreads);
            for (int i = 0; i < handlerThreads; i++) {
                // Simulate concurrent migration
                new Thread(new Worker(instance, countDownLatch)).start();
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            // After migration
            System.out.println("Finished: " + instance.isFinish());
            System.out.println("New data (" + instance.getNewDataContainer().size() + " rows): " + instance.getNewDataContainer());
            System.out.println("Elapsed: " + (System.currentTimeMillis() - start) + "ms");
        }
    }
    

    Design option 2:

    Replace the ConcurrentHashMap used in option 1 with a ConcurrentLinkedQueue. This removes the need for segment locks.
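    A queue can replace the segment locks because `ConcurrentLinkedQueue.poll()` removes the head element atomically, so two threads can never receive the same segment. The following is a minimal sketch with hypothetical names that covers only the single-process case, as the code below does. Ten threads drain a queue of 8-key segments, and the demo then checks that no key was received twice. Like the article's test data, it uses 1,000 keys.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueDrainSketch {

    // Builds a queue of segments "TestData0".."TestData<n-1>", segmentSize keys each (the last may be shorter).
    static ConcurrentLinkedQueue<List<String>> buildQueue(int keyCount, int segmentSize) {
        ConcurrentLinkedQueue<List<String>> queue = new ConcurrentLinkedQueue<>();
        for (int from = 0; from < keyCount; from += segmentSize) {
            List<String> segment = new ArrayList<>();
            for (int i = from; i < Math.min(from + segmentSize, keyCount); i++) {
                segment.add("TestData" + i);
            }
            queue.offer(segment);
        }
        return queue;
    }

    // Drains the queue with several threads and returns every segment that was taken.
    static List<List<String>> drain(ConcurrentLinkedQueue<List<String>> queue, int threads) {
        List<List<String>> taken = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                List<String> seg;
                // poll() checks and removes in one atomic step, so no two threads get the same segment
                while ((seg = queue.poll()) != null) {
                    taken.add(seg);
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return taken;
    }

    public static void main(String[] args) {
        List<List<String>> taken = drain(buildQueue(1000, 8), 10);
        Set<String> keys = new HashSet<>();
        int total = 0;
        for (List<String> seg : taken) {
            total += seg.size();
            keys.addAll(seg);
        }
        System.out.println(taken.size() + " segments, " + total + " keys, " + keys.size() + " distinct");
    }
}
```

    The loop calls `poll()` until it returns null, rather than calling `isEmpty()` and then `poll()`, because the check and the removal then happen as a single step.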

    Single-machine version

    SegmentDataWithQueue

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class SegmentDataWithQueue {

        // Queue of data segments
        private final ConcurrentLinkedQueue<List<String>> dataQueue = new ConcurrentLinkedQueue<>();

        // Number of keys per segment
        private final int segmentSize = 8;

        /**
         * Splits the array into segments and puts each segment on the queue.
         */
        public void blockFullWithSegmentation(String[] arr) {
            List<List<String>> segList = new ArrayList<>();
            // Reuse the splitting helper from option 1
            SegmentDataWithLock.delivedEachSubList(Arrays.asList(arr), segList, segmentSize);

            // Put each segment on the queue
            for (List<String> singleSegList : segList) {
                dataQueue.offer(singleSegList);
            }
        }

        /** Removes and returns the head segment, or null if the queue is empty. */
        public List<String> getNodeFromDataQueue() {
            return dataQueue.poll();
        }

        public boolean isEmptyQueue() {
            return dataQueue.isEmpty();
        }
    }
    
    

    TransferDat

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class TransferDat {

        private final SegmentDataWithQueue segmentDataWithQueue;

        // Shared by all worker threads, so the list itself must be synchronized
        private final List<String> newDataContainer = Collections.synchronizedList(new ArrayList<>());

        public TransferDat() {
            segmentDataWithQueue = new SegmentDataWithQueue();
        }

        /**
         * Splits the data into segments.
         *
         * @param data primary keys to migrate
         */
        public void segmentData(String[] data) {
            segmentDataWithQueue.blockFullWithSegmentation(data);
        }

        /**
         * Migrates segments until the queue is drained.
         */
        public void transfer() {
            try {
                List<String> dataNodeList;
                // poll() both checks and removes, so two threads can never receive the same segment
                while ((dataNodeList = segmentDataWithQueue.getNodeFromDataQueue()) != null) {
                    if (!dataNodeList.isEmpty()) {
                        System.out.println(System.currentTimeMillis() + ": " + Thread.currentThread().getName()
                                + ": transfer data: " + dataNodeList);
                        Thread.sleep(300L);  // simulates the write to the NoSQL store
                        newDataContainer.addAll(dataNodeList);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            System.out.println(System.currentTimeMillis() + ": " + Thread.currentThread().getName() + ": transfer complete");
        }

        public boolean isFinish() {
            return segmentDataWithQueue.isEmptyQueue();
        }

        public List<String> getNewDataContainer() {
            return newDataContainer;
        }
    }
    

    Worker

    import java.util.concurrent.CountDownLatch;

    public class Worker implements Runnable {

        private final CountDownLatch downLatch;
        private final TransferDat transferDat;

        public Worker(TransferDat transferDat, CountDownLatch downLatch) {
            this.downLatch = downLatch;
            this.transferDat = transferDat;
        }

        @Override
        public void run() {
            try {
                transferDat.transfer();
            } finally {
                downLatch.countDown();  // always count down, even if the transfer throws
            }
        }
    }
    

    TransferDatTest

    import java.util.Arrays;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class TransferDatTest {

        private static final int handlerThreads = 10;

        public static void main(String[] args) {
            long start = System.currentTimeMillis();
            int arrLength = 1000;

            String[] data = new String[arrLength];
            for (int i = 0; i < arrLength; i++) {
                data[i] = "TestData".concat(String.valueOf(i));
            }
            // Before migration
            System.out.println("Old data: " + Arrays.asList(data));

            TransferDat transferDat = new TransferDat();
            // Split the data into segments
            transferDat.segmentData(data);

            // Start the migration and count finished workers
            CountDownLatch downLatch = new CountDownLatch(handlerThreads);
            ExecutorService es = Executors.newFixedThreadPool(handlerThreads);
            try {
                // Simulate concurrent migration: submit the Runnable itself, the pool supplies the threads
                for (int i = 0; i < handlerThreads; i++) {
                    es.submit(new Worker(transferDat, downLatch));
                }
                downLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                es.shutdown();
            }

            // After migration
            System.out.println("Finished: " + transferDat.isFinish());
            System.out.println("New data (" + transferDat.getNewDataContainer().size() + " rows): " + transferDat.getNewDataContainer());
            System.out.println("Elapsed: " + (System.currentTimeMillis() - start) + "ms");
        }
    }
    
    

    Option 3:
    Use DB2's built-in locks:
    SELECT * FROM RRTEST WHERE pkID='20070223ORD01267732' FOR UPDATE WITH RS
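    The statement above locks a row only for the duration of a transaction, so it has to run with auto-commit turned off. The following is a minimal JDBC sketch that uses only `java.sql` and assumes an already-open DB2 connection, which is not shown. `RowHandler` and the class name are hypothetical. The sketch covers only the locking step. After the first transaction commits, a later transaction can still read the same row. Skipping rows that were already migrated would need extra bookkeeping, which the article does not describe.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class Db2RowLockSketch {

    // The article's statement, parameterized; WITH RS (read stability) keeps the row locks until commit or rollback.
    static final String LOCKING_SELECT = "SELECT * FROM RRTEST WHERE pkID = ? FOR UPDATE WITH RS";

    // Hypothetical callback standing in for "write this row to the NoSQL store".
    interface RowHandler {
        void handle(ResultSet row) throws SQLException;
    }

    // Reads one row under a DB2 row lock inside an explicit transaction. Another session that
    // runs the same statement for the same pkID waits (or times out) until this one ends.
    static boolean processRow(Connection conn, String pkId, RowHandler handler) throws SQLException {
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);  // the lock lives only as long as the transaction
        try (PreparedStatement ps = conn.prepareStatement(LOCKING_SELECT)) {
            ps.setString(1, pkId);
            boolean found = false;
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    handler.handle(rs);
                    found = true;
                }
            }
            conn.commit();  // ends the transaction and releases the row lock
            return found;
        } catch (SQLException e) {
            conn.rollback();
            throw e;
        } finally {
            conn.setAutoCommit(oldAutoCommit);
        }
    }
}
```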

    Code on gitee:
    https://gitee.com/danni505/SegmentData.git


    Article link: https://www.haomeiwen.com/subject/cewkoqtx.html