diff --git a/pwndbg/gdblib/events.py b/pwndbg/gdblib/events.py index bd604d8ac..592188c1f 100644 --- a/pwndbg/gdblib/events.py +++ b/pwndbg/gdblib/events.py @@ -8,12 +8,14 @@ from __future__ import annotations import sys from collections import defaultdict +from collections import deque from enum import Enum from enum import auto from functools import partial from functools import wraps from typing import Any from typing import Callable +from typing import Deque from typing import Dict from typing import List from typing import TypeVar @@ -74,6 +76,61 @@ class StartEvent: gdb.events.start = StartEvent() +def _is_safe_event_packet(): + try: + gdb.selected_frame() + except gdb.error as e: + if "Remote 'g' packet reply is too long" in str(e): + return False + return True + + +def _is_safe_event_thread(): + try: + gdb.newest_frame() + except gdb.error as e: + if "Selected thread is running" in str(e): + return False + return True + + +def wrap_safe_event_handler(event_handler: Callable[P, T]) -> Callable[P, T]: + """ + Wraps an event handler to ensure it is only executed when the event is safe. + Invalid events are queued and executed later when safe. + + Note: Avoid using `gdb.post_event` because of another bug in gdbserver + where the `gdb.newest_frame` function may not work properly. + + Workaround to fix bug in gdbserver: https://github.com/pwndbg/pwndbg/issues/2576 + """ + queued_invalid_events: Deque[Callable[..., Any]] = deque() + + def _loop_until_thread_ok(): + if not queued_invalid_events: + return + + if not _is_safe_event_thread(): + gdb.post_event(_loop_until_thread_ok) + return + + while queued_invalid_events: + queued_invalid_events.popleft()() + + @wraps(event_handler) + def _inner_handler(*a: P.args, **kw: P.kwargs): + if _is_safe_event_packet(): + while queued_invalid_events: + queued_invalid_events.popleft()() + event_handler(*a, **kw) + return + + queued_invalid_events.append(lambda: event_handler(*a, **kw)) + gdb.post_event(_loop_until_thread_ok) + + return _inner_handler + + class HandlerPriority(Enum): """ A priority level for an event handler, ordered from highest to lowest priority. @@ -143,6 +200,10 @@ def connect( registered[event_handler].setdefault(priority, []).append(caller) if event_handler not in connected: handle = partial(invoke_event, event_handler) + + if event_handler == gdb.events.new_objfile: + handle = wrap_safe_event_handler(handle) + event_handler.connect(handle) connected[event_handler] = handle return func