美文网首页
Kafka java api-消费者代码

Kafka java api-消费者代码

作者: 小猪Harry | 来源:发表于2018-10-05 16:03 被阅读0次

    1、消费者代码
    用到消费者,所以也必须先把前面写过的生产者代码也贴一下吧
    生产者代码与自定义partition
    使用maven导包

    <dependencies>
           <dependency>
               <groupId>org.apache.kafka</groupId>
               <artifactId>kafka_2.8.2</artifactId>
               <version>0.8.1</version>
           </dependency>
    </dependencies>
    
    /**
     * 这是一个简单的Kafka producer代码
     * 包含两个功能:
     * 1、数据发送
     * 2、数据按照自定义的partition策略进行发送
     *
     *
     * KafkaSpout的类
     */
    public class KafkaProducerSimple {
        public static void main(String[] args) {
            /**
             * 1、指定当前kafka producer生产的数据的目的地
             *  创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
             *  bin/kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic test
             */
    
            String TOPIC = "orderMq";
            /**
             * 2、读取配置文件
             */
            Properties props = new Properties();
            /*
             * key.serializer.class默认为serializer.class  key的序列化使用哪个类
             */
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            /*
             * kafka broker对应的主机,格式为host1:port1,host2:port2
             */
            props.put("metadata.broker.list", "mini1:9092,mini2:9092,mini3:9092");
            /*
             * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
             * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
             * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
             * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
             * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
             * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
             * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
             * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
             */
            props.put("request.required.acks", "1");
            /*
             * 可选配置,如果不配置,则使用默认的partitioner partitioner.class
             * 默认值:kafka.producer.DefaultPartitioner
             * 用来把消息分到各个partition中,默认行为是对key进行hash。
             */
            props.put("partitioner.class", "com.scu.kafka.MyLogPartitioner");
    //        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
            /**
             * 3、通过配置文件,创建生产者
             */
            Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
            /**
             * 4、通过for循环生产数据
             */
            for (int messageNo = 1; messageNo < 100000; messageNo++) {
                /**
                 * 5、调用producer的send方法发送数据
                 * 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
                 */
                producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast"));
            }
        }
    }
    
    public class MyLogPartitioner implements Partitioner {
        private static Logger logger = Logger.getLogger(MyLogPartitioner.class);
    
        public MyLogPartitioner(VerifiableProperties props) {
        }
    
        /**
         *
         * @param obj 传来的key 用它来进行hash分到partition
         * @param numPartitions 几个partition 如果集群中已存在该topic,那么partition数为原本存在数,否则默认是2
         * @return 生产到哪个partition
         */
        public int partition(Object obj, int numPartitions) {
            return Integer.parseInt(obj.toString())%numPartitions;
        }
    
    }
    

    注:orderMq这个topic很早就通过命令行创建好了,指定了partition是3个。
    下面是消费者代码

    public class KafkaConsumerSimple implements Runnable {
        public String title;
        public KafkaStream<byte[], byte[]> stream;
        public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
            this.title = title;
            this.stream = stream;
        }
        public void run() {
            System.out.println("开始运行 " + title);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            /**
             * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
             * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
             * */
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> data = it.next();
                String topic = data.topic();
                int partition = data.partition();
                long offset = data.offset();
                String msg = new String(data.message());
                System.out.println(String.format(
                        "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
                        title, topic, partition, offset, msg));
            }
            System.out.println(String.format("Consumer: [%s] exiting ...", title));
        }
    
        public static void main(String[] args) throws Exception{
            Properties props = new Properties();
            props.put("group.id", "dashujujiagoushi");//消费组组组名,任意取
            props.put("zookeeper.connect", "mini1:2181,mini2:2181,mini3:2181");//zookeeper连接
            props.put("auto.offset.reset", "largest");//最新位置开始消费,earliest从最早位置开始消费
            props.put("auto.commit.interval.ms", "1000");//consumer向zookeeper提交offset的频率
            props.put("partition.assignment.strategy", "roundrobin");//分区分配策略
            ConsumerConfig config = new ConsumerConfig(props);
            String topic1 = "orderMq";
    
            //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
            ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
            //定义一个map
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic1, 3);
            //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流
            Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
            //取出 `kafkaTest` 对应的 streams
            List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);
            //创建一个容量为4的线程池
            ExecutorService executor = Executors.newFixedThreadPool(3);
            //创建20个consumer threads
            for (int i = 0; i < streams.size(); i++)
                executor.execute(new KafkaConsumerSimple("消费者" + (i + 1), streams.get(i)));
        }
    }
    

    测试:
    先执行消费者程序,尽管partition目录里面的segment文件是有以前生成的数据,但是不会打印出来而是一直提示(已经标记为消费状态的就不再消费了,默认情况就是这样,可以自己设置从0开始消费)

    15:10:38.228 [main-SendThread(mini1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x15fdedc70380022 after 1ms
    15:10:40.230 [main-SendThread(mini1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x15fdedc70380022 after 4ms
    

    需要进行生产,所以再执行生产者程序,控制台打印如下:

    ...
    Consumer: [消费者1], Topic: [orderMq], PartitionId: [1], Offset: [17857], msg: [appidc977abb2-f0bc-41da-9daa-6b080321947bitcast]
    Consumer: [消费者2], Topic: [orderMq], PartitionId: [0], Offset: [17724], msg: [appid9101368e-ac81-4bbf-b2b5-8f2facd41f54itcast]
    Consumer: [消费者1], Topic: [orderMq], PartitionId: [1], Offset: [17858], msg: [appidb145da08-bb61-42e7-b140-9fed576c2faeitcast]
    Consumer: [消费者1], Topic: [orderMq], PartitionId: [1], Offset: [17859], msg: [appid909a90ae-c0fb-42ac-97de-6d7438895e07itcast]
    Consumer: [消费者3], Topic: [orderMq], PartitionId: [2], Offset: [17713], msg: [appid157754b5-6958-4286-9c25-ff67ccc61a42itcast]
    Consumer: [消费者3], Topic: [orderMq], PartitionId: [2], Offset: [17714], msg: [appidb93b9355-4713-4e22-823a-756b4fe75bdfitcast]
    Consumer: [消费者3], Topic: [orderMq], PartitionId: [2], Offset: [17715], msg: [appidf82ca658-528a-4f40-a023-8a155c15eaa1itcast]
    ...
    

    精简下如下

    Consumer: [消费者1],  Topic: [orderMq],  PartitionId: [1], Offset: [17857], msg: [appidc977abb2-f0bc-41da-9daa-6b080321947bitcast]
    Consumer: [消费者2],  Topic: [orderMq],  PartitionId: [0], Offset: [17724], msg: [appid9101368e-ac81-4bbf-b2b5-8f2facd41f54itcast]
    Consumer: [消费者3],  Topic: [orderMq],  PartitionId: [2], Offset: [17713], msg: [appid157754b5-6958-4286-9c25-ff67ccc61a42itcast]
    

    能看到三个消费者对应消费的partition。
    那么考虑以下问题
    在创建orderMq的时候指定partition是3个,那么如果此时我指定创建5个KafkaStream,那么会怎么消费呢?
    消费者代码修改两次如下

    topicCountMap.put(topic1, 5);
    ExecutorService executor = Executors.newFixedThreadPool(5);
    

    再次同上一样执行,输出结果能看到只有3个消费者,所以指定KafkaStream比partition多是没用的,只会有对应数量的消费者去消费对应的partition上的数据。

    Consumer: [消费者2],  Topic: [orderMq],  PartitionId: [2], Offset: [26420], msg: [appid4b778b51-33c7-42de-83c2-5b85f8f2428aitcast]
    Consumer: [消费者3],  Topic: [orderMq],  PartitionId: [0], Offset: [26423], msg: [appid86045c25-7b3f-4c82-ad2a-3e8e11958b28itcast]
    Consumer: [消费者4],  Topic: [orderMq],  PartitionId: [1], Offset: [26562], msg: [appid213b5a91-a7bf-4a39-b585-456d95748566itcast]
    

    如果指定的KafkaStream只有2呢?不做测试了,结果是其中一个消费者会消费2个partition,另外一个消费1个partition中的数据。

    相关文章

      网友评论

          本文标题:Kafka java api-消费者代码

          本文链接:https://www.haomeiwen.com/subject/scyqaftx.html