Kafka消息发送
1. 构建ProducerRecord对象
该类如下:
public class ProducerRecord<K, V> {
//The topic the record will be appended to
private final String topic;
//The partition to which the record should be sent
private final Integer partition;
//the headers that will be included in the record
private final Headers headers;
//The key that will be included in the record
private final K key;
//The record contents
private final V value;
//The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the timestamp using System.currentTimeMillis().
private final Long timestamp;
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null.");
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException(
String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
if (partition != null && partition < 0)
throw new IllegalArgumentException(
String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.headers = new RecordHeaders(headers);
}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
this(topic, partition, timestamp, key, value, null);
}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, null, key, value, headers);
}
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
}
//Create a record with no key
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
}
//省略getters和setters
}
这里有5个构造方法,但是最后用的就是其中的一个。实际应用中,构建ProducerRecord对象是非常频繁的操作。
2. 发送消息
发送消息有三种模式,发后即忘(fire-and-forget), 同步(sync),异步(async)
2.1 fire-and-forget(上一篇博客介绍的就是发后即忘)
producer.send(record);
特点: 效率高,可靠性差
2.2 同步
代码如下:
package com.ghq.kafka.server;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* 消息的发送
*/
public class ProducerSendMessage {
public static final String brokerList = "192.168.52.135:9092";
public static final String topic = "topic-demo";
public static Properties initProperties(){
Properties prop = new Properties();
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
prop.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
/**
*
* 配置重试次数,这里重试10次,重试10次之后如果消息还是发送不成功,那么还是会抛出异常
* 那些类可以重试呢?
* org.apache.kafka.common.errors.RetriableException 及其子类
*
*/
prop.put(ProducerConfig.RETRIES_CONFIG,10);
return prop;
}
public static void sync() {
Properties prop = initProperties();
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");
//3. 发送消息
while (true){
/**
* send方法本身就是异步的
*/
Future<RecordMetadata> future = producer.send(record);
try {
/**
* get方法是阻塞的
* 这里返回 RecordMetadata,包含了发送消息的元数据信息
*/
RecordMetadata metadata = future.get();
System.out.println("topic:"+metadata.topic());
System.out.println("partition:"+metadata.partition());
System.out.println("offset:"+metadata.offset());
System.out.println("hasTimestamp:"+metadata.hasTimestamp());
System.out.println("-----------------------------------------");
Thread.sleep(1000);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
//4. 关闭资源
//producer.close();
}
}
特点:性能差,可靠性高
注:什么异常可以重试?RetriableException
![](https://img.haomeiwen.com/i13903642/037952bdc613e1fa.jpg)
2.3 异步
public static void async() {
Properties prop = initProperties();
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");
Future<RecordMetadata> future = producer.send(record, new Callback() {
/**
* metadata 和 exception 互斥
* 消息发送成功:metadata != null exception == null
* 消息发送失败:metadata == null exception != null
*/
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("消息发送失败:"+metadata);
}else {
System.out.println("消息发送成功:"+metadata);
}
}
});
//这里采用lambda表达式
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.out.println("消息2发送失败:"+metadata);
}else {
System.out.println("消息2发送成功:"+metadata);
}
});
producer.close();
}
输出结果如下:
消息1发送成功:topic-demo-0@22
消息2发送成功:topic-demo-2@24
特点:性能 :同步 < 异步 < 发后即忘,可靠性:同步 > 异步 > 发后即忘
结束。
网友评论