美文网首页
大数据之flume source、interceptor、sink

大数据之flume source、interceptor、sink

作者: 枫叶无言_1997 | 来源:发表于2021-01-06 14:07 被阅读0次

    依赖:

    <groupId>org.apache.flume</groupId>

    <artifactId>flume-ng-core</artifactId>

    <version>1.9.0</version>

    一、interceptor自定义 
    public class MyInterceptorimplements Interceptor {

    public void initialize() {

    }

    public Eventintercept(Event event) {

    byte[] body = event.getBody();

          if ((body[0] >='A' && body[0] <='Z') || (body[0] >='a' && body[0] <='z')) {

    event.getHeaders().put("type","letter");

          }else if (body[0] >='0' && body[0] <='9') {

    event.getHeaders().put("type","number");

          }

    return event;

        }

    public Listintercept(List list) {

    for (Event e : list

    ) {

    intercept(e);

            }

    return list;

        }

    public void close() {

    }

    /*

    静态内部类

    */

        public static class MyBuilderimplements Interceptor.Builder {

    public Interceptorbuild() {

    return new MyInterceptor();

            }

    public void configure(Context context) {

    }

    }

    二、source自定义 
    /*

    自定义source

    使用flume接收数据,并给每条数据添加前缀,输出控制台,前缀可从flume配置

    */

    public class MySourceextends AbstractSourceimplements Configurable, PollableSource {

    private StringpreString;

        public Statusprocess()throws EventDeliveryException {

    SimpleEvent event =new SimpleEvent();

            //给event设置数据

            event.setBody((preString+"hello").getBytes());

            //将数据放入channel中

            ChannelProcessor processor = getChannelProcessor();

            processor.processEvent(event);

    return null;

        }

    public long getBackOffSleepIncrement() {

    return 0;

        }

    public long getMaxBackOffSleepInterval() {

    return 0;

        }

    public void configure(Context context) {

    //初始化context,从配置文件读取数据

          preString = context.getString("prefix","test");

        }

    }

    三、sink自定义 
    public class MySinkextends AbstractSinkimplements Configurable {

    private Stringsuffix;

        Loggerlogger = LoggerFactory.getLogger(MySink.class);

        public Statusprocess()throws EventDeliveryException {

    Status status = Status.READY;

            Channel channel = getChannel();

            Transaction transaction = channel.getTransaction();

            try {

    transaction.begin();

                Event event = channel.take();

                byte[] body = event.getBody();

                logger.info(Arrays.toString(body) +suffix);

                //事务提交

                transaction.commit();

            }catch (Exception e) {

    status = Status.BACKOFF;

                //事务回滚

                transaction.rollback();

            }finally {

    //事务关闭

                transaction.close();

            }

    return status;

        }

    public void configure(Context context) {

    suffix = context.getString("suffix","test");

        }

    }

    相关文章

      网友评论

          本文标题:大数据之flume source、interceptor、sink

          本文链接:https://www.haomeiwen.com/subject/fmvnoktx.html