美文网首页
zookeeper的watcher

zookeeper的watcher

作者: 码仙丶 | 来源:发表于2018-11-09 23:02 被阅读0次

    利用zk的watcher功能实时监控zk节点的变化,可以利用这个功能做报警、监控,例如监控kafka的broker,otter的node等,如果有节点挂掉,立时通知

    class MonitorZk(var zkHost: String, var zkPath: String) extends Watcher {
      var zoo: ZooKeeper = new ZooKeeper(zkHost, 10000, this)
      var brokerListStr = ""
      var brokers: util.List[String] = _
      private val logger: Logger = Logger.getLogger(MonitorZk.getClass)
    
      override def process(event: WatchedEvent): Unit = {
        if (event == null) return
        //取得连接状态
        val state = event.getState
        //取得事件类型
        val eventType = event.getType
        //哪一个节点路径发生变更
        val nodePath = event.getPath
        brokers = zoo.getChildren(zkPath, true)
    
        import scala.collection.JavaConversions._
    
        brokers.foreach(x => {
          brokerListStr += s"$x "
        })
    
        if (KeeperState.SyncConnected eq state) {
          //连接成功
          // 没有任何节点,表示创建连接成功(客户端与服务器端创建连接成功后没有任何节点信息)
          if (EventType.None eq eventType) {
            logger.warn(s"成功链接zookeeper服务器,节点数量:${
              brokers.size()
            }, 节点列表:$brokerListStr")
          } else if (EventType.NodeCreated eq eventType) {
            //当服务器端创建节点的时候触发
            logger.warn(s"zookeeper服务端创建新的节点, 节点数量:${
              brokers.size()
            }, 节点列表:$brokerListStr")
          } else if (EventType.NodeDataChanged eq eventType) {
            //被监控该节点的数据发生变更的时候触发
            logger.warn(s"节点的数据更新,节点数量:${
              brokers.size()
            }, 节点列表:$brokerListStr")
          } else if (EventType.NodeChildrenChanged eq eventType) {
            // 对应本代码而言只能监控根节点的一级节点变更。如:在根节点直接创建一级节点,
            //或者删除一级节点时触发。如修改一级节点的数据,不会触发,创建二级节点时也不会触发。
            logger.warn(s"子节点发生变更,节点数量:${
              brokers.size()
            }, 节点列表:$brokerListStr")
          } else if (EventType.NodeDeleted eq eventType) {
            logger.warn(s"节点:$nodePath 被删除,节点数量:${
              brokers.size()
            }, 节点列表:$brokerListStr")
          }
        } else if (KeeperState.Disconnected eq state) logger.warn("客户端连接zookeeper服务器端失败")
        else if (KeeperState.Expired eq state) logger.warn("客户端与zookeeper服务器端会话失败")
        else if (KeeperState.AuthFailed eq state) logger.warn("权限认证失败")
    
        if (brokers.length < nodeCount.toInt) {
          logger.warn(s"报警 $title, 节点列表:$brokerListStr")
          SendMail.sendMail(title, s"节点列表:$brokerListStr", "html")
        }
        brokerListStr = ""
      }
    
    }
    
    object MonitorZk {
      def main(args: Array[String]): Unit = {
        val parseParam = new ZkParam
        val jCommander = new JCommander(parseParam, args: _*)
        if (parseParam.help) {
          jCommander.usage()
          System.exit(0)
        }
    
        val zt = new MonitorZk(parseParam.zkHost, parseParam.nodePath)
        zt.logger.warn("程序启动....")
        //这句必须先执行,才能实现监控
        zt.brokers = zt.zoo.getChildren(parseParam.nodePath, true)
    
        scala.sys.addShutdownHook({
          zt.logger.warn(s"${parseParam.title} 程序关闭")
        })
        //让主线程一直在
        Thread.sleep(Integer.MAX_VALUE)
      }
    }
    

    相关文章

      网友评论

          本文标题:zookeeper的watcher

          本文链接:https://www.haomeiwen.com/subject/lasaxqtx.html