访问远程kafka集群
如查看远程kafka集群的消息内容,
[root@localhost bin]# ./kafka-console-consumer.sh--zookeeper 10.255.209.37:2181 --topic risk-black --from-beginning
参数与查看本地时一样,只需zookeeper地址换成远程zookeeper地址即可。
查看kafka topic列表
使用--list参数,如
[root@localhost bin]# ./kafka-topics.sh --zookeeper10.255.209.36:2181 --list
risk-black
risk-device
risk-hn
topic的创建
由于消息存放在topic中的,因此我们先手动创建topic。例如创建2个topic名称分别为news、music,partitions数量分别为2和3,
[root@localhost kafka_2.9.1-0.8.2.1]# bin/kafka-topics.sh--create --zookeeper localhost:2181 --replication-factor 1 --partitions 2--topic news
Created topic "news".
[root@localhost kafka_2.9.1-0.8.2.1]#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions3 --topic music
Created topic "music".
其中,replication-factor 指定了副本数(主副本和从副本之和),partitions指定了分区数。副本数不能大于kafka实例节点数,否则报错:副本数大于节点数,如
[root@localhost kafka_2.9.1-0.8.2.1]# bin/kafka-topics.sh --create --zookeeperlocalhost:2181 --replication-factor 5 --partitions 3 --topictest-replicate-topic2
Error while executing topic command replicationfactor: 5 larger than available brokers: 3
kafka.admin.AdminOperationException: replicationfactor: 5 larger than available brokers: 3
…..
这时我们可以在数据文件存储根目录(数据文件存储根目录在Kafka broker中server.properties文件配置(参数log.dirs=xxx)中指定)下看到如下几个文件
[root@localhost kafka-logs]# ll
total 28
drwxr-xr-x 2 root root 4096 Jun 6 14:38 music-0
drwxr-xr-x 2 root root 4096 Jun 6 14:38 music-1
drwxr-xr-x 2 root root 4096 Jun 6 14:38 music-2
drwxr-xr-x 2 root root 4096 Jun 6 14:37 news-0
drwxr-xr-x 2 root root 4096 Jun 6 14:37 news-1
消息发送时都被发送到一个topic,其本质就是一个目录,在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。而topic由是由一些Partition组成,而每个partition的文件夹中又有多组小文件(segment file)组成。segment file由2大部分组成,分别为index file和log file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件。如
[root@localhost kafka-logs]# ll news-0
total 0
-rw-r--r-- 1 root root 10485760 Jun 6 14:37 00000000000000000000.index
-rw-r--r-- 1 root root 0 Jun 6 14:37 00000000000000000000.log
查看主题详细信息
使用describe命令查看指定主题的详细信息,如
[root@localhost bin]# ./kafka-topics.sh --describe--zookeeper localhost:2181 --topicnews
Topic:news PartitionCount:2 ReplicationFactor:1 Configs:
Topic: news Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: news Partition: 1 Leader: 0 Replicas: 0 Isr: 0
[root@localhost
bin]#
第一个行显示所有partitions的一个总结,以下每一行给出一个partition中的信息。
每个分区都有5个属性。Topic:主题名称;partition:分区编号,从0开始;leader:当前分区负责读写的节点,只有主副本才会接收消息的读写;replicas:分区的复制节点(主副本和从副本)列表,默认只有一个,即主副本;Isr:同步状态的副本,是replicas的一个子集。
删除topic
使用delete选项,如
[root@localhost bin]# ./kafka-topics.sh --delete--zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic news
Topic news is marked for deletion.
Note: This will have no impact ifdelete.topic.enable is not set to true.
可见,这并没有完全删除topic,
我们需要在server.properties中增加下面一句
delete.topic.enable=true
首先停止所有的consumer和producer,删除kafka-logs下的所有文件,然后重启kafka服务即可。如
[root@localhost bin]# ./kafka-server-stop.sh
[root@localhost bin]# ./kafka-server-start.sh ../config/server.properties
生产者-向kafka写数据
开发者可以使用kafka内置的客户端API开发kafka应用程序。除了内置的客户端外, Kafka 还提供了二进制连接协议,也就是说,我们直接向 Kafka 网络端口发送适当的字节序列,就可以实现从 Kafka 读取消息或往 Katka 写入消息。还有很多用其他语言实现的 Kafka 客户端,比如 C++、 Python 、 Go 语言等,它们都实现了 Kafka 的连接协议,使得 Kafka不仅仅局限于在 Java 里使用。这些客户端不属于 Katka 项目,不过 Kafka 项目 wiki 上提供了一个清单,列出了所有可用的客户端。
还有一种方法是使用kafka提供的控制台生产者脚本kafka-console-producer.sh。
向topic写消息,启动一个控制台的生产者producer (指定broker topic) 输入内容回车,如
[root@localhost kafka_2.9.1-0.8.2.1]#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic news
[2019-08-08 19:03:56,943] WARN Property topic isnot valid (kafka.utils.VerifiableProperties)
test1
test2
test3
test4
1test
2test
网友评论