美文网首页
Java-Spark系列8-Spark streaming整合K

Java-Spark系列8-Spark streaming整合K

作者: 只是甲 | 来源:发表于2021-09-30 15:47 被阅读0次

    一. Spark streaming整合Kafka概述

    1.1 Maven配置

    对于使用SBT/Maven项目定义的Scala/Java应用程序,将您的流应用程序与以下工件链接(参见主编程指南中的链接部分获取更多信息)。

    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-0-10_2.11
    version = 2.4.0
    

    不要手动添加依赖于org.apache.kafka的工件(例如kafka-clients)。spark-streaming-kafka-0-10工件已经有了适当的传递依赖,并且不同的版本可能难以诊断的方式不兼容。

    1.2 创建Direct Stream

    注意,导入的命名空间包括版本,org.apache.spark.streaming.kafka010

    import java.util.*;
    import org.apache.spark.SparkConf;
    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka010.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import scala.Tuple2;
    
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);
    
    Collection<String> topics = Arrays.asList("topicA", "topicB");
    
    JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
      );
    
    stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
    

    对于可能的kafkaParams,请参阅Kafka消费者配置文档。如果你的Spark批处理时间大于Kafka默认的心跳会话超时时间(30秒),适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批,这将需要在代理上更改group.max.session.timeout.ms。注意,该示例将enable.auto.commit设置为false,有关讨论,请参阅下面的存储偏移量。

    1.3 定位策略

    新的Kafka消费者API会预取消息到缓冲区。因此,出于性能考虑,Spark集成将缓存的消费者保留在执行器上(而不是为每批重新创建它们),并倾向于在拥有适当消费者的主机位置上调度分区,这一点很重要。

    在大多数情况下,你应该使用LocationStrategies。preferred consistent如上所示。这将在可用的执行器之间均匀地分布分区。如果你的执行器和你的Kafka代理在同一个主机上,使用PreferBrokers,它会更喜欢在Kafka的leader上为那个分区调度分区。最后,如果分区之间的负载有显著的倾斜,请使用PreferFixed。这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。

    消费者的缓存默认最大大小为64。如果你希望处理超过(64 *个执行器)的Kafka分区,你可以通过spark.streaming.kafka.consumer.cache.maxCapacity来改变这个设置。

    如果你想要关闭Kafka消费者的缓存,你可以设置spark.streaming.kafka.consumer.cache.enabled为false。为了解决SPARK-19185中描述的问题,可能需要禁用缓存。一旦Spark -19185被解析,这个属性可能会在Spark的后续版本中被删除。

    缓存是由topicpartition和group.id决定的,所以使用一个单独的组。每个调用createDirectStream的id。

    1.4 消费者的策略

    新的Kafka消费者API有许多不同的方式来指定主题,其中一些需要大量的后对象实例化设置。ConsumerStrategies提供了一个抽象,允许Spark在从检查点重新启动后获得正确配置的消费者。

    ConsumerStrategies。如上所示,订阅允许您订阅固定的主题集合。SubscribePattern允许您使用正则表达式来指定感兴趣的主题。注意,与0.8集成不同,使用Subscribe或SubscribePattern应该在流运行期间响应添加分区。最后,Assign允许指定一个固定的分区集合。这三种策略都有重载构造函数,允许您指定特定分区的起始偏移量。

    如果您有特定的消费者设置需求,而上述选项无法满足这些需求,那么ConsumerStrategy是一个可以扩展的公共类。

    1.5 创建RDD

    如果您有一个更适合批处理的用例,您可以为定义的偏移量范围创建一个RDD。

    // Import dependencies and create kafka params as in Create Direct Stream above
    
    OffsetRange[] offsetRanges = {
      // topic, partition, inclusive starting offset, exclusive ending offset
      OffsetRange.create("test", 0, 0, 100),
      OffsetRange.create("test", 1, 0, 100)
    };
    
    JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
      sparkContext,
      kafkaParams,
      offsetRanges,
      LocationStrategies.PreferConsistent()
    );
    

    注意,您不能使用PreferBrokers,因为如果没有流,就没有驱动端消费者自动为您查找代理元数据。如果有必要,可以将PreferFixed与自己的元数据查找一起使用。

    1.6 获得Offsets

    stream.foreachRDD(rdd -> {
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
      rdd.foreachPartition(consumerRecords -> {
        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
        System.out.println(
          o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
      });
    });
    

    注意,对hasoffsetrange的类型转换只有在对createDirectStream的结果调用的第一个方法中才会成功,而不是在之后的方法链中。需要注意的是,RDD分区和Kafka分区之间的一对一映射在任何shuffle或重分区方法之后都不会保留,例如reduceByKey()或window()。

    1.7 存储 Offsets

    Kafka的传递语义在失败的情况下取决于偏移量如何和何时存储。Spark输出操作至少一次。因此,如果您想要等价于精确一次语义,则必须要么在幂等输出之后存储偏移量,要么在原子事务中与输出一起存储偏移量。通过这个集成,您有3个选项来存储偏移量,以提高可靠性(和代码复杂度)。

    1.8 检查点

    如果启用Spark检查点,偏移量将存储在检查点中。这很容易启用,但也有缺点。你的输出操作必须是幂等的,因为你会得到重复的输出;事务不是一个选项。此外,如果应用程序代码已更改,则无法从检查点恢复。对于计划的升级,您可以通过在旧代码的同时运行新代码来缓解这个问题(因为输出无论如何都需要是幂等的,它们不应该冲突)。但是对于需要更改代码的计划外故障,除非有其他方法来确定已知的良好起始偏移量,否则您将丢失数据。

    1.9 Kafka自身

    Kafka有一个偏移量提交API,它将偏移量存储在一个特殊的Kafka主题中。默认情况下,新使用者将定期自动提交偏移量。这几乎肯定不是您想要的,因为由使用者成功轮询的消息可能还没有导致Spark输出操作,从而导致未定义的语义。这就是为什么上面的流例子将" enable.auto.commit "设置为false。然而,你可以在知道你的输出已经被存储后,使用commitAsync API提交偏移量给Kafka。与检查点相比,Kafka的好处是不管你的应用代码发生了什么变化,它都是一个持久的存储。然而,Kafka不是事务性的,所以你的输出必须仍然是幂等的。

    stream.foreachRDD(rdd -> {
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    
      // some time later, after outputs have completed
      ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    });
    

    1.10 自身数据存储

    对于支持事务的数据存储,将偏移量作为结果保存在同一个事务中可以使两者保持同步,即使在失败的情况下也是如此。如果您非常小心地检测重复或跳过的偏移范围,则回滚事务可以防止重复或丢失的消息影响结果。这提供了等价的“一次精确”语义。甚至对于聚集产生的输出也可以使用这种策略,聚集通常很难使其幂等。

    // The details depend on your data store, but the general idea looks like this
    
    // begin from the the offsets committed to the database
    Map<TopicPartition, Long> fromOffsets = new HashMap<>();
    for (resultSet : selectOffsetsFromYourDatabase)
      fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
    }
    
    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent(),
      ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
    );
    
    stream.foreachRDD(rdd -> {
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
      
      Object results = yourCalculation(rdd);
    
      // begin your transaction
    
      // update results
      // update offsets where the end of existing offsets matches the beginning of this batch of offsets
      // assert that offsets were updated correctly
    
      // end your transaction
    });
    

    二.Spark Streaming整合Kafka实战

    2.1 Maven配置

    下面是我整个项目的Maven配置

    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>org.example</groupId>
      <artifactId>SparkStudy</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <!-- FIXME change it to the project's website -->
      <url>http://www.example.com</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11</scala.version>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.12</artifactId>
          <version>2.4.0</version>
          <scope>provided</scope>
        </dependency>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.12</artifactId>
          <version>2.4.0</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>2.4.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
          <version>2.4.0</version>
        </dependency>
      </dependencies>
    
      <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
          <plugins>
            <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
            <plugin>
              <artifactId>maven-clean-plugin</artifactId>
              <version>3.1.0</version>
            </plugin>
            <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
            <plugin>
              <artifactId>maven-resources-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.8.0</version>
            </plugin>
            <plugin>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>2.22.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-jar-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-install-plugin</artifactId>
              <version>2.5.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-deploy-plugin</artifactId>
              <version>2.8.2</version>
            </plugin>
            <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
            <plugin>
              <artifactId>maven-site-plugin</artifactId>
              <version>3.7.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-project-info-reports-plugin</artifactId>
              <version>3.0.0</version>
            </plugin>
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-jar-plugin</artifactId>
              <version>3.0.2</version>
              <configuration>
                <archive>
                  <manifest>
                    <addClasspath>true</addClasspath>
                    <mainClass>org.zqs.kafka.Producer</mainClass> <!-- 此处为主入口-->
                  </manifest>
                </archive>
              </configuration>
            </plugin>
            <plugin>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
          </plugins>
        </pluginManagement>
      </build>
    </project>
    
    

    2.2 代码

    生产者代码:

    package org.zqs.kafka;
    
    import java.util.Properties;
    import java.util.Random;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class Producer {
        public static String topic = "test_20210816_2";//定义主题
    
        public static void main(String[] args) throws Exception {
            Properties p = new Properties();
            p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.1.124:9092,10.31.1.125:9092,10.31.1.126:9092");//kafka地址,多个地址用逗号分割
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
    
            try {
                while (true) {
                    String msg = "Hello," + new Random().nextInt(100);
                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
                    kafkaProducer.send(record);
                    System.out.println("消息发送成功:" + msg);
                    Thread.sleep(500);
                }
            } finally {
                kafkaProducer.close();
            }
    
        }
    }
    

    SparkStreaming消费代码

    package org.zqs.kafka;
    
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.rdd.RDD;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.*;
    
    import scala.Tuple2;
    
    public class SparkStreaming2 {
    
        public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
            SparkConf sparkConf  = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingFromkafka");
            JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf , Durations.seconds(1));
    
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", "10.31.1.124:9092,10.31.1.125:9092,10.31.1.126:9092");//多个可用ip可用","隔开
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "sparkStreaming");
            Collection<String> topics = Arrays.asList("test_20210816_2");//配置topic,可以是数组
    
            JavaInputDStream<ConsumerRecord<String, String>> javaInputDStream =KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(topics, kafkaParams));
    
            JavaPairDStream<String, String> javaPairDStream = javaInputDStream.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>(){
                private static final long serialVersionUID = 1L;
                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                    return new Tuple2<>(consumerRecord.key(), consumerRecord.value());
                }
            });
    
            javaPairDStream.foreachRDD(new VoidFunction<JavaPairRDD<String,String>>() {
                @Override
                public void call(JavaPairRDD<String, String> javaPairRDD) throws Exception {
                    // TODO Auto-generated method stub
                    javaPairRDD.foreach(new VoidFunction<Tuple2<String,String>>() {
                        @Override
                        public void call(Tuple2<String, String> tuple2)
                                throws Exception {
                            // TODO Auto-generated method stub
                            System.out.println(tuple2._2);
                        }
                    });
                }
            });
    
            /*JavaInputDStream<ConsumerRecord<String, String>> stream =
                    KafkaUtils.createDirectStream(
                            streamingContext,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                    );
    
            stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
    
            stream.foreachRDD(rdd -> {
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                rdd.foreachPartition(consumerRecords -> {
                    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                    System.out.println(
                            o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
                });
            });
             */
    
            streamingContext.start();
            streamingContext.awaitTermination();
            streamingContext.close();
        }
    
    }
    

    2.3 测试

    -- 打包
    mvn package
    -- 上传文件到服务器
    
    -- 提交到spark集群
    spark-submit \
      --class org.zqs.kafka.SparkStreaming2 \
      --master local[2] \
      /home/javaspark/SparkStudy-1.0-SNAPSHOT.jar
    

    生产者要提前运行

    image.png

    参考:

    1. https://spark.apache.org/docs/2.4.0/streaming-kafka-0-10-integration.html
    2. https://blog.csdn.net/a1361585/article/details/101954816

    相关文章

      网友评论

          本文标题:Java-Spark系列8-Spark streaming整合K

          本文链接:https://www.haomeiwen.com/subject/jxulbltx.html