美文网首页
查看源码——并发类

查看源码——并发类

作者: 取名废同学 | 来源:发表于2018-11-30 15:26 被阅读0次

    java.util.current包:包含许多线程安全、测试良好、高性能的并发构建块
    包含3个包:
    Atomic(原子数据构建)
    Locks(基本锁实现,AQS,locksupport)
    Concurrent(构建高级工具,线程池、并发队列等

    一、CAS:compare and swap
    允许多线程更新一个内存位置,同时能检测其他线程的冲突并恢复。


    image.png

    compareAndSet的实现:

        public final boolean compareAndSet(int expect, int update) {
            return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
        }
    

    直接调用的是UnSafe这个类的compareAndSwapInt方法。全称是sun.misc.Unsafe. 这个类是Oracle(Sun)提供的实现. 可以在别的公司的JDK里就不是这个类了

    compareAndSwapInt的实现

        /**
         * Atomically update Java variable to <tt>x</tt> if it is currently
         * holding <tt>expected</tt>.
         * @return <tt>true</tt> if successful
         */
        public final native boolean compareAndSwapInt(Object o, long offset,
                                                      int expected,
                                                      int x);
    

    可以看到, 不是用Java实现的, 而是通过JNI调用操作系统的原生程序
    所以不用看源码了,懂道理就ok

    存在的问题:
    1、ABA问题
    因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A

    从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

    2、 循环时间长开销大。
    3、只能保证一个共享变量的原子操作。

    二、ReentrantLock:
    1、可重入锁的原理:
    一个线程获取锁之后,再去获取同一个锁,仅是把状态值累加,而不用重新排队

    对象是:
    当前获取资源的权利—— volatile int state
    申请者—— Thread
    排队—— Node双向队列

    (1)公平锁模型原理:
    过程是:
    a、初始化,state=0,无人抢占资源。
    b、线程A请求锁,获得了锁,state:1,


    image.png

    c、A获得锁,执行中;线程B请求锁,B无法获取锁,生成节点进行排队。


    image.png

    d.A又请求(不用排队,否则死锁),A还是获得锁,仅state+1



    e.A继续占有锁的过程,其他请求锁的线程在Node双向队列等待,进入休眠;
    A每释放一次锁,则state-1


    image.png
    f.A完全释放锁,此时state=0,其他线程有机会得到锁,此时通知队列唤醒B节点,B再次竞争锁
    g.公平锁按顺序去让线程得到锁

    (2)非公平锁模型原理:
    基本和公平差不多,除了A执行完,因为通知唤醒B需要时间,B唤醒后还要继续竞争锁,所以后面的线程可能会获取到锁。

    2、继承与实现关系:
    ReentrantLock类相关的类关系:
    继承于Lock类,sync类是其内部类,NonFairSynv和fairSynv继承于sync;
    NonfairSync/FairSync--->Sync--AbstractqueuedSynchronizer---->AbstractOenableSynchronizer
    另外ReentrantLock类实现了接口serializable和lock

    实现了Lock接口(有lock()、unlock()、tryLock、Condition newCondition() )

    3、构造方法:
    其中有sync对象,构造方法可以构造一个nonfairSync(默认,false)、或者fairsync对象。private final Sync sync;(不可修改)
    nonfairsync:非公平锁。后进的线程也可以先执行
    fairsync:公平锁,新进的线程先执行。

    public ReentrantLock()
      {
        sync = new NonfairSync();
      }
      
      public ReentrantLock(boolean paramBoolean)
      {
        sync = (paramBoolean ? new FairSync() : new NonfairSync());
      }
    

    4、主要方法:主要都是在其中又调用了sync的相关方法
    lock():加锁
    unlock():解锁
    tryLock()

    非公平锁加锁:
    简化版的步骤:(非公平锁的核心)
    基于CAS尝试将state(锁数量)从0设置为1
    A、如果设置成功,设置当前线程为独占锁的线程;
    B、如果设置失败,还会再获取一次锁数量,
    B1、如果锁数量为0,再基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;
    B2、如果锁数量不为0或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。

    公平锁加锁:
    简化版的步骤:(公平锁的核心)
    获取一次锁数量,
    B1、如果锁数量为0,如果当前线程是等待队列中的头节点,基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;
    B2、如果锁数量不为0或者当前线程不是等待队列中的头节点或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。

    5、sync/fairsync/nonsync:
    构造方法
    Lock():此时尝试调用tryAccquires(),看是否能获得锁。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。
    boolean tryAccquire()
    unlock()

    6、总结:公平锁与非公平锁对比
    FairSync:lock()少了插队部分(即少了CAS尝试将state从0设为1,进而获得锁的过程)
    FairSync:tryAcquire(int acquires)多了需要判断当前线程是否在等待队列首部的逻辑(实际上就是少了再次插队的过程,但是CAS获取还是有的)。

    最后说一句,
    ReentrantLock是基于AbstractQueuedSynchronizer实现的,AbstractQueuedSynchronizer可以实现独占锁也可以实现共享锁,ReentrantLock只是使用了其中的独占锁模式

    墙裂推荐一篇写的好的博客:
    https://www.cnblogs.com/java-zhao/p/5131544.html

    7、其他:
    (1)jdk1.5之后才出现了ReentrantLock,标准格式下要在finally块中unlock()。

    (2)ReentrantLock与内置锁synchronized相比:
    a、ReetrantLock提供了其他功能(包括定时的锁等待、可中断的锁等待、公平性以及非块结构的加锁),性能更优,但是危险性更高,可能会忘记在finally中调用unlock()。
    b、ReentrantLock的非块结构特性,使其获取锁的操作不能与特定的栈帧关联,而内置锁可以。
    c、在内置锁无法满足需求的情况下,ReentrantLock作为高级工具,当需要一些高级功能使使用,包括:可定时的、可轮询与可中断的锁获取操作、公平队列、及非块结构的锁,否则优先使用synchronized
    (synchronized会在未来有更大的提升,因为他是JVM的内置数学,能执行一些优化。)

    (3)一般默认使用非公平锁,因为公平锁在挂起线程和恢复线程时存在巨大开销而极大地降低了性能。多数情况下,非公平锁的性能高于公平锁。

    三、AQS(AbstractQueuedSynchronier类,用于构建锁和同步器的框架)
    ReentrantLock.Sync继承于AQS,
    子类:ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、Future Task
    基于AQS构建的同步器,只可能在一个时刻发生阻塞。

    1、数据结构
    AQS维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。
    state的访问方式有三种:
    getState()、setState()、compareAndSetState()

    2、两种资源共享方式
    AQS定义两种资源共享方式:
    Exclusive(独占,只有一个线程能执行,如ReentrantLock)
    Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
    不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

    3、AQS主要方法:
    自定义同步器实现时主要实现以下几种方法:
    isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
    tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
    tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
    tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

    源码:

     protected boolean tryAcquire(int paramInt)
      {
        throw new UnsupportedOperationException();
      }
      
      protected boolean tryRelease(int paramInt)
      {
        throw new UnsupportedOperationException();
      }
      
      protected int tryAcquireShared(int paramInt)
      {
        throw new UnsupportedOperationException();
      }
      
      protected boolean tryReleaseShared(int paramInt)
      {
        throw new UnsupportedOperationException();
      }
      
    

    AQS只是一个框架,只定义了接口,具体资源的获取/释放方式(通过state的get/set/CAS) 交由自定义同步器去实现。之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。

    四、CountDownLatch:
    相当于计数器,在java.util.current下


    image.png

    1、数据结构:只有一个对象sync,并且也有内部类Sync
    private final Sync sync;

    2、构造方法:只有一个构造方法,设置闭锁需要等待的线程数,只能设置一次,不能更改。

    public CountDownLatch(int paramInt)
      {
        if (paramInt < 0) {
          throw new IllegalArgumentException("count < 0");
        }
        sync = new Sync(paramInt);
      }
    

    3、await():能够阻塞线程 直到调用N次end.countDown() 方法才释放线程
    public void await() throws InterruptedException { }; :
    调用await(),线程被挂起等待直到count为0才继续执行

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
    调用await(),线程被挂起,等待一定的时间后count值还没变为0的话就会继续执行

    4、countDown():计数方法
    public void countDown() { }; //每次调用将count值减1
    可以在多个线程中调用 计算调用次数是所有线程调用次数的总和

    5、使用

    五、CyclicBarrier:回环栅栏,在java.util.current包下。
    实现让一组线程等待至某个状态之后再全部同时执行。所有等待线程都被释放
    之后,CyclicBarrier可以被重用,此时状态叫barrier,当调用await()后,线程就处于barrier了。

    六、semaphore:信号量。再java.util.concurrent包下。
    可以控制同时访问的线程个数,通过acquire()获取一个许可,如果没有就等待,而release()释放一个许可。

    acquire()获取许可
    release()释放许可。

    七、线程执行器
    线程池:线程池包含若干个准备运行的空闲线程,线程在程序运行的开始传教,把创建的Runnable对象交给线程池中线程运行,运行完成,无其他任务,线程转入休眠状态,有任务时唤醒,所有任务执行完成,关闭线程池。线程池机制分离了任务的创建和执行,减少了重复创建和销毁线程的开销。

    仅需要实现Runnable对象,把对象交给执行器,执行器会使用到线程池中的线程执行

    jdk1.5开始 Executor框架:
    Executor接口、其子接口ExecutorSevice、实现两个接口的类ThreadPoolExecutor;
    工厂类Executors

    源码分析:
    1、Executor接口:接收提交到线程池的Runnable任务,实现任务提交和执行分离
    只有一个方法:

    public abstract interface Executor
    {
      public abstract void execute(Runnable paramRunnable);
    }
    
    

    使用:

    一个类xxx implements Runnable
    
    Executor executor=...;
    executor.execute(xxx);
    

    2、ExecutorService接口:
    继承Executor
    有shutdown() 关闭线程池

    3、类ThreadPoolExecutor:
    用于创建一个线程池,有4个构造方法,可将Runnable和Callable对象交给该线程池运行。

    void execute(Runnable runnable);
    

    4、工厂类Executors:
    可创建4种线程池:(四种静态方法,可直接调用)

    newFiexedThreadPool:   创建一个固定大小的线程池
    
    newSingleThreadPool:创建只有一个线程的线程池
    
    newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    
    newScheduledThreadPool:创建一个有指定时间间隔/周期执行的线程池。
    

    使用:
    创建worker类 implements Runnable

    ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();

    executor.execute(worker);

    一般,一个线程执行类(implements Runnable),一个线程池执行类(创建了线程池执行器,有ThreadPoolExecutor类实例对象,在方法中,调用了executor,execute(worker),即将实例对象送到线程池执行) 一个测试类,去调用执行类的方法

    八、Fork/Join框架:多核处理器上并行编程的轻量级并行框架。
    Fork:对任务和数据进行划分,将一个大问题划分为若干个小问题;
    Join:对各个部分的运行结果进行合并,相当于障栅。

    相关文章

      网友评论

          本文标题:查看源码——并发类

          本文链接:https://www.haomeiwen.com/subject/cnaccqtx.html