Port tests to debugger-agnostic test group: First Batch (#3165)

* Add supporting functionality

* Add initial ported test

* Fix 0

* Fix 1

* Add dbg/test_command_plist.py

* Fix 0

* Fix 1

* Fix 2

* Fix 3

* Add more tests

* Fix 0

* Add more tests

* Add even more tests

* Fix 0

* Adapt tests to module structure

* Remove coverage warning in LLDB test driver

* Remove `*_before_binary_start` tests, as they are flaky in LLDB

* Skip `test_command_break_if_x64` if not in GDB

* Update tests/host/lldb/launch_guest.py

Co-authored-by: patryk4815 <bux.patryk@gmail.com>

* Update tests/host/lldb/launch_guest.py

* Import `shlex` in `launch_guest`

---------

Co-authored-by: Disconnect3d <dominik.b.czarnota@gmail.com>
Co-authored-by: patryk4815 <bux.patryk@gmail.com>
pull/3196/head
Matt. 4 months ago committed by GitHub
parent 9aabc0b83d
commit 15524bcfaa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -347,7 +347,7 @@ def search(
gdb.Breakpoint("*%#x" % address, temporary=False)
else:
print(
f"breakpoints are not supported outside of GDB yet, would be set at {address:#x}"
f"Breakpoints are not supported outside of GDB yet, would be set at {address:#x}"
)
if not trunc_out or i < 20:

@ -191,6 +191,15 @@ class LLDBFrame(pwndbg.dbg_mod.Frame):
# event.
self.proc.dbg._trigger_event(pwndbg.dbg_mod.EventType.REGISTER_CHANGED)
# If we set the stack pointer, the inner object might have been invalidated, try
# to restore it, as it should still be the selected frame.
if (
name in (reg_sets[pwndbg.aglib.arch.name].frame, "sp")
and not self.inner.IsValid()
):
self.inner = thread.GetSelectedFrame()
assert self.inner.GetSP() == val
# Make sure we've caught and handled the special cases in which the inner object
# might be invalidated by the command.
assert self.inner.IsValid()

@ -46,6 +46,8 @@ import sys
import threading
from contextlib import contextmanager
from io import BytesIO
from io import TextIOBase
from io import TextIOWrapper
from typing import Any
from typing import Awaitable
from typing import BinaryIO
@ -369,7 +371,7 @@ def run(
last_exc = asyncio.CancelledError()
continue
if not exec_repl_command(line, sys.stdout.buffer, dbg, driver, relay):
if not exec_repl_command(line, sys.stdout, dbg, driver, relay):
last_exc = asyncio.CancelledError()
continue
@ -385,13 +387,11 @@ def run(
print(f"{PROMPT}{action._command}")
if action._capture:
with BytesIO() as output:
with TextIOWrapper(BytesIO(), write_through=True) as output:
should_continue = exec_repl_command(action._command, output, dbg, driver, relay)
last_result = output.getvalue()
last_result = output.buffer.getvalue()
else:
should_continue = exec_repl_command(
action._command, sys.stdout.buffer, dbg, driver, relay
)
should_continue = exec_repl_command(action._command, sys.stdout, dbg, driver, relay)
if not should_continue:
last_exc = asyncio.CancelledError()
@ -400,7 +400,7 @@ def run(
def exec_repl_command(
line: str,
lldb_out_target: BinaryIO,
output_to,
dbg: LLDB,
driver: ProcessDriver,
relay: EventRelay,
@ -408,6 +408,35 @@ def exec_repl_command(
"""
Parses and runs the given command, returning whether the event loop should continue.
"""
stdout = None
lldb_out = None
try:
stdout = sys.stdout
lldb_out = dbg.debugger.GetOutputFile()
sys.stdout = output_to
dbg.debugger.SetOutputFile(
lldb.SBFile.Create(output_to, borrow=True, force_io_methods=True)
)
return _exec_repl_command(line, output_to.buffer, dbg, driver, relay)
finally:
if stdout is not None:
sys.stdout = stdout
if lldb_out is not None:
dbg.debugger.SetOutputFile(lldb_out)
def _exec_repl_command(
line: str,
lldb_out_target: BinaryIO,
dbg: LLDB,
driver: ProcessDriver,
relay: EventRelay,
) -> bool:
"""
Implementation for exec_repl_command
"""
bits = lex_args(line)

@ -149,13 +149,45 @@ class TestHost:
class Controller:
def launch(self, binary: Path) -> Awaitable[None]:
def launch(self, binary: Path, args: List[str] = []) -> Awaitable[None]:
"""
Launch the binary with the given path, relative to the binaries folder
for the calling test.
"""
raise NotImplementedError()
def execute_and_capture(self, command: str) -> Awaitable[str]:
"""
Execute the given command and capture its output.
While this method is capable of executing any command supported by the
debugger, in with keeping tests debugger-agnostic, is should only ever
be used to invoke Pwndbg commands.
"""
raise NotImplementedError()
def execute(self, command: str) -> Awaitable[None]:
"""
Execute the given command.
While this method is capable of executing any command supported by the
debugger, in with keeping tests debugger-agnostic, is should only ever
be used to invoke Pwndbg commands.
"""
raise NotImplementedError()
def cont(self) -> Awaitable[None]:
"""
Resume execution until the next stop event.
"""
raise NotImplementedError()
def step_instruction(self) -> Awaitable[None]:
"""
Perform a step in the scope of a single instruction.
"""
raise NotImplementedError()
def start(controller: Callable[[Controller], Coroutine[Any, Any, None]]) -> None:
"""

@ -6,6 +6,7 @@ from pathlib import Path
from typing import Any
from typing import Callable
from typing import Coroutine
from typing import List
import coverage
import gdb
@ -15,7 +16,7 @@ from ... import host
class _GDBController(host.Controller):
async def launch(self, binary_path: Path) -> None:
async def launch(self, binary_path: Path, args: List[str] = []) -> None:
"""
Launch the given binary.
@ -30,6 +31,18 @@ class _GDBController(host.Controller):
os.environ["COLUMNS"] = "80"
gdb.execute("starti " + " ".join(args))
async def cont(self) -> None:
gdb.execute("continue")
async def execute(self, command: str) -> None:
gdb.execute(command)
async def execute_and_capture(self, command: str) -> str:
return gdb.execute(command, to_string=True)
async def step_instruction(self) -> None:
gdb.execute("stepi")
def _start(outer: Callable[[host.Controller], Coroutine[Any, Any, None]]) -> None:
# The GDB controller is entirely synchronous, so keep advancing the

@ -67,12 +67,6 @@ class LLDBTestHost(TestHost):
return [f"tests/{name}" for name in names]
def run(self, case: str, coverage_out: Path | None, interactive: bool) -> TestResult:
if coverage_out is not None:
# Do before PR is merged.
#
# TODO: Add CodeCov for the LLDB test driver
print("[-] Warning: LLDB does not yet support code coverage")
beg = time.monotonic_ns()
result = self._launch("RUN-TEST", case, not interactive, interactive)
end = time.monotonic_ns()

@ -1,6 +1,7 @@
from __future__ import annotations
import os
import shlex
import sys
from enum import Enum
from pathlib import Path
@ -25,15 +26,33 @@ async def _run(ctrl: Any, outer: Callable[..., Coroutine[Any, Any, None]]) -> No
def __init__(self, pc: PwndbgController):
self.pc = pc
async def launch(self, binary: Path) -> None:
async def launch(self, binary: Path, args: List[str] = []) -> None:
await self.pc.execute(f"target create {binary}")
await self.pc.execute("process launch -s")
await self.pc.execute(
"process launch -s -- " + " ".join(shlex.quote(arg) for arg in args)
)
async def cont(self) -> None:
await self.pc.execute("continue")
async def execute(self, command: str) -> None:
await self.pc.execute(command)
async def execute_and_capture(self, command: str) -> str:
return (await self.pc.execute_and_capture(command)).decode(
"utf-8", errors="surrogateescape"
)
async def step_instruction(self) -> None:
await self.pc.execute("thread step-inst")
await outer(_LLDBController(ctrl))
def run(pytest_args: List[str], pytest_plugins: List[Any] | None) -> int:
# The import path is set up before this function is called.
os.environ["PWNDBG_DISABLE_COLORS"] = "1"
from pwndbginit import pwndbg_lldb
from ... import host
@ -41,7 +60,7 @@ def run(pytest_args: List[str], pytest_plugins: List[Any] | None) -> int:
# Replace host.start with a proper implementation of the start command.
def _start(outer: Callable[[Controller], Coroutine[Any, Any, None]]) -> None:
pwndbg_lldb.launch(_run, outer, debug=True)
pwndbg_lldb.launch(_run, outer, debug=False)
host.start = _start

@ -35,3 +35,33 @@ def pwndbg_test[**T](
def get_binary(name: str) -> str:
return os.path.join(BINARIES_PATH, name)
def break_at_sym(sym: str) -> None:
import pwndbg
from pwndbg.dbg import BreakpointLocation
inf = pwndbg.dbg.selected_inferior()
addr = inf.lookup_symbol(sym)
inf.break_at(BreakpointLocation(int(addr)))
async def launch_to(ctrl: Controller, target: str, sym: str) -> None:
import pwndbg
import pwndbg.aglib.regs
from pwndbg.dbg import BreakpointLocation
await ctrl.launch(target)
inf = pwndbg.dbg.selected_inferior()
addr = inf.lookup_symbol(sym)
if pwndbg.aglib.regs.pc != int(addr):
inf.break_at(BreakpointLocation(int(addr)))
await ctrl.cont()
def get_expr(expr: str):
import pwndbg
ctx = pwndbg.dbg.selected_frame() or pwndbg.dbg.selected_inferior()
return ctx.evaluate_expression(expr)

@ -0,0 +1,9 @@
from __future__ import annotations
from pwndbg.aglib.arch import registered_architectures
from pwndbg.lib.arch import PWNDBG_SUPPORTED_ARCHITECTURES
def test_all_pwndbg_architectures_are_defined():
for arch in PWNDBG_SUPPORTED_ARCHITECTURES:
assert arch in registered_architectures

@ -0,0 +1,107 @@
from __future__ import annotations
from ....host import Controller
from . import get_binary
from . import pwndbg_test
BINARY = get_binary("reference-binary.out")
@pwndbg_test
async def test_cache_single_value(ctrl: Controller) -> None:
from pwndbg.lib import cache
x = 0
@cache.cache_until("stop")
def foo():
nonlocal x
x += 1
# Typically its bad idea to cache a non-local/global variable
# but we need this for testing purposes :)
return x
assert foo() == x == 1
# The function result should now be pulled from cache
# so that `x` should not change as well
assert foo() == x == 1
foo.cache.clear()
assert foo() == x == 2
assert foo() == x == 2
# Check if cache is properly cleared on a stop event
await ctrl.launch(BINARY)
assert foo() == x == 3
assert foo() == x == 3
@pwndbg_test
async def test_cache_args_kwargs_properly(ctrl: Controller) -> None:
from pwndbg.lib import cache
x = 0
@cache.cache_until("stop")
def foo(arg0, *args, **kwargs):
nonlocal x
x += 1
# Typically its bad idea to cache a non-local/global variable
# but we need this for testing purposes :)
return x, arg0, args, kwargs
assert foo("abc") == (1, "abc", (), {}) and x == 1
assert foo("abc") == (1, "abc", (), {}) and x == 1
assert foo(100, 200) == (2, 100, (200,), {}) and x == 2
assert foo(100, 200) == (2, 100, (200,), {}) and x == 2
assert foo("abc") == (1, "abc", (), {}) and x == 2
assert foo(100, 200) == (2, 100, (200,), {}) and x == 2
foo.cache.clear()
assert foo("abc") == (3, "abc", (), {}) and x == 3
assert foo("abc") == (3, "abc", (), {}) and x == 3
assert foo(100, 200) == (4, 100, (200,), {}) and x == 4
assert foo(100, 200) == (4, 100, (200,), {}) and x == 4
# Check if cache is properly cleared on a stop event
await ctrl.launch(BINARY)
assert foo("abc") == (5, "abc", (), {}) and x == 5
assert foo(100, 200) == (6, 100, (200,), {}) and x == 6
@pwndbg_test
async def test_cache_clear_has_priority(ctrl: Controller) -> None:
import pwndbg
from pwndbg.dbg import EventType
from pwndbg.lib import cache
actions = []
@pwndbg.dbg.event_handler(EventType.STOP)
def on_stop():
actions.append("on_stop")
# test to make sure event handlers don't have a stale cache
foo()
@cache.cache_until("stop")
def foo():
actions.append("foo")
foo()
foo()
assert actions == ["foo"]
await ctrl.launch(BINARY)
assert actions == ["foo", "on_stop", "foo"]
foo()
foo()
assert actions == ["foo", "on_stop", "foo"]

@ -0,0 +1,21 @@
from __future__ import annotations
from ....host import Controller
from . import get_binary
from . import launch_to
from . import pwndbg_test
REFERENCE_BINARY = get_binary("reference-binary.out")
@pwndbg_test
async def test_callstack_readable(ctrl: Controller) -> None:
import pwndbg.aglib.memory
import pwndbg.aglib.stack
await launch_to(ctrl, REFERENCE_BINARY, "break_here")
addresses = pwndbg.aglib.stack.callstack()
assert len(addresses) > 0
assert all(pwndbg.aglib.memory.is_readable_address(address) for address in addresses)

@ -0,0 +1,48 @@
from __future__ import annotations
import pytest
from ....host import Controller
from . import break_at_sym
from . import get_binary
from . import launch_to
from . import pwndbg_test
CONDBR_X64_BINARY = get_binary("conditional_branch_breakpoints_x64.out")
@pwndbg_test
@pytest.mark.parametrize("binary", [CONDBR_X64_BINARY], ids=["x86-64"])
async def test_command_break_if_x64(ctrl: Controller, binary: str) -> None:
"""
Tests the chain for a non-nested linked list
"""
import pwndbg
if not pwndbg.dbg.is_gdblib_available():
pytest.skip("Not yet available outside GDB")
return
await launch_to(ctrl, binary, "break_here")
break_at_sym("break_here0")
break_at_sym("break_here1")
await ctrl.execute("break-if-taken branch0")
await ctrl.execute("break-if-taken branch1")
await ctrl.execute("break-if-not-taken branch2")
await ctrl.execute("break-if-not-taken branch3")
await continue_and_test_pc(ctrl, "branch0")
await continue_and_test_pc(ctrl, "break_here0")
await continue_and_test_pc(ctrl, "break_here1")
await continue_and_test_pc(ctrl, "branch3")
async def continue_and_test_pc(ctrl: Controller, stop_label: str) -> None:
import pwndbg
await ctrl.cont()
address = int(pwndbg.dbg.selected_inferior().lookup_symbol(stop_label))
assert pwndbg.aglib.regs.pc == address

@ -0,0 +1,60 @@
from __future__ import annotations
import pytest
from . import get_binary
from . import launch_to
from . import pwndbg_test
CANARY_X86_64_BINARY = get_binary("canary.x86-64.out")
CANARY_I386_BINARY = get_binary("canary.i386.out")
@pwndbg_test
@pytest.mark.integration
@pytest.mark.parametrize(
"binary, reg_name, skips",
[
(CANARY_X86_64_BINARY, "rax", 0),
(CANARY_I386_BINARY, "eax", 2),
],
ids=["x86-64", "i386"],
)
async def test_command_canary(ctrl: Controller, binary: str, reg_name: str, skips: int) -> None:
"""
Tests the canary command for x86-64 and i386 architectures
"""
import pwndbg
import pwndbg.aglib.memory
import pwndbg.aglib.regs
await launch_to(ctrl, binary, "main")
# The instruction that loads the canary is at the start of the function,
# but it it not necessarily at any given fixed position, scan for it.
initial_reg = getattr(pwndbg.aglib.regs, reg_name)
while True:
register = getattr(pwndbg.aglib.regs, reg_name)
if register != initial_reg:
if skips == 0:
break
skips = skips - 1
initial_reg = register
await ctrl.step_instruction()
canary_value, at_random = pwndbg.commands.canary.canary_value()
raw = pwndbg.aglib.memory.read_pointer_width(at_random)
mask = pwndbg.aglib.arch.ptrmask ^ 0xFF
masked_raw = raw & mask
tls_addr = pwndbg.commands.canary.find_tls_canary_addr()
raw_tls = pwndbg.aglib.memory.read_pointer_width(tls_addr) & mask
# Check AT_RANDOM
assert masked_raw == canary_value
# Check TLS Canary
assert raw_tls == canary_value
# Check Canary
assert register == canary_value

@ -0,0 +1,53 @@
from __future__ import annotations
import re
from . import get_binary
from . import pwndbg_test
REFERENCE_BINARY = get_binary("reference-binary.out")
@pwndbg_test
async def test_config(ctrl: Controller) -> None:
await ctrl.launch(REFERENCE_BINARY)
await ctrl.execute("set context-disasm-lines 8")
assert "8 (10)" in (await ctrl.execute_and_capture("config"))
await ctrl.execute("set banner-separator #")
# \u2500 is ─
assert "'#' ('\u2500')" in (await ctrl.execute_and_capture("theme"))
await ctrl.execute("set global-max-fast 0x80")
assert "'0x80' ('0')" in (await ctrl.execute_and_capture("heap-config"))
@pwndbg_test
async def test_config_filtering(ctrl: Controller) -> None:
await ctrl.launch(REFERENCE_BINARY)
out = (await ctrl.execute_and_capture("config context-disasm-lines")).splitlines()
assert re.match(r"Name\s+Documentation\s+Value\s+\(Default\)", out[0])
assert re.match(r"-+", out[1])
assert re.match(
r"context-disasm-lines\s+number of additional lines to print in the disasm context\s+10",
out[2],
)
assert (
out[3]
== "You can set a config variable with `set <config-var> <value>`, and read more about it with `help set <config-var>`."
)
assert (
out[4]
== "You can generate a configuration file using `configfile` - then put it in your .gdbinit after initializing pwndbg."
)
@pwndbg_test
async def test_config_filtering_missing(ctrl: Controller):
await ctrl.launch(REFERENCE_BINARY)
out = await ctrl.execute_and_capture("config asdasdasdasd")
assert out == 'No config parameter found with filter "asdasdasdasd"\n'

@ -0,0 +1,102 @@
from __future__ import annotations
from . import get_binary
from . import pwndbg_test
REFERENCE_BINARY = get_binary("reference-binary.out")
@pwndbg_test
async def test_command_cyclic_value(ctrl: Controller) -> None:
"""
Tests lookup on a constant value
"""
from pwnlib.util.cyclic import cyclic
import pwndbg.aglib.arch
await ctrl.launch(REFERENCE_BINARY)
ptr_size = pwndbg.aglib.arch.ptrsize
test_offset = 37
pattern = cyclic(length=80, n=ptr_size)
val = int.from_bytes(pattern[test_offset : test_offset + ptr_size], pwndbg.aglib.arch.endian)
out = await ctrl.execute_and_capture(f"cyclic -l {hex(val)}")
assert out == (
"Finding cyclic pattern of 8 bytes: b'aaafaaaa' (hex: 0x6161616661616161)\n"
"Found at offset 37\n"
)
@pwndbg_test
async def test_command_cyclic_register(ctrl: Controller) -> None:
"""
Tests lookup on a register
"""
from pwnlib.util.cyclic import cyclic
import pwndbg.aglib.arch
import pwndbg.aglib.regs
await ctrl.launch(REFERENCE_BINARY)
ptr_size = pwndbg.aglib.arch.ptrsize
test_offset = 45
pattern = cyclic(length=80, n=ptr_size)
pwndbg.aglib.regs.rdi = int.from_bytes(
pattern[test_offset : test_offset + ptr_size], pwndbg.aglib.arch.endian
)
out = await ctrl.execute_and_capture("cyclic -l $rdi")
assert out == (
"Finding cyclic pattern of 8 bytes: b'aaagaaaa' (hex: 0x6161616761616161)\n"
"Found at offset 45\n"
)
@pwndbg_test
async def test_command_cyclic_address(ctrl: Controller) -> None:
"""
Tests lookup on a memory address
"""
from pwnlib.util.cyclic import cyclic
import pwndbg.aglib.arch
import pwndbg.aglib.memory
import pwndbg.aglib.regs
await ctrl.launch(REFERENCE_BINARY)
addr = pwndbg.aglib.regs.rsp
ptr_size = pwndbg.aglib.arch.ptrsize
test_offset = 48
pattern = cyclic(length=80, n=ptr_size)
pwndbg.aglib.memory.write(addr, pattern)
out = await ctrl.execute_and_capture(f"cyclic -l '*(unsigned long*){hex(addr + test_offset)}'")
assert out == (
"Finding cyclic pattern of 8 bytes: b'gaaaaaaa' (hex: 0x6761616161616161)\n"
"Found at offset 48\n"
)
@pwndbg_test
async def test_command_cyclic_wrong_alphabet(ctrl: Controller) -> None:
await ctrl.launch(REFERENCE_BINARY)
out = await ctrl.execute_and_capture("cyclic -l 1234")
assert out == (
"Finding cyclic pattern of 8 bytes: b'\\xd2\\x04\\x00\\x00\\x00\\x00\\x00\\x00' (hex: 0xd204000000000000)\n"
"Pattern contains characters not present in the alphabet\n"
)
@pwndbg_test
async def test_command_cyclic_wrong_length(ctrl: Controller) -> None:
await ctrl.launch(REFERENCE_BINARY)
out = await ctrl.execute_and_capture("cyclic -l qwerty")
assert out == (
"Lookup pattern must be 8 bytes (use `-n <length>` to lookup pattern of different length)\n"
)

@ -0,0 +1,43 @@
from __future__ import annotations
from ....host import Controller
from . import get_binary
from . import pwndbg_test
REFERENCE_BINARY = get_binary("reference-binary.out")
@pwndbg_test
async def test_command_distance(ctrl: Controller):
import pwndbg.aglib.regs
await ctrl.launch(REFERENCE_BINARY)
# Test against regs
rsp = pwndbg.aglib.regs.rsp
result = await ctrl.execute_and_capture("distance $rsp $rsp+0x10")
assert result == f"{rsp:#x}->{rsp + 0x10:#x} is 0x10 bytes (0x2 words)\n"
# Test if it works with symbols
rip = pwndbg.aglib.regs.rip
main = pwndbg.aglib.symbol.lookup_symbol_addr("main")
break_here = pwndbg.aglib.symbol.lookup_symbol_addr("break_here")
diff = break_here - main
# Test symbol (function address) and its proper &symbol address
for sym1 in ("main", "&main"):
for sym2 in ("break_here", "&break_here"):
result = await ctrl.execute_and_capture(f"distance {sym1} {sym2}")
assert result == f"{main:#x}->{break_here:#x} is {diff:#x} bytes ({diff//8:#x} words)\n"
# Test if it works with reg + symbol
diff = break_here - rip
result = await ctrl.execute_and_capture("distance $rip &break_here")
assert result == f"{rip:#x}->{break_here:#x} is {diff:#x} bytes ({diff//8:#x} words)\n"
# Test if it works with symbol + reg
diff = rip - break_here
result = await ctrl.execute_and_capture("distance &break_here $rip")
assert result == f"{break_here:#x}->{rip:#x} is {diff:#x} bytes ({diff//8:#x} words)\n"

@ -0,0 +1,42 @@
from __future__ import annotations
import re
from ....host import Controller
from . import get_binary
from . import launch_to
from . import pwndbg_test
HEAP_MALLOC_CHUNK = get_binary("heap_malloc_chunk.out")
@pwndbg_test
async def test_command_dt_works_with_address(ctrl: Controller) -> None:
await launch_to(ctrl, HEAP_MALLOC_CHUNK, "break_here")
tcache = await ctrl.execute_and_capture("print tcache")
tcache_addr = tcache.split()[-1]
out = await ctrl.execute_and_capture(f'dt "struct tcache_perthread_struct" {tcache_addr}')
exp_regex = (
"struct tcache_perthread_struct @ 0x[0-9a-f]+\n"
" 0x[0-9a-f]+ \\+0x0000 counts +: +.*\\{([0-9]+, [0-9]+ <repeats 63 times>|(\\s*\\[[0-9]+\\] = [0-9]){64}\\s*)\\}\n"
" 0x[0-9a-f]+ \\+0x[0-9a-f]{4} entries +: +.*\\{(0x[0-9a-f]+, 0x[0-9a-f]+ <repeats 63 times>|(\\s*\\[[0-9]+\\] = (0x[0-9a-f]+|NULL)){64}\\s*)\\}"
)
assert re.match(exp_regex, out)
@pwndbg_test
async def test_command_dt_works_with_no_address(ctrl: Controller) -> None:
await launch_to(ctrl, HEAP_MALLOC_CHUNK, "break_here")
out = await ctrl.execute_and_capture('dt "struct tcache_perthread_struct"')
exp_regex = (
"struct tcache_perthread_struct\n"
" \\+0x0000 counts +: +uint16_t ?\\[64\\]\n"
" \\+0x[0-9a-f]{4} entries +: +tcache_entry ?\\*\\[64\\]\n"
)
assert re.match(exp_regex, out)

@ -0,0 +1,49 @@
from __future__ import annotations
from ....host import Controller
from . import break_at_sym
from . import get_binary
from . import pwndbg_test
# We use the heap_vis binary as it enforces pthreads and so will have TLS on all distros
REFERENCE_BINARY = get_binary("heap_vis.out")
@pwndbg_test
async def test_command_errno(ctrl: Controller) -> None:
"""
Tests the errno command display
"""
await ctrl.launch(REFERENCE_BINARY)
# Since 'ctrl.launch' stops on the very first instruction, 'errno' might not
# yet be available, depending on the system. If it is available, it should
# be zero.
result = "".join((await ctrl.execute_and_capture("errno")).splitlines())
assert (
result
== "Could not determine error code automatically: neither `errno` nor `__errno_location` symbols were provided (perhaps libc.so hasn't been not loaded yet?)"
) or (result == "Errno 0: OK")
break_at_sym("main")
await ctrl.cont()
result = await ctrl.execute_and_capture("errno")
assert result == "Errno 0: OK\n"
await ctrl.execute("p *(int*)&errno=11")
result = await ctrl.execute_and_capture("errno")
assert result == "Errno 11: EAGAIN\n"
await ctrl.execute("p *(int*)&errno=111")
result = await ctrl.execute_and_capture("errno")
assert result == "Errno 111: ECONNREFUSED\n"
result = await ctrl.execute_and_capture("errno 8")
assert result == "Errno 8: ENOEXEC\n"
result = await ctrl.execute_and_capture("errno 123")
assert result == "Errno 123: ENOMEDIUM\n"
result = await ctrl.execute_and_capture("errno 250")
assert result == "Errno 250: Unknown error code\n"

@ -0,0 +1,35 @@
from __future__ import annotations
from ....host import Controller
from . import get_binary
from . import pwndbg_test
REFERENCE_BINARY = get_binary("reference-binary.out")
@pwndbg_test
async def test_flags_command(ctrl: Controller) -> None:
import pwndbg.aglib.regs
await ctrl.launch(REFERENCE_BINARY)
old_eflags = pwndbg.aglib.regs.eflags
# Verify CF is not set
assert old_eflags & 0x1 == 0
await ctrl.execute("setflag cf 1")
# Verify CF is set and no other flags have changed
assert (old_eflags | 1) == pwndbg.aglib.regs.eflags
await ctrl.execute("setflag cf 0")
# Verify CF is not set and no other flags have changed
assert old_eflags == pwndbg.aglib.regs.eflags
# Test setting an invalid value
await ctrl.execute("setflag cf 2")
# Verify no flags have changed
assert old_eflags == pwndbg.aglib.regs.eflags

@ -0,0 +1,29 @@
from __future__ import annotations
from ....host import Controller
from . import break_at_sym
from . import get_binary
from . import pwndbg_test
REFERENCE_BINARY = get_binary("reference-binary.out")
@pwndbg_test
async def test_command_libcinfo(ctrl: Controller) -> None:
"""
Tests the libcinfo command
"""
await ctrl.launch(REFERENCE_BINARY)
result = await ctrl.execute_and_capture("libcinfo")
assert result == "Could not determine libc version.\n"
# Continue until main, so the libc is actually loaded
break_at_sym("main")
await ctrl.cont()
result = (await ctrl.execute_and_capture("libcinfo")).splitlines()
assert len(result) == 2
assert result[0].startswith("libc version: ")
assert result[1].startswith("libc source link: https://ftp.gnu.org/gnu/libc/glibc-")
assert result[1].endswith(".tar.gz")

@ -0,0 +1,311 @@
from __future__ import annotations
import re
from . import break_at_sym
from . import get_binary
from . import get_expr
from . import pwndbg_test
LINKED_LISTS_BINARY = get_binary("linked-lists.out")
async def startup(ctrl: Controller):
await ctrl.launch(LINKED_LISTS_BINARY)
break_at_sym("break_here")
await ctrl.cont()
await ctrl.execute("up")
@pwndbg_test
async def test_command_plist_dereference_limit_change_has_impact_on_plist(ctrl: Controller):
"""
Tests the plist command with different dereference limits
"""
await startup(ctrl)
await ctrl.execute("set dereference-limit 5")
expected_out = re.compile(
"""\
0[xX][0-9a-fA-F]+ <node_a>:.*{\\s*
value = 0,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_b>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_b>:.*{\\s*
value = 1,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_c>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_c>:.*{\\s*
value = 2,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_d>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_d>:.*{\\s*
value = 3,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_e>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_e>:.*{\\s*
value = 4,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_f>)?,?\\s*
}\
"""
)
result_str = await ctrl.execute_and_capture("plist node_a next")
print(result_str)
assert expected_out.match(result_str) is not None
await ctrl.execute("set dereference-limit 1")
expected_out = re.compile(
"""\
0[xX][0-9a-fA-F]+ <node_a>:.*{\\s*
value = 0,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_b>)?,?\\s*
}\
"""
)
result_str = await ctrl.execute_and_capture("plist node_a next")
assert expected_out.match(result_str) is not None
@pwndbg_test
async def test_command_plist_unreached_sentinel_does_not_cause_null_deference(ctrl: Controller):
"""
Tests the plist command with a sentinel set to an address that is not reached does
not try to dereference zero
"""
await startup(ctrl)
expected_out = re.compile(
"""\
0[xX][0-9a-fA-F]+ <node_a>:.* 0\\s*
0[xX][0-9a-fA-F]+ <node_b>:.* 1\\s*
0[xX][0-9a-fA-F]+ <node_c>:.* 2\\s*
0[xX][0-9a-fA-F]+ <node_d>:.* 3\\s*
0[xX][0-9a-fA-F]+ <node_e>:.* 4\\s*
\
"""
)
result_str = await ctrl.execute_and_capture("plist node_a next --sentinel 1 -f value")
assert expected_out.match(result_str) is not None
@pwndbg_test
async def test_command_plist_invalid_address_deference_is_displayed_properly(ctrl: Controller):
"""
Tests that the error message is displayed nicely when an incorrect address gets
deferenced
"""
await startup(ctrl)
await ctrl.execute("p node_a->next = (node*) 0x1234")
expected_out = re.compile(
"""\
0[xX][0-9a-fA-F]+ <node_a>:.* 0\\s*
Cannot dereference 0x1234 for list link #2:.*\\s*
Is the linked list corrupted or is the sentinel value wrong\\?\\s*
\
"""
)
result_str = await ctrl.execute_and_capture("plist node_a next -f value")
assert expected_out.match(result_str) is not None
@pwndbg_test
async def test_command_plist_flat_with_offset(ctrl: Controller):
"""
Tests the plist for a non-nested linked list with an arbitrary offset value
"""
await startup(ctrl)
expected_out = re.compile(
"""\
0[xX][0-9a-fA-F]+ <node_d>:.*{\\s*
value = 3,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_e>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_e>:.*{\\s*
value = 4,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_f>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_f>:.*{\\s*
value = 5,?\\s*
next = (0x0|NULL),?\\s*
}\
"""
)
result_str = await ctrl.execute_and_capture("plist node_a next -o 3")
assert expected_out.match(result_str) is not None
@pwndbg_test
async def test_command_plist_flat_with_count(ctrl: Controller):
"""
Tests the plist for a non-nested linked list with an arbitrary count value
"""
await startup(ctrl)
expected_out = re.compile(
"""\
0[xX][0-9a-fA-F]+ <node_a>:.*{\\s*
value = 0,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_b>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_b>:.*{\\s*
value = 1,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_c>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_c>:.*{\\s*
value = 2,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_d>)?,?\\s*
}\
"""
)
result_str = await ctrl.execute_and_capture("plist node_a next -c 3")
assert expected_out.match(result_str) is not None
@pwndbg_test
async def test_command_plist_flat_no_flags(ctrl: Controller):
"""
Tests the plist for a non-nested linked list
"""
await startup(ctrl)
expected_out = re.compile(
"""\
0[xX][0-9a-fA-F]+ <node_a>:.*{\\s*
value = 0,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_b>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_b>:.*{\\s*
value = 1,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_c>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_c>:.*{\\s*
value = 2,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_d>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_d>:.*{\\s*
value = 3,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_e>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_e>:.*{\\s*
value = 4,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_f>)?,?\\s*
}\
"""
)
result_str = await ctrl.execute_and_capture("plist node_a next")
assert expected_out.match(result_str) is not None
@pwndbg_test
async def test_command_plist_flat_field(ctrl: Controller):
"""
Tests the plist command for a non-nested linked list with field flag
"""
await startup(ctrl)
expected_out = re.compile(
"""\
0[xX][0-9a-fA-F]+ <node_a>:.* 0\\s*
0[xX][0-9a-fA-F]+ <node_b>:.* 1\\s*
0[xX][0-9a-fA-F]+ <node_c>:.* 2\\s*
"""
)
result_str = await ctrl.execute_and_capture("plist node_a next -f value")
assert expected_out.match(result_str) is not None
@pwndbg_test
async def test_command_plist_flat_sentinel(ctrl: Controller):
"""
Tests the plist command for a non-nested linked list with field flag
"""
await startup(ctrl)
sentinel = int(get_expr("node_c").address)
expected_out = re.compile(
"""\
0[xX][0-9a-fA-F]+ <node_a>:.*{\\s*
value = 0,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_b>)?,?\\s*
}\\s*
0[xX][0-9a-fA-F]+ <node_b>:.*{\\s*
value = 1,?\\s*
next = 0[xX][0-9a-fA-F]+( <node_c>)?,?\\s*
}"""
)
result_str = await ctrl.execute_and_capture(f"plist node_a next -s {sentinel}")
assert expected_out.match(result_str) is not None
@pwndbg_test
async def test_command_plist_nested_direct(ctrl: Controller):
"""
Tests the plist for a nested linked list pointing to the outer structure
"""
await startup(ctrl)
expected_out = re.compile(
"""\
0[xX][0-9a-fA-F]+ <inner_b_node_a>:.*{\\s*
value = 0,?\\s*
inner = {\\s*
next = 0[xX][0-9a-fA-F]+( <inner_b_node_b>)?,?\\s*
}\\s*
}\\s*
0[xX][0-9a-fA-F]+ <inner_b_node_b>:.*{\\s*
value = 1,?\\s*
inner = {\\s*
next = 0[xX][0-9a-fA-F]+( <inner_b_node_c>)?,?\\s*
}\\s*
}\\s*
0[xX][0-9a-fA-F]+ <inner_b_node_c>:.*{\\s*
value = 2,?\\s*
inner = {\\s*
next = (0x0|NULL),?\\s*
}\\s*
}"""
)
result_str = await ctrl.execute_and_capture("plist inner_b_node_a -i inner next")
assert expected_out.match(result_str) is not None
@pwndbg_test
async def test_command_plist_nested_indirect(ctrl: Controller):
"""
Tests the plist for a nested linked list pointing to the inner structure
"""
await startup(ctrl)
expected_out = re.compile(
"""\
0[xX][0-9a-fA-F]+ <inner_a_node_a>:.*{\\s*
value = 0,?\\s*
inner = {\\s*
next = 0[xX][0-9a-fA-F]+( <inner_a_node_b\\+8>)?,?\\s*
}\\s*
}\\s*
0[xX][0-9a-fA-F]+ <inner_a_node_b>:.*{\\s*
value = 1,?\\s*
inner = {\\s*
next = 0[xX][0-9a-fA-F]+( <inner_a_node_c\\+8>)?,?\\s*
}\\s*
}\\s*
0[xX][0-9a-fA-F]+ <inner_a_node_c>:.*{\\s*
value = 2,?\\s*
inner = {\\s*
next = (0x0|NULL),?\\s*
}\\s*
}"""
)
result_str = await ctrl.execute_and_capture("plist inner_a_node_a -i inner next")
assert expected_out.match(result_str) is not None

@ -0,0 +1,72 @@
from __future__ import annotations
import socket
import threading
import time
import pytest
from ....host import Controller
from . import break_at_sym
from . import get_binary
from . import pwndbg_test
REFERENCE_BINARY_NET = get_binary("reference-binary-net.out")
class TCPServerThread(threading.Thread):
def __init__(self, *, ip: str, port: int):
super().__init__(daemon=True)
self.sock = socket.socket(
socket.AF_INET6 if ":" in ip else socket.AF_INET, socket.SOCK_STREAM
)
try:
self.sock.bind((ip, port))
except OSError:
pytest.skip(f"Could not bind to {ip}:{port}.")
self.port = self.sock.getsockname()[1]
self.sock.listen(1)
def stop(self):
self.sock.close()
def run(self):
try:
# Accept one conn and sleep
conn, addr = self.sock.accept()
while True:
time.sleep(1)
except OSError:
pass # Socket closed
@pwndbg_test
@pytest.mark.parametrize("ip_connect", ["127.0.0.1", "::1"])
async def test_command_procinfo_net(ctrl: Controller, ip_connect: str) -> None:
import pwndbg.aglib.proc
# Listen tcp server
server = TCPServerThread(ip=ip_connect, port=0)
server.start()
await ctrl.launch(REFERENCE_BINARY_NET, args=[ip_connect, str(server.port)])
bin_path = pwndbg.aglib.proc.exe
pid = str(pwndbg.aglib.proc.pid)
break_at_sym("break_here")
await ctrl.cont()
result = await ctrl.execute_and_capture("procinfo")
res_list = result.split("\n")
assert bin_path in res_list[0]
assert pid in res_list[3]
if ":" in ip_connect:
assert f"[{ip_connect}]:{server.port}" in result
else:
assert f"{ip_connect}:{server.port}" in result
# Close tcp server
server.stop()

@ -0,0 +1,220 @@
from __future__ import annotations
import re
from ....host import Controller
from . import get_binary
from . import launch_to
from . import pwndbg_test
SEARCH_BINARY = get_binary("search_memory.out")
SEARCH_PATTERN = 0xD00DBEEF
SEARCH_PATTERN2 = 0xABCDEF1234567890
@pwndbg_test
async def test_command_search_literal(ctrl: Controller) -> None:
"""
Searches for a string literal in a few different ways
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
# Perform three equivalent searches, and chop off the first line of verbosity.
result0 = (await ctrl.execute_and_capture("search -t bytes Hello!")).splitlines()[1:]
result1 = (await ctrl.execute_and_capture("search -t bytes -x 48656c6c6f21")).splitlines()[1:]
result2 = (await ctrl.execute_and_capture("search -t string Hello!")).splitlines()[1:]
assert result0 == result1
assert result1 == result2
for line in result0:
assert re.match(".* .* 0x216f6c6c6548 /\\* 'Hello!' \\*/", line) is not None
@pwndbg_test
async def test_command_search_limit_single_page(ctrl: Cotnroller) -> None:
"""
Tests simple search limit for single memory page
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
search_limit = 10
result_str = await ctrl.execute_and_capture(
f"search --dword {SEARCH_PATTERN} -l {search_limit} -w",
)
result_count = 0
result_value = None
for line in result_str.split("\n"):
if line.startswith("[anon_"):
if not result_value:
result_value = line.split(" ")[2]
result_count += 1
assert result_count == search_limit
assert result_value == hex(SEARCH_PATTERN)
@pwndbg_test
async def test_command_search_limit_multiple_pages(ctrl: Controller) -> None:
"""
Tests simple search limit for multiple memory pages
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
def filter_results(line):
return hex(SEARCH_PATTERN2).lower() in line.lower()
total_entries = 3
result_str: str = await ctrl.execute_and_capture(f"search -8 {SEARCH_PATTERN2}")
result_count = len(list(filter(filter_results, result_str.splitlines())))
assert result_count == total_entries
search_limit = 2
result_str = await ctrl.execute_and_capture(f"search -8 {SEARCH_PATTERN2} -l {search_limit}")
result_count = len(list(filter(filter_results, result_str.splitlines())))
assert result_count == search_limit
@pwndbg_test
async def test_command_search_alignment(ctrl: Controller) -> None:
"""
Tests aligned search
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
alignment = 8
result_str = await ctrl.execute_and_capture(
f"search --dword {SEARCH_PATTERN} -a {alignment} -w"
)
for line in result_str.split("\n"):
if line.startswith("[anon_"):
result_address = line.split(" ")[1]
assert int(result_address, 16) % alignment == 0
@pwndbg_test
async def test_command_search_step(ctrl: Controller) -> None:
"""
Tests stepped search
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
step = 0x1000
result_str = await ctrl.execute_and_capture(f"search --dword {SEARCH_PATTERN} -s {step} -w")
result_count = 0
for line in result_str.split("\n"):
if line.startswith("[anon_"):
result_count += 1
# We allocate 0x100000 bytes
assert result_count == 0x100
@pwndbg_test
async def test_command_search_byte_width(ctrl: Controller) -> None:
"""
Tests 1-byte search
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
result_str = await ctrl.execute_and_capture("search --byte 0xef -w")
result_count = 0
for line in result_str.split("\n"):
if line.startswith("[anon_"):
result_count += 1
assert result_count > 0x100
@pwndbg_test
async def test_command_search_word_width(ctrl: Controller) -> None:
"""
Tests 2-byte word search
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
result_str = await ctrl.execute_and_capture("search --word 0xbeef -w")
result_count = 0
for line in result_str.split("\n"):
if line.startswith("[anon_"):
result_count += 1
assert result_count > 0x100
@pwndbg_test
async def test_command_search_dword_width(ctrl: Controller) -> None:
"""
Tests 4-byte dword search
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
result_str = await ctrl.execute_and_capture("search --dword 0xd00dbeef -w")
result_count = 0
for line in result_str.split("\n"):
if line.startswith("[anon_"):
result_count += 1
assert result_count > 0x100
@pwndbg_test
async def test_command_search_qword_width(ctrl: Controller) -> None:
"""
Tests 8-byte qword search
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
result_str = await ctrl.execute_and_capture("search --dword 0x00000000d00dbeef -w")
result_count = 0
for line in result_str.split("\n"):
if line.startswith("[anon_"):
result_count += 1
assert result_count > 0x100
@pwndbg_test
async def test_command_search_rwx(ctrl: Controller) -> None:
"""
Tests searching for rwx memory only
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
result_str = await ctrl.execute_and_capture("search --dword 0x00000000d00dbeef -w -x")
result_count = 0
for line in result_str.split("\n"):
if line.startswith("[anon_"):
result_count += 1
assert result_count == 0
@pwndbg_test
async def test_command_search_asm(ctrl: Controller) -> None:
"""
Tests searching for asm instructions
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
result_str = await ctrl.execute_and_capture('search --asm "add rax, rdx" search_memory')
result_count = 0
for line in result_str.split("\n"):
if line.startswith("search_memory"):
result_count += 1
assert result_count == 2
@pwndbg_test
async def test_command_set_breakpoint_search_asm(ctrl: Controller) -> None:
"""
Tests setting breakpoints on found asm instructions
"""
await launch_to(ctrl, SEARCH_BINARY, "break_here")
result_str = await ctrl.execute_and_capture('search --asmbp "add rax, rdx" search_memory')
result_count = 0
for line in result_str.split("\n"):
if line.startswith("Breakpoint"):
result_count += 1
assert result_count == 2

@ -0,0 +1,36 @@
from __future__ import annotations
from ....host import Controller
from . import get_binary
from . import pwndbg_test
STEPSYSCALL_X64_BINARY = get_binary("stepsyscall_x64.out")
@pwndbg_test
async def test_command_stepsyscall(ctrl: Controller) -> None:
import pwndbg.aglib.regs
import pwndbg.aglib.symbol
await ctrl.launch(STEPSYSCALL_X64_BINARY)
# Test that the logic correctly handles multiple consecutive jumps
await ctrl.execute("stepsyscall")
address = pwndbg.aglib.symbol.lookup_symbol_addr("syscall_write_label")
assert pwndbg.aglib.regs.pc == address
await ctrl.execute("stepsyscall")
address = pwndbg.aglib.symbol.lookup_symbol_addr("syscall_exit_label")
assert pwndbg.aglib.regs.pc == address
@pwndbg_test
async def test_command_nextsyscall(ctrl: Controller) -> None:
import pwndbg.aglib.regs
import pwndbg.aglib.symbol
await ctrl.launch(STEPSYSCALL_X64_BINARY)
await ctrl.execute("nextsyscall")
address = pwndbg.aglib.symbol.lookup_symbol_addr("syscall_exit_label")
assert pwndbg.aglib.regs.pc == address

@ -0,0 +1,27 @@
from __future__ import annotations
from ....host import Controller
from . import get_binary
from . import launch_to
from . import pwndbg_test
STEPUNTILASM_X64_BINARY = get_binary("stepuntilasm_x64.out")
@pwndbg_test
async def test_command_untilasm_x64(ctrl: Controller) -> None:
await launch_to(ctrl, STEPUNTILASM_X64_BINARY, "break_here")
await run_and_verify(ctrl, "stop1", "nop")
await run_and_verify(ctrl, "stop2", "xor rax, rax")
await run_and_verify(ctrl, "stop3", "mov qword ptr [rax], 0x20")
await run_and_verify(ctrl, "stop4", "mov dword ptr [rax+4], 0x20")
async def run_and_verify(ctrl: Controller, stop_label: str, asm: str) -> None:
import pwndbg.aglib.regs
import pwndbg.aglib.symbol
await ctrl.execute(f"stepuntilasm {asm}")
address = pwndbg.aglib.symbol.lookup_symbol_addr(stop_label)
assert pwndbg.aglib.regs.pc == address

@ -0,0 +1,174 @@
from __future__ import annotations
import re
from ....host import Controller
from . import get_binary
from . import get_expr
from . import launch_to
from . import pwndbg_test
TELESCOPE_BINARY = get_binary("telescope_binary.out")
@pwndbg_test
async def test_command_telescope(ctrl: Controller) -> None:
"""
Tests simple telescope
"""
await ctrl.execute("set telescope-skip-repeating-val off")
await launch_to(ctrl, TELESCOPE_BINARY, "break_here")
await ctrl.execute("up")
result_str = await ctrl.execute_and_capture("telescope &a")
result_lines = result_str.split("\n")
value = get_expr("a")
fields = value.type.fields()
for i in range(len(fields)):
expected_addr = int(value.address) + fields[i].bitpos // 8
assert f"{expected_addr:x}" in result_lines[fields[i].bitpos // 64]
@pwndbg_test
async def test_command_telescope_reverse(ctrl: Controller) -> None:
"""
Tests reversed telescope
"""
await ctrl.execute("set telescope-skip-repeating-val off")
await launch_to(ctrl, TELESCOPE_BINARY, "break_here")
await ctrl.execute("up")
result_str = await ctrl.execute_and_capture("telescope ((uint8_t*)&a)+0x38 -r")
result_lines = result_str.split("\n")
value = get_expr("a")
fields = value.type.fields()
for i in range(len(fields)):
expected_addr = int(value.address) + fields[i].bitpos // 8
assert f"{expected_addr:x}" in result_lines[fields[i].bitpos // 64]
@pwndbg_test
async def test_command_telescope_n_records(ctrl: Controller) -> None:
"""
Tests telescope defined number of records
"""
await ctrl.launch(TELESCOPE_BINARY)
n = 3
# ???
# gdb.execute("entry")
result = (await ctrl.execute_and_capture(f"telescope $rsp {n}")).strip().splitlines()
assert len(result) == n
@pwndbg_test
async def test_telescope_command_with_address_as_count(ctrl: Controller) -> None:
import pwndbg.aglib.proc
import pwndbg.aglib.regs
await ctrl.launch(TELESCOPE_BINARY)
out = (await ctrl.execute_and_capture("telescope 2")).splitlines()
rsp = pwndbg.aglib.regs.rsp
assert len(out) == 2
assert out[0] == "00:0000│ rsp %#x ◂— 1" % rsp
expected = rf"01:0008│ {rsp + 8:#x} —▸ 0x[0-9a-f]+ ◂— '{pwndbg.aglib.proc.exe}'"
assert re.search(expected, out[1])
@pwndbg_test
async def test_telescope_command_with_address_as_count_and_reversed_flag(ctrl: Controller) -> None:
import pwndbg.aglib.regs
await ctrl.launch(TELESCOPE_BINARY)
out = (await ctrl.execute_and_capture("telescope -r 2")).splitlines()
rsp = pwndbg.aglib.regs.rsp
assert out == ["00:0000│ %#x ◂— 0" % (rsp - 8), "01:0008│ rsp %#x ◂— 1" % rsp]
@pwndbg_test
async def test_command_telescope_reverse_skipped_records_shows_input_address(
ctrl: Controller,
) -> None:
"""
Tests reversed telescope with skipped records shows input address
"""
import pwndbg.aglib.memory
import pwndbg.aglib.regs
await launch_to(ctrl, TELESCOPE_BINARY, "break_here")
await ctrl.execute("up")
pwndbg.aglib.memory.write(pwndbg.aglib.regs.rsp - 8 * 3, b"\x00" * 8 * 4)
expected_value = hex(pwndbg.aglib.regs.rsp)
result_str = await ctrl.execute_and_capture("telescope -r $rsp")
result_lines = result_str.strip("\n").split("\n")
assert expected_value in result_lines[-1]
@pwndbg_test
async def test_command_telescope_frame(ctrl: Controller) -> None:
"""
Tests telescope --frame
"""
import pwndbg.aglib.regs
await launch_to(ctrl, TELESCOPE_BINARY, "break_here")
rsp = hex(pwndbg.aglib.regs.sp)
rbp = hex(pwndbg.aglib.regs[pwndbg.aglib.regs.frame])
result_str = await ctrl.execute_and_capture("telescope --frame")
result_lines = result_str.strip().split("\n")
assert rsp in result_lines[0]
assert rbp in result_lines[-2]
@pwndbg_test
async def test_command_telescope_frame_bp_below_sp(ctrl: Controller) -> None:
"""
Tests telescope --frame when base pointer is below stack pointer
"""
import pwndbg.aglib.regs
await launch_to(ctrl, TELESCOPE_BINARY, "break_here")
await ctrl.execute("memoize") # turn off cache
pwndbg.aglib.regs.sp = pwndbg.aglib.regs[pwndbg.aglib.regs.frame] + 1
result_str = await ctrl.execute_and_capture("telescope --frame")
assert "Cannot display stack frame because base pointer is below stack pointer" in result_str
@pwndbg_test
async def test_command_telescope_frame_bp_sp_different_vmmaps(ctrl: Controller) -> None:
"""
Tests telescope --frame when base pointer and stack pointer are on different vmmap pages
"""
import pwndbg.aglib.regs
import pwndbg.aglib.vmmap
await launch_to(ctrl, TELESCOPE_BINARY, "break_here")
await ctrl.execute("memoize") # turn off cache
pages = pwndbg.aglib.vmmap.get()
pwndbg.aglib.regs.sp = pages[0].start
pwndbg.aglib.regs.rbp = pages[1].start
result_str = await ctrl.execute_and_capture("telescope --frame")
assert (
"Cannot display stack frame because base pointer is not on the same page with stack pointer"
in result_str
)

@ -0,0 +1,76 @@
from __future__ import annotations
from ....host import Controller
from . import get_binary
from . import pwndbg_test
REFERENCE_BINARY = get_binary("reference-binary.out")
@pwndbg_test
async def test_command_xor_with_dbg_execute(ctrl: Controller) -> None:
"""
Tests simple xoring
"""
import pwndbg.aglib.memory
import pwndbg.aglib.regs
await ctrl.launch(REFERENCE_BINARY)
before = pwndbg.aglib.regs.rsp
pwndbg.aglib.memory.write(before, b"aaaaaaaa")
await ctrl.execute("xor $rsp ' ' 4")
after = pwndbg.aglib.memory.read(before, 8)
assert after == b"AAAAaaaa"
@pwndbg_test
async def test_command_xor_with_int(ctrl: Controller) -> None:
"""
Tests simple xoring
"""
import pwndbg.aglib.memory
import pwndbg.aglib.regs
await ctrl.launch(REFERENCE_BINARY)
before = pwndbg.aglib.regs.rsp
assert isinstance(before, int)
pwndbg.aglib.memory.write(before, b"aaaaaaaa")
await ctrl.execute(f"xor {before} ' ' 4")
after = pwndbg.aglib.memory.read(before, 8)
assert after == b"AAAAaaaa"
@pwndbg_test
async def test_command_xor_with_hex(ctrl: Controller) -> None:
"""
Tests simple xoring
"""
import pwndbg.aglib.memory
import pwndbg.aglib.regs
await ctrl.launch(REFERENCE_BINARY)
before = pwndbg.aglib.regs.rsp
before_hex = hex(before)
assert isinstance(before_hex, str)
pwndbg.aglib.memory.write(before, b"aaaaaaaa")
await ctrl.execute(f"xor {before_hex} ' ' 4")
after = pwndbg.aglib.memory.read(before, 8)
assert after == b"AAAAaaaa"
@pwndbg_test
async def test_command_memfrob(ctrl: Controller) -> None:
import pwndbg.aglib.memory
import pwndbg.aglib.regs
from pwndbg.commands.xor import memfrob
await ctrl.launch(REFERENCE_BINARY)
before = pwndbg.aglib.regs.rsp
pwndbg.aglib.memory.write(before, b"aaaaaaaa")
memfrob(before, 4)
after = pwndbg.aglib.memory.read(before, 8)
assert after == b"KKKKaaaa"

@ -0,0 +1,37 @@
from __future__ import annotations
from ....host import Controller
from . import break_at_sym
from . import get_binary
from . import pwndbg_test
MMAP_GAPS_BINARY = get_binary("mmap_gaps.out")
@pwndbg_test
async def test_dump_mmap_args(ctrl: Controller):
"""
Tests dumpargs command on an xmmap call
"""
await ctrl.launch(MMAP_GAPS_BINARY)
# Run until main
break_at_sym("main")
await ctrl.cont()
# Stop on xmmap(...)
await ctrl.execute("nextcall")
# Step into the xmmap(...) call
await ctrl.step_instruction()
# Stop on mmap(...)
await ctrl.execute("nextcall")
out = (await ctrl.execute_and_capture("dumpargs")).splitlines()
assert len(out) == 6
assert out[0] == " addr: 0xcafe0000"
assert out[1] == " len: 0x1000"
assert out[2] == " prot: 1"
assert out[3] == " flags: 0x32 (MAP_PRIVATE|MAP_ANONYMOUS|MAP_FIXED)"
assert out[4] == " fd: 0xffffffff"
assert out[5] == " offset: 0"
Loading…
Cancel
Save