美文网首页
Flume自定义AsynHBaseSink

Flume自定义AsynHBaseSink

作者: 小北觅 | 来源:发表于2019-04-17 19:32 被阅读0次

    在进行项目学习的时候,有个需求是将日志数据经过flume收集,然后sink到HBase中。
    经过查看官方文档,发现flume内置支持两种HBasesink。如下图所示:

    其中HBaseSink和HBase2Sink几乎完全一样,只是针对的版本不同而已。
    AsyncHBaseSink使用了 Asynchbase API来写HBase,这个API具有完全异步的,非阻塞的,线程安全的,高性能的特性。同时,万一有些events写入HBase失败,那么此sink会replay那个事务中的所有events。

    一、flume内置的AsyncHBaseSink分析

    在flume官网上下载一下src包,并导入IDEA中。我的版本是1.7.0。目录结构如下:

    我们要关注的就是箭头指向的类。点进这个文件中。

    1.1 AsyncHBaseSink类字段

    首先看一下这个类的一些字段,我的截图不全,因为没必要完全把每个字段都弄明白。That is to say ,可以但没必要=!=

    我们可以看到这个类继承了AbstractSink这个抽象类,并实现了Configurable接口。
    还可以看到一些顾名思义的字段,比如:要写入的HBase表名,列族等等。

    现在我们要关注的是这三个字段:

      private AsyncHbaseEventSerializer serializer;   //一会讲
      private String eventSerializerType;   //一会讲
      private Context serializerContext;   //一会讲
    

    首先先给出这三个字段是做什么的。

    ①serializer:它的类型是AsyncHbaseEventSerializer接口,这个接口定义了如下方法:

    这个接口的作用就是让子类自己自由实现封装向HBase发送数据的请求。也就是getActions()方法。然后在AsyncHBaseSink的process方法中调用serializer.getActions()就可以得到这些请求,然后写到HBase中。

    ②eventSerializerType: 这个就是用来从flume的配置文件中读取serializer参数。以确定使用哪个AsyncHbaseEventSerializer具体的实现类

    可以看到如果不配置的话,默认就是使用这个org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer类。

    然后通过反射,把serializer实例化成eventSerializerType所表示的类。如上图所示。在上图中,我还把我们要介绍的第三个字段serializerContext也圈起来了。

    ③serializerContext:这个字段是Context类型。在flume中,Context内部存了一个HashMap,保存的都是键对。所以我们可以把Context简单的理解为存储键值信息的一个类。为什么叫serializerContext呢,因为通过反射实例化的serializer对象也需要从配置文件中获取一些配置信息。所以这个对象相当于就是跟serializer捆绑在一起的。通过上图的putAll函数把配置信息存进来。其中用到了context的getSubProperties()方法。这个方法我举个例子说明。

    比如flume的配置文件中有如下配置:

    agent1.sinks.hbaseSink.serializer =ClassA
    agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid
    

    那么getSubProperties("serializer")的结果就是<"payloadColumn", "datatime,userid">

    到了这,我们最关注的的三个字段讲完了。

    1.2. AsyncHBaseSink的 process()过程

    这个方法就是真正向HBase中插入数据的方法:

    首先看上图第一个红框,flume收到的event传给serializer去序列化,然后再通过getActions获得最终要传输的封装好的数据。和我们上一节介绍字段时描述的一样。

    第二个红框就是把一个一个的封装好的数据PutRequest对象写入到HBase了。

    1.3 AsyncHbaseEventSerializer的默认实现SimpleAsyncHbaseEventSerializer

    上一节中我们说过,如果不在配置文件中指定serializer,那么默认就由SimpleAsyncHbaseEventSerializer这个实现类来处理event的序列化。我们最终要自定义的Sink,也是仿照这个类去写的。所以我们这节看一下这个类。

    这是这个类的一些字段。看到有payload和payloadColumn,这是干什么的呢? 不知道没关系,我先告诉大家,下面还会有分析。 payload就是列的值,payloadColumn就是列名。

    为什么这么说呢?因为在构造向HBase中写数据的请求时,new了一个PutRequest的实例,并把payload和payloadColumn当做参数传入。我们直接看PutRequest类的构造方法参数是什么意思就知道了。如下:

    豁然开朗。

    二、实现符合自己需求的自定义AsynHBaseSink

    2.1 为什么要进行二次开发?

    首先明确一个问题,为什么我们要自定义AsynHBaseSink,而不是直接使用flume自带的。肯定是需求不满足我们的要求啊。怎么个不满足法儿呢?且听我娓娓道来。

    在SimpleAsyncHbaseEventSerializer类的configure方法中,会为rowPrefix成员变量赋一个默认值(当我们没有在配置文件中配置rowPrefix的时候)。默认是字符串"default"。

    然后在构造PutRequest的时候,会使用这个rowPrefix去生成rowkey。

    getTimestampKey为例。返回的是 rowPrefix + 当前时间戳。

    在实际生产中,rowkey的设计好坏可以极大的影响HBase的查询性能。所以,我们一般都是自己设计rowkey,而不会使用flume自带的这个simple,simple,simple的SimpleRowKeyGenerator。所以这才是我们需要二次开发的理由。

    2.2 如何二次开发?

    ①:写一个符合我们自己的rowkey生成函数。

    ②:getActions方法中,根据我们自己的需求去构造PutRequest。


    2.3 打成jar包放到flume的lib文件夹下并测试程序。

    启动flume成功


    相关文章

      网友评论

          本文标题:Flume自定义AsynHBaseSink

          本文链接:https://www.haomeiwen.com/subject/doxgwqtx.html