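I. Overview:
This is the Flume configuration used on one of our machines. Today we will use this configuration file to walk through how Flume is used in detail.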
# Name the components on this agent
a1.sources = r1 r2 r3 r4
a1.sinks = k1 k2 k3 k4 k5 k6 k7 k8
a1.channels = c1 c2 c3 c4
#ngChannel ngChannel2
# source
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /data/logs/tomcat-8.0.11/flume/cms_api_8001.log
a1.sources.r1.channels = c1
a1.sources.r1.restartThrottle = 10
a1.sources.r1.restart = true
a1.sources.r2.type=exec
a1.sources.r2.command=tail -F /data/logs/tomcat-8.0.11/flume/cms_reyun_8001.log
a1.sources.r2.channels = c2
a1.sources.r2.restartThrottle = 10
a1.sources.r2.restart = true
a1.sources.r3.type=exec
a1.sources.r3.command=tail -F /data/logs/tomcat-8.0.11/flume/cms_api_8002.log
a1.sources.r3.channels = c3
a1.sources.r3.restartThrottle = 10
a1.sources.r3.restart = true
a1.sources.r4.type=exec
a1.sources.r4.command=tail -F /data/logs/tomcat-8.0.11/flume/cms_reyun_8002.log
a1.sources.r4.channels = c4
a1.sources.r4.restartThrottle = 10
a1.sources.r4.restart = true
#sinkgroups
# All groups must be declared on one line; a repeated a1.sinkgroups key would replace the earlier value
a1.sinkgroups = g1 g2 g3 g4
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# processor.type options: failover, load_balance
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
# as long as k1 is healthy, nothing is sent through k2
a1.sinkgroups.g1.processor.maxpenalty = 10000
# if k1 fails, it is penalized (sent no data) for up to 10 s
a1.sinkgroups.g2.sinks = k3 k4
a1.sinkgroups.g2.processor.type = failover
a1.sinkgroups.g2.processor.priority.k3 = 10
a1.sinkgroups.g2.processor.priority.k4 = 5
a1.sinkgroups.g2.processor.maxpenalty = 10000
a1.sinkgroups.g3.sinks = k5 k6
a1.sinkgroups.g3.processor.type = failover
a1.sinkgroups.g3.processor.priority.k5 = 10
a1.sinkgroups.g3.processor.priority.k6 = 5
a1.sinkgroups.g3.processor.maxpenalty = 10000
a1.sinkgroups.g4.sinks = k7 k8
a1.sinkgroups.g4.processor.type = failover
a1.sinkgroups.g4.processor.priority.k7 = 10
a1.sinkgroups.g4.processor.priority.k8 = 5
a1.sinkgroups.g4.processor.maxpenalty = 10000
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = flume-collect1
a1.sinks.k1.port = 41415
a1.sinks.k1.channel = c1
a1.sinks.k1.batch-size = 500
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = flume-collect3
a1.sinks.k2.port = 41415
a1.sinks.k2.channel = c1
a1.sinks.k2.batch-size = 500
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = flume-collect2
a1.sinks.k3.port = 41415
a1.sinks.k3.channel = c2
a1.sinks.k3.batch-size = 500
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = flume-collect1
a1.sinks.k4.port = 41415
a1.sinks.k4.channel = c2
a1.sinks.k4.batch-size = 500
a1.sinks.k5.type = avro
a1.sinks.k5.hostname = flume-collect3
a1.sinks.k5.port = 41415
a1.sinks.k5.channel = c3
a1.sinks.k5.batch-size = 500
a1.sinks.k6.type = avro
a1.sinks.k6.hostname = flume-collect2
a1.sinks.k6.port = 41415
a1.sinks.k6.channel = c3
a1.sinks.k6.batch-size = 500
a1.sinks.k7.type = avro
a1.sinks.k7.hostname = flume-collect1
a1.sinks.k7.port = 41415
a1.sinks.k7.channel = c4
a1.sinks.k7.batch-size = 500
a1.sinks.k8.type = avro
a1.sinks.k8.hostname = flume-collect3
a1.sinks.k8.port = 41415
a1.sinks.k8.channel = c4
a1.sinks.k8.batch-size = 500
# channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/flume/checkpoint1
a1.channels.c1.dataDirs = /data/flume/data1
a1.channels.c1.capacity = 10240000
a1.channels.c1.transactionCapacity = 30000
a1.channels.c1.maxFileSize = 524288000
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /data/flume/checkpoint2
a1.channels.c2.dataDirs = /data/flume/data2
a1.channels.c2.capacity = 10240000
a1.channels.c2.transactionCapacity = 30000
a1.channels.c2.maxFileSize = 524288000
a1.channels.c3.type = file
a1.channels.c3.checkpointDir = /data/flume/checkpoint3
a1.channels.c3.dataDirs = /data/flume/data3
a1.channels.c3.capacity = 10240000
a1.channels.c3.transactionCapacity = 30000
a1.channels.c3.maxFileSize = 524288000
a1.channels.c4.type = file
a1.channels.c4.checkpointDir = /data/flume/checkpoint4
a1.channels.c4.dataDirs = /data/flume/data4
a1.channels.c4.capacity = 10240000
a1.channels.c4.transactionCapacity = 30000
a1.channels.c4.maxFileSize = 524288000
a1.channels.c0.type = file
a1.channels.c0.checkpointDir = /data/flume/checkpoint
a1.channels.c0.dataDirs = /data/flume/datas
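This machine runs two nodes, on ports 8001 and 8002. The data we upload falls into two parts. One part is the ordinary logs, written to [/data/logs/tomcat-8.0.11/flume/cms_api_8001.log]; the other part is the data logs to be reported to the big-data platform, written to [/data/logs/tomcat-8.0.11/flume/cms_reyun_8001.log]. The 8002 node writes the matching cms_api_8002.log and cms_reyun_8002.log files.
The structure of this configuration is shown in the following diagram:
[Image 3.png: structure diagram of this configuration]
Collect1-3 (flume-collect1 to flume-collect3) are hostnames mapped to IP addresses in the hosts file. Because we have a log platform, all of our logs are uploaded to it over the network.
Let's go through the configuration section by section.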
1.source
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /data/logs/tomcat-8.0.11/flume/cms_api_8001.log
a1.sources.r1.channels = c1
a1.sources.r1.restartThrottle = 10
a1.sources.r1.restart = true
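Source r1 of agent a1 has type exec, which means Flume runs the command and picks up its output in real time, that is, the ordinary logs or big-data logs as the machine writes them. restart controls whether the command is restarted if its process dies, and restartThrottle is how long to wait before that restart, in milliseconds (so the value 10 here means 10 ms).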
2.sinkgroups
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
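# processor.type options: failover, load_balance
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
# as long as k1 is healthy, nothing is sent through k2
a1.sinkgroups.g1.processor.maxpenalty = 10000
# if k1 fails, it is penalized (sent no data) for up to 10 s
The sink group is named g1 and consists of two sinks, k1 and k2. Its processor type is failover: if the higher-priority sink goes down, the lower-priority sink takes over. Here k1 has priority 10 and k2 has priority 5, so as long as k1 is healthy, k2 does no work. maxpenalty is the upper bound, in milliseconds, on the back-off applied to a failed sink: after k1 fails, it is sent no data for at most 10 s before being retried. When an agent has several groups, they must all be listed on a single a1.sinkgroups line (a1.sinkgroups = g1 g2 g3 g4), because a repeated key in a properties file replaces the earlier value.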
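For comparison, here is a minimal sketch of the other processor type mentioned above, load_balance, applied to the same two sinks. It is not part of our production configuration; the selector and timeout values are assumptions chosen for illustration.

```properties
# Sketch only: g1 rewritten to spread events across k1 and k2 instead of failing over
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# round_robin alternates between the sinks; random is the other built-in selector
a1.sinkgroups.g1.processor.selector = round_robin
# temporarily skip a sink that has failed (assumed setting, off by default)
a1.sinkgroups.g1.processor.backoff = true
# upper bound on that back-off, in milliseconds (assumed value)
a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000
```

With failover only k1 carries traffic while it is healthy; with load_balance both collectors receive events all the time, so which one to use depends on whether the second collector is meant as a standby or as extra capacity.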
3.sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = flume-collect1
a1.sinks.k1.port = 41415
a1.sinks.k1.channel = c1
a1.sinks.k1.batch-size = 500
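k1 is of type avro. The avro sink is the basis for multi-tier flows, fan-out flows (one to many), and fan-in flows (many to one), so it is very important, but it requires more than one machine.
The events finally flow out to port 41415 on flume-collect1 (which maps to an IP address). batch-size is the number of events sent in each batch.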
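An avro sink needs an avro source listening on the other end. The collector configuration is not shown in this article, so the following is only a sketch of what the receiving agent on flume-collect1 might look like. The agent name collector, the component names in1, ch1 and out1, the memory channel sizes, and the logger sink are all assumptions for illustration; only the port comes from the configuration above.

```properties
# Hypothetical receiving agent on flume-collect1 (all names are assumed)
collector.sources = in1
collector.channels = ch1
collector.sinks = out1

# avro source listening on the port that k1 sends to
collector.sources.in1.type = avro
collector.sources.in1.bind = 0.0.0.0
collector.sources.in1.port = 41415
collector.sources.in1.channels = ch1

# transactionCapacity is kept at or above the sender's batch-size of 500
collector.channels.ch1.type = memory
collector.channels.ch1.capacity = 10000
collector.channels.ch1.transactionCapacity = 1000

# logger sink just prints events; a real collector would write to its own storage
collector.sinks.out1.type = logger
collector.sinks.out1.channel = ch1
```

Several sending agents can point their avro sinks at this one source, which is the fan-in case described above.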
4.channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/flume/checkpoint1
a1.channels.c1.dataDirs = /data/flume/data1
a1.channels.c1.capacity = 10240000
a1.channels.c1.transactionCapacity = 30000
a1.channels.c1.maxFileSize = 524288000
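The channel type is file, which means events are persisted to files on disk (there is also a memory type). checkpointDir is the directory where the channel stores its checkpoint, that is, a snapshot of the state of the channel's queue; it does not record a read position in the source log file. dataDirs is where the events themselves are stored. If checkpointDir and dataDirs are deleted, any events still buffered in the channel are lost; the exec source keeps no read offset of its own, so after a restart tail -F simply starts again near the end of the log file. capacity is the maximum number of events the channel can hold, transactionCapacity is the maximum number of events in a single transaction, whether coming from the source or going to the sink, and maxFileSize is the maximum size, in bytes, of a single data file of the channel.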
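For reference, this is roughly what c1 would look like as a memory channel, the other type mentioned above. It is a sketch, not something we run; the capacity value is an assumption.

```properties
# Sketch only: c1 as a memory channel instead of a file channel
a1.channels.c1.type = memory
# maximum number of events held in memory (assumed value)
a1.channels.c1.capacity = 100000
# maximum events per transaction; must not exceed capacity
a1.channels.c1.transactionCapacity = 30000
```

A memory channel is faster but loses whatever it is holding if the agent process stops, which is why the file type is used in the configuration above.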
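Postscript: My knowledge is limited, so if anything here is wrong or poorly put, please point it out so that we can learn from each other!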