
Kafka Guide (Getting Started)

Author: 若罗 | Published 2020-04-30 10:58

    1. Environment Setup

    Kafka relies on ZooKeeper, so we configure ZooKeeper first (zoo.cfg):

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    dataDir=/home/mq/zkdata
    # the port at which the clients will connect
    clientPort=2181
    
    server.1=172.18.182.32:2888:3888
    server.2=172.18.182.33:2888:3888
    server.3=172.18.182.31:2888:3888
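    Each ZooKeeper node also needs a myid file in dataDir whose content matches its server.N line in zoo.cfg. A minimal sketch, assuming the dataDir above and that the commands are run from the ZooKeeper installation directory on each host:

    # On 172.18.182.32 (server.1); write 2 and 3 on the other two nodes respectively
    mkdir -p /home/mq/zkdata
    echo 1 > /home/mq/zkdata/myid
    # Start ZooKeeper on every node
    bin/zkServer.sh start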
    

    Once ZooKeeper is up, check its status on each node with zkServer.sh status.
    Then configure Kafka's server.properties file.

    ############################# Server Basics #############################
    
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    delete.topic.enable=true
    ############################# Socket Server Settings #############################
    
    # The address the socket server listens on. It will get the value returned from 
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    listeners=PLAINTEXT://:9092
    
    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    advertised.listeners=PLAINTEXT://172.18.182.32:9092
    
    port=9092
    host.name=172.18.182.32
    
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    ############################# Log Basics #############################
    
    # A comma separated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    ############################# Log Flush Policy #############################
    
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    
    ############################# Log Retention Policy #############################
    
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=172.18.182.32:2181,172.18.182.33:2181,172.18.182.31:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    
    ############################# Group Coordinator Settings #############################
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
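    The file above is the configuration for the first broker (broker.id=0). On the other two nodes only the node-specific values need to change; a minimal sketch of the edits, assuming the hosts listed earlier (the exact commands are illustrative):

    # On the second broker, 172.18.182.33
    sed -i 's/^broker.id=0/broker.id=1/' config/server.properties
    sed -i 's#^advertised.listeners=.*#advertised.listeners=PLAINTEXT://172.18.182.33:9092#' config/server.properties
    sed -i 's/^host.name=.*/host.name=172.18.182.33/' config/server.properties
    # On the third broker, 172.18.182.31, set broker.id=2 and its own address in the same way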
    
    

    Then start Kafka on all three nodes:

    kafka_2.12-2.4.0/bin/kafka-server-start.sh -daemon kafka_2.12-2.4.0/config/server.properties
    

    If jps shows the broker did not start, run it in the foreground to read the error output, or check the logs under kafka/logs.
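    For example, assuming you are in the Kafka installation directory and the default log4j settings are in use:

    # Start in the foreground so errors are printed directly to the console
    bin/kafka-server-start.sh config/server.properties
    # Or inspect the broker log written under kafka/logs
    tail -n 200 logs/server.log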

    2. Common Commands

    # Start Kafka
    bin/kafka-server-start.sh -daemon config/server.properties
    # Create a topic (--bootstrap-server must point at the brokers on port 9092, not at ZooKeeper)
    bin/kafka-topics.sh --create --bootstrap-server 172.18.182.32:9092,172.18.182.33:9092,172.18.182.31:9092 --replication-factor 2 --partitions 2 --topic queuing-user-create
    # List topics
    bin/kafka-topics.sh --list --zookeeper 172.18.182.32:2181,172.18.182.33:2181,172.18.182.31:2181
    # Describe topics; add --topic to describe a single topic
    # PartitionCount: number of partitions. ReplicationFactor: number of replicas. Partition: partition id. Leader: broker id of the replica that handles reads and writes ("none" means no live leader). Replicas: broker ids holding replicas of this partition. Isr: replicas currently in sync with the leader.
    bin/kafka-topics.sh --describe --zookeeper 172.18.182.32:2181,172.18.182.33:2181,172.18.182.31:2181
    Topic: queuing-survey-answer-status     PartitionCount: 2   ReplicationFactor: 2    Configs: 
        Topic: queuing-survey-answer-status Partition: 0    Leader: 1   Replicas: 1,2   Isr: 1
        Topic: queuing-survey-answer-status Partition: 1    Leader: 0   Replicas: 2,0   Isr: 0
    Topic: queuing-survey-anwser    PartitionCount: 1   ReplicationFactor: 1    Configs: 
        Topic: queuing-survey-anwser    Partition: 0    Leader: none    Replicas: 2 Isr: 2
    # Start a console producer for testing (--broker-list uses the broker port 9092)
    bin/kafka-console-producer.sh --broker-list 172.18.182.32:9092,172.18.182.33:9092,172.18.182.31:9092 --topic queuing-survey-anwser
    # Start a console consumer for testing (--bootstrap-server also uses port 9092)
    bin/kafka-console-consumer.sh --bootstrap-server 172.18.182.32:9092,172.18.182.33:9092,172.18.182.31:9092  --topic queuing-user-create --from-beginning
    # Dump and inspect a log segment
    bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/queuing-survey-answer-status-1/00000000000000000000.log --print-data-log
    # Describe a consumer group
    bin/kafka-consumer-groups.sh --bootstrap-server 172.18.182.32:9092,172.18.182.33:9092,172.18.182.31:9092 --group UserCenterProd --describe
    # Manually reset the group's offsets
    bin/kafka-consumer-groups.sh --bootstrap-server 172.18.182.32:9092,172.18.182.33:9092,172.18.182.31:9092 --group UserCenterProd --reset-offsets --topic queuing-user-add --to-offset 20 --execute
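    Besides --to-offset, kafka-consumer-groups.sh accepts other reset strategies such as --to-earliest and --to-latest, and --dry-run previews the change without applying it; a sketch using the same group and topic as above:

    # Preview a reset to the earliest available offsets without applying it
    bin/kafka-consumer-groups.sh --bootstrap-server 172.18.182.32:9092 --group UserCenterProd --reset-offsets --topic queuing-user-add --to-earliest --dry-run
    # Skip the backlog by jumping to the latest offsets, then apply
    bin/kafka-consumer-groups.sh --bootstrap-server 172.18.182.32:9092 --group UserCenterProd --reset-offsets --topic queuing-user-add --to-latest --execute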
    

    3. Basic Concepts

    • broker: a single Kafka server (node) in the cluster.
    • partition: messages are routed to different partitions by key and consumed by different consumers (or the same one; assignment is automatic).
      --replication-factor: the number of replicas, which should generally be less than or equal to the number of brokers.
    • ISR: the leader tracks how far each follower lags behind. If a replica lags too much (measured both in messages and in time, configurable via replica.lag.time.max.ms and replica.lag.max.messages), the leader removes it from the ISR; a removed replica keeps trying to catch up with the leader. The leader does not commit a write until every follower in the ISR has replicated it, so evicting laggards from the ISR mainly avoids high write latency. The ISR also matters when a broker dies: the new partition leader is elected from the ISR. (Adapted from https://blog.csdn.net/dshf_1/article/details/82467558)
    • consumer group: within one group, the different consumers each handle a subset of the topic's partitions; different groups track offsets independently, so each group consumes the topic in full (see the sketch after this list).
    • zookeeper: a distributed coordination service that manages and stores Kafka's metadata, such as which brokers are alive, which topics exist, their partitions, and where each leader is.
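    A quick way to see consumer groups in action with the console consumer (the group names below are just examples): two consumers started with the same --group split the topic's partitions between them, while a consumer in a different group tracks its own offsets and re-reads the topic independently.

    # Terminal 1 and terminal 2: same group, the partitions are divided between the two consumers
    bin/kafka-console-consumer.sh --bootstrap-server 172.18.182.32:9092 --topic queuing-user-create --group demo-group-a
    bin/kafka-console-consumer.sh --bootstrap-server 172.18.182.32:9092 --topic queuing-user-create --group demo-group-a
    # Terminal 3: a different group consumes the whole topic again from the beginning
    bin/kafka-console-consumer.sh --bootstrap-server 172.18.182.32:9092 --topic queuing-user-create --group demo-group-b --from-beginning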

    4. Cluster Environment Recommendations

    Factor        | Consideration                                    | Recommendation
    OS            | Operating system I/O model                       | Linux
    Disk          | Disk I/O performance                             | Ordinary spinning disks are fine; RAID is not required
    Disk capacity | Estimate from message volume and retention time  | Reserve about 30% extra headroom
    Bandwidth     | Depends on the workload                          | On a gigabit network, plan around 700 Mbps of usable bandwidth
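    A back-of-the-envelope version of the disk estimate, with purely hypothetical traffic numbers:

    # Hypothetical workload: 10,000,000 messages/day, 1 KB each, replication factor 2, 7 days retention
    # daily volume  : 10,000,000 * 1 KB * 2 replicas ≈ 19 GB/day
    # retained data : 19 GB/day * 7 days ≈ 134 GB
    # +30% headroom : 134 GB * 1.3 ≈ 174 GB of disk across the cluster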

    5. The Most Important Cluster Parameters

    1) Broker-side parameters

    Parameter                      | Suggested setting                 | Notes
    log.dir                        | can be left unset                 | use log.dirs instead
    log.dirs                       | /home/kafka1,/home/kafka2         | putting the directories on different physical disks improves read/write throughput and allows failover
    zookeeper.connect              | zk1:2181,zk2:2181,zk3:2181/kafka1 | a chroot suffix lets multiple Kafka clusters share one ZooKeeper ensemble
    listeners                      | protocol://HOST_NAME:9092         | the protocol, host name and port on which the broker accepts external connections
    advertised.listeners           | protocol://HOST_NAME:9092         | the listeners the broker advertises to clients; stored in ZooKeeper
    host.name / port               | do not set these two              | long deprecated
    auto.create.topics.enable      | false                             | do not allow automatic topic creation
    unclean.leader.election.enable | false                             | do not allow unclean leader election
    auto.leader.rebalance.enable   | false                             | disable periodic leader rebalancing
    log.retention.hours            | 168                               | keep data for 7 days
    log.retention.bytes            | size to the free disk             | prevents the disk from filling up
    message.max.bytes              | 1000120                           | maximum message size Kafka accepts; the default is roughly 1 MB
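    A minimal sketch of applying these recommendations, assuming each broker's config lives at config/server.properties (the numeric values are examples to adjust):

    # Append the recommended overrides to server.properties on every broker
    {
      echo "auto.create.topics.enable=false"
      echo "unclean.leader.election.enable=false"
      echo "auto.leader.rebalance.enable=false"
      echo "log.retention.hours=168"
      echo "log.retention.bytes=107374182400"   # example value (~100 GB), size to your free disk
      echo "message.max.bytes=1000120"
    } >> config/server.properties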

    2) Topic-side parameters

    Parameter               | Suggested setting | Notes
    retention.ms            |                   | how long the topic's data is retained
    retention.bytes         |                   | space reserved for the topic
    max.message.bytes       | 1000120           | maximum message size for the topic; the default is roughly 1 MB
    replica.fetch.max.bytes | 1000120           | keep at least as large as max.message.bytes so replication keeps working
    fetch.message.max.bytes | 1000120           | keep at least as large as max.message.bytes so consumers can fetch the messages
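    Topic-level values can also be set as per-topic overrides with kafka-configs.sh; a sketch with example values:

    # Override retention and the maximum message size for a single topic
    bin/kafka-configs.sh --zookeeper 172.18.182.32:2181 --alter --entity-type topics --entity-name queuing-user-create --add-config retention.ms=604800000,max.message.bytes=1000120
    # Show the overrides currently applied to the topic
    bin/kafka-configs.sh --zookeeper 172.18.182.32:2181 --describe --entity-type topics --entity-name queuing-user-create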

    3) JVM parameters

    Set the JVM heap size (and GC options) before starting Kafka:

    $> export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
    $> export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
    $> bin/kafka-server-start.sh config/server.properties
    

    4) Operating system parameters

    ulimit -n 1000000
    

    XFS is a good choice for the filesystem.
    Set swappiness to 1.
    The page-cache flush (writeback) interval can be lengthened somewhat.
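    A sketch of applying the latter two settings (the writeback value is only an example; persist the sysctls in /etc/sysctl.conf for them to survive a reboot):

    # Prefer the page cache over swapping
    sysctl -w vm.swappiness=1
    # Lengthen the dirty-page writeback interval (value in centiseconds; example only)
    sysctl -w vm.dirty_writeback_centisecs=3000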
