美文网首页
JanusGraph的锁机制

JanusGraph的锁机制

作者: 过雨神 | 来源:发表于2019-01-03 14:59 被阅读21次

    PS:简书的markdown与笔记格式不太一样,原文请戳这里

    一、锁的实现方式

    涉及的类:

    Locker -> AbstractLocker -> ConsistentKeyLocker

    LocalLockMediator

    1. Locker接口

    文档中对这个接口的解释:

    该接口代表线程安全的任意的锁,可能是在JanusGraph进程中的,也可能是在多个进程之间的。Lock本身就是一个确认了的KeyColumn的实例。如果两个KeyColumn使用了同一个Lock,则这两个KeyColumn调用equals方法应该返回true。

    获取锁的线程是通过StoreTransaction来确认的。

    该接口使用三步锁模型来获取锁,支持阻塞锁和非阻塞锁。不过不是所有的StoreTransaction都需要锁并使用这个接口。如果StoreTransaction需要一个或者多个锁,JanusGraph会按照以下顺序调用接口中的方法(tx代表每一步中的同一个StoreTransaction):

    writeLock(kc, tx) 使用tx事务对象获取对某个KeyColumn的锁。每个KeyColumn对象都调用一次或多次。

    checkLocks(tx) 检查当前的事务之前调用的writeLock方法实际上有没有成功。如果用户提交事务或者用户放弃了这个事务时,会调用一次。

    deleteLocks(tx) 释放当前事务所持有的锁。不论checkLocks方法有没有调用,这个方法都会调用一次。

    2. AbstractLocker接口:

    Doc中的解释:

    构建Locker的抽象基础类。实现了线程之间的锁(使用LocakLockMediator)。进程之间的锁由子类来实现。

    AbstractLocker中实现了Locker接口中的方法。针对这些实现的方法,又添加了几个新的方法由子类实现(例如,Locker中是writeLock方法,AbstractLocker类中,又添加了writeSingleLock方法,此方法在writeLock中被调用)。而实现的Locker接口中的方法都需要调用新定义的这几个方法。

    publicabstractclassAbstractLocker<SextendsLockStatus>implementsLocker{abstractSwriteSingleLock(KeyColumnlockID,StoreTransactiontx);abstractvoid checkSingleLock(KeyColumnlockID,SlockStatus,StoreTransactiontx);abstractvoid deleteSingleLock(KeyColumnlockID,SlockStatus,StoreTransactiontx);}

    3. ConsistentKeyLocker的使用

    ConsistentKeyLocker实现了AbstractLocker中定义的方法。

    API doc的介绍

    ConsistentKeyLocker是一个全局的锁,通过AbstractLocker中的方法实现了线程之间的锁的竞争。并通过使用KeyColumnValueStore来进行数据的读和写实现了进程之间的锁的竞争。

    加锁的过程如下:

    加锁是通过两个阶段来完成的:首先在进程之间的线程中获取锁,然后再JanusGraph Cluster的多个进程之间来获取锁。

    阶段一:线程之间的锁的竞争

    在一个共享的进程中,线程的锁的竞争是由LocalLockMeditator来“仲裁(原文是arbitrated)”的。这个仲裁者使用concurrent包中的类确保对于任何一个KeyColumn,在任何时间段内最多只有一个线程持有他的锁。这段代码是在AbstractLocker中。

    阶段二:进程之间的锁的竞争

    如果在阶段一中,meditator发出信号表示当前线程持有了进程内的锁,则接下来会通过读和写KeyColumnValueStore来检查是否当前进程是唯一持有该锁的进程。在Cassandra或者HBase中,都有专用的store来专门存储锁数据(BigTable模型中,store一般表示column family)。

    进程之间竞争锁所涉及的IO操作:

    写一个column来存储key(KeyColumn.getKey()+KeyColumn.getValue()), column(大致的时间戳+进程的rid)和value(0的byte表示)。

    如果写失败了或者超过了LockWait的实现,就重试写操作,带一个修改的时间戳,直到超出了配置的重试次数(获取失败)或者在LockWait的时间内完成了写操作(但写操作成功了不代表获取锁成功了)。

    如果需要的话,执行等待,直到lockWait的时间耗尽,或者写成功。

    获取这个key的所有的column。

    擦除掉timestamp已经过了lockExpire的记录。

    如果我们的column是第一个读的column,或者只有当前的rid持有该column(阶段一的meditator保证了一个KeyColumn在一个进程中只有一个线程能获取到锁),则获取锁成功。否则失败。

    释放锁时,删除掉记录中的该条column(注意不是整个key都删掉)。

    结合代码:

    publicvoidwriteLock(KeyColumn lockID, StoreTransaction tx)throwsTemporaryLockingException, PermanentLockingException{    .....//调用meditator先赢得线程之间的锁的竞争。if(lockLocally(lockID, tx)){booleanok =false;try{//往HBase中写一条ColumnS stat = writeSingleLock(lockID, tx);//修改local lock的过期时间lockLocally(lockID, stat.getExpirationTimestamp(), tx);// lockState是记录锁状态的缓存lockState.take(tx, lockID, stat);            ok =true;        }catch(){//对异常的处理}finally{if(!ok){                unlockLocally(lockID, tx);                ....            }        }    }else{thrownewPermanentLockingException("Local lock contention");    }}

    4. LocalLockMediator的实现:

    LocalLockMediator是用于在同一个JVM之间的事务竞争锁而使用的。其底层是基于一个ConcurrentHashMap的putIfAbsent。putIfAbsent返回一个Map中的Value,这个value如果是null,代表ConcurrentHashMap中原本不存在这个key。但如果返回不是null,则放弃put操作。当key不存在时,putIfAbsent成功了就是获取锁成功了。

    二、 Lock的使用场景:

    在Backend类中,有一个ConcurrentHashMap用于保存Locker:

    publicclassBackendimplementsLockerProvider,AutoCloseable{    privatefinalFunction lockerCreator;    privatefinalConcurrentHashMap lockers =newConcurrentHashMap<>();@Overridepublic Locker getLocker(StringlockerName) {        Preconditions.checkNotNull(lockerName);        Locker l = lockers.get(lockerName);if(null== l) {            l = lockerCreator.apply(lockerName);finalLocker x = lockers.putIfAbsent(lockerName, l);if(null!= x) {                l = x;            }        }returnl;    }}

    而Backend中的getLocker方法最终所使用的地方则是ExpectedValueCheckingStore。

    三、ExpectedValueChecking家族

    ExpectedValueChecking家族是指以ExpectedValueChecking开头的几个类。分别包括:

    ExpectedValueCheckingStore

    ExpectedValueCheckingStoreManager

    ExpectedValueCheckingTransaction

    ExpectedValueCheckingStore:

    该类是KeyColumnValueStore的一个包装类。KeyColumnValueStore是一个接口,该接口表示的是针对BigTable模型的数据操作接口。该接口提供了读和写数据的方法。

    ExpectedValueCheckingStore主要是对KeyColumnValueStore中的mutate方法和acquireLock方法进行了封装,其内部有一个KeyColumnValueStore变量,在这两个方法前后又加了一些逻辑,其余的方法都是直接调用KeyColumnValueStore的对应方法。其最终仍然需要依赖一个内部的KeyColumnValueStore变量。

    这个类一般是跟ExpectedValueCheckingTransaction一起,为每个StoreTransaction跟踪所有的传入acquireLock方法中的expectedValue参数。当事务调用mutate方法时,这些类会协同一起检查所有之前提供的expected value是否匹配实际的值。如果不匹配抛出异常。

    //对KeyColumnValueStore的基础代理类,所实现的继承的方法都是直接调用其内部的store变量来实现。publicclassKCVSProxyimplementsKeyColumnValueStore{        protectedfinal KeyColumnValueStore store;}publicclassExpectedValueCheckingStoreextendsKCVSProxy{finalLocker locker;//对象的初始化需要使用代理的KeyColumnValueStore,以及locker。publicExpectedValueCheckingStore(KeyColumnValueStore store, Locker locker){super(store);this.locker = locker;    }// (1)确认事务txh之前调用acquireLock获取锁是否成功// (2)如果成功了就将additions和deletetions写到底层存储的key中// (3) Deletions是在additions之前执行的。也就是说,如果某个column既有deletion也有addition,会先删除,然后添加。//如果实现类中不支持锁,则跳过锁的认证阶段,并执行后来的阶段。publicvoidmutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh)throwsBackendException{        .....    }//试图获取key value对所声明的锁。锁是随机分配的。//如果锁获取失败了,可以抛出PermanentLockingException。但这并不是强制的,如果获取锁失败了,也可以不抛出异常。锁的获取只要在KeyColumnValueStore调用mutate方法时确认获取成功即可。// expectedValue必须匹配key value代表的publicvoidacquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh)throwsBackendException{        ....    }}

    ExpectedValueCheckingTransaction

    ExpectedValueCheckingTransaction是一个StoreTransaction的实现类。支持通过LocalLockMediator来获取锁,在ExpectedValueCheckingSotre中用于读和写锁记录。

    其父类StoreTransaction代表一个事务的把手,用于唯一标识后端存储的一个事务。所有对后端的修改操作必须有一个单个的事务作为context。这样的事务能被JanusGraph中间件识别出来就是通过StoreTransaction。图的事务依赖于底层存储的事务。

    要注意StoreTransaction本身不提供任何的隔离以及一致性的保证。如果对应的后端支持的话,Graph Transaction必须自己去实现。

    在这个类中,有一个强一致的事务和一个不一致的事务。

    publicclassExpectedValueCheckingTransactionimplementsStoreTransaction{//在事务锁的阶段一致是false。事务中调用mutate或者mutateMany方法开始时设置为true。privatebooleanisMutationStarted;//用于对锁相关的元数据进行读和写的事务。privatefinalStoreTransaction strongConsistentTx;//用于读和写客户端数据的事务。不保证强一致性。privatefinalStoreTransaction inconsistentTx;}

    Backend的初始化

    Backend中,关键的元数据的初始化如下,以HBase为例:

    publicclassBackendimplementsLockerProvider,AutoCloseable{//HBaseStoreManager。Backend中有多个KeyColumnValueStore的初始化,是通过该变量来实现的。这些变量各自负责hbase的一个column family的操作(所以,在mutate或者mutateMany方法中,是没有column family的)。privatefinalKeyColumnValueStoreManager storeManager;//ExpectedKeyColumnValueStoreManagerprivatefinalKeyColumnValueStoreManager storeManagerLocking;privatefinalMap stores;privatefinalConcurrentHashMap lockers =newConcurrentHashMap<>();//构造方法。构造方法调用完之后调用initialize方法。publicBackend(Configuration config){        storeManager = ...;        storeManagerLocking =newExpectedValueCheckingStoreManager(storeManager,LOCK_STORE_SUFFIX,this,maxReadTime);            }publicvoidinitialize(){//使用ExpectedValueCheckingStoreManager来创建ExpectedValueCheckingStore。创建的对象包括://IDAuthority idAuthority;//KCVSCache edgeStore//KCVSCache indexStore//KCVSCache txLogStore//KCVSConfiguration systemConfig//KCVSConfiguration userConfig}publicLockergetLocker(String lockerName){//创建ConsistentKeyLocker。}}

    总结:

    这里的设计完美的提现了面向接口编程,代理模式。

    KeyColumnValueStore的实现类:HbaseKeyColumnValueStore ExpectedValueCheckingStore其中使用了HBaseKeyColumnValueStore,作为代理,添加了调用writeLock的方法保证了进程之间只有一个线程能获取KeyColumn的锁。

    KeyColumnValueStoreManager用于创建KeyColumnValueStore(openDatebase)方法。各个不同的存储后端都需要实现mutateMany方法用于将缓存中的内容持久化到DB中。其实现类是HBaseStoreManager。

    在Backend创建systemConfig和userConfig时,会创建ExpectedValueCheckingStoreTransaction(通过ExpectedValueCheckingStoreManager.beginTransaction方法)。ExpectedValueCheckingStoreTransaction中使用的两个StoreTransaction最终都是HBaseTransaction类。至少从HBase的角度来讲,这里两个变量的设计似乎略显多余,因为HBase不是强一致的,由于CAP定理的限制,HBase采用的是最终一致性。

    关于ExpectedValueCheckingTransaction如何检查expectedValue的,可以查看checkSingleExpectedValueUnsafe这个私有方法。

    ExpectedValueCheckingStore在执行mutate方法时,会调用该检查。

    ```

    @Override

    public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {

        ExpectedValueCheckingTransaction etx = (ExpectedValueCheckingTransaction)txh;

        // 这一步会对expectedValue做检查。如果检查失败会抛异常。

        boolean hasAtLeastOneLock = etx.prepareForMutations();

        if (hasAtLeastOneLock) {

            // Force all mutations on this transaction to use strong consistency

            store.mutate(key, additions, deletions, getConsistentTx(txh));

        } else {

            store.mutate(key, additions, deletions, unwrapTx(txh));

        }

    }

    ```

    相关文章

      网友评论

          本文标题:JanusGraph的锁机制

          本文链接:https://www.haomeiwen.com/subject/wbifrqtx.html