美文网首页
并发编程的学习笔记。

并发编程的学习笔记。

作者: 会飞的蜗牛66666 | 来源:发表于2019-03-09 11:28 被阅读0次

    关于锁的问题:
    1,避免一个线程同时获取多个锁
    2,避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源
    3,尝试使用定时锁,使用lock.tryLock(timeout)来替代内部锁机制 lock.tryLock(300L, TimeUnit.SECONDS);
    4,对于数据库锁,加锁和解锁必须在一个数据库连接里,否则会出现解锁失败的现象

    synchronize和volatile
    volatile是轻量级的synchronize,在线程中保证了共享变量的可见性,即当一个线程修改了这个值以后,其他线程能被同步刷新,而不会被锁住
    synchronized可以修饰方法或者以同步代码块的形式进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中
    volatile基本替代了synchronize重量级的锁
    采用TimeUnit.SECONDS.sleep(1) 替代Thread.sleep(200)
    public class SafeDoubleCheckedLocking {
    private volatile static SafeDoubleCheckedLocking instance;
    public SafeDoubleCheckedLocking() {
    }
    public static SafeDoubleCheckedLocking getInstance() {
    if (instance == null) {
    synchronized (SafeDoubleCheckedLocking.class) {
    if (instance == null) {
    instance = new SafeDoubleCheckedLocking();
    }
    }
    }
    return instance;
    }
    }

    线程
    //获取java线程管理mxbean
    final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    final ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
    for (ThreadInfo threadInfo : threadInfos) {
    System.out.println(threadInfo.getThreadName());
    }
    线程的优先级
    在java中,通过一个整型成员变量priority来控制优先级,优先级的范围从1到10,在线程构建的时候可以通过setPriority(int)方法来修改优先级,默认的优先级是5
    优先级高的线程分配的时间片的数量要多于优先级低的线程
    安全的终止线程
    public class ShutDown {

    public static void main(String[] args) throws Exception {
        final Runner one = new Runner();
        Thread countThread = new Thread(one, "CountThread");
        countThread.start();
        //睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知on为false而结束
        TimeUnit.SECONDS.sleep(1);
        countThread.interrupt();
        final Runner two = new Runner();
        countThread = new Thread(two, "CountThread");
        countThread.start();
        //睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束
        TimeUnit.SECONDS.sleep(1);
        two.cancle();
    }
    
    private static class Runner implements Runnable {
        private long i;
        private volatile boolean on = true;
    
        @Override
        public void run() {
            while (on && !Thread.currentThread().isInterrupted()) {
                i++;
            }
            System.out.println("Count i=" + i);
        }
    
        private void cancle() {
            on = false;
        }
    }
    

    }

    线程之间的通信
    notify()与wait()
    notify()通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是该线程获取到了对象的锁
    notifyAll()通知所有等待在该对象上的线程
    wait()调用该方法的线程进入waitting状态,只有等待另外的线程的通知或被中断才会返回,需要注意的是调用该方法后会释放对象的锁
    wait(long)是等待多少毫秒

    ThreadLocal 线程变量
    是一个以ThreadLocal对象为主键,任意对象为值的存储结构,这个结果被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一值

    CountDownLatch类只提供了一个构造器:
    public CountDownLatch(int count) { }; //参数count为计数值
    然后下面这3个方法是CountDownLatch类中最重要的方法:
    public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
    public void countDown() { }; //将count值减1,直到count的值为0才开始执行

    hashmap与ConcurrentHashMap
    在多线程环境中采用线程不安全的hashMap进行操作会有死循环的现象,导致cpu的利用率接近100%。
    为何呢?因为多线程会导致HashMap的entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry
    Hashtable容器使用sychronized来保证线程安全,但是在线程竞争激烈的情况下,其效率非常低下,因为当一个线程访问hashtable的同步方法,其他线程也访问hashtable的同步方法的时候
    会进入阻塞和轮询的状态。
    ConcurrentHashMap采用分段锁技术,每一把锁用于容器其中一部分数据,所以线程间不存在锁竞争。

    CountDownLatch允许一个或多个线程等待其他线程完成操作
    现在有这样一个场景,现在有一个excel文件,文件里面有多个sheet数据,此时可以考虑多线程去处理。
    每个线程解析一个sheet里面的数据,等到所有的sheet都解析完成之后,程序需要提示解析完成,在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的方式是join()
    join用于让当前执行线程等待join线程执行结束。其实原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。其中wait(0)表示永远等待下去,代码片段如下:
    while (isAllLive()){
    wait(0)
    }
    public class Test {

    public static void main(String[] args) throws InterruptedException {
        Thread parse1 = new Thread(new Runnable() {
            public void run() {
                System.out.println("parse1 finished");
            }
        });
        Thread parse2 = new Thread(new Runnable() {
            public void run() {
                System.out.println("parse2 finished");
            }
        });
    
        parse1.start();
        parse2.start();
        //join用于让当前执行线程等待join线程执行结束
        parse1.join();//不停的检查join线程是否存活,如果还存活就永远的让当前线程等待
        parse2.join();
        //检查到join线程终止以后,线程就会调用this.notifyAll()方法,这个是在jvm里面自动实现的。
        System.out.println("all parse finished");
    }
    

    }

    那么我们来看下CountDownLatch的具体使用方式:
    public class CountDownLatchTest {
    //传入一个int类型的参数作为计数器N
    private static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
    
        new Thread(new Runnable() {
            public void run() {
                System.out.println(1);
                //当调用countDown方法时,N就会减1,它的await方法会阻塞当前线程,直到N变成0,这里的N可以是N个线程也可以是N个步骤
                c.countDown();
                System.out.println(2);
                c.countDown();
            }
        }).start();
        //如果有某个解析sheet的线程比较慢,我们不可能一直让主线程等待,所以采用一个带时间参数的await方法如await(long timeout, TimeUnit unit)
        //这个方法就是等待特定的时间后,就不会再阻塞当前线程
        c.await();
        System.out.println("3");
    }
    

    同步屏障CyclicBarrier的使用
    /**

    • @Author: xwj

    • @Date: 2019/3/8 0008 17:13

    • @Version 1.0

    • 同步屏障,他的作用是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障,屏障才会开门,所有被屏障阻截的线程才会继续运行
      */
      public class CyclicBarrierTest {

      //构造方法CyclicBarrier(N)表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞
      static CyclicBarrier c = new CyclicBarrier(2);

      public static void main(String[] args) {

       new Thread(new Runnable() {
           public void run() {
               try {
                   c.await();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               } catch (BrokenBarrierException e) {
                   e.printStackTrace();
               }
               System.out.println(1);
           }
       }).start();
       try {
           c.await();
       } catch (InterruptedException e) {
           e.printStackTrace();
       } catch (BrokenBarrierException e) {
           e.printStackTrace();
       }
       System.out.println(2);
       //因为主线程和子线程的调度都是由cpu决定的,2个线程都有可能先执行,所以会有2种结果。
       //如果把new CyclicBarrier(2);改为new CyclicBarrier(3);则会一直等待下去,因为没有第3个线程到达屏障
      

      }
      }

    /**

    • @Author: xwj

    • @Date: 2019/3/8 0008 17:25

    • @Version 1.0

    • CyclicBarrier提供了一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction)

    • 用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
      */
      public class CyclicBarrierTest2 {

      static CyclicBarrier c = new CyclicBarrier(2, new A());

      public static void main(String[] args) {

       new Thread(new Runnable() {
           public void run() {
               try {
                   c.await();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               } catch (BrokenBarrierException e) {
                   e.printStackTrace();
               }
               System.out.println(1);
           }
       }).start();
      
       try {
           c.await();
       } catch (InterruptedException e) {
           e.printStackTrace();
       } catch (BrokenBarrierException e) {
           e.printStackTrace();
       }
       System.out.println(2);
      

      }

      //因为设置了拦截线程的数据是2,所以等代码中线程A和主线程都执行完之后,才会继续执行主线程。
      static class A implements Runnable {

       public void run() {
           System.out.println(3);
       }
      

      }
      }

    CyclicBarrier实际应用例子

    /**

    • @Author: xwj

    • @Date: 2019/3/9 0009 11:09

    • @Version 1.0

    • CyclicBarrier的应用,多线程进行计算数据,最后合并计算结果的场景

    • 有这样一个场景,用一个excel保存了用户所有银行流水,每个sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水

    • 实现方法:先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后再用barrierAction用这些线程计算结果,计算出整个EXCEL的日均银行流水
      */
      public class BandWaterService implements Runnable {

      /**

      • 创建4个线程屏障,处理完之后执行当前类的run方法
        */
        private CyclicBarrier c = new CyclicBarrier(4, this);

      /**

      • 假设只有4个sheet,所以只启动4个线程
        */
        private ExecutorService executor = newFixedThreadPool(4);

      /**

      • 保存每个sheet计算出的银流结果
        */
        private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

      private void count() {
      for (int i = 0; i < 4; i++) {
      executor.execute(new Runnable() {
      @Override
      public void run() {
      //计算当前sheet的银流数据,计算代码省略
      sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
      //银流计算完成,插入一个屏障
      try {
      //让4个线程都到达屏障后等待,再执行主线程的汇总结果
      c.await();
      } catch (InterruptedException | BrokenBarrierException e) {
      e.printStackTrace();
      }
      }
      });
      }
      }

      @Override
      public void run() {
      int result = 0;
      //汇总每个sheet计算出的结果
      final Set<Map.Entry<String, Integer>> entries = sheetBankWaterCount.entrySet();
      for (Map.Entry<String, Integer> sheet : entries) {
      result += sheet.getValue();
      }
      //将结果输出
      sheetBankWaterCount.put("result", result);
      System.out.println(result);
      }

      public static void main(String[] args) {
      final BandWaterService service = new BandWaterService();
      service.count();
      }
      }

    下面来了解CyclicBarrier和CountDownLatch的区别
    CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以用getNumberWaiting获取线程阻塞的数量,isBorken方法来了解线程是否阻塞等。

    控制并发线程数的Semaphore
    Semaphore是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。可以理解为红绿灯
    /**

    • @Author: xwj

    • @Date: 2019/3/9 0009 11:38

    • @Version 1.0

    • 假如有一个需求,需要读取几万个文件的数据,所以我们启动几十个线程并发地读取,然后将它写入到数据库,但是数据库连接数只有10个

    • 这个时候,我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。

    • 这种情况,就可以使用Semaphore来做流量控制了。
      */
      public class SemaphoreTest {
      private static final int THREAD_COUNT = 30;

      private static ExecutorService threadPool = newFixedThreadPool(THREAD_COUNT);

      private static Semaphore s = new Semaphore(10);

      /**

      • Semaphore的构造方法要求传入一个int的参数,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后release归还许可证。
      • 还可以用tryAcquire()方法尝试获取许可证
      • 它的用法类似于去图书馆借书,由于资源有限,每次只云允许最后10个人来看书,每个人去看书都要申请许可证,获取许可后才可以看书
      • 看完书以后,就要归还许可证,然后其他人又可以去申请许可证。
      • 我们来看它的一些方法
      • s.getQueueLength();等待获取许可证的线程数
      • s.availablePermits();许可证的可用数量
      • s.hasQueuedThreads();是否有线程正在等待获取许可证
      • @param args
        */
        public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
        threadPool.execute(new Runnable() {
        @Override
        public void run() {
        try {
        s.acquire();
        System.out.println("save data");
        s.release();
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        }
        });
        }
        threadPool.shutdown();
        }
        }

    相关文章

      网友评论

          本文标题:并发编程的学习笔记。

          本文链接:https://www.haomeiwen.com/subject/rtlspqtx.html