RegionServer 实现类org.apache.hadoop.hbase.regionserver.HRegionServer
.
类描述:HRegionServer 管理一些 HRegion,使其对 Client 可用。需要与 HMaster 通信,通知状态。(
HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There are many HRegionServers in a single HBase deployment.
)
0. HRegionServer 初始化
构造方法需要参数:
conf 对应配置文件,csm 是一个协调服务,提供启停 Server 等方法
public HRegionServer(Configuration conf) throws IOException, InterruptedException {
// 构建默认 CoordinatedStateManager
this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
}
public HRegionServer(Configuration conf, CoordinatedStateManager csm)
throws IOException, InterruptedException {
// ...
// 测试 hbase.regionserver.codecs 配置的编码方式是可用的
checkCodecs(this.conf);
// 初始化 userProvider 用于权限认证
this.userProvider = UserProvider.instantiate(conf);
// 设置 short circuit read buffer,即短路本地读
// 校验是否跳过 checksum,默认是 false,不推荐跳过
// dfs.client.read.shortcircuit.skip.checksum
FSUtils.setupShortCircuitRead(this.conf);
// 设置一系列配置项
// 设置客户端连接重试次数 hbase.client.retries.number(31)
this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
// 线程执行周期 hbase.server.thread.wakefrequency(10 * 1000)
// 周期内会执行 CompactionChecker
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
// RegionServer 发消息给 Master 时间间隔,单位是毫秒
this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
this.sleeper = new Sleeper(this.msgInterval, this);
// 设置 Nonce 标志,初始化 nonceManager
// 客户端的每次申请及重复申请使用同一个 nonce 进行描述,解决 Client 重复操作提交的情况
boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
// 向 Master 进行 report 的 region 个数
// HBase Client 操作的 Timeout 时间
// HBase rpc 短操作的 Timeout 时间
// ...
// 创建 RegionServer 的 RPC 服务端
rpcServices = createRpcServices();
// 工厂类实例化
rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
// 用来记录 region server 中所有的 memstore 所占大小
regionServerAccounting = new RegionServerAccounting();
// 表示使用 zk 管理状态,并不会对Region状态进行持久化
// 默认是 true,hbase.assignment.usezk
useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
// 初始化 fs,封装hdfs
this.fs = new HFileSystem(this.conf, useHBaseChecksum);
// 初始化 htable meta 信息
this.tableDescriptors = new FSTableDescriptors(
this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
// 初始化rs处理任务的线程池,同master的任务线程池
// 具体任务类型参考 ExecutorType
service = new ExecutorService(getServerName().toShortString());
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
// 创建当前rs与zk的连接
zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
rpcServices.isa.getPort(), this, canCreateBaseZNode());
// csm在分布式环境下用来协调集群所有server的状态信息
this.csm = (BaseCoordinatedStateManager) csm;
this.csm.initialize(this);
this.csm.start();
// 基于zk实现的分布式的锁管理器,用于锁表
tableLockManager = TableLockManager.createTableLockManager(
conf, zooKeeper, serverName);
// 创建master跟踪器,等待master的启动
// 在zk节点上注册,zookeeper.znode.master
masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
masterAddressTracker.start();
// 创建cluster的跟踪器,等待cluster的启动
// master注册clusterid到zk节点(zookeeper.znode.state),表示集群已经启动
clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
clusterStatusTracker.start();
}
// 启动 Region Server RPC服务
rpcServices.start();
// 启动 jetty,Region Server WebUI
putUpWebUI();
// 负责当前 rs 上所有 wal 的log roll(滚动并清理日志)
this.walRoller = new LogRoller(this, this);
// 封装了线程池,负责周期性的调度任务
// 心跳、检查compact、compact完成后hfile清理...
this.choreService = new ChoreService(getServerName().toString(), true);
}
什么是短路本地读?
在 HDFS 中,读取操作都是通过 Datanode 来进行的。
当客户端向 Datanode 发起读取文件请求时,Datanode 从磁盘读取文件,并且通过 TCP socket 发送给客户端。
所谓的“短路”就是不通过 Datanode,允许客户端直接读取文件。
显然,这仅在客户端与数据位于同一位置的情况下才有可能。短路读取能让许多应用性能显著提升。
什么是Nonce机制
客户端发送 RPC 请求给服务器后,如果响应超时,那么客户端会重复发送请求,直到达到参数配置的重试次数上限。
客户端第一次发送和以后重发请求时,会附带相同的 nonce,服务端只要根据 nonce 进行判断,就能得知是否为同一请求,
并根据之前请求处理的结果,决定是等待、拒绝还是直接处理。
1. HRegionServer 运行
/**
* The HRegionServer sticks in this loop until closed.
*/
@Override
public void run() {
// 向HMaster注册之前完成一些初始化工作
// 在ZK节点 /hbase/rs 下创建当前region server信息的节点,HMaster 监听这个路径
preRegistrationInitialization();
try {
if (!isStopped() && !isAborted()) {
// 在ZK节点 /hbase/rs 下创建当前region server信息的节点
createMyEphemeralNode();
// 加载的 coprocessor,提供 coprocessor 的运行环境
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
}
while (keepLooping()) { // !this.stopped && isClusterUp();
// 通知master,region server启动成功
RegionServerStartupResponse w = reportForDuty();
if (w == null) {
this.sleeper.sleep();
} else {
// Sets up wal and starts up all server threads.
handleReportForDutyResponse(w);
break;
}
}
// 启动 rspmHost \ rsQuotaManager
// ...
// The main run loop.
while (!isStopped() && isHealthy()) {
// 监控ZK节点(zookeeper.znode.state)
if (!isClusterUp()) {
// .. 处理集群 down 的情况
// 关闭所有的 user region
}
long now = System.currentTimeMillis();
// 定期报告心跳
if ((now - lastMsg) >= msgInterval) {
tryRegionServerReport(lastMsg, now);
lastMsg = System.currentTimeMillis();
doMetrics();
}
// ...
}
} catch (Throwable t) {
// ...
}
// Run shutdown ...
// 关闭连接、服务、region、WAL、proxy
// 清除ZK节点 /hbase/rs (强制删除 znode 存储文件)
// 关闭rpcClient、rpcservice、monitor、线程池、Zookeeper
// ...
}
1.1 preRegistrationInitialization
预初始化
private void preRegistrationInitialization(){
try {
// 初始化 clusterConnection、metaTableLocator
setupClusterConnection();
// Health checker thread.
if (isHealthCheckerConfigured()) {
int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
}
this.pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor.start();
// 初始化ZK连接
initializeZooKeeper();
if (!isStopped() && !isAborted()) {
// 初始化线程
initializeThreads();
}
} catch (Throwable t) {
// ...
}
}
/**
* 初始化一系列线程、monitor、和连接
*/
private void initializeThreads() throws IOException {
// Cache flushing thread.
this.cacheFlusher = new MemStoreFlusher(conf, this);
// Compaction thread
this.compactSplitThread = new CompactSplitThread(this);
// check for compactions
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
// creat lease monitor
this.leases = new Leases(this.threadWakeFrequency);
// Create the thread to clean the moved regions list
movedRegionsCleaner = MovedRegionsCleaner.create(this);
if (this.nonceManager != null) {
// Create the scheduled chore that cleans up nonces.
nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
}
// Setup the Quota Manager
rsQuotaManager = new RegionServerQuotaManager(this);
// Setup RPC client for master communication
rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
boolean onlyMetaRefresh = false;
int storefileRefreshPeriod = conf.getInt(
StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
, StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
if (storefileRefreshPeriod == 0) {
storefileRefreshPeriod = conf.getInt(
StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
onlyMetaRefresh = true;
}
if (storefileRefreshPeriod > 0) {
this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
onlyMetaRefresh, this, this);
}
registerConfigurationObservers();
}
1.2 handleReportForDutyResponse
启动 WAL 和线程
// response包含conf:
// hbase.regionserver.hostname.seen.by.master
// fs.default.name
// hbase.rootdir
protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
throws IOException {
try {
for (NameStringPair e : c.getMapEntriesList()) {
String key = e.getName();
// The hostname the master sees us as.
if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
// master 为rs重新定义 hostname
// rs得到新的 serverName
String hostnameFromMasterPOV = e.getValue();
this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
rpcServices.isa.getPort(), this.startcode);
// 校验 hostname
continue;
}
// 覆盖原有 conf
String value = e.getValue();
this.conf.set(key, value);
}
// ZK节点写到磁盘,用于处理程序异常情况
ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
// 创建 cacheConfig、walFactory
this.cacheConfig = new CacheConfig(conf);
this.walFactory = setupWALAndReplication();
this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
// 启动前一步创建的线程,启动ExecutorService
startServiceThreads();
startHeapMemoryManager();
// 通知其他监听线程 rs online
synchronized (online) {
online.set(true);
online.notifyAll();
}
} catch (Throwable e) {
// ...
}
}
2. HRegionServer主要干以下事情:
- 在ZK上注册,表明rs启动
- 跟 HMaster 通知
- 设置 WAL 和 Replication
- 注册协作器 RegionServerCoprocessorHost
- 启动 hlogRoller
- 定期刷新 memstore
- 定期检测是否需要压缩合并
- 启动 lease
- 启动 jetty
- 创建 SplitLogWorker,用于拆分 HLog
- 启动快照管理
- 创建其他工作线程和对象
网友评论