美文网首页
32.Android架构-线程(三)

32.Android架构-线程(三)

作者: 任振铭 | 来源:发表于2020-09-19 11:40 被阅读0次

AbstractQueuedSynchronizer

队列同步器,简称AQS,是一个用来构建锁或者其他同步组件的基础框架,他使用了一个int型的成员变量来表示同步状态,内部通过FIFO队列来完成资源获取线程的排队工作

使用AQS实现的锁加锁和释放的过程(为了简单,以不可重入锁为例说明)

lock()方法调用时,尝试去获取锁,如果当前锁没有被其他线程持有,则获取成功,将state+1;如果当前锁已经被其他线程持有,则将当前线程加入CLH队列,此时还存在两种情况,1.队列为空,创建一个节点作为头,并将新加入的节点加入到队列作为尾节点,2.队列非空,直接将新加入的节点作为尾节点(设置为尾节点过程中涉及到了CAS的操作,即如果有其他线程也同时在做相同的操作,通过cas方式设置直到尾节点设置成功)。加入队列后判断一下自己的前置节点是不是head,如果是再次尝试获取锁,获取成功则将此节点设置为新的head,获取失败则挂起线程,等待前置节点释放锁

unlock()方法调用时,将持有锁的线程置空,并将state设置为0,找到它的next并唤醒它

AQS使用方式和其中的设计模式

AQS的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态,在AQS里由一个int型的state来代表这个状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。

private volatile int state;

在实现上,子类推荐被定义为自定义同步组件的静态内部类,AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器。可以这样理解二者之间的关系:
锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;
同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
实现者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。

AQS基于CLH队列锁实现

CLH队列锁即Craig, Landin, and Hagersten (CLH) locks。
CLH队列锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。

当一个线程需要获取锁时:
1.创建一个新的QNode,将其中的locked设置为true表示需要获取锁,myPred表示对其前驱结点的引用


图片1.png

2.线程A对tail域调用getAndSet方法,使自己成为队列的尾部,同时获取一个指向其前驱结点的引用myPred


图片2.png
3.线程B需要获得锁,同样的流程再来一遍
图片3.png
4.线程就在前驱结点的locked字段上旋转,直到前驱结点释放锁(前驱节点的锁值 locked == false)

5.当一个线程需要释放锁时,将当前结点的locked域设置为false,同时回收前驱结点


图片4.png
如上图所示,前驱结点释放锁,线程A的myPred所指向的前驱结点的locked字段变为false,线程A就可以获取到锁。
CLH队列锁的优点是空间复杂度低(如果有n个线程,L个锁,每个线程每次只获取一个锁,那么需要的存储空间是O(L+n),n个线程有n个myNode,L个锁有L个tail)。CLH队列锁常用在SMP体系结构下。
Java中的AQS是CLH队列锁的一种变体实现。

AQS中的方法

acquire():独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquireInterruptibly:与acquire相同,但是该方法会响应中断,当线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptException并返回

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

tryAcquireNanos:在acquireInterruptibly基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步锁状态,那么就会返回false,如果获取到了就返回true

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

acquireShared:共享式的获取同步状态,如果当前线程为获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有过个线程获取到同步状态

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

acquireSharedInterruptibly:与acquireShared相同,该方法会响应中断

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireSharedNanos:在acquireSharedInterruptibly基础上增加了超时限制

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

release:独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

releaseShared:共享式的释放同步状态

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

getQueuedThreads:获取等待在同步队列上的线程集合

public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

其中可以被重写的方法:

tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively

既然AQS是实现同步器的基础框架,那么我们怎么用他来实现一个我们自己的类似ReentrantLock的工具呢?
如下,定义一个类实现Lock接口(每个锁都要实现的接口),然后定义一个内部类,我们这里命名为MySync继承自AbstractQueuedSynchronizer,然后我们自定义的锁的所有操作都是交给这个内部类MySync实现的

public class MyLock implements Lock {
    MySync sync = new MySync();

    @Override
    public void lock() {
        System.out.println(Thread.currentThread().getName()+" ready get lock");
        sync.acquire(1);
        System.out.println(Thread.currentThread().getName()+" already got lock");
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        System.out.println(Thread.currentThread().getName()+" ready release lock");
        sync.release(1);
        System.out.println(Thread.currentThread().getName()+" already released lock");
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    static class MySync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0){
                throw new IllegalMonitorStateException("already release lock");
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        protected Condition newCondition(){
            return new ConditionObject();
        }
    }
}

测试类

public class TestMyLock {

    public void test() {
        final Lock lock = new MyLock();
        for (int i = 0; i < 4; i++) {
            Worker w = new Worker(lock);
            w.start();
        }
    }

    public static void main(String[] args) {
        TestMyLock lock = new TestMyLock();
        lock.test();
    }

