美文网首页
多线程编程

多线程编程

作者: laowangv2 | 来源:发表于2020-12-15 23:29 被阅读0次

    同步机制

    Atomic类

    1. 原理
      CAS+自旋,CAS依赖unsafe实现。缺点的高并发是自旋消耗cpu
    2. jdk8的优化
      新增了几个继承自Striped64的类,LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator,思想是CAS的目标分段,每个线程分别对应一个段,降低冲突,最后把所有段加起来返回

    1. synchronized关键字
      对象头中指向一个ObjectMonitor(c++实现)对象,monitor实现锁依赖于操作系统的Mutex Lock,使用时需要进行用户态到内核态的切换,所以效率较低。
      锁优化过程:因为synchronized的效率问题,JDK1.6进行了优化,主要依靠对象头设计了三种不同类型的锁——偏向锁、轻量级锁、重量级锁。初始获得偏向锁,待其他线程竞争时,将偏向锁升级到轻量级锁,原持有锁的线程继续执行,竞争者自旋,一定次数后升级为重量级锁,竞争者阻塞。
      详见:
      深入理解Java并发之synchronized实现原理
      Java性能 -- synchronized锁升级优化
    2. Lock
      典型用法:
     Lock l = ...;
     l.lock();
     try {
       // access the resource protected by this lock
     } finally {
       l.unlock();
     }
    

    和synchronized的一些区别,除了使用方式上:

    • try acquire
    • 公平锁
    • 可中断
    1. 读写锁
      • 不保证优先级,支持公平/非公平
      • 可重入
      • 可降级
      • 可中断
      • 支持条件变量

    官方示例:

    class CachedData {
       Object data;
       volatile boolean cacheValid;
       final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    
       void processCachedData() {
         rwl.readLock().lock();
         if (!cacheValid) {
           // Must release read lock before acquiring write lock
           rwl.readLock().unlock();
           rwl.writeLock().lock();
           try {
             // Recheck state because another thread might have
             // acquired write lock and changed state before we did.
             if (!cacheValid) {
               data = ...
               cacheValid = true;
             }
             // Downgrade by acquiring read lock before releasing write lock
             rwl.readLock().lock();
           } finally {
             rwl.writeLock().unlock(); // Unlock write, still hold read
           }
         }
    
         try {
           use(data);
         } finally {
           rwl.readLock().unlock();
         }
       }
     }
    
     class RWDictionary {
       private final Map<String, Data> m = new TreeMap<String, Data>();
       private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
       private final Lock r = rwl.readLock();
       private final Lock w = rwl.writeLock();
    
       public Data get(String key) {
         r.lock();
         try { return m.get(key); }
         finally { r.unlock(); }
       }
       public String[] allKeys() {
         r.lock();
         try { return m.keySet().toArray(); }
         finally { r.unlock(); }
       }
       public Data put(String key, Data value) {
         w.lock();
         try { return m.put(key, value); }
         finally { w.unlock(); }
       }
       public void clear() {
         w.lock();
         try { m.clear(); }
         finally { w.unlock(); }
       }
     }
    
    1. 可重入实现
      计数器 + 记录持有锁的线程

    信号量

    本质上是个计数器,可以限制能够访问资源的线程个数。常见场景如池的访问,官方示例:

     class Pool {
       private static final int MAX_AVAILABLE = 100;
       private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    
       public Object getItem() throws InterruptedException {
         available.acquire();
         return getNextAvailableItem();
       }
    
       public void putItem(Object x) {
         if (markAsUnused(x))
           available.release();
       }
    
       // Not a particularly efficient data structure; just for demo
    
       protected Object[] items = ... whatever kinds of items being managed
       protected boolean[] used = new boolean[MAX_AVAILABLE];
    
       protected synchronized Object getNextAvailableItem() {
         for (int i = 0; i < MAX_AVAILABLE; ++i) {
           if (!used[i]) {
              used[i] = true;
              return items[i];
           }
         }
         return null; // not reached
       }
    
       protected synchronized boolean markAsUnused(Object item) {
         for (int i = 0; i < MAX_AVAILABLE; ++i) {
           if (item == items[i]) {
              if (used[i]) {
                used[i] = false;
                return true;
              } else
                return false;
           }
         }
         return false;
       }
     }
    

    当计数器为1时,构成一个二元信号量,可以当做一个“锁”使用,这个“锁”可以由其他线程释放,也就是说信号量没有owner的概念,这一点在死锁恢复中很有用。

    条件变量

    获取锁之后,可以等待某个条件,等待时释放锁。使用时需要和锁绑定在一起。
    与wait、notify/notifyAll的区别类似于synchronized和lock。
    官方示例:

     class BoundedBuffer {
       final Lock lock = new ReentrantLock();
       final Condition notFull  = lock.newCondition(); 
       final Condition notEmpty = lock.newCondition(); 
    
       final Object[] items = new Object[100];
       int putptr, takeptr, count;
    
       public void put(Object x) throws InterruptedException {
         lock.lock();
         try {
           while (count == items.length)
             notFull.await();
           items[putptr] = x;
           if (++putptr == items.length) putptr = 0;
           ++count;
           notEmpty.signal();
         } finally {
           lock.unlock();
         }
       }
    
       public Object take() throws InterruptedException {
         lock.lock();
         try {
           while (count == 0)
             notEmpty.await();
           Object x = items[takeptr];
           if (++takeptr == items.length) takeptr = 0;
           --count;
           notFull.signal();
           return x;
         } finally {
           lock.unlock();
         }
       }
     }
    

    AQS

    AQS是JUC中大部分同步类(如上面说的ReentrantLock、Semaphore等)的底层实现框架。通过继承AQS并实现几个必要的方法,我们可以很容易地实现自己的同步类。简单地说,AQS通过CAS操作和一个等待队列(CLH队列的变体)来实现同步功能,如下图:

    AQS基本原理
    详见美团技术团队的文章:
    从ReentrantLock的实现看AQS的原理及应用

    线程池

    1. Executor,顶层接口,只有execute方法
    2. ExecutorService,继承自Executor,增加了shutdown和返回future的能力,关闭示例:
    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
         // Wait a while for existing tasks to terminate
         if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
               System.err.println("Pool did not terminate");
         }
       } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
       }
     }
    

    线程池的五种状态:


    线程池的状态

    https://www.jianshu.com/p/03ecc5a4316c

    1. ThreadPoolExecutor,核心参数:
      • Core and maximum pool sizes
        线程数小于core size时,提交任务直接创建新线程,不管有没有idle线程;
        线程数介于core size和maximum之间时,只有没有idle线程才会创建
      • Keep-alive times
        超过core size的线程存活时间
      • 队列
        线程数大于core size后,会优先扔进队列,队列满后会新建线程直到已经达到maximumPoolSIze,这时任务会被拒绝
        常见的3种队列:
        1. 直传,如使用SynchronousQueue,不存储,消费一个放一个,一般要求无界的maximumPoolSize防止拒绝任务。这种策略防止任务之间有依赖的时候卡住;
        2. 无界队列,如LinkedBLockingQueue,此时maximumPoolSize是无效的,也不会大于corePoolSize个数的线程被创建出来;
        3. 有界队列,如ArrayBlockingQueue,queue size和maximumPoolSize更难取舍,取决于cpu占用和吞吐量的取舍
      • 拒绝任务
        队列大小和最大线程数被触达后就会执行RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
        四种预置的拒绝策略包括:
        1. ThreadPoolExecutor.AbortPolicy,
          RejectedExecutionException
        2. ThreadPoolExecutor.CallerRunsPolicy
          提交线程自己执行
        3. ThreadPoolExecutor.DiscardPolicy
          丢弃
        4. ThreadPoolExecutor.DiscardOldestPolicy
          丢弃队列头任务,然后重试

    总结ThreadPoolExecutor的工作流程:
    当线程数小于corePoolSize时,每次提交直接创建新线程;当corePoolSize达到后,再提交会放到队列,当队列满后,继续创建线程直到maximumPoolSIze,然后就是开始拒绝任务提交

    如何设置线程池的参数?

    • 机械化的公式
    • IO型/计算型
    • 监控 + 动态配置
    1. Executors
      工厂类,常见的四种线程池:

      • newCachedThreadPool,可缓存线程


        newCachedThreadPool
      • newFixedThreadPool,定长,队列无界


        newFixedThreadPool
      • newScheduledThreadPool,定时,支持延迟,异常之后不会继续执行


        newScheduledThreadPool

        实现延时基于:DelayedWorkQueue,一般而言,延时队列基于优先级队列(堆)实现。
        两种用法:

        • scheduleWithFixedDelay
          固定延时,等上次执行完后,等待延时时间执行
        • scheduleAtFixedRate
          固定频率执行,如果任务执行时间超过定时,就立刻开始,否则等待固定时间到再执行
      • newSingleThreadExecutor


        newSingleThreadExecutor
    2. fork join
      传统的线程池无法处理任务直接存在依赖的情况,也就是分治。fork join可以应用在这种场景中,类似map-reduce,fork把任务拆成小任务,join合并结果,多个线程有各自的队列,当自己空闲时会去其他线程队列偷任务执行。使用要注意不要阻塞父线程使其成为监工。

    Java内存模型

    • 线程和进程
      进程是资源分配的单元,拥有独立的地址空间。线程是任务调度的单元,是进程内部的一个执行序列。一个进程至少有一个线程,进程之间切换开销大,通信困难,线程切换开销小,可以共享进程资源,同步方便。
    • 线程的状态
      NEW、RUNABLE、TERMINATED、WAITING、TIMED_WAITING、BLOCKED


      线程状态
    • JMM
      Java内存模型,JMM,可以理解为一组规范、规则,屏蔽了底层硬件(cpu、寄存器、内存)的差异,为java程序提供了统一的内存访问模型。
    • 三大特性
      JMM围绕三大特性展开,包括原子性、可见性、有序性。加锁可以满足全部三条特性,volatile可以满足可见性和有序性(通过内存屏障)
    • happens-before原则
      除了三大特性以外,还可以通过happens-before原则来推定程序的顺序:
      1. 程序次序规则
      2. 锁定规则
      3. volatile变量规则
      4. 传递规则
      5. 线程启动规则
      6. 线程中断规则
      7. 线程终结规则
      8. 对象终结规则

    理解happens-before原则:因为工作线程的缓存和主内存同步问题,先行发生的线程对内存的操作未必能被后续线程观测到,而如果满足hb原则则可以保证这一点

    其他

    1. ThreadLocal
      每个thread对象里有一个threadlocals属性,这是一个map,以ThreadLocal为key,set的值为value。所以对每个threadlocal,每个thread内部都有一个map存对应的值。

    相关文章

      网友评论

          本文标题:多线程编程

          本文链接:https://www.haomeiwen.com/subject/aruggktx.html