美文网首页
(八十)java版spring cloud+spring boo

(八十)java版spring cloud+spring boo

作者: IT小跑兵 | 来源:发表于2019-06-18 09:53 被阅读0次

    电子商务平台源码请加企鹅求求:三伍三六贰四柒二伍九。基于Consul的分布式锁主要利用Key/Value存储API中的acquire和release操作来实现。acquire和release操作是类似Check-And-Set的操作:

    acquire操作只有当锁不存在持有者时才会返回true,并且set设置的Value值,同时执行操作的session会持有对该Key的锁,否则就返回false

    release操作则是使用指定的session来释放某个Key的锁,如果指定的session无效,那么会返回false,否则就会set设置Value值,并返回true

    具体实现

    public class Lock {
     
        private static final String prefix = "lock/";  // 同步锁参数前缀
     
        private ConsulClient consulClient;
        private String sessionName;
        private String sessionId = null;
        private String lockKey;
     
        /**
         *
         * @param consulClient
         * @param sessionName   同步锁的session名称
         * @param lockKey       同步锁在consul的KV存储中的Key路径,会自动增加prefix前缀,方便归类查询
         */
        public Lock(ConsulClient consulClient, String sessionName, String lockKey) {
            this.consulClient = consulClient;
            this.sessionName = sessionName;
            this.lockKey = prefix + lockKey;
        }
     
        /**
         * 获取同步锁
         *
         * @param block     是否阻塞,直到获取到锁为止
         * @return
         */
        public Boolean lock(boolean block) {
            if (sessionId != null) {
                throw new RuntimeException(sessionId + " - Already locked!");
            }
            sessionId = createSession(sessionName);
            while(true) {
                PutParams putParams = new PutParams();
                putParams.setAcquireSession(sessionId);
                if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) {
                    return true;
                } else if(block) {
                    continue;
                } else {
                    return false;
                }
            }
        }
     
        /**
         * 释放同步锁
         *
         * @return
         */
        public Boolean unlock() {
            PutParams putParams = new PutParams();
            putParams.setReleaseSession(sessionId);
            boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue();
            consulClient.sessionDestroy(sessionId, null);
            return result;
        }
     
        /**
         * 创建session
         * @param sessionName
         * @return
         */
        private String createSession(String sessionName) {
            NewSession newSession = new NewSession();
            newSession.setName(sessionName);
            return consulClient.sessionCreate(newSession, null).getValue();
        }
     
    }
    

    单元测试

    下面单元测试的逻辑:通过线程的方式来模拟不同的分布式服务来竞争锁。多个处理线程同时以阻塞方式来申请分布式锁,当处理线程获得锁之后,Sleep一段随机事件,以模拟处理业务逻辑,处理完毕之后释放锁。

    public class TestLock {
     
        private Logger logger = Logger.getLogger(getClass());
     
        @Test
        public void testLock() throws Exception  {
            new Thread(new LockRunner(1)).start();
            new Thread(new LockRunner(2)).start();
            new Thread(new LockRunner(3)).start();
            new Thread(new LockRunner(4)).start();
            new Thread(new LockRunner(5)).start();
            Thread.sleep(200000L);
        }
      
        class LockRunner implements Runnable {
     
            private Logger logger = Logger.getLogger(getClass());
            private int flag;
     
            public LockRunner(int flag) {
                this.flag = flag;
            }
     
            @Override
            public void run() {
                Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key");
                try {
                    if (lock.lock(true)) {
                        logger.info("Thread " + flag + " start!");
                        Thread.sleep(new Random().nextInt(3000L));
                        logger.info("Thread " + flag + " end!");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
      
    }
    

    单元测试执行结果如下:

    2017-04-12 21:28:09,698 INFO  [Thread-0] LockRunner - Thread 1 start!
    2017-04-12 21:28:12,717 INFO  [Thread-0] LockRunner - Thread 1 end!
    2017-04-12 21:28:13,219 INFO  [Thread-2] LockRunner - Thread 3 start!
    2017-04-12 21:28:15,672 INFO  [Thread-2] LockRunner - Thread 3 end!
    2017-04-12 21:28:15,735 INFO  [Thread-1] LockRunner - Thread 2 start!
    2017-04-12 21:28:17,788 INFO  [Thread-1] LockRunner - Thread 2 end!
    2017-04-12 21:28:18,249 INFO  [Thread-4] LockRunner - Thread 5 start!
    2017-04-12 21:28:19,573 INFO  [Thread-4] LockRunner - Thread 5 end!
    2017-04-12 21:28:19,757 INFO  [Thread-3] LockRunner - Thread 4 start!
    2017-04-12 21:28:21,353 INFO  [Thread-3] LockRunner - Thread 4 end!
    

    从测试结果我们可以看到,通过分布式锁的形式来控制并发时,多个同步操作只会有一个操作能够被执行,其他操作只有在等锁释放之后才有机会去执行,所以通过这样的分布式锁,我们可以控制共享资源同时只能被一个操作进行执行,以保障数据处理时的分布式并发问题。

    相关文章

      网友评论

          本文标题:(八十)java版spring cloud+spring boo

          本文链接:https://www.haomeiwen.com/subject/cxsxqctx.html