Big Data: Flume

Author: 枫叶无言_1997 | Published 2021-01-05 15:46

    I. Installing the netcat tool
    1. sudo yum install -y nc
    2. Check whether port 44444 is already in use: sudo netstat -tunlp | grep 44444
    3. Basic commands (a quick smoke test follows this list)
    1) Listen: nc -l <port>
    2) Send: nc <hostname or IP> <port>
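    A quick smoke test, assuming two shells on the same machine and the port checked above:

    # shell 1: listen on port 44444
    nc -l 44444
    # shell 2: connect and type a line; it appears in shell 1
    nc localhost 44444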

    II. Installing Flume
    1. Unpack the 1.9.0 tarball
    2. Configure the Flume environment variables in /etc/profile (a sketch follows)
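    A minimal sketch of the /etc/profile entries, assuming the tarball was unpacked to /opt/module/flume-1.9.0 (the path used in the examples below):

    # append to /etc/profile, then run: source /etc/profile
    export FLUME_HOME=/opt/module/flume-1.9.0
    export PATH=$PATH:$FLUME_HOME/bin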

    III. Launch commands
    1. Print events to the console (note the agent subcommand, which the launcher requires):
    flume-ng agent --name a1 --conf conf/ --conf-file datas/netcatsource_loggersink.conf -Dflume.root.logger=INFO,console
    2. The same command with short-form options:
    flume-ng agent -n a1 -c conf/ -f datas/netcatsource_loggersink.conf -Dflume.root.logger=INFO,console
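    The commands above reference datas/netcatsource_loggersink.conf; a minimal sketch of such a file (agent name a1 matches the commands, and port 44444 matches the netcat test above) would be:

    # a1: netcat source -> memory channel -> logger sink
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000

    a1.sinks.k1.type = logger

    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1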

    IV. Example configurations
    How to choose a source: the exec source suits data appended in real time, but cannot resume where it left off after a restart;
    the spooldir source suits newly added files, but not files that are still being appended to;
    the taildir source handles real-time appends and can also resume from its recorded position.

    1. exec source example

    #1. Name the agent and its source, channel, and sink components

    a2.sources = r2

    a2.channels = c2

    a2.sinks = k2

    #2. Source type and configuration

    a2.sources.r2.type = exec

    a2.sources.r2.command = tail -F /opt/module/flume-1.9.0/demo/123.log

    #3. Channel type and configuration

    a2.channels.c2.type = memory

    a2.channels.c2.capacity = 200

    #4. Sink type and configuration

    a2.sinks.k2.type = hdfs

    # Time escape sequences in the path require a timestamp: either set useLocalTimeStamp = true, or make sure every event carries a timestamp in its headers

    a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H

    a2.sinks.k2.hdfs.useLocalTimeStamp = true

    # Prefix for files uploaded to HDFS

    a2.sinks.k2.hdfs.filePrefix = logs-

    # Whether to roll directories by time

    a2.sinks.k2.hdfs.round = true

    # How many time units before a new directory is created

    a2.sinks.k2.hdfs.roundValue = 1

    # The unit for the rounding value

    a2.sinks.k2.hdfs.roundUnit = hour

    # Number of events to accumulate before flushing to HDFS

    a2.sinks.k2.hdfs.batchSize = 100

    # File type; compressed output is also supported

    a2.sinks.k2.hdfs.fileType = DataStream

    # Roll to a new file after this many seconds

    a2.sinks.k2.hdfs.rollInterval = 60

    # Roll to a new file once it reaches this size in bytes (just under a 128 MB HDFS block)

    a2.sinks.k2.hdfs.rollSize = 134217700

    # Do not roll based on event count (0 disables count-based rolling)

    a2.sinks.k2.hdfs.rollCount = 0

    #5. Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
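    To run this example and feed it some data (the config file name datas/exec_hdfssink.conf is illustrative):

    flume-ng agent -n a2 -c conf/ -f datas/exec_hdfssink.conf
    # in another shell, append to the tailed file
    echo "hello flume" >> /opt/module/flume-1.9.0/demo/123.log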

    2. Directory-monitoring (spooldir) example

    a2.sources = r2

    a2.channels = c2

    a2.sinks = k2

    # Watches a directory and automatically collects the files placed in it

    # Once a file has been read completely, it is handled in one of two ways: 1) deleted, or 2) renamed with a new suffix

    a2.sources.r2.type = spooldir

    a2.sources.r2.spoolDir = /opt/module/flume-1.9.0/upload

    a2.sources.r2.fileSuffix = .COMPLETED

    #3. Channel type and configuration

    a2.channels.c2.type = memory

    a2.channels.c2.capacity = 200

    #4. Sink type and configuration

    a2.sinks.k2.type = hdfs

    # Time escape sequences in the path require a timestamp: either set useLocalTimeStamp = true, or make sure every event carries a timestamp in its headers

    a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H

    a2.sinks.k2.hdfs.useLocalTimeStamp = true

    # Prefix for files uploaded to HDFS

    a2.sinks.k2.hdfs.filePrefix = logs-

    # Whether to roll directories by time

    a2.sinks.k2.hdfs.round = true

    # How many time units before a new directory is created

    a2.sinks.k2.hdfs.roundValue = 1

    # The unit for the rounding value

    a2.sinks.k2.hdfs.roundUnit = hour

    # Number of events to accumulate before flushing to HDFS

    a2.sinks.k2.hdfs.batchSize = 100

    # File type; compressed output is also supported

    a2.sinks.k2.hdfs.fileType = DataStream

    # Roll to a new file after this many seconds

    a2.sinks.k2.hdfs.rollInterval = 60

    # Roll to a new file once it reaches this size in bytes (just under a 128 MB HDFS block)

    a2.sinks.k2.hdfs.rollSize = 134217700

    # Do not roll based on event count (0 disables count-based rolling)

    a2.sinks.k2.hdfs.rollCount = 0

    #5. Bind the source and sink to the channel

    a2.sources.r2.channels = c2

    a2.sinks.k2.channel = c2
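    To try it, drop a file into the watched directory (the file name here is illustrative); once Flume has consumed it, the file is renamed with the configured suffix:

    cp /tmp/app.log /opt/module/flume-1.9.0/upload/
    ls /opt/module/flume-1.9.0/upload/
    # app.log.COMPLETED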

    3. taildir source example

    a1.sources = r1

    a1.channels = c1

    a1.sinks = k1

    a1.sources.r1.type = TAILDIR

    a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json

    a1.sources.r1.filegroups = f1

    a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/demo/123.log

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.sinks.k1.type = logger

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1
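    The position file records how far each tracked file has been read, which is what allows taildir to resume after a restart. Its contents look roughly like this (the inode and pos values are illustrative):

    [{"inode":2496989,"pos":12,"file":"/opt/module/flume-1.9.0/demo/123.log"}]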

    V. Replication (replicating selector) example

    1. agent1 (hadoop102):
    #a1: the agent's name

    a1.sources = r1

    a1.channels = c1 c2

    a1.sinks = k1 k2

    # Source type and its configuration properties

    a1.sources.r1.type = exec

    #a1.sources.r1.bind = hadoop102

    a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/demo/123.log

    a1.sources.r1.selector.type = replicating

    # Channel types and their configuration properties

    a1.channels.c1.type = memory

    a1.channels.c2.type = memory

    a1.channels.c1.capacity = 100

    a1.channels.c2.capacity = 100

    #a1.channels.c1.keep-alive = 3

    # Sink types and their configuration properties

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = hadoop103

    a1.sinks.k1.port = 33333

    a1.sinks.k2.type = avro

    a1.sinks.k2.hostname = hadoop104

    a1.sinks.k2.port = 44444

    # Wire the source and sinks to the channels

    a1.sources.r1.channels = c1 c2

    a1.sinks.k1.channel = c1

    a1.sinks.k2.channel = c2

    2. agent2 (hadoop103):
    #a1: the agent's name

    a1.sources = r1

    a1.channels = c1

    a1.sinks = k1

    # Source type and its configuration properties

    a1.sources.r1.type = avro

    a1.sources.r1.bind = hadoop103

    a1.sources.r1.port = 33333

    # Channel type and its configuration properties

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 100

    # Sink type and its configuration properties

    a1.sinks.k1.type = logger

    # Wire the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    3. agent3 (hadoop104):

    #a1: the agent's name

    a1.sources = r1

    a1.channels = c1

    a1.sinks = k1

    # Source type and its configuration properties

    a1.sources.r1.type = avro

    a1.sources.r1.bind = hadoop104

    a1.sources.r1.port = 44444

    # Channel type and its configuration properties

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 100

    # Sink type and its configuration properties

    # Store event data on the local disk
    a1.sinks.k1.type = file_roll
    # Directory the events are written to
    a1.sinks.k1.sink.directory = /opt/module/flume-1.9.0/demo

    # Wire the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1
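    Start the downstream agents (the avro sources on hadoop103 and hadoop104) before agent1, otherwise agent1's avro sinks have nothing to connect to. A rough launch sequence, assuming config file names flume2.conf, flume3.conf, and flume1.conf (illustrative):

    # on hadoop103, then on hadoop104
    flume-ng agent -n a1 -c conf/ -f datas/flume2.conf -Dflume.root.logger=INFO,console
    flume-ng agent -n a1 -c conf/ -f datas/flume3.conf
    # finally on hadoop102
    flume-ng agent -n a1 -c conf/ -f datas/flume1.conf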

    VI. Multiplexing example

    1. agent1 (hadoop102):

    #a1: the agent's name

    a1.sources = r1

    a1.channels = c1 c2

    a1.sinks = k1 k2

    # Source type and its configuration properties

    a1.sources.r1.type = exec

    #a1.sources.r1.bind = hadoop102

    a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/demo/123.log

    #a1.sources.r1.selector.type = replicating

    # Multiplexing selector
    a1.sources.r1.selector.type = multiplexing

    # 'state' is the header key whose value decides the route
    a1.sources.r1.selector.header = state

    # events whose 'state' header is CZ go to channel c1
    a1.sources.r1.selector.mapping.CZ = c1

    # events whose 'state' header is US go to channel c2
    a1.sources.r1.selector.mapping.US = c2

    # Interceptor: adds a key/value pair to each event's headers

    a1.sources.r1.interceptors = i1

    a1.sources.r1.interceptors.i1.type = static

    a1.sources.r1.interceptors.i1.key = state

    a1.sources.r1.interceptors.i1.value = CZ

    # Channel types and their configuration properties

    a1.channels.c1.type = memory

    a1.channels.c2.type = memory

    a1.channels.c1.capacity = 100

    a1.channels.c2.capacity = 100

    #a1.channels.c1.keep-alive = 3

    # Sink types and their configuration properties

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = hadoop103

    a1.sinks.k1.port = 33333

    a1.sinks.k2.type = avro

    a1.sinks.k2.hostname = hadoop104

    a1.sinks.k2.port = 44444

    # Wire the source and sinks to the channels

    a1.sources.r1.channels = c1 c2

    a1.sinks.k1.channel = c1

    a1.sinks.k2.channel = c2        

    2. agent2 and agent3 are the same as in the replication example. Note that the static interceptor above stamps state = CZ on every event, so all traffic ends up in c1/k1; to exercise the US branch, events would need a different header value (for example, set by another interceptor).

    VII. Failover example

    1. agent1

    a1.sources = r1

    a1.channels = c1

    a1.sinks = k1 k2

    # Source type and its configuration properties

    a1.sources.r1.type = netcat

    # the netcat source needs both a bind address and a port
    a1.sources.r1.bind = hadoop102

    a1.sources.r1.port = 22222

    # Channel type and its configuration properties

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 100

    # Failover sink group configuration
    a1.sinkgroups = g1

    a1.sinkgroups.g1.sinks = k1 k2

    a1.sinkgroups.g1.processor.type = failover

    # the sink with the higher priority number is tried first
    a1.sinkgroups.g1.processor.priority.k1 = 5

    a1.sinkgroups.g1.processor.priority.k2 = 10

    # Load-balancing sink group configuration (an alternative to failover)

    #a1.sinkgroups = g1
    #a1.sinkgroups.g1.sinks = k1 k2
    #a1.sinkgroups.g1.processor.type = load_balance
    #a1.sinkgroups.g1.processor.backoff = true
    #a1.sinkgroups.g1.processor.selector = random

    # Sink types and their configuration properties

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = hadoop103

    a1.sinks.k1.port = 33333

    a1.sinks.k2.type = avro

    a1.sinks.k2.hostname = hadoop104

    a1.sinks.k2.port = 44444

    # Wire the source and sinks to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    a1.sinks.k2.channel = c1

    2. agent2 and agent3 are the same as in the replication example.
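    Because the higher priority wins, k2 (hadoop104) receives the events first; stopping the agent on hadoop104 should make traffic fail over to hadoop103. A rough test, assuming all three agents are running:

    # send test lines to the netcat source
    nc hadoop102 22222
    # stop the Flume agent on hadoop104, send more lines,
    # and watch them arrive at hadoop103 (sink k1, priority 5)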
