In the previous chapters we looked at integrating Spark Streaming with Flume and integrating Spark Streaming with Kafka. In this chapter we put the three together and build a stream-processing environment. The overall data flow is: application logs (log4j) → Flume → Kafka → Spark Streaming.
Integrating log output into Flume
- Flume agent configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.30.131
a1.sources.r1.port = 41414
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Note: the sink type here is logger. Only after verifying that log4j can deliver logs to Flume will we change the sink to a Kafka sink. Testing one step at a time is a good habit in development; if you wire up the whole pipeline before testing, problems are hard to localize.
Start the agent:
flume-ng agent --name a1 --conf ../conf --conf-file ../conf/log4j_to_flume.conf -Dflume.root.logger=INFO,console
- log4j configuration
Add the POM dependency for Flume's log4j appender:
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.9.0</version>
</dependency>
Configuration:
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=192.168.30.131
log4j.appender.flume.Port=41414
log4j.appender.flume.UnsafeMode=true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
The Hostname and Port here must match the bind address and port in the Flume agent configuration.
- Simulating log generation
import org.apache.log4j.Logger;

public class LoggerGenerator2 {
    private static Logger logger = Logger.getLogger(LoggerGenerator2.class.getName());

    public static void main(String[] args) throws Exception {
        int index = 0;
        while (true) {
            // Emit one log line per second; the log4j flume appender ships it to the agent
            Thread.sleep(1000);
            logger.info("value : " + index++);
        }
    }
}
Note
After starting the program I ran into an error. After some investigation, the cause was that my POM pulls in the Spark Streaming dependency, whose transitive avro dependency conflicts with the avro version used by flume-ng-log4jappender. Excluding avro from the Spark Streaming dependency solved the problem:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
    </exclusions>
</dependency>
- Testing
Start the program and watch the agent's console: the log messages are printed there by the logger sink, which shows that the log4j-to-Flume integration works.
Integrating Flume with Kafka
This step delivers the logs collected by Flume to Kafka. The key change is switching the sink type in the agent configuration from the previous step to KafkaSink:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.30.131
a1.sources.r1.port = 41414
# Describe the sink
# Use the Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
a1.sinks.k1.brokerList = 192.168.30.131:9092
# Kafka topic to write to
a1.sinks.k1.topic = kafka_spark
# Message serializer
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
# Number of messages per batch; larger batches raise throughput but add latency
a1.sinks.k1.batchSize = 3
a1.sinks.k1.requiredAcks = 1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Kafka environment and launch a consumer (for example the kafka-console-consumer.sh script that ships with Kafka, subscribed to the kafka_spark topic). Then start the log-generator program and watch the consumer: if the generated log lines appear there, the logs have made it into Kafka.
Integrating Kafka with Spark Streaming
The Spark Streaming and Kafka integration was covered in detail earlier, so here is just a quick recap.
Core POM dependencies:
<!-- Spark Streaming dependency -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Spark Streaming Kafka integration -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>2.4.3</version>
</dependency>
Scala program:
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("DirectKafka")
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val topicsSet = Array("kafka_spark")
    val kafkaParams = mutable.HashMap[String, String]()
    // The following parameters are required; otherwise the job fails at startup
    kafkaParams.put("bootstrap.servers", "192.168.30.131:9092")
    kafkaParams.put("group.id", "group1")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
    )

    // Business logic; here we simply print the received values
    val lines = messages.map(_.value)
    lines.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
Following the flow above, start the Kafka environment, the Flume agent, and the log-generator program, then start the Spark Streaming program and watch its console. In my case the job failed with an error at this point. After searching online, the fix was to add the following setting to the SparkConf:
val conf = new SparkConf()
  .setAppName("DirectKafka")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Restart the program and the batches now print the received log lines to the console. Success: the logs flow from log4j through Flume and Kafka into Spark Streaming.
Processing the received data with Spark Streaming
With the steps above, the log messages are now flowing into Spark Streaming. At this point Spark Streaming can process the received data, save the results to a relational database such as MySQL, and a Java web application can then visualize them. A later article will look at how to process the logs with Spark Streaming; a minimal sketch of what such processing could look like follows below.
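As a rough illustration only, the sketch below extends the program above: it counts the log lines received in each batch and writes the count to MySQL over JDBC. It assumes the `lines` DStream defined earlier (it would replace the `lines.print()` call), a reachable MySQL instance with a `log_counts(batch_time, line_count)` table, and the MySQL JDBC driver on the classpath; the table name, column names, and connection settings are all hypothetical, not part of the original setup.

import java.sql.DriverManager

// Hypothetical processing step: count the lines in each batch and persist the
// count to MySQL. Table/column names and the JDBC URL are made up for illustration.
lines.foreachRDD { (rdd, batchTime) =>
  val count = rdd.count()
  if (count > 0) {
    // One connection per batch, opened on the driver. For per-record writes,
    // use rdd.foreachPartition and open the connection inside the closure instead.
    val conn = DriverManager.getConnection(
      "jdbc:mysql://192.168.30.131:3306/streaming", "user", "password")
    try {
      val stmt = conn.prepareStatement(
        "INSERT INTO log_counts (batch_time, line_count) VALUES (?, ?)")
      stmt.setLong(1, batchTime.milliseconds)
      stmt.setLong(2, count)
      stmt.executeUpdate()
      stmt.close()
    } finally {
      conn.close()
    }
  }
}

In practice, the simple per-batch count above would be replaced by whatever aggregation the later article develops, using the same write-to-MySQL pattern.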