美文网首页
hadoop源码学习之namenode启动

hadoop源码学习之namenode启动

作者: 多彩海洋 | 来源:发表于2019-09-17 14:57 被阅读0次

    概述

    hdfs主要包括两类节点,namenode和datanode,所以hdfs的启动也就是这两类节点的启动.
    namenode管理者所有的datanode信息、数据块信息等,它是整个hdfs的核心,首先要启动namenode,然后再启动datanode。

    namenode格式化

    namenode的class是org.apache.hadoop.hdfs.server.namenode.NameNode,位于hadooop-hdfs项目中,入口方法是main方法,main方法调用了静态方法createNameNode来创建namenode。

    在createNameNode方法中,通过switch中的参数来判断做格式化、升级、启动等操作,代码如下:

        switch (startOpt) {
          case FORMAT: {
            boolean aborted = format(conf, startOpt.getForceFormat(),
                startOpt.getInteractiveFormat());
            terminate(aborted ? 1 : 0);
            return null; // avoid javac warning
          }
          case GENCLUSTERID: {
            System.err.println("Generating new cluster id:");
            System.out.println(NNStorage.newClusterID());
            terminate(0);
            return null;
          }
          case FINALIZE: {
            System.err.println("Use of the argument '" + StartupOption.FINALIZE +
                "' is no longer supported. To finalize an upgrade, start the NN " +
                " and then run `hdfs dfsadmin -finalizeUpgrade'");
            terminate(1);
            return null; // avoid javac warning
          }
          case ROLLBACK: {
            boolean aborted = doRollback(conf, true);
            terminate(aborted ? 1 : 0);
            return null; // avoid warning
          }
          case BOOTSTRAPSTANDBY: {
            String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
            int rc = BootstrapStandby.run(toolArgs, conf);
            terminate(rc);
            return null; // avoid warning
          }
          case INITIALIZESHAREDEDITS: {
            boolean aborted = initializeSharedEdits(conf,
                startOpt.getForceFormat(),
                startOpt.getInteractiveFormat());
            terminate(aborted ? 1 : 0);
            return null; // avoid warning
          }
          case BACKUP:
          case CHECKPOINT: {
            NamenodeRole role = startOpt.toNodeRole();
            DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
            return new BackupNode(conf, role);
          }
          case RECOVER: {
            NameNode.doRecovery(startOpt, conf);
            return null;
          }
          case METADATAVERSION: {
            printMetadataVersion(conf);
            terminate(0);
            return null; // avoid javac warning
          }
          case UPGRADEONLY: {
            DefaultMetricsSystem.initialize("NameNode");
            new NameNode(conf);
            terminate(0);
            return null;
          }
          default: {
            DefaultMetricsSystem.initialize("NameNode");
            return new NameNode(conf);
          }
        }
    

    namenode启动之前需要先进行格式化操作,格式化的主要目的就是初始化namenode的数据目录,初始化集群的id、版本等,会在配置的目录写入相应的属性文件.

    格式化的主要方法是namenode中的format方法,在这里主要从配置文件中读取相应的配置,做一些相应的检查,然后构造了两个hdfs非常核心的类FSImage、FSNamesystem。


    FSImage:在硬盘中存储着hdfs系统的元数据,保存着在某一个时刻hdfs的镜像,后续的各种实时的操作记录在FSEditLog类中。

    FSNamesystem :FSNamesystem does the actual bookkeeping work for the DataNode,这个注释很好的解释了FSNamesystem的功能

        FSImage fsImage = new FSImage(conf, nameDirsToFormat, editDirsToFormat);
        try {
          FSNamesystem fsn = new FSNamesystem(conf, fsImage);
          fsImage.getEditLog().initJournalsForWrite();
    
          if (!fsImage.confirmFormat(force, isInteractive)) {
            return true; // aborted
          }
    
          fsImage.format(fsn, clusterId);
        } catch (IOException ioe) {
          LOG.warn("Encountered exception during format: ", ioe);
          fsImage.close();
          throw ioe;
        }
    

    最后调用了FSImage#format方法进行格式化操作

      void format(FSNamesystem fsn, String clusterId) throws IOException {
        long fileCount = fsn.getTotalFiles();
        // Expect 1 file, which is the root inode
        Preconditions.checkState(fileCount == 1,
            "FSImage.format should be called with an uninitialized namesystem, has " +
            fileCount + " files");
        NamespaceInfo ns = NNStorage.newNamespaceInfo();
        LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
        ns.clusterID = clusterId;
    
        storage.format(ns);//进行namenode所有的目录的格式化,写属性文件
        editLog.formatNonFileJournals(ns);//初始化编辑日志
        saveFSImageInAllDirs(fsn, 0);//初始化fsimage
      }
    

    创建NameNode对象,接着会执行initialize方法初始化

      protected void initialize(Configuration conf) throws IOException {
            if(conf.get("hadoop.user.group.metrics.percentiles.intervals") == null) {
                String intervals = conf.get("dfs.metrics.percentiles.intervals");
                if(intervals != null) {
                    conf.set("hadoop.user.group.metrics.percentiles.intervals", intervals);
                }
            }
            
            UserGroupInformation.setConfiguration(conf);
            this.loginAsNameNodeUser(conf);
            initMetrics(conf, this.getRole());
            StartupProgressMetrics.register(startupProgress);
            if(NamenodeRole.NAMENODE == this.role) {
                this.startHttpServer(conf);//启动一个web服务器
            }
    
            this.spanReceiverHost = SpanReceiverHost.get(conf, "dfs.htrace.");
            this.loadNamesystem(conf);//从磁盘加载元数据到内存中,
            this.rpcServer = this.createRpcServer(conf);//初始化namenode的rpc服务
            if(this.clientNamenodeAddress == null) {
                this.clientNamenodeAddress = NetUtils.getHostPortString(this.rpcServer.getRpcAddress());
                LOG.info("Clients are to use " + this.clientNamenodeAddress + " to access" + " this namenode/service.");
            }
    
            if(NamenodeRole.NAMENODE == this.role) {
                this.httpServer.setNameNodeAddress(this.getNameNodeAddress());
                this.httpServer.setFSImage(this.getFSImage());
            }
    
            this.pauseMonitor = new JvmPauseMonitor(conf);
            this.pauseMonitor.start();
            metrics.getJvmMetrics().setPauseMonitor(this.pauseMonitor);
            this.startCommonServices(conf);//启动服务
        }
    

    namenode的启动

    整体流程

    namenode的启动是直接进入了了上述switch中的default选项,使用配置文件调用构造方法构造了namenode对象

    在构造方法中,首先进行了一系列的赋值操作,然后调用initialize(Configuration conf)来初始化namenode

    首先通过startHttpServer来启动一个web服务器,我们可以通过这个web服务器来查询hdfs的各种使用情况,其次用loadNamesystem(conf)从磁盘加载元数据到内存中,然后 rpcServer = createRpcServer(conf);来初始化namenode的rpc服务。最后startCommonServices(conf);来启动服务,这个时候namnode的各项服务就初始化完成了。

    服务启动流程详解

    通过FSNamesystem中的startCommonServices方法来启动服务
    namesystem.startCommonServices(conf, haContext);
    然后调用了org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.activate(Configuration)

      public void activate(Configuration conf) {
        pendingReplications.start();
        datanodeManager.activate(conf);
        this.replicationThread.start();
        this.blockReportThread.start();
      }
    

    pendingReplications

    pendingReplications.start()构造了一个PendingReplicationMonitor线程并且启动,
    PendingReplicationMonitor的功能参考以下注释

      /*
       * A periodic thread that scans for blocks that never finished
       * their replication request.
       */
      class PendingReplicationMonitor implements Runnable {
          .................
      }
    

    datanodeManager

    datanodeManager.activate(conf);启动了监控线程和心跳处理线程
    
      void activate(final Configuration conf) {
        decomManager.activate(conf);
        heartbeatManager.activate(conf);
      }
    

    replicationThread

    初始化副本的监控线程ReplicationMonitor并启动

    blockReportThread

    初始化块处理线程,对datanode上报的数据块进行处理,此线程为守护线程
    setDaemon(true)

    相关文章

      网友评论

          本文标题:hadoop源码学习之namenode启动

          本文链接:https://www.haomeiwen.com/subject/bytyyctx.html