kafka

作者: 温岭夹糕 | 来源:发表于2024-08-08 10:08 被阅读0次

写在开头

仅用于自己学习记录,本章节学习目的是快速上手kafka和体验go操作kafka

目录

文章目录

1.消息队列

消息队列MQ(Message Queue)也被称为中间件,不存储消息内容,只是消息的搬运工,具体表现在:

  • 在不同进程间传递消息
  • 在统一进程的不同线程间传递消息
消息队列的基本形态,就是有N个生产者,N个消费者 image.png

在该模式下,生产者只需要向消息队列投递消息,生产者只需要等消息队列搬运消息,此时,生产者和消费者就解耦了

2.Kafka

kafka是一个分布式,支持多分区,基于zookeeper的分布式消息流平台(元数据都保存在zookeeper中,因此3.7版本之前都需要先安装zookeeper)它同时也是一款开源的基于发布订阅模式的消息引擎系统

为什么要学习kafka,对于数据密集型应用来说,kafka能很好帮助我们应对数据量的激增,举个例子,上游比如是300个示例的大型数据中心,下游是一个搜索和查询的引擎,中间件使用kafka隔离上下游业务,将上游激增的流量缓存起来,以平滑的方式传到到下游子系统中,避免了流量的不规则冲击。

2.1消息引擎系统

看名字就知道,它比MQ逼格更高,wiki上的介绍是

消息引擎是一组规范,企业利用这组规范在不同系统之间传递准确的消息,实现松耦合的异步式数据传递

即:

  1. 用于不同系统之间
  2. 传输的对象是消息

这么一看是不是和MQ大差不差,但是之所以把他叫做引擎,是它能把消息转换成一定的格式,即如何传输消息,如何设计待传输消息的格式都属于消息引擎设计的一部分(摩托车引擎把燃油转为动能,消息引擎也是如此,所以才叫引擎)。
实际上kafka在传输时使用的是纯二进制的字节序列

2.2为什么使用kafka

在这章开头举了如何对抗峰值流量例子,就是削峰填谷,缓冲上下游突发的流量,使其平滑,来保护下游服务

2.3Kafka术语

极客时间中有趣的解释

image.png
  • 消息:record,指kafka处理的主要对象,类比就是数据库表中的一行记录

  • 生产者/消费者:指发布/消费消息的应用程序

  • 主题:Topic,承载消息的容器,类比就是数据库中的表,更直观点解释就是一个业务就是一个topic image.png
  • 分区:一个有序不变的消息序列,每个主题下可以有多个分区 image.png
  • 消息位移offset:分区中每条消息的位置

  • 副本replica:Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本,副本分为领导者副本和追随者副本,生产者只与领导者副本交互

  • 消费者组:多个消费者实例组成一个组,同时消费多个消息以实现提高吞吐量(如果一个 topic 有 N 个分区,那么同一个消费组最多有 N 个消费者。多于这个数字的消
    费者会被忽略。)

  • 消费者位移:表示消费者消费进度,每个消费者都有自己的消费者位移

  • 重平衡:组内某个消费者挂了,其他实例自动重新分配订阅主题分区的过程

2.4集群配置参数

2.4.1Broker端参数

Broker需要配置存储信息,即Broker使用哪些磁盘,针对存储信息的重要参数有以下几个:

  • log.dirs:指定Broker需要使用的若干个文件目录路径,没有默认值必须手动指定
  • log.dir:补充上一个参数
    实际只需要配置log.dirs即可,线上生产环境一定要配置多个路径(提升读写性能,实现故障转移),采用CSV格式(用逗号分隔多个路径,如/home/kafka1,/home/kafka2,/home/kafka3)

与zooKeeper相关设置:

  • zookeeper.connect :zooKeeper集群连接,采用csv格式(zk1:2181,zk2:2181,zk3:2181)

Broker连接相关(客户端连接或与其他broker连接)

  • listeners:告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务
  • advertised.listeners 这组监听器是 Broker 用于对外发布的,即外网访问
  • host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了

Topic相关

  • auto.create.topics.enable:是否允许自动创建 Topic ,推荐设置false
  • unclean.leader.election.enable:是否允许 Unclean Leader 选举,建议设置成false ,坚决不能让那些落后太多的副本竞选 Leader
  • auto.leader.rebalance.enable是否允许定期进行 Leader 选举,推荐设置成false,在生产环境中换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成 false。

