Hadoop Audit Logging

Author: JX907 | Published 2022-08-17 14:09

1. Configuration

To enable HDFS audit logging, modify hadoop-env.sh:

    export HADOOP_NAMENODE_OPTS=".... -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,RFAAUDIT} $HADOOP_NAMENODE_OPTS"

(The RFAAUDIT appender is already defined in the default log4j.properties.)
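Because the `${HDFS_AUDIT_LOGGER:-...}` default in hadoop-env.sh only applies when the variable is unset, an equivalent approach (a minimal sketch, assuming the stock hadoop-env.sh quoted above) is to export the variable before starting the NameNode:

    # hadoop-env.sh: route audit events to the rolling-file appender RFAAUDIT
    export HDFS_AUDIT_LOGGER="INFO,RFAAUDIT"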

2. Source code analysis

The relevant source lives in the FSNamesystem class:

    
    public static final Log auditLog = LogFactory.getLog(
        FSNamesystem.class.getName() + ".audit");
    
This logger corresponds to the following configuration in log4j.properties:
    
    #
    # hdfs audit logging
    #
    hdfs.audit.logger=INFO,NullAppender
    hdfs.audit.log.maxfilesize=256MB
    hdfs.audit.log.maxbackupindex=20
    log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
    log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
    log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log
    log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
    log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
    log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
    log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}
    
    


Audit entries are emitted from the logAuditEvent method, which has several overloads.

The final implementation in FSNamesystem is:

    
    private void logAuditEvent(boolean succeeded,
        UserGroupInformation ugi, InetAddress addr, String cmd, String src,
        String dst, HdfsFileStatus stat) {
      FileStatus status = null;
      if (stat != null) {
        Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
        Path path = dst != null ? new Path(dst) : new Path(src);
        status = new FileStatus(stat.getLen(), stat.isDir(),
            stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),
            stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
            stat.getGroup(), symlink, path);
      }
      for (AuditLogger logger : auditLoggers) {
        if (logger instanceof HdfsAuditLogger) {
          HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
          hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
              status, ugi, dtSecretManager);
        } else {
          logger.logAuditEvent(succeeded, ugi.toString(), addr,
              cmd, src, dst, status);
        }
      }
    }
    
    


The auditLoggers list is configured via the dfs.namenode.audit.loggers property in hdfs-site.xml (this property is currently not set in our cluster).
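For reference, a hedged example of how this property could be set in hdfs-site.xml; the value "default" selects the built-in logger, and com.example.MyAuditLogger stands in for a hypothetical custom class (one is sketched further down):

    <property>
      <name>dfs.namenode.audit.loggers</name>
      <value>default,com.example.MyAuditLogger</value>
    </property>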

The default implementation is DefaultAuditLogger:

    private static class DefaultAuditLogger extends HdfsAuditLogger {
      @Override
      public void logAuditEvent(boolean succeeded, String userName,
          InetAddress addr, String cmd, String src, String dst,
          FileStatus status, UserGroupInformation ugi,
          DelegationTokenSecretManager dtSecretManager) {
        if (auditLog.isInfoEnabled()) {
          final StringBuilder sb = auditBuffer.get();
          sb.setLength(0);
          sb.append("allowed=").append(succeeded).append("\t");
          sb.append("ugi=").append(userName).append("\t");
          sb.append("ip=").append(addr).append("\t");
          sb.append("cmd=").append(cmd).append("\t");
          sb.append("src=").append(src).append("\t");
          sb.append("dst=").append(dst).append("\t");
          if (null == status) {
            sb.append("perm=null");
          } else {
            sb.append("perm=");
            sb.append(status.getOwner()).append(":");
            sb.append(status.getGroup()).append(":");
            sb.append(status.getPermission());
          }
          if (logTokenTrackingId) {
            sb.append("\t").append("trackingId=");
            String trackingId = null;
            if (ugi != null && dtSecretManager != null
                && ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) {
              for (TokenIdentifier tid: ugi.getTokenIdentifiers()) {
                if (tid instanceof DelegationTokenIdentifier) {
                  DelegationTokenIdentifier dtid =
                      (DelegationTokenIdentifier)tid;
                  trackingId = dtSecretManager.getTokenTrackingId(dtid);
                  break;
                }
              }
            }
            sb.append(trackingId);
          }
          sb.append("\t").append("proto=");
          sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
          logAuditMessage(sb.toString());
        }
      }
    }
    


So if you want audit logging only for selected operations, you either modify this class or provide a custom implementation of org.apache.hadoop.hdfs.server.namenode.AuditLogger.
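A minimal sketch of such a custom logger, assuming the Hadoop 2.x AuditLogger interface (an initialize(Configuration) method plus the seven-argument logAuditEvent); the class name WriteOnlyAuditLogger and its command filter are hypothetical:

    import java.net.InetAddress;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.hdfs.server.namenode.AuditLogger;

    // Hypothetical example: keep only mutating commands, drop read-only noise.
    public class WriteOnlyAuditLogger implements AuditLogger {

      private static final Log LOG = LogFactory.getLog(WriteOnlyAuditLogger.class);

      // Illustrative command list; extend as needed.
      private static final Set<String> WRITE_CMDS = new HashSet<>(Arrays.asList(
          "create", "delete", "rename", "mkdirs", "setPermission", "setOwner"));

      @Override
      public void initialize(Configuration conf) {
        // Custom settings could be read from hdfs-site.xml here.
      }

      @Override
      public void logAuditEvent(boolean succeeded, String userName,
          InetAddress addr, String cmd, String src, String dst,
          FileStatus stat) {
        if (WRITE_CMDS.contains(cmd)) {
          LOG.info("allowed=" + succeeded + "\tugi=" + userName + "\tip=" + addr
              + "\tcmd=" + cmd + "\tsrc=" + src + "\tdst=" + dst);
        }
      }
    }

To activate it, put the class on the NameNode classpath and list it in dfs.namenode.audit.loggers as shown earlier; keeping "default" in that list preserves the full audit stream in hdfs-audit.log.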

Hadoop audit log entries look like this:

    2014-04-30 10:19:13,173 INFO FSNamesystem.audit: allowed=true ugi=cdh5 (auth:SIMPLE) ip=/10.1.251.52 cmd=create src=/a.COPYING dst=null perm=cdh5:supergroup:rw-r--r--
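A quick way to confirm that auditing is active (the test directory is arbitrary, and the log location assumes hadoop.log.dir points at /usr/local/hadoop-2.6.5/logs, as in the Filebeat configuration below):

    # trigger an audited operation, then look at the tail of the audit log
    hdfs dfs -mkdir /tmp/audit_test
    tail -n 5 /usr/local/hadoop-2.6.5/logs/hdfs-audit.log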

3. Log shipping

For log shipping we first considered the ELK stack, but during evaluation Logstash turned out to consume too many resources (see https://blog.csdn.net/u010871982/article/details/79035317/). The Logstash documentation itself recommends collecting logs with Filebeat and forwarding them through Logstash into Elasticsearch. Filebeat is part of the Elastic Stack (written in Go), so it integrates seamlessly with Logstash, Elasticsearch, and Kibana. Since the collector has to run on the Hadoop NameNode hosts and we do not need any content filtering, having Filebeat write directly to Elasticsearch is sufficient.

3.1 Installing Filebeat

Download: https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-x86_64.rpm

Install: rpm -vi filebeat-6.3.2-x86_64.rpm

Configuration:

The configuration file is /etc/filebeat/filebeat.yml; configure the input and output sections. (When a custom index name is set, setup.template.name and setup.template.pattern must also be set; see https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-template.html.)

    
    filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - /usr/local/hadoop-2.6.5/logs/yarn-hadoop-resourcemanager-luckybinamenode01.intranet.test01.tj1.log

    output.elasticsearch:
      hosts: ["10.104.106.232:9200","10.104.106.233:9200","10.104.106.234:9200"]
      index: "hadoopaudit"
      username: "sdk"
      password: "pass4Sdk"
      loadbalance: true

    setup.template.name: "hadoopaudit"
    setup.template.pattern: "hadoopaudit-*"
    
    


Index template configured in Elasticsearch:

    
    {
        "template": "hadoopaudit",
        "index_patterns": ["hadoopaudit-*"],
        "settings": {
            "number_of_shards": "5"
        },
        "mappings": {
            "doc": {
                "properties": {
                    "message": {
                        "type": "string",
                        "index": "analyzed",
                        "analyzer": "ik"
                    },
                    "@timestamp": {
                        "format": "strict_date_optional_time||epoch_millis",
                        "type": "date"
                    },
                    "input": {
                        "properties": {
                            "type": { "type": "string" }
                        }
                    },
                    "beat": {
                        "properties": {
                            "name": { "type": "string" },
                            "hostname": { "type": "string" },
                            "version": { "type": "string" }
                        }
                    },
                    "host": {
                        "properties": {
                            "name": { "type": "string" }
                        }
                    },
                    "source": { "type": "string" },
                    "prospector": {
                        "properties": {
                            "type": { "type": "string" }
                        }
                    },
                    "offset": { "type": "long" }
                }
            }
        }
    }
    
    

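One way to load this template into the cluster (a sketch, assuming the JSON above is saved as hadoopaudit-template.json and that the credentials from the Filebeat configuration are valid for the HTTP API):

    curl -u sdk:pass4Sdk -X PUT "http://10.104.106.232:9200/_template/hadoopaudit" \
         -H 'Content-Type: application/json' \
         -d @hadoopaudit-template.json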

Start: service filebeat start

Filebeat's own logs are written under /var/log/filebeat.
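If nothing arrives in Elasticsearch, the configuration and the output connection can be checked with Filebeat's built-in test subcommands (available in the 6.x releases):

    filebeat test config -c /etc/filebeat/filebeat.yml
    filebeat test output -c /etc/filebeat/filebeat.yml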
