美文网首页
Java常用并发工具及其实现

Java常用并发工具及其实现

作者: 橙味菌 | 来源:发表于2019-10-03 16:52 被阅读0次

    Java并发工具

    并发工具实现一般都将实现了AQS的自定义同步器sync定义为内部类,而同步类实现某个接口对外服务,sync只需实现state的 获取-释放方式tryAcquire-tryRelease,线程排队、等待、唤醒都由上层AQS实现

    AQS基础原理

    闭锁CountDownLatch

    用以协调多个线程之间的同步和通信

    说明 函数
    等待计数器归零(可中断) await()
    限时等待计数器归零 await(long time,TimeUnit unit)
    计数器-- countDown()
    获取当前计数器 getCount()

    使用共享式AQS,await对应acquireSharedInterruptibly(1),countDown对应releaseShared(1)

    //尝试获取:state归零方可成功获取
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    //尝试释放:当前state--
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
    

    计数信号量 Semephore

    允许n个任务同时访问某个资源,可将信号量看作向外分发资源的许可证

    作用 API
    构造一个信号量 Semaphore(int permits, boolean fair)
    获取一个【指定个】资源,可中断 acquire(【int permits】)
    释放一个【指定个】资源 release(【int permits】)
    可用资源数 availablePermits
    减少一个资源 drainPermits
    减少制定个资源 reducePermits(int reduction)
    获取入队获取资源的Sync队列长度 getQueueLength
    获取入队获取资源的线程集合 getQueuedThreads
    获取一个【指定个】资源,不可中断 acquireUninterruptibly(【int permits】)
    尝试获取一个【指定个】资源<br />返回是否成功<br />非公平型 tryAcquire(【int permits】)
    尝试在指定时限内获取一个【指定个】资源<br />返回是否成功 tryAcquire(【int permits】,long timeout, TimeUnit unit)

    公平型——tryAcquireShared要判断是否有其他节点排在此节点前,无方可进行获取

    非公平型——tryAcquireShared直接获取资源

    //非公平尝试获取:state-获取数 >= 0方可获取,state-获取数
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    //尝试释放:state+释放数
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }
    

    可重入锁 ReentrantLock

    递归无阻塞的同步机制

    作用 API
    lock
    解锁 unLock
    可中断锁 lockInterruptibly
    限时等待锁<br />配合Condition的方法await(),signal(),signalAll()使用(必须在获得锁后) tryLock
    //非公平尝试获取:无线程占有锁——占有锁并设置资源为获取数,当前线程占有锁——state+获取数
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    //尝试释放:当前线程拥有锁时方可释放,state-释放数,若state归0则释放锁
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

    可重入读写锁 ReentrantReadWriteLock

    ReentrantReadWriteLock 读写锁,适用于读多写少的并发情况
    有写锁,除写锁持有线程外无法获取读锁和写锁

    作用 API
    构造一个可重入读写锁 ReentrantReadWriteLock(boolean fair)
    获取读锁ReentrantReadWriteLock.ReadLock readLock
    获取写锁ReentrantReadWriteLock.WriteLock writeLock

    state 高16位读状态,低16位写状态
    读锁使用共享模式,写锁使用独占模式

    写锁

    写锁lock

    public void lock() {
        sync.acquire(1);
    }
    //独占式尝试获取,条件无读锁且无其他线程写锁,state+获取数
    protected final boolean tryAcquire(int acquires) {    
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            //有读线程或者有其他写线程——失败
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            //当前线程有其他写操作——state++
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        //尝试state+获取资源数,设置当前线程独占锁
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
    

    写锁unLock

    public void unlock() {
        sync.release(1);
    }
    //独占式尝试释放:state-释放数,state归零则释放锁
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
    

    读锁

    读锁lock

    public void lock() {
        sync.acquireShared(1);
    }
    //共享式尝试获取:有其他写线程-失败->当前读线程不应阻塞&读锁未达上限->state+1个读单元
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        //有其他写线程且非当前线程——失败
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        //读锁不应阻塞且读锁未达到上限(65535),CAS设置state+1个读单元(65536)
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        //读锁应该阻塞或达到上限
        return fullTryAcquireShared(current);
    }
    
    final int fullTryAcquireShared(Thread current) {
        
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            //有写锁且非当前线程——失败
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            //读锁应该阻塞
            } else if (readerShouldBlock()) {
                //第一个获取读锁的线程是当前线程
                if (firstReader == current) {
            
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    

    读锁unLock

    //共享式尝试释放
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
    

    循环栅栏 CyclicBarrier

    允许一组线程互相等待直到全部到达栅栏位置

    作用 API
    构造一个循环栅栏 CyclicBarrier(int 互相等待数量,[Runnabl全部抵达栅栏时的回调方法])
    等待直到所有线程都调用此方法<br />返回此现场到达栅栏的序号 await
    重置栅栏 reset
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
    TimeoutException {
        final ReentrantLock lock = this.lock;
        //可重入锁
        lock.lock();
        try {
            final Generation g = generation;
    
            //栅栏已被破坏——破栅栏异常
            if (g.broken)
                throw new BrokenBarrierException();
    
            //线程被中断——破环栅栏,中断异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
    
            //序号
            int index = --count;
            //最终到达栅栏者——回调事件并唤醒所有之前抵达的线程
            if (index == 0) {  
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
    
            // 等待其他线程抵达
            for (;;) {
                try {
                    if (!timed)
                        //无时限等待(开锁)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (g != generation)
                    return index;
    
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    

    交换数据栅栏 Exchanger

    允许两个任务交换对象

    方法
    V exchange(V data) 调用线程陷入阻塞状态直到另一线程调用此方法,然后以线程安全的方式交换数据,释放线程

    相关文章

      网友评论

          本文标题:Java常用并发工具及其实现

          本文链接:https://www.haomeiwen.com/subject/jgidpctx.html