美文网首页
kafka消费者

kafka消费者

作者: 七海的游风 | 来源:发表于2017-05-22 23:59 被阅读429次

    1.kafka消费组基本概念

    kafka消费topic是以group为单位来的,一个group消费一个topic。一个group能容纳多个consumer。consumer消费是以分区(partition)来的,一个consumer可以消费一个或多个partition,一个partition只能被一个consumer消费。(如果一个consumer group中的consumer个数多于topic中的partition的个数,多出来的consumer会闲置(idle),所以如果为了增加消费者能力,只简单增加消费者数量不一定会有用).

    消费与分区对应关系

    消费者数量小于partition的数量
    消费者数量小于partition的数量
    消费者数量小于partition的数量

    2. consumer group的分区再平衡

    每个consumer负责自己对应的分区,但是当group中有consumer退出或者新加入consumer,再或者topic中新增partition,group中的消费者负责的partition都得重新计算,Rebalance 期间consumer不能再消费消息,做rebalance的时候是会影响整个consumer group。

    consumer获知自己消费的分区以及group内其他成员信息都是通过向一个叫做Group Coordinator的broker发送心跳来的,不同的group的broker可能不同。只要consumer再给Coordinator发送心跳,就被认为是正常的。触发心跳是通过consumer客户端轮询处理消息来的。如果consumer长时间没有心跳group coordinator就会认为consumer已经挂了,触发rebalance,新版本的java api(kafka_2.11的0.10.2.0已经支持了)支持显示的关闭客户端,这样可以避免有group coordinator因为超时来触发rebalance有此导致消息积压。

      分区分配流程:
      1.第一个加入group的consumer是consumer的leader(这个consumer奔溃之后会怎么样暂时不清楚)
      2.新加入的consumer向group coordinator发送加入请求
      3.leader从group coordinator接收消费者列表,然后给每个consumer分配分区
      4.leader将重新分配的信息发送给group coordinator,group coordinator再将信息发送给所有的consumer
    

    3.启动一个consumer

    使用java api只需要配置 bootstrap.servers, key.deserializer, value.deserializer三个配置就可以。一般还要带上group.id,指定所属的消费组。

      Properties properties = new Properties();
            properties.put("bootstrap.servers", "127.0.0.1:9092");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("group.id", "testTopicGroup1");
            new KafkaConsumer<String, String>(properties);
    

    4.订阅topic

    consumer.subscribe(Collections.singletonList("testTopic"));
    可以指定多个topic,可以使用正则表达式:
    consumer.subscribe("test.*");
    demo:

     private volatile boolean shutdown = false;
        public void poll(){
    
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "120.27.8.221:9092");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("group.id", "testGroup");
            KafkaConsumer consumer = new KafkaConsumer<String, String>(properties);
    
            //关闭轮询
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){
                @Override
                public void run() {
                    shutdown = true;
                }
            }));
    
            try{
                while (!shutdown){
                    //开始轮询消息,poll会找到group coordinator,加入consumer group,确认消费的分区,获取消息
                    //poll会获取本地最大的offset之后的消息,而不是commit到kafka中的offset
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for(ConsumerRecord record:records){
                        System.out.println("topic = " + record.topic()
                          + ", partition = " + record.partition()
                          + ", offset = " + record.offset()
                          + ", customer = " + record.key()
                          + ", country = " + record.value());
                    }
                }
            }finally {
                //及时关闭消费者
                consumer.close();
            }
    
        }
    

    5.commit offset

    不管什么时候调用poll方法都会获取到还未被消费过的消息,这个实现通过消息的offset来实现的,每个分区的offset的管理是通过consumer自己向一个特殊的topic(__consumer_offsets)提交消息来实现的.

    1.自动提交

    开启自动提交之后,在每次调用poll获取消息的时候会检查时间查看是否需要提交offset,如果已经到时间之后会提交offset,自动提交的好处是方便,劣势是不能灵活控制,如果间隔期间consumer奔溃,已经处理且未提交的消息会被处理两遍。
    自动提交配置:
    enable.auto.commit=true ##开启自动提交,默认5s提交一次
    auto.commit.interval.ms=1000 ##设置自动提交时间间隔

    相关文章

      网友评论

          本文标题:kafka消费者

          本文链接:https://www.haomeiwen.com/subject/uotwxxtx.html