DDG¶
数据依赖图分析
标题没想好¶
ddg.graph 的节点是CodeLocation object
""" Constructor. :param int block_addr: 块地址 :param int stmt_idx: Statement ID. None for SimProcedures :param class sim_procedure: 对应的 SimProcedure class. :param int ins_addr: 指令地址. Optional. :param kwargs: 其他参数, will be stored, but not used in __eq__ or __hash__. """ <0x40137b id=0x40137b[3]> <$ins_addr id=$block_addr[$stmt_idx]>
class DDG(Analysis)¶
This is a fast data dependence graph directly generated from our CFG analysis result. The only reason for its existence is the speed. There is zero guarantee for being sound or accurate. You are supposed to use it only when you want to track the simplest data dependence, and you do not care about soundness or accuracy. For a better data dependence graph, please consider performing a better static analysis first (like Value-set Analysis), and then construct a dependence graph on top of the analysis result (for example, the VFG in angr). Also note that since we are using states from CFG, any improvement in analysis performed on CFG (like a points-to analysis) will directly benefit the DDG.
类DDG__init__¶
__init__(self, cfg, start=None, call_depth=None, block_addrs=None)
相关参数
cfg: Control flow graph. Please make sure each node has an associated `state` with it. You may want to generate your CFG with `keep_state=True`.(cfg with state) start: An address, Specifies where we start the generation of this data dependence graph.(分析ddg的起始地址) call_depth: None or integers. A non-negative integer specifies how deep we would like to track in the call tree. None disables call_depth limit.(调用深度限制) iterable or None block_addrs: A collection of block addresses that the DDG analysis should be performed on.(是什么再说)
重要变量
# 输入的参数相关 self._cfg = cfg self._start = self.project.entry if start is None else start self._call_depth = call_depth self._block_addrs = block_addrs # 结果存储 self._stmt_graph = networkx.DiGraph() self._data_graph = networkx.DiGraph() self._simplified_data_graph = None self._ast_graph = networkx.DiGraph() # A mapping of ProgramVariable to ASTs self._symbolic_mem_ops = set() # 每个函数的数据依赖图 self._function_data_dependencies = None self.view = DDGView(self._cfg, self, simplified=False) self.simple_view = DDGView(self._cfg, self, simplified=True) # 其他
Begin construction!
self._construct()
构造_construct(self)¶
""" Construct the data dependence graph. We track the following types of dependence: - (Intra-IRSB) temporary variable dependencies - 寄存器依赖 - 内存依赖, 尽管功能受限 追踪一下几种内存依赖: - (Intra-functional) Stack read/write. Trace changes of stack pointers inside a function, and the dereferences of stack pointers. - (Inter-functional) Stack read/write. - (Global) Static memory positions. Keep a map of all accessible memory positions to their source statements per function. After that, we traverse the CFG and link each pair of reads/writes together in the order of control-flow. We do not track the following types of memory access - Symbolic memory access Well, they cannot be tracked under fastpath mode (which is the mode we are generating the CTF) anyways. """
一些初始化操作
# 初始化worklist,这一步将要分析的CFG node 加入worklist_set,将DDGJob 加入worklist if self._start is None: # initial nodes are those nodes in CFG that has no in-degrees for n in self._cfg.graph.nodes(): if self._cfg.graph.in_degree(n) == 0: # Put it into the worklist job = DDGJob(n, 0) self._worklist_append(job, worklist, worklist_set) else: for n in self._cfg.get_all_nodes(self._start): job = DDGJob(n, 0) self._worklist_append(job, worklist, worklist_set) """ 增加一个CFGNode 和它的后继Node到work-list,这里同样要满足call-depth的限制 :param node_wrapper: The NodeWrapper instance to insert.DDGJob :param worklist: The work-list, which is a list. :param worklist_set: A set of all CFGNodes that are inside the work-list, 为了加速查找. It will be updated as well. :returns: A set of newly-inserted CFGNodes (not NodeWrapper instances). # # worklist.append(node_wrapper) DDGJob的实例 worklist_set.add(node_wrapper.cfg_node) 添加的是cfg node """
开始循环处理 分析的对象是每个CFGJob,CFGJob是node和call deepth的封装,操作的是node的final state,call deepth用作限制分析的深度
live_defs_per_node = {} while worklist: # pop 一个节点 ddg_job = worklist[0] l.debug("Processing %s.", ddg_job) node, call_depth = ddg_job.cfg_node, ddg_job.call_depth worklist = worklist[ 1 : ] worklist_set.remove(node) # 抓取所有的 final states. 通常超过一个 (对每一个 successor 都有一个state), 并处理它们 # process all of them final_states = node.final_states #为每个节点创建livedefinition对象实例,初步判定表示生命周期的数据定义 if node in live_defs_per_node: live_defs = live_defs_per_node[node] else: live_defs = LiveDefinitions() live_defs_per_node[node] = live_defs successing_nodes = list(self._cfg.graph.successors(node)) # 尝试把每个final state分配给对应的后继node,反之亦然 match_suc = defaultdict(bool) match_state = defaultdict(set) for suc in successing_nodes: matched = False for state in final_states: try: if state.solver.eval(state.ip) == suc.addr: # 判断这个state的跳转地址是否是后继node的地址 match_suc[suc.addr] = True match_state[state].add(suc) matched = True except (SimUnsatError, SimSolverModeError, ZeroDivisionError): # ignore matched = matched if not matched: break # 查看是否所有的final state都找到了它的后继node,以及后继node都有final state matches = len(match_suc) == len(successing_nodes) and len(match_state) == len(final_states) for state in final_states: if not matches and state.history.jumpkind == 'Ijk_FakeRet' and len(final_states) > 1: # Skip fakerets if there are other control flow transitions available continue new_call_depth = call_depth if state.history.jumpkind == 'Ijk_Call': new_call_depth += 1 elif state.history.jumpkind == 'Ijk_Ret': new_call_depth -= 1 if self._call_depth is not None and call_depth > self._call_depth: l.debug('Do not trace into %s due to the call depth limit', state.ip) continue new_defs = self._track(state, live_defs, node.irsb.statements if node.irsb is not None else None) #corresponding_successors = [n for n in successing_nodes if # not state.ip.symbolic and n.addr == state.solver.eval(state.ip)] #if not corresponding_successors: # continue changed = False # if every successor can be matched with one or more final states (by IP address), # only take over the LiveDefinition of matching states if matches: add_state_to_sucs = match_state[state] else: add_state_to_sucs = successing_nodes for successing_node in add_state_to_sucs: if (state.history.jumpkind == 'Ijk_Call' or state.history.jumpkind.startswith('Ijk_Sys')) and \ (state.ip.symbolic or successing_node.addr != state.solver.eval(state.ip)): suc_new_defs = self._filter_defs_at_call_sites(new_defs) else: suc_new_defs = new_defs if successing_node in live_defs_per_node: defs_for_next_node = live_defs_per_node[successing_node] else: defs_for_next_node = LiveDefinitions() live_defs_per_node[successing_node] = defs_for_next_node for var, code_loc_set in suc_new_defs.items(): # l.debug("Adding %d new definitions for variable %s.", len(code_loc_set), var) changed |= defs_for_next_node.add_defs(var, code_loc_set) if changed: if (self._call_depth is None) or \ (self._call_depth is not None and 0 <= new_call_depth <= self._call_depth): # Put all reachable successors back to our work-list again for successor in self._cfg.get_all_successors(node): nw = DDGJob(successor, new_call_depth) self._worklist_append(nw, worklist, worklist_set)
Variable <|Reg 16, 8B>
class SimRegisterVariable(SimVariable): __slots__ = ['reg', 'size', '_hash'] def __init__(self, reg_offset, size, ident=None, name=None, region=None, category=None): SimVariable.__init__(self, ident=ident, name=name, region=region, category=category) self.reg = reg_offset self.size = size self._hash = None def __repr__(self): ident_str = "[%s]" % self.ident if self.ident else "" region_str = hex(self.region) if isinstance(self.region, int) else self.region s = "<%s%s|Reg %s, %sB>" % (region_str, ident_str, self.reg, self.size) return s def __hash__(self): if self._hash is None: self._hash = hash(('reg', self.region, self.reg, self.size, self.ident)) return self._hash def __eq__(self, other): if isinstance(other, SimRegisterVariable): return self.ident == other.ident and \ self.reg == other.reg and \ self.size == other.size and \ self.region == other.region return False class SimVariable: __slots__ = ['ident', 'name', 'region', 'category'] def __init__(self, ident=None, name=None, region=None, category=None): """ :param ident: A unique identifier provided by user or the program. Usually a string. :param str name: Name of this variable. """ self.ident = ident self.name = name self.region = region if region is not None else "" self.category = category