美文网首页
JAVA常用的锁机制

JAVA常用的锁机制

作者: tuacy | 来源:发表于2019-07-31 20:41 被阅读0次

           在开发多线程应用的时候(并发编程),为了防止多个线程同时去修改一个变量的时候产生数据不一致性。这个时候就必须要用到锁机制。有一个线程在修改的时候我先加锁,等修改完了在释放锁。资源被锁住的时候不允许其他线程修改。这也是我们JAVA里面锁的初衷。咱们JAVA里面的锁有:synchronized、ReentrantLock、ReadWriteLock、Semaphore、CountDownLatch。

           这样我们先讲怎么来使用这些锁,然后我们在讲锁的分类。

    一 锁的使用

    1.1 synchronized

           synchronized关键字可以用来给方法或者代码块加锁。synchronized是独享锁,互斥锁,可重入锁,非公平锁。

        /**
         * 相对于Object lockObject = new Object();而言
         * 推荐用byte[] lockObject = new byte[0]
         * 后者汇编语句少。执行快
         */
        private final byte[] lockObject = new byte[0];
    
        /**
         * synchronized用来给方法加锁
         */
        public synchronized void lockFunction() {
            //TODO:
        }
    
        public void lockBlok() {
            //TODO
            // synchronized给代码块加锁
            synchronized(lockObject) {
                //TODO
            }
            //TODO
        }
    

    1.2 ReentrantLock

           ReentrantLock重入锁。实现Lock接口的一个类,也是在实际编程中使用频率很高的一个锁。ReentrantLock是重入锁,互斥锁,既可以设置成公平锁也可以设置成非公平锁。

    稍微看下ReentrantLock的源码,AbstractQueuedSynchronizer这个类的代码我们没有分析。

    public class ReentrantLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = 7373984872572414699L;
    
        private final Sync sync;
    
        /**
         * lock锁的同步控制,子类有公平和非公平两个版本。
         * 该抽象锁同步器主要提供了尝试获取(非公平实现)和尝试释放锁等功能
         */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
    
            // 获取锁,由子类实现
            abstract void lock();
    
            // 执行非公平的尝试获取锁操作
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                // 如果锁没有被占用,则尝试获取锁
                if (c == 0) {
                    // 新的线程可能抢占已经排队的线程的锁的使用权 -- 非公平
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程
                // 再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。
                // 成功的获取锁的线程再次获取锁,只是增加了同步状态值
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
            // 如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有
            // 同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放
            // 的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
            // 判断是否当前线程独占锁
            protected final boolean isHeldExclusively() {
                // While we must in general read state before owner,
                // we don't need to do so to check if current thread is owner
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
    
            // Methods relayed from outer class
    
            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }
    
            final int getHoldCount() {
                return isHeldExclusively() ? getState() : 0;
            }
    
            final boolean isLocked() {
                return getState() != 0;
            }
    
            /**
             * Reconstitutes the instance from a stream (that is, deserializes it).
             */
            private void readObject(java.io.ObjectInputStream s)
                    throws java.io.IOException, ClassNotFoundException {
                s.defaultReadObject();
                setState(0); // reset to unlocked state
            }
        }
    
        /**
         * 继承Sync, 并实现非公平锁
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            // 阻塞获取锁, 当前线程优先去获取锁,获取失败在通过队列的方式获取
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
        /**
         * 继承Sync, 并实现公平锁
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            // 阻塞获取锁
            final void lock() {
                acquire(1);
            }
    
            // 尝试获取锁(公平策略),不能保证获取,除非递归调用或没有其他的线程等待获取锁
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                // 没有前驱,且设置锁标志位成功,获取锁成功, hasQueuedPredecessors()证了不论是新的线程还是已经排队的线程都顺序使用锁
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                            compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 当前线程已经持有锁,重入
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    
        /**
         * 创建ReentrantLock,默认是非公平锁
         */
        public ReentrantLock();
        public ReentrantLock(boolean fair);
    
        /**
         * 获取锁 拿不到lock就不罢休,不然线程就一直block
         * 1. 如果该锁没有被其他线程占用,则获取该锁并立即返回,将锁的保持计数设置为 1。
         * 2. 如果当前线程已经占用该锁,则将保持计数加 1,并且该方法立即返回。(可重入)。
         * 3. 如果该锁被另一个线程占用,该线程将一直block。直到其他线程释放该锁。在执行1操作。
         */
        public void lock();
    
        /**
         * lockInterruptibly方法和lock方法的区别在于,在阻塞等待获取锁的过程中是可以被其他线程
         * 打断的(比如在其他线程调用了获取锁线程的interrupt方法),而且这个时候会抛出InterruptedException
         */
        public void lockInterruptibly() throws InterruptedException;
    
        /**
         * 获取锁,马上返回,拿到lock就返回true,不然返回false。可以配置等待时间
         * 1. 如果该锁没有被其他线程占用,立即返回true值,则将锁的保持计数设置为1。
         * 即使已将此锁设置为使用公平排序策略,但是调用 tryLock() 仍将 立即获取锁(如果有可用的),
         * 而不管其他线程当前是否正在等待该锁。在某些情况下,此“闯入”行为可能很有用,即使它会打破公
         * 平性也如此。如果希望遵守此锁的公平设置,则使用 tryLock(0, TimeUnit.SECONDS)
         * ,它几乎是等效的(也检测中断)
         * 2. 如果当前线程已经占有此锁,则将保持计+1,该方法将返回 true。
         * 3. 如果锁被另一个线程占用,则此方法将立即返回false值。
         */
        public boolean tryLock();
        public boolean tryLock(long timeout, TimeUnit unit);
    
        /**
         * 释放锁
         */
        public void unlock();
    
        /**
         * 创建Condition,特定的condition能唤醒特定的线程
         */
        public Condition newCondition();
    
        /**
         * 返回的是查询当前线程保存此lock的个数
         */
        public int getHoldCount();
    
        /**
         * 查询当前线程是否保持此锁定
         */
        public boolean isHeldByCurrentThread();
    
        /**
         * 查询此锁定是否由任意线程保持
         */
        public boolean isLocked();
    
        /**
         * 判断lock锁是公平锁还是非公平锁
         */
        public final boolean isFair();
    
        /**
         * 是否有线程在等待获取锁
         */
        public final boolean hasQueuedThreads();
    
        /**
         * 查询参数线程是否在等待获取此锁
         */
        public final boolean hasQueuedThread(Thread thread);
    
        /**
         * 查询等待获取此锁线程的数量
         */
        public final int getQueueLength();
    
        /**
         * 作用是检测当前是否有线程已调用condition.await()并且处于await状态
         */
        public boolean hasWaiters(Condition condition);
    
        /**
         * 作用是检测当前是否有线程已调用condition.await()并且处于await状态的个数有多少个
         */
        public int getWaitQueueLength(Condition condition);
    
    }
    
    public interface Condition {
        /**
        *Condition线程进入阻塞状态,调用signal()或者signalAll()再次唤醒,
        *允许中断如果在阻塞时锁持有线程中断,会抛出异常;
        *重要一点是:在当前持有Lock的线程中,当外部调用会await()后,ReentrantLock就允许其他线程来抢夺锁当前锁,
        *注意:通过创建Condition对象来使线程wait,必须先执行lock.lock方法获得锁
        */
        void await() throws InterruptedException;
    
        //Condition线程进入阻塞状态,调用signal()或者signalAll()再次唤醒,不允许中断,如果在阻塞时锁持有线程中断,继续等待唤醒
        void awaitUninterruptibly();
    
        //设置阻塞时间,超时继续,超时时间单位为纳秒,其他同await();返回时间大于零,表示是被唤醒,等待时间并且可以作为等待时间期望值,小于零表示超时
        long awaitNanos(long nanosTimeout) throws InterruptedException;
    
        //类似awaitNanos(long nanosTimeout);返回值:被唤醒true,超时false
        boolean await(long time, TimeUnit unit) throws InterruptedException;
    
       //类似await(long time, TimeUnit unit) 
        boolean awaitUntil(Date deadline) throws InterruptedException;
    
       //唤醒指定线程
        void signal();
        
        //唤醒全部线程
        void signalAll();
    }
    

           ReentrantLock实现简单的加锁。lock()、unlock()的使用。

        private final ReentrantLock reentrantLock = new ReentrantLock();
        private int value = 0;
    
        //----------------------------ReentrantLock lock 的使用
        @Test
        public void lock() {
            CountDownLatch latch = new CountDownLatch(20);
            for (int index = 0; index < 20; index++) {
                new Thread(() -> {
                    // 使用之前先获取锁
                    reentrantLock.lock();
                    try {
                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
                        value = value + 1;
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // 使用完之后释放锁
                        reentrantLock.unlock();
                        latch.countDown();
                    }
                }).start();
            }
    
            try {
                // 等待所有的线程执行完
                latch.await();
                System.out.println("value = " + value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    

           ReentrantLock Condition的使用,Condition的强大之处在于它可以为多个线程间建立不同的Condition。下面我们用Codition实现一个简单的生产消费者。(代码是直接网上找的)

        //-------------------------ReentrantLock condition 的使用
    
        class Buffer {
            private final ReentrantLock reentrantLock;
            private final Condition fullCondition;
            private final Condition emptyCondition;
            private final int maxSize;
            private final List<Date> storage;
    
            Buffer(int size) {
                // 使用锁lock,并且创建两个condition,相当于两个阻塞队列
                reentrantLock = new ReentrantLock();
                fullCondition = reentrantLock.newCondition();
                emptyCondition = reentrantLock.newCondition();
                maxSize = size;
                storage = new LinkedList<>();
            }
    
            // 往队列里面放数据
            public void put() {
                reentrantLock.lock();
                try {
                    while (storage.size() == maxSize) {
                        // 如果队列满了
                        System.out.print(Thread.currentThread().getName() + ": wait \n");
                        // 阻塞生产线程
                        fullCondition.await();
                    }
                    storage.add(new Date());
                    System.out.print(Thread.currentThread().getName() + ": put:" + storage.size() + "\n");
                    Thread.sleep(1000);
                    emptyCondition.signalAll();//唤醒消费线程
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    reentrantLock.unlock();
                }
            }
    
            // 从队列里面取出数据
            public void take() {
                reentrantLock.lock();
                try {
                    while (storage.size() == 0) {
                        // 如果队列满了
                        System.out.print(Thread.currentThread().getName() + ": wait \n");
                        // 阻塞消费线程
                        emptyCondition.await();
                    }
                    Date d = ((LinkedList<Date>) storage).poll();
                    System.out.print(Thread.currentThread().getName() + ": take:" + storage.size() + "\n");
                    Thread.sleep(1000);
                    // 唤醒生产线程
                    fullCondition.signalAll();
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    reentrantLock.unlock();
                }
            }
        }
    
        // 生产者
        class Producer implements Runnable {
            private Buffer buffer;
    
            Producer(Buffer b) {
                buffer = b;
            }
    
            @Override
            public void run() {
                while (true) {
                    buffer.put();
                }
            }
        }
    
        // 消费者
        class Consumer implements Runnable {
            private Buffer buffer;
    
            Consumer(Buffer b) {
                buffer = b;
            }
    
            @Override
            public void run() {
                while (true) {
                    buffer.take();
                }
            }
        }
    
        @Test
        public void condition() {
            Buffer buffer = new Buffer(10);
            Producer producer = new Producer(buffer);
            Consumer consumer = new Consumer(buffer);
            for (int i = 0; i < 3; i++) {
                new Thread(producer, "producer-" + i).start();
            }
            for (int i = 0; i < 3; i++) {
                new Thread(consumer, "consumer-" + i).start();
            }
    
            try {
                Thread.currentThread().join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    1.3 ReadWriteLock

           ReadWriteLock读写锁,里面包含两个锁:读锁、写锁。

    public interface ReadWriteLock {
        /**
         * 读锁
         */
        Lock readLock();
    
        /**
         * 写锁
         */
        Lock writeLock();
    }
    

           ReadWriteLock让我们可以读写分离,分别对读和写上不同的锁。从而可以做到读和读互不影响,读和写互斥,写和写互斥,提高读的效率。ReadWriteLock接口也有一个实现类ReentrantReadWriteLock。有兴趣的可以去分析下ReentrantReadWriteLock具体的实现逻辑。

           ReadWriteLock锁的特点:

    • 如果其他线程占领了读锁,其他线程想要获取读锁。不需要等到,直接可以获取到。(读和读互不影响)
    • 如果其他线程占领了读锁,其他线程想要获取写锁。要等待读锁完成。(读和写互斥)
    • 如果其他线程占领了写锁,其他线程想要获取读锁或者写锁。要等到写锁的完成。(写和写互斥,写和读互斥)
        private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    
        // 读操作
        private void readFile(Thread thread) {
            readWriteLock.readLock().lock();
            boolean readLock = readWriteLock.isWriteLocked();
            if (!readLock) {
                System.out.println("当前为读锁!");
            }
            try {
                for (int i = 0; i < 5; i++) {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(thread.getName() + ":正在进行读操作……");
                }
                System.out.println(thread.getName() + ":读操作完毕!");
            } finally {
                System.out.println("释放读锁!");
                readWriteLock.readLock().unlock();
            }
        }
    
        // 写操作
        private void writeFile(Thread thread) {
            readWriteLock.writeLock().lock();
            boolean writeLock = readWriteLock.isWriteLocked();
            if (writeLock) {
                System.out.println("当前为写锁!");
            }
            try {
                for (int i = 0; i < 5; i++) {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(thread.getName() + ":正在进行写操作……");
                }
                System.out.println(thread.getName() + ":写操作完毕!");
            } finally {
                System.out.println("释放写锁!");
                readWriteLock.writeLock().unlock();
            }
        }
    
    
        @Test
        public void readWriteLock() {
    
            ExecutorService readService = Executors.newCachedThreadPool();
            readService.execute(new Runnable() {
                @Override
                public void run() {
                    readFile(Thread.currentThread());
                }
            });
            ExecutorService writeService = Executors.newCachedThreadPool();
            writeService.execute(new Runnable() {
                @Override
                public void run() {
                    writeFile(Thread.currentThread());
                }
            });
    
            try {
                Thread.currentThread().join();
            } catch (InterruptedException e) {
                // ignore
            }
        }
    

    1.4 Semaphore

           Semaphore作用是控制线程的并发数量。就这一点而言,单纯的synchronized关键字是实现不了的。Semaphore类是一个计数信号量,必须由获取它的线程释放,通常用于限制可以访问某些资源(物理或逻辑的)线程数目。一个信号量有且仅有3种操作,且它们全部是原子的:初始化、增加和减少:增加可以为一个进程解除阻塞、减少可以让一个进程进入阻塞。

           针对Semaphore,网上有一个很形象的比喻,Semaphore主要用于控制当前活动线程数目,就如同停车场系统一般,而Semaphore则相当于看守的人,用于控制总共允许停车的停车位的个数,而对于每辆车来说就如同一个线程,线程需要通过acquire()方法获取许可,而release()释放许可。如果许可数达到最大活动数,那么调用acquire()之后,便进入等待队列,等待已获得许可的线程释放许可,从而使得多线程能够合理的运行。

    例子比较简单,我就直接从网上copy了一份过来了,

        // 同步关键类,构造方法传入的数字是多少,则同一个时刻,只运行多少个线程同时运行制定代码
        private static final Semaphore semaphore = new Semaphore(3);
    
        private static class InformationThread extends Thread {
            private final String name;
            private final int age;
    
            InformationThread(String name, int age) {
                this.name = name;
                this.age = age;
            }
    
            @Override
            public void run() {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + ":大家好,我是:" + name + ",我今年:" + age + "岁。当前时间为:" + System.currentTimeMillis());
                    Thread.sleep(1000);
                    System.out.println(name + "要准备释放许可证了,当前时间为:" + System.currentTimeMillis());
                    System.out.println("当前可使用的许可数为:" + semaphore.availablePermits());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }
        }
    
        @Test
        public void semaphore() {
            String[] name = {"李明", "王五", "张杰", "王强", "赵二", "李四", "张三"};
            int[] age = {26, 27, 33, 45, 19, 23, 41};
            for (int i = 0; i < name.length; i++) {
                Thread t1 = new InformationThread(name[i], age[i]);
                t1.start();
            }
    
            try {
                Thread.currentThread().join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    1.5 CountDownLatch

           CountDownLatch也是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数初始化CountDownLatch,由于调用了countDown()方法,所以在当前计数到达零之前,await方法会一直受阻塞。只有到零的之后,会释放所有等待的线程,await的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用CyclicBarrier。

        @Test
        public void countDownLatch() {
            // CountDownLatch 初始值给10
            CountDownLatch countDownLatch = new CountDownLatch(10);
            // 启动10个线程,每个线程countDown
            for(int index = 0; index < 10; index++) {
                new Thread(() -> {
                    try {
                        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
                    } finally {
                        // countDownLatch减1
                        countDownLatch.countDown();
                    }
                }).start();
            }
            try {
                // 等待countDownLatch减为0
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    二 锁的分类

    2.1 乐观锁/悲观锁

           乐观锁/悲观锁不是指具体类型的锁,是人们定义出来的概念或思想。主要是指看待并发同步的角度。

    2.1.1 乐观锁

           乐观锁喜欢把所有的事情都往好的方面去想,认为不存在很多并发更新操作,每次去读取数据的时候,不会同时有其他线程去修改数据,不需要加锁操作。但是在更新时会判断其他线程在这之前有没有对数据进行修改。一般会使用“数据版本机制”或“CAS操作”来实现

           数据库版本机制乐观锁的实现一般采用版本号实现乐观锁,而Java中可使用CAS实现乐观锁。

    2.1.1.1 数据库版本机制实现乐观锁

           数据库版本机制实现乐观锁:一般在数据表中加上一个数据版本号version字段,表示数据修改的次数,当数据修改时version+1。当线程A要更新数据的时候,在读取数据的同时也会把version读回来。处理完业务逻辑开始更新的时候,会再次查看version的值是否和数据库中version的值相等。如果相等更新。否则拒绝更新,你需要重新去读取version再次提交更新。

    2.1.1.2 JAVA CAS操作实现乐观锁

           CAS操作实现乐观锁:CAS(Compare and Seap 比较和交换),当多个线程尝试使用CAS更新同一个变量的时候,只有其中一个线程能更新成功,其他线程都失败。这些失败的线程并不会被挂起,而是立马告之这次竞争操作失败。需要你自己去在次尝试。CAS操作中包含三个操作数:需要读取的内存位置(V)、进行比较的预期原值(A)、拟写入的新值(B)。如果内存位置对应的值(V)于预期原值(A)相匹配,那么处理器会自动将该内存位置的值更新为新值(B),否则处理器不做任何操作。

           JDK里面大量源码也是通过CAS来提供线程安全操作。在java.util.concurrent.atomic包下面。我们人为的把这个包里面的CAS实现线程安全操作分为两类:AtomicInteger、AtomicBoolean、AtomicLong、AtomicLongArray分为一类通过Unsafe实现乐观锁;AtomicIntegerFieldUpdater、AtomicLongFieldUpdater则是另一类他们要先通过反射获取到字段信息。接下来我们大概看下他们里面实现的原理。

    2.1.1.2.1 AtomicInteger

           AtomicInteger、AtomicBoolean、AtomicLong、AtomicLongArray都是一样的,我们以AtomicInteger来讲。内部是通过Unsafe类来实现乐观锁。实现原理如下:

    public class AtomicInteger extends Number implements java.io.Serializable {
    
        // unsafe来保证改变value的值是线程安全的
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        // value值的偏移量,static保证了所有的线程都同一份实例
        private static final long valueOffset;
    
        // 类初使化的时候通过unsafe能够得到变量的偏移量
        static {
            try {
                valueOffset = unsafe.objectFieldOffset
                        (AtomicInteger.class.getDeclaredField("value"));
            } catch (Exception ex) { throw new Error(ex); }
        }
    
        // 这里声明为volatile,保证了线程间的可见性。
        private volatile int value;
    
        ...
    
        /**
         * 设置新值,并且返回之前的值
         */
        public final int getAndSet(int newValue) {
            return unsafe.getAndSetInt(this, valueOffset, newValue);
        }
    
        ...
    
    }
    

    getAndSet()函数调用的是Unsafe类里面的getAndSetInt()函数。

    public final class Unsafe {
    
        ...
    
        /**
         * compareAndSwapInt函数里面会做几个步骤:
         * 1. 先获取内存中的值(getIntVolatile把值取出来)
         * 2. 和之前的值做比较
         * 3. 相同则更新,否则重试重新读直,在做这几个操作一直到成功
         */
        public final int getAndSetInt(Object var1, long var2, int var4) {
            int var5;
            do {
                var5 = this.getIntVolatile(var1, var2);
            } while(!this.compareAndSwapInt(var1, var2, var5, var4));
    
            return var5;
        }
    
        ...
    
    
    }
    
    2.1.1.2.1 AtomicIntegerFieldUpdater

           AtomicIntegerFieldUpdater、AtomicLongFieldUpdater。可以对指定类的指定 volatile字段进行原子更新。他们是基于反射的实用工具,先通过反射得到需要操作的字段,之后统一是利用Unsafe类来实现CAS的操作。我们以AtomicIntegerFieldUpdater做实例说明。

    public abstract class AtomicIntegerFieldUpdater<T> {
        /**
         * 定义AtomicIntegerFieldUpdater对象,两个参数第一个参数指定类,第二个参数类里面的某个字段
         * 主意里面返回的是AtomicIntegerFieldUpdaterImpl对想,我们直接看AtomicIntegerFieldUpdaterImpl对象
         */
        @CallerSensitive
        public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                                  String fieldName) {
            return new AtomicIntegerFieldUpdaterImpl<U>
                    (tclass, fieldName, Reflection.getCallerClass());
        }
    
        ...
    
        /**
         * Standard hotspot implementation using intrinsics
         */
        private static class AtomicIntegerFieldUpdaterImpl<T>
                extends AtomicIntegerFieldUpdater<T> {
            // 通过Unsafe来实现CAS的操作
            private static final Unsafe unsafe = Unsafe.getUnsafe();
            // 传入filed的偏移量
            private final long offset;
            private final Class<T> tclass;
            private final Class<?> cclass;
    
            AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
                                          final String fieldName,
                                          final Class<?> caller) {
                final Field field;
                final int modifiers;
                try {
                    // 通过反射得到对应的Feld
                    field = AccessController.doPrivileged(
                            new PrivilegedExceptionAction<Field>() {
                                public Field run() throws NoSuchFieldException {
                                    return tclass.getDeclaredField(fieldName);
                                }
                            });
                    modifiers = field.getModifiers();
                    sun.reflect.misc.ReflectUtil.ensureMemberAccess(
                            caller, tclass, null, modifiers);
                    ClassLoader cl = tclass.getClassLoader();
                    ClassLoader ccl = caller.getClassLoader();
                    if ((ccl != null) && (ccl != cl) &&
                            ((cl == null) || !isAncestor(cl, ccl))) {
                        sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
                    }
                } catch (PrivilegedActionException pae) {
                    throw new RuntimeException(pae.getException());
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
    
                // 一定要是指定的类型
                Class<?> fieldt = field.getType();
                if (fieldt != int.class)
                    throw new IllegalArgumentException("Must be integer type");
    
                // 需要声明为volatile
                if (!Modifier.isVolatile(modifiers))
                    throw new IllegalArgumentException("Must be volatile type");
    
                this.cclass = (Modifier.isProtected(modifiers) &&
                        caller != tclass) ? caller : null;
                this.tclass = tclass;
                // 通过unsafe得到偏移量
                offset = unsafe.objectFieldOffset(field);
            }
            
        }
    }
    

           通过分析代码我们发现AtomicIntegerFieldUpdater比我们上面分析的AtomicInteger多做了一步。他先通过反射拿到offset偏移量。怕有些人不知道AtomicIntegerFieldUpdater类怎么使用,一个简单的使用实例如下:

        public static final AtomicIntegerFieldUpdater<Person> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
    
        @Test
        public void atomicIntegerFieldUpdater() {
    
            // 可以在多个线程里面操作,线程安全
            Person person = new Person();
            fieldUpdater.set(person, 10);
    
        }
    
        class Person {
            private volatile int age;
    
            public int getAge() {
                return age;
            }
    
            public void setAge(int age) {
                this.age = age;
            }
        }
    

    2.1.2 悲观锁

           悲观锁喜欢把所有的事情都往坏处想,认为一定存在很多并发更新操作。一定要采取加锁操作,如果不加锁一定会有问题。换句话说就是我在操作的时候阻止其他任何人做操作,一定要等我操作完了。你们才可以操作。比如:我们经常使用synchronized关键字来加锁。不管是修饰代码块还是修饰变量还是修饰方法。其实这个时候我们就用上悲观锁了(因为我们不管三七二十一,每次访问的时候,我们都认为有人会修改,我们每次访问的时候都锁住了)。

    2.2 独享锁/共享锁

           独享锁是指该锁一次只能被一个线程所持有。共享锁是指该锁可以被多个线程所持有,可以多个线程访问。

    • 独享锁:ReentrantLock、ReadWriteLock中的写锁、synchronized某种意义上来说也是独享锁。
    • 共享锁:Semaphore、CountDownLatch、ReadWriteLock中的读锁。

           独享锁与共享锁是通过AQS(AbstractQueuedSynchronizer的简称)来实现的,通过实现不同的方法,来实现独享或者共享。AQS使用一个整型的volatile变量(命名为state)来维护同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。推荐大家去看下AbstractQueuedSynchronizer类的实现逻辑。

    2.3 互斥锁/读写锁

           上面讲的独享锁/共享锁是一种广义的说法,互斥锁/读写锁就是他们的具体的实现。

    • 互斥锁:和独享锁概念一样,同一时刻只能被一个线程访问。JAVA中的具体实现就是ReentrantLock。
    • 读写锁:同时持有读写锁,读写锁既是互斥锁,又是共享锁,read模式是共享,write是互斥(排它锁)的。JAVA中对应ReadWriteLock。

    2.4 可重入锁

           可重入锁又叫递归锁,简单来说就是同一个线程可以多次获取锁。ReentrantLock和synchoronize都是可重入锁。

    2.5 公平锁/非公平锁

           公平锁指的是多个线程按照申请的顺序来获取锁,非公平锁指的是多个线程获取锁的顺序不是按申请顺序来的。有可能造成优先级反转或者饥饿的现象。对于ReetrantLock而言我们可以通过构造函数来指定该锁是否公平锁。默认是非公平锁(非公平锁性能稍微高点,吞吐量稍微大一点)。Synchronized也是非公平锁。

    2.6 分段锁

           分段锁是一种锁的设计,并不是具体的一种锁。在JDK1.7中ConcurrentHashMap采用了数组+Segment+分段锁的方式实现。ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁(每个Segment都是一个ReentrantLock),当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。

           我们用一个图来概况下ConcurrentHashMap的内部实现。

    ConcurrentHashMap.png

    2.7 偏向锁/轻量级锁/重量级锁

           偏向锁/轻量级锁/重量级锁这三种锁是指锁的状态,并且是针对Synchronized。为了换取性能。在Java5通过引入锁升级的机制来实现高效Synchronized。这三种锁的状态是通过对象监视器在对象头中的字段来表明的。

    • 偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
    • 轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
    • 重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让他申请的线程进入阻塞,性能降低。

    2.8 自旋锁

           在Java中,自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。

    一个自旋锁的简单实现。

    public class SpinLock {
    
        private final AtomicBoolean spinLock = new AtomicBoolean(true);
    
        /**
         * 获取锁
         */
        public void lock() {
            boolean flag;
            // 一直去获取锁
            do {
                // 只有当spinLock的值为true的时候,我们才可以获取成功,并且把值设置为false。
                flag = spinLock.compareAndSet(true, false);
            } while (!flag);
        }
    
        /**
         * 释放锁
         */
        public void unlock() {
            spinLock.compareAndSet(false, true);
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:JAVA常用的锁机制

          本文链接:https://www.haomeiwen.com/subject/ecjxdctx.html