integration: add highlighting to ida (#3367)

* ida integration fixups

* handle connection breaks cleanly

* bring binja up to speed

* make the ports not collide, and more random

* final fixups

* final final

* typo fix

* add highlighting to ida
pull/3364/head
k4lizen 1 month ago committed by GitHub
parent 217590668b
commit bb687cc761
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -22,7 +22,7 @@ xmlrpc.client.MININT = -(10**100)
host = os.environ.get("PWNDBG_BINJA_SERVER_HOST", "127.0.0.1")
port = int(os.environ.get("PWNDBG_BINJA_SERVER_PORT", "31337"))
port = int(os.environ.get("PWNDBG_BINJA_SERVER_PORT", "43717"))
logger = binaryninja.log.Logger(0, "pwndbg-integration")

@ -214,7 +214,7 @@ Binary Ninja XML-RPC server port.
**Default:** 31337
**Default:** 43717
----------
@ -684,7 +684,7 @@ Ida xmlrpc server port.
**Default:** 31337
**Default:** 43718
----------

@ -127,7 +127,7 @@ returned.
``` {.python .no-copy}
bn_eval(expr: gdb.Value) -> int | None
bn_eval(expr: gdb.Value) -> int
```
@ -144,7 +144,7 @@ This function cannot see stack local variables.
#### Example
```
pwndbg> set integration-provider binja
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717
Set which provider to use for integration features to 'binja'.
pwndbg> p/x $bn_eval("10+20")
$6 = 0x30
@ -168,7 +168,7 @@ $11 = 1
``` {.python .no-copy}
bn_sym(name_val: gdb.Value) -> int | None
bn_sym(name_val: gdb.Value) -> int
```
@ -180,7 +180,7 @@ but not stack local variables, use `bn_var` for that.
#### Example
```
pwndbg> set integration-provider binja
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717
Set which provider to use for integration features to 'binja'.
pwndbg> p main
No symbol "main" in current context.
@ -196,7 +196,7 @@ Breakpoint 1 at 0x555555555645
``` {.python .no-copy}
bn_var(name_val: gdb.Value) -> int | None
bn_var(name_val: gdb.Value) -> int
```
@ -208,7 +208,7 @@ use `bn_sym` for that.
#### Example
```
pwndbg> set integration-provider binja
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717
Set which provider to use for integration features to 'binja'.
pwndbg> p user_choice
No symbol "user_choice" in current context.
@ -384,7 +384,7 @@ This functions doesn't see stack local variables.
#### Example
```
pwndbg> set integration-provider ida
Pwndbg successfully connected to Ida Pro xmlrpc: http://127.0.0.1:31337
Pwndbg successfully connected to Ida Pro xmlrpc: http://127.0.0.1:43718
Set which provider to use for integration features to 'ida'.
pwndbg> p main
No symbol "main" in current context.

@ -17,7 +17,7 @@ save-ida Save the ida database.
pwndbg> config ida
attachp-resolution-method how to determine the process to attach when multiple candidates exists 'ask'
ida-rpc-host ida xmlrpc server address '127.0.0.1'
ida-rpc-port ida xmlrpc server port 31337
ida-rpc-port ida xmlrpc server port 43718
ida-timeout time to wait for ida xmlrpc in seconds 2
pwndbg> | help function | grep ida
function ida -- Lookup a symbol's address by name from IDA.

@ -50,7 +50,7 @@ xmlclient.Marshaller.dispatch[int] = create_marshaller("<value><i8>%d</i8></valu
xmlclient.Marshaller.dispatch[idaapi.cfuncptr_t] = create_marshaller(just_to_str=True)
host = os.environ.get("PWNDBG_IDA_SERVER_HOST", "127.0.0.1")
port = int(os.environ.get("PWNDBG_IDA_SERVER_PORT", "31337"))
port = int(os.environ.get("PWNDBG_IDA_SERVER_PORT", "43718"))
mutex = threading.Condition()

@ -371,6 +371,20 @@ class CommandObj:
except TypeError:
print(f"{self.command_name}: {self.description}")
pwndbg.exception.handle(self.function.__name__)
except ConnectionRefusedError:
print(message.error("Connection Refused Exception."))
print(message.hint("Did an integration provider die?"), end="")
# If yes, the resulting state can be really messy.
if pwndbg.integration.provider_name != "none":
print(
message.hint(
f" Automatically disabled {pwndbg.integration.provider_name} integration."
)
)
pwndbg.integration.provider.disable()
else:
print()
except Exception:
pwndbg.exception.handle(self.function.__name__)
return None

@ -13,6 +13,7 @@ from pwndbg.commands import CommandCategory
aliases=["bns"],
)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.integration.binja.withBinja
def bn_sync(*args) -> None:
"""
Synchronize Binary Ninja's cursor with GDB

@ -13,8 +13,7 @@ from pwndbg.color import message
@pwndbg.gdblib.functions.GdbFunction()
@pwndbg.integration.binja.with_bn()
def bn_sym(name_val: gdb.Value) -> int | None:
def bn_sym(name_val: gdb.Value) -> int:
"""
Lookup a symbol's address by name from Binary Ninja.
@ -24,7 +23,7 @@ def bn_sym(name_val: gdb.Value) -> int | None:
Example:
```
pwndbg> set integration-provider binja
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717
Set which provider to use for integration features to 'binja'.
pwndbg> p main
No symbol "main" in current context.
@ -34,16 +33,21 @@ def bn_sym(name_val: gdb.Value) -> int | None:
Breakpoint 1 at 0x555555555645
```
"""
# GDB convenience functions are not allowed to return None, so we cannot
# decorate with @withBinja().
if not pwndbg.integration.binja.establish_connection():
return 0
name = name_val.string()
addr: int | None = pwndbg.integration.binja._bn.get_symbol_addr(name)
if addr is None:
return None
print(message.error("Not found."))
return 0
return pwndbg.integration.binja.r2l(addr)
@pwndbg.gdblib.functions.GdbFunction()
@pwndbg.integration.binja.with_bn()
def bn_var(name_val: gdb.Value) -> int | None:
def bn_var(name_val: gdb.Value) -> int:
"""
Lookup a stack variable's address by name from Binary Ninja.
@ -53,7 +57,7 @@ def bn_var(name_val: gdb.Value) -> int | None:
Example:
```
pwndbg> set integration-provider binja
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717
Set which provider to use for integration features to 'binja'.
pwndbg> p user_choice
No symbol "user_choice" in current context.
@ -67,12 +71,18 @@ def bn_var(name_val: gdb.Value) -> int | None:
Error while executing Python code.
```
"""
# GDB convenience functions are not allowed to return None, so we cannot
# decorate with @withBinja().
if not pwndbg.integration.binja.establish_connection():
return 0
name = name_val.string()
conf_and_offset: Tuple[int, int] | None = pwndbg.integration.binja._bn.get_var_offset_from_sp(
pwndbg.integration.binja.l2r(pwndbg.aglib.regs.pc), name
)
if conf_and_offset is None:
return None
print(message.error("Not found."))
return 0
(conf, offset) = conf_and_offset
if conf < 64:
print(message.warn(f"Warning: Stack offset only has {conf / 255 * 100:.2f}% confidence"))
@ -80,8 +90,8 @@ def bn_var(name_val: gdb.Value) -> int | None:
@pwndbg.gdblib.functions.GdbFunction()
@pwndbg.integration.binja.with_bn()
def bn_eval(expr: gdb.Value) -> int | None:
@pwndbg.integration.binja.enabledBinja()
def bn_eval(expr: gdb.Value) -> int:
"""
Parse and evaluate a Binary Ninja expression.
@ -96,7 +106,7 @@ def bn_eval(expr: gdb.Value) -> int | None:
Example:
```
pwndbg> set integration-provider binja
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:31337
Pwndbg successfully connected to Binary Ninja (4.2.6455 Personal) xmlrpc: http://127.0.0.1:43717
Set which provider to use for integration features to 'binja'.
pwndbg> p/x $bn_eval("10+20")
$6 = 0x30
@ -114,6 +124,11 @@ def bn_eval(expr: gdb.Value) -> int | None:
$11 = 1
```
"""
# GDB convenience functions are not allowed to return None, so we cannot
# decorate with @withBinja().
if not pwndbg.integration.binja.establish_connection():
return 0
magic_vars = {}
for r in pwndbg.aglib.regs.current:
v = pwndbg.aglib.regs[r]
@ -121,4 +136,7 @@ def bn_eval(expr: gdb.Value) -> int | None:
magic_vars[r] = v
magic_vars["piebase"] = pwndbg.aglib.proc.binary_base_addr
ret: int | None = pwndbg.integration.binja._bn.parse_expr(expr.string(), magic_vars)
if ret is None:
print(message.error("Not found."))
return 0
return ret

@ -23,7 +23,7 @@ from pwndbg.gdblib.functions import GdbFunction
)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.dbg.event_handler(EventType.STOP)
@pwndbg.integration.ida.withIDA
@pwndbg.integration.ida.enabledIDA
def j(*args) -> None:
"""
Synchronize IDA's cursor with GDB
@ -99,9 +99,6 @@ def down(n=1) -> None:
@pwndbg.integration.ida.withIDA
def save_ida() -> None:
"""Save the IDA database"""
if not pwndbg.integration.ida.available():
return
path = pwndbg.integration.ida.GetIdbPath()
# Need to handle emulated paths for Wine
@ -138,9 +135,6 @@ def save_ida() -> None:
os.unlink(full_path)
save_ida()
def _ida_local(name: str) -> int | None:
if not pwndbg.aglib.proc.alive:
return None
@ -185,7 +179,7 @@ def ida(name: gdb.Value) -> int:
Example:
```
pwndbg> set integration-provider ida
Pwndbg successfully connected to Ida Pro xmlrpc: http://127.0.0.1:31337
Pwndbg successfully connected to Ida Pro xmlrpc: http://127.0.0.1:43718
Set which provider to use for integration features to 'ida'.
pwndbg> p main
No symbol "main" in current context.
@ -195,6 +189,11 @@ def ida(name: gdb.Value) -> int:
Breakpoint 2 at 0x555555555645
```
"""
# GDB convenience functions are not allowed to return None, so we cannot
# decorate with @withIda().
if not pwndbg.integration.ida.establish_connection():
return 0
name = name.string()
# Lookup local variables first

@ -65,7 +65,16 @@ class IntegrationProvider:
"""
return None
def disable(self) -> None:
"""
Notify the provider that it should disable itself.
"""
return None
# This value should only be the name of the provider if we have a valid connection
# to the provider. I.e. if we fail to connect to the provider, we should set this to
# "none".
provider_name = pwndbg.config.add_param(
"integration-provider",
"none",
@ -130,28 +139,24 @@ class ConfigurableProvider(IntegrationProvider):
def get_stack_var_name(self, addr: int) -> str | None:
return self.inner.get_stack_var_name(addr)
def disable(self) -> None:
return self.inner.disable()
provider: IntegrationProvider = IntegrationProvider()
@pwndbg.config.trigger(provider_name)
def switch_providers():
def set_provider(prov: IntegrationProvider) -> None:
"""
Call this from provider-specific code whenever you establish a connection.
"""
global provider
provider = ConfigurableProvider(prov)
def unset_provider() -> None:
"""
Call this from provider-specific code whenever a connection stops.
"""
global provider
if not provider_name.value or provider_name.value == "none":
provider = IntegrationProvider()
elif provider_name.value == "binja":
# do not import at start of file to avoid circular import
import pwndbg.integration.binja
provider = ConfigurableProvider(pwndbg.integration.binja.BinjaProvider())
elif provider_name.value == "ida":
import pwndbg.integration.ida
provider = ConfigurableProvider(pwndbg.integration.ida.IdaProvider())
else:
print(
message.warn(
f"Invalid provider {provider_name.value!r} specified. Disabling integration."
)
)
provider_name.revert_default()
provider = IntegrationProvider()

@ -25,6 +25,7 @@ import pygments.formatters
import pygments.style
import pygments.token
from typing_extensions import ParamSpec
from typing_extensions import override
import pwndbg
import pwndbg.aglib.arch
@ -51,7 +52,7 @@ from pwndbg.lib.functions import Function
bn_rpc_host = pwndbg.config.add_param(
"bn-rpc-host", "127.0.0.1", "Binary Ninja XML-RPC server host"
)
bn_rpc_port = pwndbg.config.add_param("bn-rpc-port", 31337, "Binary Ninja XML-RPC server port")
bn_rpc_port = pwndbg.config.add_param("bn-rpc-port", 43717, "Binary Ninja XML-RPC server port")
bn_timeout = pwndbg.config.add_param(
"bn-timeout", 2, "time to wait for Binary Ninja XML-RPC, in seconds"
)
@ -81,18 +82,36 @@ K = TypeVar("K")
@pwndbg.decorators.only_after_first_prompt()
@pwndbg.config.trigger(bn_rpc_host, bn_rpc_port, pwndbg.integration.provider_name, bn_timeout)
def init_bn_rpc_client() -> None:
global _bn, _bn_last_exception, _bn_last_connection_check
def binja_config_changed():
if pwndbg.integration.provider_name.value == "binja":
# We need to (re)connect the client, possibly with updated values.
try_init_bn_rpc_client()
if pwndbg.integration.provider_name.value != "binja":
return
def ensure_disabled() -> None:
global _bn
_bn = None
pwndbg.integration.unset_provider()
pwndbg.integration.provider_name.value = "none"
def try_init_bn_rpc_client() -> bool:
"""
Try to connect to the Binary Ninja RPC client.
If the connection succeeds, or we were already connected,
return True. Otherwise, False.
An appropriate message will be also printed to the user.
"""
global _bn, _bn_last_exception, _bn_last_connection_check
xmlrpc.client.MAXINT = 10**100 # type: ignore[misc]
xmlrpc.client.MININT = -(10**100) # type: ignore[misc]
now = time.time()
if _bn is None and (now - _bn_last_connection_check) < int(bn_timeout) + 5:
return
return False
addr = f"http://{bn_rpc_host}:{bn_rpc_port}"
@ -102,21 +121,45 @@ def init_bn_rpc_client() -> None:
exception = None # (type, value, traceback)
try:
version: str = _bn.get_version()
pwndbg.integration.set_provider(BinjaProvider())
print(
message.success(
f"Pwndbg successfully connected to Binary Ninja ({version}) xmlrpc: {addr}"
)
)
except TimeoutError:
if pwndbg.integration.provider_name.value != "binja":
# We managed to successfully connect, and this happened because a binja
# command was invoked, rather than the user setting the integration-provider parameter.
# So, we want to set the provider name now.
# Note that binja_config_changed() is a trigger, and not a value listener, so it won't
# be called when we set the value here (which is good, we would have recursion otherwise).
pwndbg.integration.provider_name.value = "binja"
assert (
len(pwndbg.config.triggers[pwndbg.integration.provider_name.name]) == 2
), "Does this new function need to be called here?"
return True
except (TimeoutError, xmlrpc.client.ProtocolError):
exception = sys.exc_info()
_bn = None
except OSError as e:
if e.errno != errno.ECONNREFUSED:
if e.errno == errno.ECONNREFUSED:
print(
message.error("Connection refused. ")
+ message.hint("Did you start the plugin in Binary Ninja?")
# TODO: remove this after some time passes
+ message.notice(
"\nIn the latest version of Pwndbg, the binja_script.py file has been\n"
)
+ message.notice("updated, you may need to reinstall it.")
)
else:
exception = sys.exc_info()
_bn = None
except xmlrpc.client.ProtocolError:
exception = sys.exc_info()
_bn = None
ensure_disabled()
if exception:
if (
@ -153,44 +196,75 @@ def init_bn_rpc_client() -> None:
_bn_last_exception = exception and exception[1]
_bn_last_connection_check = now
return False
def with_bn(fallback: K = None) -> Callable[[Callable[P, T]], Callable[P, T | K]]:
global _bn
# We cannot catch the ConnectionRefusedError here, nor in @withBinja because there
# may be multiple nested decorated functions, and the bottom most one will swallow
# the exception up prevent it from bubbling to the top. Thus, we catch
# ConnectionRefusedError in CommandObj.__call__().
def enabledBinja(fallback: K = None) -> Callable[[Callable[P, T]], Callable[P, T | K]]:
"""
If we have a connection to binary ninja, call the function.
def decorator(func: Callable[P, T]) -> Callable[P, T | K]:
global _bn
Otherwise, return fallback. Thus, all functions decorated with this, that do
not specify a fallback, must have "| None" in their return signature.
This will not try to open a connection if it doesn't already exist.
No messages will be printed.
"""
def decorator(func: Callable[P, T]) -> Callable[P, T | K]:
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | K:
global _bn
if _bn is None:
init_bn_rpc_client()
try:
if _bn is not None:
return func(*args, **kwargs)
except ConnectionRefusedError:
print(message.error("[!] Binary Ninja connection refused"))
_bn = None
assert pwndbg.integration.provider_name.value != "binja"
return fallback
return fallback
return func(*args, **kwargs)
return wrapper
return decorator
@pwndbg.lib.cache.cache_until("stop")
def available() -> bool:
return can_connect() is not None
def establish_connection() -> bool:
"""
If we already had a connection, or succeed in creating a new one, return True.
Otherwise False.
"""
if _bn is not None:
return True
print(message.notice("Trying to connect to Binary Ninja..."))
ok = try_init_bn_rpc_client()
if not ok:
print(message.error("Aborting."))
return False
@with_bn()
def can_connect() -> bool:
return True
def withBinja(func: Callable[P, T]) -> Callable[P, T | None]:
"""
Try to connect to Binary Ninja before running the decorated function.
If we fail connecting, return None. Thus, all functions
decorated with this must have "| None" in their return signature.
Use this for user-initiated stuff like pwndbg.commands.binja.bn_sync().
"""
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None:
if establish_connection():
return func(*args, **kwargs)
return None
return wrapper
def l2r(addr: int) -> int:
result = (addr - pwndbg.aglib.proc.binary_base_addr + base()) & pwndbg.aglib.arch.ptrmask
return result
@ -207,7 +281,7 @@ def base():
@pwndbg.dbg.event_handler(EventType.STOP)
@with_bn()
@enabledBinja()
def auto_update_pc() -> None:
if not pwndbg.aglib.proc.alive:
return
@ -223,7 +297,7 @@ _managed_bps: Dict[int, StopPoint] = {}
@pwndbg.dbg.event_handler(EventType.START)
@pwndbg.dbg.event_handler(EventType.STOP)
@pwndbg.dbg.event_handler(EventType.CONTINUE)
@with_bn()
@enabledBinja()
def auto_update_bp() -> None:
if not pwndbg.aglib.proc.alive:
return
@ -241,12 +315,12 @@ def auto_update_bp() -> None:
@pwndbg.dbg.event_handler(EventType.CONTINUE)
@pwndbg.dbg.event_handler(EventType.EXIT)
@with_bn()
@enabledBinja()
def auto_clear_pc() -> None:
_bn.clear_pc_tag()
@with_bn()
@enabledBinja()
def navigate_to(addr: int) -> None:
_bn.navigate_to(l2r(addr))
@ -388,7 +462,7 @@ style = theme.add_param(
class BinjaProvider(pwndbg.integration.IntegrationProvider):
@pwndbg.decorators.suppress_errors()
@with_bn()
@enabledBinja()
@pwndbg.lib.cache.cache_until("stop")
def get_symbol(self, addr: int) -> str | None:
sym: str | None = _bn.get_symbol(l2r(addr))
@ -411,7 +485,7 @@ class BinjaProvider(pwndbg.integration.IntegrationProvider):
return None
@pwndbg.decorators.suppress_errors(fallback=())
@with_bn(fallback=())
@enabledBinja(fallback=())
def get_versions(self) -> Tuple[str, ...]:
bn_version: str = _bn.get_version()
py_version: str = _bn.get_py_version()
@ -421,19 +495,19 @@ class BinjaProvider(pwndbg.integration.IntegrationProvider):
)
@pwndbg.decorators.suppress_errors(fallback=True)
@with_bn(fallback=True)
@enabledBinja(fallback=True)
@pwndbg.lib.cache.cache_until("stop")
def is_in_function(self, addr: int) -> bool:
return _bn.get_func_info(l2r(addr)) is not None
@pwndbg.decorators.suppress_errors(fallback=[])
@with_bn(fallback=[])
@enabledBinja(fallback=[])
def get_comment_lines(self, addr: int) -> List[str]:
comments: List[str] = _bn.get_comments(l2r(addr))
return comments
@pwndbg.decorators.suppress_errors()
@with_bn()
@enabledBinja()
def decompile(self, addr: int, lines: int) -> List[str] | None:
decomp: List[Tuple[int, List[Tuple[str, str]]]] | None = _bn.decompile_func(
l2r(addr), bn_il_level.value
@ -511,7 +585,7 @@ class BinjaProvider(pwndbg.integration.IntegrationProvider):
return ret
@pwndbg.decorators.suppress_errors()
@with_bn()
@enabledBinja()
def get_func_type(self, addr: int) -> Function | None:
ty: Tuple[Tuple[str, int, str], List[Tuple[str, int, str]]] = _bn.get_func_type(l2r(addr))
if ty is None:
@ -520,7 +594,7 @@ class BinjaProvider(pwndbg.integration.IntegrationProvider):
return Function(type=ty[0][0], derefcnt=ty[0][1], name=ty[0][2], args=args)
@pwndbg.decorators.suppress_errors()
@with_bn()
@enabledBinja()
@pwndbg.lib.cache.cache_until("stop")
def get_stack_var_name(self, addr: int) -> str | None:
cur = pwndbg.dbg.selected_frame()
@ -559,3 +633,7 @@ class BinjaProvider(pwndbg.integration.IntegrationProvider):
return f"{var}{suffix}"
else:
return f"{func}:{var}{suffix}"
@override
def disable(self) -> None:
ensure_disabled()

@ -21,8 +21,12 @@ from typing import Tuple
from typing import TypeVar
import gdb
from pygments import highlight
from pygments.formatters import Terminal256Formatter
from pygments.lexers import CppLexer
from typing_extensions import Concatenate
from typing_extensions import ParamSpec
from typing_extensions import override
import pwndbg
import pwndbg.aglib.arch
@ -39,7 +43,7 @@ from pwndbg.dbg import EventType
from pwndbg.lib.functions import Function
ida_rpc_host = pwndbg.config.add_param("ida-rpc-host", "127.0.0.1", "ida xmlrpc server address")
ida_rpc_port = pwndbg.config.add_param("ida-rpc-port", 31337, "ida xmlrpc server port")
ida_rpc_port = pwndbg.config.add_param("ida-rpc-port", 43718, "ida xmlrpc server port")
ida_timeout = pwndbg.config.add_param("ida-timeout", 2, "time to wait for ida xmlrpc in seconds")
@ -57,18 +61,37 @@ T = TypeVar("T")
@pwndbg.decorators.only_after_first_prompt()
@pwndbg.config.trigger(ida_rpc_host, ida_rpc_port, pwndbg.integration.provider_name, ida_timeout)
def init_ida_rpc_client() -> None:
global _ida, _ida_last_exception, _ida_last_connection_check
def ida_config_changed() -> None:
if pwndbg.integration.provider_name.value == "ida":
# We need to (re)connect the client, possibly with updated values.
try_init_ida_rpc_client()
def ensure_disabled() -> None:
global _ida
_ida = None
pwndbg.integration.unset_provider()
pwndbg.integration.provider_name.value = "none"
def try_init_ida_rpc_client() -> bool:
"""
Try to connect to the IDA RPC client.
If the connection succeeds, or we were already connected,
return True. Otherwise, False.
An appropriate message will be also printed to the user.
"""
if pwndbg.integration.provider_name.value != "ida":
return
global _ida, _ida_last_exception, _ida_last_connection_check
xmlrpc.client.MAXINT = 10**100 # type: ignore[misc]
xmlrpc.client.MININT = -(10**100) # type: ignore[misc]
now = time.time()
if _ida is None and (now - _ida_last_connection_check) < int(ida_timeout) + 5:
return
return False
addr = f"http://{ida_rpc_host}:{ida_rpc_port}"
@ -78,18 +101,37 @@ def init_ida_rpc_client() -> None:
exception = None # (type, value, traceback)
try:
_ida.here()
print(message.success(f"Pwndbg successfully connected to Ida Pro xmlrpc: {addr}"))
idc._update()
except TimeoutError:
pwndbg.integration.set_provider(IdaProvider())
print(message.success(f"Pwndbg successfully connected to Ida Pro xmlrpc: {addr}"))
if pwndbg.integration.provider_name.value != "ida":
# We managed to successfully connect, and this happened because an Ida
# command was invoked, rather than the user setting the integration-provider parameter.
# So, we want to set the provider name now.
# Note that ida_config_changed() is a trigger, and not a value listener, so it won't
# be called when we set the value here (which is good, we would have recursion otherwise).
pwndbg.integration.provider_name.value = "ida"
assert (
len(pwndbg.config.triggers[pwndbg.integration.provider_name.name]) == 2
), "Does this new function need to be called here?"
return True
except (TimeoutError, xmlrpc.client.ProtocolError):
exception = sys.exc_info()
_ida = None
except OSError as e:
if e.errno != errno.ECONNREFUSED:
if e.errno == errno.ECONNREFUSED:
print(
message.error("Connection refused. ")
+ message.hint("Did you start ./ida_script.py from Ida?")
)
else:
exception = sys.exc_info()
_ida = None
except xmlrpc.client.ProtocolError:
exception = sys.exc_info()
_ida = None
ensure_disabled()
if exception:
if (
@ -126,14 +168,66 @@ def init_ida_rpc_client() -> None:
_ida_last_exception = exception and exception[1]
_ida_last_connection_check = now
return False
def withIDA(func: Callable[P, T]) -> Callable[P, T | None]:
# We cannot catch the ConnectionRefusedError here, nor in @withIDA because there
# may be multiple nested decorated functions, and the bottom most one will swallow
# the exception up prevent it from bubbling to the top. Thus, we catch
# ConnectionRefusedError in CommandObj.__call__().
def enabledIDA(func: Callable[P, T]) -> Callable[P, T | None]:
"""
If we have a connection to Ida, call the function.
Otherwise, return None. Thus, all functions decorated with this must have
"| None" in their return signature.
This will not try to open a connection if it doesn't already exist.
No messages will be printed.
"""
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None:
if _ida is None:
init_ida_rpc_client()
if _ida is not None:
assert pwndbg.integration.provider_name.value != "ida"
return None
return func(*args, **kwargs)
return wrapper
def establish_connection() -> bool:
"""
If we already had a connection, or succeed in creating a new one, return True.
Otherwise False.
"""
if _ida is not None:
return True
print(message.notice("Trying to connect to Ida..."))
ok = try_init_ida_rpc_client()
if not ok:
print(message.error("Aborting."))
return False
return True
def withIDA(func: Callable[P, T]) -> Callable[P, T | None]:
"""
Try to connect to Ida before running the decorated function.
If we fail connecting to Ida, return None. Thus, all functions
decorated with this must have "| None" in their return signature.
Use this for user-initiated stuff like pwndbg.commands.ida.save_ida().
"""
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None:
if establish_connection():
return func(*args, **kwargs)
return None
@ -141,7 +235,7 @@ def withIDA(func: Callable[P, T]) -> Callable[P, T | None]:
def withHexrays(func: Callable[P, T]) -> Callable[P, T | None]:
@withIDA
@enabledIDA
@functools.wraps(func)
def wrapper(*a: P.args, **kw: P.kwargs) -> T | None:
if _ida is not None and _ida.init_hexrays_plugin():
@ -167,18 +261,6 @@ def returns_address(function: Callable[P, int]) -> Callable[P, int]:
return wrapper
@pwndbg.lib.cache.cache_until("stop")
def available() -> bool:
if pwndbg.integration.provider_name.value != "ida":
return False
return can_connect()
@withIDA
def can_connect() -> bool:
return True
def l2r(addr: int) -> int:
region_start = pwndbg.aglib.vmmap.addr_region_start(addr)
if region_start is None:
@ -210,20 +292,20 @@ def base():
return segaddr - base
@withIDA
@enabledIDA
@takes_address
def Comment(addr: int):
return _ida.get_cmt(addr, 0) or _ida.get_cmt(addr)
@withIDA
@enabledIDA
@takes_address
@pwndbg.lib.cache.cache_until("objfile")
def Name(addr: int):
return _ida.get_name(addr, 0x1) # GN_VISIBLE
@withIDA
@enabledIDA
@takes_address
@pwndbg.lib.cache.cache_until("objfile")
def GetFuncOffset(addr: int):
@ -231,14 +313,14 @@ def GetFuncOffset(addr: int):
return rv
@withIDA
@enabledIDA
@takes_address
def GetFuncAttr(addr: int, attr: int):
rv = _ida.get_func_attr(addr, attr)
return rv
@withIDA
@enabledIDA
@takes_address
@pwndbg.lib.cache.cache_until("objfile")
def GetType(addr: int):
@ -246,20 +328,20 @@ def GetType(addr: int):
return rv
@withIDA
@enabledIDA
@returns_address
def here() -> int:
return _ida.here() # type: ignore[return-value]
@withIDA
@enabledIDA
@takes_address
def Jump(addr: int):
# uses C++ api instead of idc one to avoid activating the IDA window
return _ida.jumpto(addr, -1, 0)
@withIDA
@enabledIDA
@takes_address
@pwndbg.lib.cache.cache_until("objfile")
def Anterior(addr: int):
@ -275,18 +357,18 @@ def Anterior(addr: int):
return b"\n".join(lines)
@withIDA
@enabledIDA
def GetBreakpoints():
for i in range(GetBptQty()):
yield GetBptEA(i)
@withIDA
@enabledIDA
def GetBptQty():
return _ida.get_bpt_qty()
@withIDA
@enabledIDA
@returns_address
def GetBptEA(i: int) -> int:
return _ida.get_bpt_ea(i) # type: ignore[return-value]
@ -297,7 +379,7 @@ _breakpoints: List[gdb.Breakpoint] = []
@pwndbg.dbg.event_handler(EventType.CONTINUE)
@pwndbg.dbg.event_handler(EventType.STOP)
@withIDA
@enabledIDA
def UpdateBreakpoints() -> None:
# XXX: Remove breakpoints from IDA when the user removes them.
current = {eval(b.location.lstrip("*")) for b in _breakpoints}
@ -318,7 +400,7 @@ def UpdateBreakpoints() -> None:
_breakpoints.append(bp)
@withIDA
@enabledIDA
@takes_address
def SetColor(pc, color):
return _ida.set_color(pc, 1, color)
@ -328,7 +410,7 @@ colored_pc = None
@pwndbg.dbg.event_handler(EventType.STOP)
@withIDA
@enabledIDA
def Auto_Color_PC() -> None:
global colored_pc
colored_pc = pwndbg.aglib.regs.pc
@ -336,7 +418,7 @@ def Auto_Color_PC() -> None:
@pwndbg.dbg.event_handler(EventType.CONTINUE)
@withIDA
@enabledIDA
def Auto_UnColor_PC() -> None:
global colored_pc
if colored_pc:
@ -344,14 +426,14 @@ def Auto_UnColor_PC() -> None:
colored_pc = None
@withIDA
@enabledIDA
@returns_address
@pwndbg.lib.cache.cache_until("objfile")
def LocByName(name) -> int:
return _ida.get_name_ea_simple(str(name)) # type: ignore[return-value]
@withIDA
@enabledIDA
@takes_address
@returns_address
@pwndbg.lib.cache.cache_until("objfile")
@ -359,7 +441,7 @@ def PrevHead(addr):
return _ida.prev_head(addr)
@withIDA
@enabledIDA
@takes_address
@returns_address
@pwndbg.lib.cache.cache_until("objfile")
@ -367,39 +449,39 @@ def NextHead(addr):
return _ida.next_head(addr)
@withIDA
@enabledIDA
@takes_address
@pwndbg.lib.cache.cache_until("objfile")
def GetFunctionName(addr):
return _ida.get_func_name(addr)
@withIDA
@enabledIDA
@takes_address
@pwndbg.lib.cache.cache_until("objfile")
def GetFlags(addr):
return _ida.get_full_flags(addr)
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("objfile")
def isASCII(flags):
return _ida.is_strlit(flags)
@withIDA
@enabledIDA
@takes_address
@pwndbg.lib.cache.cache_until("objfile")
def ArgCount(address) -> None:
pass
@withIDA
@enabledIDA
def SaveBase(path: str):
return _ida.save_database(path)
@withIDA
@enabledIDA
def GetIdbPath():
return _ida.get_idb_path()
@ -410,94 +492,104 @@ def has_cached_cfunc(addr):
return _ida.has_cached_cfunc(addr)
_lexer = CppLexer()
_formatter = Terminal256Formatter(style="monokai")
@withHexrays
@takes_address
@pwndbg.lib.cache.cache_until("stop")
def decompile(addr):
return _ida.decompile(addr)
def decompile(addr) -> str | None:
code: str | None = _ida.decompile(addr)
if code is not None and not pwndbg.config.disable_colors:
code = highlight(code, _lexer, _formatter)
return code
@withHexrays
@takes_address
@pwndbg.lib.cache.cache_until("stop")
def decompile_context(pc, context_lines):
return _ida.decompile_context(pc, context_lines)
def decompile_context(pc, context_lines) -> str | None:
code: str | None = _ida.decompile_context(pc, context_lines)
if code is not None and not pwndbg.config.disable_colors:
code = highlight(code, _lexer, _formatter)
return code
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("forever")
def get_ida_versions() -> Dict[str, str]:
return _ida.versions() # type: ignore[return-value]
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("stop")
def GetStrucQty():
return _ida.get_struc_qty()
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("stop")
def GetStrucId(idx):
return _ida.get_struc_by_idx(idx)
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("stop")
def GetStrucName(sid):
return _ida.get_struc_name(sid)
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("stop")
def GetStrucSize(sid):
return _ida.get_struc_size(sid)
@withIDA
@enabledIDA
@takes_address
@pwndbg.lib.cache.cache_until("stop")
def GetFrameId(addr):
return _ida.get_frame_id(addr)
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("stop")
def GetMemberQty(sid):
return _ida.get_member_qty(sid)
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("stop")
def GetMemberSize(sid, offset):
return _ida.get_member_size(sid, offset)
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("stop")
def GetMemberId(sid, offset):
return _ida.get_member_id(sid, offset)
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("stop")
def GetMemberName(sid, offset):
return _ida.get_member_name(sid, offset)
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("stop")
def GetMemberOffset(sid, member_name):
return _ida.get_member_offset(sid, member_name)
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("stop")
def GetMemberFlag(sid, offset):
return _ida.get_member_flag(sid, offset)
@withIDA
@enabledIDA
@pwndbg.lib.cache.cache_until("stop")
def GetStrucNextOff(sid, offset):
return _ida.get_next_offset(sid, offset)
@ -506,10 +598,6 @@ def GetStrucNextOff(sid, offset):
class IDC:
query = "{k:v for k,v in globals()['idc'].__dict__.items() if isinstance(v, int)}"
def __init__(self) -> None:
if available():
self._update()
def _update(self) -> None:
data: Dict[Any, Any] = _ida.eval(self.query)
self.__dict__.update(data)
@ -569,7 +657,7 @@ ida_replacements = {
class IdaProvider(pwndbg.integration.IntegrationProvider):
@pwndbg.decorators.suppress_errors()
@withIDA
@enabledIDA
def get_symbol(self, addr: int) -> str | None:
exe = pwndbg.aglib.elf.exe()
if exe:
@ -590,18 +678,18 @@ class IdaProvider(pwndbg.integration.IntegrationProvider):
return ()
@pwndbg.decorators.suppress_errors(fallback=True)
@withIDA
@enabledIDA
def is_in_function(self, addr: int) -> bool:
return available() and bool(GetFunctionName(addr))
return bool(GetFunctionName(addr))
@pwndbg.decorators.suppress_errors(fallback=[])
@withIDA
@enabledIDA
def get_comment_lines(self, addr: int) -> List[str]:
pre = Anterior(addr)
return pre.decode().split("\n") if pre else []
@pwndbg.decorators.suppress_errors()
@withIDA
@enabledIDA
def decompile(self, addr: int, lines: int) -> List[str] | None:
code = decompile_context(addr, lines // 2)
if code:
@ -610,7 +698,7 @@ class IdaProvider(pwndbg.integration.IntegrationProvider):
return None
@pwndbg.decorators.suppress_errors()
@withIDA
@enabledIDA
def get_func_type(self, addr: int) -> Function | None:
typename: str = GetType(addr)
@ -626,3 +714,7 @@ class IdaProvider(pwndbg.integration.IntegrationProvider):
return pwndbg.lib.funcparser.ExtractFuncDeclFromSource(typename + ";")
return None
@override
def disable(self) -> None:
ensure_disabled()

Loading…
Cancel
Save