美文网首页Kafka
kafka_03_Kafka消息发送

kafka_03_Kafka消息发送

作者: 平头哥2 | 来源:发表于2019-03-23 22:08 被阅读0次

Kafka消息发送

1. 构建ProducerRecord对象

该类如下:

public class ProducerRecord<K, V> {

    //The topic the record will be appended to
    private final String topic;
    //The partition to which the record should be sent
    private final Integer partition;
    //the headers that will be included in the record
    private final Headers headers;
    //The key that will be included in the record
    private final K key;
    //The record contents
    private final V value;
    //The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the timestamp using System.currentTimeMillis().
    private final Long timestamp;
    
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }
    
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, null);
    }
    
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }
    
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }
    
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }
    
    //Create a record with no key
    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }
    //省略getters和setters
}

这里有5个构造方法,但是最后用的就是其中的一个。实际应用中,构建ProducerRecord对象是非常频繁的操作。

2. 发送消息

发送消息有三种模式,发后即忘(fire-and-forget), 同步(sync),异步(async)

2.1 fire-and-forget(上一篇博客介绍的就是发后即忘)

 producer.send(record);

特点: 效率高,可靠性差

2.2 同步

代码如下:

package com.ghq.kafka.server;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * 消息的发送
 */
public class ProducerSendMessage {

    public static final String brokerList = "192.168.52.135:9092";
    public static final String topic = "topic-demo";

    public static Properties initProperties(){
        Properties prop = new Properties();
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        prop.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
        /**
         *
         * 配置重试次数,这里重试10次,重试10次之后如果消息还是发送不成功,那么还是会抛出异常
         * 那些类可以重试呢?
         * org.apache.kafka.common.errors.RetriableException 及其子类
         *
         */
        prop.put(ProducerConfig.RETRIES_CONFIG,10);
        return prop;
    }

    public static void sync() {

        Properties prop = initProperties();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");

        //3. 发送消息
        while (true){
            /**
             * send方法本身就是异步的
             */
            Future<RecordMetadata> future = producer.send(record);

            try {
                /**
                 * get方法是阻塞的
                 * 这里返回 RecordMetadata,包含了发送消息的元数据信息
                 */
                RecordMetadata metadata = future.get();
                System.out.println("topic:"+metadata.topic());
                System.out.println("partition:"+metadata.partition());
                System.out.println("offset:"+metadata.offset());
                System.out.println("hasTimestamp:"+metadata.hasTimestamp());
                System.out.println("-----------------------------------------");
                Thread.sleep(1000);
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }

        }
        //4. 关闭资源
        //producer.close();

    }
}

特点:性能差,可靠性高

注:什么异常可以重试?RetriableException

RetriableException.jpg

2.3 异步

public static void async() {
        Properties prop = initProperties();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");

        Future<RecordMetadata> future = producer.send(record, new Callback() {

            /**
             * metadata 和 exception 互斥
             * 消息发送成功:metadata != null exception == null
             * 消息发送失败:metadata == null exception != null
             */
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {

                if (exception != null) {
                    System.out.println("消息发送失败:"+metadata);
                }else {
                    System.out.println("消息发送成功:"+metadata);
                }
            }
        });
        
        //这里采用lambda表达式
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.out.println("消息2发送失败:"+metadata);
            }else {
                System.out.println("消息2发送成功:"+metadata);
            }
        });
        producer.close();
    }

输出结果如下:

消息1发送成功:topic-demo-0@22
消息2发送成功:topic-demo-2@24

特点:性能 :同步 < 异步 < 发后即忘,可靠性:同步 > 异步 > 发后即忘

结束。

相关文章

  • kafka_03_Kafka消息发送

    Kafka消息发送 1. 构建ProducerRecord对象 该类如下: 这里有5个构造方法,但是最后用的就是其...

  • 发送消息

    1 消息设置 普通,需回复型,拒绝型 2 发送消息 2-1 发送 显示勾选人数,及他们的状态。 例子: 总共勾选1...

  • 消息发送

    普通消息: RongIM入口: 可用的方法(2个) 废弃的方法 (4个) 现可用的RongIM --> sendM...

  • 消息发送

    消息发送1.首先,通过obj的isa指针找到它的class;2.在class的method list找对应方法;3...

  • 消息发送

    查找方法的本质都是消息发送,objc_msgSend是由汇编代码实现的,目的是更快更高效。之后的慢速查找函数loo...

  • iOS 融云发送一条消息

    发送图片消息 发送文字消息

  • 消息队列-1 五问

    01 如何保证消息不丢失 三个阶段,发送消息,存储消息,消费消息 发送消息阶段 到消息服务器,有同步发送和异步发送...

  • 消息发送,消息转发

    [receiver message]; 这一句的含义是:向receiver发送名为message的消息。 运行 c...

  • 发送消息&&接收消息

    发送消息 发送消息的API参考:https://www.jianshu.com/p/a2bc82f33e4e[ht...

  • rocketmq 消息发送源码分析总结

    DefaultMQProduce 主要作用 1发送消息,异步,同步,和OneWay发送。 消息发送消息流程...

网友评论

    本文标题:kafka_03_Kafka消息发送

    本文链接:https://www.haomeiwen.com/subject/gimevqtx.html