Integrating Spark Streaming with Flume and Kafka for Real-Time Processing


Author: 机灵鬼鬼 | Published 2019-06-22 15:23

    This article walks through the core workflow of enterprise-grade real-time stream processing in detail and is an important milestone in learning big data. It is of the highest importance, so please read it carefully and work through the hands-on exercises.

    Before we begin the integration work, an overall architecture diagram gives an intuitive view of the approach: application logs flow into Flume, Flume fans the data out to both HDFS and Kafka, and Spark Streaming consumes from Kafka.

    Step 1: Collect application logs into Flume

    Writing directly to Flume through a log4j appender couples the application too tightly to Flume, so I instead collect logs by having Flume actively monitor the log directory.

    I describe Flume's directory-monitoring approach in detail in my article on how Flume monitors multiple dynamically changing log files.
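As a sketch of that approach: Flume 1.7+ ships a Taildir source that tails every file matching a filename regex and records read offsets, so it survives restarts without re-reading data. The agent name `log-agent`, the file paths, and the position-file location below are hypothetical placeholders; the avro sink address matches the avro source configured in Step 2.

```properties
# Hypothetical collector agent; agent name and paths are placeholders.
log-agent.sources=taildir-source
log-agent.channels=memory-channel
log-agent.sinks=avro-sink

# Taildir source (Flume 1.7+): tails all files matching the regex and
# tracks read offsets in positionFile so restarts do not duplicate data
log-agent.sources.taildir-source.type=TAILDIR
log-agent.sources.taildir-source.positionFile=/var/flume/taildir_position.json
log-agent.sources.taildir-source.filegroups=f1
log-agent.sources.taildir-source.filegroups.f1=/var/app/logs/.*\.log

log-agent.channels.memory-channel.type=memory

# Forward events to the avro source of the hdfs-and-kafka agent
log-agent.sinks.avro-sink.type=avro
log-agent.sinks.avro-sink.hostname=10.101.3.3
log-agent.sinks.avro-sink.port=44444

log-agent.sources.taildir-source.channels=memory-channel
log-agent.sinks.avro-sink.channel=memory-channel
```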

    Step 2: Forward data from Flume to Kafka

    # Agent test-avro-memory-hdfsAndkafka: writes the same data to two
    # services (HDFS and Kafka). Fanning one stream out to two destinations
    # only requires configuring multiple sinks; full configuration below.

    ## Source
    test-avro-memory-hdfsAndkafka.sources=avro-source

    ## Two sinks: the same data goes to both HDFS and Kafka
    test-avro-memory-hdfsAndkafka.sinks=hdfs-sink kafka-sink

    test-avro-memory-hdfsAndkafka.channels=memory-channel

    test-avro-memory-hdfsAndkafka.sources.avro-source.type=avro
    test-avro-memory-hdfsAndkafka.sources.avro-source.bind=10.101.3.3
    test-avro-memory-hdfsAndkafka.sources.avro-source.port=44444

    # Describe the hdfs-sink
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.type=hdfs
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.path=hdfs://10.101.3.3:9000/xcx/test/%y-%m-%d/%H%M/
    # File name prefix and suffix
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.filePrefix=xcx
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.fileSuffix=.txt
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.writeFormat=Text
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.fileType=DataStream
    # Round the path timestamp down to 30-minute buckets
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.round=true
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.roundValue=30
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.roundUnit=minute
    # Roll files by size (1 MB), not by event count
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.rollSize=1048576
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.rollCount=0
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.minBlockReplicas=1
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.useLocalTimeStamp=true

    # Describe the kafka-sink
    test-avro-memory-hdfsAndkafka.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
    test-avro-memory-hdfsAndkafka.sinks.kafka-sink.brokerList=10.101.3.3:9092
    ## The topic flume_kafka_streaming must be created in Kafka in advance;
    ## it is shared by Flume and Spark Streaming
    test-avro-memory-hdfsAndkafka.sinks.kafka-sink.topic=flume_kafka_streaming
    test-avro-memory-hdfsAndkafka.sinks.kafka-sink.requiredAcks=1
    test-avro-memory-hdfsAndkafka.sinks.kafka-sink.batchSize=5

    # Use a channel which buffers events in memory
    test-avro-memory-hdfsAndkafka.channels.memory-channel.type=memory

    # Bind the source and sinks to the channel
    test-avro-memory-hdfsAndkafka.sources.avro-source.channels=memory-channel
    test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.channel=memory-channel
    test-avro-memory-hdfsAndkafka.sinks.kafka-sink.channel=memory-channel
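For reference, the topic mentioned above can be created and the agent started with commands along these lines. The ZooKeeper address, partition and replication counts, and config file path are assumptions for a single-node test setup:

```shell
# Create the topic shared by Flume and Spark Streaming
# (1 partition / 1 replica, assuming a one-node test cluster)
kafka-topics.sh --create --zookeeper 10.101.3.3:2181 \
  --topic flume_kafka_streaming --partitions 1 --replication-factor 1

# Start the agent with the configuration above
flume-ng agent \
  --name test-avro-memory-hdfsAndkafka \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/test-avro-memory-hdfsAndkafka.conf \
  -Dflume.root.logger=INFO,console
```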

    Step 3: Connect Kafka to Spark Streaming

    See my article Spark Streaming整合kafka (Integrating Spark Streaming with Kafka).

    Step 4: Process the received data with Spark Streaming

    See my article Spark Streaming整合kafka (Integrating Spark Streaming with Kafka).
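Since the linked article is not reproduced here, a minimal sketch of steps 3 and 4 might look like the following, assuming Spark with the spark-streaming-kafka-0-8 connector and its direct (receiver-less) approach; the 10-second batch interval and the word count are illustrative assumptions, not the linked article's code:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object FlumeKafkaStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumeKafkaStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(10)) // batch interval: illustrative

    // Direct approach: read offsets straight from the brokers, no receiver
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "10.101.3.3:9092")
    val topics = Set("flume_kafka_streaming")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Step 4: process the received data -- here a simple word count per batch
    messages.map(_._2)              // drop the key, keep the log line
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```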

    Original link: https://www.haomeiwen.com/subject/oufyqctx.html