Fix handling of O_NONBLOCK in IODriverPlainText (#3443)

pull/3459/head
Matt. 7 days ago committed by GitHub
parent 0daaf1889a
commit a3721aecaf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -2,6 +2,7 @@ from __future__ import annotations
import bisect
import collections
import enum
import os
import random
import re
@ -1699,7 +1700,11 @@ class LLDBProcess(pwndbg.dbg_mod.Process):
_struct: lldb.SBStructuredData,
_internal,
) -> bool:
return stop_handler(sp)
try:
self.dbg.lldb_python_state_callback(LLDBPythonState.LLDB_STOP_HANDLER)
return stop_handler(sp)
finally:
self.dbg.lldb_python_state_callback(LLDBPythonState.PWNDBG)
sys.modules[self.dbg.module].__dict__[stop_handler_name] = handler
@ -1842,6 +1847,35 @@ class LLDBCommand(pwndbg.dbg_mod.CommandHandle):
self.command_name = command_name
class LLDBPythonState(enum.Enum):
"""
State of LLDB Python execution.
Unlike in pwndbg-gdb, in pwndbg-lldb the responsibility of driving execution
of Python code forward is shared between Pwndbg and LLDB. Knowing which one
is in charge is crucial to the correct functioning of the Pwndbg REPL.
This class defines the different kinds of states we can be in.
"""
PWNDBG = 1
"Pwndbg is driving execution of Python code"
LLDB_COMMAND_HANDLER = 0
"Python code is executing from inside an LLDB command handler"
LLDB_STOP_HANDLER = 2
"Python code is executing from an LLDB breakpoint/watchpoint hook handler"
def _default_lldb_python_state_callback(_state: LLDBPythonState) -> None:
"""
Before being set by the Pwndbg REPL, LLDB instances need a sensible default
value for lldb_python_state_callback. This is it.
"""
raise RuntimeError("Pwndbg REPL failed to set lldb_python_state_callback")
class LLDB(pwndbg.dbg_mod.Debugger):
exec_states: List[lldb.SBExecutionState]
@ -1867,8 +1901,8 @@ class LLDB(pwndbg.dbg_mod.Debugger):
# Relay used for exceptions originating from commands called through LLDB.
_exception_relay: BaseException | None
in_lldb_command_handler: bool
"Whether an LLDB command handler is currently running"
lldb_python_state_callback: Callable[[LLDBPythonState], None]
"Callback to the REPL, used to notify it of LLDB driving Python code"
# temporarily suspend context output
should_suspend_ctx: bool
@ -1884,7 +1918,7 @@ class LLDB(pwndbg.dbg_mod.Debugger):
self.controllers = []
self._current_process_is_gdb_remote = False
self._exception_relay = None
self.in_lldb_command_handler = False
self.lldb_python_state_callback = _default_lldb_python_state_callback
self.should_suspend_ctx = False
import pwndbg
@ -1959,12 +1993,12 @@ class LLDB(pwndbg.dbg_mod.Debugger):
def __call__(self, _, command, exe_context, result):
try:
debugger.exec_states.append(exe_context)
debugger.in_lldb_command_handler = True
debugger.lldb_python_state_callback(LLDBPythonState.LLDB_COMMAND_HANDLER)
handler(debugger, command, True)
except BaseException as e:
debugger._exception_relay = e
finally:
debugger.in_lldb_command_handler = False
debugger.lldb_python_state_callback(LLDBPythonState.PWNDBG)
assert (
debugger.exec_states.pop() == exe_context
), "Execution state mismatch on command handler"

