Reduce heap code redundancy (#1346)

* Add get_sbrk_heap_region() method

* Use SIZE_BITS in Chunk.real_size()

* Add non_contiguous property to Arena class

* Improve Heap class

* More accurate arena detection
* Integrate Heap class into Chunk class

* Don't parse bins when no arena in find_fake_fast

* Add active_heap property to Arena class

* Add more functionality to heap classes

* next_chunk method for Chunk class
* prev property & __str__ method for Heap class
* heaps property for Arena class
* arenas command updated to reflect changes to Arena class
* Use deepcopy() in get_region() to avoid changing vmmap command output
* Import fiddling to deal with unrelated bug

* Attempt at integration with heap commands

With debug symbols it looks good; there are still issues to iron out with the heuristics

* Remove redundant heap functions

* Remove redundant functions from tests

* Add system_mem property to Arena class

* thread_arena returns main_arena if single thread
pull/1367/head
CptGibbon 3 years ago committed by GitHub
parent 15b80caede
commit f71a4aa65d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -13,9 +13,11 @@ import pwndbg.lib.heap.helpers
from pwndbg.color import generateColorFunction
from pwndbg.color import message
from pwndbg.commands.config import display_config
from pwndbg.heap.ptmalloc import Arena
from pwndbg.heap.ptmalloc import Bins
from pwndbg.heap.ptmalloc import BinType
from pwndbg.heap.ptmalloc import Chunk
from pwndbg.heap.ptmalloc import Heap
def read_chunk(addr):
@ -127,44 +129,18 @@ def heap(addr=None, verbose=False, simple=False):
active heap.
"""
allocator = pwndbg.heap.current
heap_region = pwndbg.heap.ptmalloc.Heap(addr)
arena = allocator.get_arena_for_chunk(addr) if addr else allocator.get_arena()
top_chunk = arena["top"]
ptr_size = allocator.size_sz
# Store the heap base address in a GDB variable that can be used in other
# GDB commands
gdb.execute("set $heap_base=0x{:x}".format(heap_region.start))
# Calculate where to start printing; if an address was supplied, use that,
# if this heap belongs to the main arena, start at the beginning of the
# heap's mapping, otherwise, compensate for the presence of a heap_info
# struct and possibly an arena.
if addr:
cursor = int(addr)
if addr is not None:
chunk = Chunk(addr)
while chunk is not None:
malloc_chunk(chunk.address)
chunk = chunk.next_chunk()
else:
cursor = heap_region.start
# i686 alignment heuristic
first_chunk_size = pwndbg.gdblib.arch.unpack(
pwndbg.gdblib.memory.read(cursor + ptr_size, ptr_size)
)
if first_chunk_size == 0:
cursor += ptr_size * 2
while cursor in heap_region:
malloc_chunk(cursor, verbose=verbose, simple=simple)
if cursor == top_chunk:
break
size_field = pwndbg.gdblib.memory.u(cursor + allocator.chunk_key_offset("size"))
real_size = size_field & ~allocator.malloc_align_mask
cursor += real_size
arena = allocator.thread_arena
h = arena.active_heap
# Avoid an infinite loop when a chunk's size is 0.
if real_size == 0:
break
for chunk in h:
malloc_chunk(chunk.address)
parser = argparse.ArgumentParser()
@ -179,8 +155,13 @@ parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of
def arena(addr=None):
"""Print the contents of an arena, default to the current thread's arena."""
allocator = pwndbg.heap.current
arena = allocator.get_arena(addr)
print(arena)
if addr is not None:
arena = Arena(addr)
else:
arena = allocator.thread_arena
print(arena._gdbValue) # Breaks encapsulation, find a better way.
parser = argparse.ArgumentParser()
@ -247,12 +228,13 @@ def top_chunk(addr=None):
current thread's arena.
"""
allocator = pwndbg.heap.current
arena = allocator.get_arena(addr)
address = arena["top"]
size = pwndbg.gdblib.memory.u(int(address) + allocator.chunk_key_offset("size"))
out = message.off("Top chunk\n") + "Addr: {}\nSize: 0x{:02x}".format(M.get(address), size)
print(out)
if addr is not None:
arena = Arena(addr)
else:
arena = allocator.thread_arena
malloc_chunk(arena.top)
parser = argparse.ArgumentParser()
@ -275,10 +257,9 @@ parser.add_argument(
@pwndbg.commands.OnlyWhenHeapIsInitialized
def malloc_chunk(addr, fake=False, verbose=False, simple=False):
"""Print a malloc_chunk struct's contents."""
chunk = Chunk(addr)
allocator = pwndbg.heap.current
ptr_size = allocator.size_sz
chunk = Chunk(addr)
headers_to_print = [] # both state (free/allocated) and flags
fields_to_print = set() # in addition to addr and size
@ -300,7 +281,7 @@ def malloc_chunk(addr, fake=False, verbose=False, simple=False):
if chunk.is_top_chunk:
headers_to_print.append(message.off("Top chunk"))
if not chunk.is_top_chunk:
if not chunk.is_top_chunk and arena:
bins_list = [
allocator.fastbins(arena.address) or {},
@ -644,35 +625,22 @@ parser.add_argument(
def vis_heap_chunks(addr=None, count=None, naive=None, display_all=None):
"""Visualize chunks on a heap, default to the current arena's active heap."""
allocator = pwndbg.heap.current
heap_region = allocator.get_heap_boundaries(addr)
arena = allocator.get_arena_for_chunk(addr) if addr else allocator.get_arena()
top_chunk = arena["top"]
if addr is not None:
cursor = int(addr)
heap_region = Heap(cursor)
arena = heap_region.arena
else:
arena = allocator.thread_arena
heap_region = arena.active_heap
cursor = heap_region.start
ptr_size = allocator.size_sz
# Build a list of addresses that delimit each chunk.
chunk_delims = []
if addr:
cursor = int(addr)
elif arena == allocator.main_arena:
cursor = heap_region.start
else:
cursor = heap_region.start + allocator.heap_info.sizeof
if pwndbg.gdblib.vmmap.find(allocator.get_heap(heap_region.start)["ar_ptr"]) == heap_region:
# Round up to a 2-machine-word alignment after an arena to
# compensate for the presence of the have_fastchunks variable
# in GLIBC versions >= 2.27.
cursor += (allocator.malloc_state.sizeof + ptr_size) & ~allocator.malloc_align_mask
# Check if there is an alignment at the start of the heap, adjust if necessary.
if not addr:
first_chunk_size = pwndbg.gdblib.arch.unpack(
pwndbg.gdblib.memory.read(cursor + ptr_size, ptr_size)
)
if first_chunk_size == 0:
cursor += ptr_size * 2
cursor_backup = cursor
chunk = Chunk(cursor)
for _ in range(count + 1):
# Don't read beyond the heap mapping if --naive or corrupted heap.
@ -680,24 +648,21 @@ def vis_heap_chunks(addr=None, count=None, naive=None, display_all=None):
chunk_delims.append(heap_region.end)
break
size_field = pwndbg.gdblib.memory.u(cursor + ptr_size)
real_size = size_field & ~allocator.malloc_align_mask
prev_inuse = allocator.chunk_flags(size_field)[0]
# Don't repeatedly operate on the same address (e.g. chunk size of 0).
if cursor in chunk_delims or cursor + ptr_size in chunk_delims:
break
if prev_inuse:
if chunk.prev_inuse:
chunk_delims.append(cursor + ptr_size)
else:
chunk_delims.append(cursor)
if (cursor == top_chunk and not naive) or (cursor == heap_region.end - ptr_size * 2):
if (chunk.is_top_chunk and not naive) or (cursor == heap_region.end - ptr_size * 2):
chunk_delims.append(cursor + ptr_size * 2)
break
cursor += real_size
cursor += chunk.real_size
chunk = Chunk(cursor)
# Build the output buffer, changing color at each chunk delimiter.
# TODO: maybe print free chunks in bold or underlined
@ -727,6 +692,7 @@ def vis_heap_chunks(addr=None, count=None, naive=None, display_all=None):
labels = []
cursor = cursor_backup
chunk = Chunk(cursor)
has_huge_chunk = False
# round up to align with 4*ptr_size and get half
@ -768,7 +734,7 @@ def vis_heap_chunks(addr=None, count=None, naive=None, display_all=None):
printed += 1
labels.extend(bin_labels(cursor, bin_collections))
if cursor == top_chunk:
if cursor == arena.top:
labels.append("Top chunk")
asc += bin_ascii(pwndbg.gdblib.memory.read(cursor, ptr_size))
@ -847,7 +813,7 @@ def try_free(addr):
# constants
allocator = pwndbg.heap.current
arena = allocator.get_arena()
arena = allocator.thread_arena
aligned_lsb = allocator.malloc_align_mask.bit_length()
size_sz = allocator.size_sz
@ -973,7 +939,7 @@ def try_free(addr):
print(message.notice("Fastbin checks"))
chunk_fastbin_idx = allocator.fastbin_index(chunk_size_unmasked)
fastbin_list = (
allocator.fastbins(int(arena.address))
allocator.fastbins(arena.address)
.bins[(chunk_fastbin_idx + 2) * (ptr_size * 2)]
.fd_chain
)
@ -993,10 +959,10 @@ def try_free(addr):
# next chunk's size is big enough and small enough
next_chunk_size = unsigned_size(next_chunk["size"])
if next_chunk_size <= 2 * size_sz or chunksize(next_chunk_size) >= int(arena["system_mem"]):
if next_chunk_size <= 2 * size_sz or chunksize(next_chunk_size) >= arena.system_mem:
err = "free(): invalid next size (fast) -> next chunk's size not in [2*size_sz; av->system_mem]\n"
err += " next chunk's size is 0x{:x}, 2*size_sz is 0x{:x}, system_mem is 0x{:x}"
err = err.format(next_chunk_size, 2 * size_sz, int(arena["system_mem"]))
err = err.format(next_chunk_size, 2 * size_sz, arena.system_mem)
print(message.error(err))
errors_found += 1
@ -1044,21 +1010,21 @@ def try_free(addr):
print(message.notice("Not mapped checks"))
# chunks is not top chunk
if addr == int(arena["top"]):
if addr == arena.top:
err = "double free or corruption (top) -> chunk is top chunk"
print(message.error(err))
errors_found += 1
# next chunk is not beyond the boundaries of the arena
NONCONTIGUOUS_BIT = 2
top_chunk_addr = int(arena["top"])
top_chunk_addr = arena.top
top_chunk = read_chunk(top_chunk_addr)
next_chunk_addr = addr + chunk_size_unmasked
# todo: in libc, addition may overflow
if (
arena["flags"] & NONCONTIGUOUS_BIT == 0
) and next_chunk_addr >= top_chunk_addr + chunksize(top_chunk["size"]):
if (arena.flags & NONCONTIGUOUS_BIT == 0) and next_chunk_addr >= top_chunk_addr + chunksize(
top_chunk["size"]
):
err = "double free or corruption (out) -> next chunk is beyond arena and arena is contiguous\n"
err += "next chunk at 0x{:x}, end of arena at 0x{:x}"
err = err.format(
@ -1084,10 +1050,10 @@ def try_free(addr):
errors_found += 1
# next chunk's size is big enough and small enough
if next_chunk_size <= 2 * size_sz or next_chunk_size >= int(arena["system_mem"]):
if next_chunk_size <= 2 * size_sz or next_chunk_size >= arena.system_mem:
err = "free(): invalid next size (normal) -> next chunk's size not in [2*size_sz; system_mem]\n"
err += "next chunk's size is 0x{:x}, 2*size_sz is 0x{:x}, system_mem is 0x{:x}"
err = err.format(next_chunk_size, 2 * size_sz, int(arena["system_mem"]))
err = err.format(next_chunk_size, 2 * size_sz, arena.system_mem)
print(message.error(err))
errors_found += 1
@ -1145,7 +1111,7 @@ def try_free(addr):
print(message.notice("Clearing next chunk's P bit"))
# unsorted bin fd->bk should be unsorted bean
unsorted_addr = int(arena["bins"][0])
unsorted_addr = int(arena.bins[0])
try:
unsorted = read_chunk(unsorted_addr)
try:

@ -3,6 +3,7 @@ PREV_INUSE = 1
IS_MMAPPED = 2
NON_MAIN_ARENA = 4
SIZE_BITS = PREV_INUSE | IS_MMAPPED | NON_MAIN_ARENA
NONCONTIGUOUS_BIT = 2
NBINS = 128
NSMALLBINS = 64

@ -1,3 +1,4 @@
import copy
import importlib
from collections import OrderedDict
from enum import Enum
@ -134,11 +135,12 @@ class Chunk:
"_bk",
"_fd_nextsize",
"_bk_nextsize",
"_heap",
"_arena",
"_is_top_chunk",
)
def __init__(self, addr, arena=None):
def __init__(self, addr, heap=None, arena=None):
if isinstance(pwndbg.heap.current.malloc_chunk, gdb.Type):
self._gdbValue = pwndbg.gdblib.memory.poi(pwndbg.heap.current.malloc_chunk, addr)
else:
@ -155,6 +157,7 @@ class Chunk:
self._bk = None
self._fd_nextsize = None
self._bk_nextsize = None
self._heap = heap
self._arena = arena
self._is_top_chunk = None
@ -196,8 +199,7 @@ class Chunk:
if self._real_size is None:
try:
self._real_size = int(
self._gdbValue[self.__match_renamed_field("size")]
& ~(ptmalloc.NON_MAIN_ARENA | ptmalloc.IS_MMAPPED | ptmalloc.PREV_INUSE)
self._gdbValue[self.__match_renamed_field("size")] & ~(ptmalloc.SIZE_BITS)
)
except gdb.MemoryError:
pass
@ -283,18 +285,17 @@ class Chunk:
return self._bk_nextsize
@property
def heap(self):
if self._heap is None:
self._heap = Heap(self.address)
return self._heap
@property
def arena(self):
if self._arena is None:
try:
ar_ptr = pwndbg.heap.current.get_heap(self.address)["ar_ptr"]
ar_ptr.fetch_lazy()
except Exception:
ar_ptr = None
if ar_ptr is not None and ar_ptr in (ar.address for ar in pwndbg.heap.current.arenas):
self._arena = Arena(ar_ptr)
else:
self._arena = Arena(pwndbg.heap.current.main_arena.address)
self._arena = self.heap.arena
return self._arena
@ -309,90 +310,160 @@ class Chunk:
return self._is_top_chunk
def next_chunk(self):
if self.is_top_chunk:
return None
if self.real_size == 0:
return None
next = Chunk(self.address + self.real_size, arena=self.arena)
if pwndbg.gdblib.memory.peek(next.address):
return next
else:
return None
class Heap:
def __init__(self, addr=None):
__slots__ = (
"_gdbValue",
"arena",
"_memory_region",
"start",
"end",
"_prev",
"first_chunk",
)
def __init__(self, addr, arena=None):
"""Build a Heap object given an address on that heap.
Heap regions are treated differently depending on their arena:
1) main_arena - uses the sbrk heap
2) non-main arena - heap starts after its heap_info struct (and possibly an arena)
3) non-contiguous main_arena - just a memory region
4) no arena - for fake/mmapped chunks
"""
allocator = pwndbg.heap.current
if addr is not None:
main_arena = allocator.main_arena
sbrk_region = allocator.get_sbrk_heap_region()
if addr in sbrk_region:
# Case 1; main_arena.
self.arena = main_arena if arena is None else arena
self._memory_region = sbrk_region
self._gdbValue = None
else:
heap_region = allocator.get_region(addr)
heap_info = allocator.get_heap(addr)
try:
# Can fail if any part of the struct is unmapped (due to corruption/fake struct).
ar_ptr = allocator.get_heap(addr)["ar_ptr"]
ar_ptr.fetch_lazy()
except Exception:
ar_ptr = int(heap_info["ar_ptr"])
except gdb.MemoryError:
ar_ptr = None
# This is probably a non-main arena if a legitimate ar_ptr exists.
if ar_ptr is not None and ar_ptr in (ar.address for ar in allocator.arenas):
self.arena = Arena(ar_ptr)
else:
# Use the main arena as a fallback
self.arena = Arena(allocator.main_arena.address)
# Case 2; non-main arena.
self.arena = Arena(ar_ptr) if arena is None else arena
start = heap_region.start + allocator.heap_info.sizeof
if ar_ptr in heap_region:
start += pwndbg.lib.memory.align_up(
allocator.malloc_state.sizeof, allocator.malloc_alignment
)
heap_region.memsz = heap_region.end - start
heap_region.vaddr = start
self._memory_region = heap_region
self._gdbValue = heap_info
elif main_arena.non_contiguous:
# Case 3; non-contiguous main_arena.
self.arena = main_arena if arena is None else arena
self._memory_region = heap_region
self._gdbValue = None
else:
# Get the thread arena under default conditions.
self.arena = Arena(allocator.get_arena().address)
# Case 4; fake/mmapped chunk
self.arena = None
self._memory_region = heap_region
self._gdbValue = None
if self.arena.address == allocator.main_arena.address:
self.is_main_arena_heap = True
else:
self.is_main_arena_heap = False
self.start = self._memory_region.start
self.end = self._memory_region.end
self.first_chunk = Chunk(self.start)
heap_region = allocator.get_heap_boundaries(addr)
if not self.is_main_arena_heap:
page = pwndbg.lib.memory.Page(0, 0, 0, 0)
page.vaddr = heap_region.start + allocator.heap_info.sizeof
if (
pwndbg.gdblib.vmmap.find(allocator.get_heap(heap_region.start)["ar_ptr"])
== heap_region
):
page.vaddr += (
allocator.malloc_state.sizeof + allocator.size_sz
) & ~allocator.malloc_align_mask
page.memsz = heap_region.end - page.start
heap_region = page
self._prev = None
self.start = heap_region.start
self.end = heap_region.end
@property
def prev(self):
if self._prev is None and self._gdbValue is not None:
try:
self._prev = int(self._gdbValue["prev"])
except gdb.MemoryError:
pass
return self._prev
def __iter__(self):
iter_chunk = self.first_chunk
while iter_chunk is not None:
yield iter_chunk
iter_chunk = iter_chunk.next_chunk()
def __contains__(self, addr: int) -> bool:
return self.start <= addr < self.end
def __str__(self):
fmt = "[%%%ds]" % (pwndbg.gdblib.arch.ptrsize * 2)
return message.hint(fmt % (hex(self.first_chunk.address))) + M.heap(
str(pwndbg.gdblib.vmmap.find(self.start))
)
# https://bazaar.launchpad.net/~ubuntu-branches/ubuntu/trusty/eglibc/trusty-security/view/head:/malloc/malloc.c#L1356
class Arena:
__slots__ = (
"_gdbValue",
"address",
"is_main_arena",
"_is_main_arena",
"_top",
"heaps",
"_active_heap",
"_heaps",
"_mutex",
"_flags",
"_non_contiguous",
"_have_fastchunks",
"_fastbinsY",
"_bins",
"_binmap",
"_next",
"_next_free",
"_system_mem",
)
def __init__(self, addr, heaps=None):
def __init__(self, addr):
if isinstance(pwndbg.heap.current.malloc_state, gdb.Type):
self._gdbValue = pwndbg.gdblib.memory.poi(pwndbg.heap.current.malloc_state, addr)
else:
self._gdbValue = pwndbg.heap.current.malloc_state(addr)
self.address = int(self._gdbValue.address)
self.is_main_arena = self.address == pwndbg.heap.current.main_arena.address
self._is_main_arena = None
self._top = None
self.heaps = heaps
self._active_heap = None
self._heaps = None
self._mutex = None
self._flags = None
self._non_contiguous = None
self._have_fastchunks = None
self._fastbinsY = None
self._bins = None
self._binmap = None
self._next = None
self._next_free = None
self._system_mem = None
@property
def is_main_arena(self):
if self._is_main_arena is None:
self._is_main_arena = self.address == pwndbg.heap.current.main_arena.address
return self._is_main_arena
@property
def mutex(self):
@ -408,12 +479,21 @@ class Arena:
def flags(self):
if self._flags is None:
try:
self._flags = int(self._gdbValue["flag"])
self._flags = int(self._gdbValue["flags"])
except gdb.MemoryError:
pass
return self._flags
@property
def non_contiguous(self):
if self._non_contiguous is None:
flags = self.flags
if flags is not None:
self._non_contiguous = bool(flags & ptmalloc.NONCONTIGUOUS_BIT)
return self._non_contiguous
@property
def have_fastchunks(self):
if self._have_fastchunks is None:
@ -490,6 +570,42 @@ class Arena:
return self._next_free
@property
def system_mem(self):
if self._system_mem is None:
try:
self._system_mem = int(self._gdbValue["system_mem"])
except gdb.MemoryError:
pass
return self._system_mem
@property
def active_heap(self):
if self._active_heap is None:
self._active_heap = Heap(self.top, arena=self)
return self._active_heap
@property
def heaps(self):
if self._heaps is None:
heap = self.active_heap
heap_list = [heap]
if self.is_main_arena:
sbrk_region = pwndbg.heap.current.get_sbrk_heap_region()
if self.top not in sbrk_region:
heap_list.append(Heap(sbrk_region.start, arena=self))
else:
while heap.prev:
heap = Heap(heap.prev, arena=self)
heap_list.append(heap)
heap_list.reverse()
self._heaps = heap_list
return self._heaps
def fastbins(self):
size = pwndbg.gdblib.arch.ptrsize * 2
fd_offset = pwndbg.gdblib.arch.ptrsize * 2
@ -518,18 +634,6 @@ class Arena:
return "\n".join(res)
class HeapInfo:
def __init__(self, addr, first_chunk):
self.addr = addr
self.first_chunk = first_chunk
def __str__(self):
fmt = "[%%%ds]" % (pwndbg.gdblib.arch.ptrsize * 2)
return message.hint(fmt % (hex(self.first_chunk))) + M.heap(
str(pwndbg.gdblib.vmmap.find(self.addr))
)
class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator):
def __init__(self):
# Global ptmalloc objects
@ -554,53 +658,17 @@ class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator):
@property
@pwndbg.lib.memoize.reset_on_stop
def arenas(self):
arena = self.main_arena
"""Return a tuple of all current arenas."""
arenas = []
arena_cnt = 0
main_arena_addr = int(arena.address)
sbrk_page = self.get_heap_boundaries().vaddr
# Create the main_arena with a fake HeapInfo
main_arena = Arena(main_arena_addr, [HeapInfo(sbrk_page, sbrk_page)])
main_arena = self.main_arena
arenas.append(main_arena)
# Iterate over all the non-main arenas
addr = int(arena["next"])
while addr != main_arena_addr:
heaps = []
arena = self.get_arena(addr)
arena_cnt += 1
# Get the first and last element on the heap linked list of the arena
last_heap_addr = heap_for_ptr(int(arena["top"]))
first_heap_addr = heap_for_ptr(addr)
heap = self.get_heap(last_heap_addr)
if not heap:
print(message.error("Could not find the heap for arena %s" % hex(addr)))
return
# Iterate over the heaps of the arena
haddr = last_heap_addr
while haddr != 0:
if haddr == first_heap_addr:
# The first heap has a heap_info and a malloc_state before the actual chunks
chunks_offset = self.heap_info.sizeof + self.malloc_state.sizeof
else:
# The others just
chunks_offset = self.heap_info.sizeof
heaps.append(HeapInfo(haddr, haddr + chunks_offset))
# Name the heap mapping, so that it can be colored properly. Note that due to the way malloc is
# optimized, a vm mapping may contain two heaps, so the numbering will not be exact.
page = self.get_region(haddr)
page.objfile = "[heap %d:%d]" % (arena_cnt, len(heaps))
heap = self.get_heap(haddr)
haddr = int(heap["prev"])
# Add to the list of arenas and move on to the next one
arenas.append(Arena(addr, tuple(reversed(heaps))))
addr = int(arena["next"])
arena = main_arena
addr = arena.next
while addr != main_arena.address:
arenas.append(Arena(addr))
arena = Arena(addr)
addr = arena.next
arenas = tuple(arenas)
self._arenas = arenas
@ -609,6 +677,10 @@ class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator):
def has_tcache(self):
raise NotImplementedError()
@property
def thread_arena(self):
raise NotImplementedError()
@property
def thread_cache(self):
raise NotImplementedError()
@ -771,28 +843,15 @@ class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator):
def get_heap(self, addr):
raise NotImplementedError()
def get_arena(self, arena_addr=None):
raise NotImplementedError()
def get_arena_for_chunk(self, addr):
chunk = pwndbg.commands.heap.read_chunk(addr)
_, _, nm = self.chunk_flags(chunk["size"])
if nm:
h = self.get_heap(addr)
r = self.get_arena(h["ar_ptr"]) if h else None
else:
r = self.main_arena
return r
def get_tcache(self, tcache_addr=None):
raise NotImplementedError()
def get_heap_boundaries(self, addr=None):
def get_sbrk_heap_region(self):
raise NotImplementedError()
def get_region(self, addr):
"""Find the memory map containing 'addr'."""
return pwndbg.gdblib.vmmap.find(addr)
return copy.deepcopy(pwndbg.gdblib.vmmap.find(addr))
def get_bins(self, bin_type, addr=None):
if bin_type == BinType.TCACHE:
@ -816,12 +875,15 @@ class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator):
def fastbins(self, arena_addr=None):
"""Returns: chain or None"""
arena = self.get_arena(arena_addr)
if arena_addr:
arena = Arena(arena_addr)
else:
arena = self.thread_arena
if arena is None:
return
fastbinsY = arena["fastbinsY"]
fastbinsY = arena.fastbinsY
fd_offset = self.chunk_key_offset("fd")
num_fastbins = 7
size = pwndbg.gdblib.arch.ptrsize * 2
@ -885,13 +947,16 @@ class GlibcMemoryAllocator(pwndbg.heap.heap.MemoryAllocator):
Returns: tuple(chain_from_bin_fd, chain_from_bin_bk, is_chain_corrupted) or None
"""
index = index - 1
arena = self.get_arena(arena_addr)
if arena_addr is not None:
arena = Arena(arena_addr)
else:
arena = self.thread_arena
if arena is None:
return
normal_bins = arena["bins"]
num_bins = normal_bins.type.sizeof // normal_bins.type.target().sizeof
normal_bins = arena._gdbValue["bins"] # Breaks encapsulation, find a better way.
bins_base = int(normal_bins.address) - (pwndbg.gdblib.arch.ptrsize * 2)
current_base = bins_base + (index * pwndbg.gdblib.arch.ptrsize * 2)
@ -1040,13 +1105,26 @@ class DebugSymsHeap(GlibcMemoryAllocator):
"main_arena"
) or pwndbg.gdblib.symbol.address("main_arena")
if self._main_arena_addr is not None:
self._main_arena = pwndbg.gdblib.memory.poi(self.malloc_state, self._main_arena_addr)
self._main_arena = Arena(self._main_arena_addr)
return self._main_arena
def has_tcache(self):
return self.mp and "tcache_bins" in self.mp.type.keys() and self.mp["tcache_bins"]
@property
def thread_arena(self):
if self.multithreaded:
thread_arena_addr = pwndbg.gdblib.symbol.static_linkage_symbol_address(
"thread_arena"
) or pwndbg.gdblib.symbol.address("thread_arena")
if thread_arena_addr is not None and thread_arena_addr != 0:
return Arena(pwndbg.gdblib.memory.pvoid(thread_arena_addr))
else:
return None
else:
return self.main_arena
@property
def thread_cache(self):
"""Locate a thread's tcache struct. If it doesn't have one, use the main
@ -1136,35 +1214,7 @@ class DebugSymsHeap(GlibcMemoryAllocator):
def get_heap(self, addr):
"""Find & read the heap_info struct belonging to the chunk at 'addr'."""
try:
r = pwndbg.gdblib.memory.poi(self.heap_info, heap_for_ptr(addr))
r.fetch_lazy()
except gdb.MemoryError:
r = None
return r
def get_arena(self, arena_addr=None):
"""Read a malloc_state struct from the specified address, default to
reading the current thread's arena. Return the main arena if the
current thread is not attached to an arena.
"""
if arena_addr is None:
if self.multithreaded:
arena_addr = pwndbg.gdblib.memory.u(
pwndbg.gdblib.symbol.static_linkage_symbol_address("thread_arena")
or pwndbg.gdblib.symbol.address("thread_arena")
)
if arena_addr > 0:
return pwndbg.gdblib.memory.poi(self.malloc_state, arena_addr)
return self.main_arena
return (
None
if pwndbg.gdblib.vmmap.find(arena_addr) is None
else pwndbg.gdblib.memory.poi(self.malloc_state, arena_addr)
)
return pwndbg.gdblib.memory.poi(self.heap_info, heap_for_ptr(addr))
def get_tcache(self, tcache_addr=None):
if tcache_addr is None:
@ -1172,23 +1222,20 @@ class DebugSymsHeap(GlibcMemoryAllocator):
return pwndbg.gdblib.memory.poi(self.tcache_perthread_struct, tcache_addr)
def get_heap_boundaries(self, addr=None):
"""Find the boundaries of the heap containing `addr`, default to the
boundaries of the heap containing the top chunk for the thread's arena.
def get_sbrk_heap_region(self):
"""Return a Page object representing the sbrk heap region.
Ensure the region's start address is aligned to SIZE_SZ * 2,
which compensates for the presence of GLIBC_TUNABLES.
"""
region = self.get_region(addr) if addr else self.get_region(self.get_arena()["top"])
# Occasionally, the [heap] vm region and the actual start of the heap are
# different, e.g. [heap] starts at 0x61f000 but mp_.sbrk_base is 0x620000.
# Return an adjusted Page object if this is the case.
page = pwndbg.lib.memory.Page(0, 0, 0, 0)
sbrk_base = int(self.mp["sbrk_base"])
if region == self.get_region(sbrk_base):
if sbrk_base != region.vaddr:
page.vaddr = sbrk_base
page.memsz = region.memsz - (sbrk_base - region.vaddr)
return page
return region
sbrk_base = pwndbg.lib.memory.align_up(
int(self.mp["sbrk_base"]), pwndbg.heap.current.size_sz * 2
)
sbrk_region = self.get_region(sbrk_base)
sbrk_region.memsz = sbrk_region.end - sbrk_base
sbrk_region.vaddr = sbrk_base
return sbrk_region
def is_initialized(self):
addr = pwndbg.gdblib.symbol.address("__libc_malloc_initialized")
@ -1370,7 +1417,7 @@ class HeuristicHeap(GlibcMemoryAllocator):
break
if self._main_arena_addr and pwndbg.gdblib.vmmap.find(self._main_arena_addr):
self._main_arena = self.malloc_state(self._main_arena_addr)
self._main_arena = Arena(self._main_arena_addr)
return self._main_arena
raise SymbolUnresolvableError("main_arena")
@ -1383,16 +1430,17 @@ class HeuristicHeap(GlibcMemoryAllocator):
@property
def thread_arena(self):
if self.multithreaded:
thread_arena_via_config = int(str(pwndbg.gdblib.config.thread_arena), 0)
thread_arena_via_symbol = pwndbg.gdblib.symbol.static_linkage_symbol_address(
"thread_arena"
) or pwndbg.gdblib.symbol.address("thread_arena")
if thread_arena_via_config > 0:
return thread_arena_via_config
return Arena(thread_arena_via_config)
elif thread_arena_via_symbol:
if pwndbg.gdblib.symbol.static_linkage_symbol_address("thread_arena"):
# If the symbol is static-linkage symbol, we trust it.
return pwndbg.gdblib.memory.u(thread_arena_via_symbol)
return Arena(pwndbg.gdblib.memory.u(thread_arena_via_symbol))
# Check &thread_arena is nearby TLS base or not to avoid false positive.
tls_base = pwndbg.gdblib.tls.address
if tls_base:
@ -1410,7 +1458,7 @@ class HeuristicHeap(GlibcMemoryAllocator):
thread_arena_struct_addr = pwndbg.gdblib.memory.u(thread_arena_via_symbol)
# Check &thread_arena is a valid address or not to avoid false positive.
if pwndbg.gdblib.vmmap.find(thread_arena_struct_addr):
return thread_arena_struct_addr
return Arena(thread_arena_struct_addr)
if not self._thread_arena_offset and pwndbg.gdblib.symbol.address("__libc_calloc"):
# TODO/FIXME: This method should be updated if we find a better way to find the target assembly code
@ -1500,7 +1548,9 @@ class HeuristicHeap(GlibcMemoryAllocator):
if instr.mnemonic == "ldr":
base_offset = nearest_adrp.operands[1].int
offset = instr.operands[1].mem.disp
self._thread_arena_offset = pwndbg.gdblib.memory.s64(base_offset + offset)
self._thread_arena_offset = pwndbg.gdblib.memory.s64(
base_offset + offset
)
break
elif pwndbg.gdblib.arch.current == "arm":
@ -1524,7 +1574,9 @@ class HeuristicHeap(GlibcMemoryAllocator):
reg = ldr_instr.operands[0].str
if instr.mnemonic == "add" and instr.op_str == reg + ", pc":
offset = ldr_instr.operands[1].mem.disp
offset = pwndbg.gdblib.memory.s32((ldr_instr.address + 4 & -4) + offset)
offset = pwndbg.gdblib.memory.s32(
(ldr_instr.address + 4 & -4) + offset
)
self._thread_arena_offset = pwndbg.gdblib.memory.s32(
instr.address + 4 + offset
)
@ -1535,9 +1587,11 @@ class HeuristicHeap(GlibcMemoryAllocator):
if tls_base:
thread_arena_struct_addr = tls_base + self._thread_arena_offset
if pwndbg.gdblib.vmmap.find(thread_arena_struct_addr):
return pwndbg.gdblib.memory.pvoid(thread_arena_struct_addr)
return Arena(pwndbg.gdblib.memory.pvoid(thread_arena_struct_addr))
raise SymbolUnresolvableError("thread_arena")
else:
return self.main_arena
@property
def thread_cache(self):
@ -1572,8 +1626,6 @@ class HeuristicHeap(GlibcMemoryAllocator):
if is_valid_address:
thread_cache_struct_addr = pwndbg.gdblib.memory.u(thread_cache_via_symbol)
# Check *tcache is in the heap region or not to avoid false positive.
if thread_cache_struct_addr in self.get_heap_boundaries():
self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr)
return self._thread_cache
@ -1741,31 +1793,17 @@ class HeuristicHeap(GlibcMemoryAllocator):
thread_cache_struct_addr = pwndbg.gdblib.memory.pvoid(
tls_base + self._thread_cache_offset
)
if (
pwndbg.gdblib.vmmap.find(thread_cache_struct_addr)
and thread_cache_struct_addr in self.get_heap_boundaries()
):
if pwndbg.gdblib.vmmap.find(thread_cache_struct_addr):
self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr)
return self._thread_cache
# If we still can't find the tcache, we guess tcache is in the first chunk of the heap
# Note: The result might be wrong if the arena is being shared by multiple threads
# And that's why we need to find the tcache address in TLS first
arena = self.get_arena()
heap_region = self.get_heap_boundaries()
arena = self.thread_arena
ptr_size = pwndbg.gdblib.arch.ptrsize
if arena == self.main_arena:
cursor = heap_region.start
else:
cursor = heap_region.start + self.heap_info.sizeof
if (
pwndbg.gdblib.vmmap.find(self.get_heap(heap_region.start)["ar_ptr"])
== heap_region
):
# Round up to a 2-machine-word alignment after an arena to
# compensate for the presence of the have_fastchunks variable
# in GLIBC versions >= 2.27.
cursor += (self.malloc_state.sizeof + ptr_size) & ~self.malloc_align_mask
cursor = arena.active_heap.start
# i686 alignment heuristic
first_chunk_size = pwndbg.gdblib.arch.unpack(
@ -1906,7 +1944,7 @@ class HeuristicHeap(GlibcMemoryAllocator):
region = None
# Try to find heap region via `main_arena.top`
if self._main_arena_addr and arena:
region = self.get_region(arena["top"])
region = self.get_region(arena.top)
# If we can't use `main_arena` to find the heap region, try to find it via vmmap
region = region or next(
(p for p in pwndbg.gdblib.vmmap.get() if "[heap]" == p.objfile), None
@ -2105,65 +2143,44 @@ class HeuristicHeap(GlibcMemoryAllocator):
"""Find & read the heap_info struct belonging to the chunk at 'addr'."""
return self.heap_info(heap_for_ptr(addr))
def get_arena(self, arena_addr=None):
    """Return a malloc_state struct.

    With an explicit ``arena_addr``, read the struct from that address.
    Otherwise resolve the calling thread's arena, falling back to the
    main arena when the thread has none (or the process is
    single-threaded).
    """
    # Explicit address wins: read straight from memory.
    if arena_addr is not None:
        return self.malloc_state(arena_addr)
    # Only a multithreaded process can have a per-thread arena.
    if self.multithreaded:
        per_thread = self.thread_arena
        if per_thread > 0:
            return self.malloc_state(per_thread)
    return self.main_arena
def get_tcache(self, tcache_addr=None):
    """Return a tcache_perthread_struct.

    Reads the struct at ``tcache_addr`` when given; otherwise returns
    the current thread's cache.
    """
    if tcache_addr is not None:
        return self.tcache_perthread_struct(tcache_addr)
    return self.thread_cache
def get_sbrk_heap_region(self):
    """Return a Page object representing the sbrk heap region.

    Ensure the region's start address is aligned to SIZE_SZ * 2,
    which compensates for the presence of GLIBC_TUNABLES.

    This heuristic version requires some sanity checks and may raise
    SymbolUnresolvableError if malloc's `mp_` struct can't be resolved,
    or ValueError if `mp_.sbrk_base` points to unmapped memory.
    """
    # Initialize malloc's mp_ struct if necessary.
    if not self._mp_addr:
        try:
            self.mp
        except Exception:
            # Should only raise SymbolUnresolvableError, but the heuristic heap
            # implementation is still buggy so catch all exceptions for now.
            pass
    # Make sure mp_["sbrk_base"] is valid before dereferencing it.
    if self._mp_addr:
        if self.get_region(self.mp.get_field_address("sbrk_base")) and self.get_region(
            self.mp["sbrk_base"]
        ):
            # Align up to SIZE_SZ * 2 to account for GLIBC_TUNABLES preceding
            # the first chunk.
            sbrk_base = pwndbg.lib.memory.align_up(
                int(self.mp["sbrk_base"]), pwndbg.heap.current.size_sz * 2
            )
            sbrk_region = self.get_region(sbrk_base)
            # Shrink the mapped region so it starts exactly at sbrk_base.
            sbrk_region.memsz = self.get_region(sbrk_base).end - sbrk_base
            sbrk_region.vaddr = sbrk_base
            return sbrk_region
        else:
            raise ValueError("mp_.sbrk_base is unmapped or points to unmapped memory.")
    else:
        raise SymbolUnresolvableError("Unable to resolve mp_ struct via heuristics.")
def is_initialized(self):
# TODO/FIXME: If main_arena['top'] is been modified to 0, this will not work.

@ -240,7 +240,7 @@ def test_main_arena_heuristic(start_binary):
assert pwndbg.heap.current.main_arena.address == main_arena_addr_via_debug_symbol
# Check the struct size is correct
assert (
pwndbg.heap.current.main_arena.type.sizeof
pwndbg.heap.current.main_arena._gdbValue.type.sizeof
== pwndbg.gdblib.typeinfo.lookup_types("struct malloc_state").sizeof
)
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
@ -381,7 +381,7 @@ def test_thread_arena_heuristic(start_binary):
# Level 1: We check we can get the address of `thread_arena` from debug symbols and the value of `thread_arena` is correct
assert pwndbg.heap.current.thread_arena is not None
# Check the address of `thread_arena` is correct
assert pwndbg.heap.current.thread_arena == thread_arena_via_debug_symbol
assert pwndbg.heap.current.thread_arena.address == thread_arena_via_debug_symbol
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Level 2: We check we can get the address of `thread_arena` by parsing the assembly code of `__libc_calloc`
@ -389,7 +389,7 @@ def test_thread_arena_heuristic(start_binary):
with mock_for_heuristic(["thread_arena"]):
assert pwndbg.gdblib.symbol.address("thread_arena") is None
# Check the value of `thread_arena` is correct
assert pwndbg.heap.current.thread_arena == thread_arena_via_debug_symbol
assert pwndbg.heap.current.thread_arena.address == thread_arena_via_debug_symbol
def test_heuristic_fail_gracefully(start_binary):

@ -157,9 +157,11 @@ def test_try_free_invalid_fastbin_entry(start_binary):
def test_try_free_double_free_or_corruption_top(start_binary):
    # Freeing an address at/above the arena's top chunk must trigger glibc's
    # "double free or corruption (top)" integrity check, which try_free mirrors.
    setup_heap(start_binary, 9)
    allocator = pwndbg.heap.current
    ptr_size = pwndbg.gdblib.arch.ptrsize
    # Prefer the current thread's arena; fall back to the main arena.
    arena = allocator.thread_arena or allocator.main_arena
    # Offset past the top chunk header so the pointer lands inside the top chunk.
    top_chunk = arena.top + (2 * ptr_size)
    result = gdb.execute("try_free {}".format(hex(top_chunk)), to_string=True)
    assert "double free or corruption (top)" in result

Loading…
Cancel
Save