1.为什么要升级
因为spark2.0的Structured Streaming增加了新特性,而这些新特性,基于kafka0.10+的,公司目前的kafka集群是0.9.0.0
image.png
2.升级后的变化
- 数据增加一个
timestamp.type
字段,CreateTime :标识producer发送数据的时间, LogAppendTime :标识broker接收的时间
- 0.10+中,默认是
CreateTime
形式,创建topic命令
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1 --config message.timestamp.type=LogAppendTime
- 如果是
CreateTime
的话,producer写入数据需要指定时间戳
image.png - consumer中,ConsumerRecord中通过
timestamp
获取时间戳
image.png -
注意
因为每个消息新时间戳字段的引入,生产者在发送小包消息可能出现因为负载上升造成的吞吐量的下降。同理,现在复制过程每个消息也要多传输8个比特。如果你的集群即将达到网络容量的瓶颈,这可能造成网卡打爆并因为超载引起失败和性能问题。
3.升级步骤
image.png-
升级192.168.16.129,192.168.16.130,192.168.16.131
-
准备好安装包,操作129这台机器
image.png -
停止kafka,使用kafka自带stop脚本bin下的
image.pngkafka-server-stop.sh
,或者使用kill -s TERM $PIDS
,不需要手动切换partition的leader,因为kafka本身支持自动切换,这个配置默认是开启的
image.png -
覆盖配置config/server.properties到新的kafka的config目录下,server.properties添加以下两个配置
inter.broker.protocol.version=0.9.0.0
log.message.format.version=0.9.0.0
- 启动脚本+开启jmx+promethus监控
IP=`ifconfig eth0 | grep "inet addr" | awk '{ print $2}' | awk -F: '{print $2}'`
echo ${IP}
KAFKA_HOME=/root/apps/kafka
echo ${KAFKA_HOME}
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${IP} -javaagent:/root/apps/prometheus_kafka/jmx_prometheus_javaagent-0.6.jar=7071:/root/apps/prometheus_kafka/kafka-0-8-2.yml" \
nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties 2>&1 &
#如果不使用promethus,启动脚本为这样
IP=`ifconfig eth0 | grep "inet addr" | awk '{ print $2}' | awk -F: '{print $2}'`
echo ${IP}
KAFKA_HOME=/root/apps/kafka
echo ${KAFKA_HOME}
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${IP} \
nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties 2>&1 &
3.异常处理
- 观察是否正常,如果正常,保持这样的状态1小时,如果异常,则停止kafka进程,启动旧版的kafka进程
4.最后
- 如果没有异常,观察1小时候,将剩下的两台机器都这样操作
- 逐台修改server.properties以下两个 0.9.0.0改为0.10.2.0,并逐台重启
inter.broker.protocol.version=0.10.2.0
log.message.format.version=0.10.2.0
网友评论