Kafka Consumer: The Various Ways to Commit Offsets

Author: 有梦想的人不睡觉_4741 | Published 2018-08-14 10:37

The plain API

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public static void CommonDemo() {
        final Properties properties = new Properties() {{
            put("bootstrap.servers", "localhost:9092");
            put("group.id", "test");
            put("enable.auto.commit", "true");       // offsets are committed automatically
            put("auto.commit.interval.ms", "1000");  // every second
            put("session.timeout.ms", "30000");
            put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("auto.offset.reset", "earliest");
        }};
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            try {
                Thread.sleep(200); // simulate some processing time
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
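A side note of my own, not from the original post: the raw string keys above are easy to mistype, and the client ships matching constants on ConsumerConfig. Since every example below repeats the same configuration, a small hypothetical helper (baseConfig is my name, not the article's) could build it once:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // Hypothetical helper: the article's configuration, built once, using the
    // client's ConsumerConfig constants instead of raw string keys.
    static Properties baseConfig(boolean autoCommit) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(autoCommit));
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    // usage: new KafkaConsumer<String, String>(baseConfig(true))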

CommonDemo is about as simple and common as consumer examples get. But is it really, truly, problem-free?

It has the following problems, all stemming from auto-commit:

1. Offsets are committed on a timer (every 5 seconds by default; the example sets auto.commit.interval.ms to 1000 ms). If the consumer stops between two commits, everything consumed since the last commit is consumed again on restart, producing duplicate messages.

2. When a new consumer joins the group, a rebalance occurs and each consumer resumes from the last committed offset, so records processed after that commit are consumed a second time.

3. With auto-commit, offsets get committed behind your back: you never learn which messages were actually processed, whether processing succeeded, or whether the offset commit itself succeeded.

With those problems in mind, let's revise the code.

Synchronous commit

    public static void SyncDemo() {
        final Properties properties = new Properties() {{
            put("bootstrap.servers", "localhost:9092");
            put("group.id", "test");
            put("enable.auto.commit", "false");  // commit manually instead
            put("auto.commit.interval.ms", "1000");
            put("session.timeout.ms", "30000");
            put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("auto.offset.reset", "earliest");
        }};
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            try {
                consumer.commitSync(); // blocks until the broker confirms the commit
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

That is the synchronous version, and it has its own drawback: commitSync blocks until the broker returns a result, so throughput suffers.

Asynchronous commit

    // additional imports: java.util.Map, org.apache.kafka.clients.consumer.OffsetAndMetadata,
    // org.apache.kafka.clients.consumer.OffsetCommitCallback, org.apache.kafka.common.TopicPartition

    public static void AsyncDemo() {
        final Properties properties = new Properties() {{
            put("bootstrap.servers", "localhost:9092");
            put("group.id", "test");
            put("enable.auto.commit", "false");
            put("auto.commit.interval.ms", "1000");
            put("session.timeout.ms", "30000");
            put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("auto.offset.reset", "earliest");
        }};
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            // send the commit request without waiting; if it fails, just log it
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

What makes asynchronous commit different: where commitSync keeps retrying a failed commit until it succeeds or hits a fatal error, commitAsync does not retry at all. The reason is simple: if async commits retried, commits could arrive out of order. Say a commit of offset 2000 hits a network problem and keeps retrying while a later commit of offset 3000 succeeds; if the retry of 2000 then lands, the committed position moves backwards from 3000 to 2000, and after a crash or rebalance those messages are consumed again.
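If you do want retries for asynchronous commits, a common pattern (my sketch, not from the original post) is a monotonically increasing sequence number: bump it before every commit attempt, capture it in the callback, and retry only if no newer attempt has started since, because then a retry cannot move the committed offset backwards:

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Hypothetical helper: retry-safe async commits guarded by a sequence number.
    class GuardedCommitter {
        private final AtomicLong sequence = new AtomicLong();

        void commit(final KafkaConsumer<String, String> consumer) {
            final long attempt = sequence.incrementAndGet(); // id of this attempt
            consumer.commitAsync((offsets, e) -> {
                // Retry only if this is still the newest attempt; otherwise a later
                // commit is already in flight and retrying could reorder commits.
                if (e != null && attempt == sequence.get()) {
                    consumer.commitSync(offsets); // blocking retry of the same offsets
                }
            });
        }
    }

The callback runs on the consumer's own thread during a later poll(), so calling commitSync from inside it is safe.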

Combining synchronous and asynchronous commits

    public static void SyncAndAsyncDemo() {
        final Properties properties = new Properties() {{
            put("bootstrap.servers", "localhost:9092");
            put("group.id", "test");
            put("enable.auto.commit", "false");
            put("auto.commit.interval.ms", "1000");
            put("session.timeout.ms", "30000");
            put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("auto.offset.reset", "earliest");
        }};
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("test"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitAsync(); // fast, non-blocking commit on the happy path
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                consumer.commitSync(); // one final, reliable commit before closing
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close(); // always close the consumer
            }
        }
    }

This combines the two: fast asynchronous commits inside the loop and one reliable synchronous commit on the way out. A problem remains, though. Every commit covers the whole batch returned by poll(), so if something fails partway through processing a batch, the offsets of the records already handled may never be committed. The next two sections tackle this by tracking and committing specific offsets.
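One more practical wrinkle in the combined pattern, and a sketch of my own rather than anything from the post: the finally block only runs if the loop actually exits. The usual way to break out of poll() from another thread is consumer.wakeup(), which makes the blocked poll() throw a WakeupException:

    import java.util.Arrays;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    // Sketch: clean shutdown for the sync+async commit pattern.
    static void runUntilShutdown(final KafkaConsumer<String, String> consumer) {
        // wakeup() is the one thread-safe KafkaConsumer method; a real shutdown
        // hook would also join the polling thread before the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                // ... process records ...
                consumer.commitAsync(); // fast commit on the happy path
            }
        } catch (WakeupException e) {
            // expected on shutdown; fall through to the final sync commit
        } finally {
            try {
                consumer.commitSync(); // reliable last commit
            } finally {
                consumer.close();
            }
        }
    }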

Committing specific offsets

    // additional imports: java.util.HashMap, java.util.Map,
    // org.apache.kafka.clients.consumer.OffsetAndMetadata, org.apache.kafka.common.TopicPartition

    public static void PersonalDemo() {
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        int count = 0;
        final Properties properties = new Properties() {{
            put("bootstrap.servers", "localhost:9092");
            put("group.id", "test");
            put("enable.auto.commit", "false");
            put("auto.commit.interval.ms", "1000");
            put("session.timeout.ms", "30000");
            put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("auto.offset.reset", "earliest");
        }};
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                // track the next offset to read for this partition
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, "no meta"));
                count++;
                if (count % 1000 == 0) {
                    // commit the tracked offsets, not just whatever the last poll() returned
                    consumer.commitAsync(currentOffsets, null);
                }
            }
        }
    }

This commits the tracked offsets once every 1000 records.
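Even so, a crash can still reprocess up to 999 records. A finer-grained variant (again my sketch, not from the original post) walks the batch partition by partition and commits each partition's offset as soon as its slice is processed:

    import java.util.Collections;
    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Sketch: commit per partition immediately after its records are handled.
    static void processByPartition(KafkaConsumer<String, String> consumer,
                                   ConsumerRecords<String, String> records) {
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> batch = records.records(partition);
            for (ConsumerRecord<String, String> record : batch) {
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
            // next position to read = last processed offset + 1
            long lastOffset = batch.get(batch.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(
                    partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }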

A rebalance listener

    // additional imports: java.util.Collection,
    // org.apache.kafka.clients.consumer.ConsumerRebalanceListener

    public static void RebalanceListenDemo() {
        final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        final Properties properties = new Properties() {{
            put("bootstrap.servers", "localhost:9092");
            put("group.id", "test");
            put("enable.auto.commit", "false");
            put("auto.commit.interval.ms", "1000");
            put("session.timeout.ms", "30000");
            put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            put("auto.offset.reset", "earliest");
        }};
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        class HandleRebalance implements ConsumerRebalanceListener {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // commit what we have processed before losing ownership of the partitions
                System.out.println("partition rebalance, committing offsets " + currentOffsets);
                consumer.commitSync(currentOffsets);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            }
        }

        try {
            consumer.subscribe(Arrays.asList("test"), new HandleRebalance());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, "no meta"));
                }
                consumer.commitAsync(currentOffsets, null);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                consumer.commitSync(currentOffsets);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    }
