4-多线程-concurrent包

作者: 宠辱不惊的咸鱼 | 来源:发表于2018-02-28 19:37 被阅读16次

    概述

    • 对象锁
      • 一般指synchronized,和对象有关
      • 每个对象都有个隐形的监视器,用于线程的同步
      • 线程状态:创建(new)->就绪(start)->运行(run)->阻塞(lock/wait/join/sleep)->销毁
    • ReentrantLock
      • 互斥锁
      • 可重入
    • Condition
      • 实现wait,notify,notifyAll的功能
    • ReadWriteLock - ReentrantReadWriteLock
      • 共享锁:读读共享

    ReentrantLock

    • synchronized的功能增强版;JDK 1.5之前性能优于synchronized;现在性能相差不多
    // 简单用法
    public static ReentrantLock lock = new ReentrantLock();
        try {
            lock.lock();
            i++;
        } finally {
            lock.unlock(); // 保证异常情况下也释放;synchronized由JVM释放
        }
    
    • 可重入
      • 获取计数器,锁几次,就需要释放几次
    • 可中断
      • lock.lock()
        • 不能响应中断
        • 在park时被中断,park会解除,设置interrupt标识,但是不抛异常,线程继续
      • lock.lockInterruptibly()
        • 能响应中断
        • 被lock.lockInterruptibly() block的线程,收到interrupt信号时,会抛出InterruptedException,进而退出
    • 可限时
      • lock.tryLock(long timeout, TimeUnit unit),超时返回false
    • 公平锁
      • 默认非公平
        • 先尝试抢占,再排队
      • 公平
        • 直接排队
        • public ReentrantLock(boolean fair)
      • synchronized是非公平锁

    Condition

    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();
    
    • lock内部维护一个同步队列
      • 同步队列头的类型是AbstractQueuedSynchronizer
    • condition内部维护一个条件队列
      • 条件队列头的类型为AbstractQueuedSynchronizer$ConditionObject
    • 一个lock可以有多个condition,即多个条件队列
    • 持锁者才可以执行await
      • 因为await中有release的逻辑,会抛IllegalMonitorStateException
    • condition.await():类似Object.wait()
      • 持锁线程将自己加入条件队列
      • release同步队列中后继节点
        • unparkSuccessor(),这步一定要做,不然同步队列里的线程就死在那里了
      • 调用LockSupport.park()将自己阻塞
      • 当有别的线程调用了signal(),实际就是调用LockSupport.unpark()把该线程唤醒
      • 被唤醒线程继续await()方法中的剩余逻辑
        • acquireQueued,这是lock.lock()中的逻辑,即获取锁
    • condition.signal():Object.notify()
      • 唤醒条件队列头节点
      • 线程进入同步队列
    • condition.signalAll():Object.notifyAll()
    • awaitUninterruptibly()
      • 与await()不同:不会在等待过程中响应中断

    Semaphore

    概述

    Semaphore semaphore = new Semaphore(5);
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
            NonfairSync(int permits) {
                super(permits);
            }
                Sync(int permits) {
                    setState(permits);
                }
                    AbstractQueuedSynchronizer:
                    protected final void setState(int newState) {
                        state = newState;
                    }
    
    // 公平锁
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    
    • 默认非公平
    • 构造方法参数即许可数,被设置到了AbstractQueuedSynchronizer的state属性
    • 线程先尝试获得许可,如果成功,许可数-1;线程运行结束后释放许可,许可数+1
    • 许可数为0,则获取失败;线程进入AQS的同步队列,被其他释放许可的线程唤醒

    acquire

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
                final int nonfairTryAcquireShared(int acquires) {
                    for (;;) {
                        int available = getState();
                        int remaining = available - acquires;
                        if (remaining < 0 || compareAndSetState(available, remaining))
                            return remaining;
                    }
                }
    
    • 比较available(其实就是state属性)和acquires
    • 如果拿掉许可之后不小于0,就可以拿许可
    • 若是小于0,则表示拿许可失败,在doAcquireSharedInterruptibly(arg)中将自己阻塞,即进入同步队列
      • 阻塞:在parkAndCheckInterrupt中通过LockSupport.park(this)进行自我阻塞

    release

    public void release() {
        sync.releaseShared(1);
    }
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    
    • 将AQS中的state值+1
    • 通过doReleaseShared()唤醒同步队列中的第一个节点

    场景

    CountDownLatch

    概述

    CountDownLatch countDownLatch = new CountDownLatch(10);
    
    • 有点高级join的感觉,主线程调用countDownLatch.await(),进入等待
    • countDownLatch.countDown()执行10次,把那个10消耗光
    • 主线程继续

    机制

    • CountDownLatch持有一个内部类Sync,这个10会传给AbstractQueuedSynchronizer的state属性
    • 执行await时,只要state不等于0,线程就会进入阻塞
    • 队列头是AbstractQueuedSynchronizer类型的,貌似只有Condition例外,其余的队列头应该都是AQS类型
    • countDown方法中,当state变为0时(一定要是变为0哦,不能本来就是0)
      • 执行doReleaseShared把同步队列里的第一个有效Node给unpark,其实就是上面被阻塞的那个主线程

    CyclicBarrier

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    // parties:栅栏放开前需要调用的线程数
    // count:当前剩余需要调用的线程数
    // barrierCommand:调用线程数满足后,栅栏放开前执行的命令
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    
    // 用法,每个线程调用await,耗完parties即可往下走
    CyclicBarrier.await
        final ReentrantLock lock = this.lock; // this就是CyclicBarrier
        lock.lock();                          // 先拿锁
        int index = --count;
        if (index == 0) {                     // 判断index,为0表示线程数已达到
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)          // 可能是空的,看构造函数了
                    command.run();
                ranAction = true;
                nextGeneration();             // signalAll,重置count,重置generation
                return 0;                     // await结束
            } finally {
                if (!ranAction)               // command.run异常时,打破栅栏,释放线程
                    breakBarrier();           // signalAll,重置count,设置broken属性
                }
            }
            
        trip.await                            // 自我阻塞
    
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();  // trip:lock生成的Condition
        // set up next generation
        count = parties;
        generation = new Generation();
    }
    
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    

    LockSupport

    /**
        * Makes available the permit for the given thread, if it
        * was not already available. If the thread was blocked on
        * {@code park} then it will unblock.  Otherwise, its next call
        * to {@code park} is guaranteed not to block. This operation
        * is not guaranteed to have any effect at all if the given
        * thread has not been started.
        *
        * @param thread the thread to unpark, or {@code null}, in which case
        *        this operation has no effect
    */
        public static void unpark(Thread thread) {
            if (thread != null)
                UNSAFE.unpark(thread);
        }
    // 从上面注释可以看出,unpack可以释放park状态线程,或者将执行park的线程;不过对于还没开始执行的线程,unpark并不保证效果
    
    /**
        * Disables the current thread for thread scheduling purposes unless the
        * permit is available.
        *
        * <p>If the permit is available then it is consumed and the call
        * returns immediately; otherwise the current thread becomes disabled
        * for thread scheduling purposes and lies dormant until one of three
        * things happens:
        *
        * <ul>
        *
        * <li>Some other thread invokes {@link #unpark unpark} with the
        * current thread as the target; or
        *
        * <li>Some other thread {@linkplain Thread#interrupt interrupts}
        * the current thread; or
        *
        * <li>The call spuriously (that is, for no reason) returns.
        * </ul>
        *
        * <p>This method does <em>not</em> report which of these caused the
        * method to return. Callers should re-check the conditions which caused
        * the thread to park in the first place. Callers may also determine,
        * for example, the interrupt status of the thread upon return.
    */
        public static void park() {
            UNSAFE.park(false, 0L);
        }
    
    • 测试park的线程能不能被interrupt
      • 可以的,而且不抛异常

    相关文章

      网友评论

        本文标题:4-多线程-concurrent包

        本文链接:https://www.haomeiwen.com/subject/kxiixftx.html