美文网首页
事件消息问题排查

事件消息问题排查

作者: 枫叶_huazhe | 来源:发表于2018-05-04 10:25 被阅读0次

    我们在业务系统之间通过kafka进行消息的传送,有时候如果发现业务上消息没有传达,定位到消息这一块的业务问题,我们就需要对整个消息系统进行排查。排查分以下几个步骤进行:

    Kafka broker 是否正常运行

    这一步骤大多数情况下可以忽略,因为kafka服务器一般不会随意就挂掉

    检查方式(可直接按照下面命令一步一步操作):

    docker exec -it kafka sh
    cd /opt/kafka
    sh bin/kafka-console-producer --broker-list localhost:9092 --topic testAlive
    > 输入任意内容,如果不报错,说明kafka正常
    

    上述原理是,使用kafka命令行工具启动一个命令行模式的生产者进行消息的发送测试。

    检查业务系统连接的是不是当前 kafka

    检查业务服务在dc-all.yml中的配置,重点关注environment下的kafka生产者和消费者kafka host配置:

    • 如果该业务系统只发送消息,不需要监听消息,例如purchase,只需要在environment中配置生产者host,如下面所示:
    • 注意kafka_producer_host不能为大写,大写的模式在很久之前已经修改为统一的小写模式了
    purchaseService:
        container_name: purchaseService
        environment:
          ... (省略其他配置)
          - kafka_producer_host=${kafka_host_ip}:${kafka_port} (正确写法)
          - KAFKA_PRODUCER_HOST=... (错误写法)
          ... (省略其他配置)
        volumes:
          ... (省略其他配置)
    
    • 在tscompose目录下,查看.local.ini文件
    cat .local.ini
    // 查看kafka部分内容,确保 kafka_host_ip 指向的是 host_ip
    #kafka
    kafka_host_ip=${host_ip}
    kafka_port=9092
    
    • 如果,指向不正确,请修改 initEnvs/develop.ini文件,确保kafka_host_ip等于的值为host_ip。然后回到根目录,执行下面脚本,刷新环境配置,然后重启业务系统。
    sh prepare .sh develop
    
    • 需要订阅消息的系统配置如下:
    stockService:
        container_name: stockService
        environment:
          ... (省略其他配置)
          - kafka_consumer_host=${kafka_host_ip}:${kafka_port} 
          ... (省略其他配置)
        volumes:
          ... (省略其他配置)
    
    • 业务系统同时需要订阅消息和发送消息时,配置如下
      生产者kafka_host和消费者kafka_host 进行配置
    goodsService:
        container_name: goodsService
        environment:
          ... (省略其他配置)
          - kafka_producer_host=${kafka_host_ip}:${kafka_port}
          - kafka_consumer_host=${kafka_host_ip}:${kafka_port} 
          ... (省略其他配置)
        volumes:
          ... (省略其他配置)
    

    检查当前事件消息的生产和消费

    如果一个消息流程出现故障,我们需要有条不絮的进行消息的逐步排查,从生产者是否产生消息,到消费者能否接收到消息,分别进行排查

    检查生产者是否将消息发送出来

    1. 启动一个命令行消费者,订阅当前发送出来的事件的主题
    docker exec -it kafka sh
    cd /opt/kafka
    sh bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic your_topic
    

    上述your_topic 换成你自己正在测试的 topic

    1. 开始业务测试,如果命令行消费者 能够接收到消息(不管是否乱码,乱码是正常情况),如下图:


      蓝线即为收到的消息.png
    2. 如果上述命令行能够收到消息,说明生产者端,没有问题,请跳往消费者端进行问题排查。如果这里没有收到消息内容:

    情形一: 生产消息被另一个实例发送出去

    目前,eventbus生产者发送消息逻辑为:

    • 业务系统触发事件,Event.fireEvent()
    • 当前事件对象会被thrift序列化为byte字节数组,并保存到数据库中,当前业务系统dp_common_event这张表里
    • eventbus 会有另外一个定时轮询线程(默认1s,可配置为100ms)定时轮询数据库dp_common_event这张表里的数据,拿到数据后,将当前消息的字节码数据发送到kafka上面,然后删除表里这条消息记录。

    这里可能会出现如下问题:

    • 如果,开发人员连接的数据库是sandbox上当前正在测试的业务连接的数据库
    • 开发人员在本地启了相同的服务,所以他的服务的定时轮询线程去轮询的表和sandbox上业务系统轮询的将会是同一张表
    • 这时,dp_common_event里的数据有可能会被开发人员本地的轮询线程查询到并发送出去,
    • 本地连接又可能不是测试环境中的kafka,他的消息被发往了本地系统连接的kafka上。
    • 此时,sandbox上的业务系统将获取不到数据库这条消息,所以消息没有被发送出去。

    情形二: 消息堆积(可能行非常小)

    检查当前业务系统所在db的dp_common_event这张表(例如采购,purcahse_db 里的dp_common_event,看当前表中是否有消息内容。

    总结:

    生产者端如果消息发送不成功,基本上是上述两种情形,尤其第一种情形可能性最大。


    接收消息端(消费者) 问题排查

    如果生产者能够将消息发送出去(即命令行能够接收到生产者发送的消息),而此时整体消息流程还不通,那说明问题处在消费者这一边。

    关于kafka的前置知识

    消费者组 (Consumer Group)

    consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即groupId。

    组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。

    consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程.
    group.id是一个字符串,唯一标识一个consumer group
    consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer


    问题:

    • 了解了kafka的消费者组,我们知道,在同一个消费组里,一个分区的消息内容只会发给消费者组中的一个消费者实例。

    • 目前我们的测试服务的kafka所有topic都只分了一个区(目的是减少开发的混乱行,以后正式上线了会进行多分区)

    • 如果本地或者其他sandbox有系统同时订阅了某个topic的消息,而且它们的groupId都相同,这样就会导致消息可能被另外一个消费者实例给消费了。当前测试的服务器上的消息没有收到消息。


    问题验证:

    • 进入kafka容器,查询当前kafka所有的消费者组
    docker exec -it kafka sh
    cd /opt/kafka
    //通过此命令查询当前kafka所有消费者组
    sh bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --list
    
    image.png
    • 查询你的业务消费者的消费组下面的实例列表
    bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --describe --group your_group_name
    
    //eg,例子
    bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --describe --group goodsEventsConsumer
    
    
    image.png

    如上图所示,表示当前消费者组只有一个消费者实例,消费的分区是 0,这种情况,该分区的所有消息都会发送到此消费者上。

    错误情况

    A应用:

    05-04 11:58:27 292 Thread-9 INFO - [Consumer clientId=consumer-1, groupId=SUPPLIER_0.0.1_EVENT] Successfully joined group with generation 8
    05-04 11:58:27 295 Thread-9 INFO - [Consumer clientId=consumer-1, groupId=SUPPLIER_0.0.1_EVENT] Setting newly assigned partitions [Binlog-0]
    

    B应用:

    05-04 11:58:28 798 Thread-3 INFO - [Consumer clientId=consumer-1, groupId=SUPPLIER_0.0.1_EVENT] Successfully joined group with generation 8
    05-04 11:58:28 801 Thread-3 INFO - [Consumer clientId=consumer-1, groupId=SUPPLIER_0.0.1_EVENT] Setting newly assigned partitions []
    

    注意 Setting newly assigned partitions [Binlog-0] 字样,A消费者实例成功分配了分区Binlog-0,此时B应用实例无分区分配,所以为空,那么之后B应用将不会接收到消息。除非等下一次重新分配(ReBlance)

    解决方案:

    如果出现消费者组存在多个实例的情况,大部分情况是本地开发人员的业务实例注册上去了,或者其他sandbox环境的业务服务注册了当前kafka。
    为了避免这种情况,尽量不要在本地连接线上的kafka,每一个sandbox环境要保证各个服务连接都是当前sandbox环境下的kafka

    相关文章

      网友评论

          本文标题:事件消息问题排查

          本文链接:https://www.haomeiwen.com/subject/ifvrrftx.html