Spark Streaming + Kafka: Ran out of messages before reaching ending offset
Author: 程序媛啊 | Published 2018-03-17 23:00
Caused by: java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 699792 for topic code-commit partition 7 start 699542. This should not happen, and indicates that messages may have been lost
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    ... 3 more

Cause:
1. The total size of the messages in the Kafka offset range being consumed (here 699542–699792) exceeded the consumer's default fetch size (1024 × 1024 bytes = 1 MB). When a fetch cannot return the whole range, the KafkaRDD iterator runs out of messages before reaching the ending offset and trips the assertion above. The fix is to enlarge the fetch size.
2. This is configured on the consumer side; since the consumer here is Spark Streaming, set the Kafka fetch parameter in its Kafka parameter map: kafkaParameters.put("fetch.message.max.bytes", "52428800"); (50 MB). Note that the value must not exceed the int range (at most 2147483647). The full configuration is shown below:

kafkaParameters.put("metadata.broker.list","node1:9092");
        kafkaParameters.put("fetch.message.max.bytes","52428800");
        String topic = "testTopic";
        Set<String> topics = new HashSet<>();
        topics.add(topic);
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition("testTopic", 0),20680L);
JavaInputDStream<String> message = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParameters,
                fromOffsets,
                new Function<MessageAndMetadata<String, String>, String>() {
                    public String call(MessageAndMetadata<String, String> v1) throws Exception {
                        System.out.println(v1.message());
                        return v1.message();
                    }
                }
        );
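
For context, a minimal sketch of the driver code that would surround the snippet above, assuming a local master and a 10-second batch interval (the app name and master URL are placeholders, not from the original post):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("KafkaFetchSizeDemo");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

// ... build the direct stream as shown above, then attach an output and start:
message.print();
jssc.start();
jssc.awaitTermination();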

Supplement:
--conf spark.streaming.kafka.maxRatePerPartition
This setting caps the amount of data Spark Streaming pulls from Kafka per batch; it is a per-partition, per-second message rate. If the topic has 8 partitions and the batch interval is 10 s, then with --conf spark.streaming.kafka.maxRatePerPartition=5 each batch pulls at most 8 × 10 × 5 = 400 messages. With a 60 s batch interval, each batch pulls at most 8 × 60 × 5 = 2400 messages.
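
For illustration, a sketch of passing this flag at submit time; the main class, master, and JAR name here are placeholders, not from the original post:

spark-submit \
    --class com.example.StreamingApp \
    --master yarn \
    --conf spark.streaming.kafka.maxRatePerPartition=5 \
    streaming-app.jar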
