美文网首页
自定义flume拦截器-练习1

自定义flume拦截器-练习1

作者: 夜希辰 | 来源:发表于2021-11-27 16:51 被阅读0次

    参考文章1:Flume 自定义 Interceptor(拦截器)
    参考文章2:java静态内部类和非静态内部类对外部类属性的使用
    问题1:flume自定义拦截器时,为什么要分单event处理,和多个event处理
    问题2:静态内部类,创建外部类对象并访问外部类对象
    问题3:avro source 、avro sink 定义
    问题4:avro source 、avro sink 采集通道还有问题,数据传输不过去

    一、flume拦截器介绍

    拦截器是简单的插件式组件,设置在source和channel之间。source接收到的事件event,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。可以自定义拦截器。

    本篇文章主要讲解自定义连接器。flume内置连接器,可参考该文章

    二、自定义连接器

    需求:在bigdata02机器上,监听44444端口。将包含hello的数据发送到bigdata03机器控制台,将不包含hello的数据发送到bigdata03机器控制台。

    步骤:
    1、自定义flume拦截器
    2、在bigdata02、bigdata03、bigdata04服务器上编写conf配置文件。flume2.conf 、flume3.conf、flume4.conf
    3、测试

    1、自定义flume拦截器

    1)引入 pom 依赖
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.ydm</groupId>
        <artifactId>flumeinterceptor1127</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <!--包名,一般是域名的反写-->
                <groupId>org.apache.flume</groupId>
                <!--项目名-->
                <artifactId>flume-ng-core</artifactId>
                <!--所需要的jar的版本-->
                <version>1.7.0</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    
    
    </project>
    
    2)编写拦截器类
    package com.atguigu.interceptor;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * 定义flume连接器
     * 1、实现org.apache.flume.interceptor.Interceptor 接口,需要重写接口中的方法
     *  接口比如电脑的接口,插座的接口,虽然是不同厂家生产的,但是我们都可以用。
     *  接口是一个公共的规范,只要符合规范,大家都可以使用,java中的接口更多体现在对行为的抽象。
     */
    public class TypeInterceptor implements Interceptor {
        //声明一个存放事件的集合
        private List<Event> addHeaderEvents;
    
        @Override
        public void initialize() {
            //初始化
            addHeaderEvents = new ArrayList<>();
    
        }
        //单个事件拦截
        @Override
        public Event intercept(Event event) {
            //1.获取事件中的投信息
            Map<String, String> headers = event.getHeaders();
    
            //2.获取事件中的body 信息
    //        event.getBody(); 返回字节数组,需要将数组转化为字符串
            String body = new String(event.getBody());
    
            //3.根据body中是否含有hello单词来决定添加怎样的头信息
            if(body.contains("hello")){
                //4.添加头信息
                headers.put("type","hello");
            }else {
                //4.添加头信息
                headers.put("type","unhello");
            }
            return  event;
        }
        //批量事件拦截
        @Override
        public List<Event> intercept(List<Event> events) {
            //1.清空集合
            addHeaderEvents.clear();
    
            //2.遍历events
            for(Event event: events){
                //3.给每一个事件添加头信息
                addHeaderEvents.add(intercept(event));
    
            }
            //4.返回结果
            return addHeaderEvents;
        }
    
        @Override
        public void close() {
    
        }
    
        /**
         * 定义一个静态内部类
         * 内部类:就是在一个类中定义一个类。举例:在一个类A的内部定义一个类B,类B就称为内部类
         * EG:生活中,在笔记本内部有CPU,笔记本可以看成外部类,CPU可以看成内部类
         *
         * 静态内部类
         * 1.只可以访问外部类的静态属性,包括静态私有属性
         * 2.不可以访问外部类的非静态属性,包括私有属性。但可以通过new 外部类().成员的方式访问
         *
         * 使用内部类最吸引人的原因是:每个内部类都能独立地继承一个(接口的)实现
         *
         */
        public static class  Builder implements Interceptor.Builder{
    
            @Override
            public Interceptor build() {
                return new TypeInterceptor();
            }
            //配置信息
            @Override
            public void configure(Context context) {
    
            }
        }
    }
    

    2、在bigdata02、bigdata03、bigdata04服务器上编写conf配置文件

    1) 在bigdata02服务器上,cd /usr/flume/conf 在conf目录下新建flume2.conf文件
    #name
    a2.sources = r1
    a2.channels = c1 c2
    a2.sinks = k1 k2
    
    #source
    a2.sources.r1.type = netcat
    a2.sources.r1.bind = localhost
    a2.sources.r1.port = 44444
    
    #Interceptor
    a1.sources.r1.interceptors = i1
    #全类名
    a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TypeInterceptor$Builder
    
    #channel selector
    a2.sources.r1.selector.type = multiplexing
    a2.sources.r1.selector.header = type
    a2.sources.r1.selector.mapping.hello = c1
    a2.sources.r1.selector.mapping.unhello = c2
    
    #channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    #sink
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = hadoop103
    a2.sinks.k1.port = 4142
    
    a2.sinks.k2.type = avro
    a2.sinks.k2.hostname = hadoop104
    a2.sinks.k2.port = 4142
    
    #bind
    a2.sources.r1.channels = c1 c2
    a2.sources.k1.channel = c1
    a2.sources.k2.channel = c2
    
    2) 在bigdata03服务器上,cd /usr/flume/conf 在conf目录下新建flume3.conf文件
    #name
    a3.sources = r1
    a3.channels = c1
    a3.sinks = k1
    
    #source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = bigdata03
    a3.sources.r1.prot = 4142
    
    #channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    #sink
    a3.sinks.k1.type = logger
    
    #Bind
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    
    3) 在bigdata03服务器上,cd /usr/flume/conf 在conf目录下新建flume4.conf文件
    #name
    a4.sources = r1
    a4.channels = c1
    a4.sinks = k1
    
    #source
    a4.sources.r1.type = avro
    a4.sources.r1.bind = bigdata04
    a4.sources.r1.prot = 4142
    
    #channel
    a4.channels.c1.type = memory
    a4.channels.c1.capacity = 1000
    a4.channels.c1.transactionCapacity = 100
    
    #sink
    a4.sinks.k1.type = logger
    
    #Bind
    a4.sources.r1.channels = c1
    a4.sinks.k1.channel = c1
    

    3、测试

    flume3 和 flume4 需要先启动,flume2 需要连接flume3 和 flume4,若先启动 flume2 会报连接不上(也可以无视错误日志,先启动)

    cd /opt/apache-flume-1.7.0-bin 
    
    bin/flume-ng agent --conf conf/ --name a3 --conf-file /tmp/flume-job/interceptor/flume3 -Dflume.root.logger=INFO,console
    bin/flume-ng agent --conf conf/ --name a2 --conf-file /tmp/flume-job/interceptor/flume2 -Dflume.root.logger=INFO,console
    bin/flume-ng agent --conf conf/ --name a1 --conf-file /tmp/flume-job/interceptor/flume1 -Dflume.root.logger=INFO,console
    

    先启动bigdata03\bigdata04 的配置文件

    相关文章

      网友评论

          本文标题:自定义flume拦截器-练习1

          本文链接:https://www.haomeiwen.com/subject/zazfxrtx.html