diff --git a/pwndbg/commands/heap.py b/pwndbg/commands/heap.py index 9972be07f..ab26c2ff8 100644 --- a/pwndbg/commands/heap.py +++ b/pwndbg/commands/heap.py @@ -13,9 +13,11 @@ import pwndbg.lib.heap.helpers from pwndbg.color import generateColorFunction from pwndbg.color import message from pwndbg.commands.config import display_config +from pwndbg.heap.ptmalloc import Arena from pwndbg.heap.ptmalloc import Bins from pwndbg.heap.ptmalloc import BinType from pwndbg.heap.ptmalloc import Chunk +from pwndbg.heap.ptmalloc import Heap def read_chunk(addr): @@ -127,44 +129,18 @@ def heap(addr=None, verbose=False, simple=False): active heap. """ allocator = pwndbg.heap.current - heap_region = pwndbg.heap.ptmalloc.Heap(addr) - arena = allocator.get_arena_for_chunk(addr) if addr else allocator.get_arena() - top_chunk = arena["top"] - ptr_size = allocator.size_sz - - # Store the heap base address in a GDB variable that can be used in other - # GDB commands - gdb.execute("set $heap_base=0x{:x}".format(heap_region.start)) - # Calculate where to start printing; if an address was supplied, use that, - # if this heap belongs to the main arena, start at the beginning of the - # heap's mapping, otherwise, compensate for the presence of a heap_info - # struct and possibly an arena. - if addr: - cursor = int(addr) + if addr is not None: + chunk = Chunk(addr) + while chunk is not None: + malloc_chunk(chunk.address) + chunk = chunk.next_chunk() else: - cursor = heap_region.start - - # i686 alignment heuristic - first_chunk_size = pwndbg.gdblib.arch.unpack( - pwndbg.gdblib.memory.read(cursor + ptr_size, ptr_size) - ) - if first_chunk_size == 0: - cursor += ptr_size * 2 - - while cursor in heap_region: - malloc_chunk(cursor, verbose=verbose, simple=simple) - - if cursor == top_chunk: - break - - size_field = pwndbg.gdblib.memory.u(cursor + allocator.chunk_key_offset("size")) - real_size = size_field & ~allocator.malloc_align_mask - cursor += real_size + arena = allocator.thread_arena + h = arena.active_heap - # Avoid an infinite loop when a chunk's size is 0. - if real_size == 0: - break + for chunk in h: + malloc_chunk(chunk.address) parser = argparse.ArgumentParser() @@ -179,8 +155,13 @@ parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of def arena(addr=None): """Print the contents of an arena, default to the current thread's arena.""" allocator = pwndbg.heap.current - arena = allocator.get_arena(addr) - print(arena) + + if addr is not None: + arena = Arena(addr) + else: + arena = allocator.thread_arena + + print(arena._gdbValue) # Breaks encapsulation, find a better way. parser = argparse.ArgumentParser() @@ -247,12 +228,13 @@ def top_chunk(addr=None): current thread's arena. """ allocator = pwndbg.heap.current - arena = allocator.get_arena(addr) - address = arena["top"] - size = pwndbg.gdblib.memory.u(int(address) + allocator.chunk_key_offset("size")) - out = message.off("Top chunk\n") + "Addr: {}\nSize: 0x{:02x}".format(M.get(address), size) - print(out) + if addr is not None: + arena = Arena(addr) + else: + arena = allocator.thread_arena + + malloc_chunk(arena.top) parser = argparse.ArgumentParser() @@ -275,10 +257,9 @@ parser.add_argument( @pwndbg.commands.OnlyWhenHeapIsInitialized def malloc_chunk(addr, fake=False, verbose=False, simple=False): """Print a malloc_chunk struct's contents.""" - chunk = Chunk(addr) - allocator = pwndbg.heap.current - ptr_size = allocator.size_sz + + chunk = Chunk(addr) headers_to_print = [] # both state (free/allocated) and flags fields_to_print = set() # in addition to addr and size @@ -300,7 +281,7 @@ def malloc_chunk(addr, fake=False, verbose=False, simple=False): if chunk.is_top_chunk: headers_to_print.append(message.off("Top chunk")) - if not chunk.is_top_chunk: + if not chunk.is_top_chunk and arena: bins_list = [ allocator.fastbins(arena.address) or {}, @@ -644,35 +625,22 @@ parser.add_argument( def vis_heap_chunks(addr=None, count=None, naive=None, display_all=None): """Visualize chunks on a heap, default to the current arena's active heap.""" allocator = pwndbg.heap.current - heap_region = allocator.get_heap_boundaries(addr) - arena = allocator.get_arena_for_chunk(addr) if addr else allocator.get_arena() - top_chunk = arena["top"] + if addr is not None: + cursor = int(addr) + heap_region = Heap(cursor) + arena = heap_region.arena + else: + arena = allocator.thread_arena + heap_region = arena.active_heap + cursor = heap_region.start + ptr_size = allocator.size_sz # Build a list of addresses that delimit each chunk. chunk_delims = [] - if addr: - cursor = int(addr) - elif arena == allocator.main_arena: - cursor = heap_region.start - else: - cursor = heap_region.start + allocator.heap_info.sizeof - if pwndbg.gdblib.vmmap.find(allocator.get_heap(heap_region.start)["ar_ptr"]) == heap_region: - # Round up to a 2-machine-word alignment after an arena to - # compensate for the presence of the have_fastchunks variable - # in GLIBC versions >= 2.27. - cursor += (allocator.malloc_state.sizeof + ptr_size) & ~allocator.malloc_align_mask - - # Check if there is an alignment at the start of the heap, adjust if necessary. - if not addr: - first_chunk_size = pwndbg.gdblib.arch.unpack( - pwndbg.gdblib.memory.read(cursor + ptr_size, ptr_size) - ) - if first_chunk_size == 0: - cursor += ptr_size * 2 - cursor_backup = cursor + chunk = Chunk(cursor) for _ in range(count + 1): # Don't read beyond the heap mapping if --naive or corrupted heap. @@ -680,24 +648,21 @@ def vis_heap_chunks(addr=None, count=None, naive=None, display_all=None): chunk_delims.append(heap_region.end) break - size_field = pwndbg.gdblib.memory.u(cursor + ptr_size) - real_size = size_field & ~allocator.malloc_align_mask - prev_inuse = allocator.chunk_flags(size_field)[0] - # Don't repeatedly operate on the same address (e.g. chunk size of 0). if cursor in chunk_delims or cursor + ptr_size in chunk_delims: break - if prev_inuse: + if chunk.prev_inuse: chunk_delims.append(cursor + ptr_size) else: chunk_delims.append(cursor) - if (cursor == top_chunk and not naive) or (cursor == heap_region.end - ptr_size * 2): + if (chunk.is_top_chunk and not naive) or (cursor == heap_region.end - ptr_size * 2): chunk_delims.append(cursor + ptr_size * 2) break - cursor += real_size + cursor += chunk.real_size + chunk = Chunk(cursor) # Build the output buffer, changing color at each chunk delimiter. # TODO: maybe print free chunks in bold or underlined @@ -727,6 +692,7 @@ def vis_heap_chunks(addr=None, count=None, naive=None, display_all=None): labels = [] cursor = cursor_backup + chunk = Chunk(cursor) has_huge_chunk = False # round up to align with 4*ptr_size and get half @@ -768,7 +734,7 @@ def vis_heap_chunks(addr=None, count=None, naive=None, display_all=None): printed += 1 labels.extend(bin_labels(cursor, bin_collections)) - if cursor == top_chunk: + if cursor == arena.top: labels.append("Top chunk") asc += bin_ascii(pwndbg.gdblib.memory.read(cursor, ptr_size)) @@ -847,7 +813,7 @@ def try_free(addr): # constants allocator = pwndbg.heap.current - arena = allocator.get_arena() + arena = allocator.thread_arena aligned_lsb = allocator.malloc_align_mask.bit_length() size_sz = allocator.size_sz @@ -973,7 +939,7 @@ def try_free(addr): print(message.notice("Fastbin checks")) chunk_fastbin_idx = allocator.fastbin_index(chunk_size_unmasked) fastbin_list = ( - allocator.fastbins(int(arena.address)) + allocator.fastbins(arena.address) .bins[(chunk_fastbin_idx + 2) * (ptr_size * 2)] .fd_chain ) @@ -993,10 +959,10 @@ def try_free(addr): # next chunk's size is big enough and small enough next_chunk_size = unsigned_size(next_chunk["size"]) - if next_chunk_size <= 2 * size_sz or chunksize(next_chunk_size) >= int(arena["system_mem"]): + if next_chunk_size <= 2 * size_sz or chunksize(next_chunk_size) >= arena.system_mem: err = "free(): invalid next size (fast) -> next chunk's size not in [2*size_sz; av->system_mem]\n" err += " next chunk's size is 0x{:x}, 2*size_sz is 0x{:x}, system_mem is 0x{:x}" - err = err.format(next_chunk_size, 2 * size_sz, int(arena["system_mem"])) + err = err.format(next_chunk_size, 2 * size_sz, arena.system_mem) print(message.error(err)) errors_found += 1 @@ -1044,21 +1010,21 @@ def try_free(addr): print(message.notice("Not mapped checks")) # chunks is not top chunk - if addr == int(arena["top"]): + if addr == arena.top: err = "double free or corruption (top) -> chunk is top chunk" print(message.error(err)) errors_found += 1 # next chunk is not beyond the boundaries of the arena NONCONTIGUOUS_BIT = 2 - top_chunk_addr = int(arena["top"]) + top_chunk_addr = arena.top top_chunk = read_chunk(top_chunk_addr) next_chunk_addr = addr + chunk_size_unmasked # todo: in libc, addition may overflow - if ( - arena["flags"] & NONCONTIGUOUS_BIT == 0 - ) and next_chunk_addr >= top_chunk_addr + chunksize(top_chunk["size"]): + if (arena.flags & NONCONTIGUOUS_BIT == 0) and next_chunk_addr >= top_chunk_addr + chunksize( + top_chunk["size"] + ): err = "double free or corruption (out) -> next chunk is beyond arena and arena is contiguous\n" err += "next chunk at 0x{:x}, end of arena at 0x{:x}" err = err.format( @@ -1084,10 +1050,10 @@ def try_free(addr): errors_found += 1 # next chunk's size is big enough and small enough - if next_chunk_size <= 2 * size_sz or next_chunk_size >= int(arena["system_mem"]): + if next_chunk_size <= 2 * size_sz or next_chunk_size >= arena.system_mem: err = "free(): invalid next size (normal) -> next chunk's size not in [2*size_sz; system_mem]\n" err += "next chunk's size is 0x{:x}, 2*size_sz is 0x{:x}, system_mem is 0x{:x}" - err = err.format(next_chunk_size, 2 * size_sz, int(arena["system_mem"])) + err = err.format(next_chunk_size, 2 * size_sz, arena.system_mem) print(message.error(err)) errors_found += 1 @@ -1145,7 +1111,7 @@ def try_free(addr): print(message.notice("Clearing next chunk's P bit")) # unsorted bin fd->bk should be unsorted bean - unsorted_addr = int(arena["bins"][0]) + unsorted_addr = int(arena.bins[0]) try: unsorted = read_chunk(unsorted_addr) try: diff --git a/pwndbg/constants/ptmalloc.py b/pwndbg/constants/ptmalloc.py index cef1f4148..dbb2a46fb 100644 --- a/pwndbg/constants/ptmalloc.py +++ b/pwndbg/constants/ptmalloc.py @@ -3,6 +3,7 @@ PREV_INUSE = 1 IS_MMAPPED = 2 NON_MAIN_ARENA = 4 SIZE_BITS = PREV_INUSE | IS_MMAPPED | NON_MAIN_ARENA +NONCONTIGUOUS_BIT = 2 NBINS = 128 NSMALLBINS = 64 diff --git a/pwndbg/heap/ptmalloc.py b/pwndbg/heap/ptmalloc.py index ef1ccbfc6..95636e1db 100644 --- a/pwndbg/heap/ptmalloc.py +++ b/pwndbg/heap/ptmalloc.py @@ -1,3 +1,4 @@ +import copy import importlib from collections import OrderedDict from enum import Enum @@ -134,11 +135,12 @@ class Chunk: "_bk", "_fd_nextsize", "_bk_nextsize", + "_heap", "_arena", "_is_top_chunk", ) - def __init__(self, addr, arena=None): + def __init__(self, addr, heap=None, arena=None): if isinstance(pwndbg.heap.current.malloc_chunk, gdb.Type): self._gdbValue = pwndbg.gdblib.memory.poi(pwndbg.heap.current.malloc_chunk, addr) else: @@ -155,6 +157,7 @@ class Chunk: self._bk = None self._fd_nextsize = None self._bk_nextsize = None + self._heap = heap self._arena = arena self._is_top_chunk = None @@ -196,8 +199,7 @@ class Chunk: if self._real_size is None: try: self._real_size = int( - self._gdbValue[self.__match_renamed_field("size")] - & ~(ptmalloc.NON_MAIN_ARENA | ptmalloc.IS_MMAPPED | ptmalloc.PREV_INUSE) + self._gdbValue[self.__match_renamed_field("size")] & ~(ptmalloc.SIZE_BITS) ) except gdb.MemoryError: pass @@ -283,18 +285,17 @@ class Chunk: return self._bk_nextsize + @property + def heap(self): + if self._heap is None: + self._heap = Heap(self.address) + + return self._heap + @property def arena(self): if self._arena is None: - try: - ar_ptr = pwndbg.heap.current.get_heap(self.address)["ar_ptr"] - ar_ptr.fetch_lazy() - except Exception: - ar_ptr = None - if ar_ptr is not None and ar_ptr in (ar.address for ar in pwndbg.heap.current.arenas): - self._arena = Arena(ar_ptr) - else: - self._arena = Arena(pwndbg.heap.current.main_arena.address) + self._arena = self.heap.arena return self._arena @@ -309,90 +310,160 @@ class Chunk: return self._is_top_chunk + def next_chunk(self): + if self.is_top_chunk: + return None + + if self.real_size == 0: + return None + + next = Chunk(self.address + self.real_size, arena=self.arena) + if pwndbg.gdblib.memory.peek(next.address): + return next + else: + return None + class Heap: - def __init__(self, addr=None): + __slots__ = ( + "_gdbValue", + "arena", + "_memory_region", + "start", + "end", + "_prev", + "first_chunk", + ) + + def __init__(self, addr, arena=None): + """Build a Heap object given an address on that heap. + Heap regions are treated differently depending on their arena: + 1) main_arena - uses the sbrk heap + 2) non-main arena - heap starts after its heap_info struct (and possibly an arena) + 3) non-contiguous main_arena - just a memory region + 4) no arena - for fake/mmapped chunks + """ allocator = pwndbg.heap.current - if addr is not None: + main_arena = allocator.main_arena + + sbrk_region = allocator.get_sbrk_heap_region() + if addr in sbrk_region: + # Case 1; main_arena. + self.arena = main_arena if arena is None else arena + self._memory_region = sbrk_region + self._gdbValue = None + else: + heap_region = allocator.get_region(addr) + heap_info = allocator.get_heap(addr) try: - # Can fail if any part of the struct is unmapped (due to corruption/fake struct). - ar_ptr = allocator.get_heap(addr)["ar_ptr"] - ar_ptr.fetch_lazy() - except Exception: + ar_ptr = int(heap_info["ar_ptr"]) + except gdb.MemoryError: ar_ptr = None - # This is probably a non-main arena if a legitimate ar_ptr exists. if ar_ptr is not None and ar_ptr in (ar.address for ar in allocator.arenas): - self.arena = Arena(ar_ptr) + # Case 2; non-main arena. + self.arena = Arena(ar_ptr) if arena is None else arena + start = heap_region.start + allocator.heap_info.sizeof + if ar_ptr in heap_region: + start += pwndbg.lib.memory.align_up( + allocator.malloc_state.sizeof, allocator.malloc_alignment + ) + + heap_region.memsz = heap_region.end - start + heap_region.vaddr = start + self._memory_region = heap_region + self._gdbValue = heap_info + elif main_arena.non_contiguous: + # Case 3; non-contiguous main_arena. + self.arena = main_arena if arena is None else arena + self._memory_region = heap_region + self._gdbValue = None else: - # Use the main arena as a fallback - self.arena = Arena(allocator.main_arena.address) - else: - # Get the thread arena under default conditions. - self.arena = Arena(allocator.get_arena().address) + # Case 4; fake/mmapped chunk + self.arena = None + self._memory_region = heap_region + self._gdbValue = None - if self.arena.address == allocator.main_arena.address: - self.is_main_arena_heap = True - else: - self.is_main_arena_heap = False + self.start = self._memory_region.start + self.end = self._memory_region.end + self.first_chunk = Chunk(self.start) - heap_region = allocator.get_heap_boundaries(addr) - if not self.is_main_arena_heap: - page = pwndbg.lib.memory.Page(0, 0, 0, 0) - page.vaddr = heap_region.start + allocator.heap_info.sizeof - if ( - pwndbg.gdblib.vmmap.find(allocator.get_heap(heap_region.start)["ar_ptr"]) - == heap_region - ): - page.vaddr += ( - allocator.malloc_state.sizeof + allocator.size_sz - ) & ~allocator.malloc_align_mask - page.memsz = heap_region.end - page.start - heap_region = page + self._prev = None + + @property + def prev(self): + if self._prev is None and self._gdbValue is not None: + try: + self._prev = int(self._gdbValue["prev"]) + except gdb.MemoryError: + pass - self.start = heap_region.start - self.end = heap_region.end + return self._prev + + def __iter__(self): + iter_chunk = self.first_chunk + while iter_chunk is not None: + yield iter_chunk + iter_chunk = iter_chunk.next_chunk() def __contains__(self, addr: int) -> bool: return self.start <= addr < self.end + def __str__(self): + fmt = "[%%%ds]" % (pwndbg.gdblib.arch.ptrsize * 2) + return message.hint(fmt % (hex(self.first_chunk.address))) + M.heap( + str(pwndbg.gdblib.vmmap.find(self.start)) + ) + -# https://bazaar.launchpad.net/~ubuntu-branches/ubuntu/trusty/eglibc/trusty-security/view/head:/malloc/malloc.c#L1356 class Arena: __slots__ = ( "_gdbValue", "address", - "is_main_arena", + "_is_main_arena", "_top", - "heaps", + "_active_heap", + "_heaps", "_mutex", "_flags", + "_non_contiguous", "_have_fastchunks", "_fastbinsY", "_bins", "_binmap", "_next", "_next_free", + "_system_mem", ) - def __init__(self, addr, heaps=None): + def __init__(self, addr): if isinstance(pwndbg.heap.current.malloc_state, gdb.Type): self._gdbValue = pwndbg.gdblib.memory.poi(pwndbg.heap.current.malloc_state, addr) else: self._gdbValue = pwndbg.heap.current.malloc_state(addr) self.address = int(self._gdbValue.address) - self.is_main_arena = self.address == pwndbg.heap.current.main_arena.address + self._is_main_arena = None self._top = None - self.heaps = heaps + self._active_heap = None + self._heaps = None self._mutex = None self._flags = None + self._non_contiguous = None self._have_fastchunks = None self._fastbinsY = None self._bins = None self._binmap = None self._next = None self._next_free = None + self._system_mem = None + + @property + def is_main_arena(self): + if self._is_main_arena is None: + self._is_main_arena = self.address == pwndbg.heap.current.main_arena.address + + return self._is_main_arena @property def mutex(self): @@ -408,12 +479,21 @@ class Arena: def flags(self): if self._flags is None: try: - self._flags = int(self._gdbValue["flag"]) + self._flags = int(self._gdbValue["flags"]) except gdb.MemoryError: pass return self._flags + @property + def non_contiguous(self): + if self._non_contiguous is None: + flags = self.flags + if flags is not None: + self._non_contiguous = bool(flags & ptmalloc.NONCONTIGUOUS_BIT) + + return self._non_contiguous + @property def have_fastchunks(self): if self._have_fastchunks is None: @@ -490,6 +570,42 @@ class Arena: return self._next_free + @property + def system_mem(self): + if self._system_mem is None: + try: + self._system_mem = int(self._gdbValue["system_mem"]) + except gdb.MemoryError: + pass + + return self._system_mem + + @property + def active_heap(self): + if self._active_heap is None: + self._active_heap = Heap(self.top, arena=self) + + return self._active_heap + + @property + def heaps(self): + if self._heaps is None: + heap = self.active_heap + heap_list = [heap] + if self.is_main_arena: + sbrk_region = pwndbg.heap.current.get_sbrk_heap_region() + if self.top not in sbrk_region: + heap_list.append(Heap(sbrk_region.start, arena=self)) + else: + while heap.prev: + heap = Heap(heap.prev, arena=self) + heap_list.append(heap) + + heap_list.reverse() + self._heaps = heap_list + + return self._heaps + def fastbins(self): size = pwndbg.gdblib.arch.ptrsize * 2 fd_offset = pwndbg.gdblib.arch.ptrsize * 2 @@ -518,18 +634,6 @@ class Arena: return "\n".join(res) -class HeapInfo: - def __init__(self, addr, first_chunk): - self.addr = addr - self.first_chunk = first_chunk - - def __str__(self): - fmt = "[%%%ds]" % (pwndbg.gdblib.arch.ptrsize * 2) - return message.hint(fmt % (hex(self.first_chunk))) + M.heap( - str(pwndbg.gdblib.vmmap.find(self.addr)) - ) - - class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator): def __init__(self): # Global ptmalloc objects @@ -554,53 +658,17 @@ class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator): @property @pwndbg.lib.memoize.reset_on_stop def arenas(self): - arena = self.main_arena + """Return a tuple of all current arenas.""" arenas = [] - arena_cnt = 0 - main_arena_addr = int(arena.address) - sbrk_page = self.get_heap_boundaries().vaddr - - # Create the main_arena with a fake HeapInfo - main_arena = Arena(main_arena_addr, [HeapInfo(sbrk_page, sbrk_page)]) + main_arena = self.main_arena arenas.append(main_arena) - # Iterate over all the non-main arenas - addr = int(arena["next"]) - while addr != main_arena_addr: - heaps = [] - arena = self.get_arena(addr) - arena_cnt += 1 - - # Get the first and last element on the heap linked list of the arena - last_heap_addr = heap_for_ptr(int(arena["top"])) - first_heap_addr = heap_for_ptr(addr) - - heap = self.get_heap(last_heap_addr) - if not heap: - print(message.error("Could not find the heap for arena %s" % hex(addr))) - return - - # Iterate over the heaps of the arena - haddr = last_heap_addr - while haddr != 0: - if haddr == first_heap_addr: - # The first heap has a heap_info and a malloc_state before the actual chunks - chunks_offset = self.heap_info.sizeof + self.malloc_state.sizeof - else: - # The others just - chunks_offset = self.heap_info.sizeof - heaps.append(HeapInfo(haddr, haddr + chunks_offset)) - - # Name the heap mapping, so that it can be colored properly. Note that due to the way malloc is - # optimized, a vm mapping may contain two heaps, so the numbering will not be exact. - page = self.get_region(haddr) - page.objfile = "[heap %d:%d]" % (arena_cnt, len(heaps)) - heap = self.get_heap(haddr) - haddr = int(heap["prev"]) - - # Add to the list of arenas and move on to the next one - arenas.append(Arena(addr, tuple(reversed(heaps)))) - addr = int(arena["next"]) + arena = main_arena + addr = arena.next + while addr != main_arena.address: + arenas.append(Arena(addr)) + arena = Arena(addr) + addr = arena.next arenas = tuple(arenas) self._arenas = arenas @@ -609,6 +677,10 @@ class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator): def has_tcache(self): raise NotImplementedError() + @property + def thread_arena(self): + raise NotImplementedError() + @property def thread_cache(self): raise NotImplementedError() @@ -771,28 +843,15 @@ class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator): def get_heap(self, addr): raise NotImplementedError() - def get_arena(self, arena_addr=None): - raise NotImplementedError() - - def get_arena_for_chunk(self, addr): - chunk = pwndbg.commands.heap.read_chunk(addr) - _, _, nm = self.chunk_flags(chunk["size"]) - if nm: - h = self.get_heap(addr) - r = self.get_arena(h["ar_ptr"]) if h else None - else: - r = self.main_arena - return r - def get_tcache(self, tcache_addr=None): raise NotImplementedError() - def get_heap_boundaries(self, addr=None): + def get_sbrk_heap_region(self): raise NotImplementedError() def get_region(self, addr): """Find the memory map containing 'addr'.""" - return pwndbg.gdblib.vmmap.find(addr) + return copy.deepcopy(pwndbg.gdblib.vmmap.find(addr)) def get_bins(self, bin_type, addr=None): if bin_type == BinType.TCACHE: @@ -816,12 +875,15 @@ class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator): def fastbins(self, arena_addr=None): """Returns: chain or None""" - arena = self.get_arena(arena_addr) + if arena_addr: + arena = Arena(arena_addr) + else: + arena = self.thread_arena if arena is None: return - fastbinsY = arena["fastbinsY"] + fastbinsY = arena.fastbinsY fd_offset = self.chunk_key_offset("fd") num_fastbins = 7 size = pwndbg.gdblib.arch.ptrsize * 2 @@ -885,13 +947,16 @@ class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator): Returns: tuple(chain_from_bin_fd, chain_from_bin_bk, is_chain_corrupted) or None """ index = index - 1 - arena = self.get_arena(arena_addr) + + if arena_addr is not None: + arena = Arena(arena_addr) + else: + arena = self.thread_arena if arena is None: return - normal_bins = arena["bins"] - num_bins = normal_bins.type.sizeof // normal_bins.type.target().sizeof + normal_bins = arena._gdbValue["bins"] # Breaks encapsulation, find a better way. bins_base = int(normal_bins.address) - (pwndbg.gdblib.arch.ptrsize * 2) current_base = bins_base + (index * pwndbg.gdblib.arch.ptrsize * 2) @@ -1040,13 +1105,26 @@ class DebugSymsHeap(GlibcMemoryAllocator): "main_arena" ) or pwndbg.gdblib.symbol.address("main_arena") if self._main_arena_addr is not None: - self._main_arena = pwndbg.gdblib.memory.poi(self.malloc_state, self._main_arena_addr) + self._main_arena = Arena(self._main_arena_addr) return self._main_arena def has_tcache(self): return self.mp and "tcache_bins" in self.mp.type.keys() and self.mp["tcache_bins"] + @property + def thread_arena(self): + if self.multithreaded: + thread_arena_addr = pwndbg.gdblib.symbol.static_linkage_symbol_address( + "thread_arena" + ) or pwndbg.gdblib.symbol.address("thread_arena") + if thread_arena_addr is not None and thread_arena_addr != 0: + return Arena(pwndbg.gdblib.memory.pvoid(thread_arena_addr)) + else: + return None + else: + return self.main_arena + @property def thread_cache(self): """Locate a thread's tcache struct. If it doesn't have one, use the main @@ -1136,35 +1214,7 @@ class DebugSymsHeap(GlibcMemoryAllocator): def get_heap(self, addr): """Find & read the heap_info struct belonging to the chunk at 'addr'.""" - try: - r = pwndbg.gdblib.memory.poi(self.heap_info, heap_for_ptr(addr)) - r.fetch_lazy() - except gdb.MemoryError: - r = None - - return r - - def get_arena(self, arena_addr=None): - """Read a malloc_state struct from the specified address, default to - reading the current thread's arena. Return the main arena if the - current thread is not attached to an arena. - """ - if arena_addr is None: - if self.multithreaded: - arena_addr = pwndbg.gdblib.memory.u( - pwndbg.gdblib.symbol.static_linkage_symbol_address("thread_arena") - or pwndbg.gdblib.symbol.address("thread_arena") - ) - if arena_addr > 0: - return pwndbg.gdblib.memory.poi(self.malloc_state, arena_addr) - - return self.main_arena - - return ( - None - if pwndbg.gdblib.vmmap.find(arena_addr) is None - else pwndbg.gdblib.memory.poi(self.malloc_state, arena_addr) - ) + return pwndbg.gdblib.memory.poi(self.heap_info, heap_for_ptr(addr)) def get_tcache(self, tcache_addr=None): if tcache_addr is None: @@ -1172,23 +1222,20 @@ class DebugSymsHeap(GlibcMemoryAllocator): return pwndbg.gdblib.memory.poi(self.tcache_perthread_struct, tcache_addr) - def get_heap_boundaries(self, addr=None): - """Find the boundaries of the heap containing `addr`, default to the - boundaries of the heap containing the top chunk for the thread's arena. + def get_sbrk_heap_region(self): + """Return a Page object representing the sbrk heap region. + Ensure the region's start address is aligned to SIZE_SZ * 2, + which compensates for the presence of GLIBC_TUNABLES. """ - region = self.get_region(addr) if addr else self.get_region(self.get_arena()["top"]) - - # Occasionally, the [heap] vm region and the actual start of the heap are - # different, e.g. [heap] starts at 0x61f000 but mp_.sbrk_base is 0x620000. - # Return an adjusted Page object if this is the case. - page = pwndbg.lib.memory.Page(0, 0, 0, 0) - sbrk_base = int(self.mp["sbrk_base"]) - if region == self.get_region(sbrk_base): - if sbrk_base != region.vaddr: - page.vaddr = sbrk_base - page.memsz = region.memsz - (sbrk_base - region.vaddr) - return page - return region + sbrk_base = pwndbg.lib.memory.align_up( + int(self.mp["sbrk_base"]), pwndbg.heap.current.size_sz * 2 + ) + + sbrk_region = self.get_region(sbrk_base) + sbrk_region.memsz = sbrk_region.end - sbrk_base + sbrk_region.vaddr = sbrk_base + + return sbrk_region def is_initialized(self): addr = pwndbg.gdblib.symbol.address("__libc_malloc_initialized") @@ -1370,7 +1417,7 @@ class HeuristicHeap(GlibcMemoryAllocator): break if self._main_arena_addr and pwndbg.gdblib.vmmap.find(self._main_arena_addr): - self._main_arena = self.malloc_state(self._main_arena_addr) + self._main_arena = Arena(self._main_arena_addr) return self._main_arena raise SymbolUnresolvableError("main_arena") @@ -1383,161 +1430,168 @@ class HeuristicHeap(GlibcMemoryAllocator): @property def thread_arena(self): - thread_arena_via_config = int(str(pwndbg.gdblib.config.thread_arena), 0) - thread_arena_via_symbol = pwndbg.gdblib.symbol.static_linkage_symbol_address( - "thread_arena" - ) or pwndbg.gdblib.symbol.address("thread_arena") - if thread_arena_via_config > 0: - return thread_arena_via_config - elif thread_arena_via_symbol: - if pwndbg.gdblib.symbol.static_linkage_symbol_address("thread_arena"): - # If the symbol is static-linkage symbol, we trust it. - return pwndbg.gdblib.memory.u(thread_arena_via_symbol) - # Check &thread_arena is nearby TLS base or not to avoid false positive. - tls_base = pwndbg.gdblib.tls.address - if tls_base: - if pwndbg.gdblib.arch.current in ("x86-64", "i386"): - is_valid_address = 0 < tls_base - thread_arena_via_symbol < 0x250 - else: # elif pwndbg.gdblib.arch.current in ("aarch64", "arm"): - is_valid_address = 0 < thread_arena_via_symbol - tls_base < 0x250 - - is_valid_address = ( - is_valid_address - and thread_arena_via_symbol in pwndbg.gdblib.vmmap.find(tls_base) - ) - - if is_valid_address: - thread_arena_struct_addr = pwndbg.gdblib.memory.u(thread_arena_via_symbol) - # Check &thread_arena is a valid address or not to avoid false positive. - if pwndbg.gdblib.vmmap.find(thread_arena_struct_addr): - return thread_arena_struct_addr + if self.multithreaded: + thread_arena_via_config = int(str(pwndbg.gdblib.config.thread_arena), 0) + thread_arena_via_symbol = pwndbg.gdblib.symbol.static_linkage_symbol_address( + "thread_arena" + ) or pwndbg.gdblib.symbol.address("thread_arena") + if thread_arena_via_config > 0: + return Arena(thread_arena_via_config) + elif thread_arena_via_symbol: + if pwndbg.gdblib.symbol.static_linkage_symbol_address("thread_arena"): + # If the symbol is static-linkage symbol, we trust it. + return Arena(pwndbg.gdblib.memory.u(thread_arena_via_symbol)) + # Check &thread_arena is nearby TLS base or not to avoid false positive. + tls_base = pwndbg.gdblib.tls.address + if tls_base: + if pwndbg.gdblib.arch.current in ("x86-64", "i386"): + is_valid_address = 0 < tls_base - thread_arena_via_symbol < 0x250 + else: # elif pwndbg.gdblib.arch.current in ("aarch64", "arm"): + is_valid_address = 0 < thread_arena_via_symbol - tls_base < 0x250 + + is_valid_address = ( + is_valid_address + and thread_arena_via_symbol in pwndbg.gdblib.vmmap.find(tls_base) + ) - if not self._thread_arena_offset and pwndbg.gdblib.symbol.address("__libc_calloc"): - # TODO/FIXME: This method should be updated if we find a better way to find the target assembly code - __libc_calloc_instruction = pwndbg.disasm.near( - pwndbg.gdblib.symbol.address("__libc_calloc"), 100, show_prev_insns=False - ) - # try to find the reference to thread_arena in arena_get in __libc_calloc ( ptr = thread_arena; ) - if pwndbg.gdblib.arch.current == "x86-64": - # try to find something like `mov rax, [rip + disp]` - # and its next is `mov reg, qword ptr fs:[rax]` - # and then we can get the tls offset to thread_arena by calculating value of rax + if is_valid_address: + thread_arena_struct_addr = pwndbg.gdblib.memory.u(thread_arena_via_symbol) + # Check &thread_arena is a valid address or not to avoid false positive. + if pwndbg.gdblib.vmmap.find(thread_arena_struct_addr): + return Arena(thread_arena_struct_addr) - is_possible = lambda i, instr: ( - __libc_calloc_instruction[i + 1].op_str.endswith("qword ptr fs:[rax]") - and instr.op_str.startswith("rax, qword ptr [rip +") - ) - get_offset_instruction = next( - instr - for i, instr in enumerate(__libc_calloc_instruction[:-1]) - if is_possible(i, instr) - ) - # rip + disp - self._thread_arena_offset = pwndbg.gdblib.memory.s64( - get_offset_instruction.next + get_offset_instruction.disp - ) - elif pwndbg.gdblib.arch.current == "i386" and self.possible_page_of_symbols: - base_offset = self.possible_page_of_symbols.vaddr - # try to find something like `mov eax, dword ptr [reg + disp]` (disp is a negative value) - # and its next is either `mov reg, dword ptr gs:[eax]` or `mov reg, dword ptr [reg + eax]` - # and then we can get the tls offset to thread_arena by calculating value of eax - - # this part is very dirty, but it works - is_possible = lambda i, instr: ( - ( - __libc_calloc_instruction[i + 1].op_str.endswith("gs:[eax]") - ^ __libc_calloc_instruction[i + 1].op_str.endswith("+ eax]") - ) - and __libc_calloc_instruction[i + 1].mnemonic == "mov" - and instr.mnemonic == "mov" - and instr.op_str.startswith("eax, dword ptr [e") - and instr.disp < 0 - ) - get_offset_instruction = [ - instr - for i, instr in enumerate(__libc_calloc_instruction[:-1]) - if is_possible(i, instr) - ][-1] - # reg + disp (value of reg is the page start of the last libc page) - self._thread_arena_offset = pwndbg.gdblib.memory.s32( - base_offset + get_offset_instruction.disp - ) - elif pwndbg.gdblib.arch.current == "aarch64": - # There's a branch to get main_arena or thread_arena - # and before the branch, the flow of assembly code will like: - # `mrs reg1, tpidr_el; - # adrp reg2, #base_offset; - # ldr reg2, [reg2, #offset] - # /* branch(cbnz) to arena_get or use main_arena */; - # /* if branch to thread_arena*/; - # ldr reg3, [reg1, reg2]` - # Or: - # `adrp reg1, #base_offset; - # ldr reg1, [reg1, #offset]; - # mrs reg2, tpidr_el; - # /* branch(cbnz) to arena_get or use main_arena */; - # /* if branch to thread_arena*/; - # ldr reg2, [reg1, reg2]` - # , then reg3 or reg2 will be &thread_arena - mrs_instr = next( - instr for instr in __libc_calloc_instruction if instr.mnemonic == "mrs" + if not self._thread_arena_offset and pwndbg.gdblib.symbol.address("__libc_calloc"): + # TODO/FIXME: This method should be updated if we find a better way to find the target assembly code + __libc_calloc_instruction = pwndbg.disasm.near( + pwndbg.gdblib.symbol.address("__libc_calloc"), 100, show_prev_insns=False ) - min_adrp_distance = 0x1000 # just a big enough number - nearest_adrp = None - nearest_adrp_idx = 0 - for i, instr in enumerate(__libc_calloc_instruction): - if ( - instr.mnemonic == "adrp" - and abs(mrs_instr.address - instr.address) < min_adrp_distance - ): - reg = instr.operands[0].str - nearest_adrp = instr - nearest_adrp_idx = i - min_adrp_distance = abs(mrs_instr.address - instr.address) - if instr.address - mrs_instr.address > min_adrp_distance: - break - for instr in __libc_calloc_instruction[nearest_adrp_idx + 1 :]: - if instr.mnemonic == "ldr": - base_offset = nearest_adrp.operands[1].int - offset = instr.operands[1].mem.disp - self._thread_arena_offset = pwndbg.gdblib.memory.s64(base_offset + offset) - break + # try to find the reference to thread_arena in arena_get in __libc_calloc ( ptr = thread_arena; ) + if pwndbg.gdblib.arch.current == "x86-64": + # try to find something like `mov rax, [rip + disp]` + # and its next is `mov reg, qword ptr fs:[rax]` + # and then we can get the tls offset to thread_arena by calculating value of rax - elif pwndbg.gdblib.arch.current == "arm": - # We need to find something near the first `mrc 15, ......` - # The flow of assembly code will like: - # `ldr reg1, [pc, #offset]; - # mrc 15, 0, reg2, cr13, cr0, {3}; - # add reg1, pc; - # ldr reg1, [reg1]; - # add reg1, reg2` - # , then reg1 will be &thread_arena - found_mrc = False - ldr_instr = None - for instr in __libc_calloc_instruction: - if not found_mrc: - if instr.mnemonic == "mrc": - found_mrc = True - elif instr.mnemonic == "ldr": - ldr_instr = instr - else: - reg = ldr_instr.operands[0].str - if instr.mnemonic == "add" and instr.op_str == reg + ", pc": - offset = ldr_instr.operands[1].mem.disp - offset = pwndbg.gdblib.memory.s32((ldr_instr.address + 4 & -4) + offset) - self._thread_arena_offset = pwndbg.gdblib.memory.s32( - instr.address + 4 + offset + is_possible = lambda i, instr: ( + __libc_calloc_instruction[i + 1].op_str.endswith("qword ptr fs:[rax]") + and instr.op_str.startswith("rax, qword ptr [rip +") + ) + get_offset_instruction = next( + instr + for i, instr in enumerate(__libc_calloc_instruction[:-1]) + if is_possible(i, instr) + ) + # rip + disp + self._thread_arena_offset = pwndbg.gdblib.memory.s64( + get_offset_instruction.next + get_offset_instruction.disp + ) + elif pwndbg.gdblib.arch.current == "i386" and self.possible_page_of_symbols: + base_offset = self.possible_page_of_symbols.vaddr + # try to find something like `mov eax, dword ptr [reg + disp]` (disp is a negative value) + # and its next is either `mov reg, dword ptr gs:[eax]` or `mov reg, dword ptr [reg + eax]` + # and then we can get the tls offset to thread_arena by calculating value of eax + + # this part is very dirty, but it works + is_possible = lambda i, instr: ( + ( + __libc_calloc_instruction[i + 1].op_str.endswith("gs:[eax]") + ^ __libc_calloc_instruction[i + 1].op_str.endswith("+ eax]") + ) + and __libc_calloc_instruction[i + 1].mnemonic == "mov" + and instr.mnemonic == "mov" + and instr.op_str.startswith("eax, dword ptr [e") + and instr.disp < 0 + ) + get_offset_instruction = [ + instr + for i, instr in enumerate(__libc_calloc_instruction[:-1]) + if is_possible(i, instr) + ][-1] + # reg + disp (value of reg is the page start of the last libc page) + self._thread_arena_offset = pwndbg.gdblib.memory.s32( + base_offset + get_offset_instruction.disp + ) + elif pwndbg.gdblib.arch.current == "aarch64": + # There's a branch to get main_arena or thread_arena + # and before the branch, the flow of assembly code will like: + # `mrs reg1, tpidr_el; + # adrp reg2, #base_offset; + # ldr reg2, [reg2, #offset] + # /* branch(cbnz) to arena_get or use main_arena */; + # /* if branch to thread_arena*/; + # ldr reg3, [reg1, reg2]` + # Or: + # `adrp reg1, #base_offset; + # ldr reg1, [reg1, #offset]; + # mrs reg2, tpidr_el; + # /* branch(cbnz) to arena_get or use main_arena */; + # /* if branch to thread_arena*/; + # ldr reg2, [reg1, reg2]` + # , then reg3 or reg2 will be &thread_arena + mrs_instr = next( + instr for instr in __libc_calloc_instruction if instr.mnemonic == "mrs" + ) + min_adrp_distance = 0x1000 # just a big enough number + nearest_adrp = None + nearest_adrp_idx = 0 + for i, instr in enumerate(__libc_calloc_instruction): + if ( + instr.mnemonic == "adrp" + and abs(mrs_instr.address - instr.address) < min_adrp_distance + ): + reg = instr.operands[0].str + nearest_adrp = instr + nearest_adrp_idx = i + min_adrp_distance = abs(mrs_instr.address - instr.address) + if instr.address - mrs_instr.address > min_adrp_distance: + break + for instr in __libc_calloc_instruction[nearest_adrp_idx + 1 :]: + if instr.mnemonic == "ldr": + base_offset = nearest_adrp.operands[1].int + offset = instr.operands[1].mem.disp + self._thread_arena_offset = pwndbg.gdblib.memory.s64( + base_offset + offset ) break - if self._thread_arena_offset: - tls_base = pwndbg.gdblib.tls.address - if tls_base: - thread_arena_struct_addr = tls_base + self._thread_arena_offset - if pwndbg.gdblib.vmmap.find(thread_arena_struct_addr): - return pwndbg.gdblib.memory.pvoid(thread_arena_struct_addr) + elif pwndbg.gdblib.arch.current == "arm": + # We need to find something near the first `mrc 15, ......` + # The flow of assembly code will like: + # `ldr reg1, [pc, #offset]; + # mrc 15, 0, reg2, cr13, cr0, {3}; + # add reg1, pc; + # ldr reg1, [reg1]; + # add reg1, reg2` + # , then reg1 will be &thread_arena + found_mrc = False + ldr_instr = None + for instr in __libc_calloc_instruction: + if not found_mrc: + if instr.mnemonic == "mrc": + found_mrc = True + elif instr.mnemonic == "ldr": + ldr_instr = instr + else: + reg = ldr_instr.operands[0].str + if instr.mnemonic == "add" and instr.op_str == reg + ", pc": + offset = ldr_instr.operands[1].mem.disp + offset = pwndbg.gdblib.memory.s32( + (ldr_instr.address + 4 & -4) + offset + ) + self._thread_arena_offset = pwndbg.gdblib.memory.s32( + instr.address + 4 + offset + ) + break - raise SymbolUnresolvableError("thread_arena") + if self._thread_arena_offset: + tls_base = pwndbg.gdblib.tls.address + if tls_base: + thread_arena_struct_addr = tls_base + self._thread_arena_offset + if pwndbg.gdblib.vmmap.find(thread_arena_struct_addr): + return Arena(pwndbg.gdblib.memory.pvoid(thread_arena_struct_addr)) + + raise SymbolUnresolvableError("thread_arena") + else: + return self.main_arena @property def thread_cache(self): @@ -1572,10 +1626,8 @@ class HeuristicHeap(GlibcMemoryAllocator): if is_valid_address: thread_cache_struct_addr = pwndbg.gdblib.memory.u(thread_cache_via_symbol) - # Check *tcache is in the heap region or not to avoid false positive. - if thread_cache_struct_addr in self.get_heap_boundaries(): - self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr) - return self._thread_cache + self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr) + return self._thread_cache if self.has_tcache(): # Each thread has a tcache struct, and the address of the tcache struct is stored in the TLS. @@ -1741,31 +1793,17 @@ class HeuristicHeap(GlibcMemoryAllocator): thread_cache_struct_addr = pwndbg.gdblib.memory.pvoid( tls_base + self._thread_cache_offset ) - if ( - pwndbg.gdblib.vmmap.find(thread_cache_struct_addr) - and thread_cache_struct_addr in self.get_heap_boundaries() - ): + if pwndbg.gdblib.vmmap.find(thread_cache_struct_addr): self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr) return self._thread_cache # If we still can't find the tcache, we guess tcache is in the first chunk of the heap # Note: The result might be wrong if the arena is being shared by multiple threads # And that's why we need to find the tcache address in TLS first - arena = self.get_arena() - heap_region = self.get_heap_boundaries() + arena = self.thread_arena ptr_size = pwndbg.gdblib.arch.ptrsize - if arena == self.main_arena: - cursor = heap_region.start - else: - cursor = heap_region.start + self.heap_info.sizeof - if ( - pwndbg.gdblib.vmmap.find(self.get_heap(heap_region.start)["ar_ptr"]) - == heap_region - ): - # Round up to a 2-machine-word alignment after an arena to - # compensate for the presence of the have_fastchunks variable - # in GLIBC versions >= 2.27. - cursor += (self.malloc_state.sizeof + ptr_size) & ~self.malloc_align_mask + + cursor = arena.active_heap.start # i686 alignment heuristic first_chunk_size = pwndbg.gdblib.arch.unpack( @@ -1906,7 +1944,7 @@ class HeuristicHeap(GlibcMemoryAllocator): region = None # Try to find heap region via `main_arena.top` if self._main_arena_addr and arena: - region = self.get_region(arena["top"]) + region = self.get_region(arena.top) # If we can't use `main_arena` to find the heap region, try to find it via vmmap region = region or next( (p for p in pwndbg.gdblib.vmmap.get() if "[heap]" == p.objfile), None @@ -2105,65 +2143,44 @@ class HeuristicHeap(GlibcMemoryAllocator): """Find & read the heap_info struct belonging to the chunk at 'addr'.""" return self.heap_info(heap_for_ptr(addr)) - def get_arena(self, arena_addr=None): - """Read a malloc_state struct from the specified address, default to - reading the current thread's arena. Return the main arena if the - current thread is not attached to an arena. - """ - if arena_addr is None: - if self.multithreaded: - thread_arena = self.thread_arena - if thread_arena > 0: - return self.malloc_state(thread_arena) - - return self.main_arena - - return self.malloc_state(arena_addr) - def get_tcache(self, tcache_addr=None): if tcache_addr is None: return self.thread_cache return self.tcache_perthread_struct(tcache_addr) - def get_heap_boundaries(self, addr=None): - """Find the boundaries of the heap containing `addr`, default to the - boundaries of the heap containing the top chunk for the thread's arena. + def get_sbrk_heap_region(self): + """Return a Page object representing the sbrk heap region. + Ensure the region's start address is aligned to SIZE_SZ * 2, + which compensates for the presence of GLIBC_TUNABLES. + This heuristic version requires some sanity checks and may raise SymbolUnresolvableError + if malloc's `mp_` struct can't be resolved. """ - region = None - try: - region = self.get_region(addr) if addr else self.get_region(self.get_arena()["top"]) - except Exception: - # Although `self.get_arena` should only raise `SymbolUnresolvableError`, we catch all exceptions here to avoid some bugs in main_arena's heuristics break this function :) - pass - # If we can't use arena to find the heap region, we use vmmap to find the heap region - if region is None and not self.multithreaded: - region = next((p for p in pwndbg.gdblib.vmmap.get() if "[heap]" == p.objfile), None) - if region is not None and addr is not None: - region = None if addr not in region else region - - # Occasionally, the [heap] vm region and the actual start of the heap are - # different, e.g. [heap] starts at 0x61f000 but mp_.sbrk_base is 0x620000. - # Return an adjusted Page object if this is the case. + # Initialize malloc's mp_ struct if necessary. if not self._mp_addr: try: - self.mp # try to fetch the mp_ structure to make sure it's initialized + self.mp except Exception: - # Although `self.mp` should only raise `SymbolUnresolvableError`, we catch all exceptions here to avoid some bugs in mp_'s heuristics break this function :) + # Should only raise SymbolUnresolvableError, but the heuristic heap implementation is still buggy so catch all exceptions for now. pass - if self._mp_addr: # sometimes we can't find mp_ via heuristics - page = pwndbg.lib.memory.Page(0, 0, 0, 0) - # make sure mp["sbrk_base"] is valid + + if self._mp_addr: if self.get_region(self.mp.get_field_address("sbrk_base")) and self.get_region( self.mp["sbrk_base"] ): - sbrk_base = int(self.mp["sbrk_base"]) - if region == self.get_region(sbrk_base): - if sbrk_base != region.vaddr: - page.vaddr = sbrk_base - page.memsz = region.memsz - (sbrk_base - region.vaddr) - return page - return region + sbrk_base = pwndbg.lib.memory.align_up( + int(self.mp["sbrk_base"]), pwndbg.heap.current.size_sz * 2 + ) + + sbrk_region = self.get_region(sbrk_base) + sbrk_region.memsz = self.get_region(sbrk_base).end - sbrk_base + sbrk_region.vaddr = sbrk_base + + return sbrk_region + else: + raise ValueError("mp_.sbrk_base is unmapped or points to unmapped memory.") + else: + raise SymbolUnresolvableError("Unable to resolve mp_ struct via heuristics.") def is_initialized(self): # TODO/FIXME: If main_arena['top'] is been modified to 0, this will not work. diff --git a/tests/gdb-tests/tests/heap/test_heap.py b/tests/gdb-tests/tests/heap/test_heap.py index 43a26def2..0e1b90d08 100644 --- a/tests/gdb-tests/tests/heap/test_heap.py +++ b/tests/gdb-tests/tests/heap/test_heap.py @@ -240,7 +240,7 @@ def test_main_arena_heuristic(start_binary): assert pwndbg.heap.current.main_arena.address == main_arena_addr_via_debug_symbol # Check the struct size is correct assert ( - pwndbg.heap.current.main_arena.type.sizeof + pwndbg.heap.current.main_arena._gdbValue.type.sizeof == pwndbg.gdblib.typeinfo.lookup_types("struct malloc_state").sizeof ) pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg @@ -381,7 +381,7 @@ def test_thread_arena_heuristic(start_binary): # Level 1: We check we can get the address of `thread_arena` from debug symbols and the value of `thread_arena` is correct assert pwndbg.heap.current.thread_arena is not None # Check the address of `thread_arena` is correct - assert pwndbg.heap.current.thread_arena == thread_arena_via_debug_symbol + assert pwndbg.heap.current.thread_arena.address == thread_arena_via_debug_symbol pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg # Level 2: We check we can get the address of `thread_arena` by parsing the assembly code of `__libc_calloc` @@ -389,7 +389,7 @@ def test_thread_arena_heuristic(start_binary): with mock_for_heuristic(["thread_arena"]): assert pwndbg.gdblib.symbol.address("thread_arena") is None # Check the value of `thread_arena` is correct - assert pwndbg.heap.current.thread_arena == thread_arena_via_debug_symbol + assert pwndbg.heap.current.thread_arena.address == thread_arena_via_debug_symbol def test_heuristic_fail_gracefully(start_binary): diff --git a/tests/gdb-tests/tests/heap/test_try_free.py b/tests/gdb-tests/tests/heap/test_try_free.py index 7be949aa7..d0d060caa 100644 --- a/tests/gdb-tests/tests/heap/test_try_free.py +++ b/tests/gdb-tests/tests/heap/test_try_free.py @@ -157,9 +157,11 @@ def test_try_free_invalid_fastbin_entry(start_binary): def test_try_free_double_free_or_corruption_top(start_binary): setup_heap(start_binary, 9) + allocator = pwndbg.heap.current ptr_size = pwndbg.gdblib.arch.ptrsize - top_chunk = int(pwndbg.heap.current.get_arena()["top"]) + 2 * ptr_size + arena = allocator.thread_arena or allocator.main_arena + top_chunk = arena.top + (2 * ptr_size) result = gdb.execute("try_free {}".format(hex(top_chunk)), to_string=True) assert "double free or corruption (top)" in result