Add programatic controls to the LLDB Pwndbg CLI (#2785)

* Add programatic controls to the LLDB Pwndbg CLI

* Update pwndbg-lldb.py

Co-authored-by: patryk4815 <bux.patryk@gmail.com>

* Use `sys.stdout.buffer` directly, when sensible

* Use `.execute`, not `.execute_and_capture` in pwndbg-lldb

* Small fixes

---------

Co-authored-by: patryk4815 <bux.patryk@gmail.com>
pull/2798/head
Matt. 9 months ago committed by GitHub
parent 9cc021849a
commit 2f267c3f97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -105,12 +105,24 @@ if __name__ == "__main__":
if len(sys.argv) == 2: if len(sys.argv) == 2:
target = sys.argv[1] target = sys.argv[1]
from pwndbg.dbg.lldb.repl import PwndbgController
from pwndbg.dbg.lldb.repl import run as run_repl from pwndbg.dbg.lldb.repl import run as run_repl
if debug: if debug:
print("[-] Launcher: Entering Pwndbg CLI") print("[-] Launcher: Entering Pwndbg CLI")
run_repl([f"target create '{target}'"] if target else None, debug=debug) def drive(startup: List[str] | None):
async def drive(c: PwndbgController):
if startup is not None:
for line in startup:
await c.execute(line)
while True:
await c.interactive()
return drive
run_repl(drive([f"target create '{target}'"] if target else None), debug=debug)
# Dispose of our debugger and terminate LLDB. # Dispose of our debugger and terminate LLDB.
lldb.SBDebugger.Destroy(debugger) lldb.SBDebugger.Destroy(debugger)

@ -713,8 +713,8 @@ class OneShotAwaitable:
def __init__(self, value: Any): def __init__(self, value: Any):
self.value = value self.value = value
def __await__(self) -> Generator[Any, Any, None]: def __await__(self) -> Generator[Any, Any, Any]:
yield self.value return (yield self.value)
class YieldContinue: class YieldContinue:

@ -37,13 +37,19 @@ of the event system.
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import asyncio
import os import os
import re import re
import signal import signal
import sys import sys
import threading import threading
from contextlib import contextmanager from contextlib import contextmanager
from io import BytesIO
from typing import Any from typing import Any
from typing import Awaitable
from typing import BinaryIO
from typing import Callable
from typing import Coroutine
from typing import List from typing import List
from typing import Tuple from typing import Tuple
@ -55,6 +61,7 @@ import pwndbg.dbg.lldb
from pwndbg.color import message from pwndbg.color import message
from pwndbg.dbg import EventType from pwndbg.dbg import EventType
from pwndbg.dbg.lldb import LLDB from pwndbg.dbg.lldb import LLDB
from pwndbg.dbg.lldb import OneShotAwaitable
from pwndbg.dbg.lldb.pset import pset from pwndbg.dbg.lldb.pset import pset
from pwndbg.dbg.lldb.repl.io import IODriver from pwndbg.dbg.lldb.repl.io import IODriver
from pwndbg.dbg.lldb.repl.io import get_io_driver from pwndbg.dbg.lldb.repl.io import get_io_driver
@ -170,19 +177,77 @@ def show_greeting() -> None:
print(colored_tip) print(colored_tip)
class YieldExecDirect:
"""
Execute the given command directly, on behalf of the user.
"""
def __init__(self, command: str, capture: bool, prompt_silent: bool):
self._command = command
self._capture = capture
self._prompt_silent = prompt_silent
class YieldInteractive:
"""
Prompt the user for the next command.
"""
pass
class PwndbgController:
"""
Class providing interfaces for a client to control the behavior of Pwndbg
asynchronously.
"""
def interactive(self) -> Awaitable[None]:
"""
Runs a single interactive round, in which the user is prompted for a
command from standard input and `readline`, and whatever command they
type in is executed.
"""
return OneShotAwaitable(YieldInteractive())
def execute(self, command: str) -> Awaitable[None]:
"""
Runs the given command, and displays its output to the user.
# Interactivity
Some commands - such as `lldb` and `ipi` - start interactive prompts
when they are run, and issuing them through this command will not change
that behavior.
"""
return OneShotAwaitable(YieldExecDirect(command, False, False))
def execute_and_capture(self, command: str) -> Awaitable[bytes]:
"""
Runs the given command, and captures its output as a byte string.
# Interactivity
Same caveats apply as in `execute`.
# Reliabily of Capture
Some Pwndbg commands currently do not have their outputs captured, even
when run through this command. It is expected that this will be improved
in the future, but, as as general rule, clients should not rely on the
output of the command being available.
"""
return OneShotAwaitable(YieldExecDirect(command, True, False))
@wrap_with_history @wrap_with_history
def run(startup: List[str] | None = None, debug: bool = False) -> None: def run(
controller: Callable[[PwndbgController], Coroutine[Any, Any, None]], debug: bool = False
) -> None:
""" """
Runs the Pwndbg REPL under LLDB. Optionally enters the commands given in Runs the Pwndbg CLI through the given asynchronous controller.
`startup` as part of the startup process.
""" """
assert isinstance(pwndbg.dbg, LLDB) assert isinstance(pwndbg.dbg, LLDB)
dbg: LLDB = pwndbg.dbg dbg: LLDB = pwndbg.dbg
startup = startup if startup else []
startup_i = 0
enable_readline(dbg) enable_readline(dbg)
# We're gonna be dealing with process events ourselves, so we'll want to run # We're gonna be dealing with process events ourselves, so we'll want to run
@ -205,16 +270,34 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
show_greeting() show_greeting()
last_command = "" last_command = ""
coroutine = controller(PwndbgController())
last_result: Any = None
last_exc: Exception | None = None
while True: while True:
# Execute the prompt hook and ask for input. # Execute the prompt hook.
dbg._fire_prompt_hook() dbg._fire_prompt_hook()
try: try:
if startup_i < len(startup): if last_exc is not None:
print(PROMPT, end="") coroutine.throw(last_exc)
line = startup[startup_i]
print(line)
startup_i += 1
else: else:
action = coroutine.send(last_result)
except StopIteration:
# Nothing else for us to do.
break
except asyncio.CancelledError:
# We requested a cancellation that wasn't overwritten.
break
finally:
last_exc = None
last_result = None
if isinstance(action, YieldInteractive):
if debug:
print("[-] REPL: Prompt next command from user interactively")
try:
line = input(PROMPT) line = input(PROMPT)
# If the input is empty (i.e., 'Enter'), use the previous command # If the input is empty (i.e., 'Enter'), use the previous command
if line: if line:
@ -223,12 +306,53 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
line = last_command line = last_command
except EOFError: except EOFError:
# Exit the REPL if there's nothing else to run. # Exit the REPL if there's nothing else to run.
print() last_exc = asyncio.CancelledError()
break continue
if not exec_repl_command(line, sys.stdout.buffer, dbg, driver, relay):
last_exc = asyncio.CancelledError()
continue
elif isinstance(action, YieldExecDirect):
if debug:
print(
f"[-] REPL: Executing command '{action._command}' {'with' if action._capture else 'without'} output capture"
)
last_command = action._command
if not action._prompt_silent:
print(f"{PROMPT}{action._command}")
if action._capture:
with BytesIO() as output:
should_continue = exec_repl_command(action._command, output, dbg, driver, relay)
last_result = output.getvalue()
else:
should_continue = exec_repl_command(
action._command, sys.stdout.buffer, dbg, driver, relay
)
if not should_continue:
last_exc = asyncio.CancelledError()
continue
def exec_repl_command(
line: str,
lldb_out_target: BinaryIO,
dbg: LLDB,
driver: ProcessDriver,
relay: EventRelay,
) -> bool:
"""
Parses and runs the given command, returning whether the event loop should continue.
"""
bits = lex_args(line) bits = lex_args(line)
if len(line) == 0: if len(line) == 0:
continue return True
# Let the user get an LLDB prompt if they so desire. # Let the user get an LLDB prompt if they so desire.
if bits[0] == "lldb": if bits[0] == "lldb":
@ -240,14 +364,14 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
dbg.debugger.RunCommandInterpreter( dbg.debugger.RunCommandInterpreter(
True, False, lldb.SBCommandInterpreterRunOptions(), 0, False, False True, False, lldb.SBCommandInterpreterRunOptions(), 0, False, False
) )
continue return True
# There are interactive commands that `SBDebugger.HandleCommand` will # There are interactive commands that `SBDebugger.HandleCommand` will
# silently ignore. We have to implement them manually, here. # silently ignore. We have to implement them manually, here.
if "quit".startswith(line): if "quit".startswith(line) and line.startswith("quit"):
break return False
if line == "exit": if "exit".startswith(line) and line.startswith("exit"):
break return False
# `script` is a little weird. Unlike with the other commands we're # `script` is a little weird. Unlike with the other commands we're
# emulating, we actually need LLDB to spawn it for it to make sense # emulating, we actually need LLDB to spawn it for it to make sense
@ -297,7 +421,7 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
found_barred = True found_barred = True
if found_barred: if found_barred:
continue return True
# Because we need to capture events related to target setup and process # Because we need to capture events related to target setup and process
# startup, we handle them here, in a special way. # startup, we handle them here, in a special way.
@ -305,15 +429,15 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
if len(bits) > 1 and bits[1].startswith("la") and "launch".startswith(bits[1]): if len(bits) > 1 and bits[1].startswith("la") and "launch".startswith(bits[1]):
# This is `process launch`. # This is `process launch`.
process_launch(driver, relay, bits[2:], dbg) process_launch(driver, relay, bits[2:], dbg)
continue return True
if len(bits) > 1 and bits[1].startswith("a") and "attach".startswith(bits[1]): if len(bits) > 1 and bits[1].startswith("a") and "attach".startswith(bits[1]):
# This is `process attach`. # This is `process attach`.
process_attach(driver, relay, bits[2:], dbg) process_attach(driver, relay, bits[2:], dbg)
continue return True
if len(bits) > 1 and bits[1].startswith("conn") and "connect".startswith(bits[1]): if len(bits) > 1 and bits[1].startswith("conn") and "connect".startswith(bits[1]):
# This is `process connect`. # This is `process connect`.
process_connect(driver, relay, bits[2:], dbg) process_connect(driver, relay, bits[2:], dbg)
continue return True
# We don't care about other process commands.. # We don't care about other process commands..
if (bits[0].startswith("at") and "attach".startswith(bits[0])) or ( if (bits[0].startswith("at") and "attach".startswith(bits[0])) or (
@ -322,13 +446,13 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
# `attach` is an alias for `_regexp-attach` # `attach` is an alias for `_regexp-attach`
# (it is NOT an alias for `process attach` even if it may seem so!) # (it is NOT an alias for `process attach` even if it may seem so!)
attach(driver, relay, bits[1:], dbg) attach(driver, relay, bits[1:], dbg)
continue return True
if bits[0].startswith("ta") and "target".startswith(bits[0]): if bits[0].startswith("ta") and "target".startswith(bits[0]):
if len(bits) > 1 and bits[1].startswith("c") and "create".startswith(bits[1]): if len(bits) > 1 and bits[1].startswith("c") and "create".startswith(bits[1]):
# This is `target create` # This is `target create`
target_create(bits[2:], dbg) target_create(bits[2:], dbg)
continue return True
if len(bits) > 1 and bits[1].startswith("de") and "delete".startswith(bits[1]): if len(bits) > 1 and bits[1].startswith("de") and "delete".startswith(bits[1]):
# This is `target delete` # This is `target delete`
# #
@ -340,7 +464,7 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
if bits[0].startswith("r") and "run".startswith(bits[0]): if bits[0].startswith("r") and "run".startswith(bits[0]):
# `run` is an alias for `process launch` # `run` is an alias for `process launch`
process_launch(driver, relay, bits[1:], dbg) process_launch(driver, relay, bits[1:], dbg)
continue return True
if bits[0] == "c" or (bits[0].startswith("con") and "continue".startswith(bits[0])): if bits[0] == "c" or (bits[0].startswith("con") and "continue".startswith(bits[0])):
# Handle `continue` manually. While `ProcessDriver.run_lldb_command` # Handle `continue` manually. While `ProcessDriver.run_lldb_command`
@ -348,7 +472,7 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
# need for it to. We know what the user wants, so we can fast-track # need for it to. We know what the user wants, so we can fast-track
# their request. # their request.
continue_process(driver, bits[1:], dbg) continue_process(driver, bits[1:], dbg)
continue return True
if bits[0].startswith("gd") and "gdb-remote".startswith(bits[0]): if bits[0].startswith("gd") and "gdb-remote".startswith(bits[0]):
# `gdb-remote` is almost the same as `process launch -p gdb-remote`, # `gdb-remote` is almost the same as `process launch -p gdb-remote`,
@ -356,7 +480,7 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
# "connect://" to it. So, from our pespective, it is a separate # "connect://" to it. So, from our pespective, it is a separate
# command, even though it will also end up calling process_launch(). # command, even though it will also end up calling process_launch().
gdb_remote(driver, relay, bits[1:], dbg) gdb_remote(driver, relay, bits[1:], dbg)
continue return True
if bits[0] == "set": if bits[0] == "set":
# We handle `set` as a command override. We do this so that users # We handle `set` as a command override. We do this so that users
@ -382,20 +506,31 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
) )
) )
continue return True
if bits[0] == "ipi": if bits[0] == "ipi":
# Spawn IPython shell, easy for debugging # Spawn IPython shell, easy for debugging
run_ipython_shell() run_ipython_shell()
continue return True
# The command hasn't matched any of our filtered commands, just let LLDB # The command hasn't matched any of our filtered commands, just let LLDB
# handle it normally. Either in the context of the process, if we have # handle it normally. Either in the context of the process, if we have
# one, or just in a general context. # one, or just in a general context.
if driver.has_process(): if driver.has_process():
driver.run_lldb_command(line) driver.run_lldb_command(line, lldb_out_target)
else: else:
dbg.debugger.HandleCommand(line) ret = lldb.SBCommandReturnObject()
dbg.debugger.GetCommandInterpreter().HandleCommand(line, ret)
if ret.IsValid():
# LLDB can give us strings that may fail to encode.
out = ret.GetOutput().strip()
if len(out) > 0:
lldb_out_target.write(out.encode(sys.stdout.encoding, errors="backslashreplace"))
lldb_out_target.write(b"\n")
out = ret.GetError().strip()
if len(out) > 0:
lldb_out_target.write(out.encode(sys.stdout.encoding, errors="backslashreplace"))
lldb_out_target.write(b"\n")
# At this point, the last command might've queued up some execution # At this point, the last command might've queued up some execution
# control procedures for us to chew on. Run them now. # control procedures for us to chew on. Run them now.
@ -420,7 +555,8 @@ def run(startup: List[str] | None = None, debug: bool = False) -> None:
"Exceptions occurred execution controller processing. Debugging will likely be unreliable going forward." "Exceptions occurred execution controller processing. Debugging will likely be unreliable going forward."
) )
) )
break
return True
def parse(args: List[str], parser: argparse.ArgumentParser, unsupported: List[str]) -> Any | None: def parse(args: List[str], parser: argparse.ArgumentParser, unsupported: List[str]) -> Any | None:

