起因:在实际项目开发过程中,需要使用RabbitMQ来实现消息队列的功能,在运用过之后,也去学一学kafka,了解一下他们之间的差别,吃一吃架构方面的相关内容,提升自己。
1. 搭建kafka的集群
kafka集群搭建的前提:jdk,zookeeper
使用springboot通过外网访问kafka需要开通一个监听如下,如果是集群的话需要给每台机器设置自己的IP
advertised.listeners=PLAINTEXT://39.99.195.49:9092
Topic: topicName PartitionCount: 5 ReplicationFactor: 3 Configs: Topic: topicName Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: topicName Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Topic: topicName Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: topicName Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2 Topic: topicName Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
2. Springboot整合Kafka进行消息收发
消息接收使用POM
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置consumer的xml
spring:
kafka:
bootstrap-servers: 39.99.195.49:9092,39.99.196.208:9092,39.99.196.190:9092
consumer:
enable-auto-commit: false # 不自动签收
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual
concurrency: 5
# earliest : kafka出现错误重启之后,会找到未消费的offset继续消费
# latest : kafka出现错误中,如果还有数据往topic里写,只会从最新的offset开始消费
java实现
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumerService {
@KafkaListener(groupId = "group01",topics = "topicName")
public void onMessage(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
System.out.println("*****获取的消息: "+record.value());
acknowledgment.acknowledge();
}
}
3. kafka消费进度分析以及消费应答
# 通过命令看消费进度的
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.177:9092,192.168.0.178:9092,192.168.0.179:9092 --describe --group group01
# CURRENT-OFFSET :当前消费的offset进度
# LOG-END-OFFSET :数据文件里一共有多少位移数据
# LAG :这是就是还未消费的量
CURRENT-OFFSET + LAG = LOG-END-OFFSET
4. 程序中指定分区和key的读写方式实现
5. 在程序中consumer如何重新消费
重新消费在命令行的方式
--from-beginning
重新需要明确两个点
-
每次消费完毕都会记录consumer的offset
-
如果要从代码里从头消费就需要配置
-
auto-offset-reset: earliest
-
更换消费者组或者将已消费的offset删除
-
6. Kafka监控服务平台Eagle的使用
kafka-eagle平台监控系统
# 下载地址
---
# Eagle是通过JMX来拉取kafka信息
# JMX:是Java Management Extensions(Java管理扩展)的缩写,
# 首先要对Kafka开启JMX
# 1.开启kafka的JMX
vi bin/kafka-server-start.sh
# 修改heap内容开启JMX
# 将这一行进行修改:export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xmx1G -Xms1G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
# 如果是集群需要所有机器都将这个配置修改好
# 2.配置eagle配置到环境变量中
vi /etc/profile
export KE_HOME=/usr/local/kafka-eagle
export PATH=$KE_HOME/bin:$JAVA_HOME/bin:$PATH
cd /etc
source profile
# 3.给执行文件授权
cd /usr/local/kafka-eagle/bin
chmod 777 ke.sh
# 4.eagle系统的配置,eagel可以监控多套kafka集群
cd /usr/local/kafka-eagle/conf
vi system-config.properties
# 设置对应的参数
# 可以配置多个集群,这里只配置一套
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=127.0.0.1:2181,127.0.0.1:2181,127.0.0.1:2181
# 显示的broker数量
cluster1.kafka.eagle.broker.size=20
# zookeeper客户端线程数
kafka.zk.limit.size=25
# eagle端口
kafka.eagle.webui.port=8048
# 消费的offset保存位置
cluster1.kafka.eagle.offset.storage=kafka
# 是否开启图表并保持30天内容
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=30
# KSQL的查询显示条数和是否自动fix error
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=false
# 从界面删除topic的token密码
kafka.eagle.topic.token=keadmin
# mysql保存eagle的一些元数据驱动
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/kafka_eagle?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
注意数据库如果不在一个设备上需要提前授权
use mysql;
select host,user from user;
grant all privileges on *.* to gavin@'192.168.%' identified by 'cpcoredb';
flush privileges;
启动/关闭/查看Kafka-eagle状态
bin/ke.sh start
bin/ke.sh stop
bin/ke.sh stats
# 访问地址
http://39.100.39.20:8048/ke
不要以为每天把功能完成了就行了,这种思想是要不得的,互勉~!
网友评论