Add LLDB test driver and initial Debugger API tests (#3120)

Matt committed 5 months ago (via GitHub)
parent 032ba5fb96
commit 365af330ef

@ -287,7 +287,9 @@ def print_hint(msg: str, *args):
@wrap_with_history
def run(
controller: Callable[[PwndbgController], Coroutine[Any, Any, None]], debug: bool = False
controller: Callable[..., Coroutine[Any, Any, None]],
*args,
debug: bool = False,
) -> None:
"""
Runs the Pwndbg CLI through the given asynchronous controller.
@ -322,7 +324,7 @@ def run(
show_greeting()
last_command = ""
coroutine = controller(PwndbgController())
coroutine = controller(PwndbgController(), *args)
last_result: Any = None
last_exc: Exception | None = None
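As a rough illustration of the new *args forwarding (the controller and command list below are hypothetical, not part of this change): run() now builds the PwndbgController itself and passes any extra positional arguments straight through to the controller coroutine.

async def my_controller(c: PwndbgController, commands: list[str]) -> None:
    # `commands` arrives via run()'s *args and is forwarded verbatim.
    for line in commands:
        await c.execute(line)

run(my_controller, ["target create ./a.out", "process launch -s"], debug=False)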

@ -8,25 +8,11 @@ import re
import shutil
import subprocess
import sys
from typing import Any
from typing import Callable
from typing import Coroutine
from typing import List
PARSER = argparse.ArgumentParser(prog="pwndbg-lldb")
PARSER.add_argument("-v", "--verbose", action="store_true", help="Enable debug output")
PARSER.add_argument("target", nargs="?")
parser_attach = PARSER.add_mutually_exclusive_group()
parser_attach.add_argument(
"-n", "--attach-name", help="Tells the debugger to attach to a process with the given name."
)
parser_attach.add_argument(
"-p", "--attach-pid", help="Tells the debugger to attach to a process with the given pid."
)
PARSER.add_argument(
"-w",
"--wait-for",
action="store_true",
help="Tells the debugger to wait for a process with the given pid or name to launch before attaching.",
)
def find_lldb_version() -> List[int]:
"""
@ -59,27 +45,14 @@ def find_lldb_python_path() -> str:
return folder
def get_venv_bin_path():
bin_dir = "Scripts" if os.name == "nt" else "bin"
return os.path.join(sys.prefix, bin_dir)
def prepend_venv_bin_to_path():
# Set virtualenv's bin path (needed for utility tools like ropper, pwntools etc)
venv_bin = get_venv_bin_path()
path_elements = os.environ.get("PATH", "").split(os.pathsep)
if venv_bin in path_elements:
return
path_elements.insert(0, venv_bin)
os.environ["PATH"] = os.pathsep.join(path_elements)
def main():
prepend_venv_bin_to_path()
args = PARSER.parse_args()
debug = args.verbose
def launch(
controller: Callable[..., Coroutine[Any, Any, None]],
*args,
debug: bool = False,
) -> None:
"""
Launch Pwndbg with the given controller.
"""
if sys.platform == "linux" and "LLDB_DEBUGSERVER_PATH" not in os.environ:
os.environ["LLDB_DEBUGSERVER_PATH"] = shutil.which("lldb-server")
@ -121,15 +94,62 @@ def main():
print("[-] Launcher: Initializing Pwndbg")
lldbinit.main(debugger, lldb_version[0], lldb_version[1], debug=debug)
from pwndbg.dbg.lldb.repl import PwndbgController
from pwndbg.dbg.lldb.repl import print_error
from pwndbg.dbg.lldb.repl import print_warn
from pwndbg.dbg.lldb.repl import run as run_repl
if debug:
print("[-] Launcher: Entering Pwndbg CLI")
# Prepare the startup commands.
run_repl(controller, *args, debug=debug)
# Dispose of our debugger and terminate LLDB.
lldb.SBDebugger.Destroy(debugger)
lldb.SBDebugger.Terminate()
def get_venv_bin_path():
bin_dir = "Scripts" if os.name == "nt" else "bin"
return os.path.join(sys.prefix, bin_dir)
def prepend_venv_bin_to_path():
# Set virtualenv's bin path (needed for utility tools like ropper, pwntools etc)
venv_bin = get_venv_bin_path()
path_elements = os.environ.get("PATH", "").split(os.pathsep)
if venv_bin in path_elements:
return
path_elements.insert(0, venv_bin)
os.environ["PATH"] = os.pathsep.join(path_elements)
def main() -> None:
"""
Entry point for the pwndbg-lldb command line tool.
"""
prepend_venv_bin_to_path()
# Parse the arguments we were given.
parser = argparse.ArgumentParser(prog="pwndbg-lldb")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug output")
parser.add_argument("target", nargs="?")
parser_attach = parser.add_mutually_exclusive_group()
parser_attach.add_argument(
"-n", "--attach-name", help="Tells the debugger to attach to a process with the given name."
)
parser_attach.add_argument(
"-p", "--attach-pid", help="Tells the debugger to attach to a process with the given pid."
)
parser.add_argument(
"-w",
"--wait-for",
action="store_true",
help="Tells the debugger to wait for a process with the given pid or name to launch before attaching.",
)
args = parser.parse_args()
debug = args.verbose
# Prepare the startup commands based on those arguments.
startup = []
if args.target:
# DEVIATION: The LLDB CLI silently ignores any target information passed
@ -145,7 +165,7 @@ def main():
# both '--attach-name' and '--attach-pid', it silently ignores it when
# used with the latter. Pwndbg prints out a warning, instead.
if args.wait_for:
print_warn("'--wait-for' has no effect when used with '--attach-pid'")
print("warn: '--wait-for' has no effect when used with '--attach-pid'")
startup.append(f'process attach --pid "{args.attach_pid}"')
else:
@ -154,19 +174,23 @@ def main():
# nesting argument groups has been deprecated since Python 3.11, and
# the deprecation message suggests it was never even supported in
# the first place :/
print_error(
"'--wait-for' must be used in combination with either '--attach-name' or '--attach-pid'"
print(
"error: '--wait-for' must be used in combination with either '--attach-name' or '--attach-pid'"
)
PARSER.print_usage()
parser.print_usage()
sys.exit(1)
if (args.attach_pid is not None or args.attach_name is not None) and args.target:
print_warn(
"have both a target and an attach request, your target may be overwritten on attach"
print(
"warn: have both a target and an attach request, your target may be overwritten on attach"
)
def drive(startup: List[str] | None):
async def drive(c: PwndbgController):
async def drive(c):
from pwndbg.dbg.lldb.repl import PwndbgController
assert isinstance(c, PwndbgController)
if startup is not None:
for line in startup:
await c.execute(line)
@ -176,11 +200,8 @@ def main():
return drive
run_repl(drive(startup), debug=debug)
# Dispose of our debugger and terminate LLDB.
lldb.SBDebugger.Destroy(debugger)
lldb.SBDebugger.Terminate()
# Launch Pwndbg in interactive mode.
launch(drive(startup), debug=debug)
if __name__ == "__main__":

@ -13,164 +13,82 @@ import time
from enum import Enum
from pathlib import Path
from subprocess import CompletedProcess
from typing import Any
from typing import Awaitable
from typing import Callable
from typing import Coroutine
from typing import List
from typing import Tuple
def main():
args = parse_args()
coverage_out = None
if args.cov:
print("Will run codecov")
coverage_out = Path(".cov/coverage")
if args.pdb:
print("Will run tests in serial and with Python debugger")
args.serial = True
local_pwndbg_root = (Path(os.path.dirname(__file__)) / ".." / "../").resolve()
print(f"[*] Local Pwndbg root: {local_pwndbg_root}")
# Build the binaries for the test group.
#
# As the nix store is read-only, we always use the local Pwndbg root for
# building tests, even if the user has requested a nix-compatible test.
#
# Ideally, however, we would build the test targets as part of `nix verify`.
ensure_zig_path(local_pwndbg_root)
make_all(local_pwndbg_root / args.group.binary_dir())
if not args.driver.can_run(args.group):
print(
f"ERROR: Driver '{args.driver}' can't run test group '{args.group}'. Use another driver."
)
sys.exit(1)
match args.driver:
case Driver.GDB:
host = get_gdb_host(args, local_pwndbg_root)
# Handle the case in which the user only wants the collection to run.
if args.collect_only:
for test in host.collect():
print(test)
sys.exit(0)
# Actually run the tests.
run_tests_and_print_stats(
host, args.test_name_filter, args.pdb, args.serial, args.verbose, coverage_out
)
def run_tests_and_print_stats(
host: TestHost,
regex_filter: str | None,
pdb: bool,
serial: bool,
verbose: bool,
coverage_out: Path | None,
):
def _collection_from_pytest(
result: CompletedProcess[str], pwndbg_root: Path, pytest_root: Path
) -> List[str]:
"""
Runs all the tests made available by a given test host.
Given the output of a completed Pytest collection, return a list of tests.
"""
stats = TestStats()
start = time.monotonic_ns()
tests_collect_output = result.stdout
# PDB tests always run in sequence.
if pdb and not serial:
print("WARNING: Python Debugger (PDB) requires serial execution, but the user has")
print(" requested parallel execution. Tests will *not* run in parallel.")
serial = True
if result.returncode != 0:
raise RuntimeError(f"collection command failed: {result.stderr} {result.stdout}")
tests_list = host.collect()
if regex_filter is not None:
# Filter test names if required.
tests_list = [case for case in tests_list if re.search(regex_filter, case)]
if serial:
print("\nRunning tests in series")
for test in tests_list:
result = host.run(test, coverage_out, pdb)
stats.handle_test_result(test, result, verbose)
else:
print("\nRunning tests in parallel")
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
for test in tests_list:
executor.submit(host.run, test, coverage_out, pdb).add_done_callback(
# `test=test` forces the variable to bind early. This will
# change the type of the lambda, however, so we have to
# assure MyPy we know what we're doing.
lambda future, test=test: stats.handle_test_result( # type: ignore[misc]
test, future.result(), verbose
)
)
# Return SIGINT to the default behavior.
signal.signal(signal.SIGINT, signal.SIG_DFL)
end = time.monotonic_ns()
duration = end - start
print("")
print("*********************************")
print("********* TESTS SUMMARY *********")
print("*********************************")
print(
f"Time Spent : {duration / 1000000000:.2f}s (cumulative: {stats.total_duration / 1000000000:.2f}s)"
)
print(f"Tests Passed : {stats.pass_tests}")
print(f"Tests Skipped: {stats.skip_tests}")
print(f"Tests Failed : {stats.fail_tests}")
# Extract the test names from the output using regex
#
# _run_gdb executes it in the current working directory, and so paths
# printed by pytest are relative to it.
path_spec = pytest_root.resolve().relative_to(pwndbg_root / "tests")
pattern = re.compile(rf"{path_spec}.*::.*")
matches = pattern.findall(tests_collect_output)
if stats.fail_tests != 0:
print("\nFailing tests:")
for test_case in stats.fail_tests_names:
print(f"- {test_case}")
sys.exit(1)
return list(matches)
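A quick, self-contained sketch of what _collection_from_pytest matches (the paths and pytest output here are made up for illustration):

import re
from pathlib import Path

pwndbg_root = Path("/repo")                       # hypothetical checkout
pytest_root = Path("/repo/tests/library/lldb")    # hypothetical
output = "library/lldb/test_tests.py::test_launch\nplatform linux -- Python 3.12\n"

path_spec = pytest_root.relative_to(pwndbg_root / "tests")   # -> library/lldb
matches = re.compile(rf"{path_spec}.*::.*").findall(output)
print(matches)  # ['library/lldb/test_tests.py::test_launch']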
def get_gdb_host(args: argparse.Namespace, local_pwndbg_root: Path) -> TestHost:
def _result_from_pytest(result: CompletedProcess[str], duration_ns: int) -> TestResult:
"""
Build a GDB-based test host.
Given the output of a completed test, return a `TestResult`.
"""
if args.nix:
# Use pwndbg, as built by Nix.
gdb_path = local_pwndbg_root / "result" / "bin" / "pwndbg"
if not gdb_path.exists():
print("ERROR: No nix-compatible pwndbg found. Run nix build .#pwndbg-dev")
sys.exit(1)
elif args.group == Group.CROSS_ARCH_USER:
# Some systems don't ship 'gdb-multiarch', but support multiple
# architectures in their regular binaries. Try the regular GDB.
supports_arches = "py import os; archs = ['i386', 'aarch64', 'arm', 'mips', 'riscv', 'sparc']; os._exit(3) if len([arch for arch in archs if arch in gdb.architecture_names()]) == len(archs) else os._exit(2)"
gdb_path_str = shutil.which("pwndbg")
if gdb_path_str is None:
print("ERROR: No 'pwndbg' executables in path")
sys.exit(1)
result = subprocess.run([gdb_path_str, "-nx", "-ex", supports_arches], capture_output=True)
# GDB supports cross architecture targets
if result.returncode == 3:
gdb_path = Path(gdb_path_str)
# Determine low-granularity status from process return code.
status = TestStatus.PASSED if result.returncode == 0 else TestStatus.FAILED
# Determine high-granularity status from process output, if possible.
stdout_status = None
stdout_context = None
if result.stdout is not None:
entries = re.search(
r"(\x1b\[3.m(PASSED|FAILED|SKIPPED|XPASS|XFAIL)\x1b\[0m)( .*::.* -)?( (.*))?",
result.stdout,
re.MULTILINE,
)
if entries:
stdout_status = entries[2]
stdout_context = entries[5]
# If possible, augment the status with the high-granularity output.
if stdout_status is not None:
# Check the consistency between the values.
if status == TestStatus.FAILED and stdout_status != "FAILED":
# They disagree.
#
# In this case, we should believe the more accurate but
# lower-granularity status value. This may happen if the output
# of the test includes any of the words we match against.
pass
else:
print("ERROR: 'pwndbg' does not support cross architecture targets")
sys.exit(1)
else:
# Use the regular system GDB.
gdb_path_str = shutil.which("pwndbg")
if gdb_path_str is None:
print("ERROR: No 'gdb' executable in path")
sys.exit(1)
gdb_path = Path(gdb_path_str)
from host.gdb import GDBTestHost
return GDBTestHost(
local_pwndbg_root,
local_pwndbg_root / args.group.library(),
local_pwndbg_root / args.group.binary_dir(),
gdb_path,
)
match stdout_status:
case "PASSED":
status = TestStatus.PASSED
case "SKIPPED":
status = TestStatus.SKIPPED
case "XPASS":
status = TestStatus.XPASS
case "XFAIL":
status = TestStatus.XFAIL
case _:
# Also a disagreement. Keep the low-granularity status.
pass
return TestResult(status, duration_ns, result.stdout, result.stderr, stdout_context)
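To illustrate what the status regex above pulls out of a single (made-up) line of colorized pytest output:

import re

sample = "\x1b[32mPASSED\x1b[0m library/lldb/test_tests.py::test_launch -"
entries = re.search(
    r"(\x1b\[3.m(PASSED|FAILED|SKIPPED|XPASS|XFAIL)\x1b\[0m)( .*::.* -)?( (.*))?",
    sample,
    re.MULTILINE,
)
assert entries is not None
print(entries[2])  # 'PASSED' -- the high-granularity status
print(entries[5])  # None     -- no extra context after the test id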
class TestStatus(Enum):
@ -231,179 +149,33 @@ class TestHost:
Collection of code coverage data may be enabled for the test by
specifying a coverage file path in `coverage_out`.
"""
pass
raise NotImplementedError()
def collect(self) -> List[str]:
"""
Collect the names of all the tests available to this host.
"""
pass
raise NotImplementedError()
class Group(Enum):
"""
Tests are divided into multiple groups.
"""
GDB = "gdb"
LLDB = "lldb"
DBG = "dbg"
CROSS_ARCH_USER = "cross-arch-user"
def __str__(self):
return self._value_
def library(self) -> Path:
class Controller:
def launch(self, binary: Path) -> Awaitable[None]:
"""
Subdirectory relative to the Pwndbg root containing the tests.
Launch the binary with the given path, relative to the binaries folder
for the calling test.
"""
match self:
case Group.GDB:
return Path("tests/library/gdb/")
case Group.LLDB:
return Path("tests/library/lldb/")
case Group.DBG:
return Path("tests/library/dbg/")
case Group.CROSS_ARCH_USER:
return Path("tests/library/qemu-user/")
case other:
raise AssertionError(f"group {other} is unaccounted for")
raise NotImplementedError()
def binary_dir(self) -> Path:
"""
Subdirectory relative to the Pwndbg root containing the required
binaries for a given test group.
"""
match self:
case Group.GDB | Group.LLDB | Group.DBG:
return Path("tests/binaries/host/")
case Group.CROSS_ARCH_USER:
return Path("tests/binaries/qemu-user/")
case other:
raise AssertionError(f"group {other} is unaccounted for")
class Driver(Enum):
GDB = "gdb"
def __str__(self):
return self._value_
def can_run(self, grp: Group) -> bool:
"""
Whether a given driver can run a given test group.
"""
match self:
case Driver.GDB:
match grp:
case Group.GDB:
return True
case Group.LLDB:
return False
case Group.DBG:
return True
case Group.CROSS_ARCH_USER:
return True
raise AssertionError(f"unaccounted for combination of driver '{self}' and group '{grp}'")
def parse_args():
parser = argparse.ArgumentParser(description="Run tests.")
parser.add_argument("-g", "--group", choices=list(Group), type=Group, required=True)
parser.add_argument(
"-d",
"--driver",
choices=list(Driver),
type=Driver,
required=True,
)
parser.add_argument(
"-p",
"--pdb",
action="store_true",
help="enable pdb (Python debugger) post mortem debugger on failed tests",
)
parser.add_argument("-c", "--cov", action="store_true", help="enable codecov")
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="display all test output instead of just failing test output",
)
parser.add_argument(
"-s", "--serial", action="store_true", help="run tests one at a time instead of in parallel"
)
parser.add_argument(
"--nix",
action="store_true",
help="run tests using built for nix environment",
)
parser.add_argument(
"--collect-only",
action="store_true",
help="only show the output of test collection, don't run any tests",
)
parser.add_argument(
"test_name_filter", nargs="?", help="run only tests that match the regex", default=".*"
)
return parser.parse_args()
def ensure_zig_path(local_pwndbg_root: Path):
if "ZIGPATH" not in os.environ:
# If ZIGPATH is not set, set it to $pwd/.zig
# In Docker environment this should by default be set to /opt/zig
os.environ["ZIGPATH"] = str(local_pwndbg_root / ".zig")
print(f'[+] ZIGPATH set to {os.environ["ZIGPATH"]}')
def make_all(path: Path, jobs: int = multiprocessing.cpu_count()):
def start(controller: Callable[[Controller], Coroutine[Any, Any, None]]) -> None:
"""
Build the binaries for a given test group.
"""
if not path.exists():
raise ValueError(f"given non-existent path {path}")
print(f"[+] make -C {path} -j{jobs} all")
try:
subprocess.check_call(["make", f"-j{jobs}", "all"], cwd=str(path))
except subprocess.CalledProcessError:
sys.exit(1)
class TestStats:
def __init__(self):
self.total_duration = 0
self.fail_tests = 0
self.pass_tests = 0
self.skip_tests = 0
self.fail_tests_names = []
The start function.
def handle_test_result(self, case: str, test_result: TestResult, verbose: bool):
match test_result.status:
case TestStatus.FAILED | TestStatus.XFAIL:
self.fail_tests += 1
self.fail_tests_names.append(case)
case TestStatus.PASSED | TestStatus.XPASS:
self.pass_tests += 1
case TestStatus.SKIPPED:
self.skip_tests += 1
# skip_reason = " " + (
# process.stdout.split(test_status)[1].split("\n\n\x1b[33m")[0].replace("\n", "")
# )
Both the testing hosts and the tests themselves share this module, and this
function is used by the test piping to start the async debugger runtime.
self.total_duration += test_result.duration_ns
print(
f"{case:<100} {test_result.status} {test_result.duration_ns / 1000000000:.2f}s {test_result.context if test_result.context else ''}"
)
# Only show the output of failed tests unless the verbose flag was used
if verbose or test_result.status == TestStatus.FAILED:
print("")
print(test_result.stderr)
print(test_result.stdout)
if __name__ == "__main__":
main()
This function must be replaced in the test.
"""
raise AssertionError(
"either called host.start() from the testing host, or testing code did not replace it"
)
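A minimal sketch of what replacing this function looks like, mirroring the approach the GDB launcher below takes (the _NullController is a hypothetical stand-in; a real host drives the coroutine against an actual debugger):

import host

class _NullController(host.Controller):
    # Hypothetical no-op controller, for illustration only.
    async def launch(self, binary):
        pass

def _start(outer):
    # Drive the controller coroutine synchronously until it completes.
    coroutine = outer(_NullController())
    try:
        coroutine.send(None)
    except StopIteration:
        pass

host.start = _start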

@ -11,6 +11,8 @@ from typing import List
from host import TestHost
from host import TestResult
from host import TestStatus
from host import _collection_from_pytest
from host import _result_from_pytest
class GDBTestHost(TestHost):
@ -76,6 +78,7 @@ class GDBTestHost(TestHost):
env["PWNDBG_DISABLE_COLORS"] = "1"
env["GDB_BIN_PATH"] = str(self._gdb_path)
env["TEST_BINARIES_ROOT"] = str(self._binaries_root)
env["TEST_PWNDBG_ROOT"] = str(self._pwndbg_root)
if interactive:
env["USE_PDB"] = "1"
@ -86,47 +89,7 @@ class GDBTestHost(TestHost):
)
duration = time.monotonic_ns() - started_at
# Determine low-granularity status from process return code.
status = TestStatus.PASSED if result.returncode == 0 else TestStatus.FAILED
# Determine high-granularity status from process output, if possible.
stdout_status = None
stdout_context = None
if not interactive:
entries = re.search(
r"(\x1b\[3.m(PASSED|FAILED|SKIPPED|XPASS|XFAIL)\x1b\[0m)( .*::.* -)?( (.*))?",
result.stdout,
re.MULTILINE,
)
if entries:
stdout_status = entries[2]
stdout_context = entries[5]
# If possible, augment the status with the high-granularity output.
if stdout_status is not None:
# Check the consistency between the values.
if status == TestStatus.FAILED and stdout_status != "FAILED":
# They disagree.
#
# In this case, we should believe the more accurate but
# lower-granularity status value. This may happen if the output
# of the test includes any of the words we match against.
pass
else:
match stdout_status:
case "PASSED":
status = TestStatus.PASSED
case "SKIPPED":
status = TestStatus.SKIPPED
case "XPASS":
status = TestStatus.XPASS
case "XFAIL":
status = TestStatus.XFAIL
case _:
# Also a disagreement. Keep the low-granularity status.
pass
return TestResult(status, duration, result.stdout, result.stderr, stdout_context)
return _result_from_pytest(result, duration)
def collect(self) -> List[str]:
# NOTE: We run tests under GDB sessions and because of some cleanup/tests dependencies problems
@ -135,19 +98,11 @@ class GDBTestHost(TestHost):
env = os.environ.copy()
env["TEST_BINARIES_ROOT"] = str(self._binaries_root)
env["TEST_PWNDBG_ROOT"] = str(self._pwndbg_root)
env["TESTS_PATH"] = str(self._pytest_root)
result = self._run_gdb(target, env=env)
tests_collect_output = result.stdout
if result.returncode != 0:
raise RuntimeError(f"collection command failed: {result.stderr} {result.stdout}")
# Extract the test names from the output using regex
#
# _run_gdb executes it in the current working directory, and so paths
# printed by pytest are relative to it.
path_spec = self._pytest_root.resolve().relative_to(self._pwndbg_root)
pattern = re.compile(rf"{path_spec}.*::.*")
matches = pattern.findall(tests_collect_output)
return list(matches)
names = _collection_from_pytest(result, self._pwndbg_root, self._pytest_root)
# We execute from Pwndbg root, so we need to prepend tests/ to the names.
return [f"tests/{name}" for name in names]

@ -5,12 +5,17 @@ import sys
import pytest
PWNDBG_ROOT = os.environ.get("TEST_PWNDBG_ROOT")
TESTS_PATH = os.environ.get("TESTS_PATH")
if TESTS_PATH is None:
print("'TESTS_PATH' environment variable not set. Failed to collect tests.")
sys.exit(1)
host_home = f"{PWNDBG_ROOT}/tests/"
if host_home not in sys.path:
sys.path.append(host_home)
class CollectTestFunctionNames:
"""See https://github.com/pytest-dev/pytest/issues/2039#issuecomment-257753269"""

@ -2,10 +2,55 @@ from __future__ import annotations
import os
import sys
from pathlib import Path
from typing import Any
from typing import Callable
from typing import Coroutine
import coverage
import gdb
import pytest
PWNDBG_ROOT = os.environ["TEST_PWNDBG_ROOT"]
# Prepare the test host environment for the Debugger API tests.
host_home = f"{PWNDBG_ROOT}/tests/"
if host_home not in sys.path:
sys.path.append(host_home)
import host
class _GDBController(host.Controller):
async def launch(self, binary_path: Path) -> None:
"""
Launch the given binary.
GDB hides the asynchronous heavy lifting from us, so this call is
synchronous.
"""
os.environ["PWNDBG_IN_TEST"] = "1"
gdb.execute(f"file {binary_path}")
gdb.execute("set exception-verbose on")
gdb.execute("set width 80")
gdb.execute("set context-reserve-lines never")
os.environ["COLUMNS"] = "80"
gdb.execute("starti " + " ".join(args))
def _start(outer: Callable[[host.Controller], Coroutine[Any, Any, None]]) -> None:
# The GDB controller is entirely synchronous, so keep advancing the
# coroutine unconditionally until it ends.
coroutine = outer(_GDBController())
try:
coroutine.send(None)
except StopIteration:
pass
host.start = _start
# Start the test, proper.
use_pdb = os.environ.get("USE_PDB") == "1"
sys._pwndbg_unittest_run = True # type: ignore[attr-defined]

@ -0,0 +1,79 @@
from __future__ import annotations
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import List
from host import TestHost
from host import TestResult
from host import _collection_from_pytest
from host import _result_from_pytest
class LLDBTestHost(TestHost):
def __init__(self, pwndbg_root: Path, pytest_root: Path, binaries_root: Path):
assert pwndbg_root.exists()
assert pwndbg_root.is_dir()
assert pytest_root.exists()
assert pytest_root.is_dir()
assert binaries_root.exists()
assert binaries_root.is_dir()
self._pwndbg_root = pwndbg_root
self._pytest_root = pytest_root
self._binaries_root = binaries_root
def _launch(
self,
op: str,
test_name: str | None,
capture: bool,
pdb: bool,
) -> subprocess.CompletedProcess[str]:
target = self._pwndbg_root / "tests/host/lldb/launch-guest.py"
assert target.exists()
assert target.is_file()
assert op in ("RUN-TEST", "COLLECT")
assert op != "RUN-TEST" or test_name is not None
interpreter = Path(sys.executable)
assert interpreter.exists()
assert interpreter.is_file()
env = os.environ.copy()
env["TEST_OPERATION"] = op
env["TEST_PYTEST_ROOT"] = str(self._pytest_root)
env["TEST_PWNDBG_ROOT"] = str(self._pwndbg_root)
env["TEST_BINARIES_ROOT"] = str(self._binaries_root)
env["TEST_PDB_ON_FAIL"] = "1" if pdb else "0"
if test_name is not None:
env["TEST_NAME"] = test_name
return subprocess.run(
[interpreter, str(target)], capture_output=capture, text=True, env=env
)
def collect(self) -> List[str]:
result = self._launch("COLLECT", None, True, False)
return _collection_from_pytest(result, self._pwndbg_root, self._pytest_root)
def run(self, case: str, coverage_out: Path | None, interactive: bool) -> TestResult:
if coverage_out is not None:
# TODO: Add CodeCov for the LLDB test driver (do this before the PR is merged).
print("[-] Warning: LLDB does not yet support code coverage")
beg = time.monotonic_ns()
result = self._launch("RUN-TEST", case, not interactive, interactive)
end = time.monotonic_ns()
return _result_from_pytest(result, end - beg)
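A rough usage sketch of this host (the paths are illustrative; the test runner below derives the real ones from the selected group):

from pathlib import Path

root = Path("/path/to/pwndbg")                       # hypothetical checkout
lldb_host = LLDBTestHost(
    root, root / "tests/library/lldb", root / "tests/binaries/host"
)
for name in lldb_host.collect():                     # COLLECT via launch-guest.py
    result = lldb_host.run(name, None, False)        # RUN-TEST via launch-guest.py
    print(name, result.status)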

@ -0,0 +1,125 @@
from __future__ import annotations
import os
import sys
from enum import Enum
from pathlib import Path
from typing import Any
from typing import Callable
from typing import Coroutine
from typing import List
async def _run(ctrl: Any, outer: Callable[..., Coroutine[Any, Any, None]]) -> None:
# We only import this here, as pwndbg-lldb is responsible for setting Pwndbg
# up on our behalf.
from host import Controller
from pwndbg.dbg.lldb.repl import PwndbgController
assert isinstance(ctrl, PwndbgController)
# Ideally we'd define this in an outer scope, but doing it here gives us
# proper access to type names.
class _LLDBController(Controller):
def __init__(self, pc: PwndbgController):
self.pc = pc
async def launch(self, binary: Path) -> None:
await self.pc.execute(f"target create {binary}")
await self.pc.execute("process launch -s")
await outer(_LLDBController(ctrl))
def run(pytest_args: List[str], pytest_plugins: List[Any] | None) -> int:
# The import path is set up before this function is called.
import host
from host import Controller
from pwndbginit import pwndbg_lldb
# Replace host.start with a proper implementation of the start command.
def _start(outer: Callable[[Controller], Coroutine[Any, Any, None]]) -> None:
pwndbg_lldb.launch(_run, outer, debug=True)
host.start = _start
# Run Pytest.
import pytest
return pytest.main(pytest_args, plugins=pytest_plugins)
class Operation(Enum):
RUN_TEST = "RUN-TEST"
COLLECT = "COLLECT"
def __str__(self) -> str:
return self._value_
class CollectTestFunctionNames:
"See https://github.com/pytest-dev/pytest/issues/2039#issuecomment-257753269"
def __init__(self):
self.collected = []
def pytest_collection_modifyitems(self, items):
for item in items:
self.collected.append(item.nodeid)
if __name__ == "__main__":
pwndbg_home = Path(os.environ["TEST_PWNDBG_ROOT"])
assert pwndbg_home.exists()
assert pwndbg_home.is_dir()
pwndbg_home = pwndbg_home.resolve(strict=True)
host_home = pwndbg_home / "tests"
assert host_home.exists()
assert host_home.is_dir()
# Add to the path so we can access both Pwndbg and the testing host library.
if str(host_home) not in sys.path:
sys.path = [str(host_home)] + sys.path
if str(pwndbg_home) not in sys.path:
sys.path = [str(pwndbg_home)] + sys.path
# Prepare the requested operation.
op = Operation(os.environ["TEST_OPERATION"])
match op:
case Operation.COLLECT:
pytest_home = Path(os.environ["TEST_PYTEST_ROOT"])
assert pytest_home.exists()
assert pytest_home.is_dir()
pytest_args = ["--collect-only", str(pytest_home)]
pytest_plugins = [CollectTestFunctionNames()]
case Operation.RUN_TEST:
test_name = os.environ["TEST_NAME"]
# Ideally, we'd check that the test name is both valid and only
# matches a single test in the library, but checking that it is at
# least not empty should be good enough, provided the test host
# is careful.
assert test_name
pytest_args = [test_name, "-vvv", "-s", "--showlocals", "--color=yes"]
if os.environ["TEST_PDB_ON_FAIL"] == "1":
pytest_args.append("--pdb")
pytest_plugins = None
# Start the test, proper.
status = run(pytest_args, pytest_plugins)
if op == Operation.COLLECT:
for nodeid in pytest_plugins[0].collected:
print(nodeid)
sys.exit(status)

@ -0,0 +1,37 @@
from __future__ import annotations
import functools
import os
from inspect import signature
from typing import Any
from typing import Callable
from typing import Concatenate
from typing import Coroutine
import host
from host import Controller
BINARIES_PATH = os.environ.get("TEST_BINARIES_ROOT")
def pwndbg_test[**T](
test: Callable[Concatenate[Controller, T], Coroutine[Any, Any, None]],
) -> Callable[T, None]:
@functools.wraps(test)
def inner_test(*args: T.args, **kwargs: T.kwargs) -> None:
async def _test(controller: Controller) -> None:
await test(controller, *args, **kwargs)
print(f"[+] Launching test {test.__name__} asynchronously")
host.start(_test)
# Remove the controller from the signature, as seen by Pytest.
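# Otherwise, Pytest would try to resolve the controller parameter as a (missing) fixture.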
sig = signature(inner_test)
sig = sig.replace(parameters=tuple(sig.parameters.values())[1:])
inner_test.__signature__ = sig
return inner_test
def get_binary(name: str) -> str:
return os.path.join(BINARIES_PATH, name)

@ -0,0 +1,65 @@
"""
Metatests.
These tests are intended to check the functioning of the testing code itself,
rather than of Pwndbg more generally.
Some tests come in SUCCESS and XFAIL pairs, and they require that both succeed
in order for the overall test to succeed, as they contain no inner test logic
other than the minimum necessary to start the asynchronous controller function.
This module is responsible for testing the pwndbg_test decorator for async
controller tests.
"""
from __future__ import annotations
import host
import pytest
from host import Controller
from . import get_binary
from . import pwndbg_test
@pytest.mark.xfail
def test_starts_no_decorator_xfail() -> None:
async def run(ctrl: Controller):
raise RuntimeError("should fail!")
host.start(run)
def test_starts_no_decorator() -> None:
async def run(ctrl: Controller):
pass
host.start(run)
@pytest.mark.xfail
@pwndbg_test
async def test_starts_xfail(ctrl: Controller) -> None:
raise RuntimeError("should fail")
@pwndbg_test
async def test_starts(ctrl: Controller) -> None:
pass
@pwndbg_test
async def test_launch(ctrl: Controller) -> None:
"""
Launches a process and checks if a simple static CString can be read from it.
"""
import pwndbg
import pwndbg.aglib.typeinfo
await ctrl.launch(get_binary("memory.out"))
inf = pwndbg.dbg.selected_inferior()
addr = inf.lookup_symbol("short_str")
string = addr.cast(pwndbg.aglib.typeinfo.char.pointer()).string()
assert string == "some cstring here"

@ -155,7 +155,7 @@ run_gdb() {
# NOTE: We run tests under GDB sessions and because of some cleanup/tests dependencies problems
# we decided to run each test in a separate GDB session
gdb_args=(--command ../../host/gdb/pytests_collect.py)
TESTS_COLLECT_OUTPUT=$(TESTS_PATH="$ROOT_DIR/tests/library/qemu-system/tests/" run_gdb "x86_64" 0 "${gdb_args[@]}")
TESTS_COLLECT_OUTPUT=$(TESTS_PATH="$ROOT_DIR/tests/library/qemu-system/tests/" TEST_PWNDBG_ROOT="${PWNDBG_ABS_PATH}" run_gdb "x86_64" 0 "${gdb_args[@]}")
if [ $? -eq 1 ]; then
echo -E "$TESTS_COLLECT_OUTPUT"
@ -201,6 +201,7 @@ run_test() {
PWNDBG_ARCH="${arch}" \
PWNDBG_KERNEL_TYPE="${kernel_type}" \
PWNDBG_KERNEL_VERSION="${kernel_version}" \
TEST_PWNDBG_ROOT="${PWNDBG_ABS_PATH}" \
run_gdb "${arch}" $should_drop_to_pdb "${gdb_args[@]}"
return $?
}

@ -1,7 +1,377 @@
#!/usr/bin/env python3
from __future__ import annotations
import host
import argparse
import concurrent.futures
import multiprocessing
import os
import re
import shutil
import signal
import subprocess
import sys
import time
from enum import Enum
from pathlib import Path
from host import TestHost
from host import TestResult
from host import TestStatus
def main():
args = parse_args()
coverage_out = None
if args.cov:
print("Will run codecov")
coverage_out = Path(".cov/coverage")
if args.pdb:
print("Will run tests in serial and with Python debugger")
args.serial = True
local_pwndbg_root = (Path(os.path.dirname(__file__)) / "../").resolve()
print(f"[*] Local Pwndbg root: {local_pwndbg_root}")
# Build the binaries for the test group.
#
# As the nix store is read-only, we always use the local Pwndbg root for
# building tests, even if the user has requested a nix-compatible test.
#
# Ideally, however, we would build the test targets as part of `nix verify`.
ensure_zig_path(local_pwndbg_root)
make_all(local_pwndbg_root / args.group.binary_dir())
if not args.driver.can_run(args.group):
print(
f"ERROR: Driver '{args.driver}' can't run test group '{args.group}'. Use another driver."
)
sys.exit(1)
match args.driver:
case Driver.GDB:
host = get_gdb_host(args, local_pwndbg_root)
case Driver.LLDB:
host = get_lldb_host(args, local_pwndbg_root)
# Handle the case in which the user only wants the collection to run.
if args.collect_only:
for test in host.collect():
print(test)
sys.exit(0)
# Actually run the tests.
run_tests_and_print_stats(
host, args.test_name_filter, args.pdb, args.serial, args.verbose, coverage_out
)
def run_tests_and_print_stats(
host: TestHost,
regex_filter: str | None,
pdb: bool,
serial: bool,
verbose: bool,
coverage_out: Path | None,
):
"""
Runs all the tests made available by a given test host.
"""
stats = TestStats()
start = time.monotonic_ns()
# PDB tests always run in sequence.
if pdb and not serial:
print("WARNING: Python Debugger (PDB) requires serial execution, but the user has")
print(" requested parallel execution. Tests will *not* run in parallel.")
serial = True
tests_list = host.collect()
if regex_filter is not None:
# Filter test names if required.
tests_list = [case for case in tests_list if re.search(regex_filter, case)]
if serial:
print("\nRunning tests in series")
for test in tests_list:
result = host.run(test, coverage_out, pdb)
stats.handle_test_result(test, result, verbose)
else:
print("\nRunning tests in parallel")
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
for test in tests_list:
executor.submit(host.run, test, coverage_out, pdb).add_done_callback(
# `test=test` forces the variable to bind early. This will
# change the type of the lambda, however, so we have to
# assure MyPy we know what we're doing.
lambda future, test=test: stats.handle_test_result( # type: ignore[misc]
test, future.result(), verbose
)
)
# Return SIGINT to the default behavior.
signal.signal(signal.SIGINT, signal.SIG_DFL)
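The `test=test` default argument in the callback above is needed because a plain lambda would close over the loop variable rather than its value at submission time; a standalone illustration (not part of the runner):

# Late binding: every callback sees the final value of `test`.
callbacks = [lambda: print(test) for test in ("a", "b", "c")]
for cb in callbacks:
    cb()  # prints "c" three times

# Early binding via a default argument captures each value as it goes by.
callbacks = [lambda test=test: print(test) for test in ("a", "b", "c")]
for cb in callbacks:
    cb()  # prints "a", "b", "c"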
end = time.monotonic_ns()
duration = end - start
print("")
print("*********************************")
print("********* TESTS SUMMARY *********")
print("*********************************")
print(
f"Time Spent : {duration / 1000000000:.2f}s (cumulative: {stats.total_duration / 1000000000:.2f}s)"
)
print(f"Tests Passed : {stats.pass_tests}")
print(f"Tests Skipped: {stats.skip_tests}")
print(f"Tests Failed : {stats.fail_tests}")
if stats.fail_tests != 0:
print("\nFailing tests:")
for test_case in stats.fail_tests_names:
print(f"- {test_case}")
sys.exit(1)
def get_gdb_host(args: argparse.Namespace, local_pwndbg_root: Path) -> TestHost:
"""
Build a GDB-based test host.
"""
if args.nix:
# Use pwndbg, as built by Nix.
gdb_path = local_pwndbg_root / "result" / "bin" / "pwndbg"
if not gdb_path.exists():
print("ERROR: No nix-compatible pwndbg found. Run nix build .#pwndbg-dev")
sys.exit(1)
elif args.group == Group.CROSS_ARCH_USER:
# Some systems don't ship 'gdb-multiarch', but support multiple
# architectures in their regular binaries. Try the regular GDB.
supports_arches = "py import os; archs = ['i386', 'aarch64', 'arm', 'mips', 'riscv', 'sparc']; os._exit(3) if len([arch for arch in archs if arch in gdb.architecture_names()]) == len(archs) else os._exit(2)"
gdb_path_str = shutil.which("pwndbg")
if gdb_path_str is None:
print("ERROR: No 'pwndbg' executables in path")
sys.exit(1)
result = subprocess.run([gdb_path_str, "-nx", "-ex", supports_arches], capture_output=True)
# GDB supports cross architecture targets
if result.returncode == 3:
gdb_path = Path(gdb_path_str)
else:
print("ERROR: 'pwndbg' does not support cross architecture targets")
sys.exit(1)
else:
# Use the regular system GDB.
gdb_path_str = shutil.which("pwndbg")
if gdb_path_str is None:
print("ERROR: No 'gdb' executable in path")
sys.exit(1)
gdb_path = Path(gdb_path_str)
from host.gdb import GDBTestHost
return GDBTestHost(
local_pwndbg_root,
local_pwndbg_root / args.group.library(),
local_pwndbg_root / args.group.binary_dir(),
gdb_path,
)
def get_lldb_host(args: argparse.Namespace, local_pwndbg_root: Path) -> TestHost:
"""
Build an LLDB-based test host.
"""
if args.nix:
print("ERROR: Nix is currently not supported with driver LLDB")
sys.exit(1)
from host.lldb import LLDBTestHost
return LLDBTestHost(
local_pwndbg_root,
local_pwndbg_root / args.group.library(),
local_pwndbg_root / args.group.binary_dir(),
)
class Group(Enum):
"""
Tests are divided into multiple groups.
"""
GDB = "gdb"
LLDB = "lldb"
DBG = "dbg"
CROSS_ARCH_USER = "cross-arch-user"
def __str__(self):
return self._value_
def library(self) -> Path:
"""
Subdirectory relative to the Pwndbg root containing the tests.
"""
match self:
case Group.GDB:
return Path("tests/library/gdb/")
case Group.LLDB:
return Path("tests/library/lldb/")
case Group.DBG:
return Path("tests/library/dbg/")
case Group.CROSS_ARCH_USER:
return Path("tests/library/qemu-user/")
case other:
raise AssertionError(f"group {other} is unaccounted for")
def binary_dir(self) -> Path:
"""
Subdirectory relative to the Pwndbg root containing the required
binaries for a given test group.
"""
match self:
case Group.GDB | Group.LLDB | Group.DBG:
return Path("tests/binaries/host/")
case Group.CROSS_ARCH_USER:
return Path("tests/binaries/qemu-user/")
case other:
raise AssertionError(f"group {other} is unaccounted for")
class Driver(Enum):
GDB = "gdb"
LLDB = "lldb"
def __str__(self):
return self._value_
def can_run(self, grp: Group) -> bool:
"""
Whether a given driver can run a given test group.
"""
match self:
case Driver.GDB:
match grp:
case Group.GDB:
return True
case Group.LLDB:
return False
case Group.DBG:
return True
case Group.CROSS_ARCH_USER:
return True
case Driver.LLDB:
match grp:
case Group.GDB:
return False
case Group.LLDB:
return True
case Group.DBG:
return True
case Group.CROSS_ARCH_USER:
return False
raise AssertionError(f"unaccounted for combination of driver '{self}' and group '{grp}'")
def parse_args():
parser = argparse.ArgumentParser(description="Run tests.")
parser.add_argument("-g", "--group", choices=list(Group), type=Group, required=True)
parser.add_argument(
"-d",
"--driver",
choices=list(Driver),
type=Driver,
required=True,
)
parser.add_argument(
"-p",
"--pdb",
action="store_true",
help="enable pdb (Python debugger) post mortem debugger on failed tests",
)
parser.add_argument("-c", "--cov", action="store_true", help="enable codecov")
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="display all test output instead of just failing test output",
)
parser.add_argument(
"-s", "--serial", action="store_true", help="run tests one at a time instead of in parallel"
)
parser.add_argument(
"--nix",
action="store_true",
help="run tests using built for nix environment",
)
parser.add_argument(
"--collect-only",
action="store_true",
help="only show the output of test collection, don't run any tests",
)
parser.add_argument(
"test_name_filter", nargs="?", help="run only tests that match the regex", default=".*"
)
return parser.parse_args()
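For reference, a couple of hypothetical invocations of this runner with the new LLDB driver (the script path is assumed; adjust to wherever this file lives in the tree):

python tests/test.py -g lldb -d lldb --collect-only
python tests/test.py -g dbg -d lldb -s -v 'test_launch'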
def ensure_zig_path(local_pwndbg_root: Path):
if "ZIGPATH" not in os.environ:
# If ZIGPATH is not set, set it to $pwd/.zig
# In Docker environment this should by default be set to /opt/zig
os.environ["ZIGPATH"] = str(local_pwndbg_root / ".zig")
print(f'[+] ZIGPATH set to {os.environ["ZIGPATH"]}')
def make_all(path: Path, jobs: int = multiprocessing.cpu_count()):
"""
Build the binaries for a given test group.
"""
if not path.exists():
raise ValueError(f"given non-existent path {path}")
print(f"[+] make -C {path} -j{jobs} all")
try:
subprocess.check_call(["make", f"-j{jobs}", "all"], cwd=str(path))
except subprocess.CalledProcessError:
sys.exit(1)
class TestStats:
def __init__(self):
self.total_duration = 0
self.fail_tests = 0
self.pass_tests = 0
self.skip_tests = 0
self.fail_tests_names = []
def handle_test_result(self, case: str, test_result: TestResult, verbose: bool):
match test_result.status:
case TestStatus.FAILED:
self.fail_tests += 1
self.fail_tests_names.append(case)
case TestStatus.PASSED | TestStatus.XFAIL:
self.pass_tests += 1
case TestStatus.XPASS:
# Technically this is a failure, but Pwndbg does not consider it so.
self.pass_tests += 1
case TestStatus.SKIPPED:
self.skip_tests += 1
# skip_reason = " " + (
# process.stdout.split(test_status)[1].split("\n\n\x1b[33m")[0].replace("\n", "")
# )
self.total_duration += test_result.duration_ns
print(
f"{case:<100} {test_result.status} {test_result.duration_ns / 1000000000:.2f}s {test_result.context if test_result.context else ''}"
)
# Only show the output of failed tests unless the verbose flag was used
if verbose or test_result.status == TestStatus.FAILED:
print("")
print(test_result.stderr)
print(test_result.stdout)
if __name__ == "__main__":
host.main()
main()
