From b51b07de77c687bc513f1f30e7134c40f01afb2b Mon Sep 17 00:00:00 2001 From: Disconnect3d Date: Mon, 22 Aug 2022 22:38:05 +0200 Subject: [PATCH] Revert "Refactor heap code (#1063)" (#1084) This reverts commit a0f3744743042091225cb47924f7923b934a5ce3. --- pwndbg/commands/heap.py | 670 ++++++++++++++++++++-------------------- pwndbg/heap/ptmalloc.py | 276 ++--------------- tests/test_heap_bins.py | 89 +++--- 3 files changed, 409 insertions(+), 626 deletions(-) diff --git a/pwndbg/commands/heap.py b/pwndbg/commands/heap.py index ac6d0e4d3..a3082681e 100644 --- a/pwndbg/commands/heap.py +++ b/pwndbg/commands/heap.py @@ -12,14 +12,25 @@ import pwndbg.glibc import pwndbg.typeinfo from pwndbg.color import generateColorFunction from pwndbg.color import message -from pwndbg.color import underline from pwndbg.commands.config import extend_value_with_default from pwndbg.commands.config import get_config_parameters from pwndbg.commands.config import print_row -from pwndbg.heap.ptmalloc import Bin -from pwndbg.heap.ptmalloc import Bins -from pwndbg.heap.ptmalloc import BinType -from pwndbg.heap.ptmalloc import read_chunk_from_gdb + + +def read_chunk(addr): + """Read a chunk's metadata.""" + # In GLIBC versions <= 2.24 the `mchunk_[prev_]size` field was named `[prev_]size`. + # To support both versions, change the new names to the old ones here so that + # the rest of the code can deal with uniform names. + renames = { + "mchunk_size": "size", + "mchunk_prev_size": "prev_size", + } + if not pwndbg.config.resolve_heap_via_heuristic: + val = pwndbg.typeinfo.read_gdbvalue("struct malloc_chunk", addr) + else: + val = pwndbg.heap.current.malloc_chunk(addr) + return dict({ renames.get(key, key): int(val[key]) for key in val.type.keys() }) def format_bin(bins, verbose=False, offset=None): @@ -28,44 +39,49 @@ def format_bin(bins, verbose=False, offset=None): offset = allocator.chunk_key_offset('fd') result = [] - bins_type = bins.bin_type - - for size, b in bins.bins.items(): - if not verbose and ( - b.fd_chain == [0] and not b.count - ) and not b.is_corrupted: - continue + bins_type = bins.pop('type') - # TODO: Abstract this away + for size in bins: + b = bins[size] + count, is_chain_corrupted = None, False safe_lnk = False - if bins_type in [BinType.FAST, BinType.TCACHE]: + + # fastbins consists of only single linked list + if bins_type == 'fastbins': + chain_fd = b + safe_lnk = pwndbg.glibc.check_safe_linking() + # tcachebins consists of single linked list and entries count + elif bins_type == 'tcachebins': + chain_fd, count = b safe_lnk = pwndbg.glibc.check_safe_linking() + # normal bins consists of double linked list and may be corrupted (we can detect corruption) + else: # normal bin + chain_fd, chain_bk, is_chain_corrupted = b + + if not verbose and (chain_fd == [0] and not count) and not is_chain_corrupted: + continue - if bins_type == BinType.TCACHE: - limit = min(8, b.count + 1) + if bins_type == 'tcachebins': + limit = 8 + if count <= 7: + limit = count + 1 + formatted_chain = pwndbg.chain.format(chain_fd[0], offset=offset, limit=limit, safe_linking=safe_lnk) else: - limit = pwndbg.chain.LIMIT + formatted_chain = pwndbg.chain.format(chain_fd[0], offset=offset, safe_linking=safe_lnk) - formatted_chain = pwndbg.chain.format( - b.fd_chain[0], limit=limit, offset=offset, safe_linking=safe_lnk - ) - size_str = Bin.size_to_display_name(size) + if isinstance(size, int): + size = hex(size) - if b.is_corrupted: - line = message.hint(size_str) + message.error(' [corrupted]') + '\n' + if is_chain_corrupted: + line = message.hint(size) + message.error(' [corrupted]') + '\n' line += message.hint('FD: ') + formatted_chain + '\n' - line += message.hint('BK: ') + pwndbg.chain.format( - b.bk_chain[0], offset=allocator.chunk_key_offset('bk') - ) + line += message.hint('BK: ') + pwndbg.chain.format(chain_bk[0], offset=allocator.chunk_key_offset('bk')) else: - line = message.hint(size_str) - if b.count is not None: - line += message.hint(' [%3d]' % b.count) - - line += ': ' - line.ljust(13) - + if count is not None: + line = (message.hint(size) + message.hint(' [%3d]' % count) + ': ').ljust(13) + else: + line = (message.hint(size) + ': ').ljust(13) line += formatted_chain result.append(line) @@ -78,26 +94,9 @@ def format_bin(bins, verbose=False, offset=None): parser = argparse.ArgumentParser() parser.description = "Iteratively print chunks on a heap, default to the current thread's active heap." -parser.add_argument( - "addr", - nargs="?", - type=int, - default=None, - help= - "Address of the first chunk (malloc_chunk struct start, prev_size field)." -) -parser.add_argument( - "-v", - "--verbose", - action="store_true", - help="Print all chunk fields, even unused ones." -) -parser.add_argument( - "-s", - "--simple", - action="store_true", - help="Simply print malloc_chunk struct's contents." -) +parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the first chunk (malloc_chunk struct start, prev_size field).") +parser.add_argument("-v", "--verbose", action="store_true", help="Print all chunk fields, even unused ones.") +parser.add_argument("-s", "--simple", action="store_true", help="Simply print malloc_chunk struct's contents.") @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -107,23 +106,49 @@ def heap(addr=None, verbose=False, simple=False): active heap. """ allocator = pwndbg.heap.current + heap_region = allocator.get_heap_boundaries(addr) + arena = allocator.get_arena_for_chunk(addr) if addr else allocator.get_arena() + top_chunk = arena['top'] + ptr_size = allocator.size_sz + + # Store the heap base address in a GDB variable that can be used in other + # GDB commands + gdb.execute('set $heap_base=0x{:x}'.format(heap_region.start)) - # If an address was supplied, start printing from there, otherwise start - # from the first chunk in the heap region + # Calculate where to start printing; if an address was supplied, use that, + # if this heap belongs to the main arena, start at the beginning of the + # heap's mapping, otherwise, compensate for the presence of a heap_info + # struct and possibly an arena. if addr: cursor = int(addr) + elif arena == allocator.main_arena: + cursor = heap_region.start else: - heap_region = allocator.get_heap_boundaries(addr) - cursor = allocator.get_first_chunk_in_heap(heap_region.start) + cursor = heap_region.start + allocator.heap_info.sizeof + if pwndbg.vmmap.find(allocator.get_heap(heap_region.start)['ar_ptr']) == heap_region: + # Round up to a 2-machine-word alignment after an arena to + # compensate for the presence of the have_fastchunks variable + # in GLIBC versions >= 2.27. + cursor += (allocator.malloc_state.sizeof + ptr_size) & ~allocator.malloc_align_mask + # i686 alignment heuristic + first_chunk_size = pwndbg.arch.unpack(pwndbg.memory.read(cursor + ptr_size, ptr_size)) + if first_chunk_size == 0: + cursor += ptr_size * 2 - # Store the heap base address in a GDB variable that can be used in other - # GDB commands - # TODO: See https://github.com/pwndbg/pwndbg/issues/1060 - gdb.execute('set $heap_base=0x{:x}'.format(heap_region.start)) + while cursor in heap_region: + malloc_chunk(cursor, verbose=verbose, simple=simple) + + if cursor == top_chunk: + break + + size_field = pwndbg.memory.u(cursor + allocator.chunk_key_offset('size')) + real_size = size_field & ~allocator.malloc_align_mask + cursor += real_size - for chunk in allocator.chunks(cursor): - malloc_chunk(chunk, verbose=verbose, simple=simple) + # Avoid an infinite loop when a chunk's size is 0. + if real_size == 0: + break parser = argparse.ArgumentParser() @@ -184,9 +209,7 @@ def mp(): parser = argparse.ArgumentParser() parser.description = "Print relevant information about an arena's top chunk, default to current thread's arena." -parser.add_argument( - "addr", nargs="?", type=int, default=None, help="Address of the arena." -) +parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -198,68 +221,18 @@ def top_chunk(addr=None): allocator = pwndbg.heap.current arena = allocator.get_arena(addr) address = arena['top'] - size = allocator.chunk_size_nomask(int(address)) + size = pwndbg.memory.u(int(address) + allocator.chunk_key_offset('size')) - out = message.off("Top chunk\n") + "Addr: {}\nSize: 0x{:02x}".format( - M.get(address), size - ) + out = message.off("Top chunk\n") + "Addr: {}\nSize: 0x{:02x}".format(M.get(address), size) print(out) -def get_chunk_bin(addr): - # points to the real start of the chunk - cursor = int(addr) - - allocator = pwndbg.heap.current - size = allocator.chunk_size(cursor) - - arena = allocator.get_arena_for_chunk(addr) - - bins = [ - allocator.fastbins(arena.address), - allocator.smallbins(arena.address), - allocator.largebins(arena.address), - allocator.unsortedbin(arena.address), - ] - - if allocator.has_tcache(): - bins.append(allocator.tcachebins(None)) - - # TODO: What if we somehow got this chunk into a bin of a different size? We - # would miss it with this logic. Should we check every bin? - res = [] - for bin_ in bins: - if bin_.contains_chunk(size, cursor): - res.append(bin_.bin_type) - - if len(res) == 0: - return [BinType.NOT_IN_BIN] - else: - return res - - parser = argparse.ArgumentParser() parser.description = "Print a chunk." -parser.add_argument( - "addr", - type=int, - help="Address of the chunk (malloc_chunk struct start, prev_size field)." -) -parser.add_argument( - "-f", "--fake", action="store_true", help="Is this a fake chunk?" -) -parser.add_argument( - "-v", - "--verbose", - action="store_true", - help="Print all chunk fields, even unused ones." -) -parser.add_argument( - "-s", - "--simple", - action="store_true", - help="Simply print malloc_chunk struct's contents." -) +parser.add_argument("addr", type=int, help="Address of the chunk (malloc_chunk struct start, prev_size field).") +parser.add_argument("-f", "--fake", action="store_true", help="Is this a fake chunk?") +parser.add_argument("-v", "--verbose", action="store_true", help="Print all chunk fields, even unused ones.") +parser.add_argument("-s", "--simple", action="store_true", help="Simply print malloc_chunk struct's contents.") @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -270,34 +243,36 @@ def malloc_chunk(addr, fake=False, verbose=False, simple=False): cursor = int(addr) allocator = pwndbg.heap.current + ptr_size = allocator.size_sz - size_field = allocator.chunk_size_nomask(cursor) - real_size = allocator.chunk_size(cursor) + size_field = pwndbg.memory.u(cursor + allocator.chunk_key_offset('size')) + real_size = size_field & ~allocator.malloc_align_mask headers_to_print = [] # both state (free/allocated) and flags fields_to_print = set() # in addition to addr and size - - prev_inuse, is_mmapped, non_main_arena = allocator.chunk_flags(size_field) - if prev_inuse: - headers_to_print.append(message.hint('PREV_INUSE')) - if is_mmapped: - headers_to_print.append(message.hint('IS_MMAPED')) - if non_main_arena: - headers_to_print.append(message.hint('NON_MAIN_ARENA')) + out_fields = "Addr: {}\n".format(M.get(cursor)) if fake: - headers_to_print.append(message.on('Fake chunk')) + headers_to_print.append(message.on("Fake chunk")) verbose = True # print all fields for fake chunks if simple: - chunk = read_chunk_from_gdb(cursor) + chunk = read_chunk(cursor) + + if not headers_to_print: + headers_to_print.append(message.hint(M.get(cursor))) - # The address should be the first header - headers_to_print.insert(0, message.hint(M.get(cursor))) + prev_inuse, is_mmapped, non_main_arena = allocator.chunk_flags(int(chunk['size'])) + if prev_inuse: + headers_to_print.append(message.hint('PREV_INUSE')) + if is_mmapped: + headers_to_print.append(message.hint('IS_MMAPED')) + if non_main_arena: + headers_to_print.append(message.hint('NON_MAIN_ARENA')) print(' | '.join(headers_to_print)) for key, val in chunk.items(): - print(message.system(key) + ': 0x{:02x}'.format(int(val))) + print(message.system(key) + ": 0x{:02x}".format(int(val))) print('') return @@ -308,55 +283,70 @@ def malloc_chunk(addr, fake=False, verbose=False, simple=False): arena_address = arena.address top_chunk = arena['top'] if cursor == top_chunk: - headers_to_print.append(message.off('Top chunk')) + headers_to_print.append(message.off("Top chunk")) is_top = True if not is_top: - bin_types = get_chunk_bin(cursor) - if BinType.NOT_IN_BIN in bin_types: - headers_to_print.append(message.hint('Allocated chunk')) + fastbins = allocator.fastbins(arena_address) or {} + smallbins = allocator.smallbins(arena_address) or {} + largebins = allocator.largebins(arena_address) or {} + unsortedbin = allocator.unsortedbin(arena_address) or {} + if allocator.has_tcache(): + tcachebins = allocator.tcachebins(None) + + if real_size in fastbins.keys() and cursor in fastbins[real_size]: + headers_to_print.append(message.on("Free chunk (fastbins)")) + if not verbose: + fields_to_print.add('fd') + + elif real_size in smallbins.keys() and cursor in bin_addrs(smallbins[real_size], "smallbins"): + headers_to_print.append(message.on("Free chunk (smallbins)")) + if not verbose: + fields_to_print.update(['fd', 'bk']) + + elif real_size >= list(largebins.items())[0][0] and cursor in bin_addrs(largebins[(list(largebins.items())[allocator.largebin_index(real_size) - 64][0])], "largebins"): + headers_to_print.append(message.on("Free chunk (largebins)")) + if not verbose: + fields_to_print.update(['fd', 'bk', 'fd_nextsize', 'bk_nextsize']) + + elif cursor in bin_addrs(unsortedbin['all'], "unsortedbin"): + headers_to_print.append(message.on("Free chunk (unsortedbin)")) + if not verbose: + fields_to_print.update(['fd', 'bk']) + + elif allocator.has_tcache() and real_size in tcachebins.keys() and cursor + ptr_size*2 in bin_addrs(tcachebins[real_size], "tcachebins"): + headers_to_print.append(message.on("Free chunk (tcache)")) + if not verbose: + fields_to_print.add('fd') else: - # TODO: Handle a chunk being in multiple bins - bin_type = bin_types[0] - headers_to_print.append( - message.on('Free chunk ({})'.format('|'.join(bin_types))) - ) - for bin_type in bin_types: - fields_to_print.update(bin_type.valid_fields()) - - - out_fields = ['Addr: {}'.format(M.get(cursor))] - fields_ordered = [ - 'prev_size', 'size', 'fd', 'bk', 'fd_nextsize', 'bk_nextsize' - ] + headers_to_print.append(message.hint("Allocated chunk")) + if verbose: - fields_to_print.update(fields_ordered) + fields_to_print.update(['prev_size', 'size', 'fd', 'bk', 'fd_nextsize', 'bk_nextsize']) else: - out_fields.append('Size: 0x{:02x}'.format(size_field)) + out_fields += "Size: 0x{:02x}\n".format(size_field) + + prev_inuse, is_mmapped, non_main_arena = allocator.chunk_flags(size_field) + if prev_inuse: + headers_to_print.append(message.hint('PREV_INUSE')) + if is_mmapped: + headers_to_print.append(message.hint('IS_MMAPED')) + if non_main_arena: + headers_to_print.append(message.hint('NON_MAIN_ARENA')) - print(' | '.join(headers_to_print)) + fields_ordered = ['prev_size', 'size', 'fd', 'bk', 'fd_nextsize', 'bk_nextsize'] for field_to_print in fields_ordered: if field_to_print in fields_to_print: - field_val = pwndbg.memory.u(cursor + allocator.chunk_key_offset(field_to_print)) - out_fields.append(message.system(field_to_print) + ': 0x{:02x}'.format(field_val)) + out_fields += message.system(field_to_print) + ": 0x{:02x}\n".format(pwndbg.memory.u(cursor + allocator.chunk_key_offset(field_to_print))) - print('\n'.join(out_fields)) - print('') + print(' | '.join(headers_to_print) + "\n" + out_fields) parser = argparse.ArgumentParser() parser.description = "Print the contents of all an arena's bins and a thread's tcache, default to the current thread's arena and tcache." -parser.add_argument( - "addr", nargs="?", type=int, default=None, help="Address of the arena." -) -parser.add_argument( - "tcache_addr", - nargs="?", - type=int, - default=None, - help="Address of the tcache." -) +parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") +parser.add_argument("tcache_addr", nargs="?", type=int, default=None, help="Address of the tcache.") @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -373,35 +363,10 @@ def bins(addr=None, tcache_addr=None): largebins(addr) -def print_bins(bin_type, addr=None, verbose=False): - allocator = pwndbg.heap.current - offset = None - - # TODO: Abstract this away - if bin_type == BinType.TCACHE: - offset = allocator.tcache_next_offset - else: - offset = None - - bins = allocator.get_bins(bin_type, addr=addr) - if bins is None: - return - - formatted_bins = format_bin(bins, verbose, offset=offset) - - print(C.banner(bin_type.value)) - for node in formatted_bins: - print(node) - - parser = argparse.ArgumentParser() parser.description = "Print the contents of an arena's fastbins, default to the current thread's arena." -parser.add_argument( - "addr", nargs="?", type=int, default=None, help="Address of the arena." -) -parser.add_argument( - "verbose", nargs="?", type=bool, default=True, help="Show extra detail." -) +parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") +parser.add_argument("verbose", nargs="?", type=bool, default=True, help="Show extra detail.") @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -410,17 +375,23 @@ def fastbins(addr=None, verbose=True): """Print the contents of an arena's fastbins, default to the current thread's arena. """ - print_bins(BinType.FAST, addr, verbose) + allocator = pwndbg.heap.current + fastbins = allocator.fastbins(addr) + + if fastbins is None: + return + + formatted_bins = format_bin(fastbins, verbose) + + print(C.banner('fastbins')) + for node in formatted_bins: + print(node) parser = argparse.ArgumentParser() parser.description = "Print the contents of an arena's unsortedbin, default to the current thread's arena." -parser.add_argument( - "addr", nargs="?", type=int, default=None, help="Address of the arena." -) -parser.add_argument( - "verbose", nargs="?", type=bool, default=True, help="Show extra detail." -) +parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") +parser.add_argument("verbose", nargs="?", type=bool, default=True, help="Show extra detail.") @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -429,17 +400,23 @@ def unsortedbin(addr=None, verbose=True): """Print the contents of an arena's unsortedbin, default to the current thread's arena. """ - print_bins(BinType.UNSORTED, addr, verbose) + allocator = pwndbg.heap.current + unsortedbin = allocator.unsortedbin(addr) + + if unsortedbin is None: + return + + formatted_bins = format_bin(unsortedbin, verbose) + + print(C.banner('unsortedbin')) + for node in formatted_bins: + print(node) parser = argparse.ArgumentParser() parser.description = "Print the contents of an arena's smallbins, default to the current thread's arena." -parser.add_argument( - "addr", nargs="?", type=int, default=None, help="Address of the arena." -) -parser.add_argument( - "verbose", nargs="?", type=bool, default=False, help="Show extra detail." -) +parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") +parser.add_argument("verbose", nargs="?", type=bool, default=False, help="Show extra detail.") @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -448,17 +425,23 @@ def smallbins(addr=None, verbose=False): """Print the contents of an arena's smallbins, default to the current thread's arena. """ - print_bins(BinType.SMALL, addr, verbose) + allocator = pwndbg.heap.current + smallbins = allocator.smallbins(addr) + + if smallbins is None: + return + + formatted_bins = format_bin(smallbins, verbose) + + print(C.banner('smallbins')) + for node in formatted_bins: + print(node) parser = argparse.ArgumentParser() parser.description = "Print the contents of an arena's largebins, default to the current thread's arena." -parser.add_argument( - "addr", nargs="?", type=int, default=None, help="Address of the arena." -) -parser.add_argument( - "verbose", nargs="?", type=bool, default=False, help="Show extra detail." -) +parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.") +parser.add_argument("verbose", nargs="?", type=bool, default=False, help="Show extra detail.") @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -467,25 +450,23 @@ def largebins(addr=None, verbose=False): """Print the contents of an arena's largebins, default to the current thread's arena. """ - print_bins(BinType.LARGE, addr, verbose) + allocator = pwndbg.heap.current + largebins = allocator.largebins(addr) + + if largebins is None: + return + + formatted_bins = format_bin(largebins, verbose) + + print(C.banner('largebins')) + for node in formatted_bins: + print(node) parser = argparse.ArgumentParser() parser.description = "Print the contents of a tcache, default to the current thread's tcache." -parser.add_argument( - "addr", - nargs="?", - type=int, - default=None, - help="The address of the tcache bins." -) -parser.add_argument( - "verbose", - nargs="?", - type=bool, - default=False, - help="Whether to show more details or not." -) +parser.add_argument("addr", nargs="?", type=int, default=None, help="The address of the tcache bins.") +parser.add_argument("verbose", nargs="?", type=bool, default=False, help="Whether to show more details or not.") @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @@ -493,7 +474,17 @@ parser.add_argument( @pwndbg.commands.OnlyWithTcache def tcachebins(addr=None, verbose=False): """Print the contents of a tcache, default to the current thread's tcache.""" - print_bins(BinType.TCACHE, addr, verbose) + allocator = pwndbg.heap.current + tcachebins = allocator.tcachebins(addr) + + if tcachebins is None: + return + + formatted_bins = format_bin(tcachebins, verbose, offset = allocator.tcache_next_offset) + + print(C.banner('tcachebins')) + for node in formatted_bins: + print(node) parser = argparse.ArgumentParser() @@ -544,28 +535,14 @@ def find_fake_fast(addr, size=None): parser = argparse.ArgumentParser() parser.description = "Visualize chunks on a heap, default to the current arena's active heap." -parser.add_argument( - "count", - nargs="?", - type=lambda n: max(int(n, 0), 1), - default=10, - help="Number of chunks to visualize." -) -parser.add_argument( - "addr", nargs="?", default=None, help="Address of the first chunk." -) -parser.add_argument( - "--naive", - "-n", - action="store_true", - default=False, - help="Attempt to keep printing beyond the top chunk." -) +parser.add_argument("count", nargs="?", type=lambda n:max(int(n, 0),1), default=10, help="Number of chunks to visualize.") +parser.add_argument("addr", nargs="?", default=None, help="Address of the first chunk.") +parser.add_argument("--naive", "-n", action="store_true", default=False, help="Attempt to keep printing beyond the top chunk.") @pwndbg.commands.ArgparsedCommand(parser) @pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWithResolvedHeapSyms @pwndbg.commands.OnlyWhenHeapIsInitialized -def vis_heap_chunks(addr=None, count=None, naive=False): +def vis_heap_chunks(addr=None, count=None, naive=None): """Visualize chunks on a heap, default to the current arena's active heap.""" allocator = pwndbg.heap.current heap_region = allocator.get_heap_boundaries(addr) @@ -576,58 +553,73 @@ def vis_heap_chunks(addr=None, count=None, naive=False): # Build a list of addresses that delimit each chunk. chunk_delims = [] - - # If an address was supplied, start printing from there, otherwise start - # from the first chunk in the heap region if addr: cursor = int(addr) + elif arena == allocator.main_arena: + cursor = heap_region.start else: - heap_region = allocator.get_heap_boundaries(addr) - cursor = allocator.get_first_chunk_in_heap(heap_region.start) + cursor = heap_region.start + allocator.heap_info.sizeof + if pwndbg.vmmap.find(allocator.get_heap(heap_region.start)['ar_ptr']) == heap_region: + # Round up to a 2-machine-word alignment after an arena to + # compensate for the presence of the have_fastchunks variable + # in GLIBC versions >= 2.27. + cursor += (allocator.malloc_state.sizeof + ptr_size) & ~allocator.malloc_align_mask + + # Check if there is an alignment at the start of the heap, adjust if necessary. + if not addr: + first_chunk_size = pwndbg.arch.unpack(pwndbg.memory.read(cursor + ptr_size, ptr_size)) + if first_chunk_size == 0: + cursor += ptr_size * 2 cursor_backup = cursor - i = 0 - # TODO: This rewrite probably breaks --naive - # TODO: If we do it like this, we should store the first chunk, not the next one - for cursor in allocator.chunks(cursor): - if i == 0: - i += 1 - continue - - if i >= count: + for _ in range(count + 1): + # Don't read beyond the heap mapping if --naive or corrupted heap. + if cursor not in heap_region: + chunk_delims.append(heap_region.end) break - i += 1 + size_field = pwndbg.memory.u(cursor + ptr_size) + real_size = size_field & ~allocator.malloc_align_mask + prev_inuse = allocator.chunk_flags(size_field)[0] - next_chunk = allocator.next_chunk(cursor) - if cursor == top_chunk: - inuse = False + # Don't repeatedly operate on the same address (e.g. chunk size of 0). + if cursor in chunk_delims or cursor + ptr_size in chunk_delims: + break + + if prev_inuse: + chunk_delims.append(cursor + ptr_size) else: - inuse = allocator.prev_inuse(next_chunk) + chunk_delims.append(cursor) - # TODO: Is this check still necessary? - # Don't read beyond the heap mapping if --naive or corrupted heap. - if cursor not in heap_region: - chunk_delims.append((heap_region.end, inuse)) + if (cursor == top_chunk and not naive) or (cursor == heap_region.end - ptr_size*2): + chunk_delims.append(cursor + ptr_size*2) break - chunk_delims.append((cursor, inuse)) - - # if (cursor == top_chunk - # and not naive) or (cursor == heap_region.end - ptr_size * 2): - # chunk_delims.append(cursor + ptr_size * 2) - # break + cursor += real_size # Build the output buffer, changing color at each chunk delimiter. + # TODO: maybe print free chunks in bold or underlined color_funcs = [ - generateColorFunction('yellow'), - generateColorFunction('cyan'), - generateColorFunction('purple'), - generateColorFunction('green'), - generateColorFunction('blue'), + generateColorFunction("yellow"), + generateColorFunction("cyan"), + generateColorFunction("purple"), + generateColorFunction("green"), + generateColorFunction("blue"), ] + bin_collections = [ + allocator.fastbins(arena.address), + allocator.unsortedbin(arena.address), + allocator.smallbins(arena.address), + allocator.largebins(arena.address), + ] + if allocator.has_tcache(): + # Only check for tcache entries belonging to the current thread, + # it's difficult (impossible?) to find all the thread caches for a + # specific heap. + bin_collections.insert(0, allocator.tcachebins(None)) + printed = 0 out = '' asc = '' @@ -635,13 +627,7 @@ def vis_heap_chunks(addr=None, count=None, naive=False): cursor = cursor_backup - for c, (stop, inuse) in enumerate(chunk_delims): - if inuse: - stop += ptr_size - - # TODO: Are we duplicating work with bin_labels? - bin_type = get_chunk_bin(cursor) - + for c, stop in enumerate(chunk_delims): color_func = color_funcs[c % len(color_funcs)] while cursor != stop: @@ -649,26 +635,18 @@ def vis_heap_chunks(addr=None, count=None, naive=False): out += "\n0x%x" % cursor cell = pwndbg.arch.unpack(pwndbg.memory.read(cursor, ptr_size)) - cell_hex = '\t0x{:0{n}x}'.format(cell, n=ptr_size * 2) + cell_hex = '\t0x{:0{n}x}'.format(cell, n=ptr_size*2) - colored_text = color_func(cell_hex) - if bin_type != BinType.NOT_IN_BIN: - colored_text = underline(colored_text) - out += colored_text + out += color_func(cell_hex) printed += 1 - labels.extend(bin_labels(cursor, bin_type)) + labels.extend(bin_labels(cursor, bin_collections)) if cursor == top_chunk: labels.append('Top chunk') - asc += ''.join( - c if c.isprintable() and c.isascii() else '.' - for c in map(chr, pwndbg.memory.read(cursor, ptr_size)) - ) + asc += bin_ascii(pwndbg.memory.read(cursor, ptr_size)) if printed % 2 == 0: - out += '\t' + color_func(asc) + ( - '\t <-- ' + ', '.join(labels) if len(labels) else '' - ) + out += '\t' + color_func(asc) + ('\t <-- ' + ', '.join(labels) if len(labels) else '') asc = '' labels = [] @@ -677,31 +655,45 @@ def vis_heap_chunks(addr=None, count=None, naive=False): print(out) -def bin_labels(addr, bin_type): - labels = [] - allocator = pwndbg.heap.current - - bins = allocator.get_bins(bin_type) - if bins is None: - return [] +def bin_ascii(bs): + from string import printable + valid_chars = list(map(ord, set(printable) - set('\t\r\n\x0c\x0b'))) + return ''.join(chr(c) if c in valid_chars else '.'for c in bs) - for size, b in bins.bins.items(): - size_str = Bin.size_to_display_name(size) - if b.contains_chunk(addr): - count = '' - if bins.bin_type == BinType.TCACHE: - count = '/{:d}'.format(b.count) +def bin_labels(addr, collections): + labels = [] + for bins in collections: + bins_type = bins.get('type', None) + if not bins_type: + continue - labels.append( - '{:s}[{:s}][{:d}{:s}]'.format( - bins.bin_type.value, size_str, b.fd_chain.index(addr), count - ) - ) + for size in filter(lambda x: x != 'type', bins.keys()): + b = bins[size] + if isinstance(size, int): + size = hex(size) + count = '/{:d}'.format(b[1]) if bins_type == 'tcachebins' else None + chunks = bin_addrs(b, bins_type) + for chunk_addr in chunks: + if addr == chunk_addr: + labels.append('{:s}[{:s}][{:d}{}]'.format(bins_type, size, chunks.index(addr), count or '')) return labels +def bin_addrs(b, bins_type): + addrs = [] + if bins_type == 'fastbins': + return b + # tcachebins consists of single linked list and entries count + elif bins_type == 'tcachebins': + addrs, _ = b + # normal bins consists of double linked list and may be corrupted (we can detect corruption) + else: # normal bin + addrs, _, _ = b + return addrs + + try_free_parser = argparse.ArgumentParser(description='Check what would happen if free was called with given address') try_free_parser.add_argument('addr', nargs='?', help='Address passed to free') @pwndbg.commands.ArgparsedCommand(try_free_parser) @@ -734,7 +726,7 @@ def try_free(addr): ptr_size = pwndbg.arch.ptrsize def unsigned_size(size): - # read_chunk_from_gdb()['size'] is signed in pwndbg ;/ + # read_chunk()['size'] is signed in pwndbg ;/ # there may be better way to handle that if ptr_size < 8: return ctypes.c_uint32(size).value @@ -760,7 +752,7 @@ def try_free(addr): # try to get the chunk try: - chunk = read_chunk_from_gdb(addr) + chunk = read_chunk(addr) except gdb.MemoryError as e: print(message.error('Can\'t read chunk at address 0x{:x}, memory error'.format(addr))) return @@ -843,10 +835,10 @@ def try_free(addr): if chunk_size_unmasked <= allocator.global_max_fast: print(message.notice('Fastbin checks')) chunk_fastbin_idx = allocator.fastbin_index(chunk_size_unmasked) - fastbin_list = allocator.fastbins(int(arena.address)).bins[(chunk_fastbin_idx+2)*(ptr_size*2)] + fastbin_list = allocator.fastbins(int(arena.address))[(chunk_fastbin_idx+2)*(ptr_size*2)] try: - next_chunk = read_chunk_from_gdb(addr + chunk_size_unmasked) + next_chunk = read_chunk(addr + chunk_size_unmasked) except gdb.MemoryError as e: print(message.error('Can\'t read next chunk at address 0x{:x}, memory error'.format(chunk + chunk_size_unmasked))) finalize(errors_found, returned_before_error) @@ -862,7 +854,7 @@ def try_free(addr): errors_found += 1 # chunk is not the same as the one on top of fastbin[idx] - if int(fastbin_list.fd_chain[0]) == addr: + if int(fastbin_list[0]) == addr: err = 'double free or corruption (fasttop) -> chunk already is on top of fastbin list\n' err += ' fastbin idx == {}' err = err.format(chunk_fastbin_idx) @@ -870,10 +862,10 @@ def try_free(addr): errors_found += 1 # chunk's size is ~same as top chunk's size - fastbin_top_chunk = int(fastbin_list.fd_chain[0]) + fastbin_top_chunk = int(fastbin_list[0]) if fastbin_top_chunk != 0: try: - fastbin_top_chunk = read_chunk_from_gdb(fastbin_top_chunk) + fastbin_top_chunk = read_chunk(fastbin_top_chunk) except gdb.MemoryError as e: print(message.error('Can\'t read top fastbin chunk at address 0x{:x}, memory error'.format(fastbin_top_chunk))) finalize(errors_found, returned_before_error) @@ -903,7 +895,7 @@ def try_free(addr): # next chunk is not beyond the boundaries of the arena NONCONTIGUOUS_BIT = 2 top_chunk_addr = (int(arena['top'])) - top_chunk = read_chunk_from_gdb(top_chunk_addr) + top_chunk = read_chunk(top_chunk_addr) next_chunk_addr = addr + chunk_size_unmasked # todo: in libc, addition may overflow @@ -916,7 +908,7 @@ def try_free(addr): # now we need to dereference chunk try : - next_chunk = read_chunk_from_gdb(next_chunk_addr) + next_chunk = read_chunk(next_chunk_addr) next_chunk_size = chunksize(unsigned_size(next_chunk['size'])) except (OverflowError, gdb.MemoryError) as e: print(message.error('Can\'t read next chunk at address 0x{:x}'.format(next_chunk_addr))) @@ -946,7 +938,7 @@ def try_free(addr): prev_chunk_addr = addr - prev_size try : - prev_chunk = read_chunk_from_gdb(prev_chunk_addr) + prev_chunk = read_chunk(prev_chunk_addr) prev_chunk_size = chunksize(unsigned_size(prev_chunk['size'])) except (OverflowError, gdb.MemoryError) as e: print(message.error('Can\'t read next chunk at address 0x{:x}'.format(prev_chunk_addr))) @@ -970,7 +962,7 @@ def try_free(addr): print(message.notice('Next chunk is not top chunk')) try : next_next_chunk_addr = next_chunk_addr + next_chunk_size - next_next_chunk = read_chunk_from_gdb(next_next_chunk_addr) + next_next_chunk = read_chunk(next_next_chunk_addr) except (OverflowError, gdb.MemoryError) as e: print(message.error('Can\'t read next chunk at address 0x{:x}'.format(next_next_chunk_addr))) finalize(errors_found, returned_before_error) @@ -988,12 +980,12 @@ def try_free(addr): # unsorted bin fd->bk should be unsorted bean unsorted_addr = int(arena['bins'][0]) try: - unsorted = read_chunk_from_gdb(unsorted_addr) + unsorted = read_chunk(unsorted_addr) try: - if read_chunk_from_gdb(unsorted['fd'])['bk'] != unsorted_addr: + if read_chunk(unsorted['fd'])['bk'] != unsorted_addr: err = 'free(): corrupted unsorted chunks -> unsorted_chunk->fd->bk != unsorted_chunk\n' err += 'unsorted at 0x{:x}, unsorted->fd == 0x{:x}, unsorted->fd->bk == 0x{:x}' - err = err.format(unsorted_addr, unsorted['fd'], read_chunk_from_gdb(unsorted['fd'])['bk']) + err = err.format(unsorted_addr, unsorted['fd'], read_chunk(unsorted['fd'])['bk']) print(message.error(err)) errors_found += 1 except (OverflowError, gdb.MemoryError) as e: diff --git a/pwndbg/heap/ptmalloc.py b/pwndbg/heap/ptmalloc.py index bb906ba7e..d0a3411db 100644 --- a/pwndbg/heap/ptmalloc.py +++ b/pwndbg/heap/ptmalloc.py @@ -1,6 +1,5 @@ import importlib from collections import OrderedDict -from enum import Enum from functools import wraps import gdb @@ -24,101 +23,6 @@ from pwndbg.heap import heap_chain_limit HEAP_MAX_SIZE = 1024 * 1024 if pwndbg.arch.ptrsize == 4 else 2 * 4 * 1024 * 1024 * 8 -# Note that we must inherit from `str` before `Enum`: https://stackoverflow.com/a/58608362/803801 -class BinType(str, Enum): - TCACHE = 'tcachebins' - FAST = 'fastbins' - SMALL = 'smallbins' - LARGE = 'largebins' - UNSORTED = 'unsortedbin' - NOT_IN_BIN = 'not_in_bin' - - def valid_fields(self): - if self in [BinType.FAST, BinType.TCACHE]: - return ['fd'] - elif self in [BinType.SMALL, BinType.UNSORTED]: - return ['fd', 'bk'] - elif self == BinType.LARGE: - return ['fd', 'bk', 'fd_nextsize', 'bk_nextsize'] - -class Bin: - def __init__(self, fd_chain, bk_chain=None, count=None, is_corrupted=False): - self.fd_chain = fd_chain - self.bk_chain = bk_chain - self.count = count - self.is_corrupted = is_corrupted - - def contains_chunk(self, chunk): - return chunk in self.fd_chain - - @staticmethod - def size_to_display_name(size): - if size == 'all': - return size - - assert isinstance(size, int) - - return hex(size) - - -class Bins: - def __init__(self, bin_type): - self.bins = OrderedDict() - self.bin_type = bin_type - - # TODO: There's a bunch of bin-specific logic in here, maybe we should - # subclass and put that logic in there - def contains_chunk(self, size, chunk): - # TODO: It will be the same thing, but it would be better if we used - # pwndbg.heap.current.size_sz. I think each bin should already have a - # reference to the allocator and shouldn't need to access the `current` - # variable - ptr_size = pwndbg.arch.ptrsize - - if self.bin_type == BinType.UNSORTED: - # The unsorted bin only has one bin called 'all' - - # TODO: We shouldn't be mixing int and str types like this - size = 'all' - elif self.bin_type == BinType.LARGE: - # All the other bins (other than unsorted) store chunks of the same - # size in a bin, so we can use the size directly. But the largebin - # stores a range of sizes, so we need to compute which bucket this - # chunk falls into - - # TODO: Refactor this, the bin should know how to calculate - # largebin_index without calling into the allocator - size = pwndbg.heap.current.largebin_index(size) - 64 - elif self.bin_type == BinType.TCACHE: - # Unlike fastbins, tcache bins don't store the chunk address in the - # bins, they store the address of the fd pointer, so we need to - # search for that address in the tcache bin instead - - # TODO: Can we use chunk_key_offset? - chunk += ptr_size * 2 - - if size in self.bins: - return self.bins[size].contains_chunk(chunk) - - return False - - -def read_chunk_from_gdb(addr): - """Read a chunk's metadata.""" - # In GLIBC versions <= 2.24 the `mchunk_[prev_]size` field was named `[prev_]size`. - # To support both versions, change the new names to the old ones here so that - # the rest of the code can deal with uniform names. - renames = { - "mchunk_size": "size", - "mchunk_prev_size": "prev_size", - } - if not pwndbg.config.resolve_heap_via_heuristic: - val = pwndbg.typeinfo.read_gdbvalue("struct malloc_chunk", addr) - else: - val = pwndbg.heap.current.malloc_chunk(addr) - return dict({ renames.get(key, key): int(val[key]) for key in val.type.keys() }) - - def heap_for_ptr(ptr): """Round a pointer to a chunk down to find its corresponding heap_info struct, the pointer must point inside a heap which does not belong to @@ -384,47 +288,11 @@ class Heap(pwndbg.heap.heap.BaseHeap): def get_heap(self, addr): raise NotImplementedError() - def get_first_chunk_in_heap(self, heap_start=None): - # Find which arena this heap belongs to - if heap_start: - arena = self.get_arena_for_chunk(heap_start) - else: - arena = self.get_arena() - - ptr_size = self.size_sz - - start_addr = heap_start - if arena != self.main_arena: - start_addr += self.heap_info.sizeof - heap_region = self.get_heap_boundaries(heap_start) - if pwndbg.vmmap.find(self.get_heap(heap_start)['ar_ptr']) == heap_region: - # Round up to a 2-machine-word alignment after an arena to - # compensate for the presence of the have_fastchunks variable - # in GLIBC versions >= 2.27. - start_addr += pwndbg.memory.align_down( - self.malloc_state.sizeof + ptr_size, - self.malloc_alignment - ) - - # In glibc 2.26, the malloc_alignment for i386 was hardcoded to 16 (instead - # of 2*sizeof(size_t), which is 8). In order for the data to be aligned to - # 16 bytes, the first chunk now needs to start offset 8 instead of offset 0 - - # TODO: Can we just check if this is 32bit and >= glibc 2.26? This type of - # check is confusing as is, and unnecessary in most cases - first_chunk_size = pwndbg.arch.unpack( - pwndbg.memory.read(start_addr + ptr_size, ptr_size) - ) - if first_chunk_size == 0: - start_addr += ptr_size * 2 - - return start_addr - def get_arena(self, arena_addr=None): raise NotImplementedError() def get_arena_for_chunk(self, addr): - chunk = read_chunk_from_gdb(addr) + chunk = pwndbg.commands.heap.read_chunk(addr) _,_,nm = self.chunk_flags(chunk['size']) if nm: r=self.get_arena(arena_addr=self.get_heap(addr)['ar_ptr']) @@ -442,20 +310,6 @@ class Heap(pwndbg.heap.heap.BaseHeap): """Find the memory map containing 'addr'.""" return pwndbg.vmmap.find(addr) - def get_bins(self, bin_type, addr=None): - if bin_type == BinType.TCACHE: - return self.tcachebins(addr) - elif bin_type == BinType.FAST: - return self.fastbins(addr) - elif bin_type == BinType.UNSORTED: - return self.unsortedbin(addr) - elif bin_type == BinType.SMALL: - return self.smallbins(addr) - elif bin_type == BinType.LARGE: - return self.largebins(addr) - else: - return None - def fastbin_index(self, size): if pwndbg.arch.ptrsize == 8: return (size >> 4) - 2 @@ -464,38 +318,33 @@ class Heap(pwndbg.heap.heap.BaseHeap): def fastbins(self, arena_addr=None): """Returns: chain or None""" - result = Bins(BinType.FAST) arena = self.get_arena(arena_addr) if arena is None: - return result + return - fastbinsY = arena['fastbinsY'] - fd_offset = self.chunk_key_offset('fd') + fastbinsY = arena['fastbinsY'] + fd_offset = self.chunk_key_offset('fd') num_fastbins = 7 - size = pwndbg.arch.ptrsize * 2 + size = pwndbg.arch.ptrsize * 2 safe_lnk = pwndbg.glibc.check_safe_linking() + result = OrderedDict() for i in range(num_fastbins): size += pwndbg.arch.ptrsize * 2 - chain = pwndbg.chain.get( - int(fastbinsY[i]), - offset=fd_offset, - limit=heap_chain_limit, - safe_linking=safe_lnk - ) + chain = pwndbg.chain.get(int(fastbinsY[i]), offset=fd_offset, limit=heap_chain_limit, safe_linking=safe_lnk) - result.bins[size] = Bin(chain) + result[size] = chain + result['type'] = 'fastbins' return result def tcachebins(self, tcache_addr=None): """Returns: tuple(chain, count) or None""" - result = Bins(BinType.TCACHE) tcache = self.get_tcache(tcache_addr) if tcache is None: - return result + return counts = tcache['counts'] entries = tcache['entries'] @@ -507,18 +356,15 @@ class Heap(pwndbg.heap.heap.BaseHeap): """Tcache bin index to chunk size, following tidx2usize macro in glibc malloc.c""" return idx * self.malloc_alignment + self.minsize - self.size_sz + result = OrderedDict() for i in range(num_tcachebins): size = self._request2size(tidx2usize(i)) count = int(counts[i]) - chain = pwndbg.chain.get( - int(entries[i]), - offset=self.tcache_next_offset, - limit=heap_chain_limit, - safe_linking=safe_lnk - ) + chain = pwndbg.chain.get(int(entries[i]), offset=self.tcache_next_offset, limit=heap_chain_limit, safe_linking=safe_lnk) - result.bins[size] = Bin(chain, count=count) + result[size] = (chain, count) + result['type'] = 'tcachebins' return result def bin_at(self, index, arena_addr=None): @@ -541,24 +387,18 @@ class Heap(pwndbg.heap.heap.BaseHeap): return normal_bins = arena['bins'] - num_bins = normal_bins.type.sizeof // normal_bins.type.target().sizeof + num_bins = normal_bins.type.sizeof // normal_bins.type.target().sizeof - bins_base = int(normal_bins.address) - (pwndbg.arch.ptrsize * 2) + bins_base = int(normal_bins.address) - (pwndbg.arch.ptrsize* 2) current_base = bins_base + (index * pwndbg.arch.ptrsize * 2) front, back = normal_bins[index * 2], normal_bins[index * 2 + 1] - fd_offset = self.chunk_key_offset('fd') - bk_offset = self.chunk_key_offset('bk') + fd_offset = self.chunk_key_offset('fd') + bk_offset = self.chunk_key_offset('bk') is_chain_corrupted = False - get_chain = lambda bin, offset: pwndbg.chain.get( - int(bin), - offset=offset, - hard_stop=current_base, - limit=heap_chain_limit, - include_start=True - ) + get_chain = lambda bin, offset: pwndbg.chain.get(int(bin), offset=offset, hard_stop=current_base, limit=heap_chain_limit, include_start=True) chain_fd = get_chain(front, fd_offset) chain_bk = get_chain(back, bk_offset) @@ -574,99 +414,51 @@ class Heap(pwndbg.heap.heap.BaseHeap): return (chain_fd, chain_bk, is_chain_corrupted) def unsortedbin(self, arena_addr=None): - result = Bins(BinType.UNSORTED) - chain = self.bin_at(1, arena_addr=arena_addr) + chain = self.bin_at(1, arena_addr=arena_addr) + result = OrderedDict() if chain is None: - return result + return - fd_chain, bk_chain, is_corrupted = chain - result.bins['all'] = Bin(fd_chain, bk_chain, is_corrupted=is_corrupted) + result['all'] = chain + result['type'] = 'unsortedbin' return result def smallbins(self, arena_addr=None): - size = self.min_chunk_size - self.malloc_alignment + size = self.min_chunk_size - self.malloc_alignment spaces_table = self._spaces_table() - result = Bins(BinType.SMALL) + result = OrderedDict() for index in range(2, 64): size += spaces_table[index] chain = self.bin_at(index, arena_addr=arena_addr) if chain is None: - # TODO: Should I return an empty Bins() instead? - return result + return - fd_chain, bk_chain, is_corrupted = chain - result.bins[size] = Bin( - fd_chain, bk_chain, is_corrupted=is_corrupted - ) + result[size] = chain + result['type'] = 'smallbins' return result def largebins(self, arena_addr=None): - size = ( - ptmalloc.NSMALLBINS * self.malloc_alignment - ) - self.malloc_alignment + size = (ptmalloc.NSMALLBINS * self.malloc_alignment) - self.malloc_alignment spaces_table = self._spaces_table() - result = Bins(BinType.LARGE) + result = OrderedDict() for index in range(64, 127): size += spaces_table[index] chain = self.bin_at(index, arena_addr=arena_addr) if chain is None: - # TODO: Should I return an empty Bins() instead? - return result + return - fd_chain, bk_chain, is_corrupted = chain - result.bins[size] = Bin( - fd_chain, bk_chain, is_corrupted=is_corrupted - ) + result[size] = chain + result['type'] = 'largebins' return result - - def chunks(self, addr): - # TODO: Add assertions to verify alignment? Maybe write a decorator - # that enforces that an argument is aligned - - heap_region = self.get_heap_boundaries(addr) - arena = self.get_arena_for_chunk(addr) - top_chunk = arena['top'] - - chunk = addr - while chunk in heap_region: - yield chunk - - if chunk == top_chunk: - break - - old_chunk = chunk - chunk = self.next_chunk(chunk) - - # Avoid an infinite loop when a chunk's size is 0. - if old_chunk == chunk: - break - - def chunk_size_nomask(self, addr): - return pwndbg.memory.u(addr + self.chunk_key_offset('size')) - - def chunk_size(self, addr): - # TODO: Check if this breaks on 32-bit glibc versions >= 2.26, where - # only the data is aligned and not the chunk address - return pwndbg.memory.align_down( - self.chunk_size_nomask(addr), self.malloc_alignment - ) - - def next_chunk(self, addr): - return addr + self.chunk_size(addr) - - def prev_inuse(self, addr): - prev_inuse, _, _ = self.chunk_flags(self.chunk_size(addr)) - return prev_inuse == ptmalloc.PREV_INUSE - def largebin_index_32(self, sz): """Modeled on the GLIBC malloc largebin_index_32 macro. diff --git a/tests/test_heap_bins.py b/tests/test_heap_bins.py index a635ba127..6ea00bc25 100644 --- a/tests/test_heap_bins.py +++ b/tests/test_heap_bins.py @@ -5,7 +5,6 @@ import pwndbg.memory import pwndbg.symbol import pwndbg.vmmap import tests -from pwndbg.heap.ptmalloc import BinType BINARY = tests.binaries.get('heap_bins.out') @@ -40,94 +39,94 @@ def test_heap_bins(start_binary): largebin_count = pwndbg.memory.u64(addr) result = allocator.tcachebins() - assert result.bin_type == BinType.TCACHE - assert tcache_size in result.bins - assert result.bins[tcache_size].count == 0 and len(result.bins[tcache_size].fd_chain) == 1 + assert result['type'] == 'tcachebins' + assert tcache_size in result + assert result[tcache_size][1] == 0 and len(result[tcache_size][0]) == 1 result = allocator.fastbins() - assert result.bin_type == BinType.FAST - assert fastbin_size in result.bins - assert len(result.bins[fastbin_size].fd_chain) == 1 + assert result['type'] == 'fastbins' + assert fastbin_size in result + assert len(result[fastbin_size]) == 1 result = allocator.unsortedbin() - assert result.bin_type == BinType.UNSORTED - assert len(result.bins['all'].fd_chain) == 1 - assert not result.bins['all'].is_corrupted + assert result['type'] == 'unsortedbin' + assert len(result['all'][0]) == 1 + assert not result['all'][2] result = allocator.smallbins() - assert result.bin_type == BinType.SMALL - assert smallbin_size in result.bins - assert len(result.bins[smallbin_size].fd_chain) == 1 and len(result.bins[smallbin_size].bk_chain) == 1 - assert not result.bins[smallbin_size].is_corrupted + assert result['type'] == 'smallbins' + assert smallbin_size in result + assert len(result[smallbin_size][0]) == 1 and len(result[smallbin_size][1]) == 1 + assert not result[smallbin_size][2] result = allocator.largebins() - assert result.bin_type == BinType.LARGE - largebin_size = list(result.bins.items())[allocator.largebin_index(largebin_size) - 64][0] - assert largebin_size in result.bins - assert len(result.bins[largebin_size].fd_chain) == 1 and len(result.bins[largebin_size].bk_chain) == 1 - assert not result.bins[largebin_size].is_corrupted + assert result['type'] == 'largebins' + largebin_size = list(result.items())[allocator.largebin_index(largebin_size) - 64][0] + assert largebin_size in result + assert len(result[largebin_size][0]) == 1 and len(result[largebin_size][1]) == 1 + assert not result[largebin_size][2] # check tcache gdb.execute('continue') result = allocator.tcachebins() - assert result.bin_type == BinType.TCACHE - assert tcache_size in result.bins - assert result.bins[tcache_size].count == tcache_count and len(result.bins[tcache_size].fd_chain) == tcache_count + 1 - for addr in result.bins[tcache_size].fd_chain[:-1]: + assert result['type'] == 'tcachebins' + assert tcache_size in result + assert result[tcache_size][1] == tcache_count and len(result[tcache_size][0]) == tcache_count + 1 + for addr in result[tcache_size][0][:-1]: assert pwndbg.vmmap.find(addr) # check fastbin gdb.execute('continue') result = allocator.fastbins() - assert result.bin_type == BinType.FAST - assert (fastbin_size in result.bins) and (len(result.bins[fastbin_size].fd_chain) == fastbin_count + 1) - for addr in result.bins[fastbin_size].fd_chain[:-1]: + assert result['type'] == 'fastbins' + assert (fastbin_size in result) and (len(result[fastbin_size]) == fastbin_count + 1) + for addr in result[fastbin_size][:-1]: assert pwndbg.vmmap.find(addr) # check unsortedbin gdb.execute('continue') result = allocator.unsortedbin() - assert result.bin_type == BinType.UNSORTED - assert len(result.bins['all'].fd_chain) == smallbin_count + 2 and len(result.bins['all'].bk_chain) == smallbin_count + 2 - assert not result.bins['all'].is_corrupted - for addr in result.bins['all'].fd_chain[:-1]: + assert result['type'] == 'unsortedbin' + assert len(result['all'][0]) == smallbin_count + 2 and len(result['all'][1]) == smallbin_count + 2 + assert not result['all'][2] + for addr in result['all'][0][:-1]: assert pwndbg.vmmap.find(addr) - for addr in result.bins['all'].bk_chain[:-1]: + for addr in result['all'][1][:-1]: assert pwndbg.vmmap.find(addr) # check smallbins gdb.execute('continue') result = allocator.smallbins() - assert result.bin_type == BinType.SMALL - assert len(result.bins[smallbin_size].fd_chain) == smallbin_count + 2 and len(result.bins[smallbin_size].bk_chain) == smallbin_count + 2 - assert not result.bins[smallbin_size].is_corrupted - for addr in result.bins[smallbin_size].fd_chain[:-1]: + assert result['type'] == 'smallbins' + assert len(result[smallbin_size][0]) == smallbin_count + 2 and len(result[smallbin_size][1]) == smallbin_count + 2 + assert not result[smallbin_size][2] + for addr in result[smallbin_size][0][:-1]: assert pwndbg.vmmap.find(addr) - for addr in result.bins[smallbin_size].bk_chain[:-1]: + for addr in result[smallbin_size][1][:-1]: assert pwndbg.vmmap.find(addr) # check largebins gdb.execute('continue') result = allocator.largebins() - assert result.bin_type == BinType.LARGE - assert len(result.bins[largebin_size].fd_chain) == largebin_count + 2 and len(result.bins[largebin_size].bk_chain) == largebin_count + 2 - assert not result.bins[largebin_size].is_corrupted - for addr in result.bins[largebin_size].fd_chain[:-1]: + assert result['type'] == 'largebins' + assert len(result[largebin_size][0]) == largebin_count + 2 and len(result[largebin_size][1]) == largebin_count + 2 + assert not result[largebin_size][2] + for addr in result[largebin_size][0][:-1]: assert pwndbg.vmmap.find(addr) - for addr in result.bins[largebin_size].bk_chain[:-1]: + for addr in result[largebin_size][1][:-1]: assert pwndbg.vmmap.find(addr) # check corrupted gdb.execute('continue') result = allocator.smallbins() - assert result.bin_type == BinType.SMALL - assert result.bins[smallbin_size].is_corrupted + assert result['type'] == 'smallbins' + assert result[smallbin_size][2] result = allocator.largebins() - assert result.bin_type == BinType.LARGE - assert result.bins[largebin_size].is_corrupted + assert result['type'] == 'largebins' + assert result[largebin_size][2]