美文网首页
kafka06 简单的consumer开发

kafka06 简单的consumer开发

作者: 6c0fe9142f09 | 来源:发表于2018-09-11 18:08 被阅读12次

    简单的consumer开发

    简单的consumer
    • 配置信息
            Properties props = new Properties();
            //设置连接kafka集群的地址
            props.put("bootstrap.servers", "132.232.14.247:9092");
            props.put("group.id","group2");
            /**
             * 设置反序列化器,注意:consumer程序中key/value的反序列化器类型必须与producer程序中key/value的序列化器类型一致
             * 也就是说,如果producer程序中的key/value的序列化器类型为String(StringSerializer)类型
             * 那么consumer程序中
             */
            props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    
    • 创建KafkaConsumer对象
    KafkaConsumer<String,byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
    
    • 订阅topic:mySecondTopic,即读取对应topic中的消息
    consumer.subscribe(Arrays.asList("mySecondTopic"));
    
    • 在循环中调用consumer的poll方法接收消息
            try {
                while (true){
                    /**
                     * poll()方法会基于当前消费位置offset读取消息记录。
                     * 当group首次创建时,消费位置由reset策略决定(设置参数auto.offset.reset值:earliest或latest)
                     * 如果group中的consumer开始commit提交offset,那么当前消费位置以最后一次提交的offset为准
                     * poll(100):100是获取消息的等待超时时间,单位毫秒,不能为负数
                     * 如果有消息记录可以读取,poll方法会理科返回。如果没有消息可以获取,poll方法会等待,直至超时
                     */
                    ConsumerRecords<String,byte[]> records = consumer.poll(2000);
                    for (ConsumerRecord<String,byte[]> record : records){
                        System.out.println("MyFirstConsumer's consumption message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                /**
                 * 关闭consumer,在用完consumer对象时,要关闭它。
                 * 这么做不仅会清理在用socket连接
                 * 而且会通知group:我离开了!这样group就可以将其消费的partition分配给其他consumer
                 */
                consumer.close();
            }
    
    consumer知识点
    • consumer group消费者组
      在props中,我们配置了一个group.id代表一个消费者组,消费者组具有以下特性
    props.put("group.id","group2");
    
    - 当topic中的partition为4时候,消费者组中有1个consumer,那么这个comsumer会消费所有的partition
    - 当topic中的partition为4时候,消费者组中有2个consumer,那么每个comsumer会消费2个partition
    - 当topic中的partition为4时候,消费者组中有5个consumer,那么四个comsumer会对应4个partition,有一个consumer是空闲的
    ***********
    - 当topic中的partition为4时候,现在有两个消费者组,每个消费者组有4个comsumer。
    - 那么两个消费者组是相互独立的,也就是说8个comsumer都在工作
    
    • 查看mySecondTopic中的partition数量4个
    [root@tencent2 bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mySecondTopic
    [2018-09-11 18:05:15,638] INFO Accepted socket connection from /127.0.0.1:35734 (org.apache.zookeeper.server.NIOServerCnxnFactory)
    [2018-09-11 18:05:15,650] INFO Client attempting to establish new session at /127.0.0.1:35734 (org.apache.zookeeper.server.ZooKeeperServer)
    [2018-09-11 18:05:15,672] INFO Established session 0x165c7a392630003 with negotiated timeout 30000 for client /127.0.0.1:35734 (org.apache.zookeeper.server.ZooKeeperServer)
    Topic:mySecondTopic PartitionCount:4    ReplicationFactor:2 Configs:
        Topic: mySecondTopic    Partition: 0    Leader: 2   Replicas: 2,1   Isr: 1,2
        Topic: mySecondTopic    Partition: 1    Leader: 0   Replicas: 0,2   Isr: 0,2
        Topic: mySecondTopic    Partition: 2    Leader: 1   Replicas: 1,0   Isr: 0,1
        Topic: mySecondTopic    Partition: 3    Leader: 2   Replicas: 2,1   Isr: 1,2
    
    • group.id = group1开启5个comsumer,并且发送消息,可以观察到有一个comsumer是没有进行工作的
    • group.id = group1开启4个comsumer,group.id = group2开启4个comsumer.可以观察到8个comsumer都在工作

    相关文章

      网友评论

          本文标题:kafka06 简单的consumer开发

          本文链接:https://www.haomeiwen.com/subject/chougftx.html