生产者
核心概念
- 每个topic可以划分成多个分区,不同分区的消息是不同的(每个分区有多个副本)
- 路由到分区:默认使用的分区策略是DefaultPartitioner,实现:
- 消息无key,则均衡分布到各个分区
- 有key,则hash(key)%分区数,这样就能保证同一个key全部写入同一个分区,同时也尽可能均衡分布
- 一个分区只能分配给一个消费组中的一个消费者,一个消费者可以消费多个分区
- 日志压缩功能,开启kafka的日志压缩功能,后台线程会定期把相同的key合并,只保留最新的value值
生产者代码示例
- 可分为同步发送消息和异步发送消息
package com.huaan.javabasic.com.huaan.kafka;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerDemo {
public static void main(String[] args) {
boolean isAsync = false;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "DemoProducer");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// kafka 核心类
KafkaProducer producer = new KafkaProducer(props);
String topic = "test";
int messageNo = 1;
while (true)
{
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync)
{
producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
new DemoCallBack(startTime, messageNo, messageStr)
);
}
else
{
try {
producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
System.out.println("send message : (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
++messageNo;
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int messageNo, String messageStr) {
this.startTime = startTime;
this.key = messageNo;
this.message = messageStr;
}
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (recordMetadata != null)
{
System.out.println("message(" + key + ", " + message + ") send to patition (" + recordMetadata.partition() + ")"
+ ", offset(" + recordMetadata.offset() + ") in " + elapsedTime + " ms");
}
else
{
e.printStackTrace();
}
}
}
}
消费者
消费者代码示例
props.put("auto.offset.reset", "earliest");//lastest 确定是否从头开始消费
props.put("group.id", UUID.randomUUID().toString()); 随机生成groupid
package com.huaan.javabasic.com.huaan.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
public class ConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//props.put("group.id", "testa");
props.put("group.id", UUID.randomUUID().toString());
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session,timout.ms", "30000");
//Equivalent to --from-beginning)
props.put("auto.offset.reset", "earliest");//lastest
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 订阅两个主题
//consumer.subscribe(Arrays.asList("test1", "test2"));
consumer.subscribe(Arrays.asList("test"));
try {
while (true)
{
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}
catch (Exception e)
{
e.printStackTrace();
}
finally {
consumer.close();
}
}
}
常用命令记录
启动zk
zkServer.cmd start
启动kafka server
kafka-server-start.bat ....\config\server.properties
查看所有topic列表
bin/kafka-topics.sh --zookeeper localhost:2181 --list
查看指定topic信息
kafka-topics.bat --zookeeper localhost:2181 --describe --topic test
查看topic某分区偏移量最大(小)值(time为-1时表示最大值,time为-2时表示最小值)
kafka-run-class.bat kafka.tools.GetOffsetShell --topic test --time -1 --broker-list localhost:9092 --partitions 0
创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test
生产者
kafka-console-producer.bat --broker-list localhost:9092 --topic test
消费者
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
网友评论