美文网首页
Kafka笔记

Kafka笔记

作者: 阿福德 | 来源:发表于2019-06-04 10:24 被阅读0次

Kafka安装

1、下载:http://kafka.apache.org/downloads
2、安装:tar -zxvf /tmp/kafka_2.11-2.2.0.tgz -C /usr/local/kafka
3、配置、vi kafka_2.11-2.2.0/config/server.properties
3.1、监听器列表
将 #listeners=PLAINTEXT://:9092 的注释打开即为:
listeners=PLAINTEXT://:9092
3.2、发布在zk上的监听器,供客户端使用
将 #advertised.listeners=PLAINTEXT://your.host.name:9092 的注释打开,且把 your.host.name 修改成本机的 IP 即为:
advertised.listeners=PLAINTEXT://x.x.x.x:9092
3.3、zookeeper 连接信息
将 zookeeper.connect=localhost:2181 修改到预计的zookeeper server 上,即为:
zookeeper.connect=x.x.x.x:2181,y.y.y.y:2181,z.z.z.z:2181
3.4、日志目录
默认日志存放在 /tmp/kafka-logs 目录下,但 /tmp 目录为 临时文件目录,在重启时会清空该目录,建议修改成其他目录:
log.dirs=/usr/local/kafka/logs/
4、启动、nohub bin/kafka-server-start.sh config/server.properties &
5、停止、bin/kafka-server-stop.sh

参数说明

kafka生产者参数说明

1、acks
含义:在消息被认为是“已提交”之前,producer需要leader确认的produce请求的应答数, 取值范围(-1,0,1)
建议:如果要较高的持久性要求以及无数据丢失的需求,设置acks = -1。其他情况下设置acks = 1 (我们设置为1)
2、max.blocks.ms:
发送缓冲区满,发送等待的最大时间,超过这个时间,就会抛出TimeoutException
3、buffer.memory
含义:该参数用于指定Producer端用于缓存消息的缓冲区大小,单位为字节,默认值为:33554432合计为32M
建议:发送缓冲区大小,如果经常出现发送TimeoutException时,则考虑调大这个数值。
4、retries(注意消息顺序性)
含义:producer重试的次数设置,
建议:试时producer会重新发送之前由于瞬时原因出现失败的消息。瞬时失败的原因可能包括:元数据信息失效、副本数量不足、超时、位移越界或未知分区等
5、max.in.flight.requests.per.connection
含义:该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
6、batch.size
含义:producer都是按照batch进行发送的,producer会把发往同一分区的多条消息封装进一个batch中,
建议:当batch满了后,或者linger.ms时间到了,producer才会把消息发送出去。默认值为16K,合计为16384.
7、request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间。
8、metadata.fetch.timeout.ms
含义:指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。
如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。
9、timeout.ms
指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误。
10、max.request.size
该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。

kafka消费者参数说明

1、enable.auto.commit
含义:启动自动提交offset,true or false
建议:对于精确到一次的语义,最好手动提交位移
2、fetch.max.bytes
含义:单次获取数据的最大消息数。
建议:与consumer.poll(timeout)配合使用
3、fetch.min.bytes
含义:与fetch.max.bytes相对,要等到这个数时,才返回。默认1表示理解接收。
4、max.poll.records:
含义:单次获取数据的最大消息数。
5、auto.offset.reset
含义:在没有偏移量或者偏移量无效时(停了很久,offset已经被删除了),消费是从头开始,还是从最新开始(latest or earliest)
6、max.poll.interval.ms
处理逻辑的最大时间,就是或者每次要去pull的大间隔时间,如果逻辑处理超过这个时间,就会被提出consumerGroup。然后进行consumer的rebalance。
7、group.initial.rebalance.delay.ms
含义:新consumer加入后延时进行relalance。

一些命令

