@ -24,8 +24,21 @@ import gdb
from typing_extensions import ParamSpec
from pwndbg import config
from pwndbg . color import message
debug = config . add_param ( " debug-events " , False , " display internal event debugging info " )
gdb_workaround_stop_event = config . add_param (
" gdb-workaround-stop-event " ,
0 ,
""" Enable asynchronous stop events as a workaround to improve ' commands ' functionality.
Note : This may cause unexpected behavior with pwndbg or gdb . execute .
Values :
0 - Disable the workaround ( default ) .
1 - Enable asynchronous stop events ; gdb . execute may behave unexpectedly ( asynchronously ) .
2 - Disable only deadlock detection ; deadlocks may still occur .
""" ,
)
P = ParamSpec ( " P " )
T = TypeVar ( " T " )
@ -94,7 +107,65 @@ def _is_safe_event_thread():
return True
def wrap_safe_event_handler ( event_handler : Callable [ P , T ] ) - > Callable [ P , T ] :
queued_events : Deque [ Callable [ . . . , Any ] ] = deque ( )
executing_event = False
def _update_start_event_state ( event_type : Any ) :
"""
Update the state of the StartEvent appropriately
( we emulate this event so we need to set it properly )
"""
# Implement our custom event gdb.events.start!
if event_type == gdb . events . stop :
queued_events . append ( gdb . events . start . on_stop )
elif event_type == gdb . events . new_objfile :
queued_events . append ( gdb . events . start . on_new_objfile )
elif event_type == gdb . events . exited :
gdb . events . start . on_exited ( )
def _detect_deadlock ( ) :
if not executing_event :
# Not executing an event inside another event, so no deadlock
return
if gdb_workaround_stop_event == 2 :
# Skip deadlock detection because this option disables it
return
print ( message . error ( " DEADLOCK DETECTED... " ) )
print (
message . error (
f """ The deadlock issue is likely caused by using ' commands[ \\ n] { message . hint ( " continue " ) } [ \\ n]end ' .
To address this , you have three options :
1. Avoid using ' { message.hint( " commands " )} ' . Instead , rewrite it as a Python script . For example :
{ message . hint ( '''
# Read more at: https://github.com/pwndbg/pwndbg/issues/425#issuecomment-892302716
class Bp ( gdb . Breakpoint ) :
def stop ( self ) :
print ( " Breakpoint hit! " )
return False # False = continue to next breakpoint, True = stop inferior
Bp ( " main " )
''' )}
2. Replace ' { message.hint( " continue " )} ' with ' { message.hint( ' pi gdb . execute ( " continue " ) ' )} ' and use ' { message.hint( " set gdb-workaround-stop-event 2 " )} ' .
This change reduces the likelihood of deadlocks , while preserving pwndbg functionality .
3. Run ' { message.hint( " set gdb-workaround-stop-event 1 " )} ' , allowing you to keep ' { message.hint( " continue " )} ' as is .
However , this setting may cause pwndbg or gdb . execute to behave asynchronously / unpredictably .
"""
)
)
import os
os . _exit ( 1 )
def wrap_safe_event_handler ( event_handler : Callable [ P , T ] , event_type : Any ) - > Callable [ P , T ] :
"""
Wraps an event handler to ensure it is only executed when the event is safe .
Invalid events are queued and executed later when safe .
@ -102,31 +173,54 @@ def wrap_safe_event_handler(event_handler: Callable[P, T]) -> Callable[P, T]:
Note : Avoid using ` gdb . post_event ` because of another bug in gdbserver
where the ` gdb . newest_frame ` function may not work properly .
Workaround to fix bug in gdbserver : https : / / github . com / pwndbg / pwndbg / issues / 2576
Workaround to fix bug in gdbserver ( gdb . events . new_objfile ) : https : / / github . com / pwndbg / pwndbg / issues / 2576
Workaround to fix bug in gdb ( gdb . events . stop ) : https : / / github . com / pwndbg / pwndbg / issues / 425
"""
queued_invalid_events : Deque [ Callable [ . . . , Any ] ] = deque ( )
def _loop_until_thread_ok ( ) :
if not queued_invalid_events :
global queued_events
if not queued_events :
return
if not _is_safe_event_thread ( ) :
gdb . post_event ( _loop_until_thread_ok )
return
while queued_ invalid_ events:
queued_ invalid_ events. popleft ( ) ( )
while queued_ events:
queued_ events. popleft ( ) ( )
@wraps ( event_handler )
def _inner_handler ( * a : P . args , * * kw : P . kwargs ) :
if _is_safe_event_packet ( ) :
while queued_invalid_events :
queued_invalid_events . popleft ( ) ( )
global queued_events , executing_event
if event_type == gdb . events . start :
# SKIP our custom event from this wrapper...
event_handler ( * a , * * kw )
return
queued_invalid_events . append ( lambda : event_handler ( * a , * * kw ) )
gdb . post_event ( _loop_until_thread_ok )
_detect_deadlock ( )
_update_start_event_state ( event_type )
queued_events . append ( lambda : event_handler ( * a , * * kw ) )
if event_type == gdb . events . new_objfile and not _is_safe_event_packet ( ) :
# Workaround to issue with gdbserver - Remote 'g' packet reply is too long
# https://github.com/pwndbg/pwndbg/issues/2576
gdb . post_event ( _loop_until_thread_ok )
return
elif event_type == gdb . events . stop :
# Workaround to issue with gdb `commands \n continue \n end` - Selected thread is running
# https://github.com/pwndbg/pwndbg/issues/425
if gdb_workaround_stop_event == 1 :
gdb . post_event ( _loop_until_thread_ok )
return
executing_event = True
gdb . execute ( " " , to_string = True ) # Trigger bug in gdb, it is like 'yield'
executing_event = False
while queued_events :
queued_events . popleft ( ) ( )
return _inner_handler
@ -200,9 +294,7 @@ def connect(
registered [ event_handler ] . setdefault ( priority , [ ] ) . append ( caller )
if event_handler not in connected :
handle = partial ( invoke_event , event_handler )
if event_handler == gdb . events . new_objfile :
handle = wrap_safe_event_handler ( handle )
handle = wrap_safe_event_handler ( handle , event_handler )
event_handler . connect ( handle )
connected [ event_handler ] = handle
@ -279,18 +371,3 @@ def after_reload(start: bool = True) -> None:
def on_reload ( ) - > None :
for functions in registered . values ( ) :
functions . clear ( )
@new_objfile
def _start_newobjfile ( ) - > None :
gdb . events . start . on_new_objfile ( )
@exit
def _start_exit ( ) - > None :
gdb . events . start . on_exited ( )
@stop
def _start_stop ( ) - > None :
gdb . events . start . on_stop ( )