前言
前面我们已经说过了flume的简单入门,这篇文章继续深入,来熟悉下source,并通过自定义 source 来了解其工作原理,接下来的一系列文章都会以flume的各个小组件慢慢深入,欢迎和我一起学习
source 到 channel 过程
source-to-channel.png- 上图大致描述了 source 收集到数据推送到 channel 的基本过程,可以发现中间多出了一个 channel processor 的组件
- source 收集到的数据会经过拦截器链进行过滤,然后通过channel selector 发送到对应的 channel,从中我们可以想到,如果你要多数据进行特别的处理,可以自定义拦截器,可以通过selector来控制数据储存的channel的位置。
source 是如何产生数据的
source 分为两大类:PollableSource 和 EventDrivenSource,不过笔者倒是没怎么弄清楚,这两大类区分的目的何在?如果你有什么想法,欢迎留言指教。
- PollableSource
public interface PollableSource extends Source {
public Status process() throws EventDeliveryException;
public static enum Status {READY, BACKOFF}
}
- 当一个agent 启动之后,就会不断循环调用 process 以获取数据
- 当
process
返回 READY,表示数据产生正常,如果是 BACKOFF 则表示异常,当产生异常时候,agent 会等待一段时间再来调用process
,异常次数越多,间隔时间越长,最长不超过 5s。 - 自带一个线程,工作都是在自己的独立线程之内的
- EventDrivenSource
public interface EventDrivenSource extends Source
- 简单的一个标记接口,区分 PollableSource
- 运行流程
- 当一个agent启动时候,会开始执行 application 的
main()
方法 - 进程启动之后,会通过
AbstractConfigurationProvider$getConfiguration
解析配置文件中的各个组件和属性 - 针对 source 会生成 sourceRunner 通过
supervisor
来运行和管理其生命周期。 - source 的生命周期
start
方法正式开始执行,这样也就到了我们将要自定义代码的实现执行了。
这里只是大概说了一下流程,具体详情还是需要自己看源码的,我们的目的就是梳理一下整个流程,知道自己一个大概就好了,深究反而落的下乘,同时也是为了接下来自定义 source 打个基础,知道我们自己写的东西是怎么运行的。
自定义source
- 创建一个类,继承自
AbstractSource
并实现Configurable
和(EventDrivenSource
或者PollableSource
) - 实现相关方法,以下是简单的一个生成序列的source
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.inveno.flume;
import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.ImmutableMap;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SequenceSource extends AbstractSource implements Configurable ,EventDrivenSource {
private static final Logger logger = LoggerFactory
.getLogger(SequenceSource.class);
private long seq;
private int batchSize = 10;
private List<Event> batchArrayList = new ArrayList<>();
@Override
public void configure(Context context) {
//自定义配置属性
batchSize = context.getInteger("batchSize", 1);
//打印自定义属性
ImmutableMap<String, String> map = context.getParameters();
for (String s : map.keySet()) {
logger.warn(s + "==============configure=============================" + map.get(s));
}
}
private void process(){
try {
batchArrayList.add(EventBuilder.withBody(String.valueOf(seq++).getBytes()));
if(batchArrayList.size()>=batchSize){
getChannelProcessor().processEventBatch(batchArrayList);
batchArrayList.clear();
}
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void start() {
super.start();
//开启一个线程来生产数据,当然你也可以整个线程池
new Thread(new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
//这里有个java知识点 ,InterruptedException捕获后,
// 这个标记点会被重置 ,需要再次 interrupt才能正确退出
Thread.currentThread().interrupt();
}
process();
}
}
}).start();
logger.debug("==========================start");
}
@Override
public void stop() {
super.stop();
logger.info("==========================stop", getName());
}
}
- 我们在
configure
方法里面获取配置属性-batchsize。 - 我们上面说过,source 最后开始会被调用start 方法,我们在start 方法里面开启一个线程,实现循环产生消息,并隔 batchsize 个消息就推送到 channel 里面。
- 这样一个简单的生产 source 就完成了
- 如果想实现
PollableSource
类型的 source ,只是不需要自己开启线程罢了,其余都差不多,就是这么简单。
上面我们自定义了一个 source,事件是交给 flume 自带的 ChannelProcessor 自己处理的,下一节,我们来说说 ChannelProcessor 相关细节
写在忘记后
搞了半天忘记写部署自定义代码了。。。抱歉!!!
- 首先将代码打 jar 包
- 放到 FLUME_HOME 目录的 lib 文件夹下
- 以下是配置文件
- example.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.inveno.flume.SequenceSource
a1.sources.r1.batchSize = 5
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
-
启动脚本:bin/flume-ng agent --conf conf --conf-file ./my-conf/example.conf --name a1 -Dflume.root.logger=INFO,console
-
大功告成。。。
网友评论