Flume Study Notes, Part 2 (Simulated Data-Collection Scenarios)

Author: 刘子栋 | Published 2018-08-07 12:24

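    1. Single-Agent Mode

    Scenario:

    The source is netcat (so data can be sent for testing directly with the telnet command), the channel is memory in every scenario, and the sink here is the HDFS sink.

    Configuration (netcat-memory-hdfs.conf):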

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = 0.0.0.0

    a1.sources.r1.port = 33333

    # Describe the sink

    a1.sinks.k1.type = hdfs

    a1.sinks.k1.hdfs.path = hdfs://192.168.205.131:9000/data/%Y%m%d%H%M

    a1.sinks.k1.hdfs.filePrefix = app_name

    a1.sinks.k1.hdfs.fileSuffix = .log

    a1.sinks.k1.hdfs.inUseSuffix = .tmp

    a1.sinks.k1.hdfs.rollInterval = 30

    a1.sinks.k1.hdfs.rollSize = 10485760

    a1.sinks.k1.hdfs.rollCount = 100000

    a1.sinks.k1.hdfs.round = true

    a1.sinks.k1.hdfs.roundValue = 10

    a1.sinks.k1.hdfs.roundUnit = minute

    a1.sinks.k1.hdfs.useLocalTimeStamp = true

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1
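
    The hdfs.round* and hdfs.roll* settings above decide which directory an event lands in and when the open file is closed. The sketch below reproduces that arithmetic in Python as I understand it from the Flume documentation; it is an illustration only, not Flume code. The function names bucket_dir and should_roll and the base path "/data" are assumptions made for this example.

```python
from datetime import datetime

def bucket_dir(ts, round_minutes=10, base="/data"):
    """Directory for an event, with the timestamp rounded DOWN to a
    multiple of `round_minutes` (round=true, roundValue=10, roundUnit=minute)."""
    floored = ts.replace(minute=ts.minute - ts.minute % round_minutes,
                         second=0, microsecond=0)
    # Same pattern as hdfs.path: %Y%m%d%H%M
    return f"{base}/{floored:%Y%m%d%H%M}"

def should_roll(age_seconds, size_bytes, event_count,
                roll_interval=30, roll_size=10485760, roll_count=100000):
    """True when any one roll condition is met; a value of 0 disables that condition."""
    return ((roll_interval > 0 and age_seconds >= roll_interval)
            or (roll_size > 0 and size_bytes >= roll_size)
            or (roll_count > 0 and event_count >= roll_count))
```

    With these settings an event stamped 12:24:30 is written under the 12:20 directory, and in practice the 30-second interval is usually the condition that closes a file first during a manual telnet test.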

    Start:

    flume-ng agent \

    --name a1 \

    --conf $FLUME_HOME/conf \

    --conf-file /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/agents/netcat-memory-hdfs.conf \

    -Dflume.root.logger=INFO,console

    Test:

    telnet localhost 33333 ==> type some test data and press Enter
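
    If telnet is not installed, the same test can be scripted. This is a minimal sketch, assuming the netcat source is left at its default of acknowledging each line with a short reply (normally "OK"); the helper name send_lines is made up for this example.

```python
import socket

def send_lines(host, port, lines, timeout=5.0):
    """Send each line to a netcat source and return the reply read after each one."""
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as conn:
        with conn.makefile("r", encoding="utf-8", newline="\n") as reader:
            for line in lines:
                # The netcat source treats every newline-terminated line as one event.
                conn.sendall((line + "\n").encode("utf-8"))
                replies.append(reader.readline().strip())
    return replies

# Usage against the agent configured above (requires the agent to be running):
#   send_lines("localhost", 33333, ["hello flume", "second event"])
```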

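    2. Multi-Agent Scenario

    Scenario:

    Several agents work in series, with Avro carrying the data between the two agents. The source of foo is netcat, both agents use a memory channel, and the sink of bar is logger (convenient for testing).

    Configuration 1 (netcat-memory-avro.conf):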

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = 0.0.0.0

    a1.sources.r1.port = 33333

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = localhost

    a1.sinks.k1.port = 44444

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    Configuration 2 (avro-memory-logger.conf):

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = avro

    a1.sources.r1.bind = 0.0.0.0

    a1.sources.r1.port = 44444

    a1.sources.r1.ipFilter = true

    a1.sources.r1.ipFilterRules = allow:ip:192.*,allow:name:localhost

    a1.sources.r1.interceptors = i1 i2 i3

    a1.sources.r1.interceptors.i1.type = timestamp

    a1.sources.r1.interceptors.i2.type = host

    a1.sources.r1.interceptors.i3.type = static

    a1.sources.r1.interceptors.i3.key = test_key

    a1.sources.r1.interceptors.i3.value = test_value

    # Describe the sink

    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1
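
    The three interceptors in this configuration only add headers to each event; the body is untouched. The sketch below mimics the result, assuming the documented default header names ("timestamp" and "host") and the default overwrite behaviour as I understand it (timestamp and host replace an existing header, static keeps it). The function apply_interceptors is hypothetical and is not part of Flume.

```python
import socket
import time

def apply_interceptors(headers, now_ms=None, host=None):
    """Return a copy of `headers` with the i1/i2/i3 headers added."""
    out = dict(headers)
    # i1 (timestamp): event time in epoch milliseconds, stored as a string
    out["timestamp"] = str(int(time.time() * 1000) if now_ms is None else now_ms)
    # i2 (host): address of the agent that handled the event
    out["host"] = host if host is not None else socket.gethostbyname(socket.gethostname())
    # i3 (static): fixed key/value pair; an existing value is preserved
    out.setdefault("test_key", "test_value")
    return out
```

    The logger sink prints these headers next to the event body, which makes it easy to confirm that the interceptor chain ran.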

    Start (the same order applies to the other scenarios: start the consumer first, then the producer):

    Start the consumer first (the avro-memory-logger agent, bar):

    flume-ng agent \

    --name a1 \

    --conf $FLUME_HOME/conf \

    --conf-file /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/agents/avro-memory-logger.conf \

    -Dflume.root.logger=INFO,console

    Then start the producer (the netcat-memory-avro agent, foo):

    flume-ng agent \

    --name a1 \

    --conf $FLUME_HOME/conf \

    --conf-file /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/agents/netcat-memory-avro.conf \

    -Dflume.root.logger=INFO,console
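
    The consumer goes first because the producer's Avro sink needs a listening Avro source to connect to. When the two start commands are scripted, a small port check can hold back the producer until the consumer is ready. This is a sketch only; wait_for_port is a name invented for this example, and the timeout values are arbitrary.

```python
import socket
import time

def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Poll until a TCP connection to host:port succeeds; return False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)

# Usage: start bar, then call wait_for_port("localhost", 44444) before starting foo.
```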

    Test:

    telnet localhost 33333 ==> type some test data and press Enter

    3. Multiple Agents (Many-to-One)

    Scenario:

    For ease of testing, this uses a 2-to-1 layout: Agent1 is netcat-memory-avro, Agent2 is avro-memory-avro, and Agent4 is avro-memory-logger.

    Agent1 configuration:

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = 0.0.0.0

    a1.sources.r1.port = 33333

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = localhost

    a1.sinks.k1.port = 44444

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    Agent2 configuration:

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = avro

    a1.sources.r1.bind = 0.0.0.0

    # Agent2 listens on its own port (22222 assumed here); 44444 belongs to Agent4 on the same host
    a1.sources.r1.port = 22222

    a1.sources.r1.ipFilter = true

    a1.sources.r1.ipFilterRules = allow:ip:192.*,allow:name:localhost

    a1.sources.r1.interceptors = i1 i2 i3

    a1.sources.r1.interceptors.i1.type = timestamp

    a1.sources.r1.interceptors.i2.type = host

    a1.sources.r1.interceptors.i3.type = static

    a1.sources.r1.interceptors.i3.key = test_key

    a1.sources.r1.interceptors.i3.value = test_value

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = localhost

    # Forward to Agent4's Avro source, the same target Agent1 uses
    a1.sinks.k1.port = 44444

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    Agent4 configuration:

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = avro

    a1.sources.r1.bind = 0.0.0.0

    a1.sources.r1.port = 44444

    a1.sources.r1.ipFilter = true

    a1.sources.r1.ipFilterRules = allow:ip:192.*,allow:name:localhost

    a1.sources.r1.interceptors = i1 i2 i3

    a1.sources.r1.interceptors.i1.type = timestamp

    a1.sources.r1.interceptors.i2.type = host

    a1.sources.r1.interceptors.i3.type = static

    a1.sources.r1.interceptors.i3.key = test_key

    a1.sources.r1.interceptors.i3.value = test_value

    # Describe the sink

    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1
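
    In a fan-in layout every Avro sink has to point at a port that some Avro source is listening on, and agents that share a host cannot bind the same port. The sketch below checks a set of agent configurations for both problems. It assumes all agents run on one host and are named a1 as in these listings; parse_properties, avro_ports and check_wiring are hypothetical helpers, not part of Flume.

```python
from collections import Counter

def parse_properties(text):
    """Parse key = value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def avro_ports(props, agent="a1"):
    """Return (ports the agent's avro sources bind, ports its avro sinks target)."""
    listen, target = [], []
    for name in props.get(f"{agent}.sources", "").split():
        if props.get(f"{agent}.sources.{name}.type") == "avro":
            listen.append(int(props[f"{agent}.sources.{name}.port"]))
    for name in props.get(f"{agent}.sinks", "").split():
        if props.get(f"{agent}.sinks.{name}.type") == "avro":
            target.append(int(props[f"{agent}.sinks.{name}.port"]))
    return listen, target

def check_wiring(configs, agent="a1"):
    """Return (ports bound by more than one source, sink targets nobody listens on)."""
    listening, targets = [], []
    for text in configs:
        listen, target = avro_ports(parse_properties(text), agent)
        listening += listen
        targets += target
    clashes = sorted(p for p, n in Counter(listening).items() if n > 1)
    dangling = sorted(set(targets) - set(listening))
    return clashes, dangling
```

    Passing the text of the three configuration files to check_wiring should return two empty lists when the topology is consistent.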

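    Test:

    telnet localhost 33333 ==> type some test data and press Enter

    Avro test:

    4. One-to-Many Scenario

    Scenario:

    For ease of demonstration I use a 1-to-2 layout. The source is avro, and a Log4jAppender sends data through different logger objects in turn, which exercises the channel selector. Both channels are memory; sink k1 is logger and sink k2 is avro, which feeds another agent.

    Configuration (avro-memorys-logger_avro):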

    a1.sources = r1

    a1.sinks = k1 k2

    a1.channels = c1 c2

    # Describe/configure the source

    a1.sources.r1.type = avro

    a1.sources.r1.bind = 0.0.0.0

    a1.sources.r1.port = 22222

    a1.sources.r1.selector.type = multiplexing

    a1.sources.r1.selector.header = flume.client.log4j.logger.name

    a1.sources.r1.selector.mapping.logger-1 = c1

    a1.sources.r1.selector.mapping.logger-0 = c2

    # Describe the sink

    a1.sinks.k1.type = logger

    a1.sinks.k2.type = avro

    a1.sinks.k2.hostname = localhost

    a1.sinks.k2.port = 44444

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    a1.channels.c2.type = memory

    a1.channels.c2.capacity = 1000

    a1.channels.c2.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1 c2

    a1.sinks.k1.channel = c1

    a1.sinks.k2.channel = c2
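
    The multiplexing selector above routes each event by the value of one header. A minimal Python model of that decision, assuming (as I read the Flume documentation) that an event whose header value has no mapping goes to the default channels, and that none are configured here, so such events reach no channel. select_channels is a name made up for this sketch.

```python
HEADER = "flume.client.log4j.logger.name"
MAPPING = {"logger-1": ["c1"], "logger-0": ["c2"]}

def select_channels(headers, header_name=HEADER, mapping=MAPPING, default=()):
    """Return the channels an event is written to, based on one header value."""
    value = headers.get(header_name)
    return list(mapping.get(value, default))
```

    Events logged through logger-1 therefore end up on the local logger sink (c1 to k1), while events from logger-0 are forwarded over Avro to the second agent (c2 to k2).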

    Configuration (avro-memory-logger):

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = avro

    a1.sources.r1.bind = 0.0.0.0

    a1.sources.r1.port = 44444

    a1.sources.r1.ipFilter = true

    a1.sources.r1.ipFilterRules = allow:ip:192.*,allow:name:localhost

    a1.sources.r1.interceptors = i1 i2 i3

    a1.sources.r1.interceptors.i1.type = timestamp

    a1.sources.r1.interceptors.i2.type = host

    a1.sources.r1.interceptors.i3.type = static

    a1.sources.r1.interceptors.i3.key = test_key

    a1.sources.r1.interceptors.i3.value = test_value

    # Describe the sink

    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    Test:
