From a0f3744743042091225cb47924f7923b934a5ce3 Mon Sep 17 00:00:00 2001 From: Gulshan Singh Date: Mon, 22 Aug 2022 05:39:55 -0700 Subject: [PATCH] Refactor heap code (#1063) * Add Bins classes and refactor allocator methods to return them * Refactor bins() and related commands * Refactor malloc_chunk * Use chunk_size_nomask in top_chunk() * Refactor vis_heap_chunks * Rename read_chunk to read_chunk_from_gdb and move to ptmalloc.py * Add get_first_chunk_in_heap and use it in heap and vis_heap_chunks commands * Move some methods from DebugSymsHeap to Heap base class * Strip type hints from heap.py and ptmalloc.py * Set heap_region before using it * Fix test_heap_bins test * Fix try_free --- pwndbg/commands/heap.py | 670 ++++++++++++++++++++-------------------- pwndbg/heap/ptmalloc.py | 276 +++++++++++++++-- tests/test_heap_bins.py | 89 +++--- 3 files changed, 626 insertions(+), 409 deletions(-) diff --git a/pwndbg/commands/heap.py b/pwndbg/commands/heap.py index a3082681e..ac6d0e4d3 100644 --- a/pwndbg/commands/heap.py +++ b/pwndbg/commands/heap.py @@ -12,25 +12,14 @@ import pwndbg.glibc import pwndbg.typeinfo from pwndbg.color import generateColorFunction from pwndbg.color import message +from pwndbg.color import underline from pwndbg.commands.config import extend_value_with_default from pwndbg.commands.config import get_config_parameters from pwndbg.commands.config import print_row - - -def read_chunk(addr): - """Read a chunk's metadata.""" - # In GLIBC versions <= 2.24 the `mchunk_[prev_]size` field was named `[prev_]size`. - # To support both versions, change the new names to the old ones here so that - # the rest of the code can deal with uniform names. - renames = { - "mchunk_size": "size", - "mchunk_prev_size": "prev_size", - } - if not pwndbg.config.resolve_heap_via_heuristic: - val = pwndbg.typeinfo.read_gdbvalue("struct malloc_chunk", addr) - else: - val = pwndbg.heap.current.malloc_chunk(addr) - return dict({ renames.get(key, key): int(val[key]) for key in val.type.keys() }) +from pwndbg.heap.ptmalloc import Bin +from pwndbg.heap.ptmalloc import Bins +from pwndbg.heap.ptmalloc import BinType +from pwndbg.heap.ptmalloc import read_chunk_from_gdb def format_bin(bins, verbose=False, offset=None): @@ -39,49 +28,44 @@ def format_bin(bins, verbose=False, offset=None): offset = allocator.chunk_key_offset('fd') result = [] - bins_type = bins.pop('type') + bins_type = bins.bin_type - for size in bins: - b = bins[size] - count, is_chain_corrupted = None, False - safe_lnk = False + for size, b in bins.bins.items(): + if not verbose and ( + b.fd_chain == [0] and not b.count + ) and not b.is_corrupted: + continue - # fastbins consists of only single linked list - if bins_type == 'fastbins': - chain_fd = b - safe_lnk = pwndbg.glibc.check_safe_linking() - # tcachebins consists of single linked list and entries count - elif bins_type == 'tcachebins': - chain_fd, count = b + # TODO: Abstract this away + safe_lnk = False + if bins_type in [BinType.FAST, BinType.TCACHE]: safe_lnk = pwndbg.glibc.check_safe_linking() - # normal bins consists of double linked list and may be corrupted (we can detect corruption) - else: # normal bin - chain_fd, chain_bk, is_chain_corrupted = b - - if not verbose and (chain_fd == [0] and not count) and not is_chain_corrupted: - continue - if bins_type == 'tcachebins': - limit = 8 - if count <= 7: - limit = count + 1 - formatted_chain = pwndbg.chain.format(chain_fd[0], offset=offset, limit=limit, safe_linking=safe_lnk) + if bins_type == BinType.TCACHE: + limit = min(8, b.count + 1) else: - formatted_chain = pwndbg.chain.format(chain_fd[0], offset=offset, safe_linking=safe_lnk) + limit = pwndbg.chain.LIMIT + formatted_chain = pwndbg.chain.format( + b.fd_chain[0], limit=limit, offset=offset, safe_linking=safe_lnk + ) - if isinstance(size, int): - size = hex(size) + size_str = Bin.size_to_display_name(size) - if is_chain_corrupted: - line = message.hint(size) + message.error(' [corrupted]') + '\n' + if b.is_corrupted: + line = message.hint(size_str) + message.error(' [corrupted]') + '\n' line += message.hint('FD: ') + formatted_chain + '\n' - line += message.hint('BK: ') + pwndbg.chain.format(chain_bk[0], offset=allocator.chunk_key_offset('bk')) + line += message.hint('BK: ') + pwndbg.chain.format( + b.bk_chain[0], offset=allocator.chunk_key_offset('bk') + ) else: - if count is not None: - line = (message.hint(size) + message.hint(' [%3d]' % count) + ': ').ljust(13) - else: - line = (message.hint(size) + ': ').ljust(13) + line = message.hint(size_str) + if b.count is not None: + line += message.hint(' [%3d]' % b.count) + + line += ': ' + line.ljust(13) + line += formatted_chain result.append(line) @@ -94,9 +78,26 @@ def format_bin(bins, verbose=False, offset=None): parser = argparse.ArgumentParser() parser.description = "Iteratively print chunks on a heap, default to the current thread's active heap." -parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the first chunk (malloc_chunk struct start, prev_size field).") -parser.add_argument("-v", "--verbose", action="store_true", help="Print all chunk fields, even unused ones.") -parser.add_argument("-s", "--simple", action="store_true", help="Simply print malloc_chunk struct's contents.") +parser.add_argument( + "addr", + nargs="?", + type=int, + default=None, + help= + "Address of the first chunk (malloc_chunk struct start, prev_size field)." +) +parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="Print all chunk fields, even unused ones." +) +parser.add_argument( + "-s", + "--simple", + action="store_true", + help="Simply print malloc_chunk struct's contents." +) @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -106,49 +107,23 @@ def heap(addr=None, verbose=False, simple=False): active heap. """ allocator = pwndbg.heap.current - heap_region = allocator.get_heap_boundaries(addr) - arena = allocator.get_arena_for_chunk(addr) if addr else allocator.get_arena() - top_chunk = arena['top'] - ptr_size = allocator.size_sz - - # Store the heap base address in a GDB variable that can be used in other - # GDB commands - gdb.execute('set $heap_base=0x{:x}'.format(heap_region.start)) - # Calculate where to start printing; if an address was supplied, use that, - # if this heap belongs to the main arena, start at the beginning of the - # heap's mapping, otherwise, compensate for the presence of a heap_info - # struct and possibly an arena. + # If an address was supplied, start printing from there, otherwise start + # from the first chunk in the heap region if addr: cursor = int(addr) - elif arena == allocator.main_arena: - cursor = heap_region.start else: - cursor = heap_region.start + allocator.heap_info.sizeof - if pwndbg.vmmap.find(allocator.get_heap(heap_region.start)['ar_ptr']) == heap_region: - # Round up to a 2-machine-word alignment after an arena to - # compensate for the presence of the have_fastchunks variable - # in GLIBC versions >= 2.27. - cursor += (allocator.malloc_state.sizeof + ptr_size) & ~allocator.malloc_align_mask - - # i686 alignment heuristic - first_chunk_size = pwndbg.arch.unpack(pwndbg.memory.read(cursor + ptr_size, ptr_size)) - if first_chunk_size == 0: - cursor += ptr_size * 2 + heap_region = allocator.get_heap_boundaries(addr) + cursor = allocator.get_first_chunk_in_heap(heap_region.start) - while cursor in heap_region: - malloc_chunk(cursor, verbose=verbose, simple=simple) - if cursor == top_chunk: - break - - size_field = pwndbg.memory.u(cursor + allocator.chunk_key_offset('size')) - real_size = size_field & ~allocator.malloc_align_mask - cursor += real_size + # Store the heap base address in a GDB variable that can be used in other + # GDB commands + # TODO: See https://github.com/pwndbg/pwndbg/issues/1060 + gdb.execute('set $heap_base=0x{:x}'.format(heap_region.start)) - # Avoid an infinite loop when a chunk's size is 0. - if real_size == 0: - break + for chunk in allocator.chunks(cursor): + malloc_chunk(chunk, verbose=verbose, simple=simple) parser = argparse.ArgumentParser() @@ -209,7 +184,9 @@ def mp(): parser = argparse.ArgumentParser() parser.description = "Print relevant information about an arena's top chunk, default to current thread's arena." -parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") +parser.add_argument( + "addr", nargs="?", type=int, default=None, help="Address of the arena." +) @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -221,18 +198,68 @@ def top_chunk(addr=None): allocator = pwndbg.heap.current arena = allocator.get_arena(addr) address = arena['top'] - size = pwndbg.memory.u(int(address) + allocator.chunk_key_offset('size')) + size = allocator.chunk_size_nomask(int(address)) - out = message.off("Top chunk\n") + "Addr: {}\nSize: 0x{:02x}".format(M.get(address), size) + out = message.off("Top chunk\n") + "Addr: {}\nSize: 0x{:02x}".format( + M.get(address), size + ) print(out) +def get_chunk_bin(addr): + # points to the real start of the chunk + cursor = int(addr) + + allocator = pwndbg.heap.current + size = allocator.chunk_size(cursor) + + arena = allocator.get_arena_for_chunk(addr) + + bins = [ + allocator.fastbins(arena.address), + allocator.smallbins(arena.address), + allocator.largebins(arena.address), + allocator.unsortedbin(arena.address), + ] + + if allocator.has_tcache(): + bins.append(allocator.tcachebins(None)) + + # TODO: What if we somehow got this chunk into a bin of a different size? We + # would miss it with this logic. Should we check every bin? + res = [] + for bin_ in bins: + if bin_.contains_chunk(size, cursor): + res.append(bin_.bin_type) + + if len(res) == 0: + return [BinType.NOT_IN_BIN] + else: + return res + + parser = argparse.ArgumentParser() parser.description = "Print a chunk." -parser.add_argument("addr", type=int, help="Address of the chunk (malloc_chunk struct start, prev_size field).") -parser.add_argument("-f", "--fake", action="store_true", help="Is this a fake chunk?") -parser.add_argument("-v", "--verbose", action="store_true", help="Print all chunk fields, even unused ones.") -parser.add_argument("-s", "--simple", action="store_true", help="Simply print malloc_chunk struct's contents.") +parser.add_argument( + "addr", + type=int, + help="Address of the chunk (malloc_chunk struct start, prev_size field)." +) +parser.add_argument( + "-f", "--fake", action="store_true", help="Is this a fake chunk?" +) +parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="Print all chunk fields, even unused ones." +) +parser.add_argument( + "-s", + "--simple", + action="store_true", + help="Simply print malloc_chunk struct's contents." +) @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -243,36 +270,34 @@ def malloc_chunk(addr, fake=False, verbose=False, simple=False): cursor = int(addr) allocator = pwndbg.heap.current - ptr_size = allocator.size_sz - size_field = pwndbg.memory.u(cursor + allocator.chunk_key_offset('size')) - real_size = size_field & ~allocator.malloc_align_mask + size_field = allocator.chunk_size_nomask(cursor) + real_size = allocator.chunk_size(cursor) headers_to_print = [] # both state (free/allocated) and flags fields_to_print = set() # in addition to addr and size - out_fields = "Addr: {}\n".format(M.get(cursor)) + + prev_inuse, is_mmapped, non_main_arena = allocator.chunk_flags(size_field) + if prev_inuse: + headers_to_print.append(message.hint('PREV_INUSE')) + if is_mmapped: + headers_to_print.append(message.hint('IS_MMAPED')) + if non_main_arena: + headers_to_print.append(message.hint('NON_MAIN_ARENA')) if fake: - headers_to_print.append(message.on("Fake chunk")) + headers_to_print.append(message.on('Fake chunk')) verbose = True # print all fields for fake chunks if simple: - chunk = read_chunk(cursor) - - if not headers_to_print: - headers_to_print.append(message.hint(M.get(cursor))) + chunk = read_chunk_from_gdb(cursor) - prev_inuse, is_mmapped, non_main_arena = allocator.chunk_flags(int(chunk['size'])) - if prev_inuse: - headers_to_print.append(message.hint('PREV_INUSE')) - if is_mmapped: - headers_to_print.append(message.hint('IS_MMAPED')) - if non_main_arena: - headers_to_print.append(message.hint('NON_MAIN_ARENA')) + # The address should be the first header + headers_to_print.insert(0, message.hint(M.get(cursor))) print(' | '.join(headers_to_print)) for key, val in chunk.items(): - print(message.system(key) + ": 0x{:02x}".format(int(val))) + print(message.system(key) + ': 0x{:02x}'.format(int(val))) print('') return @@ -283,70 +308,55 @@ def malloc_chunk(addr, fake=False, verbose=False, simple=False): arena_address = arena.address top_chunk = arena['top'] if cursor == top_chunk: - headers_to_print.append(message.off("Top chunk")) + headers_to_print.append(message.off('Top chunk')) is_top = True if not is_top: - fastbins = allocator.fastbins(arena_address) or {} - smallbins = allocator.smallbins(arena_address) or {} - largebins = allocator.largebins(arena_address) or {} - unsortedbin = allocator.unsortedbin(arena_address) or {} - if allocator.has_tcache(): - tcachebins = allocator.tcachebins(None) - - if real_size in fastbins.keys() and cursor in fastbins[real_size]: - headers_to_print.append(message.on("Free chunk (fastbins)")) - if not verbose: - fields_to_print.add('fd') - - elif real_size in smallbins.keys() and cursor in bin_addrs(smallbins[real_size], "smallbins"): - headers_to_print.append(message.on("Free chunk (smallbins)")) - if not verbose: - fields_to_print.update(['fd', 'bk']) - - elif real_size >= list(largebins.items())[0][0] and cursor in bin_addrs(largebins[(list(largebins.items())[allocator.largebin_index(real_size) - 64][0])], "largebins"): - headers_to_print.append(message.on("Free chunk (largebins)")) - if not verbose: - fields_to_print.update(['fd', 'bk', 'fd_nextsize', 'bk_nextsize']) - - elif cursor in bin_addrs(unsortedbin['all'], "unsortedbin"): - headers_to_print.append(message.on("Free chunk (unsortedbin)")) - if not verbose: - fields_to_print.update(['fd', 'bk']) - - elif allocator.has_tcache() and real_size in tcachebins.keys() and cursor + ptr_size*2 in bin_addrs(tcachebins[real_size], "tcachebins"): - headers_to_print.append(message.on("Free chunk (tcache)")) - if not verbose: - fields_to_print.add('fd') + bin_types = get_chunk_bin(cursor) + if BinType.NOT_IN_BIN in bin_types: + headers_to_print.append(message.hint('Allocated chunk')) else: - headers_to_print.append(message.hint("Allocated chunk")) - + # TODO: Handle a chunk being in multiple bins + bin_type = bin_types[0] + headers_to_print.append( + message.on('Free chunk ({})'.format('|'.join(bin_types))) + ) + for bin_type in bin_types: + fields_to_print.update(bin_type.valid_fields()) + + + out_fields = ['Addr: {}'.format(M.get(cursor))] + fields_ordered = [ + 'prev_size', 'size', 'fd', 'bk', 'fd_nextsize', 'bk_nextsize' + ] if verbose: - fields_to_print.update(['prev_size', 'size', 'fd', 'bk', 'fd_nextsize', 'bk_nextsize']) + fields_to_print.update(fields_ordered) else: - out_fields += "Size: 0x{:02x}\n".format(size_field) - - prev_inuse, is_mmapped, non_main_arena = allocator.chunk_flags(size_field) - if prev_inuse: - headers_to_print.append(message.hint('PREV_INUSE')) - if is_mmapped: - headers_to_print.append(message.hint('IS_MMAPED')) - if non_main_arena: - headers_to_print.append(message.hint('NON_MAIN_ARENA')) + out_fields.append('Size: 0x{:02x}'.format(size_field)) - fields_ordered = ['prev_size', 'size', 'fd', 'bk', 'fd_nextsize', 'bk_nextsize'] + print(' | '.join(headers_to_print)) for field_to_print in fields_ordered: if field_to_print in fields_to_print: - out_fields += message.system(field_to_print) + ": 0x{:02x}\n".format(pwndbg.memory.u(cursor + allocator.chunk_key_offset(field_to_print))) + field_val = pwndbg.memory.u(cursor + allocator.chunk_key_offset(field_to_print)) + out_fields.append(message.system(field_to_print) + ': 0x{:02x}'.format(field_val)) - print(' | '.join(headers_to_print) + "\n" + out_fields) + print('\n'.join(out_fields)) + print('') parser = argparse.ArgumentParser() parser.description = "Print the contents of all an arena's bins and a thread's tcache, default to the current thread's arena and tcache." -parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") -parser.add_argument("tcache_addr", nargs="?", type=int, default=None, help="Address of the tcache.") +parser.add_argument( + "addr", nargs="?", type=int, default=None, help="Address of the arena." +) +parser.add_argument( + "tcache_addr", + nargs="?", + type=int, + default=None, + help="Address of the tcache." +) @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -363,10 +373,35 @@ def bins(addr=None, tcache_addr=None): largebins(addr) +def print_bins(bin_type, addr=None, verbose=False): + allocator = pwndbg.heap.current + offset = None + + # TODO: Abstract this away + if bin_type == BinType.TCACHE: + offset = allocator.tcache_next_offset + else: + offset = None + + bins = allocator.get_bins(bin_type, addr=addr) + if bins is None: + return + + formatted_bins = format_bin(bins, verbose, offset=offset) + + print(C.banner(bin_type.value)) + for node in formatted_bins: + print(node) + + parser = argparse.ArgumentParser() parser.description = "Print the contents of an arena's fastbins, default to the current thread's arena." -parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") -parser.add_argument("verbose", nargs="?", type=bool, default=True, help="Show extra detail.") +parser.add_argument( + "addr", nargs="?", type=int, default=None, help="Address of the arena." +) +parser.add_argument( + "verbose", nargs="?", type=bool, default=True, help="Show extra detail." +) @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -375,23 +410,17 @@ def fastbins(addr=None, verbose=True): """Print the contents of an arena's fastbins, default to the current thread's arena. """ - allocator = pwndbg.heap.current - fastbins = allocator.fastbins(addr) - - if fastbins is None: - return - - formatted_bins = format_bin(fastbins, verbose) - - print(C.banner('fastbins')) - for node in formatted_bins: - print(node) + print_bins(BinType.FAST, addr, verbose) parser = argparse.ArgumentParser() parser.description = "Print the contents of an arena's unsortedbin, default to the current thread's arena." -parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") -parser.add_argument("verbose", nargs="?", type=bool, default=True, help="Show extra detail.") +parser.add_argument( + "addr", nargs="?", type=int, default=None, help="Address of the arena." +) +parser.add_argument( + "verbose", nargs="?", type=bool, default=True, help="Show extra detail." +) @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -400,23 +429,17 @@ def unsortedbin(addr=None, verbose=True): """Print the contents of an arena's unsortedbin, default to the current thread's arena. """ - allocator = pwndbg.heap.current - unsortedbin = allocator.unsortedbin(addr) - - if unsortedbin is None: - return - - formatted_bins = format_bin(unsortedbin, verbose) - - print(C.banner('unsortedbin')) - for node in formatted_bins: - print(node) + print_bins(BinType.UNSORTED, addr, verbose) parser = argparse.ArgumentParser() parser.description = "Print the contents of an arena's smallbins, default to the current thread's arena." -parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") -parser.add_argument("verbose", nargs="?", type=bool, default=False, help="Show extra detail.") +parser.add_argument( + "addr", nargs="?", type=int, default=None, help="Address of the arena." +) +parser.add_argument( + "verbose", nargs="?", type=bool, default=False, help="Show extra detail." +) @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -425,23 +448,17 @@ def smallbins(addr=None, verbose=False): """Print the contents of an arena's smallbins, default to the current thread's arena. """ - allocator = pwndbg.heap.current - smallbins = allocator.smallbins(addr) - - if smallbins is None: - return - - formatted_bins = format_bin(smallbins, verbose) - - print(C.banner('smallbins')) - for node in formatted_bins: - print(node) + print_bins(BinType.SMALL, addr, verbose) parser = argparse.ArgumentParser() parser.description = "Print the contents of an arena's largebins, default to the current thread's arena." -parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") -parser.add_argument("verbose", nargs="?", type=bool, default=False, help="Show extra detail.") +parser.add_argument( + "addr", nargs="?", type=int, default=None, help="Address of the arena." +) +parser.add_argument( + "verbose", nargs="?", type=bool, default=False, help="Show extra detail." +) @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -450,23 +467,25 @@ def largebins(addr=None, verbose=False): """Print the contents of an arena's largebins, default to the current thread's arena. """ - allocator = pwndbg.heap.current - largebins = allocator.largebins(addr) - - if largebins is None: - return - - formatted_bins = format_bin(largebins, verbose) - - print(C.banner('largebins')) - for node in formatted_bins: - print(node) + print_bins(BinType.LARGE, addr, verbose) parser = argparse.ArgumentParser() parser.description = "Print the contents of a tcache, default to the current thread's tcache." -parser.add_argument("addr", nargs="?", type=int, default=None, help="The address of the tcache bins.") -parser.add_argument("verbose", nargs="?", type=bool, default=False, help="Whether to show more details or not.") +parser.add_argument( + "addr", + nargs="?", + type=int, + default=None, + help="The address of the tcache bins." +) +parser.add_argument( + "verbose", + nargs="?", + type=bool, + default=False, + help="Whether to show more details or not." +) @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -474,17 +493,7 @@ parser.add_argument("verbose", nargs="?", type=bool, default=False, help="Whethe @pwndbg.commands.OnlyWithTcache def tcachebins(addr=None, verbose=False): """Print the contents of a tcache, default to the current thread's tcache.""" - allocator = pwndbg.heap.current - tcachebins = allocator.tcachebins(addr) - - if tcachebins is None: - return - - formatted_bins = format_bin(tcachebins, verbose, offset = allocator.tcache_next_offset) - - print(C.banner('tcachebins')) - for node in formatted_bins: - print(node) + print_bins(BinType.TCACHE, addr, verbose) parser = argparse.ArgumentParser() @@ -535,14 +544,28 @@ def find_fake_fast(addr, size=None): parser = argparse.ArgumentParser() parser.description = "Visualize chunks on a heap, default to the current arena's active heap." -parser.add_argument("count", nargs="?", type=lambda n:max(int(n, 0),1), default=10, help="Number of chunks to visualize.") -parser.add_argument("addr", nargs="?", default=None, help="Address of the first chunk.") -parser.add_argument("--naive", "-n", action="store_true", default=False, help="Attempt to keep printing beyond the top chunk.") +parser.add_argument( + "count", + nargs="?", + type=lambda n: max(int(n, 0), 1), + default=10, + help="Number of chunks to visualize." +) +parser.add_argument( + "addr", nargs="?", default=None, help="Address of the first chunk." +) +parser.add_argument( + "--naive", + "-n", + action="store_true", + default=False, + help="Attempt to keep printing beyond the top chunk." +) @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @pwndbg.commands.OnlyWhenHeapIsInitialized -def vis_heap_chunks(addr=None, count=None, naive=None): +def vis_heap_chunks(addr=None, count=None, naive=False): """Visualize chunks on a heap, default to the current arena's active heap.""" allocator = pwndbg.heap.current heap_region = allocator.get_heap_boundaries(addr) @@ -553,73 +576,58 @@ def vis_heap_chunks(addr=None, count=None, naive=None): # Build a list of addresses that delimit each chunk. chunk_delims = [] + + # If an address was supplied, start printing from there, otherwise start + # from the first chunk in the heap region if addr: cursor = int(addr) - elif arena == allocator.main_arena: - cursor = heap_region.start else: - cursor = heap_region.start + allocator.heap_info.sizeof - if pwndbg.vmmap.find(allocator.get_heap(heap_region.start)['ar_ptr']) == heap_region: - # Round up to a 2-machine-word alignment after an arena to - # compensate for the presence of the have_fastchunks variable - # in GLIBC versions >= 2.27. - cursor += (allocator.malloc_state.sizeof + ptr_size) & ~allocator.malloc_align_mask - - # Check if there is an alignment at the start of the heap, adjust if necessary. - if not addr: - first_chunk_size = pwndbg.arch.unpack(pwndbg.memory.read(cursor + ptr_size, ptr_size)) - if first_chunk_size == 0: - cursor += ptr_size * 2 + heap_region = allocator.get_heap_boundaries(addr) + cursor = allocator.get_first_chunk_in_heap(heap_region.start) cursor_backup = cursor - for _ in range(count + 1): - # Don't read beyond the heap mapping if --naive or corrupted heap. - if cursor not in heap_region: - chunk_delims.append(heap_region.end) - break - - size_field = pwndbg.memory.u(cursor + ptr_size) - real_size = size_field & ~allocator.malloc_align_mask - prev_inuse = allocator.chunk_flags(size_field)[0] + i = 0 + # TODO: This rewrite probably breaks --naive + # TODO: If we do it like this, we should store the first chunk, not the next one + for cursor in allocator.chunks(cursor): + if i == 0: + i += 1 + continue - # Don't repeatedly operate on the same address (e.g. chunk size of 0). - if cursor in chunk_delims or cursor + ptr_size in chunk_delims: + if i >= count: break - if prev_inuse: - chunk_delims.append(cursor + ptr_size) + i += 1 + + next_chunk = allocator.next_chunk(cursor) + if cursor == top_chunk: + inuse = False else: - chunk_delims.append(cursor) + inuse = allocator.prev_inuse(next_chunk) - if (cursor == top_chunk and not naive) or (cursor == heap_region.end - ptr_size*2): - chunk_delims.append(cursor + ptr_size*2) + # TODO: Is this check still necessary? + # Don't read beyond the heap mapping if --naive or corrupted heap. + if cursor not in heap_region: + chunk_delims.append((heap_region.end, inuse)) break - cursor += real_size + chunk_delims.append((cursor, inuse)) + + # if (cursor == top_chunk + # and not naive) or (cursor == heap_region.end - ptr_size * 2): + # chunk_delims.append(cursor + ptr_size * 2) + # break # Build the output buffer, changing color at each chunk delimiter. - # TODO: maybe print free chunks in bold or underlined color_funcs = [ - generateColorFunction("yellow"), - generateColorFunction("cyan"), - generateColorFunction("purple"), - generateColorFunction("green"), - generateColorFunction("blue"), + generateColorFunction('yellow'), + generateColorFunction('cyan'), + generateColorFunction('purple'), + generateColorFunction('green'), + generateColorFunction('blue'), ] - bin_collections = [ - allocator.fastbins(arena.address), - allocator.unsortedbin(arena.address), - allocator.smallbins(arena.address), - allocator.largebins(arena.address), - ] - if allocator.has_tcache(): - # Only check for tcache entries belonging to the current thread, - # it's difficult (impossible?) to find all the thread caches for a - # specific heap. - bin_collections.insert(0, allocator.tcachebins(None)) - printed = 0 out = '' asc = '' @@ -627,7 +635,13 @@ def vis_heap_chunks(addr=None, count=None, naive=None): cursor = cursor_backup - for c, stop in enumerate(chunk_delims): + for c, (stop, inuse) in enumerate(chunk_delims): + if inuse: + stop += ptr_size + + # TODO: Are we duplicating work with bin_labels? + bin_type = get_chunk_bin(cursor) + color_func = color_funcs[c % len(color_funcs)] while cursor != stop: @@ -635,18 +649,26 @@ def vis_heap_chunks(addr=None, count=None, naive=None): out += "\n0x%x" % cursor cell = pwndbg.arch.unpack(pwndbg.memory.read(cursor, ptr_size)) - cell_hex = '\t0x{:0{n}x}'.format(cell, n=ptr_size*2) + cell_hex = '\t0x{:0{n}x}'.format(cell, n=ptr_size * 2) - out += color_func(cell_hex) + colored_text = color_func(cell_hex) + if bin_type != BinType.NOT_IN_BIN: + colored_text = underline(colored_text) + out += colored_text printed += 1 - labels.extend(bin_labels(cursor, bin_collections)) + labels.extend(bin_labels(cursor, bin_type)) if cursor == top_chunk: labels.append('Top chunk') - asc += bin_ascii(pwndbg.memory.read(cursor, ptr_size)) + asc += ''.join( + c if c.isprintable() and c.isascii() else '.' + for c in map(chr, pwndbg.memory.read(cursor, ptr_size)) + ) if printed % 2 == 0: - out += '\t' + color_func(asc) + ('\t <-- ' + ', '.join(labels) if len(labels) else '') + out += '\t' + color_func(asc) + ( + '\t <-- ' + ', '.join(labels) if len(labels) else '' + ) asc = '' labels = [] @@ -655,43 +677,29 @@ def vis_heap_chunks(addr=None, count=None, naive=None): print(out) -def bin_ascii(bs): - from string import printable - valid_chars = list(map(ord, set(printable) - set('\t\r\n\x0c\x0b'))) - return ''.join(chr(c) if c in valid_chars else '.'for c in bs) - - -def bin_labels(addr, collections): +def bin_labels(addr, bin_type): labels = [] - for bins in collections: - bins_type = bins.get('type', None) - if not bins_type: - continue + allocator = pwndbg.heap.current - for size in filter(lambda x: x != 'type', bins.keys()): - b = bins[size] - if isinstance(size, int): - size = hex(size) - count = '/{:d}'.format(b[1]) if bins_type == 'tcachebins' else None - chunks = bin_addrs(b, bins_type) - for chunk_addr in chunks: - if addr == chunk_addr: - labels.append('{:s}[{:s}][{:d}{}]'.format(bins_type, size, chunks.index(addr), count or '')) + bins = allocator.get_bins(bin_type) + if bins is None: + return [] - return labels + for size, b in bins.bins.items(): + size_str = Bin.size_to_display_name(size) + if b.contains_chunk(addr): + count = '' + if bins.bin_type == BinType.TCACHE: + count = '/{:d}'.format(b.count) -def bin_addrs(b, bins_type): - addrs = [] - if bins_type == 'fastbins': - return b - # tcachebins consists of single linked list and entries count - elif bins_type == 'tcachebins': - addrs, _ = b - # normal bins consists of double linked list and may be corrupted (we can detect corruption) - else: # normal bin - addrs, _, _ = b - return addrs + labels.append( + '{:s}[{:s}][{:d}{:s}]'.format( + bins.bin_type.value, size_str, b.fd_chain.index(addr), count + ) + ) + + return labels try_free_parser = argparse.ArgumentParser(description='Check what would happen if free was called with given address') @@ -726,7 +734,7 @@ def try_free(addr): ptr_size = pwndbg.arch.ptrsize def unsigned_size(size): - # read_chunk()['size'] is signed in pwndbg ;/ + # read_chunk_from_gdb()['size'] is signed in pwndbg ;/ # there may be better way to handle that if ptr_size < 8: return ctypes.c_uint32(size).value @@ -752,7 +760,7 @@ def try_free(addr): # try to get the chunk try: - chunk = read_chunk(addr) + chunk = read_chunk_from_gdb(addr) except gdb.MemoryError as e: print(message.error('Can\'t read chunk at address 0x{:x}, memory error'.format(addr))) return @@ -835,10 +843,10 @@ def try_free(addr): if chunk_size_unmasked <= allocator.global_max_fast: print(message.notice('Fastbin checks')) chunk_fastbin_idx = allocator.fastbin_index(chunk_size_unmasked) - fastbin_list = allocator.fastbins(int(arena.address))[(chunk_fastbin_idx+2)*(ptr_size*2)] + fastbin_list = allocator.fastbins(int(arena.address)).bins[(chunk_fastbin_idx+2)*(ptr_size*2)] try: - next_chunk = read_chunk(addr + chunk_size_unmasked) + next_chunk = read_chunk_from_gdb(addr + chunk_size_unmasked) except gdb.MemoryError as e: print(message.error('Can\'t read next chunk at address 0x{:x}, memory error'.format(chunk + chunk_size_unmasked))) finalize(errors_found, returned_before_error) @@ -854,7 +862,7 @@ def try_free(addr): errors_found += 1 # chunk is not the same as the one on top of fastbin[idx] - if int(fastbin_list[0]) == addr: + if int(fastbin_list.fd_chain[0]) == addr: err = 'double free or corruption (fasttop) -> chunk already is on top of fastbin list\n' err += ' fastbin idx == {}' err = err.format(chunk_fastbin_idx) @@ -862,10 +870,10 @@ def try_free(addr): errors_found += 1 # chunk's size is ~same as top chunk's size - fastbin_top_chunk = int(fastbin_list[0]) + fastbin_top_chunk = int(fastbin_list.fd_chain[0]) if fastbin_top_chunk != 0: try: - fastbin_top_chunk = read_chunk(fastbin_top_chunk) + fastbin_top_chunk = read_chunk_from_gdb(fastbin_top_chunk) except gdb.MemoryError as e: print(message.error('Can\'t read top fastbin chunk at address 0x{:x}, memory error'.format(fastbin_top_chunk))) finalize(errors_found, returned_before_error) @@ -895,7 +903,7 @@ def try_free(addr): # next chunk is not beyond the boundaries of the arena NONCONTIGUOUS_BIT = 2 top_chunk_addr = (int(arena['top'])) - top_chunk = read_chunk(top_chunk_addr) + top_chunk = read_chunk_from_gdb(top_chunk_addr) next_chunk_addr = addr + chunk_size_unmasked # todo: in libc, addition may overflow @@ -908,7 +916,7 @@ def try_free(addr): # now we need to dereference chunk try : - next_chunk = read_chunk(next_chunk_addr) + next_chunk = read_chunk_from_gdb(next_chunk_addr) next_chunk_size = chunksize(unsigned_size(next_chunk['size'])) except (OverflowError, gdb.MemoryError) as e: print(message.error('Can\'t read next chunk at address 0x{:x}'.format(next_chunk_addr))) @@ -938,7 +946,7 @@ def try_free(addr): prev_chunk_addr = addr - prev_size try : - prev_chunk = read_chunk(prev_chunk_addr) + prev_chunk = read_chunk_from_gdb(prev_chunk_addr) prev_chunk_size = chunksize(unsigned_size(prev_chunk['size'])) except (OverflowError, gdb.MemoryError) as e: print(message.error('Can\'t read next chunk at address 0x{:x}'.format(prev_chunk_addr))) @@ -962,7 +970,7 @@ def try_free(addr): print(message.notice('Next chunk is not top chunk')) try : next_next_chunk_addr = next_chunk_addr + next_chunk_size - next_next_chunk = read_chunk(next_next_chunk_addr) + next_next_chunk = read_chunk_from_gdb(next_next_chunk_addr) except (OverflowError, gdb.MemoryError) as e: print(message.error('Can\'t read next chunk at address 0x{:x}'.format(next_next_chunk_addr))) finalize(errors_found, returned_before_error) @@ -980,12 +988,12 @@ def try_free(addr): # unsorted bin fd->bk should be unsorted bean unsorted_addr = int(arena['bins'][0]) try: - unsorted = read_chunk(unsorted_addr) + unsorted = read_chunk_from_gdb(unsorted_addr) try: - if read_chunk(unsorted['fd'])['bk'] != unsorted_addr: + if read_chunk_from_gdb(unsorted['fd'])['bk'] != unsorted_addr: err = 'free(): corrupted unsorted chunks -> unsorted_chunk->fd->bk != unsorted_chunk\n' err += 'unsorted at 0x{:x}, unsorted->fd == 0x{:x}, unsorted->fd->bk == 0x{:x}' - err = err.format(unsorted_addr, unsorted['fd'], read_chunk(unsorted['fd'])['bk']) + err = err.format(unsorted_addr, unsorted['fd'], read_chunk_from_gdb(unsorted['fd'])['bk']) print(message.error(err)) errors_found += 1 except (OverflowError, gdb.MemoryError) as e: diff --git a/pwndbg/heap/ptmalloc.py b/pwndbg/heap/ptmalloc.py index d0a3411db..bb906ba7e 100644 --- a/pwndbg/heap/ptmalloc.py +++ b/pwndbg/heap/ptmalloc.py @@ -1,5 +1,6 @@ import importlib from collections import OrderedDict +from enum import Enum from functools import wraps import gdb @@ -23,6 +24,101 @@ from pwndbg.heap import heap_chain_limit HEAP_MAX_SIZE = 1024 * 1024 if pwndbg.arch.ptrsize == 4 else 2 * 4 * 1024 * 1024 * 8 +# Note that we must inherit from `str` before `Enum`: https://stackoverflow.com/a/58608362/803801 +class BinType(str, Enum): + TCACHE = 'tcachebins' + FAST = 'fastbins' + SMALL = 'smallbins' + LARGE = 'largebins' + UNSORTED = 'unsortedbin' + NOT_IN_BIN = 'not_in_bin' + + def valid_fields(self): + if self in [BinType.FAST, BinType.TCACHE]: + return ['fd'] + elif self in [BinType.SMALL, BinType.UNSORTED]: + return ['fd', 'bk'] + elif self == BinType.LARGE: + return ['fd', 'bk', 'fd_nextsize', 'bk_nextsize'] + +class Bin: + def __init__(self, fd_chain, bk_chain=None, count=None, is_corrupted=False): + self.fd_chain = fd_chain + self.bk_chain = bk_chain + self.count = count + self.is_corrupted = is_corrupted + + def contains_chunk(self, chunk): + return chunk in self.fd_chain + + @staticmethod + def size_to_display_name(size): + if size == 'all': + return size + + assert isinstance(size, int) + + return hex(size) + + +class Bins: + def __init__(self, bin_type): + self.bins = OrderedDict() + self.bin_type = bin_type + + # TODO: There's a bunch of bin-specific logic in here, maybe we should + # subclass and put that logic in there + def contains_chunk(self, size, chunk): + # TODO: It will be the same thing, but it would be better if we used + # pwndbg.heap.current.size_sz. I think each bin should already have a + # reference to the allocator and shouldn't need to access the `current` + # variable + ptr_size = pwndbg.arch.ptrsize + + if self.bin_type == BinType.UNSORTED: + # The unsorted bin only has one bin called 'all' + + # TODO: We shouldn't be mixing int and str types like this + size = 'all' + elif self.bin_type == BinType.LARGE: + # All the other bins (other than unsorted) store chunks of the same + # size in a bin, so we can use the size directly. But the largebin + # stores a range of sizes, so we need to compute which bucket this + # chunk falls into + + # TODO: Refactor this, the bin should know how to calculate + # largebin_index without calling into the allocator + size = pwndbg.heap.current.largebin_index(size) - 64 + elif self.bin_type == BinType.TCACHE: + # Unlike fastbins, tcache bins don't store the chunk address in the + # bins, they store the address of the fd pointer, so we need to + # search for that address in the tcache bin instead + + # TODO: Can we use chunk_key_offset? + chunk += ptr_size * 2 + + if size in self.bins: + return self.bins[size].contains_chunk(chunk) + + return False + + +def read_chunk_from_gdb(addr): + """Read a chunk's metadata.""" + # In GLIBC versions <= 2.24 the `mchunk_[prev_]size` field was named `[prev_]size`. + # To support both versions, change the new names to the old ones here so that + # the rest of the code can deal with uniform names. + renames = { + "mchunk_size": "size", + "mchunk_prev_size": "prev_size", + } + if not pwndbg.config.resolve_heap_via_heuristic: + val = pwndbg.typeinfo.read_gdbvalue("struct malloc_chunk", addr) + else: + val = pwndbg.heap.current.malloc_chunk(addr) + return dict({ renames.get(key, key): int(val[key]) for key in val.type.keys() }) + + def heap_for_ptr(ptr): """Round a pointer to a chunk down to find its corresponding heap_info struct, the pointer must point inside a heap which does not belong to @@ -288,11 +384,47 @@ class Heap(pwndbg.heap.heap.BaseHeap): def get_heap(self, addr): raise NotImplementedError() + def get_first_chunk_in_heap(self, heap_start=None): + # Find which arena this heap belongs to + if heap_start: + arena = self.get_arena_for_chunk(heap_start) + else: + arena = self.get_arena() + + ptr_size = self.size_sz + + start_addr = heap_start + if arena != self.main_arena: + start_addr += self.heap_info.sizeof + heap_region = self.get_heap_boundaries(heap_start) + if pwndbg.vmmap.find(self.get_heap(heap_start)['ar_ptr']) == heap_region: + # Round up to a 2-machine-word alignment after an arena to + # compensate for the presence of the have_fastchunks variable + # in GLIBC versions >= 2.27. + start_addr += pwndbg.memory.align_down( + self.malloc_state.sizeof + ptr_size, + self.malloc_alignment + ) + + # In glibc 2.26, the malloc_alignment for i386 was hardcoded to 16 (instead + # of 2*sizeof(size_t), which is 8). In order for the data to be aligned to + # 16 bytes, the first chunk now needs to start offset 8 instead of offset 0 + + # TODO: Can we just check if this is 32bit and >= glibc 2.26? This type of + # check is confusing as is, and unnecessary in most cases + first_chunk_size = pwndbg.arch.unpack( + pwndbg.memory.read(start_addr + ptr_size, ptr_size) + ) + if first_chunk_size == 0: + start_addr += ptr_size * 2 + + return start_addr + def get_arena(self, arena_addr=None): raise NotImplementedError() def get_arena_for_chunk(self, addr): - chunk = pwndbg.commands.heap.read_chunk(addr) + chunk = read_chunk_from_gdb(addr) _,_,nm = self.chunk_flags(chunk['size']) if nm: r=self.get_arena(arena_addr=self.get_heap(addr)['ar_ptr']) @@ -310,6 +442,20 @@ class Heap(pwndbg.heap.heap.BaseHeap): """Find the memory map containing 'addr'.""" return pwndbg.vmmap.find(addr) + def get_bins(self, bin_type, addr=None): + if bin_type == BinType.TCACHE: + return self.tcachebins(addr) + elif bin_type == BinType.FAST: + return self.fastbins(addr) + elif bin_type == BinType.UNSORTED: + return self.unsortedbin(addr) + elif bin_type == BinType.SMALL: + return self.smallbins(addr) + elif bin_type == BinType.LARGE: + return self.largebins(addr) + else: + return None + def fastbin_index(self, size): if pwndbg.arch.ptrsize == 8: return (size >> 4) - 2 @@ -318,33 +464,38 @@ class Heap(pwndbg.heap.heap.BaseHeap): def fastbins(self, arena_addr=None): """Returns: chain or None""" + result = Bins(BinType.FAST) arena = self.get_arena(arena_addr) if arena is None: - return + return result - fastbinsY = arena['fastbinsY'] - fd_offset = self.chunk_key_offset('fd') + fastbinsY = arena['fastbinsY'] + fd_offset = self.chunk_key_offset('fd') num_fastbins = 7 - size = pwndbg.arch.ptrsize * 2 + size = pwndbg.arch.ptrsize * 2 safe_lnk = pwndbg.glibc.check_safe_linking() - result = OrderedDict() for i in range(num_fastbins): size += pwndbg.arch.ptrsize * 2 - chain = pwndbg.chain.get(int(fastbinsY[i]), offset=fd_offset, limit=heap_chain_limit, safe_linking=safe_lnk) + chain = pwndbg.chain.get( + int(fastbinsY[i]), + offset=fd_offset, + limit=heap_chain_limit, + safe_linking=safe_lnk + ) - result[size] = chain + result.bins[size] = Bin(chain) - result['type'] = 'fastbins' return result def tcachebins(self, tcache_addr=None): """Returns: tuple(chain, count) or None""" + result = Bins(BinType.TCACHE) tcache = self.get_tcache(tcache_addr) if tcache is None: - return + return result counts = tcache['counts'] entries = tcache['entries'] @@ -356,15 +507,18 @@ class Heap(pwndbg.heap.heap.BaseHeap): """Tcache bin index to chunk size, following tidx2usize macro in glibc malloc.c""" return idx * self.malloc_alignment + self.minsize - self.size_sz - result = OrderedDict() for i in range(num_tcachebins): size = self._request2size(tidx2usize(i)) count = int(counts[i]) - chain = pwndbg.chain.get(int(entries[i]), offset=self.tcache_next_offset, limit=heap_chain_limit, safe_linking=safe_lnk) + chain = pwndbg.chain.get( + int(entries[i]), + offset=self.tcache_next_offset, + limit=heap_chain_limit, + safe_linking=safe_lnk + ) - result[size] = (chain, count) + result.bins[size] = Bin(chain, count=count) - result['type'] = 'tcachebins' return result def bin_at(self, index, arena_addr=None): @@ -387,18 +541,24 @@ class Heap(pwndbg.heap.heap.BaseHeap): return normal_bins = arena['bins'] - num_bins = normal_bins.type.sizeof // normal_bins.type.target().sizeof + num_bins = normal_bins.type.sizeof // normal_bins.type.target().sizeof - bins_base = int(normal_bins.address) - (pwndbg.arch.ptrsize* 2) + bins_base = int(normal_bins.address) - (pwndbg.arch.ptrsize * 2) current_base = bins_base + (index * pwndbg.arch.ptrsize * 2) front, back = normal_bins[index * 2], normal_bins[index * 2 + 1] - fd_offset = self.chunk_key_offset('fd') - bk_offset = self.chunk_key_offset('bk') + fd_offset = self.chunk_key_offset('fd') + bk_offset = self.chunk_key_offset('bk') is_chain_corrupted = False - get_chain = lambda bin, offset: pwndbg.chain.get(int(bin), offset=offset, hard_stop=current_base, limit=heap_chain_limit, include_start=True) + get_chain = lambda bin, offset: pwndbg.chain.get( + int(bin), + offset=offset, + hard_stop=current_base, + limit=heap_chain_limit, + include_start=True + ) chain_fd = get_chain(front, fd_offset) chain_bk = get_chain(back, bk_offset) @@ -414,51 +574,99 @@ class Heap(pwndbg.heap.heap.BaseHeap): return (chain_fd, chain_bk, is_chain_corrupted) def unsortedbin(self, arena_addr=None): - chain = self.bin_at(1, arena_addr=arena_addr) - result = OrderedDict() + result = Bins(BinType.UNSORTED) + chain = self.bin_at(1, arena_addr=arena_addr) if chain is None: - return + return result - result['all'] = chain + fd_chain, bk_chain, is_corrupted = chain + result.bins['all'] = Bin(fd_chain, bk_chain, is_corrupted=is_corrupted) - result['type'] = 'unsortedbin' return result def smallbins(self, arena_addr=None): - size = self.min_chunk_size - self.malloc_alignment + size = self.min_chunk_size - self.malloc_alignment spaces_table = self._spaces_table() - result = OrderedDict() + result = Bins(BinType.SMALL) for index in range(2, 64): size += spaces_table[index] chain = self.bin_at(index, arena_addr=arena_addr) if chain is None: - return + # TODO: Should I return an empty Bins() instead? + return result - result[size] = chain + fd_chain, bk_chain, is_corrupted = chain + result.bins[size] = Bin( + fd_chain, bk_chain, is_corrupted=is_corrupted + ) - result['type'] = 'smallbins' return result def largebins(self, arena_addr=None): - size = (ptmalloc.NSMALLBINS * self.malloc_alignment) - self.malloc_alignment + size = ( + ptmalloc.NSMALLBINS * self.malloc_alignment + ) - self.malloc_alignment spaces_table = self._spaces_table() - result = OrderedDict() + result = Bins(BinType.LARGE) for index in range(64, 127): size += spaces_table[index] chain = self.bin_at(index, arena_addr=arena_addr) if chain is None: - return + # TODO: Should I return an empty Bins() instead? + return result - result[size] = chain + fd_chain, bk_chain, is_corrupted = chain + result.bins[size] = Bin( + fd_chain, bk_chain, is_corrupted=is_corrupted + ) - result['type'] = 'largebins' return result + + def chunks(self, addr): + # TODO: Add assertions to verify alignment? Maybe write a decorator + # that enforces that an argument is aligned + + heap_region = self.get_heap_boundaries(addr) + arena = self.get_arena_for_chunk(addr) + top_chunk = arena['top'] + + chunk = addr + while chunk in heap_region: + yield chunk + + if chunk == top_chunk: + break + + old_chunk = chunk + chunk = self.next_chunk(chunk) + + # Avoid an infinite loop when a chunk's size is 0. + if old_chunk == chunk: + break + + def chunk_size_nomask(self, addr): + return pwndbg.memory.u(addr + self.chunk_key_offset('size')) + + def chunk_size(self, addr): + # TODO: Check if this breaks on 32-bit glibc versions >= 2.26, where + # only the data is aligned and not the chunk address + return pwndbg.memory.align_down( + self.chunk_size_nomask(addr), self.malloc_alignment + ) + + def next_chunk(self, addr): + return addr + self.chunk_size(addr) + + def prev_inuse(self, addr): + prev_inuse, _, _ = self.chunk_flags(self.chunk_size(addr)) + return prev_inuse == ptmalloc.PREV_INUSE + def largebin_index_32(self, sz): """Modeled on the GLIBC malloc largebin_index_32 macro. diff --git a/tests/test_heap_bins.py b/tests/test_heap_bins.py index 6ea00bc25..a635ba127 100644 --- a/tests/test_heap_bins.py +++ b/tests/test_heap_bins.py @@ -5,6 +5,7 @@ import pwndbg.memory import pwndbg.symbol import pwndbg.vmmap import tests +from pwndbg.heap.ptmalloc import BinType BINARY = tests.binaries.get('heap_bins.out') @@ -39,94 +40,94 @@ def test_heap_bins(start_binary): largebin_count = pwndbg.memory.u64(addr) result = allocator.tcachebins() - assert result['type'] == 'tcachebins' - assert tcache_size in result - assert result[tcache_size][1] == 0 and len(result[tcache_size][0]) == 1 + assert result.bin_type == BinType.TCACHE + assert tcache_size in result.bins + assert result.bins[tcache_size].count == 0 and len(result.bins[tcache_size].fd_chain) == 1 result = allocator.fastbins() - assert result['type'] == 'fastbins' - assert fastbin_size in result - assert len(result[fastbin_size]) == 1 + assert result.bin_type == BinType.FAST + assert fastbin_size in result.bins + assert len(result.bins[fastbin_size].fd_chain) == 1 result = allocator.unsortedbin() - assert result['type'] == 'unsortedbin' - assert len(result['all'][0]) == 1 - assert not result['all'][2] + assert result.bin_type == BinType.UNSORTED + assert len(result.bins['all'].fd_chain) == 1 + assert not result.bins['all'].is_corrupted result = allocator.smallbins() - assert result['type'] == 'smallbins' - assert smallbin_size in result - assert len(result[smallbin_size][0]) == 1 and len(result[smallbin_size][1]) == 1 - assert not result[smallbin_size][2] + assert result.bin_type == BinType.SMALL + assert smallbin_size in result.bins + assert len(result.bins[smallbin_size].fd_chain) == 1 and len(result.bins[smallbin_size].bk_chain) == 1 + assert not result.bins[smallbin_size].is_corrupted result = allocator.largebins() - assert result['type'] == 'largebins' - largebin_size = list(result.items())[allocator.largebin_index(largebin_size) - 64][0] - assert largebin_size in result - assert len(result[largebin_size][0]) == 1 and len(result[largebin_size][1]) == 1 - assert not result[largebin_size][2] + assert result.bin_type == BinType.LARGE + largebin_size = list(result.bins.items())[allocator.largebin_index(largebin_size) - 64][0] + assert largebin_size in result.bins + assert len(result.bins[largebin_size].fd_chain) == 1 and len(result.bins[largebin_size].bk_chain) == 1 + assert not result.bins[largebin_size].is_corrupted # check tcache gdb.execute('continue') result = allocator.tcachebins() - assert result['type'] == 'tcachebins' - assert tcache_size in result - assert result[tcache_size][1] == tcache_count and len(result[tcache_size][0]) == tcache_count + 1 - for addr in result[tcache_size][0][:-1]: + assert result.bin_type == BinType.TCACHE + assert tcache_size in result.bins + assert result.bins[tcache_size].count == tcache_count and len(result.bins[tcache_size].fd_chain) == tcache_count + 1 + for addr in result.bins[tcache_size].fd_chain[:-1]: assert pwndbg.vmmap.find(addr) # check fastbin gdb.execute('continue') result = allocator.fastbins() - assert result['type'] == 'fastbins' - assert (fastbin_size in result) and (len(result[fastbin_size]) == fastbin_count + 1) - for addr in result[fastbin_size][:-1]: + assert result.bin_type == BinType.FAST + assert (fastbin_size in result.bins) and (len(result.bins[fastbin_size].fd_chain) == fastbin_count + 1) + for addr in result.bins[fastbin_size].fd_chain[:-1]: assert pwndbg.vmmap.find(addr) # check unsortedbin gdb.execute('continue') result = allocator.unsortedbin() - assert result['type'] == 'unsortedbin' - assert len(result['all'][0]) == smallbin_count + 2 and len(result['all'][1]) == smallbin_count + 2 - assert not result['all'][2] - for addr in result['all'][0][:-1]: + assert result.bin_type == BinType.UNSORTED + assert len(result.bins['all'].fd_chain) == smallbin_count + 2 and len(result.bins['all'].bk_chain) == smallbin_count + 2 + assert not result.bins['all'].is_corrupted + for addr in result.bins['all'].fd_chain[:-1]: assert pwndbg.vmmap.find(addr) - for addr in result['all'][1][:-1]: + for addr in result.bins['all'].bk_chain[:-1]: assert pwndbg.vmmap.find(addr) # check smallbins gdb.execute('continue') result = allocator.smallbins() - assert result['type'] == 'smallbins' - assert len(result[smallbin_size][0]) == smallbin_count + 2 and len(result[smallbin_size][1]) == smallbin_count + 2 - assert not result[smallbin_size][2] - for addr in result[smallbin_size][0][:-1]: + assert result.bin_type == BinType.SMALL + assert len(result.bins[smallbin_size].fd_chain) == smallbin_count + 2 and len(result.bins[smallbin_size].bk_chain) == smallbin_count + 2 + assert not result.bins[smallbin_size].is_corrupted + for addr in result.bins[smallbin_size].fd_chain[:-1]: assert pwndbg.vmmap.find(addr) - for addr in result[smallbin_size][1][:-1]: + for addr in result.bins[smallbin_size].bk_chain[:-1]: assert pwndbg.vmmap.find(addr) # check largebins gdb.execute('continue') result = allocator.largebins() - assert result['type'] == 'largebins' - assert len(result[largebin_size][0]) == largebin_count + 2 and len(result[largebin_size][1]) == largebin_count + 2 - assert not result[largebin_size][2] - for addr in result[largebin_size][0][:-1]: + assert result.bin_type == BinType.LARGE + assert len(result.bins[largebin_size].fd_chain) == largebin_count + 2 and len(result.bins[largebin_size].bk_chain) == largebin_count + 2 + assert not result.bins[largebin_size].is_corrupted + for addr in result.bins[largebin_size].fd_chain[:-1]: assert pwndbg.vmmap.find(addr) - for addr in result[largebin_size][1][:-1]: + for addr in result.bins[largebin_size].bk_chain[:-1]: assert pwndbg.vmmap.find(addr) # check corrupted gdb.execute('continue') result = allocator.smallbins() - assert result['type'] == 'smallbins' - assert result[smallbin_size][2] + assert result.bin_type == BinType.SMALL + assert result.bins[smallbin_size].is_corrupted result = allocator.largebins() - assert result['type'] == 'largebins' - assert result[largebin_size][2] + assert result.bin_type == BinType.LARGE + assert result.bins[largebin_size].is_corrupted