美文网首页HBase
HBase 1.2.0源码分析:HRegionServer启动

HBase 1.2.0源码分析:HRegionServer启动

作者: Alex90 | 来源:发表于2018-12-17 18:31 被阅读1次

    RegionServer 实现类org.apache.hadoop.hbase.regionserver.HRegionServer.

    类描述:HRegionServer 管理一些 HRegion,使其对 Client 可用。需要与 HMaster 通信,通知状态。(HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There are many HRegionServers in a single HBase deployment.

    0. HRegionServer 初始化

    构造方法需要参数:
    conf 对应配置文件,csm 是一个协调服务,提供启停 Server 等方法

    public HRegionServer(Configuration conf) throws IOException, InterruptedException {
        // 构建默认 CoordinatedStateManager
        this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
    }
    
    public HRegionServer(Configuration conf, CoordinatedStateManager csm)
          throws IOException, InterruptedException {
        
        // ...
        
        // 测试 hbase.regionserver.codecs 配置的编码方式是可用的
        checkCodecs(this.conf);
        
        // 初始化 userProvider 用于权限认证
        this.userProvider = UserProvider.instantiate(conf);
        
        // 设置 short circuit read buffer,即短路本地读
        // 校验是否跳过 checksum,默认是 false,不推荐跳过
        // dfs.client.read.shortcircuit.skip.checksum
        FSUtils.setupShortCircuitRead(this.conf);
        
        // 设置一系列配置项
        // 设置客户端连接重试次数 hbase.client.retries.number(31)
        this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
        // 线程执行周期 hbase.server.thread.wakefrequency(10 * 1000)    
        // 周期内会执行 CompactionChecker
        this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
        // RegionServer 发消息给 Master 时间间隔,单位是毫秒
        this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
        this.sleeper = new Sleeper(this.msgInterval, this);
    
        // 设置 Nonce 标志,初始化 nonceManager
        // 客户端的每次申请及重复申请使用同一个 nonce 进行描述,解决 Client 重复操作提交的情况
        boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
        this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
    
        // 向 Master 进行 report 的 region 个数
        // HBase Client 操作的 Timeout 时间
        // HBase rpc 短操作的 Timeout 时间
        // ...
    
        // 创建 RegionServer 的 RPC 服务端
        rpcServices = createRpcServices();
        
        // 工厂类实例化
        rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
        rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
    
        // 用来记录 region server 中所有的 memstore 所占大小
        regionServerAccounting = new RegionServerAccounting();
    
        // 表示使用 zk 管理状态,并不会对Region状态进行持久化
        // 默认是 true,hbase.assignment.usezk
        useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
    
        // 初始化 fs,封装hdfs
        this.fs = new HFileSystem(this.conf, useHBaseChecksum);
        
        // 初始化 htable meta 信息
        this.tableDescriptors = new FSTableDescriptors(
          this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
    
        // 初始化rs处理任务的线程池,同master的任务线程池
        // 具体任务类型参考 ExecutorType
        service = new ExecutorService(getServerName().toShortString());
    
        if (!conf.getBoolean("hbase.testing.nocluster", false)) {
            // 创建当前rs与zk的连接
            zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
              rpcServices.isa.getPort(), this, canCreateBaseZNode());
    
            // csm在分布式环境下用来协调集群所有server的状态信息
            this.csm = (BaseCoordinatedStateManager) csm;
            this.csm.initialize(this);
            this.csm.start();
    
            // 基于zk实现的分布式的锁管理器,用于锁表
            tableLockManager = TableLockManager.createTableLockManager(
              conf, zooKeeper, serverName);
    
            // 创建master跟踪器,等待master的启动
            // 在zk节点上注册,zookeeper.znode.master
            masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
            masterAddressTracker.start();
    
            // 创建cluster的跟踪器,等待cluster的启动
            // master注册clusterid到zk节点(zookeeper.znode.state),表示集群已经启动
            clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
            clusterStatusTracker.start();
        }
    
        // 启动 Region Server RPC服务
        rpcServices.start();
        // 启动 jetty,Region Server WebUI
        putUpWebUI();
        
        // 负责当前 rs 上所有 wal 的log roll(滚动并清理日志)
        this.walRoller = new LogRoller(this, this);
        // 封装了线程池,负责周期性的调度任务
        // 心跳、检查compact、compact完成后hfile清理...
        this.choreService = new ChoreService(getServerName().toString(), true);
    
    }
    

    什么是短路本地读?

    在 HDFS 中,读取操作都是通过 Datanode 来进行的。
    当客户端向 Datanode 发起读取文件请求时,Datanode 从磁盘读取文件,并且通过 TCP socket 发送给客户端。
    所谓的“短路”就是不通过 Datanode,允许客户端直接读取文件。
    显然,这仅在客户端与数据位于同一位置的情况下才有可能。短路读取能让许多应用性能显著提升。
    

    什么是Nonce机制

    客户端发送 RPC 请求给服务器后,如果响应超时,那么客户端会重复发送请求,直到达到参数配置的重试次数上限。
    客户端第一次发送和以后重发请求时,会附带相同的 nonce,服务端只要根据 nonce 进行判断,就能得知是否为同一请求,
    并根据之前请求处理的结果,决定是等待、拒绝还是直接处理。
    

    1. HRegionServer 运行

    /**
     * The HRegionServer sticks in this loop until closed.
     */
    @Override
    public void run() {
        
        // 向HMaster注册之前完成一些初始化工作   
        // 在ZK节点 /hbase/rs 下创建当前region server信息的节点,HMaster 监听这个路径
        preRegistrationInitialization();
    
        try {
            if (!isStopped() && !isAborted()) {
                // 在ZK节点 /hbase/rs 下创建当前region server信息的节点
                createMyEphemeralNode();
                // 加载的 coprocessor,提供 coprocessor 的运行环境
                this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
            }
    
            while (keepLooping()) { // !this.stopped && isClusterUp();
                // 通知master,region server启动成功
                RegionServerStartupResponse w = reportForDuty();
                if (w == null) {
                    this.sleeper.sleep();
                } else {
                    // Sets up wal and starts up all server threads.
                    handleReportForDutyResponse(w);
                    break;
                }
            }
      
            // 启动 rspmHost \ rsQuotaManager
            // ...
    
            // The main run loop.
            while (!isStopped() && isHealthy()) {
                // 监控ZK节点(zookeeper.znode.state)
                if (!isClusterUp()) {
                    // .. 处理集群 down 的情况
                    // 关闭所有的 user region
                }
                long now = System.currentTimeMillis();
                // 定期报告心跳
                if ((now - lastMsg) >= msgInterval) {
                    tryRegionServerReport(lastMsg, now);
                    lastMsg = System.currentTimeMillis();
                    doMetrics();
                }
              
                // ...  
            } 
        } catch (Throwable t) {
            // ...
        }
        
        // Run shutdown ...
        // 关闭连接、服务、region、WAL、proxy
        // 清除ZK节点 /hbase/rs (强制删除 znode 存储文件)
        // 关闭rpcClient、rpcservice、monitor、线程池、Zookeeper
        // ...
    }
    

    1.1 preRegistrationInitialization

    预初始化

    private void preRegistrationInitialization(){
        try {
            // 初始化 clusterConnection、metaTableLocator
            setupClusterConnection();
        
            // Health checker thread.
            if (isHealthCheckerConfigured()) {
                int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
                HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
                healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
            }
            this.pauseMonitor = new JvmPauseMonitor(conf);
            pauseMonitor.start();
        
            // 初始化ZK连接
            initializeZooKeeper();
            if (!isStopped() && !isAborted()) {
                // 初始化线程
                initializeThreads();
            }
        } catch (Throwable t) {
            // ...
        }
    }
    
    /**
     * 初始化一系列线程、monitor、和连接
     */
    private void initializeThreads() throws IOException {
        // Cache flushing thread.
        this.cacheFlusher = new MemStoreFlusher(conf, this);
    
        // Compaction thread
        this.compactSplitThread = new CompactSplitThread(this);
    
        // check for compactions
        this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
        
        this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
        
        // creat lease monitor
        this.leases = new Leases(this.threadWakeFrequency);
    
        // Create the thread to clean the moved regions list
        movedRegionsCleaner = MovedRegionsCleaner.create(this);
    
        if (this.nonceManager != null) {
            // Create the scheduled chore that cleans up nonces.
            nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
        }
    
        // Setup the Quota Manager
        rsQuotaManager = new RegionServerQuotaManager(this);
        
        // Setup RPC client for master communication
        rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
            rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
    
        boolean onlyMetaRefresh = false;
        int storefileRefreshPeriod = conf.getInt(
            StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
          , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
        if (storefileRefreshPeriod == 0) {
            storefileRefreshPeriod = conf.getInt(
              StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
              StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
            onlyMetaRefresh = true;
        }
        if (storefileRefreshPeriod > 0) {
            this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
              onlyMetaRefresh, this, this);
        }
        registerConfigurationObservers();
    }
    
    

    1.2 handleReportForDutyResponse

    启动 WAL 和线程

    // response包含conf:
    // hbase.regionserver.hostname.seen.by.master
    // fs.default.name
    // hbase.rootdir
    protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
      throws IOException {
        try {
            for (NameStringPair e : c.getMapEntriesList()) {
                String key = e.getName();
                // The hostname the master sees us as.
                if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
                    // master 为rs重新定义 hostname
                    // rs得到新的 serverName
                    String hostnameFromMasterPOV = e.getValue();
                    this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
                      rpcServices.isa.getPort(), this.startcode);
                
                    // 校验 hostname 
                    continue;
                }
                // 覆盖原有 conf
                String value = e.getValue();
                this.conf.set(key, value);
            }
    
            // ZK节点写到磁盘,用于处理程序异常情况
            ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
    
            // 创建 cacheConfig、walFactory
            this.cacheConfig = new CacheConfig(conf);
            this.walFactory = setupWALAndReplication();
            this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
    
            // 启动前一步创建的线程,启动ExecutorService
            startServiceThreads();
            startHeapMemoryManager();
    
            // 通知其他监听线程 rs online
            synchronized (online) {
                online.set(true);
                online.notifyAll();
            }
        } catch (Throwable e) {
            // ...
        }
    }
    

    2. HRegionServer主要干以下事情:

    • 在ZK上注册,表明rs启动
    • 跟 HMaster 通知
    • 设置 WAL 和 Replication
    • 注册协作器 RegionServerCoprocessorHost
    • 启动 hlogRoller
    • 定期刷新 memstore
    • 定期检测是否需要压缩合并
    • 启动 lease
    • 启动 jetty
    • 创建 SplitLogWorker,用于拆分 HLog
    • 启动快照管理
    • 创建其他工作线程和对象

    相关文章

      网友评论

        本文标题:HBase 1.2.0源码分析:HRegionServer启动

        本文链接:https://www.haomeiwen.com/subject/lkwckqtx.html