美文网首页
Storm常见问题

Storm常见问题

作者: 尼小摩 | 来源:发表于2018-11-21 20:05 被阅读185次

    问题1:Storm Topology Fail过多问题

    可能的原因:

    • 用户在Bolt代码中主动调用 collector.fail()
    • 继承 BaseRichBolt而代码没有手动调用collector.ack()
    • 继承 BaseRichBolt而代码执行逻辑中某些情况下跳过了collector.ack()
    • tupe锚点连续锚,ack树过长
    • 下游某个Bolt处理能力过低,上游的数据积压在Worker及Executor的buffer中
    • topology.max.spout.pending设置过大。Spout上游taskXpending >> 任意下游taskX(1000/ExecuteLatency)
    • topology.message.timeout.secs覆盖,设置消息超时时间过短
    • Worker进程 GC 频繁 及时间过长造成执行停顿
    • 集群某台机器CPU利用率过高,争抢资源

    问题2:(数据库)连接泄露问题

    现象:

    在 zabbix 监控 storm-01 机器 文件打开数据在一周时间内从20K 上升到 40K

    原因:

    查看stormplatform平台操作日志,查看最后连接数从40K下降到20K那个点的操作日志,发现有WxSmallProgramTopology被kill联系业务,重启WxSmallProgramTopology运行一天时间观察,发现该Topology文件打开数一天时间内从600上升到3000+导出jstack线程堆栈,可见 Hikari Housekeeping Timer 线程有300多个,该线程为HikariPool连接池一个定时器线程



    也就是说该Topology在一天内创建了300多个数据库连接池,没有释放,每个连接池包含10个数据库连接,则占用了3000个文件打开数。

    确定是连接池造成的句柄泄露后,拉出WxSmallProgramTopology_1_0_1_3 jar包,反编译得到如下代码:

    1、在execute()方法中每隔5分钟执行refreshPageFilter()
    2、每次执行refreshPageFilter()方法都会 new Jdbclien
    3、Jdbclient 初始化的时候会新建一个数据库HikariDataSource,创建一个新的 HikariPool 数据库连接池

    改进方法:

    1、如果要用数据库连接池,那么要把获取创建连接池要做成单例模式。
    2、在storm Topology中尽量不要使用数据库连接池,直接使用JDBC连接,建立连接后不要释放即可,对于每个Task建立一个连接。


    问题3:KafkaSpout 拉取数据异常问题

    日志现象

    image.png

    可能原因:

     String topic = topoConfig.getString(KAFKASPOUT_TOPIC);
     String zkRoot = topoConfig.getString(KAFKASPOUT_ZK_ROOT);
     String spoutId = topoConfig.getString(KAFKASPOUT_ZK_ID);
     SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
    Zookeeper上记录kafka topic消费offset信息的znode地址重复,读取其它topology的消费进度。
    

    问题4 worker频繁重启问题(GC问题)

    现象:

    程序启动后,十几秒后所有worker不停重启

    worker日志


    nimbus日志显示worker因为worker心跳超时被supervisor主动kill 初步怀疑 worker未上报心跳。

    GC日志显示老年代一直是满的伴随着非常频繁的FULL GC:

    改进方法:

    Topology中的Bolt读取MySQL数据时,返回太多数据到resultSet中,通过设置游标防止内存爆掉

          con = this.jdbcTemplate.getDataSource().getConnection();
          ps = (PreparedStatement) con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
          ps.setFetchSize(Integer.MIN_VALUE);
          ps.setFetchDirection(ResultSet.FETCH_REVERSE);
          resultSet = ps.executeQuery();
    

    相关文章

      网友评论

          本文标题:Storm常见问题

          本文链接:https://www.haomeiwen.com/subject/nbhhqqtx.html