Connecting log4j/log4cplus to Flume: SocketApp


Author: 小胖墩 | Published: 2017-10-23 18:12

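    A few words first:

    Our company recently needed to build a data analysis platform. Its main job is to collect user behavior logs from our app and analyze them to produce general-purpose reports, user behavior predictions, and so on, for the operations team.

    How the logs are ingested:

    The server that produces the logs is written in C++ and logs through log4cplus. Java developers will be more familiar with log4j; the two are essentially the same. Here is the configuration file: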

    #flume_log

    log4cplus.appender.R6=log4cplus::SocketAppender

    log4cplus.appender.R6.host=10.132.34.12

    log4cplus.appender.R6.port=44444

    log4cplus.appender.R6.layout=log4cplus::PatternLayout

    log4cplus.appender.R6.layout.ConversionPattern=%m

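    Logs are sent straight to a remote server through SocketAppender. As the configuration above shows, 10.132.34.12 is the server where Flume receives the logs and 44444 is the port Flume listens on.

    Flume configuration:

    Since the client uses SocketAppender, my first idea was to use SyslogTcpSource on the Flume side. Here is the configuration file: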

    a1.sources = s1

    a1.sinks = k1

    a1.channels = c1

    a1.sources.s1.type = syslogtcp

    a1.sources.s1.host = 10.132.34.12

    a1.sources.s1.port = 44444

    a1.sources.s1.channels = c1

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 10000

    a1.channels.c1.transactionCapacity = 100

    a1.sinks.k1.type = logger

    a1.sinks.k1.channel = c1

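    While setting this up, everything went suspiciously smoothly. So easy, right? Only during testing did I realize that the trouble was just beginning.

    Problem 1: why are there no line breaks in the messages?

    The log4cplus pattern was log4cplus.appender.R6.layout.ConversionPattern=%m. Would adding '%n' fix it? It did not. Manually appending '\n' to each log message did not help either. Yet when the same messages were written to local disk, everything looked fine; only the SocketAppender path lost the line breaks. I was close to despair...

    Problem 2: why do the logs received by Flume always contain garbled characters?

    The logs printed by Flume's logger sink always contained garbage, and tcpdump confirmed that the TCP packets really did carry extra data besides the message. Was the data already wrong when it was sent? Repeated reviews of the C++ code turned up nothing.

    What now?

    I am a Java programmer and don't know C++ well, so I read the source of org.apache.log4j.net.SocketAppender and found some clues.

    First, SocketAppender declares a field: ObjectOutputStream oos;

    In the connect method, oos is instantiated: oos = new ObjectOutputStream(new Socket(address, port).getOutputStream());

    In the append method, it calls oos.writeObject(event); where event is of type org.apache.log4j.spi.LoggingEvent.

    At this point, the cause of the garbled characters and of the ineffective line breaks becomes clear.

    SocketAppender sends a serialized object, while Flume's SyslogTcpSource does not deserialize the TCP payload; it parses the received bytes directly as log text. No wonder the output was garbled.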
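To see why a text-oriented source chokes on this, it helps to compare the bytes ObjectOutputStream writes with the plain text of the same message. The following is a minimal sketch, not log4j code: the class name SerializedVsPlain and the sample message are made up for illustration, and it serializes a String rather than a LoggingEvent so that it has no dependencies.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: compares plain text bytes with Java-serialized bytes.
final class SerializedVsPlain {

    /** Serializes the message the same way oos.writeObject(...) would. */
    static byte[] serialize(String message) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Formats the first count bytes as lowercase hex, separated by spaces. */
    static String hex(byte[] data, int count) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < Math.min(count, data.length); i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", data[i] & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String message = "user login\n";
        byte[] plain = message.getBytes(StandardCharsets.UTF_8);
        byte[] serialized = serialize(message);
        // The serialized form starts with the stream header (ac ed 00 05), not with the text.
        System.out.println("plain:      " + hex(plain, 4));
        System.out.println("serialized: " + hex(serialized, 4));
    }
}
```

The serialized stream begins with the Java serialization header and type markers, so a receiver that treats the payload as a line of text sees those bytes as garbage in front of the message.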

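    I then downloaded the log4cplus source and found that its implementation follows the same approach as the Java one: its SocketAppender also sends a serialized object. The relevant code is in its append method, which calls convertToBuffer to serialize the event (convertToBuffer is reproduced in the class comment of the full source at the end of this post).

    Solutions:

    1. Modify the log4cplus source on the client side and rebuild it: change what the append method sends so that the raw message text goes straight to Flume (the line to change is bool ret = socket.write(msgBuffer);). I asked the colleague who owns the client; he said it would be difficult and hard to extend, so we dropped the idea.

    2. Rewrite Flume's SyslogTcpSource and parse the object on the Flume side. The C++ parsing code, readFromBuffer, is reproduced in the class comment of the full source at the end of this post.

    From that code, a Java version is easy to write.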
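Before the Netty-based version, here is a dependency-free sketch of the same parsing idea using java.nio.ByteBuffer. It assumes what the full source at the end of this post also assumes: integers are big-endian, each frame starts with a 4-byte length, and each string is an int character count followed by count * sizeOfChar bytes. The class and method names (Log4cplusFrameSketch, readMessage, sampleFrame) and all sample field values are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: pulls the message field out of one log4cplus socket frame.
final class Log4cplusFrameSketch {

    /** Reads a string stored as an int character count followed by the characters. */
    static String readString(ByteBuffer buf, int sizeOfChar) {
        int chars = buf.getInt();
        byte[] raw = new byte[chars * sizeOfChar];
        buf.get(raw);
        // Assumption: 1-byte chars are UTF-8 compatible, 2-byte chars are UTF-16BE.
        return new String(raw, sizeOfChar == 1 ? StandardCharsets.UTF_8 : StandardCharsets.UTF_16BE);
    }

    /** Follows the field order of readFromBuffer and returns only the message. */
    static String readMessage(ByteBuffer frame) {
        int length = frame.getInt();          // length of the serialized event
        ByteBuffer body = frame.slice();
        body.limit(length);
        body.get();                           // message version
        int sizeOfChar = body.get();          // 1 or 2 bytes per character
        readString(body, sizeOfChar);         // serverName
        readString(body, sizeOfChar);         // loggerName
        body.getInt();                        // logLevel
        readString(body, sizeOfChar);         // ndc
        return readString(body, sizeOfChar);  // message
    }

    private static void putString(ByteBuffer buf, String s) {
        byte[] raw = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(raw.length).put(raw);
    }

    /** Builds a test frame in the field order of convertToBuffer (sample values only). */
    static ByteBuffer sampleFrame(String message) {
        ByteBuffer body = ByteBuffer.allocate(256 + message.length() * 4);
        body.put((byte) 3).put((byte) 1);     // version, sizeOfChar
        putString(body, "");                  // serverName
        putString(body, "root");              // loggerName
        body.putInt(20000);                   // logLevel
        putString(body, "");                  // ndc
        putString(body, message);             // message
        putString(body, "main");              // thread
        body.putInt(0).putInt(0);             // timestamp: sec, usec
        putString(body, "demo.cpp");          // file
        body.putInt(42);                      // line
        putString(body, "main");              // function
        body.flip();
        ByteBuffer frame = ByteBuffer.allocate(4 + body.remaining());
        frame.putInt(body.remaining()).put(body);
        frame.flip();
        return frame;
    }

    public static void main(String[] args) {
        System.out.println(readMessage(sampleFrame("user login")));
    }
}
```

The sketch handles exactly one complete frame; a real receiver also has to cope with frames that arrive split across TCP reads.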

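    Note: once the Java code has extracted the message, if it does not end with '\n' you must append '\n' yourself; otherwise Flume cannot parse the log correctly. Also, SyslogTcpSource's default maximum message size is 2500 bytes, so a log message that exceeds the limit is truncated.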
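The newline rule can be isolated into a small helper. This is only a sketch of the idea, with a hypothetical class and method name; unlike the full source that follows, it also tolerates an empty message.

```java
import java.util.Arrays;

// Hypothetical helper: guarantees that a message ends with a line feed.
final class NewlineTerminator {

    /** Returns the input if it already ends with '\n', otherwise a copy with '\n' appended. */
    static byte[] ensureTrailingNewline(byte[] message) {
        if (message.length > 0 && message[message.length - 1] == '\n') {
            return message;
        }
        byte[] terminated = Arrays.copyOf(message, message.length + 1);
        terminated[message.length] = '\n';
        return terminated;
    }

    public static void main(String[] args) {
        byte[] out = ensureTrailingNewline("user login".getBytes());
        System.out.println(out.length); // one byte longer than the 10-byte input
    }
}
```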

    With that, all the problems appeared to be solved. The complete modified code follows:

    package com.mirror.game.flume.source;

    import org.apache.flume.ChannelException;
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.conf.Configurables;
    import org.apache.flume.source.AbstractSource;
    import org.apache.flume.source.SyslogSourceConfigurationConstants;
    import org.apache.flume.source.SyslogUtils;
    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBufferFactory;
    import org.jboss.netty.buffer.ChannelBuffers;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFactory;
    import org.jboss.netty.channel.*;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.net.InetSocketAddress;
    import java.nio.ByteOrder;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    /**
     * @Author: 小胖墩
     * @Description:
     *      Receives the objects serialized by log4cplus and deserializes the TCP payload to recover the log content.
     *      For the log4cplus serialization and deserialization logic, see socketappender.cxx in its source.
     *
     *      Serialization:
     *      void convertToBuffer(SocketBuffer & buffer, const spi::InternalLoggingEvent& event, const tstring& serverName)
     *      {
     *
     *          buffer.appendByte(LOG4CPLUS_MESSAGE_VERSION);
     *          #ifndef UNICODE
     *              buffer.appendByte(1);
     *          #else
     *              buffer.appendByte(2);
     *          #endif
     *
     *          buffer.appendString(serverName);
     *          buffer.appendString(event.getLoggerName());
     *          buffer.appendInt(event.getLogLevel());
     *          buffer.appendString(event.getNDC());
     *          buffer.appendString(event.getMessage());
     *          buffer.appendString(event.getThread());
     *          buffer.appendInt( static_cast<unsigned int>(event.getTimestamp().sec()) );
     *          buffer.appendInt( static_cast<unsigned int>(event.getTimestamp().usec()) );
     *          buffer.appendString(event.getFile());
     *          buffer.appendInt(event.getLine());
     *          buffer.appendString(event.getFunction());
     *      }
     *
     *      Deserialization:
     *      spi::InternalLoggingEvent readFromBuffer(SocketBuffer& buffer)
     *      {
     *          unsigned char msgVersion = buffer.readByte();
     *          if(msgVersion != LOG4CPLUS_MESSAGE_VERSION) {
     *              LogLog * loglog = LogLog::getLogLog();
     *              loglog->warn(LOG4CPLUS_TEXT("readFromBuffer() received socket message with an invalid version"));
     *          }
     *
     *          unsigned char sizeOfChar = buffer.readByte();
     *
     *          tstring serverName = buffer.readString(sizeOfChar);
     *          tstring loggerName = buffer.readString(sizeOfChar);
     *          LogLevel ll = buffer.readInt();
     *          tstring ndc = buffer.readString(sizeOfChar);
     *          if(! serverName.empty ()) {
     *              if(ndc.empty ()) {
     *                  ndc = serverName;
     *              }
     *              else {
     *                  ndc = serverName + LOG4CPLUS_TEXT(" - ") + ndc;
     *              }
     *          }
     *          tstring message = buffer.readString(sizeOfChar);
     *          tstring thread = buffer.readString(sizeOfChar);
     *          long sec = buffer.readInt();
     *          long usec = buffer.readInt();
     *          tstring file = buffer.readString(sizeOfChar);
     *          int line = buffer.readInt();
     *          tstring function = buffer.readString(sizeOfChar);
     *
     *          spi::InternalLoggingEvent ev (loggerName, ll, ndc,
     *              MappedDiagnosticContextMap (), message, thread, internal::empty_str,
     *              Time(sec, usec), file, line, function);
     *          return ev;
     *      }
     *
     * @Date: 2017/10/21 16:53
     * @ModifiedBy :
     */

    public class MirrorSyslogTcpSource extends AbstractSource implements EventDrivenSource, Configurable {

        private static final Logger logger = LoggerFactory.getLogger(MirrorSyslogTcpSource.class);
        private int port;
        private String host = null;
        private Channel nettyChannel;
        private Integer eventSize;
        private Map<String, String> formaterProp;
        private CounterGroup counterGroup = new CounterGroup();
        private Set<String> keepFields;


        public class syslogTcpHandler extends SimpleChannelHandler {

            private SyslogUtils syslogUtils = new SyslogUtils();

            public void setEventSize(int eventSize) {
                syslogUtils.setEventSize(eventSize);
            }

            public void setKeepFields(Set<String> keepFields) {
                syslogUtils.setKeepFields(keepFields);
            }

            public void setFormater(Map<String, String> prop) {
                syslogUtils.addFormats(prop);
            }


            @Override
            public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
                ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();
                while (buff.readable()) {
                    try {
                        int length = buff.readInt(); // total length of the serialized event
                        // Message body: the InternalLoggingEvent as serialized by log4cplus
                        ChannelBuffer eventBuffer = buff.readBytes(length);
                        int messageVersion = eventBuffer.readByte(); // message version; log4cplus uses 3 by default
                        int sizeOfChar = eventBuffer.readByte(); // bytes per char: 1 for non-UNICODE builds, 2 for UNICODE builds

                        int serverNameLength = eventBuffer.readInt(); // length of the server name
                        ChannelBuffer serverNameBuffer = null;
                        if (serverNameLength > 0) {
                            serverNameBuffer = eventBuffer.readBytes(serverNameLength * sizeOfChar); // serverName
                        }

                        int loggerNameLength = eventBuffer.readInt();
                        ChannelBuffer loggerNameBuffer = null;
                        if (loggerNameLength > 0) {
                            loggerNameBuffer = eventBuffer.readBytes(loggerNameLength * sizeOfChar); // loggerName
                        }

                        int logLevel = eventBuffer.readInt(); // log level

                        int ndcLength = eventBuffer.readInt(); // ndc
                        ChannelBuffer ndcBuffer = null;
                        if (ndcLength > 0) {
                            ndcBuffer = eventBuffer.readBytes(ndcLength * sizeOfChar);
                        }

                        int messageLength = eventBuffer.readInt(); // message content
                        ChannelBuffer messageBuffer = null;
                        if (messageLength > 0) {
                            int len = messageLength * sizeOfChar;
                            messageBuffer = eventBuffer.readBytes(len);
                            // A '\n' must terminate the message, otherwise SyslogUtils fails to parse it.
                            byte[] messageArray = messageBuffer.array();
                            if (messageArray[len - 1] != '\n') {
                                byte[] newArray = new byte[len + 1];
                                System.arraycopy(messageArray, 0, newArray, 0, len);
                                newArray[len] = '\n';
                                messageBuffer = ChannelBuffers.buffer(ByteOrder.LITTLE_ENDIAN, newArray.length);
                                messageBuffer.writeBytes(newArray);
                            }
                        }


                        // The remaining fields are only decoded for debug output.
                        if (logger.isDebugEnabled()) {
                            int threadLength = eventBuffer.readInt(); // thread name
                            ChannelBuffer threadBuffer = null;
                            if (threadLength > 0) {
                                threadBuffer = eventBuffer.readBytes(threadLength * sizeOfChar);
                            }

                            int timeStampSec = eventBuffer.readInt(); // timestamp: seconds
                            int timeStampUsec = eventBuffer.readInt(); // timestamp: microseconds

                            int fileLength = eventBuffer.readInt(); // source file that logged the message
                            ChannelBuffer fileBuffer = null;
                            if (fileLength > 0) {
                                fileBuffer = eventBuffer.readBytes(fileLength * sizeOfChar);
                            }

                            int line = eventBuffer.readInt(); // line number in the source file

                            int funcLength = eventBuffer.readInt(); // name of the function that logged the message
                            ChannelBuffer functionBuffer = null;
                            if (funcLength > 0) {
                                functionBuffer = eventBuffer.readBytes(funcLength * sizeOfChar);
                            }

                            StringBuilder sb = new StringBuilder("{");
                            sb.append("length=").append(length).append(",")
                                    .append("messageVersion=").append(messageVersion).append(",")
                                    .append("serverNameLength=").append(serverNameLength).append(",")
                                    .append("serverName=").append(serverNameBuffer == null ? "null" : new String(serverNameBuffer.array(), "utf-8")).append(",")
                                    .append("loggerNameLength=").append(loggerNameLength).append(",")
                                    .append("loggerName=").append(loggerNameBuffer == null ? "null" : new String(loggerNameBuffer.array(), "utf-8")).append(",")
                                    .append("logLevel=").append(logLevel).append(",")
                                    .append("ndcLength=").append(ndcLength).append(",")
                                    .append("ndc=").append(ndcBuffer == null ? "null" : new String(ndcBuffer.array(), "utf-8")).append(",")
                                    .append("messageLength=").append(messageLength).append(",")
                                    .append("message=").append(messageBuffer == null ? "null" : new String(messageBuffer.array(), "utf-8")).append(",")
                                    .append("threadLength=").append(threadLength).append(",")
                                    .append("thread=").append(threadBuffer == null ? "null" : new String(threadBuffer.array(), "utf-8")).append(",")
                                    .append("timeStampSec=").append(timeStampSec).append(",")
                                    .append("timeStampUsec=").append(timeStampUsec).append(",")
                                    .append("fileLength=").append(fileLength).append(",")
                                    .append("file=").append(fileBuffer == null ? "null" : new String(fileBuffer.array(), "utf-8")).append(",")
                                    .append("line=").append(line).append(",")
                                    .append("funcLength=").append(funcLength).append(",")
                                    .append("func=").append(functionBuffer == null ? "null" : new String(functionBuffer.array(), "utf-8")).append(",")
                                    .append("}");
                            logger.debug(sb.toString());
                        }


                        if (messageBuffer == null) {
                            // An empty message has nothing to forward; skip it instead of passing null on.
                            continue;
                        }
                        Event e = syslogUtils.extractEvent(messageBuffer);
                        if (e == null) {
                            logger.warn("Event is null: parsed a partial event; the event will be generated when the rest is received.");
                            continue;
                        }
                        try {
                            getChannelProcessor().processEvent(e);
                            counterGroup.incrementAndGet("events.success");
                        } catch (ChannelException ex) {
                            counterGroup.incrementAndGet("events.dropped");
                            logger.error("Error writing to channel, event dropped", ex);
                        }
                    } catch (Exception e) {
                        logger.error("read message error,", e);
                    }
                }
            }
        }


        @Override
        public void start() {
            ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
            ServerBootstrap serverBootstrap = new ServerBootstrap(factory);
            // Each connection gets its own handler, and therefore its own SyslogUtils state.
            serverBootstrap.setPipelineFactory(() -> {
                syslogTcpHandler handler = new syslogTcpHandler();
                handler.setEventSize(eventSize);
                handler.setFormater(formaterProp);
                handler.setKeepFields(keepFields);
                return Channels.pipeline(handler);
            });
            logger.info("Mirror Syslog TCP Source starting...");
            if (host == null) {
                nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
            } else {
                nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
            }
            super.start();
        }


        @Override
        public void stop() {
            logger.info("Mirror Syslog TCP Source stopping...");
            logger.info("Metrics:{}", counterGroup);
            if (nettyChannel != null) {
                nettyChannel.close();
                try {
                    nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    logger.warn("netty server stop interrupted", e);
                } finally {
                    nettyChannel = null;
                }
            }
            super.stop();
        }


        @Override
        public void configure(Context context) {
            Configurables.ensureRequiredNonNull(context, SyslogSourceConfigurationConstants.CONFIG_PORT);
            port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);
            host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
            // Maximum size of a single log record; the stock default is 2500 bytes. Serialization adds
            // considerable overhead, so the default here is raised to 10 * DEFAULT_SIZE.
            eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE * 10);
            formaterProp = context.getSubProperties(SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
            keepFields = SyslogUtils.chooseFieldsToKeep(
                    context.getString(
                            SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
                            SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
        }
    }
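
The post does not show the agent configuration that goes with this class. Assuming the compiled class is on the Flume agent's classpath, the source section of the earlier configuration would presumably change along these lines. Flume accepts a fully qualified class name as a source type; eventSize is the optional property read in configure, shown here with its default of 10 * 2500 bytes.

```properties
a1.sources.s1.type = com.mirror.game.flume.source.MirrorSyslogTcpSource
a1.sources.s1.host = 10.132.34.12
a1.sources.s1.port = 44444
a1.sources.s1.eventSize = 25000
a1.sources.s1.channels = c1
```

The channel and sink sections stay as they were.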
