From 365af330ef9b65ede4414e76bd85f9bb331d612b Mon Sep 17 00:00:00 2001 From: "Matt." <4922458+mbrla0@users.noreply.github.com> Date: Thu, 17 Jul 2025 22:53:14 -0300 Subject: [PATCH] Add LLDB test driver and initial Debugger API tests (#3120) --- pwndbg/dbg/lldb/repl/__init__.py | 6 +- pwndbginit/pwndbg_lldb.py | 129 +++--- tests/host/__init__.py | 382 ++++-------------- tests/host/gdb/__init__.py | 63 +-- tests/host/gdb/pytests_collect.py | 5 + tests/host/gdb/pytests_launcher.py | 45 +++ tests/host/lldb/__init__.py | 79 ++++ tests/host/lldb/launch-guest.py | 125 ++++++ .../library/dbg/{__init__.py => conftest.py} | 0 tests/library/dbg/tests/__init__.py | 37 ++ tests/library/dbg/tests/test_test.py | 65 +++ tests/library/qemu-system/system-tests.sh | 3 +- tests/tests.py | 374 ++++++++++++++++- 13 files changed, 895 insertions(+), 418 deletions(-) create mode 100644 tests/host/lldb/launch-guest.py rename tests/library/dbg/{__init__.py => conftest.py} (100%) create mode 100644 tests/library/dbg/tests/__init__.py create mode 100644 tests/library/dbg/tests/test_test.py diff --git a/pwndbg/dbg/lldb/repl/__init__.py b/pwndbg/dbg/lldb/repl/__init__.py index 1ff00b599..b26716d62 100644 --- a/pwndbg/dbg/lldb/repl/__init__.py +++ b/pwndbg/dbg/lldb/repl/__init__.py @@ -287,7 +287,9 @@ def print_hint(msg: str, *args): @wrap_with_history def run( - controller: Callable[[PwndbgController], Coroutine[Any, Any, None]], debug: bool = False + controller: Callable[..., Coroutine[Any, Any, None]], + *args, + debug: bool = False, ) -> None: """ Runs the Pwndbg CLI through the given asynchronous controller. @@ -322,7 +324,7 @@ def run( show_greeting() last_command = "" - coroutine = controller(PwndbgController()) + coroutine = controller(PwndbgController(), *args) last_result: Any = None last_exc: Exception | None = None diff --git a/pwndbginit/pwndbg_lldb.py b/pwndbginit/pwndbg_lldb.py index e3fdbd6b6..9ac186baf 100755 --- a/pwndbginit/pwndbg_lldb.py +++ b/pwndbginit/pwndbg_lldb.py @@ -8,25 +8,11 @@ import re import shutil import subprocess import sys +from typing import Any +from typing import Callable +from typing import Coroutine from typing import List -PARSER = argparse.ArgumentParser(prog="pwndbg-lldb") -PARSER.add_argument("-v", "--verbose", action="store_true", help="Enable debug output") -PARSER.add_argument("target", nargs="?") -parser_attach = PARSER.add_mutually_exclusive_group() -parser_attach.add_argument( - "-n", "--attach-name", help="Tells the debugger to attach to a process with the given name." -) -parser_attach.add_argument( - "-p", "--attach-pid", help="Tells the debugger to attach to a process with the given pid." -) -PARSER.add_argument( - "-w", - "--wait-for", - action="store_true", - help="Tells the debugger to wait for a process with the given pid or name to launch before attaching.", -) - def find_lldb_version() -> List[int]: """ @@ -59,27 +45,14 @@ def find_lldb_python_path() -> str: return folder -def get_venv_bin_path(): - bin_dir = "Scripts" if os.name == "nt" else "bin" - return os.path.join(sys.prefix, bin_dir) - - -def prepend_venv_bin_to_path(): - # Set virtualenv's bin path (needed for utility tools like ropper, pwntools etc) - venv_bin = get_venv_bin_path() - path_elements = os.environ.get("PATH", "").split(os.pathsep) - if venv_bin in path_elements: - return - - path_elements.insert(0, venv_bin) - os.environ["PATH"] = os.pathsep.join(path_elements) - - -def main(): - prepend_venv_bin_to_path() - - args = PARSER.parse_args() - debug = args.verbose +def launch( + controller: Callable[..., Coroutine[Any, Any, None]], + *args, + debug: bool = False, +) -> None: + """ + Launch Pwndbg with the given controller. + """ if sys.platform == "linux" and "LLDB_DEBUGSERVER_PATH" not in os.environ: os.environ["LLDB_DEBUGSERVER_PATH"] = shutil.which("lldb-server") @@ -121,15 +94,62 @@ def main(): print("[-] Launcher: Initializing Pwndbg") lldbinit.main(debugger, lldb_version[0], lldb_version[1], debug=debug) - from pwndbg.dbg.lldb.repl import PwndbgController - from pwndbg.dbg.lldb.repl import print_error - from pwndbg.dbg.lldb.repl import print_warn from pwndbg.dbg.lldb.repl import run as run_repl if debug: print("[-] Launcher: Entering Pwndbg CLI") - # Prepare the startup commands. + run_repl(controller, *args, debug=debug) + + # Dispose of our debugger and terminate LLDB. + lldb.SBDebugger.Destroy(debugger) + lldb.SBDebugger.Terminate() + + +def get_venv_bin_path(): + bin_dir = "Scripts" if os.name == "nt" else "bin" + return os.path.join(sys.prefix, bin_dir) + + +def prepend_venv_bin_to_path(): + # Set virtualenv's bin path (needed for utility tools like ropper, pwntools etc) + venv_bin = get_venv_bin_path() + path_elements = os.environ.get("PATH", "").split(os.pathsep) + if venv_bin in path_elements: + return + + path_elements.insert(0, venv_bin) + os.environ["PATH"] = os.pathsep.join(path_elements) + + +def main() -> None: + """ + Entry point for the pwndbg-lldb command line tool. + """ + prepend_venv_bin_to_path() + + # Parse the arguments we were given. + parser = argparse.ArgumentParser(prog="pwndbg-lldb") + parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug output") + parser.add_argument("target", nargs="?") + parser_attach = parser.add_mutually_exclusive_group() + parser_attach.add_argument( + "-n", "--attach-name", help="Tells the debugger to attach to a process with the given name." + ) + parser_attach.add_argument( + "-p", "--attach-pid", help="Tells the debugger to attach to a process with the given pid." + ) + parser.add_argument( + "-w", + "--wait-for", + action="store_true", + help="Tells the debugger to wait for a process with the given pid or name to launch before attaching.", + ) + + args = parser.parse_args() + debug = args.verbose + + # Prepare the startup commands based on those arguments. startup = [] if args.target: # DEVIATION: The LLDB CLI silently ignores any target information passed @@ -145,7 +165,7 @@ def main(): # both '--attach-name' and '--attach-pid', it silently ignores it when # used with the latter. Pwndbg prints out a warning, instead. if args.wait_for: - print_warn("'--wait-for' has no effect when used with '--attach-pid'") + print("warn: '--wait-for' has no effect when used with '--attach-pid'") startup.append(f'process attach --pid "{args.attach_pid}"') else: @@ -154,19 +174,23 @@ def main(): # nesting argument groups has been deprecated since Python 3.11, and # the deprecation message suggests it was never even supported in # the first place :/ - print_error( - "'--wait-for' must be used in combination with either '--attach-name' or '--attach-pid'" + print( + "error: '--wait-for' must be used in combination with either '--attach-name' or '--attach-pid'" ) - PARSER.print_usage() + parser.print_usage() sys.exit(1) if (args.attach_pid is not None or args.attach_name is not None) and args.target: - print_warn( - "have both a target and an attach request, your target may be overwritten on attach" + print( + "warn: have both a target and an attach request, your target may be overwritten on attach" ) def drive(startup: List[str] | None): - async def drive(c: PwndbgController): + async def drive(c): + from pwndbg.dbg.lldb.repl import PwndbgController + + assert isinstance(c, PwndbgController) + if startup is not None: for line in startup: await c.execute(line) @@ -176,11 +200,8 @@ def main(): return drive - run_repl(drive(startup), debug=debug) - - # Dispose of our debugger and terminate LLDB. - lldb.SBDebugger.Destroy(debugger) - lldb.SBDebugger.Terminate() + # Launch Pwndbg in interactive mode. + launch(drive(startup), debug=debug) if __name__ == "__main__": diff --git a/tests/host/__init__.py b/tests/host/__init__.py index 6cf59c718..62c75b723 100644 --- a/tests/host/__init__.py +++ b/tests/host/__init__.py @@ -13,164 +13,82 @@ import time from enum import Enum from pathlib import Path from subprocess import CompletedProcess +from typing import Any +from typing import Awaitable +from typing import Callable +from typing import Coroutine from typing import List from typing import Tuple -def main(): - args = parse_args() - coverage_out = None - if args.cov: - print("Will run codecov") - coverage_out = Path(".cov/coverage") - if args.pdb: - print("Will run tests in serial and with Python debugger") - args.serial = True - - local_pwndbg_root = (Path(os.path.dirname(__file__)) / ".." / "../").resolve() - print(f"[*] Local Pwndbg root: {local_pwndbg_root}") - - # Build the binaries for the test group. - # - # As the nix store is read-only, we always use the local Pwndbg root for - # building tests, even if the user has requested a nix-compatible test. - # - # Ideally, however, we would build the test targets as part of `nix verify`. - ensure_zig_path(local_pwndbg_root) - make_all(local_pwndbg_root / args.group.binary_dir()) - - if not args.driver.can_run(args.group): - print( - f"ERROR: Driver '{args.driver}' can't run test group '{args.group}'. Use another driver." - ) - sys.exit(1) - - match args.driver: - case Driver.GDB: - host = get_gdb_host(args, local_pwndbg_root) - - # Handle the case in which the user only wants the collection to run. - if args.collect_only: - for test in host.collect(): - print(test) - sys.exit(0) - - # Actually run the tests. - run_tests_and_print_stats( - host, args.test_name_filter, args.pdb, args.serial, args.verbose, coverage_out - ) - - -def run_tests_and_print_stats( - host: TestHost, - regex_filter: str | None, - pdb: bool, - serial: bool, - verbose: bool, - coverage_out: Path | None, -): +def _collection_from_pytest( + result: CompletedProcess[str], pwndbg_root: Path, pytest_root: Path +) -> List[str]: """ - Runs all the tests made available by a given test host. + Given the output of a completed Pytest collection, return a list of tests. """ - stats = TestStats() - start = time.monotonic_ns() + tests_collect_output = result.stdout - # PDB tests always run in sequence. - if pdb and not serial: - print("WARNING: Python Debugger (PDB) requires serial execution, but the user has") - print(" requested parallel execution. Tests will *not* run in parallel.") - serial = True + if result.returncode != 0: + raise RuntimeError(f"collection command failed: {result.stderr} {result.stdout}") - tests_list = host.collect() - if regex_filter is not None: - # Filter test names if required. - tests_list = [case for case in tests_list if re.search(regex_filter, case)] - - if serial: - print("\nRunning tests in series") - for test in tests_list: - result = host.run(test, coverage_out, pdb) - stats.handle_test_result(test, result, verbose) - else: - print("\nRunning tests in parallel") - with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: - for test in tests_list: - executor.submit(host.run, test, coverage_out, pdb).add_done_callback( - # `test=test` forces the variable to bind early. This will - # change the type of the lambda, however, so we have to - # assure MyPy we know what we're doing. - lambda future, test=test: stats.handle_test_result( # type: ignore[misc] - test, future.result(), verbose - ) - ) - - # Return SIGINT to the default behavior. - signal.signal(signal.SIGINT, signal.SIG_DFL) - - end = time.monotonic_ns() - duration = end - start - print("") - print("*********************************") - print("********* TESTS SUMMARY *********") - print("*********************************") - print( - f"Time Spent : {duration / 1000000000:.2f}s (cumulative: {stats.total_duration / 1000000000:.2f}s)" - ) - print(f"Tests Passed : {stats.pass_tests}") - print(f"Tests Skipped: {stats.skip_tests}") - print(f"Tests Failed : {stats.fail_tests}") + # Extract the test names from the output using regex + # + # _run_gdb executes it in the current working directory, and so paths + # printed by pytest are relative to it. + path_spec = pytest_root.resolve().relative_to(pwndbg_root / "tests") + pattern = re.compile(rf"{path_spec}.*::.*") + matches = pattern.findall(tests_collect_output) - if stats.fail_tests != 0: - print("\nFailing tests:") - for test_case in stats.fail_tests_names: - print(f"- {test_case}") - sys.exit(1) + return list(matches) -def get_gdb_host(args: argparse.Namespace, local_pwndbg_root: Path) -> TestHost: +def _result_from_pytest(result: CompletedProcess[str], duration_ns: int) -> TestResult: """ - Build a GDB-based test host. + Given the output of a completed test, return a `TestResult`. """ - if args.nix: - # Use pwndbg, as build by nix. - gdb_path = local_pwndbg_root / "result" / "bin" / "pwndbg" - - if not gdb_path.exists(): - print("ERROR: No nix-compatible pwndbg found. Run nix build .#pwndbg-dev") - sys.exit(1) - elif args.group == Group.CROSS_ARCH_USER: - # Some systems don't ship 'gdb-multiarch', but support multiple - # architectures in their regular binaries. Try the regular GDB. - supports_arches = "py import os; archs = ['i386', 'aarch64', 'arm', 'mips', 'riscv', 'sparc']; os._exit(3) if len([arch for arch in archs if arch in gdb.architecture_names()]) == len(archs) else os._exit(2)" - gdb_path_str = shutil.which("pwndbg") - if gdb_path_str is None: - print("ERROR: No 'pwndbg' executables in path") - sys.exit(1) - - result = subprocess.run([gdb_path_str, "-nx", "-ex", supports_arches], capture_output=True) - # GDB supports cross architecture targets - if result.returncode == 3: - gdb_path = Path(gdb_path_str) + # Determine low-granularity status from process return code. + status = TestStatus.PASSED if result.returncode == 0 else TestStatus.FAILED + + # Determine high-granularity status from process output, if possible. + stdout_status = None + stdout_context = None + if result.stdout is not None: + entries = re.search( + r"(\x1b\[3.m(PASSED|FAILED|SKIPPED|XPASS|XFAIL)\x1b\[0m)( .*::.* -)?( (.*))?", + result.stdout, + re.MULTILINE, + ) + if entries: + stdout_status = entries[2] + stdout_context = entries[5] + + # If possible, augment the status with the high-granularity output. + if stdout_status is not None: + # Check the consistency between the values. + if status == TestStatus.FAILED and stdout_status != "FAILED": + # They disagree. + # + # In this case, we should believe the more accurate but + # lower-granularity status value. This may happen if the output + # of the test includes any of the words we match against. + pass else: - print("ERROR: 'pwndbg' does not support cross architecture targets") - sys.exit(1) - else: - # Use the regular system GDB. - gdb_path_str = shutil.which("pwndbg") - if gdb_path_str is None: - print("ERROR: No 'gdb' executable in path") - sys.exit(1) - gdb_path = Path(gdb_path_str) - - from host.gdb import GDBTestHost - - return GDBTestHost( - local_pwndbg_root, - local_pwndbg_root / args.group.library(), - local_pwndbg_root / args.group.binary_dir(), - gdb_path, - ) + match stdout_status: + case "PASSED": + status = TestStatus.PASSED + case "SKIPPED": + status = TestStatus.SKIPPED + case "XPASS": + status = TestStatus.XPASS + case "XFAIL": + status = TestStatus.XFAIL + case _: + # Also a disegreement. Keep the low-granularity status. + pass + + return TestResult(status, duration_ns, result.stdout, result.stderr, stdout_context) class TestStatus(Enum): @@ -231,179 +149,33 @@ class TestHost: Collection of code coverage data may be enabled for the test by specifying a coverage file path in `coverage_out`. """ - pass + raise NotImplementedError() def collect(self) -> List[str]: """ Collect the names of all the tests available to this host. """ - pass - + raise NotImplementedError() -class Group(Enum): - """ - Tests are divided into multiple groups. - """ - - GDB = "gdb" - LLDB = "lldb" - DBG = "dbg" - CROSS_ARCH_USER = "cross-arch-user" - - def __str__(self): - return self._value_ - def library(self) -> Path: +class Controller: + def launch(self, binary: Path) -> Awaitable[None]: """ - Subdirectory relative to the Pwndbg root containing the tests. + Launch the binary with the given path, relative to the binaries folder + for the calling test. """ - match self: - case Group.GDB: - return Path("tests/library/gdb/") - case Group.LLDB: - return Path("tests/library/lldb/") - case Group.DBG: - return Path("tests/library/dbg/") - case Group.CROSS_ARCH_USER: - return Path("tests/library/qemu-user/") - case other: - raise AssertionError(f"group {other} is unaccounted for") + raise NotImplementedError() - def binary_dir(self) -> Path: - """ - Subdirectory relative to the Pwndbg root containing the required - binaries for a given test group. - """ - match self: - case Group.GDB | Group.LLDB | Group.DBG: - return Path("tests/binaries/host/") - case Group.CROSS_ARCH_USER: - return Path("tests/binaries/qemu-user/") - case other: - raise AssertionError(f"group {other} is unaccounted for") - -class Driver(Enum): - GDB = "gdb" - - def __str__(self): - return self._value_ - - def can_run(self, grp: Group) -> bool: - """ - Wether a given driver can run a given test group. - """ - match self: - case Driver.GDB: - match grp: - case Group.GDB: - return True - case Group.LLDB: - return False - case Group.DBG: - return True - case Group.CROSS_ARCH_USER: - return True - raise AssertionError(f"unaccounted for combination of driver '{self}' and group '{grp}'") - - -def parse_args(): - parser = argparse.ArgumentParser(description="Run tests.") - parser.add_argument("-g", "--group", choices=list(Group), type=Group, required=True) - parser.add_argument( - "-d", - "--driver", - choices=list(Driver), - type=Driver, - required=True, - ) - parser.add_argument( - "-p", - "--pdb", - action="store_true", - help="enable pdb (Python debugger) post mortem debugger on failed tests", - ) - parser.add_argument("-c", "--cov", action="store_true", help="enable codecov") - parser.add_argument( - "-v", - "--verbose", - action="store_true", - help="display all test output instead of just failing test output", - ) - parser.add_argument( - "-s", "--serial", action="store_true", help="run tests one at a time instead of in parallel" - ) - parser.add_argument( - "--nix", - action="store_true", - help="run tests using built for nix environment", - ) - parser.add_argument( - "--collect-only", - action="store_true", - help="only show the output of test collection, don't run any tests", - ) - parser.add_argument( - "test_name_filter", nargs="?", help="run only tests that match the regex", default=".*" - ) - return parser.parse_args() - - -def ensure_zig_path(local_pwndbg_root: Path): - if "ZIGPATH" not in os.environ: - # If ZIGPATH is not set, set it to $pwd/.zig - # In Docker environment this should by default be set to /opt/zig - os.environ["ZIGPATH"] = str(local_pwndbg_root / ".zig") - print(f'[+] ZIGPATH set to {os.environ["ZIGPATH"]}') - - -def make_all(path: Path, jobs: int = multiprocessing.cpu_count()): +def start(controller: Callable[[Controller], Coroutine[Any, Any, None]]) -> None: """ - Build the binaries for a given test group. - """ - if not path.exists(): - raise ValueError(f"given non-existent path {path}") - - print(f"[+] make -C {path} -j{jobs} all") - try: - subprocess.check_call(["make", f"-j{jobs}", "all"], cwd=str(path)) - except subprocess.CalledProcessError: - sys.exit(1) - - -class TestStats: - def __init__(self): - self.total_duration = 0 - self.fail_tests = 0 - self.pass_tests = 0 - self.skip_tests = 0 - self.fail_tests_names = [] + The start function. - def handle_test_result(self, case: str, test_result: TestResult, verbose: bool): - match test_result.status: - case TestStatus.FAILED | TestStatus.XFAIL: - self.fail_tests += 1 - self.fail_tests_names.append(case) - case TestStatus.PASSED | TestStatus.XPASS: - self.pass_tests += 1 - case TestStatus.SKIPPED: - self.skip_tests += 1 - # skip_reason = " " + ( - # process.stdout.split(test_status)[1].split("\n\n\x1b[33m")[0].replace("\n", "") - # ) + Both the testing hosts and the tests themselves share this module, and this + function is used by the test piping to start the async debugger runtime. - self.total_duration += test_result.duration_ns - - print( - f"{case:<100} {test_result.status} {test_result.duration_ns / 1000000000:.2f}s {test_result.context if test_result.context else ''}" - ) - - # Only show the output of failed tests unless the verbose flag was used - if verbose or test_result.status == TestStatus.FAILED: - print("") - print(test_result.stderr) - print(test_result.stdout) - - -if __name__ == "__main__": - main() + This function must be replaced in the test. + """ + raise AssertionError( + "either called host.start() from the testing host, or testing code did not replace it" + ) diff --git a/tests/host/gdb/__init__.py b/tests/host/gdb/__init__.py index d1c9598a2..9811fade3 100644 --- a/tests/host/gdb/__init__.py +++ b/tests/host/gdb/__init__.py @@ -11,6 +11,8 @@ from typing import List from host import TestHost from host import TestResult from host import TestStatus +from host import _collection_from_pytest +from host import _result_from_pytest class GDBTestHost(TestHost): @@ -76,6 +78,7 @@ class GDBTestHost(TestHost): env["PWNDBG_DISABLE_COLORS"] = "1" env["GDB_BIN_PATH"] = str(self._gdb_path) env["TEST_BINARIES_ROOT"] = str(self._binaries_root) + env["TEST_PWNDBG_ROOT"] = str(self._pwndbg_root) if interactive: env["USE_PDB"] = "1" @@ -86,47 +89,7 @@ class GDBTestHost(TestHost): ) duration = time.monotonic_ns() - started_at - # Determine low-granularity status from process return code. - status = TestStatus.PASSED if result.returncode == 0 else TestStatus.FAILED - - # Determine high-granularity status from process output, if possible. - stdout_status = None - stdout_context = None - if not interactive: - entries = re.search( - r"(\x1b\[3.m(PASSED|FAILED|SKIPPED|XPASS|XFAIL)\x1b\[0m)( .*::.* -)?( (.*))?", - result.stdout, - re.MULTILINE, - ) - if entries: - stdout_status = entries[2] - stdout_context = entries[5] - - # If possible, augment the status with the high-granularity output. - if stdout_status is not None: - # Check the consistency between the values. - if status == TestStatus.FAILED and stdout_status != "FAILED": - # They disagree. - # - # In this case, we should believe the more accurate but - # lower-granularity status value. This may happen if the output - # of the test includes any of the words we match against. - pass - else: - match stdout_status: - case "PASSED": - status = TestStatus.PASSED - case "SKIPPED": - status = TestStatus.SKIPPED - case "XPASS": - status = TestStatus.XPASS - case "XFAIL": - status = TestStatus.XFAIL - case _: - # Also a disegreement. Keep the low-granularity status. - pass - - return TestResult(status, duration, result.stdout, result.stderr, stdout_context) + return _result_from_pytest(result, duration) def collect(self) -> List[str]: # NOTE: We run tests under GDB sessions and because of some cleanup/tests dependencies problems @@ -135,19 +98,11 @@ class GDBTestHost(TestHost): env = os.environ.copy() env["TEST_BINARIES_ROOT"] = str(self._binaries_root) + env["TEST_PWNDBG_ROOT"] = str(self._pwndbg_root) env["TESTS_PATH"] = str(self._pytest_root) result = self._run_gdb(target, env=env) - tests_collect_output = result.stdout - - if result.returncode != 0: - raise RuntimeError(f"collection command failed: {result.stderr} {result.stdout}") - - # Extract the test names from the output using regex - # - # _run_gdb executes it in the current working directory, and so paths - # printed by pytest are relative to it. - path_spec = self._pytest_root.resolve().relative_to(self._pwndbg_root) - pattern = re.compile(rf"{path_spec}.*::.*") - matches = pattern.findall(tests_collect_output) - return list(matches) + names = _collection_from_pytest(result, self._pwndbg_root, self._pytest_root) + + # We execute from Pwndbg root, so we need to prepend tests/ to the names. + return [f"tests/{name}" for name in names] diff --git a/tests/host/gdb/pytests_collect.py b/tests/host/gdb/pytests_collect.py index 6c091d264..631a206b1 100644 --- a/tests/host/gdb/pytests_collect.py +++ b/tests/host/gdb/pytests_collect.py @@ -5,12 +5,17 @@ import sys import pytest +PWNDBG_ROOT = os.environ.get("TEST_PWNDBG_ROOT") TESTS_PATH = os.environ.get("TESTS_PATH") if TESTS_PATH is None: print("'TESTS_PATH' environment variable not set. Failed to collect tests.") sys.exit(1) +host_home = f"{PWNDBG_ROOT}/tests/" +if host_home not in sys.path: + sys.path.append(host_home) + class CollectTestFunctionNames: """See https://github.com/pytest-dev/pytest/issues/2039#issuecomment-257753269""" diff --git a/tests/host/gdb/pytests_launcher.py b/tests/host/gdb/pytests_launcher.py index 95382ec70..c1fe70743 100644 --- a/tests/host/gdb/pytests_launcher.py +++ b/tests/host/gdb/pytests_launcher.py @@ -2,10 +2,55 @@ from __future__ import annotations import os import sys +from pathlib import Path +from typing import Any +from typing import Callable +from typing import Coroutine import coverage +import gdb import pytest +PWNDBG_ROOT = os.environ["TEST_PWNDBG_ROOT"] + +# Prepare the test host environment for the Debugger API tests. +host_home = f"{PWNDBG_ROOT}/tests/" +if host_home not in sys.path: + sys.path.append(host_home) + +import host + + +class _GDBController(host.Controller): + async def launch(self, binary_path: Path) -> None: + """ + Launch the given binary. + + GDB hides the asynchronous heavy lifting from us, so this call is + synchronous. + """ + os.environ["PWNDBG_IN_TEST"] = "1" + gdb.execute(f"file {binary_path}") + gdb.execute("set exception-verbose on") + gdb.execute("set width 80") + gdb.execute("set context-reserve-lines never") + os.environ["COLUMNS"] = "80" + gdb.execute("starti " + " ".join(args)) + + +def _start(outer: Callable[[host.Controller], Coroutine[Any, Any, None]]) -> None: + # The GDB controller is entirely synchronous, so keep advancing the + # corountine unconditionally until it ends.. + coroutine = outer(_GDBController()) + try: + coroutine.send(None) + except StopIteration: + pass + + +host.start = _start + +# Start the test, proper. use_pdb = os.environ.get("USE_PDB") == "1" sys._pwndbg_unittest_run = True # type: ignore[attr-defined] diff --git a/tests/host/lldb/__init__.py b/tests/host/lldb/__init__.py index e69de29bb..9cfa8a2e1 100644 --- a/tests/host/lldb/__init__.py +++ b/tests/host/lldb/__init__.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import os +import subprocess +import sys +import time +from pathlib import Path +from typing import List + +from host import TestHost +from host import TestResult +from host import _collection_from_pytest +from host import _result_from_pytest + + +class LLDBTestHost(TestHost): + def __init__(self, pwndbg_root: Path, pytest_root: Path, binaries_root: Path): + assert pwndbg_root.exists() + assert pwndbg_root.is_dir() + + assert pytest_root.exists() + assert pytest_root.is_dir() + + assert binaries_root.exists() + assert binaries_root.is_dir() + + self._pwndbg_root = pwndbg_root + self._pytest_root = pytest_root + self._binaries_root = binaries_root + + def _launch( + self, + op: str, + test_name: str | None, + capture: bool, + pdb: bool, + ) -> subprocess.CompletedProcess[str]: + target = self._pwndbg_root / "tests/host/lldb/launch-guest.py" + + assert target.exists() + assert target.is_file() + + assert op in ("RUN-TEST", "COLLECT") + assert op != "RUN-TEST" or test_name is not None + + interpreter = Path(sys.executable) + + assert interpreter.exists() + assert interpreter.is_file() + + env = os.environ.copy() + env["TEST_OPERATION"] = op + env["TEST_PYTEST_ROOT"] = str(self._pytest_root) + env["TEST_PWNDBG_ROOT"] = str(self._pwndbg_root) + env["TEST_BINARIES_ROOT"] = str(self._binaries_root) + env["TEST_PDB_ON_FAIL"] = "1" if pdb else "0" + if test_name is not None: + env["TEST_NAME"] = test_name + + return subprocess.run( + [interpreter, str(target)], capture_output=capture, text=True, env=env + ) + + def collect(self) -> List[str]: + result = self._launch("COLLECT", None, True, False) + return _collection_from_pytest(result, self._pwndbg_root, self._pytest_root) + + def run(self, case: str, coverage_out: Path | None, interactive: bool) -> TestResult: + if coverage_out is not None: + # Do before PR is merged. + # + # TODO: Add CodeCov for the LLDB test driver + print("[-] Warning: LLDB does not yet support code coverage") + + beg = time.monotonic_ns() + result = self._launch("RUN-TEST", case, not interactive, interactive) + end = time.monotonic_ns() + + return _result_from_pytest(result, end - beg) diff --git a/tests/host/lldb/launch-guest.py b/tests/host/lldb/launch-guest.py new file mode 100644 index 000000000..016760b38 --- /dev/null +++ b/tests/host/lldb/launch-guest.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +import os +import sys +from enum import Enum +from pathlib import Path +from typing import Any +from typing import Callable +from typing import Coroutine +from typing import List + + +async def _run(ctrl: Any, outer: Callable[..., Coroutine[Any, Any, None]]) -> None: + # We only import this here, as pwndbg-lldb is responsible for setting Pwndbg + # up on our behalf. + from host import Controller + + from pwndbg.dbg.lldb.repl import PwndbgController + + assert isinstance(ctrl, PwndbgController) + + # Idealy we'd define this in an outer scope, but doing it in here gains us + # proper access to type names. + class _LLDBController(Controller): + def __init__(self, pc: PwndbgController): + self.pc = pc + + async def launch(self, binary: Path) -> None: + await self.pc.execute(f"target create {binary}") + await self.pc.execute("process launch -s") + + await outer(_LLDBController(ctrl)) + + +def run(pytest_args: List[str], pytest_plugins: List[Any] | None) -> int: + # The import path is set up before this function is called. + import host + from host import Controller + + from pwndbginit import pwndbg_lldb + + # Replace host.start with a proper implementation of the start command. + def _start(outer: Callable[[Controller], Coroutine[Any, Any, None]]) -> None: + pwndbg_lldb.launch(_run, outer, debug=True) + + host.start = _start + + # Run Pytest. + import pytest + + return pytest.main(pytest_args, plugins=pytest_plugins) + + +class Operation(Enum): + RUN_TEST = "RUN-TEST" + COLLECT = "COLLECT" + + def __str__(self) -> str: + return self._value_ + + +class CollectTestFunctionNames: + "See https://github.com/pytest-dev/pytest/issues/2039#issuecomment-257753269" + + def __init__(self): + self.collected = [] + + def pytest_collection_modifyitems(self, items): + for item in items: + self.collected.append(item.nodeid) + + +if __name__ == "__main__": + pwndbg_home = Path(os.environ["TEST_PWNDBG_ROOT"]) + + assert pwndbg_home.exists() + assert pwndbg_home.is_dir() + + pwndbg_home = pwndbg_home.resolve(strict=True) + + host_home = pwndbg_home / "tests" + + assert host_home.exists() + assert host_home.is_dir() + + # Add to the path so we can access both Pwndbg and the testing host library. + if str(host_home) not in sys.path: + sys.path = [str(host_home)] + sys.path + + if str(pwndbg_home) not in sys.path: + sys.path = [str(pwndbg_home)] + sys.path + + # Prepare the requested operation. + op = Operation(os.environ["TEST_OPERATION"]) + match op: + case Operation.COLLECT: + pytest_home = Path(os.environ["TEST_PYTEST_ROOT"]) + assert pytest_home.exists() + assert pytest_home.is_dir() + + pytest_args = ["--collect-only", str(pytest_home)] + pytest_plugins = [CollectTestFunctionNames()] + case Operation.RUN_TEST: + test_name = os.environ["TEST_NAME"] + + # Ideally, we'd check that the test name is both valid and only + # matches a single test in the library, but checking that it is at + # least not empty should be good enough, provided the test host + # is careful. + assert test_name + + pytest_args = [test_name, "-vvv", "-s", "--showlocals", "--color=yes"] + if os.environ["TEST_PDB_ON_FAIL"] == "1": + pytest_args.append("--pdb") + + pytest_plugins = None + + # Start the test, proper. + status = run(pytest_args, pytest_plugins) + + if op == Operation.COLLECT: + for nodeid in pytest_plugins[0].collected: + print(nodeid) + + sys.exit(status) diff --git a/tests/library/dbg/__init__.py b/tests/library/dbg/conftest.py similarity index 100% rename from tests/library/dbg/__init__.py rename to tests/library/dbg/conftest.py diff --git a/tests/library/dbg/tests/__init__.py b/tests/library/dbg/tests/__init__.py new file mode 100644 index 000000000..a767b01e2 --- /dev/null +++ b/tests/library/dbg/tests/__init__.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +import functools +import os +from inspect import signature +from typing import Any +from typing import Callable +from typing import Concatenate +from typing import Coroutine + +import host +from host import Controller + +BINARIES_PATH = os.environ.get("TEST_BINARIES_ROOT") + + +def pwndbg_test[**T]( + test: Callable[Concatenate[Controller, T], Coroutine[Any, Any, None]], +) -> Callable[T, None]: + @functools.wraps(test) + def inner_test(*args: T.args, **kwargs: T.kwargs) -> None: + async def _test(controller: Controller) -> None: + await test(controller, *args, **kwargs) + + print(f"[+] Launching test {test.__name__} asynchronously") + host.start(_test) + + # Remove the controller from the signature, as seen by Pytest. + sig = signature(inner_test) + sig = sig.replace(parameters=tuple(sig.parameters.values())[1:]) + inner_test.__signature__ = sig + + return inner_test + + +def get_binary(name: str) -> str: + return os.path.join(BINARIES_PATH, name) diff --git a/tests/library/dbg/tests/test_test.py b/tests/library/dbg/tests/test_test.py new file mode 100644 index 000000000..ffc69a054 --- /dev/null +++ b/tests/library/dbg/tests/test_test.py @@ -0,0 +1,65 @@ +""" +Metatests. + +These tests are intended to check the functioning of the testing code itself, +rather than of Pwndbg more generally. + +Some tests come in SUCCESS and XFAIL pairs, and they require that both succeed +in order for the overall test to succeed, as they contain no inner test logic +other than the minimum necessary to start the asynchronous controller function. + +This module is responsible for testing the pwndbg_test decorator for async +controller tests. +""" + +from __future__ import annotations + +import host +import pytest +from host import Controller + +from . import get_binary +from . import pwndbg_test + + +@pytest.mark.xfail +def test_starts_no_decorator_xfail() -> None: + async def run(ctrl: Controller): + raise RuntimeError("should fail!") + + host.start(run) + + +def test_starts_no_decorator() -> None: + async def run(ctrl: Controller): + pass + + host.start(run) + + +@pytest.mark.xfail +@pwndbg_test +async def test_starts_xfail(ctrl: Controller) -> None: + raise RuntimeError("should fail") + + +@pwndbg_test +async def test_starts(ctrl: Controller) -> None: + pass + + +@pwndbg_test +async def test_launch(ctrl: Controller) -> None: + """ + Launches a process and checks if a simple static CString can be read from it. + """ + import pwndbg + import pwndbg.aglib.typeinfo + + await ctrl.launch(get_binary("memory.out")) + + inf = pwndbg.dbg.selected_inferior() + addr = inf.lookup_symbol("short_str") + string = addr.cast(pwndbg.aglib.typeinfo.char.pointer()).string() + + assert string == "some cstring here" diff --git a/tests/library/qemu-system/system-tests.sh b/tests/library/qemu-system/system-tests.sh index 657f4dfad..98e96699f 100755 --- a/tests/library/qemu-system/system-tests.sh +++ b/tests/library/qemu-system/system-tests.sh @@ -155,7 +155,7 @@ run_gdb() { # NOTE: We run tests under GDB sessions and because of some cleanup/tests dependencies problems # we decided to run each test in a separate GDB session gdb_args=(--command ../../host/gdb/pytests_collect.py) -TESTS_COLLECT_OUTPUT=$(TESTS_PATH="$ROOT_DIR/tests/library/qemu-system/tests/" run_gdb "x86_64" 0 "${gdb_args[@]}") +TESTS_COLLECT_OUTPUT=$(TESTS_PATH="$ROOT_DIR/tests/library/qemu-system/tests/" TEST_PWNDBG_ROOT="${PWNDBG_ABS_PATH}" run_gdb "x86_64" 0 "${gdb_args[@]}") if [ $? -eq 1 ]; then echo -E "$TESTS_COLLECT_OUTPUT" @@ -201,6 +201,7 @@ run_test() { PWNDBG_ARCH="${arch}" \ PWNDBG_KERNEL_TYPE="${kernel_type}" \ PWNDBG_KERNEL_VERSION="${kernel_version}" \ + TEST_PWNDBG_ROOT="${PWNDBG_ABS_PATH}" \ run_gdb "${arch}" $should_drop_to_pdb "${gdb_args[@]}" return $? } diff --git a/tests/tests.py b/tests/tests.py index 058002e41..b8e92c5b1 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,7 +1,377 @@ #!/usr/bin/env python3 from __future__ import annotations -import host +import argparse +import concurrent.futures +import multiprocessing +import os +import re +import shutil +import signal +import subprocess +import sys +import time +from enum import Enum +from pathlib import Path + +from host import TestHost +from host import TestResult +from host import TestStatus + + +def main(): + args = parse_args() + coverage_out = None + if args.cov: + print("Will run codecov") + coverage_out = Path(".cov/coverage") + if args.pdb: + print("Will run tests in serial and with Python debugger") + args.serial = True + + local_pwndbg_root = (Path(os.path.dirname(__file__)) / "../").resolve() + print(f"[*] Local Pwndbg root: {local_pwndbg_root}") + + # Build the binaries for the test group. + # + # As the nix store is read-only, we always use the local Pwndbg root for + # building tests, even if the user has requested a nix-compatible test. + # + # Ideally, however, we would build the test targets as part of `nix verify`. + ensure_zig_path(local_pwndbg_root) + make_all(local_pwndbg_root / args.group.binary_dir()) + + if not args.driver.can_run(args.group): + print( + f"ERROR: Driver '{args.driver}' can't run test group '{args.group}'. Use another driver." + ) + sys.exit(1) + + match args.driver: + case Driver.GDB: + host = get_gdb_host(args, local_pwndbg_root) + case Driver.LLDB: + host = get_lldb_host(args, local_pwndbg_root) + + # Handle the case in which the user only wants the collection to run. + if args.collect_only: + for test in host.collect(): + print(test) + sys.exit(0) + + # Actually run the tests. + run_tests_and_print_stats( + host, args.test_name_filter, args.pdb, args.serial, args.verbose, coverage_out + ) + + +def run_tests_and_print_stats( + host: TestHost, + regex_filter: str | None, + pdb: bool, + serial: bool, + verbose: bool, + coverage_out: Path | None, +): + """ + Runs all the tests made available by a given test host. + """ + stats = TestStats() + start = time.monotonic_ns() + + # PDB tests always run in sequence. + if pdb and not serial: + print("WARNING: Python Debugger (PDB) requires serial execution, but the user has") + print(" requested parallel execution. Tests will *not* run in parallel.") + serial = True + + tests_list = host.collect() + if regex_filter is not None: + # Filter test names if required. + tests_list = [case for case in tests_list if re.search(regex_filter, case)] + + if serial: + print("\nRunning tests in series") + for test in tests_list: + result = host.run(test, coverage_out, pdb) + stats.handle_test_result(test, result, verbose) + else: + print("\nRunning tests in parallel") + with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: + for test in tests_list: + executor.submit(host.run, test, coverage_out, pdb).add_done_callback( + # `test=test` forces the variable to bind early. This will + # change the type of the lambda, however, so we have to + # assure MyPy we know what we're doing. + lambda future, test=test: stats.handle_test_result( # type: ignore[misc] + test, future.result(), verbose + ) + ) + + # Return SIGINT to the default behavior. + signal.signal(signal.SIGINT, signal.SIG_DFL) + + end = time.monotonic_ns() + duration = end - start + print("") + print("*********************************") + print("********* TESTS SUMMARY *********") + print("*********************************") + print( + f"Time Spent : {duration / 1000000000:.2f}s (cumulative: {stats.total_duration / 1000000000:.2f}s)" + ) + print(f"Tests Passed : {stats.pass_tests}") + print(f"Tests Skipped: {stats.skip_tests}") + print(f"Tests Failed : {stats.fail_tests}") + + if stats.fail_tests != 0: + print("\nFailing tests:") + for test_case in stats.fail_tests_names: + print(f"- {test_case}") + sys.exit(1) + + +def get_gdb_host(args: argparse.Namespace, local_pwndbg_root: Path) -> TestHost: + """ + Build a GDB-based test host. + """ + if args.nix: + # Use pwndbg, as build by nix. + gdb_path = local_pwndbg_root / "result" / "bin" / "pwndbg" + + if not gdb_path.exists(): + print("ERROR: No nix-compatible pwndbg found. Run nix build .#pwndbg-dev") + sys.exit(1) + elif args.group == Group.CROSS_ARCH_USER: + # Some systems don't ship 'gdb-multiarch', but support multiple + # architectures in their regular binaries. Try the regular GDB. + supports_arches = "py import os; archs = ['i386', 'aarch64', 'arm', 'mips', 'riscv', 'sparc']; os._exit(3) if len([arch for arch in archs if arch in gdb.architecture_names()]) == len(archs) else os._exit(2)" + + gdb_path_str = shutil.which("pwndbg") + if gdb_path_str is None: + print("ERROR: No 'pwndbg' executables in path") + sys.exit(1) + + result = subprocess.run([gdb_path_str, "-nx", "-ex", supports_arches], capture_output=True) + # GDB supports cross architecture targets + if result.returncode == 3: + gdb_path = Path(gdb_path_str) + else: + print("ERROR: 'pwndbg' does not support cross architecture targets") + sys.exit(1) + else: + # Use the regular system GDB. + gdb_path_str = shutil.which("pwndbg") + if gdb_path_str is None: + print("ERROR: No 'gdb' executable in path") + sys.exit(1) + gdb_path = Path(gdb_path_str) + + from host.gdb import GDBTestHost + + return GDBTestHost( + local_pwndbg_root, + local_pwndbg_root / args.group.library(), + local_pwndbg_root / args.group.binary_dir(), + gdb_path, + ) + + +def get_lldb_host(args: argparse.Namespace, local_pwndbg_root: Path) -> TestHost: + """ + Build an LLDB-based test host. + """ + if args.nix: + print("ERROR: Nix is currently not supported with driver LLDB") + sys.exit(1) + + from host.lldb import LLDBTestHost + + return LLDBTestHost( + local_pwndbg_root, + local_pwndbg_root / args.group.library(), + local_pwndbg_root / args.group.binary_dir(), + ) + + +class Group(Enum): + """ + Tests are divided into multiple groups. + """ + + GDB = "gdb" + LLDB = "lldb" + DBG = "dbg" + CROSS_ARCH_USER = "cross-arch-user" + + def __str__(self): + return self._value_ + + def library(self) -> Path: + """ + Subdirectory relative to the Pwndbg root containing the tests. + """ + match self: + case Group.GDB: + return Path("tests/library/gdb/") + case Group.LLDB: + return Path("tests/library/lldb/") + case Group.DBG: + return Path("tests/library/dbg/") + case Group.CROSS_ARCH_USER: + return Path("tests/library/qemu-user/") + case other: + raise AssertionError(f"group {other} is unaccounted for") + + def binary_dir(self) -> Path: + """ + Subdirectory relative to the Pwndbg root containing the required + binaries for a given test group. + """ + match self: + case Group.GDB | Group.LLDB | Group.DBG: + return Path("tests/binaries/host/") + case Group.CROSS_ARCH_USER: + return Path("tests/binaries/qemu-user/") + case other: + raise AssertionError(f"group {other} is unaccounted for") + + +class Driver(Enum): + GDB = "gdb" + LLDB = "lldb" + + def __str__(self): + return self._value_ + + def can_run(self, grp: Group) -> bool: + """ + Whether a given driver can run a given test group. + """ + match self: + case Driver.GDB: + match grp: + case Group.GDB: + return True + case Group.LLDB: + return False + case Group.DBG: + return True + case Group.CROSS_ARCH_USER: + return True + case Driver.LLDB: + match grp: + case Group.GDB: + return False + case Group.LLDB: + return True + case Group.DBG: + return True + case Group.CROSS_ARCH_USER: + return False + raise AssertionError(f"unaccounted for combination of driver '{self}' and group '{grp}'") + + +def parse_args(): + parser = argparse.ArgumentParser(description="Run tests.") + parser.add_argument("-g", "--group", choices=list(Group), type=Group, required=True) + parser.add_argument( + "-d", + "--driver", + choices=list(Driver), + type=Driver, + required=True, + ) + parser.add_argument( + "-p", + "--pdb", + action="store_true", + help="enable pdb (Python debugger) post mortem debugger on failed tests", + ) + parser.add_argument("-c", "--cov", action="store_true", help="enable codecov") + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="display all test output instead of just failing test output", + ) + parser.add_argument( + "-s", "--serial", action="store_true", help="run tests one at a time instead of in parallel" + ) + parser.add_argument( + "--nix", + action="store_true", + help="run tests using built for nix environment", + ) + parser.add_argument( + "--collect-only", + action="store_true", + help="only show the output of test collection, don't run any tests", + ) + parser.add_argument( + "test_name_filter", nargs="?", help="run only tests that match the regex", default=".*" + ) + return parser.parse_args() + + +def ensure_zig_path(local_pwndbg_root: Path): + if "ZIGPATH" not in os.environ: + # If ZIGPATH is not set, set it to $pwd/.zig + # In Docker environment this should by default be set to /opt/zig + os.environ["ZIGPATH"] = str(local_pwndbg_root / ".zig") + print(f'[+] ZIGPATH set to {os.environ["ZIGPATH"]}') + + +def make_all(path: Path, jobs: int = multiprocessing.cpu_count()): + """ + Build the binaries for a given test group. + """ + if not path.exists(): + raise ValueError(f"given non-existent path {path}") + + print(f"[+] make -C {path} -j{jobs} all") + try: + subprocess.check_call(["make", f"-j{jobs}", "all"], cwd=str(path)) + except subprocess.CalledProcessError: + sys.exit(1) + + +class TestStats: + def __init__(self): + self.total_duration = 0 + self.fail_tests = 0 + self.pass_tests = 0 + self.skip_tests = 0 + self.fail_tests_names = [] + + def handle_test_result(self, case: str, test_result: TestResult, verbose: bool): + match test_result.status: + case TestStatus.FAILED: + self.fail_tests += 1 + self.fail_tests_names.append(case) + case TestStatus.PASSED | TestStatus.XFAIL: + self.pass_tests += 1 + case TestStatus.XPASS: + # Technically this is a failure, but Pwndbg does not consider it so. + self.pass_tests += 1 + case TestStatus.SKIPPED: + self.skip_tests += 1 + # skip_reason = " " + ( + # process.stdout.split(test_status)[1].split("\n\n\x1b[33m")[0].replace("\n", "") + # ) + + self.total_duration += test_result.duration_ns + + print( + f"{case:<100} {test_result.status} {test_result.duration_ns / 1000000000:.2f}s {test_result.context if test_result.context else ''}" + ) + + # Only show the output of failed tests unless the verbose flag was used + if verbose or test_result.status == TestStatus.FAILED: + print("") + print(test_result.stderr) + print(test_result.stdout) + if __name__ == "__main__": - host.main() + main()