Native client
Both the producer and the consumer only need the kafka-clients dependency; the Maven coordinates are:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
Producer
- Create a producer
The Kafka producer object is KafkaProducer, which is constructed as follows:
Properties props = new Properties();
// Kafka cluster address (bootstrap brokers)
props.put("bootstrap.servers", "10.0.55.229:9092");
// Serializer for the message key
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Serializer for the message value
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
Message
Once the KafkaProducer has been constructed, the next step is to build the message to send. Kafka's message object is ProducerRecord, and its source code shows that it can be constructed in several ways:
public class ProducerRecord<K, V> {
/**
* Every other constructor eventually delegates to this one, so understanding its parameters covers them all.
* Creates a record with a specified timestamp to be sent to a specified topic and partition
* @param topic - The topic the record will be appended to (the only mandatory argument)
* @param partition - The partition to which the record should be sent; if omitted, Kafka computes the target partition via the configured Partitioner
* @param timestamp - The timestamp of the record, in milliseconds since epoch. If null, the producer will assign
* the timestamp using System.currentTimeMillis()
* @param key - The key that will be included in the record; Kafka uses it to compute the partition
* @param value - The record contents
* @param headers - the headers that will be included in the record
*/
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
// topic is the only required argument when constructing a ProducerRecord
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null.");
// the timestamp, if provided, must not be negative
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException(
String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
// the partition, if provided, must not be negative
if (partition != null && partition < 0)
throw new IllegalArgumentException(
String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.headers = new RecordHeaders(headers);
}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
this(topic, partition, timestamp, key, value, null);
}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, null, key, value, headers);
}
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
}
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
}
... ...
}
- Create a message
Below we construct the most common form of ProducerRecord, specifying only the topic and value and letting Kafka decide the partition:
// ProducerRecord is the message object to send: topic name, key (optional), value (the payload)
// The key is extra metadata that determines which partition the message is written to; messages with the same key go to the same partition
ProducerRecord<String, String> record = new ProducerRecord<>("ORDER-DETAIL",
JSON.toJSONString(new Order(201806260001L, new Date(), 98000, "desc", "165120001")));
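The record above still needs to be handed to the producer. A minimal sketch of sending it (the callback form is asynchronous; blocking on the returned Future makes it synchronous):
// Asynchronous send with a callback that reports where the record landed
kafkaProducer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("Sent to partition " + metadata.partition() + " at offset " + metadata.offset());
    }
});
// Synchronous alternative: RecordMetadata metadata = kafkaProducer.send(record).get();
kafkaProducer.flush();
kafkaProducer.close();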
Consumer
- Create a consumer
The Kafka consumer object is KafkaConsumer, which is constructed as follows:
Properties props = new Properties();
// Kafka cluster address (bootstrap brokers)
props.put("bootstrap.servers", "10.0.55.229:9092");
// The consumer group (ConsumerGroup) name
props.put("group.id", "afei");
// Deserializer for the message key
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Deserializer for the message value
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
- Subscribe and consume
// Topic(s) to subscribe to
kafkaConsumer.subscribe(Lists.newArrayList("ORDER-DETAIL"));
try {
while (true) {
// The consumer must keep polling Kafka, otherwise it is considered dead and its partitions are reassigned to other consumers in the same ConsumerGroup
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
// To avoid being considered dead, make sure each batch of records is processed as quickly as possible
for (ConsumerRecord<String, String> record : records) {
System.out.println("message content: "+GSON.toJson(record));
System.out.println("message value : "+record.value());
}
// Commit offsets asynchronously after each batch
kafkaConsumer.commitAsync();
}
}finally {
// Commit synchronously before closing the consumer
kafkaConsumer.commitSync();
kafkaConsumer.close();
}
Spring integration
Most projects nowadays already use Spring, and integrating Kafka through Spring makes business development much easier. Integration is straightforward: just add the following Maven coordinates:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>3.0.3.RELEASE</version>
</dependency>
Producer
- Define the producer
The producer configuration for the Spring integration is shown below (some properties are externalized to a properties file and can be customized):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Producer configuration properties -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
<entry key="retries" value="${kafka.reties}" />
<entry key="retry.backoff.ms" value="${kafka.retry.backoff.ms}" />
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
</map>
</constructor-arg>
</bean>
<!-- ProducerFactory bean required by KafkaTemplate -->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg ref="producerProperties"/>
</bean>
<!-- KafkaTemplate bean; inject it wherever messages need to be sent and call its send() methods -->
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="defaultTopic" />
</bean>
</beans>
- Send messages
Message sending is wrapped as follows. With this wrapper in place, sending a Kafka message takes a single line, e.g. kafkaProducer.send(topicName, obj), where obj is the message object to send:
/**
* @author afei
* @version 1.0.0
* @since 2018-06-11
*/
@Component
public class MyKafkaProducer {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
// google-gson instance used to serialize the send result for logging
private static final Gson GSON = new Gson();
// KafkaTemplate is the bean defined in the XML above
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public boolean send(String topic, String key, Object msg){
// Serialize the outgoing message to JSON
String json = toJsonString(msg);
try {
ListenableFuture<SendResult<String, String>> futureResult = kafkaTemplate.send(
topic, key, json);
logger.info("Kafka send json: {}, topicName: {}", json, topic);
SendResult<String, String> result = futureResult.get();
// Do not use fastjson for this log statement: fastjson relies on getter/setter methods by default,
// and the RecordMetadata fields inside SendResult have no getters/setters
logger.info("Kafka send result: {}", GSON.toJson(result));
return result!=null;
} catch (Throwable e) {
logger.error("Kafka send failed.", e);
}
return false;
}
public boolean send(String topic, Object msg){
return send(topic, null, msg);
}
private String toJsonString(Object o) {
String value;
if (o instanceof String) {
value = (String) o;
} else {
value = JSON.toJSONString(o);
}
return value;
}
}
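With this wrapper in place, business code only needs to inject MyKafkaProducer and call send(). A hypothetical usage sketch (OrderService is illustrative and not part of the original code; Order is the example class from the native-producer section):
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    @Autowired
    private MyKafkaProducer kafkaProducer;

    public void createOrder(Order order) {
        // ... persist the order, then publish it; MyKafkaProducer serializes it to JSON internally
        kafkaProducer.send("ORDER-DETAIL", order);
    }
}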
Consumer
- Define the consumer
The consumer configuration for the Spring integration is shown below (some properties are externalized to a properties file and can be customized):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Consumer configuration properties -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
<entry key="retries" value="${kafka.reties}" />
<entry key="retry.backoff.ms" value="${kafka.retry.backoff.ms}" />
<!-- 是否自动提交 -->
<entry key="enable.auto.commit" value="${kafka.enable.auto.commit}"/>
<!-- 自动提交的间隔时间 -->
<entry key="auto.commit.interval.ms" value="${kafka.auto.commit.interval.ms}"/>
<!-- 重启后是否从最新的offset地址消费 -->
<entry key="auto.offset.reset" value="${kafka.auto.offset.reset}"/>
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg ref="consumerProperties"/>
</bean>
<bean id="containerProperties_openLevel3Account" class="org.springframework.kafka.listener.config.ContainerProperties">
// 消费者消费的topic名称
<constructor-arg value="${kafka.topic.name}"/>
// 以开户为例, 消息由OpenAccountKafkaListener处理
<property name="messageListener" ref="openAccountKafkaListener"/>
// 即表示ConsumerGroup的groupId
<property name="groupId" value="${kafka.group.id}"/>
</bean>
<bean id="messageListenerContainer_openAccount" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties_openAccount"/>
</bean>
</beans>
- Message handling
As the configuration above shows, messages on the topic specified by ${kafka.topic.name} are handled by OpenAccountKafkaListener, whose core source code is:
/**
* Grant reward points after a wallet account is opened
* @author afei
* @version 1.0.0
* @since 2018-06-26
*/
@Component
public class OpenAccountKafkaListener implements MessageListener<String, String> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void onMessage(ConsumerRecord<String, String> record) {
logger.debug("msg: {}", record.value());
// Deserialize the received message into an OpenAccount
OpenAccount mqInput = JSON.parseObject(record.value(), OpenAccount.class);
//TODO with the account-opening info, grant reward points, coupons, etc.
}
}
Clearly, the consumer side becomes much simpler once Kafka is integrated with Spring. However, with the native client we hold the KafkaConsumer directly and can commit offsets asynchronously or synchronously; how do we achieve the same after integrating with Spring? Bear with me and read on.
The message listener OpenAccountKafkaListener above implements the interface org.springframework.kafka.listener.MessageListener. There is another interface, org.springframework.kafka.listener.ConsumerAwareMessageListener, that extends it; its source code is:
@FunctionalInterface
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
// Note the "default" modifier: a business class implementing this interface does not need to override this method; it only needs to implement the two-argument onMessage() below, which receives the Consumer, so we can trigger synchronous or asynchronous commits ourselves
@Override
default void onMessage(ConsumerRecord<K, V> data) {
throw new UnsupportedOperationException("Container should never call this");
}
@Override
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
- Message handling (version 2)
@Component
public class OpenAccountKafkaListener implements ConsumerAwareMessageListener<String, String> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void onMessage(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
logger.info("Receive msg with consumer: {}", record.value());
OpenAccount mqInput = JSON.parseObject(record.value(), OpenAccount.class);
try {
//TODO with the account-opening info, grant reward points, coupons, etc.
} finally {
// Commit asynchronously once the message has been processed
consumer.commitAsync((offsets, exception) -> {
if (exception==null){
// The committed offsets are logged via gson
logger.info("The offset info of commit async: {}", GsonUtils.format(offsets));
}else{
logger.error("Commit async failed. ", exception);
}
});
}
}
}
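If a synchronous commit is preferred instead, the same two-argument onMessage() already provides the Consumer. A minimal sketch under that assumption (the listener class name is illustrative and the business handling is elided):
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ConsumerAwareMessageListener;
import org.springframework.stereotype.Component;

@Component
public class OpenAccountSyncCommitListener implements ConsumerAwareMessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
        // ... process record.value() ...
        // Block until the offsets are committed; unlike commitAsync(), commitSync() retries on transient failures
        consumer.commitSync();
    }
}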
A closer look at sending messages
We have seen how to send messages with the Kafka producer and receive them with the consumer, both with the native client and through the Spring integration. Next, let's follow the source code to see what processing a message goes through after KafkaProducer.send() is called and before it reaches the Kafka broker.
- Interceptors
Whether send() is invoked synchronously or asynchronously, sending a message ultimately calls the following method:
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
As this code shows, the first step before a message is sent is invoking the interceptors (if any are configured), which may transform the record. A separate article will analyze interceptors in detail; a minimal sketch of what one looks like is shown below.
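For a concrete picture, a producer interceptor implements org.apache.kafka.clients.producer.ProducerInterceptor and is registered through the interceptor.classes producer property. The sketch below is illustrative (not taken from the article's project) and simply counts acknowledged and failed sends:
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CountingProducerInterceptor implements ProducerInterceptor<String, String> {
    private final AtomicLong success = new AtomicLong();
    private final AtomicLong failure = new AtomicLong();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Called before serialization and partitioning; the record may be modified or replaced here
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acknowledges the record, or with a non-null exception when sending fails
        if (exception == null) {
            success.incrementAndGet();
        } else {
            failure.incrementAndGet();
        }
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
It would be registered when building the producer, e.g. props.put("interceptor.classes", "com.example.CountingProducerInterceptor"), where the class name is hypothetical.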
After that, doSend() is called; its source code is:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// first make sure the metadata for the topic is available
// Fetch the cluster metadata and the time already spent waiting for it
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
// Remaining time = max.block.ms minus the time already spent
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
// Keep the cluster metadata in a local variable
Cluster cluster = clusterAndWaitTime.cluster;
// Serialize the key
byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
// Serialize the value
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
// Choose the partition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
// The serialized record size must not exceed the max.request.size and buffer.memory settings
ensureValidRecordSize(serializedSize);
// The record timestamp defaults to the current time unless a timestamp was specified when constructing the ProducerRecord
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (Exception e) {
this.errors.record();
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
// If an exception occurs during send, all interceptors are notified (their onAcknowledgement() is invoked with the exception), so an interceptor can check whether the exception is null to distinguish success from failure and track the send success rate
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
- Fetching cluster metadata
As the doSend() source shows, the cluster metadata is obtained in waitOnMetadata(), whose source code is:
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
metadata.add(topic);
// First fetch of the cluster metadata; initially it only contains the brokers from bootstrap.servers
Cluster cluster = metadata.fetch();
// Look up the partition count in the cached topic/partition map (Map<String, List<PartitionInfo>>)
Integer partitionsCount = cluster.partitionCountForTopic(topic);
if (partitionsCount != null && (partition == null || partition < partitionsCount))
// Already cached: return immediately, with 0 as the second argument since no time was spent fetching metadata
return new ClusterAndWaitTime(cluster, 0);
// Record the start time (to measure how long fetching the metadata takes)
long begin = time.milliseconds();
// The remaining time starts at max.block.ms, the maximum time to block (per the official docs it covers KafkaProducer.send() and KafkaProducer.partitionsFor())
long remainingWaitMs = maxWaitMs;
long elapsed;
do {
log.trace("Requesting metadata update for topic {}.", topic);
metadata.add(topic);
// Set needUpdate to true and return the current metadata version
int version = metadata.requestUpdate();
sender.wakeup();
try {
// Wait for the metadata to be updated, i.e. until the current version exceeds the previous version; the wait must not exceed remainingWaitMs
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
}
// After the update, fetch the cluster metadata again
cluster = metadata.fetch();
// Compute how long this update round took
elapsed = time.milliseconds() - begin;
// If it took longer than max.block.ms, throw a TimeoutException
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (cluster.unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
// Subtract the time spent in this round from the remaining time
remainingWaitMs = maxWaitMs - elapsed;
// Get the partition count for this topic
partitionsCount = cluster.partitionCountForTopic(topic);
// If the partition count is still unknown, keep trying until max.block.ms is exhausted
} while (partitionsCount == null);
// If a partition was specified when constructing the ProducerRecord and it is greater than or equal to the partition count, throw an exception. For example, for a topic named "ORDER-DETAIL" with 7 partitions, specifying a partition of 7 or more yields: Invalid partition given with record: 7 is not in the range [0...7).
if (partition != null && partition >= partitionsCount) {
throw new KafkaException(
String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
}
// Return the cluster metadata along with the time spent fetching it
return new ClusterAndWaitTime(cluster, elapsed);
}
This source analysis shows that the bootstrap.servers value passed when constructing the KafkaProducer does not have to list the entire Kafka cluster: the client can contact any broker given in bootstrap.servers and fetch the full cluster metadata from it. Even so, it is advisable to make bootstrap.servers as complete as possible. For example, if the cluster has 3 brokers but bootstrap.servers lists only 1 of them, then when that broker goes down the cluster itself remains available, but a newly started client has no reachable bootstrap broker and cannot fetch the cluster metadata at all.
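In practice that just means listing several brokers when building the producer or consumer, e.g. (the extra addresses below are made up for illustration, reusing the props object from the earlier examples):
// Any reachable broker in the list is enough to bootstrap the metadata, even if the others are down
props.put("bootstrap.servers", "10.0.55.229:9092,10.0.55.230:9092,10.0.55.231:9092");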
- Serialization
After the interceptor chain comes another very important step: serializing the key and value. The core code is the two lines below. The key is serialized by the serializer specified via key.serializer when the KafkaProducer was constructed, and the value by the one specified via value.serializer:
keySerializer.serialize(record.topic(), record.headers(), record.key());
valueSerializer.serialize(record.topic(), record.headers(), record.value());
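The built-in StringSerializer is used throughout this article, but any class implementing org.apache.kafka.common.serialization.Serializer can be plugged in via key.serializer/value.serializer. A minimal illustrative sketch (not part of the original project) that serializes arbitrary objects to JSON with Gson:
import java.nio.charset.StandardCharsets;
import java.util.Map;
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {
    private static final Gson GSON = new Gson();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, T data) {
        // The producer accepts null values, so they must map to null bytes
        return data == null ? null : GSON.toJson(data).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() { }
}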
- Partitioning
The next step is choosing the partition; the core code is:
int partition = partition(record, serializedKey, serializedValue, cluster);
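The default partitioner hashes the key (and spreads keyless records across partitions); a custom strategy can be supplied by implementing org.apache.kafka.clients.producer.Partitioner and setting the partitioner.class property. A minimal illustrative sketch (class name hypothetical) that mimics the key-hashing behaviour:
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class KeyHashPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // No key: pick a random partition (the real default partitioner round-robins instead)
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // Records with the same key always map to the same partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}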
- Summary
From the analysis above, the important stages a message passes through before being sent are, in order: interceptors, metadata fetching, serialization, and partition selection. Follow-up articles will analyze each of these stages in detail.