美文网首页
读书笔记:Kafka简单入门

读书笔记:Kafka简单入门

作者: 东南枝下 | 来源:发表于2020-11-19 10:01 被阅读0次

[TOC]

基础概念

主题(Topic)与分区(Partition)

Kafka中的消息以主题为单位进行归类,主题是一个逻辑上的概念,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。

主题以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。

偏移量(offset)是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性

Kafka保证的是分区有序而不是主题有序。

Kafka中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个broker

多副本(Replica)机制

增加副本数量可以提升容灾能力

同一分区的不同副本中保存的是相同的消息

副本之间是“一主多从”的关系,其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步

leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务

分区中的所有副本统称为AR(Assigned Replicas)。

所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas)

与leader副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas),

由此可见,AR=ISR+OSR

在正常情况下, AR=ISR,OSR集合为空。

只有在ISR集合中的副本才有资格被选举为新的leader

HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。

LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset

分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。

安装Kafka

一个典型的 Kafka 体系架构包括若干 Producer、若干Broker、若干 Consumer

ZooKeeper中共有3个角色:leader、follower和observer

安装 Zookeeper

  1. 下载Zookeepr

    注意不要下载源码版本

    https://zookeeper.apache.org/releases.html

    图片.png
  1. 配置环境变量

    export ZOOKEEPER_HOME=/Users/gy/MyMacDocuments/apache-zookeeper-3.6.2(zookeeper路径)
    export PATH="$PATH:$ZOOKEEPER_HOME/bin"
    
  1. 修改配置文件

    cd $ZOOKEEPER_HOME/conf
    cp zoo_sample.cfg zoo.cfg
    

    修改 zoo.cfg 如下:

    # The number of milliseconds of each tick
    # Zookeeper服务器心跳时间,单位为ms
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    # 投票选举新leader的初始化时间
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    # leader与follower心跳检测最大容忍时间,响应超过 syncLimit * tickTime,
    # leader认为follower死掉,从服务器列表中删除follower
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    # 数据目录
    dataDir=/tmp/zookeeper/data
    # 日志目录
    dataLogDir=/tmp/zookeeper/log
    # the port at which the clients will connect
    # Zookeeper对外服务端口
    clientPort=2181
    
  1. 创建数据和日志目录

    mkdir -p /tmp/zookeeper/data
    mkdir -p /tmp/zookeeper/log
    
  1. 在${dataDir}目录(也就是/tmp/zookeeper/data)下创建一个myid文件,并写入一个数值,比如0。myid文件里存放的是服务器的编号。

  2. 启动Zookeepr

    zkServer.sh start  // 启动
    zkServer.sh status // 状态
    zkServer.sh stop //停止
    
  1. 集群模式配置(由于我没有三台机器就算了)

    在这3台机器的/etc/hosts文件中添加3台集群的IP地址与机器域名的映射,如

    192.168.0.2  node1  
    192.168.0.3  node2  
    192.168.0.4  node3  
    

然后在这3台机器的zoo.cfg文件中添加以下配置:

```sh

server.0=192.168.0.2:2888:3888
server.1=192.168.0.3:2888:3888
server.2=192.168.0.4:2888:3888
```

server.A=B:C:D

A: 服务器的编号,myid里面的值

B: 服务器ip地址

C: 服务器与集群中的leader服务器交换信息的端口

D: 表示选举时服务器相互通信的端口

在这3台机器上各自执行zkServer.sh start命令来启动服务

安装Kafka

  1. 下载Kafka

    archive.apache.org/dist/kafka


    图片.png
  1. 解压并修改配置文件

    $KAFKA_HOME/conf/server.properties

    # The id of the broker. This must be set to a unique integer for each broker.
    # broker 编号,如果 集群 中有多个broker,则每个broker编号要设置不同
    broker.id=0
    
    # broker 对外提供的服务入口地址
    listeners=PLAINTEXT://:9092
    
    # A comma separated list of directories under which to store log files
    # 存放消息日志文件地址
    log.dirs=/tmp/kafka-logs
    
    # Kafka 所需的 Zookeeper 集群地址
    zookeeper.connect=localhost:2181/kafka
    
  1. 启动Kafka

    在$KAFKA_HOME目录下执行

    ./bin/kafka-server-start.sh ./config/server.properties //启动     
    ./bin/kafka-server-start.sh -daemon ./config/server.properties //后台启动
    /kafka-server-start.sh ./config/server.properties & //后台启动
    
  1. 查看Kafka服务进程是否已经启动

    jps -l
    
图片.png

Kafka中一些重要的服务端参数

