Based on Spark Streaming, Kafka, and Flume

Author: 王龙江_3c83 | Published 2019-03-19 13:49

1. Log Generator

1.1 Dependencies

<!-- log4j -->
<dependency>
     <groupId>log4j</groupId>
     <artifactId>log4j</artifactId>
     <version>1.2.17</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
<dependency>
     <groupId>org.apache.flume</groupId>
     <artifactId>flume-ng-sdk</artifactId>
     <version>1.8.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-clients/flume-ng-log4jappender -->
<dependency>
     <groupId>org.apache.flume.flume-ng-clients</groupId>
     <artifactId>flume-ng-log4jappender</artifactId>
     <version>1.8.0</version>
</dependency>

1.2 Configure log4j.properties

Note: place this file in the resources directory so it is on the classpath.

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=192.168.1.149
log4j.appender.flume.Port=41414
log4j.appender.flume.UnsafeMode=true

1.3 Code

import org.apache.log4j.Logger;

// Emits one log line per second; the flume appender configured in
// log4j.properties forwards each event to the Flume avro source.
public class LogGenerator {
    private static final Logger logger = Logger.getLogger(LogGenerator.class);
    private static int index = 0;

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            Thread.sleep(1000);
            logger.info("value:" + index++);
        }
    }
}

2. Flume

2.1 Flume Configuration

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
#agent1.sinks.log-sink.type=logger
agent1.sinks.kafka-sink.channel = logger-channel
agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
# the topic must be created in advance
agent1.sinks.kafka-sink.kafka.topic = mytopic
agent1.sinks.kafka-sink.kafka.bootstrap.servers = 192.168.1.149:9092
agent1.sinks.kafka-sink.kafka.flumeBatchSize = 20

agent1.sources.avro-source.channels=logger-channel

2.2 Start Command

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/streaming.conf --name agent1 -Dflume.root.logger=INFO,console
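
The topic referenced by the sink must exist before the agent starts, and the pipeline can be verified end to end by tailing the topic. The commands below are a sketch assuming a standard Kafka installation and the broker address used above (on older Kafka versions, kafka-topics.sh takes --zookeeper instead of --bootstrap-server):

kafka-topics.sh --create --bootstrap-server 192.168.1.149:9092 --replication-factor 1 --partitions 1 --topic mytopic
kafka-console-consumer.sh --bootstrap-server 192.168.1.149:9092 --topic mytopic --from-beginning

With the agent and LogGenerator running, the consumer should print one value:N line per second.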

3. Spark Streaming

3.1 Core Concepts

StreamingContext: the entry point of a streaming application, initialized with a SparkConf and a batch interval.
DStream: a discretized stream, represented internally as a continuous series of RDDs.
Input DStream: a DStream that receives data from an external source such as Kafka or Flume.
Transformations: operations that derive a new DStream from an existing one.
Output Operations: operations that push a DStream's data to an external system.
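
To make these concepts concrete, below is a minimal sketch of a driver that consumes the mytopic topic written by the Flume sink above. It assumes the spark-streaming-kafka-0-10 integration; the class name StreamingApp and the group id log-consumer are illustrative and not from the original setup:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class StreamingApp {
    public static void main(String[] args) throws InterruptedException {
        // StreamingContext: the entry point, producing one batch every 5 seconds
        SparkConf conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "192.168.1.149:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "log-consumer");
        kafkaParams.put("auto.offset.reset", "latest");

        // Input DStream: one record per log4j event forwarded by Flume
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("mytopic"), kafkaParams));

        // A transformation (map) followed by an output operation (print)
        stream.map(ConsumerRecord::value).print();

        jssc.start();
        jssc.awaitTermination();
    }
}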

3.2 Operators

Function             Purpose
updateStateByKey     Maintains state across batches for cumulative computation; requires a checkpoint directory.
transform            Applies an arbitrary RDD-to-RDD function to each batch and returns a new DStream.
foreachRDD           Output operation that runs an arbitrary function on each RDD; it returns nothing and is typically used to write results out.
window functions     Operate over a sliding window of batches, e.g. window and reduceByKeyAndWindow.
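
As a minimal sketch of the stateful operator, the snippet below keeps a running count per log line. It reuses jssc and stream from the previous sketch; the checkpoint path is illustrative:

import java.util.List;

import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// updateStateByKey requires a checkpoint directory to persist state between batches
jssc.checkpoint("/tmp/spark-checkpoint");

// Pair each log line with a count of 1, then fold new counts into the stored state
JavaPairDStream<String, Integer> counts = stream
        .mapToPair(r -> new Tuple2<>(r.value(), 1))
        .updateStateByKey((List<Integer> values, Optional<Integer> state) -> {
            int sum = state.orElse(0);
            for (int v : values) sum += v;
            return Optional.of(sum);
        });

counts.print();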

4. Storage
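
Persistence is typically implemented inside foreachRDD, opening the connection once per partition on the executors rather than on the driver. Below is a sketch assuming the counts stream from section 3.2 and a Redis instance accessed through the Jedis client; the host, port, and key names are illustrative:

import redis.clients.jedis.Jedis;
import scala.Tuple2;

// Output operation: write each batch out from the executors
counts.foreachRDD(rdd -> rdd.foreachPartition(partition -> {
    // One connection per partition; a connection per record would be far too expensive
    Jedis jedis = new Jedis("192.168.1.149", 6379);
    while (partition.hasNext()) {
        Tuple2<String, Integer> record = partition.next();
        jedis.hset("log_counts", record._1(), String.valueOf(record._2()));
    }
    jedis.close();
}));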