Kafka:
创建topic ./kafka-topics.sh --zookeeper x.x.x.x:2181,y.y.y.y:2181,z.z.z.z:2181 --topic kafka_karl_topic --replication-factor 2 --partitions 8 --create
删除topic ./kafka-topics.sh --topic kafka_test_topic --zookeeper x.x.x.x:2181,y.y.y.y:2181,z.z.z.z:2181 --delete
查看topic明细 ./kafka-topics.sh --topic test --describe --zookeeper x.x.x.x:2181,y.y.y.y:2181,z.z.z.z:2181

./kafka-topics.sh --topic kafka_karl_topic --describe --zookeeper x.x.x.x:2181,y.y.y.y:2181,z.z.z.z:2181

查看所有topic ./kafka-topics.sh --zookeeper x.x.x.x:2181,y.y.y.y:2181,z.z.z.z:2181 --list
消费topic ./kafka-console-consumer.sh --bootstrap-server x.x.x.x:9092 --topic test --from-beginning
消息发送console: ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

查看group消费情况 ./kafka-consumer-groups.sh --group DEFAULT_GROUP_ID --describe --bootstrap-server x.x.x.x:9092

修改partition 数:
./kafka-topics.sh --zookeeper x.x.x.x:2181,y.y.y.y:2181,z.z.z.z:2181 -alter --partitions 12 --topic kafka_karl_topic

修改副本数
./kafka-reassign-partitions.sh --zookeeper x.x.x.x:2181,y.y.y.y:2181,z.z.z.z:2181 --reassignment-json-file addpartition.json --execute

查看进度
./kafka-reassign-partitions.sh --zookeeper x.x.x.x:2181,y.y.y.y:2181,z.z.z.z:2181 --reassignment-json-file addpartition.json --verify

addpartition.json

{
    "partitions": [{
        "topic": "kafka_karl_topic",
        "partition": 0,
        "replicas": [1, 2]
    }, {
        "topic": "kafka_karl_topic",
        "partition": 1,
        "replicas": [1, 3]
    }, {
        "topic": "kafka_karl_topic",
        "partition": 2,
        "replicas": [2, 3]
    }, {
        "topic": "kafka_karl_topic",
        "partition": 3,
        "replicas": [1, 2]
    }, {
        "topic": "kafka_karl_topic",
        "partition": 4,
        "replicas": [2, 3]
    }, {
        "topic": "kafka_karl_topic",
        "partition": 5,
        "replicas": [1, 3]
    }, {
        "topic": "kafka_karl_topic",
        "partition": 6,
        "replicas": [1, 2]
    }, {
        "topic": "kafka_karl_topic",
        "partition": 7,
        "replicas": [1, 3]
    }],
    "version": 1
}

相关文章

  • Kafka学习笔记

    kafka笔记 0. Kafka 安装 下载 wget http://mirrors.shu.edu.cn/apa...

  • 【Kafka】Kafka入门手记

    1. 前言 本文为 Kafka 入门笔记,主要包括 Kafka 单节点部署、生产消费消息,以及新手踩坑记录。 Ka...

  • sptest

    # Spring Kafka 学习笔记 ## 1 接收消息 接收消息需要提供MessageListenerCont...

  • Kafka学习笔记

    Kafka 学习笔记 内容大部分引用自Info - Apache Kafka:下一代分布式消息系统 原文作者Abh...

  • Kafka笔记

    earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开...

  • kafka笔记

    kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...

  • Kafka笔记

    Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息...

  • Kafka笔记

    Kafka安装 1、下载:http://kafka.apache.org/downloads2、安装:tar -z...

  • Kafka笔记

    原创文章,转载请注明原文章地址,谢谢! 消息队列的两种模式 点对点模式(一对一,消费者主动拉取数据,消息收到后消息...

  • kafka笔记

    docker运行Kafka kafka操作命令 进入容器 创建topic 查看topic 删除topic 启动生产...

网友评论

      本文标题:Kafka笔记

      本文链接:https://www.haomeiwen.com/subject/vpokxctx.html