Revert "Refactor heap code (#1063)" (#1084)

This reverts commit a0f3744743.
pull/1086/head
Disconnect3d 3 years ago committed by GitHub
parent a0f3744743
commit b51b07de77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -12,14 +12,25 @@ import pwndbg.glibc
import pwndbg.typeinfo
from pwndbg.color import generateColorFunction
from pwndbg.color import message
from pwndbg.color import underline
from pwndbg.commands.config import extend_value_with_default
from pwndbg.commands.config import get_config_parameters
from pwndbg.commands.config import print_row
from pwndbg.heap.ptmalloc import Bin
from pwndbg.heap.ptmalloc import Bins
from pwndbg.heap.ptmalloc import BinType
from pwndbg.heap.ptmalloc import read_chunk_from_gdb
def read_chunk(addr):
"""Read a chunk's metadata."""
# In GLIBC versions <= 2.24 the `mchunk_[prev_]size` field was named `[prev_]size`.
# To support both versions, change the new names to the old ones here so that
# the rest of the code can deal with uniform names.
renames = {
"mchunk_size": "size",
"mchunk_prev_size": "prev_size",
}
if not pwndbg.config.resolve_heap_via_heuristic:
val = pwndbg.typeinfo.read_gdbvalue("struct malloc_chunk", addr)
else:
val = pwndbg.heap.current.malloc_chunk(addr)
return dict({ renames.get(key, key): int(val[key]) for key in val.type.keys() })
def format_bin(bins, verbose=False, offset=None):
@ -28,44 +39,49 @@ def format_bin(bins, verbose=False, offset=None):
offset = allocator.chunk_key_offset('fd')
result = []
bins_type = bins.bin_type
for size, b in bins.bins.items():
if not verbose and (
b.fd_chain == [0] and not b.count
) and not b.is_corrupted:
continue
bins_type = bins.pop('type')
# TODO: Abstract this away
for size in bins:
b = bins[size]
count, is_chain_corrupted = None, False
safe_lnk = False
if bins_type in [BinType.FAST, BinType.TCACHE]:
# fastbins consists of only single linked list
if bins_type == 'fastbins':
chain_fd = b
safe_lnk = pwndbg.glibc.check_safe_linking()
# tcachebins consists of single linked list and entries count
elif bins_type == 'tcachebins':
chain_fd, count = b
safe_lnk = pwndbg.glibc.check_safe_linking()
# normal bins consists of double linked list and may be corrupted (we can detect corruption)
else: # normal bin
chain_fd, chain_bk, is_chain_corrupted = b
if not verbose and (chain_fd == [0] and not count) and not is_chain_corrupted:
continue
if bins_type == BinType.TCACHE:
limit = min(8, b.count + 1)
if bins_type == 'tcachebins':
limit = 8
if count <= 7:
limit = count + 1
formatted_chain = pwndbg.chain.format(chain_fd[0], offset=offset, limit=limit, safe_linking=safe_lnk)
else:
limit = pwndbg.chain.LIMIT
formatted_chain = pwndbg.chain.format(chain_fd[0], offset=offset, safe_linking=safe_lnk)
formatted_chain = pwndbg.chain.format(
b.fd_chain[0], limit=limit, offset=offset, safe_linking=safe_lnk
)
size_str = Bin.size_to_display_name(size)
if isinstance(size, int):
size = hex(size)
if b.is_corrupted:
line = message.hint(size_str) + message.error(' [corrupted]') + '\n'
if is_chain_corrupted:
line = message.hint(size) + message.error(' [corrupted]') + '\n'
line += message.hint('FD: ') + formatted_chain + '\n'
line += message.hint('BK: ') + pwndbg.chain.format(
b.bk_chain[0], offset=allocator.chunk_key_offset('bk')
)
line += message.hint('BK: ') + pwndbg.chain.format(chain_bk[0], offset=allocator.chunk_key_offset('bk'))
else:
line = message.hint(size_str)
if b.count is not None:
line += message.hint(' [%3d]' % b.count)
line += ': '
line.ljust(13)
if count is not None:
line = (message.hint(size) + message.hint(' [%3d]' % count) + ': ').ljust(13)
else:
line = (message.hint(size) + ': ').ljust(13)
line += formatted_chain
result.append(line)
@ -78,26 +94,9 @@ def format_bin(bins, verbose=False, offset=None):
parser = argparse.ArgumentParser()
parser.description = "Iteratively print chunks on a heap, default to the current thread's active heap."
parser.add_argument(
"addr",
nargs="?",
type=int,
default=None,
help=
"Address of the first chunk (malloc_chunk struct start, prev_size field)."
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Print all chunk fields, even unused ones."
)
parser.add_argument(
"-s",
"--simple",
action="store_true",
help="Simply print malloc_chunk struct's contents."
)
parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the first chunk (malloc_chunk struct start, prev_size field).")
parser.add_argument("-v", "--verbose", action="store_true", help="Print all chunk fields, even unused ones.")
parser.add_argument("-s", "--simple", action="store_true", help="Simply print malloc_chunk struct's contents.")
@pwndbg.commands.ArgparsedCommand(parser)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@ -107,23 +106,49 @@ def heap(addr=None, verbose=False, simple=False):
active heap.
"""
allocator = pwndbg.heap.current
heap_region = allocator.get_heap_boundaries(addr)
arena = allocator.get_arena_for_chunk(addr) if addr else allocator.get_arena()
top_chunk = arena['top']
ptr_size = allocator.size_sz
# Store the heap base address in a GDB variable that can be used in other
# GDB commands
gdb.execute('set $heap_base=0x{:x}'.format(heap_region.start))
# If an address was supplied, start printing from there, otherwise start
# from the first chunk in the heap region
# Calculate where to start printing; if an address was supplied, use that,
# if this heap belongs to the main arena, start at the beginning of the
# heap's mapping, otherwise, compensate for the presence of a heap_info
# struct and possibly an arena.
if addr:
cursor = int(addr)
elif arena == allocator.main_arena:
cursor = heap_region.start
else:
heap_region = allocator.get_heap_boundaries(addr)
cursor = allocator.get_first_chunk_in_heap(heap_region.start)
cursor = heap_region.start + allocator.heap_info.sizeof
if pwndbg.vmmap.find(allocator.get_heap(heap_region.start)['ar_ptr']) == heap_region:
# Round up to a 2-machine-word alignment after an arena to
# compensate for the presence of the have_fastchunks variable
# in GLIBC versions >= 2.27.
cursor += (allocator.malloc_state.sizeof + ptr_size) & ~allocator.malloc_align_mask
# i686 alignment heuristic
first_chunk_size = pwndbg.arch.unpack(pwndbg.memory.read(cursor + ptr_size, ptr_size))
if first_chunk_size == 0:
cursor += ptr_size * 2
# Store the heap base address in a GDB variable that can be used in other
# GDB commands
# TODO: See https://github.com/pwndbg/pwndbg/issues/1060
gdb.execute('set $heap_base=0x{:x}'.format(heap_region.start))
while cursor in heap_region:
malloc_chunk(cursor, verbose=verbose, simple=simple)
if cursor == top_chunk:
break
size_field = pwndbg.memory.u(cursor + allocator.chunk_key_offset('size'))
real_size = size_field & ~allocator.malloc_align_mask
cursor += real_size
for chunk in allocator.chunks(cursor):
malloc_chunk(chunk, verbose=verbose, simple=simple)
# Avoid an infinite loop when a chunk's size is 0.
if real_size == 0:
break
parser = argparse.ArgumentParser()
@ -184,9 +209,7 @@ def mp():
parser = argparse.ArgumentParser()
parser.description = "Print relevant information about an arena's top chunk, default to current thread's arena."
parser.add_argument(
"addr", nargs="?", type=int, default=None, help="Address of the arena."
)
parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.")
@pwndbg.commands.ArgparsedCommand(parser)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@ -198,68 +221,18 @@ def top_chunk(addr=None):
allocator = pwndbg.heap.current
arena = allocator.get_arena(addr)
address = arena['top']
size = allocator.chunk_size_nomask(int(address))
size = pwndbg.memory.u(int(address) + allocator.chunk_key_offset('size'))
out = message.off("Top chunk\n") + "Addr: {}\nSize: 0x{:02x}".format(
M.get(address), size
)
out = message.off("Top chunk\n") + "Addr: {}\nSize: 0x{:02x}".format(M.get(address), size)
print(out)
def get_chunk_bin(addr):
# points to the real start of the chunk
cursor = int(addr)
allocator = pwndbg.heap.current
size = allocator.chunk_size(cursor)
arena = allocator.get_arena_for_chunk(addr)
bins = [
allocator.fastbins(arena.address),
allocator.smallbins(arena.address),
allocator.largebins(arena.address),
allocator.unsortedbin(arena.address),
]
if allocator.has_tcache():
bins.append(allocator.tcachebins(None))
# TODO: What if we somehow got this chunk into a bin of a different size? We
# would miss it with this logic. Should we check every bin?
res = []
for bin_ in bins:
if bin_.contains_chunk(size, cursor):
res.append(bin_.bin_type)
if len(res) == 0:
return [BinType.NOT_IN_BIN]
else:
return res
parser = argparse.ArgumentParser()
parser.description = "Print a chunk."
parser.add_argument(
"addr",
type=int,
help="Address of the chunk (malloc_chunk struct start, prev_size field)."
)
parser.add_argument(
"-f", "--fake", action="store_true", help="Is this a fake chunk?"
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Print all chunk fields, even unused ones."
)
parser.add_argument(
"-s",
"--simple",
action="store_true",
help="Simply print malloc_chunk struct's contents."
)
parser.add_argument("addr", type=int, help="Address of the chunk (malloc_chunk struct start, prev_size field).")
parser.add_argument("-f", "--fake", action="store_true", help="Is this a fake chunk?")
parser.add_argument("-v", "--verbose", action="store_true", help="Print all chunk fields, even unused ones.")
parser.add_argument("-s", "--simple", action="store_true", help="Simply print malloc_chunk struct's contents.")
@pwndbg.commands.ArgparsedCommand(parser)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@ -270,34 +243,36 @@ def malloc_chunk(addr, fake=False, verbose=False, simple=False):
cursor = int(addr)
allocator = pwndbg.heap.current
ptr_size = allocator.size_sz
size_field = allocator.chunk_size_nomask(cursor)
real_size = allocator.chunk_size(cursor)
size_field = pwndbg.memory.u(cursor + allocator.chunk_key_offset('size'))
real_size = size_field & ~allocator.malloc_align_mask
headers_to_print = [] # both state (free/allocated) and flags
fields_to_print = set() # in addition to addr and size
prev_inuse, is_mmapped, non_main_arena = allocator.chunk_flags(size_field)
if prev_inuse:
headers_to_print.append(message.hint('PREV_INUSE'))
if is_mmapped:
headers_to_print.append(message.hint('IS_MMAPED'))
if non_main_arena:
headers_to_print.append(message.hint('NON_MAIN_ARENA'))
out_fields = "Addr: {}\n".format(M.get(cursor))
if fake:
headers_to_print.append(message.on('Fake chunk'))
headers_to_print.append(message.on("Fake chunk"))
verbose = True # print all fields for fake chunks
if simple:
chunk = read_chunk_from_gdb(cursor)
chunk = read_chunk(cursor)
if not headers_to_print:
headers_to_print.append(message.hint(M.get(cursor)))
# The address should be the first header
headers_to_print.insert(0, message.hint(M.get(cursor)))
prev_inuse, is_mmapped, non_main_arena = allocator.chunk_flags(int(chunk['size']))
if prev_inuse:
headers_to_print.append(message.hint('PREV_INUSE'))
if is_mmapped:
headers_to_print.append(message.hint('IS_MMAPED'))
if non_main_arena:
headers_to_print.append(message.hint('NON_MAIN_ARENA'))
print(' | '.join(headers_to_print))
for key, val in chunk.items():
print(message.system(key) + ': 0x{:02x}'.format(int(val)))
print(message.system(key) + ": 0x{:02x}".format(int(val)))
print('')
return
@ -308,55 +283,70 @@ def malloc_chunk(addr, fake=False, verbose=False, simple=False):
arena_address = arena.address
top_chunk = arena['top']
if cursor == top_chunk:
headers_to_print.append(message.off('Top chunk'))
headers_to_print.append(message.off("Top chunk"))
is_top = True
if not is_top:
bin_types = get_chunk_bin(cursor)
if BinType.NOT_IN_BIN in bin_types:
headers_to_print.append(message.hint('Allocated chunk'))
fastbins = allocator.fastbins(arena_address) or {}
smallbins = allocator.smallbins(arena_address) or {}
largebins = allocator.largebins(arena_address) or {}
unsortedbin = allocator.unsortedbin(arena_address) or {}
if allocator.has_tcache():
tcachebins = allocator.tcachebins(None)
if real_size in fastbins.keys() and cursor in fastbins[real_size]:
headers_to_print.append(message.on("Free chunk (fastbins)"))
if not verbose:
fields_to_print.add('fd')
elif real_size in smallbins.keys() and cursor in bin_addrs(smallbins[real_size], "smallbins"):
headers_to_print.append(message.on("Free chunk (smallbins)"))
if not verbose:
fields_to_print.update(['fd', 'bk'])
elif real_size >= list(largebins.items())[0][0] and cursor in bin_addrs(largebins[(list(largebins.items())[allocator.largebin_index(real_size) - 64][0])], "largebins"):
headers_to_print.append(message.on("Free chunk (largebins)"))
if not verbose:
fields_to_print.update(['fd', 'bk', 'fd_nextsize', 'bk_nextsize'])
elif cursor in bin_addrs(unsortedbin['all'], "unsortedbin"):
headers_to_print.append(message.on("Free chunk (unsortedbin)"))
if not verbose:
fields_to_print.update(['fd', 'bk'])
elif allocator.has_tcache() and real_size in tcachebins.keys() and cursor + ptr_size*2 in bin_addrs(tcachebins[real_size], "tcachebins"):
headers_to_print.append(message.on("Free chunk (tcache)"))
if not verbose:
fields_to_print.add('fd')
else:
# TODO: Handle a chunk being in multiple bins
bin_type = bin_types[0]
headers_to_print.append(
message.on('Free chunk ({})'.format('|'.join(bin_types)))
)
for bin_type in bin_types:
fields_to_print.update(bin_type.valid_fields())
out_fields = ['Addr: {}'.format(M.get(cursor))]
fields_ordered = [
'prev_size', 'size', 'fd', 'bk', 'fd_nextsize', 'bk_nextsize'
]
headers_to_print.append(message.hint("Allocated chunk"))
if verbose:
fields_to_print.update(fields_ordered)
fields_to_print.update(['prev_size', 'size', 'fd', 'bk', 'fd_nextsize', 'bk_nextsize'])
else:
out_fields.append('Size: 0x{:02x}'.format(size_field))
out_fields += "Size: 0x{:02x}\n".format(size_field)
prev_inuse, is_mmapped, non_main_arena = allocator.chunk_flags(size_field)
if prev_inuse:
headers_to_print.append(message.hint('PREV_INUSE'))
if is_mmapped:
headers_to_print.append(message.hint('IS_MMAPED'))
if non_main_arena:
headers_to_print.append(message.hint('NON_MAIN_ARENA'))
print(' | '.join(headers_to_print))
fields_ordered = ['prev_size', 'size', 'fd', 'bk', 'fd_nextsize', 'bk_nextsize']
for field_to_print in fields_ordered:
if field_to_print in fields_to_print:
field_val = pwndbg.memory.u(cursor + allocator.chunk_key_offset(field_to_print))
out_fields.append(message.system(field_to_print) + ': 0x{:02x}'.format(field_val))
out_fields += message.system(field_to_print) + ": 0x{:02x}\n".format(pwndbg.memory.u(cursor + allocator.chunk_key_offset(field_to_print)))
print('\n'.join(out_fields))
print('')
print(' | '.join(headers_to_print) + "\n" + out_fields)
parser = argparse.ArgumentParser()
parser.description = "Print the contents of all an arena's bins and a thread's tcache, default to the current thread's arena and tcache."
parser.add_argument(
"addr", nargs="?", type=int, default=None, help="Address of the arena."
)
parser.add_argument(
"tcache_addr",
nargs="?",
type=int,
default=None,
help="Address of the tcache."
)
parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.")
parser.add_argument("tcache_addr", nargs="?", type=int, default=None, help="Address of the tcache.")
@pwndbg.commands.ArgparsedCommand(parser)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@ -373,35 +363,10 @@ def bins(addr=None, tcache_addr=None):
largebins(addr)
def print_bins(bin_type, addr=None, verbose=False):
allocator = pwndbg.heap.current
offset = None
# TODO: Abstract this away
if bin_type == BinType.TCACHE:
offset = allocator.tcache_next_offset
else:
offset = None
bins = allocator.get_bins(bin_type, addr=addr)
if bins is None:
return
formatted_bins = format_bin(bins, verbose, offset=offset)
print(C.banner(bin_type.value))
for node in formatted_bins:
print(node)
parser = argparse.ArgumentParser()
parser.description = "Print the contents of an arena's fastbins, default to the current thread's arena."
parser.add_argument(
"addr", nargs="?", type=int, default=None, help="Address of the arena."
)
parser.add_argument(
"verbose", nargs="?", type=bool, default=True, help="Show extra detail."
)
parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.")
parser.add_argument("verbose", nargs="?", type=bool, default=True, help="Show extra detail.")
@pwndbg.commands.ArgparsedCommand(parser)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@ -410,17 +375,23 @@ def fastbins(addr=None, verbose=True):
"""Print the contents of an arena's fastbins, default to the current
thread's arena.
"""
print_bins(BinType.FAST, addr, verbose)
allocator = pwndbg.heap.current
fastbins = allocator.fastbins(addr)
if fastbins is None:
return
formatted_bins = format_bin(fastbins, verbose)
print(C.banner('fastbins'))
for node in formatted_bins:
print(node)
parser = argparse.ArgumentParser()
parser.description = "Print the contents of an arena's unsortedbin, default to the current thread's arena."
parser.add_argument(
"addr", nargs="?", type=int, default=None, help="Address of the arena."
)
parser.add_argument(
"verbose", nargs="?", type=bool, default=True, help="Show extra detail."
)
parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.")
parser.add_argument("verbose", nargs="?", type=bool, default=True, help="Show extra detail.")
@pwndbg.commands.ArgparsedCommand(parser)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@ -429,17 +400,23 @@ def unsortedbin(addr=None, verbose=True):
"""Print the contents of an arena's unsortedbin, default to the current
thread's arena.
"""
print_bins(BinType.UNSORTED, addr, verbose)
allocator = pwndbg.heap.current
unsortedbin = allocator.unsortedbin(addr)
if unsortedbin is None:
return
formatted_bins = format_bin(unsortedbin, verbose)
print(C.banner('unsortedbin'))
for node in formatted_bins:
print(node)
parser = argparse.ArgumentParser()
parser.description = "Print the contents of an arena's smallbins, default to the current thread's arena."
parser.add_argument(
"addr", nargs="?", type=int, default=None, help="Address of the arena."
)
parser.add_argument(
"verbose", nargs="?", type=bool, default=False, help="Show extra detail."
)
parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.")
parser.add_argument("verbose", nargs="?", type=bool, default=False, help="Show extra detail.")
@pwndbg.commands.ArgparsedCommand(parser)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@ -448,17 +425,23 @@ def smallbins(addr=None, verbose=False):
"""Print the contents of an arena's smallbins, default to the current
thread's arena.
"""
print_bins(BinType.SMALL, addr, verbose)
allocator = pwndbg.heap.current
smallbins = allocator.smallbins(addr)
if smallbins is None:
return
formatted_bins = format_bin(smallbins, verbose)
print(C.banner('smallbins'))
for node in formatted_bins:
print(node)
parser = argparse.ArgumentParser()
parser.description = "Print the contents of an arena's largebins, default to the current thread's arena."
parser.add_argument(
"addr", nargs="?", type=int, default=None, help="Address of the arena."
)
parser.add_argument(
"verbose", nargs="?", type=bool, default=False, help="Show extra detail."
)
parser.add_argument("addr", nargs="?", type=int, default=None, help="Address of the arena.")
parser.add_argument("verbose", nargs="?", type=bool, default=False, help="Show extra detail.")
@pwndbg.commands.ArgparsedCommand(parser)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@ -467,25 +450,23 @@ def largebins(addr=None, verbose=False):
"""Print the contents of an arena's largebins, default to the current
thread's arena.
"""
print_bins(BinType.LARGE, addr, verbose)
allocator = pwndbg.heap.current
largebins = allocator.largebins(addr)
if largebins is None:
return
formatted_bins = format_bin(largebins, verbose)
print(C.banner('largebins'))
for node in formatted_bins:
print(node)
parser = argparse.ArgumentParser()
parser.description = "Print the contents of a tcache, default to the current thread's tcache."
parser.add_argument(
"addr",
nargs="?",
type=int,
default=None,
help="The address of the tcache bins."
)
parser.add_argument(
"verbose",
nargs="?",
type=bool,
default=False,
help="Whether to show more details or not."
)
parser.add_argument("addr", nargs="?", type=int, default=None, help="The address of the tcache bins.")
parser.add_argument("verbose", nargs="?", type=bool, default=False, help="Whether to show more details or not.")
@pwndbg.commands.ArgparsedCommand(parser)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@ -493,7 +474,17 @@ parser.add_argument(
@pwndbg.commands.OnlyWithTcache
def tcachebins(addr=None, verbose=False):
"""Print the contents of a tcache, default to the current thread's tcache."""
print_bins(BinType.TCACHE, addr, verbose)
allocator = pwndbg.heap.current
tcachebins = allocator.tcachebins(addr)
if tcachebins is None:
return
formatted_bins = format_bin(tcachebins, verbose, offset = allocator.tcache_next_offset)
print(C.banner('tcachebins'))
for node in formatted_bins:
print(node)
parser = argparse.ArgumentParser()
@ -544,28 +535,14 @@ def find_fake_fast(addr, size=None):
parser = argparse.ArgumentParser()
parser.description = "Visualize chunks on a heap, default to the current arena's active heap."
parser.add_argument(
"count",
nargs="?",
type=lambda n: max(int(n, 0), 1),
default=10,
help="Number of chunks to visualize."
)
parser.add_argument(
"addr", nargs="?", default=None, help="Address of the first chunk."
)
parser.add_argument(
"--naive",
"-n",
action="store_true",
default=False,
help="Attempt to keep printing beyond the top chunk."
)
parser.add_argument("count", nargs="?", type=lambda n:max(int(n, 0),1), default=10, help="Number of chunks to visualize.")
parser.add_argument("addr", nargs="?", default=None, help="Address of the first chunk.")
parser.add_argument("--naive", "-n", action="store_true", default=False, help="Attempt to keep printing beyond the top chunk.")
@pwndbg.commands.ArgparsedCommand(parser)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWithResolvedHeapSyms
@pwndbg.commands.OnlyWhenHeapIsInitialized
def vis_heap_chunks(addr=None, count=None, naive=False):
def vis_heap_chunks(addr=None, count=None, naive=None):
"""Visualize chunks on a heap, default to the current arena's active heap."""
allocator = pwndbg.heap.current
heap_region = allocator.get_heap_boundaries(addr)
@ -576,58 +553,73 @@ def vis_heap_chunks(addr=None, count=None, naive=False):
# Build a list of addresses that delimit each chunk.
chunk_delims = []
# If an address was supplied, start printing from there, otherwise start
# from the first chunk in the heap region
if addr:
cursor = int(addr)
elif arena == allocator.main_arena:
cursor = heap_region.start
else:
heap_region = allocator.get_heap_boundaries(addr)
cursor = allocator.get_first_chunk_in_heap(heap_region.start)
cursor = heap_region.start + allocator.heap_info.sizeof
if pwndbg.vmmap.find(allocator.get_heap(heap_region.start)['ar_ptr']) == heap_region:
# Round up to a 2-machine-word alignment after an arena to
# compensate for the presence of the have_fastchunks variable
# in GLIBC versions >= 2.27.
cursor += (allocator.malloc_state.sizeof + ptr_size) & ~allocator.malloc_align_mask
# Check if there is an alignment at the start of the heap, adjust if necessary.
if not addr:
first_chunk_size = pwndbg.arch.unpack(pwndbg.memory.read(cursor + ptr_size, ptr_size))
if first_chunk_size == 0:
cursor += ptr_size * 2
cursor_backup = cursor
i = 0
# TODO: This rewrite probably breaks --naive
# TODO: If we do it like this, we should store the first chunk, not the next one
for cursor in allocator.chunks(cursor):
if i == 0:
i += 1
continue
if i >= count:
for _ in range(count + 1):
# Don't read beyond the heap mapping if --naive or corrupted heap.
if cursor not in heap_region:
chunk_delims.append(heap_region.end)
break
i += 1
size_field = pwndbg.memory.u(cursor + ptr_size)
real_size = size_field & ~allocator.malloc_align_mask
prev_inuse = allocator.chunk_flags(size_field)[0]
next_chunk = allocator.next_chunk(cursor)
if cursor == top_chunk:
inuse = False
# Don't repeatedly operate on the same address (e.g. chunk size of 0).
if cursor in chunk_delims or cursor + ptr_size in chunk_delims:
break
if prev_inuse:
chunk_delims.append(cursor + ptr_size)
else:
inuse = allocator.prev_inuse(next_chunk)
chunk_delims.append(cursor)
# TODO: Is this check still necessary?
# Don't read beyond the heap mapping if --naive or corrupted heap.
if cursor not in heap_region:
chunk_delims.append((heap_region.end, inuse))
if (cursor == top_chunk and not naive) or (cursor == heap_region.end - ptr_size*2):
chunk_delims.append(cursor + ptr_size*2)
break
chunk_delims.append((cursor, inuse))
# if (cursor == top_chunk
# and not naive) or (cursor == heap_region.end - ptr_size * 2):
# chunk_delims.append(cursor + ptr_size * 2)
# break
cursor += real_size
# Build the output buffer, changing color at each chunk delimiter.
# TODO: maybe print free chunks in bold or underlined
color_funcs = [
generateColorFunction('yellow'),
generateColorFunction('cyan'),
generateColorFunction('purple'),
generateColorFunction('green'),
generateColorFunction('blue'),
generateColorFunction("yellow"),
generateColorFunction("cyan"),
generateColorFunction("purple"),
generateColorFunction("green"),
generateColorFunction("blue"),
]
bin_collections = [
allocator.fastbins(arena.address),
allocator.unsortedbin(arena.address),
allocator.smallbins(arena.address),
allocator.largebins(arena.address),
]
if allocator.has_tcache():
# Only check for tcache entries belonging to the current thread,
# it's difficult (impossible?) to find all the thread caches for a
# specific heap.
bin_collections.insert(0, allocator.tcachebins(None))
printed = 0
out = ''
asc = ''
@ -635,13 +627,7 @@ def vis_heap_chunks(addr=None, count=None, naive=False):
cursor = cursor_backup
for c, (stop, inuse) in enumerate(chunk_delims):
if inuse:
stop += ptr_size
# TODO: Are we duplicating work with bin_labels?
bin_type = get_chunk_bin(cursor)
for c, stop in enumerate(chunk_delims):
color_func = color_funcs[c % len(color_funcs)]
while cursor != stop:
@ -649,26 +635,18 @@ def vis_heap_chunks(addr=None, count=None, naive=False):
out += "\n0x%x" % cursor
cell = pwndbg.arch.unpack(pwndbg.memory.read(cursor, ptr_size))
cell_hex = '\t0x{:0{n}x}'.format(cell, n=ptr_size * 2)
cell_hex = '\t0x{:0{n}x}'.format(cell, n=ptr_size*2)
colored_text = color_func(cell_hex)
if bin_type != BinType.NOT_IN_BIN:
colored_text = underline(colored_text)
out += colored_text
out += color_func(cell_hex)
printed += 1
labels.extend(bin_labels(cursor, bin_type))
labels.extend(bin_labels(cursor, bin_collections))
if cursor == top_chunk:
labels.append('Top chunk')
asc += ''.join(
c if c.isprintable() and c.isascii() else '.'
for c in map(chr, pwndbg.memory.read(cursor, ptr_size))
)
asc += bin_ascii(pwndbg.memory.read(cursor, ptr_size))
if printed % 2 == 0:
out += '\t' + color_func(asc) + (
'\t <-- ' + ', '.join(labels) if len(labels) else ''
)
out += '\t' + color_func(asc) + ('\t <-- ' + ', '.join(labels) if len(labels) else '')
asc = ''
labels = []
@ -677,31 +655,45 @@ def vis_heap_chunks(addr=None, count=None, naive=False):
print(out)
def bin_labels(addr, bin_type):
labels = []
allocator = pwndbg.heap.current
bins = allocator.get_bins(bin_type)
if bins is None:
return []
def bin_ascii(bs):
from string import printable
valid_chars = list(map(ord, set(printable) - set('\t\r\n\x0c\x0b')))
return ''.join(chr(c) if c in valid_chars else '.'for c in bs)
for size, b in bins.bins.items():
size_str = Bin.size_to_display_name(size)
if b.contains_chunk(addr):
count = ''
if bins.bin_type == BinType.TCACHE:
count = '/{:d}'.format(b.count)
def bin_labels(addr, collections):
labels = []
for bins in collections:
bins_type = bins.get('type', None)
if not bins_type:
continue
labels.append(
'{:s}[{:s}][{:d}{:s}]'.format(
bins.bin_type.value, size_str, b.fd_chain.index(addr), count
)
)
for size in filter(lambda x: x != 'type', bins.keys()):
b = bins[size]
if isinstance(size, int):
size = hex(size)
count = '/{:d}'.format(b[1]) if bins_type == 'tcachebins' else None
chunks = bin_addrs(b, bins_type)
for chunk_addr in chunks:
if addr == chunk_addr:
labels.append('{:s}[{:s}][{:d}{}]'.format(bins_type, size, chunks.index(addr), count or ''))
return labels
def bin_addrs(b, bins_type):
addrs = []
if bins_type == 'fastbins':
return b
# tcachebins consists of single linked list and entries count
elif bins_type == 'tcachebins':
addrs, _ = b
# normal bins consists of double linked list and may be corrupted (we can detect corruption)
else: # normal bin
addrs, _, _ = b
return addrs
try_free_parser = argparse.ArgumentParser(description='Check what would happen if free was called with given address')
try_free_parser.add_argument('addr', nargs='?', help='Address passed to free')
@pwndbg.commands.ArgparsedCommand(try_free_parser)
@ -734,7 +726,7 @@ def try_free(addr):
ptr_size = pwndbg.arch.ptrsize
def unsigned_size(size):
# read_chunk_from_gdb()['size'] is signed in pwndbg ;/
# read_chunk()['size'] is signed in pwndbg ;/
# there may be better way to handle that
if ptr_size < 8:
return ctypes.c_uint32(size).value
@ -760,7 +752,7 @@ def try_free(addr):
# try to get the chunk
try:
chunk = read_chunk_from_gdb(addr)
chunk = read_chunk(addr)
except gdb.MemoryError as e:
print(message.error('Can\'t read chunk at address 0x{:x}, memory error'.format(addr)))
return
@ -843,10 +835,10 @@ def try_free(addr):
if chunk_size_unmasked <= allocator.global_max_fast:
print(message.notice('Fastbin checks'))
chunk_fastbin_idx = allocator.fastbin_index(chunk_size_unmasked)
fastbin_list = allocator.fastbins(int(arena.address)).bins[(chunk_fastbin_idx+2)*(ptr_size*2)]
fastbin_list = allocator.fastbins(int(arena.address))[(chunk_fastbin_idx+2)*(ptr_size*2)]
try:
next_chunk = read_chunk_from_gdb(addr + chunk_size_unmasked)
next_chunk = read_chunk(addr + chunk_size_unmasked)
except gdb.MemoryError as e:
print(message.error('Can\'t read next chunk at address 0x{:x}, memory error'.format(chunk + chunk_size_unmasked)))
finalize(errors_found, returned_before_error)
@ -862,7 +854,7 @@ def try_free(addr):
errors_found += 1
# chunk is not the same as the one on top of fastbin[idx]
if int(fastbin_list.fd_chain[0]) == addr:
if int(fastbin_list[0]) == addr:
err = 'double free or corruption (fasttop) -> chunk already is on top of fastbin list\n'
err += ' fastbin idx == {}'
err = err.format(chunk_fastbin_idx)
@ -870,10 +862,10 @@ def try_free(addr):
errors_found += 1
# chunk's size is ~same as top chunk's size
fastbin_top_chunk = int(fastbin_list.fd_chain[0])
fastbin_top_chunk = int(fastbin_list[0])
if fastbin_top_chunk != 0:
try:
fastbin_top_chunk = read_chunk_from_gdb(fastbin_top_chunk)
fastbin_top_chunk = read_chunk(fastbin_top_chunk)
except gdb.MemoryError as e:
print(message.error('Can\'t read top fastbin chunk at address 0x{:x}, memory error'.format(fastbin_top_chunk)))
finalize(errors_found, returned_before_error)
@ -903,7 +895,7 @@ def try_free(addr):
# next chunk is not beyond the boundaries of the arena
NONCONTIGUOUS_BIT = 2
top_chunk_addr = (int(arena['top']))
top_chunk = read_chunk_from_gdb(top_chunk_addr)
top_chunk = read_chunk(top_chunk_addr)
next_chunk_addr = addr + chunk_size_unmasked
# todo: in libc, addition may overflow
@ -916,7 +908,7 @@ def try_free(addr):
# now we need to dereference chunk
try :
next_chunk = read_chunk_from_gdb(next_chunk_addr)
next_chunk = read_chunk(next_chunk_addr)
next_chunk_size = chunksize(unsigned_size(next_chunk['size']))
except (OverflowError, gdb.MemoryError) as e:
print(message.error('Can\'t read next chunk at address 0x{:x}'.format(next_chunk_addr)))
@ -946,7 +938,7 @@ def try_free(addr):
prev_chunk_addr = addr - prev_size
try :
prev_chunk = read_chunk_from_gdb(prev_chunk_addr)
prev_chunk = read_chunk(prev_chunk_addr)
prev_chunk_size = chunksize(unsigned_size(prev_chunk['size']))
except (OverflowError, gdb.MemoryError) as e:
print(message.error('Can\'t read next chunk at address 0x{:x}'.format(prev_chunk_addr)))
@ -970,7 +962,7 @@ def try_free(addr):
print(message.notice('Next chunk is not top chunk'))
try :
next_next_chunk_addr = next_chunk_addr + next_chunk_size
next_next_chunk = read_chunk_from_gdb(next_next_chunk_addr)
next_next_chunk = read_chunk(next_next_chunk_addr)
except (OverflowError, gdb.MemoryError) as e:
print(message.error('Can\'t read next chunk at address 0x{:x}'.format(next_next_chunk_addr)))
finalize(errors_found, returned_before_error)
@ -988,12 +980,12 @@ def try_free(addr):
# unsorted bin fd->bk should be unsorted bean
unsorted_addr = int(arena['bins'][0])
try:
unsorted = read_chunk_from_gdb(unsorted_addr)
unsorted = read_chunk(unsorted_addr)
try:
if read_chunk_from_gdb(unsorted['fd'])['bk'] != unsorted_addr:
if read_chunk(unsorted['fd'])['bk'] != unsorted_addr:
err = 'free(): corrupted unsorted chunks -> unsorted_chunk->fd->bk != unsorted_chunk\n'
err += 'unsorted at 0x{:x}, unsorted->fd == 0x{:x}, unsorted->fd->bk == 0x{:x}'
err = err.format(unsorted_addr, unsorted['fd'], read_chunk_from_gdb(unsorted['fd'])['bk'])
err = err.format(unsorted_addr, unsorted['fd'], read_chunk(unsorted['fd'])['bk'])
print(message.error(err))
errors_found += 1
except (OverflowError, gdb.MemoryError) as e:

@ -1,6 +1,5 @@
import importlib
from collections import OrderedDict
from enum import Enum
from functools import wraps
import gdb
@ -24,101 +23,6 @@ from pwndbg.heap import heap_chain_limit
HEAP_MAX_SIZE = 1024 * 1024 if pwndbg.arch.ptrsize == 4 else 2 * 4 * 1024 * 1024 * 8
# Note that we must inherit from `str` before `Enum`: https://stackoverflow.com/a/58608362/803801
class BinType(str, Enum):
TCACHE = 'tcachebins'
FAST = 'fastbins'
SMALL = 'smallbins'
LARGE = 'largebins'
UNSORTED = 'unsortedbin'
NOT_IN_BIN = 'not_in_bin'
def valid_fields(self):
if self in [BinType.FAST, BinType.TCACHE]:
return ['fd']
elif self in [BinType.SMALL, BinType.UNSORTED]:
return ['fd', 'bk']
elif self == BinType.LARGE:
return ['fd', 'bk', 'fd_nextsize', 'bk_nextsize']
class Bin:
def __init__(self, fd_chain, bk_chain=None, count=None, is_corrupted=False):
self.fd_chain = fd_chain
self.bk_chain = bk_chain
self.count = count
self.is_corrupted = is_corrupted
def contains_chunk(self, chunk):
return chunk in self.fd_chain
@staticmethod
def size_to_display_name(size):
if size == 'all':
return size
assert isinstance(size, int)
return hex(size)
class Bins:
def __init__(self, bin_type):
self.bins = OrderedDict()
self.bin_type = bin_type
# TODO: There's a bunch of bin-specific logic in here, maybe we should
# subclass and put that logic in there
def contains_chunk(self, size, chunk):
# TODO: It will be the same thing, but it would be better if we used
# pwndbg.heap.current.size_sz. I think each bin should already have a
# reference to the allocator and shouldn't need to access the `current`
# variable
ptr_size = pwndbg.arch.ptrsize
if self.bin_type == BinType.UNSORTED:
# The unsorted bin only has one bin called 'all'
# TODO: We shouldn't be mixing int and str types like this
size = 'all'
elif self.bin_type == BinType.LARGE:
# All the other bins (other than unsorted) store chunks of the same
# size in a bin, so we can use the size directly. But the largebin
# stores a range of sizes, so we need to compute which bucket this
# chunk falls into
# TODO: Refactor this, the bin should know how to calculate
# largebin_index without calling into the allocator
size = pwndbg.heap.current.largebin_index(size) - 64
elif self.bin_type == BinType.TCACHE:
# Unlike fastbins, tcache bins don't store the chunk address in the
# bins, they store the address of the fd pointer, so we need to
# search for that address in the tcache bin instead
# TODO: Can we use chunk_key_offset?
chunk += ptr_size * 2
if size in self.bins:
return self.bins[size].contains_chunk(chunk)
return False
def read_chunk_from_gdb(addr):
"""Read a chunk's metadata."""
# In GLIBC versions <= 2.24 the `mchunk_[prev_]size` field was named `[prev_]size`.
# To support both versions, change the new names to the old ones here so that
# the rest of the code can deal with uniform names.
renames = {
"mchunk_size": "size",
"mchunk_prev_size": "prev_size",
}
if not pwndbg.config.resolve_heap_via_heuristic:
val = pwndbg.typeinfo.read_gdbvalue("struct malloc_chunk", addr)
else:
val = pwndbg.heap.current.malloc_chunk(addr)
return dict({ renames.get(key, key): int(val[key]) for key in val.type.keys() })
def heap_for_ptr(ptr):
"""Round a pointer to a chunk down to find its corresponding heap_info
struct, the pointer must point inside a heap which does not belong to
@ -384,47 +288,11 @@ class Heap(pwndbg.heap.heap.BaseHeap):
def get_heap(self, addr):
raise NotImplementedError()
def get_first_chunk_in_heap(self, heap_start=None):
# Find which arena this heap belongs to
if heap_start:
arena = self.get_arena_for_chunk(heap_start)
else:
arena = self.get_arena()
ptr_size = self.size_sz
start_addr = heap_start
if arena != self.main_arena:
start_addr += self.heap_info.sizeof
heap_region = self.get_heap_boundaries(heap_start)
if pwndbg.vmmap.find(self.get_heap(heap_start)['ar_ptr']) == heap_region:
# Round up to a 2-machine-word alignment after an arena to
# compensate for the presence of the have_fastchunks variable
# in GLIBC versions >= 2.27.
start_addr += pwndbg.memory.align_down(
self.malloc_state.sizeof + ptr_size,
self.malloc_alignment
)
# In glibc 2.26, the malloc_alignment for i386 was hardcoded to 16 (instead
# of 2*sizeof(size_t), which is 8). In order for the data to be aligned to
# 16 bytes, the first chunk now needs to start offset 8 instead of offset 0
# TODO: Can we just check if this is 32bit and >= glibc 2.26? This type of
# check is confusing as is, and unnecessary in most cases
first_chunk_size = pwndbg.arch.unpack(
pwndbg.memory.read(start_addr + ptr_size, ptr_size)
)
if first_chunk_size == 0:
start_addr += ptr_size * 2
return start_addr
def get_arena(self, arena_addr=None):
raise NotImplementedError()
def get_arena_for_chunk(self, addr):
chunk = read_chunk_from_gdb(addr)
chunk = pwndbg.commands.heap.read_chunk(addr)
_,_,nm = self.chunk_flags(chunk['size'])
if nm:
r=self.get_arena(arena_addr=self.get_heap(addr)['ar_ptr'])
@ -442,20 +310,6 @@ class Heap(pwndbg.heap.heap.BaseHeap):
"""Find the memory map containing 'addr'."""
return pwndbg.vmmap.find(addr)
def get_bins(self, bin_type, addr=None):
if bin_type == BinType.TCACHE:
return self.tcachebins(addr)
elif bin_type == BinType.FAST:
return self.fastbins(addr)
elif bin_type == BinType.UNSORTED:
return self.unsortedbin(addr)
elif bin_type == BinType.SMALL:
return self.smallbins(addr)
elif bin_type == BinType.LARGE:
return self.largebins(addr)
else:
return None
def fastbin_index(self, size):
if pwndbg.arch.ptrsize == 8:
return (size >> 4) - 2
@ -464,38 +318,33 @@ class Heap(pwndbg.heap.heap.BaseHeap):
def fastbins(self, arena_addr=None):
"""Returns: chain or None"""
result = Bins(BinType.FAST)
arena = self.get_arena(arena_addr)
if arena is None:
return result
return
fastbinsY = arena['fastbinsY']
fd_offset = self.chunk_key_offset('fd')
fastbinsY = arena['fastbinsY']
fd_offset = self.chunk_key_offset('fd')
num_fastbins = 7
size = pwndbg.arch.ptrsize * 2
size = pwndbg.arch.ptrsize * 2
safe_lnk = pwndbg.glibc.check_safe_linking()
result = OrderedDict()
for i in range(num_fastbins):
size += pwndbg.arch.ptrsize * 2
chain = pwndbg.chain.get(
int(fastbinsY[i]),
offset=fd_offset,
limit=heap_chain_limit,
safe_linking=safe_lnk
)
chain = pwndbg.chain.get(int(fastbinsY[i]), offset=fd_offset, limit=heap_chain_limit, safe_linking=safe_lnk)
result.bins[size] = Bin(chain)
result[size] = chain
result['type'] = 'fastbins'
return result
def tcachebins(self, tcache_addr=None):
"""Returns: tuple(chain, count) or None"""
result = Bins(BinType.TCACHE)
tcache = self.get_tcache(tcache_addr)
if tcache is None:
return result
return
counts = tcache['counts']
entries = tcache['entries']
@ -507,18 +356,15 @@ class Heap(pwndbg.heap.heap.BaseHeap):
"""Tcache bin index to chunk size, following tidx2usize macro in glibc malloc.c"""
return idx * self.malloc_alignment + self.minsize - self.size_sz
result = OrderedDict()
for i in range(num_tcachebins):
size = self._request2size(tidx2usize(i))
count = int(counts[i])
chain = pwndbg.chain.get(
int(entries[i]),
offset=self.tcache_next_offset,
limit=heap_chain_limit,
safe_linking=safe_lnk
)
chain = pwndbg.chain.get(int(entries[i]), offset=self.tcache_next_offset, limit=heap_chain_limit, safe_linking=safe_lnk)
result.bins[size] = Bin(chain, count=count)
result[size] = (chain, count)
result['type'] = 'tcachebins'
return result
def bin_at(self, index, arena_addr=None):
@ -541,24 +387,18 @@ class Heap(pwndbg.heap.heap.BaseHeap):
return
normal_bins = arena['bins']
num_bins = normal_bins.type.sizeof // normal_bins.type.target().sizeof
num_bins = normal_bins.type.sizeof // normal_bins.type.target().sizeof
bins_base = int(normal_bins.address) - (pwndbg.arch.ptrsize * 2)
bins_base = int(normal_bins.address) - (pwndbg.arch.ptrsize* 2)
current_base = bins_base + (index * pwndbg.arch.ptrsize * 2)
front, back = normal_bins[index * 2], normal_bins[index * 2 + 1]
fd_offset = self.chunk_key_offset('fd')
bk_offset = self.chunk_key_offset('bk')
fd_offset = self.chunk_key_offset('fd')
bk_offset = self.chunk_key_offset('bk')
is_chain_corrupted = False
get_chain = lambda bin, offset: pwndbg.chain.get(
int(bin),
offset=offset,
hard_stop=current_base,
limit=heap_chain_limit,
include_start=True
)
get_chain = lambda bin, offset: pwndbg.chain.get(int(bin), offset=offset, hard_stop=current_base, limit=heap_chain_limit, include_start=True)
chain_fd = get_chain(front, fd_offset)
chain_bk = get_chain(back, bk_offset)
@ -574,99 +414,51 @@ class Heap(pwndbg.heap.heap.BaseHeap):
return (chain_fd, chain_bk, is_chain_corrupted)
def unsortedbin(self, arena_addr=None):
result = Bins(BinType.UNSORTED)
chain = self.bin_at(1, arena_addr=arena_addr)
chain = self.bin_at(1, arena_addr=arena_addr)
result = OrderedDict()
if chain is None:
return result
return
fd_chain, bk_chain, is_corrupted = chain
result.bins['all'] = Bin(fd_chain, bk_chain, is_corrupted=is_corrupted)
result['all'] = chain
result['type'] = 'unsortedbin'
return result
def smallbins(self, arena_addr=None):
size = self.min_chunk_size - self.malloc_alignment
size = self.min_chunk_size - self.malloc_alignment
spaces_table = self._spaces_table()
result = Bins(BinType.SMALL)
result = OrderedDict()
for index in range(2, 64):
size += spaces_table[index]
chain = self.bin_at(index, arena_addr=arena_addr)
if chain is None:
# TODO: Should I return an empty Bins() instead?
return result
return
fd_chain, bk_chain, is_corrupted = chain
result.bins[size] = Bin(
fd_chain, bk_chain, is_corrupted=is_corrupted
)
result[size] = chain
result['type'] = 'smallbins'
return result
def largebins(self, arena_addr=None):
size = (
ptmalloc.NSMALLBINS * self.malloc_alignment
) - self.malloc_alignment
size = (ptmalloc.NSMALLBINS * self.malloc_alignment) - self.malloc_alignment
spaces_table = self._spaces_table()
result = Bins(BinType.LARGE)
result = OrderedDict()
for index in range(64, 127):
size += spaces_table[index]
chain = self.bin_at(index, arena_addr=arena_addr)
if chain is None:
# TODO: Should I return an empty Bins() instead?
return result
return
fd_chain, bk_chain, is_corrupted = chain
result.bins[size] = Bin(
fd_chain, bk_chain, is_corrupted=is_corrupted
)
result[size] = chain
result['type'] = 'largebins'
return result
def chunks(self, addr):
# TODO: Add assertions to verify alignment? Maybe write a decorator
# that enforces that an argument is aligned
heap_region = self.get_heap_boundaries(addr)
arena = self.get_arena_for_chunk(addr)
top_chunk = arena['top']
chunk = addr
while chunk in heap_region:
yield chunk
if chunk == top_chunk:
break
old_chunk = chunk
chunk = self.next_chunk(chunk)
# Avoid an infinite loop when a chunk's size is 0.
if old_chunk == chunk:
break
def chunk_size_nomask(self, addr):
return pwndbg.memory.u(addr + self.chunk_key_offset('size'))
def chunk_size(self, addr):
# TODO: Check if this breaks on 32-bit glibc versions >= 2.26, where
# only the data is aligned and not the chunk address
return pwndbg.memory.align_down(
self.chunk_size_nomask(addr), self.malloc_alignment
)
def next_chunk(self, addr):
return addr + self.chunk_size(addr)
def prev_inuse(self, addr):
prev_inuse, _, _ = self.chunk_flags(self.chunk_size(addr))
return prev_inuse == ptmalloc.PREV_INUSE
def largebin_index_32(self, sz):
"""Modeled on the GLIBC malloc largebin_index_32 macro.

@ -5,7 +5,6 @@ import pwndbg.memory
import pwndbg.symbol
import pwndbg.vmmap
import tests
from pwndbg.heap.ptmalloc import BinType
BINARY = tests.binaries.get('heap_bins.out')
@ -40,94 +39,94 @@ def test_heap_bins(start_binary):
largebin_count = pwndbg.memory.u64(addr)
result = allocator.tcachebins()
assert result.bin_type == BinType.TCACHE
assert tcache_size in result.bins
assert result.bins[tcache_size].count == 0 and len(result.bins[tcache_size].fd_chain) == 1
assert result['type'] == 'tcachebins'
assert tcache_size in result
assert result[tcache_size][1] == 0 and len(result[tcache_size][0]) == 1
result = allocator.fastbins()
assert result.bin_type == BinType.FAST
assert fastbin_size in result.bins
assert len(result.bins[fastbin_size].fd_chain) == 1
assert result['type'] == 'fastbins'
assert fastbin_size in result
assert len(result[fastbin_size]) == 1
result = allocator.unsortedbin()
assert result.bin_type == BinType.UNSORTED
assert len(result.bins['all'].fd_chain) == 1
assert not result.bins['all'].is_corrupted
assert result['type'] == 'unsortedbin'
assert len(result['all'][0]) == 1
assert not result['all'][2]
result = allocator.smallbins()
assert result.bin_type == BinType.SMALL
assert smallbin_size in result.bins
assert len(result.bins[smallbin_size].fd_chain) == 1 and len(result.bins[smallbin_size].bk_chain) == 1
assert not result.bins[smallbin_size].is_corrupted
assert result['type'] == 'smallbins'
assert smallbin_size in result
assert len(result[smallbin_size][0]) == 1 and len(result[smallbin_size][1]) == 1
assert not result[smallbin_size][2]
result = allocator.largebins()
assert result.bin_type == BinType.LARGE
largebin_size = list(result.bins.items())[allocator.largebin_index(largebin_size) - 64][0]
assert largebin_size in result.bins
assert len(result.bins[largebin_size].fd_chain) == 1 and len(result.bins[largebin_size].bk_chain) == 1
assert not result.bins[largebin_size].is_corrupted
assert result['type'] == 'largebins'
largebin_size = list(result.items())[allocator.largebin_index(largebin_size) - 64][0]
assert largebin_size in result
assert len(result[largebin_size][0]) == 1 and len(result[largebin_size][1]) == 1
assert not result[largebin_size][2]
# check tcache
gdb.execute('continue')
result = allocator.tcachebins()
assert result.bin_type == BinType.TCACHE
assert tcache_size in result.bins
assert result.bins[tcache_size].count == tcache_count and len(result.bins[tcache_size].fd_chain) == tcache_count + 1
for addr in result.bins[tcache_size].fd_chain[:-1]:
assert result['type'] == 'tcachebins'
assert tcache_size in result
assert result[tcache_size][1] == tcache_count and len(result[tcache_size][0]) == tcache_count + 1
for addr in result[tcache_size][0][:-1]:
assert pwndbg.vmmap.find(addr)
# check fastbin
gdb.execute('continue')
result = allocator.fastbins()
assert result.bin_type == BinType.FAST
assert (fastbin_size in result.bins) and (len(result.bins[fastbin_size].fd_chain) == fastbin_count + 1)
for addr in result.bins[fastbin_size].fd_chain[:-1]:
assert result['type'] == 'fastbins'
assert (fastbin_size in result) and (len(result[fastbin_size]) == fastbin_count + 1)
for addr in result[fastbin_size][:-1]:
assert pwndbg.vmmap.find(addr)
# check unsortedbin
gdb.execute('continue')
result = allocator.unsortedbin()
assert result.bin_type == BinType.UNSORTED
assert len(result.bins['all'].fd_chain) == smallbin_count + 2 and len(result.bins['all'].bk_chain) == smallbin_count + 2
assert not result.bins['all'].is_corrupted
for addr in result.bins['all'].fd_chain[:-1]:
assert result['type'] == 'unsortedbin'
assert len(result['all'][0]) == smallbin_count + 2 and len(result['all'][1]) == smallbin_count + 2
assert not result['all'][2]
for addr in result['all'][0][:-1]:
assert pwndbg.vmmap.find(addr)
for addr in result.bins['all'].bk_chain[:-1]:
for addr in result['all'][1][:-1]:
assert pwndbg.vmmap.find(addr)
# check smallbins
gdb.execute('continue')
result = allocator.smallbins()
assert result.bin_type == BinType.SMALL
assert len(result.bins[smallbin_size].fd_chain) == smallbin_count + 2 and len(result.bins[smallbin_size].bk_chain) == smallbin_count + 2
assert not result.bins[smallbin_size].is_corrupted
for addr in result.bins[smallbin_size].fd_chain[:-1]:
assert result['type'] == 'smallbins'
assert len(result[smallbin_size][0]) == smallbin_count + 2 and len(result[smallbin_size][1]) == smallbin_count + 2
assert not result[smallbin_size][2]
for addr in result[smallbin_size][0][:-1]:
assert pwndbg.vmmap.find(addr)
for addr in result.bins[smallbin_size].bk_chain[:-1]:
for addr in result[smallbin_size][1][:-1]:
assert pwndbg.vmmap.find(addr)
# check largebins
gdb.execute('continue')
result = allocator.largebins()
assert result.bin_type == BinType.LARGE
assert len(result.bins[largebin_size].fd_chain) == largebin_count + 2 and len(result.bins[largebin_size].bk_chain) == largebin_count + 2
assert not result.bins[largebin_size].is_corrupted
for addr in result.bins[largebin_size].fd_chain[:-1]:
assert result['type'] == 'largebins'
assert len(result[largebin_size][0]) == largebin_count + 2 and len(result[largebin_size][1]) == largebin_count + 2
assert not result[largebin_size][2]
for addr in result[largebin_size][0][:-1]:
assert pwndbg.vmmap.find(addr)
for addr in result.bins[largebin_size].bk_chain[:-1]:
for addr in result[largebin_size][1][:-1]:
assert pwndbg.vmmap.find(addr)
# check corrupted
gdb.execute('continue')
result = allocator.smallbins()
assert result.bin_type == BinType.SMALL
assert result.bins[smallbin_size].is_corrupted
assert result['type'] == 'smallbins'
assert result[smallbin_size][2]
result = allocator.largebins()
assert result.bin_type == BinType.LARGE
assert result.bins[largebin_size].is_corrupted
assert result['type'] == 'largebins'
assert result[largebin_size][2]

Loading…
Cancel
Save