配置
server.properties是配置文件
具体文档可以查看https://kafka.apache.org/24/documentation.html
数据清空策略
log.flush.interval.messages分区消息达到设置数后清空到磁盘上
log.flush.interval.ms间隔时间数据清空到磁盘上
数据保存策略,达到条件后就会删除
log.retention.hours数据保存时间
log.retention.bytes数据保存大小
log.retention.check.interval.ms数据保存条件检查间隔时间
生产者数据通讯策略
acks为1,得到leader收到的消息后,发送下一条
acks为all,得到所有节点收到的消息后,发送下一条
acks为0,不需要回复,直接发送
acks为all,可以保存数据不丢
同一个group_id消费者数量如果大于分区数,则多余消费者无法得到消息。少于则一个消费者会消费多个分区的数据。
数据Broker新增后,负载均衡开启和检查时间
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds
手动执行负载均衡命令
bin/kafka-leader-election.sh --bootstrap-server broker_ip:port --election-type preferred --all-topic-partitions
JavaAPI
生产者
Properties prop = new Properties();
//指定kafka的broker地址
prop.put("bootstrap.servers", "broker_ip:port");
//指定key-value数据的序列化格式
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
//指定topic
String topic = "kafka_topic";
//创建kafka生产者
KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
//向topic中生产数据
producer.send(new ProducerRecord<String, String>(topic, "hello kafka"));
//关闭链接
producer.close();
消费者
Properties prop = new Properties();
//指定kafka的broker地址
prop.put("bootstrap.servers", "broker_ip:port");
//指定key-value的反序列化类型
prop.put("key.deserializer", StringDeserializer.class.getName());
prop.put("value.deserializer", StringDeserializer.class.getName());
//指定消费者组
prop.put("group.id", "test");
//latest表示找不到offset或offset对应的数据就消费最新的,earliest消费最早的,none找不到之前offset抛出异常
prop.put("auto.offset.reset","latest");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);
Collection<String> topics = new ArrayList<String>();
topics.add("kafka_topic");
//订阅指定的topic
consumer.subscribe(topics);
while(true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String> consumerRecord : poll) {
System.out.println(consumerRecord);
}
}
网友评论