diff --git a/binja_script.py b/binja_script.py index 71ac2cd52..7e0066aff 100644 --- a/binja_script.py +++ b/binja_script.py @@ -22,7 +22,7 @@ xmlrpc.client.MININT = -(10**100) host = os.environ.get("PWNDBG_BINJA_SERVER_HOST", "127.0.0.1") -port = int(os.environ.get("PWNDBG_BINJA_SERVER_PORT", "31337")) +port = int(os.environ.get("PWNDBG_BINJA_SERVER_PORT", "43717")) logger = binaryninja.log.Logger(0, "pwndbg-integration") diff --git a/docs/configuration/config.md b/docs/configuration/config.md index e43374fd9..cf6318f3a 100644 --- a/docs/configuration/config.md +++ b/docs/configuration/config.md @@ -214,7 +214,7 @@ Binary Ninja XML-RPC server port. -**Default:** 31337 +**Default:** 43717 ---------- @@ -684,7 +684,7 @@ Ida xmlrpc server port. -**Default:** 31337 +**Default:** 43718 ---------- diff --git a/docs/functions/index.md b/docs/functions/index.md index d9334a846..ae00892fd 100644 --- a/docs/functions/index.md +++ b/docs/functions/index.md @@ -127,7 +127,7 @@ returned. ``` {.python .no-copy} -bn_eval(expr: gdb.Value) -> int | None +bn_eval(expr: gdb.Value) -> int ``` @@ -144,7 +144,7 @@ This function cannot see stack local variables. #### Example ``` pwndbg> set integration-provider binja -Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337 +Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717 Set which provider to use for integration features to 'binja'. pwndbg> p/x $bn_eval("10+20") $6 = 0x30 @@ -168,7 +168,7 @@ $11 = 1 ``` {.python .no-copy} -bn_sym(name_val: gdb.Value) -> int | None +bn_sym(name_val: gdb.Value) -> int ``` @@ -180,7 +180,7 @@ but not stack local variables, use `bn_var` for that. #### Example ``` pwndbg> set integration-provider binja -Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337 +Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717 Set which provider to use for integration features to 'binja'. pwndbg> p main No symbol "main" in current context. @@ -196,7 +196,7 @@ Breakpoint 1 at 0x555555555645 ``` {.python .no-copy} -bn_var(name_val: gdb.Value) -> int | None +bn_var(name_val: gdb.Value) -> int ``` @@ -208,7 +208,7 @@ use `bn_sym` for that. #### Example ``` pwndbg> set integration-provider binja -Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337 +Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717 Set which provider to use for integration features to 'binja'. pwndbg> p user_choice No symbol "user_choice" in current context. @@ -384,7 +384,7 @@ This functions doesn't see stack local variables. #### Example ``` pwndbg> set integration-provider ida -Pwndbg successfully connected to Ida Pro xmlrpc: http://127.0.0.1:31337 +Pwndbg successfully connected to Ida Pro xmlrpc: http://127.0.0.1:43718 Set which provider to use for integration features to 'ida'. pwndbg> p main No symbol "main" in current context. diff --git a/docs/tutorials/decompiler-integration/ida-integration.md b/docs/tutorials/decompiler-integration/ida-integration.md index 15263bbd4..979e0c5b3 100644 --- a/docs/tutorials/decompiler-integration/ida-integration.md +++ b/docs/tutorials/decompiler-integration/ida-integration.md @@ -17,7 +17,7 @@ save-ida Save the ida database. pwndbg> config ida attachp-resolution-method how to determine the process to attach when multiple candidates exists 'ask' ida-rpc-host ida xmlrpc server address '127.0.0.1' -ida-rpc-port ida xmlrpc server port 31337 +ida-rpc-port ida xmlrpc server port 43718 ida-timeout time to wait for ida xmlrpc in seconds 2 pwndbg> | help function | grep ida function ida -- Lookup a symbol's address by name from IDA. diff --git a/ida_script.py b/ida_script.py index ce5abaa47..65a097d21 100644 --- a/ida_script.py +++ b/ida_script.py @@ -50,7 +50,7 @@ xmlclient.Marshaller.dispatch[int] = create_marshaller("%d None: """ Synchronize Binary Ninja's cursor with GDB diff --git a/pwndbg/commands/binja_functions.py b/pwndbg/commands/binja_functions.py index 8d47f2a1a..03106072c 100644 --- a/pwndbg/commands/binja_functions.py +++ b/pwndbg/commands/binja_functions.py @@ -13,8 +13,7 @@ from pwndbg.color import message @pwndbg.gdblib.functions.GdbFunction() -@pwndbg.integration.binja.with_bn() -def bn_sym(name_val: gdb.Value) -> int | None: +def bn_sym(name_val: gdb.Value) -> int: """ Lookup a symbol's address by name from Binary Ninja. @@ -24,7 +23,7 @@ def bn_sym(name_val: gdb.Value) -> int | None: Example: ``` pwndbg> set integration-provider binja - Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337 + Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717 Set which provider to use for integration features to 'binja'. pwndbg> p main No symbol "main" in current context. @@ -34,16 +33,21 @@ def bn_sym(name_val: gdb.Value) -> int | None: Breakpoint 1 at 0x555555555645 ``` """ + # GDB convenience functions are not allowed to return None, so we cannot + # decorate with @withBinja(). + if not pwndbg.integration.binja.establish_connection(): + return 0 + name = name_val.string() addr: int | None = pwndbg.integration.binja._bn.get_symbol_addr(name) if addr is None: - return None + print(message.error("Not found.")) + return 0 return pwndbg.integration.binja.r2l(addr) @pwndbg.gdblib.functions.GdbFunction() -@pwndbg.integration.binja.with_bn() -def bn_var(name_val: gdb.Value) -> int | None: +def bn_var(name_val: gdb.Value) -> int: """ Lookup a stack variable's address by name from Binary Ninja. @@ -53,7 +57,7 @@ def bn_var(name_val: gdb.Value) -> int | None: Example: ``` pwndbg> set integration-provider binja - Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337 + Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717 Set which provider to use for integration features to 'binja'. pwndbg> p user_choice No symbol "user_choice" in current context. @@ -67,12 +71,18 @@ def bn_var(name_val: gdb.Value) -> int | None: Error while executing Python code. ``` """ + # GDB convenience functions are not allowed to return None, so we cannot + # decorate with @withBinja(). + if not pwndbg.integration.binja.establish_connection(): + return 0 + name = name_val.string() conf_and_offset: Tuple[int, int] | None = pwndbg.integration.binja._bn.get_var_offset_from_sp( pwndbg.integration.binja.l2r(pwndbg.aglib.regs.pc), name ) if conf_and_offset is None: - return None + print(message.error("Not found.")) + return 0 (conf, offset) = conf_and_offset if conf < 64: print(message.warn(f"Warning: Stack offset only has {conf / 255 * 100:.2f}% confidence")) @@ -80,8 +90,8 @@ def bn_var(name_val: gdb.Value) -> int | None: @pwndbg.gdblib.functions.GdbFunction() -@pwndbg.integration.binja.with_bn() -def bn_eval(expr: gdb.Value) -> int | None: +@pwndbg.integration.binja.enabledBinja() +def bn_eval(expr: gdb.Value) -> int: """ Parse and evaluate a Binary Ninja expression. @@ -96,7 +106,7 @@ def bn_eval(expr: gdb.Value) -> int | None: Example: ``` pwndbg> set integration-provider binja - Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337 + Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717 Set which provider to use for integration features to 'binja'. pwndbg> p/x $bn_eval("10+20") $6 = 0x30 @@ -114,6 +124,11 @@ def bn_eval(expr: gdb.Value) -> int | None: $11 = 1 ``` """ + # GDB convenience functions are not allowed to return None, so we cannot + # decorate with @withBinja(). + if not pwndbg.integration.binja.establish_connection(): + return 0 + magic_vars = {} for r in pwndbg.aglib.regs.current: v = pwndbg.aglib.regs[r] @@ -121,4 +136,7 @@ def bn_eval(expr: gdb.Value) -> int | None: magic_vars[r] = v magic_vars["piebase"] = pwndbg.aglib.proc.binary_base_addr ret: int | None = pwndbg.integration.binja._bn.parse_expr(expr.string(), magic_vars) + if ret is None: + print(message.error("Not found.")) + return 0 return ret diff --git a/pwndbg/commands/ida.py b/pwndbg/commands/ida.py index 5dc01fd12..6f5bf321c 100644 --- a/pwndbg/commands/ida.py +++ b/pwndbg/commands/ida.py @@ -23,7 +23,7 @@ from pwndbg.gdblib.functions import GdbFunction ) @pwndbg.commands.OnlyWhenRunning @pwndbg.dbg.event_handler(EventType.STOP) -@pwndbg.integration.ida.withIDA +@pwndbg.integration.ida.enabledIDA def j(*args) -> None: """ Synchronize IDA's cursor with GDB @@ -99,9 +99,6 @@ def down(n=1) -> None: @pwndbg.integration.ida.withIDA def save_ida() -> None: """Save the IDA database""" - if not pwndbg.integration.ida.available(): - return - path = pwndbg.integration.ida.GetIdbPath() # Need to handle emulated paths for Wine @@ -138,9 +135,6 @@ def save_ida() -> None: os.unlink(full_path) -save_ida() - - def _ida_local(name: str) -> int | None: if not pwndbg.aglib.proc.alive: return None @@ -185,7 +179,7 @@ def ida(name: gdb.Value) -> int: Example: ``` pwndbg> set integration-provider ida - Pwndbg successfully connected to Ida Pro xmlrpc: http://127.0.0.1:31337 + Pwndbg successfully connected to Ida Pro xmlrpc: http://127.0.0.1:43718 Set which provider to use for integration features to 'ida'. pwndbg> p main No symbol "main" in current context. @@ -195,6 +189,11 @@ def ida(name: gdb.Value) -> int: Breakpoint 2 at 0x555555555645 ``` """ + # GDB convenience functions are not allowed to return None, so we cannot + # decorate with @withIda(). + if not pwndbg.integration.ida.establish_connection(): + return 0 + name = name.string() # Lookup local variables first diff --git a/pwndbg/integration/__init__.py b/pwndbg/integration/__init__.py index 32c785550..096e04399 100644 --- a/pwndbg/integration/__init__.py +++ b/pwndbg/integration/__init__.py @@ -65,7 +65,16 @@ class IntegrationProvider: """ return None + def disable(self) -> None: + """ + Notify the provider that it should disable itself. + """ + return None + +# This value should only be the name of the provider if we have a valid connection +# to the provider. I.e. if we fail to connect to the provider, we should set this to +# "none". provider_name = pwndbg.config.add_param( "integration-provider", "none", @@ -130,28 +139,24 @@ class ConfigurableProvider(IntegrationProvider): def get_stack_var_name(self, addr: int) -> str | None: return self.inner.get_stack_var_name(addr) + def disable(self) -> None: + return self.inner.disable() + provider: IntegrationProvider = IntegrationProvider() -@pwndbg.config.trigger(provider_name) -def switch_providers(): +def set_provider(prov: IntegrationProvider) -> None: + """ + Call this from provider-specific code whenever you establish a connection. + """ + global provider + provider = ConfigurableProvider(prov) + + +def unset_provider() -> None: + """ + Call this from provider-specific code whenever a connection stops. + """ global provider - if not provider_name.value or provider_name.value == "none": - provider = IntegrationProvider() - elif provider_name.value == "binja": - # do not import at start of file to avoid circular import - import pwndbg.integration.binja - - provider = ConfigurableProvider(pwndbg.integration.binja.BinjaProvider()) - elif provider_name.value == "ida": - import pwndbg.integration.ida - - provider = ConfigurableProvider(pwndbg.integration.ida.IdaProvider()) - else: - print( - message.warn( - f"Invalid provider {provider_name.value!r} specified. Disabling integration." - ) - ) - provider_name.revert_default() + provider = IntegrationProvider() diff --git a/pwndbg/integration/binja.py b/pwndbg/integration/binja.py index be4d23bd0..c0328f069 100644 --- a/pwndbg/integration/binja.py +++ b/pwndbg/integration/binja.py @@ -25,6 +25,7 @@ import pygments.formatters import pygments.style import pygments.token from typing_extensions import ParamSpec +from typing_extensions import override import pwndbg import pwndbg.aglib.arch @@ -51,7 +52,7 @@ from pwndbg.lib.functions import Function bn_rpc_host = pwndbg.config.add_param( "bn-rpc-host", "127.0.0.1", "Binary Ninja XML-RPC server host" ) -bn_rpc_port = pwndbg.config.add_param("bn-rpc-port", 31337, "Binary Ninja XML-RPC server port") +bn_rpc_port = pwndbg.config.add_param("bn-rpc-port", 43717, "Binary Ninja XML-RPC server port") bn_timeout = pwndbg.config.add_param( "bn-timeout", 2, "time to wait for Binary Ninja XML-RPC, in seconds" ) @@ -81,18 +82,36 @@ K = TypeVar("K") @pwndbg.decorators.only_after_first_prompt() @pwndbg.config.trigger(bn_rpc_host, bn_rpc_port, pwndbg.integration.provider_name, bn_timeout) -def init_bn_rpc_client() -> None: - global _bn, _bn_last_exception, _bn_last_connection_check +def binja_config_changed(): + if pwndbg.integration.provider_name.value == "binja": + # We need to (re)connect the client, possibly with updated values. + try_init_bn_rpc_client() - if pwndbg.integration.provider_name.value != "binja": - return + +def ensure_disabled() -> None: + global _bn + _bn = None + pwndbg.integration.unset_provider() + pwndbg.integration.provider_name.value = "none" + + +def try_init_bn_rpc_client() -> bool: + """ + Try to connect to the Binary Ninja RPC client. + + If the connection succeeds, or we were already connected, + return True. Otherwise, False. + + An appropriate message will be also printed to the user. + """ + global _bn, _bn_last_exception, _bn_last_connection_check xmlrpc.client.MAXINT = 10**100 # type: ignore[misc] xmlrpc.client.MININT = -(10**100) # type: ignore[misc] now = time.time() if _bn is None and (now - _bn_last_connection_check) < int(bn_timeout) + 5: - return + return False addr = f"http://{bn_rpc_host}:{bn_rpc_port}" @@ -102,21 +121,45 @@ def init_bn_rpc_client() -> None: exception = None # (type, value, traceback) try: version: str = _bn.get_version() + pwndbg.integration.set_provider(BinjaProvider()) + print( message.success( f"Pwndbg successfully connected to Binary Ninja ({version}) xmlrpc: {addr}" ) ) - except TimeoutError: + + if pwndbg.integration.provider_name.value != "binja": + # We managed to successfully connect, and this happened because a binja + # command was invoked, rather than the user setting the integration-provider parameter. + # So, we want to set the provider name now. + # Note that binja_config_changed() is a trigger, and not a value listener, so it won't + # be called when we set the value here (which is good, we would have recursion otherwise). + pwndbg.integration.provider_name.value = "binja" + + assert ( + len(pwndbg.config.triggers[pwndbg.integration.provider_name.name]) == 2 + ), "Does this new function need to be called here?" + + return True + + except (TimeoutError, xmlrpc.client.ProtocolError): exception = sys.exc_info() - _bn = None except OSError as e: - if e.errno != errno.ECONNREFUSED: + if e.errno == errno.ECONNREFUSED: + print( + message.error("Connection refused. ") + + message.hint("Did you start the plugin in Binary Ninja?") + # TODO: remove this after some time passes + + message.notice( + "\nIn the latest version of Pwndbg, the binja_script.py file has been\n" + ) + + message.notice("updated, you may need to reinstall it.") + ) + else: exception = sys.exc_info() - _bn = None - except xmlrpc.client.ProtocolError: - exception = sys.exc_info() - _bn = None + + ensure_disabled() if exception: if ( @@ -153,44 +196,75 @@ def init_bn_rpc_client() -> None: _bn_last_exception = exception and exception[1] _bn_last_connection_check = now + return False -def with_bn(fallback: K = None) -> Callable[[Callable[P, T]], Callable[P, T | K]]: - global _bn +# We cannot catch the ConnectionRefusedError here, nor in @withBinja because there +# may be multiple nested decorated functions, and the bottom most one will swallow +# the exception up prevent it from bubbling to the top. Thus, we catch +# ConnectionRefusedError in CommandObj.__call__(). +def enabledBinja(fallback: K = None) -> Callable[[Callable[P, T]], Callable[P, T | K]]: + """ + If we have a connection to binary ninja, call the function. - def decorator(func: Callable[P, T]) -> Callable[P, T | K]: - global _bn + Otherwise, return fallback. Thus, all functions decorated with this, that do + not specify a fallback, must have "| None" in their return signature. + + This will not try to open a connection if it doesn't already exist. + No messages will be printed. + """ + def decorator(func: Callable[P, T]) -> Callable[P, T | K]: @functools.wraps(func) def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | K: - global _bn if _bn is None: - init_bn_rpc_client() - - try: - if _bn is not None: - return func(*args, **kwargs) - except ConnectionRefusedError: - print(message.error("[!] Binary Ninja connection refused")) - _bn = None + assert pwndbg.integration.provider_name.value != "binja" + return fallback - return fallback + return func(*args, **kwargs) return wrapper return decorator -@pwndbg.lib.cache.cache_until("stop") -def available() -> bool: - return can_connect() is not None +def establish_connection() -> bool: + """ + If we already had a connection, or succeed in creating a new one, return True. + Otherwise False. + """ + if _bn is not None: + return True + + print(message.notice("Trying to connect to Binary Ninja...")) + ok = try_init_bn_rpc_client() + if not ok: + print(message.error("Aborting.")) + return False -@with_bn() -def can_connect() -> bool: return True +def withBinja(func: Callable[P, T]) -> Callable[P, T | None]: + """ + Try to connect to Binary Ninja before running the decorated function. + + If we fail connecting, return None. Thus, all functions + decorated with this must have "| None" in their return signature. + + Use this for user-initiated stuff like pwndbg.commands.binja.bn_sync(). + """ + + @functools.wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None: + if establish_connection(): + return func(*args, **kwargs) + return None + + return wrapper + + def l2r(addr: int) -> int: result = (addr - pwndbg.aglib.proc.binary_base_addr + base()) & pwndbg.aglib.arch.ptrmask return result @@ -207,7 +281,7 @@ def base(): @pwndbg.dbg.event_handler(EventType.STOP) -@with_bn() +@enabledBinja() def auto_update_pc() -> None: if not pwndbg.aglib.proc.alive: return @@ -223,7 +297,7 @@ _managed_bps: Dict[int, StopPoint] = {} @pwndbg.dbg.event_handler(EventType.START) @pwndbg.dbg.event_handler(EventType.STOP) @pwndbg.dbg.event_handler(EventType.CONTINUE) -@with_bn() +@enabledBinja() def auto_update_bp() -> None: if not pwndbg.aglib.proc.alive: return @@ -241,12 +315,12 @@ def auto_update_bp() -> None: @pwndbg.dbg.event_handler(EventType.CONTINUE) @pwndbg.dbg.event_handler(EventType.EXIT) -@with_bn() +@enabledBinja() def auto_clear_pc() -> None: _bn.clear_pc_tag() -@with_bn() +@enabledBinja() def navigate_to(addr: int) -> None: _bn.navigate_to(l2r(addr)) @@ -388,7 +462,7 @@ style = theme.add_param( class BinjaProvider(pwndbg.integration.IntegrationProvider): @pwndbg.decorators.suppress_errors() - @with_bn() + @enabledBinja() @pwndbg.lib.cache.cache_until("stop") def get_symbol(self, addr: int) -> str | None: sym: str | None = _bn.get_symbol(l2r(addr)) @@ -411,7 +485,7 @@ class BinjaProvider(pwndbg.integration.IntegrationProvider): return None @pwndbg.decorators.suppress_errors(fallback=()) - @with_bn(fallback=()) + @enabledBinja(fallback=()) def get_versions(self) -> Tuple[str, ...]: bn_version: str = _bn.get_version() py_version: str = _bn.get_py_version() @@ -421,19 +495,19 @@ class BinjaProvider(pwndbg.integration.IntegrationProvider): ) @pwndbg.decorators.suppress_errors(fallback=True) - @with_bn(fallback=True) + @enabledBinja(fallback=True) @pwndbg.lib.cache.cache_until("stop") def is_in_function(self, addr: int) -> bool: return _bn.get_func_info(l2r(addr)) is not None @pwndbg.decorators.suppress_errors(fallback=[]) - @with_bn(fallback=[]) + @enabledBinja(fallback=[]) def get_comment_lines(self, addr: int) -> List[str]: comments: List[str] = _bn.get_comments(l2r(addr)) return comments @pwndbg.decorators.suppress_errors() - @with_bn() + @enabledBinja() def decompile(self, addr: int, lines: int) -> List[str] | None: decomp: List[Tuple[int, List[Tuple[str, str]]]] | None = _bn.decompile_func( l2r(addr), bn_il_level.value @@ -511,7 +585,7 @@ class BinjaProvider(pwndbg.integration.IntegrationProvider): return ret @pwndbg.decorators.suppress_errors() - @with_bn() + @enabledBinja() def get_func_type(self, addr: int) -> Function | None: ty: Tuple[Tuple[str, int, str], List[Tuple[str, int, str]]] = _bn.get_func_type(l2r(addr)) if ty is None: @@ -520,7 +594,7 @@ class BinjaProvider(pwndbg.integration.IntegrationProvider): return Function(type=ty[0][0], derefcnt=ty[0][1], name=ty[0][2], args=args) @pwndbg.decorators.suppress_errors() - @with_bn() + @enabledBinja() @pwndbg.lib.cache.cache_until("stop") def get_stack_var_name(self, addr: int) -> str | None: cur = pwndbg.dbg.selected_frame() @@ -559,3 +633,7 @@ class BinjaProvider(pwndbg.integration.IntegrationProvider): return f"{var}{suffix}" else: return f"{func}:{var}{suffix}" + + @override + def disable(self) -> None: + ensure_disabled() diff --git a/pwndbg/integration/ida.py b/pwndbg/integration/ida.py index 6e767c4d8..1cae3cac6 100644 --- a/pwndbg/integration/ida.py +++ b/pwndbg/integration/ida.py @@ -21,8 +21,12 @@ from typing import Tuple from typing import TypeVar import gdb +from pygments import highlight +from pygments.formatters import Terminal256Formatter +from pygments.lexers import CppLexer from typing_extensions import Concatenate from typing_extensions import ParamSpec +from typing_extensions import override import pwndbg import pwndbg.aglib.arch @@ -39,7 +43,7 @@ from pwndbg.dbg import EventType from pwndbg.lib.functions import Function ida_rpc_host = pwndbg.config.add_param("ida-rpc-host", "127.0.0.1", "ida xmlrpc server address") -ida_rpc_port = pwndbg.config.add_param("ida-rpc-port", 31337, "ida xmlrpc server port") +ida_rpc_port = pwndbg.config.add_param("ida-rpc-port", 43718, "ida xmlrpc server port") ida_timeout = pwndbg.config.add_param("ida-timeout", 2, "time to wait for ida xmlrpc in seconds") @@ -57,18 +61,37 @@ T = TypeVar("T") @pwndbg.decorators.only_after_first_prompt() @pwndbg.config.trigger(ida_rpc_host, ida_rpc_port, pwndbg.integration.provider_name, ida_timeout) -def init_ida_rpc_client() -> None: - global _ida, _ida_last_exception, _ida_last_connection_check +def ida_config_changed() -> None: + if pwndbg.integration.provider_name.value == "ida": + # We need to (re)connect the client, possibly with updated values. + try_init_ida_rpc_client() + + +def ensure_disabled() -> None: + global _ida + _ida = None + pwndbg.integration.unset_provider() + pwndbg.integration.provider_name.value = "none" + + +def try_init_ida_rpc_client() -> bool: + """ + Try to connect to the IDA RPC client. + + If the connection succeeds, or we were already connected, + return True. Otherwise, False. + + An appropriate message will be also printed to the user. + """ - if pwndbg.integration.provider_name.value != "ida": - return + global _ida, _ida_last_exception, _ida_last_connection_check xmlrpc.client.MAXINT = 10**100 # type: ignore[misc] xmlrpc.client.MININT = -(10**100) # type: ignore[misc] now = time.time() if _ida is None and (now - _ida_last_connection_check) < int(ida_timeout) + 5: - return + return False addr = f"http://{ida_rpc_host}:{ida_rpc_port}" @@ -78,18 +101,37 @@ def init_ida_rpc_client() -> None: exception = None # (type, value, traceback) try: _ida.here() - print(message.success(f"Pwndbg successfully connected to Ida Pro xmlrpc: {addr}")) idc._update() - except TimeoutError: + pwndbg.integration.set_provider(IdaProvider()) + + print(message.success(f"Pwndbg successfully connected to Ida Pro xmlrpc: {addr}")) + + if pwndbg.integration.provider_name.value != "ida": + # We managed to successfully connect, and this happened because an Ida + # command was invoked, rather than the user setting the integration-provider parameter. + # So, we want to set the provider name now. + # Note that ida_config_changed() is a trigger, and not a value listener, so it won't + # be called when we set the value here (which is good, we would have recursion otherwise). + pwndbg.integration.provider_name.value = "ida" + + assert ( + len(pwndbg.config.triggers[pwndbg.integration.provider_name.name]) == 2 + ), "Does this new function need to be called here?" + + return True + + except (TimeoutError, xmlrpc.client.ProtocolError): exception = sys.exc_info() - _ida = None except OSError as e: - if e.errno != errno.ECONNREFUSED: + if e.errno == errno.ECONNREFUSED: + print( + message.error("Connection refused. ") + + message.hint("Did you start ./ida_script.py from Ida?") + ) + else: exception = sys.exc_info() - _ida = None - except xmlrpc.client.ProtocolError: - exception = sys.exc_info() - _ida = None + + ensure_disabled() if exception: if ( @@ -126,14 +168,66 @@ def init_ida_rpc_client() -> None: _ida_last_exception = exception and exception[1] _ida_last_connection_check = now + return False -def withIDA(func: Callable[P, T]) -> Callable[P, T | None]: +# We cannot catch the ConnectionRefusedError here, nor in @withIDA because there +# may be multiple nested decorated functions, and the bottom most one will swallow +# the exception up prevent it from bubbling to the top. Thus, we catch +# ConnectionRefusedError in CommandObj.__call__(). +def enabledIDA(func: Callable[P, T]) -> Callable[P, T | None]: + """ + If we have a connection to Ida, call the function. + + Otherwise, return None. Thus, all functions decorated with this must have + "| None" in their return signature. + + This will not try to open a connection if it doesn't already exist. + No messages will be printed. + """ + @functools.wraps(func) def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None: if _ida is None: - init_ida_rpc_client() - if _ida is not None: + assert pwndbg.integration.provider_name.value != "ida" + return None + + return func(*args, **kwargs) + + return wrapper + + +def establish_connection() -> bool: + """ + If we already had a connection, or succeed in creating a new one, return True. + Otherwise False. + """ + if _ida is not None: + return True + + print(message.notice("Trying to connect to Ida...")) + ok = try_init_ida_rpc_client() + + if not ok: + print(message.error("Aborting.")) + return False + + return True + + +def withIDA(func: Callable[P, T]) -> Callable[P, T | None]: + """ + Try to connect to Ida before running the decorated function. + + If we fail connecting to Ida, return None. Thus, all functions + decorated with this must have "| None" in their return signature. + + Use this for user-initiated stuff like pwndbg.commands.ida.save_ida(). + """ + + @functools.wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None: + if establish_connection(): return func(*args, **kwargs) return None @@ -141,7 +235,7 @@ def withIDA(func: Callable[P, T]) -> Callable[P, T | None]: def withHexrays(func: Callable[P, T]) -> Callable[P, T | None]: - @withIDA + @enabledIDA @functools.wraps(func) def wrapper(*a: P.args, **kw: P.kwargs) -> T | None: if _ida is not None and _ida.init_hexrays_plugin(): @@ -167,18 +261,6 @@ def returns_address(function: Callable[P, int]) -> Callable[P, int]: return wrapper -@pwndbg.lib.cache.cache_until("stop") -def available() -> bool: - if pwndbg.integration.provider_name.value != "ida": - return False - return can_connect() - - -@withIDA -def can_connect() -> bool: - return True - - def l2r(addr: int) -> int: region_start = pwndbg.aglib.vmmap.addr_region_start(addr) if region_start is None: @@ -210,20 +292,20 @@ def base(): return segaddr - base -@withIDA +@enabledIDA @takes_address def Comment(addr: int): return _ida.get_cmt(addr, 0) or _ida.get_cmt(addr) -@withIDA +@enabledIDA @takes_address @pwndbg.lib.cache.cache_until("objfile") def Name(addr: int): return _ida.get_name(addr, 0x1) # GN_VISIBLE -@withIDA +@enabledIDA @takes_address @pwndbg.lib.cache.cache_until("objfile") def GetFuncOffset(addr: int): @@ -231,14 +313,14 @@ def GetFuncOffset(addr: int): return rv -@withIDA +@enabledIDA @takes_address def GetFuncAttr(addr: int, attr: int): rv = _ida.get_func_attr(addr, attr) return rv -@withIDA +@enabledIDA @takes_address @pwndbg.lib.cache.cache_until("objfile") def GetType(addr: int): @@ -246,20 +328,20 @@ def GetType(addr: int): return rv -@withIDA +@enabledIDA @returns_address def here() -> int: return _ida.here() # type: ignore[return-value] -@withIDA +@enabledIDA @takes_address def Jump(addr: int): # uses C++ api instead of idc one to avoid activating the IDA window return _ida.jumpto(addr, -1, 0) -@withIDA +@enabledIDA @takes_address @pwndbg.lib.cache.cache_until("objfile") def Anterior(addr: int): @@ -275,18 +357,18 @@ def Anterior(addr: int): return b"\n".join(lines) -@withIDA +@enabledIDA def GetBreakpoints(): for i in range(GetBptQty()): yield GetBptEA(i) -@withIDA +@enabledIDA def GetBptQty(): return _ida.get_bpt_qty() -@withIDA +@enabledIDA @returns_address def GetBptEA(i: int) -> int: return _ida.get_bpt_ea(i) # type: ignore[return-value] @@ -297,7 +379,7 @@ _breakpoints: List[gdb.Breakpoint] = [] @pwndbg.dbg.event_handler(EventType.CONTINUE) @pwndbg.dbg.event_handler(EventType.STOP) -@withIDA +@enabledIDA def UpdateBreakpoints() -> None: # XXX: Remove breakpoints from IDA when the user removes them. current = {eval(b.location.lstrip("*")) for b in _breakpoints} @@ -318,7 +400,7 @@ def UpdateBreakpoints() -> None: _breakpoints.append(bp) -@withIDA +@enabledIDA @takes_address def SetColor(pc, color): return _ida.set_color(pc, 1, color) @@ -328,7 +410,7 @@ colored_pc = None @pwndbg.dbg.event_handler(EventType.STOP) -@withIDA +@enabledIDA def Auto_Color_PC() -> None: global colored_pc colored_pc = pwndbg.aglib.regs.pc @@ -336,7 +418,7 @@ def Auto_Color_PC() -> None: @pwndbg.dbg.event_handler(EventType.CONTINUE) -@withIDA +@enabledIDA def Auto_UnColor_PC() -> None: global colored_pc if colored_pc: @@ -344,14 +426,14 @@ def Auto_UnColor_PC() -> None: colored_pc = None -@withIDA +@enabledIDA @returns_address @pwndbg.lib.cache.cache_until("objfile") def LocByName(name) -> int: return _ida.get_name_ea_simple(str(name)) # type: ignore[return-value] -@withIDA +@enabledIDA @takes_address @returns_address @pwndbg.lib.cache.cache_until("objfile") @@ -359,7 +441,7 @@ def PrevHead(addr): return _ida.prev_head(addr) -@withIDA +@enabledIDA @takes_address @returns_address @pwndbg.lib.cache.cache_until("objfile") @@ -367,39 +449,39 @@ def NextHead(addr): return _ida.next_head(addr) -@withIDA +@enabledIDA @takes_address @pwndbg.lib.cache.cache_until("objfile") def GetFunctionName(addr): return _ida.get_func_name(addr) -@withIDA +@enabledIDA @takes_address @pwndbg.lib.cache.cache_until("objfile") def GetFlags(addr): return _ida.get_full_flags(addr) -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("objfile") def isASCII(flags): return _ida.is_strlit(flags) -@withIDA +@enabledIDA @takes_address @pwndbg.lib.cache.cache_until("objfile") def ArgCount(address) -> None: pass -@withIDA +@enabledIDA def SaveBase(path: str): return _ida.save_database(path) -@withIDA +@enabledIDA def GetIdbPath(): return _ida.get_idb_path() @@ -410,94 +492,104 @@ def has_cached_cfunc(addr): return _ida.has_cached_cfunc(addr) +_lexer = CppLexer() +_formatter = Terminal256Formatter(style="monokai") + + @withHexrays @takes_address @pwndbg.lib.cache.cache_until("stop") -def decompile(addr): - return _ida.decompile(addr) +def decompile(addr) -> str | None: + code: str | None = _ida.decompile(addr) + if code is not None and not pwndbg.config.disable_colors: + code = highlight(code, _lexer, _formatter) + return code @withHexrays @takes_address @pwndbg.lib.cache.cache_until("stop") -def decompile_context(pc, context_lines): - return _ida.decompile_context(pc, context_lines) +def decompile_context(pc, context_lines) -> str | None: + code: str | None = _ida.decompile_context(pc, context_lines) + if code is not None and not pwndbg.config.disable_colors: + code = highlight(code, _lexer, _formatter) + return code -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("forever") def get_ida_versions() -> Dict[str, str]: return _ida.versions() # type: ignore[return-value] -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("stop") def GetStrucQty(): return _ida.get_struc_qty() -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("stop") def GetStrucId(idx): return _ida.get_struc_by_idx(idx) -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("stop") def GetStrucName(sid): return _ida.get_struc_name(sid) -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("stop") def GetStrucSize(sid): return _ida.get_struc_size(sid) -@withIDA +@enabledIDA @takes_address @pwndbg.lib.cache.cache_until("stop") def GetFrameId(addr): return _ida.get_frame_id(addr) -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("stop") def GetMemberQty(sid): return _ida.get_member_qty(sid) -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("stop") def GetMemberSize(sid, offset): return _ida.get_member_size(sid, offset) -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("stop") def GetMemberId(sid, offset): return _ida.get_member_id(sid, offset) -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("stop") def GetMemberName(sid, offset): return _ida.get_member_name(sid, offset) -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("stop") def GetMemberOffset(sid, member_name): return _ida.get_member_offset(sid, member_name) -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("stop") def GetMemberFlag(sid, offset): return _ida.get_member_flag(sid, offset) -@withIDA +@enabledIDA @pwndbg.lib.cache.cache_until("stop") def GetStrucNextOff(sid, offset): return _ida.get_next_offset(sid, offset) @@ -506,10 +598,6 @@ def GetStrucNextOff(sid, offset): class IDC: query = "{k:v for k,v in globals()['idc'].__dict__.items() if isinstance(v, int)}" - def __init__(self) -> None: - if available(): - self._update() - def _update(self) -> None: data: Dict[Any, Any] = _ida.eval(self.query) self.__dict__.update(data) @@ -569,7 +657,7 @@ ida_replacements = { class IdaProvider(pwndbg.integration.IntegrationProvider): @pwndbg.decorators.suppress_errors() - @withIDA + @enabledIDA def get_symbol(self, addr: int) -> str | None: exe = pwndbg.aglib.elf.exe() if exe: @@ -590,18 +678,18 @@ class IdaProvider(pwndbg.integration.IntegrationProvider): return () @pwndbg.decorators.suppress_errors(fallback=True) - @withIDA + @enabledIDA def is_in_function(self, addr: int) -> bool: - return available() and bool(GetFunctionName(addr)) + return bool(GetFunctionName(addr)) @pwndbg.decorators.suppress_errors(fallback=[]) - @withIDA + @enabledIDA def get_comment_lines(self, addr: int) -> List[str]: pre = Anterior(addr) return pre.decode().split("\n") if pre else [] @pwndbg.decorators.suppress_errors() - @withIDA + @enabledIDA def decompile(self, addr: int, lines: int) -> List[str] | None: code = decompile_context(addr, lines // 2) if code: @@ -610,7 +698,7 @@ class IdaProvider(pwndbg.integration.IntegrationProvider): return None @pwndbg.decorators.suppress_errors() - @withIDA + @enabledIDA def get_func_type(self, addr: int) -> Function | None: typename: str = GetType(addr) @@ -626,3 +714,7 @@ class IdaProvider(pwndbg.integration.IntegrationProvider): return pwndbg.lib.funcparser.ExtractFuncDeclFromSource(typename + ";") return None + + @override + def disable(self) -> None: + ensure_disabled()