美文网首页
尚硅谷大数据技术之Kafka

尚硅谷大数据技术之Kafka

作者: 尚硅谷教育 | 来源:发表于2018-12-10 09:32 被阅读11次

第4章 Kafka API****实战

4****.****1 环境准备

1)启动zk和kafka集群,在kafka集群中打开一个消费者

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \

--zookeeper hadoop102:2181 --topic first

2)导入pom依赖

<dependencies>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->

<dependency>

    <groupId>org.apache.kafka</groupId>

    <artifactId>kafka-clients</artifactId>

    <version>0.11.0.0</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->

<dependency>

    <groupId>org.apache.kafka</groupId>

    <artifactId>kafka_2.12</artifactId>

    <version>0.11.0.0</version>

</dependency>

</dependencies>

4****.****2 Kafka生产者Java API

4.2.1 创建生产****者****(****过时的****API)

package com.atguigu.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

public class OldProducer {

@SuppressWarnings("deprecation")

public static void main(String[] args) {

Properties properties = new Properties();

properties.put("metadata.broker.list", "hadoop102:9092");

properties.put("request.required.acks", "1");

properties.put("serializer.class", "kafka.serializer.StringEncoder");

Producer<Integer, String> producer = new Producer<Integer,String>(new ProducerConfig(properties));

KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");

producer.send(message );

}

}

4.2****.2 创建****生产者****(新****API****)

package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducer {

public static void main(String[] args) {

Properties props = new Properties();

// Kafka服务端的主机名和端口号

props.put("bootstrap.servers", "hadoop103:9092");

// 等待所有副本节点的应答

props.put("acks", "all");

// 消息发送最大尝试次数

props.put("retries", 0);

// 一批消息处理大小

props.put("batch.size", 16384);

// 请求延时

props.put("linger.ms", 1);

// 发送缓存区内存大小

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// value序列化

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 50; i++) {

producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));

}

producer.close();

}

}

4.2****.****3**** 创建****生产者****带****回调函数****(新****API****)

package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackProducer {

public static void main(String[] args) {

Properties props = new Properties();

// Kafka服务端的主机名和端口号

props.put("bootstrap.servers", "hadoop103:9092");

// 等待所有副本节点的应答

props.put("acks", "all");

// 消息发送最大尝试次数

props.put("retries", 0);

// 一批消息处理大小

props.put("batch.size", 16384);

// 增加服务端请求延时

props.put("linger.ms", 1);

// 发送缓存区内存大小

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// value序列化

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

for (int i = 0; i < 50; i++) {

kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {

@Override

public void onCompletion(RecordMetadata metadata, Exception exception) {

if (metadata != null) {

System.err.println(metadata.partition() + "---" + metadata.offset());

}

}

});

}

kafkaProducer.close();

}

}

本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。

相关文章

网友评论

      本文标题:尚硅谷大数据技术之Kafka

      本文链接:https://www.haomeiwen.com/subject/sfzlhqtx.html