kafka

作者: 温岭夹糕 | 来源:发表于2024-08-08 10:08 被阅读0次

    写在开头

    仅用于自己学习记录,本章节学习目的是快速上手kafka和体验go操作kafka

    目录

    文章目录

    1.消息队列

    消息队列MQ(Message Queue)也被称为中间件,不存储消息内容,只是消息的搬运工,具体表现在:

    • 在不同进程间传递消息
    • 在统一进程的不同线程间传递消息
    消息队列的基本形态,就是有N个生产者,N个消费者 image.png

    在该模式下,生产者只需要向消息队列投递消息,生产者只需要等消息队列搬运消息,此时,生产者和消费者就解耦了

    2.Kafka

    kafka是一个分布式,支持多分区,基于zookeeper的分布式消息流平台(元数据都保存在zookeeper中,因此3.7版本之前都需要先安装zookeeper)它同时也是一款开源的基于发布订阅模式的消息引擎系统

    为什么要学习kafka,对于数据密集型应用来说,kafka能很好帮助我们应对数据量的激增,举个例子,上游比如是300个示例的大型数据中心,下游是一个搜索和查询的引擎,中间件使用kafka隔离上下游业务,将上游激增的流量缓存起来,以平滑的方式传到到下游子系统中,避免了流量的不规则冲击。

    2.1消息引擎系统

    看名字就知道,它比MQ逼格更高,wiki上的介绍是

    消息引擎是一组规范,企业利用这组规范在不同系统之间传递准确的消息,实现松耦合的异步式数据传递

    即:

    1. 用于不同系统之间
    2. 传输的对象是消息

    这么一看是不是和MQ大差不差,但是之所以把他叫做引擎,是它能把消息转换成一定的格式,即如何传输消息,如何设计待传输消息的格式都属于消息引擎设计的一部分(摩托车引擎把燃油转为动能,消息引擎也是如此,所以才叫引擎)。
    实际上kafka在传输时使用的是纯二进制的字节序列

    2.2为什么使用kafka

    在这章开头举了如何对抗峰值流量例子,就是削峰填谷,缓冲上下游突发的流量,使其平滑,来保护下游服务

    2.3Kafka术语

    极客时间中有趣的解释

    image.png
    • 消息:record,指kafka处理的主要对象,类比就是数据库表中的一行记录

    • 生产者/消费者:指发布/消费消息的应用程序

    • 主题:Topic,承载消息的容器,类比就是数据库中的表,更直观点解释就是一个业务就是一个topic image.png
    • 分区:一个有序不变的消息序列,每个主题下可以有多个分区 image.png
    • 消息位移offset:分区中每条消息的位置

    • 副本replica:Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本,副本分为领导者副本和追随者副本,生产者只与领导者副本交互

    • 消费者组:多个消费者实例组成一个组,同时消费多个消息以实现提高吞吐量(如果一个 topic 有 N 个分区,那么同一个消费组最多有 N 个消费者。多于这个数字的消
      费者会被忽略。)

    • 消费者位移:表示消费者消费进度,每个消费者都有自己的消费者位移

    • 重平衡:组内某个消费者挂了,其他实例自动重新分配订阅主题分区的过程

    2.4集群配置参数

    2.4.1Broker端参数

    Broker需要配置存储信息,即Broker使用哪些磁盘,针对存储信息的重要参数有以下几个:

    • log.dirs:指定Broker需要使用的若干个文件目录路径,没有默认值必须手动指定
    • log.dir:补充上一个参数
      实际只需要配置log.dirs即可,线上生产环境一定要配置多个路径(提升读写性能,实现故障转移),采用CSV格式(用逗号分隔多个路径,如/home/kafka1,/home/kafka2,/home/kafka3)

    与zooKeeper相关设置:

    • zookeeper.connect :zooKeeper集群连接,采用csv格式(zk1:2181,zk2:2181,zk3:2181)

    Broker连接相关(客户端连接或与其他broker连接)

    • listeners:告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务
    • advertised.listeners 这组监听器是 Broker 用于对外发布的,即外网访问
    • host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了

    Topic相关

    • auto.create.topics.enable:是否允许自动创建 Topic ,推荐设置false
    • unclean.leader.election.enable:是否允许 Unclean Leader 选举,建议设置成false ,坚决不能让那些落后太多的副本竞选 Leader
    • auto.leader.rebalance.enable是否允许定期进行 Leader 选举,推荐设置成false,在生产环境中换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成 false。

    数据保留

    • log.retention.{hour|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间
    • log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小,默认值-1,表示保存多少数据都可以
    • message.max.bytes:控制 Broker 能够接收的最大消息大小

    2.4.1 Topic级别参数

    Kafka支持为不同的topic设置不同的参数值,Topic级别参数会覆盖全局broker参数

    • retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天
    • retention.bytes:规定了要为该 Topic 预留多大的磁盘空间

    如何设置topic级别参数?

    1. 创建时设置( Kafka 开放了kafka-topics命令供我们来创建 Topic,--config用于设置topic级别参数 )
    2. 修改时设置(更推荐使用该种)

    3.快速上手kafka

    我用的3.7.0 不需要额外安装zookeeper
    参考Docker---apache/kafka

    sudo docker pull apache/kafka:3.7.0
    
    sudo docker run -d --name kafka -p 9092:9092 apache/kafka:3.7.0
    

    但是这种方式有一个弊端,我的kafka是安装在云服务器上的,本地的windows上无法访问!!这时我们想到可能是上面的参数在作祟

    advertised.listeners
    

    我们进入容器查看

    sudo docker  exec -it kafka /bin/bash
    
    cd opt/kafka/config
    
    cat server.properties | grep listeners
    

    发现advertised.listeners的值为localhost:9092,只允许本地访问,我们需要将他修改成以下形式

    //我在云服务器上,这个ip就是我云服务器的弹性公网ip
    PLAINTEXT://ip:9092
    

    但是很遗憾,在docker里该文件是只读,我们也没root权限,那么是否启动时修改配置参数就行了,可以,但很麻烦,根据kafka的docker介绍

    Apache Kafka 支持多种代理配置,您可以通过环境变量覆盖这些配置。环境变量必须以 开头KAFKA_,代理配置中的任何点都应在相应的环境变量中指定为下划线。
    需要注意的是,如果您要覆盖任何配置,则不会使用任何默认配置。

    没错你不能光写一个

    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localxxxxxx:9092 
    

    还要连其他的一起补充

    docker run -d  \
      --name broker \
      -e KAFKA_NODE_ID=1 \
      -e KAFKA_PROCESS_ROLES=broker,controller \
      -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
      -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
      -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
      -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
      -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
      -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
      -e KAFKA_NUM_PARTITIONS=3 \
      apache/kafka:latest
    

    学习成本蹭蹭上去了,所以我选择直接复制一个配置文件给他启动
    将kafka里的配置拷贝出一份

    mkdir -p config
    sudo docker  kafka:/opt/kafka/config/server.properties ./config
    
    cd config
    vim server.properties //修改成你的弹性公网ip
    
    vim Dockerfile //写入下面两行
    FROM apache/kafka:3.7.0
    COPY server.properties /etc/kafka/docker
    
    //构建kafka
    sudo docker build -t="mykafka:1.0.0" .
    
    //停止并删除之前的容器
    sudo docker stop kafka
    sudo docker rm kafka
    
    //启动自己封装的镜像
    sudo docker run -d -p 9092:9092 --name kafka mykafka:1.0.0
    

    添加topic

    //进入容器
    sudo docker exec -it kafka /bin/bash
    cd opt/kafka/bin
    
    ./kafka-topics.sh --create --bootstrap-server localhost:9092 \
    --topic tests 
    

    检查topic

    ./kafka-topics.sh --list --bootstrap-server localhost:9092 
    

    3.1go连接kafka

    选用sarama 因为用户多,注意现在文件移动到了IBM

    go get -u github.com/IBM/sarama
    

    下载消费者模拟工具模拟消费者消费消息

    go install github.com/IBM/sarama/tools/kafka-console-consumer@latest
    

    启动成功表示已经能成功连接远程kafka

     kafka-console-consumer -topic tests -brokers ip地址:9092
    

    编写生产者

    func TestProducer(t *testing.T) {
        cfg := sarama.NewConfig()
        cfg.Producer.Return.Successes = true
        cfg.Producer.Return.Errors = true
        cfg.Version = sarama.MaxVersion
    
        borkers := []string{"xxxxxxxx:9092"}
    
        producer, err := sarama.NewAsyncProducer(borkers, cfg)
        assert.NoError(t, err)
        defer producer.Close()
    
        msg := &sarama.ProducerMessage{
            Topic: "tests",
            Value: sarama.StringEncoder("hello"),
        }
    
        producer.Input() <- msg
    
        select {
        case success := <-producer.Successes():
            t.Log(success.Partition, success.Offset)
                    return
        case err := <-producer.Errors():
            t.Log("发送失败", err)
        }
    
    }
    

    消费端输出如下

    Partition:      0
    Offset: 2    
    Key:
    Value:  hello
    

    生产者输出如下:

    0,2
    

    相关文章

      网友评论

          本文标题:kafka

          本文链接:https://www.haomeiwen.com/subject/ohtckjtx.html