    class Worker extends Thread {
        private final Lock lock;

        public Worker(Lock lock) {
            this.lock = lock;
        }

        public void run() {
            lock.lock();
            System.out.println(Thread.currentThread().getName());
            try {
                SleepTools.second(3);
            } finally {
                lock.unlock();
            }
        }
    }
}


打印:
Thread-0 ready get lock
Thread-0 already got lock
Thread-0
Thread-1 ready get lock
Thread-2 ready get lock
Thread-3 ready get lock

Thread-0 ready release lock
Thread-0 already released lock
Thread-1 already got lock
Thread-1

Thread-1 ready release lock
Thread-1 already released lock
Thread-2 already got lock
Thread-2

Thread-2 ready release lock
Thread-2 already released lock
Thread-3 already got lock
Thread-3

Thread-3 ready release lock
Thread-3 already released lock

Process finished with exit code 0

我们这个锁有个缺陷,他目前是不可重入的,接下来我们实现一个自己的可重入锁。重入锁和非重入锁的差别其实并不大,关键就在于获取锁和释放锁的时候是否有判断当前线程是否是已经持有锁的线程这一步

public class MyReentrantLock implements Lock {
    MyReentrantSync reentrantSync = new MyReentrantSync();
    @Override
    public void lock() {
        reentrantSync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        reentrantSync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return reentrantSync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return reentrantSync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        reentrantSync.release(1);
    }

    @Override
    public Condition newCondition() {
        return reentrantSync.newCondition();
    }

    class MyReentrantSync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            if (Thread.currentThread() == getExclusiveOwnerThread()){
                setState(getState()+1);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() != getExclusiveOwnerThread()){
                throw new IllegalMonitorStateException();
            }
            if (getState() == 0){
                throw new IllegalMonitorStateException("already release");
            }
            setState(getState()-1);
            if (getState() == 0){
                setExclusiveOwnerThread(null);
            }
            return true;
        }
        protected Condition newCondition(){
            return new ConditionObject();
        }
    }
}

测试类

public class TestReenterSelfLock {

    static final Lock lock = new MyReentrantLock();

    public void reenter(int x){
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName()+":递归层级:"+x);
            int y = x - 1;
            if (y==0) return;
            else{
                reenter(y);
            }
        } finally {
            lock.unlock();
        }
    }

    public void test() {
        for (int i = 0; i < 3; i++) {
            Worker w = new Worker();
            w.start();
        }
    }

    public static void main(String[] args) {
        TestReenterSelfLock testMyLock = new TestReenterSelfLock();
        testMyLock.test();
    }

    class Worker extends Thread {
        public void run() {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            reenter(3);
        }
    }
}

打印结果
Thread-0
Thread-1
Thread-2
Thread-0:递归层级:3
Thread-0:递归层级:2
Thread-0:递归层级:1
Thread-2:递归层级:3
Thread-2:递归层级:2
Thread-2:递归层级:1
Thread-1:递归层级:3
Thread-1:递归层级:2
Thread-1:递归层级:1

相关文章

  • 32.Android架构-线程(三)

    AbstractQueuedSynchronizer 队列同步器,简称AQS,是一个用来构建锁或者其他同步组件的基...

  • 了解反应器模式:基于线程和事件驱动

    为了处理Web请求,有两种竞争的Web体系架构:基于线程的架构和事件驱动型架构。 基于线程的架构 实现多线程服务器...

  • 线程池初探

    线程池架构图 线程池状态流转图 线程池主要参数介绍 corePoolSize: 核心线程数量 maximumPoo...

  • Java线程池详解1--概述

    线程池架构 Java的线程池架构如下图所示: Executor接口 该接口只提供了一个execute方法,该方法用...

  • 一篇文章完全理解Redis为什么这么快

    单线程架构 Redis使用了单线程架构和I/O多路复用模型来实现高性能的内存数据库. 为什么单线程还能这么快? 纯...

  • 面试重点学习资料

    hashmap和hashtable区别 对线程安全的理解 讲讲web三大架构 为什么要用struts做mvc 什么...

  • Mysql架构

    Mysql逻辑架构 Mysql是三层的逻辑架构,如下图 第一层:连接/线程处理连接处理器,授权认证,安全 第二层:...

  • RunLoop 和线程

    RunLoop 和 线程的关系 (基本理解) RunLoop 是线程的基础架构部分,Cocoa和CoreFund...

  • Java线程池详解(二)

    三、ThreadPoolExecutor解析 上文中描述了Java中线程池相关的架构,了解了这些内容其实我们就可以...

  • MINA线程

    MINA 网络架构: 客户端到服务器端之间的交互流程 从线程的角度梳理一下,总共有三类线程: 一、IoAccept...

网友评论

      本文标题:32.Android架构-线程(三)

      本文链接:https://www.haomeiwen.com/subject/baedyktx.html