kafka 生产者
配置类
/**
* kafka配置类
*
* @author olafwang
* @since 2020/9/29 2:45 下午
*/
@Configuration
public class KafkaConfig {
@Bean
public KafkaProducer<String, String> producerRecord() {
Properties properties = new Properties();
// 配置kafka集群地址,不用将全部机器都写上,zk会自动发现全部的kafka broke
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
// 设置发送消息的应答方式
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
// 重试间隔时间
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
// 一批次发送的消息大小 16KB
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16348");
// 一个批次等待时间,10ms
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10");
// RecordAccumulator 缓冲区大小 32M,如果缓冲区满了会阻塞发送端
properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
// 配置拦截器, 多个逗号隔开
properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.xiaolyuh.interceptor.TraceInterceptor");
Serializer<String> keySerializer = new StringSerializer();
Serializer<String> valueSerializer = new StringSerializer();
return new KafkaProducer<>(properties, keySerializer, valueSerializer);
}
}
发送端
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootStudentKafkaApplicationTests {
@Autowired
private KafkaProducer<String, String> kafkaProducer;
@Test
public void testSyncKafkaSend() throws Exception {
// 同步发送测试
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test_cluster_topic", "key-" + i, "value-" + i);
// 同步发送,这里我们还可以指定发送到那个分区,还可以添加header
kafkaProducer.send(producerRecord, new KafkaCallback<>(producerRecord)).get(50, TimeUnit.MINUTES);
}
System.out.println("ThreadName::" + Thread.currentThread().getName());
}
@Test
public void testAsyncKafkaSend() {
// 异步发送测试
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test_cluster_topic2", "key-" + i, "value-" + i);
// 异步发送,这里我们还可以指定发送到那个分区,还可以添加header
kafkaProducer.send(producerRecord, new KafkaCallback<>(producerRecord));
}
System.out.println("ThreadName::" + Thread.currentThread().getName());
// 记得刷新,否则消息有可能没有发出去
kafkaProducer.flush();
}
}
/**
* 异步回调函数,该方法会在 Producer 收到 ack 时调用,当Exception不为空表示发送消息失败。
*
* @param <K>
* @param <V>
*/
class KafkaCallback<K, V> implements Callback {
private final ProducerRecord<K, V> producerRecord;
public KafkaCallback(ProducerRecord<K, V> producerRecord) {
this.producerRecord = producerRecord;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("ThreadName::" + Thread.currentThread().getName());
if (Objects.isNull(exception)) {
System.out.println(metadata.partition() + "-" + metadata.offset() + ":::" + producerRecord.key() + "=" + producerRecord.value());
}
if (Objects.nonNull(exception)) {
// TODO 告警,消息落库从发
}
}
}
消费者
Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。
@Component
public class KafkaConsumerDemo {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 10,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1));
@PostConstruct
public void startConsumer() {
executor.submit(() -> {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 请求超时时间
properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
Deserializer<String> keyDeserializer = new StringDeserializer();
Deserializer<String> valueDeserializer = new StringDeserializer();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
consumer.subscribe(Arrays.asList("test_cluster_topic"));
// KafkaConsumer的assignment()方法来判定是否分配到了相应的分区,如果为空表示没有分配到分区
Set<TopicPartition> assignment = consumer.assignment();
while (assignment.isEmpty()) {
// 阻塞1秒
consumer.poll(1000);
assignment = consumer.assignment();
}
// KafkaConsumer 分配到了分区,开始消费
while (true) {
// 拉取记录,如果没有记录则柱塞1000ms。
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
String traceId = new String(record.headers().lastHeader("traceId").value());
System.out.printf("traceId = %s, offset = %d, key = %s, value = %s%n", traceId, record.offset(), record.key(), record.value());
}
// 异步确认提交
consumer.commitAsync((offsets, exception) -> {
if (Objects.isNull(exception)) {
// TODO 告警、落盘、重试
}
});
}
});
}
}
拦截器
/**
* @author olafwang
* 链路ID
*/
public class TraceInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
/**
* 最先调用,读取配置信息,只调用一次
*/
@Override
public void configure(Map<String, ?> configs) {
System.out.println(JSON.toJSONString(configs));
}
/**
* 它运行在用户主线程中,在消息序列化和计算分区之前调用,这里最好不小修改topic 和分区参数,否则会出一些奇怪的现象。
*
* @param record
* @return
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
Headers headers = new RecordHeaders();
headers.add("traceId", UUID.randomUUID().toString().getBytes(Charset.forName("UTF8")));
// 修改消息
return new ProducerRecord<>(record.topic(), record.partition(), record.key(), record.value(), headers);
}
/**
* 该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程 中失败时调用。
* 并且通常都是在 producer 回调逻辑触发之前调用。
* onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息 发送效率。
*
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (Objects.isNull(exception)) {
// TODO 出错了
}
}
/**
* 关闭 interceptor,主要用于执行一些资源清理工作,只调用一次
*/
@Override
public void close() {
System.out.println("==========close============");
}
}
源码
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-kafka 工程
网友评论