Kafka Component Deployment Guide


Author: 明训 | Published 2021-03-15 00:37

    Obtaining the Installation Package

    Open a browser, go to http://kafka.apache.org/downloads, and download the kafka_2.10-0.10.1.0.tgz release.

    [root@rtp-fk-51 ~]# wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
    

    About the name kafka_2.10-0.10.1.0.tgz

    Scala version: 2.10

    Kafka version: 0.10.1.0

    In kafka_A-B.tgz, A is the version of Scala the broker was built against and B is the Kafka version.
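    The convention can be checked mechanically; a small shell sketch (pure parameter expansion, no external tools) that splits the archive name into both versions:

```shell
# Split a Kafka archive name of the form kafka_<scala>-<kafka>.tgz
f=kafka_2.10-0.10.1.0.tgz
base=${f%.tgz}                 # strip the .tgz suffix
scala_ver=${base#kafka_}       # drop the leading "kafka_"
scala_ver=${scala_ver%%-*}     # keep everything before the first "-"
kafka_ver=${base#*-}           # keep everything after the first "-"
echo "Scala: $scala_ver, Kafka: $kafka_ver"
```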

    Extracting the Package

    [root@rtp-fk-51 ~]# cd /iflytek/server/
    [root@rtp-fk-51 server]# ls
    kafka_2.10-0.10.1.0.tgz
    [root@rtp-fk-51 server]# tar -zxvf kafka_2.10-0.10.1.0.tgz
    [root@rtp-fk-51 server]# ls
    kafka_2.10-0.10.1.0
    kafka_2.10-0.10.1.0.tgz
    [root@rtp-fk-51 server]# rm -rf kafka_2.10-0.10.1.0.tgz 
    [root@hotax-139 server]# mv kafka_2.10-0.10.1.0 kafka
    

    Creating the Data Directory

    [root@hotax-139 ~]# mkdir -p  /iflytek/data/kafka-logs
    

    Adjusting the Configuration

    [root@rtp-fk-51 config]# cd /iflytek/server/kafka/config
    [root@rtp-fk-51 config]# ls
    connect-console-sink.properties    connect-file-source.properties  log4j.properties        zookeeper.properties
    connect-console-source.properties  connect-log4j.properties        producer.properties
    connect-distributed.properties     connect-standalone.properties   server.properties
    connect-file-sink.properties       consumer.properties             tools-log4j.properties
    [root@rtp-fk-51 config]# vim server.properties
    

    Edit the following settings

    delete.topic.enable=true
    listeners=PLAINTEXT://172.31.234.139:9092
    log.dirs=/iflytek/data/kafka-logs
    num.partitions=4
    zookeeper.connect=172.31.234.139:2181
    

    Note: to group all of Kafka's znodes under a single chroot path in ZooKeeper, append a path to the connection string, i.e. zookeeper.connect=IP:port/kafka:

    zookeeper.connect=172.31.234.139:2181/kafka
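    If the rollout is scripted, the settings above can be applied idempotently instead of edited by hand. A minimal sketch (the helper name set_prop is ours, not part of Kafka; CONF defaults to a local file here and should point at config/server.properties on the broker):

```shell
# Set key=value in a properties file: replace the line if the key exists, append otherwise.
CONF=${CONF:-server.properties}
touch "$CONF"

set_prop() {
    key=$1; val=$2
    if grep -q "^${key}=" "$CONF"; then
        sed -i "s|^${key}=.*|${key}=${val}|" "$CONF"
    else
        printf '%s=%s\n' "$key" "$val" >> "$CONF"
    fi
}

set_prop delete.topic.enable true
set_prop listeners PLAINTEXT://172.31.234.139:9092
set_prop log.dirs /iflytek/data/kafka-logs
set_prop num.partitions 4
set_prop zookeeper.connect 172.31.234.139:2181
```

    Running the script twice leaves a single line per key, so it is safe to re-run on an already-configured broker.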
    

    Enabling JMX

    Edit kafka-server-start.sh and add a single line:

    [root@rtp-fk-51 bin]# cd /iflytek/server/kafka/bin/
    [root@rtp-fk-51 bin]# ls
    connect-distributed.sh            kafka-preferred-replica-election.sh  kafka-topics.sh
    connect-standalone.sh             kafka-producer-perf-test.sh          kafka-verifiable-consumer.sh
    kafka-acls.sh                     kafka-reassign-partitions.sh         kafka-verifiable-producer.sh
    kafka-configs.sh                  kafka-replay-log-producer.sh         windows
    kafka-console-consumer.sh         kafka-replica-verification.sh        zookeeper-security-migration.sh
    kafka-console-producer.sh         kafka-run-class.sh                   zookeeper-server-start.sh
    kafka-consumer-groups.sh          kafka-server-start.sh                zookeeper-server-stop.sh
    kafka-consumer-offset-checker.sh  kafka-server-stop.sh                 zookeeper-shell.sh
    kafka-consumer-perf-test.sh       kafka-simple-consumer-shell.sh
    kafka-mirror-maker.sh             kafka-streams-application-reset.sh
    [root@rtp-fk-51 bin]# vim kafka-server-start.sh 
    
    

    Original configuration

     28 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     29     export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
     30 fi
    
    

    Adjusted configuration

     28 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     29     export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
     30     export JMX_PORT="8999"
     31 fi
    

    Increase the Kafka heap size as appropriate, e.g. -Xmx4G -Xms4G.
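    When several brokers need the same change, the one-line edit can also be scripted. A sketch that recreates the stock stanza for illustration and injects the JMX export exactly once (the sed pattern assumes the default heap line shown above):

```shell
F=${F:-kafka-server-start.sh}

# For illustration, recreate the relevant stanza from the stock 0.10.1.0 script.
cat > "$F" <<'EOF'
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
EOF

# Insert the JMX export right after the heap line, and only if it is not already there.
grep -q 'JMX_PORT' "$F" || \
    sed -i 's|export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"|&\n    export JMX_PORT="8999"|' "$F"
```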

    Starting Kafka

    Start Kafka in the foreground

    [root@hotax-139 ~]# cd /iflytek/server/kafka/
    [root@hotax-139 kafka]# ls
    bin  config  libs  LICENSE  NOTICE  site-docs
    [root@hotax-139 kafka]# ./bin/kafka-server-start.sh config/server.properties
    

    Start Kafka in the background

    [root@hotax-139 ~]# cd /iflytek/server/kafka/
    [root@hotax-139 kafka]# ls
    bin  config  libs  LICENSE  logs  NOTICE  site-docs
    [root@hotax-139 kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties 
    [root@hotax-139 kafka]#
    

    Verify that Kafka is running

    [root@hotax-139 kafka]# jps -l |grep kafka
    27715 kafka.Kafka
    [root@hotax-139 kafka]#
    

    Stopping Kafka

    [root@hotax-139 kafka]# cd /iflytek/server/kafka/
    [root@hotax-139 kafka]# ./bin/kafka-server-stop.sh 
    [root@hotax-139 kafka]# jps -lm |grep kafka
    [root@hotax-139 kafka]#  
    

    Cluster Notes

    1. When deploying a cluster, set broker.id in server.properties to a unique value on each broker (e.g. 1, 2, 3, and so on).
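    For example, per-broker stanzas can be generated in a loop (the three IPs below are placeholders for your own nodes):

```shell
# Generate a unique broker.id/listeners stanza for each node in a 3-broker cluster.
id=1
for ip in 172.31.234.139 172.31.234.140 172.31.234.141; do
    printf 'broker.id=%d\nlisteners=PLAINTEXT://%s:9092\n' "$id" "$ip" > "server-$id.properties"
    id=$((id + 1))
done
```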

    Troubleshooting

    Duplicate message consumption

    Solution

    1. Change to the Kafka config directory

    cd /iflytek/server/kafka/config 
    

    2. Edit the configuration file
    Append the following setting to server.properties (it extends how long committed consumer offsets are retained, so they do not expire and force a re-read):

    offsets.retention.minutes=129600
    

    1 day  = 60 × 24 × 1  = 1440
    7 days = 60 × 24 × 7  = 10080
    30 days = 60 × 24 × 30 = 43200
    60 days = 60 × 24 × 60 = 86400
    90 days = 60 × 24 × 90 = 129600
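    The conversion is simply minutes-per-hour × hours-per-day × days; a one-liner to double-check the value for any window:

```shell
# offsets.retention.minutes for a given number of days
days=90
minutes=$((60 * 24 * days))
echo "offsets.retention.minutes=$minutes"
```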

    Message send failures

    Problem description

    03-28 15:11:51.998 [ERROR] [pool-3-thread-3] skynet.boot.stream.kafka.MqProducer4KFK[MqProducer4KFK.java:146] {"tag":"[send result error]:org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.;","context":"VE1Y5LVKMRE7EN41"}
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at skynet.boot.stream.kafka.MqProducer4KFK.doSend(MqProducer4KFK.java:135)
        at skynet.boot.mq.core.impl.BaseMqProducer.send(BaseMqProducer.java:132)
        at skynet.boot.mq.core.impl.BaseMqProducer.send(BaseMqProducer.java:73)
        at com.iflytek.swwl.clustertask.action.GeneratorWorkerHandler.onProcess(GeneratorWorkerHandler.java:181)
        at com.iflytek.swwl.clustertask.action.GeneratorWorkerHandler.onProcess(GeneratorWorkerHandler.java:1)
        at skynet.boot.mq.MqSvcHandler.process(MqSvcHandler.java:113)
        at skynet.boot.mq.core.impl.MsgProcesserImpl.process(MsgProcesserImpl.java:130)
        at skynet.boot.mq.core.impl.BaseMqConsumer$1.onMessage(BaseMqConsumer.java:96)
        at skynet.boot.stream.kafka.MyConsumer.run(MqConsumer4KFK.java:155)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
    
    

    Solution

    1. Change to the Kafka config directory

    cd /iflytek/server/kafka/config 
    

    2. Edit the configuration file
    Append the following settings to server.properties:

    message.max.bytes=10485760
    replica.fetch.max.bytes=11534336
    

    Note that replica.fetch.max.bytes must be larger than message.max.bytes.

    1 MB  = 1024 × 1024 × 1  = 1048576 bytes
    5 MB  = 1024 × 1024 × 5  = 5242880 bytes
    6 MB  = 1024 × 1024 × 6  = 6291456 bytes
    10 MB = 1024 × 1024 × 10 = 10485760 bytes
    11 MB = 1024 × 1024 × 11 = 11534336 bytes
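    Raising the broker limits alone may not be enough: the producer enforces its own cap and will reject oversized records client-side. A sketch of the matching client settings (the values mirror the broker config above; verify the names against your client version):

```properties
# producer config: allow requests up to 10 MB
max.request.size=10485760

# consumer config (0.10.x): allow fetching large messages from a single partition
max.partition.fetch.bytes=10485760
```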

    Source: https://www.haomeiwen.com/subject/izjhcltx.html