美文网首页
API普通生产者

API普通生产者

作者: Shaw_Young | 来源:发表于2022-02-21 20:39 被阅读0次

1)导入依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

2)编写代码

需要用到的类
KafkaProducer:需要创建一个生产者对象,用来发送数据。
ProducerConfig:获取所需的一系列配置参数。
ProducerRecord:每条数据都要封装成一个ProducerRecord对象。
1.不带回调函数的API

package com.young.springbootdemo.kafkademo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        //1.创建Kafka生产者的配置信息
        Properties properties = new Properties();

        //2.指定连接的kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.235.3:9092");

        //3.ACK应答级别
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        //4.重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        //5.批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

        //6.等待时间
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        //7.RecordAccmulator缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 3355432);

        //8.Key,Value序列化类
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        //9.创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            //10.发送数据
            producer.send(new ProducerRecord<String, String>("first", "kafka_message_" + i));
        }

        //11.关闭资源
        producer.close();

    }
}

启动一个消费者消费消息

/opt/kafka_2.11-0.11.0.3/bin # kafka-console-consumer.sh --zookeeper zkKafka1:2181 --topic first
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
kafka_message_1
kafka_message_3
kafka_message_5
kafka_message_7
kafka_message_9
kafka_message_0
kafka_message_2
kafka_message_4
kafka_message_6
kafka_message_8

相关文章

  • API普通生产者

    1)导入依赖 2)编写代码 需要用到的类KafkaProducer:需要创建一个生产者对象,用来发送数据。Prod...

  • 四、Kafka API实战

    4.1 环境准备 4.2 Kafka生产者Java API 4.2.1 创建生产者(过时的API) 4.2.2 创...

  • API经济下,API管理平台哪家强

    聊聊API经济 API经济,简单点讲,就是将API这种资源像商品一样进行售卖的模式。其中有三种角色: API生产者...

  • API生产者流程

    Kafka的producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和...

  • Kafka-1.APIS

    Kafka包含5个核心APIs: 生产者API,向Kafka集群中的主题发送数据流; 消费者API,从Kafka集...

  • 我花了一周读了Kafka Producer的源码

    talk is easy,show me the code,先来看一段创建producer的代码 生产者的API使...

  • Kafka 客户端生产者使用详解(Java)

    本文将从使用 Kafka 的 Java 客户端生产者入手,剖析 Producer API 是如何向 Kafka 集...

  • 软件篇-kakfa(三) - API操作

    Api操作 1.生产者 1.1 创建 通过之前的介绍[https://www.jianshu.com/p/2081...

  • 基础服务搭建:pulsar MQ

    独立模式安装 pulsar 服务(适合本地测试) 8080: api 端口 6650: 服务端口,即生产者和消费...

  • Kafka消费者API总结

    相对于Kafka的生产者API,消费者的API略显繁杂,本文总结了0.11.0版本的kafka消费者的几种消费模式...

网友评论

      本文标题:API普通生产者

      本文链接:https://www.haomeiwen.com/subject/gnhilrtx.html