1. Collecting logs by partition
Component choices: exec source --- memory channel --- HDFS sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop001:9000/data/flume/page_views/%Y%m%d%H%M  (one directory per minute)
a1.sinks.k1.hdfs.filePrefix = page_views
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute  (these three round* settings work together)
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. How to build a flow with multiple agents
Sequential chaining: the output of one agent becomes the input of the next.
Consolidation (fan-in): several agents collect data and forward it to one aggregating agent, which writes to HDFS; this is the pattern used most in production.
Multiplexing (fan-out) flows are also supported.
3. Flume's workflow across multiple agents
client ----> source --(Interceptors, Channel Selectors)--> channel --(Sink Processors)--> sink
4. Channel Selectors
A selector property on the source decides which channel(s) each event is written to.
If no selector is configured, the Replicating Channel Selector is used by default.
1.Replicating Channel Selector
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
2.Multiplexing Channel Selector
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
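The multiplexing selector routes each event by the value of one header (state in the config above): CZ goes to c1, US is replicated to c2 and c3, and anything else falls through to c4. A small Python sketch of that routing decision (illustrative only, not Flume's implementation):

```python
def select_channels(headers, mapping, default):
    """Pick the required channels from the routing header, mirroring the selector config."""
    return mapping.get(headers.get("state"), default)

mapping = {"CZ": ["c1"], "US": ["c2", "c3"]}
print(select_channels({"state": "US"}, mapping, ["c4"]))  # ['c2', 'c3']
print(select_channels({"state": "DE"}, mapping, ["c4"]))  # ['c4']
```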
5. Sink Processors
A sink group ties several sinks together into one logical entity. A sink processor can then load-balance events across all sinks in the group, and fail over to another sink when one fails.
sinks: space-separated list of the sinks in the group
processor.type: default / failover / load_balance
default: accepts only a single sink (no grouping behavior)
load_balance:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
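The load-balancing processor also accepts a selection strategy and backoff behavior. Assuming the group above, the extra properties would look like this (round_robin is the default strategy; random is the other built-in one):

```properties
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.backoff = true
```

With backoff enabled, a failing sink is temporarily blacklisted instead of being retried on every event.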
failover:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
A larger number means higher priority: events go to k2 first while k1 stays idle, and traffic falls back to k1 only when k2 goes down.
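In other words, the failover processor always sends events to the highest-priority sink that is still alive, penalizing a failed sink (for up to maxpenalty milliseconds) before retrying it. A toy Python sketch of the priority choice (not Flume's implementation):

```python
def pick_sink(priorities, alive):
    """Return the live sink with the highest priority value."""
    live = {s: p for s, p in priorities.items() if s in alive}
    return max(live, key=live.get)

priorities = {"k1": 5, "k2": 10}
print(pick_sink(priorities, {"k1", "k2"}))  # k2
print(pick_sink(priorities, {"k1"}))        # k1 (after k2 fails)
```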
6. AVRO
In production, whenever multiple agents are chained in series or fanned in/out, always connect them with an avro sink and an avro source.
Scenario: two machines
agent1: exec source - memory channel - avro sink (avro-sink.conf)
avro-sink-agent.sources = exec-source
avro-sink-agent.sinks = avro-sink
avro-sink-agent.channels = avro-memory-channel
avro-sink-agent.sources.exec-source.type = exec
avro-sink-agent.sources.exec-source.command = tail -F /home/hadoop/data/avro_access.log
avro-sink-agent.channels.avro-memory-channel.type = memory
avro-sink-agent.sinks.avro-sink.type = avro
avro-sink-agent.sinks.avro-sink.hostname = 0.0.0.0
avro-sink-agent.sinks.avro-sink.port = 44444
avro-sink-agent.sources.exec-source.channels = avro-memory-channel
avro-sink-agent.sinks.avro-sink.channel = avro-memory-channel
agent2: avro source - memory channel - logger sink (avro-source.conf)
avro-source-agent.sources = avro-source
avro-source-agent.sinks = logger-sink
avro-source-agent.channels = avro-memory-channel
avro-source-agent.sources.avro-source.type = avro
avro-source-agent.sources.avro-source.bind = 0.0.0.0
avro-source-agent.sources.avro-source.port = 44444
avro-source-agent.channels.avro-memory-channel.type = memory
avro-source-agent.sinks.logger-sink.type = logger
avro-source-agent.sources.avro-source.channels = avro-memory-channel
avro-source-agent.sinks.logger-sink.channel = avro-memory-channel
The avro sink's hostname and port must match the avro source's bind address and port (with two machines, set the sink's hostname to the host where the source is listening; 0.0.0.0 only works when both agents run on the same machine). When starting, bring up the agent with the avro source first, then the one with the avro sink.
flume-ng agent \
--name avro-sink-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-sink.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34343
————————————————————————————————————
flume-ng agent \
--name avro-source-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-source.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34343