美文网首页
生产者(2019-02-15)

生产者(2019-02-15)

作者: Rondo9 | 来源:发表于2019-02-15 21:56 被阅读0次

                                                      Kafka生产者

    架构图: 

    Kafka生产者组件图

    必选属性:

        bootstrap.servers: broker的地址清单(host:port)

        key.serializer: 键的序列化器(ByteArraySerializer[这个只做很少的事情], StringSerializer, IntegerSerializer, 自定义序列化器)

        value.serializer: 值的序列化器(同上)

    创建Kafka生产者:

        1. 新建一个Properties对象;

        2. 因为我们打算把键和值定义成字符串类型, 所以使用内置的StringSerializer;

        3. 在这里我们创建了一个新的生产者对象, 并为键和值设置了恰当的类型, 然后把Properties对象传给它。

        private Properties kafkaProps = new Properties();

        kafkaProps.put("bootstrap.servers", "broker1:9092, broker2:9092");

        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new kafkaProducer<String, String>(kafkaProps);

    发送消息:

        1.同步发送消息

            ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

            try {

                producer.send(record).get();

            } catch (Exception e) {

                e.printStackTrace();

            }

        2.异步发送

            private class DemoProducerCallback implements Callback {

                @Override

                public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                    if (e != null) {

                        e.printStackTrace();

                    }

                }

            }

            ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

            producer.send(record, new DemoProducerCallback());

    可配置参数:

        1.acks: 有多少个分区副本收到消息生产者才会认为消息写入是成功的;

        2.buffer.memory: 设置生产者内存缓冲区的大小;

        3.compression.type: 指定消息发送时使用哪一种压缩算法进行压缩(snappy, gzip, lz4);

        4.retries: 生产者可以重发消息的次数;

        5.batch.size: 同一批次发送到同一分区使用的内存大小;

        6.linger.ms: 同批次等待时间;

        7.client.id: 任意字符串, 识别消息的来源;

        8.max.in.flight.requests.per.connection: 生产者在收到服务器的响应之前可以发送多少个消息;

        9.timeout.ms, request.timeout.ms 和 metadata.fetch.timeout.ms: 

            timeout.ms: 等待同步副本返回消息确认的时间;

            request.timeout.ms: 生产者在发送数据时等待服务器返回响应的时间;

            metadata.fetch.timeout.ms: 生产者在获取元数据时等待服务器返回响应的时间;

        10.max.block.ms: 获取元数据时的阻塞时间;

        11.max.request.size: 生产者发送请求的大小;

        12.receive.buffer.bytes 和 send.buffer.bytes: TCP socket 接收和发送数据宝的缓冲区大小;

    序列化器:

        主要实现 org.apache.kafka.common.serialization.Serializer 的 byte[] serialize(String topic, Customer data) 方法

    分区器:

        主要实现 org.apache.kafka.clients.producer.Partitioner 的 int partition(String topic, Object key,byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster) 方法

    相关文章

      网友评论

          本文标题:生产者(2019-02-15)

          本文链接:https://www.haomeiwen.com/subject/bofmeqtx.html