本教程假设你是新手,没有 Kafka 或 ZooKeeper 的历史数据。由于 Kafka 自带的控制台脚本在 Unix 和 Windows 平台上有差异,因此在 Windows 平台上使用 * bin\windows\ * 目录下的命令,脚本扩展名为 * .bat *,Unix 平台上用 * bin/ * 下的命令。
第1步:下载代码
下载 0.10.2.0 版并解压。
tar -xzf kafka_2.11-0.10.2.0.tgz
cd kafka_2.11-0.10.2.0
第2步:启动服务器
Kafka 使用 ZooKeeper,所以你需要首先启动一个 ZooKeeper 服务器,如果你还没有。可以通过与 kafka 一起打包的脚本来获取单节点 ZooKeeper 实例。
bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
现在启动 Kafka 服务器:
bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
第3步:创建 Topic
创建一个单分区(partition)单副本(replica)的 topic,起名为 "test":
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
如果运行 list topic 命令,可以看到这个 topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
另外,可以通过修改 broker 的配置,当消息所属的 topic 不存在时可以自动创建。
第4步:发消息
Kafka 提供了一个命令行客户端,它将从文件或标准输入源获取输入信息,并将其作为消息发送到 Kafka 集群。默认情况下,每行都将作为单独的消息发送。
运行生产者(producer),然后在控制台中输入一些消息发送到服务器。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
第5步:启动消费者(consumer)
Kafka 还有一个命令行消费者(consumer),将消息转储到标准输出。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
如果你已将上述每个命令在不同的终端运行了,那么现在应该能够在生产者终端(producer terminal)中键入消息,看到它们出现在消费者终端(consumer terminal)。
所有命令行工具都有额外的可选参数;运行无参的命令可以显示帮助信息。
第6步:配置 broker 集群
到目前为止,我们已经可以运行单 broker 了,但这没啥意思。对于 Kafka 集群来说,单 broker 只是一个大小为 1 的集群,除了多启动几个 broker 实例之外并没有多少变更。为了体验一下,我们将集群扩展到三个节点(仍然在我们本地机器上)。
首先,我们为每个代理创建一个配置文件(在Windows 上使用 copy 命令):
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
现在编辑这些新文件并设置以下属性:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
** broker.id 属性是集群中每个节点的唯一且永久的名称** 。因为我们在同一台机器上运行这些实例,所以必须修改端口和日志目录,我们希望保持所有 broker 尝试在同一端口注册或覆盖彼此的数据。
我们已经启动了 Zookeeper 和一个单节点,所以我们只需要启动两个新的节点:
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
现在创建一个新的 topic,复制因子(replication factor)为 3:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
现在我们有一个集群,我们怎么知道哪个 broker 正在做什么?可以运行 “describe topics” 命令进行查看:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
下面解释一下输出信息。第一行给出所有分区的摘要,每个附加行给出一个分区的信息。由于我们这个 topic 只有一个分区,所以只有一行。
- “leader” 是负责给定分区的所有读写的节点。每个节点都有可能被随机的选为 leader。
- “replicas” 该分区副本节点的列表,无论它们是否为 leader 或是否处于 alive 状态。
- “isr” 是 “in-sync” 副本的集合。这是 replicas 列表的子集,其当前活动并作为 leader 的接替节点。
请注意,在我的示例中,节点1 是 topic 的唯一分区的 leader。
我们可以对我们创建的原始 topic 运行相同的命令,查看它在哪里:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
不出所料,原来的 topic 没有副本,它是刚创建时集群中唯一的服务,它在 ID 为 0 的服务器上。
让我们向我们的新 topic 发布几条消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
现在让我们消费这些消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
现在让我们测试容错性。Broker 1 作 leader,我们停掉它:
ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
kill -9 7564
在 Windows上使用:
wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.2.0.jar" kafka.Kafka config\server-1.properties 644
taskkill /pid 644 /f
Leader 已切换到其中一个从节点,节点1 已不在 in-sync 副本集中:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
即使最初写入的 Leader 已经挂了,消息也是可以正常消费的:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
第7步:使用Kafka 连接导入 / 导出数据
从控制台写入数据并将其写回控制台是一个简单的例子,但你可能希望使用来自其他来源的数据或将数据从 Kafka 导出到其他系统。对于大多数系统,可以使用 Kafka Connect 来导入或导出数据,不用再自己写集成代码。
Kafka Connect 是 Kafka 包含的一个工具,用于将数据导入和导出到Kafka。它是一个可扩展的连接器工具,实现与外部系统交互的自定义逻辑。在这个快速入门中,我们将看到如何使用简单的连接器来运行 Kafka Connect,这些连接器将数据从文件导入 Kafka topic,并将数据从 Kafka topic导出到文件。
首先,我们创建一些种子数据:
echo -e "foo\nbar" > test.txt
接下来,我们将启动在独立模式下运行的两个连接器,这意味着它们在单个本地专用进程中运行。我们提供三个配置文件作为参数。第一个是Kafka Connect 过程的配置,包含常见的配置,如:要连接的 Kafka broker 和数据的序列化格式。其余的配置文件每个都指定要创建的连接器。这些文件包括唯一的连接器名称,要实例化的连接器类以及连接器所需的一些其他配置。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Kafka 包含的这些示例配置文件使用你之前启动的默认本地群集配置,并创建两个连接器:第一个是源连接器,从输入文件读取行并生成每个Kafka topic,第二个是接收连接器,它从 Kafka topic 读取消息,并将其输出到文件中。
在启动期间,可以看到一些日志消息,包括一些指示连接器正在被实例化。一旦 Kafka Connect 进程启动,源连接器应该开始从 *test.txt * 读取行并将其生成到 topic *connect-test *,并且 sink 连接器应该开始从 topic connect-test 读取消息,将它们写入文件 * test.sink.txt *。我们可以通过检查输出文件的内容来验证通过整个流水线传递的数据:
cat test.sink.txt
foo
bar
注意,数据存储在名为 * connect-test * 的 Kafka topic 中,因此我们还可以运行 consumer 控制台来查看 topic 中的数据(或使用自定义消费程序代码来处理它):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
连接器继续处理数据,因此我们可以将数据添加到文件中,并查看它通过管道移动:
echo "Another line" >> test.txt
你应该看到该行在 consumer 控制台输出并在 sink 文件中出现。
第8步:使用 Kafka Streams 来处理数据
Kafka Streams 是 Kafka 的客户端库,用于实时流处理和分析存储在Kafka broker 中的数据。此示例将演示如何用这个库来编码。这里是 WordCountDemo 示例代码的要点(使用 Java 8 lambda 表达式以方便阅读)。
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic ""streams-file-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Group the text words as message keys
.groupBy((key, value) -> value)
// Count the occurrences of each word (message key).
.count("Counts")
// Store the running counts as a changelog stream to the output topic.
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
它实现 WordCount 算法,从输入文本计算出字出现次数的直方图。但是,与你在操作有界数据之前可能已经看到的其他 WordCount 示例不同,WordCount 演示程序的行为略有不同,因为它设计为对无限制的无限数据流进行操作。类似于有界变量,它是一种有状态的算法,跟踪和更新单词的计数。然而,由于它必须承担潜在的无界输入数据,它将周期性地输出其当前状态和结果,同时继续处理更多的数据,因为它不知道它何时处理了“全部” 输入数据。
作为第一步,我们将准备输入数据到 Kafka topic,随后将由 Kafka Streams 程序处理。
echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
或在 Windows 上:
echo all streams lead to kafka> file-input.txt
echo hello kafka streams>> file-input.txt
echo|set /p=join kafka summit>> file-input.txt
接下来,我们使用 producer 控制台将此输入数据发送到名为 * streams-file-input * 的输入 topic,该 producer 控制台逐行读取 STDIN 中的数据,并将每行作为单独的 Kafka 消息以 null 键和值编码一个字符串到topic (实际上,流数据可能会连续流入 Kafka,程序将启动并运行):
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
我们现在可以运行 WordCount 演示程序来处理输入数据:
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
演示程序将从输入 topic streams-file-input 读取,对每个读取消息执行 WordCount 计算,并将其当前结果连续写入输出 topic streams-wordcount-output 。因此,除了日志条目,将不会有任何 STDOUT 输出,因为结果被写回 Kafka 。演示将运行几秒钟,不像一般的流处理应用程序,自动终止。
我们现在可以通过从其输出 topic 中读取来检查 WordCount 演示程序的输出:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
并将以下输出数据打印到控制台:
all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1
这里,第一列是 java.lang.String 格式的 Kafka 消息键,第二列是 java.lang.Long 格式的消息值。注意,输出实际上是连续的更新流,其中每个数据记录(即,上面的原始输出中的每一行)是单个字(也称为记录密钥,例如 “kafka”)的更新计数。对于具有相同键的多个记录,每个稍后的记录是对前一个记录的更新。
下面的两个图说明了幕后发生的事情。第一列显示计数的字出现的 KTable <String,Long> 的当前状态的演变。第二列显示从 KTable 的状态更新产生并且正在发送到输出 Kafka topic * streams-wordcount-output * 的更改记录。
流程图1 流程图2首先,正在处理文本行 “所有流导向 kafka”。 KTable 正在构建,因为每个新字导致一个新的表条目(以绿色背景突出显示),并且相应的更改记录被发送到下游 *KStream *。
当处理第二个文本行 “hello kafka streams” 时,我们首次观察到 *KTable * 中的现有条目正在更新(这里:对于单词 “kafka” 和 “streams”)。再次,更改记录正在发送到输出 topic。
等等(我们跳过第三行如何处理的插图)。这解释了为什么输出 topic 有我们上面显示的内容,因为它包含了更改的完整记录。
超越这个具体示例的范围,Kafka Streams 在这里做的是利用表和 changelog 流之间的对偶性(这里:table = KTable,changelog stream =下游KStream):你可以发布表到流,并且如果从头到尾使用整个 changelog 流,则可以重建表的内容。
现在,您可以向 streams-file-input topic 写入更多输入消息,并观察添加到 streams-wordcount-output topic 的其他消息,反映更新的字数(例如,如上所述使用 consumer 控制台 和 producer 控制台)。
你可以通过 Ctrl-C 停止 consumer 控制台。
网友评论