Caused by: java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 699792 for topic code-commit partition 7 start 699542. This should not happen, and indicates that messages may have been lost
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
... 3 more
Cause:
1. The total size of the Kafka messages in the offset range to be consumed (here 699542–699792) exceeded the default fetch size (1024*1024 = 1 MB), so the fetch-size setting must be increased.
2. This is configured on the consumer side. Since the consumer here is Spark Streaming, set the fetch-size parameter in its Kafka parameters: kafkaParameters.put("fetch.message.max.bytes", "52428800"). Note that the value must fit within the range of an int.
import java.util.HashMap;
import java.util.Map;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

Map<String, String> kafkaParameters = new HashMap<>();
kafkaParameters.put("metadata.broker.list", "node1:9092");
// Raise the consumer fetch size to 50 MB (default is 1 MB)
kafkaParameters.put("fetch.message.max.bytes", "52428800");

String topic = "testTopic";
// Start consuming this topic's partition 0 from offset 20680
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition(topic, 0), 20680L);

JavaInputDStream<String> message = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        String.class,
        kafkaParameters,
        fromOffsets,
        new Function<MessageAndMetadata<String, String>, String>() {
            public String call(MessageAndMetadata<String, String> v1) throws Exception {
                System.out.println(v1.message());
                return v1.message();
            }
        }
);
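Raising the consumer fetch size only helps if it is at least as large as the largest message the broker will accept. As a point of reference, the related broker-side settings can be aligned with the 50 MB consumer value above (the property names are standard Kafka broker configs; the values here are illustrative, not required):

```properties
# server.properties (broker side) — illustrative values
# Largest message the broker will accept from producers
message.max.bytes=52428800
# Must be >= message.max.bytes so replica fetchers can copy large messages
replica.fetch.max.bytes=52428800
```

If fetch.message.max.bytes on the consumer is smaller than the largest message actually stored in a partition, the consumer cannot make progress past that message.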
Additional note:
--conf spark.streaming.kafka.maxRatePerPartition
This setting caps how much data Spark Streaming pulls from Kafka in each batch. If the topic has 8 partitions, the batch interval is 10 s, and --conf spark.streaming.kafka.maxRatePerPartition=5, then each batch pulls at most 8 × 10 × 5 = 400 records. If the batch interval is 60 s, each batch pulls at most 8 × 60 × 5 = 2400 records.
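The per-batch cap above is simply partitions × batch-interval-seconds × maxRatePerPartition. A minimal sketch of that arithmetic (the method and class names are illustrative, not part of Spark's API):

```java
// Illustrates how spark.streaming.kafka.maxRatePerPartition bounds batch size.
// 8 partitions and a rate of 5 records/sec/partition match the example above.
public class MaxRateExample {
    // Upper bound on records per batch for a direct Kafka stream
    static long maxRecordsPerBatch(int partitions, long batchIntervalSeconds, long maxRatePerPartition) {
        return (long) partitions * batchIntervalSeconds * maxRatePerPartition;
    }

    public static void main(String[] args) {
        System.out.println(maxRecordsPerBatch(8, 10, 5)); // 10 s interval -> 400
        System.out.println(maxRecordsPerBatch(8, 60, 5)); // 60 s interval -> 2400
    }
}
```

Note that this is only a ceiling: a batch pulls fewer records when fewer are available in Kafka.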