美文网首页hbase
Hbase架构师3-Hbase源码分析

Hbase架构师3-Hbase源码分析

作者: fat32jin | 来源:发表于2020-09-22 09:20 被阅读0次

    HBase的源码:

    1、集群启动
    master启动
    regionsever启动
    2、put 插入数据源码分析
    delete
    3、get 查询数据源码分析
    scan
    4、三个重要的行为:
    flush
    split
    compact
    5、寻路
    根据rowkey定位region的位置

    注意要点:

    1、版本问题:
    hbase-0.96 hbase-0.98
    hbase-1.2.x hbase-1.4.x
    hbase-2.x 这是我们选择的版本

    2、版本越低,其他非核心流程的代码相对更少,对你的干扰更少

    3、切记最好画图!

    4、关于hbase的源码是否要编译呢。
    选择性的。如果要编译:推荐安装 cygwin
    zookeeper
    hive
    hdfs
    kafka
    hbase

    HBase2.x源码分析-HMaster启动流程分析

    正常的启动历程:

    启动一个 master
    再启动 N 个 regionserver
    启动一个 master
    在这个过程中,会有选举!大概率:先启动的master会成为 active

    HBase集群启动的命令:start-hbase.sh
    这个命令的底层是:
    hbase-daemon.sh start master
    这个命令的底层:
    java org.apache.hadoop.hbase.master.HMaster arg1 arg2
    底层转到
    HMaster.main()

    class HMaster extends HRegionServer implements MasterServices
    1、HMaster 是 HRegionServer 的子类
    2、HMaster 绝大部分的服务,都是通过 MasterServices 来实现的

    比如:listTable()  craeteTable()
    

    ↓ ***▲ ▼ ——》

    HMaster正常的启动历程: 0;25:00 ~ 2:00:00
    HMaster.main() 
                   ▼ 
                new HMasterCommandLine(HMaster.class).doMain(args);
             ——》 父类 ServerCommandLine#doMain
                    ▼
                int ret = ToolRunner.run(HBaseConfiguration.create() 
                 ↓  
              HMasterCommandLine#run()
                   ▼
             return startMaster();
              ......
             return stopMaster();
                     ▲
              ——0  》上面   startMaster()  方法 启动服务器
                            ▼
                      /********                 
                     *   注释: 通过反射构造 HMaster 实例, HMaster 是 HRegionServer 的子类,
                     *   1、先看 HMaster 中的构造方法做了什么
                     *   2、再进入 Hmaster 的 run() 方法看看做了什么
                     *   3、masterClass = HMaster.class
                     */
                      if(LocalHBaseCluster.isLocal(conf)) {
                        // local模式
                       ......    
                     } else {
                           // 集群模式
                       ......    
                           HMaster master = HMaster.constructMaster(masterClass, conf);
                        ......  
                       /******** 
                     *   1、hmaster 通过 start() 启动
                     *   2、hmaster 肯定也是一个线程
                     *   3、当执行 start 方法之后,应该转到 hmaster 的 run() 继续阅读
                     */ 
                            master.start();
                            master.join();                       
                      }
                          ▲ 
                 ——0. 1  》上面  HMaster#constructMaster 方法是调用  HMaster(final Configuration conf) 构造函数                               
                         ——0. 1.1 》super(conf); // HMaster父类 HRegionServer#构造器里面的方法
                                   ▼
                                 /********        
                                  *   1、创建 RPCServer
                                  *   2、最重要的操作:初始化一个 ZKWatcher 类型的 zookeeper 实例,
                                  *        用来初始化创建一些 znode 节点
                                  *   3、启动 RPCServer
                                  *   4、创建 ActiveMasterManager  用来选举和管理hmaster的状态
                                  *   hmaster除了调用自己的构造方法,也会调用 父类hregionserver的构造方法
                                  */
                               rpcServices = createRpcServices();
                              RpcRetryingCallerFactory.instantiate(this.conf);
                             regionServerAccounting = new RegionServerAccounting(conf);
                              initializeFileSystem();
                             zooKeeper = new ZKWatcher()   
                                this.rpcServices.start(zooKeeper);
                                   //ChoreService 里面包装了一个线程池,用来处理 hmaster 或者 hre
                                this.choreService = new ChoreService(getName(), true);
                                 // ExecutorService 同样也是包装了一个线程池,用来处理其他的 RpcClient 发送过来的请求
                                  this.executorService = new ExecutorService(getName());
                                 //  启动 jetty,Region Server WebUI
                                putUpWebUI();
                                     ▲ 
                              ——》上面的   new ZKWatcher()   #构造函数
                                              ▼
                                         /**** 
                                          *   注释: 初始化一堆关于 hbase 在 zookeeper 创建的 znode 节点          
                                           *   hbae装好了之后u,默认在zookeeper有一个/hbase 的root znode
                                           *   zookeeper.znode.rs = /hbase/rs
                                          *   zookeeper.znode.draining.rs = /hbase/draining
                                          *   zookeeper.znode.backup.masters = /hbase/backup-masters
                                          *  /
                                         this.znodePaths = new ZNodePaths(conf);
                                        .......
                                           /* 注释: 获取 zk 链接
                                              *   recoverableZooKeeper 这个实例里面有一个 ZooKeeper zk 的成员变量
                                             *   HMaster HRegionserver 都是通过 recoverableZooKeeper 的 
                                            *  zk 实例来完成操作 zookeeper
                                            * /
                                     this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
                                      ......
                                    /****
                                      *   注释: 创建一堆必要的父节点
                                      *   1、baseZNode = /hbase
                                      *   2、rsZNode = /hbase/rs
                                      *   .............
                                      */
                                       createBaseZNodes();
                  
                          —— 0.1.2 》 HMaster 自己构造器里面的代码
                                   /*** 
                                     *   注释: 创建 activeMasterManager
                                     *   帮助 HMaster 去选举  Active 角色
                                    *   监控 HMaster的状态,也要去监控 zookeper 集群中的对应的  active master 的znode
                                     */ 
                                   if(!conf.getBoolean("hbase.testing.nocluster", false)) {
                                      this.activeMasterManager = 
                                         new ActiveMasterManager(zooKeeper, this.serverName, this);
                                    } else {
                                            this.activeMasterManager = null;
                                   }
            —— 0. 2 》 回到 上面 0处  master.start(); 方法,即运行HMaster 的run()方法
                                   ▼                          
                            /*
                             *   注释: 启动 Web 服务
                             */
                            int infoPort = putUpJettyServer();
    
                            /*
                             *   注释:重要的方法
                             */
                            startActiveMasterManager(infoPort);
                                 ▼
                  /* 
                    *   注释: 一上线,就通过 MasterAddressTracker.setMasterAddress() 方法创建代表自己存在与否的 backup znode emphamaral 节点
                  *   一上线,都不是 active master,所以先成为 backup master,然后创建代表自己的临时 znode 节点
                   *   1、如果时机条件正确,则成为 active master,则刚才创建的 临时znode 节点被 ActiveMasterManager 显示删除
                   *   2、如果不能成为 active master 不需要做什么
                    *   3、如果这个 backup master宕机,则 zookeeper 会自动删除这个临时节点
                     */
                     String backupZNode = 
                     ZNodePaths.joinZNode(zooKeeper.getZNodePaths().backupMasterAddressesZNode, 
                    serverName.toString());
                   //  :ZNodePaths.joinZNode 唯一的目的只有一个:拼接一个znode路径:
                  //   当前master一启动就立即在 /hbsae/backup-masters/ 下面创建一个临时节点
                 //   代表自己上线了。但是并没有立即成为  active  
                 
                   //  这儿是去创建 Backup Master ZNode
                  if(!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort)) {
                   。。。。。。
                   /* 重点!!: 阻塞到直到有 ActiveMaster 为止, 但是默认是 False,不会执行,
                      *   除非有人显示调整这个参数为 True
                       *   因为可能同时有多个 amster 启动。
                        *   等待是否有 activeMaster
                       */      
                 if(conf.getBoolean(HConstants.MASTER_TYPE_BACKUP, 
                      HConstants.DEFAULT_MASTER_TYPE_BACKUP)) { 
                                   while(!activeMasterManager.hasActiveMaster()) {                
                                     Threads.sleep(timeout);  
                      }
                    }
                  /* 注释: if 中的方法,完成注册和争抢成为 Active Master 的工作
                 */
                   @@@   if (  activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
                             ——》 ActiveMasterManager#blockUntilBecomingActiveMaster
                                             ▼
                     /** 去注册 Active Master, 记住:也是临时节点
                      *   分布式锁的方式来实现的!
                     *   MasterAddressTracker.setMasterAddress() 创建一个 临时 znode 节点,
                     *   多个 hmsater 如果同时启动,则可能同时都在创建这个 代表 active 身份的 znode 节点
                     *   谁创建成功了,谁才能成为 active
                     *  所以:如下的代码,如果能进入到 if ,就证明当前 hmaster 争抢这把分布式锁成功了。
                     *  当前 hmaster 才能成为 active                 *
                     *  接着再做一件事:就是删除之前一上线 就创建的 backup znode 节点
                     */
                    if(MasterAddressTracker.setMasterAddress(this.watcher,  // 抢锁
    
                        /* 如果注册成功,则删除自己原先创建的 backup master znode 临时节点
                         */                    
                        if(ZKUtil.checkExists(this.watcher, backupZNode) != -1) {
                            LOG.info("Deleting ZNode for " + backupZNode + " from backup master directory");
                            ZKUtil.deleteNodeFailSilent(this.watcher, backupZNode);
                        }
    
                        /*  将znode保存在文件中,这将允许检查启动脚本是否崩溃
                         */                   
                        ZNodeClearer.writeMyEphemeralNodeOnDisk(this.sn.toString());
                        /*  设置 Master 为 Active 状态
                         */                  
                        startupStatus.setStatus("Successfully registered as active master.");
                        /* : 到上面为止,也只是知道有 Active Master了,但是到底是谁不知道
                     *   所以通过获取 Active Master ZNode节点的数据得知
                     */
                    String msg;
                    byte[] bytes = ZKUtil.getDataAndWatch(this.watcher, 
                                             this.watcher.getZNodePaths().masterAddressZNode);
                    if(bytes == null) {
                        msg = ("A master was detected, but went down before its address " 
                                      + "could be read.  Attempting to become the next active master");
                    } 
                                           ▲ 
                           ——》 回到@@@   if (  activeMasterManager.blockUntilBecomingActiveMaster处
                                  // 下面这个方法非常复杂重要
                                  finishActiveMasterInitialization(status);         
                                                 ▼
                                  /*  初始化 ProcedureExecutor
                                     *   将来你的各种请求,比如 createTable, listTable, put, get 等等这些用户请求,
                                     *    都是被封装成一个个的
                                     *   Procedure 来执行的。 放在线程池里面执行。
                                   *   ProcedureExecutor 命令执行器!!!
                                   */
                                   createProcedureExecutor();
                                 /*  初始化 AssignmentManager 用来帮助 HMaster 去分配那些 region
                                      给那些 regionsever 去管理
                                    *   默认的实现就是:AssignmentManager
                                    */
                                       this.assignmentManager = createAssignmentManager(this);
    
    
    ![image.png](https://img.haomeiwen.com/i11332520/735619ee163723d5.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
    2 RegionServer正常的启动历程: 2;00:00 ~ 2;12:00

    HBase集群启动的命令:start-hbase.sh
    这个命令的底层是:
    hbase-daemon.sh start regionserver
    这个命令的底层:
    java org.apache.hadoop.hbase.regionserver.HRegionServer arg1 arg2
    底层转到
    HMaster.main()

    class HRegionServer extends HasThread implements RegionServerServices
    1、HMaster 是 HRegionServer 的子类, HRegionServer 是 HasThread 的子类
    HasThread 是 RUnnable 的子类,所以 HMaster 和 HRegionServer 都是线程
    入口:main() run() 方法

    HRegionServer.main()
        HRegionServer 的构造方法
        HRegionServer 的 run()
            preRegistrationInitialization();
                initializeZooKeeper();
                setupClusterConnection();
                this.rpcClient = RpcClientFactory.createClient()
            reportForDuty();
                masterServerName = createRegionServerStatusStub(true);
                this.rssStub.regionServerStartup(null, request.build());
            handleReportForDutyResponse(w);
                createMyEphemeralNode();
                initializeFileSystem();
                ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
                setupWALAndReplication();
                startServices();
                startReplicationService();
            tryRegionServerReport(lastMsg, now);
                rss.regionServerReport(null, request.build());
    
    3 Put流程源码分析: 2;21:00 ~

    核心流程:

    代码入口:
    table.put(new Put())

    大致流程:
    1、首先客户端会请请求zookeepre拿到meta表的位置信息,这个meta表的region到底在那个regionserver1里面
    2、发请求请求这个regionserver1扫描这个meta表的数据,确定我要插入的数据rowkey到底在那个用户表的region里面。并且还拿到这个region在那个regioinserver2的信息
    3、发送请求,请求regionserver2扫描 当前用户表的regioin, 执行插入
    1、先记录日志
    2、写入数据到 memstore
    写到 ConcurrentSkipListMap delegatee
    3、判断是否需要进行flush
    1、再次判断是否需要进行 compact
    2、判断是否需要进行 split

    ——》开始 HTable#put()

    /* callable = ClientServiceCallable, 是 RegionServerCallable 的子类。
    * 最终执行:ClientServiceCallable.doMutate(request); 提交
    * 将来各种兜兜转转,一定会回到调用: callable.rpcCall()
    /
    ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(),
    this.rpcControllerFactory.newController(), put.getPriority()) {
    @Override
    protected Void \color{#FF3030}{rpcCall}() throws Exception {
    /
    * 构建一个 Put 请求
    * 仅仅只是把 put 变成 request
    /
    MutateRequest request = RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put);
    // 具体提交执行(使用probubuf协议序列化,并提交RPC请求)
    doMutate(request);
    ▲ ↓
    ——》 RSRpcServices#Mutate()
    /
    *

    3.1 -Region定位分析 开始 3:00:00
    • connection.locateRegion(){
    • locateMeta(tableName, useCache, replicaId);
      
    • locateRegionInMeta(tableName, row, useCache, retry, replicaId);
      
         *   1、虽然经过层层递进的寻找,发现,返回的对象类型是:BlockingStub
         *   2、最终跳转到 RSRpcServices
         *
         *   现在在探查put方法的底层实现, put方法的服务端响应者是  master regionserver ?
         *   是发送给 RegionServer 的, 是怎么确定到底发送给那个 Regionserver ?
         *
         */
                      ——》 return getStub().mutate(getRpcController(), request);
                                                ↓ 
               ###     RpcRetryingCallerImpl#callWithRetries
                                          ▼   
      
                 / *   3、这里面会获取 region 的位置
                     *   callable = ClientServiceCallable, 是 RegionServerCallable 的子类
                      *      笔记中的第一步,第二步就是这句代码实现的
                  *  1、首先客户端会请请求zookeepre拿到meta表的位置信息,
                  *  这个meta表的region到底在那个regionserver1里面
                 *  2、发请求请求这个regionserver1扫描这个meta表的数据,
                 *  确定我要插入的数据rowkey到底在那个用户表的region里面。
                *  并且还拿到这个region在那个regioinserver2的信息
                 */              
             ——0  》     callable.prepare(tries != 0);
                                                 ↓ 
                          RegionServerCallable #prepare     
                                           ▼  
                           /***  这个 row 就是 Callble 在初始化的时候拿到的 rowkey
                                *   该方法负责拿到 rowkey 在该表中的 region 的信息, 封装在 HRegionLocation 对象中
                                *   row = put 对象中的 rowkey
                               *   location = HRegionLocation (RegionServerName HRegionInfo)
                            */
                          this.location = regionLocator.getRegionLocation(row);
                                                                ↓ 
                               ConnectionImplementation#locateRegion()
                                                              ▼  
                                  if(tableName.equals(TableName.META_TABLE_NAME)) {
                                        // 如果是 hbase:meta 表,则发请求给 zookeeper              
                                         《1》     return locateMeta(tableName, useCache, replicaId);
                                   } else {
                                         //**  如果是普通表,则发请求给 meta 所在的 regionserver            
                                         《2》    return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
                                  }
                                ——1 》 ConnectionImplementation# locateMeta
                                                                  ▼   
                                               /***  定位 zookeeper 访问 meta 表所在的位置
                                                  *   registry = ZKAsyncRegistry
                                                    */        
                                              locations = get(this.registry.getMetaRegionLocation());
                                                                                          ↓ 
                                                           ZKAsyncRegistry.getMetaRegionLocation() 
                                                                                         ▼  
                                                               //**  从zookeeper中获取 meta 表的位置                    
                                                               getMetaRegionLocation(future, metaReplicaZNodes);
                                                             ——》getMetaRegionLocation
                                                                                          ▼  
                                                              /*** : 由于meta的Region可能有多个副本,因此遍历
                                                                ** /hbase/meta-region-server-i,返回所有副本对应的ServerName
                                                                   *   遍历(i, /hbase/meta-region-server-i)         
                                                                  *   因为 hbae-2.x 版本中, meta 表的 regoin 有副本的概念
                                                                      */
                                                                 for(String metaReplicaZNode : metaReplicaZNodes) 
                                                                      ......
                                                                     //  注释: getAndConvert() 帮助我们去获取数据 
                                                                       addListener(getAndConvert(path, ZKAsyncRegistry::),
                                                                       
                                                                       ——》getAndConvert()
                                                                                  addListener(zk.get(path)
                                                                               ——》zk.get()
                                                                                        ▼  
                                                                                 /*  ZKTask 加入 tasks 队列,去寻找  tasks.poll 代码,
                                                                                     *   会看到调用 zkTasks.run() 代码执行,
                                                                                      *   里面执行 doExec(ZooKeeper zk) 方法
                                                                                      */
                                                                                    tasks.add(new ZKTask<byte[]>(path, future, "get") {
                                                                                         @Override
                                                                                      protected void doExec(ZooKeeper zk) {
                                                                                       /**  终于看到 meta region 的位置是从 zk 获取的
                                                                                *   path = /hbase/meta-region-server/  meta表
                                                                                的唯一的region的位置信息
                                                                              */
                                                                              zk.getData(path, false, 
                              ——2 》 ConnectionImplementation# locateMeta 
                                              cacheLocation()
                                           ——》MetaCache.  cacheLocation()
                                                     addToCachedServers(locations);
      
             ——回到 0  处继续 》   callable.prepare(tries != 0);  
                   /** 
                 *   1、如果是元数据请求操作:MasterCallble, DDL 操作,走这儿
                 *   2、如果是数据请求操作:RegionServerCallable, DML 走这儿
                 */
                return callable.call(getTimeout(callTimeout));
                                                    ↓ 
                  RegionServerCallable.call()
                                 rpccall()
                                          ↓ 
                               HTable#put()
                                                 ▼  
                                  ClientServiceCallable<Void> callable = new ClientServiceCallable
                                    protected Void rpcCall() throws Exception {
                                           doMutate(request);
                                  ——》 ClientServiceCallable#doMutate()
                                                          ▼  
                                           /* 
         *   1、虽然经过层层递进的寻找,发现,返回的对象类型是:BlockingStub
         *   2、最终跳转到 RSRpcServices         *
         *   现在在探查put方法的底层实现, put方法的服务端响应者是  master regionserver ?
         *   是发送给 RegionServer 的, 是怎么确定到底发送给那个 Regionserver ?         *
         */
                                          return getStub().mutate(getRpcController(), request);
                                                                              ↓ 
                                                  RSRpcServices#mutate()
                                                                              ▼  
                                            /** : 获取当前 请求的 region 位置
                                             *   获取 region 的方式方法是通过 regoinName 获取 regionInfo 信息
                                              */
                                           region = getRegion(request.getRegion());
                                           /** 如果没有,提交给HRegion.put()方法处理 
                                                 *   clientTable.put(put) 
                                                 *   regioniserver.put(put)
                                                  *   region.put(put)
                                                     */
                                                 region.put(put);
      
    3.1 -Region定位分析 结束 3:23:00
                                                    ——》HRegion.put()
    
    3.2 -写入内存memstore 及Flush分析 Memstore形成HFile 开始 3:23:00
                                                                      ▼ 
                                                      HRegion#checkResources() //检查资源
                                                                 ......
                                                       /** 
                                                        *   1、checkResources(); 会涉及到flush, 
                                                       *      进行 put 操作之前检查资源是否足够
                                                        *   2、doBatchMutate(put); 是用来真正完成put操作的。
                                                       *      完成 put 操作之后,也需要判断 memstore 是不是要进行 flush
                                                       */ 
                                                  doBatchMutate(put);    
                                                ——  》HRegion#doBatchMutate
                                                     —— ... 》HRegion#batchMutate(BatchOperation<?> batchOp)
                                                                ▼
                  // 完成数据插入到 memstore 的动作 
                            3    doMiniBatchMutate(batchOp);
                                                           ......
                                // 写完数据之后,判断是否有需要进行 flush                  
                             4    requestFlushIfNeeded();
                                                         ▲
                       ——接3 》 doMiniBatchMutate(batchOp);
                                                             ▼  
                                                     //六个步骤
                                                   /*第一步
                                                      *   对BatchOperation对象上锁,返回的是一个表示
                                                     * 正在处理中的对象 MiniBatchOperationInProgress                                                       
                                                      *   HBase 是有行级的事务的!  
                                                     *     现在执行的各种  Put 的 rowkey 所对应的 这一行数据的 锁我们都得拿到
                                                    */ 
                                             miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks);
                                                /*第2步   
                                                   *   更新所有操作对象的时间戳,确保是最新的。
                                                *        就只是更新 keyvalue 的时间戳为 本机服务器的时间戳!
                                                 */
                                       long now = EnvironmentEdgeManager.currentTime();
                                     batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);
                                                 /*第3步   
                                                   *   初始化或构造 WAL edit对象 
                                                 */
                                          List<Pair<NonceKey, WALEdit>> walEdits = 
                                            batchOp.buildWALEdits(miniBatchOp);
                                                  /*第4步   
                                                   *  将WALEdits对象提交并持久化(即写WAL)
                                                 */
                                           mvcc.complete(writeEntry);
                                                 /*第5步   
                                                   *  写 memStore
                                                 */
                                        writeEntry = 
                                  5    batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);
                                                 /*第6步   
                                                   *  完成写入操作
                                                 */
                                  6     batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry);
                                                         ▲
                                  ——上接  5   》 batchOp.writeMiniBatchOperationsToMemStore
                                                                                  ↓                                                          
                                                                    HRegion#
                                                                    MutationBatchOperation#
                                                                    writeMiniBatchOperationsToMemStore()
                                                                               ▼    
                                                                   super.writeMiniBatchOperationsToMemStore(
                                                                                ▼  
                                          /* 
                                           *   1、一个region其实按照列簇会分成多个不同的 store
                                            *   2、现在插入数据的时候,可以这一个miniBatch 会包含多个不同的列簇的数据
                                         *   3、不同的列簇,拥有不同的memstore
                                     *   4、到了这个步骤必须把所有的数据,区分开,不同的列簇的,数据,分别进行插入
                                  */
                                        applyFamilyMapToMemStore(familyCellMaps[index], memStoreAccounting);                                
                                                                                ▼  
                                    //按照列簇循环写入
                                  for(Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
                                         region.applyToMemStore(region.getStore(family), cells, 
                                                                             ▼  
                             if(upsert) {
                                           store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);
                                        } else {   //  store 包含  Memstore 和 StoreFile 
                                                  store.add(cells, memstoreAccounting);
                                                            ▼  添加到内存
                                                                     memstore.add(cells, memstoreSizing);
                                                                  ↓
                                               AbstractMemStore.add
                                                            ▼  循环cell添加
                                               for(Cell cell : cells) { 
                                                    add(cell, memstoreSizing);    }
                                                      ——》AbstractMemStore.add
                                                          ——》doAddOrUpsert
                                                            ——》AbstractMemStore#doAdd()
                                                                 ——》AbstractMemStore#internalAdd()
                                                                    ——》 MutableSegment#add()
                                                                        ——》Segment#internalAdd()
                                                                                       ▼  
                                                                               //终于放进内存
                                                            5.1       boolean succ = getCellSet().add(cell);
                                                                                    //更新元数据信息 
                                                                                updateMetaInfo(cell, succ,  
                                                                                       ▲
                                                                      ——上接5.1 》CellSet.add()
                                                                                        ▼  
                                                                         / *   delegatee 是 ConcurrentSkipListMap
                                                                      *   即 memstore的内存结构: 跳表  
                                                                *   基于链表来实现的,
                                                     *  兼顾插入和排序,和查询效率,这是最终的步骤
                                                             */
                                                                              return this.delegatee.put(e, e) == null;
                                                                                                               ▲
                                   ——上接  6   完成写入》 batchOp.completeMiniBatchOperations
                                           ——》HRegion.BatchOperation.completeMiniBatchOperations
    
    3.2 -写入内存memstore 及Flush分析 Memstore形成HFile 结束 3:45:00
                             ——上接4 》requestFlushIfNeeded()
                                 ——》reqeustFlush()
                                 ——》requestFlush0()
                                                  ↓
                                          MemStoreFlusher#requestFlush()
                                                   ▼
                                      // 加入队列 一定有一个地方是取poll  fqe 来执行 flush                 
                                       this.flushQueue.add(fqe);
                                                      ↓     搜索flushQueue.poll 
                                                MemStoreFlusher#FlushHandler#run()
                                                      ▼ 
                                        //*** 注释: 取出队列执行处理                  
                                 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
                                               ......
                                   //*** 溢写! 
                                  if(!flushRegion(fre)) { 
                                    ——》flushRegion(HRegion region, boolean emergencyFlush,
                                                                     boolean forceFlushAllStores, FlushLifeCycleTracker tracker) 
                                                                                  ▼3件事
                                                1  FlushResult flushResult = region.flushcache(
                                                 // 注释:溢写完毕之后,判断是否需要进行 compact 
                                               2  boolean shouldCompact = flushResult.isCompactionNeeded();
                                              // 注释:溢写完毕之后,判断是否需要进行 split
                                               3   boolean shouldSplit = region.checkSplit() != null;
                                             ——1 》 HRegion#flushcache
                                                                             ▼
                                         //   注释: 执行真正的 Flush 方法                
                FlushResultImpl fs = internalFlushcache(specificStoresToFlush, status, 
                                                                                     writeFlushRequestWalMarker, tracker);
                                                        ——》HRegion#internalFlushcache
                                                            ——》HRegion#internalFlushCacheAndCommit
                                                                                                ▼
                                                                           flush.flushCache(status);
                                                                                                 ↓ 
                                                                              Hstore#flushCache
                                                                             ▼HStore.this.flushCache
                                                                                    ▼ flusher.flushSnapshot
                                                                                                            ↓
                                                                                   DefaultStoreFlusher#flushSnapshot   
                                                                                                ▼
                                                         / *   注释: writer = StoreFileWriter
                                                         *   flush 把  跳表中的数据 写到磁盘文件。  需要输出流
                                                         */                                                         
                                                               writer = store.createWriterInTmp(cellsCount,.
                                                                           /* 
                                                                              *   1、scanner 扫描器
                                                                              *   2、writer 输出流
                                                                              */
                                                              performFlush(scanner, writer, smallestReadPoint, throughputController);
                                                        ——》 StoreFlusher#performFlush()
                                                                                             ▼
                                                     /* 注释:sink = StoreFileWriter
                                                                   *   sink = Writer                          *   c = Cell
                                                                    */ 
                                                                sink.append(c);
                                                                               ↓
                                                               StoreFileWriter#append()
                                                                                ▼
                         // TODO_MA 注释: 每次溢写的一个HFile,我们都要为其构建一个 bloomfilter
                                  appendGeneralBloomfilter(cell);
                                   appendDeleteFamilyBloomFilter(cell);
    
                                   // TODO_MA 注释: 写出cell到文件
                                 writer.append(cell);
                                trackTimestamps(cell);
                                                                   ↓
                                          HFileWriterImpl.append(cell)
                                                                  ▼
                        / *   注释:
                            *   1、hbase的底层都是存储的 二进制字节数据
                            *   2、encode  编码, 事实上充当写出的动作 
                            */
                          this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, 
                           this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode;
                                                                   ↓
                                             NoOpDataBlockEncoder#encode
                                           ——》 NoneEncoder#write 
                                                                           ▼                                                          
                                                              /* : 真正写出数据
                                                                 *   数据终于写到 HFile 中了,完全符合期望
                                                                   *   把一个 Cell 对象,分成10个细节组成,写到 HFile 文件
                                                                    *   这个 HFile 文件的 输出流对象就是: out
                                                                   */
                                                          int size = KeyValueUtil.oswrite(cell, out, false);
                                                                           ▼   
                                                                      /*   把一个 Cell 对象,分成10个细节组成,写到 HFile 文件
                                                                       *   这个 HFile 文件的 输出流对象就是: out
                                                                      */
                                          // 注释: 写出 rowkey 数据
                                           out.write(cell.getRowArray(), cell.getRowOffset(), rlen);
                                           //  注释: 写出 family length
                                           // Write cf - 1 byte of cf length followed by the family bytes
                                           out.write(flen);
                                           out.write(cell.getFamilyArray(), cell.getFamilyOffset(), flen);
                                           // write qualifier
                                           out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qlen);
                                           // write timestamp
                                           StreamUtils.writeLong(out, cell.getTimestamp());
                                           // write the type
                                           out.write(cell.getTypeByte());
                                             // write value
                                             out.write(cell.getValueArray(), cell.getValueOffset(), vlen);
    
    image.png
    3.3 -split分析

    reqeustSplit()
    HBase2.x源码分析-Compact分析
    reqeustCompact()

    3.4 -Compact分析

    reqeustCompact()

    4 Get流程源码分析:

    代码入口:
    table.get(new Get())

    大致流程:
    1、首先客户端会请请求zookeepre拿到meta表的位置信息,这个meta表的region到底在那个regionserver1里面
    2、发请求请求这个regionserver1扫描这个meta表的数据,确定我要插入的数据rowkey到底在那个用户表的region里面。并且还拿到这个region在那个regioinserver2的信息
    3、发送请求,请求regionserver2
    1、首先去blockcache ,进行查询 读缓存
    2、先去布隆过滤器中进行判断
    1、如果判断这个rowkey不存在,则不需要扫描 HFile
    2、如果判断这个rowkey存在,则需要扫描HFile

    相关文章

      网友评论

        本文标题:Hbase架构师3-Hbase源码分析

        本文链接:https://www.haomeiwen.com/subject/hlkpyktx.html