Add more type hints (#1743)

* Apply safe autotyping transformations

* Apply aggressive autotyping transformations

* Reformat

* Convert Python2 type hints to Python3

* Fix cache type hints

* Convert forward reference types to strings, as Python 3.6 doesn't support __future__.annotations
pull/1745/head
Gulshan Singh 3 years ago committed by GitHub
parent a3b66dae5b
commit 91f3081e4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -16,7 +16,7 @@ from pwndbg.heap.ptmalloc import DebugSymsHeap
from pwndbg.heap.ptmalloc import HeuristicHeap
from pwndbg.heap.ptmalloc import SymbolUnresolvableError
commands = [] # type: List[Command]
commands: "List[Command]" = []
command_names = set()
@ -76,7 +76,7 @@ class Command(gdb.Command):
"""Generic command wrapper"""
builtin_override_whitelist = {"up", "down", "search", "pwd", "start", "ignore"}
history = {} # type: Dict[int,str]
history: Dict[int, str] = {}
def __init__(
self,

@ -111,7 +111,7 @@ def build_prompt(question, command=None):
conversation = [system_msg, context_msg]
for (q, a) in zip(last_question, last_answer):
for q, a in zip(last_question, last_answer):
conversation.append({"role": "user", "content": q})
conversation.append({"role": "assistant", "content": a})
@ -136,7 +136,6 @@ def flatten_prompt(conversation):
def build_context_prompt_body():
decompile = False
## First, get the current GDB context
## Let's begin with the assembly near the current instruction

@ -84,7 +84,7 @@ config_context_sections = pwndbg.gdblib.config.add_param(
)
# Storing output configuration per section
outputs = {} # type: Dict[str,str]
outputs: Dict[str, str] = {}
output_settings = {}
@ -800,7 +800,6 @@ def context_backtrace(with_banner=True, target=sys.stdout, width=None):
i = 0
bt_prefix = "%s" % pwndbg.gdblib.config.backtrace_prefix
while True:
prefix = bt_prefix if frame == this_frame else " " * len(bt_prefix)
prefix = f" {c.prefix(prefix)}"
addrsz = c.address(pwndbg.ui.addrsz(frame.pc()))
@ -848,7 +847,6 @@ def save_signal(signal) -> None:
msg = f"Program received signal {signal.stop_signal}"
if signal.stop_signal == "SIGSEGV":
# When users use rr (https://rr-project.org or https://github.com/mozilla/rr)
# we can't access $_siginfo, so lets just show current pc
# see also issue 476

@ -141,7 +141,6 @@ def add_custom_structure(custom_structure_name) -> None:
@OnlyWhenStructFileExists
def edit_custom_structure(custom_structure_name, custom_structure_path) -> None:
# Lookup an editor to use for editing the custom structure.
editor_preference = os.getenv("EDITOR")
if not editor_preference:

@ -41,7 +41,7 @@ def setflag(flag, value) -> None:
flag = flag.upper()
for flag_reg, flags in register_set.flags.items():
for (flag_name, flag_bit) in flags.items():
for flag_name, flag_bit in flags.items():
if flag_name == flag:
old_flags_reg_value = pwndbg.gdblib.regs[flag_reg]
bit_value = 1 << flag_bit

@ -18,7 +18,6 @@ parser.add_argument(
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.LINUX)
@pwndbg.commands.OnlyWhenRunning
def got(name_filter="") -> None:
relro_status = pwndbg.wrappers.checksec.relro_status()
pie_status = pwndbg.wrappers.checksec.pie_status()
jmpslots = list(pwndbg.wrappers.readelf.get_jmpslots())

@ -134,7 +134,6 @@ save_ida()
@GdbFunction()
def ida(name):
"""Evaluate ida.LocByName() on the supplied value."""
name = name.string()
result = pwndbg.ida.LocByName(name)

@ -61,7 +61,6 @@ def prot_str_to_val(protstr):
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.MEMORY)
@pwndbg.commands.OnlyWhenRunning
def mprotect(addr, length, prot) -> None:
prot_int = prot_str_to_val(prot)
# generate a shellcode that executes the mprotect syscall

@ -112,7 +112,6 @@ def p2p_walk(addr, ranges, current_level):
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.MEMORY)
@pwndbg.commands.OnlyWhenRunning
def p2p(mapping_names: Optional[List] = None) -> None:
if not mapping_names:
return

