Additional type hints (#2120)

* Additional type hints

Activate `vermin --eval-annotations` to catch invalid type hints for Python 3.8.

Use `typing_extensions.ParamSpec` to avoid hiding function arguments through decorators.

* Fix safe linking detection

* Fix cast of glibc_version parameter in < Python 3.10

* Use "queue.Queue[int]" for Python 3.8 compatibility

* Address review

* Add comments and address review
pull/2169/head
peace-maker 2 years ago committed by GitHub
parent f2c4e2bfea
commit 90dc42e5b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -78,7 +78,7 @@ else
fi
# Checking minimum python version
vermin -vvv --no-tips -t=3.8- --violations ./pwndbg/
vermin -vvv --no-tips -t=3.8- --eval-annotations --violations ./pwndbg/
# mypy is run in a separate step on GitHub Actions
if [[ -z "$GITHUB_ACTIONS" ]]; then

25
poetry.lock generated

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
[[package]]
name = "appnope"
@ -1321,13 +1321,13 @@ testing = ["pytest", "pytest-benchmark"]
[[package]]
name = "plumbum"
version = "1.8.2"
version = "1.8.3"
description = "Plumbum: shell combinators library"
optional = false
python-versions = ">=3.6"
files = [
{file = "plumbum-1.8.2-py3-none-any.whl", hash = "sha256:3ad9e5f56c6ec98f6f7988f7ea8b52159662ea9e915868d369dbccbfca0e367e"},
{file = "plumbum-1.8.2.tar.gz", hash = "sha256:9e6dc032f4af952665f32f3206567bc23b7858b1413611afe603a3f8ad9bfd75"},
{file = "plumbum-1.8.3-py3-none-any.whl", hash = "sha256:8595d36dae2472587d6f59789c8d7b26250f45f6f6ed75ccb378de59ee7b9cf9"},
{file = "plumbum-1.8.3.tar.gz", hash = "sha256:6092c85ab970b7a7a9d5d85c75200bc93be82b33c9bdf640ffa87d2d7c8709f0"},
]
[package.dependencies]
@ -1955,6 +1955,21 @@ files = [
{file = "sortedcontainers-2.4.0.tar.gz", hash = "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88"},
]
[[package]]
name = "sortedcontainers-stubs"
version = "2.4.2"
description = "Type stubs for sortedcontainers"
optional = false
python-versions = ">=3.8,<4.0"
files = [
{file = "sortedcontainers_stubs-2.4.2-py3-none-any.whl", hash = "sha256:6cdbcf9c71198729dd6c0a8a7dc9742768f25b49384b91965d716a9d16ca7b2a"},
{file = "sortedcontainers_stubs-2.4.2.tar.gz", hash = "sha256:c24e33877effa5a3eb85357112496ba33a0b681c05e0d5d3745664fd3be75d82"},
]
[package.dependencies]
sortedcontainers = ">=2,<3"
typing-extensions = ">=4.1.0,<5.0.0"
[[package]]
name = "stack-data"
version = "0.6.3"
@ -2339,4 +2354,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.8"
content-hash = "68e08925a9c889db020f8bd5f2267319c62b7f78c4ec32baabefa665d0c4d731"
content-hash = "509deff0815b809362ebbe6e672a08320e18fe6162f15358ef6092ae01d72938"

@ -1,6 +1,5 @@
from __future__ import annotations
import re
import signal
import gdb

@ -5,6 +5,9 @@ may be passed in a combination of registers and stack values.
from __future__ import annotations
from typing import List
from typing import Tuple
import gdb
from capstone import CS_GRP_CALL
from capstone import CS_GRP_INT
@ -13,7 +16,9 @@ import pwndbg.chain
import pwndbg.constants
import pwndbg.disasm
import pwndbg.gdblib.arch
import pwndbg.gdblib.file
import pwndbg.gdblib.memory
import pwndbg.gdblib.proc
import pwndbg.gdblib.regs
import pwndbg.gdblib.symbol
import pwndbg.gdblib.typeinfo
@ -52,12 +57,12 @@ ida_replacements = {
}
def get_syscall_name(instruction: PwndbgInstruction):
def get_syscall_name(instruction: PwndbgInstruction) -> str | None:
if CS_GRP_INT not in instruction.groups:
return None
syscall_register = pwndbg.lib.abi.ABI.syscall().syscall_register
syscall_arch = pwndbg.gdblib.arch.current
syscall_arch = pwndbg.gdblib.arch.name
# On x86/x64 `syscall` and `int <value>` instructions are in CS_GRP_INT
# but only `syscall` and `int 0x80` actually execute syscalls on Linux.
@ -79,7 +84,7 @@ def get_syscall_name(instruction: PwndbgInstruction):
return pwndbg.constants.syscall(syscall_number, syscall_arch) or "<unk_%d>" % syscall_number
def get(instruction: PwndbgInstruction):
def get(instruction: PwndbgInstruction) -> List[Tuple[pwndbg.lib.functions.Argument, int]]:
"""
Returns an array containing the arguments to the current function,
if $pc is a 'call' or 'bl' type instruction.
@ -109,7 +114,7 @@ def get(instruction: PwndbgInstruction):
if not target:
return []
if pwndbg.gdblib.arch.current in ["rv32", "rv64"]:
if pwndbg.gdblib.arch.name in ["rv32", "rv64"]:
target += instruction.address
target &= pwndbg.gdblib.arch.ptrmask
@ -180,7 +185,7 @@ def get(instruction: PwndbgInstruction):
return result
def argname(n, abi=None):
def argname(n: int, abi: pwndbg.lib.abi.ABI | None = None) -> str:
abi = abi or pwndbg.lib.abi.ABI.default()
regs = abi.register_arguments
@ -190,7 +195,7 @@ def argname(n, abi=None):
return "arg[%i]" % n
def argument(n, abi=None):
def argument(n: int, abi: pwndbg.lib.abi.ABI | None = None) -> int:
"""
Returns the nth argument, as if $pc were a 'call' or 'bl' type
instruction.
@ -209,7 +214,7 @@ def argument(n, abi=None):
return int(pwndbg.gdblib.memory.poi(pwndbg.gdblib.typeinfo.ppvoid, sp))
def arguments(abi=None):
def arguments(abi: pwndbg.lib.abi.ABI | None = None):
"""
Yields (arg_name, arg_value) tuples for arguments from a given ABI.
Works only for ABIs that use registers for arguments.
@ -221,7 +226,7 @@ def arguments(abi=None):
yield argname(i, abi), argument(i, abi)
def format_args(instruction: PwndbgInstruction):
def format_args(instruction: PwndbgInstruction) -> List[str]:
result = []
for arg, value in get(instruction):
code = arg.type != "char"

@ -1,5 +1,7 @@
from __future__ import annotations
from typing import List
import gdb
import pwndbg.color.memory as M
@ -29,14 +31,14 @@ c = ColorConfig(
def get(
address,
limit=LIMIT,
offset=0,
hard_stop=None,
hard_end=0,
include_start=True,
safe_linking=False,
):
address: int | None,
limit: int = int(LIMIT),
offset: int = 0,
hard_stop: int | None = None,
hard_end: int = 0,
include_start: bool = True,
safe_linking: bool = False,
) -> List[int] | None:
"""
Recursively dereferences an address. For bare metal, it will stop when the address is not in any of vmmap pages to avoid redundant dereference.
@ -58,7 +60,7 @@ def get(
limit = int(limit)
result = [address] if include_start else []
for i in range(limit):
for _ in range(limit):
# Don't follow cycles, except to stop at the second occurrence.
if result.count(address) >= 2:
break
@ -97,15 +99,15 @@ config_contiguous = theme.add_param(
def format(
value,
limit=LIMIT,
code=True,
offset=0,
hard_stop=None,
hard_end=0,
safe_linking=False,
enhance_string_len: int = None,
):
value: int | List[int] | None,
limit: int = int(LIMIT),
code: bool = True,
offset: int = 0,
hard_stop: int | None = None,
hard_end: int = 0,
safe_linking: bool = False,
enhance_string_len: int | None = None,
) -> str:
"""
Recursively dereferences an address into string representation, or convert the list representation
of address dereferences into string representation.
@ -132,13 +134,13 @@ def format(
if isinstance(value, list):
chain = value
else:
chain = get(value, limit, offset, hard_stop, hard_end, safe_linking=safe_linking)
chain = get(value, limit, offset, hard_stop, hard_end, safe_linking=safe_linking) or []
arrow_left = c.arrow(f" {config_arrow_left} ")
arrow_right = c.arrow(f" {config_arrow_right} ")
# Colorize the chain
rest = []
rest: List[str] = []
for link in chain:
symbol = pwndbg.gdblib.symbol.get(link) or None
if symbol:

@ -3,6 +3,8 @@ from __future__ import annotations
import os
import re
from typing import Callable
from typing import Dict
from typing import List
from typing import NamedTuple
from pwndbg.lib.config import Parameter
@ -134,8 +136,10 @@ disable_colors = theme.add_param(
)
def generateColorFunctionInner(old: Callable[[str], str], new: Callable[[str], str]):
def wrapper(text: str) -> str:
def generateColorFunctionInner(
old: Callable[[object], str], new: Callable[[str], str]
) -> Callable[[object], str]:
def wrapper(text: object) -> str:
return new(old(text))
return wrapper
@ -148,9 +152,9 @@ class ColorParamSpec(NamedTuple):
class ColorConfig:
def __init__(self, namespace: str, params: list[ColorParamSpec]) -> None:
def __init__(self, namespace: str, params: List[ColorParamSpec]) -> None:
self._namespace = namespace
self._params: dict[str, Parameter] = {}
self._params: Dict[str, Parameter] = {}
for param in params:
self._params[param.name] = theme.add_color_param(
f"{self._namespace}-{param.name}-color", param.default, param.doc
@ -165,7 +169,7 @@ class ColorConfig:
def generateColorFunction(
config: str | Parameter, _globals: dict[str, Callable[[str], str]] = globals()
config: str | Parameter, _globals: Dict[str, Callable[[str], str]] = globals()
) -> Callable[[object], str]:
# the `config` here may be a config Parameter object
# and if we run with disable_colors or if the config value

@ -1,5 +1,7 @@
from __future__ import annotations
from typing import List
from pwndbg.color import generateColorFunction
from pwndbg.color import theme
from pwndbg.gdblib import config
@ -88,7 +90,7 @@ def comment(x: object) -> str:
return generateColorFunction(config.comment_color)(x)
def format_flags(value, flags: BitFlags, last=None):
def format_flags(value: int | None, flags: BitFlags, last: int | None = None):
if value is None:
return "<unavailable>"
@ -96,7 +98,7 @@ def format_flags(value, flags: BitFlags, last=None):
if not flags:
return desc
names = []
names: List[str] = []
for name, bit in flags.items():
# If the size is not specified, assume it's 1
if isinstance(bit, int):

@ -1,5 +1,7 @@
from __future__ import annotations
from typing import List
import capstone
import pwndbg.chain
@ -54,20 +56,20 @@ WHITESPACE_LIMIT = 20
# To making the padding visually nicer, the following padding scheme is used for annotations:
# All instructions in a group will have the same amount of left-adjusting spaces, so they are aligned.
# A group is defined as a sequence of instructions surrounded by instructions that can change the instruction pointer.
def instructions_and_padding(instructions: list[PwndbgInstruction]) -> list[str]:
result: list[str] = []
def instructions_and_padding(instructions: List[PwndbgInstruction]) -> List[str]:
result: List[str] = []
cur_padding_len = None
# Stores intermediate padding results so we can do a final pass to clean up edges and jagged parts
# None if padding doesn't apply to the instruction
paddings: list[int | None] = []
paddings: List[int | None] = []
# Used for padding. List of groups.
# Each group is a list of index into paddings list
groups: list[list[int]] = []
groups: List[List[int]] = []
current_group: list[int] = []
current_group: List[int] = []
for i, (ins, asm) in enumerate(zip(instructions, (one_instruction(i) for i in instructions))):
if ins.can_change_instruction_pointer:

@ -64,6 +64,7 @@ def get(address: int | gdb.Value, text: str | None = None, prefix: str | None =
"""
address = int(address)
page = pwndbg.gdblib.vmmap.find(address)
color: Callable[[str], str]
if page is None:
color = normal

@ -3,6 +3,7 @@ from __future__ import annotations
import os.path
import re
from typing import Any
from typing import Dict
import pygments
import pygments.formatters
@ -24,7 +25,7 @@ style = theme.add_param(
formatter = pygments.formatters.Terminal256Formatter(style=str(style))
pwntools_lexer = PwntoolsLexer()
lexer_cache: dict[str, Any] = {}
lexer_cache: Dict[str, Any] = {}
@pwndbg.gdblib.config.trigger(style)

@ -14,18 +14,22 @@ from typing import Tuple
from typing import TypeVar
import gdb
from typing_extensions import ParamSpec
import pwndbg.exception
import pwndbg.gdblib.kernel
import pwndbg.gdblib.proc
import pwndbg.gdblib.qemu
import pwndbg.gdblib.regs
import pwndbg.heap
from pwndbg.color import message
from pwndbg.heap.ptmalloc import DebugSymsHeap
from pwndbg.heap.ptmalloc import GlibcMemoryAllocator
from pwndbg.heap.ptmalloc import HeuristicHeap
from pwndbg.heap.ptmalloc import SymbolUnresolvableError
T = TypeVar("T")
P = ParamSpec("P")
commands: List[Command] = []
command_names: Set[str] = set()
@ -255,9 +259,9 @@ def fix_int_reraise(*a, **kw) -> int:
return fix_int(*a, reraise=True, **kw)
def OnlyWithFile(function: Callable[..., T]) -> Callable[..., Optional[T]]:
def OnlyWithFile(function: Callable[P, T]) -> Callable[P, Optional[T]]:
@functools.wraps(function)
def _OnlyWithFile(*a: Any, **kw: Any) -> Optional[T]:
def _OnlyWithFile(*a: P.args, **kw: P.kwargs) -> Optional[T]:
if pwndbg.gdblib.proc.exe:
return function(*a, **kw)
else:
@ -270,9 +274,9 @@ def OnlyWithFile(function: Callable[..., T]) -> Callable[..., Optional[T]]:
return _OnlyWithFile
def OnlyWhenQemuKernel(function: Callable[..., T]) -> Callable[..., Optional[T]]:
def OnlyWhenQemuKernel(function: Callable[P, T]) -> Callable[P, Optional[T]]:
@functools.wraps(function)
def _OnlyWhenQemuKernel(*a: Any, **kw: Any) -> Optional[T]:
def _OnlyWhenQemuKernel(*a: P.args, **kw: P.kwargs) -> Optional[T]:
if pwndbg.gdblib.qemu.is_qemu_kernel():
return function(*a, **kw)
else:
@ -284,9 +288,9 @@ def OnlyWhenQemuKernel(function: Callable[..., T]) -> Callable[..., Optional[T]]
return _OnlyWhenQemuKernel
def OnlyWhenUserspace(function: Callable[..., T]) -> Callable[..., Optional[T]]:
def OnlyWhenUserspace(function: Callable[P, T]) -> Callable[P, Optional[T]]:
@functools.wraps(function)
def _OnlyWhenUserspace(*a: Any, **kw: Any) -> Optional[T]:
def _OnlyWhenUserspace(*a: P.args, **kw: P.kwargs) -> Optional[T]:
if not pwndbg.gdblib.qemu.is_qemu_kernel():
return function(*a, **kw)
else:
@ -298,7 +302,7 @@ def OnlyWhenUserspace(function: Callable[..., T]) -> Callable[..., Optional[T]]:
return _OnlyWhenUserspace
def OnlyWithArch(arch_names: List[str]) -> Callable[[Callable[..., T]], Callable[..., Optional[T]]]:
def OnlyWithArch(arch_names: List[str]) -> Callable[[Callable[P, T]], Callable[P, Optional[T]]]:
"""Decorates function to work only with the specified archictectures."""
for arch in arch_names:
if arch not in pwndbg.gdblib.arch_mod.ARCHS:
@ -306,9 +310,9 @@ def OnlyWithArch(arch_names: List[str]) -> Callable[[Callable[..., T]], Callable
f"OnlyWithArch used with unsupported arch={arch}. Must be one of {', '.join(arch_names)}"
)
def decorator(function: Callable[..., T]) -> Callable[..., Optional[T]]:
def decorator(function: Callable[P, T]) -> Callable[P, Optional[T]]:
@functools.wraps(function)
def _OnlyWithArch(*a: Any, **kw: Any) -> Optional[T]:
def _OnlyWithArch(*a: P.args, **kw: P.kwargs) -> Optional[T]:
if pwndbg.gdblib.arch.name in arch_names:
return function(*a, **kw)
else:
@ -324,9 +328,9 @@ def OnlyWithArch(arch_names: List[str]) -> Callable[[Callable[..., T]], Callable
return decorator
def OnlyWithKernelDebugSyms(function: Callable[..., T]) -> Callable[..., Optional[T]]:
def OnlyWithKernelDebugSyms(function: Callable[P, T]) -> Callable[P, Optional[T]]:
@functools.wraps(function)
def _OnlyWithKernelDebugSyms(*a: Any, **kw: Any) -> Optional[T]:
def _OnlyWithKernelDebugSyms(*a: P.args, **kw: P.kwargs) -> Optional[T]:
if pwndbg.gdblib.kernel.has_debug_syms():
return function(*a, **kw)
else:
@ -338,9 +342,9 @@ def OnlyWithKernelDebugSyms(function: Callable[..., T]) -> Callable[..., Optiona
return _OnlyWithKernelDebugSyms
def OnlyWhenPagingEnabled(function: Callable[..., T]) -> Callable[..., Optional[T]]:
def OnlyWhenPagingEnabled(function: Callable[P, T]) -> Callable[P, Optional[T]]:
@functools.wraps(function)
def _OnlyWhenPagingEnabled(*a: Any, **kw: Any) -> Optional[T]:
def _OnlyWhenPagingEnabled(*a: P.args, **kw: P.kwargs) -> Optional[T]:
if pwndbg.gdblib.kernel.paging_enabled():
return function(*a, **kw)
else:
@ -350,9 +354,9 @@ def OnlyWhenPagingEnabled(function: Callable[..., T]) -> Callable[..., Optional[
return _OnlyWhenPagingEnabled
def OnlyWhenRunning(function: Callable[..., T]) -> Callable[..., Optional[T]]:
def OnlyWhenRunning(function: Callable[P, T]) -> Callable[P, Optional[T]]:
@functools.wraps(function)
def _OnlyWhenRunning(*a: Any, **kw: Any) -> Optional[T]:
def _OnlyWhenRunning(*a: P.args, **kw: P.kwargs) -> Optional[T]:
if pwndbg.gdblib.proc.alive:
return function(*a, **kw)
else:
@ -362,9 +366,10 @@ def OnlyWhenRunning(function: Callable[..., T]) -> Callable[..., Optional[T]]:
return _OnlyWhenRunning
def OnlyWithTcache(function: Callable[..., T]) -> Callable[..., Optional[T]]:
def OnlyWithTcache(function: Callable[P, T]) -> Callable[P, Optional[T]]:
@functools.wraps(function)
def _OnlyWithTcache(*a: Any, **kw: Any) -> Optional[T]:
def _OnlyWithTcache(*a: P.args, **kw: P.kwargs) -> Optional[T]:
assert isinstance(pwndbg.heap.current, GlibcMemoryAllocator)
if pwndbg.heap.current.has_tcache():
return function(*a, **kw)
else:
@ -376,10 +381,10 @@ def OnlyWithTcache(function: Callable[..., T]) -> Callable[..., Optional[T]]:
return _OnlyWithTcache
def OnlyWhenHeapIsInitialized(function: Callable[..., T]) -> Callable[..., Optional[T]]:
def OnlyWhenHeapIsInitialized(function: Callable[P, T]) -> Callable[P, Optional[T]]:
@functools.wraps(function)
def _OnlyWhenHeapIsInitialized(*a: Any, **kw: Any) -> Optional[T]:
if pwndbg.heap.current.is_initialized():
def _OnlyWhenHeapIsInitialized(*a: P.args, **kw: P.kwargs) -> Optional[T]:
if pwndbg.heap.current is not None and pwndbg.heap.current.is_initialized():
return function(*a, **kw)
else:
print(f"{function.__name__}: Heap is not initialized yet.")
@ -394,7 +399,7 @@ def _is_statically_linked() -> bool:
return "No shared libraries loaded at this time." in out
def _try2run_heap_command(function: Callable[..., str | None], a: Any, kw: Any) -> str | None:
def _try2run_heap_command(function: Callable[P, T], *a: P.args, **kw: P.kwargs) -> T | None:
e = lambda s: print(message.error(s))
w = lambda s: print(message.warn(s))
# Note: We will still raise the error for developers when exception-* is set to "on"
@ -430,9 +435,9 @@ def _try2run_heap_command(function: Callable[..., str | None], a: Any, kw: Any)
return None
def OnlyWithResolvedHeapSyms(function: Callable[..., T]) -> Callable[..., T]:
def OnlyWithResolvedHeapSyms(function: Callable[P, T]) -> Callable[P, T | None]:
@functools.wraps(function)
def _OnlyWithResolvedHeapSyms(*a: Any, **kw: Any):
def _OnlyWithResolvedHeapSyms(*a: P.args, **kw: P.kwargs) -> T | None:
e = lambda s: print(message.error(s))
w = lambda s: print(message.warn(s))
if (
@ -442,8 +447,12 @@ def OnlyWithResolvedHeapSyms(function: Callable[..., T]) -> Callable[..., T]:
):
# In auto mode, we will try to use the debug symbols if possible
pwndbg.heap.current = DebugSymsHeap()
if pwndbg.heap.current.can_be_resolved():
return _try2run_heap_command(function, a, kw) # type: ignore[arg-type]
if (
pwndbg.heap.current is not None
and isinstance(pwndbg.heap.current, GlibcMemoryAllocator)
and pwndbg.heap.current.can_be_resolved()
):
return _try2run_heap_command(function, *a, **kw)
else:
if (
isinstance(pwndbg.heap.current, DebugSymsHeap)
@ -457,7 +466,7 @@ def OnlyWithResolvedHeapSyms(function: Callable[..., T]) -> Callable[..., T]:
"pwndbg will try to resolve the heap symbols via heuristic now since we cannot resolve the heap via the debug symbols.\n"
"This might not work in all cases. Use `help set resolve-heap-via-heuristic` for more details.\n"
)
return _try2run_heap_command(function, a, kw)
return _try2run_heap_command(function, *a, **kw)
elif _is_statically_linked():
e(
"Can't find GLIBC version required for this command to work since this is a statically linked binary"
@ -501,6 +510,7 @@ def OnlyWithResolvedHeapSyms(function: Callable[..., T]) -> Callable[..., T]:
pwndbg.exception.inform_report_issue(
"An unknown error occurred when resolved the heap"
)
return None
return _OnlyWithResolvedHeapSyms

@ -1,6 +1,7 @@
from __future__ import annotations
import argparse
from typing import Dict
import pwndbg.commands
from pwndbg.color import message
@ -11,7 +12,7 @@ parser.add_argument(
)
parser.add_argument("comment", type=str, default=None, help="The text you want to comment")
file_lists: dict[str, dict[str, str]] = {} # This saves all comments.
file_lists: Dict[str, Dict[str, str]] = {} # This saves all comments.
@pwndbg.commands.ArgparsedCommand(parser)

@ -7,6 +7,7 @@ import sys
from collections import defaultdict
from typing import Any
from typing import DefaultDict
from typing import Dict
from typing import List
from typing import Tuple
@ -92,7 +93,7 @@ config_max_threads_display = pwndbg.gdblib.config.add_param(
)
# Storing output configuration per section
outputs: dict[str, str] = {}
outputs: Dict[str, str] = {}
output_settings = {}
@ -394,7 +395,7 @@ def context(subcontext=None) -> None:
sections += [(arg, context_sections.get(arg[0], None)) for arg in args]
result = defaultdict(list)
result_settings: DefaultDict[str, dict[Any, Any]] = defaultdict(dict)
result_settings: DefaultDict[str, Dict[Any, Any]] = defaultdict(dict)
for section, func in sections:
if func:
target = output(section)
@ -852,7 +853,7 @@ def context_args(with_banner=True, target=sys.stdout, width=None):
return args
last_signal: list[str] = []
last_signal: List[str] = []
thread_status_messages = {
"running": pwndbg.color.light_green("running"),

@ -22,10 +22,12 @@ import os
import subprocess
import sys
import tempfile
from typing import Callable
from typing import Dict
from typing import TypeVar
import gdb
from typing_extensions import ParamSpec
from typing_extensions import Protocol
import pwndbg
import pwndbg.commands
@ -34,6 +36,7 @@ import pwndbg.lib.gcc
import pwndbg.lib.tempfile
from pwndbg.color import message
P = ParamSpec("P")
T = TypeVar("T")
gcc_compiler_path = pwndbg.gdblib.config.add_param(
@ -51,7 +54,7 @@ cymbol_editor = pwndbg.gdblib.config.add_param(
)
# Remeber loaded symbols. This would be useful for 'remove-symbol-file'.
loaded_symbols: dict[str, str] = {}
loaded_symbols: Dict[str, str] = {}
# Where generated symbol source files are saved.
pwndbg_cachedir = pwndbg.lib.tempfile.cachedir("custom-symbols")
@ -64,10 +67,16 @@ def unload_loaded_symbol(custom_structure_name: str) -> None:
loaded_symbols.pop(custom_structure_name)
def OnlyWhenStructFileExists(func: Callable[..., T]) -> Callable[..., T]:
class _OnlyWhenStructFileExists(Protocol):
def __call__(self, custom_structure_name: str, custom_structure_path: str = "") -> T | None: ...
def OnlyWhenStructFileExists(func: _OnlyWhenStructFileExists) -> _OnlyWhenStructFileExists:
@functools.wraps(func)
def wrapper(custom_structure_name: str) -> T | None:
pwndbg_custom_structure_path = os.path.join(pwndbg_cachedir, custom_structure_name) + ".c"
def wrapper(custom_structure_name: str, custom_structure_path: str = "") -> T | None:
pwndbg_custom_structure_path = (
custom_structure_path or os.path.join(pwndbg_cachedir, custom_structure_name) + ".c"
)
if not os.path.exists(pwndbg_custom_structure_path):
print(message.error("No custom structure was found with the given name!"))
return None
@ -146,7 +155,7 @@ def add_custom_structure(custom_structure_name: str) -> None:
@OnlyWhenStructFileExists
def edit_custom_structure(custom_structure_name: str, custom_structure_path: str) -> None:
def edit_custom_structure(custom_structure_name: str, custom_structure_path: str = "") -> None:
# Lookup an editor to use for editing the custom structure.
editor_preference = os.getenv("EDITOR")
if not editor_preference:
@ -179,14 +188,14 @@ def edit_custom_structure(custom_structure_name: str, custom_structure_path: str
@OnlyWhenStructFileExists
def remove_custom_structure(custom_structure_name: str, custom_structure_path: str) -> None:
def remove_custom_structure(custom_structure_name: str, custom_structure_path: str = "") -> None:
unload_loaded_symbol(custom_structure_name)
os.remove(custom_structure_path)
print(message.success("Symbols are removed!"))
@OnlyWhenStructFileExists
def load_custom_structure(custom_structure_name: str, custom_structure_path: str) -> None:
def load_custom_structure(custom_structure_name: str, custom_structure_path: str = "") -> None:
unload_loaded_symbol(custom_structure_name)
pwndbg_debug_symbols_output_file = generate_debug_symbols(custom_structure_path)
if not pwndbg_debug_symbols_output_file:
@ -197,7 +206,7 @@ def load_custom_structure(custom_structure_name: str, custom_structure_path: str
@OnlyWhenStructFileExists
def show_custom_structure(custom_structure_name: str, custom_structure_path: str) -> None:
def show_custom_structure(custom_structure_name: str, custom_structure_path: str = "") -> None:
# Call non-caching version of the function (thus .__wrapped__)
highlighted_source = pwndbg.pwndbg.commands.context.get_highlight_source.__wrapped__(
custom_structure_path
@ -251,7 +260,7 @@ parser.add_argument(
@pwndbg.commands.ArgparsedCommand(parser)
def cymbol(add, remove, edit, load, show) -> None:
def cymbol(add: str, remove: str, edit: str, load: str, show: str) -> None:
if add:
add_custom_structure(add)
elif remove:

@ -1,12 +1,14 @@
from __future__ import annotations
import argparse
from typing import List
import pwndbg.arguments
import pwndbg.chain
import pwndbg.commands
import pwndbg.commands.telescope
import pwndbg.disasm
import pwndbg.gdblib.arch
parser = argparse.ArgumentParser(description="Prints determined arguments for call instruction.")
parser.add_argument("-f", "--force", action="store_true", help="Force displaying of all arguments.")
@ -14,7 +16,7 @@ parser.add_argument("-f", "--force", action="store_true", help="Force displaying
@pwndbg.commands.ArgparsedCommand(parser, aliases=["args"])
@pwndbg.commands.OnlyWhenRunning
def dumpargs(force=False) -> None:
def dumpargs(force: bool = False) -> None:
args = (not force and call_args()) or all_args()
if args:
@ -22,17 +24,17 @@ def dumpargs(force=False) -> None:
else:
print("Couldn't resolve call arguments from registers.")
print(
f"Detected ABI: {pwndbg.gdblib.arch.current} ({pwndbg.gdblib.arch.ptrsize * 8} bit) either doesn't pass arguments through registers or is not implemented. Maybe they are passed on the stack?"
f"Detected ABI: {pwndbg.gdblib.arch.name} ({pwndbg.gdblib.arch.ptrsize * 8} bit) either doesn't pass arguments through registers or is not implemented. Maybe they are passed on the stack?"
)
def call_args():
def call_args() -> List[str]:
"""
Returns list of resolved call argument strings for display.
Attempts to resolve the target and determine the number of arguments.
Should be used only when being on a call instruction.
"""
results = []
results: List[str] = []
for arg, value in pwndbg.arguments.get(pwndbg.disasm.one()):
code = arg.type != "char"
@ -42,11 +44,11 @@ def call_args():
return results
def all_args():
def all_args() -> List[str]:
"""
Returns list of all argument strings for display.
"""
results = []
results: List[str] = []
for name, value in pwndbg.arguments.arguments():
results.append("%4s = %s" % (name, pwndbg.chain.format(value)))

@ -3,16 +3,25 @@ from __future__ import annotations
import argparse
import ctypes
from string import printable
from typing import Dict
from typing import List
from typing import Set
import gdb
from tabulate import tabulate
import pwndbg.chain
import pwndbg.color.context as C
import pwndbg.color.memory as M
import pwndbg.commands
import pwndbg.gdblib.config
import pwndbg.gdblib.memory
import pwndbg.gdblib.proc
import pwndbg.gdblib.symbol
import pwndbg.gdblib.typeinfo
import pwndbg.gdblib.vmmap
import pwndbg.glibc
import pwndbg.heap
import pwndbg.lib.heap.helpers
from pwndbg.color import generateColorFunction
from pwndbg.color import message
@ -24,36 +33,44 @@ from pwndbg.heap.ptmalloc import Bins
from pwndbg.heap.ptmalloc import BinType
from pwndbg.heap.ptmalloc import Chunk
from pwndbg.heap.ptmalloc import DebugSymsHeap
from pwndbg.heap.ptmalloc import GlibcMemoryAllocator
from pwndbg.heap.ptmalloc import Heap
def read_chunk(addr):
def read_chunk(addr: int) -> Dict[str, int]:
"""Read a chunk's metadata."""
# In GLIBC versions <= 2.24 the `mchunk_[prev_]size` field was named `[prev_]size`.
# To support both versions, change the new names to the old ones here so that
# the rest of the code can deal with uniform names.
assert isinstance(pwndbg.heap.current, GlibcMemoryAllocator)
assert pwndbg.heap.current.malloc_chunk is not None
renames = {
"mchunk_size": "size",
"mchunk_prev_size": "prev_size",
}
if isinstance(pwndbg.heap.current, DebugSymsHeap):
val = pwndbg.gdblib.typeinfo.read_gdbvalue("struct malloc_chunk", addr)
val = pwndbg.gdblib.memory.poi(pwndbg.heap.current.malloc_chunk, addr)
else:
val = pwndbg.heap.current.malloc_chunk(addr)
return {renames.get(key, key): int(val[key]) for key in val.type.keys()}
value_keys: List[str] = val.type.keys()
return {renames.get(key, key): int(val[key]) for key in value_keys}
def format_bin(bins: Bins, verbose=False, offset=None):
def format_bin(bins: Bins, verbose: bool = False, offset: int | None = None) -> List[str]:
assert isinstance(pwndbg.heap.current, GlibcMemoryAllocator)
allocator = pwndbg.heap.current
if offset is None:
offset = allocator.chunk_key_offset("fd")
result = []
result: List[str] = []
bins_type = bins.bin_type
for size in bins.bins:
b = bins.bins[size]
count, is_chain_corrupted = None, False
count: int | None = None
chain_fd: List[int] = []
chain_bk: List[int] | None = []
is_chain_corrupted = False
safe_lnk = False
# fastbins consists of only single linked list
@ -128,7 +145,7 @@ def print_no_arena_found_error(tid=None) -> None:
)
def print_no_tcache_bins_found_error(tid=None) -> None:
def print_no_tcache_bins_found_error(tid: int | None = None) -> None:
if tid is None:
tid = pwndbg.gdblib.proc.thread_id
print(
@ -163,11 +180,12 @@ parser.add_argument(
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def heap(addr=None, verbose=False, simple=False) -> None:
def heap(addr: int | None = None, verbose: bool = False, simple: bool = False) -> None:
"""Iteratively print chunks on a heap, default to the current thread's
active heap.
"""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
if addr is not None:
chunk = Chunk(addr)
@ -213,7 +231,7 @@ parser.add_argument(
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
def hi(addr, verbose=False, simple=False, fake=False) -> None:
def hi(addr: int, verbose: bool = False, simple: bool = False, fake: bool = False) -> None:
try:
heap = Heap(addr)
except Exception as E:
@ -230,7 +248,7 @@ def hi(addr, verbose=False, simple=False, fake=False) -> None:
start = chunk.address + (pwndbg.gdblib.arch.ptrsize if chunk.prev_inuse else 0x00)
print(f"Your address: {hex(addr)}")
print(f"Head offset: {hex(addr - start)}")
if chunk.is_top_chunk is False:
if chunk.is_top_chunk is False and chunk.real_size is not None:
end = (
start
+ chunk.real_size
@ -253,9 +271,10 @@ parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def arena(addr=None) -> None:
def arena(addr: int | None = None) -> None:
"""Print the contents of an arena, default to the current thread's arena."""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
if addr is not None:
arena = Arena(addr)
@ -285,6 +304,8 @@ parser = argparse.ArgumentParser(description="List this process's arenas.")
def arenas() -> None:
"""Lists this process's arenas."""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
arenas = allocator.arenas
table = []
@ -347,11 +368,13 @@ parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWithTcache
@pwndbg.commands.OnlyWhenUserspace
def tcache(addr=None) -> None:
def tcache(addr: int | None = None) -> None:
"""Print a thread's tcache contents, default to the current thread's
tcache.
"""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
tcache = allocator.get_tcache(addr)
# if the current thread doesn't allocate the arena, tcache will be NULL
tid = pwndbg.gdblib.proc.thread_id
@ -377,6 +400,8 @@ parser = argparse.ArgumentParser(description="Print the mp_ struct's contents.")
def mp() -> None:
"""Print the mp_ struct's contents."""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
print(message.notice("mp_ struct at: ") + message.hint(hex(allocator.mp.address)))
print(allocator.mp)
@ -394,11 +419,12 @@ parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def top_chunk(addr=None) -> None:
def top_chunk(addr: int | None = None) -> None:
"""Print relevant information about an arena's top chunk, default to the
current thread's arena.
"""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
if addr is not None:
arena = Arena(addr)
@ -434,14 +460,22 @@ parser.add_argument(
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def malloc_chunk(addr, fake=False, verbose=False, simple=False, next=0, dump=False) -> None:
def malloc_chunk(
addr: int,
fake: bool = False,
verbose: bool = False,
simple: bool = False,
next: int = 0,
dump: bool = False,
) -> None:
"""Print a malloc_chunk struct's contents."""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
chunk = Chunk(addr)
headers_to_print = [] # both state (free/allocated) and flags
fields_to_print = set() # in addition to addr and size
headers_to_print: List[str] = [] # both state (free/allocated) and flags
fields_to_print: Set[str] = set() # in addition to addr and size
out_fields = f"Addr: {M.get(chunk.address)}\n"
if fake:
@ -543,16 +577,19 @@ parser.add_argument("tcache_addr", nargs="?", type=int, default=None, help="Addr
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def bins(addr=None, tcache_addr=None) -> None:
def bins(addr: int | None = None, tcache_addr: int | None = None) -> None:
"""Print the contents of all an arena's bins and a thread's tcache,
default to the current thread's arena and tcache.
"""
if pwndbg.heap.current.has_tcache():
if tcache_addr is None and pwndbg.heap.current.thread_cache is None:
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
if allocator.has_tcache():
if tcache_addr is None and allocator.thread_cache is None:
print_no_tcache_bins_found_error()
else:
tcachebins(tcache_addr)
if addr is None and pwndbg.heap.current.thread_arena is None:
if addr is None and allocator.thread_arena is None:
print_no_arena_found_error()
return
fastbins(addr)
@ -577,11 +614,13 @@ parser.add_argument(
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def fastbins(addr=None, verbose=False) -> None:
def fastbins(addr: int | None = None, verbose: bool = False) -> None:
"""Print the contents of an arena's fastbins, default to the current
thread's arena.
"""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
fastbins = allocator.fastbins(addr)
if fastbins is None:
@ -611,11 +650,13 @@ parser.add_argument(
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def unsortedbin(addr=None, verbose=False) -> None:
def unsortedbin(addr: int | None = None, verbose: bool = False) -> None:
"""Print the contents of an arena's unsortedbin, default to the current
thread's arena.
"""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
unsortedbin = allocator.unsortedbin(addr)
if unsortedbin is None:
@ -645,11 +686,13 @@ parser.add_argument(
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def smallbins(addr=None, verbose=False) -> None:
def smallbins(addr: int | None = None, verbose: bool = False) -> None:
"""Print the contents of an arena's smallbins, default to the current
thread's arena.
"""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
smallbins = allocator.smallbins(addr)
if smallbins is None:
@ -679,11 +722,12 @@ parser.add_argument(
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def largebins(addr=None, verbose=False) -> None:
def largebins(addr: int | None = None, verbose: bool = False) -> None:
"""Print the contents of an arena's largebins, default to the current
thread's arena.
"""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
largebins = allocator.largebins(addr)
if largebins is None:
@ -713,9 +757,11 @@ parser.add_argument(
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWithTcache
@pwndbg.commands.OnlyWhenUserspace
def tcachebins(addr=None, verbose=False) -> None:
def tcachebins(addr: int | None = None, verbose: bool = False) -> None:
"""Print the contents of a tcache, default to the current thread's tcache."""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
tcachebins = allocator.tcachebins(addr)
if tcachebins is None:
@ -764,16 +810,28 @@ parser.add_argument(
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def find_fake_fast(
target_address, max_candidate_size=None, align=False, glibc_fastbin_bug=False
target_address: int,
max_candidate_size: int | None = None,
align: bool = False,
glibc_fastbin_bug: bool = False,
) -> None:
"""Find candidate fake fast chunks overlapping the specified address."""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
size_sz = allocator.size_sz
min_chunk_size = allocator.min_chunk_size
global_max_fast = allocator.global_max_fast
size_field_width = gdb.lookup_type("unsigned int").sizeof if glibc_fastbin_bug else size_sz
if global_max_fast is None:
print(
message.warn(
"The global_max_fast symbol is not available, falling back to the default value of 0x80"
)
)
global_max_fast = 0x80
if max_candidate_size is None:
max_candidate_size = global_max_fast
else:
@ -915,10 +973,15 @@ group.add_argument(
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def vis_heap_chunks(
addr=None, count=None, beyond_top=None, no_truncate=None, all_chunks=None
addr: int | None = None,
count: int | None = None,
beyond_top: bool = False,
no_truncate: bool = False,
all_chunks: bool = False,
) -> None:
"""Visualize chunks on a heap, default to the current arena's active heap."""
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
if addr is not None:
cursor = int(addr)
@ -1006,7 +1069,7 @@ def vis_heap_chunks(
>> 1
)
bin_labels_map: dict[int, list[str]] = bin_labels_mapping(bin_collections)
bin_labels_map: Dict[int, List[str]] = bin_labels_mapping(bin_collections)
for c, stop in enumerate(chunk_delims):
color_func = color_funcs[c % len(color_funcs)]
@ -1076,7 +1139,7 @@ def bin_labels_mapping(collections):
We precompute all of them because doing this on demand was too slow and inefficient
See #1675 for more details
"""
labels_mapping: dict[int, list[str]] = {}
labels_mapping: Dict[int, List[str]] = {}
for bins in collections:
if not bins:
@ -1106,7 +1169,7 @@ try_free_parser.add_argument("addr", nargs="?", help="Address passed to free")
@pwndbg.commands.ArgparsedCommand(try_free_parser, category=CommandCategory.HEAP)
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def try_free(addr) -> None:
def try_free(addr: str | int) -> None:
addr = int(addr)
# check hook
@ -1122,6 +1185,7 @@ def try_free(addr) -> None:
# constants
allocator = pwndbg.heap.current
assert isinstance(allocator, GlibcMemoryAllocator)
arena = allocator.thread_arena
# arena might be None if the current thread doesn't allocate the arena
if arena is None:
@ -1136,7 +1200,7 @@ def try_free(addr) -> None:
ptr_size = pwndbg.gdblib.arch.ptrsize
def unsigned_size(size):
def unsigned_size(size: int):
# read_chunk()['size'] is signed in pwndbg ;/
# there may be better way to handle that
if ptr_size < 8:
@ -1144,11 +1208,11 @@ def try_free(addr) -> None:
x = ctypes.c_uint64(size).value
return x
def chunksize(chunk_size):
def chunksize(chunk_size: int):
# maybe move this to ptmalloc.py
return chunk_size & (~7)
def finalize(errors_found, returned_before_error) -> None:
def finalize(errors_found: int, returned_before_error: bool) -> None:
print("-" * 10)
if returned_before_error:
print(message.success("Free should succeed!"))
@ -1221,9 +1285,13 @@ def try_free(addr) -> None:
errors_found += 1
# tcache
if allocator.has_tcache() and "key" in allocator.tcache_entry.keys():
if (
allocator.has_tcache()
and allocator.tcache_entry is not None
and "key" in allocator.tcache_entry.keys()
):
tc_idx = (chunk_size_unmasked - chunk_minsize + malloc_alignment - 1) // malloc_alignment
if tc_idx < allocator.mp["tcache_bins"]:
if allocator.mp is not None and tc_idx < allocator.mp["tcache_bins"]:
print(message.notice("Tcache checks"))
e = addr + 2 * size_sz
e += allocator.tcache_entry.keys().index("key") * ptr_size
@ -1456,7 +1524,7 @@ def try_free(addr) -> None:
finalize(errors_found, returned_before_error)
def try_unlink(addr) -> None:
def try_unlink(addr: int) -> None:
pass
@ -1471,7 +1539,7 @@ parser.add_argument(
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
def heap_config(filter_pattern) -> None:
def heap_config(filter_pattern: str) -> None:
display_config(filter_pattern, "heap", has_file_command=False)
print(

@ -1,6 +1,7 @@
from __future__ import annotations
import argparse
from typing import List
import gdb
@ -31,7 +32,7 @@ parser.add_argument(
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PROCESS)
@pwndbg.commands.OnlyWhenRunning
def killthreads(thread_ids: list[int] | None = None, all: bool = False) -> None:
def killthreads(thread_ids: List[int] | None = None, all: bool = False) -> None:
if len(thread_ids) == 0 and not all:
print(message.error("No thread IDs or --all flag specified"))
return

@ -6,6 +6,8 @@ from __future__ import annotations
import argparse
import queue
from typing import Dict
from typing import List
import gdb
@ -142,7 +144,7 @@ def leakfind(
# We need to store both so that we can nicely create our leak chain.
visited_map = {}
visited_set = {int(address)}
address_queue: queue.Queue[int] = queue.Queue()
address_queue: "queue.Queue[int]" = queue.Queue()
address_queue.put(int(address))
depth = 0
time_to_depth_increase = 0
@ -175,7 +177,7 @@ def leakfind(
break
# A map of length->list of lines. Used to let us print in a somewhat nice manner.
output_map: dict[int, list[str]] = {}
output_map: Dict[int, List[str]] = {}
arrow_right = C.arrow(" %s " % pwndbg.gdblib.config.chain_arrow_right)
for child in visited_map:

@ -5,6 +5,7 @@ import binascii
import codecs
import os
import struct
from typing import Set
import gdb
import pwnlib
@ -20,7 +21,7 @@ import pwndbg.search
from pwndbg.color import message
from pwndbg.commands import CommandCategory
saved: set[int] = set()
saved: Set[int] = set()
def print_search_hit(address) -> None:

@ -1,6 +1,9 @@
from __future__ import annotations
import argparse
from typing import Dict
from typing import List
from typing import Set
from typing import Tuple
import pwnlib.rop.srop
@ -19,7 +22,7 @@ from pwndbg.lib.regs import i386
# Grab frame values from pwntools. Offsets are defined as the offset to stack pointer when syscall instruction is called
# Offsets and names are from Linux kernel source. For example x86_64 is defined in CONFIG_X86_64 struct rt_sigframe (Linux Kernel /arch/x86/include/asm/sigframe.h)
SIGRETURN_FRAME_LAYOUTS: dict[str, list[Tuple[int, str]]] = {
SIGRETURN_FRAME_LAYOUTS: Dict[str, List[Tuple[int, str]]] = {
"x86-64": sorted([(-8, "&pretcode")] + list(pwnlib.rop.srop.registers["amd64"].items())),
"i386": sorted(pwnlib.rop.srop.registers["i386"].items()),
"aarch64": sorted(pwnlib.rop.srop.registers["aarch64"].items()),
@ -27,7 +30,7 @@ SIGRETURN_FRAME_LAYOUTS: dict[str, list[Tuple[int, str]]] = {
}
# Always print these registers (as well as flag register, eflags / cpsr)
SIGRETURN_CORE_REGISTER: dict[str, set[str]] = {
SIGRETURN_CORE_REGISTER: Dict[str, Set[str]] = {
"x86-64": {*amd64.gpr, amd64.frame, amd64.stack, amd64.pc},
"i386": {*i386.gpr, i386.frame, i386.stack, i386.pc},
"aarch64": {*aarch64.gpr, "sp", "pc"},

@ -10,6 +10,8 @@ import argparse
import collections
import math
from typing import DefaultDict
from typing import Dict
from typing import List
import pwndbg.chain
import pwndbg.color.telescope as T
@ -154,7 +156,7 @@ def telescope(
count = max(math.ceil(count / ptrsize), 1)
# Map of address to register string
reg_values: DefaultDict[int, list[str]] = collections.defaultdict(list)
reg_values: DefaultDict[int, List[str]] = collections.defaultdict(list)
for reg in pwndbg.gdblib.regs.common:
reg_values[pwndbg.gdblib.regs[reg]].append(reg)
@ -168,7 +170,7 @@ def telescope(
step = -1 * ptrsize
# Find all registers which show up in the trace, map address to regs
regs: dict[int, str] = {}
regs: Dict[int, str] = {}
for i in range(start, stop, step):
values = list(reg_values[i])
@ -187,7 +189,7 @@ def telescope(
# Print everything out
result = []
last = None
collapse_buffer: list[str] = []
collapse_buffer: List[str] = []
skipped_padding = (
2
+ len(offset_delimiter)
@ -265,7 +267,7 @@ def telescope(
return result
def regs_or_frame_offset(addr: int, bp: int | None, regs: dict[int, str], longest_regs: int) -> str:
def regs_or_frame_offset(addr: int, bp: int | None, regs: Dict[int, str], longest_regs: int) -> str:
# bp only set if print_framepointer_offset=True
# len(regs[addr]) == 1 if no registers pointer to address
if bp is None or len(regs[addr]) > 1 or not -0xFFF <= addr - bp <= 0xFFF:

@ -1,6 +1,7 @@
from __future__ import annotations
import pwndbg.gdblib.arch
from types import ModuleType
from typing import Dict
from . import aarch64
from . import amd64
@ -10,7 +11,7 @@ from . import mips
from . import riscv64
from . import thumb
arches = {
arches: Dict[str, ModuleType] = {
"arm": arm,
"armcm": arm,
"i386": i386,
@ -22,19 +23,19 @@ arches = {
}
def syscall(number: int, arch):
def syscall(number: int, arch: str) -> str | None:
"""
Given a syscall number and architecture, returns the name of the syscall.
E.g. execve == 59 on x86-64
"""
arch = arches.get(arch, None)
arch_module = arches.get(arch)
if arch is None:
if arch_module is None:
return None
prefix = "__NR_"
for k, v in arch.__dict__.items():
for k, v in arch_module.__dict__.items():
if v != number:
continue

@ -1,21 +1,29 @@
from __future__ import annotations
import functools
from typing import Any
from typing import Callable
from typing import TypeVar
from typing_extensions import ParamSpec
first_prompt = False
P = ParamSpec("P")
T = TypeVar("T")
def only_after_first_prompt(value_before=None):
def only_after_first_prompt(
value_before: T | None = None,
) -> Callable[[Callable[P, T]], Callable[P, T | None]]:
"""
Decorator to prevent a function from running before the first prompt was displayed.
The 'value_before' parameter can be used to specify the value that is
returned if the function is called before the first prompt was displayed.
"""
def decorator(func):
def decorator(func: Callable[P, T]) -> Callable[P, T | None]:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any | None:
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None:
if first_prompt:
return func(*args, **kwargs)
else:

@ -7,11 +7,10 @@ from __future__ import annotations
import collections
import re
import typing
from dataclasses import dataclass
from typing import Any
from typing import DefaultDict
from typing import List
from typing import Tuple
from typing import Union
import capstone
@ -83,7 +82,7 @@ VariableInstructionSizeMax = {
# emulated to the last time the process stopped. This allows use to skips a handful of instruction, but still retain the cache
# Any larger changes of the program counter will cause the cache to reset.
next_addresses_cache: set[int] = set()
next_addresses_cache: Set[int] = set()
# Register GDB event listeners for all stop events
@ -266,14 +265,14 @@ def get(
enhance=True,
from_cache=False,
put_cache=False,
) -> list[PwndbgInstruction]:
) -> List[PwndbgInstruction]:
address = int(address)
# Dont disassemble if there's no memory
if not pwndbg.gdblib.memory.peek(address):
return []
retval: list[PwndbgInstruction] = []
retval: List[PwndbgInstruction] = []
for _ in range(instructions):
i = get_one_instruction(
address, emu, enhance=enhance, from_cache=from_cache, put_cache=put_cache
@ -326,7 +325,7 @@ first_time_emulate = True
# Return (list of PwndbgInstructions, index in list where instruction.address = passed in address)
def near(
address, instructions=1, emulate=False, show_prev_insns=True, use_cache=False, linear=False
) -> tuple[list[PwndbgInstruction], int]:
) -> Tuple[List[PwndbgInstruction], int]:
"""
Disasms instructions near given `address`. Passing `emulate` makes use of
unicorn engine to emulate instructions to predict branches that will be taken.
@ -365,7 +364,7 @@ def near(
if current is None:
return ([], -1)
insns: list[PwndbgInstruction] = []
insns: List[PwndbgInstruction] = []
# Get previously executed instructions from the cache.
if DEBUG_ENHANCEMENT:

@ -1,6 +1,7 @@
from __future__ import annotations
from typing import Callable
from typing import Dict
from capstone import * # noqa: F403
from capstone.arm64 import * # noqa: F403
@ -17,7 +18,7 @@ class DisassemblyAssistant(pwndbg.disasm.arch.DisassemblyAssistant):
def __init__(self, architecture: str) -> None:
super().__init__(architecture)
self.annotation_handlers: dict[int, Callable[[PwndbgInstruction, Emulator], None]] = {
self.annotation_handlers: Dict[int, Callable[[PwndbgInstruction, Emulator], None]] = {
# MOV
ARM64_INS_MOV: self.generic_register_destination,
# ADR

@ -1,6 +1,7 @@
from __future__ import annotations
from typing import Callable
from typing import Dict
import gdb
from capstone import * # noqa: F403
@ -120,7 +121,7 @@ class DisassemblyAssistant:
if architecture is not None:
self.assistants[architecture] = self
self.op_handlers: dict[
self.op_handlers: Dict[
int, Callable[[PwndbgInstruction, EnhancedOperand, Emulator], int | None]
] = {
CS_OP_IMM: self.parse_immediate, # Return immediate value
@ -131,7 +132,7 @@ class DisassemblyAssistant:
# Return a string corresponding to operand. Used to reduce code duplication while printing
# REG type wil return register name, "RAX"
self.op_names: dict[int, Callable[[PwndbgInstruction, EnhancedOperand], str | None]] = {
self.op_names: Dict[int, Callable[[PwndbgInstruction, EnhancedOperand], str | None]] = {
CS_OP_IMM: self.immediate_string,
CS_OP_REG: self.register_string,
CS_OP_MEM: self.memory_string,
@ -423,7 +424,7 @@ class DisassemblyAssistant:
operand: EnhancedOperand,
emu: Emulator,
read_size: int = None,
) -> list[int]:
) -> List[int]:
"""
Dereference an address recursively - takes into account emulation.
@ -483,7 +484,7 @@ class DisassemblyAssistant:
return [address]
# Dispatch to the appropriate format handler. Pass the list returned by `telescope()` to this function
def telescope_format_list(self, addresses: list[int], limit: int, emu: Emulator) -> str:
def telescope_format_list(self, addresses: List[int], limit: int, emu: Emulator) -> str:
# It is assumed proper checks have been made BEFORE calling this function so that pwndbg.chain.format
# will return values accurate to the program state at the time of instruction executing.

@ -2,6 +2,9 @@ from __future__ import annotations
import typing
from enum import Enum
from typing import Dict
from typing import List
from typing import Set
from typing import TypedDict
import gdb
@ -43,7 +46,7 @@ from capstone.x86 import X86Op
# Architecture specific instructions that mutate the instruction pointer unconditionally
# The Capstone RET and CALL groups are also used to filter CALL and RET types when we check for unconditional jumps,
# so we don't need to manually specify those for each architecture
UNCONDITIONAL_JUMP_INSTRUCTIONS: dict[int, set[int]] = {
UNCONDITIONAL_JUMP_INSTRUCTIONS: Dict[int, Set[int]] = {
CS_ARCH_X86: {X86_INS_JMP},
CS_ARCH_MIPS: {MIPS_INS_J, MIPS_INS_JR, MIPS_INS_JAL, MIPS_INS_JALR, MIPS_INS_BAL, MIPS_INS_B},
CS_ARCH_SPARC: {SPARC_INS_JMP, SPARC_INS_JMPL},
@ -123,7 +126,7 @@ class PwndbgInstruction:
Ex: 'RAX, RDX'
"""
self.groups: list[int] = cs_insn.groups
self.groups: List[int] = cs_insn.groups
"""
Capstone instruction groups that we belong to.
Groups that apply to all architectures: CS_GRP_INVALID | CS_GRP_JUMP | CS_GRP_CALL | CS_GRP_RET | CS_GRP_INT | CS_GRP_IRET | CS_GRP_PRIVILEGE | CS_GRP_BRANCH_RELATIVE
@ -143,7 +146,7 @@ class PwndbgInstruction:
if self.cs_insn._cs.syntax == CS_OPT_SYNTAX_ATT:
self.cs_insn.operands.reverse()
self.operands: list[EnhancedOperand] = [EnhancedOperand(op) for op in self.cs_insn.operands]
self.operands: List[EnhancedOperand] = [EnhancedOperand(op) for op in self.cs_insn.operands]
# ***********
# The following member variables are set during instruction enhancement

@ -13,6 +13,8 @@
from __future__ import annotations
from typing import Callable
from typing import Dict
from typing import List
from capstone import * # noqa: F403
from capstone.mips import * # noqa: F403
@ -31,7 +33,7 @@ def to_signed(unsigned: int):
return unsigned - ((unsigned & 0x80000000) << 1)
CONDITION_RESOLVERS: dict[int, Callable[[list[int]], bool]] = {
CONDITION_RESOLVERS: Dict[int, Callable[[List[int]], bool]] = {
MIPS_INS_BEQZ: lambda ops: ops[0] == 0,
MIPS_INS_BNEZ: lambda ops: ops[0] != 0,
MIPS_INS_BEQ: lambda ops: ops[0] == ops[1],
@ -55,7 +57,7 @@ class DisassemblyAssistant(pwndbg.disasm.arch.DisassemblyAssistant):
return InstructionCondition.UNDETERMINED
# Not using list comprehension because they run in a separate scope in which super() does not exist
resolved_operands: list[int] = []
resolved_operands: List[int] = []
for op in instruction.operands:
resolved_operands.append(
super().resolve_used_value(op.before_value, instruction, op, emu)

@ -1,6 +1,7 @@
from __future__ import annotations
from typing import Callable
from typing import Dict
from capstone import * # noqa: F403
from capstone.x86 import * # noqa: F403
@ -37,7 +38,7 @@ class DisassemblyAssistant(pwndbg.disasm.arch.DisassemblyAssistant):
def __init__(self, architecture: str) -> None:
super().__init__(architecture)
self.annotation_handlers: dict[int, Callable[[PwndbgInstruction, Emulator], None]] = {
self.annotation_handlers: Dict[int, Callable[[PwndbgInstruction, Emulator], None]] = {
# MOV
X86_INS_MOV: self.handle_mov,
X86_INS_MOVABS: self.handle_mov,

@ -7,6 +7,8 @@ from __future__ import annotations
import binascii
import re
import string
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Tuple
@ -31,14 +33,14 @@ from pwndbg import color
from pwndbg.color.syntax_highlight import syntax_highlight
def parse_consts(u_consts) -> dict[str, int]:
def parse_consts(u_consts) -> Dict[str, int]:
"""
Unicorn "consts" is a python module consisting of a variable definition
for each known entity. We repack it here as a dict for performance.
Maps "UC_*" -> integer value of the constant
"""
consts: dict[str, int] = {}
consts: Dict[str, int] = {}
for name in dir(u_consts):
if name.startswith("UC_"):
consts[name] = getattr(u_consts, name)
@ -47,14 +49,14 @@ def parse_consts(u_consts) -> dict[str, int]:
# Generate Map<Register name, unicorn constant>
def create_reg_to_const_map(
base_consts: dict[str, int], additional_mapping: dict[str, int] = None
) -> dict[str, int]:
base_consts: Dict[str, int], additional_mapping: Dict[str, int] = None
) -> Dict[str, int]:
# base_consts is Map<"UC_*_REG_", constant>
# additional mapping is the manually additions that add to the returned dict
# Create a map of "register_name" -> Capstone ID, for faster lookup
# Example of one field in the mapping for x86: { "RAX": 35 }
reg_to_const: dict[str, int] = {}
reg_to_const: Dict[str, int] = {}
r = re.compile(r"^UC_.*_REG_(.*)$")
for k, v in base_consts.items():
@ -311,7 +313,7 @@ class Emulator:
# Recursively dereference memory, return list of addresses
# read_size typically must be either 1, 2, 4, or 8. It dictates the size to read
# Naturally, if it is less than the pointer size, then only one value would be telescoped
def telescope(self, address: int, limit: int, read_size: int = None) -> list[int]:
def telescope(self, address: int, limit: int, read_size: int = None) -> List[int]:
read_size = read_size if read_size is not None else pwndbg.gdblib.arch.ptrsize
result = [address]
@ -342,7 +344,7 @@ class Emulator:
return self.format_telescope_list(address_list, limit)
def format_telescope_list(
self, chain: list[int], limit: int, enhance_string_len: int = None
self, chain: List[int], limit: int, enhance_string_len: int = None
) -> str:
# Code is near identical to pwndbg.chain.format, but takes into account reading from
# the emulator's memory when necessary
@ -475,7 +477,7 @@ class Emulator:
else:
return E.integer(pwndbg.enhance.int_str(intval0))
retval_final: tuple[str] = tuple(filter(lambda x: x is not None, retval))
retval_final: Tuple[str] = tuple(filter(lambda x: x is not None, retval))
if len(retval_final) == 0:
return E.unknown("???")

@ -10,6 +10,7 @@ supplemental information sources (e.g. active IDA Pro connection).
from __future__ import annotations
import string
from typing import Tuple
import pwndbg.color.enhance as E
import pwndbg.color.memory
@ -37,7 +38,7 @@ def format_small_int(value: int) -> str:
return hex(value & pwndbg.gdblib.arch.ptrmask)
def format_small_int_pair(first: int, second: int) -> tuple[str, str]:
def format_small_int_pair(first: int, second: int) -> Tuple[str, str]:
if first < 10 and second < 10:
return (str(first), str(second))
else:

@ -4,13 +4,14 @@
from __future__ import annotations
import re
from types import ModuleType
import gdb
from pwndbg.gdblib import arch as arch_mod
from pwndbg.gdblib import config as config_mod
from pwndbg.gdblib.arch import arch
from pwndbg.gdblib.config import config
from pwndbg.gdblib.arch import arch as arch
from pwndbg.gdblib.config import config as config
regs = None

@ -1,18 +1,19 @@
from __future__ import annotations
import functools
from typing import Any
from typing import Callable
from typing import Optional
from typing import TypeVar
import gdb
from typing_extensions import ParamSpec
import pwndbg.color.message as M
P = ParamSpec("P")
D = TypeVar("D")
T = TypeVar("T")
abi = None
abi: str | None = None
linux = False
@ -51,15 +52,15 @@ def update() -> None:
def LinuxOnly(
default: Optional[Any] = None,
) -> Callable[[Callable[..., T]], Callable[..., Optional[T]]]:
default: D = None,
) -> Callable[[Callable[P, T]], Callable[P, T | D | None]]:
"""Create a decorator that the function will be called when ABI is Linux.
Otherwise, return `default`.
"""
def decorator(func: Callable[..., T]) -> Callable[..., Optional[T]]:
def decorator(func: Callable[P, T | None]) -> Callable[P, T | D | None]:
@functools.wraps(func)
def caller(*args: Any, **kwargs: Any) -> Optional[T]:
def caller(*args: P.args, **kwargs: P.kwargs) -> T | D | None:
if linux:
return func(*args, **kwargs)
else:

@ -23,6 +23,7 @@ Structure = ctypes.LittleEndianStructure # default Structure type
@pwndbg.gdblib.events.start
@pwndbg.gdblib.events.new_objfile
def update() -> None:
global Structure
if pwndbg.gdblib.arch.endian == "little":
Structure = ctypes.LittleEndianStructure
else:

@ -11,9 +11,12 @@ from __future__ import annotations
import ctypes
import importlib
import sys
from collections import namedtuple
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Tuple
from typing import TypeVar
from typing import Union
import gdb
from elftools.elf.constants import SH_FLAGS
@ -24,11 +27,13 @@ from elftools.elf.relocation import RelocationSection
import pwndbg.auxv
import pwndbg.gdblib.abi
import pwndbg.gdblib.arch
import pwndbg.gdblib.ctypes
import pwndbg.gdblib.events
import pwndbg.gdblib.file
import pwndbg.gdblib.info
import pwndbg.gdblib.memory
import pwndbg.gdblib.proc
import pwndbg.gdblib.qemu
import pwndbg.gdblib.symbol
import pwndbg.gdblib.vmmap
import pwndbg.lib.cache
@ -43,12 +48,14 @@ ET_EXEC, ET_DYN = 2, 3
module = sys.modules[__name__]
class ELFInfo(namedtuple("ELFInfo", "header sections segments")):
class ELFInfo(NamedTuple):
"""
ELF metadata and structures.
"""
__slots__ = ()
header: Dict[str, int | str]
sections: List[Dict[str, int | str]]
segments: List[Dict[str, int | str]]
@property
def is_pic(self) -> bool:
@ -59,13 +66,14 @@ class ELFInfo(namedtuple("ELFInfo", "header sections segments")):
return self.is_pic
Ehdr: Union[pwndbg.lib.elftypes.Elf32_Ehdr, pwndbg.lib.elftypes.Elf64_Ehdr]
Phdr: Union[pwndbg.lib.elftypes.Elf32_Phdr, pwndbg.lib.elftypes.Elf64_Phdr]
Ehdr = Union[pwndbg.lib.elftypes.Elf32_Ehdr, pwndbg.lib.elftypes.Elf64_Ehdr]
Phdr = Union[pwndbg.lib.elftypes.Elf32_Phdr, pwndbg.lib.elftypes.Elf64_Phdr]
@pwndbg.gdblib.events.start
@pwndbg.gdblib.events.new_objfile
def update() -> None:
global Ehdr, Phdr
importlib.reload(pwndbg.lib.elftypes)
if pwndbg.gdblib.arch.ptrsize == 4:
@ -80,8 +88,14 @@ def update() -> None:
update()
T = TypeVar(
"T",
Union[pwndbg.lib.elftypes.Elf32_Ehdr, pwndbg.lib.elftypes.Elf64_Ehdr],
Union[pwndbg.lib.elftypes.Elf32_Phdr, pwndbg.lib.elftypes.Elf64_Phdr],
)
def read(typ, address, blob=None):
def read(typ: T, address: int, blob: bytearray | None = None) -> T:
size = ctypes.sizeof(typ)
if not blob:
@ -138,20 +152,21 @@ def get_elf_info_rebased(filepath: str, vaddr: int) -> ELFInfo:
# silently ignores "wrong" vaddr supplied for non-PIE ELF
load = vaddr if raw_info.is_pic else 0
headers = dict(raw_info.header)
headers["e_entry"] += load
headers["e_entry"] += load # type: ignore[operator]
segments = []
segments: List[Dict[str, int | str]] = []
for seg in raw_info.segments:
s = dict(seg)
for vaddr_attr in ["p_vaddr", "x_vaddr_mem_end", "x_vaddr_file_end"]:
s[vaddr_attr] += load
assert isinstance(headers[vaddr_attr], int)
s[vaddr_attr] += load # type: ignore[operator]
segments.append(s)
sections = []
sections: List[Dict[str, int | str]] = []
for sec in raw_info.sections:
s = dict(sec)
for vaddr_attr in ["sh_addr", "x_addr_mem_end", "x_addr_file_end"]:
s[vaddr_attr] += load
s[vaddr_attr] += load # type: ignore[operator]
sections.append(s)
return ELFInfo(headers, sections, segments)
@ -166,7 +181,7 @@ def get_containing_segments(elf_filepath: str, elf_loadaddr: int, vaddr: int):
if isinstance(seg["p_type"], int) or ("LOAD" not in seg["p_type"] and seg["p_filesz"] == 0):
continue
# disregard segments not containing vaddr
if vaddr < seg["p_vaddr"] or vaddr >= seg["x_vaddr_mem_end"]:
if vaddr < seg["p_vaddr"] or vaddr >= seg["x_vaddr_mem_end"]: # type: ignore[operator]
continue
segments.append(dict(seg))
return segments
@ -180,7 +195,7 @@ def get_containing_sections(elf_filepath: str, elf_loadaddr: int, vaddr: int):
if sec["sh_flags"] & SH_FLAGS.SHF_ALLOC == 0:
continue
# disregard sections that do not contain vaddr
if vaddr < sec["sh_addr"] or vaddr >= sec["x_addr_mem_end"]:
if vaddr < sec["sh_addr"] or vaddr >= sec["x_addr_mem_end"]: # type: ignore[operator]
continue
sections.append(dict(sec))
return sections
@ -220,7 +235,7 @@ def dump_relocations_by_section_name(
@pwndbg.gdblib.proc.OnlyWhenRunning
@pwndbg.lib.cache.cache_until("start")
def exe():
def exe() -> Ehdr | None:
"""
Return a loaded ELF header object pointing to the Ehdr of the
main executable.
@ -228,6 +243,7 @@ def exe():
e = entry()
if e:
return load(e)
return None
@pwndbg.gdblib.proc.OnlyWhenRunning
@ -256,7 +272,7 @@ def entry() -> int:
# Try common names
for name in ["_start", "start", "__start", "main"]:
try:
return pwndbg.gdblib.symbol.address(name)
return pwndbg.gdblib.symbol.address(name) or 0
except gdb.error:
pass
@ -264,7 +280,7 @@ def entry() -> int:
return 0
def load(pointer: int):
def load(pointer: int) -> Ehdr | None:
return get_ehdr(pointer)[1]
@ -277,7 +293,7 @@ def reset_ehdr_type_loaded() -> None:
ehdr_type_loaded = 0
def get_ehdr(pointer: int):
def get_ehdr(pointer: int) -> Tuple[int | None, Ehdr | None]:
"""
Returns an ehdr object for the ELF pointer points into.
@ -327,17 +343,17 @@ def get_ehdr(pointer: int):
ei_class = pwndbg.gdblib.memory.byte(base + 4)
# Find out where the section headers start
Elfhdr = read(Ehdr, base)
Elfhdr: Elf32_Ehdr | Elf64_Ehdr | None = read(Ehdr, base) # type: ignore[type-var]
return ei_class, Elfhdr
def get_phdrs(pointer):
def get_phdrs(pointer: int):
"""
Returns a tuple containing (phnum, phentsize, gdb.Value),
where the gdb.Value object is an ELF Program Header with
the architecture-appropriate structure type.
"""
ei_class, Elfhdr = get_ehdr(pointer)
_, Elfhdr = get_ehdr(pointer)
if Elfhdr is None:
return (0, 0, None)
@ -346,11 +362,11 @@ def get_phdrs(pointer):
phoff = Elfhdr.e_phoff
phentsize = Elfhdr.e_phentsize
x = (phnum, phentsize, read(Phdr, Elfhdr.address + phoff))
x = (phnum, phentsize, read(Phdr, Elfhdr.address + phoff)) # type: ignore[type-var]
return x
def iter_phdrs(ehdr):
def iter_phdrs(ehdr: Ehdr):
if not ehdr:
return
@ -392,7 +408,7 @@ def map(pointer: int, objfile: str = "") -> Tuple[pwndbg.lib.memory.Page, ...]:
return map_inner(ei_class, ehdr, objfile)
def map_inner(ei_class, ehdr, objfile: str) -> Tuple[pwndbg.lib.memory.Page, ...]:
def map_inner(ei_class: int, ehdr: Ehdr, objfile: str) -> Tuple[pwndbg.lib.memory.Page, ...]:
if not ehdr:
return ()

@ -11,15 +11,19 @@ from functools import partial
from functools import wraps
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Set
from typing import TypeVar
import gdb
from typing_extensions import ParamSpec
from pwndbg.gdblib.config import config
debug = config.add_param("debug-events", False, "display internal event debugging info")
P = ParamSpec("P")
T = TypeVar("T")
@ -73,7 +77,7 @@ gdb.events.start = StartEvent()
# In order to support reloading, we must be able to re-fire
# all 'objfile' and 'stop' events.
registered: dict[Any, List[Callable[..., Any]]] = {
registered: Dict[Any, List[Callable[..., Any]]] = {
gdb.events.exited: [],
gdb.events.cont: [],
gdb.events.new_objfile: [],
@ -90,15 +94,15 @@ registered: dict[Any, List[Callable[..., Any]]] = {
# objects are loaded. This greatly slows down the debugging session.
# In order to combat this, we keep track of which objfiles have been loaded
# this session, and only emit objfile events for each *new* file.
objfile_cache: dict[str, set[str]] = {}
objfile_cache: Dict[str, Set[str]] = {}
def connect(func: Callable[..., T], event_handler: Any, name: str = "") -> Callable[..., T]:
def connect(func: Callable[P, T], event_handler: Any, name: str = "") -> Callable[P, T]:
if debug:
print("Connecting", func.__name__, event_handler)
@wraps(func)
def caller(*a: Any) -> None:
def caller(*a: P.args, **kw: P.kwargs) -> None:
if debug:
sys.stdout.write(f"{name!r} {func.__module__}.{func.__name__} {a!r}\n")
@ -115,6 +119,8 @@ def connect(func: Callable[..., T], event_handler: Any, name: str = "") -> Calla
objfile_cache[path] = dispatched
try:
# Don't pass the event along to the decorated function.
# This is because there are functions with multiple event decorators
func()
except Exception as e:
import pwndbg.exception
@ -127,44 +133,42 @@ def connect(func: Callable[..., T], event_handler: Any, name: str = "") -> Calla
return func
def exit(func: Callable[..., T]) -> Callable[..., T]:
def exit(func: Callable[[], T]) -> Callable[[], T]:
return connect(func, gdb.events.exited, "exit")
def cont(func: Callable[..., T]) -> Callable[..., T]:
def cont(func: Callable[[], T]) -> Callable[[], T]:
return connect(func, gdb.events.cont, "cont")
def new_objfile(func: Callable[..., T]) -> Callable[..., T]:
def new_objfile(func: Callable[[], T]) -> Callable[[], T]:
return connect(func, gdb.events.new_objfile, "obj")
def stop(func: Callable[..., T]) -> Callable[..., T]:
def stop(func: Callable[[], T]) -> Callable[[], T]:
return connect(func, gdb.events.stop, "stop")
def start(func: Callable[..., T]) -> Callable[..., T]:
def start(func: Callable[[], T]) -> Callable[[], T]:
return connect(func, gdb.events.start, "start")
def thread(func: Callable[..., T]) -> Callable[..., T]:
def thread(func: Callable[[], T]) -> Callable[[], T]:
return connect(func, gdb.events.new_thread, "thread")
before_prompt = partial(connect, event_handler=gdb.events.before_prompt, name="before_prompt")
def reg_changed(func: Callable[..., T]) -> Callable[..., T]:
def reg_changed(func: Callable[[], T]) -> Callable[[], T]:
return connect(func, gdb.events.register_changed, "reg_changed")
def mem_changed(func: Callable[..., T]) -> Callable[..., T]:
def mem_changed(func: Callable[[], T]) -> Callable[[], T]:
return connect(func, gdb.events.memory_changed, "mem_changed")
# TODO/FIXME: type ofile with gdb.NewObjFileEvent | None = None
# after https://github.com/python/typeshed/issues/11208 is resolved
def log_objfiles(ofile=None) -> None:
def log_objfiles(ofile: gdb.NewObjFileEvent | None = None) -> None:
if not (debug and ofile):
return None

@ -52,9 +52,13 @@ from __future__ import annotations
from typing import Dict
import gdb
from sortedcontainers import SortedDict # type: ignore # noqa: PGH003
from sortedcontainers import SortedDict
import pwndbg.gdblib
import pwndbg.gdblib.symbol
import pwndbg.heap
import pwndbg.heap.ptmalloc
import pwndbg.lib.cache
from pwndbg.color import message
LIBC_NAME = "libc.so.6"
@ -63,7 +67,7 @@ CALLOC_NAME = "calloc"
REALLOC_NAME = "realloc"
FREE_NAME = "free"
last_issue = None
last_issue: str | None = None
# Useful to track possbile collision errors.
PRINT_DEBUG = False
@ -84,13 +88,6 @@ def is_enabled() -> bool:
return any(installed)
def _basename(val) -> None:
"""
Returns the last component of a path.
"""
val.split("/")[-1]
def resolve_address(name: str) -> int | None:
"""
Checks whether a given symbol is available and part of libc, and returns its
@ -126,7 +123,7 @@ def resolve_address(name: str) -> int | None:
class FreeChunkWatchpoint(gdb.Breakpoint):
def __init__(self, chunk, tracker) -> None:
def __init__(self, chunk: Chunk, tracker: Tracker) -> None:
self.chunk = chunk
self.tracker = tracker
@ -162,7 +159,7 @@ class FreeChunkWatchpoint(gdb.Breakpoint):
class AllocChunkWatchpoint(gdb.Breakpoint):
def __init__(self, chunk) -> None:
def __init__(self, chunk: Chunk) -> None:
self.chunk = chunk
super().__init__(f"*(char[{chunk.size}]*){chunk.address:#x}", internal=True)
@ -171,7 +168,7 @@ class AllocChunkWatchpoint(gdb.Breakpoint):
class Chunk:
def __init__(self, address, size, requested_size, flags) -> None:
def __init__(self, address: int, size: int, requested_size: int, flags: int) -> None:
self.address = address
self.size = size
self.requested_size = requested_size
@ -180,8 +177,8 @@ class Chunk:
class Tracker:
def __init__(self) -> None:
self.free_chunks = SortedDict()
self.alloc_chunks = SortedDict()
self.free_chunks: SortedDict[int, Chunk] = SortedDict()
self.alloc_chunks: SortedDict[int, Chunk] = SortedDict()
self.free_watchpoints: Dict[int, FreeChunkWatchpoint] = {}
self.memory_management_calls: Dict[int, bool] = {}
@ -192,7 +189,7 @@ class Tracker:
else:
return self.memory_management_calls[thread]
def enter_memory_management(self, name) -> None:
def enter_memory_management(self, name: str) -> None:
thread = gdb.selected_thread().global_num
# We don't support re-entry.
@ -212,7 +209,7 @@ class Tracker:
self.memory_management_calls[thread] = False
def malloc(self, chunk) -> None:
def malloc(self, chunk: Chunk) -> None:
# malloc()s may arbitrarily change the structure of freed blocks, to the
# point our chunk maps may become invalid, so, we update them here if
# anything looks wrong.
@ -237,6 +234,7 @@ class Tracker:
lo_heap = pwndbg.heap.ptmalloc.Heap(lo_addr)
hi_heap = pwndbg.heap.ptmalloc.Heap(hi_addr - 1)
assert lo_heap.arena is not None and hi_heap.arena is not None
# TODO: Can this ever actually fail in real world use?
#
@ -269,6 +267,7 @@ class Tracker:
# the heap in the range of affected chunks, and add the ones that
# are free.
allocator = pwndbg.heap.current
assert isinstance(allocator, pwndbg.heap.ptmalloc.GlibcMemoryAllocator)
bins_list = [
allocator.fastbins(lo_heap.arena.address),
allocator.smallbins(lo_heap.arena.address),
@ -317,7 +316,7 @@ class Tracker:
self.alloc_chunks[chunk.address] = chunk
def free(self, address) -> bool:
def free(self, address: int) -> bool:
if address not in self.alloc_chunks:
return False
chunk = self.alloc_chunks[address]

@ -5,14 +5,13 @@ import math
import re
from abc import ABC
from abc import abstractmethod
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import TypeVar
import gdb
from typing_extensions import ParamSpec
import pwndbg.color.message as M
import pwndbg.gdblib.memory
@ -22,7 +21,11 @@ import pwndbg.lib.cache
import pwndbg.lib.kernel.kconfig
import pwndbg.lib.kernel.structs
_kconfig: pwndbg.lib.kernel.kconfig.Kconfig = None
_kconfig: pwndbg.lib.kernel.kconfig.Kconfig | None = None
P = ParamSpec("P")
D = TypeVar("D")
T = TypeVar("T")
def BIT(shift: int):
@ -40,10 +43,10 @@ def has_debug_syms() -> bool:
# NOTE: This implies requires_debug_syms(), as it is needed for kconfig() to return non-None
def requires_kconfig(default: Any = None) -> Callable[..., Any]:
def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
def requires_kconfig(default: D = None) -> Callable[[Callable[P, T]], Callable[P, T | D]]:
def decorator(f: Callable[P, T]) -> Callable[P, T | D]:
@functools.wraps(f)
def func(*args, **kwargs):
def func(*args: P.args, **kwargs: P.kwargs) -> T | D:
if kconfig():
return f(*args, **kwargs)
@ -59,10 +62,10 @@ def requires_kconfig(default: Any = None) -> Callable[..., Any]:
return decorator
def requires_debug_syms(default: Any = None) -> Callable[..., Any]:
def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
def requires_debug_syms(default: D = None) -> Callable[[Callable[P, T]], Callable[P, T | D]]:
def decorator(f: Callable[P, T]) -> Callable[P, T | D]:
@functools.wraps(f)
def func(*args, **kwargs):
def func(*args: P.args, **kwargs: P.kwargs) -> T | D:
if has_debug_syms():
return f(*args, **kwargs)
@ -85,7 +88,7 @@ def nproc() -> int:
@requires_debug_syms(default={})
def load_kconfig() -> pwndbg.lib.kernel.kconfig.Kconfig:
def load_kconfig() -> pwndbg.lib.kernel.kconfig.Kconfig | None:
config_start = pwndbg.gdblib.symbol.address("kernel_config_data")
config_end = pwndbg.gdblib.symbol.address("kernel_config_data_end")
if config_start is None or config_end is None:
@ -97,7 +100,7 @@ def load_kconfig() -> pwndbg.lib.kernel.kconfig.Kconfig:
@pwndbg.lib.cache.cache_until("start")
def kconfig() -> pwndbg.lib.kernel.kconfig.Kconfig:
def kconfig() -> pwndbg.lib.kernel.kconfig.Kconfig | None:
global _kconfig
if _kconfig is None:
_kconfig = load_kconfig()
@ -122,7 +125,7 @@ def kversion() -> str:
@requires_debug_syms()
@pwndbg.lib.cache.cache_until("start")
def krelease() -> tuple[int, ...]:
def krelease() -> Tuple[int, ...]:
match = re.search(r"Linux version (\d+)\.(\d+)(?:\.(\d+))?", kversion())
if match:
return tuple(int(x) for x in match.groups() if x)

@ -1,6 +1,8 @@
from __future__ import annotations
from typing import Generator
from typing import List
from typing import Set
import gdb
@ -65,7 +67,7 @@ _flags = {
}
def get_flags_list(flags: int) -> list[str]:
def get_flags_list(flags: int) -> List[str]:
return [flag_name for flag_name, mask in _flags.items() if flags & mask]
@ -139,7 +141,7 @@ class SlabCache:
return int(self._slab_cache["align"])
@property
def flags(self) -> list[str]:
def flags(self) -> List[str]:
return get_flags_list(int(self._slab_cache["flags"]))
@property
@ -210,7 +212,7 @@ class CpuCache:
return Slab(_slab.dereference(), self, self.slab_cache)
@property
def partial_slabs(self) -> list[Slab]:
def partial_slabs(self) -> List[Slab]:
partial_slabs = []
cur_slab = self._cpu_cache["partial"]
while cur_slab:
@ -231,7 +233,7 @@ class NodeCache:
return int(self._node_cache)
@property
def partial_slabs(self) -> list[Slab]:
def partial_slabs(self) -> List[Slab]:
ret = []
for slab in for_each_entry(self._node_cache["partial"], "struct slab", "slab_list"):
ret.append(Slab(slab.dereference(), None, self.slab_cache, is_partial=True))
@ -309,14 +311,14 @@ class Slab:
)
@property
def freelists(self) -> list[Freelist]:
def freelists(self) -> List[Freelist]:
freelists = [self.freelist]
if not self.is_partial:
freelists.append(self.cpu_cache.freelist)
return freelists
@property
def free_objects(self) -> set[int]:
def free_objects(self) -> Set[int]:
return {obj for freelist in self.freelists for obj in freelist}

@ -6,6 +6,7 @@ from __future__ import annotations
import re
from typing import Dict
from typing import Set
from typing import Union
import gdb
@ -14,6 +15,7 @@ import pwndbg.gdblib.arch
import pwndbg.gdblib.events
import pwndbg.gdblib.qemu
import pwndbg.gdblib.typeinfo
import pwndbg.gdblib.vmmap
import pwndbg.lib.cache
import pwndbg.lib.memory
from pwndbg.lib.memory import PAGE_MASK
@ -322,8 +324,8 @@ def find_upper_boundary(addr: int, max_pages: int = 1024) -> int:
"""
addr = pwndbg.lib.memory.page_align(int(addr))
try:
for i in range(max_pages):
pwndbg.gdblib.memory.read(addr, 1)
for _ in range(max_pages):
read(addr, 1)
# import sys
# sys.stdout.write(hex(addr) + '\n')
addr += PAGE_SIZE
@ -349,7 +351,7 @@ def find_lower_boundary(addr: int, max_pages: int = 1024) -> int:
addr = pwndbg.lib.memory.page_align(int(addr))
try:
for _ in range(max_pages):
pwndbg.gdblib.memory.read(addr, 1)
read(addr, 1)
addr -= PAGE_SIZE
# Sanity check (see comment in find_upper_boundary)
@ -370,8 +372,8 @@ def update_min_addr() -> None:
def fetch_struct_as_dictionary(
struct_name: str,
struct_address: int,
include_only_fields: set[str] = set(),
exclude_fields: set[str] = set(),
include_only_fields: Set[str] = set(),
exclude_fields: Set[str] = set(),
) -> GdbDict:
struct_type = gdb.lookup_type("struct " + struct_name)
fetched_struct = poi(struct_type, struct_address)
@ -381,8 +383,8 @@ def fetch_struct_as_dictionary(
def pack_struct_into_dictionary(
fetched_struct: gdb.Value,
include_only_fields: set[str] = set(),
exclude_fields: set[str] = set(),
include_only_fields: Set[str] = set(),
exclude_fields: Set[str] = set(),
) -> GdbDict:
struct_as_dictionary = {}

@ -1,5 +1,7 @@
from __future__ import annotations
from typing import List
import gdb
from capstone import * # noqa: F403
@ -72,7 +74,7 @@ opcode_separator_bytes = pwndbg.gdblib.config.add_param(
def nearpc(
pc: int = None, lines: int = None, emulate=False, repeat=False, use_cache=False, linear=False
) -> list[str]:
) -> List[str]:
"""
Disassemble near a specified address.
@ -86,7 +88,7 @@ def nearpc(
# that would require a larger refactor
pc = nearpc.next_pc
result: list[str] = []
result: List[str] = []
if pc is not None:
pc = gdb.Value(pc).cast(pwndbg.gdblib.typeinfo.pvoid)
@ -140,7 +142,7 @@ def nearpc(
# Gather all addresses and symbols for each instruction
# Ex: <main+43>
symbols = [pwndbg.gdblib.symbol.get(i.address) for i in instructions]
addresses: list[str] = ["%#x" % i.address for i in instructions]
addresses: List[str] = ["%#x" % i.address for i in instructions]
nearpc.next_pc = instructions[-1].address + instructions[-1].size if instructions else 0

@ -7,6 +7,8 @@ import re
import subprocess
from enum import Enum
from typing import Any
from typing import Dict
from typing import Tuple
import gdb
from tabulate import tabulate
@ -182,13 +184,13 @@ class Lambda:
self.deref_count -= 1
return self
def evaluate(self, context: dict[Any, Any]) -> int | Lambda:
def evaluate(self, context: Dict[Any, Any]) -> int | Lambda:
if self.deref_count > 0 or (self.obj and self.obj not in context):
raise ValueError(f"Can't eval {self}")
return context[self.obj] + self.immi
@staticmethod
def parse(argument: str, predefined: dict[Any, Any] = {}) -> int | Lambda:
def parse(argument: str, predefined: Dict[Any, Any] = {}) -> int | Lambda:
if not argument or argument == "!":
return 0
try:
@ -215,7 +217,7 @@ class Lambda:
return obj
@staticmethod
def mem_obj(arg: str) -> tuple[str, int]:
def mem_obj(arg: str) -> Tuple[str, int]:
tokens = re.split(r"[+\-]", arg)
if len(tokens) == 1:
return tokens[0], 0
@ -283,7 +285,7 @@ def run_onegadget() -> str:
return output
def parse_expression(expr: str) -> tuple[int | None, str, str | None]:
def parse_expression(expr: str) -> Tuple[int | None, str, str | None]:
"""
Parse expression, return the result, colorized string and error message
"""
@ -315,7 +317,7 @@ def parse_expression(expr: str) -> tuple[int | None, str, str | None]:
return None, f"{cast}{lambda_expr.color_str}", str(e)
def check_stack_argv(expr: str) -> tuple[CheckSatResult, str]:
def check_stack_argv(expr: str) -> Tuple[CheckSatResult, str]:
"""
Check argv that's on the stack, return the result and the message
"""
@ -367,7 +369,7 @@ def check_stack_argv(expr: str) -> tuple[CheckSatResult, str]:
return SAT, output_msg
def check_non_stack_argv(expr: str) -> tuple[CheckSatResult, str]:
def check_non_stack_argv(expr: str) -> Tuple[CheckSatResult, str]:
"""
Check argv that's not on the stack, return the result and the message
"""
@ -402,7 +404,7 @@ def check_non_stack_argv(expr: str) -> tuple[CheckSatResult, str]:
n += 1
def check_argv(expr: str) -> tuple[CheckSatResult, str]:
def check_argv(expr: str) -> Tuple[CheckSatResult, str]:
"""
Check argv, return the result and the message
"""
@ -411,7 +413,7 @@ def check_argv(expr: str) -> tuple[CheckSatResult, str]:
return check_non_stack_argv(expr)
def check_envp(expr: str) -> tuple[bool, str]:
def check_envp(expr: str) -> Tuple[bool, str]:
"""
Check envp, return the result and the message
"""
@ -446,7 +448,7 @@ def check_envp(expr: str) -> tuple[bool, str]:
n += 1
def check_constraint(constraint: str) -> tuple[CheckSatResult, str]:
def check_constraint(constraint: str) -> Tuple[CheckSatResult, str]:
"""
Parse constraint, return the result and the message
"""
@ -585,7 +587,7 @@ def check_gadget(
def find_gadgets(
show_unsat: bool = False, no_unknown: bool = False, verbose: bool = False
) -> dict[CheckSatResult, int]:
) -> Dict[CheckSatResult, int]:
"""
Find gadgets by parsing the output of onegadget, return there's any valid gadget
"""

@ -9,19 +9,22 @@ from __future__ import annotations
import functools
import sys
from types import ModuleType
from typing import Any
from typing import Callable
from typing import List
from typing import Optional
from typing import Tuple
from typing import TypeVar
import gdb
from elftools.elf.relocation import Relocation
from typing_extensions import ParamSpec
import pwndbg.gdblib.info
import pwndbg.gdblib.qemu
import pwndbg.lib.cache
import pwndbg.lib.memory
P = ParamSpec("P")
T = TypeVar("T")
pid: int
@ -32,13 +35,17 @@ thread_is_stopped: bool
stopped_with_signal: bool
exe: str | None
binary_base_addr: int
binary_vmmap: tuple[pwndbg.lib.memory.Page, ...]
# dump_elf_data_section: tuple[int, int, bytes] | None
# dump_relocations_by_section_name: tuple[Relocation, ...] | None
binary_vmmap: Tuple[pwndbg.lib.memory.Page, ...]
# dump_elf_data_section: Tuple[int, int, bytes] | None
# dump_relocations_by_section_name: Tuple[Relocation, ...] | None
# get_section_address_by_name: Callable[[str], int]
OnlyWhenRunning: Callable[[Callable[..., T]], Callable[..., T]]
OnlyWhenQemuKernel: Callable[[Callable[..., T]], Callable[..., T]]
OnlyWithArch: Callable[[List[str]], Callable[[Callable[..., T]], Callable[..., Optional[T]]]]
def OnlyWhenRunning(func: Callable[P, T]) -> Callable[P, T | None]: ...
def OnlyWhenQemuKernel(func: Callable[P, T]) -> Callable[P, T]: ...
def OnlyWithArch(
arch_names: List[str],
) -> Callable[[Callable[..., T]], Callable[..., Optional[T]]]: ...
class module(ModuleType):
@ -122,21 +129,27 @@ class module(ModuleType):
@property
@pwndbg.lib.cache.cache_until("start", "stop")
def binary_vmmap(self) -> tuple[pwndbg.lib.memory.Page, ...]:
def binary_vmmap(self) -> Tuple[pwndbg.lib.memory.Page, ...]:
import pwndbg.gdblib.vmmap
return tuple(p for p in pwndbg.gdblib.vmmap.get() if p.objfile == self.exe)
@pwndbg.lib.cache.cache_until("start", "objfile")
def dump_elf_data_section(self) -> tuple[int, int, bytes] | None:
def dump_elf_data_section(self) -> Tuple[int, int, bytes] | None:
"""
Dump .data section of current process's ELF file
"""
import pwndbg.gdblib.elf
return pwndbg.gdblib.elf.dump_section_by_name(self.exe, ".data", try_local_path=True)
@pwndbg.lib.cache.cache_until("start", "objfile")
def dump_relocations_by_section_name(self, section_name: str) -> tuple[Relocation, ...] | None:
def dump_relocations_by_section_name(self, section_name: str) -> Tuple[Relocation, ...] | None:
"""
Dump relocations of a section by section name of current process's ELF file
"""
import pwndbg.gdblib.elf
return pwndbg.gdblib.elf.dump_relocations_by_section_name(
self.exe, section_name, try_local_path=True
)
@ -152,18 +165,18 @@ class module(ModuleType):
return int(line.split()[0], 16)
return 0
def OnlyWhenRunning(self, func: Callable[..., T]) -> Callable[..., T]:
def OnlyWhenRunning(self, func: Callable[P, T]) -> Callable[P, T | None]:
@functools.wraps(func)
def wrapper(*a: Any, **kw: Any) -> T:
def wrapper(*a: P.args, **kw: P.kwargs) -> T | None:
if self.alive:
return func(*a, **kw)
return None
return wrapper
def OnlyWhenQemuKernel(self, func: Callable[..., T]) -> Callable[..., T]:
def OnlyWhenQemuKernel(self, func: Callable[P, T]) -> Callable[P, T | None]:
@functools.wraps(func)
def wrapper(*a: Any, **kw: Any) -> T:
def wrapper(*a: P.args, **kw: P.kwargs) -> T | None:
if pwndbg.gdblib.qemu.is_qemu_kernel():
return func(*a, **kw)
return None
@ -172,7 +185,7 @@ class module(ModuleType):
def OnlyWithArch(
self, arch_names: List[str]
) -> Callable[[Callable[..., T]], Callable[..., Optional[T]]]:
) -> Callable[[Callable[P, T]], Callable[P, Optional[T]]]:
"""Decorates function to work only with the specified archictectures."""
for arch in arch_names:
if arch not in pwndbg.gdblib.arch_mod.ARCHS:
@ -180,9 +193,9 @@ class module(ModuleType):
f"OnlyWithArch used with unsupported arch={arch}. Must be one of {', '.join(arch_names)}"
)
def decorator(function: Callable[..., T]) -> Callable[..., Optional[T]]:
def decorator(function: Callable[P, T]) -> Callable[P, Optional[T]]:
@functools.wraps(function)
def _OnlyWithArch(*a: Any, **kw: Any) -> Optional[T]:
def _OnlyWithArch(*a: P.args, **kw: P.kwargs) -> Optional[T]:
if pwndbg.gdblib.arch.name in arch_names:
return function(*a, **kw)
else:

@ -10,10 +10,12 @@ import re
import sys
from types import ModuleType
from typing import Any
from typing import Callable
from typing import Dict
from typing import Generator
from typing import List
from typing import Tuple
from typing import cast
import gdb
@ -22,6 +24,7 @@ import pwndbg.gdblib.events
import pwndbg.gdblib.proc
import pwndbg.gdblib.qemu
import pwndbg.gdblib.remote
import pwndbg.gdblib.typeinfo
import pwndbg.lib.cache
from pwndbg.lib.regs import BitFlags
from pwndbg.lib.regs import RegisterSet
@ -60,10 +63,29 @@ PTRACE_ARCH_PRCTL = 30
ARCH_GET_FS = 0x1003
ARCH_GET_GS = 0x1004
gpr: Tuple[str, ...]
common: List[str]
frame: str | None
retaddr: Tuple[str, ...]
flags: Dict[str, BitFlags]
extra_flags: Dict[str, BitFlags]
stack: str
retval: str | None
all: List[str]
changed: List[str]
fsbase: int
gsbase: int
current: RegisterSet
fix: Callable[[str], str]
items: Callable[[], Generator[Tuple[str, Any], None, None]]
previous: Dict[str, int]
last: Dict[str, int]
pc: int | None
class module(ModuleType):
previous: dict[str, int] = {}
last: dict[str, int] = {}
previous: Dict[str, int] = {}
last: Dict[str, int] = {}
@pwndbg.lib.cache.cache_until("stop", "prompt")
def __getattr__(self, attr: str) -> int | None:
@ -72,11 +94,15 @@ class module(ModuleType):
value = gdb_get_register(attr)
if value is None and attr.lower() == "xpsr":
value = gdb_get_register("xPSR")
if value is None:
return None
size = pwndbg.gdblib.typeinfo.unsigned.get(
value.type.sizeof, pwndbg.gdblib.typeinfo.ulong
)
value = value.cast(size)
if attr == "pc" and pwndbg.gdblib.arch.current == "i8086":
if attr == "pc" and pwndbg.gdblib.arch.name == "i8086":
if self.cs is None:
return None
value += self.cs * 16
return int(value) & pwndbg.gdblib.arch.ptrmask
except (ValueError, gdb.error):
@ -91,69 +117,69 @@ class module(ModuleType):
gdb.execute(f"set ${attr} = {val}")
@pwndbg.lib.cache.cache_until("stop", "prompt")
def __getitem__(self, item: str) -> int | None:
def __getitem__(self, item: Any) -> int | None:
if not isinstance(item, str):
print("Unknown register type: %r" % (item))
return None
# e.g. if we're looking for register "$rax", turn it into "rax"
item = item.lstrip("$")
item = getattr(self, item.lower())
item = getattr(self, item.lower(), None)
if item is not None:
item &= pwndbg.gdblib.arch.ptrmask
return item
def __contains__(self, reg) -> bool:
regs = set(reg_sets[pwndbg.gdblib.arch.current]) | {"pc", "sp"}
def __contains__(self, reg: str) -> bool:
regs = set(reg_sets[pwndbg.gdblib.arch.name]) | {"pc", "sp"}
return reg in regs
def __iter__(self):
regs = set(reg_sets[pwndbg.gdblib.arch.current]) | {"pc", "sp"}
def __iter__(self) -> Generator[str, None, None]:
regs = set(reg_sets[pwndbg.gdblib.arch.name]) | {"pc", "sp"}
yield from regs
@property
def current(self) -> RegisterSet:
return reg_sets[pwndbg.gdblib.arch.current]
return reg_sets[pwndbg.gdblib.arch.name]
# TODO: All these should be able to do self.current
@property
def gpr(self) -> Tuple[str, ...]:
return reg_sets[pwndbg.gdblib.arch.current].gpr
return reg_sets[pwndbg.gdblib.arch.name].gpr
@property
def common(self) -> List[str]:
return reg_sets[pwndbg.gdblib.arch.current].common
return reg_sets[pwndbg.gdblib.arch.name].common
@property
def frame(self) -> str:
return reg_sets[pwndbg.gdblib.arch.current].frame
def frame(self) -> str | None:
return reg_sets[pwndbg.gdblib.arch.name].frame
@property
def retaddr(self) -> Tuple[str, ...]:
return reg_sets[pwndbg.gdblib.arch.current].retaddr
return reg_sets[pwndbg.gdblib.arch.name].retaddr
@property
def flags(self) -> Dict[str, BitFlags]:
return reg_sets[pwndbg.gdblib.arch.current].flags
return reg_sets[pwndbg.gdblib.arch.name].flags
@property
def extra_flags(self) -> Dict[str, BitFlags]:
return reg_sets[pwndbg.gdblib.arch.current].extra_flags
return reg_sets[pwndbg.gdblib.arch.name].extra_flags
@property
def stack(self) -> str:
return reg_sets[pwndbg.gdblib.arch.current].stack
return reg_sets[pwndbg.gdblib.arch.name].stack
@property
def retval(self) -> str:
return reg_sets[pwndbg.gdblib.arch.current].retval
def retval(self) -> str | None:
return reg_sets[pwndbg.gdblib.arch.name].retval
@property
def all(self):
regs = reg_sets[pwndbg.gdblib.arch.current]
retval: list[str] = []
def all(self) -> List[str]:
regs = reg_sets[pwndbg.gdblib.arch.name]
retval: List[str] = []
for regset in (
regs.pc,
regs.stack,
@ -187,7 +213,7 @@ class module(ModuleType):
@property
def changed(self) -> List[str]:
delta = []
delta: List[str] = []
for reg, value in self.previous.items():
if self[reg] != value:
delta.append(reg)
@ -218,19 +244,20 @@ class module(ModuleType):
return self._fs_gs_helper("gs_base", ARCH_GET_GS)
@pwndbg.lib.cache.cache_until("stop")
def _fs_gs_helper(self, regname: str, which) -> int:
def _fs_gs_helper(self, regname: str, which: int) -> int:
"""Supports fetching based on segmented addressing, a la fs:[0x30].
Requires ptrace'ing the child directory if i386."""
if pwndbg.gdblib.arch.current == "x86-64":
return int(gdb_get_register(regname))
if pwndbg.gdblib.arch.name == "x86-64":
reg_value = gdb_get_register(regname)
return int(reg_value) if reg_value is not None else 0
# We can't really do anything if the process is remote.
if pwndbg.gdblib.remote.is_remote():
return 0
# Use the lightweight process ID
pid, lwpid, tid = gdb.selected_thread().ptid
_, lwpid, _ = gdb.selected_thread().ptid
# Get the register
ppvoid = ctypes.POINTER(ctypes.c_void_p)
@ -257,7 +284,7 @@ sys.modules[__name__] = module(__name__, "")
@pwndbg.gdblib.events.cont
@pwndbg.gdblib.events.stop
def update_last() -> None:
M: module = sys.modules[__name__]
M: module = cast(module, sys.modules[__name__])
M.previous = M.last
M.last = {k: M[k] for k in M.common}
if pwndbg.gdblib.config.show_retaddr_reg:

@ -7,6 +7,7 @@ binaries do things to remap the stack (e.g. pwnies' postit).
from __future__ import annotations
from typing import Dict
from typing import List
import gdb
@ -42,7 +43,7 @@ def find_upper_stack_boundary(stack_ptr: int, max_pages: int = 1024) -> int:
@pwndbg.lib.cache.cache_until("stop")
def get() -> dict[int, pwndbg.lib.memory.Page]:
def get() -> Dict[int, pwndbg.lib.memory.Page]:
"""
For each running thread, return the known address range for its stack
Returns a dict which should never be modified (since its cached)
@ -79,8 +80,8 @@ def is_executable() -> bool:
return not nx
def _fetch_via_vmmap() -> dict[int, pwndbg.lib.memory.Page]:
stacks: dict[int, pwndbg.lib.memory.Page] = {}
def _fetch_via_vmmap() -> Dict[int, pwndbg.lib.memory.Page]:
stacks: Dict[int, pwndbg.lib.memory.Page] = {}
pages = pwndbg.gdblib.vmmap.get()
@ -114,7 +115,7 @@ def _fetch_via_vmmap() -> dict[int, pwndbg.lib.memory.Page]:
return stacks
def _fetch_via_exploration() -> dict[int, pwndbg.lib.memory.Page]:
def _fetch_via_exploration() -> Dict[int, pwndbg.lib.memory.Page]:
"""
TODO/FIXME: This exploration is not great since it now hits on each stop
(based on how this function is used). Ideally, explored stacks should be
@ -128,7 +129,7 @@ def _fetch_via_exploration() -> dict[int, pwndbg.lib.memory.Page]:
An alternative to this is dumping this functionality completely and this
will be decided hopefully after a next release.
"""
stacks: dict[int, pwndbg.lib.memory.Page] = {}
stacks: Dict[int, pwndbg.lib.memory.Page] = {}
curr_thread = gdb.selected_thread()
for thread in gdb.selected_inferior().threads():

@ -63,11 +63,11 @@ def _get_debug_file_directory() -> str:
return ""
def _set_debug_file_directory(d) -> None:
def _set_debug_file_directory(d: str) -> None:
gdb.execute(f"set debug-file-directory {d}", to_string=True, from_tty=False)
def _add_debug_file_directory(d) -> None:
def _add_debug_file_directory(d: str) -> None:
current = _get_debug_file_directory()
if current:
_set_debug_file_directory(f"{current}:{d}")
@ -80,7 +80,7 @@ if "/usr/lib/debug" not in _get_debug_file_directory():
@pwndbg.lib.cache.cache_until("objfile")
def get(address: int, gdb_only=False) -> str:
def get(address: int | gdb.Value, gdb_only: bool = False) -> str:
"""
Retrieve the name for the symbol located at `address` - either from GDB or from IDA sync
Passing `gdb_only=True`

@ -6,10 +6,11 @@ via GCC.
from __future__ import annotations
import sys
from typing import Dict
from typing import Optional
import gdb
import pwndbg.lib.cache
import pwndbg.lib.gcc
import pwndbg.lib.tempfile
@ -27,13 +28,13 @@ uint8: gdb.Type
uint16: gdb.Type
uint32: gdb.Type
uint64: gdb.Type
unsigned: dict[int, gdb.Type]
unsigned: Dict[int, gdb.Type]
int8: gdb.Type
int16: gdb.Type
int32: gdb.Type
int64: gdb.Type
signed: dict[int, gdb.Type]
signed: Dict[int, gdb.Type]
pvoid: gdb.Type
ppvoid: gdb.Type
@ -130,9 +131,11 @@ def load(name: str) -> Optional[gdb.Type]:
return None
def read_gdbvalue(type_name: str, addr) -> gdb.Value:
def read_gdbvalue(type_name: str, addr: int) -> gdb.Value:
"""Read the memory contents at addr and interpret them as a GDB value with the given type"""
gdb_type = pwndbg.gdblib.typeinfo.load(type_name)
gdb_type = load(type_name)
if gdb_type is None:
raise ValueError(f"Type {type_name} not found")
return gdb.Value(addr).cast(gdb_type.pointer()).dereference()

@ -164,7 +164,7 @@ def get() -> Tuple[pwndbg.lib.memory.Page, ...]:
@pwndbg.lib.cache.cache_until("stop")
def find(address: int | None) -> pwndbg.lib.memory.Page | None:
def find(address: int | gdb.Value | None) -> pwndbg.lib.memory.Page | None:
if address is None:
return None

@ -7,14 +7,17 @@ from __future__ import annotations
import functools
import os
import re
from typing import Any
from typing import Callable
from typing import List
from typing import Tuple
from typing import TypeVar
from typing import Union
from typing import cast
import gdb
from elftools.elf.relocation import Relocation
from typing_extensions import ParamSpec
import pwndbg.gdblib.config
import pwndbg.gdblib.elf
import pwndbg.gdblib.file
import pwndbg.gdblib.info
@ -26,6 +29,7 @@ import pwndbg.lib.cache
import pwndbg.search
from pwndbg.color import message
P = ParamSpec("P")
T = TypeVar("T")
safe_lnk = pwndbg.gdblib.config.add_param(
@ -56,13 +60,16 @@ def set_glibc_version() -> None:
@pwndbg.gdblib.proc.OnlyWhenRunning
def get_version() -> tuple[int, ...] | None:
return glibc_version or _get_version() # type: ignore[return-value]
def get_version() -> Tuple[int, ...] | None:
return cast(Union[Tuple[int, ...], None], glibc_version) or _get_version()
@pwndbg.gdblib.proc.OnlyWhenRunning
@pwndbg.lib.cache.cache_until("start", "objfile")
def _get_version() -> tuple[int, ...] | None:
def _get_version() -> Tuple[int, ...] | None:
from pwndbg.heap.ptmalloc import GlibcMemoryAllocator
assert isinstance(pwndbg.heap.current, GlibcMemoryAllocator)
if pwndbg.heap.current.libc_has_debug_syms():
addr = pwndbg.gdblib.symbol.address("__libc_version")
if addr is not None:
@ -72,7 +79,7 @@ def _get_version() -> tuple[int, ...] | None:
if not libc_filename:
return None
result = pwndbg.gdblib.elf.dump_section_by_name(libc_filename, ".rodata", try_local_path=True)
if not result:
if result is None:
return None
_, _, data = result
banner_start = data.find(b"GNU C Library")
@ -89,7 +96,7 @@ def get_libc_filename_from_info_sharedlibrary() -> str | None:
"""
Get the filename of the libc by parsing the output of `info sharedlibrary`.
"""
possible_libc_path = []
possible_libc_path: List[str] = []
for path in pwndbg.gdblib.info.sharedlibrary_paths():
basename = os.path.basename(
path[7:] if path.startswith("target:") else path
@ -112,7 +119,7 @@ def get_libc_filename_from_info_sharedlibrary() -> str | None:
@pwndbg.gdblib.proc.OnlyWhenRunning
@pwndbg.lib.cache.cache_until("start", "objfile")
def dump_elf_data_section() -> tuple[int, int, bytes] | None:
def dump_elf_data_section() -> Tuple[int, int, bytes] | None:
"""
Dump .data section of libc ELF file
"""
@ -125,7 +132,7 @@ def dump_elf_data_section() -> tuple[int, int, bytes] | None:
@pwndbg.gdblib.proc.OnlyWhenRunning
@pwndbg.lib.cache.cache_until("start", "objfile")
def dump_relocations_by_section_name(section_name: str) -> tuple[Relocation, ...] | None:
def dump_relocations_by_section_name(section_name: str) -> Tuple[Relocation, ...] | None:
"""
Dump relocations of a section by section name of libc ELF file
"""
@ -156,9 +163,9 @@ def get_section_address_by_name(section_name: str) -> int:
return 0
def OnlyWhenGlibcLoaded(function: Callable[..., T]) -> Callable[..., T]:
def OnlyWhenGlibcLoaded(function: Callable[P, T]) -> Callable[P, T | None]:
@functools.wraps(function)
def _OnlyWhenGlibcLoaded(*a: Any, **kw: Any) -> T | None:
def _OnlyWhenGlibcLoaded(*a: P.args, **kw: P.kwargs) -> T | None:
if get_version() is not None:
return function(*a, **kw)
@ -169,10 +176,10 @@ def OnlyWhenGlibcLoaded(function: Callable[..., T]) -> Callable[..., T]:
@OnlyWhenGlibcLoaded
def check_safe_linking():
def check_safe_linking() -> bool:
"""
Safe-linking is a glibc 2.32 mitigation; see:
- https://lanph3re.blogspot.com/2020/08/blog-post.html
- https://research.checkpoint.com/2020/safe-linking-eliminating-a-20-year-old-malloc-exploit-primitive/
"""
return (get_version() >= (2, 32) or safe_lnk) and safe_lnk is not False
return (get_version() >= (2, 32) or safe_lnk) and safe_lnk is not False # type: ignore[return-value]

@ -6,11 +6,13 @@ from typing import Sequence
import gdb
import pwndbg.gdblib.config
import pwndbg.gdblib.events
import pwndbg.gdblib.proc
import pwndbg.gdblib.symbol
import pwndbg.heap.heap
from pwndbg.color import message
current = None
current: pwndbg.heap.heap.MemoryAllocator | None = None
def add_heap_param(

@ -1,10 +1,14 @@
from __future__ import annotations
from typing import Any
import gdb
class MemoryAllocator:
"""Heap abstraction layer."""
def breakpoint(self, event):
def breakpoint(self, event: str) -> gdb.Breakpoint:
"""Enables breakpoints on the specific event.
Arguments:
@ -15,7 +19,7 @@ class MemoryAllocator:
"""
raise NotImplementedError()
def summarize(self, address, **kwargs):
def summarize(self, address: int, **kwargs: Any) -> str:
"""Returns a textual summary of the specified address.
Arguments:
@ -26,7 +30,7 @@ class MemoryAllocator:
"""
raise NotImplementedError()
def containing(self, address):
def containing(self, address: int) -> int:
"""Returns the address of the allocation which contains 'address'.
Arguments:
@ -37,7 +41,7 @@ class MemoryAllocator:
"""
raise NotImplementedError()
def is_initialized(self):
def is_initialized(self) -> bool:
"""Returns whether the allocator is initialized or not.
Returns:
@ -45,7 +49,7 @@ class MemoryAllocator:
"""
raise NotImplementedError()
def libc_has_debug_syms(self):
def libc_has_debug_syms(self) -> bool:
"""Returns whether the libc has debug symbols or not.
Returns:

File diff suppressed because it is too large Load Diff

@ -2,6 +2,10 @@ from __future__ import annotations
import ctypes
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
from typing import Type
import gdb
@ -12,7 +16,7 @@ import pwndbg.glibc
from pwndbg.gdblib.ctypes import Structure
def request2size(req):
def request2size(req: int) -> int:
if req + SIZE_SZ + MALLOC_ALIGN_MASK < MINSIZE:
return MINSIZE
return (req + SIZE_SZ + MALLOC_ALIGN_MASK) & ~MALLOC_ALIGN_MASK
@ -29,7 +33,7 @@ GLIBC_VERSION = pwndbg.glibc.get_version()
# TODO: Move these heap constants and macros to elsewhere, because pwndbg/heap/ptmalloc.py also uses them, we are duplicating them here.
SIZE_SZ = pwndbg.gdblib.arch.ptrsize
MINSIZE = pwndbg.gdblib.arch.ptrsize * 4
if pwndbg.gdblib.arch.current == "i386" and GLIBC_VERSION >= (2, 26):
if pwndbg.gdblib.arch.name == "i386" and GLIBC_VERSION >= (2, 26):
# i386 will override it to 16 when GLIBC version >= 2.26
# See https://elixir.bootlin.com/glibc/glibc-2.26/source/sysdeps/i386/malloc-alignment.h#L22
MALLOC_ALIGN = 16
@ -88,9 +92,13 @@ C2GDB_MAPPING = {
# Use correct endian for the dictionary keys
if pwndbg.gdblib.arch.endian == "little":
C2GDB_MAPPING = {k.__ctype_le__: v for k, v in C2GDB_MAPPING.items()}
C2GDB_MAPPING: Dict[Type[ctypes.c_char], gdb.Type] = { # type: ignore[no-redef]
k.__ctype_le__: v for k, v in C2GDB_MAPPING.items()
}
else:
C2GDB_MAPPING = {k.__ctype_be__: v for k, v in C2GDB_MAPPING.items()}
C2GDB_MAPPING: Dict[Type[ctypes.c_char], gdb.Type] = { # type: ignore[no-redef]
k.__ctype_be__: v for k, v in C2GDB_MAPPING.items()
}
class FakeGDBField:
@ -100,14 +108,14 @@ class FakeGDBField:
def __init__(
self,
bitpos,
name: str,
bitpos: int,
name: str | None,
type,
parent_type,
enumval=None,
artificial=False,
is_base_class=False,
bitsize=0,
enumval: int | None = None,
artificial: bool = False,
is_base_class: bool = False,
bitsize: int = 0,
) -> None:
# Note: pwndbg only uses `name` currently
self.bitpos = bitpos
@ -123,7 +131,7 @@ class FakeGDBField:
class CStruct2GDB:
code = gdb.TYPE_CODE_STRUCT
_c_struct: pwndbg.gdblib.ctypes.Structure = None
_c_struct: Type[ctypes.Structure]
def __init__(self, address: int) -> None:
self.address = address
@ -146,7 +154,7 @@ class CStruct2GDB:
"""
return self.read_field(key)
def __eq__(self, other) -> bool:
def __eq__(self, other: Any) -> bool:
return self.address == int(other)
def __str__(self) -> str:
@ -185,11 +193,11 @@ class CStruct2GDB:
return cls
@classmethod
def fields(cls):
def fields(cls) -> List[FakeGDBField]:
"""
Return fields of the struct to make it compatible with the `gdb.Type` interface.
"""
fake_gdb_fields = []
fake_gdb_fields: List[FakeGDBField] = []
for f in cls._c_struct._fields_:
field_name = f[0]
field_type = f[1]
@ -203,7 +211,7 @@ class CStruct2GDB:
return fake_gdb_fields
@classmethod
def keys(cls) -> list[str]:
def keys(cls) -> List[str]:
"""
Return a list of the names of the fields in the struct to make it compatible with the `gdb.Type` interface.
"""
@ -222,7 +230,7 @@ class CStruct2GDB:
"""
return getattr(cls._c_struct, field).offset
def items(self) -> tuple[tuple[Any, Any], ...]:
def items(self) -> Tuple[Tuple[Any, Any], ...]:
"""
Returns a tuple of (field name, field value) pairs.
"""
@ -427,8 +435,6 @@ class MallocState(CStruct2GDB):
This class represents malloc_state struct with interface compatible with `gdb.Value`.
"""
_c_struct: pwndbg.gdblib.ctypes.Structure
if GLIBC_VERSION >= (2, 27):
_c_struct = c_malloc_state_2_27
elif GLIBC_VERSION >= (2, 23):
@ -472,7 +478,7 @@ class HeapInfo(CStruct2GDB):
This class represents heap_info struct with interface compatible with `gdb.Value`.
"""
_c_struct: pwndbg.gdblib.ctypes.Structure = c_heap_info
_c_struct = c_heap_info
sizeof = ctypes.sizeof(_c_struct)
@ -511,7 +517,7 @@ class MallocChunk(CStruct2GDB):
This class represents malloc_chunk struct with interface compatible with `gdb.Value`.
"""
_c_struct: pwndbg.gdblib.ctypes.Structure = c_malloc_chunk
_c_struct = c_malloc_chunk
sizeof = ctypes.sizeof(_c_struct)
@ -558,7 +564,6 @@ class TcachePerthreadStruct(CStruct2GDB):
This class represents tcache_perthread_struct with interface compatible with `gdb.Value`.
"""
_c_struct: pwndbg.gdblib.ctypes.Structure
if GLIBC_VERSION >= (2, 30):
_c_struct = c_tcache_perthread_struct_2_30
else:
@ -603,7 +608,6 @@ class TcacheEntry(CStruct2GDB):
This class represents the tcache_entry struct with interface compatible with `gdb.Value`.
"""
_c_struct: pwndbg.gdblib.ctypes.Structure
if GLIBC_VERSION >= (2, 29):
_c_struct = c_tcache_entry_2_29
else:
@ -917,7 +921,6 @@ class MallocPar(CStruct2GDB):
This class represents the malloc_par struct with interface compatible with `gdb.Value`.
"""
_c_struct: pwndbg.gdblib.ctypes.Structure
if GLIBC_VERSION >= (2, 35):
_c_struct = c_malloc_par_2_35
elif GLIBC_VERSION >= (2, 26):
@ -948,7 +951,7 @@ class MallocPar(CStruct2GDB):
# .tcache_unsorted_limit = 0 /* No limit. */
# #endif
# };
DEFAULT_MP_ = MallocPar._c_struct() # type: ignore[operator]
DEFAULT_MP_ = MallocPar._c_struct()
DEFAULT_MP_.top_pad = DEFAULT_TOP_PAD
DEFAULT_MP_.n_mmaps_max = DEFAULT_MMAP_MAX
DEFAULT_MP_.mmap_threshold = DEFAULT_MMAP_THRESHOLD

@ -14,8 +14,14 @@ import time
import traceback
import xmlrpc.client
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import TypeVar
import gdb
from typing_extensions import Concatenate
from typing_extensions import ParamSpec
import pwndbg.decorators
import pwndbg.gdblib.arch
@ -41,7 +47,7 @@ ida_timeout = pwndbg.gdblib.config.add_param(
xmlrpc.client.Marshaller.dispatch[int] = lambda _, v, w: w("<value><i8>%d</i8></value>" % v)
_ida = None
_ida: xmlrpc.client.ServerProxy | None = None
# to avoid printing the same exception multiple times, we store the last exception here
_ida_last_exception = None
@ -49,6 +55,9 @@ _ida_last_exception = None
# to avoid checking the connection multiple times with no delay, we store the last time we checked it
_ida_last_connection_check = 0
P = ParamSpec("P")
T = TypeVar("T")
@pwndbg.decorators.only_after_first_prompt()
@pwndbg.gdblib.config.trigger(ida_rpc_host, ida_rpc_port, ida_enabled, ida_timeout)
@ -122,40 +131,40 @@ def init_ida_rpc_client() -> None:
_ida_last_connection_check = now
class withIDA:
def __init__(self, fn) -> None:
self.fn = fn
functools.update_wrapper(self, fn)
def __call__(self, *args: Any, **kwargs: Any) -> Any | None:
def withIDA(func: Callable[P, T]) -> Callable[P, T | None]:
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None:
if _ida is None:
init_ida_rpc_client()
if _ida is not None:
return self.fn(*args, **kwargs)
return func(*args, **kwargs)
return None
return wrapper
def withHexrays(f):
def withHexrays(func: Callable[P, T]) -> Callable[P, T | None]:
@withIDA
@functools.wraps(f)
def wrapper(*a, **kw):
if _ida.init_hexrays_plugin():
return f(*a, **kw)
@functools.wraps(func)
def wrapper(*a: P.args, **kw: P.kwargs) -> T | None:
if _ida is not None and _ida.init_hexrays_plugin():
return func(*a, **kw)
return None
return wrapper
def takes_address(function):
def takes_address(function: Callable[Concatenate[int, P], T]) -> Callable[Concatenate[int, P], T]:
@functools.wraps(function)
def wrapper(address, *args, **kwargs):
def wrapper(address: int, *args: P.args, **kwargs: P.kwargs) -> T:
return function(l2r(address), *args, **kwargs)
return wrapper
def returns_address(function):
def returns_address(function: Callable[P, int]) -> Callable[P, int]:
@functools.wraps(function)
def wrapper(*args, **kwargs):
def wrapper(*args: P.args, **kwargs: P.kwargs) -> int:
return r2l(function(*args, **kwargs))
return wrapper
@ -173,7 +182,7 @@ def can_connect() -> bool:
return True
def l2r(addr):
def l2r(addr: int) -> int:
exe = pwndbg.gdblib.elf.exe()
if not exe:
raise Exception("Can't find EXE base")
@ -181,7 +190,7 @@ def l2r(addr):
return result
def r2l(addr):
def r2l(addr: int) -> int:
exe = pwndbg.gdblib.elf.exe()
if not exe:
raise Exception("Can't find EXE base")
@ -206,21 +215,21 @@ def base():
@withIDA
@takes_address
def Comment(addr):
def Comment(addr: int):
return _ida.get_cmt(addr, 0) or _ida.get_cmt(addr)
@withIDA
@takes_address
@pwndbg.lib.cache.cache_until("objfile")
def Name(addr):
def Name(addr: int):
return _ida.get_name(addr, 0x1) # GN_VISIBLE
@withIDA
@takes_address
@pwndbg.lib.cache.cache_until("objfile")
def GetFuncOffset(addr):
def GetFuncOffset(addr: int):
rv = _ida.get_func_off_str(addr)
return rv
@ -228,20 +237,20 @@ def GetFuncOffset(addr):
@withIDA
@takes_address
@pwndbg.lib.cache.cache_until("objfile")
def GetType(addr):
def GetType(addr: int):
rv = _ida.get_type(addr)
return rv
@withIDA
@returns_address
def here():
return _ida.here()
def here() -> int:
return _ida.here() # type: ignore[return-value]
@withIDA
@takes_address
def Jump(addr):
def Jump(addr: int):
# uses C++ api instead of idc one to avoid activating the IDA window
return _ida.jumpto(addr, -1, 0)
@ -249,7 +258,7 @@ def Jump(addr):
@withIDA
@takes_address
@pwndbg.lib.cache.cache_until("objfile")
def Anterior(addr):
def Anterior(addr: int):
hexrays_prefix = b"\x01\x04; "
lines = []
for i in range(10):
@ -275,11 +284,11 @@ def GetBptQty():
@withIDA
@returns_address
def GetBptEA(i):
return _ida.get_bpt_ea(i)
def GetBptEA(i: int) -> int:
return _ida.get_bpt_ea(i) # type: ignore[return-value]
_breakpoints: list[gdb.Breakpoint] = []
_breakpoints: List[gdb.Breakpoint] = []
@pwndbg.gdblib.events.cont
@ -334,8 +343,8 @@ def Auto_UnColor_PC() -> None:
@withIDA
@returns_address
@pwndbg.lib.cache.cache_until("objfile")
def LocByName(name):
return _ida.get_name_ea_simple(str(name))
def LocByName(name) -> int:
return _ida.get_name_ea_simple(str(name)) # type: ignore[return-value]
@withIDA
@ -413,8 +422,8 @@ def decompile_context(pc, context_lines):
@withIDA
@pwndbg.lib.cache.cache_until("forever")
def get_ida_versions():
return _ida.versions()
def get_ida_versions() -> Dict[str, str]:
return _ida.versions() # type: ignore[return-value]
@withIDA
@ -482,7 +491,7 @@ class IDC:
def __init__(self) -> None:
if available():
data: dict[Any, Any] = _ida.eval(self.query)
data: Dict[Any, Any] = _ida.eval(self.query)
self.__dict__.update(data)

@ -37,15 +37,15 @@ class ABI:
@staticmethod
def default() -> ABI:
return DEFAULT_ABIS[(8 * pwndbg.gdblib.arch.ptrsize, pwndbg.gdblib.arch.current, "linux")]
return DEFAULT_ABIS[(8 * pwndbg.gdblib.arch.ptrsize, pwndbg.gdblib.arch.name, "linux")]
@staticmethod
def syscall() -> SyscallABI:
return SYSCALL_ABIS[(8 * pwndbg.gdblib.arch.ptrsize, pwndbg.gdblib.arch.current, "linux")]
return SYSCALL_ABIS[(8 * pwndbg.gdblib.arch.ptrsize, pwndbg.gdblib.arch.name, "linux")]
@staticmethod
def sigreturn() -> SigreturnABI:
return SIGRETURN_ABIS[(8 * pwndbg.gdblib.arch.ptrsize, pwndbg.gdblib.arch.current, "linux")]
return SIGRETURN_ABIS[(8 * pwndbg.gdblib.arch.ptrsize, pwndbg.gdblib.arch.name, "linux")]
class SyscallABI(ABI):

@ -11,11 +11,15 @@ from functools import wraps
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Tuple
from typing import TypeVar
from typing import Union
from typing_extensions import ParamSpec
T = TypeVar("T")
P = ParamSpec("P")
# Set to enable print logging of cache hits/misses/clears
NO_DEBUG, DEBUG_GET, DEBUG_CLEAR, DEBUG_SET = 0, 1, 2, 4
@ -26,7 +30,7 @@ debug_name = "regs"
class DebugCacheDict(UserDict): # type: ignore[type-arg]
def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
def __init__(self, func: Callable[P, T], *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.hits = 0
self.misses = 0
@ -57,12 +61,12 @@ class DebugCacheDict(UserDict): # type: ignore[type-arg]
self.misses = 0
Cache = Union[Dict[Tuple[Any], Any], DebugCacheDict]
Cache = Union[Dict[Tuple[Any, ...], Any], DebugCacheDict]
class _CacheUntilEvent:
def __init__(self) -> None:
self.caches: list[Cache] = []
self.caches: List[Cache] = []
def connect_event_hooks(self, event_hooks: Tuple[Any, ...]) -> None:
"""
@ -110,14 +114,14 @@ _KWARGS_SEPARATOR = object()
IS_CACHING = True
def cache_until(*event_names: str) -> Callable[[Callable[..., T]], Callable[..., T]]:
def cache_until(*event_names: str) -> Callable[[Callable[P, T]], Callable[P, T]]:
if any(event_name not in _ALL_CACHE_EVENT_NAMES for event_name in event_names):
raise ValueError(
f"Unknown event name[s] passed to the `cache_until` decorator: {event_names}.\n"
f"Expected: {_ALL_CACHE_EVENT_NAMES}"
)
def inner(func: Callable[..., T]) -> Callable[..., T]:
def inner(func: Callable[P, T]) -> Callable[P, T]:
if hasattr(func, "cache"):
raise ValueError(
f"Cannot cache the {func.__name__} function twice! "
@ -127,16 +131,16 @@ def cache_until(*event_names: str) -> Callable[[Callable[..., T]], Callable[...,
cache: Cache = {} if not debug else DebugCacheDict(func)
@wraps(func)
def decorator(*a: Any, **kw: Any) -> T:
def decorator(*a: P.args, **kw: P.kwargs) -> T:
if IS_CACHING:
key: Tuple[Any] = (a, _KWARGS_SEPARATOR, *kw.items())
key: Tuple[Any, ...] = (a, _KWARGS_SEPARATOR, *kw.items())
# Check if the value is in the cache; if we have a cache miss,
# we return a special singleton object `_NOT_FOUND_IN_CACHE`. This way
# we can also cache a result of 'None' from a function
value = cache.get(key, _NOT_FOUND_IN_CACHE)
if value is not _NOT_FOUND_IN_CACHE:
return value
cached_value: Any = cache.get(key, _NOT_FOUND_IN_CACHE)
if cached_value is not _NOT_FOUND_IN_CACHE:
return cached_value
value = func(*a, **kw)

@ -30,6 +30,7 @@
from __future__ import annotations
import ctypes
from typing import Dict
import pwndbg.gdblib.ctypes
@ -49,7 +50,7 @@ Elf64_Xword = ctypes.c_uint64
Elf64_Sxword = ctypes.c_int64
AT_CONSTANTS: dict[int, str] = {
AT_CONSTANTS: Dict[int, str] = {
0: "AT_NULL", # /* End of vector */
1: "AT_IGNORE", # /* Entry should be ignored */
2: "AT_EXECFD", # /* File descriptor of program */

@ -1,9 +1,21 @@
from __future__ import annotations
import collections
from typing import List
from typing import NamedTuple
class Argument(NamedTuple):
type: str
derefcnt: int
name: str
class Function(NamedTuple):
type: str
derefcnt: int
name: str
args: List[Argument]
Function = collections.namedtuple("Function", ("type", "derefcnt", "name", "args"))
Argument = collections.namedtuple("Argument", ("type", "derefcnt", "name"))
functions = {
"ASN1_BIT_STRING_check": Function(

@ -19,7 +19,7 @@ printed_message = False
def which(arch: Arch) -> List[str]:
gcc = _which_binutils("g++", arch)
if not gcc:
if gcc is None:
global printed_message
if not printed_message:
printed_message = True
@ -29,6 +29,8 @@ def which(arch: Arch) -> List[str]:
return ["g++", "-m32"]
elif arch.ptrsize == 64:
return ["g++", "-m32"]
else:
raise ValueError(f"Unknown pointer size: {arch.ptrsize}")
return [gcc] + _flags(arch.name)
@ -42,7 +44,7 @@ def _which_binutils(util: str, arch: Arch, **kwargs: Any) -> str | None:
# Fix up binjitsu vs Debian triplet naming, and account
# for 'thumb' being its own binjitsu architecture.
arches: List[str] = [arch_name] + {
arches: List[str | None] = [arch_name] + {
"thumb": ["arm", "armcm", "aarch64"],
"i386": ["x86_64", "amd64"],
"i686": ["x86_64", "amd64"],
@ -58,16 +60,16 @@ def _which_binutils(util: str, arch: Arch, **kwargs: Any) -> str | None:
if arch_name in arches:
arches.append(None)
for arch in arches:
for arch_name in arches:
# hack for homebrew-installed binutils on mac
for gutil in ["g" + util, util]:
# e.g. objdump
if arch is None:
if arch_name is None:
pattern = gutil
# e.g. aarch64-linux-gnu-objdump
else:
pattern = f"{arch}*linux*-{gutil}"
pattern = f"{arch_name}*linux*-{gutil}"
for dir in os.environ["PATH"].split(":"):
res = sorted(glob.glob(os.path.join(dir, pattern)))

@ -2,10 +2,12 @@ from __future__ import annotations
import zlib
from collections import UserDict
from typing import Any
from typing import Dict
def parse_config(config_text: bytes) -> dict[str, str]:
res = {}
def parse_config(config_text: bytes) -> Dict[str, str]:
res: Dict[str, str] = {}
for line in config_text.split(b"\n"):
if b"=" in line:
@ -15,7 +17,7 @@ def parse_config(config_text: bytes) -> dict[str, str]:
return res
def parse_compresed_config(compressed_config: bytes) -> dict[str, str]:
def parse_compresed_config(compressed_config: bytes) -> Dict[str, str]:
config_text = zlib.decompress(compressed_config, 16)
return parse_config(config_text)
@ -25,8 +27,8 @@ def config_to_key(name: str) -> str:
class Kconfig(UserDict): # type: ignore[type-arg]
def __init__(self, compressed_config: bytes) -> None:
super().__init__()
def __init__(self, compressed_config: bytes, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.data = parse_compresed_config(compressed_config)
def get_key(self, name: str) -> str | None:
@ -50,7 +52,9 @@ class Kconfig(UserDict): # type: ignore[type-arg]
raise KeyError(f"Key {name} not found")
def __contains__(self, name: str) -> bool: # type: ignore[override]
def __contains__(self, name: object) -> bool:
if not isinstance(name, str):
return False
return self.get_key(name) is not None
def __getattr__(self, name: str):

@ -7,7 +7,9 @@ from __future__ import annotations
from typing import Dict
from typing import Iterator
from typing import List
from typing import OrderedDict
from typing import Set
from typing import Tuple
from typing import Union
@ -16,37 +18,37 @@ BitFlags = OrderedDict[str, Union[int, Tuple[int, int]]]
class RegisterSet:
#: Program counter register
pc: str | None = None
pc: str
#: Stack pointer register
stack: str | None = None
stack: str
#: Frame pointer register
frame: str | None = None
#: Return address register
retaddr: Tuple[str, ...] | None = None
retaddr: Tuple[str, ...]
#: Flags register (eflags, cpsr)
flags: Dict[str, BitFlags] | None = None
flags: Dict[str, BitFlags]
#: List of native-size general-purpose registers
gpr: Tuple[str, ...] | None = None
gpr: Tuple[str, ...]
#: List of miscellaneous, valid registers
misc: Tuple[str, ...] | None = None
misc: Tuple[str, ...]
#: Register-based arguments for most common ABI
regs = None
args: Tuple[str, ...]
#: Return value register
retval: str | None = None
retval: str | None
#: Common registers which should be displayed in the register context
common: list[str] = []
common: List[str] = []
#: All valid registers
all: set[str] | None = None
all: Set[str]
def __init__(
self,

@ -7,14 +7,17 @@ from __future__ import annotations
import sys
from types import TracebackType
from typing import Any
from typing import List
from typing import TextIO
from typing import Tuple
from typing import Type
class Stdio:
queue: list[tuple[TextIO, TextIO, TextIO]] = []
queue: List[Tuple[TextIO, TextIO, TextIO]] = []
def __enter__(self, *a, **kw) -> None:
def __enter__(self, *a: Any, **kw: Any) -> None:
self.queue.append((sys.stdin, sys.stdout, sys.stderr))
sys.stdin = sys.__stdin__

@ -2,10 +2,11 @@ from __future__ import annotations
import re
from random import choice
from typing import List
from pwndbg.color import message
TIPS: list[str] = [
TIPS: List[str] = [
# GDB hints
"GDB's `apropos <topic>` command displays all registered commands that are related to the given <topic>",
"GDB's `follow-fork-mode` parameter can be used to set whether to trace parent or child after fork() calls",

@ -32,9 +32,10 @@ from __future__ import annotations
import os
import stat
from typing import Set
def which(name: str, all: bool = False) -> set[str] | str | None:
def which(name: str, all: bool = False) -> Set[str] | str | None:
"""which(name, flags = os.X_OK, all = False) -> str or str set
Works as the system command ``which``; searches $PATH for ``name`` and
@ -60,7 +61,7 @@ def which(name: str, all: bool = False) -> set[str] | str | None:
return name
isroot = os.getuid() == 0
out: set[str] = set()
out: Set[str] = set()
try:
path = os.environ["PATH"]
except KeyError:

@ -3,16 +3,16 @@ from __future__ import annotations
import functools
import subprocess
from subprocess import STDOUT
from typing import Any
from typing import Callable
from typing import List
from typing import TypeVar
from typing import cast
from pwnlib.util.misc import which
from typing_extensions import ParamSpec
import pwndbg.commands
P = ParamSpec("P")
T = TypeVar("T")
@ -25,18 +25,18 @@ class OnlyWithCommand:
if self.cmd_path:
break
def __call__(self, function: Callable[..., T]) -> Callable[..., T]:
def __call__(self, function: Callable[P, T]) -> Callable[P, T | None]:
function.cmd = self.cmd
@pwndbg.commands.OnlyWithFile
@functools.wraps(function)
def _OnlyWithCommand(*a: Any, **kw: Any) -> T:
def _OnlyWithCommand(*a: P.args, **kw: P.kwargs) -> T | None:
if self.cmd_path:
return function(*a, **kw)
else:
raise OSError(f"Could not find command(s) {', '.join(self.all_cmds)} in $PATH")
return cast(Callable[..., T], _OnlyWithCommand)
return _OnlyWithCommand
def call_cmd(cmd: str | List[str]) -> str:

@ -2,6 +2,7 @@ from __future__ import annotations
from enum import Enum
from typing import Dict
from typing import List
import pwndbg.wrappers
@ -17,12 +18,12 @@ class RelocationType(Enum):
@pwndbg.wrappers.OnlyWithCommand(cmd_name)
def get_got_entry(local_path: str) -> Dict[RelocationType, list[str]]:
def get_got_entry(local_path: str) -> Dict[RelocationType, List[str]]:
# --wide is for showing the full information, e.g.: R_X86_64_JUMP_SLOT instead of R_X86_64_JUMP_SLO
cmd = get_got_entry.cmd + ["--relocs", "--wide", local_path]
readelf_out = pwndbg.wrappers.call_cmd(cmd)
entries: Dict[RelocationType, list[str]] = {category: [] for category in RelocationType}
entries: Dict[RelocationType, List[str]] = {category: [] for category in RelocationType}
for line in readelf_out.splitlines():
if not line or not line[0].isdigit() or " " not in line:
continue

@ -175,6 +175,7 @@ pytest = "8.0.2"
pytest-cov = "^4.1.0"
rich = "^13.7.1"
ruff = "^0.4.1"
sortedcontainers-stubs = "^2.4.2"
testresources = "^2.0.1"
tomli = "^2.0.1"
types-gdb = "^12.1.4.20240408"

Loading…
Cancel
Save