Spark Streaming + Kafka: Ran out of messages before reaching ending offset

Author: 程序媛啊 | Published 2018-03-17 23:00
    Caused by: java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 699792 for topic code-commit partition 7 start 699542. This should not happen, and indicates that messages may have been lost
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        ... 3 more
    

    Cause and fix:
    1. The messages in the offset range being consumed (here 699542–699792) are larger than the consumer's default maximum fetch size (1024 * 1024 = 1 MB), so the fetch cannot return them and the iterator runs out of messages before reaching the ending offset. The fetch size needs to be raised.
    2. The fix goes on the consumer side. Since the consumer here is Spark Streaming, add the parameter to the Kafka params map passed to the direct stream: kafkaParameters.put("fetch.message.max.bytes", "52428800");. Note that the value must fit in an int (at most 2147483647).

    // Requires (spark-streaming-kafka-0-8): kafka.common.TopicAndPartition,
    // kafka.message.MessageAndMetadata, kafka.serializer.StringDecoder,
    // org.apache.spark.api.java.function.Function,
    // org.apache.spark.streaming.api.java.JavaInputDStream,
    // org.apache.spark.streaming.kafka.KafkaUtils
    Map<String, String> kafkaParameters = new HashMap<>();
    kafkaParameters.put("metadata.broker.list", "node1:9092");
    // Raise the per-request fetch limit to 50 MB; the value must fit in an int
    kafkaParameters.put("fetch.message.max.bytes", "52428800");

    // Start consuming topic "testTopic", partition 0, from offset 20680
    Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
    fromOffsets.put(new TopicAndPartition("testTopic", 0), 20680L);

    JavaInputDStream<String> message = KafkaUtils.createDirectStream(
            jssc,                // an existing JavaStreamingContext
            String.class,        // key class
            String.class,        // value class
            StringDecoder.class, // key decoder
            StringDecoder.class, // value decoder
            String.class,        // type produced by the message handler below
            kafkaParameters,
            fromOffsets,
            new Function<MessageAndMetadata<String, String>, String>() {
                public String call(MessageAndMetadata<String, String> v1) throws Exception {
                    System.out.println(v1.message());
                    return v1.message();
                }
            }
    );
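
    To actually run the stream, the DStream still needs an output operation, and the streaming context must be started. A minimal sketch, assuming Spark 2.x with Java 8 lambdas (the processing body is a placeholder):

    message.foreachRDD(rdd -> {
        // Placeholder processing: just count the records in this batch
        System.out.println("records in batch: " + rdd.count());
    });
    jssc.start();
    jssc.awaitTermination(); // throws InterruptedException; declare or handle it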
    

    Addendum:
    --conf spark.streaming.kafka.maxRatePerPartition
    This setting caps how many records per second Spark Streaming pulls from each Kafka partition in a batch. If the topic has 8 partitions, the batch interval is 10 s, and --conf spark.streaming.kafka.maxRatePerPartition=5, then each batch pulls at most 8 x 10 x 5 = 400 records. With a 60 s batch interval, each batch pulls at most 8 x 60 x 5 = 2400 records.
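
    As a sketch, the cap can also be set programmatically on the SparkConf rather than on the spark-submit command line (the app name and the 10 s batch interval here are illustrative, matching the first example above):

    SparkConf conf = new SparkConf()
            .setAppName("kafka-direct-demo") // illustrative app name
            // at most 5 records per second per partition per batch
            .set("spark.streaming.kafka.maxRatePerPartition", "5");
    // 10 s batch interval: with 8 partitions this caps a batch at 8 x 10 x 5 = 400 records
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));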
