一、安装netcat工具
1、sudo yum install -y nc
2、检查44444端口是否被占用: sudo netstat -tunlp | grep 44444
3、操作命令
1)监听: nc -l 端口号
2)发送: nc 主机名(ip地址) 端口号
二、安装flume包
1、解压缩1.9.0安装包
2、配置/etc/profile下flume环境变量
三、命令
1、将日志文件打印至控制台
flume-ng --name a1 --conf conf/ --conf-file datas/netcatsource_loggersink.conf -Dflume.root.logger=INFO,console
2、flume-ng -n a1 -c conf/ -f-file datas/netcatsource_loggersink.conf
四、配置范例
区分: exec source适用于实时追加数据,但不支持断点续传
spooldir source适用于新添加文件,但不适用于实时追加数据
taildir source实时追加数据,也支持断点续传
1、exec范例
#1.自定义agent名称、source、channel、sink组件
a2.sources = r2
a2.channels = c2
a2.sinks = k2
#2.设置source类型和配置
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/flume-1.9.0/demo/123.log
#3.设置channel类型和配置
a2.channels.c2.type = memory
a2.channels.c2.cappacity = 200
#4.设置sink的类型和配置
a2.sinks.k2.type = hdfs
#如果要使用时间转义序列,需满足两要求1、使用本地时间戳 2、在event中headers必须使用时间戳
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#积攒多少个Event才flush到hdfs一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件(秒)
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与event数量无关
a2.sinks.k2.hdfs.rollCount = 0
2、监控目录范例,
a2.sources = r2
a2.channels = c2
a2.sinks = k2
#用来监听一个目录进行自动收集目录的内容
#1、当目录中某个文件被读取完毕后,该文件两种处理方式:1)删除 2)更改扩展名
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /opt/module/flume-1.9.0/upload
a2.sources.r2.fileSuffix = .COMPLETED
#3.设置channel类型和配置
a2.channels.c2.type = memory
a2.channels.c2.cappacity = 200
#4.设置sink的类型和配置
a2.sinks.k2.type = hdfs
#如果要使用时间转义序列,需满足两要求1、使用本地时间戳 2、在event中headers必须使用时间戳
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#积攒多少个Event才flush到hdfs一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与event数量无关
a2.sinks.k2.hdfs.rollCount = 0
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3、taildir范例
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_poition.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/demo/123.log
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
五、复制范例
1、agent1(hadoop102):
#a1:agent的名字
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
#声明source具体类型和对应的配置属性
a1.sources.r1.type = exec
#a1.sources.r1.bind = hadoop102
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/demo/123.log
a1.sources.r1.selector.type = replicating
#声明channel具体的类型和对应配置属性
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c1.capacity = 100
a1.channels.c2.capacity = 100
#a1.channels.c1.keep-alive = 3
#声明sink具体的类型和对应配置属性
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k2.port = 33333
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444
#声明source、sink和channel之间的关系
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
2、agent2(hadoop103):
#a1:agent的名字
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#声明source具体类型和对应的配置属性
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 33333
#声明channel具体的类型和对应配置属性
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
#声明sink具体的类型和对应配置属性
a1.sinks.k1.type = logger
#声明source、sink和channel之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3、agent3(hadoop104):
#a1:agent的名字
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#声明source具体类型和对应的配置属性
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 44444
#声明channel具体的类型和对应配置属性
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
#声明sink具体的类型和对应配置属性
#将event数据存储在本地磁盘
a1.sinks.k1.type = file_roll
#配置存储event目录
a1.sinks.k1.sink_directory = /opt/module/flume-1.9.0/demo
#声明source、sink和channel之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
六、复用范例
1、agent1(hadoop102):
#a1:agent的名字
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
#声明source具体类型和对应的配置属性
a1.sources.r1.type = exec
#a1.sources.r1.bind = hadoop102
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/demo/123.log
#a1.sources.r1.selector.type = replicating
#复用
a1.sources.r1.selector.type = multiplexing
#state指的是headers中key的值
a1.sources.r1.selector.header = state
#CZ指的是headers中value的值
a1.sources.r1.selector.mapping.CZ = c1
#US指的是headers中value的值
a1.sources.r1.selector.mapping.US = c2
#拦截器:给event中headers添加数据
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = CZ
#声明channel具体的类型和对应配置属性
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c1.capacity = 100
a1.channels.c2.capacity = 100
#a1.channels.c1.keep-alive = 3
#声明sink具体的类型和对应配置属性
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k2.port = 33333
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444
#声明source、sink和channel之间的关系
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
2、agent2、agent3同复制范例
七、故障转移范例
1、agent1
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
#声明source具体类型和对应的配置属性
a1.sources.r1.type = netcat
#a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 22222
#声明channel具体的类型和对应配置属性
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
#故障转移配置sinkgroups
a1.sinkgroups = g1
a1.singgroups.g1.sinks = k1 k2
a1.singgroups.g1.processor.type = failover
a1.singgroups.g1.processor.priority.k1 = 5
a1.singgroups.g1.processor.priority.k2 = 10
#负载均衡配置sinkgroups
#a1.sinkgroups=g1
#a1.sinkgroups.g1.sinks=k1k2
#a1.sinkgroups.g1.processor.type=load_balance
#a1.sinkgroups.g1.processor.backoff=true
#a1.sinkgroups.g1.processor.selector=random
#声明sink具体的类型和对应配置属性
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k2.port = 33333
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444
#声明source、sink和channel之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
2、agent2、agent3同复制范例
网友评论