美文网首页
kafka拦截器实现队列插队效果

kafka拦截器实现队列插队效果

作者: ajack12138 | 来源:发表于2020-10-22 10:21 被阅读0次

前言

突然出现一个任务需要对kafka处理的数据进行插队操作(内心小崩溃。。。),研究了一下,还是可以使用拦截器进行实现这样的效果的。

拦截器(Interceptor)

是早在Kafka 0.10.0.0中就已经引入的一个功能,Kafka一共有两种拦截器:生产者拦截器和消费者拦截器。

ProducerInterceptor

先看代码


@Slf4j
@Service
public class MyProducerInterceptor implements ProducerInterceptor {

    //在发送broke之前的一个操作,可以对数据进行加工处理或者进行topic pationtion重新指向     
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        return null;
    }
    /**
    * KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法
    * 
    *
    **/
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    //close()方法主要用于在关闭拦截器时执行一些资源的清理工作。
    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

onSend

就是在发送之前对数据的处理

onAcknowledgement

接收kafka服务端接受到消息响应的处理

    服务端应答的模式可以通过acks进行设置: 1 0 -1
1 : 在生产者发送消息之后,从节点保存完数据,就会进行响应,如果消息无法写入leader副本,比如在leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息。acks设置为1,是消息可靠性和吞吐量之间的折中方案。

0 : acks=0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入Kafka的过程中出现某些异常,导致Kafka并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为 0 可以达到最大的吞吐量。

-1: acks=-1或acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为-1(all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks=1的情况

close

方法主要用于在关闭拦截器时执行一些资源的清理工作。

configure

可以获取到生产者的配置

ConsumerInterceptor

    AtomicInteger atomicInteger=new AtomicInteger(0);
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        
        return new ConsumerRecords( );
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((k,v)->{
            System.out.println(k+"---"+v);
        });
    }

    private String getJsonTrackingMessage(ConsumerRecord<String, String> record) {
        return record.value();
    }

    //拦截器关闭做一些操作
    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

onConsume

在consumer poll 返回之前的一个操作,一般我们如果没有设置(MAX_POLL_RECORDS_CONFIG,这个配置在0.9之后才有用) 那么默认poll返回的数据是500条,注意这个500也许来自多个分区,因为一个消费者可能被分配到多个分区。我们可能对poll到的数据进行重新修改或者过滤,然后返回一个新的ConsumerRecords,注意poll之后的commit是对服务端提交消费数据的偏移量,如果修改或者新增数据,需要注意如果修改了offset数据是否会造成重复消费的问题

onCommit

在commit成功之后,这个方法我们可以获取到消费数据的具体偏移量

onClose

和之前的生产者一样作用

实现方案

实现一个消费者拦截器,并且重新构造返回的消费数据,如果是新加入的消费数据,就不进行消费,如下面onConsume测试代码

        int mark = atomicInteger.incrementAndGet();
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords
                = new HashMap<>();
        ConsumerRecord consumerRecord=
                new ConsumerRecord("xpp_test",0,0,null,1000);
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> tpRecords =
                    records.records(tp);
            List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
            newTpRecords.add(consumerRecord);
            newTpRecords.addAll(tpRecords);
            if (!newTpRecords.isEmpty()) {
                newRecords.put(tp, newTpRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);

相关文章

  • kafka拦截器实现队列插队效果

    前言 突然出现一个任务需要对kafka处理的数据进行插队操作(内心小崩溃。。。),研究了一下,还是可以使用拦截器进...

  • springboot项目架构(4)activemq、rabbit

    消息队列实现 支持的消息队列 ActiveMq RabbitMq RocketMq Kafka 各个队列实现队列与...

  • Kafka_核心

    kafka集群 Kafka的设计都是为了实现kafak消息队列消费数据的语义Kafka消息队列中数据消费的三种语义...

  • Kafka源码之客户端内存缓冲池

    Kafka是一个高吞吐的消息队列,为了实现高吞吐,kafka在实现方面用到了以下技术: Zero Copy机制,内...

  • 尚硅谷大数据技术之Kafka

    第5章 Kafka producer拦截器(interceptor) 5****.****1 拦截器原理 Prod...

  • Kafka 设计详解之队列

    前言 在上文中我们介绍了 Kafka 的网络通信,本文打算详细分析 Kafka 的核心 — 队列的设计和实现,来对...

  • kafka的架构和基本使用

    kafka是分布式高吞吐的基于发布订阅模式的消息队列,kafka利用磁盘顺序读写实现了高吞吐。 系统架构 topi...

  • 笔记

    SpringBoot中利用AOP实现拦截器效果: https://blog.csdn.net/xxkalychen...

  • OKHTTP

    OkHttp 1.Okhttp 基本实现原理 OkHttp 主要是通过 5 个[拦截器]和 3 个双端队列(2 个...

  • Kafka学习笔记05()

    kafka flume事务kafka监控ISR 1、kafka是什么kafka 是基于发布/订阅模式的消息队列,主...

网友评论

      本文标题:kafka拦截器实现队列插队效果

      本文链接:https://www.haomeiwen.com/subject/irjumktx.html