参数 介绍
zookeeper.connect 该参数指明broker要连接的ZooKeeper集群的服务地址(包含端口号),必填,如果集群中有多个节点,则用逗号分开
多个节点:这种情况可以使用chroot 路径???(啥意思,之后再了解) 如果不指定chroot,那么默认使用ZooKeeper的根路径
listeners 该参数指明broker监听客户端连接的地址列表,逗号分隔,默认值为null
格式为 protocol://hostname:port ,Kafka当前支持的协议类型有PLAINTEXT、SSL、SASL_SSL等,如果未开启安全认证,则使用简单的PLAINTEXT即可
不指定主机名,则表示绑定默认网卡
如果主机名是0.0.0.0,则表示绑定所有的网卡
advertised.listeners 作用和listeners类似,默认值也为 null
主要用于IaaS(Infrastructure as a Service)环境
使用场景:多块网卡,有公网,有私网,可以设置advertised.listeners参数绑定公网IP供外部客户端使用,而配置listeners参数来绑定私网IP地址供broker间通信使用。
broker.id 用来指定Kafka集群中broker的唯一标识,默认值为-1。如果没有设置,那么Kafka会自动生成一个。
log.dir和log.dirs Kafka 日志文件存放的根目录
log.dirs用来配置多个根目录(以逗号分隔)
log.dirs 的优先级比 log.dir 高
message.max.bytes 指定broker所能接收消息的最大值,默认值为1000012(B)
Producer 发送的消息大于这个参数所设置的值,那么(Producer)就会报出RecordTooLargeException的异常

生产者与消费者

生产者:将消息发布到Kafka主题的分区中

消费者:订阅主题从而消费消息

脚本使用

主题相关脚本 : $KAFKA_HOME/bin/kafka-topics.sh

// 创建一个主题
// --zookeeper 指定了Kafka所连接的Zookeeper服务地址
// --create 创建主题的动作
// --topic 创建的主题的名称
// --replication-factor 副本因子 【用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。】
// --partitions 分区个数
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-demo --replication-factor 3 --partitions 4

// --describe 展示更多主题的具体信息
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe  --topic topic-demo

生产者相关脚本: $KAFKA_HOME/bin/kafka-console-producer.sh

// 发布消息
// --broker-list 连接Kafka集群的地址
// --topic 主题
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo

消费者相关脚本: $KAFKA_HOME/bin/kafka-console-consumer.sh

// 订阅相关topic
// --bootstrap-server 连接的Kafka集群的地址
// --topic 指定主题
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo

Java中使用

  1. 依赖

    <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>2.0.0</version>
    </dependency>
    
  1. 生产者

    /**
     * 生产者
     *
     * @author Jenson
     */
    public class ProducerFastStart {
    
     private static final String BROKER_LIST = "localhost:9092";
     private static final String TOPIC = "topic-demo";
    
     public static void main(String[] args) {
         Properties properties = new Properties();
         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         properties.put("bootstrap.servers", BROKER_LIST);
    
         // 配置生产者客户端参数,并创建生产者实例
         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    
         // 构造所需要发送的消息
         ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "hello kafka!");
    
         // 发送消息
         producer.send(record);
    
         // 关闭生产者客户端
         producer.close();
     }
    }
    
  1. 消费者

    /**
     * 消费者
     *
     * @author Jenson
     */
    public class ConsumerFastStart {
    
     private static final String BROKER_LIST = "localhost:9092";
     private static final String TOPIC = "topic-demo";
     private static final String GROUP_ID = "group.demo";
    
     public static void main(String[] args) {
         Properties properties = new Properties();
         properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("bootstrap.servers", BROKER_LIST);
    
         // 设置消费组名称
         properties.put("group.id", GROUP_ID);
    
         // 创建一个消费者实例
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    
         // 订阅主题
         consumer.subscribe(Collections.singletonList(TOPIC));
    
         // 循环消费消息
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
             for (ConsumerRecord<String, String> record : records) {
                 System.out.println(record.value());
             }
         }
    
    
     }
    }
    

遇到的报错

java 使用kafka-clients时报错

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

解决

<!--加入依赖-->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.2</version>
</dependency>

相关文章

  • 再看kafka——spring boot集成kafka

    之前自己写过一篇入门文章kafka简单入门及与spring boot整合,主要是结合kafka官方的文档入门,学习...

  • 读书笔记:Kafka简单入门

    [TOC] 基础概念 主题(Topic)与分区(Partition) Kafka中的消息以主题为单位进行归类,主题...

  • Kafka视频集

    kafka企业级入门实战完整版 Kafka系列教程 Kafka入门 分布式消息通信Kafka原理剖析 阿里架构师直...

  • Kafka学习

    MQ入门总结(六)Kafka的原理和使用 Kafka的架构原理,你真的理解吗? 真的,Kafka 入门一篇文章就够...

  • Kafka快速开始

    入门 1.简介 Kafka is a distributed streaming platform,kafka是一...

  • Kafka学习笔记

    《kafka快速学习入门与实践》 第一讲,Kafka术语 知乎上文章简单解释了为什么要用消息中间件,为了解耦消息(...

  • 【kafka】为什么要学习Kafka?

    KAFKA官方文档入门指南 http://ifeve.com/kafka-1 为什么要学习Kafka? http:...

  • (3)kafka的安装部署以及基本操作

    1.kafka 的安装部署 可以去看kafka的快速入门:http://kafka.apache.org/quic...

  • kafka入门

    Apache Kafka 入门 1.kafka简介和产生的背景 什么是 Kafka Kafka 是一款分布式消息发...

  • kafka极简入门(四)--常用配置

    回顾:kafka极简入门(三)--创建topic 前言 kafka针对broker, topic, produce...

网友评论

      本文标题:读书笔记:Kafka简单入门

      本文链接:https://www.haomeiwen.com/subject/txfkbktx.html