Kaldi source code analysis (3) - run.pl

Author: 走在成长的道路上 | Published 2018-08-03 14:05

    In a Kaldi example recipe directory (egs/xxx/s5), you will usually find the following files and directories:

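    cmd.sh         # chooses the parallel-execution wrapper, usually run.pl or queue.pl
    conf           # configuration files, e.g. for mfcc, decode, cmvn
    local          # recipe-specific scripts
    path.sh        # sets up environment variables
    run.sh         # top-level script that drives the whole pipeline; the main entry point
    steps          # scripts that each perform a single step
    utils          # utility scripts for parsing files, preprocessing, etc.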
    

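    In Kaldi, queue.pl is designed mainly for Sun's GridEngine. For single-machine execution, cmd.sh is usually configured to use run.pl, which runs the jobs as multiple processes on the local machine: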

    train_cmd="utils/run.pl"
    decode_cmd="utils/run.pl"
    

    As the directory layout above suggests, everything related to parallel execution lives under utils/parallel.

    Below is a brief walkthrough of run.pl:

    #!/usr/bin/env perl
    use warnings; #sed replacement for -w perl parameter
    
    # In general, doing
    #  run.pl some.log a b c is like running the command a b c in
    # the bash shell, and putting the standard error and output into some.log.
    # To run parallel jobs (backgrounded on the host machine), you can do (e.g.)
    #  run.pl JOB=1:4 some.JOB.log a b c JOB is like running the command a b c JOB
    # and putting it in some.JOB.log, for each one. [Note: JOB can be any identifier].
    # If any of the jobs fails, this script will fail.
    
    # A typical example is:
    #  run.pl some.log my-prog "--opt=foo bar" foo \|  other-prog baz
    # and run.pl will run something like:
    # ( my-prog '--opt=foo bar' foo |  other-prog baz ) >& some.log
    #
    # Basically it takes the command-line arguments, quotes them
    # as necessary to preserve spaces, and evaluates them with bash.
    # In addition it puts the command line at the top of the log, and
    # the start and end times of the command at the beginning and end.
    # The reason why this is useful is so that we can create a different
    # version of this program that uses a queueing system instead.
    
    # use Data::Dumper;
    
    # Fewer than 2 arguments: exit with a usage message
    @ARGV < 2 && die "usage: run.pl log-file command-line arguments...";
    
    # Maximum number of jobs to run at once (-1 means not set; a default is chosen below)
    $max_jobs_run = -1;
    # Range of job ids (a single job by default)
    $jobstart = 1;
    $jobend = 1;
    $ignored_opts = ""; # These will be ignored.
    
    # First parse an option like JOB=1:4, and any
    # options that would normally be given to
    # queue.pl, which we will just discard.
    
    # Two passes, so that qsub-style options may appear both before and after the JOB=... spec
    for (my $x = 1; $x <= 2; $x++) { # This for-loop is to
      # allow the JOB=1:n option to be interleaved with the
      # options to qsub.
      while (@ARGV >= 2 && $ARGV[0] =~ m:^-:) {
        # parse any options that would normally go to qsub, but which will be ignored here.
        # Take the next option switch, e.g. -V or --max-jobs-run
        my $switch = shift @ARGV;
        if ($switch eq "-V") {
          $ignored_opts .= "-V ";
        } elsif ($switch eq "--max-jobs-run" || $switch eq "-tc") {
          # Read the job-concurrency limit
          # we do support the option --max-jobs-run n, and its GridEngine form -tc n.
          $max_jobs_run = shift @ARGV;
          if (! ($max_jobs_run > 0)) {
            die "run.pl: invalid option --max-jobs-run $max_jobs_run";
          }
        } else {
          my $argument = shift @ARGV;
          if ($argument =~ m/^--/) {
            print STDERR "run.pl: WARNING: suspicious argument '$argument' to $switch; starts with '-'\n";
          }
          if ($switch eq "-sync" && $argument =~ m/^[yY]/) {
            # -sync y is accepted but ignored (run.pl always waits for its jobs)
            $ignored_opts .= "-sync "; # Note: in the
            # corresponding code in queue.pl it says instead, just "$sync = 1;".
          } elsif ($switch eq "-pe") { # e.g. -pe smp 5
            # -pe takes two arguments; both are ignored
            my $argument2 = shift @ARGV;
            $ignored_opts .= "$switch $argument $argument2 ";
          } elsif ($switch eq "--gpu") {
            # --gpu: remember whether GPUs were requested (this changes the default job limit)
            $using_gpu = $argument;
          } else {
            # Ignore option.
            $ignored_opts .= "$switch $argument ";
          }
        }
      }
      # Match a job range such as JOB=1:20 and store its parts in $jobname, $jobstart, $jobend
      if ($ARGV[0] =~ m/^([\w_][\w\d_]*)+=(\d+):(\d+)$/) { # e.g. JOB=1:20
        $jobname = $1;
        $jobstart = $2;
        $jobend = $3;
        shift;
        if ($jobstart > $jobend) {
          die "run.pl: invalid job range $ARGV[0]";
        }
        if ($jobstart <= 0) {
          die "run.pl: invalid job range $ARGV[0], start must be strictly positive (this is required for GridEngine compatibility).";
        }
      # Match a single-job spec such as JOB=1 (start and end are equal)
      } elsif ($ARGV[0] =~ m/^([\w_][\w\d_]*)+=(\d+)$/) { # e.g. JOB=1.
        $jobname = $1;
        $jobstart = $2;
        $jobend = $2;
        shift;
      } elsif ($ARGV[0] =~ m/.+\=.*\:.*$/) {
        print STDERR "run.pl: Warning: suspicious first argument to run.pl: $ARGV[0]\n";
      }
    }
    
    # Users found this message confusing so we are removing it.
    # if ($ignored_opts ne "") {
    #   print STDERR "run.pl: Warning: ignoring options \"$ignored_opts\"\n";
    # }
    
    # If --max-jobs-run was not given, choose a default
    if ($max_jobs_run == -1) { # If --max-jobs-run option not set,
                               # then work out the number of processors if possible,
                               # and set it based on that.
      $max_jobs_run = 0;
      if ($using_gpu) {
        # Using GPUs: count the lines printed by nvidia-smi -L (one line per GPU)
        if (open(P, "nvidia-smi -L |")) {
          $max_jobs_run++ while (<P>);
          close(P);
        }
        # No GPU detected: warn and fall back to a single job
        if ($max_jobs_run == 0) {
          $max_jobs_run = 1;
          print STDERR "run.pl: Warning: failed to detect number of GPUs from nvidia-smi, using ${max_jobs_run}\n";
        }
      } elsif (open(P, "</proc/cpuinfo")) {  # Linux
        # Linux: count the 'processor' lines in /proc/cpuinfo
        while (<P>) { if (m/^processor/) { $max_jobs_run++; } }
        if ($max_jobs_run == 0) {
          print STDERR "run.pl: Warning: failed to detect any processors from /proc/cpuinfo\n";
          $max_jobs_run = 10;  # reasonable default.
        }
        close(P);
      } elsif (open(P, "sysctl -a |")) {  # BSD/Darwin
        # BSD/Darwin: read the CPU count from the hw.ncpu line of sysctl -a
        while (<P>) {
          if (m/hw\.ncpu\s*[:=]\s*(\d+)/) { # hw.ncpu = 4, or hw.ncpu: 4
            $max_jobs_run = $1;
            last;
          }
        }
        close(P);
        if ($max_jobs_run == 0) {
          print STDERR "run.pl: Warning: failed to detect any processors from sysctl -a\n";
          $max_jobs_run = 10;  # reasonable default.
        }
      } else {
        # allow at most 32 jobs at once, on non-UNIX systems; change this code
        # if you need to change this default.
        $max_jobs_run = 32;
      }
      # The just-computed value of $max_jobs_run is just the number of processors
      # (or our best guess); and if it happens that the number of jobs we need to
      # run is just slightly above $max_jobs_run, it will make sense to increase
      # $max_jobs_run to equal the number of jobs, so we don't have a small number
      # of leftover jobs.
      $num_jobs = $jobend - $jobstart + 1;
      if (!$using_gpu &&
          $num_jobs > $max_jobs_run && $num_jobs < 1.4 * $max_jobs_run) {
        $max_jobs_run = $num_jobs;
      }
    }
    
    # The next argument is the log file
    $logfile = shift @ARGV;
    
    if (defined $jobname && $logfile !~ m/$jobname/ &&
        $jobend > $jobstart) {
      print STDERR "run.pl: you are trying to run a parallel job but "
        . "you are putting the output into just one log file ($logfile)\n";
      exit(1);
    }
    
    $cmd = "";
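    # Rebuild the command string: arguments without whitespace are kept as-is;
    # others are wrapped in single quotes if they contain a double quote, else in double quotes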
    foreach $x (@ARGV) {
        if ($x =~ m/^\S+$/) { $cmd .=  $x . " "; }
        elsif ($x =~ m:\":) { $cmd .= "'$x' "; }
        else { $cmd .= "\"$x\" "; }
    }
    
    #$Data::Dumper::Indent=0;
    $ret = 0;
    $numfail = 0;
    %active_pids=();
    
    # Fork one child per job id from $jobstart to $jobend, keeping at most $max_jobs_run running at once
    use POSIX ":sys_wait_h";
    for ($jobid = $jobstart; $jobid <= $jobend; $jobid++) {
      if (scalar(keys %active_pids) >= $max_jobs_run) {
    
        # Let's wait for a change in any child's status
        # Then we have to work out which child finished
        $r = waitpid(-1, 0);
        $code = $?;
        if ($r < 0 ) { die "run.pl: Error waiting for child process"; } # should never happen.
        if ( defined $active_pids{$r} ) {
            $jid=$active_pids{$r};
            $fail[$jid]=$code;
            if ($code !=0) { $numfail++;}
            delete $active_pids{$r};
            # print STDERR "Finished: $r/$jid " .  Dumper(\%active_pids) . "\n";
        } else {
            die "run.pl: Cannot find the PID of the child process that just finished.";
        }
    
        # In theory we could do a non-blocking waitpid over all jobs running just
        # to find out if only one or more jobs finished during the previous waitpid()
        # However, we just omit this and will reap the next one in the next pass
        # through the for(;;) cycle
      }
      $childpid = fork();
      if (!defined $childpid) { die "run.pl: Error forking in run.pl (writing to $logfile)"; }
      if ($childpid == 0) { # We're in the child... this branch
        # executes the job and returns (possibly with an error status).
        if (defined $jobname) {
          $cmd =~ s/$jobname/$jobid/g;
          $logfile =~ s/$jobname/$jobid/g;
        }
        # Create the log directory
        system("mkdir -p `dirname $logfile` 2>/dev/null");
        # Open (and truncate) the log file
        open(F, ">$logfile") || die "run.pl: Error opening log file $logfile";
        # Write the command and its start time as a header
        print F "# " . $cmd . "\n";
        print F "# Started at " . `date`;
        $starttime = `date +'%s'`;
        print F "#\n";
        close(F);
    
        # Run the command
        # Pipe into bash.. make sure we're not using any other shell.
        open(B, "|bash") || die "run.pl: Error opening shell command";
        print B "( " . $cmd . ") 2>>$logfile >> $logfile";
        close(B);                   # If there was an error, exit status is in $?
        $ret = $?;
    
        $lowbits = $ret & 127;
        $highbits = $ret >> 8;
        if ($lowbits != 0) { $return_str = "code $highbits; signal $lowbits" }
        else { $return_str = "code $highbits"; }
        
        # Append the accounting line and the end time
        $endtime = `date +'%s'`;
        open(F, ">>$logfile") || die "run.pl: Error opening log file $logfile (again)";
        $enddate = `date`;
        chop $enddate;
        print F "# Accounting: time=" . ($endtime - $starttime) . " threads=1\n";
        print F "# Ended ($return_str) at " . $enddate . ", elapsed time " . ($endtime-$starttime) . " seconds\n";
        close(F);
        exit($ret == 0 ? 0 : 1);
      } else {
        $pid[$jobid] = $childpid;
        $active_pids{$childpid} = $jobid;
        # print STDERR "Queued: " .  Dumper(\%active_pids) . "\n";
      }
    }
    
    # Reap the remaining children, recording each job's exit status
    # Now we have submitted all the jobs, let's wait until all the jobs finish
    foreach $child (keys %active_pids) {
        $jobid=$active_pids{$child};
        $r = waitpid($pid[$jobid], 0);
        $code = $?;
        if ($r == -1) { die "run.pl: Error waiting for child process"; } # should never happen.
        if ($r != 0) { $fail[$jobid]=$code; $numfail++ if $code!=0; } # Completed successfully
    }
    
    # Check each job's recorded exit status and count the failures
    # Some sanity checks:
    # The $fail array should not contain undefined codes
    # The number of non-zeros in that array  should be equal to $numfail
    # We cannot do foreach() here, as the JOB ids do not necessarily start by zero
    $failed_jids=0;
    for ($jobid = $jobstart; $jobid <= $jobend; $jobid++) {
      $job_return = $fail[$jobid];
      if (not defined $job_return ) {
        # print Dumper(\@fail);
    
        die "run.pl: Sanity check failed: we have indication that some jobs are running " .
          "even after we waited for all jobs to finish" ;
      }
      if ($job_return != 0 ){ $failed_jids++;}
    }
    if ($failed_jids != $numfail) {
      die "run.pl: Sanity check failed: cannot find out how many jobs failed ($failed_jids x $numfail)."
    }
    if ($numfail > 0) { $ret = 1; }
    
    if ($ret != 0) {
      $njobs = $jobend - $jobstart + 1;
      if ($njobs == 1) {
        if (defined $jobname) {
          $logfile =~ s/$jobname/$jobstart/; # only one numbered job, so replace name with
                                             # that job.
        }
        print STDERR "run.pl: job failed, log is in $logfile\n";
        if ($logfile =~ m/JOB/) {
          print STDERR "run.pl: probably you forgot to put JOB=1:\$nj in your script.";
        }
      }
      else {
        $logfile =~ s/$jobname/*/g;
        print STDERR "run.pl: $numfail / $njobs failed, log is in $logfile\n";
      }
    }
    # Exit with 0 on success, 1 if any job failed
    exit ($ret);
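    To make the command-line handling in the listing concrete, here is a minimal Perl sketch that isolates three pieces of run.pl: parsing a JOB=start:end (or JOB=n) spec, rebuilding the command string with run.pl's quoting rule, and substituting the job id into both the command and the log-file name. The helper names parse_job_spec, build_cmd and expand_job, and the log path exp/log/some.JOB.log, are hypothetical and do not exist in Kaldi. As with \| in the shell example, the pipe reaches run.pl as a literal | argument, so it stays unquoted and becomes a real pipe once the string is handed to bash.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Hypothetical helper: returns (name, start, end) for "JOB=1:4" or "JOB=3",
# or an empty list if the argument is not a job spec.
# The patterns match the same strings as run.pl's regexes.
sub parse_job_spec {
    my ($arg) = @_;
    return ($1, $2, $3) if $arg =~ m/^(\w+)=(\d+):(\d+)$/;   # range, e.g. JOB=1:20
    return ($1, $2, $2) if $arg =~ m/^(\w+)=(\d+)$/;         # single job, e.g. JOB=1
    return ();
}

# Hypothetical helper: run.pl's quoting rule for rebuilding the command line.
sub build_cmd {
    my $cmd = "";
    foreach my $x (@_) {
        if    ($x =~ m/^\S+$/) { $cmd .= "$x "; }       # no whitespace: keep as-is
        elsif ($x =~ m/"/)     { $cmd .= "'$x' "; }     # has a double quote: single-quote it
        else                   { $cmd .= "\"$x\" "; }   # otherwise: double-quote it
    }
    return $cmd;
}

# Hypothetical helper: replace every occurrence of the job name with the job id.
sub expand_job {
    my ($name, $id, $cmd, $logfile) = @_;
    (my $job_cmd = $cmd)     =~ s/\Q$name\E/$id/g;
    (my $job_log = $logfile) =~ s/\Q$name\E/$id/g;
    return ($job_cmd, $job_log);
}

# Roughly: run.pl JOB=1:3 exp/log/some.JOB.log my-prog "--opt=foo bar" foo \| other-prog JOB
my ($name, $start, $end) = parse_job_spec("JOB=1:3");
my $cmd = build_cmd("my-prog", "--opt=foo bar", "foo", "|", "other-prog", "JOB");
for my $id ($start .. $end) {
    my ($job_cmd, $job_log) = expand_job($name, $id, $cmd, "exp/log/some.JOB.log");
    # The string each child would pipe into bash
    print "( $job_cmd) 2>>$job_log >> $job_log\n";
}
```

    Note that, like run.pl, the substitution replaces the job name anywhere in the command, so an argument that happens to contain "JOB" is rewritten too.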
    
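    The default concurrency limit can also be tried out on its own. The sketch below assumes a Linux-style /proc/cpuinfo and ignores the GPU and sysctl branches. It counts processor lines the way run.pl does and then applies run.pl's 1.4x rule: when the number of jobs is only slightly larger than the processor count, the limit is raised to the number of jobs so that a few leftover jobs do not have to wait for a second round. The function names count_processors and default_max_jobs are hypothetical.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Hypothetical helper: count lines starting with "processor", as run.pl does on Linux.
sub count_processors {
    my ($cpuinfo) = @_;
    my $n = () = $cpuinfo =~ m/^processor/mg;
    return $n;
}

# Hypothetical helper reproducing run.pl's non-GPU default:
# fall back to 10 if no processors were found, then apply the 1.4x rule.
sub default_max_jobs {
    my ($num_procs, $num_jobs) = @_;
    my $max = $num_procs > 0 ? $num_procs : 10;
    $max = $num_jobs if $num_jobs > $max && $num_jobs < 1.4 * $max;
    return $max;
}

# Read /proc/cpuinfo if it exists (Linux only in this sketch).
my $cpuinfo = "";
if (open(my $fh, '<', '/proc/cpuinfo')) {
    $cpuinfo = do { local $/; <$fh> } // "";
    close($fh);
}
my $procs = count_processors($cpuinfo);
printf "%d processors detected; default limit for 30 jobs: %d\n",
    $procs, default_max_jobs($procs, 30);
```

    For example, with 8 processors a 10-job run gets a limit of 10 (10 < 11.2), while a 12-job run keeps the limit at 8.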

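    The job loop in run.pl is a bounded process pool built on fork and waitpid: once $max_jobs_run children are alive, the parent blocks in waitpid(-1, 0) until any one of them exits, records its status, and only then forks the next job. The sketch below reproduces that pattern together with run.pl's decoding of the wait status (low 7 bits for the signal, high byte for the exit code). It is a simplified illustration, not run.pl itself: jobs are Perl code refs instead of bash command lines, there is no logging, and run_pool and describe_status are hypothetical names. Two deliberate differences: the child runs its job inside eval so that a die cannot escape into the parent's loop, and it leaves through POSIX::_exit so it never flushes output the parent had buffered.

```perl
#!/usr/bin/env perl
use strict;
use warnings;
use POSIX ();

# Hypothetical helper: decode a wait status the way run.pl does.
sub describe_status {
    my ($status) = @_;
    my $signal = $status & 127;   # low 7 bits: terminating signal, if any
    my $code   = $status >> 8;    # high byte: exit code
    return $signal ? "code $code; signal $signal" : "code $code";
}

# Hypothetical helper: run job ids $start..$end in forked children with at
# most $max_jobs alive at once. $job->($id) returns 0 on success.
# Returns the number of failed jobs.
sub run_pool {
    my ($start, $end, $max_jobs, $job) = @_;
    my (%active, %status);                 # pid -> job id; job id -> wait status
    my $reap = sub {                       # block until any child exits
        my $pid = waitpid(-1, 0);
        my $st  = $?;
        die "waitpid failed\n" if $pid < 0;
        my $id = delete $active{$pid};
        die "unknown child $pid\n" unless defined $id;
        $status{$id} = $st;
    };
    for my $id ($start .. $end) {
        $reap->() while scalar(keys %active) >= $max_jobs;   # wait for a free slot
        my $pid = fork();
        die "fork failed: $!\n" unless defined $pid;
        if ($pid == 0) {                   # child: run the job, exit 0 or 1 like run.pl
            my $rc = eval { $job->($id) };
            POSIX::_exit(defined $rc && $rc == 0 ? 0 : 1);
        }
        $active{$pid} = $id;               # parent: remember which job this pid runs
    }
    $reap->() while %active;               # wait for the remaining children
    return scalar grep { $status{$_} != 0 } keys %status;
}

my $failed = run_pool(1, 6, 2, sub { my ($id) = @_; return $id % 3 == 0 ? 1 : 0 });
print "$failed / 6 jobs failed\n";         # jobs 3 and 6 fail: prints "2 / 6 jobs failed"
print describe_status(1 << 8), "\n";       # exit code 1, no signal: prints "code 1"
```

    Reaping one child per blocking waitpid keeps the loop simple; as the comment in run.pl points out, a non-blocking sweep could detect several finished children at once, but any extras are simply reaped on the next pass.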

Article link: https://www.haomeiwen.com/subject/vigjvftx.html