Flume Configuration Examples: Common Configurations

Author: 水他 | Published: 2016-08-26 11:01
  1. Common sources
    1.1 netcat
    1.2 Avro Source
    1.3 Exec Source
    1.4 spool Source
    1.5 HTTP source
  2. Common sinks
    2.1 HDFS Sink
    2.2 Avro Sink
  3. Channel Selector
    3.1 Replicating Channel Selector
    3.2 Multiplexing Channel Selector
  4. Sink Processor
    4.1 Failover Sink Processor
    4.2 Load balancing Sink Processor
  5. Interceptor
    5.1 Timestamp Interceptor
    5.2 static Interceptor

Common sources

netcat

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinks.s1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1

start

flume-ng agent -n a1 -c conf -f xxx.conf

test

telnet 127.0.0.1 44444
hello world!

Avro Source

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.137.2
a1.sources.r1.port = 44444

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

test

Create a file, then send it to the source with avro-client. The host must be the address the source is bound to:
echo "aaa" > ~/bbb
flume-ng avro-client -c . -H 192.168.137.2 -p 44444 -F ~/bbb

Exec Source

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f aaa

a1.sources.r1.channels = c1

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

test

echo "exec test" >> aaa

spool Source

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = aaa
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
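
The spooling directory source expects files to be complete and unchanged once they appear in spoolDir, and it reports an error if a file name is reused. After a file has been ingested it is renamed with a .COMPLETED suffix by default. One way to feed the directory safely is to write the file somewhere else first and then move it in with a single rename. The sketch below assumes that approach. The helper name drop_into_spool and the staging directory are illustrative and not part of Flume.

```python
import os
import tempfile


def drop_into_spool(spool_dir, name, text, staging_dir):
    """Write text to a staging file, then move it into spool_dir in one rename.

    staging_dir is assumed to be on the same filesystem as spool_dir
    (so the rename is a single step) and outside spool_dir itself.
    """
    os.makedirs(spool_dir, exist_ok=True)
    os.makedirs(staging_dir, exist_ok=True)
    target = os.path.join(spool_dir, name)
    if os.path.exists(target):
        # Reusing a file name inside the spooling directory is an error for Flume.
        raise FileExistsError(target)
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(text)
    os.replace(tmp_path, target)  # the finished file appears in spool_dir all at once
    return target


# Example call (paths are placeholders matching spoolDir = aaa):
# drop_into_spool("aaa", "test-001.log", "spool test\n", "aaa-staging")
```

With the agent running, each line of the dropped file should show up as one event in the logger sink output.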

HTTP source

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

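# http is the alias for org.apache.flume.source.http.HTTPSource
# (properties files do not support trailing comments, so the note goes on its own line)
a1.sources.r1.type = http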
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost:44444
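
The request body in the curl test is a JSON array in which each element carries a "headers" object and a "body" string, which is the format the HTTP source's default JSON handler reads. A minimal Python sketch of the same request is shown here. The function names build_payload and post_events are made up for this example, and the URL assumes the agent above is listening on localhost:44444.

```python
import json
import urllib.request


def build_payload(events):
    """Serialize (headers, body) pairs into the JSON array used in the curl test."""
    return json.dumps([{"headers": dict(h), "body": b} for h, b in events])


def post_events(url, payload):
    """POST the payload to the HTTP source and return the HTTP status code."""
    req = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status


payload = build_payload([({"aaa": "bbb", "ccc": "ddd"}, "xxx")])
# Uncomment with the agent running:
# post_events("http://localhost:44444", payload)
```

Several events can be sent in one request by passing more pairs to build_payload.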

Common sinks

HDFS Sink

config

a1.sources = r1
a1.sinks = s1

a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.port = 44444

a1.sources.r1.channels = c1

a1.sinks.s1.type = hdfs
a1.sinks.s1.channel = c1
a1.sinks.s1.hdfs.path = hdfs://master:9000/testtttt
a1.sinks.s1.hdfs.filePrefix = Syslog
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundValue = 10
a1.sinks.s1.hdfs.roundUnit = minute

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost:44444
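
The hdfs.round, hdfs.roundValue and hdfs.roundUnit settings round the event timestamp down before it is substituted into time escape sequences in hdfs.path. The path in the config above contains no escape sequences, so the rounding has no visible effect there. It would matter for a path that ends in something like %H%M, and such escapes also need a timestamp header on the event (or hdfs.useLocalTimeStamp = true). The sketch below only illustrates the rounding arithmetic for roundValue = 10 and roundUnit = minute. The function round_down is an illustration, not Flume's code.

```python
from datetime import datetime


def round_down(ts, round_value=10, round_unit="minute"):
    """Round a timestamp down to a multiple of round_value in the given unit."""
    if round_unit == "second":
        return ts.replace(second=ts.second - ts.second % round_value, microsecond=0)
    if round_unit == "minute":
        return ts.replace(minute=ts.minute - ts.minute % round_value,
                          second=0, microsecond=0)
    if round_unit == "hour":
        return ts.replace(hour=ts.hour - ts.hour % round_value,
                          minute=0, second=0, microsecond=0)
    raise ValueError("unsupported unit: " + round_unit)


# An event stamped 11:07:30 would land in the 11:00 bucket.
bucket = round_down(datetime(2016, 8, 26, 11, 7, 30))
directory_suffix = bucket.strftime("%H%M")
```

With a 10-minute bucket, all events from 11:00:00 through 11:09:59 resolve to the same directory.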

Avro Sink

config (first configuration)

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.137.2
a1.sources.r1.port = 44445

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

config (second configuration)

a2.sources = r1
a2.sinks = s1
a2.channels = c1

a2.sources.r1.type = http
a2.sources.r1.port = 44444
a2.sources.r1.channels = c1

a2.sinks.s1.type = avro
a2.sinks.s1.channel = c1
a2.sinks.s1.hostname = 192.168.137.2
a2.sinks.s1.port = 44445

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost:44444

Channel Selector

Replicating Channel Selector

config (first configuration)

a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1 c2

a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445

a1.sinks.s2.type = avro
a1.sinks.s2.channel = c2
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

config (second configuration)

a2.sources = r1
a2.sinks = s1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445

a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

config (third configuration)

a3.sources = r1
a3.sinks = s1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446

a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost:44444

Multiplexing Channel Selector

config (first configuration)

a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1 c2

a1.sources.r1.type = http
a1.sources.r1.port = 44444
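# Note: the HTTP source's documented listen-address property is bind (default 0.0.0.0);
# host is not one of its properties and appears to be ignored.
a1.sources.r1.host = 192.168.137.2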
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.header = aaa
a1.sources.r1.selector.mapping.bbb = c1
a1.sources.r1.selector.mapping.bbb1 = c2
a1.sources.r1.selector.default = c1

a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445

a1.sinks.s2.type = avro
a1.sinks.s2.channel = c2
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

config (second configuration)

a2.sources = r1
a2.sinks = s1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445

a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

config (third configuration)

a3.sources = r1
a3.sinks = s1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446

a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost:44444

curl -X POST -d '[{ "headers" :{"aaa" : "bbb1","ccc" : "ddd"},"body" : "xxx"}]' http://localhost:44444

curl -X POST -d '[{ "headers" :{"aaa" : "bbb2","ccc" : "ddd"},"body" : "xxx"}]' http://localhost:44444
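
The three requests differ only in the value of the aaa header, which is the header the multiplexing selector inspects. The routing they exercise can be sketched in a few lines of Python. This is a simplified model of the selector settings in the first configuration, not Flume's implementation, and the function name select_channels is made up for the example.

```python
def select_channels(headers, header="aaa", mapping=None, default=("c1",)):
    """Return the channels an event is routed to, based on one header value."""
    if mapping is None:
        # Mirrors selector.mapping.bbb = c1 and selector.mapping.bbb1 = c2
        mapping = {"bbb": ("c1",), "bbb1": ("c2",)}
    return list(mapping.get(headers.get(header), default))


first = select_channels({"aaa": "bbb", "ccc": "ddd"})    # mapped value
second = select_channels({"aaa": "bbb1", "ccc": "ddd"})  # mapped value
third = select_channels({"aaa": "bbb2", "ccc": "ddd"})   # unmapped, falls back to default
```

So the first and third events should appear in the logger output of agent a2 (port 44445), and the second in the output of agent a3 (port 44446).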

Sink Processor

Failover Sink Processor

config (first configuration)

a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = s1 s2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.s1 = 5
a1.sinkgroups.g1.processor.priority.s2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host = 192.168.137.2
a1.sources.r1.channels = c1

# Both sinks in the failover group drain the same channel. With a separate
# channel per sink, the channel of the standby sink would never be drained
# and would fill up.
a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445

a1.sinks.s2.type = avro
a1.sinks.s2.channel = c1
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

config (second configuration)

a2.sources = r1
a2.sinks = s1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445

a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

config (third configuration)

a3.sources = r1
a3.sinks = s1

a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446

a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost:44444
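
In a failover group the available sink with the highest priority value is the active one, so with priority.s1 = 5 and priority.s2 = 10 the events should go to s2 (agent a3, port 44446) until it fails. The sketch below models only that selection rule. It is an assumption-laden simplification that leaves out the penalty timing controlled by maxpenalty, and pick_sink is a made-up name.

```python
def pick_sink(priorities, failed=()):
    """Return the live sink with the highest priority, or None if all have failed."""
    live = {name: prio for name, prio in priorities.items() if name not in failed}
    if not live:
        return None
    return max(live, key=live.get)


priorities = {"s1": 5, "s2": 10}
active = pick_sink(priorities)                    # normal operation
fallback = pick_sink(priorities, failed={"s2"})   # after s2 becomes unavailable
```

To see the failover in practice, stop agent a3 and repeat the curl request; the event should then appear in the output of agent a2.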

Load balancing Sink Processor

config (first configuration)

a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = s1 s2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host = 192.168.137.2
a1.sources.r1.channels = c1

a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445

a1.sinks.s2.type = avro
a1.sinks.s2.channel = c1
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

config (second configuration)

a2.sources = r1
a2.sinks = s1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445

a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

config (third configuration)

a3.sources = r1
a3.sinks = s1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446

a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost:44444
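
With processor.selector = round_robin the two sinks take turns, so repeating the curl request should alternate between the outputs of agents a2 and a3. The following sketch models that rotation, with unavailable sinks simply skipped. It is a rough illustration under that assumption, since the real processor works per delivery attempt and applies a timed backoff when processor.backoff = true. The name round_robin is made up for the example.

```python
from itertools import cycle


def round_robin(sinks, attempts, unavailable=()):
    """Return which sink handles each delivery attempt, skipping unavailable sinks."""
    order = cycle(sinks)
    chosen = []
    for _ in range(attempts):
        for _ in range(len(sinks)):
            sink = next(order)
            if sink not in unavailable:
                chosen.append(sink)
                break
        else:
            raise RuntimeError("no sink available")
    return chosen


normal = round_robin(["s1", "s2"], 4)
degraded = round_robin(["s1", "s2"], 3, unavailable={"s2"})
```

In the degraded case every attempt ends up on s1, which corresponds to stopping agent a3 while the test requests continue.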

Interceptor

Timestamp Interceptor

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host = 192.168.137.2
a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost:44444
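
The timestamp interceptor adds a header named timestamp holding the processing time in milliseconds since the epoch, so the logger output for this request should show that header next to aaa and ccc. A small Python model of the effect on the headers is given here. The function add_timestamp and its now_ms argument are illustrative only; preserve_existing mirrors the interceptor's preserveExisting option, which defaults to false.

```python
import time


def add_timestamp(headers, now_ms=None, preserve_existing=False):
    """Return a copy of headers with a 'timestamp' header in epoch milliseconds."""
    out = dict(headers)
    if preserve_existing and "timestamp" in out:
        return out  # keep the value the event already carries
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    out["timestamp"] = str(now_ms)  # Flume header values are strings
    return out


stamped = add_timestamp({"aaa": "bbb", "ccc": "ddd"}, now_ms=1472180460000)
```

A sink that uses time escape sequences, such as the HDFS sink, can then read this header.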

static Interceptor

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host = 192.168.137.2
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = eee
a1.sources.r1.interceptors.i1.value = fff

a1.sinks.s1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.s1.channel = c1

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost:44444
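
The static interceptor attaches one fixed header to every event, here eee = fff, so the logger output should list three headers for this request. The sketch below models that behavior on a plain dict. The function add_static is a made-up name, and preserve_existing mirrors the interceptor's preserveExisting option, which defaults to true.

```python
def add_static(headers, key="eee", value="fff", preserve_existing=True):
    """Return a copy of headers with the static key/value added."""
    out = dict(headers)
    if not (preserve_existing and key in out):
        out[key] = value
    return out


tagged = add_static({"aaa": "bbb", "ccc": "ddd"})
kept = add_static({"eee": "old"})  # existing value wins by default
```

Sending an event that already carries an eee header is a quick way to check the preserveExisting behavior against a running agent.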
