Add support for controlling execution to the Debugger-agnostic API (#2469)

* Add support for controlling execution to the Debugger-agnostic API

* Remove `empty_awaitable` in `pwndbg.dbg.gdb`

* Remove extra typing assignments
pull/2481/head
Matt. 1 year ago committed by GitHub
parent f945b417da
commit dcc8db7021
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -7,7 +7,9 @@ from __future__ import annotations
import contextlib
from enum import Enum
from typing import Any
from typing import Awaitable
from typing import Callable
from typing import Coroutine
from typing import Generator
from typing import List
from typing import Literal
@ -266,6 +268,28 @@ class MemoryMap:
raise NotImplementedError()
class ExecutionController:
def single_step(self) -> Awaitable[None]:
"""
Steps to the next instruction.
Throws `CancelledError` if a breakpoint or watchpoint is hit, the program
exits, or if any other unexpected event that diverts execution happens
while fulfulling the step.
"""
raise NotImplementedError()
def cont(self, until: StopPoint) -> Awaitable[None]:
"""
Continues execution until the given breakpoint or whatchpoint is hit.
Throws `CancelledError` if a breakpoint or watchpoint is hit that is not
the one given in `until`, the program exits, or if any other unexpected
event happens.
"""
raise NotImplementedError()
class Process:
def threads(self) -> List[Thread]:
"""
@ -502,6 +526,15 @@ class Process:
"""
raise NotImplementedError()
def dispatch_execution_controller(
self, procedure: Callable[[ExecutionController], Coroutine[Any, Any, None]]
):
"""
Queues up the given execution controller-based coroutine for execution,
sometime between the calling of this function and the
"""
raise NotImplementedError()
class TypeCode(Enum):
"""

@ -3,6 +3,7 @@ from __future__ import annotations
import re
from contextlib import nullcontext
from typing import Any
from typing import Coroutine
from typing import Generator
from typing import List
from typing import Literal
@ -812,6 +813,42 @@ class GDBProcess(pwndbg.dbg_mod.Process):
out = gdb.execute("info dll", to_string=True)
return "No shared libraries loaded at this time." not in out
@override
def dispatch_execution_controller(
self, procedure: Callable[[pwndbg.dbg_mod.ExecutionController], Coroutine[Any, Any, None]]
):
# GDB isn't nearly as finnicky as LLDB when it comes to us controlling
# the execution of the inferior, so we can safely mostly ignore all of
# the async plumbing and drive the coroutine by just iterating over it.
#
# Aditionally, the Debugger-agnostic API allows us enough freedom in how
# we schedule execution of the controller that running it immediately is
# perfectly acceptable. So that's what we do.
coroutine = procedure(EXECUTION_CONTROLLER)
while True:
try:
# We don't need to bother communicating with the coroutine, as
# it doesn't yield anything we care about.
coroutine.send(None)
except StopIteration:
# We're done.
break
class GDBExecutionController(pwndbg.dbg_mod.ExecutionController):
@override
async def single_step(self):
gdb.execute("si")
@override
async def cont(self, until: pwndbg.dbg_mod.StopPoint):
gdb.execute("continue")
# Like in LLDB, we only need a single instance of the execution controller.
EXECUTION_CONTROLLER = GDBExecutionController()
class GDBCommand(gdb.Command):
def __init__(

@ -6,7 +6,9 @@ import os
import random
import sys
from typing import Any
from typing import Awaitable
from typing import Callable
from typing import Coroutine
from typing import Dict
from typing import Generator
from typing import List
@ -586,6 +588,64 @@ class LLDBStopPoint(pwndbg.dbg_mod.StopPoint):
self.inner.SetEnabled(enabled)
class OneShotAwaitable:
"""
Used as part of the logic for the execution controller. This is an Awaitable
object that yields the value passed to its constructor exactly once.
"""
def __init__(self, value: Any):
self.value = value
def __await__(self) -> Generator[Any, Any, None]:
yield self.value
class YieldContinue:
"""
Continues execution of the process until the breakpoint or watchpoint given
in the constructor is hit or the operation is cancelled.
This class is part of the execution controller system, so it is intented to
be yielded by the async function with access to an execution controller, and
caught and hanlded by the event loop in the LLDB Pwndbg CLI.
"""
target: LLDBStopPoint
def __init__(self, target: LLDBStopPoint):
self.target = target
class YieldSingleStep:
"""
Moves execution of the process being debugged forward by one instruction.
This class is part of the execution controller system, so it is intented to
be yielded by the async function with access to an execution controller, and
caught and hanlded by the event loop in the LLDB Pwndbg CLI.
"""
pass
class LLDBExecutionController(pwndbg.dbg_mod.ExecutionController):
@override
def single_step(self) -> Awaitable[None]:
return OneShotAwaitable(YieldSingleStep())
@override
def cont(self, target: pwndbg.dbg_mod.StopPoint) -> Awaitable[None]:
assert isinstance(target, LLDBStopPoint)
return OneShotAwaitable(YieldContinue(target))
# Our execution controller doesn't need to change between uses, as all the state
# associated with it resides further up, in the Pwndbg CLI, so we can just share
# the same instance for all our uses.
EXECUTION_CONTROLLER = LLDBExecutionController()
class LLDBProcess(pwndbg.dbg_mod.Process):
# Whether this process is based on `ProcessGDBRemote` (AKA: the `gdb-remote`
# LLDB process plugin). This is used to selectively enable the functions
@ -1350,6 +1410,13 @@ class LLDBProcess(pwndbg.dbg_mod.Process):
# linked, same as GDB 13.2.
return self.target.GetNumModules() > 1
@override
def dispatch_execution_controller(
self, procedure: Callable[[pwndbg.dbg_mod.ExecutionController], Coroutine[Any, Any, None]]
):
# Queue the coroutine up for execution by the Pwndbg CLI.
self.dbg.controllers.append((self, procedure(EXECUTION_CONTROLLER)))
class LLDBCommand(pwndbg.dbg_mod.CommandHandle):
def __init__(self, handler_name: str, command_name: str):
@ -1371,10 +1438,16 @@ class LLDB(pwndbg.dbg_mod.Debugger):
# protocol. The REPL controls this field.
_current_process_is_gdb_remote: bool
# Queued up process control coroutines from the last Pwndbg command. We
# should run these in order as soon as the command is over, but before we
# return control to the user.
controllers: List[Tuple[LLDBProcess, Coroutine[Any, Any, None]]]
@override
def setup(self, *args, **kwargs):
self.exec_states = []
self.event_handlers = {}
self.controllers = []
self._current_process_is_gdb_remote = False
debugger = args[0]

@ -349,6 +349,16 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
else:
dbg.debugger.HandleCommand(line)
# At this point, the last command might've queued up some execution
# control procedures for us to chew on. Run them now.
for process, coroutine in dbg.controllers:
assert driver.has_process()
assert driver.process == process.process
driver.run_coroutine(coroutine)
dbg.controllers.clear()
def make_pty() -> Tuple[str, int]:
"""

@ -2,10 +2,16 @@ from __future__ import annotations
import os
import sys
from asyncio import CancelledError
from typing import Any
from typing import Coroutine
from typing import List
import lldb
import pwndbg
from pwndbg.dbg.lldb import YieldContinue
from pwndbg.dbg.lldb import YieldSingleStep
from pwndbg.dbg.lldb.repl.io import IODriver
@ -87,7 +93,7 @@ class ProcessDriver:
first_timeout: int = 1,
only_if_started: bool = False,
fire_events: bool = True,
):
) -> lldb.SBEvent | None:
"""
Runs the event loop of the process until the next stop event is hit, with
a configurable timeouts for the first and subsequent timeouts.
@ -114,6 +120,7 @@ class ProcessDriver:
# started by a previous action and is running.
running = not only_if_started
reason: lldb.SBEvent | None = None
while True:
event = lldb.SBEvent()
if not self.listener.WaitForEvent(timeout_time, event):
@ -162,6 +169,7 @@ class ProcessDriver:
# for the time being. Trigger the stopped event and return.
if fire_events:
self.eh.suspended()
reason = event
break
if new_state == lldb.eStateRunning or new_state == lldb.eStateStepping:
@ -186,12 +194,14 @@ class ProcessDriver:
if fire_events:
self.eh.exited()
reason = event
break
if io_started:
self.io.stop()
return reason
def cont(self) -> None:
"""
Continues execution of the process this object is driving, and returns
@ -245,6 +255,108 @@ class ProcessDriver:
self._run_until_next_stop()
def run_coroutine(self, coroutine: Coroutine[Any, Any, None]) -> bool:
"""
Runs the given coroutine and allows it to control the execution of the
process in this driver. Returns `True` if the coroutine ran to completion,
and `False` if it was cancelled.
"""
exception: Exception | None = False
while True:
try:
if exception is None:
step = coroutine.send(None)
else:
step = coroutine.throw(exception)
# The coroutine has caught the exception. Continue running
# it as if nothing happened.
exception = None
except StopIteration:
# We got to the end of the coroutine. We're done.
break
except CancelledError:
# We requested that the coroutine be cancelled, and it didn't
# override our decision. We're done.
break
if isinstance(step, YieldSingleStep):
# Pick the currently selected thread and step it forward by one
# instruction.
#
# LLDB lets us step any thread that we choose, so, maybe we
# should consider letting the caller pick which thread they want
# the step to happen in?
thread = self.process.GetSelectedThread()
assert thread is not None, "Tried to single step, but no thread is selected?"
e = lldb.SBError()
thread.StepInstruction(False, e)
if not e.success:
# The step failed. Raise an error in the coroutine and give
# it a chance to recover gracefully before we propagate it
# up to the caller.
exception = pwndbg.dbg_mod.Error(
f"Could not perform single step: {e.description}"
)
continue
self._run_until_next_stop()
elif isinstance(step, YieldContinue):
# Continue the process and wait for the next stop-like event.
self.process.Continue()
event = self._run_until_next_stop()
assert (
event is not None
), "None should only be returned by _run_until_next_stop unless start timeouts are enabled"
# Check whether this stop event is the one we expect.
stop: lldb.SBBreakpoint | lldb.SBWatchpoint = step.target.inner
if lldb.SBProcess.GetStateFromEvent(event) == lldb.eStateStopped:
matches = 0
for thread in lldb.SBProcess.GetProcessFromEvent(event).threads:
# We only check the stop reason, as the other methods
# for querying thread state (`IsStopped`, `IsSuspended`)
# are unreliable[1][2], and so we just assume that
# after a stop event, all the threads are stopped[3].
#
# [1]: https://github.com/llvm/llvm-project/issues/16196
# [2]: https://discourse.llvm.org/t/bug-28455-new-thread-state-not-in-sync-with-process-state/41699
# [3]: https://discourse.llvm.org/t/sbthread-isstopped-always-returns-false-on-linux/36944/5
bpwp_id = None
if thread.GetStopReason() == lldb.eStopReasonBreakpoint and isinstance(
stop, lldb.SBBreakpoint
):
bpwp_id = thread.GetStopReasonDataAtIndex(0)
elif thread.GetStopReason() == lldb.eStopReasonWatchpoint and isinstance(
stop, lldb.SBWatchpoint
):
bpwp_id = thread.GetStopReasonDataAtIndex(0)
if bpwp_id is not None and stop.GetID() == bpwp_id:
matches += 1
if matches > 0:
# At least one of the threads got stopped by our target.
# Return control back to the coroutine and await further
# instruction.
pass
else:
# Something else that we weren't expecting caused the
# process to stop. Request that the coroutine be
# cancelled.
exception = CancelledError()
else:
# The process might've crashed, been terminated, exited, or
# we might've lost connection to it for some other reason.
# Regardless, we should cancel the coroutine.
exception = CancelledError()
# Let the caller distinguish between a coroutine that's been run to
# completion and one that got cancelled.
return not isinstance(exception, CancelledError)
def launch(
self, target: lldb.SBTarget, io: IODriver, env: List[str], args: List[str], working_dir: str
) -> lldb.SBError:

Loading…
Cancel
Save