#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import print_function import datetime import threading import xmlrpclib from SimpleXMLRPCServer import SimpleXMLRPCServer from xml.sax.saxutils import escape import idaapi import idautils import idc # Wait for any processing to get done idaapi.autoWait() # On Windows with NTFS filesystem a filepath with ':' # is treated as NTFS ADS (Alternative Data Stream) # and so saving file with such name fails dt = datetime.datetime.now().isoformat().replace(':', '-') # Save the database so nothing gets lost. if idaapi.IDA_SDK_VERSION >= 700: idaapi.save_database(idc.GetIdbPath() + '.' + dt) else: idc.SaveBase(idc.GetIdbPath() + '.' + dt) DEBUG_MARSHALLING = False def create_marshaller(use_format=None, just_to_str=False): assert use_format or just_to_str, 'Either pass format to use or make it converting the value to str.' def wrapper(_marshaller, value, appender): if use_format: marshalled = use_format % value elif just_to_str: marshalled = '%s' % escape(str(value)) if DEBUG_MARSHALLING: print("Marshalled: '%s'" % marshalled) appender(marshalled) return wrapper xmlrpclib.Marshaller.dispatch[type(0L)] = create_marshaller("%d") xmlrpclib.Marshaller.dispatch[type(0)] = create_marshaller("%d") xmlrpclib.Marshaller.dispatch[idaapi.cfuncptr_t] = create_marshaller(just_to_str=True) host = '127.0.0.1' port = 31337 orig_LineA = idc.LineA def LineA(*a, **kw): v = orig_LineA(*a, **kw) if v and v.startswith('\x01\x04; '): v = v[4:] return v idc.LineA = LineA mutex = threading.Condition() def wrap(f): def wrapper(*a, **kw): rv = [] error = [] def work(): try: result = f(*a, **kw) rv.append(result) except Exception as e: error.append(e) with mutex: flags = idaapi.MFF_WRITE if f == idc.SetColor: flags |= idaapi.MFF_NOWAIT rv.append(None) idaapi.execute_sync(work, flags) if error: msg = 'Failed on calling {}.{} with args: {}, kwargs: {}\nException: {}' \ .format(f.__module__, f.__name__, a, kw, str(error[0])) print('[!!!] ERROR:', msg) raise error[0] return rv[0] return wrapper def register_module(module): for name, function in module.__dict__.items(): if hasattr(function, '__call__'): server.register_function(wrap(function), name) def decompile(addr): """ Function that overwrites `idaapi.decompile` for xmlrpc so that instead of throwing an exception on `idaapi.DecompilationFailure` it just returns `None`. (so that we don't have to parse xmlrpc Fault's exception string on pwndbg side as it differs between IDA versions). """ try: return idaapi.decompile(addr) except idaapi.DecompilationFailure: return None def get_decompile_coord_by_ea(cfunc, addr): if idaapi.IDA_SDK_VERSION >= 720: item = cfunc.body.find_closest_addr(addr) y_holder = idaapi.int_pointer() if not cfunc.find_item_coords(item, None, y_holder): return cfunc y = y_holder.value() else: lnmap = {} for i, line in enumerate(cfunc.pseudocode): phead = idaapi.ctree_item_t() pitem = idaapi.ctree_item_t() ptail = idaapi.ctree_item_t() ret = cfunc.get_line_item(line.line, 0, True, phead, pitem, ptail) if ret and pitem.it: lnmap[pitem.it.ea] = i y = -1 closest_ea = BADADDR for ea,line in lnmap.items(): if closest_ea == BADADDR or abs(closest_ea - addr) > abs(ea - addr): closest_ea = ea y = lnmap[ea] return y def decompile_context(addr, context_lines): cfunc = decompile(addr) if cfunc is None: return None y = get_decompile_coord_by_ea(cfunc, addr) lines = cfunc.get_pseudocode() retlines = [] for lnnum in range(max(0, y - context_lines), min(len(lines), y + context_lines)): retlines.append(idaapi.tag_remove(lines[lnnum].line)) if lnnum == y: retlines[-1] = '>' + retlines[-1][1:] return '\n'.join(retlines) def versions(): """Returns IDA & Python versions""" import sys return { 'python': sys.version, 'ida': idaapi.get_kernel_version(), 'hexrays': idaapi.get_hexrays_version() if idaapi.init_hexrays_plugin() else None } server = SimpleXMLRPCServer((host, port), logRequests=True, allow_none=True) register_module(idc) register_module(idautils) register_module(idaapi) server.register_function(lambda a: eval(a, globals(), locals()), 'eval') server.register_function(wrap(decompile)) # overwrites idaapi/ida_hexrays.decompile server.register_function(wrap(decompile_context), 'decompile_context') # support context decompile server.register_function(versions) server.register_introspection_functions() print('IDA Pro xmlrpc hosted on http://%s:%s' % (host, port)) print('Call `shutdown()` to shutdown the IDA Pro xmlrpc server.') thread = threading.Thread(target=server.serve_forever) thread.daemon = True thread.start() def shutdown(): global server global thread server.shutdown() server.server_close() del server del thread