diff --git a/poetry.lock b/poetry.lock index 9351e2b7f..6962841b7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -822,4 +822,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "7cf6330ebd4f053ab23a4b3ff34cbd248311bde276f3985b32a8a26bede383f0" +content-hash = "f021e5fa44baa43e697de4f60b62eb9730b5c6ed8f58b7f01edf04c1efaa3f9c" diff --git a/pwndbg/commands/__init__.py b/pwndbg/commands/__init__.py index 235aa4aa0..271b36c0e 100644 --- a/pwndbg/commands/__init__.py +++ b/pwndbg/commands/__init__.py @@ -654,6 +654,7 @@ def load_commands() -> None: import pwndbg.commands.got import pwndbg.commands.got_tracking import pwndbg.commands.heap + import pwndbg.commands.heap_tracking import pwndbg.commands.hexdump import pwndbg.commands.ida import pwndbg.commands.ignore diff --git a/pwndbg/commands/context.py b/pwndbg/commands/context.py index 67c57355d..45e2e5092 100644 --- a/pwndbg/commands/context.py +++ b/pwndbg/commands/context.py @@ -21,6 +21,7 @@ import pwndbg.commands.telescope import pwndbg.disasm import pwndbg.gdblib.config import pwndbg.gdblib.events +import pwndbg.gdblib.heap_tracking import pwndbg.gdblib.nearpc import pwndbg.gdblib.regs import pwndbg.gdblib.symbol @@ -79,7 +80,7 @@ config_output = pwndbg.gdblib.config.add_param( ) config_context_sections = pwndbg.gdblib.config.add_param( "context-sections", - "regs disasm code ghidra stack backtrace expressions threads", + "regs disasm code ghidra stack backtrace expressions threads heap-tracker", "which context sections are displayed (controls order)", ) config_max_threads_display = pwndbg.gdblib.config.add_param( @@ -521,6 +522,21 @@ def context_regs(target=sys.stdout, with_banner=True, width=None): return banner + regs if with_banner else regs +def context_heap_tracker(target=sys.stdout, with_banner=True, width=None): + if not pwndbg.gdblib.heap_tracking.is_enabled(): + return [] + + banner = [pwndbg.ui.banner("heap tracker", target=target, width=width, extra="")] + + if pwndbg.gdblib.heap_tracking.last_issue is not None: + info = [f"Detected the following potential issue: {pwndbg.gdblib.heap_tracking.last_issue}"] + pwndbg.gdblib.heap_tracking.last_issue = None + else: + info = ["Nothing to report."] + + return banner + info if with_banner else info + + parser = argparse.ArgumentParser(description="Print out all registers and enhance the information.") parser.add_argument("regs", nargs="*", type=str, default=None, help="Registers to be shown") @@ -977,6 +993,7 @@ context_sections = { "b": context_backtrace, "e": context_expressions, "g": context_ghidra, + "h": context_heap_tracker, "t": context_threads, } diff --git a/pwndbg/commands/heap_tracking.py b/pwndbg/commands/heap_tracking.py new file mode 100644 index 000000000..0c87f8352 --- /dev/null +++ b/pwndbg/commands/heap_tracking.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import argparse + +import pwndbg.chain +import pwndbg.commands +import pwndbg.gdblib.heap_tracking +from pwndbg.commands import CommandCategory + +parser = argparse.ArgumentParser( + formatter_class=argparse.RawTextHelpFormatter, + description="""Manages the heap tracker. + +The heap tracker is a module that tracks usage of the GLibc heap and looks for +user errors such as double frees and use after frees. + +Currently, the following errors can be detected: + - Use After Free +""", +) + +subparsers = parser.add_subparsers( + required=True, description="Used to enable, disable and query information about the tracker" +) + +# Subcommand that enables the tracker. +enable = subparsers.add_parser("enable", help="Enable heap tracking") +enable.add_argument( + "-b", + "--hardware-breakpoints", + dest="use_hardware_breakpoints", + type=bool, + default=False, + help="Force the tracker to use hardware breakpoints.", +) +enable.set_defaults(mode="enable") + +# Subcommand that disables the tracker. +disable = subparsers.add_parser("disable", help="Disable heap tracking") +disable.set_defaults(mode="disable") + +# Subcommand that produces a report. +toggle_break = subparsers.add_parser( + "toggle-break", help="Toggles whether possible UAF conditions will pause execution" +) +toggle_break.set_defaults(mode="toggle-break") + + +@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.LINUX, command_name="track-heap") +@pwndbg.commands.OnlyWhenRunning +def track_heap(mode=None, use_hardware_breakpoints=False): + if mode == "enable": + # Enable the tracker. + pwndbg.gdblib.heap_tracking.install() + elif mode == "disable": + # Disable the tracker. + pwndbg.gdblib.heap_tracking.uninstall() + elif mode == "toggle-break": + # Delegate to the report function. + pwndbg.gdblib.heap_tracking.stop_on_error = not pwndbg.gdblib.heap_tracking.stop_on_error + if pwndbg.gdblib.heap_tracking.stop_on_error: + print("The program will stop when the heap tracker detects an error") + else: + print("The heap tracker will only print a message when it detects an error") + else: + raise AssertionError(f"track-heap must never have invalid mode '{mode}'. this is a bug") diff --git a/pwndbg/gdblib/heap_tracking.py b/pwndbg/gdblib/heap_tracking.py new file mode 100644 index 000000000..d99504e0f --- /dev/null +++ b/pwndbg/gdblib/heap_tracking.py @@ -0,0 +1,655 @@ +""" +Heap Tracking + +This module implements runtime tracking of the heap, allowing pwndbg to detect +heap related misbehavior coming from an inferior in real time, which lets us +catch UAF bugs, double frees (and more), and report them to the user. + +# Approach +The approach used starting with using breakpoints to hook into the following +libc symbols: `malloc`, `free`, `calloc`, and `realloc`. Each hook has a +reference to a shared instance of the `Tracker` class, which is responsible for +handling the tracking of the chunks of memory from the heap. + +The tracker keeps two sorted maps of chunks, for freed and in use chunks, keyed +by their base address. Newly allocated chunks are added to the map of in use +chunks right before an allocating call returns, and newly freed chunks are moved +from the map of in use chunks to the map of free ones right before a freeing +call returns. The tracker is also responsible for installing watchpoints for +free chunks when they're added to the free chunk map and deleting them when +their corresponding chunks are removed from the map. + +Additionally, because going through the data structures inside of libc to +determine whether a chunk is free or not is, more often than not, a fairly slow +operation, this module will only do so when it determines its view of the chunks +has diverged from the one in libc in a way that would affect behavior. When such +a diffence is detected, this module will rebuild the chunk maps in the range it +determines to have been affected. + +Currently, the way it does this is by deleting and querying from libc the new +status of all chunks that overlap the region of a new allocation when it detects +that allocation overlaps chunks it previously considered free. + +This approach lets us avoid a lot of the following linked lists that comes with +trying to answer the allocation status of a chunk, by keeping at hand as much +known-good information as possible about them. Keep in mind that, although it is +much faster than going to libc every time we need to know the allocation status +of a chunk, this approach does have drawbacks when it comes to memory usage. + +# Compatibility +Currently module assumes the inferior is using GLibc. + +There are points along the code in this module where the assumptions it makes +are explicitly documented and checked to be valid for the current inferior, so +that it may be immediately clear to the user that something has gone wrong if +they happen to not be valid. However, be aware that there may be assumptions +that were not made explicit. + +""" + +from __future__ import annotations + +import gdb +from sortedcontainers import SortedDict # type: ignore + +import pwndbg.gdblib +from pwndbg.color import message + +LIBC_NAME = "libc.so.6" +MALLOC_NAME = "malloc" +CALLOC_NAME = "calloc" +REALLOC_NAME = "realloc" +FREE_NAME = "free" + +last_issue = None + +# Useful to track possbile collision errors. +PRINT_DEBUG = False + + +def is_enabled() -> bool: + """ + Whether the heap tracker in enabled. + """ + global malloc_enter + global free_enter + + installed = [malloc_enter is not None, free_enter is not None] + + # Make sure we're not in an inconsistent state. + assert all(installed) == any(installed) + + return any(installed) + + +def _basename(val): + """ + Returns the last component of a path. + """ + val.split("/")[-1] + + +def resolve_address(name: str) -> int | None: + """ + Checks whether a given symbol is available and part of libc, and returns its + address. + """ + # If that fails, try to query for it by using the less precise pwndbg API. + address = pwndbg.gdblib.symbol.address(name) + if not address: + # Nothing that we can do here. + return None + + # Try to see if this belongs to libc. + # + # This check is, frankly, horrifying, but it's one of the few ways we can + # check what objfile the address we got is coming from*, and it's better to + # err on the side of caution here and at least attempt to prevent the wrong + # symbol from being used, than to return a possibly wrong symbol and have + # the user wonder why on Earth the heap tracker would be hooking to ld.so. + # + # *: A better way would be to use gdb.objfile_from_address, but that's only + # available in relatively recent versions of GDB. + info = gdb.execute(f"info symbol {address:#x}", to_string=True, from_tty=False) + info = info.split(" of ")[-1].split("/")[-1] + if not info or LIBC_NAME not in info: + print( + message.warn( + f'Found "{name}" that does not seem to belong to {LIBC_NAME}. Refusing to use.' + ) + ) + return None + + return address + + +class FreeChunkWatchpoint(gdb.Breakpoint): + def __init__(self, chunk, tracker): + self.chunk = chunk + self.tracker = tracker + + language = gdb.execute("show language", to_string=True) + if "rust" in language: + loc = f"*({chunk.address:#x} as *mut [u8;{chunk.size}])" + else: + loc = f"*(char[{chunk.size}]*){chunk.address:#x}" + + super().__init__(loc, type=gdb.BP_WATCHPOINT, internal=True) + + def stop(self): + pwndbg.lib.cache.clear_cache("stop") + if not in_program_code_stack(): + # Untracked. + return False + + # malloc() and free() implementations will often modify the data in the + # payload of a freed chunk, where the watchpoint is insalled. So, we + # should not flag accesses done as a result of a call to either. + if self.tracker.is_performing_memory_management(): + # We explicitly allow this operation. + return False + + msg = f"Possible use-after-free in {self.chunk.size}-byte chunk at address {self.chunk.address:#x}" + print(f"[!] {msg}") + + global stop_on_error + if stop_on_error: + global last_issue + last_issue = message.error(msg) + return stop_on_error + + +class AllocChunkWatchpoint(gdb.Breakpoint): + def __init__(self, chunk): + self.chunk = chunk + super().__init__(f"*(char[{chunk.size}]*){chunk.address:#x}", internal=True) + + def stop(self): + return False + + +class Chunk: + def __init__(self, address, size, requested_size, flags): + self.address = address + self.size = size + self.requested_size = requested_size + self.flags = flags + + +class Tracker: + def __init__(self): + self.free_chunks = SortedDict() + self.alloc_chunks = SortedDict() + self.free_whatchpoints = dict() + self.memory_management_calls = dict() + + def is_performing_memory_management(self): + thread = gdb.selected_thread().global_num + if thread not in self.memory_management_calls: + return False + else: + return self.memory_management_calls[thread] + + def enter_memory_management(self, name): + thread = gdb.selected_thread().global_num + + # We don't support re-entry. + if thread in self.memory_management_calls: + assert not self.memory_management_calls[ + thread + ], f"in {name}(): re-entrant calls are not supported" + + self.memory_management_calls[thread] = True + + def exit_memory_management(self): + thread = gdb.selected_thread().global_num + + # Make sure we're not doing anything wrong. + if thread in self.memory_management_calls: + assert self.memory_management_calls[thread] + + self.memory_management_calls[thread] = False + + def malloc(self, chunk): + # malloc()s may arbitrarily change the structure of freed blocks, to the + # point our chunk maps may become invalid, so, we update them here if + # anything looks wrong. + lo_i = self.free_chunks.bisect_right(chunk.address) + hi_i = self.free_chunks.bisect_right(chunk.address + chunk.size) + if lo_i > 0: + left_chunk = self.free_chunks.peekitem(index=lo_i - 1)[1] + if left_chunk.address + left_chunk.size >= chunk.address: + # Include the element to the left in the update. + lo_i -= 1 + + try: + if lo_i != hi_i: + # The newly registered chunk overlaps with chunks we had registered + # previously, which means our libc shuffled some things around and + # so we need to update our view of the chunks. + lo_chunk = self.free_chunks.peekitem(index=lo_i)[1] + hi_chunk = self.free_chunks.peekitem(index=hi_i - 1)[1] + + lo_addr = lo_chunk.address + hi_addr = hi_chunk.address + hi_chunk.size + + lo_heap = pwndbg.heap.ptmalloc.Heap(lo_addr) + hi_heap = pwndbg.heap.ptmalloc.Heap(hi_addr - 1) + + # TODO: Can this ever actually fail in real world use? + # + # It shouldn't be possible, the way glibc implements it[0], to have + # a contiguous range at time t+1 that overlaps with two or more + # contiguous ranges that at time t belonged to different heaps. + # + # glibc doesn't move or resize its heaps, which means the boundaries + # between them stay fixed, and, since a chunk can only be created + # from slicing a heap, the heap used to create the chunk at t+1 must + # be the same as the one used to create the ranges at t that it + # overlaps with. + # + # The question is, if we were to support other implementations, we + # couldn't take this behavior for granted. Regardless, if we ever + # do, it's better to fail here if/when this assumption is violated + # than to let it become a bug. + # + # [0]: https://sourceware.org/glibc/wiki/MallocInternals + assert lo_heap.start == hi_heap.start and lo_heap.end == hi_heap.end + + # Remove all of our old handlers. + for i in reversed(range(lo_i, hi_i)): + addr, ch = self.free_chunks.popitem(index=i) + + self.free_whatchpoints[addr].delete() + del self.free_whatchpoints[addr] + + # Add new handlers in their place. We scan over all of the chunks in + # the heap in the range of affected chunks, and add the ones that + # are free. + allocator = pwndbg.heap.current + bins_list = [ + allocator.fastbins(lo_heap.arena.address), + allocator.smallbins(lo_heap.arena.address), + allocator.largebins(lo_heap.arena.address), + allocator.unsortedbin(lo_heap.arena.address), + ] + if allocator.has_tcache(): + bins_list.append(allocator.tcachebins(None)) + bins_list = [x for x in bins_list if x is not None] + + for ch in lo_heap: + # Check for range overlap. + ch_lo_addr = ch.address + ch_hi_addr = ch.address + ch.size + ch.address + + for ch in lo_heap: + # Check for range overlap. + ch_lo_addr = ch.address + ch_hi_addr = ch.address + ch.size + + lo_in_range = ch_lo_addr < hi_addr + hi_in_range = ch_hi_addr > lo_addr + + if not lo_in_range or not hi_in_range: + # No range overlap. + continue + + # Check if the chunk is free. + found = False + for b in bins_list: + if b.contains_chunk(ch.real_size, ch.address): + # The chunk is free. Add it to the free list and install + # a new watch point for it. + nch = Chunk(ch.address, ch.size, ch.real_size, 0) + wp = FreeChunkWatchpoint(nch, self) + + self.free_chunks[ch.address] = nch + self.free_whatchpoints[ch.address] = wp + + # Move on to the next chunk. + found = True + break + except IndexError as e: + import traceback + + traceback.print_exc() + + self.alloc_chunks[chunk.address] = chunk + + def free(self, address) -> bool: + if address not in self.alloc_chunks: + return False + chunk = self.alloc_chunks[address] + del self.alloc_chunks[address] + + wp = FreeChunkWatchpoint(chunk, self) + + self.free_chunks[chunk.address] = chunk + self.free_whatchpoints[chunk.address] = wp + + return True + + +class MallocEnterBreakpoint(gdb.Breakpoint): + def __init__(self, address, tracker): + super().__init__(f"*{address:#x}", internal=True) + self.tracker = tracker + + def stop(self): + pwndbg.lib.cache.clear_cache("stop") + requested_size = pwndbg.arguments.argument(0) + self.tracker.enter_memory_management(MALLOC_NAME) + AllocExitBreakpoint(self.tracker, requested_size, f"malloc({requested_size})") + return False + + +class CallocEnterBreakpoint(gdb.Breakpoint): + def __init__(self, address, tracker): + super().__init__(f"*{address:#x}", internal=True) + self.tracker = tracker + + def stop(self): + pwndbg.lib.cache.clear_cache("stop") + + num_elements = pwndbg.arguments.argument(0) + element_size = pwndbg.arguments.argument(1) + requested_size = element_size * num_elements + + self.tracker.enter_memory_management(CALLOC_NAME) + AllocExitBreakpoint(self.tracker, requested_size, f"calloc({num_elements}, {element_size})") + return False + + +def get_chunk(address, requested_size): + """ + Reads a chunk from a given address. + """ + ty = pwndbg.gdblib.typeinfo.ppvoid + size = int(pwndbg.gdblib.memory.poi(ty, address - ty.sizeof)) + + # GLibc bakes the chunk flags in the lowest 3 bits of the size value, + # so, we separate them here. + FLAGS_BITMASK = 7 + + flags = size & 7 + size ^= flags + + return Chunk(address, size, requested_size, flags) + + +class AllocExitBreakpoint(gdb.FinishBreakpoint): + def __init__(self, tracker, requested_size, name): + super().__init__(internal=True) + self.requested_size = requested_size + self.tracker = tracker + self.name = name + + def stop(self): + pwndbg.lib.cache.clear_cache("stop") + if not in_program_code_stack(): + # Untracked. + self.tracker.exit_memory_management() + return False + + ret_ptr = int(self.return_value) + if ret_ptr == 0: + # No change. + self.tracker.exit_memory_management() + return False + + chunk = get_chunk(ret_ptr, self.requested_size) + self.tracker.malloc(chunk) + print(f"[*] {self.name} -> {ret_ptr:#x}, {chunk.size} bytes real size") + + self.tracker.exit_memory_management() + return False + + def out_of_scope(self): + print( + message.warn( + f"warning: could not follow allocation request of {self.requested_size} bytes" + ) + ) + self.tracker.exit_memory_management() + + +class ReallocEnterBreakpoint(gdb.Breakpoint): + def __init__(self, address, tracker): + super().__init__(f"*{address:#x}", internal=True) + self.tracker = tracker + + def stop(self): + pwndbg.lib.cache.clear_cache("stop") + + freed_pointer = pwndbg.arguments.argument(0) + requested_size = pwndbg.arguments.argument(1) + + self.tracker.enter_memory_management(REALLOC_NAME) + ReallocExitBreakpoint(self.tracker, freed_pointer, requested_size) + return False + + +class ReallocExitBreakpoint(gdb.FinishBreakpoint): + def __init__(self, tracker, freed_ptr, requested_size): + super().__init__(internal=True) + self.freed_ptr = freed_ptr + self.requested_size = requested_size + self.tracker = tracker + + def stop(self): + pwndbg.lib.cache.clear_cache("stop") + if not in_program_code_stack(): + # Untracked. + self.tracker.exit_memory_management() + return False + + # Figure out what the reallocated pointer is. + ret_ptr = int(self.return_value) + if ret_ptr == 0: + # No change. + malloc = None + chunk = get_chunk(ret_ptr, self.requested_size) + malloc = lambda: self.tracker.malloc(chunk) + + if not self.tracker.free(self.freed_ptr): + # This is a chunk we'd never seen before. + malloc() + self.tracker.exit_memory_management() + + msg = f"realloc() to {self.requested_size} bytes with previously unknown pointer {self.freed_ptr:#x}" + print(f"[!] {msg}") + + global stop_on_error + if stop_on_error: + global last_issue + last_issue = message.error(msg) + return stop_on_error + + malloc() + self.tracker.exit_memory_management() + + print( + f"[*] realloc({self.freed_ptr:#x}, {self.requested_size}) -> {ret_ptr:#x}, {chunk.size} bytes real size" + ) + return False + + def out_of_scope(self): + print(message.warn(f"warning: could not follow free request for chunk {self.ptr:#x}")) + self.tracker.exit_memory_management() + + +class FreeEnterBreakpoint(gdb.Breakpoint): + def __init__(self, address, tracker): + super().__init__(f"*{address:#x}", internal=True) + self.tracker = tracker + + def stop(self): + pwndbg.lib.cache.clear_cache("stop") + ptr = pwndbg.arguments.argument(0) + + self.tracker.enter_memory_management(FREE_NAME) + FreeExitBreakpoint(self.tracker, ptr) + return False + + +class FreeExitBreakpoint(gdb.FinishBreakpoint): + def __init__(self, tracker, ptr): + super().__init__(internal=True) + self.ptr = ptr + self.tracker = tracker + + def stop(self): + pwndbg.lib.cache.clear_cache("stop") + if not in_program_code_stack(): + # Untracked. + self.tracker.exit_memory_management() + return False + + if not self.tracker.free(self.ptr): + # This is a chunk we'd never seen before. + self.tracker.exit_memory_management() + + msg = f"free() with previously unknown pointer {self.ptr:#x}" + print(f"[!] {msg}") + global stop_on_error + if stop_on_error: + global last_issue + last_issue = message.error(msg) + return stop_on_error + + self.tracker.exit_memory_management() + + print(f"[*] free({self.ptr:#x})") + return False + + def out_of_scope(self): + print(message.warn(f"warning: could not follow free request for chunk {self.ptr:#x}")) + self.tracker.exit_memory_management() + + +def in_program_code_stack(): + exe = pwndbg.gdblib.proc.exe + binary_exec_page_ranges = tuple( + (p.start, p.end) for p in pwndbg.gdblib.vmmap.get() if p.objfile == exe and p.execute + ) + + frame = gdb.newest_frame() + while frame is not None: + pc = frame.pc() + for start, end in binary_exec_page_ranges: + if start <= pc < end: + return True + frame = frame.older() + return False + + +# These variables track the currently installed heap tracker. +malloc_enter = None +calloc_enter = None +realloc_enter = None +free_enter = None + +# Whether the inferior should be stopped when an error is detected. +stop_on_error = True + + +def install(disable_hardware_whatchpoints=True): + global malloc_enter + global calloc_enter + global realloc_enter + global free_enter + + if is_enabled(): + print("Nothing to do.") + return + + # Make sure the required functions are available. + required_symbols = [MALLOC_NAME, FREE_NAME] + available = [resolve_address(name) for name in required_symbols] + + if not all(available): + print(message.error("The following required symbols are not available:")) + for name in (x[0] for x in zip(required_symbols, available) if not x[1]): + print(message.error(f" - {name}")) + print(message.error(f"Make sure {LIBC_NAME} has already been loaded.")) + + return + + # Warn our users that this is still an experimental feature and that due to + # limitations in how GDB handles breakpoint creation and deletion during + # processing of stop events for other breakpoints, there's not a lot we can + # do about it currently. + # + # See https://sourceware.org/pipermail/gdb/2024-January/051062.html + print( + message.warn( + "This feature is experimental and is known to report false positives, take the" + ) + ) + print(message.warn("diagnostics it procudes with a grain of salt. Use at your own risk.")) + print() + + # Disable hardware watchpoints. + # + # We don't really know how to make sure that the hardware watchpoints + # present in the system have enough capabilities for them to be useful to + # us in this module, seeing as what they can do varies considerably between + # systems and failures are fairly quiet and, thus, hard to detect[1]. + # Because of this, we opt to disable them by default for the sake of + # consistency and so that we don't have to chase silent failures. + # + # [1]: https://sourceware.org/gdb/onlinedocs/gdb/Set-Watchpoints.html + if disable_hardware_whatchpoints: + gdb.execute("set can-use-hw-watchpoints 0") + print("Hardware watchpoints have been disabled. Please do not turn them back on until") + print("heap tracking is disabled, as it may lead to unexpected silent errors.") + print() + print("They may be re-enabled with `set can-use-hw-watchpoints 1`") + print() + else: + print( + message.warn("Hardware watchpoints have not been disabled, silent errors may happen.") + ) + print() + + # Install the heap tracker. + tracker = Tracker() + + malloc_enter = MallocEnterBreakpoint(available[0], tracker) + free_enter = FreeEnterBreakpoint(available[1], tracker) + + calloc_address = resolve_address(CALLOC_NAME) + if calloc_address: + calloc_enter = CallocEnterBreakpoint(calloc_address, tracker) + + realloc_address = resolve_address(REALLOC_NAME) + if realloc_address: + realloc_enter = ReallocEnterBreakpoint(realloc_address, tracker) + + print("Heap tracker installed.") + + +def uninstall(): + global malloc_enter + global calloc_enter + global realloc_enter + global free_enter + + if is_enabled(): + malloc_enter.delete() + free_enter.delete() + + malloc_enter = None + free_enter = None + + if calloc_enter is not None: + calloc_enter.delete() + calloc_enter = None + if realloc_enter is not None: + realloc_enter.delete() + realloc_enter = None + + print("Heap tracker removed.") + else: + print("Nothing to do.") diff --git a/pwndbg/gdblib/nearpc.py b/pwndbg/gdblib/nearpc.py index 8eef72e46..cd66d835a 100644 --- a/pwndbg/gdblib/nearpc.py +++ b/pwndbg/gdblib/nearpc.py @@ -176,6 +176,84 @@ def nearpc(pc=None, lines=None, emulate=False, repeat=False) -> list[str]: first_pc = False should_highlight_opcodes = True + # If this instruction performs a memory access operation, we should tell + # the user anything we can figure out about the memory it's trying to + # access. + mem_access = "" + if instr.address == pc and False: + accesses = [] + for operand in instr.operands: + if operand.type != CS_OP_MEM: + continue + address = operand.mem.disp + + base = operand.mem.base + if base > 0: + address += pwndbg.gdblib.regs[instr.reg_name(base)] + + vmmap = pwndbg.gdblib.vmmap.get() + page = next((page for page in vmmap if address in page), None) + if page is None: + # This is definetly invalid. Don't even bother checking + # any other conditions. + accesses.append(f"[X] {address:#x}") + continue + + if operand.access == CS_AC_READ and not page.read: + # Tried to read from a page we can't read. + accesses.append(f"[X] {address:#x}") + continue + if operand.access == CS_AC_WRITE and not page.write: + # Tried to write to a page we can't write. + accesses.append(f"[X] {address:#x}") + continue + + # At this point, we know the operation is legal, but we don't + # know where it's going yet. It could be going to either memory + # managed by libc or memory managed by the program itself. + + if not pwndbg.heap.current.is_initialized(): + # The libc heap hasn't been initialized yet. There's not a + # lot that we can say beyond this point. + continue + allocator = pwndbg.heap.current + + heap = pwndbg.heap.ptmalloc.Heap(address) + chunk = None + for ch in heap: + # Find the chunk in this heap the corresponds to the address + # we're trying to access. + offset = address - ch.address + if offset >= 0 and offset < ch.real_size: + chunk = ch + break + if chunk is None: + # The memory for this chunk is not managed by libc. We can't + # reason about it. + accesses.append(f"[?] {address:#x}") + continue + + # Scavenge through all of the bins in the current allocator. + # Bins track free chunks, so, whether or not we can find the + # chunk we're trying to access in a bin will tells us whether + # this access is a UAF. + bins_list = [ + allocator.fastbins(chunk.arena.address), + allocator.smallbins(chunk.arena.address), + allocator.largebins(chunk.arena.address), + allocator.unsortedbin(chunk.arena.address), + ] + if allocator.has_tcache(): + bins_list.append(allocator.tcachebins(None)) + + bins_list = [x for x in bins_list if x is not None] + for bins in bins_list: + if bins.contains_chunk(chunk.real_size, chunk.address): + # This chunk is free. This is a UAF. + accesses.append(f"[UAF] {address:#x}") + continue + mem_access = " ".join(accesses) + opcodes = "" if show_opcode_bytes > 0: opcodes = (opcode_separator_bytes * " ").join( @@ -193,7 +271,7 @@ def nearpc(pc=None, lines=None, emulate=False, repeat=False) -> list[str]: if should_highlight_opcodes: opcodes = C.highlight(opcodes) should_highlight_opcodes = False - line = " ".join(filter(None, (prefix, address_str, opcodes, symbol, asm))) + line = " ".join(filter(None, (prefix, address_str, opcodes, symbol, asm, mem_access))) # If there was a branch before this instruction which was not # contiguous, put in some ellipses. diff --git a/pyproject.toml b/pyproject.toml index eeb3a297a..925659c18 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -265,6 +265,7 @@ pycparser = "2.21" pyelftools = "0.29" pygments = "2.15.0" ropgadget = "7.2" +sortedcontainers = "2.4.0" tabulate = "0.9.0" typing-extensions = "4.6.1" unicorn = "2.0.1.post1" diff --git a/tests/gdb-tests/tests/test_context_commands.py b/tests/gdb-tests/tests/test_context_commands.py index c1997971a..8620b3f2d 100644 --- a/tests/gdb-tests/tests/test_context_commands.py +++ b/tests/gdb-tests/tests/test_context_commands.py @@ -81,7 +81,7 @@ def test_empty_context_sections(start_binary, sections): start_binary(USE_FDS_BINARY) # Sanity check - default_ctx_sects = "regs disasm code ghidra stack backtrace expressions threads" + default_ctx_sects = "regs disasm code ghidra stack backtrace expressions threads heap-tracker" assert pwndbg.gdblib.config.context_sections.value == default_ctx_sects assert gdb.execute("context", to_string=True) != ""