Improve performance of pointer memory reads (#3021)

* Change usage of pwndbg.aglib.memory.pvoid to optimized pwndbg.aglib.memory.read_pointer_width

* lint

* Use google style docstring, remove old pvoid function

---------

Co-authored-by: Disconnect3d <dominik.b.czarnota@gmail.com>
pull/3039/head
OBarronCS 6 months ago committed by GitHub
parent 1b78613a2a
commit c63a484001
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -513,11 +513,7 @@ class DisassemblyAssistant:
page = pwndbg.aglib.vmmap.find(address)
if page and not page.write:
try:
address = int(
pwndbg.aglib.memory.get_typed_pointer_value(
pwndbg.aglib.typeinfo.ppvoid, address
)
)
address = pwndbg.aglib.memory.read_pointer_width(address)
address &= pwndbg.aglib.arch.ptrmask
address_list.append(address)
except pwndbg.dbg_mod.Error:

@ -201,7 +201,7 @@ class X86DisassemblyAssistant(pwndbg.aglib.disasm.arch.DisassemblyAssistant):
elif pc_is_at_instruction:
# Attempt to read from the top of the stack
try:
value = pwndbg.aglib.memory.pvoid(pwndbg.aglib.regs.sp)
value = pwndbg.aglib.memory.read_pointer_width(pwndbg.aglib.regs.sp)
instruction.annotation = register_assign(
reg_operand.str, MemoryColor.get_address_and_symbol(value)
)

@ -1570,10 +1570,10 @@ class DebugSymsHeap(GlibcMemoryAllocator[pwndbg.dbg_mod.Type, pwndbg.dbg_mod.Val
"thread_arena", prefer_static=True
)
if thread_arena_addr:
thread_arena_value = pwndbg.aglib.memory.pvoid(thread_arena_addr)
thread_arena_value = pwndbg.aglib.memory.read_pointer_width(thread_arena_addr)
# thread_arena might be NULL if the thread doesn't allocate arena yet
if thread_arena_value:
return Arena(pwndbg.aglib.memory.pvoid(thread_arena_addr))
return Arena(pwndbg.aglib.memory.read_pointer_width(thread_arena_addr))
return None
else:
return self.main_arena
@ -1585,7 +1585,7 @@ class DebugSymsHeap(GlibcMemoryAllocator[pwndbg.dbg_mod.Type, pwndbg.dbg_mod.Val
"""
if self.has_tcache():
if self.multithreaded:
tcache_addr = pwndbg.aglib.memory.pvoid(
tcache_addr = pwndbg.aglib.memory.read_pointer_width(
pwndbg.aglib.symbol.lookup_symbol_addr("tcache", prefer_static=True)
)
if tcache_addr == 0:
@ -1925,7 +1925,7 @@ class HeuristicHeap(
and offset % pwndbg.aglib.arch.ptrsize == 0
and pwndbg.aglib.memory.is_readable_address(offset + tls_address)
):
guess = pwndbg.aglib.memory.pvoid(offset + tls_address)
guess = pwndbg.aglib.memory.read_pointer_width(offset + tls_address)
if validator(guess):
return guess, offset + tls_address
return None
@ -1947,7 +1947,7 @@ class HeuristicHeap(
for addr in search_range:
if pwndbg.aglib.memory.is_readable_address(addr):
reading = True
guess = pwndbg.aglib.memory.pvoid(addr)
guess = pwndbg.aglib.memory.read_pointer_width(addr)
if validator(guess):
return guess, addr
elif reading:
@ -1961,7 +1961,7 @@ class HeuristicHeap(
"thread_arena", prefer_static=True
)
if thread_arena_via_symbol:
thread_arena_value = pwndbg.aglib.memory.pvoid(thread_arena_via_symbol)
thread_arena_value = pwndbg.aglib.memory.read_pointer_width(thread_arena_via_symbol)
return Arena(thread_arena_value) if thread_arena_value else None
thread_arena_via_config = int(str(pwndbg.config.thread_arena), 0)
if thread_arena_via_config:
@ -2034,7 +2034,9 @@ class HeuristicHeap(
self._thread_cache = tps(thread_cache_via_config)
return self._thread_cache
elif thread_cache_via_symbol:
thread_cache_struct_addr = pwndbg.aglib.memory.pvoid(thread_cache_via_symbol)
thread_cache_struct_addr = pwndbg.aglib.memory.read_pointer_width(
thread_cache_via_symbol
)
if thread_cache_struct_addr:
self._thread_cache = tps(int(thread_cache_struct_addr))
return self._thread_cache

@ -164,7 +164,7 @@ def kcmdline() -> str:
addr = pwndbg.aglib.symbol.lookup_symbol_addr("saved_command_line")
assert addr is not None, "Symbol saved_command_line not exists"
cmdline_addr = pwndbg.aglib.memory.pvoid(addr)
cmdline_addr = pwndbg.aglib.memory.read_pointer_width(addr)
return pwndbg.aglib.memory.string(cmdline_addr).decode("ascii")

@ -93,7 +93,7 @@ class Freelist:
while current_object:
addr = int(current_object)
yield current_object
current_object = pwndbg.aglib.memory.pvoid(addr + self.offset)
current_object = pwndbg.aglib.memory.read_pointer_width(addr + self.offset)
if self.random:
current_object ^= self.random ^ swab(addr + self.offset)
if addr in seen:

@ -183,12 +183,14 @@ def uint(addr: int) -> int:
return readtype(pwndbg.aglib.typeinfo.uint, addr)
def pvoid(addr: int) -> int:
"""pvoid(addr) -> int
def read_pointer_width(addr: int) -> int:
"""
Read one pointer-width integer at the specified address.
Read one pointer from the specified address.
Raises:
pwndbg.dbg_mod.Error: if memory read fails.
"""
return readtype(pwndbg.aglib.typeinfo.pvoid, addr)
return pwndbg.aglib.arch.unpack(read(addr, pwndbg.aglib.arch.ptrsize))
def u8(addr: int) -> int:

@ -394,7 +394,7 @@ def check_non_stack_argv(expr: str) -> Tuple[CheckSatResult, str]:
n = 0
while True:
try:
argv_n = pwndbg.aglib.memory.pvoid(argv + n * pwndbg.aglib.arch.ptrsize)
argv_n = pwndbg.aglib.memory.read_pointer_width(argv + n * pwndbg.aglib.arch.ptrsize)
except pwndbg.dbg_mod.Error:
output_msg += f"&argv[{n}] = {argv + n * pwndbg.aglib.arch.ptrsize:#x}, {argv + n * pwndbg.aglib.arch.ptrsize:#x} is a invalid address\n"
return UNSAT, output_msg
@ -444,7 +444,7 @@ def check_envp(expr: str) -> Tuple[bool, str]:
n = 0
while True:
try:
envp_n = pwndbg.aglib.memory.pvoid(envp + n * pwndbg.aglib.arch.ptrsize)
envp_n = pwndbg.aglib.memory.read_pointer_width(envp + n * pwndbg.aglib.arch.ptrsize)
except pwndbg.dbg_mod.Error:
output_msg += f"&envp[{n}] = {envp + n * pwndbg.aglib.arch.ptrsize:#x}, {envp + n * pwndbg.aglib.arch.ptrsize:#x} is a invalid address\n"
return False, output_msg

@ -142,7 +142,7 @@ def argument(n: int, abi: pwndbg.lib.abi.ABI | None = None) -> int:
sp = pwndbg.aglib.regs.sp + (n * pwndbg.aglib.arch.ptrsize)
return int(pwndbg.aglib.memory.get_typed_pointer_value(pwndbg.aglib.typeinfo.ppvoid, sp))
return pwndbg.aglib.memory.read_pointer_width(sp)
def arguments(abi: pwndbg.lib.abi.ABI | None = None):

@ -80,9 +80,7 @@ def get(
if not is_pagefault_supported and not pwndbg.aglib.vmmap.find(address):
break
next_address = int(
pwndbg.aglib.memory.get_typed_pointer_value(pwndbg.aglib.typeinfo.ppvoid, address)
)
next_address = pwndbg.aglib.memory.read_pointer_width(address)
address = next_address ^ ((address >> 12) if safe_linking else 0)
address &= pwndbg.aglib.arch.ptrmask
result.append(address)

@ -40,7 +40,7 @@ def canary_value() -> Tuple[Optional[int], Optional[int]]:
if at_random is None:
return None, None
global_canary = pwndbg.aglib.memory.pvoid(at_random)
global_canary = pwndbg.aglib.memory.read_pointer_width(at_random)
# masking canary value as canaries on the stack has last byte = 0
global_canary &= pwndbg.aglib.arch.ptrmask ^ 0xFF
@ -106,7 +106,9 @@ def canary(all) -> None:
# Verify the value at the TLS address matches our computed canary
try:
tls_canary = pwndbg.aglib.memory.pvoid(tls_addr) & (pwndbg.aglib.arch.ptrmask ^ 0xFF)
tls_canary = pwndbg.aglib.memory.read_pointer_width(tls_addr) & (
pwndbg.aglib.arch.ptrmask ^ 0xFF
)
if tls_canary != global_canary:
print(message.warn("Warning: TLS canary value doesn't match global canary!"))
except Exception:

@ -9,6 +9,7 @@ from elftools.elf.elffile import ELFFile
import pwndbg.aglib.arch
import pwndbg.aglib.file
import pwndbg.aglib.memory
import pwndbg.aglib.proc
import pwndbg.aglib.qemu
import pwndbg.aglib.vmmap
@ -185,5 +186,5 @@ def _got(path: str, accept_readonly: bool, symbol_filter: str) -> None:
)
for output in outputs:
print(
f"[{M.get(output['address'])}] {message.hint(output['name'])} -> {pwndbg.chain.format(pwndbg.aglib.memory.pvoid(output['address']))}" # type: ignore[arg-type]
f"[{M.get(output['address'])}] {message.hint(output['name'])} -> {pwndbg.chain.format(pwndbg.aglib.memory.read_pointer_width(output['address']))}" # type: ignore[arg-type]
)

@ -10,6 +10,7 @@ from typing import Dict
from typing import List
import pwndbg
import pwndbg.aglib.memory
import pwndbg.aglib.vmmap
import pwndbg.color.memory as M
import pwndbg.commands
@ -162,7 +163,7 @@ def leakfind(
):
try:
cur_addr &= pwndbg.aglib.arch.ptrmask
result = int(pwndbg.aglib.memory.pvoid(cur_addr))
result = int(pwndbg.aglib.memory.read_pointer_width(cur_addr))
if result in visited_map or result in visited_set:
continue
visited_map[result] = (

@ -42,7 +42,9 @@ def _get_errno() -> int:
# So we have to check the got.plt entry first before calling it
errno_loc_gotplt = pwndbg.aglib.symbol.lookup_symbol_addr("__errno_location@got.plt")
if errno_loc_gotplt is not None:
page_loaded = pwndbg.aglib.vmmap.find(pwndbg.aglib.memory.pvoid(errno_loc_gotplt))
page_loaded = pwndbg.aglib.vmmap.find(
pwndbg.aglib.memory.read_pointer_width(errno_loc_gotplt)
)
if page_loaded is None:
raise pwndbg.dbg_mod.Error(
"Could not determine error code automatically: the __errno_location@got.plt has no valid address yet (perhaps libc.so hasn't been loaded yet?)"

@ -68,7 +68,7 @@ parser.add_argument("mapping_names", type=address_range, nargs="+", help="Mappin
def maybe_points_to_ranges(ptr: int, rs: List[AddrRange]):
try:
pointee = pwndbg.aglib.memory.pvoid(ptr)
pointee = pwndbg.aglib.memory.read_pointer_width(ptr)
except Exception:
return None

@ -1221,7 +1221,7 @@ def try_free(addr: str | int) -> None:
# check hook
free_hook = pwndbg.aglib.symbol.lookup_symbol_addr("__free_hook")
if free_hook is not None:
if pwndbg.aglib.memory.pvoid(free_hook) != 0:
if pwndbg.aglib.memory.read_pointer_width(free_hook) != 0:
print(message.success("__libc_free: will execute __free_hook"))
# free(0) has no effect
@ -1341,7 +1341,7 @@ def try_free(addr: str | int) -> None:
print(message.notice("Tcache checks"))
e = addr + 2 * size_sz
e += allocator.tcache_entry.keys().index("key") * ptr_size
e = pwndbg.aglib.memory.pvoid(e)
e = pwndbg.aglib.memory.read_pointer_width(e)
tcache_addr = int(allocator.thread_cache.address)
if e == tcache_addr:
# todo, actually do checks

@ -256,7 +256,7 @@ def telescope(
# Buffer repeating values.
if skip_repeating_values:
value = pwndbg.aglib.memory.pvoid(addr)
value = pwndbg.aglib.memory.read_pointer_width(addr)
if (
last == value
and addr != input_address

@ -128,7 +128,7 @@ def enhance(
if value + pwndbg.aglib.arch.ptrsize > page.end:
return E.integer(int_str(value))
intval = int(pwndbg.aglib.memory.get_typed_pointer_value(pwndbg.aglib.typeinfo.pvoid, value))
intval = pwndbg.aglib.memory.read_pointer_width(value)
if safe_linking:
intval ^= value >> 12
intval0 = intval

@ -272,7 +272,7 @@ class Patcher(pwndbg.gdblib.bpoint.Breakpoint):
def should_stop(self) -> bool:
# Read the new branch target, and update the redirection target of the
# tracker accordingly.
new_target = pwndbg.aglib.memory.pvoid(self.entry)
new_target = pwndbg.aglib.memory.read_pointer_width(self.entry)
if new_target == self.tracker.trapped_address:
# The write to this range from within GDB that we do at the end of
# this function can cause this watchpoint to trigger again.

@ -33,12 +33,12 @@ def test_command_canary(start_binary, binary, reg_name):
register = getattr(pwndbg.aglib.regs, reg_name)
canary_value, at_random = pwndbg.commands.canary.canary_value()
raw = pwndbg.aglib.memory.pvoid(at_random)
raw = pwndbg.aglib.memory.read_pointer_width(at_random)
mask = pwndbg.aglib.arch.ptrmask ^ 0xFF
masked_raw = raw & mask
tls_addr = pwndbg.commands.canary.find_tls_canary_addr()
raw_tls = pwndbg.aglib.memory.pvoid(tls_addr) & mask
raw_tls = pwndbg.aglib.memory.read_pointer_width(tls_addr) & mask
# Check AT_RANDOM
assert masked_raw == canary_value

Loading…
Cancel
Save