美文网首页
乐观锁和悲观锁

乐观锁和悲观锁

作者: Jorvi | 来源:发表于2018-11-08 19:53 被阅读0次

    参考:
    https://www.cnblogs.com/qjjazry/p/6581568.html
    https://www.cnblogs.com/xuyuanjia/p/6027414.html

    悲观锁(Pessimistic Locking):
    总是假设最坏的情况发生,因此每次在取数据的时候就会加锁,操作完成后才释放锁。

    乐观锁(Optimistic Locking):
    假设大数据情况下不会发生数据冲突,因此在取数据的时候不加锁,只有在更新的时候判断该数据是否在此期间被修改过了。

    1. Java中的悲观锁和乐观锁

    • 悲观锁
      Java中典型的悲观锁就是synchronized。

    • 乐观锁
      java.util.concurrent.atomic包下面的原子变量类就使用了乐观锁实现。
      Compare and Swap(CAS)是一种乐观锁的实现方式。
      CAS基本原理:CAS有三个要素:需要读写的内存位置(V)、进行比较的预期原值(A)和拟写入的新值(B),如果发现位置V的值和预期原值A相同,则将新值B更新到V处,否则不处理。

    以AtomicInteger为例,看看CAS原理。

    public class AtomicInteger extends Number implements java.io.Serializable {
        // setup to use Unsafe.compareAndSwapInt for updates
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long valueOffset;
    
        static {
            try {
                valueOffset = unsafe.objectFieldOffset
                    (AtomicInteger.class.getDeclaredField("value"));
            } catch (Exception ex) { throw new Error(ex); }
        }
    
        private volatile int value;
    
        /**
         * Atomically increments by one the current value.
         *
         * @return the previous value
         */
        public final int getAndIncrement() {
            return unsafe.getAndAddInt(this, valueOffset, 1);
        }
    }
    

    private static final Unsafe unsafe = Unsafe.getUnsafe()
    AtomicInteger类使用sun.misc.Unsafe类的compareAndSwapInt()方法执行更新操作。Unsafe是JDK的内部工具类,通过调用Native方法(C/C++)可以直接读写内存、获得地址偏移值、锁定或释放线程等。

    private volatile int value
    将value声明为volatile,可保证线程间的数据可见性,但不能保证原子性。

    private static final long valueOffset
    valueOffset表示value字段相对于对象起始地址的偏移量,利用valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"))方法获得,该方法首先通过字段名value获取到该字段的Field,然后调用Unsafe的objectFieldOffset方法获取value字段相对于对象起始地址的偏移量。

    public final int getAndIncrement()
    该法的功能是获取值并+1,它调用unsafe.getAndAddInt(this, valueOffset, 1):

    public final int getAndAddInt(Object arg0, long arg1, int arg3) {
        int arg4;
        do {
            arg4 = this.getIntVolatile(arg0, arg1);
        } while (!this.compareAndSwapInt(arg0, arg1, arg4, arg4 + arg3));
        return arg4;
    }
    
    public native int getIntVolatile(Object arg0, long arg1);
    
    public final native boolean compareAndSwapInt(Object arg0, long arg1, int arg3, int arg4);
    

    compareAndSwapInt()就是乐观锁的一种实现(CAS)。
    期望值:getIntVolatile()方法获取主内存中的value值;
    读写的位置:工作内存中对象+偏移量;
    拟写入的新值:期望值+1
    如果期望值和读写位置上值相同,则在读写位置上写入新值,返回true,否则不写入新值,返回false。

    getAndAddInt()方法利用循环,不停的CAS直到写入成功为止。

    CAS的缺陷:ABA问题
    线程1从位置V上取出A,线程2从位置V上也取出A;
    线程2将A改成了B,然后又改成了A;
    线程1根据CAS操作时,发现期望值是A,和原值相同,则执行成功。
    但这个过程存在问题,线程1感知不到A-B-A的过程。

    解决:
    从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。
    该类本质上是在原来的CAS基础上加入了一个int类型的stamp(版本号),每次更新的时候检查当前Reference和期望的Reference是否相同,当前stamp和期望的stamp是否相同,如果相同则更新并返回true,否则啥也不做返回false。

    2. 数据库中的悲观锁和乐观锁

    建表(test)并插入一条数据:

    id count
    1 0
    • 无锁

    封装Task:每次查出count值并+1更新

    /**
     * 封装Task(无锁)
     */
    public class TaskWithOutLock implements Runnable {
    
        @Override
        public void run() {
            int id = 1;
            int count = 0;
            Connection connection = DBUtil.getConnection();
            PreparedStatement psQuery = null;
            PreparedStatement psUpdate = null;
    
            for (int i = 0; i < 1000; i++) {
                try {
                    // 查询数据
                    String querySql = "select count from test where id = ?";
                    psQuery = connection.prepareStatement(querySql);
                    psQuery.setInt(1, id);
                    ResultSet rs = psQuery.executeQuery();
                    if (rs.next()) {
                        AtomicInteger atomicInteger = new AtomicInteger(rs.getInt("count"));
                        atomicInteger.getAndIncrement();
                        count = atomicInteger.get();
                    }
    
                    // 更新数据
                    String updateSql = "update test set count = ? where id = ?";
                    psUpdate = connection.prepareStatement(updateSql);
                    psUpdate.setInt(1, count);
                    psUpdate.setInt(2, id);
                    Integer res = psUpdate.executeUpdate();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
    
            try {
                psUpdate.close();
                psQuery.close();
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    

    注:由于count变量是各线程私有的,所以不用AtomicInteger也可以。

    测试:多线程跑任务

        public static void main(String[] args) throws InterruptedException {
            int corePoolSize = 2;
            ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, corePoolSize * 2, 30, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>());
    
            for (int i = 0; i < 2; i++) {
                TaskWithOutLock task = new TaskWithOutLock();
                pool.execute(task);
            }
    
            // 线程池不再接收新任务,但线程池中的任务继续执行
            pool.shutdown();
    
            // 阻塞当前线程直到 线程池中所有任务完成 或 超时 或 当前线程被中断
            pool.awaitTermination(5, TimeUnit.MINUTES);
    
            System.out.println("DONE!");
        }
    

    结果:
    数据库中count变为1993,可见在不加锁的情况下,计算结果与期望的2000不同。

    • 悲观锁
      封装Task:
    /**
     * 封装Task(悲观锁)
     */
    public class TaskPessimisticLock implements Runnable {
    
        @Override
        public void run() {
            int id = 1;
            int count = 0;
            Connection connection = DBUtil.getConnection();
            PreparedStatement psQuery = null;
            PreparedStatement psUpdate = null;
    
            for (int i = 0; i < 1000; i++) {
                try {
                    // 开启事务
                    connection.setAutoCommit(false);
    
                    // 查询数据,加悲观锁
                    String querySql = "select count from test where id = ? for update";
                    psQuery = connection.prepareStatement(querySql);
                    psQuery.setInt(1, id);
                    ResultSet rs = psQuery.executeQuery();
                    if (rs.next()) {
                        AtomicInteger atomicInteger = new AtomicInteger(rs.getInt("count"));
                        atomicInteger.getAndIncrement();
                        count = atomicInteger.get();
                    }
    
                    // 更新数据
                    String updateSql = "update test set count = ? where id = ?";
                    psUpdate = connection.prepareStatement(updateSql);
                    psUpdate.setInt(1, count);
                    psUpdate.setInt(2, id);
                    Integer res = psUpdate.executeUpdate();
    
                    // 提交事务
                    connection.commit();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
    
            try {
                psUpdate.close();
                psQuery.close();
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    

    (1)开启事务
    (2)select count from test where id = ? for update打开悲观锁
    (3)提交事务
    注:悲观锁必须在事务中间,否则不生效。

    测试:

        public static void main(String[] args) throws InterruptedException {
            int corePoolSize = 2;
            ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, corePoolSize * 2, 30, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>());
    
            for (int i = 0; i < 2; i++) {
                TaskPessimisticLock task = new TaskPessimisticLock();
                pool.execute(task);
            }
    
            // 线程池不再接收新任务,但线程池中的任务继续执行
            pool.shutdown();
    
            // 阻塞当前线程直到 线程池中所有任务完成 或 超时 或 当前线程被中断
            pool.awaitTermination(5, TimeUnit.MINUTES);
    
            System.out.println("DONE!");
        }
    

    结果:
    count值为2000,与预期值相同。

    • 乐观锁
      修改表结构,加入version字段:
    id count version
    1 0 0

    封装Task:

    /**
     * 封装Task(乐观锁)
     */
    public class TaskOptimisticLock implements Runnable {
    
        @Override
        public void run() {
            int id = 1;
            int count = 0;
            int version = 0;
            Connection connection = DBUtil.getConnection();
            PreparedStatement psQuery = null;
            PreparedStatement psUpdate = null;
    
            for (int i = 0; i < 1000; i++) {
                try {
                    // 循环直到更新完成
                    for (;;) {
                        // 查询数据
                        String querySql = "select count,version from test where id = ?";
                        psQuery = connection.prepareStatement(querySql);
                        psQuery.setInt(1, id);
                        ResultSet rs = psQuery.executeQuery();
                        if (rs.next()) {
                            AtomicInteger atomicInteger = new AtomicInteger(rs.getInt("count"));
                            atomicInteger.getAndIncrement();
                            count = atomicInteger.get();
    
                            AtomicInteger atomicVersion = new AtomicInteger(rs.getInt("version"));
                            version = atomicVersion.get();
                        }
    
                        // 更新数据
                        String updateSql = "update test set count = ?,version = ? where id = ? and version = ?";
                        int newVersion = version + 1;
                        psUpdate = connection.prepareStatement(updateSql);
                        psUpdate.setInt(1, count);
                        psUpdate.setInt(2, newVersion);
                        psUpdate.setInt(3, id);
                        psUpdate.setInt(4, version);
                        Integer res = psUpdate.executeUpdate();
    
                        if (res > 0) {
                            break;
                        }
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
    
            try {
                psUpdate.close();
                psQuery.close();
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    

    在更新数据的时候判断version是否正确,如果正确则更新数据和version;否则循环判断。

    测试:

        public static void main(String[] args) throws InterruptedException {
            int corePoolSize = 2;
            ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, corePoolSize * 2, 30, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>());
     
            for (int i = 0; i < 2; i++) {
                TaskOptimisticLock task = new TaskOptimisticLock();
                pool.execute(task);
            }
    
            // 线程池不再接收新任务,但线程池中的任务继续执行
            pool.shutdown();
    
            // 阻塞当前线程直到 线程池中所有任务完成 或 超时 或 当前线程被中断
            pool.awaitTermination(5, TimeUnit.MINUTES);
    
            System.out.println("DONE!");
        }
    

    结果:
    count = 2000,version = 2000。和预期一致。

    相关文章

      网友评论

          本文标题:乐观锁和悲观锁

          本文链接:https://www.haomeiwen.com/subject/mkimxqtx.html