From ce954f74489cf5849cee509d541c7d9255a9cd82 Mon Sep 17 00:00:00 2001 From: patryk4815 Date: Wed, 27 Nov 2024 13:59:01 +0100 Subject: [PATCH] Add workaround for gdbserver weird bug (#2577) * add workaround for gdbserver, exception: Remote 'g' packet reply is too long * Fix workaround for gdbserver, for single events * cleanup redundant class * cleanup _inner_handler --- pwndbg/gdblib/events.py | 61 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/pwndbg/gdblib/events.py b/pwndbg/gdblib/events.py index bd604d8ac..592188c1f 100644 --- a/pwndbg/gdblib/events.py +++ b/pwndbg/gdblib/events.py @@ -8,12 +8,14 @@ from __future__ import annotations import sys from collections import defaultdict +from collections import deque from enum import Enum from enum import auto from functools import partial from functools import wraps from typing import Any from typing import Callable +from typing import Deque from typing import Dict from typing import List from typing import TypeVar @@ -74,6 +76,61 @@ class StartEvent: gdb.events.start = StartEvent() +def _is_safe_event_packet(): + try: + gdb.selected_frame() + except gdb.error as e: + if "Remote 'g' packet reply is too long" in str(e): + return False + return True + + +def _is_safe_event_thread(): + try: + gdb.newest_frame() + except gdb.error as e: + if "Selected thread is running" in str(e): + return False + return True + + +def wrap_safe_event_handler(event_handler: Callable[P, T]) -> Callable[P, T]: + """ + Wraps an event handler to ensure it is only executed when the event is safe. + Invalid events are queued and executed later when safe. + + Note: Avoid using `gdb.post_event` because of another bug in gdbserver + where the `gdb.newest_frame` function may not work properly. + + Workaround to fix bug in gdbserver: https://github.com/pwndbg/pwndbg/issues/2576 + """ + queued_invalid_events: Deque[Callable[..., Any]] = deque() + + def _loop_until_thread_ok(): + if not queued_invalid_events: + return + + if not _is_safe_event_thread(): + gdb.post_event(_loop_until_thread_ok) + return + + while queued_invalid_events: + queued_invalid_events.popleft()() + + @wraps(event_handler) + def _inner_handler(*a: P.args, **kw: P.kwargs): + if _is_safe_event_packet(): + while queued_invalid_events: + queued_invalid_events.popleft()() + event_handler(*a, **kw) + return + + queued_invalid_events.append(lambda: event_handler(*a, **kw)) + gdb.post_event(_loop_until_thread_ok) + + return _inner_handler + + class HandlerPriority(Enum): """ A priority level for an event handler, ordered from highest to lowest priority. @@ -143,6 +200,10 @@ def connect( registered[event_handler].setdefault(priority, []).append(caller) if event_handler not in connected: handle = partial(invoke_event, event_handler) + + if event_handler == gdb.events.new_objfile: + handle = wrap_safe_event_handler(handle) + event_handler.connect(handle) connected[event_handler] = handle return func