@ -4,6 +4,7 @@ import os
import sys import sys
from asyncio import CancelledError from asyncio import CancelledError
from typing import Any from typing import Any
from typing import BinaryIO
from typing import Coroutine from typing import Coroutine
from typing import List from typing import List
@ -213,7 +214,7 @@ class ProcessDriver:
self.process.Continue() self.process.Continue()
self._run_until_next_stop() self._run_until_next_stop()
def run_lldb_command(self, command: str) -> None: def run_lldb_command(self, command: str, target: BinaryIO) -> None:
""" """
Runs the given LLDB command and ataches I/O if necessary. Runs the given LLDB command and ataches I/O if necessary.
""" """
@ -226,12 +227,12 @@ class ProcessDriver:
# LLDB can give us strings that may fail to encode. # LLDB can give us strings that may fail to encode.
out = ret.GetOutput().strip() out = ret.GetOutput().strip()
if len(out) > 0: if len(out) > 0:
sys.stdout.buffer.write(out.encode(sys.stdout.encoding, errors="backslashreplace")) target.write(out.encode(sys.stdout.encoding, errors="backslashreplace"))
print() target.write(b"\n")
out = ret.GetError().strip() out = ret.GetError().strip()
if len(out) > 0: if len(out) > 0:
sys.stdout.buffer.write(out.encode(sys.stdout.encoding, errors="backslashreplace")) target.write(out.encode(sys.stdout.encoding, errors="backslashreplace"))
print() target.write(b"\n")
if self.debug: if self.debug:
print(f"[-] ProcessDriver: LLDB Command Status: {ret.GetStatus():#x}") print(f"[-] ProcessDriver: LLDB Command Status: {ret.GetStatus():#x}")

Loading…
Cancel
Save