美文网首页HBase我爱编程
HBase学习 - Procedure

HBase学习 - Procedure

作者: aaron1993 | 来源:发表于2017-09-03 14:31 被阅读0次

    本文基于hbase-1.3.0源码

    1. 前言

    写在前面,hbase有两个地方有Procedure:

    1. 一个Procedure的类的路径是org.apache.hadoop.hbase.procedure2.Procedure。 这是一个抽象类,它的一系列实现类。诸如DeleteTableProcedureDeleteColumnFamilyProcedure等等,HMaster将对table的元数据修改作为一个Procedure来实现,一个Procedure又涉及到若干个可以嵌套的Subprocedure,一个procedure的成功一定是在所有subprocedure成功后才算成功,某个subprocedure失败则需要回滚整个过程,是不是有点事务的特征。
    2. 一个Procedure的类的路径是org.apache.hadoop.hbase.procedure.Procedure,这个Procedure是一个分布式的实现,一个Procedure同样有一些Subprocedure,而这些Subprocedure的执行则分布在多个HRegionServer上,和1中一样只有所有subprocedure成功执行后一个procedure才能成功,在其中某一个subprocedure失败后,需要有一种机制去通知其他成功执行的subprocedure,这些成功执行subprocedure根据需要可能需要执行回退操作。

    本文讲的是2中procedure。

    2. Procedure设计原理

    不清楚该怎么翻译procedure比较好,我的理解是它有点像存储过程。实际上用户也可以像使用存储过程一样自定义Procedure让Hbase去加载,然后在Hbase启动后通过rpc调用Procedure,并且可以获得Procedure执行的结果。

    「前言」2 中提到Procedure的执行过程中,会产生多个subprocedure分布到不同的HRegionServer上执行,是完全的分布式执行。分布式执行也必然会面对诸如网络分区,部分执行成功,以及全局状态协调等等处理不好会导致一致性的问题。

    Procedure的实现有点类似两阶段提交(2 Phase Commit,下面简称2PC), 2PC是实现分布式事务的一种经典方法,关于2PC可以参考一下2PC, 这里简单介绍:

    2PC中存在两种角色:协调者和参与者(对应到本文Procedure和Subprocedure)。2PC分为两个阶段:
    1. 第一阶段为prepare阶段,协调者发起提议(proposal),询问所有参与者是否接收提议,
       参与者接收到提议后根据自身情况决定恢复协调者肯定或者否定的答复。
    2. 第二阶段,协调者等待所有参与者的回复,如果收到所有参与者肯定的答复,就会通知所有参与者提交之前发起的提议。
       否者不论是收到否定的答复或者等待超时,都同志所有参与者终止提议。
    
    2PC在协调者不出现问题的情况下可以保证最终一致性。但是协调者在prepare阶段发起提议后奔溃,没有新的协调者替换导致所有参与者阻塞。
    又或者在第二阶段协调者通知第一个参与者提交后协调者和收到提交请求的参与者都奔溃,新的协调者起来后无从判断上次提议状态,从而都会出现不一致。
    
    HBase里Procedure通过zookeeper解决了这些问题。
    

    3. 设计细节

    3.1 基本概念

    「2」中提到procedure是分布式执行的,并且实现了2PC,有协调者和参与者两种角色,两种角色分别运行在HMaster和HRegionServer上,下文会分别讲HMaster和HRegionServer上不同角色的实现,这其中又会涉及到以下的一些类:

    1. HMaster上运行

    • MasterProcedureManagerHost
      名字是XXXHost的一般都是提供运行环境,MasterProcedureManagerHost在HMaster初始化过程中创建:

      1. 调用loadProcedures然后它会默认加载SnapshotManagerMasterFlushTableProcedureManager这两个procedure管理类(这两个管理类一个负责管理snapshot procedure,一个管理flush table的procedure)。
        除了默认加载以外,还会加载xml配置项hbase.procedure.master.classes中用户自定义的XXXManager。
      2. 加载好这些manager类之后,就是调用它们的initialize方法完成初始化了。
    • MasterProcedureManager
      这是一个抽象类,继承自抽象类ProcedureManager。它的实现类比如上面提到的SnapshotManager,运行在HMaster上,由MasterProcedureManagerHost加载,这些manager负责启动并管理同一种类的Procedure,因为一个Procedure可以被客户端调用多次。
      用户如果实现自己的XXXManager,那么提供给HMaster运行的就需要实现MasterProcedureManager,它有如下的抽象方法需要实现:

      1. public abstract String getProcedureSignature();  
         它应当返回一个唯一值,客户端调用Procedure是通过对应的Manager完成的,这就需要通过这个signature识别manager。
      2. public abstract void initialize(MasterServices master, MetricsMaster metricsMaster)
        MasterProcedureManagerHost加载好这个类并实例化类对象后会调用这个方法完成初始化工作
      3. public void execProcedure(ProcedureDescription desc) ;
         客户端通过rpc有两种rpc方法去调用一个procedure,这是其中一种,没有返回值,参数类型ProcedureDescription定在protobuf文件中,下面是它的定义:
           message ProcedureDescription {
                   required string signature = 1;  xxxManager的getProcedureSignature返回值
                   optional string instance = 2;   相当于给每次调用的procedure取一个名字
                   optional int64 creation_time = 3 [default = 0];
                  repeated NameStringPair configuration = 4;  name:value形式的参数
            }
         HBase只是替我们实现了2PC框架,提供一些帮助类,至于调用的procedure具体做什么用户应该自己在exeProcedure中实现。
      
    • Procedure(org.apache.hadoop.hbase.procedure.Procedure
      上面提到用户自定义的procedure,在HMaster端需要继承MasterProcedureManager,然后在execProcedure中去实现核心逻辑,一个完整的procedure还需要分布在hregionServer上的Subprocedure们配合。HBase提供了2pc实现然用户能够使用它以使得一个procedure中各个subprocedure配合完成,类Procedure,我的理解就是HBase提供2PC实现的核心,下面是一段这个类介绍注释:

      This class encapsulates state and methods for tracking and managing a distributed procedure, as well as aborting if any member encounters a problem or if a cancellation is requested

      Procedure有2个重要方法:

      1. sendGlobalBarrierStart
        相当于2PC prepare阶段,调用这个方法等待所有参与者(Subprocedure)完成prepare。
      2. sendGlobalBarrierReached
        相当于2PC的commit阶段,调用这个方法等待所有参与者(Subprocedure)commit完成。
        关于Procedure的具体实现原理下文会介绍。

    2. HRegionServer上运行
    HRegionServer上基本都会有一个与HMaster相对应的类实现,一个procedure的全部过程是需要RegionServer和HMaster配合完成的。

    • RegionServerProcedureManagerHost
      对应MasterProcedureManagerHost,它在HRegionServer启动过程中完成创建和初始化:

      1. 调用loadProcedures,加载用户配置项hbase.procedure.regionserver.classes定义的procedureManager,运行在HRegionServer上的procedureManager必须要继承RegionServerProcedureManager, 这些XXXManager和HMaster上的XXXManager对应,相当于管理procedure中的subpProcedure(参与者)。
        处理用户配置的以外,还会默认加载RegionServerSnapshotManagerRegionServerFlushTableProcedureManager,对应HMaster上默认加载的SnapshotManagerMasterFlushTableProcedureManager.
      2. 调用initialize,方法中调用每一个加载的XXXManager的initialize方法完成初始化。
      3. 调用start, start方法中调用每一个XXXManager的start方法。
    • RegionServerProcedureManager
      对应MasterProcedureManager,同样是一个抽象类,同样继承自ProcedureManager。客户端调用procedure时,总是会先走到MasterProcedureManager的具体实现类上执行execProcedure,在execProcedure的实现逻辑里使用Procedure这个类提供的方法完成协调者的工作。接着就是参与者RegionServerProcedureManager的工作:接受协调者发送的两个阶段的请求,处理请求,反馈给协调者。RegionServerProcedureManager有两个主要的抽象方法需要实现:

       public abstract void initialize(RegionServerServices rss) throws KeeperException;
      
       public abstract void start();
       这两个方法正是RegionServerProcedureManagerHost的initialize和start时会调用的。
       由于在MasterProcedureManager的execProcedure中可以使用Procedure这个类提供的方法完成两个阶段的协调。那么在start方法中,我们应该可以去接受协调者的两个阶段请求,然后做出处理。
      
    • Subprocedure
      这是一个抽象类,同时它继承Callable接口,参与者的重要逻辑应该继承Subprocedure,并实现它的三个主要方法:

       1. abstract public void acquireBarrier() throws ForeignException;
        prepare阶段参与者接收到协调者prepare请求后需要在这个方法里完成主要逻辑。抛出异常意味着否定的答复。
       2. abstract public byte[] insideBarrier() throws ForeignException;
         第二阶段,意味着所有参与者(也就是运行在其他RegionServer上的Subprocdure完成了prepare阶段acquireBarrier调用),进入提交阶段。
         这是一个有返回值的方法,它的返回值会传递给协调者,也就是Procedure。
       3. abstract public void cleanup(Exception e);
         以上两个阶段出现异常,都会调用这个方法完成本地的一些清理操作:资源释放,已完成操作的回滚等等。
      
      以上方法都在call()方法中完成,作为用户来说不需要关心,如何接受协调者 的请求,以及回复协调者。call方法中完成了这些调用。
      call 方法有以下调用:
       -> acquireBarrier
       -> rpcs.sendMemberAcquired(this); 参与者答复协调者prepare肯定请求。
       -> waitForReachedGlobalBarrier();等待所有参与者完成prepare阶段
       -> insideBarrier
       -> rpcs.sendMemberCompleted();通知协调者,当前参与者完成第二阶段  commit
      以上过程出现异常,都会调用cancel通知协调者取消本次procedure,再调用cleanup完成本地清理工作。
      
    • ProcedureMember
      ProcedureMemeber负责创建、运行、管理subprocedure。

    3.2 细节 - 基于zookeeper的实现

    尽管「3.1」中提到了master端(协调者),使用Procedure这个类来协调两个阶段。regionserver端(参与者)在Subprocedure完成具体逻辑, 使用ProcedureMember来管理Subprocedure,但没有涉及2PC的实现。
    HBase目前提供了基于Zookeeper实现的2PC, 当然由于运行在HMaster和HRegionServer上的XXXProcedureMananger都完全是有用户自己实现,用户也可以基于其他途径实现2PC, 然后替换掉Procedure类中的zk的实现。
    下面主要讲一下基于zookeeper的实现,同样分为HMaster和HRegionServer量方面。

    3.2.1 HMaster端实现

    1. Procedure
    由于前文提到XXXProcedureManager主要使用Procedure类去实现两个阶段的协调,先看看Procedure类。下面是Procedure的核心成员:

      1. final private String procName;
         每创建一个新的procedure都应该有一个不同的名称,客户端rpc调用procedure时可以指定,参考前面说的protobuf定义文件ProcedureDescription的instance成员。
      2.final private byte[] args;
         args参数是在prepare请求阶段发送给参与者的,所有的参与者都会收到参数。
      3. final CountDownLatch acquiredBarrierLatch;
         final CountDownLatch releasedBarrierLatch;
         final CountDownLatch completedLatch;
         这三个计数同步器,分别用来等待所有参与者完成prepare,完成commit,等待procedure结束。
      4. private final List<String> acquiringMembers;
         private final List<String> inBarrierMembers;
         private final HashMap<String, byte[]> dataFromFinishedMembers;
            acquiringMembers中保存进入prepare阶段的参与者;
            inBarrierMembers完成prepare需要进入commit阶段的参与者;
            dataFromFinishedMembers保存完成commit后的参与者返回的数据,参考前文Subprocedure # insideBarrier方法返回值。
      5. private ProcedureCoordinator coord;
          看类的名字也知道Procedure使用它来完成真正的两个阶段的协调工作.
          它还负责Procedure的创建以及运行。下文重点讲一下ProcedureCoordinator。
    

    2. ProcedureCoordinator
    ProcedureCoordinator负责创建Procedure,并且提交Procedure到线程池运行,反过来Procedure又调用ProcedureCoordinator去完成所有subprocedure的两个阶段的协调。
    上文提到Procedure的两个主要方法:

    1. sendGlobalBarrierStart  -- 请求所有参与者prepare
    2. sendGlobalBarrierReached -- 请求所有参与者commit
    
    这两个方法里面分别调用了:
    1. coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
    - 通知所有参与者也就是this.acquiringMembers 进入prepare阶段
    2. coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
    - 所有参与者完成prepare,通知它们进入commit阶段。
    
    coord.getRpcs返回ProcedureCoordinatorRpcs,这是一个接口,显然它是委派它来通知所有参与者进入prepare,再进入commit阶段。
    
    HBase提供了ProcedureCoordinatorRpcs一种基于Zookeeper的实现:ZKProcedureCoordinatorRpcs 。
    而且现在运行在HMaster上的各种XXXProcedureManager都使用这种基于zookeeper的实现。
    

    3. ZKProcedureCoordinatorRpcs
    它实现了接口ProcedureCoordinatorRpcs,有如下核心方法:

    1. boolean start(final ProcedureCoordinator listener);
      - ProcedureCoordinator在构造函数中调用start
    1. sendGlobalBarrierAcquire(Procedure procName, byte[] info, List<String> members) 
      - 通知进入prepare阶段, info是传递给各个参与者的信息,members即参与者名称。
    2. sendGlobalBarrierReached(Procedure procName, List<String> members)
      - 通知进入commit阶段
    3. sendAbortToMembers(Procedure procName, ForeignException cause) 
      - 参与者失败时调用,通知自身无法完成本次procedure。
    4. resetMembers(Procedure procName) 
      - 完成procedure后调用,通知参与者重置自身的状态
    

    下面说说ZKProcedureCoordinatorRpcs怎么基于zk实现了这些方法:
    它有如下成员:

    ZooKeeperWatcher watcher; -- zookeeper访问客户端
    String procedureType; -- HMaster上的XXXProcedureManager创建ZKProcedureCoordinatorRpcs一般使用自身getProcedureSignature返回值初始化这个域,这个字段需要保持唯一
    String coordName;
    -- 不是很重要,一般就是当前运行的server名称。
    ProcedureCoordinator coordinator; 
    -- ProcedureCoordinator实例,也就是它们ProcedureCoordinator和ZKProcedureCoordinatorRpcs互相持有对方。
    
    1. start方法
    final public boolean start(final ProcedureCoordinator coordinator) {
        if (this.coordinator != null) {
          throw new IllegalStateException(
            "ZKProcedureCoordinator already started and already has listener installed");
        }
        this.coordinator = coordinator;
    
        try {
          /**
            假设procedureType是pt, ZKProcedureUtil构造函数会在zk上创建下面三个node:
              1. acquiredZnode : /hbase/pt/acquired
              2. reachedZnode: /hbase/pt/reached
              3. abortZnode: /hbase/pt/abort
           所以基于zk的实现的2PC是通过node的改变来通知所有参与者当前处在哪一个阶段。
          */
          this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
            @Override
            // nodeCreated方法会在zk上新的node创建时回调
            public void nodeCreated(String path) {
              //判断一下node的路径是不是/hbase/pt,不是的话表示node改变和当前procedure没关系
              if (!isInProcedurePath(path)) return;
              LOG.debug("Node created: " + path);
              logZKTree(this.baseZNode);
             // 判断新建的node是不是符合路径/hbase/pt/acquired/{procedureName}/{memberName}. 
             // 关于procedureName,每一个procedure调用都会有一个新的名称.  memberName即参与者名称。
             // 这个路径的创建表明有一个参与者完成prepare阶段。后面讲到参与者部分时会提到参与者完成prepare后会创建这个路径。
              if (isAcquiredPathNode(path)) {
                  // 通知一下一个参与者完成prepare,毕竟Procedure 还阻塞在acquiredBarrierLatch上等待参与者都完成prepare阶段。
                coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
                  ZKUtil.getNodeName(path));
             // 和上面的if类似,判断新建的node符不符合/hbase/pt/reached/{procedureName}/{memberName}. 符合表明一个参与者完成commit阶段。
              } else if (isReachedPathNode(path)) {
                String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
                String member = ZKUtil.getNodeName(path);
                // get the data from the procedure member
                try {
                  /**
                    前面说到Subprocedure # insideBarrier会有返回值,这个返回值被设置成node的data,此处解析出返回值。
                  */
                  byte[] dataFromMember = ZKUtil.getData(watcher, path);
                  // ProtobufUtil.isPBMagicPrefix will check null
                  if (dataFromMember != null && dataFromMember.length > 0) {
                    if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
                      ForeignException ee = new ForeignException(coordName,
                        "Failed to get data from finished node or data is illegally formatted:"
                            + path);
                      coordinator.abortProcedure(procName, ee);
                    } else {
                      dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
                        dataFromMember.length);
                      LOG.debug("Finished data from procedure '" + procName
                        + "' member '" + member + "': " + new String(dataFromMember));
                     /**
                        通知一个参与者完成commit,此时Procedure阻塞在releasedBarrierLatch等待所有参与者完成commit。
                     */
                      coordinator.memberFinishedBarrier(procName, member, dataFromMember);
                    }
                  } else {
                    coordinator.memberFinishedBarrier(procName, member, dataFromMember);
                  }
                } catch (KeeperException e) {
                  ForeignException ee = new ForeignException(coordName, e);
                  coordinator.abortProcedure(procName, ee);
                } catch (InterruptedException e) {
                  ForeignException ee = new ForeignException(coordName, e);
                  coordinator.abortProcedure(procName, ee);
                }
              } else if (isAbortPathNode(path)) {
                abort(path);
              } else {
                LOG.debug("Ignoring created notification for node:" + path);
              }
            }
          };
          zkProc.clearChildZNodes();
        } catch (KeeperException e) {
          ...
        }
       ...
      }
    
    1. sendGlobalBarrierAcquire 通知参与者prepare
    // 1. info 是传递给进入prepare的subprocedure的
    // 2. nodeNames 即所有参与者,
    final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
          throws IOException, IllegalArgumentException {
        String procName = proc.getName();
    
        String abortNode = zkProc.getAbortZNode(procName);
        try {
          // 检查/hbase/pt/abort/{procedureName}是否存在,存在则表明放弃procedureName这个procedure。
          if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
            abort(abortNode);
          }
        
        } catch (KeeperException e) {
          ...
        }
    
        // node: /hbase/pt/acquired/{procedureName}
        String acquire = zkProc.getAcquiredBarrierNode(procName);
        
        try {
          byte[] data = ProtobufUtil.prependPBMagic(info);
          // 在zk上创建/hbase/pt/acquired/{procedureName}节点
          // 后面在说subprocedure会提到,subprocedure会监控这个阶段判断是不是一个新的procedure启动了,一旦监控到这个节点参与者就进入prepare阶段
          ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
          
          for (String node : nodeNames) {
            // znode : /hbase/pt/acquired/{procedureName}/{memberName}
            String znode = ZKUtil.joinZNode(acquire, node);
            
            // 检查/hbase/pt/acquired/{procedureName}/{memberName}是否存在,一旦存在表明memberName这个参与者完成prepare。
            if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
             //通知一个member完成prepare
              coordinator.memberAcquiredBarrier(procName, node);
            }
          }
        } catch (KeeperException e) {
          ...
        }
      }
    
    1. sendGlobalBarrierReached
      这里不再贴代码,它和sendGlobalBarrierAcquire工作过程差不多:

      • 创建/hbase/pt/reached/{procedureName}这个node,所有监控这个node的参与者因此进入commit阶段。
      • 对所有的member, 检查/hbase/pt/reached/{procedureName}/{memberName}是否存在,存在表明memberName这个参与者完成commit。
    2. 总结
      基于zk的实现, zk上会存在如下node:

      /hbase/{procedureSignature}
                            | - /required
                                     |- /{procedureName}
                                                | - /{memberName-1}
                                                   ...
                                                | - /{memberName-n}
                            | - /reached
                                     |- /{procedureName}
                                                | - /{memberName-1}
                                                   ...
                                                | - /{memberName-n}
                            | -/abort
                                   | - /{procedureName}
                                        
      1. 协调者创建/required/{procedureName}
      2. 参与者监听到/required/{procedureName}的存在后,进入prepare阶段
      3. 参与者memberName-i调用Subprocedure # acquireBarrier
      4. 参与者memberName-i完成prepare阶段,创建/required/{procedureName}/{memberName-i}这个node通知协调者完成prepare
      4. 协调者等待所有参与者完成prepare,也就是n个参与者创建了memberName-1 to memberName-n这n个子节点。
      5. 协调者创建/reached/{procedureName},通知参与者进入commit
      ...
      

    3.2.2 HRegionServer端实现

    1.ZKProcedureMemberRpcs
    3.1 # 2节中提到Subprocedure使用rpcs来响应协调者Procedure。rpcs对应接口ProcedureMemberRpcs, ZKProcedureMemberRpcs实现了这个接口,该接口有如下主要方法:

    1. void start(final String memberName, final ProcedureMember member);
        - 完成一些初始化或者线程池启动任务,一般是运行在HRegionServer上的XXXProcedureManager会在其start方法中完成这个方法的调用。
    2. void sendMemberAborted(Subprocedure sub, ForeignException cause) throws IOException;
        - 通知协调者放弃
    3. void sendMemberAcquired(Subprocedure sub) throws IOException;
        - 通知协调者,完成prepare
    4. void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException;
        - 通知协调者,完成commit
    上后三个方法也就是Subprocedure使用来反馈给协调者状态的方法。
    

    下面是ZKProcedureMemberRpcs的核心成员:

    1. protected ProcedureMember member;
       用于创建,运行subprocedure
    

    看看ZKProcedureMemberRpcs是怎么实现这些接口的,下面是ZKProcedureMemberRpcs构造函数:

    public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
          throws KeeperException {
        // 监控zookeeper node变化,上文协调者端‘ ZKProcedureCoordinatorRpcs’也使用了, procType也就是getProcedureSignature返回值
        this.zkController = new ZKProcedureUtil(watcher, procType) {
          // 新node创建是回调
          @Override
          public void nodeCreated(String path) {
            // 如果不是 /hbase/{procedureSignature}路径,表明不是procedure相关的事件
            if (!isInProcedurePath(path)) {
              return;
            }
            // 如果是新建acquiredNode: /hbase/{procedureSignature}/required
            // 按照上文讲述ZKProcedureCoordinatorRpcs会在start方法中创建,
            // 也就是在运行在HMaster上的XXXProcedureManager初始化过程中创建。
            if (isAcquiredNode(path)) {
              // 这个方法查看acquiredNode的子节点,由于子节点名字是procedureName
              //表示一个新的procedure, 这个方法里获取子节点,使用ProcedureMember启动创建启动Subprocedure.
              waitForNewProcedures();
              return;
            } else if (isAbortNode(path)) {
              // 监控/hbase/{procedureSignature}/abort节点
              watchForAbortedProcedures();
              return;
            }
            String parent = ZKUtil.getParent(path);
            // 如果父节点是/hbase/{procedureSignature}/reached
            // 表示协调者通知procedureName这个subprocedure进入commit阶段
            // 由于ProcedureMember管理所有subprocedure,委托ProcedureMember去通知相应subprocedure去调用insideBarrier完成commit
            if (isReachedNode(parent)) {
              receivedReachedGlobalBarrier(path);
              return;
            } else if (isAbortNode(parent)) {
              abort(path);
              return;
            } else if (isAcquiredNode(parent)) {
            // 如果父节点是/hbase/{procedureSignature}/required
            // 表示协调者通知procedureName这个subprocedure进入prepare阶段
            // 由于ProcedureMember管理所有subprocedure,委托ProcedureMember去通知相应subprocedure去调用acquireBarrier完成prepare
              startNewSubprocedure(path);
            } else {
              LOG.debug("Ignoring created notification for node:" + path);
            }
          }
          
          @Override
          public void nodeChildrenChanged(String path) {
            if (path.equals(this.acquiredZnode)) {
              LOG.info("Received procedure start children changed event: " + path);
              waitForNewProcedures();
            } else if (path.equals(this.abortZnode)) {
              LOG.info("Received procedure abort children changed event: " + path);
              watchForAbortedProcedures();
            }
          }
        };
      }
    
    1. sendMemberAcquired
      前文提到Subprocedure调用acquireBarrier完成prepare阶段后,会使用这个方法通知协调者自己完成prepare。
      同时在「3.2.1」最后提到协调者端ZKProcedureCoordinatorRpcs会在prepare阶段一直监听 /hbase/{procedureSignature}/acquire,直到该node下n个参与者都创建了子节点memberName-i(0 <= i < n)。

    显然这个方法里就是在 /hbase/{procedureSignature}/acquire下以当前member name创建新的节点。

    1. sendMemberCompleted
      和上面一样,Subprocedure调用insideBarrier完成commit之后,调用这个方法在/hbase/{procedureSignature}/reached/下创建以当前member name命名的新节点通知协调者。

    以上,差不多就是HBase procedure实现两阶段协议的过程。
    总结一下用户如果自定义Procedure需要实现一下部分:

    1. master端
      • 实现MasterProcedureManager抽象:
        • initialize 初始化工作
        • getProcedureSignature 返回一个唯一的名称
        • execProcedure, 中创建Procedure,创建ProcedureCoordinator,使用ProcedureCoordinator提交运行Procedure。ProcedureCoordinator又委托ZkProcedureCoordinatorRpcs来协调两个阶段。
    2. hregionserver端:
      • 实现RegionServerProcedureManager抽象类,
        • 实现initialize 初始化,比如创建ProcedureMember,需要提供给ProcedureMember一个工厂类创建Subprocedure。 创建ZKProcedureMemberRpcs实例,这个实例委托ProcedureMember创建、管理subprocedure。
        • getProcedureSignature
          返回值应该和对应的Master上的XXXProcedureManager一样
        • start方法
          启动ZKProcedureMemberRpcs实例,它会开始监听相关节点。
      • 实现Subprocedure抽象类
        • 实现acquireBarrier方法,完成prepare阶段,比如提前锁住一些需要提交的资源
        • 实现insideBarrier方法,完成commit阶段。

    以上可以看出master和regionsever上分别使用ZkProcedureCoordinatorRpcs和ZKProcedureMemberRpcs来实现基于zk的2pc,用户也可以实现接口ProcedureCoordinatorRpcs使用其他途径完成2PC的过程。

    相关文章

      网友评论

        本文标题:HBase学习 - Procedure

        本文链接:https://www.haomeiwen.com/subject/qgneqxtx.html