Flink merge HDSF
小文件任务日志中有大量warn信息,如下:
2021-01-04 11:26:26,818 WARN org.apache.hadoop.hdfs.DFSClient [] - Slow ReadProcessor read fields took 126572ms (threshold=30000ms); ack: seqno: 20868 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 1209775 flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[172.16.255.131:1019,DS-d25d5d48-64f7-47f6-b8e2-0c1c3e3a9d76,DISK], DatanodeInfoWithStorage[172.16.255.70:1019,DS-ed5eb437-0862-4f06-9598-18a2ab986b57,DISK], DatanodeInfoWithStorage[172.16.255.148:1019,DS-c44f87c9-c57e-4a48-9b6f-fce8361bcfc1,DISK]]
登入到日志中显示的节点上,进入日志的目录:
cd /opt/log/xdp/hadoop-hdfs/hdfs_datanode
用如下命令对每个Datanode上的日志来分析一下:
egrep -o "Slow.*?(took|cost)" hadoop-hdfs-datanode-bigdata029.xx.com.cn.log.1 | sort | uniq -c
该命令将提供DataNode日志中所有“Slow”
消息的计数,显示如下:
6 Slow BlockReceiver write packet to mirror took
1114 Slow flushOrSync took
1 Slow PacketResponder send ack to upstream took
1.如果单个节点的一个或多个类别的Slow消息
比其他主机的Slow
消息数量多出数量级,则需要调查底层硬件问题。
2.如果Slow消息数最多的是Slow BlockReceiver write packet tomirror took
,那么通过以下命令的输出来调查可能的网络问题:
ifconfig -a
定期检查问题主机上增加的errors和dropped
的数量,往往代表的是网卡,网线或者上游的网络有问题
netstat -s
与正常节点相比,查找大量重新传输的数据包或其他异常高的指标。
netstat -s | grep -i retrans
整个集群执行,在一个或多个节点上查找大于正常的计数。
3.如果Slow消息最多的是一些其他消息,使用以下命令检查磁盘问题:
iostat
高iowait百分比,超过15%
iostat -x和sar -d
特定分区的高await或%util
dmesg
磁盘错误
使用smartctl
对磁盘进行健康检查:停止受影响节点的所有Hadoop进程,然后运行sudo smartctl -H /dev/<disk>
,检查HDFS使用的每块<disk>
网卡MTU
任务如果跨机房做数据迁移的场景,要考虑网卡、交换机方面的设置,
网卡的MTU设置上,hadoop建议将网卡mtu值从1500设置为9000
,以支持接收jumbo frame
。我记得这个得交换机一起配合修改,光改服务器效果不明显。
DataXceiverServer线程
DataXceiverServer
线程是一个典型的Tcp Socket Server
。客户端每来一个TCP请求
,如果DN
上的DataXceiver
线程数量还没超过限制,就启动一个新的DataXceiver
线程。
默认的最大DataXceiver
线程数量为4096
,这个参数在15360
以内可以根据实际情况调整。
在DN中
有一个专门的线程组来维护这些线程,同时有一个守护线程来监视这个线程组的体量—DataXceiverServer
,它负责监测线程数量是否到达上线,超过就抛出异常
DataXceiverServer
线程在DataNode#runDatanodeDaemon()
方法中启动。
DataXceiverServer#run():
public void run() {
Peer peer = null;
while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
try {
peer = peerServer.accept();
...// 检查DataXceiver线程的数量,超过最大限制就抛出IOE
// 启动一个新的DataXceiver线程
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (AsynchronousCloseException ace) {
// another thread closed our listener socket - that's expected during shutdown,
// but not in other circumstances
if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
}
} catch (IOException ie) {
...// 清理
} catch (OutOfMemoryError ie) {
...// 清理并sleep 30s
} catch (Throwable te) {
// 其他异常就关闭datanode
LOG.error(datanode.getDisplayName()
+ ":DataXceiverServer: Exiting due to: ", te);
datanode.shouldRun = false;
}
}
...// 关闭peerServer并清理所有peers
}
在dfs中的open(),create()
的操作对应到真实的动作就是在服务端DN
new一个线程(socket)
来处理。
网友评论