美文网首页
Kafka学习笔记一

Kafka学习笔记一

作者: 第八共同体 | 来源:发表于2019-09-27 14:30 被阅读0次

    1.简介

    kafka是一个分布式的流平台,一个流平台有以下三个能力:

    • 发布和订阅记录流,就像一个消息队列或者企业级消息系统
    • 以容错的持久方式存储记录流。
    • 在记录发生时处理记录流。

    Kafka通常用于两大类应用程序:

    • 构建实时流数据管道,在系统或应用程序之间可靠地获取数据
    • 构建实时流应用程序,转换或响应数据流

    为了理解Kafka是如何做这些事情的,让我们深入研究一下Kafka的能力。
    首先是几个概念:

    • Kafka作为集群运行在一个或多个服务器上,这些服务器可以跨多个数据中心
    • Kafka集群将记录流存储在称为主题的类别中。
    • 每条记录由一个键、一个值和一个时间戳组成。

    卡夫卡有四个核心api:

    • 生产者接口,允许一个应用发布记录流到一个或者多个kafka主题中
    • 消费者接口,允许一个用用订阅一个或者多个主题并且处理产生给它们的记录流。
    • 流接口,允许一个应用作为一个流处理器,从一个或者多个主题中消费一个输入流,产生一个输出流到一个或者多个主题中,有效的转换输入流到输出流。
    • 连接接口,允许构建和运行可重用的生产者或消费者,将Kafka主题连接到现有的应用程序或数据系统。例如,关系数据库的连接器可能捕获对表的每个更改。

    在Kafka中,客户端和服务器之间的通信使用简单、高性能、语言无关的TCP协议完成。该协议是版本化的,并与旧版本保持向后兼容性。我们为卡夫卡提供了一个Java客户端,但是客户端有多种语言版本。

    Topics and Logs

    让我们首先深入卡夫卡为记录流提供的核心抽象——主题。
    主题是记录发布到的类别或提要名称,主题在kafka中常是多订阅用户的,也就是说,一个主题可以有零个、一个或多个订阅写入其中的数据的消费者。

    对于每个主题,Kafka集群维护一个分区日志,每个分区都是一个有序的,不变的记录序列,这些记录连续地附加到一个结构化的提交日志中。分区中的每条记录都被分配了一个名为偏移量的连续id号,该偏移量惟一地标识分区中的每条记录。

    卡夫卡群集使用可配置的保留期持久保存所有已发布的记录,无论它们是否已被使用。例如,如果保留策略设置为两天,则在记录发布后的两天内,该记录可供使用,之后将被丢弃以释放空间。Kafka的性能在数据大小方面是稳定的,所以长时间存储数据不是问题。

    事实上,基于每个消费者保留的唯一元数据是该消费者在日志中的偏移量或位置。该偏移量由消费者控制:通常消费者在读取记录时会线性地推进偏移量,但是,事实上,因为位置是由消费者控制的,所以它可以按照自己喜欢的任何顺序消费记录。例如,消费者可以重置为较旧的偏移量来重新处理过去的数据,或者跳到最近的记录并从“现在”开始消费。

    这些功能的结合意味着卡夫卡的消费者非常便宜——他们可以来去自由,而不会对集群或其他消费者产生太大影响。例如,您可以使用我们的命令行工具来“跟踪”任何主题的内容,而不改变任何现有消费者所消费的内容。

    日志中的分区有几个目的。首先,它们允许日志扩展到适合单个服务器的大小之外。每个单独的分区必须适合托管它的服务器,但是一个主题可能有许多分区,因此它可以处理任意数量的数据。其次,它们充当并行性的单位——稍后会详细介绍。

    Distribution

    日志的分区分布在卡夫卡集群中的服务器上,每个服务器处理数据和分区共享请求。为了容错,每个分区在可配置数量的服务器上复制。

    每个分区有一个充当“领导者”的服务器和零个或多个充当“追随者”的服务器。领导者处理分区的所有读写请求,而追随者被动地复制领导者。如果领导者失败,其中一个追随者将自动成为新的领导者。每个服务器充当它的一些分区的领导者和其他分区的追随者,因此集群内的负载非常平衡。

    Geo-Replication

    Kafka MirrorMaker为集群提供地理复制支持。使用MirrorMaker,消息可以跨多个数据中心或云区域复制。您可以在主动/被动情况下使用它进行备份和恢复;或者在主动/主动场景中,将数据放置得更靠近用户,或者支持数据局部性要求。

    Producers

    生产者发布数据到选择的主题上,生产者负责选择将哪个记录分配给主题中的哪个分区。这可以简单地通过循环方式来平衡负载,也可以根据一些语义分区函数(比如基于记录中的某个键)来完成。稍后将详细介绍分区的使用!

    Consumers

    消费者用一个消费者组名称来标记自己,发布到一个主题的每个记录被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在不同的过程中,也可以在不同的机器上。

    如果所有的消费者实例具有相同的消费者组,那么记录将在消费者实例上有效地负载平衡。

    如果所有的消费者实例都有不同的消费者组,那么每个记录将被广播给所有的消费者进程。

    一个双服务器卡夫卡集群托管四个分区(P0-P3),有两个消费群。消费者组甲有两个消费者实例,组乙有四个。

    然而,更常见的是,我们发现主题有少量的消费者群体,每个“逻辑订户”有一个。每个组由许多可伸缩性和容错的消费者实例组成。这只不过是发布-订阅语义,其中订阅者是一群消费者,而不是一个流程。

    卡夫卡实现消费的方式是在消费实例上划分日志中的分区,这样每个实例在任何时间点都是分区“公平份额”的唯一消费方。卡夫卡协议动态地处理保持团体成员身份的过程。如果新实例加入该组,它们将从该组的其他成员那里接管一些分区;如果一个实例死亡,它的分区将被分配给其余的实例。

    卡夫卡只提供了一个分区内记录的总顺序,而不是一个主题中不同分区之间的顺序。对于大多数应用程序来说,按分区排序以及按键划分数据的能力就足够了。但是,如果您需要对记录进行总排序,这可以通过只有一个分区的主题来实现,尽管这意味着每个消费者组只有一个消费者进程。

    Multi-tenancy

    您可以将kafka部署为多租户解决方案。多租户是通过配置哪些主题可以生产或消费数据来实现的。配额也有运营支持。管理员可以对请求定义和实施配额,以控制客户端使用的代理资源。有关更多信息,请参见安全文档。

    Guarantees

    kafka给出以下保障:

    • 生产者发送到特定主题分区的消息将按照发送顺序进行附加。也就是说,如果记录M1和M2是由同一个生产者发送的,而M1是第一个发送的,那么M1的偏移量将低于M2,并且会出现在日志中的更早位置
    • 消费者实例按照记录在日志中的存储顺序查看记录。
    • 对于复制因子为N的主题,我们将容忍多达N-1个服务器故障,而不会丢失提交给日志的任何记录。

    文档的Design部分给出了关于这些保证的更多细节。

    Kafka as a Messaging System

    Kafka的流概念与传统的企业消息系统相比如何?
    消息传递传统上有两种模式:队列和发布订阅。在一个队列中,一个消费者池可以从服务器中读取,并且每个记录都到达其中一个;在发布-订阅中,记录被广播给所有消费者。这两种模式各有优缺点。队列的优势在于,它允许您将数据处理划分到多个消费者实例上,这使您可以扩展您的处理。不幸的是,队列不是多用户的——一旦一个进程读取了数据,它就消失了。发布-订阅允许您向多个进程广播数据,但是没有扩展处理的方法,因为每个消息都发送给每个订阅者。

    卡夫卡的消费组概念概括了这两个概念。与队列一样,消费者组允许您将处理划分为多个进程(消费者组的成员)。如同发布订阅一样,卡夫卡允许你向多个消费者组广播信息。

    卡夫卡模型的优点是每个主题都有这两个属性——它可以扩展处理,也是多用户的——没有必要选择其中一个。

    卡夫卡也比传统的信息系统有更强的顺序保证。

    传统的队列在服务器上按顺序保留记录,如果多个消费者从队列中消费,那么服务器将按照记录的存储顺序分发记录。然而,尽管服务器按顺序分发记录,但记录是异步传递给消费者的,因此它们可能会在不同的消费者中无序到达。这实际上意味着在并行消费的情况下,记录的排序会丢失。消息传递系统通常通过“独占消费者”的概念来解决这个问题,该概念只允许一个进程从队列中消费,但这当然意味着在处理中没有并行性。

    卡夫卡做得更好。通过在主题中引入并行性(分区)的概念,卡夫卡能够在消费者进程池中提供排序保证和负载平衡。这是通过将主题中的分区分配给消费者组中的消费者来实现的,这样每个分区正好被组中的一个消费者使用。通过这样做,我们确保消费者是该分区的唯一读者,并且按顺序消费数据。由于有许多分区,这仍然平衡了许多消费者实例的负载。但是,请注意,消费者组中的消费者实例不能多于分区。

    Kafka as a Storage System

    任何允许发布消息和消费消息分离的消息队列都是同样作为一个存储系统。Kafka不同的地方在于,他是一个很好的存储系统。

    写入卡夫卡的数据被写入磁盘并复制以实现容错。卡夫卡允许生产者等待确认,这样写操作就不会被认为是完整的,直到它被完全复制,并保证存留,即使写入的服务器发生故障。

    卡夫卡的磁盘结构很好地利用了可伸缩性——无论服务器上有50 KB还是50 TB的持久数据,卡夫卡都会执行相同的操作。

    由于重视存储并允许客户端控制其读取位置,您可以将卡夫卡视为一种专用分布式文件系统,专门用于高性能、低延迟的提交日志存储、复制和传播。

    有关卡夫卡提交日志存储和复制设计的详细信息,请阅读本页

    Kafka for Stream Processing

    仅仅读取、写入和存储数据流是不够的,目的是实现数据流的实时处理。

    在Kafka中,流处理器是指从输入主题获取连续的数据流,对这个输入执行一些处理,并产生连续的数据流到输出主题。

    例如,一个零售应用程序可能接受销售和发货的输入流,并根据这些数据计算出重新订购和价格调整的输出流。

    可以直接使用生产者和消费者APIs进行简单的处理。然而,对于更复杂的转换,卡夫卡提供了一个完全集成的流应用编程接口。这允许构建执行非平凡处理的应用程序,这些应用程序通过流计算聚合或将流连接在一起。

    此功能有助于解决此类应用程序所面临的难题:处理无序数据、在代码更改时重新处理输入、执行有状态计算等等。

    流应用编程接口建立在卡夫卡提供的核心原语之上:它使用生产者和消费者应用编程接口作为输入,使用卡夫卡作为状态存储,并在流处理器实例之间使用相同的组机制作为容错。

    Putting the Pieces Together

    这种消息传递、存储和流处理的结合可能看起来不寻常,但对于卡夫卡作为流平台的角色来说却是至关重要的。

    像HDFS这样的分布式文件系统允许为批处理存储静态文件。像这样的系统实际上允许存储和处理过去的历史数据。

    传统的企业消息系统允许处理您订阅后将到达的未来消息。以这种方式构建的应用程序在未来数据到达时对其进行处理。

    卡夫卡结合了这两种能力,这种结合对于卡夫卡作为流媒体应用平台的使用以及流媒体数据管道都至关重要。

    通过结合存储和低延迟订阅,流式应用程序可以以相同的方式处理过去和未来的数据。也就是说,单个应用程序可以处理历史的、存储的数据,但不会在到达最后一条记录时结束,而是可以在未来数据到达时继续处理。这是流处理的广义概念,包括批处理和消息驱动的应用程序。

    同样,对于流式数据管道,对实时事件的订阅的组合使得将卡夫卡用于非常低延迟的管道成为可能;但是可靠地存储数据的能力使得它可以用于必须保证数据交付的关键数据,或者用于与离线系统集成,离线系统仅周期性地加载数据,或者可能长时间停机进行维护。流处理设施使数据到达时进行转换成为可能。

    有关卡夫卡提供的保证、APIs和功能的更多信息,请参见其他文档

    2.使用案例

    下面是kafka的一些常见的使用案例

    Messaging

    Kafka可以很好地替代传统的消息代理。消息代理用于各种各样的原因(将处理与数据生成器解耦,缓冲未处理的消息,等等)。与大多数消息传递系统相比,Kafka具有更好的吞吐量、内置分区、复制和容错能力,这使它成为大型消息处理应用程序的一个很好的解决方案。

    根据我们的经验,消息传递的使用通常是相对较低的吞吐量,但是可能需要较低的端到端延迟,并且通常取决于卡夫卡提供的强大的持久性保证。

    在这一领域是卡夫卡比得上传统的消息系统,例如ActiveMQ的或RabbitMQ的。

    Website Activity Tracking

    卡夫卡最初的使用案例是能够将用户活动跟踪管道重建为一组实时发布订阅源。这意味着网站活动(页面视图、搜索或用户可能采取的其他操作)发布到中心主题,每个活动类型有一个主题。这些源可用于订阅一系列用例,包括实时处理、实时监控,以及加载到Hadoop或离线数据仓库系统中进行离线处理和报告。

    活动跟踪通常非常大,因为每个用户页面视图都会生成许多活动消息。

    Metrics

    Kafka通常用于操作监控数据。这包括聚合来自分布式应用程序的统计信息,以生成操作数据的集中提要。

    Log Aggregation

    许多人用卡夫卡来代替日志聚合解决方案。日志聚合通常从服务器收集物理日志文件,并将它们放在中心位置(可能是文件服务器或HDFS)进行处理。卡夫卡将文件的细节抽象化,并将日志或事件数据更清晰地抽象化为消息流。这允许更低延迟的处理,更容易支持多个数据源和分布式数据消耗。与以日志为中心的系统(如抄写员或水槽)相比,卡夫卡提供了同样好的性能、更强的复制耐久性保证以及低得多的端到端延迟。

    Stream Processing

    卡夫卡的许多用户在由多个阶段组成的处理管道中处理数据,在这些阶段,原始输入数据从卡夫卡主题中被消费,然后被聚集、丰富或以其他方式转换成新的主题以供进一步消费或后续处理。例如,用于推荐新闻文章的处理管道可能从RSS源抓取文章内容,并将其发布到“文章”主题;进一步的处理可能会对该内容进行规范化或重复数据消除,并将清理后的文章内容发布到新主题;最后的处理阶段可能会尝试向用户推荐该内容。这种处理流水线基于各个主题创建实时数据流的图形。从0.10.0.0开始,Apache Kafka提供了一个称为Kafka Streams的轻量级但功能强大的流处理库来执行上述数据处理。除了卡夫卡流,其他开源流处理工具包括阿帕奇风暴(Apache Storm)和阿帕奇萨姆扎(Apache Samza)。

    Event Sourcing

    事件源是应用程序设计的一种风格,其中状态变化被记录为按时间顺序排列的记录序列。卡夫卡对非常大的存储日志数据的支持使得它成为以这种风格构建的应用程序的优秀后端。

    Commit Log

    Kafka可以作为分布式系统的一种外部提交日志。日志帮助在节点之间复制数据,并充当失败节点恢复数据的重新同步机制。Kafka中的日志压缩特性有助于支持这种用法。在这种用法中,Kafka类似于Apache BookKeeper项目。

    快速开始

    本教程假设你是新手,并且没有Kafka和Zookeeper数据,因为kafka控制台脚本在unix和windows平台是不同的,在windows平台上,使用bin\windows\ 取代bin/,并且脚本扩展名为.bat

    1.下载并解压

    下载地址2.3.0版本

    tar -xzf kafka_2.12-2.3.0.tgz
    cd kafka_2.12-2.3.0
    

    2.启动服务

    kafka使用Zookeeper,所以你需要首先启动一个Zookeeper服务器,你可以使用包中的脚本,开启一个单节点的Zookeeper实例:

    ➜  kafka_2.12-2.3.0 bin/zookeeper-server-start.sh config/zookeeper.properties
    [2019-09-27 13:13:12,607] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    [2019-09-27 13:13:12,612] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
    [2019-09-27 13:13:12,613] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
    [2019-09-27 13:13:12,613] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
    [2019-09-27 13:13:12,613] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
    [2019-09-27 13:13:12,641] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    [2019-09-27 13:13:12,641] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
    [2019-09-27 13:13:12,674] INFO Server environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.server.ZooKeeperServer)
    [2019-09-27 13:13:12,674] INFO Server environment:host.name=192.168.44.193 (org.apache.zookeeper.server.ZooKeeperServer)
    [2019-09-27 13:13:12,674] INFO Server environment:java.version=1.8.0_211 (org.apache.zookeeper.server.ZooKeeperServer)
    [2019-09-27 13:13:12,674] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
    [2019-09-27 13:13:12,674] INFO Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/jre (org.apache.zookeeper.server.ZooKeeperServer)
    ......
    

    现在就可以开启kafka服务了

    ➜  kafka_2.12-2.3.0 bin/kafka-server-start.sh config/server.properties
    [2019-09-27 13:14:35,753] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
    [2019-09-27 13:14:36,630] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
    [2019-09-27 13:14:36,631] INFO starting (kafka.server.KafkaServer)
    [2019-09-27 13:14:36,633] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
    [2019-09-27 13:14:36,674] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
    [2019-09-27 13:14:36,687] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
    [2019-09-27 13:14:36,687] INFO Client environment:host.name=192.168.44.193 (org.apache.zookeeper.ZooKeeper)
    [2019-09-27 13:14:36,687] INFO Client environment:java.version=1.8.0_211 (org.apache.zookeeper.ZooKeeper)
    [2019-09-27 13:14:36,687] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
    [2019-09-27 13:14:36,687] INFO Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/jre (org.apache.zookeeper.ZooKeeper)
    ......
    

    现在,让我们创建一个主题,名为test,只有一个分区,一个副本

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    

    现在查看主题,运行以下命令:

    bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    
    
    
    test
    

    此外,您还可以配置代理,使其在发布不存在的主题时自动创建主题,而不是手动创建主题。

    卡夫卡有一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送给卡夫卡集群。默认情况下,每一行都将作为单独的消息发送。
    运行生成器,然后在控制台中键入一些消息发送到服务器:

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message
    This is another message
    

    卡夫卡还有一个命令行消费者,可以将消息转储到标准输出中。

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    This is a message
    This is another message
    

    如果您在不同的终端上运行上述每个命令,那么您现在应该能够在生产者终端中键入消息,并看到它们出现在消费者终端中。

    所有的命令行工具都有额外的选项;在没有参数的情况下运行该命令将显示详细记录它们的使用信息。

    最后,设置多代理集群

    到目前为止,我们一直在运行一个单一代理,但这一点都不好玩。对kafka来说,一个单一的代理只是一个大小为1的集群,所以除了再启动几个代理实例之外,没有什么变化。但是为了感受一下,让我们将集群扩展到三个节点(仍然都在本地机器上)。

    首先,我们为每个代理创建一个配置文件(在windows平台上使用copy命令):

    > cp config/server.properties config/server-1.properties
    > cp config/server.properties config/server-2.properties
    

    现在编辑新的文件,设置以下属性:

    config/server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dirs=/tmp/kafka-logs-1
     
    config/server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dirs=/tmp/kafka-logs-2
    

    broker.id属性是群集中每个节点的唯一和永久名称。我们必须覆盖端口和日志目录,只是因为我们在同一台机器上运行这些目录,并且我们希望阻止代理在同一端口上注册或覆盖彼此的数据

    我们已经有了Zookeeper,我们的单个节点已经启动,所以我们只需要启动两个新节点:

    > bin/kafka-server-start.sh config/server-1.properties &
    ...
    > bin/kafka-server-start.sh config/server-2.properties &
    ...
    

    现在创建一个复制因子为3的新主题:

    > bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    

    好吧,但是现在我们有了一个集群,我们怎么知道哪个代理在做什么?要查看,运行“describe topics”命令:

    > bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
    Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
        
        
    

    以下是对输出的解释。第一行给出了所有分区的概要,每一行给出了关于一个分区的信息。因为这个主题只有一个分区,所以只有一行。

    • “leader”是负责给定分区的所有读写的节点。每个节点将是随机选择的分区部分的领导者。
    • “replicas”是复制这个分区日志的节点列表,无论它们是主节点还是当前活动节点。
    • “isr”是一组“同步”副本。这是副本列表的子集,它当前是活动的,并赶上了领导者。

    请注意,在我的示例中,节点1是主题唯一分区的领导者。

    我们可以对我们创建的原始主题运行相同的命令来查看它在哪里:

    > bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
    Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
        Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    

    因此,一点也不奇怪--原来的主题没有副本且位于0服务节点上,也是我们创建的集群中唯一的服务节点。

    让我们在新的主题上发布一些消息:

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
    

    现在,来消费这些消息:

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
    

    现在让我们测试一下容错能力。节点1是领导者,所以让我们杀了它:

    > ps aux | grep server-1.properties
    7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
    > kill -9 7564
    

    在windows平台上,可使用:

    > wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
    ProcessId
    6016
    > taskkill /pid 6016 /f
    

    领导层已切换到其中一个追随者,节点1不再在同步副本集中:

    > bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
    Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0
    

    但是这些信息仍然可以被消费,即使最初记录这些信息的领导者已经离开了:

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
    

    从控制台写入数据并将其写入控制台是一个方便的起点,但是您可能希望使用其他来源的数据或将Kafka的数据导出到其他系统。对于许多系统,您可以使用Kafka Connect导入或导出数据,而不是编写定制的集成代码。

    Kafka Connect是Kafka附带的一个工具,可以导入和导出数据到Kafka。它是一个运行连接器的可扩展工具,连接器实现了与外部系统交互的自定义逻辑。在这个快速入门中,我们将看到如何使用简单的连接器运行Kafka Connect,这些连接器将数据从文件导入Kafka主题,并将数据从Kafka主题导出到文件。

    首先,我们将创建一些用于测试的种子数据:

    > echo -e "foo\nbar" > test.txt
    

    在windows平台:

    > echo foo> test.txt
    > echo bar>> test.txt
    

    接下来,我们将启动两个以独立模式运行的连接器,这意味着它们运行在单个本地专用进程中。我们提供了三个配置文件作为参数。第一种始终是Kafka连接进程的配置,包括要连接的常见配置,如Kafka代理和数据的序列化格式。其余的配置文件每个都指定要创建的连接器。这些文件包括一个惟一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。

    > bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
    

    kafka附带的这些示例配置文件使用了您之前启动的默认本地集群配置,并创建了两个连接器:第一个是源连接器,它从输入文件中读取行,并将每一行发送到一个kafka主题;第二个是接收连接器,它从kafka主题中读取消息,并将每一行生成输出文件中的行。

    在启动过程中,您会看到许多日志消息,包括一些指示连接器正在实例化的消息。一旦kafka连接进程开启,源连接器应该开始从test.txt中读取行,并将其生成到主题connect-test,而接收连接器应该开始从主题connect-test中读取消息,并将其写入文件test.sink.txt.我们可以通过检查输出文件的内容来验证数据是否已经通过整个管道传送:

    > more test.sink.txt
    foo
    bar
    

    数据被存贮在kafka名为connect-test的主题中,所以,我们可以运行一个控制台消费者来查看主题中的数据,如下:

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
    {"schema":{"type":"string","optional":false},"payload":"foo"}
    {"schema":{"type":"string","optional":false},"payload":"bar"}
    ...
    

    连接器继续处理数据,因此我们可以将数据添加到文件中,并看到它在管道中移动:

    > echo Another line>> test.txt
    

    您应该看到这一行出现在控制台消费者输出和接收器文件中。

    kafka Streams是一个客户库,用于构建关键任务实时应用和微服务,其中输入和/或输出数据存储在卡夫卡集群中。kafka Streams将在客户端编写和部署标准的Java和Scala应用程序的简单性与卡夫卡的服务器端集群技术的优势结合起来,使这些应用程序具有高度的可伸缩性、弹性、容错性、分布式等等。这个快速入门示例将演示如何运行在这个库中编码的流式应用程序。

    Ecosystem

    在主要发行版之外,有太多的工具与卡夫卡相结合。生态系统页面列出了其中的许多,包括流处理系统、Hadoop集成、监控和部署工具。

    相关文章

      网友评论

          本文标题:Kafka学习笔记一

          本文链接:https://www.haomeiwen.com/subject/udfauctx.html