尚硅谷 Big Data Technology: Kafka


Author: 尚硅谷教育 | Published 2018-12-10 09:32

    Chapter 4 Kafka API in Practice

    4.1 Environment Setup

    1) Start the ZooKeeper and Kafka clusters, then open a console consumer against the Kafka cluster:

    [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
    --zookeeper hadoop102:2181 --topic first
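
    Note: the --zookeeper option matches the 0.11.0.0 version used throughout this tutorial; on newer Kafka releases the console consumer is usually pointed at the brokers instead, with a command along these lines (assuming the default broker port 9092):

    [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
    --bootstrap-server hadoop102:9092 --topic first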

    2) Add the pom dependencies (kafka-clients provides the new Java API used from 4.2.2 onward; the kafka_2.12 artifact supplies the old Scala client used in 4.2.1):

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.0</version>
        </dependency>
    </dependencies>

    4.2 Kafka Producer Java API

    4.2.1 Creating a Producer (deprecated API)

    package com.atguigu.kafka;

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class OldProducer {

        @SuppressWarnings("deprecation")
        public static void main(String[] args) {
            Properties properties = new Properties();
            // Broker list, ack level, and serializer for the old Scala client
            properties.put("metadata.broker.list", "hadoop102:9092");
            properties.put("request.required.acks", "1");
            properties.put("serializer.class", "kafka.serializer.StringEncoder");

            Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties));

            // Send a single message to topic "first"
            KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
            producer.send(message);
        }
    }
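
    As a side note, the old Scala client can also attach a key, which the default partitioner hashes to pick the target partition. A minimal sketch that could be appended inside OldProducer.main above, assuming the three-argument (topic, key, message) form of KeyedMessage and a String-typed key so the configured StringEncoder applies to the key as well:

        // Hypothetical addition, not part of the original tutorial
        Producer<String, String> keyedProducer = new Producer<String, String>(new ProducerConfig(properties));
        KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>("first", "key-1", "hello with key");
        keyedProducer.send(keyedMessage);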

    4.2.2 Creating a Producer (new API)

    package com.atguigu.kafka;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            // Kafka broker host name and port
            props.put("bootstrap.servers", "hadoop103:9092");
            // Wait for acknowledgement from all replicas
            props.put("acks", "all");
            // Maximum number of send retries
            props.put("retries", 0);
            // Batch size for a single batch of messages
            props.put("batch.size", 16384);
            // Linger time before a request is sent
            props.put("linger.ms", 1);
            // Size of the send buffer memory
            props.put("buffer.memory", 33554432);
            // Key serializer
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Value serializer
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 50; i++) {
                producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
            }

            producer.close();
        }
    }
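
    For completeness, producer.send() returns a Future<RecordMetadata>, so a send can be made synchronous by blocking on that Future. A minimal sketch of that pattern (the class name SyncSendSketch and the trimmed-down property set are ours, not from the original tutorial):

    package com.atguigu.kafka;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SyncSendSketch {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Same broker address and serializers as the tutorial examples
            props.put("bootstrap.servers", "hadoop103:9092");
            props.put("acks", "all");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);

            // send() returns a Future<RecordMetadata>; get() blocks until the broker
            // acknowledges the record, or throws if the send failed
            RecordMetadata metadata = producer.send(
                    new ProducerRecord<String, String>("first", "0", "hello world-sync")).get();
            System.out.println(metadata.partition() + "---" + metadata.offset());

            producer.close();
        }
    }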

    4.2.3 Creating a Producer with a Callback (new API)

    package com.atguigu.kafka;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CallBackProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            // Kafka broker host name and port
            props.put("bootstrap.servers", "hadoop103:9092");
            // Wait for acknowledgement from all replicas
            props.put("acks", "all");
            // Maximum number of send retries
            props.put("retries", 0);
            // Batch size for a single batch of messages
            props.put("batch.size", 16384);
            // Linger time before a request is sent
            props.put("linger.ms", 1);
            // Size of the send buffer memory
            props.put("buffer.memory", 33554432);
            // Key serializer
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Value serializer
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

            for (int i = 0; i < 50; i++) {
                kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {

                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (metadata != null) {
                            System.err.println(metadata.partition() + "---" + metadata.offset());
                        }
                    }
                });
            }

            kafkaProducer.close();
        }
    }
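
    Since Callback declares a single onCompletion method, the anonymous class above can also be written as a Java 8 lambda. A minimal sketch (the class name LambdaCallBackSketch and the trimmed-down property set are ours, not from the original tutorial):

    package com.atguigu.kafka;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class LambdaCallBackSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            // Same broker address and serializers as the tutorial examples
            props.put("bootstrap.servers", "hadoop103:9092");
            props.put("acks", "all");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

            for (int i = 0; i < 50; i++) {
                // The lambda replaces the anonymous Callback class above
                kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i),
                        (metadata, exception) -> {
                            if (exception == null) {
                                System.err.println(metadata.partition() + "---" + metadata.offset());
                            } else {
                                exception.printStackTrace();
                            }
                        });
            }

            kafkaProducer.close();
        }
    }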

    This tutorial is produced by 尚硅谷教育's Big Data Research Institute. Please credit the source when reposting, and follow the 尚硅谷 WeChat official account (atguigu) to learn more.
