在安装及测试过程中,可能会发生些错误,那是因为新版本的kafka的一些命令发生了改变。
本人使用的是2.5.0版本
创建消费者报错
错误一:disconnected
WARN [Consumer clientId=consumer-console-consumer-47753-1, groupId=console-consumer-47753] Bootstrap broker 127.0.0.1:2181 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
在做kafka测试的时候,使用命令bin/kafka-console-consumer.sh --zookeeper
原因及解决办法
新版本不适用端口2181,而是9092
错误二:zookeeper is not a recognized option
zookeeper is not a recognized option
原因及解决办法
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --tpoic Rising-topic --from-beginning
启动消费者,发现一直报错consumer zookeeper is not a recognized option,发现在启动的时候说使用 --zookeeper是一个过时的方法,此时,才知道原来在最新的版本中,这种启动方式已经被删除了,
0.90版本之后启动消费者的方法
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
zookeeper如何查看kafka的节点信息及查看方式
详细可阅览这篇博文
如何知道zk是否正常启动
有时启动zk,显示started,但其实zk并不成功启动,可采用如下方式:
telnet 127.0.0.1 2181
然后输入stat,出现下面这些信息即正常启动
image.png
如何远程连接kafka
kafka部署在阿里云服务器,本地java代码如何连接测试呢?
问题一:阿里云服务器安全组问题
阿里云有自己的安全组,禁止访问某些端口,明明防火墙已经关了,但除了22端口其他依然无法访问。
可以参考下这篇博文
问题二:无法解析主机名:java.net.UnknownHostException: xxxxxxx
打开linux下的etc目录下的hosts文件,另起一行添加ip+域名(ip和域名之间使用空格分开),这里的域名就是阿里云的主机名。
修改了可能不会立即生效,我是重启服务器之后才行的。
问题三:远程连接问题
本地java测试显示
Connection refused: no further information
这是需要修改配置文件/config/server.properties
advertised.listener=PLAINTEXT://阿里云外网IP:9092
把这一项注释打开,就可以了
其他博文说创建host.name=外网地址,老版本是可以,但新版本没有host.name这一项,加上去kafka会无法启动,在新属性advertised.listener修改就可以了。
Springboot集成Kafka
引入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
注意这里的依赖是有对应关系的,随便对应有可能会出错,对应关系可见Kafka官网,截图如下:
image.png
配置文件application.yml
server:
port: 8081
spring:
application:
name: producer
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: Springboot-groupid
auto-offset-reset: earliest
enable-auto-commit: true
bootstrap-servers: IP地址:9092
可使用KafkaTemplate,producer举例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* @author Rising
* @date 2020/4/23
*/
@Component
public class Producer {
@Autowired
KafkaTemplate kafkaTemplate;
public void send(){
kafkaTemplate.send("topic", "msgKey", "msgData");
}
}
consumer举例:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @author Rising
* @date 2020/4/23
*/
@Component
public class Consumer {
@KafkaListener(topics = {"topic"})
public void listener(ConsumerRecord record){
Optional msg = Optional.ofNullable(record.value());
if (msg.isPresent()){
System.out.println(msg.get());
}
}
}
启动application即可
/**
* @author Administrator
*/
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(ProducerApplication.class, args);
Producer producer = context.getBean(Producer.class);
for (int i = 0; i < 10; i++) {
producer.send();
TimeUnit.SECONDS.sleep(2);
}
}
}
———————————————————华丽的分割线—————————————————————
Kafka的一些小知识
生成写入流程
producer先从Zookeeper的节点找到改partition分区的Leader
producer将消息发送给leader
leader将消息写入本地log
followers从leader pull消息
写入本地log后向leader发送ack
leader收到所有replication的ack后并向producer发送ack
broker保存消息及存储策略
存储方式
物理上把topic分成一个或多个paticition
存储策略
无论消息是否被消费,kafka都会保存所有消息,有两种策略可以删除旧数据:
1.基于时间:168小时
2.基于大小:1G
kafka读取特定消息复杂度为O(1),删除过期文件并不能提高Kafka性能。
tips:producer不在zk中注册,消费者在zk注册
消费流程
高级API
不能自己控制offset,不能重复读
低级API
可以自己控制offset,但过于复杂
网友评论