美文网首页
快乐大数据第7课 Flume日志收集系统

快乐大数据第7课 Flume日志收集系统

作者: 快乐大数据 | 来源:发表于2018-08-20 16:22 被阅读0次

    Flume 日志收集系统

    #安装

    在node01下

    在hadoop用户下

    cd ~/apps

    在此路径下 解压后是 flume-1.8.0

    ll

    cd conf

    再把演示用的各种conf文件上传给node01

    再通过scp命令把 这个flume-1.8.0传给另外4给node

    cat example.conf

    sources = r1 #多个source可以用空格隔开

    channels = c1

    sinks = k1

    sources.r1.type = netcat #绑定一个本地端口,往flume里面传输数据

    sources.r1.bind = localhost

    sources.r1.port = 44444

    sources.r1.channels = c1 #把source和channel关联起来

    channels.c1.type = memory ¥在内存中存储

    channels.c1.capacity = 1000 #可以存1000个event

    channels.c1.transactionCapacity = 100 #一次事务提交多少给event

    sinks.k1.type =logger #以日志的形式在黑窗口中打出来

    sinks.k1.channel = c1 #在哪个channel中拉取数据

    启动flume

    再新开一个node01

    telnet localhost 44444

    访问这个端口

    输入 哈哈哈

    #详细介绍flume的组件

    一个event在一个agent中的传输处理流程如下:source--interceptor--selector->channel->sink processor--sink->中心存储/下一级

    agent

    #1.avro source

    Avro Source:支持Avro协议,接收RPC事件请求。Avro Source通过监

    听Avro端口接收外部Avro客户端流事件(event),在Flume的多层架

    构中经常被使用接收上游Avro Sink发送的event

    ? 关键参数说明

    ? type:类型名称avro

    ? bind :绑定的IP

    ? port :监听的端口

    ? threads:接收请求的线程数,当需要接收多个avro客户端的数据流时要设置合

    适的线程数,否则会造成avro客户端数据流积压

    ? compression-type:是否使用压缩,如果使用压缩设则值为“deflate”,avro

    source一般用于多个Agent组成的数据流,接收来自avro sink的event,如果avro

    source设置了压缩,name上一阶段的avro sink也要设置压缩。默认值none

    ? channels:Source对接的Channel名称

    #2.exec source

    Exec Source:支持Linux命令,收集标准输的方式监听指定文件。

    Exec Source可以实现实时的消息传输,但文件的位置,不支持断点续传,当Exec Source后续增加的消息丢失,一般在测试环境使用

    关键参数说明

    ? type :source类型为exec

    ? command :Linux命令

    ? channels :Source对接的Channel名称。

    -----演示Avro Source和exec Source

    在启动前 先vim avrosource.conf

    修改绑定的IP为192.16.183.101

    在node01中启动 bin/flume-ng agent --conf conf --conf-file conf/avrosource.conf --name avroagent -Dflume.root.logger=INFO,console

    接着新开一个node01(1)的窗口

    mkdir -p /home/hadoop/apps/flume/execsource/

    touch exectest.log

    echo 123 > exectest.log

    echo 34567 >> exectest.log

    启动execagent

    再新建一个窗口node01(2),启动execagent

    再回到node01(1)的窗口

    cd /home/hadoop/apps/flume/execsource/

    echo 8040 >>  exectest.log

    原理是 execsource.conf 监听node01/home/hadoop/apps/flume/execsource/exectest.log中日志文件的变化,收集日志里面的数据再传给avrosource

    他接收后,在黑窗口中打印出数据

    ----演示Spooling Directory Source

    Spooling Directory Source:监听一个文件夹,收集文件夹下文件数据,收集完文件数据会将文件名称的后缀改为.COMPLETED

    缺点不支持已存在文件新增数据的收集,且不能够对嵌套文件夹递归监听

    关键参数说明

    ? type :source类型为spooldir

    ? spoolDir:source监听的文件夹

    ? fileHeader :是否添加文件的绝对路径到event的header中,默认值false

    ? fileHeaderKey:添加到event header中文件绝对路径的键值,默认值file

    ? fileSuffix:收集完新文件数据给文件添加的后缀名称,默认值:

    .COMPLETED

    ? channels :Source对接的Channel名称

    先做好预备工作

    在node01下

    cd ~/apps/flume

    mkdir spoolDir

    mkdir selector

    mkdir taildir

    mkdir filechannel

    mkdir multiplexing

    启动Spooling Directory Source

    在node01的一个新窗口中 bin/flume-ng agent --conf conf --conf-file conf/spooldirsource.conf --name a1 -Dflume.root.logger=INFO,console

    换一个窗口

    cd ~/apps/flume/spoolDir

    echo 134 > test1

    echo 477 >> test1    不会响应,也不能监听子文件夹下面的数据

    echo 477 >> test2  新的文件就会收集到

    ---演示Kafka source

    先创建kafka 主题 先在node03,node04,node05上启动kafka

    在node03上, cd ~/apps/kafka_2.11-0.10.2.1

    bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181,192.168.183.102:2181,192.168.183.103:2181,192.168.183.104:2181,192.168.183.105:2181, --replication-factor 2 --partitions 3 --topic flumetopictest1

    在node01上先vim kafkasource.conf的主机名称

    然后

    bin/flume-ng agent --conf conf --conf-file conf/kafkasource.conf --name kafkasourceagent -Dflume.root.logger=INFO,console

    在node03上启动kafka的客户端

    cd /home/hadoop/apps/kafka_2.11-0.10.2.1

    bin/kafka-console-producer.sh --broker-list 192.168.183.103:9092,192.168.183.104:9092 --topic flumetopictest1

    写入12345

    node01上就会输出写入的结果

    ---演示taildir source

    Taildir Source:监听一个文件夹或者文件,通过正则表达式匹配需要监听的数据源文件,Taildir Source通过将监听的文件位置写入到文件中来实现断点

    续传,并且能够保证没有重复数据的读取

    关键参数说明

    ? type:source类型TAILDIR

    ? positionFile:保存监听文件读取位置的文件路径

    ? idleTimeout:关闭空闲文件延迟时间,如果有新的记录添加到已关闭的空闲文件,taildir srouce将继续打开该空闲文件,默认值120000毫秒(2分钟)

    ? writePosInterval:向保存读取位置文件中写入读取文件位置的时间间隔,默认值

    3000毫秒

    ? batchSize:批量写入channel最大event数,默认值100

    ? maxBackoffSleep:每次最后一次尝试没有获取到监听文件最新数据的最大延迟时间,默认值5000毫秒

    先做好预备工作 在node01下

    cd ~/apps/flume/taildir

    mkdir test1

    mkdir test2

    mkdir position

    在node01中启动 在flume-1.8.0文件夹下

    bin/flume-ng agent --n a1 --conf conf --conf-file conf/taildirsource.conf --name taildiragent -Dflume.root.logger=INFO,console

    在另一个窗口中

    cd ~/apps/flume/taildir/test1

    echo 123 > test.log

    cd ~/apps/flume/taildir/test2

    echo 4590  > file2.log

    ----演示filechannel

    先做好预备工作 在node01下

    cd  /home/hadoop/apps/flume/filechannel

    mkdir data

    mkdir checkpoint

    mkdir backup

    cd /home/hadoop/apps/flume-1.8.0

    bin/flume-ng agent --conf conf --conf-file conf/filechannle.conf --name a1 -Dflume.root.logger=INFO,console

    在node01的另一个窗口

    telnet localhost 44444

    发送12345

    再开一个node01的窗口

    cd /home/hadoop/apps/flume/filechannel/data 发现已经创建了文件。

    cd /home/hadoop/apps/flume/filechannel/checkpoint 已经创建了检查点文件

    ----演示kafkachannel(首选)

    存储容量更大,容错更好

    关键参数说明:

    ? type:Kafka Channel类型org.apache.flume.channel.kafka.KafkaChannel

    ? kafka.bootstrap.servers:Kafka broker列表,格式为ip1:port1, ip2:port2…,建

    议配置多个值提高容错能力,多个值之间用逗号隔开

    ? kafka.topic:topic名称,默认值“flume-channel”

    ? kafka.consumer.group.id:Consumer Group Id,全局唯一

    ? parseAsFlumeEvent:是否以Avro FlumeEvent模式写入到Kafka Channel中,

    默认值true,event的header信息与event body都写入到kafka中

    ? pollTimeout:轮询超时时间,默认值500毫秒

    ? kafka.consumer.auto.offset.reset:earliest表示从最早的偏移量开始拉取,latest

    表示从最新的偏移量开始拉取,none表示如果没有发现该Consumer组之前拉

    取的偏移量则抛异常

    首先做好预备工作

    在node01上

    cd /home/hadoop/apps/flume-1.8.0/conf

    vim kafkachannel.conf(修改kafka broker的机器号)

    在node03上

    创建一个topic

    cd /home/hadoop/apps/kafka_2.11-0.10.2.1

    bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181 --replication-factor 1 --partitions 3 --topic flumechannel2

    在node01上启动agent

    cd /home/hadoop/apps/flume-1.8.0

    bin/flume-ng agent --conf conf --conf-file conf/kafkachannel.conf --name a1 -Dflume.root.logger=INFO,console

    在node01的一个新窗口中

    telnet localhost 44444  发送数据123456789

    ----演示HDFSsink

    在node01上,cd /home/hadoop/apps/flume-1.8.0

    bin/flume-ng agent --conf conf --conf-file conf/hdfssink.conf --name a1 -Dflume.root.logger=INFO,console

    在node01上

    使用telnet发送数据

    telnet localhost 44444    发送12345555

    在node01的一个新窗口上,

    hadoop fs -ls /data/flume/20180811 可以发现一个前缀为hdfssink的文件

    ---演示kafkasink(略)

    ---演示replicating seletor

    cd /home/hadoop/apps/flume-1.8.0/conf

    先修改 vim replicating_selector.conf  kafka的server

    修好后

    在node03上创建kakfka的 topic

    cd /home/hadoop/apps/kafka_2.11-0.10.2.1

    bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181 --replication-factor 1 --partitions 3 --topic FlumeSelectorTopic1

    在node01上启动agent

    cd /home/hadoop/apps/flume-1.8.0/

    bin/flume-ng agent --conf conf --conf-file conf/replicating_selector.conf --name a1

    新开一个node01的窗口

    telnet localhost 44444 

    再在一个新窗口中可以发现

    cd /home/hadoop/apps/flume/selector  发现已经写入数据

    在node03 启动kafka客户端监听主题

    bin/kafka-console-consumer.sh --zookeeper 192.168.183.101:2181 --from-beginning --topic FlumeSelectorTopic1

    回到node01的窗口

    telnet localhost 44444  发送数据 78834    发送node03的kafka已经读到了

    ----演示Multiplexing  Channel Selector

    在node01上 cd ~/apps/flume/multiplexing

    mkdir k11

    mkdir k22

    mkdir k33

    修改四个配置文件绑定的端口号

    avro_sink1.conf avro_sink2.conf avro_sink3.conf multiplexing.conf

    在node01上

    bin/flume-ng agent --conf conf --conf-file conf/multiplexing_selector.conf --name a3 -Dflume.root.logger=INFO,console

    再分别启动三个阶段的agent

    在node01的一个新窗口下

    cd /home/hadoop/apps/flume-1.8.0

    bin/flume-ng agent --conf conf --conf-file conf/avro_sink1.conf --name agent1 >/dev/null 2>&1 &

    bin/flume-ng agent --conf conf --conf-file conf/avro_sink2.conf --name agent2 >/dev/null 2>&1 &

    bin/flume-ng agent --conf conf --conf-file conf/avro_sink3.conf --name agent3 >/dev/null 2>&1 &

    jps后

    发现的application进程就是新的agent进程

    看看端口在不在

    lsof -i:44444

    lsof -i:44445

    lsof -i:44446 发现端口正常监听

    在node01的一个新窗口中

    telnet localhost 44444  发送4444444444

    telnet localhost 44445  发送5555555555

    telnet localhost 44446  发送6666666666

    查看

    cd /home/hadoop/apps/flume/multiplexing/k11

    cat 1533997666003-10

    发现了4444444444这个数据

    cd /home/hadoop/apps/flume/multiplexing/k12

    cat 1533997666003-10

    发现5555555555这个数据

    cd /home/hadoop/apps/flume/multiplexing/k12

    cat 1533997666003-10

    发现6666666666这个数据

    ---sink processor

    多个sink processor需要

    负载均衡或者容错的processor

    相关文章

      网友评论

          本文标题:快乐大数据第7课 Flume日志收集系统

          本文链接:https://www.haomeiwen.com/subject/umyliftx.html