Kafka Summary

Author: maige | Published 2018-01-05 00:01

Producer

Core Concepts

  1. Each topic can be divided into multiple partitions, and different partitions hold different messages (each partition has multiple replicas).
  2. Routing to a partition: the default strategy is DefaultPartitioner (illustrated by the sketch after this list):
  • If the message has no key, records are balanced across the partitions.
  • If it has a key, the partition is hash(key) % numPartitions, which guarantees that all messages with the same key land in the same partition while still spreading load as evenly as possible.
  3. A partition can be assigned to only one consumer within a consumer group, while one consumer can consume multiple partitions.
  4. Log compaction: when enabled, a background thread periodically merges records with the same key, keeping only the latest value (see the topic-level example at the end of the commands section).
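
To make the routing rule concrete, here is a minimal Partitioner sketch. It is not the shipped DefaultPartitioner (which hashes the serialized key bytes with murmur2 and, in newer clients, uses sticky assignment for keyless records); the class name HashKeyPartitioner is illustrative.

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Minimal sketch of key-based routing: hash(key) % numPartitions.
public class HashKeyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // No key: spread records across partitions (here simply at random).
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // Same key always maps to the same partition; clear the sign bit before the modulo.
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

A producer would opt in with props.put("partitioner.class", HashKeyPartitioner.class.getName()).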

Producer Code Example

  • Messages can be sent either synchronously or asynchronously, as the example below shows.
package com.huaan.javabasic.com.huaan.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerDemo {
    public static void main(String[] args) {
        boolean isAsync = false; // true = async send with callback, false = blocking send
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("client.id", "DemoProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The core Kafka client class; the type parameters match the configured serializers
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        String topic = "test";

        int messageNo = 1;
        while (true)
        {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync)
            {
                producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
                        new DemoCallBack(startTime, messageNo, messageStr)
                        );
            }
            else
            {
                try {
                    producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                    System.out.println("send message : (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }

            ++messageNo;
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    private static class DemoCallBack implements Callback {
        private final long startTime;
        private final int key;
        private final String message;
        public DemoCallBack(long startTime, int messageNo, String messageStr) {
            this.startTime = startTime;
            this.key = messageNo;
            this.message = messageStr;
        }

        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (recordMetadata != null)
            {
                System.out.println("message(" + key + ", " + message + ") send to patition (" + recordMetadata.partition() + ")"
                + ", offset(" + recordMetadata.offset() + ") in " + elapsedTime + " ms");
            }
            else
            {
                e.printStackTrace();
            }
        }
    }
}
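
Note the trade-off between the two paths: the synchronous send(...).get() blocks until the broker acknowledges the record, so failures surface immediately at the call site, while the asynchronous variant returns at once and reports the partition, offset, or exception through the callback, trading simpler error handling for much higher throughput.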

Consumer

Consumer Code Example

props.put("auto.offset.reset", "earliest");//lastest 确定是否从头开始消费
props.put("group.id", UUID.randomUUID().toString()); 随机生成groupid

package com.huaan.javabasic.com.huaan.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        //props.put("group.id", "testa");
        props.put("group.id", UUID.randomUUID().toString());
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session,timout.ms", "30000");
        //Equivalent to --from-beginning)
        props.put("auto.offset.reset", "earliest");//lastest

        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // 订阅两个主题
        //consumer.subscribe(Arrays.asList("test1", "test2"));
        consumer.subscribe(Arrays.asList("test"));
        try {
            while (true)
            {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                {
                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                }
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally {
            consumer.close();
        }

    }
}
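
The example above relies on auto-commit (enable.auto.commit=true), so offsets are committed in the background every auto.commit.interval.ms, which can re-deliver or skip records around a crash. For at-least-once processing, a common variation (a sketch reusing the same props and topic as above) disables auto-commit and commits only after the records have been handled:

props.put("enable.auto.commit", "false"); // take over offset management

KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
try {
    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(100);
        for (ConsumerRecord<Integer, String> record : records) {
            // Process the record first...
            System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        }
        // ...then commit, so a crash replays unprocessed records instead of skipping them.
        if (!records.isEmpty()) {
            consumer.commitSync();
        }
    }
} finally {
    consumer.close();
}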

Common Commands

Start ZooKeeper
zkServer.cmd start
Start the Kafka server
kafka-server-start.bat ..\..\config\server.properties
List all topics
kafka-topics.bat --zookeeper localhost:2181 --list
Describe a specific topic
kafka-topics.bat --zookeeper localhost:2181 --describe --topic test
Get the largest (or smallest) offset of a topic partition (--time -1 means the largest, --time -2 the smallest)
kafka-run-class.bat kafka.tools.GetOffsetShell --topic test --time -1 --broker-list localhost:9092 --partitions 0
Create a topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Console producer
kafka-console-producer.bat --broker-list localhost:9092 --topic test
Console consumer
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
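Related to the log-compaction point in the core concepts above, compaction is enabled per topic through the cleanup.policy config; a sketch (the topic name compacted-test is illustrative)
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic compacted-test --config cleanup.policy=compact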
