refactor: create ptmalloc2 and jemalloc command categories (#2575)

pull/2581/head
Aaron Adams 1 year ago committed by GitHub
parent ce954f7448
commit 4fa42857ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -47,7 +47,8 @@ class CommandCategory(str, Enum):
START = "Start"
NEXT = "Step/Next/Continue"
CONTEXT = "Context"
HEAP = "Heap"
PTMALLOC2 = "GLibc ptmalloc2 Heap"
JEMALLOC = "jemalloc Heap"
BREAKPOINT = "Breakpoint"
MEMORY = "Memory"
STACK = "Stack"
@ -722,7 +723,7 @@ def load_commands() -> None:
import pwndbg.commands.godbg
import pwndbg.commands.got
import pwndbg.commands.got_tracking
import pwndbg.commands.heap_tracking
import pwndbg.commands.ptmalloc2_tracking
import pwndbg.commands.ida
import pwndbg.commands.ignore
import pwndbg.commands.ipython_interactive
@ -761,10 +762,10 @@ def load_commands() -> None:
import pwndbg.commands.flags
import pwndbg.commands.gdt
import pwndbg.commands.ghidra
import pwndbg.commands.heap
import pwndbg.commands.hex2ptr
import pwndbg.commands.hexdump
import pwndbg.commands.integration
import pwndbg.commands.jemalloc
import pwndbg.commands.leakfind
import pwndbg.commands.linkmap
import pwndbg.commands.memoize
@ -780,6 +781,7 @@ def load_commands() -> None:
import pwndbg.commands.plist
import pwndbg.commands.probeleak
import pwndbg.commands.procinfo
import pwndbg.commands.ptmalloc2
import pwndbg.commands.radare2
import pwndbg.commands.retaddr
import pwndbg.commands.rizin

@ -42,7 +42,7 @@ from pwndbg.commands import CommandCategory
if pwndbg.dbg.is_gdblib_available():
import gdb
import pwndbg.gdblib.heap_tracking
import pwndbg.gdblib.ptmalloc2_tracking
import pwndbg.gdblib.symbol
import pwndbg.ghidra
@ -761,14 +761,16 @@ def context_regs(target=sys.stdout, with_banner=True, width=None):
@serve_context_history
def context_heap_tracker(target=sys.stdout, with_banner=True, width=None):
if not pwndbg.gdblib.heap_tracking.is_enabled():
if not pwndbg.gdblib.ptmalloc2_tracking.is_enabled():
return []
banner = [pwndbg.ui.banner("heap tracker", target=target, width=width, extra="")]
if pwndbg.gdblib.heap_tracking.last_issue is not None:
info = [f"Detected the following potential issue: {pwndbg.gdblib.heap_tracking.last_issue}"]
pwndbg.gdblib.heap_tracking.last_issue = None
if pwndbg.gdblib.ptmalloc2_tracking.last_issue is not None:
info = [
f"Detected the following potential issue: {pwndbg.gdblib.ptmalloc2_tracking.last_issue}"
]
pwndbg.gdblib.ptmalloc2_tracking.last_issue = None
else:
info = ["Nothing to report."]

@ -0,0 +1,99 @@
from __future__ import annotations
import argparse
import pwndbg
import pwndbg.aglib.heap
import pwndbg.aglib.heap.jemalloc as jemalloc
import pwndbg.color.context as C
from pwndbg.color import message
from pwndbg.commands import CommandCategory
parser = argparse.ArgumentParser(
description="Returns extent information for pointer address allocated by jemalloc"
)
parser.add_argument("addr", type=int, help="Address of the allocated memory location")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.JEMALLOC)
def jemalloc_find_extent(addr) -> None:
print(C.banner("Jemalloc find extent"))
print("This command was tested only for jemalloc 5.3.0 and does not support lower versions")
print()
addr = int(addr)
try:
rtree = jemalloc.RTree.get_rtree()
extent = rtree.lookup_hard(addr)
if extent is None:
print(message.error("ERROR: Extent not found"))
return
# print pointer address first, then extent address then extent information
print(f"Pointer Address: {hex(addr)}")
print(f"Extent Address: {hex(extent.extent_address)}")
print()
jemalloc_extent_info(extent.extent_address, header=False)
except pwndbg.dbg_mod.Error as e:
print(message.error(f"ERROR: {e}"))
return
parser = argparse.ArgumentParser(description="Prints extent information for the given address")
parser.add_argument("addr", type=int, help="Address of the extent metadata")
parser.add_argument(
"-v", "--verbose", action="store_true", help="Print all chunk fields, even unused ones."
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.JEMALLOC)
def jemalloc_extent_info(addr, verbose=False, header=True) -> bool:
if header:
print(C.banner("Jemalloc extent info"))
print("This command was tested only for jemalloc 5.3.0 and does not support lower versions")
print()
try:
extent = jemalloc.Extent(int(addr))
print(f"Allocated Address: {hex(extent.allocated_address)}")
print(f"Extent Address: {hex(extent.extent_address)}")
print(f"Size: {hex(extent.size)}")
print(f"Small class: {extent.has_slab}")
print(f"State: {extent.state_name}")
if verbose:
for bit, val in extent.bitfields.items():
print(bit, val)
except pwndbg.dbg_mod.Error as e:
print(message.error(f"ERROR: {e}"))
return False
return True
parser = argparse.ArgumentParser(description="Prints all extents information")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.JEMALLOC)
def jemalloc_heap() -> None:
print(C.banner("Jemalloc heap"))
print("This command was tested only for jemalloc 5.3.0 and does not support lower versions")
print()
try:
rtree = jemalloc.RTree.get_rtree()
extents = rtree.extents
if len(extents) == 0:
print(message.warn("No extents found"))
return
for extent in extents:
# TODO: refactor so not create copies
if not jemalloc_extent_info(extent.extent_address, header=False):
return
print()
except pwndbg.dbg_mod.Error as e:
print(message.error(f"ERROR: {e}"))
return