@ -84,7 +84,6 @@ parser.add_argument(
def probeleak(
address=None, count=0x40, max_distance=0x0, point_to=None, max_ptrs=0, flags=None
) -> None:
address = int(address)
address &= pwndbg.gdblib.arch.ptrmask
ptrsize = pwndbg.gdblib.arch.ptrsize

@ -23,7 +23,6 @@ parser.add_argument("argument", nargs="*", type=str, help="Arguments to pass to
@pwndbg.commands.OnlyWithFile
def rop(grep, argument) -> None:
with tempfile.NamedTemporaryFile() as corefile:
# If the process is running, dump a corefile so we get actual addresses.
if pwndbg.gdblib.proc.alive:
filename = corefile.name

@ -19,7 +19,6 @@ parser.add_argument("argument", nargs="*", type=str, help="Arguments to pass to
@pwndbg.commands.OnlyWithFile
def ropper(argument) -> None:
with tempfile.NamedTemporaryFile() as corefile:
# If the process is running, dump a corefile so we get actual addresses.
if pwndbg.gdblib.proc.alive:
filename = corefile.name

@ -218,7 +218,6 @@ def search(type, hex, executable, writable, value, mapping_name, save, next, tru
for address in pwndbg.search.search(
value, mappings=mappings, executable=executable, writable=writable
):
if save:
saved.add(address)

@ -18,7 +18,6 @@ parser.add_argument("address", nargs="?", default="$pc", help="Address to inspec
def print_line(name, addr, first, second, op, width=20) -> None:
print(
f"{name.rjust(width)} {M.get(addr)} = {M.get(first) if not isinstance(first, str) else first.ljust(len(hex(addr).rstrip('L')))} {op} {second:#x}"
)

@ -29,7 +29,6 @@ class DisassemblyAssistant(pwndbg.disasm.arch.DisassemblyAssistant):
return "#" + super().immediate_sz(instruction, operand)
def condition(self, instruction):
# We can't reason about anything except the current instruction
if instruction.cc == ARM_CC_AL:
return None

@ -25,7 +25,7 @@ pwnlib_archs_mapping = {
arch = Arch("i386", typeinfo.ptrsize, "little")
def _get_arch(ptrsize):
def _get_arch(ptrsize: int):
not_exactly_arch = False
if "little" in gdb.execute("show endian", to_string=True).lower():

@ -52,7 +52,7 @@ def get_field_by_name(obj, field):
return obj
def happy(typename):
def happy(typename: str):
prefix = ""
if "unsigned" in typename:
prefix = "u"

@ -73,7 +73,7 @@ class EventWrapper:
fire them manually (to invoke them you have to call `invoke_callbacks`).
"""
def __init__(self, name) -> None:
def __init__(self, name: str) -> None:
self.name = name
self._event = getattr(gdb.events, self.name, None)

@ -13,7 +13,7 @@ def container_of(ptr, typename: str, fieldname: str):
return gdb.Value(obj_addr).cast(ptr_type)
def for_each_entry(head, typename, field):
def for_each_entry(head, typename: str, field):
addr = head["next"]
while addr != head.address:
yield container_of(addr, typename, field)

@ -71,7 +71,7 @@ def get_flags_list(flags: int) -> List[str]:
class Freelist:
def __init__(self, start_addr: int, offset: int, random: int = 0):
def __init__(self, start_addr: int, offset: int, random: int = 0) -> None:
self.start_addr = start_addr
self.offset = offset
self.random = random
@ -100,7 +100,7 @@ class Freelist:
class SlabCache:
def __init__(self, slab_cache: gdb.Value):
def __init__(self, slab_cache: gdb.Value) -> None:
self._slab_cache = slab_cache
@property
@ -173,7 +173,7 @@ class SlabCache:
class CpuCache:
def __init__(self, cpu_cache: gdb.Value, slab_cache: SlabCache, cpu: int):
def __init__(self, cpu_cache: gdb.Value, slab_cache: SlabCache, cpu: int) -> None:
self._cpu_cache = cpu_cache
self.slab_cache = slab_cache
self.cpu = cpu
@ -210,7 +210,7 @@ class CpuCache:
class Slab:
def __init__(self, slab: gdb.Value, cpu_cache: CpuCache, is_partial: bool = False):
def __init__(self, slab: gdb.Value, cpu_cache: CpuCache, is_partial: bool = False) -> None:
self._slab = slab
self.cpu_cache = cpu_cache
self.slab_cache = cpu_cache.slab_cache

@ -3,6 +3,8 @@ Reading, writing, and describing memory.
"""
from typing import Optional
import gdb
import pwndbg.gdblib.arch
@ -115,7 +117,7 @@ def peek(address):
@pwndbg.lib.cache.cache_until("stop")
def is_readable_address(address):
def is_readable_address(address) -> bool:
"""is_readable_address(address) -> bool
Check if the address can be read by GDB.
@ -244,7 +246,7 @@ def u64(addr: int) -> int:
return readtype(pwndbg.gdblib.typeinfo.uint64, addr)
def u(addr, size=None):
def u(addr, size: Optional[int] = None):
"""u(addr, size=None) -> int
Read one ``unsigned`` integer from the specified address,

@ -20,12 +20,12 @@ from pwndbg.lib.regs import reg_sets
@pwndbg.gdblib.proc.OnlyWhenRunning
def gdb77_get_register(name):
def gdb77_get_register(name: str):
return gdb.parse_and_eval("$" + name)
@pwndbg.gdblib.proc.OnlyWhenRunning
def gdb79_get_register(name):
def gdb79_get_register(name: str):
return gdb.selected_frame().read_register(name)
@ -186,7 +186,7 @@ class module(ModuleType):
return self._fs_gs_helper("gs_base", ARCH_GET_GS)
@pwndbg.lib.cache.cache_until("stop")
def _fs_gs_helper(self, regname, which):
def _fs_gs_helper(self, regname: str, which):
"""Supports fetching based on segmented addressing, a la fs:[0x30].
Requires ptrace'ing the child directly for GDB < 8."""

@ -89,7 +89,7 @@ def update():
update()
def load(name):
def load(name: str):
"""Load a GDB symbol; note that new symbols can be added with `add-symbol-file` functionality"""
try:
return gdb.lookup_type(name)
@ -97,13 +97,13 @@ def load(name):
return None
def read_gdbvalue(type_name, addr):
def read_gdbvalue(type_name: str, addr):
"""Read the memory contents at addr and interpret them as a GDB value with the given type"""
gdb_type = pwndbg.gdblib.typeinfo.load(type_name)
return gdb.Value(addr).cast(gdb_type.pointer()).dereference()
def get_type(size):
def get_type(size: int):
return {
1: pwndbg.gdblib.typeinfo.uint8,
2: pwndbg.gdblib.typeinfo.uint16,

@ -105,7 +105,6 @@ def get() -> Tuple[pwndbg.lib.memory.Page, ...]:
"aarch64",
"riscv:rv64",
):
# If kernel_vmmap_via_pt is not set to the default value of "deprecated",
# That means the user was explicitly setting it themselves and need to
# be warned that the option is deprecated
@ -650,7 +649,7 @@ def info_files():
@pwndbg.lib.cache.cache_until("exit")
def info_auxv(skip_exe=False):
def info_auxv(skip_exe: bool = False):
"""
Extracts the name of the executable from the output of the command
"info auxv". Note that if the executable path is a symlink,
@ -692,7 +691,7 @@ def info_auxv(skip_exe=False):
return tuple(sorted(pages))
def find_boundaries(addr, name="", min=0):
def find_boundaries(addr, name: str = "", min: int = 0):
"""
Given a single address, find all contiguous pages
which are mapped.

@ -85,7 +85,7 @@ class Bin:
class Bins:
def __init__(self, bin_type) -> None:
# `typing.OrderedDict` requires Python 3.7
self.bins = OrderedDict() # type: OrderedDict[Union[int, str], Bin]
self.bins: OrderedDict[Union[int, str], Bin] = OrderedDict()
self.bin_type = bin_type
# TODO: There's a bunch of bin-specific logic in here, maybe we should

@ -31,15 +31,15 @@ class ABI:
self.stack_minimum = minimum
@staticmethod
def default(): # type: () -> ABI
def default() -> "ABI":
return DEFAULT_ABIS[(8 * pwndbg.gdblib.arch.ptrsize, pwndbg.gdblib.arch.current, "linux")]
@staticmethod
def syscall(): # type: () -> ABI
def syscall() -> "ABI":
return SYSCALL_ABIS[(8 * pwndbg.gdblib.arch.ptrsize, pwndbg.gdblib.arch.current, "linux")]
@staticmethod
def sigreturn(): # type: () -> SigreturnABI
def sigreturn() -> "SigreturnABI":
return SIGRETURN_ABIS[(8 * pwndbg.gdblib.arch.ptrsize, pwndbg.gdblib.arch.current, "linux")]

@ -9,7 +9,9 @@ from functools import wraps
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union
# Set to enable print logging of cache hits/misses/clears
NO_DEBUG, DEBUG_GET, DEBUG_CLEAR, DEBUG_SET = 0, 1, 2, 4
@ -20,7 +22,7 @@ debug_name = "regs"
class DebugCacheDict(UserDict):
def __init__(self, func, *args, **kwargs):
def __init__(self, func, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.hits = 0
self.misses = 0
@ -38,12 +40,12 @@ class DebugCacheDict(UserDict):
self.misses += 1
raise
def __setitem__(self, key, value):
def __setitem__(self, key, value) -> None:
if debug & DEBUG_SET and (not debug_name or debug_name in self.name):
print(f"SET {self.name}: {key}={value}")
self.data[key] = value
def clear(self):
def clear(self) -> None:
if debug & DEBUG_CLEAR and (not debug_name or debug_name in self.name):
print(f"CLEAR {self.name} (hits: {self.hits}, misses: {self.misses})")
self.data.clear()
@ -51,11 +53,14 @@ class DebugCacheDict(UserDict):
self.misses = 0
Cache = Union[Dict[Tuple[Any], Any], DebugCacheDict]
class _CacheUntilEvent:
def __init__(self):
self.caches = []
def __init__(self) -> None:
self.caches: List[Cache] = []
def connect_event_hooks(self, event_hooks):
def connect_event_hooks(self, event_hooks) -> None:
"""
A given cache until event may require multiple debugger events
to be handled properly. E.g. our `stop` cache needs to be handled
@ -64,11 +69,11 @@ class _CacheUntilEvent:
for event_hook in event_hooks:
event_hook(self.clear)
def clear(self):
def clear(self) -> None:
for cache in self.caches:
cache.clear()
def add_cache(self, cache):
def add_cache(self, cache) -> None:
self.caches.append(cache)
@ -85,7 +90,7 @@ _ALL_CACHE_UNTIL_EVENTS = {
_ALL_CACHE_EVENT_NAMES = tuple(_ALL_CACHE_UNTIL_EVENTS.keys())
def connect_clear_caching_events(event_dicts):
def connect_clear_caching_events(event_dicts) -> None:
"""
Connect given debugger event hooks to correspoonding _CacheUntilEvent instances
"""
@ -115,7 +120,7 @@ def cache_until(*event_names) -> Callable:
"Pass multiple event names to the `cache_until` decorator."
)
cache: Dict[Tuple[Any], Any] = {} if not debug else DebugCacheDict(func)
cache: Cache = {} if not debug else DebugCacheDict(func)
@wraps(func)
def decorator(*a, **kw):

@ -25,7 +25,7 @@ PARAM_CLASSES = {
class Parameter:
def __init__(
self,
name,
name: str,
default,
set_show_doc,
*,
@ -60,7 +60,7 @@ class Parameter:
i.e. `my-config` has the attribute name `my_config`"""
return self.name.replace("-", "_")
def __getattr__(self, name):
def __getattr__(self, name: str):
return getattr(self.value, name)
# Casting
@ -133,7 +133,7 @@ class Config:
def add_param(
self,
name,
name: str,
default,
set_show_doc,
*,
@ -178,7 +178,7 @@ class Config:
def get_params(self, scope) -> List[Parameter]:
return sorted(filter(lambda p: p.scope == scope, self.params.values()))
def __getattr__(self, name):
def __getattr__(self, name: str):
if name in self.params:
return self.params[name]
else:

@ -73,7 +73,7 @@ def _which_binutils(util, arch, **kwargs):
return res[0]
def _flags(arch_name): # type: (str) -> List[str]
def _flags(arch_name: str) -> List[str]:
if arch_name == "i386":
return ["-m32"]
if arch_name.endswith("x86-64"):

@ -43,7 +43,7 @@ class Kconfig(UserDict):
return None
def __getitem__(self, name):
def __getitem__(self, name: str):
key = self.get_key(name)
if key:
return self.data[key]

@ -38,7 +38,6 @@ def binary_parse_breakpoints(binary_code):
line_no += 1
for bug_id, func_name in func_names.items():
if f"void {func_name}" in line:
# find break1 and break2 inside function
b1, b2 = None, None
while line_no < len(lines) and (b1 is None or b2 is None):

@ -28,7 +28,6 @@ def test_mprotect_executes_properly(start_binary):
def test_cannot_run_mprotect_when_not_running(start_binary):
# expect error message
assert "mprotect: The program is not being run.\n" == gdb.execute(
"mprotect 0x0 0x1000 PROT_EXEC|PROT_READ|PROT_WRITE", to_string=True

@ -18,7 +18,6 @@ def test_windbg_dX_commands(start_binary):
# Try to fail commands in different way
for cmd_prefix in ("dq", "dd", "dw", "db"):
# With a non-existent symbol
cmd = cmd_prefix + " nonexistentsymbol"
assert gdb.execute(cmd, to_string=True) == (

Loading…
Cancel
Save