标题: Angr符号执行练习--对付OLLVM Control Flow Flattening/控制流平坦化 创建: 2025-07-31 12:22 更新: 2025-08-08 11:06 链接: https://scz.617.cn/unix/202507311222.txt -------------------------------------------------------------------------- 目录: ☆ 背景介绍 ☆ hello.c ☆ hello_fla ☆ hello_fla_patch.py ☆ IDAPython插件D810 1) 安装D810插件 2) 使用D810插件 3) 借助AI理解D810框架结构 ☆ 后记 -------------------------------------------------------------------------- ☆ 背景介绍 参看 -------------------------------------------------------------------------- Control Flow Flattening (CFF) https://github.com/obfuscator-llvm/obfuscator/wiki/Control-Flow-Flattening Deobfuscation: recovering an OLLVM-protected program - Francis Gabriel [2014-12-04] https://blog.quarkslab.com/deobfuscation-recovering-an-ollvm-protected-program.html -------------------------------------------------------------------------- 控制流平坦化将正常控制流转换成状态变量驱动,夹杂状态变量混淆,使得IDA F5结 果非人类可读,但不影响原有代码逻辑,顶多有些性能损耗。CFF目的是对抗静态分 析。 本文以学习angr进阶用法为目的,借CFF反混淆为靶标。 $ pip3 show angr | grep Version Version: 9.2.125.dev0 ☆ hello.c -------------------------------------------------------------------------- #include #include static unsigned int foo ( unsigned int n ) { unsigned int mod = n % 4; unsigned int ret = 0; if ( mod == 0 ) { ret = ( n | 0xbaaad0bf ) * ( 2 ^ n ); } else if ( mod == 1 ) { ret = ( n & 0xbaaad0bf ) * ( 3 + n ); } else if ( mod == 2 ) { ret = ( n ^ 0xbaaad0bf ) * ( 4 | n ); } else { ret = ( n + 0xbaaad0bf ) * ( 5 & n ); } return ret; } int main ( int argc, char * argv[] ) { unsigned int n, ret; if ( argc < 2 ) { fprintf( stderr, "Usage: %s \n", argv[0] ); return -1; } n = (unsigned int)strtoul( argv[1], NULL, 0 ); ret = foo( n ); fprintf( stdout, "n=%#x ret=%#x\n", n, ret ); return 0; } -------------------------------------------------------------------------- clang -pipe -O0 -s -mllvm -passes=fla -o hello_fla hello.c 用某版OLLVM启用fla编译,得到hello_fla。 完整测试用例打包 https://scz.617.cn/unix/202507311222.txt https://scz.617.cn/unix/202507311222.7z ☆ hello_fla $ file -b hello_fla ELF 64-bit LSB executable, x86-64, ..., stripped IDA64反汇编hello_fla -------------------------------------------------------------------------- __int64 __fastcall main(int a1, char **a2, char **a3) { int v3; int v5; unsigned int v6; unsigned int v7; unsigned int v8; v8 = 0; v5 = 0xF2AB2D56; while ( 1 ) { while ( v5 == 0xA6509F46 ) { fprintf(stderr, "Usage: %s \n", *a2); v8 = -1; v5 = 0xE926118E; } if ( v5 == 0xE926118E ) break; if ( v5 == 0xF2AB2D56 ) { v3 = 0x64B7B86A; if ( a1 < 2 ) v3 = 0xA6509F46; v5 = v3; } else { v7 = strtoul(a2[1], 0LL, 0); v6 = sub_401270(v7); fprintf(stdout, "n=%#x ret=%#x\n", v7, v6); v8 = 0; v5 = 0xE926118E; } } return v8; } __int64 __fastcall sub_401270(int a1) { int v1; int v2; int v3; int v5; unsigned int v6; int v7; v7 = a1 & 3; v6 = 0; v5 = 0x1D861884; while ( v5 != 0x95AD57E0 ) { switch ( v5 ) { case 0xBBF4F2F8: v6 = (a1 + 3) * (a1 & 0xBAAAD0BF); v5 = 0xCE9DE31; break; case 0xC915711E: v3 = 0x54BE0661; if ( v7 == 2 ) v3 = 0xD6927C4E; v5 = v3; break; case 0xD6927C4E: v6 = (a1 | 4) * (a1 ^ 0xBAAAD0BF); v5 = 0x5B1C3258; break; case 0xCE9DE31: v5 = 0x95AD57E0; break; case 0x134D92DC: v6 = (a1 ^ 2) * (a1 | 0xBAAAD0BF); v5 = 0x95AD57E0; break; case 0x1D861884: v1 = 0x1F74CBC8; if ( (a1 & 3) == 0 ) v1 = 0x134D92DC; v5 = v1; break; case 0x1F74CBC8: v2 = 0xC915711E; if ( v7 == 1 ) v2 = 0xBBF4F2F8; v5 = v2; break; case 0x54BE0661: v6 = (a1 & 5) * (a1 - 0x45552F41); v5 = 0x5B1C3258; break; default: v5 = 0xCE9DE31; break; } } return v6; } -------------------------------------------------------------------------- F5的伪代码没必要深究,看个大概即可。 ☆ hello_fla_patch.py 这是对付hello_fla的完整代码,演示性质,非通用实现。 hello_fla_patch.py实际源自 https://github.com/cq674350529/deflat/blob/master/flat_control_flow/deflat.py am_graph模块实际源自 https://github.com/angr/angr-management/blob/master/angrmanagement/utils/graph.py -------------------------------------------------------------------------- #!/usr/bin/env python # -*- encoding: utf-8 -*- import sys, collections import angr, claripy, pyvex import am_graph def get_func_from_addr ( proj, addr ) : try : return proj.kb.functions.get_by_addr( addr ) except KeyError : return proj.kb.functions.floor_func( addr ) # # threshold是所有"有效块"中最小字节长度 # def get_relevant_nop_nodes ( supergraph, pre_dispatcher_node, prologue_node, retn_node, threshold ) : # # relevant_nodes = list( supergraph.predecessors( pre_dispatcher_node ) ) # relevant_nodes = [] nop_nodes = [] for node in supergraph.nodes() : if node.addr in ( prologue_node.addr, retn_node.addr, pre_dispatcher_node.addr ) : continue # # 靠threshold快速过滤、排除"非有效块" # if supergraph.has_edge( node, pre_dispatcher_node ) and node.size > threshold : relevant_nodes.append( node ) else : nop_nodes.append( node ) return relevant_nodes, nop_nodes # # 通过符号执行寻找下一跳 # # 若CFF的状态转换变量在某个case中不是简单赋值,而是与当前case值进行计算所 # 得,下面的符号执行方案可能有问题,涉及如何初始化的问题。 # def symbolic_execution ( proj, keep_blocks, start_addr, hook_addrs, set_value=None ) : def retn_procedure ( state ) : # # ip = state.solver.eval( state.regs.ip ) # proj.unhook( ip ) # proj.unhook( state.addr ) return def statement_inspect ( state ) : # # state.scratch.irsb是正在处理的IR SuperBlock (IRSB)的VEX IR表示。 # 一个IRSB包含一系列IR语句。 # # 当'statement'类型的检查点触发时,在state.inspect.statement中存储 # 当前正在处理的IR语句的索引。 # # state.scratch.irsb.statements[]是个数组 # state.inspect.statement是int,是个索引 # # 每个IR语句可能包含一个或多个表达式(expressions) # expressions = list( state.scratch.irsb.statements[state.inspect.statement].expressions ) # # if...then...else # if len( expressions ) != 0 and isinstance( expressions[0], pyvex.expr.ITE ) : # # state.scratch.temps[]用于存储VEX IR临时变量值 # # ITE表达式的cond属性代表条件表达式本身。下面这个值决定ITE表达 # 式走then分支还是else分支。 # state.scratch.temps[expressions[0].cond.tmp] = set_value state.inspect._breakpoints['statement'] = [] if hook_addrs : for addr in hook_addrs : # # 假设call指令占5字节 # proj.hook( addr, retn_procedure, length=5 ) init_state = proj.factory.blank_state( addr = start_addr, add_options = { angr.options.SYMBOL_FILL_UNCONSTRAINED_MEMORY, angr.options.SYMBOL_FILL_UNCONSTRAINED_REGISTERS, angr.options.BYPASS_UNSUPPORTED_SYSCALL, }, remove_options = { angr.options.LAZY_SOLVES, } ) if set_value is not None : init_state.inspect.b( 'statement', when=angr.BP_BEFORE, action=statement_inspect ) sm = proj.factory.simulation_manager( init_state ) sm.step() while len( sm.active ) > 0 : for state in sm.active : if state.addr in keep_blocks : return state.addr sm.step() return None # # 恢复控制流 # def get_flow ( proj, prologue_node, relevant_nodes, retn_node ) : # # 本例实测下来,target不含序言块也可以,但稳妥起见,还是含序言块 # symbolic_execution_target \ = [prologue_node] symbolic_execution_target.extend( relevant_nodes ) keep_blocks = [node.addr for node in relevant_nodes] # # keep_blocks包含返回块,不包含主分发器、预处理器 # keep_blocks.extend( [retn_node.addr,] ) print( f'keep_blocks[{len(keep_blocks)}]:' ) for i, addr in enumerate( keep_blocks ) : print( f'[{i}] {addr:#x}' ) # # 从list转成set,提高后面的检查效率,实测并不明显 # keep_blocks = set( keep_blocks ) flow = collections.defaultdict( list ) ins_dict = {} for node in symbolic_execution_target : block = proj.factory.block( node.addr, size=node.size ) has_branch = False hook_addrs = set() for ins in block.capstone.insns : if ins.mnemonic.startswith( 'cmov' ) : # # only record the first one # if node not in ins_dict : ins_dict[node] = ins # # 发现comv*系列 # has_branch = True elif ins.mnemonic.startswith( 'call' ) : hook_addrs.add( ins.address ) if has_branch : next_addr = symbolic_execution( proj, keep_blocks, node.addr, hook_addrs, claripy.BVV( 1, 1 ) ) if next_addr is not None : flow[node].append( next_addr ) next_addr = symbolic_execution( proj, keep_blocks, node.addr, hook_addrs, claripy.BVV( 0, 1 ) ) if next_addr is not None : flow[node].append( next_addr ) else : next_addr = symbolic_execution( proj, keep_blocks, node.addr, hook_addrs ) if next_addr is not None : flow[node].append( next_addr ) return ( flow, ins_dict, ) OPCODES = { 'a' : b'\x87', 'ae' : b'\x83', 'b' : b'\x82', 'be' : b'\x86', 'c' : b'\x82', 'e' : b'\x84', 'z' : b'\x84', 'g' : b'\x8f', 'ge' : b'\x8d', 'l' : b'\x8c', 'le' : b'\x8e', 'na' : b'\x86', 'nae' : b'\x82', 'nb' : b'\x83', 'nbe' : b'\x87', 'nc' : b'\x83', 'ne' : b'\x85', 'ng' : b'\x8e', 'nge' : b'\x8c', 'nl' : b'\x8d', 'nle' : b'\x8f', 'no' : b'\x81', 'np' : b'\x8b', 'ns' : b'\x89', 'nz' : b'\x85', 'o' : b'\x80', 'p' : b'\x8a', 'pe' : b'\x8a', 'po' : b'\x8b', 's' : b'\x88', 'jmp' : b'\xe9', 'j' : b'\x0f', 'nop' : b'\x90', } def fill_nop ( proj, buf, addr, size ) : off = proj.loader.main_object.addr_to_offset( addr ) buf[off:off+size] \ = OPCODES['nop'] * size def get_j_ins ( f_addr, t_addr, j_type ) : if 'jmp' == j_type : j_opcode = OPCODES['jmp'] j_size = 5 else : j_opcode = OPCODES['j'] + OPCODES[j_type] j_size = 6 j_off = t_addr - f_addr - j_size # # struct.pack( ' %#x' % ( ins.address, children[0] ) ) assert ins.size >= len( j_ins ) # # 此处NOP化并非必要,保守起见,建议NOP化 # fill_nop( proj, buf, ins.address, ins.size ) patch_ins( proj, buf, ins.address, j_ins ) else : ins = ins_dict[parent] # # patch instructions starting from cmovx to the end of block # fill_nop( proj, buf, ins.address, parent.addr + parent.size - ins.address ) # # patch the cmovx to jx # j_ins = get_j_ins( ins.address, children[0], ins.mnemonic[len('cmov'):] ) # print( '%#x => %#x, %#x' % ( ins.address, children[0], children[1] ) ) # # 确保"jx+jmp"不会越过block边界 # assert 6 + 5 <= parent.addr + parent.size - ins.address patch_ins( proj, buf, ins.address, j_ins ) # # patch the next instruction to jmp instrcution # j_ins = get_j_ins( ins.address+6, children[1], 'jmp' ) patch_ins( proj, buf, ins.address+6, j_ins ) def dosth ( proj, buf, addr ) : print( f'func {addr:#x}' ) func = get_func_from_addr( proj, addr ) # # A super transition graph is a graph that looks like IDA CFG, where # calls to returning functions do not terminate basic blocks. # supergraph = am_graph.to_supergraph( func.transition_graph ) # # get prologue_node and retn_node # prologue_node = None retn_node = None for node in supergraph.nodes() : if 0 == supergraph.in_degree( node ) : prologue_node = node if 0 == supergraph.out_degree( node ) and len( node.out_branches ) == 0 : retn_node = node assert prologue_node is not None assert retn_node is not None print( 'prologue_node: %#x' % prologue_node.addr ) print( 'retn_node: %#x' % retn_node.addr ) # # 序言的后继为主分发器 # main_dispatcher_node \ = list( supergraph.successors( prologue_node ) )[0] print( 'main_dispatcher_node: %#x' % main_dispatcher_node.addr ) pre_dispatcher_node \ = None # # 后继为主分发器的非序言块为预处理器 # for node in supergraph.predecessors( main_dispatcher_node ) : if node.addr != prologue_node.addr : pre_dispatcher_node = node break assert pre_dispatcher_node is not None print( 'pre_dispatcher_node: %#x' % pre_dispatcher_node.addr ) relevant_nodes, nop_nodes \ = get_relevant_nop_nodes( supergraph, pre_dispatcher_node, prologue_node, retn_node, 12 ) flow, ins_dict = get_flow( proj, prologue_node, relevant_nodes, retn_node ) print( f'flow[{len(flow)}]:' ) for i, ( k, v ) in enumerate( flow.items() ) : print( '[%d] %#x ->' % ( i, k.addr ), [hex(child) for child in v] ) # # 修改buf # patch_buf( proj, buf, nop_nodes, flow, ins_dict ) print( '' ) def main ( argv ) : base_addr = 0x400000 # # proj.arch.name == 'AMD64' # proj = angr.Project( argv[1], load_options = { 'auto_load_libs' : False, 'main_opts' : { 'base_addr' : base_addr } } ) cfg = proj.analyses.CFG( force_smart_scan = False, force_complete_scan = True, normalize = True, resolve_indirect_jumps \ = True, fail_fast = True ) with open( argv[1], 'rb' ) as f : buf = bytearray( f.read() ) origsize = len( buf ) # # 对应若干需要Patch的函数 # addrlist = ( 0x401140, 0x401270, ) for addr in addrlist : dosth( proj, buf, addr ) # # 防止误增加buf长度 # assert len( buf ) == origsize with open( argv[2], 'wb' ) as f : f.write( buf ) if "__main__" == __name__ : main( sys.argv ) -------------------------------------------------------------------------- $ python3 hello_fla_patch.py hello_fla hello_fla_new func 0x401140 prologue_node: 0x401140 retn_node: 0x40125e main_dispatcher_node: 0x401169 pre_dispatcher_node: 0x401267 keep_blocks[4]: [0] 0x4011d8 [1] 0x4011bd [2] 0x40120b [3] 0x40125e flow[4]: [0] 0x401140 -> ['0x4011bd'] [1] 0x4011d8 -> ['0x40125e'] [2] 0x4011bd -> ['0x4011d8', '0x40120b'] [3] 0x40120b -> ['0x40125e'] func 0x401270 prologue_node: 0x401270 retn_node: 0x401445 main_dispatcher_node: 0x40129a pre_dispatcher_node: 0x40144a keep_blocks[8]: [0] 0x4013b4 [1] 0x4013d4 [2] 0x4013ed [3] 0x40137b [4] 0x401360 [5] 0x40139b [6] 0x40140d [7] 0x401445 flow[8]: [0] 0x401270 -> ['0x401360'] [1] 0x4013b4 -> ['0x401445'] [2] 0x4013d4 -> ['0x4013ed', '0x40140d'] [3] 0x4013ed -> ['0x401445'] [4] 0x40137b -> ['0x401445'] [5] 0x401360 -> ['0x40137b', '0x40139b'] [6] 0x40139b -> ['0x4013b4', '0x4013d4'] [7] 0x40140d -> ['0x401445'] IDA64反汇编hello_fla_new_*,F5查看main、sub_401270,已能看出hello.c所展示 的代码逻辑。 ☆ IDAPython插件D810 参看 -------------------------------------------------------------------------- D810: Creating an extensible deobfuscation plugin for IDA Pro https://eshard.com/posts/d810-deobfuscation-ida-pro D810: A journey into control flow unflattening - Boris Batteaux https://eshard.com/posts/D810-a-journey-into-control-flow-unflattening https://gitlab.com/eshard/d810 -------------------------------------------------------------------------- D810是IDAPython插件,利用Hex-Rays微码技术反控制流平坦化。 1) 安装D810插件 安装过程就是复制D810.py及d810子目录到 X:\Green\IDA\plugins\ 需在IDAPython环境中安装z3-solver模块 2) 使用D810插件 呼出D810的GUI -------------------------------------------------------------------------- Edit->Plugins->D-810 (Ctrl+Shift+D) Current file loaded X:\Green\IDA\plugins\d810\conf\default_unflattening_ollvm.json Start 红色的"Not loaded"变成绿色的"Loaded" -------------------------------------------------------------------------- 对付hello_fla时,用default_unflattening_ollvm.json即可;一般要根据目标二进 制选相应的反CFF规则,不熟的就挨个试。选完规则,点击Start,启用规则,默认是 Stop状态。最后F5反编译目标函数。 使用D810插件默认会生成日志,保存在 X:\Green\IDA\plugins\d810_logs\ 假设增加自定义规则,需编辑两个文件 X:\Green\IDA\plugins\d810\conf\ options.json // 配置中增加"Test_Analysis.json"的条目 Test_Analysis.json // 自定义规则 3) 借助AI理解D810框架结构 参看 《让AI协助阅读代码》 https://scz.617.cn/misc/202503101024.txt 访问 https://gitseek.dev/zh 向它提交D810项目源码所在网址 https://gitlab.com/eshard/d810 得到合并后的D810项目源码 eshard-d810-prompt.txt 将上述文件上传至aistudio。依次提问: -------------------------------------------------------------------------- 这是D810的项目,我需要学习该项目。请用中文交流,解释该项目框架结构。 D810总是自动生成d810_logs,这个行为怎么控制? D810是不是有个什么配置,可以帮你分析样本中用到哪些pattern? 详细解释unflattener.py -------------------------------------------------------------------------- 我对D810作者姓名不敏感,据bluerust说,“看了一眼D810,大为震撼,赶紧搜了搜 作者,果然是巨佬”。 ☆ 后记 控制流平坦化与反控制流平坦化是场猫鼠游戏,本文只是入门级原理演示。现实世界 中有许多魔改过的CFF实现,源自中国大陆地区各大互联网厂商及黑灰产从业人员。 建议学习D810源码,了解Hex-Rays微码技术,针对魔改CFF实现,编写自定义规则对 抗之。