Kafka 生产者

作者: 一生逍遥一生 | 来源:发表于2019-11-06 09:55 被阅读0次

    Kafka的版本为2.0.0、jdk 1.8.0_231、scala 2.11.8。

    生产者客户端开发

    必要的参数配置

    • bootstrap.servers:指定生产者客户端连接Kafka集群所需的broker地址,多个使用地址使用逗号分割。
    • key.serializer和value.serializer:broker端接收的消息必须以字节数组的形式存在。

    在编写代码的过程中,参数的名称,可以使用ProducerConfig中的变量(在java代码中),如果是scala代码,需要自己编写参数。

    KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。

    消息的发送

    Kafka生产者发送消息的模式为:发后即忘和异步(Kafka0.8.2之后,全部都是使用异步调用)。

    发后即忘

    只管发送消息,不管消息是否正确到达。会出现消息丢失,这种发送方式,性能最高,可靠性最差。

    异步调用

    Kafka发送消息都是异步,都会发送一个Future<RecordMetadata>对象,如果想要使用同步的方式,可以在后面调用get方法,
    get方法会阻塞Kafka的响应,直到消息发送成功,或者发送异常。RecordMetaData中含有一些元数据:主题、分区号、分区中的偏移量、hash值。

    try{
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata metadata = future.get();
        System.out.println(metadata.topic()+"-"+metadata.partition()+"-"+metadata.offset());
    }catch (ExecutionException | InterruptedException e){ //jdk版本要合适,本人使用的jdk1.8,否则会报错。
        e.printStackTrace();
    }
    

    Kafka中一般发生两种类型的异常:可重试异常和不可重试异常。常见的可重试异常:NetworkException(网络故障)、LeaderNotAvailableException(分区leader副本不可用)、
    UnknownTopicOrPartitionException、NotEnoughReplicasExceotion、NotCoordinatorException等。不可重试异常:RecordTooLargeException等。为了保证
    消息可以顺利达到,可以设置重试次数,参数为retries。

    在调用send方法来发送消息时,可以指定或者不指定调用函数,如果想用阻塞请求,可以调用get方法。

    ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("topic-demo", "key".getBytes(), "value".getBytes());
    producer.send(record,
        new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
        if(e != null)
            e.printStackTrace();
        System.out.println("The offset of the record we just sent is: " + metadata.offset());
        }
    });
    

    同一分区发送的数据,每条记录的callback发送都是按照顺序执行的。
    KafkaProducer的close方法会阻塞等待之前所有发送的请求完成之后再关闭KafkaProducer。
    KafkaProducer使用total.memory.bytes来控制Producer缓存数据的最大字节数(要保证内存充足)。
    异步调用,需要设置block.on.buffer.full=false。

    序列化

    • 1.默认序列化实现
      KafkaProducer默认的编码格式为UTF-8。在编写自动代码之前,需要添加相应的依赖:
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
    

    下面是自定义的序列化代码:

    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    public class CompanySerializer implements Serializer<Company> {
        private String encoding = "UTF8";
        public void configure(Map<String, ?> configs, boolean isKey) {
            String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
            Object encodingValue = configs.get(propertyName);
            if (encodingValue == null)
                encodingValue = configs.get("serializer.encoding");
            if (encodingValue != null && encodingValue instanceof String)
                encoding = (String) encodingValue;
        }
    
        public byte[] serialize(String topic, Company data) {
            try {
                if (null == data){
                    return null;
                }
                byte[] name,address;
                if (null != data.getName()){
                    name = data.getName().getBytes(encoding);
                }else {
                    name = new byte[0];
                }
                if (null != data.getAddress()){
                    address = data.getAddress().getBytes(encoding);
                }else {
                    address = new byte[0];
                }
                ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length+address.length);
                buffer.putInt(name.length);
                buffer.put(name);
                buffer.putInt(address.length);
                buffer.put(address);
                return buffer.array();
            }catch (UnsupportedEncodingException e){
                throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
            }
            return new byte[0];
        }
        public void close() {
    
        }
    }
    

    反序列化:

    package com.edu.kafka;
    
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    public class CompanyDeserializer implements Deserializer<Company> {
    
        private String encoding = "UTF8";
    
        public void configure(Map<String, ?> configs, boolean isKey) {
            String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
            Object encodingValue = configs.get(propertyName);
            if (encodingValue == null)
                encodingValue = configs.get("deserializer.encoding");
            if (encodingValue != null && encodingValue instanceof String)
                encoding = (String) encodingValue;
        }
    
        public Company deserialize(String topic, byte[] data) {
            if (null == data){
                return null;
            }
            if (data.length < 8){
                throw new SerializationException("Size of data received by deserializer is shorter than expected!");
            }
            ByteBuffer buffer = ByteBuffer.wrap(data);
            int nameLen,addressLen;
            String name,address;
            nameLen = buffer.getInt();
            byte[] nameBytes = new byte[nameLen];
            buffer.get(nameBytes);
            addressLen = buffer.getInt();
            byte[] addressBytes = new byte[addressLen];
            buffer.get(addressBytes);
            try {
                name = new String(nameBytes,encoding);
                address = new String(addressBytes,encoding);
            }catch (UnsupportedEncodingException e){
                throw new SerializationException("Error occur when deserializing!");
            }
            return new Company(name,address);
        }
        public void close() {
    
        }
    }
    
    • 2.使用Protostuff实现序列化

    添加相应依赖:

    <dependency>
        <groupId>io.protostuff</groupId>
        <artifactId>protostuff-core</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>io.protostuff</groupId>
        <artifactId>protostuff-runtime</artifactId>
        <version>1.6.2</version>
    </dependency>
    

    序列化:

    package com.edu.kafka;
    
    import io.protostuff.LinkedBuffer;
    import io.protostuff.ProtobufIOUtil;
    import io.protostuff.Schema;
    import io.protostuff.runtime.RuntimeSchema;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    public class CompanySerializer implements Serializer<Company> {
        private String encoding = "UTF8";
    
        public void configure(Map<String, ?> configs, boolean isKey) {
            String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
            Object encodingValue = configs.get(propertyName);
            if (encodingValue == null)
                encodingValue = configs.get("serializer.encoding");
            if (encodingValue != null && encodingValue instanceof String)
                encoding = (String) encodingValue;
        }
    
        public byte[] serialize(String topic, Company data) {
            if (null == data){
                return null;
            }
            Schema schema = RuntimeSchema.getSchema(data.getClass());
            LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            byte[] protostuff = null;
            try {
                protostuff = ProtobufIOUtil.toByteArray(data,schema,buffer);
            }catch (Exception e){
                throw new IllegalStateException(e.getMessage(),e);
            }finally {
                buffer.clear();
            }
            return protostuff;
        }
    
        public void close() {
    
        }
    }
    

    反序列化:

    package com.edu.kafka;
    
    import io.protostuff.ProtostuffIOUtil;
    import io.protostuff.Schema;
    import io.protostuff.runtime.RuntimeSchema;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    public class CompanyDeserializer implements Deserializer<Company> {
    
        private String encoding = "UTF8";
    
        public void configure(Map<String, ?> configs, boolean isKey) {
            String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
            Object encodingValue = configs.get(propertyName);
            if (encodingValue == null)
                encodingValue = configs.get("deserializer.encoding");
            if (encodingValue != null && encodingValue instanceof String)
                encoding = (String) encodingValue;
        }
    
        public Company deserialize(String topic, byte[] data) {
            if (null == data){
                return null;
            }
            Schema schema = RuntimeSchema.getSchema(Company.class);
            Company company = new Company();
            ProtostuffIOUtil.mergeFrom(data,company,schema);
            return company;
        }
    
        public void close() {
    
        }
    }
    

    使用序列化:

    package com.edu.kafka;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProducerAnalysis {
    
        public static final String topic = "topic-demo";
    
        public static void main(String[] args) {
            Properties prop = new Properties();
            prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());
            KafkaProducer<String, Company> producer = new KafkaProducer<String, Company>(prop);
            Company company = Company.builder().name("xiaoyao").address("Beijing").build();
            ProducerRecord<String, Company> record = new ProducerRecord<String, Company>("topic-demo", company);
            System.out.println("===start===");
            try {
                producer.send(record);
            } catch (Exception e) {
                System.out.println("===exception===");
                e.printStackTrace();
            }
    
            producer.close();
            System.out.println("===finish===");
        }
    }
    

    区分器

    消息经过序列化之后就需要确定他们发往的分区。如果ProducerRecord指定分区,就会发往指定的分区,
    如果没有指定,就需要依赖分区器,根据key字段的哈希值选择一个分区,如果分区和key都没有指定,使用轮训的方式。
    在Kafka0.8.2.2中,有一个默认的区分器:Partitioner。
    在Kafka2.0.0中有一个默认分区器:DefaultPartitioner,并且提供了Partitioner接口,用户可以自定义实现分区器。

    自定义分区器:

    package com.edu.kafka;
    
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;
    
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class CustomPartitioner implements Partitioner {
    
        private final AtomicInteger counter = new AtomicInteger(0);
    
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
            int numPartitions = partitionInfos.size();
            if (null == keyBytes){
                return counter.getAndIncrement() % numPartitions;
            }else {
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    
        public void close() {
    
        }
    
        public void configure(Map<String, ?> map) {
    
        }
    }
    

    在Kafka 0.8.2.2版本中,也可以使用上面的代码,需要将implements Partitioner去掉,然后修改下面的key移动到相应分区的代码。

    生产者拦截器

    拦截器是在Kafka0.10.0.0之后才引入的功能。
    KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。
    KafkaProducer会在消息被应答之前或者消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于
    用户设定的Callback之前执行。
    close()方法主要用于在关闭拦截器时执行一些资源的清理工作。
    自定义的拦截器:

    package com.edu.kafka;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    public class CustomProducerInterceptor implements ProducerInterceptor<String,String> {
        private volatile long sendSuccess = 0;
        private volatile long sendFailure = 0;
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            String modifiedValue = "prefix1-"+record.value();
            return new ProducerRecord<String, String>(record.topic(),record.partition(),
                    record.timestamp(),record.key(),modifiedValue,record.headers());
        }
    
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (null == exception){
                sendFailure++;
            }else{
                sendFailure++;
            }
        }
    
        public void close() {
            double successRatio = (double)sendSuccess/(sendSuccess + sendFailure);
            System.out.println("[INFO] send ratio="+String.format("%f",successRatio*100)+"%");
        }
    
        public void configure(Map<String, ?> configs) {
    
        }
    }
    

    KafkaProducer可以指定多个拦截器,形成拦截链,按照指定的顺序执行。

    原理分析

    整体架构

    20190620170845181.png

    执行流程:
    1.在启动main方法之后,KafkaProducer发送的数据到ProducerInterceptor拦截器。
    2.ProducerInterceptor拦截器对数据进行过滤,然后将数据发送到序列化。
    3.将过滤之后的数据发送到序列化,然后发送到不同的分区。
    4.分区中数据,会发送到RecordAccumulator(消息累加器),RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络的资源消耗以提升性能,默认大小为 buffer.memory=32MB,max.block.ms = 60000(超时时间)。
    RecordAccumulator维护多个双端队列,数据会堆积到一个ProducerBatch(多个ProducerRecord),然后批量发送。消息在网络上都是以字节的形式传输,RecordAccumulator的内部有一个BufferPool,用来实现ByteBuffer的复用,以实现缓存的高效利用。
    5.Sender从RecordAccumulator中获取缓存数据,转换为<Node,List<ProducerBatch>>,Node表示broker节点。
    6.将<Node,List<ProducerBatch>>转换为<NodeRequest>的形式。
    7.将信息缓存起来,保存到InFlightRequests中,InFlightRequest保存对象的具体形式为Map<NodeId,Deque<Request>>,它的主要作用是缓存已经发出去但还没有收到响应的请求。
    8.将信息提交给selector准备发送。
    9.selector将信息发送到KafkaCluster,KafkaCluster信息接收到信息之后,返回数据。
    10.selector将KafkaCluster发送过来的数据,传递给InFlightRequests。
    11.InFlightRequests接收到信息之后,会将信息发挥给主进程。

    元数据的更新

    InFlightRequests中可以获取LeastLoadedNode,即所有Node中负载最小。元数据操作是在客户端内部进行的。更新元数据时,
    会挑选出LeastLoadedNode,然后这个Node发送MetadataRequest请求来获取具体的元数据信息。这里的数据同步是通过
    synchronized和final来保证。

    生产者参数

    参数 默认值 描述
    acks 1 指定分区中必须要有多少个副本收到这条信息
    acks = 1,如果失败,会重发,只要有一个接收,就是成功;acks = 0,不需要等待服务端的响应;acks = -1,需要将所有的副本都返回成功才可以。
    max.request.sizes 1MB 发送消息的最大值
    retries 0 重试次数
    retries.backoff.ms 100 重试之间时间间隔
    compress.type none 可以对消息进行压缩
    connections.max.idle.ms 540000ms 闲置多长时间之后关闭连接
    receive.buffer.bytes 32KB 接收消息缓存区大小
    send.buffer.bytes 128KB 发送消息缓存区大小
    request.timeout.ms 3000ms 等待请求响应的最大时间

    其他的参数请参考官网

    参考文献

    kafka消息发送模式
    深入理解Kafka:核心设计与实践原理
    Kafka Documentation

    后续

    本文是《深入理解Kafka:核心设计与实践原理》的读书笔记。

    相关文章

      网友评论

        本文标题:Kafka 生产者

        本文链接:https://www.haomeiwen.com/subject/ohnybctx.html