获取安装包
打开浏览器输入网址http://kafka.apache.org/downloads选择版本 kafka_2.10-0.10.1.0.tgz 下载即可。
[root@rtp-fk-51 ~]# wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
kafka_2.10-0.10.1.0.tgz说明
Scala 版本为
2.10
kafka的版本为
0.10.1.0
kafka_A-B.tgz即A的版本是Scala的版本,B指的是kafka的版本
解压安装包
[root@rtp-fk-51 ~]# cd /iflytek/server/
[root@rtp-fk-51 server]# ls
kafka_2.10-0.10.1.0.tgz
[root@rtp-fk-51 server]# tar -zxvf kafka_2.10-0.10.1.0.tgz
[root@rtp-fk-51 server]# ls
kafka_2.10-0.10.1.0
kafka_2.10-0.10.1.0.tgz
[root@rtp-fk-51 server]# rm -rf kafka_2.10-0.10.1.0.tgz
[root@hotax-139 server]# mv kafka_2.10-0.10.1.0 kafka
创建数据目录
[root@hotax-139 ~]# mkdir -p /iflytek/data/kafka-logs
配置调整
[root@rtp-fk-51 config]# cd /iflytek/server/kafka/config
[root@rtp-fk-51 config]# ls
connect-console-sink.properties connect-file-source.properties log4j.properties zookeeper.properties
connect-console-source.properties connect-log4j.properties producer.properties
connect-distributed.properties connect-standalone.properties server.properties
connect-file-sink.properties consumer.properties tools-log4j.properties
[root@rtp-fk-51 config]# vim server.properties
编辑配置项
delete.topic.enable=true
listeners=PLAINTEXT://172.31.234.139:9092
log.dirs=/iflytek/data/kafka-logs
num.partitions=4
zookeeper.connect=172.31.234.139:2181
注:如需要把kafka所有节点放在一个节点下的话就配置zookeeper.connect=IP:端口/kafka
zookeeper.connect=172.31.234.139:2181/kafka
开启JMX
修改kafka-server-start.sh 添加一行即可
[root@rtp-fk-51 bin]# cd /iflytek/server/kafka_2.10-0.10.1.0/bin/
[root@rtp-fk-51 bin]# ls
connect-distributed.sh kafka-preferred-replica-election.sh kafka-topics.sh
connect-standalone.sh kafka-producer-perf-test.sh kafka-verifiable-consumer.sh
kafka-acls.sh kafka-reassign-partitions.sh kafka-verifiable-producer.sh
kafka-configs.sh kafka-replay-log-producer.sh windows
kafka-console-consumer.sh kafka-replica-verification.sh zookeeper-security-migration.sh
kafka-console-producer.sh kafka-run-class.sh zookeeper-server-start.sh
kafka-consumer-groups.sh kafka-server-start.sh zookeeper-server-stop.sh
kafka-consumer-offset-checker.sh kafka-server-stop.sh zookeeper-shell.sh
kafka-consumer-perf-test.sh kafka-simple-consumer-shell.sh
kafka-mirror-maker.sh kafka-streams-application-reset.sh
[root@rtp-fk-51 bin]# vim kafka-server-start.sh
原始配置
28 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
29 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
30 fi
调整配置
28 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
29 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
30 export JMX_PORT="8999"
31 fi
适当加大kakfka的堆内存-Xmx4G -Xms4G
启动kakfa
前台启动kafka
[root@hotax-139 ~]# cd /iflytek/server/kafka/
[root@hotax-139 kafka]# ls
bin config libs LICENSE NOTICE site-docs
[root@hotax-139 kafka]# ./bin/kafka-server-start.sh config/server.properties
后台启动kafka
[root@hotax-139 ~]# cd /iflytek/server/kafka/
[root@hotax-139 kafka]# ls
bin config libs LICENSE logs NOTICE site-docs
[root@hotax-139 kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties
[root@hotax-139 kafka]#
验证kafka是否启动
[root@hotax-139 kafka]# jps -l |grep kafka
27715 kafka.Kafka
[root@hotax-139 kafka]#
停止kafka
[root@hotax-139 kafka]# cd /iflytek/server/kafka/
[root@hotax-139 kafka]# ./bin/kafka-server-stop.sh
[root@hotax-139 kafka]# jps -lm |grep kafka
[root@hotax-139 kafka]#
集群说明
- 当集群部署时,需要指定配置文件server.properties中的broker.id的值为1或者依次递增的值。
问题解决
消息重复消费问题
解决方案
1.进到kafka配置文件目录
cd /iflytek/server/kafka/config
2.编辑配置文件
在配置文件server.properties中追加配置项
offsets.retention.minutes=129600
1天=60241=1440
7天=60247=10080
30天=602430=43200
60天=602460=86400
90天=602490=129600
消息发送失败问题
问题描述
03-28 15:11:51.998 [ERROR] [pool-3-thread-3] skynet.boot.stream.kafka.MqProducer4KFK[MqProducer4KFK.java:146] {"tag":"[send result error]:org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.;","context":"VE1Y5LVKMRE7EN41"}
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
at skynet.boot.stream.kafka.MqProducer4KFK.doSend(MqProducer4KFK.java:135)
at skynet.boot.mq.core.impl.BaseMqProducer.send(BaseMqProducer.java:132)
at skynet.boot.mq.core.impl.BaseMqProducer.send(BaseMqProducer.java:73)
at com.iflytek.swwl.clustertask.action.GeneratorWorkerHandler.onProcess(GeneratorWorkerHandler.java:181)
at com.iflytek.swwl.clustertask.action.GeneratorWorkerHandler.onProcess(GeneratorWorkerHandler.java:1)
at skynet.boot.mq.MqSvcHandler.process(MqSvcHandler.java:113)
at skynet.boot.mq.core.impl.MsgProcesserImpl.process(MsgProcesserImpl.java:130)
at skynet.boot.mq.core.impl.BaseMqConsumer$1.onMessage(BaseMqConsumer.java:96)
at skynet.boot.stream.kafka.MyConsumer.run(MqConsumer4KFK.java:155)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
解决方案
1.进到kafka配置文件目录
cd /iflytek/server/kafka/config
2.编辑配置文件
在配置文件server.properties中追加配置项
message.max.bytes=10485760
replica.fetch.max.bytes=11534336
其中replica.fetch.max.bytes需要大于message.max.bytes
1M=102410241=1048576bytes
5M=102410245=5242880bytes
6M=102410246=6291456bytes
10M=1024102410=10485760bytes
10M=1024102411=11534336bytes
网友评论