数据保留

  • log.retention.{hour|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间
  • log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小,默认值-1,表示保存多少数据都可以
  • message.max.bytes:控制 Broker 能够接收的最大消息大小

2.4.1 Topic级别参数

Kafka支持为不同的topic设置不同的参数值,Topic级别参数会覆盖全局broker参数

  • retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天
  • retention.bytes:规定了要为该 Topic 预留多大的磁盘空间

如何设置topic级别参数?

  1. 创建时设置( Kafka 开放了kafka-topics命令供我们来创建 Topic,--config用于设置topic级别参数 )
  2. 修改时设置(更推荐使用该种)

3.快速上手kafka

我用的3.7.0 不需要额外安装zookeeper
参考Docker---apache/kafka

sudo docker pull apache/kafka:3.7.0

sudo docker run -d --name kafka -p 9092:9092 apache/kafka:3.7.0

但是这种方式有一个弊端,我的kafka是安装在云服务器上的,本地的windows上无法访问!!这时我们想到可能是上面的参数在作祟

advertised.listeners

我们进入容器查看

sudo docker  exec -it kafka /bin/bash

cd opt/kafka/config

cat server.properties | grep listeners

发现advertised.listeners的值为localhost:9092,只允许本地访问,我们需要将他修改成以下形式

//我在云服务器上,这个ip就是我云服务器的弹性公网ip
PLAINTEXT://ip:9092

但是很遗憾,在docker里该文件是只读,我们也没root权限,那么是否启动时修改配置参数就行了,可以,但很麻烦,根据kafka的docker介绍

Apache Kafka 支持多种代理配置,您可以通过环境变量覆盖这些配置。环境变量必须以 开头KAFKA_,代理配置中的任何点都应在相应的环境变量中指定为下划线。
需要注意的是,如果您要覆盖任何配置,则不会使用任何默认配置。

没错你不能光写一个

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localxxxxxx:9092 

还要连其他的一起补充

docker run -d  \
  --name broker \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_NUM_PARTITIONS=3 \
  apache/kafka:latest

学习成本蹭蹭上去了,所以我选择直接复制一个配置文件给他启动
将kafka里的配置拷贝出一份

mkdir -p config
sudo docker  kafka:/opt/kafka/config/server.properties ./config

cd config
vim server.properties //修改成你的弹性公网ip

vim Dockerfile //写入下面两行
FROM apache/kafka:3.7.0
COPY server.properties /etc/kafka/docker

//构建kafka
sudo docker build -t="mykafka:1.0.0" .

//停止并删除之前的容器
sudo docker stop kafka
sudo docker rm kafka

//启动自己封装的镜像
sudo docker run -d -p 9092:9092 --name kafka mykafka:1.0.0

添加topic

//进入容器
sudo docker exec -it kafka /bin/bash
cd opt/kafka/bin

./kafka-topics.sh --create --bootstrap-server localhost:9092 \
--topic tests 

检查topic

./kafka-topics.sh --list --bootstrap-server localhost:9092 

3.1go连接kafka

选用sarama 因为用户多,注意现在文件移动到了IBM

go get -u github.com/IBM/sarama

下载消费者模拟工具模拟消费者消费消息

go install github.com/IBM/sarama/tools/kafka-console-consumer@latest

启动成功表示已经能成功连接远程kafka

 kafka-console-consumer -topic tests -brokers ip地址:9092

编写生产者

func TestProducer(t *testing.T) {
    cfg := sarama.NewConfig()
    cfg.Producer.Return.Successes = true
    cfg.Producer.Return.Errors = true
    cfg.Version = sarama.MaxVersion

    borkers := []string{"xxxxxxxx:9092"}

    producer, err := sarama.NewAsyncProducer(borkers, cfg)
    assert.NoError(t, err)
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "tests",
        Value: sarama.StringEncoder("hello"),
    }

    producer.Input() <- msg

    select {
    case success := <-producer.Successes():
        t.Log(success.Partition, success.Offset)
                return
    case err := <-producer.Errors():
        t.Log("发送失败", err)
    }

}

消费端输出如下

Partition:      0
Offset: 2    
Key:
Value:  hello

生产者输出如下:

0,2

相关文章

网友评论

      本文标题:kafka

      本文链接:https://www.haomeiwen.com/subject/ohtckjtx.html