I. Deploying the service
1. Download (be sure to download the binary package, not the source package)
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.0/kafka_2.13-2.5.0.tgz
2. Install
mv kafka_2.13-2.5.0.tgz /usr/local
cd /usr/local
tar -zxvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0
3. Start ZooKeeper (the Kafka binary package already includes ZooKeeper)
> bin/zookeeper-server-start.sh config/zookeeper.properties
INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
4. Start Kafka
Open another terminal window.
> bin/kafka-server-start.sh config/server.properties
INFO Verifying properties (kafka.utils.VerifiableProperties)
INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
At this point both ZooKeeper and Kafka are running.
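To confirm from code that both services are actually listening, a quick TCP check can help. The following is a minimal sketch, assuming ZooKeeper listens on port 2181 and Kafka on port 9092 (the ports used by the commands in this guide). `PortCheck` and `isPortOpen` are hypothetical names, not part of Kafka, and an open port only shows that something is listening, not that the service is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka: reports whether a TCP port accepts connections.
final class PortCheck {

    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed ports: 2181 for ZooKeeper, 9092 for the Kafka broker
        System.out.println("ZooKeeper (2181) reachable: " + isPortOpen("localhost", 2181, 1000));
        System.out.println("Kafka (9092) reachable: " + isPortOpen("localhost", 9092, 1000));
    }
}
```

If either line prints `false`, check the corresponding terminal window for startup errors before moving on.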
5. Create a topic
Open another terminal window.
Create a topic named "test" with one partition and one replica:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Now run the list command to see the topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
6. Send some messages
Open another terminal window.
Kafka ships with a command-line client that takes input from a file or from standard input and sends it to the Kafka cluster as messages. By default, each line is sent as a separate message.
Run the producer, then type a few messages into the console to send them to the server.
[root@localhost kafka_2.13-2.5.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is a message from liupeng
>this is another message for test
>liuminbo is a good boy
7. Start a consumer
Open another terminal window.
Kafka also has a command-line consumer that dumps messages to standard output.
[root@localhost kafka_2.13-2.5.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
this is a message from liupeng
this is another message for test
liuminbo is a good boy
All of the command-line tools have additional options; running a command with no arguments prints more detailed usage information.
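The one-line-per-message behavior described in step 6 can be imitated in a few lines of plain Java. This is only a sketch of the idea, not how the console producer is implemented: `LinePerMessage` and `forwardLines` are hypothetical names, and the sink here just prints each line. With kafka-clients on the classpath, the sink could instead call `producer.send(new ProducerRecord<>("test", line))`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.function.Consumer;

// Hypothetical helper: treats every input line as one message.
final class LinePerMessage {

    // Reads the input line by line, hands each line to the sink,
    // and returns the number of lines handed over.
    static int forwardLines(Reader input, Consumer<String> sink) throws IOException {
        BufferedReader reader = new BufferedReader(input);
        int count = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            sink.accept(line);
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in sink that only prints; a real sink would send to Kafka
        int sent = forwardLines(new InputStreamReader(System.in),
                line -> System.out.println("message: " + line));
        System.out.println(sent + " message(s)");
    }
}
```

Keeping the sink as a `Consumer<String>` means the line-splitting logic can be tried out without a running broker.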
II. Embedding in a Java program
1. Add the dependency
pom.xml
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
2. Write the producer
package org.springblade.common.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Kafka Producer
 *
 * @author liupeng
 * @date 2020-11-12
 */
@Slf4j
public class ProduceKafka {
    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "localhost:9092";
    private static KafkaProducer<String, String> producer = null;

    // Initialize the producer
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<>(configs);
    }

    /**
     * Initialize the configuration
     */
    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // The record to send: no key, and the value is the two strings concatenated
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "message" + "this is a message");
        // send() is asynchronous; close() waits for previously sent records to complete
        producer.send(record);
        producer.close();
    }
}
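The `ProducerConfig` constants used above resolve to plain string keys, the same style the consumer in the next step uses. As a minimal sketch, the same configuration can be written with those keys directly. `ProducerProps` is a hypothetical name, and the `acks=all` line is an assumption added for illustration; it is not part of the original producer.

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings with plain string keys.
final class ProducerProps {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption, not in the original: wait for all in-sync replicas to acknowledge each record
        props.setProperty("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings that would be passed to new KafkaProducer<>(props)
        build("localhost:9092").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The constants are usually the safer choice because a misspelled constant fails at compile time, whereas a misspelled string key is only reported as an unknown config when the client starts.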
3. Write the consumer
package org.springblade.common.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka Consumer
 *
 * @author liupeng
 * @date 2020-11-12
 */
@Slf4j
public class ConsumerKafka {
    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "localhost:9092";
    private static KafkaConsumer<String, String> consumer = null;

    // Initialize the consumer
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<>(configs);
    }

    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BROKER_LIST);
        properties.put("group.id", "0");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }

    public static void main(String[] args) {
        // Subscribe before polling; poll() without a subscription throws IllegalStateException
        consumer.subscribe(Collections.singletonList(TOPIC));
        while (true) {
            // poll(Duration) replaces the deprecated poll(long)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                log.info(String.valueOf(record));
            }
        }
    }
}
Run the producer's main method.
The messages that were sent appear in the consumer terminal window.
Official documentation (Chinese edition):
https://kafka.apachecn.org/uses.html