Mac 下安装启动 kafka
- 安装
kafka
,安装过程将依赖安装zookeeper
brew install kafka
软件位置
/usr/local/Cellar/zookeeper
/usr/local/Cellar/kafka
配置文件位置
/usr/local/etc/kafka/zookeeper.properties
/usr/local/etc/kafka/server.properties
备注:后续操作均需进入 /usr/local/Cellar/kafka/xxx/bin
目录下。
- 启动zookeeper
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
- 启动kafka服务
kafka-server-start /usr/local/etc/kafka/server.properties &
- 创建
topic
,命名为test1
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
- 查看创建的topic
kafka-topics --list --zookeeper localhost:2181
- 生产数据
kafka-console-producer --broker-list localhost:9092 --topic test1
- 消费数据
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test1 --from-beginning
备注:--from-beginning 将从第一个消息开始接收
SpringBoot 整合 Kafka
-
pom.xml
添加spring-kafka
依赖,根据自身版本做调整
<!--springboot 版本:2.1.1.RELEASE 对应 spring-kafka 版本 2.2.2.RELEASE 对应 kafka 版本 2.1.0-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
-
application.yml
配置,根据具体服务器填写bootstrap-servers
spring:
  # Kafka
  kafka:
    # Kafka broker address — adjust for your environment
    bootstrap-servers: 192.168.10.31:9092
    # Producer settings
    producer:
      retries: 0
      # Max bytes buffered per batch before a send (batch.size)
      batch-size: 16384
      buffer-memory: 33554432
      # Serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # key/value serialization
    # Consumer settings
    consumer:
      group-id: consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      # NOTE(review): "100s" means 100 SECONDS (Spring Boot parses this as a
      # Duration); most examples use 100 ms — confirm the intended interval.
      auto-commit-interval: 100s
      # Deserializers for the message key and value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-
KafkaProducer.java
生产者
package tc.smartlockapplet.utils.http.kafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import tc.smartlockapplet.utils.http.json.JSONUtils;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka producer.
 *
 * Periodically publishes a small JSON payload to a test topic so that the
 * companion consumer has something to read.
 *
 * Created by panzhangbao on 2018-12-27 16:57:54
 * Copyright © 2018 panzhangbao. All rights reserved.
 */
@Component
@EnableScheduling
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Scheduler-driven test: every 3 seconds, builds a demo payload and sends
     * it (serialized to JSON) to the "personalInfoTopic" topic under the
     * message key "personalInfo".
     *
     * @author panzhangbao
     * @date 2018/12/28 08:37
     */
    @Scheduled(cron = "0/3 * * * * *")
    public void schedulingTest() {
        final String topic = "personalInfoTopic";

        final Map<String, String> payload = new HashMap<>();
        payload.put("name", "JasonPan");
        payload.put("motto", "If not now, when? if not me, who?");

        System.out.println("\n---------- producer ----------\n");
        System.out.println("topic is " + topic + "\nmessage is " + payload + "\n");

        kafkaTemplate.send(topic, "personalInfo", JSONUtils.mapToJson(payload));
    }
}
-
KafkaConsumer.java
消费者
package tc.smartlockapplet.utils.http.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
 * Kafka consumer.
 *
 * Listens on "personalInfoTopic" and prints each non-null record payload to
 * standard output.
 *
 * Created by panzhangbao on 2018-12-27 19:03:19
 * Copyright © 2018 panzhangbao. All rights reserved.
 */
@Component
public class KafkaConsumer {

    /**
     * Handles one record from "personalInfoTopic".
     *
     * @param record the consumed record; its value may be null (e.g. a
     *               tombstone), in which case nothing is printed
     */
    @KafkaListener(topics = {"personalInfoTopic"})
    public void listen(ConsumerRecord<?, ?> record) {
        // Idiomatic Optional: ifPresent instead of isPresent()/get().
        Optional.ofNullable(record.value()).ifPresent(message -> {
            System.out.println("---------- consumer ----------");
            System.out.println("record is " + record + "\nmessage is " + message);
        });
    }
}
-
运行项目,结果如下:
SpringBoot 整合 Kafka 运行结果
网友评论