Hadoop Guide--Flume

By 24格的世界 | Published 2016-12-26 13:49

    Official Flume user guide: http://flume.apache.org/FlumeUserGuide.html#exec-source

    Sources, sinks, and channels (overview): https://www.iteblog.com/archives/948

    Introduction to Flume:

    Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and transporting large volumes of log data. It lets you plug different kinds of data senders into a log collection pipeline, receives the data they produce, optionally applies simple processing, and writes the result out to a variety of data destinations.


    Architecture:

    A Flume agent has three main parts: source, channel, and sink. The source is the entry point and collects the log data; the channel is the pipe that transports and temporarily buffers it; the sink is the destination that writes the collected data out.

    These three parts are combined as needed into a complete agent, which handles the transport of the logs. Note: the agent is Flume's unit of processing; all data flows through an agent.


    Configuration:

    (A JDK must be installed to run the Flume service.)

    source: available types include {Avro, Thrift, Exec, Spooling Directory, ...}

    sink: available types include {HDFS, Hive, Avro, Thrift, Logger, ...}

    channel: available types include {Memory, JDBC, Kafka, File, ...}

    Note: see the Flume user guide for the full set of options for each type.

    A simple single-node Flume configuration looks like this (a quick way to run and test it follows the listing):

    # example.conf: A single-node Flume configuration

    # Name the components on this agent: the source (data entry), sink (data exit), and channel (pipe)
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source: netcat listens on a TCP address and port for lines of text
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink: logger writes each event to the agent's log
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory, with the given capacity
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
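
    To try it out, start the agent with the flume-ng script and send a line to the netcat port; a minimal sketch, assuming Flume is run from its installation directory and example.conf is saved in the conf directory:

    # start the agent named a1 defined in example.conf, printing events to the console
    $ bin/flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console

    # in another terminal, send a test line to the netcat source; it should appear in the agent's log
    $ telnet localhost 44444
    hello flume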


    Source configuration (common types):

    1. Avro source: requires the channels, type, bind, and port parameters;

    a1.sources = r1
    a1.channels = c1
    # component type; avro receives events over RPC, hence the port below
    a1.sources.r1.type = avro
    # attach the source to the channel defined on this agent
    a1.sources.r1.channels = c1
    # address to bind to; 0.0.0.0 accepts connections from any IP
    a1.sources.r1.bind = 0.0.0.0
    # listening port (must match the port of the Avro sink on the sending side)
    a1.sources.r1.port = 4141

    Note that Avro has to be set up on both ends: the sending agent uses an Avro sink and the receiving agent uses an Avro source with matching host and port. A quick way to test the source is shown below.
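
    Flume ships with a simple Avro client that sends a file to a given host and port; a sketch, assuming the source above is running and /tmp/test.log is the (hypothetical) file to send:

    $ bin/flume-ng avro-client -H localhost -p 4141 -F /tmp/test.log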

    2. Exec source: reads log data by running a command. You give it a shell command and Flume continuously turns the command's output into events; a typical choice is tail -F to follow the end of a log file;

    a1.sources = r1
    a1.channels = c1
    # component type; exec reads data from the output of a command
    a1.sources.r1.type = exec
    # attach the source to the channel defined on this agent
    a1.sources.r1.channels = c1
    # follow the end of the log file and emit each new line as an event
    a1.sources.r1.command = tail -F /var/log/secure

    3. Spooling-directory source: spooldir ingests log files from a directory. You point it at a directory and it reads every file placed there; files must not be opened or edited after they are dropped in, and the directory must not contain subdirectories (a usage sketch follows the configuration below);

    a1.sources = r1
    a1.channels = c1
    # component type; spooldir ingests files from a spool directory
    a1.sources.r1.type = spooldir
    # attach the source to the channel defined on this agent
    a1.sources.r1.channels = c1
    # directory that Flume watches for new files
    a1.sources.r1.spoolDir = /home/hadoop/flume/logs
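
    Using it just means moving finished files into the spool directory; once a file has been fully ingested, Flume marks it as done, by default by renaming it with a .COMPLETED suffix. A sketch, with an illustrative file name:

    # drop a finished log file into the spool directory (do not edit it afterwards)
    $ cp /var/log/app/app-2016-12-26.log /home/hadoop/flume/logs/
    # after ingestion the file carries the default .COMPLETED suffix
    $ ls /home/hadoop/flume/logs/
    app-2016-12-26.log.COMPLETED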

    4. Syslog TCP source: listens on a TCP port and parses incoming syslog messages as events (a quick test is sketched after the configuration);

    a1.sources = r1
    a1.channels = c1
    # component type; syslogtcp parses syslog messages arriving over TCP
    a1.sources.r1.type = syslogtcp
    # attach the source to the channel defined on this agent
    a1.sources.r1.channels = c1
    # listening port
    a1.sources.r1.port = 5140
    # host name or IP address to bind to
    a1.sources.r1.host = localhost
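
    Writing a syslog-formatted line to the port is enough to verify the source; a sketch using nc (the <13> priority prefix and message text are only illustrative):

    $ echo "<13>Dec 26 13:49:00 localhost app: hello flume" | nc localhost 5140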

    5. HTTP source: accepts events sent by HTTP POST (and GET) requests and uses a pluggable handler to convert each request into Flume events (an example request is sketched after the configuration);

    a1.sources = r1
    a1.channels = c1
    # component type; http turns incoming HTTP requests into events
    a1.sources.r1.type = http
    # attach the source to the channel defined on this agent
    a1.sources.r1.channels = c1
    # handler class that converts requests into events (example class from the user guide)
    a1.sources.r1.handler = org.example.rest.RestHandler
    a1.sources.r1.handler.nickname = random.props
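
    A listening port also has to be set (a1.sources.r1.port). The handler above is the placeholder class from the user guide; with Flume's default JSONHandler, events are POSTed as a JSON array, roughly as in this sketch, assuming the source listens on port 5140:

    $ curl -X POST -H "Content-Type: application/json" \
          -d '[{"headers": {"host": "web01"}, "body": "hello flume"}]' \
          http://localhost:5140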

    Sink configuration (common types):

    A sink consumes events from its channel and hands them to an external system, or to the source of the next agent in the flow;

    1. Hive sink: streams events containing delimited text or JSON data directly into a Hive table or partition;

    Main Hive sink parameters:

    type: the component type; use hive;

    hive.metastore: the Hive metastore URI;

    hive.database: the Hive database name;

    hive.table: the Hive table name;

    The target table has to be created first:

    create table weblogs ( id int , msg string )

    partitioned by (continent string, country string, time string)

    clustered by (id) into 5 buckets

    stored as orc;

    a1.channels = c1

    a1.channels.c1.type = memory

    a1.sinks = k1

    a1.sinks.k1.type = hive

    a1.sinks.k1.channel = c1

    a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083

    a1.sinks.k1.hive.database = logsdb

    a1.sinks.k1.hive.table = weblogs

    a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M

    a1.sinks.k1.useLocalTimeStamp = false

    a1.sinks.k1.round = true

    a1.sinks.k1.roundValue = 10

    a1.sinks.k1.roundUnit = minute

    a1.sinks.k1.serializer = DELIMITED

    a1.sinks.k1.serializer.delimiter = "\t"

    a1.sinks.k1.serializer.serdeSeparator = '\t'

    a1.sinks.k1.serializer.fieldnames = id,,msg
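
    With serializer.fieldnames = id,,msg the sink expects three tab-delimited fields per event and skips the unnamed middle one; an illustrative event body (tabs shown as <TAB>):

    1<TAB>ignored<TAB>first log message
    # loads as id = 1, msg = "first log message"; the middle field is dropped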

    2. Logger sink: logs events at INFO level; it is the only sink that does not require the extra configuration described in the Logging raw data section of the user guide, which makes it handy for testing and debugging.

    a1.channels = c1

    a1.sinks = k1

    a1.sinks.k1.type = logger

    a1.sinks.k1.channel = c1

    3.Avro Sink:

    Requires hostname and port; a matching Avro source must be set up on the receiving side:

    type: the component type; use avro;

    hostname and port: the address of the receiving Avro source;

    a1.channels = c1

    a1.sinks = k1

    a1.sinks.k1.type = avro

    a1.sinks.k1.channel = c1

    a1.sinks.k1.hostname = 10.10.10.10

    a1.sinks.k1.port = 4545

    4.Kafka Sink:

    a1.sinks.k1.channel = c1

    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

    a1.sinks.k1.kafka.topic = mytopic

    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092

    a1.sinks.k1.kafka.flumeBatchSize = 20

    a1.sinks.k1.kafka.producer.acks = 1

    a1.sinks.k1.kafka.producer.linger.ms = 1

    a1.sinks.k1.kafka.producer.compression.type = snappy
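
    To confirm that events are reaching Kafka, the topic can be read back with the console consumer that ships with Kafka; a sketch (the script name and path depend on the Kafka installation):

    $ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning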

    Channel configuration (common types):

    1. Memory channel:

    Uses memory as the intermediate buffer between source and sink;

    a1.channels = c1

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 10000

    a1.channels.c1.transactionCapacity = 10000

    a1.channels.c1.byteCapacityBufferPercentage = 20

    a1.channels.c1.byteCapacity = 800000

    2.JDBC Channel:

    a1.channels = c1

    a1.channels.c1.type = jdbc

    3.Kafka Channel:

    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel

    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092

    a1.channels.channel1.kafka.topic = channel1

    a1.channels.channel1.kafka.consumer.group.id = flume-consumer

    4.File Channel:

    a1.channels = c1

    a1.channels.c1.type = file

    a1.channels.c1.checkpointDir = /mnt/flume/checkpoint

    a1.channels.c1.dataDirs = /mnt/flume/data