@ -66,6 +66,7 @@ from pwndbg.color import message
from pwndbg.dbg import EventType
from pwndbg.dbg.lldb import LLDB
from pwndbg.dbg.lldb import LLDBProcess
from pwndbg.dbg.lldb import LLDBPythonState
from pwndbg.dbg.lldb import OneShotAwaitable
from pwndbg.dbg.lldb.pset import InvalidParse
from pwndbg.dbg.lldb.pset import pget
@ -332,99 +333,116 @@ def run(
# This is the driver we're going to be using to handle the process.
relay = EventRelay(dbg)
driver = ProcessDriver(debug=debug, event_handler=relay)
# Set ourselves up to respond to SIGINT by interrupting the process if it is
# running, and doing nothing otherwise.
def handle_sigint(_sig, _frame):
driver.interrupt(in_lldb_command_handler=dbg.in_lldb_command_handler)
signal.signal(signal.SIGINT, handle_sigint)
show_greeting()
last_command = ""
coroutine = controller(PwndbgController(), *args)
last_result: Any = None
last_exc: Exception | None = None
while True:
try:
# Execute the prompt hook.
dbg._fire_prompt_hook()
with ProcessDriver(debug=debug, event_handler=relay) as driver:
# Set ourselves up to respond to changes in Python execution context.
curr_state = LLDBPythonState.PWNDBG
in_lldb_command_handler = False
def handle_lldb_python_state_change(new_state: LLDBPythonState) -> None:
nonlocal curr_state
nonlocal in_lldb_command_handler
match (curr_state, new_state):
case (LLDBPythonState.PWNDBG, LLDBPythonState.LLDB_STOP_HANDLER):
driver.pause_io_if_running()
case (LLDBPythonState.PWNDBG, LLDBPythonState.LLDB_COMMAND_HANDLER):
driver.pause_io_if_running()
case (_, LLDBPythonState.PWNDBG):
driver.resume_io_if_running()
curr_state = new_state
dbg.lldb_python_state_callback = handle_lldb_python_state_change
# Set ourselves up to respond to SIGINT by interrupting the process if it is
# running, and doing nothing otherwise.
def handle_sigint(_sig, _frame):
driver.interrupt(in_lldb_command_handler=in_lldb_command_handler)
signal.signal(signal.SIGINT, handle_sigint)
show_greeting()
last_command = ""
coroutine = controller(PwndbgController(), *args)
last_result: Any = None
last_exc: Exception | None = None
while True:
try:
if last_exc is not None:
action = coroutine.throw(last_exc)
else:
action = coroutine.send(last_result)
except StopIteration:
# Nothing else for us to do.
break
except asyncio.CancelledError:
# We requested a cancellation that wasn't overwritten.
break
finally:
last_exc = None
last_result = None
if isinstance(action, YieldInteractive):
if debug:
print("[-] REPL: Prompt next command from user interactively")
# Execute the prompt hook.
dbg._fire_prompt_hook()
try:
if HAS_FZF:
try:
line = session.prompt(message=PROMPT)
except KeyboardInterrupt:
continue
if last_exc is not None:
action = coroutine.throw(last_exc)
else:
line = input(PROMPT)
# If the input is empty (i.e., 'Enter'), use the previous command
if line:
last_command = line
else:
line = last_command
except EOFError:
# Exit the REPL if there's nothing else to run.
last_exc = asyncio.CancelledError()
continue
if not exec_repl_command(line, sys.stdout, dbg, driver, relay):
last_exc = asyncio.CancelledError()
continue
elif isinstance(action, YieldExecDirect):
if debug:
print(
f"[-] REPL: Executing command '{action._command}' {'with' if action._capture else 'without'} output capture"
)
action = coroutine.send(last_result)
except StopIteration:
# Nothing else for us to do.
break
except asyncio.CancelledError:
# We requested a cancellation that wasn't overwritten.
break
finally:
last_exc = None
last_result = None
if isinstance(action, YieldInteractive):
if debug:
print("[-] REPL: Prompt next command from user interactively")
try:
if HAS_FZF:
try:
line = session.prompt(message=PROMPT)
except KeyboardInterrupt:
continue
else:
line = input(PROMPT)
# If the input is empty (i.e., 'Enter'), use the previous command
if line:
last_command = line
else:
line = last_command
except EOFError:
# Exit the REPL if there's nothing else to run.
last_exc = asyncio.CancelledError()
continue
if not exec_repl_command(line, sys.stdout, dbg, driver, relay):
last_exc = asyncio.CancelledError()
continue
elif isinstance(action, YieldExecDirect):
if debug:
print(
f"[-] REPL: Executing command '{action._command}' {'with' if action._capture else 'without'} output capture"
)
last_command = action._command
last_command = action._command
if not action._prompt_silent:
print(f"{PROMPT.value if HAS_FZF else PROMPT}{action._command}")
if not action._prompt_silent:
print(f"{PROMPT.value if HAS_FZF else PROMPT}{action._command}")
try:
if action._capture:
with TextIOWrapper(BytesIO(), write_through=True) as output:
try:
if action._capture:
with TextIOWrapper(BytesIO(), write_through=True) as output:
should_continue = exec_repl_command(
action._command, output, dbg, driver, relay
)
last_result = output.buffer.getvalue()
else:
should_continue = exec_repl_command(
action._command, output, dbg, driver, relay
action._command, sys.stdout, dbg, driver, relay
)
last_result = output.buffer.getvalue()
else:
should_continue = exec_repl_command(
action._command, sys.stdout, dbg, driver, relay
)
except BaseException as e:
last_exc = e
continue
if not should_continue:
last_exc = asyncio.CancelledError()
continue
except UserCancelledError as e:
last_exc = e
except BaseException as e:
last_exc = e
continue
if not should_continue:
last_exc = asyncio.CancelledError()
continue
except UserCancelledError as e:
last_exc = e
def exec_repl_command(

@ -172,6 +172,12 @@ class IODriver:
"""
raise NotImplementedError()
def close(self) -> None:
"""
Terminate this driver and release all resources associated with it.
"""
raise NotImplementedError()
def get_io_driver() -> IODriver:
"""
@ -197,6 +203,12 @@ class IODriverPlainText(IODriver):
in_thr: threading.Thread
out_thr: threading.Thread
stop_requested: threading.Event
stop_fulfilled: threading.Semaphore
start_requested: threading.Semaphore
_closed: threading.Event
_running: bool
_stdout_nonblock_failed: bool
_stderr_nonblock_failed: bool
process: lldb.SBProcess
@ -204,59 +216,104 @@ class IODriverPlainText(IODriver):
self.likely_output = threading.BoundedSemaphore(1)
self.process = None
self.stop_requested = threading.Event()
self.start_requested = threading.BoundedSemaphore(2)
self.stop_fulfilled = threading.BoundedSemaphore(2)
self._closed = threading.Event()
self._running = False
self._stdout_nonblock_failed = False
self._stderr_nonblock_failed = False
assert self.start_requested.acquire()
assert self.start_requested.acquire()
assert self.stop_fulfilled.acquire()
assert self.stop_fulfilled.acquire()
self.in_thr = threading.Thread(target=self._handle_input)
self.out_thr = threading.Thread(target=self._handle_output)
self.in_thr.start()
self.out_thr.start()
@override
def stdio(self) -> Tuple[str | None, str | None, str | None]:
return None, None, None
def _handle_input(self):
while not self.stop_requested.is_set():
if SELECT_AVAILABLE:
select.select([sys.stdin], [], [], 0.2)
try:
data = sys.stdin.read()
self.process.PutSTDIN(data)
except (BlockingIOError, TypeError):
# We have to check for TypeError here too, as, even though you
# *can* set stdin into nonblocking mode, it doesn't handle it
# very gracefully.
#
# See https://github.com/python/cpython/issues/57531
# Ignore blocking errors, but wait for a little bit before
# trying again if we don't have select().
if not SELECT_AVAILABLE:
time.sleep(0.1)
while not self._closed.is_set():
if not self.start_requested.acquire(blocking=True, timeout=1):
continue
while not self.stop_requested.is_set():
if SELECT_AVAILABLE:
select.select([sys.stdin], [], [], 0.2)
try:
data = sys.stdin.read()
self.process.PutSTDIN(data)
except (BlockingIOError, TypeError):
# We have to check for TypeError here too, as, even though you
# *can* set stdin into nonblocking mode, it doesn't handle it
# very gracefully.
#
# See https://github.com/python/cpython/issues/57531
# Ignore blocking errors, but wait for a little bit before
# trying again if we don't have select().
if not SELECT_AVAILABLE:
time.sleep(0.1)
self.stop_fulfilled.release()
def _handle_output(self):
while not self.stop_requested.is_set():
# Try to acquire the semaphore. This will not succeed until the next
# process output event is received by the event loop.
self.likely_output.acquire(timeout=0.2)
# Don't actually stop ourselves, even if we can't acquire the
# semaphore. LLDB can be a little lazy with the standard output
# events, so we use the semaphore as way to respond much faster to
# output than we otherwise would, but, even if we don't get an
# event, we should still read the output, albeit at a slower pace.
# Copy everything out to standard outputs.
while True:
stdout = self.process.GetSTDOUT(1024)
stderr = self.process.GetSTDERR(1024)
if len(stdout) == 0 and len(stderr) == 0:
break
print(stdout, file=sys.stdout, end="")
print(stderr, file=sys.stderr, end="")
sys.stdout.flush()
sys.stderr.flush()
while not self._closed.is_set():
if not self.start_requested.acquire(blocking=True, timeout=1):
continue
while not self.stop_requested.is_set():
# Try to acquire the semaphore. This will not succeed until the next
# process output event is received by the event loop.
self.likely_output.acquire(blocking=True, timeout=0.2)
# Don't actually stop ourselves, even if we can't acquire the
# semaphore. LLDB can be a little lazy with the standard output
# events, so we use the semaphore as way to respond much faster to
# output than we otherwise would, but, even if we don't get an
# event, we should still read the output, albeit at a slower pace.
# Copy everything out to standard outputs.
stdout = b""
stderr = b""
while True:
stdout += self.process.GetSTDOUT(1024).encode(sys.stdout.encoding)
stderr += self.process.GetSTDERR(1024).encode(sys.stderr.encoding)
if len(stdout) == 0 and len(stderr) == 0:
# Note that, even if we have pulled nothing new from LLDB,
# we still only exit the loop once we manage to push out
# both buffers in their entirety.
#
# This is consistent with the behavior of blocking on STDOUT
# and STDERR that we want, even if the underlying files are
# actually non-blocking.
break
# Crutially, we don't release the semaphore here. Releasing is the
# job of the on_output_event function.
try:
stdout = stdout[sys.stdout.buffer.write(stdout) :]
sys.stdout.buffer.flush()
except BlockingIOError as e:
# STDOUT is nonblocking at this point, and so writes may
# fail. We trim off however much we have managed to write
# from the buffer, and try again in the next iteration.
stdout = stdout[e.characters_written :]
try:
stderr = stderr[sys.stderr.buffer.write(stderr) :]
sys.stderr.buffer.flush()
except BlockingIOError as e:
# Same goes for STDERR as goes for STDOUT.
stderr = stderr[e.characters_written :]
# Crucially, we don't release the semaphore here. Releasing is the
# job of the on_output_event function.
self.stop_fulfilled.release()
@override
def on_output_event(self) -> None:
@ -283,20 +340,83 @@ class IODriverPlainText(IODriver):
self.process = process
self.stop_requested.clear()
os.set_blocking(sys.stdin.fileno(), False)
self.in_thr = threading.Thread(target=self._handle_input)
self.out_thr = threading.Thread(target=self._handle_output)
self.in_thr.start()
self.out_thr.start()
# Nonblocking output is NOT what we want, but in UNIX systems O_NONBLOCK
# is set in the context of the so-called "open file description"[1][2],
# rather than in the context of the file descriptor itself. So, these
# systems will helpfully - and silently, of course - propagate a change
# in blocking policy to all file descriptors that share the same open
# file description - such as ones created through F_DUPFD or dup(2).
#
# Since, in general, we can't know how STDIN, STDOUT and STDERR are
# related to each other ahead of time, and, more specifically, they
# often share the exact same open file description, we have to be able
# to gracefully handle the case in which setting O_NONBLOCK for STDIN
# will also necessarily set it for STDOUT and STDERR.
#
# The strategy this class elects to use, then, is to explicitly set all
# of them to the same blocking policy. While this doesn't solve the
# issue, it at least makes it so that it's not as surprising as it would
# be, otherwise. :)
#
# [1]: https://pubs.opengroup.org/onlinepubs/9799919799/
# [2]: https://linux.die.net/man/2/fcntl
try:
os.set_blocking(sys.stdout.fileno(), False)
except OSError:
# It's not guaranteed that sys.stdout is actually backed by a file,
# or that that file supports non-blocking operation. In fact, the
# Pwndbg CLI itself supports swapping out output the output streams
# as part of capturing command output.
#
# As such, we must also be able to gracefully handle this case.
self._stdout_nonblock_failed = True
try:
os.set_blocking(sys.stderr.fileno(), False)
except OSError:
# Same as above.
self._stderr_nonblock_failed = True
self.start_requested.release(2)
self._running = True
@override
def stop(self) -> None:
# Politely ask for the I/O processors to stop, and wait until they have
# stopped on their own terms.
assert self._running, "Tried to stop an IODriverPlainText that is not running"
self.stop_requested.set()
self.in_thr.join()
self.out_thr.join()
self.stop_fulfilled.acquire(blocking=True)
self.stop_fulfilled.acquire(blocking=True)
os.set_blocking(sys.stdin.fileno(), True)
# See start()
try:
os.set_blocking(sys.stdout.fileno(), True)
except OSError:
if not self._stdout_nonblock_failed:
raise
try:
os.set_blocking(sys.stderr.fileno(), True)
except OSError:
if not self._stderr_nonblock_failed:
raise
self._stdout_nonblock_failed = False
self._stderr_nonblock_failed = False
self.process = None
self._running = False
@override
def close(self) -> None:
if self._running:
self.stop()
self._closed.set()
self.in_thr.join()
self.out_thr.join()
def make_pty() -> Tuple[str, int] | None:
@ -361,6 +481,8 @@ class IODriverPseudoTerminal(IODriver):
io_thread: threading.Thread
process: lldb.SBProcess
termcontrol: OpportunisticTerminalControl
_stdout_nonblock_failed: bool
_stderr_nonblock_failed: bool
has_terminal_control: bool
@ -412,6 +534,9 @@ class IODriverPseudoTerminal(IODriver):
self.input_buffer = b""
self.process = None
self._stdout_nonblock_failed = False
self._stderr_nonblock_failed = False
@override
def stdio(self) -> Tuple[str | None, str | None, str | None]:
return self.worker, self.worker, self.worker
@ -440,8 +565,14 @@ class IODriverPseudoTerminal(IODriver):
data = os.read(self.manager, 1024)
if len(data) == 0:
break
print(data.decode("utf-8"), end="")
sys.stdout.flush()
while len(data) > 0:
try:
data = data[sys.stdout.buffer.write(data) :]
sys.stdout.buffer.flush()
except BlockingIOError as e:
data = data[e.characters_written :]
except IOError:
pass
@ -453,6 +584,17 @@ class IODriverPseudoTerminal(IODriver):
self.stop_requested.clear()
os.set_blocking(sys.stdin.fileno(), False)
# Same reasoning as IODriverPlainText applies here.
try:
os.set_blocking(sys.stdout.fileno(), False)
except OSError:
self._stdout_nonblock_failed = True
try:
os.set_blocking(sys.stderr.fileno(), False)
except OSError:
self._stderr_nonblock_failed = True
self.was_line_buffering = self.termcontrol.get_line_buffering()
self.was_echoing = self.termcontrol.get_echo()
@ -470,9 +612,25 @@ class IODriverPseudoTerminal(IODriver):
self.io_thread.join()
os.set_blocking(sys.stdin.fileno(), True)
# Same reasoning as IODriverPlainText applies here.
try:
os.set_blocking(sys.stdout.fileno(), True)
except OSError:
if not self._stdout_nonblock_failed:
raise
try:
os.set_blocking(sys.stderr.fileno(), True)
except OSError:
if not self._stderr_nonblock_failed:
raise
self.termcontrol.set_line_buffering(self.was_line_buffering)
self.termcontrol.set_echo(self.was_echoing)
self._stdout_nonblock_failed = False
self._stderr_nonblock_failed = False
self.process = None
@override
@ -489,3 +647,7 @@ class IODriverPseudoTerminal(IODriver):
#
# TODO: Replace controlling PTY of the process once it is set up.
pass
@override
def close(self) -> None:
pass

@ -1,6 +1,7 @@
from __future__ import annotations
import contextlib
import enum
import sys
from asyncio import CancelledError
from typing import Any
@ -171,10 +172,27 @@ def _updates_scope_counter(target: str) -> Callable[[Callable[..., Any]], Any]:
return sub0
class _IODriverState(enum.Enum):
STOPPED = 0
"The IODriver is not running, and must be entirely restarted"
RUNNING = 1
"The IODriver is running"
PAUSED = 2
"The IODriver is not running, but is alive and can be resumed"
class ProcessDriver:
"""
Drives the execution of a process, responding to its events and handling its
I/O, and exposes a simple synchronous interface to the REPL interface.
# IODriver State Machine
Because LLDB can make Python code from Pwndbg execute while an I/O driver is
active, and having the I/O driver active while Pwndbg is running leads to
all sorts of fun failure modes, we want to be able to pause it temporarily.
We, thus, use the states described in _IODriverState to keep track of what
operations may be performed on the current IODriver.
"""
io: IODriver
@ -195,6 +213,9 @@ class ProcessDriver:
_in_run_coroutine: int
"Nested scope counter for run_coroutine"
_io_driver_state: _IODriverState
"Whether the I/O driver is currently running"
def __init__(self, event_handler: EventHandler, debug=False):
self.io = None
self.process = None
@ -205,20 +226,31 @@ class ProcessDriver:
self._pending_cancellation = False
self._in_run_until_next_stop = 0
self._in_run_coroutine = 0
self._io_driver_state = _IODriverState.STOPPED
def __enter__(self) -> ProcessDriver:
return self
def __exit__(self, _exc_type, _exc_val, _exc_tb) -> None:
if self.io is not None:
self.io.close()
def debug_print(self, *args, **kwargs) -> None:
if self.debug:
try:
print("[*] ProcessDriver: ", end="")
print(*args, **kwargs)
except BlockingIOError as e:
from io import StringIO
with StringIO() as out:
print("[*] ProcessDriver: ", end="", file=out)
print(*args, file=out, **kwargs)
message = out.getvalue().encode(sys.stdout.encoding)
while len(message) > 0:
try:
# Try to inform the user of the error.
print(
f"[-] ProcessDriver: Error after printing {e.characters_written} characters in the previous debug message. Information may be missing."
)
except BlockingIOError:
pass
message = message[sys.stdout.buffer.write(message) :]
sys.stdout.buffer.flush()
except BlockingIOError as e:
message = message[e.characters_written :]
def has_process(self) -> bool:
"""
@ -296,6 +328,63 @@ class ProcessDriver:
# Perform the called-provided action.
interrupt()
def _start_io_driver(self):
"""
Starts the IODriver handling I/O from the process.
"""
assert self._io_driver_state == _IODriverState.STOPPED
self.io.start(process=self.process)
self.debug_print(f"{self._io_driver_state} -> {_IODriverState.RUNNING}")
self._io_driver_state = _IODriverState.RUNNING
def _stop_io_driver(self):
"""
Stops the IODriver handling I/O from the process.
"""
assert (
self._io_driver_state == _IODriverState.PAUSED
or self._io_driver_state == _IODriverState.RUNNING
)
if self._io_driver_state == _IODriverState.PAUSED:
# Not invalid, but currently a NOP. See pause_io_if_running()
self._io_driver_state = _IODriverState.STOPPED
return
self.io.stop()
self.debug_print(f"{self._io_driver_state} -> {_IODriverState.STOPPED}")
self._io_driver_state = _IODriverState.STOPPED
def pause_io_if_running(self) -> None:
"""
Pauses the handling of process I/O if it is currently running.
"""
if self._io_driver_state != _IODriverState.RUNNING:
return
# Currently, pausing and stopping both stop the IODriver. Nonetheless,
# these operations must stay separate in the state machine, as they are
# meaningfully distinct in other ways.
self.io.stop()
self.debug_print(f"{self._io_driver_state} -> {_IODriverState.PAUSED}")
self._io_driver_state = _IODriverState.PAUSED
def resume_io_if_running(self) -> None:
"""
Resumes the handling of process I/O if it is currently running.
"""
if self._io_driver_state != _IODriverState.PAUSED:
return
assert self.process is not None
self.io.start(process=self.process)
self.debug_print(f"{self._io_driver_state} -> {_IODriverState.RUNNING}")
self._io_driver_state = _IODriverState.RUNNING
@_updates_scope_counter(target="_in_run_until_next_stop")
def _run_until_next_stop(
self,
@ -319,113 +408,126 @@ class ProcessDriver:
# If `only_if_started` is set, we defer the starting of the I/O driver
# to the moment the start event is observed. Otherwise, we just start it
# immediately.
io_started = False
if with_io and not only_if_started:
self.io.start(process=self.process)
io_started = True
# Pick the first timeout value.
timeout_time = first_timeout
# If `only_if_started` is not set, assume the process must have been
# started by a previous action and is running.
running = not only_if_started
result = None
last_event = None
while True:
event = lldb.SBEvent()
if not self.listener.WaitForEvent(timeout_time, event):
self.debug_print(
f"Timed out after {timeout_time}s",
)
timeout_time = timeout
# If the process isn't running, we should stop.
if not running:
try:
if with_io and not only_if_started:
self._start_io_driver()
# Pick the first timeout value.
timeout_time = first_timeout
# If `only_if_started` is not set, assume the process must have been
# started by a previous action and is running.
running = not only_if_started
result = None
last_event = None
delay_until_io_stopped: List[Callable[[], None]] = []
while True:
event = lldb.SBEvent()
if not self.listener.WaitForEvent(timeout_time, event):
self.debug_print(
"Waited too long for process to start running, giving up",
f"Timed out after {timeout_time}s",
)
result = _PollResultTimedOut(last_event)
break
continue
last_event = event
timeout_time = timeout
if self.debug:
descr = lldb.SBStream()
if event.GetDescription(descr):
self.debug_print(descr.GetData())
else:
self.debug_print(f"No description for {event}")
if lldb.SBTarget.EventIsTargetEvent(event):
if event.GetType() == lldb.SBTarget.eBroadcastBitModulesLoaded:
# Notify the event handler that new modules got loaded in.
if fire_events:
self.eh.modules_loaded()
elif lldb.SBProcess.EventIsProcessEvent(event):
if (
event.GetType() == lldb.SBProcess.eBroadcastBitSTDOUT
or event.GetType() == lldb.SBProcess.eBroadcastBitSTDERR
):
# Notify the I/O driver that the process might have something
# new for it to consume.
self.io.on_output_event()
elif event.GetType() == lldb.SBProcess.eBroadcastBitStateChanged:
# The state of the process has changed.
new_state = lldb.SBProcess.GetStateFromEvent(event)
was_resumed = lldb.SBProcess.GetRestartedFromEvent(event)
if new_state == lldb.eStateStopped and not was_resumed:
# The process has stopped, so we're done processing events
# for the time being. Trigger the stopped event and return.
if fire_events:
self.eh.suspended(event)
result = _PollResultStopped(event)
# If the process isn't running, we should stop.
if not running:
self.debug_print(
"Waited too long for process to start running, giving up",
)
result = _PollResultTimedOut(last_event)
break
if new_state == lldb.eStateRunning or new_state == lldb.eStateStepping:
running = True
# Start the I/O driver here if its start got deferred
# because of `only_if_started` being set.
if only_if_started and with_io:
self.io.start(process=self.process)
io_started = True
continue
last_event = event
if (
new_state == lldb.eStateExited
or new_state == lldb.eStateCrashed
or new_state == lldb.eStateDetached
):
# Nothing else for us to do here. Clear our internal
# references to the process, fire the exit event, and leave.
self.debug_print(
f"Process exited with state {new_state}",
)
self.process = None
self.listener = None
if self.debug:
descr = lldb.SBStream()
if event.GetDescription(descr):
self.debug_print(descr.GetData())
else:
self.debug_print(f"No description for {event}")
if lldb.SBTarget.EventIsTargetEvent(event):
if event.GetType() == lldb.SBTarget.eBroadcastBitModulesLoaded:
# Notify the event handler that new modules got loaded in.
if fire_events:
self.eh.exited()
self.eh.modules_loaded()
if new_state == lldb.eStateExited:
proc = lldb.SBProcess.GetProcessFromEvent(event)
desc = (
"" if not proc.exit_description else f" ({proc.exit_description})"
elif lldb.SBProcess.EventIsProcessEvent(event):
if (
event.GetType() == lldb.SBProcess.eBroadcastBitSTDOUT
or event.GetType() == lldb.SBProcess.eBroadcastBitSTDERR
):
# Notify the I/O driver that the process might have something
# new for it to consume.
self.io.on_output_event()
elif event.GetType() == lldb.SBProcess.eBroadcastBitStateChanged:
# The state of the process has changed.
new_state = lldb.SBProcess.GetStateFromEvent(event)
was_resumed = lldb.SBProcess.GetRestartedFromEvent(event)
if new_state == lldb.eStateStopped and not was_resumed:
# The process has stopped, so we're done processing events
# for the time being. Trigger the stopped event and return.
if fire_events:
self.eh.suspended(event)
result = _PollResultStopped(event)
break
if new_state == lldb.eStateRunning or new_state == lldb.eStateStepping:
running = True
# Start the I/O driver here if its start got deferred
# because of `only_if_started` being set.
if only_if_started and with_io:
self._start_io_driver()
if (
new_state == lldb.eStateExited
or new_state == lldb.eStateCrashed
or new_state == lldb.eStateDetached
):
# Nothing else for us to do here. Clear our internal
# references to the process, fire the exit event, and leave.
self.debug_print(
f"Process exited with state {new_state}",
)
print_info(f"process exited with status {proc.exit_state}{desc}")
elif new_state == lldb.eStateCrashed:
print_info("process crashed")
elif new_state == lldb.eStateDetached:
print_info("process detached")
result = _PollResultExited(new_state)
break
self.process = None
self.listener = None
if fire_events:
self.eh.exited()
if new_state == lldb.eStateExited:
proc = lldb.SBProcess.GetProcessFromEvent(event)
desc = (
""
if not proc.exit_description
else f" ({proc.exit_description})"
)
delay_until_io_stopped.append(
lambda: print_info(
f"process exited with status {proc.exit_state}{desc}"
)
)
elif new_state == lldb.eStateCrashed:
delay_until_io_stopped.append(lambda: print_info("process crashed"))
elif new_state == lldb.eStateDetached:
delay_until_io_stopped.append(
lambda: print_info("process detached")
)
result = _PollResultExited(new_state)
break
finally:
if self._io_driver_state != _IODriverState.STOPPED:
self._stop_io_driver()
if isinstance(result, _PollResultExited):
self.io.close()
self.io = None
if io_started:
self.io.stop()
for fn in delay_until_io_stopped:
fn()
return result
@ -725,6 +827,7 @@ class ProcessDriver:
This function always uses a plain text IODriver, as there is no way to
guarantee any other driver will work.
"""
assert self.io is None
self.io = IODriverPlainText()
error = lldb.SBError()
@ -754,6 +857,7 @@ class ProcessDriver:
"""
Launch a process in the host system.
"""
assert self.io is None
self.io = io
error = lldb.SBError()
@ -779,6 +883,7 @@ class ProcessDriver:
if pid == 0:
return lldb.SBError("PID of 0 or no PID was given")
assert self.io is None
self.io = IODriverPlainText()
error = lldb.SBError()
@ -789,6 +894,7 @@ class ProcessDriver:
"""
Attatch to a process in the host system.
"""
assert self.io is None
self.io = IODriverPlainText()
error = lldb.SBError()
@ -868,6 +974,7 @@ class ProcessDriver:
assert self.listener.IsValid()
assert self.process.IsValid()
assert self.io is None
self.io = io
# It's not guaranteed that the process is actually alive, as it might be

Loading…
Cancel
Save