References
http://spark.apache.org/docs/2.4.1/structured-streaming-kafka-integration.html
Preface
Flink is very popular these days, but I have not yet looked at Structured Streaming systematically, so today I'll go through how Structured Streaming consumes Kafka.
Preparation
I wrote a small producer sample here that writes test data into Kafka:
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Kafka producer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "****:9092");
props.put("acks", "0");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Send a message to the topic every two seconds
String topic = "test";
Producer<String, String> producer = new KafkaProducer<String, String>(props);
String date;
for (int i = 1; i < 20; i++) {
    Thread.sleep(2000);
    date = new Date().toString().substring(11);
    String line = "This is message " + i + ", sent at " + date;
    producer.send(new ProducerRecord<String, String>(topic, "demo", line));
    System.out.println(line);
}
producer.close();
Maven dependency required for Structured Streaming to consume Kafka
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>2.4.1</version>
</dependency>
Consuming Kafka with Structured Streaming
See the official Spark docs: http://spark.apache.org/docs/2.4.1/structured-streaming-kafka-integration.html
Here is the sample code (it is quite simple):
val lines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "****:9092")
  .option("startingOffsets", "earliest")
  .option("subscribe", "test")
  .load()
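The snippet above assumes a SparkSession named spark already exists. A minimal sketch of creating one (the app name and local master are only illustrative placeholders):

import org.apache.spark.sql.SparkSession

// Build the SparkSession used by the readStream call above;
// appName and master are placeholder values for local testing
val spark = SparkSession.builder()
  .appName("structured-streaming-kafka-demo")
  .master("local[*]")
  .getOrCreate()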
Here I consume Kafka and write the output directly to the console; the code is shown below.
val query = lines
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)")
  .writeStream
  .outputMode("update")
  .format("console")
  .start()
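One thing to note: start() launches the query and returns immediately, so to keep the application alive until the query stops, you would normally block on the returned query handle:

// Block the main thread until the streaming query terminates
query.awaitTermination()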
Now for a few pitfalls I ran into.
1. Binary / garbled output
+----+--------------------+-----+---------+------+--------------------+-------------+
| key| value|topic|partition|offset| timestamp|timestampType|
+----+--------------------+-----+---------+------+--------------------+-------------+
|null|[E8 BF 99 E6 98 A...|nokia| 0| 35|2020-07-07 20:17:...| 0|
+----+--------------------+-----+---------+------+--------------------+-------------+
If you see output like this, it is because the following line was missing:
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)")
A bit more on this: every row that Structured Streaming reads from Kafka carries a fixed schema, and we can use selectExpr (applied before writeStream) to pick out the columns we want. The available columns are listed below, followed by a short sketch.
key            binary
value          binary
topic          string
partition      int
offset         long
timestamp      timestamp
timestampType  int
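For instance, a rough sketch (reusing the lines stream defined earlier) that keeps the payload plus a few of the metadata columns above; the column choices are just an example:

// Cast the binary key/value to strings and keep some Kafka metadata columns
val enriched = lines.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value",
  "topic",
  "partition",
  "offset",
  "timestamp"
)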
2. Kafka source options
startingOffsets
Possible values are "earliest" and "latest" ("latest" is the default for streaming, "earliest" for batch).
So in streaming mode the default is latest, i.e. start from the newest offsets; override it if you need different behavior, as sketched below.
endingOffsets
Not supported in streaming mode; it only applies in batch mode (batch mode is not covered here; see http://spark.apache.org/docs/2.4.1/structured-streaming-kafka-integration.html).
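As a rough sketch of overriding startingOffsets (the topic name, partitions, and offsets below are only illustrative; per the Spark docs a JSON string can pin per-partition offsets, where -2 means earliest and -1 means latest):

// Start from the earliest offsets available in the topic
val fromEarliest = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "****:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .load()

// Or pin per-partition starting offsets with a JSON string (-2 = earliest, -1 = latest)
val fromSpecific = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "****:9092")
  .option("subscribe", "test")
  .option("startingOffsets", """{"test":{"0":23,"1":-2}}""")
  .load()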