在进行项目学习的时候,有个需求是将日志数据经过flume收集,然后sink到HBase中。
经过查看官方文档,发现flume内置支持两种HBasesink。如下图所示:

其中HBaseSink和HBase2Sink几乎完全一样,只是针对的版本不同而已。
AsyncHBaseSink使用了 Asynchbase API来写HBase,这个API具有完全异步的,非阻塞的,线程安全的,高性能的特性。同时,万一有些events写入HBase失败,那么此sink会replay那个事务中的所有events。
一、flume内置的AsyncHBaseSink分析
在flume官网上下载一下src包,并导入IDEA中。我的版本是1.7.0。目录结构如下:

我们要关注的就是箭头指向的类。点进这个文件中。
1.1 AsyncHBaseSink类字段
首先看一下这个类的一些字段,我的截图不全,因为没必要完全把每个字段都弄明白。That is to say ,可以但没必要=!=

我们可以看到这个类继承了AbstractSink
这个抽象类,并实现了Configurable
接口。
还可以看到一些顾名思义的字段,比如:要写入的HBase表名,列族等等。
现在我们要关注的是这三个字段:
private AsyncHbaseEventSerializer serializer; //一会讲
private String eventSerializerType; //一会讲
private Context serializerContext; //一会讲
首先先给出这三个字段是做什么的。
①serializer:它的类型是AsyncHbaseEventSerializer
接口,这个接口定义了如下方法:

这个接口的作用就是让子类自己自由实现封装向HBase发送数据的请求。也就是getActions()
方法。然后在AsyncHBaseSink的process方法中调用serializer.getActions()就可以得到这些请求,然后写到HBase中。
②eventSerializerType: 这个就是用来从flume的配置文件中读取serializer参数。以确定使用哪个AsyncHbaseEventSerializer具体的实现类

可以看到如果不配置的话,默认就是使用这个org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
类。

然后通过反射,把serializer实例化成eventSerializerType所表示的类。如上图所示。在上图中,我还把我们要介绍的第三个字段serializerContext
也圈起来了。
③serializerContext:这个字段是Context类型。在flume中,Context内部存了一个HashMap,保存的都是键对。所以我们可以把Context简单的理解为存储键值信息的一个类。为什么叫serializerContext呢,因为通过反射实例化的serializer对象也需要从配置文件中获取一些配置信息。所以这个对象相当于就是跟serializer捆绑在一起的。通过上图的putAll函数把配置信息存进来。其中用到了context的getSubProperties()方法。这个方法我举个例子说明。
比如flume的配置文件中有如下配置:
agent1.sinks.hbaseSink.serializer =ClassA
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid
那么getSubProperties("serializer")
的结果就是<"payloadColumn", "datatime,userid">
到了这,我们最关注的的三个字段讲完了。
1.2. AsyncHBaseSink的 process()过程
这个方法就是真正向HBase中插入数据的方法:

首先看上图第一个红框,flume收到的event传给serializer去序列化,然后再通过getActions获得最终要传输的封装好的数据。和我们上一节介绍字段时描述的一样。
第二个红框就是把一个一个的封装好的数据PutRequest对象写入到HBase了。
1.3 AsyncHbaseEventSerializer的默认实现SimpleAsyncHbaseEventSerializer
上一节中我们说过,如果不在配置文件中指定serializer,那么默认就由SimpleAsyncHbaseEventSerializer这个实现类来处理event的序列化。我们最终要自定义的Sink,也是仿照这个类去写的。所以我们这节看一下这个类。

这是这个类的一些字段。看到有payload和payloadColumn,这是干什么的呢? 不知道没关系,我先告诉大家,下面还会有分析。 payload就是列的值,payloadColumn就是列名。
为什么这么说呢?因为在构造向HBase中写数据的请求时,new了一个PutRequest的实例,并把payload和payloadColumn当做参数传入。我们直接看PutRequest类的构造方法参数是什么意思就知道了。如下:

豁然开朗。
二、实现符合自己需求的自定义AsynHBaseSink
2.1 为什么要进行二次开发?
首先明确一个问题,为什么我们要自定义AsynHBaseSink,而不是直接使用flume自带的。肯定是需求不满足我们的要求啊。怎么个不满足法儿呢?且听我娓娓道来。

在SimpleAsyncHbaseEventSerializer类的configure方法中,会为rowPrefix成员变量赋一个默认值(当我们没有在配置文件中配置rowPrefix的时候)。默认是字符串"default"。
然后在构造PutRequest的时候,会使用这个rowPrefix去生成rowkey。

以getTimestampKey
为例。返回的是 rowPrefix + 当前时间戳。

在实际生产中,rowkey的设计好坏可以极大的影响HBase的查询性能。所以,我们一般都是自己设计rowkey,而不会使用flume自带的这个simple,simple,simple的SimpleRowKeyGenerator
。所以这才是我们需要二次开发的理由。
2.2 如何二次开发?
①:写一个符合我们自己的rowkey生成函数。

②:getActions方法中,根据我们自己的需求去构造PutRequest。

2.3 打成jar包放到flume的lib文件夹下并测试程序。


启动flume成功

网友评论