美文网首页
flume 中ETL拦截器、日志类型区分拦截器的实现方法

flume 中ETL拦截器、日志类型区分拦截器的实现方法

作者: 大道至简_6a43 | 来源:发表于2019-11-29 17:06 被阅读0次

    package ****.****.flume.interceptor;

    import org.apache.flume.Context;

    import org.apache.flume.Event;

    import org.apache.flume.interceptor.Interceptor;

    import java.nio.charset.Charset;

    import java.util.ArrayList;

    import java.util.List;

    import java.util.Map;

    public class LogTypeInterceptorimplements Interceptor {//实现该类实现其方法

    @Override

        public void initialize() {

    }

    @Override

        public Event intercept(Event event) {//处理单个event

    byte[] body = event.getBody();

    String log =new String(body, Charset.forName("UTF-8"));

    Map headers = event.getHeaders();

    if(log.contains("start")){

    headers.put("topic","topic_start");

    }else {

    headers.put("topic","topic_event");

    }

    return event;

    }

    @Override

        public List intercept(List events) {//遍历多个event数组,调用上面的Interceptor方法

    ArrayList interceptors =new ArrayList<>();

    for (Event event : events) {

    Event intercept1 = intercept(event);

    interceptors.add(event);

    }

    return interceptors;

    }

    @Override

        public void close() {

    }

    public static class Builderimplements Interceptor.Builder{//创建一个静态类 返回一个LogTypeInterceptor对象

    @Override

            public Interceptor build() {

    return new LogTypeInterceptor();

    }

    @Override

            public void configure(Context context) {

    }

    }

    }

    -----------------------------------------------------------------------------------------------

    package ***.******.flume.interceptor;

    import org.apache.flume.Context;

    import org.apache.flume.Event;

    import org.apache.flume.interceptor.Interceptor;

    import java.nio.charset.Charset;

    import java.util.ArrayList;

    import java.util.List;

    public class LogETLInterceptorimplements Interceptor{

    @Override

        public void initialize() {

    }

    @Override

        public Event intercept(Event event) {

    //获取数据

            byte[] body = event.getBody();

    String log =new String(body, Charset.forName("UTF-8"));

    //校验  启动日志(json)  事件日志(服务器时间|json)

            if(log.contains("start")){

    //校验启动日志

                if (LogUtils.vaLuateStart(log)){

    return event;

    }

    }else{

    if(LogUtils.vaLuateEvent(log)){

    return event;

    }

    }

    return null;

    }

    @Override

        public List intercept(List events) {

    ArrayList interceptors =new ArrayList<>();

    for (Event event : events) {

    Event intercept1 = intercept(event);

    if(intercept1!=null){

    interceptors.add(intercept1);

    }

    }

    return interceptors;

    }

    @Override

        public void close() {

    }

    public static class Builderimplements Interceptor.Builder{

    @Override

            public Interceptor build() {

    return new LogETLInterceptor();

    }

    @Override

            public void configure(Context context) {

    }

    }

    }

    --------------------------------------------------------------------------------------------------

    package ***.******.flume.interceptor;

    import org.apache.commons.lang.math.NumberUtils;

    public class LogUtils {

    //该类的功能  主要是判断日志格式是否完整 是否符合对应日志应有的格式  从而进行过滤

        public static boolean vaLuateStart(String log) {

    // json

            if(log==null){

    return false;

    }

    if(!log.trim().startsWith("{") || !log.trim().endsWith("}")){

    return false;

    }

    return true;

    }

    public static boolean vaLuateEvent(String log) {

    // 服务器时间 | json

            if(log==null){

    return false;

    }

    String[] logContents = log.split("\\|");

    if(logContents.length!=2){

    return false;

    }

    if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){

    return false;

    }

    if(!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){

    return false;

    }

    return true;

    }

    }

    相关文章

      网友评论

          本文标题:flume 中ETL拦截器、日志类型区分拦截器的实现方法

          本文链接:https://www.haomeiwen.com/subject/oukgwctx.html