OTA制作

作者: 迷茫的天蝎 | 来源:发表于2016-11-18 14:45 被阅读0次

    OTA的制作流程是从ota_from_target_files开始的,ota_from_target_files模块最先执行的方法如下:

    if __name__ == '__main__':
      try:
        common.CloseInheritedPipes()     //MAC系统gmake会存在文件描述符泄露,因此在开始其他工作的时候将这些描述符关闭
        main(sys.argv[1:])                         //开始制作OTA包
      except common.ExternalError as e:
        print
        print "   ERROR: %s" % (e,)
        print
        sys.exit(1)
      finally:
        common.Cleanup()
    

    脚本入口函数main

    def main(argv):
    
      def option_handler(o, a):   \\将用户设定的option存入到一个OPTIONS类中
        if o == "--board_config":
          pass   # deprecated
        elif o in ("-k", "--package_key"):
          OPTIONS.package_key = a
        elif o in ("-i", "--incremental_from"):
          OPTIONS.incremental_source = a
        elif o == "--full_radio":
          OPTIONS.full_radio = True
        elif o == "--full_bootloader":
          OPTIONS.full_bootloader = True
        elif o in ("-w", "--wipe_user_data"):
          OPTIONS.wipe_user_data = True
        elif o in ("-n", "--no_prereq"):
          OPTIONS.omit_prereq = True
        elif o in ("-o", "--oem_settings"):
          OPTIONS.oem_source = a
        elif o in ("-e", "--extra_script"):
          OPTIONS.extra_script = a
        elif o in ("-a", "--aslr_mode"):
          if a in ("on", "On", "true", "True", "yes", "Yes"):
            OPTIONS.aslr_mode = True
          else:
            OPTIONS.aslr_mode = False
        elif o in ("-t", "--worker_threads"):
          if a.isdigit():
            OPTIONS.worker_threads = int(a)
          else:
            raise ValueError("Cannot parse value %r for option %r - only "
                             "integers are allowed." % (a, o))
        elif o in ("-2", "--two_step"):
          OPTIONS.two_step = True
        elif o == "--no_signing":
          OPTIONS.no_signing = True
        elif o == "--verify":
          OPTIONS.verify = True
        elif o == "--block":
          OPTIONS.block_based = True
        elif o in ("-b", "--binary"):
          OPTIONS.updater_binary = a
        elif o in ("--no_fallback_to_full",):
          OPTIONS.fallback_to_full = False
        elif o == "--stash_threshold":
          try:
            OPTIONS.stash_threshold = float(a)
          except ValueError:
            raise ValueError("Cannot parse value %r for option %r - expecting "
                             "a float" % (a, o))
        else:
          return False
        return True
    

    解析参数,将得到的参数和参数值传回给args,args是个列表,保存了没有加-或者--的参数

      args = common.ParseOptions(argv, __doc__,
                                 extra_opts="b:k:i:d:wne:t:a:2o:",
                                 extra_long_opts=[
                                     "board_config=",
                                     "package_key=",
                                     "incremental_from=",
                                     "full_radio",
                                     "full_bootloader",
                                     "wipe_user_data",
                                     "no_prereq",
                                     "extra_script=",
                                     "worker_threads=",
                                     "aslr_mode=",
                                     "two_step",
                                     "no_signing",
                                     "block",
                                     "binary=",
                                     "oem_settings=",
                                     "verify",
                                     "no_fallback_to_full",
                                     "stash_threshold=",
                                 ], extra_option_handler=option_handler)
    

    common.py中的ParseOptions,主要是调用python中的getopt模块中的getopt函数来解析参数

    def ParseOptions(argv,
                     docstring,
                     extra_opts="", extra_long_opts=(),
                     extra_option_handler=None):
      """Parse the options in argv and return any arguments that aren't
      flags.  docstring is the calling module's docstring, to be displayed
      for errors and -h.  extra_opts and extra_long_opts are for flags
      defined by the caller, which are processed by passing them to
      extra_option_handler."""
    
      try:
        opts, args = getopt.getopt(
            argv, "hvp:s:x:" + extra_opts,
            ["help", "verbose", "path=", "signapk_path=", "extra_signapk_args=",
             "java_path=", "java_args=", "public_key_suffix=",
             "private_key_suffix=", "boot_signer_path=", "boot_signer_args=",
             "verity_signer_path=", "verity_signer_args=", "device_specific=",
             "extra="] +
            list(extra_long_opts))
      except getopt.GetoptError as err:
        Usage(docstring)
        print "**", str(err), "**"
        sys.exit(2)
    
      for o, a in opts:
        if o in ("-h", "--help"):
          Usage(docstring)
          sys.exit()
        elif o in ("-v", "--verbose"):
          OPTIONS.verbose = True
        elif o in ("-p", "--path"):
          OPTIONS.search_path = a
        elif o in ("--signapk_path",):
          OPTIONS.signapk_path = a
        elif o in ("--extra_signapk_args",):
          OPTIONS.extra_signapk_args = shlex.split(a)
        elif o in ("--java_path",):
          OPTIONS.java_path = a
        elif o in ("--java_args",):
          OPTIONS.java_args = a
        elif o in ("--public_key_suffix",):
          OPTIONS.public_key_suffix = a
    elif o in ("--private_key_suffix",):
          OPTIONS.private_key_suffix = a
        elif o in ("--boot_signer_path",):
          OPTIONS.boot_signer_path = a
        elif o in ("--boot_signer_args",):
          OPTIONS.boot_signer_args = shlex.split(a)
        elif o in ("--verity_signer_path",):
          OPTIONS.verity_signer_path = a
        elif o in ("--verity_signer_args",):
          OPTIONS.verity_signer_args = shlex.split(a)
        elif o in ("-s", "--device_specific"):
          OPTIONS.device_specific = a
        elif o in ("-x", "--extra"):
          key, value = a.split("=", 1)
          OPTIONS.extras[key] = value
        else:
          if extra_option_handler is None or not extra_option_handler(o, a):
            assert False, "unknown option \"%s\"" % (o,)
    
      if OPTIONS.search_path:
        os.environ["PATH"] = (os.path.join(OPTIONS.search_path, "bin") +
                              os.pathsep + os.environ["PATH"])
    
      return args
    
      if len(args) != 2:
        common.Usage(__doc__)
        sys.exit(1)
    
      if OPTIONS.extra_script is not None:
        OPTIONS.extra_script = open(OPTIONS.extra_script).read()
    

    解压目标文件包

      print "unzipping target target-files..."
      OPTIONS.input_tmp, input_zip = common.UnzipTemp(args[0])
    

    common.py中的UnzipTemp函数,主要是对目标文件包进行解压,返回值是tmp临时路径和zipfile.ZipFile目标文件包的内容。

    def UnzipTemp(filename, pattern=None):
      """Unzip the given archive into a temporary directory and return the name.
    
      If filename is of the form "foo.zip+bar.zip", unzip foo.zip into a
      temp dir, then unzip bar.zip into that_dir/BOOTABLE_IMAGES.
    
      Returns (tempdir, zipobj) where zipobj is a zipfile.ZipFile (of the
      main file), open for reading.
      """
    
      tmp = tempfile.mkdtemp(prefix="targetfiles-")
      OPTIONS.tempfiles.append(tmp)
    
      def unzip_to_dir(filename, dirname):
        cmd = ["unzip", "-o", "-q", filename, "-d", dirname]
        if pattern is not None:
          cmd.append(pattern)
        p = Run(cmd, stdout=subprocess.PIPE)  //开启一个进程来解压目标文件包
        p.communicate()
        if p.returncode != 0:
          raise ExternalError("failed to unzip input target-files \"%s\"" %
                              (filename,))
    
      m = re.match(r"^(.*[.]zip)\+(.*[.]zip)$", filename, re.IGNORECASE)
      if m:
        unzip_to_dir(m.group(1), tmp)
        unzip_to_dir(m.group(2), os.path.join(tmp, "BOOTABLE_IMAGES"))
        filename = m.group(1)
      else:
        unzip_to_dir(filename, tmp)
    
      return tmp, zipfile.ZipFile(filename, "r")
    
      OPTIONS.target_tmp = OPTIONS.input_tmp
      OPTIONS.info_dict = common.LoadInfoDict(input_zip)
    

    将目标文件包中META/misc_info.txt的信息进行解析保存

    def LoadInfoDict(input_file):
      """Read and parse the META/misc_info.txt key/value pairs from the
      input target files and return a dict."""
    
      def read_helper(fn):
        if isinstance(input_file, zipfile.ZipFile):
          return input_file.read(fn)
        else:
          path = os.path.join(input_file, *fn.split("/"))
          try:
            with open(path) as f:
              return f.read()
          except IOError as e:
            if e.errno == errno.ENOENT:
              raise KeyError(fn)
      d = {}
      try:
        d = LoadDictionaryFromLines(read_helper("META/misc_info.txt").split("\n"))
      except KeyError:
        # ok if misc_info.txt doesn't exist
        pass
    
      # backwards compatibility: These values used to be in their own
      # files.  Look for them, in case we're processing an old
      # target_files zip.
    
      if "mkyaffs2_extra_flags" not in d:
        try:
          d["mkyaffs2_extra_flags"] = read_helper(
              "META/mkyaffs2-extra-flags.txt").strip()
        except KeyError:
          # ok if flags don't exist
          pass
    
      if "recovery_api_version" not in d:
        try:
          d["recovery_api_version"] = read_helper(
              "META/recovery-api-version.txt").strip()
        except KeyError:
          raise ValueError("can't find recovery API version in input target-files")
    
    if "tool_extensions" not in d:
        try:
          d["tool_extensions"] = read_helper("META/tool-extensions.txt").strip()
        except KeyError:
          # ok if extensions don't exist
          pass
    
      if "fstab_version" not in d:
        d["fstab_version"] = "1"
    
      try:
        data = read_helper("META/imagesizes.txt")
        for line in data.split("\n"):
          if not line:
            continue
          name, value = line.split(" ", 1)
          if not value:
            continue
          if name == "blocksize":
            d[name] = value
          else:
            d[name + "_size"] = value
      except KeyError:
        pass
    
      def makeint(key):
        if key in d:
          d[key] = int(d[key], 0)
    
      makeint("recovery_api_version")
      makeint("blocksize")
      makeint("system_size")
      makeint("vendor_size")
      makeint("userdata_size")
      makeint("cache_size")
      makeint("recovery_size")
      makeint("boot_size")
      makeint("fstab_version")
    
      d["fstab"] = LoadRecoveryFSTab(read_helper, d["fstab_version"])
      d["build.prop"] = LoadBuildProp(read_helper)
      return d
    
    def LoadDictionaryFromLines(lines):
      d = {}
      for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
          continue
        if "=" in line:
          name, value = line.split("=", 1)
          d[name] = value
      return d
    
      # If this image was originally labelled with SELinux contexts, make sure we
      # also apply the labels in our new image. During building, the "file_contexts"
      # is in the out/ directory tree, but for repacking from target-files.zip it's
      # in the root directory of the ramdisk.
      if "selinux_fc" in OPTIONS.info_dict:
        OPTIONS.info_dict["selinux_fc"] = os.path.join(
            OPTIONS.input_tmp, "BOOT", "RAMDISK", "file_contexts")
    
      if OPTIONS.verbose:
        print "--- target info ---"
        common.DumpInfoDict(OPTIONS.info_dict)    //打印从目标文件包中的/META/misc_info.txt读取的信息
    
      # If the caller explicitly specified the device-specific extensions
      # path via -s/--device_specific, use that.  Otherwise, use
      # META/releasetools.py if it is present in the target target_files.
      # Otherwise, take the path of the file from 'tool_extensions' in the
      # info dict and look for that in the local filesystem, relative to
      # the current directory.
    
      if OPTIONS.device_specific is None:
        from_input = os.path.join(OPTIONS.input_tmp, "META", "releasetools.py")
        if os.path.exists(from_input):
          print "(using device-specific extensions from target_files)"
          OPTIONS.device_specific = from_input
        else:
          OPTIONS.device_specific = OPTIONS.info_dict.get("tool_extensions", None)
    
      if OPTIONS.device_specific is not None:
        OPTIONS.device_specific = os.path.abspath(OPTIONS.device_specific)
    
      while True:
    
        if OPTIONS.no_signing:
          if os.path.exists(args[1]):
            os.unlink(args[1])
          output_zip = zipfile.ZipFile(args[1], "w",
                                       compression=zipfile.ZIP_DEFLATED)
        else:
          temp_zip_file = tempfile.NamedTemporaryFile()
          output_zip = zipfile.ZipFile(temp_zip_file, "w",
                                       compression=zipfile.ZIP_DEFLATED)
    
        cache_size = OPTIONS.info_dict.get("cache_size", None)
        if cache_size is None:
          raise RuntimeError("can't determine the cache partition size")
        OPTIONS.cache_size = cache_size
    
        if OPTIONS.incremental_source is None:
          WriteFullOTAPackage(input_zip, output_zip)
          if OPTIONS.package_key is None:
            OPTIONS.package_key = OPTIONS.info_dict.get(
                "default_system_dev_certificate",
                "build/target/product/security/testkey")
          common.ZipClose(output_zip)
          break
    
        else:
          print "unzipping source target-files..."     //解析和保存源文件包的相关信息
          OPTIONS.source_tmp, source_zip = common.UnzipTemp(
              OPTIONS.incremental_source)
          OPTIONS.target_info_dict = OPTIONS.info_dict
          OPTIONS.source_info_dict = common.LoadInfoDict(source_zip)
          if "selinux_fc" in OPTIONS.source_info_dict:
            OPTIONS.source_info_dict["selinux_fc"] = os.path.join(
                OPTIONS.source_tmp, "BOOT", "RAMDISK", "file_contexts")
          if OPTIONS.package_key is None:
            OPTIONS.package_key = OPTIONS.source_info_dict.get(
                "default_system_dev_certificate",
                "build/target/product/security/testkey")
          if OPTIONS.verbose:
            print "--- source info ---"
            common.DumpInfoDict(OPTIONS.source_info_dict)
    
          try:
            WriteIncrementalOTAPackage(input_zip, source_zip, output_zip)  //构建OTA包
            common.ZipClose(output_zip)
            break
          except ValueError:
            if not OPTIONS.fallback_to_full:
              raise
            print "--- failed to build incremental; falling back to full ---"
            OPTIONS.incremental_source = None
            common.ZipClose(output_zip)
    
      if not OPTIONS.no_signing:
        SignOutput(temp_zip_file.name, args[1])
        temp_zip_file.close()
    
      print "done."
    

    相关文章

      网友评论

          本文标题:OTA制作

          本文链接:https://www.haomeiwen.com/subject/dpmjpttx.html