美文网首页
Kafka中API简单调用

Kafka中API简单调用

作者: CodeYang | 来源:发表于2021-12-29 09:20 被阅读0次

一、pom引入依赖

<!-- Kafka 依赖 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

二、创建拦截器

package com.yang.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Kafka拦截器
 */
/**
 * Producer interceptor that prepends a millisecond timestamp to every record
 * value ("&lt;currentTimeMillis&gt;_&lt;originalValue&gt;") and counts how many sends
 * were acknowledged successfully vs. failed, printing the totals on close().
 */
public class MyInterceptor implements ProducerInterceptor<String, String> {

    // Success/failure counters. AtomicInteger because onAcknowledgement is
    // invoked from the producer's I/O (sender) thread, which may race with
    // other interceptor instances sharing these static counters.
    private static final AtomicInteger success = new AtomicInteger();
    private static final AtomicInteger error = new AtomicInteger();

    /**
     * Called before the record is serialized; rewrites the value to include a
     * send timestamp.
     *
     * @param producerRecord the record about to be sent
     * @return a new record with the timestamped value; topic, partition, key,
     *         timestamp and headers are all carried over from the original
     *         (the original code dropped key/partition/headers by calling
     *         {@code new ProducerRecord(topic, value)})
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        String newValue = System.currentTimeMillis() + "_" + producerRecord.value();
        return new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(),
                producerRecord.timestamp(), producerRecord.key(), newValue,
                producerRecord.headers());
    }

    /**
     * Called once per record when the broker acknowledges the send (e == null)
     * or the send fails (e != null).
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            success.incrementAndGet();
        } else {
            error.incrementAndGet();
        }
    }

    /** Prints the final success/failure counts when the producer is closed. */
    @Override
    public void close() {
        System.out.println("成功发送消息数:" + success.get());
        System.out.println("失败发送消息数:" + error.get());
    }

    /** No configuration is read by this interceptor. */
    @Override
    public void configure(Map<String, ?> map) {
        // intentionally empty
    }
}

三、创建生产者

/**
 * Sends 100 demo messages ("Hello Kafka 0".."Hello Kafka 99") to topic "t01"
 * with a completion callback that logs the record offset on success.
 *
 * <p>Send modes for reference:
 * <ul>
 *   <li>fire-and-forget (async): {@code producer.send(record);}</li>
 *   <li>synchronous: {@code producer.send(record).get();} — blocks until the
 *       broker acknowledges</li>
 *   <li>async with callback — used below</li>
 * </ul>
 *
 * @throws ExecutionException   declared for callers that switch to the
 *                              synchronous {@code .get()} variant
 * @throws InterruptedException declared for the same reason
 */
public static void producer() throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.8.8.101:9092");
    // acks=1: leader acknowledgement only (应答机制)
    properties.put(ProducerConfig.ACKS_CONFIG, "1");
    // retry once on transient send failures (重试次数)
    properties.put(ProducerConfig.RETRIES_CONFIG, "1");
    // batch size in bytes (批次大小)
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    // linger: how long to wait for a batch to fill, in ms (等待时间)
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    // RecordAccumulator buffer size in bytes (缓冲区大小)
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

    // key/value serializers
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    // Interceptor: prepends a timestamp to each value and tallies success/failure
    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.yang.kafka.interceptor.MyInterceptor");

    // try-with-resources guarantees close() — and therefore the interceptor's
    // final stats — even if send() throws (the original leaked the producer
    // on exception).
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("t01", "Hello Kafka " + i), (recordMetadata, e) -> {
                if (e == null) {
                    System.out.println("success:offset:" + recordMetadata.offset());
                } else {
                    e.printStackTrace();
                }
            });
        }
    }
}

四、创建消费者

/**
 * Consumes messages from topic "t01" as part of consumer group "group1" and
 * prints topic/partition/value for every record. Loops forever; offsets are
 * auto-committed every second.
 */
public static void consumer() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.8.8.101:9092");

    // consumer group id (消费者组)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    // key/value deserializers
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    // max bytes per fetch (拿取大小)
    properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 100 * 1024 * 1024);
    // auto-commit offsets...
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // ...every 1000 ms
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // subscribe to the demo topic
    consumer.subscribe(Arrays.asList("t01"));
    while (true) {
        // poll(long) was deprecated by KIP-266 and removed in kafka-clients
        // 3.0.0 (the version in the pom) — the Duration overload is required.
        ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.topic() + "====" + record.partition() + "====" + record.value());
        }
    }
}

五、测试

正常生产、正常消费、正常拦截

生产者截图 消费者截图

相关文章

网友评论

      本文标题:Kafka中API简单调用

      本文链接:https://www.haomeiwen.com/subject/ofhvqrtx.html