@ -11,7 +11,6 @@ from tabulate import tabulate
import pwndbg
import pwndbg.aglib.heap
import pwndbg.aglib.heap.jemalloc as jemalloc
import pwndbg.aglib.memory
import pwndbg.aglib.proc
import pwndbg.aglib.typeinfo
@ -177,7 +176,7 @@ parser.add_argument(
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -228,7 +227,7 @@ parser.add_argument(
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@ -268,7 +267,7 @@ Default to the current thread's arena.""",
parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -298,7 +297,7 @@ def arena(addr: int | None = None) -> None:
parser = argparse.ArgumentParser(description="List this process's arenas.")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -365,7 +364,7 @@ Default to the current thread's tcache.""",
parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the tcache.")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWithTcache
@pwndbg.commands.OnlyWhenUserspace
@ -394,7 +393,7 @@ def tcache(addr: int | None = None) -> None:
parser = argparse.ArgumentParser(description="Print the mp_ struct's contents.")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -416,7 +415,7 @@ Default to current thread's arena.""",
parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -457,7 +456,7 @@ parser.add_argument(
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -574,7 +573,7 @@ parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of
parser.add_argument("tcache_addr", nargs="?", type=int, default=None, help="Address of the tcache.")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -611,7 +610,7 @@ parser.add_argument(
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -647,7 +646,7 @@ parser.add_argument(
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -683,7 +682,7 @@ parser.add_argument(
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -719,7 +718,7 @@ parser.add_argument(
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -754,7 +753,7 @@ parser.add_argument(
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWithTcache
@pwndbg.commands.OnlyWhenUserspace
@ -806,7 +805,7 @@ parser.add_argument(
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -972,7 +971,7 @@ group.add_argument(
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
@ -1189,7 +1188,7 @@ try_free_parser = argparse.ArgumentParser(
try_free_parser.add_argument("addr", help="Address passed to free")
@pwndbg.commands.ArgparsedCommand(try_free_parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(try_free_parser, category=CommandCategory.PTMALLOC2)
@pwndbg.commands.OnlyWhenHeapIsInitialized
@pwndbg.commands.OnlyWhenUserspace
def try_free(addr: str | int) -> None:
@ -1561,7 +1560,7 @@ parser.add_argument(
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.PTMALLOC2)
def heap_config(filter_pattern: str) -> None:
display_config(filter_pattern, "heap", has_file_command=False)
@ -1570,95 +1569,3 @@ def heap_config(filter_pattern: str) -> None:
"Some config values (e.g. main_arena) will be used only when resolve-heap-via-heuristic is `auto` or `force`"
)
)
# Jemalloc
parser = argparse.ArgumentParser(
description="Returns extent information for pointer address allocated by jemalloc"
)
parser.add_argument("addr", type=int, help="Address of the allocated memory location")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
def jemalloc_find_extent(addr) -> None:
print(C.banner("Jemalloc find extent"))
print("This command was tested only for jemalloc 5.3.0 and does not support lower versions")
print()
addr = int(addr)
try:
rtree = jemalloc.RTree.get_rtree()
extent = rtree.lookup_hard(addr)
if extent is None:
print(message.error("ERROR: Extent not found"))
return
# print pointer address first, then extent address then extent information
print(f"Pointer Address: {hex(addr)}")
print(f"Extent Address: {hex(extent.extent_address)}")
print()
jemalloc_extent_info(extent.extent_address, header=False)
except pwndbg.dbg_mod.Error as e:
print(message.error(f"ERROR: {e}"))
return
parser = argparse.ArgumentParser(description="Prints extent information for the given address")
parser.add_argument("addr", type=int, help="Address of the extent metadata")
parser.add_argument(
"-v", "--verbose", action="store_true", help="Print all chunk fields, even unused ones."
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
def jemalloc_extent_info(addr, verbose=False, header=True) -> bool:
if header:
print(C.banner("Jemalloc extent info"))
print("This command was tested only for jemalloc 5.3.0 and does not support lower versions")
print()
try:
extent = jemalloc.Extent(int(addr))
print(f"Allocated Address: {hex(extent.allocated_address)}")
print(f"Extent Address: {hex(extent.extent_address)}")
print(f"Size: {hex(extent.size)}")
print(f"Small class: {extent.has_slab}")
print(f"State: {extent.state_name}")
if verbose:
for bit, val in extent.bitfields.items():
print(bit, val)
except pwndbg.dbg_mod.Error as e:
print(message.error(f"ERROR: {e}"))
return False
return True
parser = argparse.ArgumentParser(description="Prints all extents information")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.HEAP)
def jemalloc_heap() -> None:
print(C.banner("Jemalloc heap"))
print("This command was tested only for jemalloc 5.3.0 and does not support lower versions")
print()
try:
rtree = jemalloc.RTree.get_rtree()
extents = rtree.extents
if len(extents) == 0:
print(message.warn("No extents found"))
return
for extent in extents:
# TODO: refactor so not create copies
if not jemalloc_extent_info(extent.extent_address, header=False):
return
print()
except pwndbg.dbg_mod.Error as e:
print(message.error(f"ERROR: {e}"))
return

@ -4,7 +4,7 @@ import argparse
import pwndbg.chain
import pwndbg.commands
import pwndbg.gdblib.heap_tracking
import pwndbg.gdblib.ptmalloc2_tracking
from pwndbg.commands import CommandCategory
parser = argparse.ArgumentParser(
@ -51,14 +51,16 @@ toggle_break.set_defaults(mode="toggle-break")
def track_heap(mode=None, use_hardware_breakpoints=False):
if mode == "enable":
# Enable the tracker.
pwndbg.gdblib.heap_tracking.install()
pwndbg.gdblib.ptmalloc2_tracking.install()
elif mode == "disable":
# Disable the tracker.
pwndbg.gdblib.heap_tracking.uninstall()
pwndbg.gdblib.ptmalloc2_tracking.uninstall()
elif mode == "toggle-break":
# Delegate to the report function.
pwndbg.gdblib.heap_tracking.stop_on_error = not pwndbg.gdblib.heap_tracking.stop_on_error
if pwndbg.gdblib.heap_tracking.stop_on_error:
pwndbg.gdblib.ptmalloc2_tracking.stop_on_error = (
not pwndbg.gdblib.ptmalloc2_tracking.stop_on_error
)
if pwndbg.gdblib.ptmalloc2_tracking.stop_on_error:
print("The program will stop when the heap tracker detects an error")
else:
print("The heap tracker will only print a message when it detects an error")

@ -2,26 +2,18 @@
line-length = 100
[tool.ruff.lint]
ignore = [
"A003",
"E402",
"E501",
"E731",
"F405",
"F821",
"W505",
]
ignore = ["A003", "E402", "E501", "E731", "F405", "F821", "W505"]
select = [
"A", # flake8-builtins
"E", # pycodestyle
"F", # pyflakes
"W", # pycodestyle
"C4", # flake8-comprehensions
"ISC", # flake8-implicit-str-concat
"SLOT", # flake8-slots
"FLY", # flynt
"PGH", # pygrep-hooks
"A", # flake8-builtins
"E", # pycodestyle
"F", # pyflakes
"W", # pycodestyle
"C4", # flake8-comprehensions
"ISC", # flake8-implicit-str-concat
"SLOT", # flake8-slots
"FLY", # flynt
"PGH", # pygrep-hooks
"RET506", # flake8-return: superfluous-else-raise
"RET507", # flake8-return: superfluous-else-continue
"RET508", # flake8-return: superfluous-else-break
@ -69,14 +61,11 @@ show_error_codes = true
incremental = false
disable_error_code = [
# https://github.com/python/mypy/issues/6232
"assignment"
"assignment",
]
[[tool.mypy.overrides]]
module = [
"pwndbg.gdblib.elf",
"pwndbg.aglib.elf",
]
module = ["pwndbg.gdblib.elf", "pwndbg.aglib.elf"]
disable_error_code = ["name-defined"]
[[tool.mypy.overrides]]
@ -93,7 +82,7 @@ module = [
"pwndbg.aglib.dynamic",
"pwndbg.gdblib.events",
"pwndbg.gdblib.got",
"pwndbg.gdblib.heap_tracking",
"pwndbg.gdblib.ptmalloc2_tracking",
"pwndbg.gdblib.stack",
"pwndbg.aglib.heap.*",
"pwndbg.hexdump",
@ -103,9 +92,7 @@ module = [
disable_error_code = ["attr-defined"]
[[tool.mypy.overrides]]
module = [
"pwndbg.commands.telescope",
]
module = ["pwndbg.commands.telescope"]
disable_error_code = ["attr-defined", "index"]
[[tool.mypy.overrides]]
@ -118,19 +105,37 @@ module = [
disable_error_code = ["name-defined", "attr-defined"]
[[tool.mypy.overrides]]
module = [
"pwndbg.aglib.disasm.*",
]
module = ["pwndbg.aglib.disasm.*"]
disable_error_code = ["index", "name-defined", "attr-defined"]
[[tool.mypy.overrides]]
module = ["capstone.*", "unicorn.*", "pwnlib.*", "ropgadget.*", "elftools.*", "ipdb.*", "r2pipe", "rzpipe", "rich.*", "pt_gdb", "lldb.*", "gnureadline"]
module = [
"capstone.*",
"unicorn.*",
"pwnlib.*",
"ropgadget.*",
"elftools.*",
"ipdb.*",
"r2pipe",
"rzpipe",
"rich.*",
"pt_gdb",
"lldb.*",
"gnureadline",
]
ignore_missing_imports = true
[tool.isort]
profile = "black"
force_single_line = true
known_third_party = ["capstone", "unicorn", "psutil", "pycparser", "gdb", "lldb"]
known_third_party = [
"capstone",
"unicorn",
"psutil",
"pycparser",
"gdb",
"lldb",
]
add_imports = "from __future__ import annotations"
[tool.coverage.run]
@ -150,9 +155,7 @@ description = "Exploit Development and Reverse Engineering with GDB Made Easy"
version = "2024.08.29"
authors = ["Dominik 'disconnect3d' Czarnota <dominik.b.czarnota+dc@gmail.com>"]
readme = "README.md"
packages = [
{ include = "pwndbg" },
]
packages = [{ include = "pwndbg" }]
[tool.poetry.dependencies]
python = "^3.10"
@ -171,9 +174,9 @@ tabulate = "^0.9.0"
typing-extensions = "^4.12.0"
unicorn = "^2.1.1"
requests = "^2.32.3"
pt = {git = "https://github.com/martinradev/gdb-pt-dump", rev = "50227bda0b6332e94027f811a15879588de6d5cb"}
pt = { git = "https://github.com/martinradev/gdb-pt-dump", rev = "50227bda0b6332e94027f811a15879588de6d5cb" }
# Newer versions of bcrypt break NIX. Who need bcrypt: pwntools->paramiko->bcrypt
bcrypt="4.2.0"
bcrypt = "4.2.0"
[tool.poetry.group.lldb]
optional = true
@ -186,7 +189,7 @@ gnureadline = "^8.2.10"
optional = true
[tool.poetry.group.dev.dependencies]
coverage = {version = "^7.5.0", extras = ["toml"]}
coverage = { version = "^7.5.0", extras = ["toml"] }
isort = "^5.13.2"
mypy = "^1.10.0"
# Newer versions of pytest break CI on GitHub

@ -38,7 +38,7 @@ def test_vis_heap_chunk_command(start_binary):
return heap_addr
def hexdump_16B(gdb_symbol):
from pwndbg.commands.heap import bin_ascii
from pwndbg.commands.ptmalloc2 import bin_ascii
first, second = gdb.execute(f"x/16xb {gdb_symbol}", to_string=True).splitlines()
first = [int(v, 16) for v in first.split(":")[1].split("\t")[1:]]

Loading…
Cancel
Save