美文网首页
flume安装与使用

flume安装与使用

作者: 紫玥迩 | 来源:发表于2016-07-06 11:55 被阅读301次

    简介

    安装

    java开发

    依赖

    <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.5.2</version>
    </dependency>
    <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.5.2</version>
    </dependency>
    

    说明

    192.168.1.100 是 Hadoop 主服务器
    192.168.60.8 是安装并运行 Flume 的服务器
    

    配置文件
    example.conf(192.168.60.8)

    # Components of the agent named "agent1": one channel (c1), one source (r1), one sink (sink1).
    agent1.channels = c1
    agent1.sources = r1
    agent1.sinks = sink1
    
    # Channel c1 uses the "memory" channel type.
    agent1.channels.c1.type = memory
    
    # Source r1: an Avro source bound to 0.0.0.0 on port 41414, delivering into channel c1.
    # The Java client in this article connects to this port.
    agent1.sources.r1.channels = c1
    agent1.sources.r1.type = avro
    agent1.sources.r1.bind = 0.0.0.0
    agent1.sources.r1.port = 41414
    
    # Sink sink1: an HDFS sink draining channel c1 into /flume on hdfs://192.168.1.100:50040.
    # NOTE(review): hdfs.rollInterval is presumably in seconds - confirm against the Flume HDFS sink docs.
    agent1.sinks.sink1.type=hdfs
    agent1.sinks.sink1.hdfs.path=hdfs://192.168.1.100:50040/flume
    agent1.sinks.sink1.hdfs.fileType=DataStream
    agent1.sinks.sink1.hdfs.writeFormat=TEXT
    agent1.sinks.sink1.hdfs.rollInterval=4
    agent1.sinks.sink1.channel=c1
    

    运行配置

    ./bin/flume-ng agent -n agent1 -c conf -f example.conf -Dflume.root.logger=DEBUG,console
    

    java程序

    import java.nio.charset.Charset;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    
    /**
     * Demo entry point: sends a fixed greeting ten times to a Flume agent through
     * {@code MyRpcClientFacade}, then releases the client.
     */
    public class MyApp {
      /** Host of the Flume agent to connect to. */
      private static final String AGENT_HOST = "192.168.60.8";

      /** Port of the Flume agent to connect to. */
      private static final int AGENT_PORT = 41414;

      /** Number of events sent by the demo. */
      private static final int EVENT_COUNT = 10;

      public static void main(String[] args) {
        final MyRpcClientFacade facade = new MyRpcClientFacade();
        // Address and port of the machine named in the agent configuration file.
        facade.init(AGENT_HOST, AGENT_PORT);

        final String payload = "Hello Flume!";
        int remaining = EVENT_COUNT;
        while (remaining > 0) {
          facade.sendDataToFlume(payload);
          remaining--;
        }

        facade.cleanUp();
      }
    }
    
        /**
         * Small facade over a Flume {@link RpcClient} that sends string payloads as events.
         *
         * <p>Usage: call {@link #init(String, int)} once, then {@link #sendDataToFlume(String)}
         * any number of times, and finally {@link #cleanUp()}. No synchronization is performed
         * on the internal client.
         */
        class MyRpcClientFacade {
          /** Charset used to encode event bodies; resolved once instead of on every event. */
          private static final Charset BODY_CHARSET = Charset.forName("UTF-8");

          private RpcClient client;
          private String hostname;
          private int port;

          /**
           * Connects to the Flume agent at the given address.
           *
           * <p>If a client is already open (repeated {@code init} call) it is closed first so
           * that the previous connection is not leaked.
           *
           * @param hostname host of the Flume agent
           * @param port port of the Flume agent
           */
          public void init(String hostname, int port) {
            closeClient();
            this.hostname = hostname;
            this.port = port;
            this.client = RpcClientFactory.getDefaultInstance(hostname, port);
          }

          /**
           * Sends {@code data} to the agent as a single UTF-8 encoded event.
           *
           * <p>On {@link EventDeliveryException} the failure is reported on standard error, the
           * client is closed and a new one is created. The failed event is not resent.
           *
           * @param data event body
           * @throws IllegalStateException if {@link #init(String, int)} has never been called
           */
          public void sendDataToFlume(String data) {
            if (client == null) {
              if (hostname == null) {
                throw new IllegalStateException(
                    "init(hostname, port) must be called before sendDataToFlume");
              }
              // A previous reconnect attempt failed and left no client; try again now.
              client = RpcClientFactory.getDefaultInstance(hostname, port);
            }

            Event event = EventBuilder.withBody(data, BODY_CHARSET);
            try {
              client.append(event);
            } catch (EventDeliveryException e) {
              // Do not fail silently: report the dropped event before recreating the client.
              System.err.println(
                  "Failed to deliver event to " + hostname + ":" + port + ", reconnecting: " + e);
              closeClient();
              client = RpcClientFactory.getDefaultInstance(hostname, port);
            }
          }

          /**
           * Closes the client, if one is open, and prints the completion message.
           * Safe to call when no client is open.
           */
          public void cleanUp() {
            closeClient();
            System.out.println("程序执行完毕。。。");
          }

          /** Closes the current client if there is one and clears the field. */
          private void closeClient() {
            if (client != null) {
              RpcClient stale = client;
              // Clear the field first so the facade never keeps a reference to a closed client.
              client = null;
              stale.close();
            }
          }

        }
    

    查看结果(192.168.1.100)

    /opt/hadoop/bin/hadoop fs -ls /flume
    
    Paste_Image.png
    /opt/hadoop/bin/hadoop fs -cat /flume/FlumeData.1467803628159
    
    Paste_Image.png

    参考文章

    flume开发详解
    开发者API

    相关文章

      网友评论

          本文标题:flume安装与使用

          本文链接:https://www.haomeiwen.com/subject/yknhjttx.html