美文网首页
kafka kafka的集群

kafka kafka的集群

作者: dylan丶QAQ | 来源:发表于2020-10-17 16:50 被阅读0次

    起因:在实际项目开发过程中,需要使用RabbitMQ来实现消息队列的功能,在运用过之后,也去学一学kafka,了解一下他们之间的差别,吃一吃架构方面的相关内容,提升自己。


    1. 搭建kafka的集群

    kafka集群搭建的前提:jdk,zookeeper

    使用springboot通过外网访问kafka需要开通一个监听如下,如果是集群的话需要给每台机器设置自己的IP

    advertised.listeners=PLAINTEXT://39.99.195.49:9092

    Topic: topicName PartitionCount: 5 ReplicationFactor: 3 Configs: Topic: topicName Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: topicName Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Topic: topicName Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: topicName Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2 Topic: topicName Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3

    2. Springboot整合Kafka进行消息收发

    消息接收使用POM

    <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
     </dependency>
    

    配置consumer的xml

    spring:
     kafka:
     bootstrap-servers: 39.99.195.49:9092,39.99.196.208:9092,39.99.196.190:9092
     consumer:
     enable-auto-commit: false # 不自动签收
     auto-offset-reset: earliest
     key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     listener:
     ack-mode: manual
     concurrency: 5
    # earliest : kafka出现错误重启之后,会找到未消费的offset继续消费
    # latest : kafka出现错误中,如果还有数据往topic里写,只会从最新的offset开始消费
    

    java实现

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    ​
    @Component
    public class KafkaConsumerService {
    ​
     @KafkaListener(groupId = "group01",topics = "topicName")
     public void onMessage(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
     System.out.println("*****获取的消息: "+record.value());
     acknowledgment.acknowledge();
     }
    }
    
    

    3. kafka消费进度分析以及消费应答

    # 通过命令看消费进度的
    ./kafka-consumer-groups.sh --bootstrap-server 192.168.0.177:9092,192.168.0.178:9092,192.168.0.179:9092 --describe --group group01
    # CURRENT-OFFSET :当前消费的offset进度
    # LOG-END-OFFSET :数据文件里一共有多少位移数据
    # LAG :这是就是还未消费的量
    CURRENT-OFFSET + LAG = LOG-END-OFFSET
    

    4. 程序中指定分区和key的读写方式实现

    5. 在程序中consumer如何重新消费

    重新消费在命令行的方式

    --from-beginning
    

    重新需要明确两个点

    • 每次消费完毕都会记录consumer的offset

    • 如果要从代码里从头消费就需要配置

      • auto-offset-reset: earliest

      • 更换消费者组或者将已消费的offset删除

    6. Kafka监控服务平台Eagle的使用

    kafka-eagle平台监控系统

    # 下载地址
    ---
    # Eagle是通过JMX来拉取kafka信息
    # JMX:是Java Management Extensions(Java管理扩展)的缩写,
    # 首先要对Kafka开启JMX
    ​
    # 1.开启kafka的JMX
    vi bin/kafka-server-start.sh
    # 修改heap内容开启JMX
    # 将这一行进行修改:export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export KAFKA_HEAP_OPTS="-server -Xmx1G -Xms1G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
    # 如果是集群需要所有机器都将这个配置修改好
    ​
    # 2.配置eagle配置到环境变量中
    vi /etc/profile
    export KE_HOME=/usr/local/kafka-eagle
    export PATH=$KE_HOME/bin:$JAVA_HOME/bin:$PATH
    cd /etc
    source profile
    ​
    # 3.给执行文件授权
    cd /usr/local/kafka-eagle/bin
    chmod 777 ke.sh
    ​
    # 4.eagle系统的配置,eagel可以监控多套kafka集群
    cd /usr/local/kafka-eagle/conf
    vi system-config.properties
    # 设置对应的参数
    # 可以配置多个集群,这里只配置一套
    kafka.eagle.zk.cluster.alias=cluster1
    cluster1.zk.list=127.0.0.1:2181,127.0.0.1:2181,127.0.0.1:2181
    # 显示的broker数量
    cluster1.kafka.eagle.broker.size=20
    # zookeeper客户端线程数
    kafka.zk.limit.size=25
    # eagle端口
    kafka.eagle.webui.port=8048
    # 消费的offset保存位置
    cluster1.kafka.eagle.offset.storage=kafka
    # 是否开启图表并保持30天内容
    kafka.eagle.metrics.charts=true
    kafka.eagle.metrics.retain=30
    # KSQL的查询显示条数和是否自动fix error
    kafka.eagle.sql.topic.records.max=5000
    kafka.eagle.sql.fix.error=false
    # 从界面删除topic的token密码
    kafka.eagle.topic.token=keadmin
    # mysql保存eagle的一些元数据驱动
    kafka.eagle.driver=com.mysql.jdbc.Driver
    kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/kafka_eagle?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    kafka.eagle.username=root
    kafka.eagle.password=123456
    

    注意数据库如果不在一个设备上需要提前授权

    use mysql;
    select host,user from user;
    grant all privileges on *.* to gavin@'192.168.%' identified by 'cpcoredb';
    flush privileges;
    

    启动/关闭/查看Kafka-eagle状态

    bin/ke.sh start
    bin/ke.sh stop
    bin/ke.sh stats
    # 访问地址
    http://39.100.39.20:8048/ke
    

    不要以为每天把功能完成了就行了,这种思想是要不得的,互勉~!

    相关文章

      网友评论

          本文标题:kafka kafka的集群

          本文链接:https://www.haomeiwen.com/subject/xsrwpktx.html