美文网首页
《实战JAVA——高并发程序设计》part1:

《实战JAVA——高并发程序设计》part1:

作者: 让我再睡会儿啊 | 来源:发表于2019-06-23 20:03 被阅读0次

    重点1:常见错误


    ArrayList在并发下的问题:

    eg.

    package org.chain.current.demo.exceptiondemo;
    
    import java.util.ArrayList;
    
    /**
     * Demo of sharing one {@link ArrayList} between two writer threads without any
     * synchronization. {@code ArrayList} is not thread-safe, so the size printed by
     * {@code main} is not guaranteed to equal the 2,000,000 elements that were added.
     */
    public class ArrayListIssue {
        /** Appends the integers 0..999999 to the list it was constructed with. */
        static class ArrayListThread implements Runnable {

            private ArrayList<Integer> shared;

            ArrayListThread(ArrayList<Integer> shared) {
                this.shared = shared;
            }

            @Override
            public void run() {
                int next = 0;
                while (next < 1000000) {
                    shared.add(next);
                    next++;
                }
            }
        }

        /**
         * Starts two writers on the same list, waits for both and prints the resulting size.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while joining
         */
        public static void main(String[] args) throws InterruptedException {
            ArrayList<Integer> numbers = new ArrayList<>(10);
            Thread[] writers = {
                new Thread(new ArrayListThread(numbers)),
                new Thread(new ArrayListThread(numbers))
            };
            // Start both writers before joining either one so that they really overlap.
            for (Thread writer : writers) {
                writer.start();
            }
            for (Thread writer : writers) {
                writer.join();
            }
            System.out.println("process end...,the list size is " + numbers.size());
        }
    }
    

    HashMap在并发下的问题:

    eg.

    package org.chain.current.demo.exceptiondemo;
    
    import java.util.HashMap;
    
    /**
     * Demo of two threads inserting into one unsynchronized {@link HashMap}. One thread writes
     * the even keys below 10000 and the other the odd keys; {@code HashMap} is not thread-safe,
     * so the size printed by {@code main} is not guaranteed to be 10000.
     */
    public class HashMapIssue {
        /** Puts every second number in {@code [startNum, 10000)} into the map, keyed by its decimal string. */
        static class HashMapThread implements Runnable {
            int startNum;
            HashMap<String, String> hashMap;

            HashMapThread(int startNum, HashMap<String, String> hashMap) {
                this.startNum = startNum;
                this.hashMap = hashMap;
            }

            @Override
            public void run() {
                int key = startNum;
                while (key < 10000) {
                    hashMap.put(String.valueOf(key), "test");
                    key += 2;
                }
            }
        }

        /**
         * Runs the even-key and odd-key writers concurrently and prints the final map size.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while joining
         */
        public static void main(String[] args) throws InterruptedException {
            HashMap<String, String> shared = new HashMap<>(16);
            Thread evenWriter = new Thread(new HashMapThread(0, shared));
            Thread oddWriter = new Thread(new HashMapThread(1, shared));
            evenWriter.start();
            oddWriter.start();
            evenWriter.join();
            oddWriter.join();
            int finalSize = shared.size();
            System.out.println("process end the map size is " + finalSize);
        }
    }
    

    ps:JDK8对HashMap内部做了大规模的调整,不会出现程序无法结束的问题;但HashMap仍然不是线程安全的,并发put依然可能丢失数据(最终size小于预期),多线程场景应使用ConcurrentHashMap


    错误的加锁

    eg.

    package org.chain.current.demo.exceptiondemo;
    
    /**
     * A lock must be taken on the right object.
     * <p>
     * This demo deliberately locks on the wrong one: the {@link Integer} field that is being
     * incremented. {@code Integer} is immutable, so every increment stores a different object
     * in the field and the two threads may end up synchronizing on different monitors.
     */
    public class LockObjectDemo {
        /** Increments a boxed counter 10000 times per run while locking on the counter itself. */
        static class BadLockOnInteger implements Runnable {

            Integer integer = 0;

            @Override
            public void run() {
                for (int remaining = 10000; remaining > 0; remaining--) {
                    // Wrong on purpose: the monitor is whatever object the field references
                    // right now, and the increment below replaces that object.
                    synchronized (integer) {
                        integer = integer + 1;
                    }
                    // The correct variant locks on an object that never changes, for example:
                    //     synchronized (this) { integer++; }
                }
            }
        }

        /**
         * Runs the same counter on two threads and prints the final value.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while joining
         */
        public static void main(String[] args) throws InterruptedException {
            BadLockOnInteger counter = new BadLockOnInteger();
            Thread first = new Thread(counter);
            Thread second = new Thread(counter);
            first.start();
            second.start();
            first.join();
            second.join();
            System.out.println("process is end the integer is " + counter.integer);
        }
    }
    

    重点2:并行模式


    并行流水线

    如果我们要计算(B+C)*B/2,必须按照算术优先级的顺序依次计算,无法并行。
    可以借鉴生产流水线的思想,把计算过程拆分为:
    1.p1=B+C
    2.p2=p1*B
    3.p3=p2/2
    eg.

    package org.chain.current.demo.parallelpattern;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * Pipeline demo that computes (B+C)*B/2.
     * <p>
     * The calculation is split into three stages, each running on its own thread and handing
     * its partial result to the next stage through a blocking queue:
     * <ol>
     *   <li>P1: B + C</li>
     *   <li>P2: (B + C) * B</li>
     *   <li>P3: (B + C) * B / 2</li>
     * </ol>
     * When the producer is done it enqueues an end-of-stream marker. Every stage forwards the
     * marker downstream before exiting, so all threads terminate and {@code main} returns.
     */
    public class FlowLineDemo {
        private static final BlockingQueue<Msg> qOne = new LinkedBlockingQueue<>();
        private static final BlockingQueue<Msg> qTwo = new LinkedBlockingQueue<>();
        private static final BlockingQueue<Msg> qThree = new LinkedBlockingQueue<>();

        /** End-of-stream marker. It is recognised by identity and never used in a calculation. */
        private static final Msg POISON = new Msg();

        /** Work item travelling through the pipeline; {@code log} collects a trace of every stage. */
        static class Msg {
            float a;
            float b;
            StringBuilder log = new StringBuilder();
        }

        /** Stage 1: replaces (a, b) = (B, C) with (B + C, B) and passes the message to stage 2. */
        static class ProcessOne implements Runnable {
            @Override
            public void run() {
                try {
                    for (Msg msg = qOne.take(); msg != POISON; msg = qOne.take()) {
                        float b = msg.a;
                        float c = msg.b;
                        msg.a = b + c;
                        // Keep B around: stage 2 multiplies the sum by it.
                        msg.b = b;
                        msg.log.append("P1:B=").append(b).append(",C=").append(c)
                                .append(",(B+C)=").append(msg.a).append("<<- ->>");
                        qTwo.put(msg);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of spinning on a dead pipeline.
                    Thread.currentThread().interrupt();
                } finally {
                    // The queue is unbounded, so offer() always succeeds and never blocks.
                    qTwo.offer(POISON);
                }
            }
        }

        /** Stage 2: multiplies the sum by B and passes the message to stage 3. */
        static class ProcessTwo implements Runnable {
            @Override
            public void run() {
                try {
                    for (Msg msg = qTwo.take(); msg != POISON; msg = qTwo.take()) {
                        float sum = msg.a;
                        float b = msg.b;
                        msg.a = sum * b;
                        msg.log.append("P2:(B+C)=").append(sum).append(",B=").append(b)
                                .append(",(B+C)*B=").append(msg.a).append("<<- ->>");
                        qThree.put(msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    qThree.offer(POISON);
                }
            }
        }

        /** Stage 3: halves the product and prints the complete trace of the message. */
        static class ProcessThree implements Runnable {
            @Override
            public void run() {
                try {
                    for (Msg msg = qThree.take(); msg != POISON; msg = qThree.take()) {
                        // Capture the product before halving so the trace shows the real input.
                        float product = msg.a;
                        msg.a = product / 2;
                        msg.log.append("P3:(B+C)*B=").append(product)
                                .append(",(B+C)*B/2=").append(msg.a);
                        System.out.println("flow end," + msg.log);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Feeds 10000 messages with random B and C in [0, 10) into the pipeline, then ends the stream. */
        static class Producer implements Runnable {
            @Override
            public void run() {
                Random random = new Random();
                try {
                    for (int i = 0; i < 10000; i++) {
                        Msg msg = new Msg();
                        msg.a = random.nextInt(10);
                        msg.b = random.nextInt(10);
                        qOne.put(msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // Always signal end-of-stream so the stages shut down even after an interrupt.
                    qOne.offer(POISON);
                }
            }
        }

        /**
         * Starts the producer and the three stages and waits until the whole stream is processed.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while joining
         */
        public static void main(String[] args) throws InterruptedException {
            Thread t1 = new Thread(new Producer());
            Thread t2 = new Thread(new ProcessOne());
            Thread t3 = new Thread(new ProcessTwo());
            Thread t4 = new Thread(new ProcessThree());
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            t1.join();
            t2.join();
            t3.join();
            t4.join();
        }
    }
    

    并行搜索

    比如在一个数组中查询某个数,可以将数组分段利用多线程并行搜索
    eg

    package org.chain.current.demo.producerandconsumer;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Parallel search demo: the array is cut into consecutive segments and every segment is
     * scanned by its own pool task. The first task that finds the value publishes its index
     * in {@link #result}; the other tasks notice that and stop early.
     */
    public class parallelSearch {
        private static int[] arr;
        /** Index published by the first task that found the value, or -1 while nothing was found. */
        private static AtomicInteger result = new AtomicInteger(-1);

        /**
         * Scans {@code arr[start, end)} for {@code val}.
         *
         * @return the index published in {@link #result} (by this or by another task), or -1 if
         *         this segment was exhausted without anybody having found the value
         */
        private static Integer search(int val, int start, int end) {
            for (int i = start; i < end; i++) {
                int published = result.get();
                if (published != -1) {
                    // Somebody else already found it; no need to keep scanning.
                    return published;
                }
                if (val == arr[i]) {
                    // Only the first finder wins; a loser returns the winner's index.
                    result.compareAndSet(-1, i);
                    return result.get();
                }
            }
            return -1;
        }

        /** Pool task that searches one segment of the array. */
        static class SearchTask implements Callable<Integer> {
            private int val;
            private int start;
            private int end;

            public SearchTask(int val, int start, int end) {
                this.val = val;
                this.start = start;
                this.end = end;
            }

            @Override
            public Integer call() throws Exception {
                System.out.println("ThreadName:" + Thread.currentThread().getName() + ",start:" + start + ",end:" + end);
                return search(val, start, end);
            }
        }

        /**
         * Searches {@code arr} for {@code val} using a pool of {@code threadNum} threads.
         *
         * @param val       value to look for
         * @param threadNum pool size; also determines the segment length
         * @return an index at which {@code val} occurs, or -1 if it does not occur
         * @throws ExecutionException   if a search task failed
         * @throws InterruptedException if the caller is interrupted while waiting for a task
         */
        private static int PSearch(int val, int threadNum) throws ExecutionException, InterruptedException {
            // Forget the outcome of any previous search; otherwise a stale index would be returned.
            result.set(-1);
            ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
            List<Future<Integer>> futures = new ArrayList<>();
            try {
                // With more threads than elements the integer division yields 0, and a zero step
                // would never advance the loop below; use segments of at least one element.
                int step = Math.max(1, arr.length / threadNum);
                for (int i = 0; i < arr.length; i += step) {
                    // The last segment absorbs the remainder that does not fill a whole step.
                    boolean last = i + 2 * step > arr.length;
                    int end = last ? arr.length : i + step;
                    futures.add(executorService.submit(new SearchTask(val, i, end)));
                    if (last) {
                        break;
                    }
                }
            } finally {
                // Already submitted tasks still run; the pool just stops accepting new ones.
                executorService.shutdown();
            }
            for (Future<Integer> future : futures) {
                int index = future.get();
                if (index >= 0) {
                    return index;
                }
            }
            return -1;
        }

        /** Fills {@code arr} with {@code i} random values in [0, 10). */
        private static void initArray(int i) {
            arr = new int[i];
            Random random = new Random();
            for (int j = 0; j < i; j++) {
                arr[j] = random.nextInt(10);
            }
        }

        /**
         * Searches a random 10-element array for the value 5 with three threads.
         *
         * @param args unused
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            initArray(10);
            System.out.println("arr:" + Arrays.toString(arr));
            System.out.println("result:" + PSearch(5, 3));
        }
    }
    

    并行排序

    回顾了基础的排序算法,冒泡排序,奇偶交换排序,插入排序,希尔排序。
    利用并发实现奇偶交换排序和希尔排序。
    eg1:奇偶交换排序并发版

    package org.chain.current.demo.parallelpattern.sort;
    
    import java.util.Arrays;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Parallel odd-even transposition sort.
     * <p>
     * The sort alternates between an even phase (pairs starting at index 0, 2, 4, ...) and an
     * odd phase (pairs starting at index 1, 3, 5, ...). The pairs of one phase do not overlap,
     * so each compare-and-swap runs as its own pool task; a {@link CountDownLatch} makes the
     * driver wait for the whole phase before the next one starts.
     */
    public class ParallelOddEvenSortDemo {

        private static int[] arr;

        private static ExecutorService pool = Executors.newCachedThreadPool();

        /** 1 if a swap happened during the current phase, 0 otherwise. */
        private static int exchFlag = 1;

        private static synchronized int getExchFlag() {
            return exchFlag;
        }

        private static synchronized void setExchFlag(int v) {
            exchFlag = v;
        }

        /** Orders the pair {@code (arr[i], arr[i + 1])} and reports whether it had to swap. */
        private static class ExchangeTask implements Runnable {
            int i;
            CountDownLatch cdl;

            ExchangeTask(int i, CountDownLatch cdl) {
                this.i = i;
                this.cdl = cdl;
            }

            @Override
            public void run() {
                try {
                    if (arr[i] > arr[i + 1]) {
                        int tmp = arr[i];
                        arr[i] = arr[i + 1];
                        arr[i + 1] = tmp;
                        setExchFlag(1);
                    }
                } finally {
                    // Always release the driver, even if the comparison above failed.
                    cdl.countDown();
                }
            }
        }

        /**
         * Sorts {@code arr} in ascending order.
         *
         * @throws InterruptedException if the caller is interrupted while waiting for a phase
         */
        private static void parallelOddEvenSort() throws InterruptedException {
            // A previous sort leaves the flag at 0; reset it so the method can be called again.
            setExchFlag(1);
            int start = 0;
            // Stop only after an even phase without swaps that follows a completed odd phase.
            while (getExchFlag() == 1 || start == 1) {
                setExchFlag(0);
                // Number of pairs (i, i + 1) with i = start, start + 2, ... below arr.length - 1.
                // Unlike "length / 2 - start" this is never negative, not even for an empty array.
                int pairs = (arr.length - start) / 2;
                CountDownLatch cdl = new CountDownLatch(pairs);
                for (int i = start; i < arr.length - 1; i += 2) {
                    pool.submit(new ExchangeTask(i, cdl));
                }
                cdl.await();
                start = 1 - start;
            }
        }

        /**
         * Sorts a random array and prints it before and after.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while sorting
         */
        public static void main(String[] args) throws InterruptedException {
            try {
                arr = DemoUtil.init(10, 100);
                System.out.println("the origin arr is:" + Arrays.toString(arr));
                parallelOddEvenSort();
                System.out.println("the sorted arr is:" + Arrays.toString(arr));
            } finally {
                // Release the pool threads even when the sort is interrupted.
                pool.shutdown();
            }
        }
    }
    

    eg2:希尔排序(以下代码为串行版本)

    package org.chain.current.demo.parallelpattern.sort;
    
    import java.util.Arrays;
    
    /**
     * Serial Shell sort demo.
     */
    public class ShellSortDemo {

        /**
         * Sorts an array obtained from {@code DemoUtil.init(19, 100)} and prints it before and after.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            int[] arr = DemoUtil.init(19, 100);
            System.out.println("origin array:" + Arrays.toString(arr));
            shellSort(arr);
            System.out.println("sorted array:" + Arrays.toString(arr));
        }

        /**
         * Sorts {@code arr} in place in ascending order with Shell sort, using the gap sequence
         * 1, 4, 13, 40, ... (h = 3h + 1).
         *
         * @param arr the array to sort
         */
        public static void shellSort(int[] arr) {
            // Grow the gap along 1, 4, 13, ... until it exceeds a third of the array length.
            int gap = 1;
            while (gap <= arr.length / 3) {
                gap = gap * 3 + 1;
            }
            // One gapped insertion sort per gap, walking the same sequence back down to 1.
            for (; gap > 0; gap = (gap - 1) / 3) {
                for (int i = gap; i < arr.length; i++) {
                    int pending = arr[i];
                    if (pending >= arr[i - gap]) {
                        // Already in order relative to its predecessor in this gap chain.
                        continue;
                    }
                    // Shift larger predecessors one gap to the right to open a slot.
                    int slot = i;
                    while (slot >= gap && arr[slot - gap] > pending) {
                        arr[slot] = arr[slot - gap];
                        slot -= gap;
                    }
                    arr[slot] = pending;
                }
            }
        }
    }
    

    点我获取代码

    相关文章

      网友评论

          本文标题:《实战JAVA——高并发程序设计》part1:

          本文链接:https://www.haomeiwen.com/subject/jjccfctx.html