
AmbariAgent: Startup and Controller Main Logic Analysis

Author: 分裂四人组 | Published 2017-10-20 18:01

    The AmbariAgent process is started by the AmbariAgent.py script, which in turn launches main.py, where the real processing logic begins.

    def main():
      global status
    
      if (os.environ.has_key("PYTHON")):
        PYTHON = os.environ["PYTHON"]
      else:
        print("Key 'PYTHON' is not defined in environment variables")
        sys.exit(1)
    
      args = list(sys.argv)
      del args[0]
    
      mergedArgs = [PYTHON, AGENT_SCRIPT] + args
    
      # Become a parent for all subprocesses
      os.setpgrp()
    
      try:
        while status == AGENT_AUTO_RESTART_EXIT_CODE:
          # Spawn main.py as a child process; this makes it easy to restart main.py when needed
          mainProcess = subprocess.Popen(mergedArgs)
          mainProcess.communicate()
          status = mainProcess.returncode
          if os.path.isfile(AGENT_PID_FILE) and status == AGENT_AUTO_RESTART_EXIT_CODE:
            os.remove(AGENT_PID_FILE)
      finally:
        os.killpg(0, signal.SIGKILL)
    

    On the agent host you will therefore see two processes:

    root     28527  0.0  0.0  70368 20644 pts/1    S    Oct10   0:00 /usr/bin/python /usr/lib/python2.6/site-packages/ambari_agent/AmbariAgent.py start
    root     28533  1.3  0.1 1550348 33132 pts/1   Sl   Oct10 137:15 /usr/bin/python /usr/lib/python2.6/site-packages/ambari_agent/main.py start
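
    The restart loop above relies on a simple exit-code contract: main.py exits with AGENT_AUTO_RESTART_EXIT_CODE whenever it wants to be restarted (the restartAgent() path seen later ultimately makes main.py exit with that code), and any other exit code ends the loop. The following is a minimal, self-contained sketch of that contract; the file name worker_sketch.py and the concrete exit-code value are placeholders, not Ambari's actual constants.

    # restart_parent_sketch.py -- illustrative only; mirrors the while loop in AmbariAgent.py
    import subprocess
    import sys

    AGENT_AUTO_RESTART_EXIT_CODE = 77  # placeholder value, not Ambari's real constant

    status = AGENT_AUTO_RESTART_EXIT_CODE
    while status == AGENT_AUTO_RESTART_EXIT_CODE:
      # Re-launch the worker; the worker decides whether it wants to be restarted
      # by choosing its exit code.
      child = subprocess.Popen([sys.executable, "worker_sketch.py"])
      child.communicate()
      status = child.returncode

    print("worker exited with %d, not restarting" % status)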
    

    main.py: the program entry point

    main.py starts the Controller, the agent's core processing class. It is responsible for registering with the server, keeping the heartbeat alive, and fetching and handling the commands the server returns:

    def run_threads(server_hostname, heartbeat_stop_callback):
      # Launch Controller communication
      
      controller = Controller(config, server_hostname, heartbeat_stop_callback)
      controller.start()
      time.sleep(2) # in order to get controller.statusCommandsExecutor initialized
      while controller.is_alive():
        time.sleep(0.1)
    
        need_relaunch, reason = controller.get_status_commands_executor().need_relaunch
        if need_relaunch:
          controller.get_status_commands_executor().relaunch(reason)
    
      controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False)
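
    The loop above only needs a very small interface from the status commands executor: a need_relaunch property returning a (bool, reason) pair, plus relaunch() and kill() methods. The sketch below is not Ambari's implementation; it only illustrates, with assumed names, the contract run_threads relies on.

    # Illustrative executor interface (assumed implementation, not Ambari's code)
    import threading

    class StatusCommandsExecutorSketch(object):
      def __init__(self):
        self._lock = threading.Lock()
        self._need_relaunch = (False, None)

      @property
      def need_relaunch(self):
        # Read by the run_threads polling loop as: need_relaunch, reason = ...
        with self._lock:
          return self._need_relaunch

      def request_relaunch(self, reason):
        with self._lock:
          self._need_relaunch = (True, reason)

      def relaunch(self, reason):
        # In the real agent this restarts the worker that runs status commands.
        with self._lock:
          self._need_relaunch = (False, None)

      def kill(self, reason, can_relaunch=True):
        # Stop the worker; with can_relaunch=False it stays down (agent shutdown).
        pass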
    

    The Controller

    Execution logic of the Controller thread: the agent talks to the server in a pull model. It periodically sends HTTP heartbeat requests to the server, then parses the response and processes any commands it contains.

    def run(self):
        try:
          # Create the ActionQueue
          self.actionQueue = ActionQueue(self.config, controller=self)
          # If multi-process execution of status commands is enabled, start a multi-process executor; otherwise run them in a single process
          if self.config.get_multiprocess_status_commands_executor_enabled():
            self.statusCommandsExecutor = MultiProcessStatusCommandsExecutor(self.config, self.actionQueue)
          else:
            self.statusCommandsExecutor = SingleProcessStatusCommandsExecutor(self.config, self.actionQueue)
          # Register a cleanup hook so the executor is killed when the agent exits
          ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING", can_relaunch=False)
          # Start the ActionQueue
          self.actionQueue.start()
          # Registration helper
          self.register = Register(self.config)
          # Heartbeat builder
          self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
     
          opener = urllib2.build_opener()
          urllib2.install_opener(opener)
          
          # This thread keeps running
          while True:
            self.repeatRegistration = False
            # Register with the server and start heartbeating
            self.registerAndHeartbeat()
            if not self.repeatRegistration:
              logger.info("Finished heartbeating and registering cycle")
              break
        except:
          logger.exception("Controller thread failed with exception:")
          raise
    
        logger.info("Controller thread has successfully finished")
    

    registerAndHeartbeat() is the method the Controller calls for each interaction with the server:

    def registerAndHeartbeat(self):
        # On first start, or whenever ambari-server restarts, the agent registers itself with the server
        registerResponse = self.registerWithServer()
    
        if "response" in registerResponse:
          message = registerResponse["response"]
          logger.info("Registration response from %s was %s", self.serverHostname, message)
    
          if self.isRegistered:
            # Clearing command queue to stop executing "stale" commands
            # after registration
            logger.info('Resetting ActionQueue...')
            self.actionQueue.reset()
    
            # Process callbacks
            for callback in self.registration_listeners:
              callback()
            
            # Sleep for one heartbeat interval
            time.sleep(self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC)
            # Core processing: start heartbeating with the server
            self.heartbeatWithServer()
        else:
          logger.info("Registration response from %s didn't contain 'response' as a key", self.serverHostname)
    
    

    registerWithServer() handles registering the agent with the server; if the agent is already registered, registration is skipped:

    def registerWithServer(self):
        """
        :return: returning from current method without setting self.isRegistered
        to True will lead to agent termination.
        """
        LiveStatus.SERVICES = []
        LiveStatus.CLIENT_COMPONENTS = []
        LiveStatus.COMPONENTS = []
        ret = {}
        
        # Keep sending registration requests until the agent is registered
        while not self.isRegistered:
          try:
            data = json.dumps(self.register.build(self.version))
            prettyData = pprint.pformat(data)
    
            try:
              server_ip = socket.gethostbyname(self.hostname)
              logger.info("Registering with %s (%s) (agent=%s)", self.hostname, server_ip, prettyData)
            except socket.error:
              logger.warn("Unable to determine the IP address of '%s', agent registration may fail (agent=%s)",
                          self.hostname, prettyData)
            
            # Send the registration request to the server
            ret = self.sendRequest(self.registerUrl, data)
            prettyData = pprint.pformat(ret)
            logger.debug("Registration response is %s", prettyData)
    
            # exitstatus is a code of error which was raised on server side.
            # exitstatus = 0 (OK - Default)
            # exitstatus = 1 (Registration failed because different version of agent and server)
            exitstatus = 0
            if 'exitstatus' in ret.keys():
              exitstatus = int(ret['exitstatus'])
              
            # exitstatus 1 means the server rejected the registration, e.g. because the agent and server versions differ
            if exitstatus == 1:
              # log - message, which will be printed to agents log
              if 'log' in ret.keys():
                log = ret['log']
                logger.error(log)
              self.isRegistered = False
              self.repeatRegistration = False
              return ret
    
            self.responseId = int(ret['responseId'])
            logger.info("Registration Successful (response id = %s)", self.responseId)
    
            self.isRegistered = True
            
            # After a successful registration, first refresh the caches: cluster configuration, recovery manager and agent config. (This is why configurations are automatically reloaded onto the agent after the server restarts.)
            # always update cached cluster configurations on registration
            # must be prior to any other operation
            self.cluster_configuration.update_configurations_from_heartbeat(ret)
            self.recovery_manager.update_configuration_from_registration(ret)
            self.config.update_configuration_from_registration(ret)
            logger.debug("Updated config:" + str(self.config))
            
            # Relaunch the statusCommandsExecutor
            # Start StatusCommandExecutor child process or restart it if already running
            # in order to receive up to date agent config.
            self.statusCommandsExecutor.relaunch("REGISTER_WITH_SERVER")
            
            # If the registration response includes status commands
            if 'statusCommands' in ret.keys():
              logger.debug("Got status commands on registration.")
              # Add them to the status command queue
              self.addToStatusQueue(ret['statusCommands'])
            else:
              self.hasMappedComponents = False
    
            # always update alert definitions on registration
            self.alert_scheduler_handler.update_definitions(ret)
          except ssl.SSLError:
            self.repeatRegistration = False
            self.isRegistered = False
            return
          except Exception, ex:
            # try a reconnect only after a certain amount of random time
            delay = randint(0, self.max_reconnect_retry_delay)
            logger.error("Unable to connect to: " + self.registerUrl, exc_info=True)
            logger.error("Error:" + str(ex))
            logger.warn(""" Sleeping for {0} seconds and then trying again """.format(delay,))
            time.sleep(delay)
    
        return ret
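
    From the keys this method reads, a registration response is a JSON document roughly shaped like the example below. Only responseId, exitstatus, log and statusCommands are read directly in the code above; the remaining keys are assumptions about the payload consumed by the cache/recovery/alert update calls, and the exact names depend on the server version.

    # Illustrative registration response (values made up)
    registration_response = {
      "response": "OK",
      "responseId": 0,                # stored by the agent; the server increments it on each heartbeat
      "exitstatus": 0,                # 0 = OK, 1 = registration rejected (details in 'log')
      "log": "",
      "statusCommands": [],           # optional status commands to run right after registration
      "recoveryConfig": {},           # assumed: consumed by recovery_manager.update_configuration_from_registration
      "agentConfig": {},              # assumed: consumed by config.update_configuration_from_registration
      "alertDefinitionCommands": [],  # assumed: consumed by alert_scheduler_handler.update_definitions
    }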
    

    heartbeatWithServer() keeps the heartbeat going between agent and server and is the Controller's main execution logic:

    def heartbeatWithServer(self):
        self.DEBUG_HEARTBEAT_RETRIES = 0
        self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
        retry = False
        certVerifFailed = False
        state_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60'))
    
        # last time when state was successfully sent to server
        last_state_timestamp = 0.0
        
        # in order to be able to check from logs that heartbeats processing
        # still running we log a message. However to avoid generating too
        # much log when the heartbeat runs at a higher rate (e.g. 1 second intervals)
        # we log the message at the same interval as 'state interval'
        heartbeat_running_msg_timestamp = 0.0
    
        # Prevent excessive logging by logging only at specific intervals
        getrecoverycommands_timestamp = 0.0
        getrecoverycommands_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
    
        heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
    
        while not self.DEBUG_STOP_HEARTBEATING:
          current_time = time.time()
          logging_level = logging.DEBUG
          if current_time - heartbeat_running_msg_timestamp > state_interval:
            # log more steps every minute or so
            logging_level = logging.INFO
            heartbeat_running_msg_timestamp = current_time
    
          try:
            logger.log(logging_level, "Heartbeat (response id = %s) with server is running...", self.responseId)
    
            send_state = False
            if not retry:
              if current_time - last_state_timestamp > state_interval:
                send_state = True
    
              logger.log(logging_level, "Building heartbeat message")
    
              data = json.dumps(self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
            else:
              self.DEBUG_HEARTBEAT_RETRIES += 1
    
            if logger.isEnabledFor(logging.DEBUG):
              logger.log(logging_level, "Sending Heartbeat (id = %s): %s", self.responseId, data)
            else:
              logger.log(logging_level, "Sending Heartbeat (id = %s)", self.responseId)
            
            # Send the heartbeat request
            response = self.sendRequest(self.heartbeatUrl, data)
            exitStatus = 0
            if 'exitstatus' in response.keys():
              exitStatus = int(response['exitstatus'])
            
            # A non-zero exit status from the server is treated as a failure
            if exitStatus != 0:
              raise Exception(response)
    
            serverId = int(response['responseId'])
    
            logger.log(logging_level, 'Heartbeat response received (id = %s)', serverId)
    
            cluster_size = int(response['clusterSize']) if 'clusterSize' in response.keys() else -1
    
            # TODO: this needs to be revised if hosts can be shared across multiple clusters
            heartbeat_interval = self.get_heartbeat_interval(cluster_size) \
              if cluster_size > 0 \
              else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
    
            logger.log(logging_level, "Heartbeat interval is %s seconds", heartbeat_interval)
    
            if 'hasMappedComponents' in response.keys():
              self.hasMappedComponents = response['hasMappedComponents'] is not False
            
            # If the server reports pending tasks, pause the recovery manager
            if 'hasPendingTasks' in response.keys():
              has_pending_tasks = bool(response['hasPendingTasks'])
              self.recovery_manager.set_paused(has_pending_tasks)
            
            # registrationCommand: the server asks the agent to re-register
            if 'registrationCommand' in response.keys():
              # check if the registration command is None. If none skip
              if response['registrationCommand'] is not None:
                logger.info("RegistrationCommand received - repeat agent registration")
                self.isRegistered = False
                self.repeatRegistration = True
                return
                
            # Guard against possible agent memory leaks: restart the agent if memory use exceeds the configured thresholds
            used_ram = get_used_ram()/1000
            # dealing with a possible memory leaks
            if self.max_ram_soft and used_ram >= self.max_ram_soft and not self.actionQueue.tasks_in_progress_or_pending():
              logger.error(AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_soft_mb", max_ram=self.max_ram_soft))
              self.restartAgent()
            if self.max_ram_hard and used_ram >= self.max_ram_hard:
              logger.error(AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_hard_mb", max_ram=self.max_ram_hard))
              self.restartAgent()
    
            if serverId != self.responseId + 1:
              logger.error("Error in responseId sequence - restarting")
              self.restartAgent()
            else:
              self.responseId = serverId
              if send_state:
                last_state_timestamp = current_time
    
            # if the response contains configurations, update the in-memory and
            # disk-based configuration cache (execution and alert commands have this)
            logger.log(logging_level, "Updating configurations from heartbeat")
            self.cluster_configuration.update_configurations_from_heartbeat(response)
    
            response_keys = response.keys()
    
            # there's case when canceled task can be processed in Action Queue.execute before adding rescheduled task to queue
            # this can cause command failure instead result suppression
            # so canceling and putting rescheduled commands should be executed atomically
            # cancelCommands: lock the actionQueue and cancel the commands
            # executionCommands: queue the commands for execution
            if 'cancelCommands' in response_keys or 'executionCommands' in response_keys:
              logger.log(logging_level, "Adding cancel/execution commands")
            with self.actionQueue.lock:
              if 'cancelCommands' in response_keys:
                self.cancelCommandInQueue(response['cancelCommands'])
    
              if 'executionCommands' in response_keys:
                execution_commands = response['executionCommands']
                self.recovery_manager.process_execution_commands(execution_commands)
                self.addToQueue(execution_commands)
            
            # statusCommands: add to the status command queue
            if 'statusCommands' in response_keys:
              # try storing execution command details and desired state
              self.addToStatusQueue(response['statusCommands'])
    
            if current_time - getrecoverycommands_timestamp > getrecoverycommands_interval:
              getrecoverycommands_timestamp = current_time
              if not self.actionQueue.tasks_in_progress_or_pending():
                logger.log(logging_level, "Adding recovery commands")
                recovery_commands = self.recovery_manager.get_recovery_commands()
                for recovery_command in recovery_commands:
                  logger.info("Adding recovery command %s for component %s",
                              recovery_command['roleCommand'], recovery_command['role'])
                  self.addToQueue([recovery_command])
    
            if 'alertDefinitionCommands' in response_keys:
              logger.log(logging_level, "Updating alert definitions")
              self.alert_scheduler_handler.update_definitions(response)
    
            if 'alertExecutionCommands' in response_keys:
              logger.log(logging_level, "Executing alert commands")
              self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
    
            if "true" == response['restartAgent']:
              logger.error("Received the restartAgent command")
              self.restartAgent()
            else:
              logger.debug("No commands sent from %s", self.serverHostname)
    
            if retry:
              logger.info("Reconnected to %s", self.heartbeatUrl)
    
            if "recoveryConfig" in response:
              # update the list of components enabled for recovery
              logger.log(logging_level, "Updating recovery config")
              self.recovery_manager.update_configuration_from_registration(response)
    
            retry = False
            certVerifFailed = False
            self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
            self.DEBUG_HEARTBEAT_RETRIES = 0
            self.heartbeat_stop_callback.reset_heartbeat()
          except ssl.SSLError:
            self.repeatRegistration=False
            self.isRegistered = False
            logger.exception("SSLError while trying to heartbeat.")
            return
          except Exception, err:
            if "code" in err:
              logger.error(err.code)
            else:
              logException = False
              if logger.isEnabledFor(logging.DEBUG):
                logException = True
    
              exceptionMessage = str(err)
              errorMessage = "Unable to reconnect to {0} (attempts={1}, details={2})".format(self.heartbeatUrl, self.DEBUG_HEARTBEAT_RETRIES, exceptionMessage)
    
              if not retry:
                errorMessage = "Connection to {0} was lost (details={1})".format(self.serverHostname, exceptionMessage)
    
              logger.error(errorMessage, exc_info=logException)
    
              if 'certificate verify failed' in str(err) and not certVerifFailed:
                logger.warn("Server certificate verify failed. Did you regenerate server certificate?")
                certVerifFailed = True
    
            self.cachedconnect = None  # Previous connection is broken now
            retry = True
    
            #randomize the heartbeat
            delay = randint(0, self.max_reconnect_retry_delay)
            time.sleep(delay)
    
          # Sleep for some time
          timeout = heartbeat_interval - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
          logger.log(logging_level, "Waiting %s for next heartbeat", timeout)
    
          if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
            # Stop loop when stop event received
            logger.info("Stop event received")
            self.DEBUG_STOP_HEARTBEATING=True
    
          logger.log(logging_level, "Wait for next heartbeat over")
    
