1. Configuration
To enable HDFS audit logging, edit hadoop-env.sh so that the NameNode JVM options set the audit logger:
export HADOOP_NAMENODE_OPTS="... -Dhdfs.audit.logger=INFO,RFAAUDIT"
(The RFAAUDIT appender is already configured by default in log4j.properties.)
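For reference, a full line might look like the sketch below; the heap size and the security-logger option are placeholders for whatever the cluster already sets, and only the -Dhdfs.audit.logger part matters for the audit log:

# hadoop-env.sh -- illustrative values only, keep your existing options
export HADOOP_NAMENODE_OPTS="-Xmx4g -Dhadoop.security.logger=INFO,RFAS \
    -Dhdfs.audit.logger=INFO,RFAAUDIT ${HADOOP_NAMENODE_OPTS}"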
2. Source code analysis
The relevant code lives in the FSNamesystem class:
public static final Log auditLog = LogFactory.getLog(
    FSNamesystem.class.getName() + ".audit");
The corresponding configuration in log4j.properties:
#
# hdfs audit logging
#
hdfs.audit.logger=INFO,NullAppender
hdfs.audit.log.maxfilesize=256MB
hdfs.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log
log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}
Audit entries are written in the logAuditEvent method, which has several overloads; the final implementation in FSNamesystem is:
private void logAuditEvent(boolean succeeded,
    UserGroupInformation ugi, InetAddress addr, String cmd, String src,
    String dst, HdfsFileStatus stat) {
  FileStatus status = null;
  if (stat != null) {
    Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
    Path path = dst != null ? new Path(dst) : new Path(src);
    status = new FileStatus(stat.getLen(), stat.isDir(),
        stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),
        stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
        stat.getGroup(), symlink, path);
  }
  for (AuditLogger logger : auditLoggers) {
    if (logger instanceof HdfsAuditLogger) {
      HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
      hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
          status, ugi, dtSecretManager);
    } else {
      logger.logAuditEvent(succeeded, ugi.toString(), addr,
          cmd, src, dst, status);
    }
  }
}
auditLoggers is populated from the dfs.namenode.audit.loggers property in hdfs-site.xml (not set in our cluster at the moment). The default implementation class is:
private static class DefaultAuditLogger extends HdfsAuditLogger {
  @Override
  public void logAuditEvent(boolean succeeded, String userName,
      InetAddress addr, String cmd, String src, String dst,
      FileStatus status, UserGroupInformation ugi,
      DelegationTokenSecretManager dtSecretManager) {
    if (auditLog.isInfoEnabled()) {
      final StringBuilder sb = auditBuffer.get();
      sb.setLength(0);
      sb.append("allowed=").append(succeeded).append("\t");
      sb.append("ugi=").append(userName).append("\t");
      sb.append("ip=").append(addr).append("\t");
      sb.append("cmd=").append(cmd).append("\t");
      sb.append("src=").append(src).append("\t");
      sb.append("dst=").append(dst).append("\t");
      if (null == status) {
        sb.append("perm=null");
      } else {
        sb.append("perm=");
        sb.append(status.getOwner()).append(":");
        sb.append(status.getGroup()).append(":");
        sb.append(status.getPermission());
      }
      if (logTokenTrackingId) {
        sb.append("\t").append("trackingId=");
        String trackingId = null;
        if (ugi != null && dtSecretManager != null
            && ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) {
          for (TokenIdentifier tid : ugi.getTokenIdentifiers()) {
            if (tid instanceof DelegationTokenIdentifier) {
              DelegationTokenIdentifier dtid =
                  (DelegationTokenIdentifier) tid;
              trackingId = dtSecretManager.getTokenTrackingId(dtid);
              break;
            }
          }
        }
        sb.append(trackingId);
      }
      sb.append("\t").append("proto=");
      sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
      logAuditMessage(sb.toString());
    }
  }
}
Therefore, to enable audit logging only for selected operations, either this method has to be modified or a custom implementation of org.apache.hadoop.hdfs.server.namenode.AuditLogger has to be plugged in.
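As a sketch of the second option, a custom logger that only records a whitelist of write-type commands could look like the following (the class name CmdFilterAuditLogger, the com.example package and the command list are made up for illustration; the AuditLogger interface and the dfs.namenode.audit.loggers property are the ones discussed above):

package com.example;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.server.namenode.AuditLogger;

// Hypothetical audit logger that drops everything except the listed commands.
public class CmdFilterAuditLogger implements AuditLogger {
  private static final Log LOG = LogFactory.getLog(CmdFilterAuditLogger.class);
  private Set<String> auditedCmds;

  @Override
  public void initialize(Configuration conf) {
    // Commands worth auditing; everything else is ignored.
    auditedCmds = new HashSet<String>(
        Arrays.asList("create", "delete", "rename", "setPermission"));
  }

  @Override
  public void logAuditEvent(boolean succeeded, String userName,
      InetAddress addr, String cmd, String src, String dst,
      FileStatus status) {
    if (auditedCmds.contains(cmd)) {
      LOG.info("allowed=" + succeeded + "\tugi=" + userName + "\tip=" + addr
          + "\tcmd=" + cmd + "\tsrc=" + src + "\tdst=" + dst);
    }
  }
}

The class would then be registered in hdfs-site.xml; the special value "default" keeps the built-in logger (and hdfs-audit.log) alongside it:

<property>
  <name>dfs.namenode.audit.loggers</name>
  <value>default,com.example.CmdFilterAuditLogger</value>
</property>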
Hadoop's audit log entries look like this:
2014-04-30 10:19:13,173 INFO FSNamesystem.audit: allowed=true ugi=cdh5 (auth:SIMPLE) ip=/10.1.251.52 cmd=create src=/a.COPYING dst=null perm=cdh5:supergroup:rw-r--r--
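Each entry is a set of tab-separated key=value pairs following the log4j prefix, so a minimal parsing sketch for downstream tooling (a hypothetical helper, not part of Hadoop) could be:

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: splits one hdfs-audit.log line into its key=value fields.
public class AuditLineParser {
  public static Map<String, String> parse(String line) {
    Map<String, String> fields = new LinkedHashMap<String, String>();
    // Skip the "2014-04-30 10:19:13,173 INFO FSNamesystem.audit: " prefix.
    int start = line.indexOf("allowed=");
    if (start < 0) {
      return fields;  // not an audit entry
    }
    for (String token : line.substring(start).split("\t")) {
      int eq = token.indexOf('=');
      if (eq > 0) {
        fields.put(token.substring(0, eq), token.substring(eq + 1));
      }
    }
    return fields;
  }
}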
3. Log shipping
For shipping the logs we considered the ELK stack, but during evaluation Logstash turned out to consume too many resources (see https://blog.csdn.net/u010871982/article/details/79035317/). The official Logstash documentation recommends collecting logs with Filebeat and then forwarding them through Logstash into ES. Filebeat is part of the Elastic Stack (written in Go), so it integrates seamlessly with Logstash, Elasticsearch and Kibana. Since the collection agent has to run on the Hadoop NameNode nodes and we do not need any filtering of the log content, Filebeat writing directly to ES is sufficient.
3.1 Filebeat installation
Download: https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-x86_64.rpm
Install: rpm -vi filebeat-6.3.2-x86_64.rpm
Configuration:
The configuration file is /etc/filebeat/filebeat.yml; configure the input and the output. When a custom index name is set, setup.template.name and setup.template.pattern must be set as well (see the official docs: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-template.html).
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /usr/local/hadoop-2.6.5/logs/yarn-hadoop-resourcemanager-luckybinamenode01.intranet.test01.tj1.log
output.elasticsearch:
  hosts: ["10.104.106.232:9200","10.104.106.233:9200","10.104.106.234:9200"]
  index: "hadoopaudit"
  username: "sdk"
  password: "pass4Sdk"
  loadbalance: true
setup.template.name: "hadoopaudit"
setup.template.pattern: "hadoopaudit-*"
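Before starting the service, the configuration and the connection to ES can be checked with Filebeat's test subcommands (paths as installed by the rpm above):

filebeat test config -c /etc/filebeat/filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml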
The index template configured in ES:
{
  "template": "hadoopaudit",
  "index_patterns": ["hadoopaudit-*"],
  "settings": {
    "number_of_shards": "5"
  },
  "mappings": {
    "doc": {
      "properties": {
        "message": {
          "type": "string",
          "index": "analyzed",
          "analyzer": "ik"
        },
        "@timestamp": {
          "format": "strict_date_optional_time||epoch_millis",
          "type": "date"
        },
        "input": {
          "properties": {
            "type": {
              "type": "string"
            }
          }
        },
        "beat": {
          "properties": {
            "name": {
              "type": "string"
            },
            "hostname": {
              "type": "string"
            },
            "version": {
              "type": "string"
            }
          }
        },
        "host": {
          "properties": {
            "name": {
              "type": "string"
            }
          }
        },
        "source": {
          "type": "string"
        },
        "prospector": {
          "properties": {
            "type": {
              "type": "string"
            }
          }
        },
        "offset": {
          "type": "long"
        }
      }
    }
  }
}
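The template can be loaded into the cluster with a plain HTTP request, for example (assuming the JSON above is saved as hadoopaudit-template.json; the host is one of the ES nodes from the Filebeat config, and -u prompts for the password set in filebeat.yml):

curl -u sdk -XPUT -H 'Content-Type: application/json' \
  'http://10.104.106.232:9200/_template/hadoopaudit' \
  -d @hadoopaudit-template.json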
Start: service filebeat start
Filebeat's own logs are written to /var/log/filebeat.
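To check that events are actually arriving in ES, listing the indices is a quick sanity check (again assuming one of the ES nodes from the config above):

curl -u sdk 'http://10.104.106.232:9200/_cat/indices?v'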