Consuming Kafka with Structured Streaming, Explained

Author: 早点起床晒太阳 | Published 2020-07-08 14:50

    References
    http://spark.apache.org/docs/2.4.1/structured-streaming-kafka-integration.html

    Preface

    Flink is all the rage right now, but I still haven't looked at Structured Streaming in any systematic way, so today I'm going through how Structured Streaming consumes from Kafka.

    Preparation

    I wrote a small sample program that produces data into Kafka, so there is something for the streaming job to consume:

        // Assumed imports for this snippet:
        //   import java.util.Date;
        //   import java.util.Properties;
        //   import org.apache.kafka.clients.producer.KafkaProducer;
        //   import org.apache.kafka.clients.producer.Producer;
        //   import org.apache.kafka.clients.producer.ProducerRecord;

        Properties props = new Properties();
        props.put("bootstrap.servers", "****:9092");
        props.put("acks", "0");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The producer sends a message to the "test" topic every two seconds
        String topic = "test";
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        String date;
        for (int i = 1; i < 20; i++) {
            Thread.sleep(2000);
            date = new Date().toString().substring(11);
            String line = "This is message " + i + ", sent at " + date;
            producer.send(new ProducerRecord<String, String>(topic, "demo", line));
            System.out.println(line);
        }
        producer.close();
    

    The Maven dependency needed for Structured Streaming to consume Kafka:

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
                <version>2.4.1</version>
            </dependency>
    

    Consuming Kafka with Structured Streaming

    See the official Spark docs: http://spark.apache.org/docs/2.4.1/structured-streaming-kafka-integration.html

    Straight to the sample code (it is quite simple):

        val lines = spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers","****:9092")
          .option("startingOffsets", "earliest")
          .option("subscribe","test")
          .load()
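
    The snippet above assumes a SparkSession named spark already exists. A minimal sketch for creating one (the app name and local master are placeholders, adjust to your environment):

        import org.apache.spark.sql.SparkSession

        // Minimal SparkSession for local testing; replace appName/master as needed
        val spark = SparkSession.builder()
          .appName("StructuredStreamingKafkaDemo")
          .master("local[*]")
          .getOrCreate()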
    

    Here I read from Kafka and write the stream straight to the console, as shown in the code below:

        val query = lines
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)")
          .writeStream
          .outputMode("update")
          .format("console")
          .start()

        // Block until the query stops; needed when running as a standalone
        // application (a spark-shell session keeps running without it)
        query.awaitTermination()
    

    A few pitfalls I ran into:

    1. The output is binary / garbled

    +----+--------------------+-----+---------+------+--------------------+-------------+
    | key|               value|topic|partition|offset|           timestamp|timestampType|
    +----+--------------------+-----+---------+------+--------------------+-------------+
    |null|[E8 BF 99 E6 98 A...|nokia|        0|    35|2020-07-07 20:17:...|            0|
    +----+--------------------+-----+---------+------+--------------------+-------------+

    Something like the above. After looking into it, the cause was that this line was missing:

    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","CAST(topic AS STRING)")

    Adding this cast makes the key and value columns readable strings.

    One more note: every row that Structured Streaming reads from Kafka comes with a fixed schema, and we can call selectExpr before writeStream to pick out the fields we want:

    key             binary
    value           binary
    topic           string
    partition       int
    offset          long
    timestamp       timestamp
    timestampType   int
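
    For example, besides the key and value you can also keep the partition, offset, and timestamp columns by listing them in selectExpr (a small sketch based on the schema above; the variable name parsed is just illustrative):

        // Cast the binary key/value to strings and keep a few metadata columns
        val parsed = lines.selectExpr(
          "CAST(key AS STRING)",
          "CAST(value AS STRING)",
          "topic",
          "partition",
          "offset",
          "timestamp")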
    
    2. Options for consuming from Kafka

    startingOffsets: possible values are "earliest", "latest", or a JSON string giving explicit per-partition offsets; the default is "latest" for streaming queries and "earliest" for batch queries.

    So in streaming mode the default is latest, i.e. start from the newest offsets; set this option yourself if you want to start somewhere else.
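
    The docs page above also shows that startingOffsets accepts a JSON string with an explicit offset per partition (-2 means earliest, -1 means latest). A sketch with example values only:

        // Start partition 0 of the "test" topic at offset 23 (example values)
        val fromOffsets = spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "****:9092")
          .option("subscribe", "test")
          .option("startingOffsets", """{"test":{"0":23}}""")
          .load()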

    endingOffsets

    This option is not supported in streaming mode; it only applies to batch queries. (Batch mode is not covered here; see http://spark.apache.org/docs/2.4.1/structured-streaming-kafka-integration.html for details.)
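
    For reference, a batch query against Kafka looks like the sketch below, where both startingOffsets and endingOffsets are allowed (reusing the same test topic; this follows the pattern from the docs page above):

        // Batch query: read a bounded offset range and stop
        val batchDf = spark.read.format("kafka")
          .option("kafka.bootstrap.servers", "****:9092")
          .option("subscribe", "test")
          .option("startingOffsets", "earliest")
          .option("endingOffsets", "latest")
          .load()
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")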
