kafka的基础demo
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
public class KafkaProducerExample {
public static void main(String[] args) throws Exception{
// 配置Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // acks参数对消息持久化的影响?
props.put("retries", 0); //重试次数
props.put("batch.size", 16384); // 批量发送的数据大小
props.put("linger.ms", 1); // 发送延迟(默认是0,有消息就发)
props.put("buffer.memory", 33554432);
// kafka是把消息先放到本地内存中,很多很多个消息缓存成一个batch,
// 再发送到Broker上去,这样性能才高
// buffer.memory 本质就是约束kafka Producer能够使用的内存缓冲的大下的
// 默认是32M
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息到Kafka
String topic = "test-topic";
String key = "key1";
String value = "value1";
producer.send(new ProducerRecord<>(topic, key, value));
// 关闭Kafka生产者
producer.close();
}
}
acks参数对消息持久化的影响?
- ack : 0 生产者只要发送就是成功
- ack : 1 只要leader接受到消息,并写入成功,不管Follwer同步没同步
- ack all 不仅leader写入成功了,ISR里和leader保持同步的也写入成功了才算完成
设置为all就一定会成功吗 - 如果只有这个节点,leader死机了,也会失败
- 所以只有2个以上的副本配合使用时,这样才能保证说写一条数据过去,一定是2个以上的副本都收到了才算是成功,此时任何一个副本宕机,不会导致数据丢失
kafka 的 buffer.memory 与batch.size参数的区别
- buffer.memory设置太小,会导致生产者缓存消息的数量太小,发送频繁,导致发送受阻
- batch.size太小,会导致频繁发送,增加网络开销
kakfa入门demo
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class xync {
public static void main(String[] args) {
// 新建对象
Properties properties = new Properties();
properties.put("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092");
properties.put("acks","all");
properties.put("retries",1);
properties.put("batch.size",16384);
properties.put("linger.ms",1);
properties.put("buffer.memory",33554432);
properties.put("enable.idempotence","true");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String,String> kafkaProducer = new KafkaProducer<>(properties);
// 操作集群
for (int i = 0; i <100 ; i++) {
kafkaProducer.send(new ProducerRecord("first",Integer.toString(i),"hello"+i),
new Callback(){
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(recordMetadata != null){
System.out.println("发送成功了,发到了"+recordMetadata.topic()+"第"+
recordMetadata.partition()+"分区第"+recordMetadata.offset()+"消息");
}
}
});
}
// 关闭资源
kafkaProducer.close();
}
}
网友评论