美文网首页CTF-PWN工作生活
Angr 中的函数识别

Angr 中的函数识别

作者: Nevv | 来源:发表于2019-07-02 10:34 被阅读0次

    Angr 中的函数识别

    函数识别概述

    ​ 二进制文件通过IDA或者radare2这样的反汇编工具,能够识别二进制文件中的函数边界信息,并根据其调用关系生成整个程序的函数调用图。因此准确识别二进制文件中的函数边界对于进一步的分析二进制文件非常重要。二进制文件通常组织为 数据、代码、元数据的形式,在没有strip掉符号表的二进制文件中,关于函数的起始偏移、大小通常在元数据中可以直接找到,但是对于去除符号表的二进制文件,由于其符号表的缺失,通常需要采用别的以下方法进行函数识别:

    • 函数序言和尾声

    • 通过对call、jmp等指令所跳转的地址进行分析

    • 分析不可达代码位置

    ​ 但是采用以上方法同样会存在一些不准确的问题,比如依赖函数序言和尾声的检查,在使用不同编译器或者优化选项的情况下,采用这种硬编码的方式可能就不再适用;对于call或者jmp指令的分析,可能很多地址只有在运行时才能确定等等。

    Angr中的函数识别

    1. 初始化阶段

    ​ Angr中函数识别的过程是在CFGFast中构建CFG的时候进行的,在初始化阶段主要步骤如下:

    • 首先调用self._executable_memory_regions获取二进制文件有执行权限的region,将这些区间作为候选分析位置

    • 然后对得到的regions进行一些筛选和处理,同时对分析进行一些设置,比如设置是否有符号表、是否收集数据引用信息等。这里可以使用objdump命令查看文件的符号表信息:

      • 查看动态符号表信息
      nevv@ubuntu:~/angr/others$ objdump -T cgibin | grep "main"
      004023e0 g    DF .text    00000304  Base        main
      00409480 g    DF .text    000005ec  Base        hedwigcgi_main
      00408a78 g    DF .text    00000198  Base        captchacgi_main
      0040b930 g    DF .text    00000b28  Base        conntrackcgi_main
      00406b40 g    DF .text    00000008  Base        dlcfg_main
      0040a268 g    DF .text    000003d4  Base        servicecgi_main
      0040cb5c g    DF .text    00000604  Base        hnap_main
      0040a6f0 g    DF .text    00000218  Base        ssdpcgi_main
      0040b818 g    DF .text    00000110  Base        genacgi_main
      00407164 g    DF .text    000000e4  Base        fwupdater_main
      00409ae0 g    DF .text    00000620  Base        pigwidgeoncgi_main
      00406b48 g    DF .text    00000008  Base        fwup_main
      004195a0      DF *UND*    00000000              __uClibc_main
      00405284 g    DF .text    000002c8  Base        phpcgi_main
      0040ae50 g    DF .text    00000484  Base        soapcgi_main
      00408e10 g    DF .text    0000066c  Base        sessioncgi_main
      00406b50 g    DF .text    00000008  Base        seamacgi_main
      
      • 查看静态符号表信息:
      nevv@ubuntu:~/angr/others$ objdump -t cgibin
      
      cgibin:     file format elf32-tradlittlemips
      
      SYMBOL TABLE:
      no symbols
      

      两者的区别如下:

      • 静态链接中有一个专门的段叫符号表 -- “.symtab”(Symbol Table), 里面保存了所有关于该目标文件的符号的定义和引用。

      • 动态链接中同样有一个段叫 动态符号表 -- “.dynsym”(Dynamic Symbol) , 但.dynsym 相对于 .symtab 只保存了与动态链接相关的导入导出符号。so中同样有.symtab,其中保存着所有的符号。

    • 最后调用 self._analyze 进行前向分析

    2. 分析阶段

    _analyze方法很简单,主要就是self._pre_analysis()和根据self._graph_visitor 是否存在来调用对应的分析方法,对于还没有建立一个图结构的分析来说(就比如cfg恢复分析),第一次总是会调用self._analysis_core_baremetal() 方法。

        def _analyze(self):
            """
            The main analysis routine.
    
            :return: None
            """
    
            self._pre_analysis()
    
            if self._graph_visitor is None:
                # There is no base graph that we can rely on. The analysis itself should generate successors for the
                # current job.
                # An example is the CFG recovery.
    
                self._analysis_core_baremetal()
    
            else:
                # We have a base graph to follow. Just handle the current job.
    
                self._analysis_core_graph()
    
            self._post_analysis()
    
    0x1 _pre_analysis

    ​ 这个函数的主要功能就是,初始化分析过程中需要用到的变量,使用符号表,将符号表中的函数起始位置作为分析的起始位置,使用函数序言搜索函数,并将搜索结果保存在 _function_prologue_addrs 中。

        def _pre_analysis(self):
            import pdb
            pdb.set_trace()
            # 初始化一些cfg相关的变量
            self._initialize_cfg()
    
            # Scan for __x86_return_thunk and friends
            self._known_thunks = self._find_thunks()
            """
            这里应该是会寻找一些特殊的字节序列
    >>> print disasm('E807000000F3900FAEE8EBF948890424C3'.decode("hex"))
       0:   e8 07 00 00 00          call   0xc
       5:   f3 90                   pause  
       7:   0f ae e8                lfence 
       a:   eb f9                   jmp    0x5
       c:   48                      dec    eax
       d:   89 04 24                mov    DWORD PTR [esp],eax
      10:   c3                      ret
    >>> print disasm('E807000000F3900FAEE8EBF9488D642408C3'.decode("hex"))
       0:   e8 07 00 00 00          call   0xc
       5:   f3 90                   pause  
       7:   0f ae e8                lfence 
       a:   eb f9                   jmp    0x5
       c:   48                      dec    eax
       d:   8d 64 24 08             lea    esp,[esp+0x8]
      11:   c3                      ret
    >>>  
            """
    
            # 初始化一些分析时候需要用到的变量
            self._pending_jobs = PendingJobs(self.functions, self._deregister_analysis_job)
            self._traced_addresses = set()
            self._function_returns = defaultdict(set)
    
            # 不是所有的函数调用都使用call指令,因此需要记录下每一个单一函数的退出点,
            # 在需要函数调用的时候,在函数调用图上添加对应的边
            self._function_exits = defaultdict(set)
    
            # 创建一个初始化状态
            self._initial_state = self.project.factory.blank_state(mode="fastpath")
            initial_options = self._initial_state.options - {o.TRACK_CONSTRAINTS} - o.refs
            initial_options |= {o.SUPER_FASTPATH, o.SYMBOL_FILL_UNCONSTRAINED_REGISTERS, o.SYMBOL_FILL_UNCONSTRAINED_MEMORY}
            # initial_options.remove(o.COW_STATES)
            self._initial_state.options = initial_options
    
            starting_points = set()
    
            # clear all existing functions
            self.kb.functions.clear()
    
            if self._use_symbols:
                starting_points |= self._function_addresses_from_symbols
            # 根据符号表获取函数的地址,这里有190个函数
            """
            调试的时候不知道这里为什么有一个0
            """
    
            if self._extra_function_starts:
                starting_points |= set(self._extra_function_starts)
    
            # 对函数入口点进行排序
            starting_points = sorted(list(starting_points), reverse=True)
    
            if self._start_at_entry and self.project.entry is not None and self._inside_regions(self.project.entry) and \
                    self.project.entry not in starting_points:
                # make sure self.project.entry is inserted
                starting_points += [ self.project.entry ]
    
            # 对于每一个起始位置,创建一个CFGJOB进行分析
            for sp in starting_points:
                job = CFGJob(sp, sp, 'Ijk_Boring')
                self._insert_job(job)
                # register the job to function `sp`
                self._register_analysis_job(sp, job)
    
            self._updated_nonreturning_functions = set()
    
            # 这里是使用函数序言进行函数查找,该例子找到了217个
            if self._use_function_prologues and self.project.concrete_target is None:
                self._function_prologue_addrs = sorted(self._func_addrs_from_prologues())
                # make a copy of those prologue addresses, so that we can pop from the list
                self._remaining_function_prologue_addrs = self._function_prologue_addrs[::]
    
                # make function_prologue_addrs a set for faster lookups
                self._function_prologue_addrs = set(self._function_prologue_addrs)
    
    
    0x2 _analysis_core_baremetal

    ​ 这个函数的功能就是从刚才的队列中取出来添加的 job 并处理。主要就是以下三步:

    • _pre_job_handling

    • _process_job_and_get_successors

    • _intra_analysis

      该函数的源码如下:

    def _analysis_core_baremetal(self):
    
        if not self._job_info_queue:
            self._job_queue_empty()
    
        while not self.should_abort:
    
            if self._status_callback is not None:
                self._status_callback(self)
    
            # should_abort might be changed by the status callback function
            if self.should_abort:
                return
    
            if not self._job_info_queue:
                self._job_queue_empty()   # 时间消耗 1/3
    
            if not self._job_info_queue:
                # still no job available
                break
    
            job_info = self._job_info_queue[0]
    
            try:
                self._pre_job_handling(job_info.job)
            except AngrDelayJobNotice:
                # delay the handling of this job
                continue
            except AngrSkipJobNotice:
                # consume and skip this job
                self._job_info_queue = self._job_info_queue[1:]
                self._job_map.pop(self._job_key(job_info.job), None)
                continue
    
            # remove the job info from the map
            self._job_map.pop(self._job_key(job_info.job), None)
    
            self._job_info_queue = self._job_info_queue[1:]
    
            self._process_job_and_get_successors(job_info)
    
            # Short-cut for aborting the analysis
            if self.should_abort:
                break
    
            self._intra_analysis()  
    
    • _job_queue_empty 占了构建cfg的总时间的1/3。其执行流程如下
      • 首先查找来自必须要返回的函数的job,如果有,添加到队列中
      • 然后将已经完成分析的函数添加到_completed_functions中
      • 迭代地分析所有更改的函数,更新它们的返回属性,直到达到一个定点(即没有找到新的返回/不返回函数)。
      • 尝试分析剩下的间接调用位置
      • 如果选择使用函数序言分析的话,就将之前使用函数序言找到的函数添加到job队列中
      • 如果选择使用完整分析的话,会获取下条指令封装为job添加到队列中
        def _job_queue_empty(self):
    
            if self._pending_jobs:
                # fastpath
                # look for a job that comes from a function that must return
                # if we can find one, just use it
                job = self._pop_pending_job(returning=True)
                if job is not None:
                    self._insert_job(job)
                    return
    
                self._clean_pending_exits()
    
            # did we finish analyzing any function?
            # fill in self._completed_functions
            self._make_completed_functions()
    
            # analyze function features, most importantly, whether each function returns or not
            self._analyze_all_function_features()
    
            # Clear _changed_functions set
            self._updated_nonreturning_functions = set()
    
            if self._pending_jobs:
                self._clean_pending_exits()
    
                job = self._pop_pending_job(returning=True)
                if job is not None:
                    self._insert_job(job)
                    return
    
                job = self._pop_pending_job(returning=False)
                if job is not None:
                    self._insert_job(job)
                    return
    
            # Try to see if there is any indirect jump left to be resolved
            if self._resolve_indirect_jumps and self._indirect_jumps_to_resolve:
                self._process_unresolved_indirect_jumps()
    
                if self._job_info_queue:
                    return
    
            if self._use_function_prologues and self._remaining_function_prologue_addrs:
                while self._remaining_function_prologue_addrs:
                    prolog_addr = self._remaining_function_prologue_addrs[0]
                    self._remaining_function_prologue_addrs = self._remaining_function_prologue_addrs[1:]
                    if self._seg_list.is_occupied(prolog_addr):
                        continue
    
                    job = CFGJob(prolog_addr, prolog_addr, 'Ijk_Boring')
                    self._insert_job(job)
                    self._register_analysis_job(prolog_addr, job)
                    return
    
            if self._force_complete_scan:
                addr = self._next_code_addr()
                if addr is None:
                    l.debug("Force-scan jumping failed")
                else:
                    l.debug("Force-scanning to %#x", addr)
    
                if addr is not None:
                    job = CFGJob(addr, addr, "Ijk_Boring", last_addr=None, job_type=CFGJob.JOB_TYPE_COMPLETE_SCANNING)
                    self._insert_job(job)
                    self._register_analysis_job(addr, job)
    
    
    • _pre_job_handling

      一些简单的对 job 的预处理,比如对进度条的计算

    • _process_job_and_get_successors

      这个函数的主要目的是对 job 进行处理,并获取当前job的后继,把当前job的后继节点包装为 job,添加入队列。时间消耗约为2/3

        def _process_job_and_get_successors(self, job_info):
            """
            Process a job, get all successors of this job, and call _handle_successor() to handle each successor.
    
            :param JobInfo job_info: The JobInfo instance
            :return: None
            """
    
            job = job_info.job
    
            successors = self._get_successors(job)
    
            all_new_jobs = [ ]
    
            for successor in successors:
                new_jobs = self._handle_successor(job, successor, successors)
                # 在cfgfast中是直接把其所有的后继返回
    
                if new_jobs:
                    all_new_jobs.extend(new_jobs)
    
                    for new_job in new_jobs:
                        self._insert_job(new_job)
    
            self._post_job_handling(job, all_new_jobs, successors)
    

    ​ _get_successors 在子类 cfgFast 中实现,主要功能是从给定的地址在搜索一个基本块

    0x3 _get_successors

    ​ 调用_scan_block函数,并将其后继基本块包装为job对象,并添加到待分析的队列中:

        def _get_successors(self, job):  # pylint:disable=arguments-differ
    
            # current_function_addr = job.func_addr
            # addr = job.addr
    
            # if current_function_addr != -1:
            #    l.debug("Tracing new exit %#x in function %#x", addr, current_function_addr)
            # else:
            #    l.debug("Tracing new exit %#x", addr)
    
            jobs = self._scan_block(job)
    
            # l.debug("... got %d jobs: %s", len(jobs), jobs)
    
            for job_ in jobs:  # type: CFGJob
                # register those jobs
                self._register_analysis_job(job_.func_addr, job_)
    
            return jobs
    
    0x4 _scan_block
    
        def _scan_block(self, cfg_job):
            """
            Scan a basic block starting at a specific address
    
            :param CFGJob cfg_job: The CFGJob instance.
            :return: a list of successors
            :rtype: list
            """
    
            addr = cfg_job.addr
            current_func_addr = cfg_job.func_addr
    
            # Fix the function address
            # This is for rare cases where we cannot successfully determine the end boundary of a previous function, and
            # as a consequence, our analysis mistakenly thinks the previous function goes all the way across the boundary,
            # resulting the missing of the second function in function manager.
            if addr in self._function_addresses_from_symbols:
                current_func_addr = addr
    
            if self._addr_hooked_or_syscall(addr):
                entries = self._scan_procedure(cfg_job, current_func_addr)
    
            else:
                entries = self._scan_irsb(cfg_job, current_func_addr)
    
            return entries
    
    0x5 _scan_irsb
        def _scan_irsb(self, cfg_job, current_func_addr):
            """
            Generate a list of successors (generating them each as entries) to IRSB.
            Updates previous CFG nodes with edges.
    
            :param CFGJob cfg_job: The CFGJob instance.
            :param int current_func_addr: Address of the current function
            :return: a list of successors
            :rtype: list
            """
            # 生成cfgnode
            addr, function_addr, cfg_node, irsb = self._generate_cfgnode(cfg_job, current_func_addr)
        
            # 添加函数内部指向该node的边
            cfg_job.apply_function_edges(self, clear=True)
    
            # function_addr and current_function_addr can be different. e.g. when tracing an optimized tail-call that jumps
            # into another function that has been identified before.
    
            if cfg_node is None:
                # exceptions occurred, or we cannot get a CFGNode for other reasons
                return [ ]
            
            # 为cfg添加相应的边
            self._graph_add_edge(cfg_node, cfg_job.src_node, cfg_job.jumpkind, cfg_job.src_ins_addr,
                                 cfg_job.src_stmt_idx
                                 )
            # 将对应的cfg添加到对应函数
            self._function_add_node(cfg_node, function_addr)
    
            if self.functions.get_by_addr(function_addr).returning is not True:
                self._updated_nonreturning_functions.add(function_addr)
    
            # If we have traced it before, don't trace it anymore
            real_addr = get_real_address_if_arm(self.project.arch, addr)
            if real_addr in self._traced_addresses:
                # the address has been traced before
                return [ ]
            else:
                # Mark the address as traced
                self._traced_addresses.add(real_addr)
    
            # irsb cannot be None here
            # assert irsb is not None
    
            # IRSB在每个CFGNode中只使用一次,因此在这里必须释放掉以节省内存
            cfg_node.irsb = None
            # 1/10 _scan_irsb 的时间消耗
            self._process_block_arch_specific(addr, irsb, function_addr)
    
            # Scan the basic block to collect data references
            if self._collect_data_ref:
                self._collect_data_references(irsb, addr)
            # 3/20 _scan_irsb 的时间消耗
            # Get all possible successors
            irsb_next, jumpkind = irsb.next, irsb.jumpkind
            successors = [ ]
    
            last_ins_addr = None
            ins_addr = addr
            if irsb.statements:
                for i, stmt in enumerate(irsb.statements):
                    if isinstance(stmt, pyvex.IRStmt.Exit):
                        successors.append((i,
                                           last_ins_addr if self.project.arch.branch_delay_slot else ins_addr,
                                           stmt.dst,
                                           stmt.jumpkind
                                           )
                                          )
                    elif isinstance(stmt, pyvex.IRStmt.IMark):
                        last_ins_addr = ins_addr
                        ins_addr = stmt.addr + stmt.delta
            else:
                for ins_addr, stmt_idx, exit_stmt in irsb.exit_statements:
                    successors.append((
                        stmt_idx,
                        last_ins_addr if self.project.arch.branch_delay_slot else ins_addr,
                        exit_stmt.dst,
                        exit_stmt.jumpkind
                    ))
    
            successors.append((DEFAULT_STATEMENT,
                               last_ins_addr if self.project.arch.branch_delay_slot else ins_addr, irsb_next, jumpkind)
                              )
    
            entries = [ ]
    
            # 如果是arm架构的话,就会做一些处理,然后再返回
            successors = self._post_process_successors(addr, irsb.size, successors)
    
            # Process each successor 这一部分时间消耗占用了 15/20
            for suc in successors:
                stmt_idx, ins_addr, target, jumpkind = suc
    
                entries += self._create_jobs(target, jumpkind, function_addr, irsb, addr, cfg_node, ins_addr,
                                             stmt_idx
                                             )
    
            return entries
    
    0x6 _create_jobs

    ​ 给定一个node和其后继节点的一些信息,返回CFGJobs的list。这个函数的主要执行流程如下:

    • 根据当前cfg_node获取target_address

    • 如果target_address是None,说明跳转地址不是一个确定的值,此时根据jumpkind的类型进行判断:

      • 如果是ret类型,说明是函数结尾,否则说明是一个间接跳转,调用_indirect_jump_encountered尝试去解析这个间接跳转

      • 如果成功解析,则直接把其所有可能的地址包装为cfg_job并返回

      • 如果jumpkind 属于 ("Ijk_Boring", 'Ijk_InvalICache'),说明跳转的地址是一个plt表的地址,直接添加到对应plt函数的边,并创建一个CFGJob添加到jobs中

      • 否则就把这个间接跳转加入待解析的间接跳转工作队列中。

        def _create_jobs(self, target, jumpkind, current_function_addr, irsb, addr, cfg_node, ins_addr, stmt_idx):
            """
            Given a node and details of a successor, makes a list of CFGJobs
            and if it is a call or exit marks it appropriately so in the CFG
    
            :param int target:          Destination of the resultant job
            :param str jumpkind:        The jumpkind of the edge going to this node
            :param int current_function_addr: Address of the current function
            :param pyvex.IRSB irsb:     IRSB of the predecessor node
            :param int addr:            The predecessor address
            :param CFGNode cfg_node:    The CFGNode of the predecessor node
            :param int ins_addr:        Address of the source instruction.
            :param int stmt_idx:        ID of the source statement.
            :return:                    a list of CFGJobs
            :rtype:                     list
            """
    
            if type(target) is pyvex.IRExpr.Const:  # pylint: disable=unidiomatic-typecheck
                target_addr = target.con.value
            elif type(target) in (pyvex.IRConst.U8, pyvex.IRConst.U16, pyvex.IRConst.U32, pyvex.IRConst.U64):  # pylint: disable=unidiomatic-typecheck
                target_addr = target.value
            elif type(target) is int:  # pylint: disable=unidiomatic-typecheck
                target_addr = target
            else:
                target_addr = None
    
            if target_addr in self._known_thunks and jumpkind == 'Ijk_Boring':
                thunk_kind = self._known_thunks[target_addr][0]
                if thunk_kind == 'ret':
                    jumpkind = 'Ijk_Ret'
                    target_addr = None
                elif thunk_kind == 'jmp':
                    pass # ummmmmm not sure about this one
                else:
                    raise AngrCFGError("This shouldn't be possible")
    
            jobs = [ ]
            is_syscall = jumpkind.startswith("Ijk_Sys")
    
            # Special handling:
            # If a call instruction has a target that points to the immediate next instruction, we treat it as a boring jump
            if jumpkind == "Ijk_Call" and \
                    not self.project.arch.call_pushes_ret and \
                    cfg_node.instruction_addrs and \
                    ins_addr == cfg_node.instruction_addrs[-1] and \
                    target_addr == irsb.addr + irsb.size:
                jumpkind = "Ijk_Boring"
    
            if target_addr is None:
                # The target address is not a concrete value
    
                if jumpkind == "Ijk_Ret":
                    # This block ends with a return instruction.
                    if current_function_addr != -1:
                        self._function_exits[current_function_addr].add(addr)
                        self._function_add_return_site(addr, current_function_addr)
                        self.functions[current_function_addr].returning = True
                        self._pending_jobs.add_returning_function(current_function_addr)
    
                    cfg_node.has_return = True
    
                elif self._resolve_indirect_jumps and \
                        (jumpkind in ('Ijk_Boring', 'Ijk_Call', 'Ijk_InvalICache') or jumpkind.startswith('Ijk_Sys')):
                    # This is an indirect jump. Try to resolve it.
                    # FIXME: in some cases, a statementless irsb will be missing its instr addresses
                    # and this next part will fail. Use the real IRSB instead
                    irsb = cfg_node.block.vex
                    cfg_node.instruction_addrs = irsb.instruction_addresses
                    resolved, resolved_targets, ij = self._indirect_jump_encountered(addr, cfg_node, irsb,
                                                                                     current_function_addr, stmt_idx)
                    if resolved:
                        for resolved_target in resolved_targets:
                            if jumpkind == 'Ijk_Call':
                                jobs += self._create_job_call(cfg_node.addr, irsb, cfg_node, stmt_idx, ins_addr,
                                                              current_function_addr, resolved_target, jumpkind)
                            else:
                                edge = FunctionTransitionEdge(cfg_node, resolved_target, current_function_addr,
                                                              to_outside=False, stmt_idx=stmt_idx, ins_addr=ins_addr,
                                                              )
                                ce = CFGJob(resolved_target, current_function_addr, jumpkind,
                                            last_addr=resolved_target, src_node=cfg_node, src_stmt_idx=stmt_idx,
                                            src_ins_addr=ins_addr, func_edges=[ edge ],
                                            )
                                jobs.append(ce)
                        return jobs
    
                    if jumpkind in ("Ijk_Boring", 'Ijk_InvalICache'):
                        resolved_as_plt = False
    
                        if irsb and self._heuristic_plt_resolving:
                            # Test it on the initial state. Does it jump to a valid location?
                            # It will be resolved only if this is a .plt entry
                            resolved_as_plt = self._resolve_plt(addr, irsb, ij)
    
                            if resolved_as_plt:
                                jump_target = next(iter(ij.resolved_targets))
                                target_func_addr = jump_target  # TODO: FIX THIS
    
                                edge = FunctionTransitionEdge(cfg_node, jump_target, current_function_addr,
                                                              to_outside=True, dst_func_addr=jump_target,
                                                              stmt_idx=stmt_idx, ins_addr=ins_addr,
                                                              )
                                ce = CFGJob(jump_target, target_func_addr, jumpkind, last_addr=jump_target,
                                            src_node=cfg_node, src_stmt_idx=stmt_idx, src_ins_addr=ins_addr,
                                            func_edges=[edge],
                                            )
                                jobs.append(ce)
    
                        if resolved_as_plt:
                            # has been resolved as a PLT entry. Remove it from indirect_jumps_to_resolve
                            if ij.addr in self._indirect_jumps_to_resolve:
                                self._indirect_jumps_to_resolve.remove(ij.addr)
                                self._deregister_analysis_job(current_function_addr, ij)
                        else:
                            # add it to indirect_jumps_to_resolve
                            self._indirect_jumps_to_resolve.add(ij)
    
                            # register it as a job for the current function
                            self._register_analysis_job(current_function_addr, ij)
    
                    else:  # jumpkind == "Ijk_Call" or jumpkind.startswith('Ijk_Sys')
                        self._indirect_jumps_to_resolve.add(ij)
                        self._register_analysis_job(current_function_addr, ij)
    
                        jobs += self._create_job_call(addr, irsb, cfg_node, stmt_idx, ins_addr, current_function_addr, None,
                                                      jumpkind, is_syscall=is_syscall
                                                      )
    
            elif target_addr is not None:
                # This is a direct jump with a concrete target.
    
                # pylint: disable=too-many-nested-blocks
                if jumpkind in ('Ijk_Boring', 'Ijk_InvalICache'):
                    # if the target address is at another section, it has to be jumping to a new function
                    if not self._addrs_belong_to_same_section(addr, target_addr):
                        target_func_addr = target_addr
                        to_outside = True
                    else:
                        # it might be a jumpout
                        target_func_addr = None
                        real_target_addr = get_real_address_if_arm(self.project.arch, target_addr)
                        if real_target_addr in self._traced_addresses:
                            node = self.model.get_any_node(target_addr)
                            if node is not None:
                                target_func_addr = node.function_address
                        if target_func_addr is None:
                            target_func_addr = current_function_addr
    
                        to_outside = not target_func_addr == current_function_addr
    
                    edge = FunctionTransitionEdge(cfg_node, target_addr, current_function_addr,
                                                  to_outside=to_outside,
                                                  dst_func_addr=target_func_addr,
                                                  ins_addr=ins_addr,
                                                  stmt_idx=stmt_idx,
                                                  )
    
                    ce = CFGJob(target_addr, target_func_addr, jumpkind, last_addr=addr, src_node=cfg_node,
                                src_ins_addr=ins_addr, src_stmt_idx=stmt_idx, func_edges=[ edge ])
                    jobs.append(ce)
    
                elif jumpkind == 'Ijk_Call' or jumpkind.startswith("Ijk_Sys"):
                    jobs += self._create_job_call(addr, irsb, cfg_node, stmt_idx, ins_addr, current_function_addr,
                                                  target_addr, jumpkind, is_syscall=is_syscall
                                                  )
    
                else:
                    # TODO: Support more jumpkinds
                    l.debug("Unsupported jumpkind %s", jumpkind)
                    l.debug("Instruction address: %#x", ins_addr)
    
            return jobs
    
    0x7 时间消耗分析
    • 例子: cgibin (D-link DIR 815路由器固件)
    • 总时间消耗: 32s (基本上30s全是构建cfg的时间)
      • _job_queue_empty 大约占了总时间1/3的时间消耗(10s)
      • scan_block 大约占了总时间2/3的时间消耗(20s)
        • 其中最耗时的函数是_create_jobs函数,大约占用了16s
        • _create_jobs函数主要是处理直接调用和间接调用,间接调用几乎占了_create_jobs函数100%的时间消耗
    3. 解析间接调用
    0x1 process_unresolved_indirect_jumps
    • Ijk_Call 类型表示地址是由前一个基本块传递过来
    • Ijk_Boring类型表示是一个jump table类型的
        def _process_unresolved_indirect_jumps(self):
            """
            Resolve all unresolved indirect jumps found in previous scanning.
    
            Currently we support resolving the following types of indirect jumps:
            - Ijk_Call: indirect calls where the function address is passed in from a proceeding basic block
            - Ijk_Boring: jump tables
            - For an up-to-date list, see analyses/cfg/indirect_jump_resolvers
    
            :return:    A set of concrete indirect jump targets (ints).
            :rtype:     set
            """
    
            l.info("%d indirect jumps to resolve.", len(self._indirect_jumps_to_resolve))
    
            all_targets = set()
            for idx, jump in enumerate(self._indirect_jumps_to_resolve):  # type:int,IndirectJump
                if self._low_priority:
                    self._release_gil(idx, 20, 0.0001)
                all_targets |= self._process_one_indirect_jump(jump)
    
            self._indirect_jumps_to_resolve.clear()
    
            return all_targets
    
    0x2 process_one_indirect_jump

    ​ 使用angr.analyses.cfg.indirect_jump_resolvers.jumptable.JumpTableResolver求解器来求解间接调用。

    def _process_one_indirect_jump(self, jump):
        """
        Resolve a given indirect jump.
    
        :param IndirectJump jump:  The IndirectJump instance.
        :return:        A set of resolved indirect jump targets (ints).
        """
    
        resolved = False
        resolved_by = None
        targets = None
    
        block = self._lift(jump.addr, opt_level=1)
    
        for resolver in self.indirect_jump_resolvers:
            resolver.base_state = self._base_state
    
            if not resolver.filter(self, jump.addr, jump.func_addr, block, jump.jumpkind):
                continue
    
            resolved, targets = resolver.resolve(self, jump.addr, jump.func_addr, block, jump.jumpkind)
            if resolved:
                resolved_by = resolver
                break
    
        if resolved:
            self._indirect_jump_resolved(jump, jump.addr, resolved_by, targets)
        else:
            self._indirect_jump_unresolved(jump)
    
        return set() if targets is None else set(targets)
    
    0x3 indirect_jump_encountered

    ​ 当遇到间接跳转时调用。将尝试使用不受时间限制(快速)的间接跳转解析器来解决这个间接跳转。如果无法解决,将查看以前是否已经解决了这个间接跳转。

        def _indirect_jump_encountered(self, addr, cfg_node, irsb, func_addr, stmt_idx=DEFAULT_STATEMENT):
            """
            Called when we encounter an indirect jump. We will try to resolve this indirect jump using timeless (fast)
            indirect jump resolvers. If it cannot be resolved, we will see if this indirect jump has been resolved before.
    
            :param int addr:                Address of the block containing the indirect jump.
            :param cfg_node:                The CFGNode instance of the block that contains the indirect jump.
            :param pyvex.IRSB irsb:         The IRSB instance of the block that contains the indirect jump.
            :param int func_addr:           Address of the current function.
            :param int or str stmt_idx:     ID of the source statement.
    
            :return:    A 3-tuple of (whether it is resolved or not, all resolved targets, an IndirectJump object
                        if there is one or None otherwise)
            :rtype:     tuple
            """
    
            jumpkind = irsb.jumpkind
            l.debug('IRSB %#x has an indirect jump (%s) as its default exit.', addr, jumpkind)
    
            # try resolving it fast
            resolved, resolved_targets = self._resolve_indirect_jump_timelessly(addr, irsb, func_addr, jumpkind)
            if resolved:
                l.debug("Indirect jump at block %#x is resolved by a timeless indirect jump resolver. "
                        "%d targets found.", addr, len(resolved_targets))
                return True, resolved_targets, None
    
            l.debug("Indirect jump at block %#x cannot be resolved by a timeless indirect jump resolver.", addr)
    
            # Add it to our set. Will process it later if user allows.
            # Create an IndirectJump instance
            if addr not in self.indirect_jumps:
                if self.project.arch.branch_delay_slot:
                    ins_addr = cfg_node.instruction_addrs[-2]
                else:
                    ins_addr = cfg_node.instruction_addrs[-1]
                ij = IndirectJump(addr, ins_addr, func_addr, jumpkind, stmt_idx, resolved_targets=[])
                self.indirect_jumps[addr] = ij
                resolved = False
            else:
                ij = self.indirect_jumps[addr]  # type: IndirectJump
                resolved = len(ij.resolved_targets) > 0
    
            return resolved, ij.resolved_targets, ij
    
    0x4 resolve_indirect_jump_timelessly

    ​ 会调用angr.analyses.cfg.indirect_jump_resolvers.mips_elf_fast.MipsElfFastResolver来求解间接调用。

        def _resolve_indirect_jump_timelessly(self, addr, block, func_addr, jumpkind):
            """
            Checks if MIPS32 and calls MIPS32 check, otherwise false
    
            :param int addr: irsb address
            :param pyvex.IRSB block: irsb
            :param int func_addr: Function address
            :return: If it was resolved and targets alongside it
            :rtype: tuple
            """
    
            if block.statements is None:
                block = self.project.factory.block(block.addr, size=block.size).vex
    
            for res in self.timeless_indirect_jump_resolvers:
                if res.filter(self, addr, func_addr, block, jumpkind):
                    r, resolved_targets = res.resolve(self, addr, func_addr, block, jumpkind)
                    if r:
                        return True, resolved_targets
            return False, [ ]
    

    相关文章

      网友评论

        本文标题:Angr 中的函数识别

        本文链接:https://www.haomeiwen.com/subject/qrzacctx.html