mallocng: implement inspection of non-allocated slots (#3152)

* allow ng-find to return free/avail slots

also pulled out Mallocng.find_slot which returns the Slot instead of just the address

* better slot_state calculation, print it with ng-slot*

* propagate group/meta info when going through find

* add GroupedSlot to propagate find_slot information

* show GroupedSlot data to user when local reading fails

* gate the search behind a config option

* sanely handle insane slots

* use alt values for some meta fields

* get rid of set_padding

* add extra line for visual clarity

* autogen docs

* fix extra alignment
pull/3120/head^2
k4lizen 5 months ago committed by GitHub
parent 63812e0043
commit 032ba5fb96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -71,6 +71,21 @@ The address of mp_.
----------
## **ng-search-on-fail**
Let the ng-slot* commands search the heap if necessary.
For freed, avail(able) and corrupted slots, it may be
impossible to recover the start of the group and meta.
When this option is set to True, the ng-slotu and ng-slots
commands will search the heap to try to find the correct meta/group.
**Default:** on
----------
## **resolve-heap-via-heuristic**
=== "GDB"

@ -5,6 +5,7 @@ https://elixir.bootlin.com/musl/v1.2.5/source/src/malloc/mallocng
from __future__ import annotations
from enum import Enum
from typing import List
from typing import Optional
from typing import Tuple
@ -38,6 +39,13 @@ size_classes: List[int] = [
# fmt: on
class SlotState(Enum):
ALLOCATED = "allocated"
FREED = "freed"
# Available - this slot has not yet been allocated.
AVAIL = "available"
# Shorthand
def int_size():
return pwndbg.aglib.typeinfo.sint.sizeof
@ -167,6 +175,7 @@ class Slot:
self._reserved: int = None
self._group: Group = None
self._meta: Meta = None
self._slot_state: SlotState = None
def preload(self) -> None:
"""
@ -195,12 +204,10 @@ class Slot:
self._pn3 = pheader[5]
# ==
# Read the group's meta pointer.
_ = self.meta
# To calculate footer and p header fields
# To calculate footer and start header fields
# we need self.meta.stride. However we want to be able to
# return some information even if the meta is corrupt, so
# return some information even if the meta is corrupt or
# unreachable (e.g. this slot is freed or avail), so
# we won't load that here.
# Other fields are calculated without memory reads.
@ -445,7 +452,7 @@ class Slot:
# https://elixir.bootlin.com/musl/v1.2.5/source/src/malloc/mallocng/meta.h#L199
return (self.meta.stride - self.nominal_size - IB) // UNIT
# non-local..
# custom..
@property
def group(self) -> Group:
@ -467,6 +474,37 @@ class Slot:
return self._meta
@property
def slot_state(self) -> SlotState:
if self._slot_state is None:
# The actual "source of truth" for slot allocation state is
# self.meta.slotstate_at_index() however we can only resolve
# the meta if the state is ALLOCATED.
# We will do a heuristic check that should be good in most cases.
meta_says: SlotState = None
try:
meta_says = self.meta.slotstate_at_index(self.idx)
except pwndbg.dbg_mod.Error:
# We can't reach the meta. Either the slot is not allocated
# or it is allocated but the meta pointer is corrupted.
meta_says = None
if meta_says is not None:
self._slot_state = meta_says
else:
# When a slot is freed, its p[-3] gets set to 0xFF so the
# offset to group start (and by extension, meta) is unrecoverable.
# We will check for this, although musl only ever sets this
# and never uses this as a source of truth.
if self.pn3 == 0xFF:
# https://elixir.bootlin.com/musl/v1.2.5/source/src/malloc/mallocng/free.c#L112
self._slot_state = SlotState.FREED
else:
self._slot_state = SlotState.AVAIL
return self._slot_state
# checks..
def is_cyclic(self) -> int:
@ -486,6 +524,17 @@ class Slot:
# https://elixir.bootlin.com/musl/v1.2.5/source/src/malloc/mallocng/malloc.c#L269
return self.reserved_in_header == 6
# external setters..
def set_group(self, group: Group) -> None:
"""
If the slot is FREED or AVAIL, it is impossible for it to
recover the start of its group, and ergo its meta.
You can thus use this to set it externally.
"""
self._group = group
# constructors..
@classmethod
@ -503,19 +552,51 @@ class Slot:
obj = cls(p)
obj._sn3 = sn3
else:
# freed / avail slots will also go into this branch.
p = start
obj = cls(p)
obj._sn3 = obj._pn3 = sn3
# FIXME: Not good if the slot is corrupted and we can't
# access the meta.
assert obj.start == start
obj._start = start
return obj
class GroupedSlot:
"""
This is *not* a mallocng concept, this is a pwndbg abstraction.
A Slot object uses its inband metadata to recover all its fields and
uncover more information about itself by locating its group and meta.
It works essentially the same way mallocng's free() works.
However, if a slot is freed or available, most of its in-band metadata
will be invalid and it will not be able to recover group and meta. But,
given the start of the slot, we can infer which group it belongs to and
what its index is by walking allocator state i.e. ctx i.e. by using
Mallocng.find_slot().
A GroupedSlot then describes all information we can glean about a slot
which is described by a (group, idx) pair. Many of its fields can be
completely different from a Slot at the same location. They are guaranteed
to be the same only if the slot is ALLOCATED and hasn't been corrupted.
Not all fields that are available in Slot are available in GroupedSlot.
Make sure the group you are passing to the constructor points to a valid meta
object.
"""
def __init__(self, group: Group, idx: int) -> None:
self.group = group
self.meta = self.group.meta
self.idx = idx
self.stride = self.meta.stride
self.slot_state = self.meta.slotstate_at_index(self.idx)
self.start = self.group.storage + self.meta.stride * self.idx
self.end = self.start + self.stride - IB
class Meta:
"""
The metadata of a group.
@ -760,6 +841,15 @@ class Meta:
"""
return not self.is_donated and not self.is_mmaped
def slotstate_at_index(self, idx: int) -> SlotState:
me = 1 << idx
if self.freed_mask & me:
return SlotState.FREED
elif self.avail_mask & me:
return SlotState.AVAIL
else:
return SlotState.ALLOCATED
@staticmethod
def sizeof():
return 2 * int_size() + 4 * pwndbg.aglib.arch.ptrsize
@ -1088,10 +1178,11 @@ class Mallocng(pwndbg.aglib.heap.heap.MemoryAllocator):
def libc_has_debug_syms(self) -> bool:
return self.has_debug_syms
@override
def containing(self, address: int, metadata: bool = False, shallow: bool = False) -> int:
def find_slot(
self, address: int, metadata: bool = False, shallow: bool = False
) -> Tuple[Optional[GroupedSlot], Optional[Slot]]:
"""
Get the `start` of a slot which contains this address.
Get the slot which contains this address.
We say a slot "contains" an address, if the address is in
[start, start + stride). Thus, this will match the previous
@ -1103,6 +1194,8 @@ class Mallocng(pwndbg.aglib.heap.heap.MemoryAllocator):
If `shallow` is True, return the first slot hit without trying
to look for nested groups.
Returns (None, None) if nothing is found.
"""
hit_group: Optional[Group] = None
@ -1117,7 +1210,7 @@ class Mallocng(pwndbg.aglib.heap.heap.MemoryAllocator):
f"Mallocng.containing: Could not read meta_area ({e}), returning early."
)
)
return 0
return (None, None)
# Iterate over all metas in the meta_area.
for i in range(meta_area.nslots):
@ -1151,9 +1244,12 @@ class Mallocng(pwndbg.aglib.heap.heap.MemoryAllocator):
meta_area_addr = meta_area.next
if hit_group is None:
return 0
return (None, None)
# Need to read memory for the .contains_group() check.
hit_slot: Optional[Slot] = None
# Contains extra information.
hit_grouped_slot: Optional[GroupedSlot] = None
metadata_offset = IB if metadata else 0
@ -1169,20 +1265,28 @@ class Mallocng(pwndbg.aglib.heap.heap.MemoryAllocator):
if hit_slot is not None:
# If we are already in some slot, just return
# that slot since we can't look any deeper.
return hit_slot.start
return hit_grouped_slot, hit_slot
else:
# We are in no slot.
# We could return *some* information to the callee
# but alas, let's be technically correct.
return 0
return (None, None)
# Calculate the correct inner slot.
slot_idx = (address - valid_start) // hit_group.meta.stride
hit_slot = Slot.from_start(hit_group.at_index(slot_idx))
hit_grouped_slot = GroupedSlot(hit_group, slot_idx)
hit_slot = Slot.from_start(hit_grouped_slot.start)
# If the slot is not allocated, we know that we for sure can't
# recurse deeper.
if hit_grouped_slot.slot_state != SlotState.ALLOCATED:
return hit_grouped_slot, hit_slot
# Maybe there is a group inside this slot!
hit_group = Group(hit_slot.p)
return hit_slot.start
return hit_grouped_slot, hit_slot
except pwndbg.dbg_mod.Error as e:
print(
@ -1191,10 +1295,20 @@ class Mallocng(pwndbg.aglib.heap.heap.MemoryAllocator):
f" nested groups: {e}.\nReturning last valid slot."
)
)
if hit_slot is None:
return 0
else:
return hit_slot.start
# Could be None.
return hit_grouped_slot, hit_slot
@override
def containing(self, address: int, metadata: bool = False, shallow: bool = False) -> int:
"""
Same as find_slot() but returns only the `start` address of the slot, or zero
if no slot is found.
"""
found, _ = self.find_slot(address, metadata, shallow)
if found is None:
return 0
else:
return found.start
mallocng = Mallocng()

@ -5,6 +5,7 @@ Commands that help with debugging musl's allocator, mallocng.
from __future__ import annotations
import argparse
from typing import Optional
import pwndbg
import pwndbg.aglib.heap.mallocng as mallocng
@ -12,11 +13,27 @@ import pwndbg.aglib.memory as memory
import pwndbg.aglib.typeinfo as typeinfo
import pwndbg.color as C
import pwndbg.color.message as message
from pwndbg import config
from pwndbg.aglib.heap.mallocng import mallocng as ng
from pwndbg.commands import CommandCategory
from pwndbg.lib.pretty_print import Property
from pwndbg.lib.pretty_print import PropertyPrinter
search_on_fail = config.add_param(
"ng-search-on-fail",
True,
"let the ng-slot* commands search the heap if necessary",
help_docstring="""
For freed, avail(able) and corrupted slots, it may be
impossible to recover the start of the group and meta.
When this option is set to True, the ng-slotu and ng-slots
commands will search the heap to try to find the correct meta/group.
""",
param_class=pwndbg.lib.config.PARAM_BOOLEAN,
scope=pwndbg.lib.config.Scope.heap,
)
@pwndbg.commands.Command(
"Gives a quick explanation of musl's mallocng allocator.",
@ -245,7 +262,6 @@ def dump_group(group: mallocng.Group) -> str:
pp = PropertyPrinter()
pp.start_section("group", group_range)
pp.set_padding(5)
pp.add(
[
Property(name="meta", value=group.meta.addr, is_addr=True),
@ -256,7 +272,6 @@ def dump_group(group: mallocng.Group) -> str:
if group_size != -1:
pp.write("---\n")
pp.set_padding(5)
pp.add(
[
Property(name="group size", value=group_size),
@ -274,7 +289,6 @@ def dump_meta(meta: mallocng.Meta) -> str:
pp = PropertyPrinter()
pp.start_section("meta", "@ " + C.memory.get(meta.addr))
pp.set_padding(5)
pp.add(
[
Property(name="prev", value=meta.prev, is_addr=True),
@ -282,20 +296,17 @@ def dump_meta(meta: mallocng.Meta) -> str:
Property(name="mem", value=meta.mem, is_addr=True, extra="the group"),
Property(name="avail_mask", value=meta.avail_mask, extra=avail_binary),
Property(name="freed_mask", value=meta.freed_mask, extra=freed_binary),
Property(name="last_idx", value=meta.last_idx, extra="index of last slot"),
Property(
name="last_idx",
value=meta.last_idx,
alt_value=f"cnt: {meta.cnt:#x}",
extra="index of last slot",
),
Property(name="freeable", value=str(bool(meta.freeable))),
Property(name="sizeclass", value=meta.sizeclass),
Property(name="sizeclass", value=meta.sizeclass, alt_value=f"stride: {meta.stride:#x}"),
Property(name="maplen", value=meta.maplen),
]
)
pp.write("---\n")
pp.set_padding(9)
pp.add(
[
Property(name="cnt", value=meta.cnt, extra="the number of slots"),
Property(name="stride", value=meta.stride),
]
)
pp.end_section()
output = pp.dump()
@ -331,43 +342,60 @@ def dump_meta(meta: mallocng.Meta) -> str:
return output
def dump_slot(slot: mallocng.Slot, all: bool) -> str:
try:
slot.preload()
except pwndbg.dbg_mod.Error as e:
print(message.error(f"Error while reading slot: {e}"))
return ""
def get_colored_slot_state(ss: mallocng.SlotState) -> str:
match ss:
case mallocng.SlotState.ALLOCATED:
return C.green(ss.value)
case mallocng.SlotState.FREED:
return C.red(ss.value)
case mallocng.SlotState.AVAIL:
return C.blue(ss.value)
read_success: bool = True
try:
slot.group.preload()
except pwndbg.dbg_mod.Error as e:
print(message.error(f"Error while reading group: {e}"))
read_success = False
def dump_grouped_slot(gslot: mallocng.GroupedSlot, all: bool) -> str:
pp = PropertyPrinter()
try:
slot.meta.preload()
try:
slot.preload_meta_dependants()
except pwndbg.dbg_mod.Error as e1:
print(message.error(f"Error while loading slot fields that depend on the meta:\n{e1}"))
read_success = False
if not all:
pp.start_section("slab")
pp.add(
[
Property(name="group", value=gslot.group.addr, is_addr=True),
Property(name="meta", value=gslot.meta.addr, is_addr=True),
]
)
pp.end_section()
except pwndbg.dbg_mod.Error as e2:
print(message.error(f"Error while reading meta: {e2}"))
read_success = False
pp.start_section("slot")
pp.add(
[
Property(name="start", value=gslot.start, is_addr=True),
Property(name="end", value=gslot.end, is_addr=True),
Property(name="index", value=gslot.idx),
Property(name="stride", value=gslot.stride),
Property(name="state", value=get_colored_slot_state(gslot.slot_state)),
]
)
pp.end_section()
output = pp.dump()
if all:
output += dump_group(gslot.group)
output += dump_meta(gslot.meta)
if not read_success:
print(message.info("Only showing partial information."))
all = False
return output
def dump_slot(
slot: mallocng.Slot, all: bool, successful_preload: bool, will_dump_gslot: bool
) -> str:
pp = PropertyPrinter()
all = all and successful_preload and not will_dump_gslot
if not all:
pp.start_section("slab")
pp.set_padding(10)
if read_success:
if successful_preload:
pp.add(
[
Property(name="group", value=slot.group.addr, is_addr=True),
@ -382,9 +410,8 @@ def dump_slot(slot: mallocng.Slot, all: bool) -> str:
)
pp.end_section()
if read_success:
if successful_preload:
pp.start_section("general")
pp.set_padding(5)
pp.add(
[
Property(name="start", value=slot.start, is_addr=True),
@ -405,7 +432,6 @@ def dump_slot(slot: mallocng.Slot, all: bool) -> str:
pp.end_section()
pp.start_section("in-band")
pp.set_padding(2)
reserved_extra = ["describes: end - p - n"]
if slot.reserved_in_header == 5:
@ -428,12 +454,12 @@ def dump_slot(slot: mallocng.Slot, all: bool) -> str:
if slot.reserved_in_header == 5:
ftrsv = "NA (meta error)"
if read_success:
if successful_preload:
ftrsv = slot.reserved_in_footer
inband_group.append(Property(name="ftr reserved", value=ftrsv))
if read_success:
if successful_preload:
# Start header fields.
if slot.is_cyclic():
cyc_val = slot.cyclic_offset
@ -455,6 +481,13 @@ def dump_slot(slot: mallocng.Slot, all: bool) -> str:
output = pp.dump()
if not will_dump_gslot:
# The grouped_slot will have accurate information on this,
# no need for us to guess.
output += C.bold(
"\nThe slot is (probably) " + get_colored_slot_state(slot.slot_state) + ".\n\n"
)
if all:
output += dump_group(slot.group)
output += dump_meta(slot.meta)
@ -462,6 +495,92 @@ def dump_slot(slot: mallocng.Slot, all: bool) -> str:
return output
def smart_dump_slot(
slot: mallocng.Slot, all: bool, gslot: Optional[mallocng.GroupedSlot] = None
) -> str:
try:
slot.preload()
except pwndbg.dbg_mod.Error as e:
print(message.error(f"Error while reading slot: {e}"))
return ""
successful_preload: bool = True
err_msg = ""
try:
slot.group.preload()
except pwndbg.dbg_mod.Error as e:
err_msg = message.error(f"Error while reading group: {e}")
successful_preload = False
if successful_preload:
try:
slot.meta.preload()
try:
slot.preload_meta_dependants()
except pwndbg.dbg_mod.Error as e1:
err_msg = message.error(
f"Error while loading slot fields that depend on the meta:\n{e1}"
)
successful_preload = False
except pwndbg.dbg_mod.Error as e2:
err_msg = message.error(f"Error while reading meta: {e2}")
successful_preload = False
if successful_preload:
# If we successfully got the group and meta, using the grouped_slot won't
# give us any new information.
# (Unless the grouped_slot reports a different group than slot.group, which
# could be possible in exploitation I suppose).
return dump_slot(slot, all, True, False)
if not (slot._pn3 == 0xFF or slot._offset == 0):
# If the group/meta read failed because the slot is freed/avail,
# we won't throw an error. This is just a heuristic check for
# better UX. I'm using the private fields for the check so we
# don't accidentally cause an exception here if we are bordering
# unreadable memory.
print(err_msg)
output = ""
if gslot is None:
if not search_on_fail:
output += "Could not load valid meta from local information.\n"
output += "Will not attempt to search the heap because ng-search-on-fail = False.\n\n"
output += dump_slot(slot, all, False, False)
return output
# If it wasn't provided to us, let's try to search for it now.
output += "Could not load valid meta from local information, searching the heap.. "
ng.init_if_needed()
gslot, fslot = ng.find_slot(slot.p, False, False)
if gslot is None:
output += "Not found.\n\n"
output += dump_slot(slot, all, False, False)
return output
else:
if fslot.p == slot.p:
output += "Found it.\n\n"
else:
output += "\nFound a slot with p @ " + C.memory.get(fslot.p) + "."
output += " The slot you are looking for\ndoesn't seem to exist. Maybe its group got freed?\n\n"
output += "Local memory:\n"
output += dump_slot(slot, all, False, False)
return output
# Now we have a valid gslot.
output += "Local slot memory:\n"
output += dump_slot(slot, all, False, True)
output += "\nSlot information from the group/meta:\n"
output += dump_grouped_slot(gslot, all)
return output
parser = argparse.ArgumentParser(
description="""
Dump information about a mallocng slot, given its user address.
@ -492,7 +611,7 @@ def mallocng_slot_user(address: int, all: bool) -> None:
return
slot = mallocng.Slot(address)
print(dump_slot(slot, all), end="")
print(smart_dump_slot(slot, all, None), end="")
parser = argparse.ArgumentParser(
@ -525,7 +644,7 @@ def mallocng_slot_start(address: int, all: bool) -> None:
return
slot = mallocng.Slot.from_start(address)
print(dump_slot(slot, all), end="")
print(smart_dump_slot(slot, all, None), end="")
parser = argparse.ArgumentParser(
@ -662,10 +781,10 @@ def mallocng_find(
ng.init_if_needed()
slot_start = ng.containing(address, metadata, shallow)
grouped_slot, slot = ng.find_slot(address, metadata, shallow)
if slot_start == 0:
if slot is None:
print(message.info("No slot found containing that address."))
return
mallocng_slot_start(slot_start, all=all)
print(smart_dump_slot(slot, all, grouped_slot), end="")

@ -34,12 +34,17 @@ class PropertyPrinter:
def __init__(
self,
value_offset: int = 14,
extra_offset: int = 16,
*,
name_color_func: Optional[Callable[[str], str]] = None,
value_color_func: Optional[Callable[[str], str]] = None,
section_color_func: Optional[Callable[[str], str]] = None,
indent_size: int = 2,
):
self.value_offset = value_offset
self.extra_offset = extra_offset
self.name_color_func = name_color_func
if self.name_color_func is None:
self.name_color_func = color.bold
@ -53,9 +58,7 @@ class PropertyPrinter:
self.section_color_func = color.green
self.indent_size = indent_size
self.indent_level = 0
self.padding = 2
self.text = ""
def add(self, prop_group: List[Property]) -> None:
@ -75,24 +78,16 @@ class PropertyPrinter:
else:
prop.alt_value = str(prop.alt_value)
# Get max lengths to calculate proper ljust
# + 1 to account for the ":"
max_name_len = max(len(prop.name) for prop in prop_group) + 1
# max_value_len = max(len(prop.value) for prop in prop_group)
# Use constant so it works between different groups
max_value_len = 16
indentation_str = self.indent_level * self.indent_size * " "
padding_str = self.padding * " "
name_pad_str = max_name_len * " "
val_pad_str = max_value_len * " "
extra_list_pad_str = indentation_str + name_pad_str + padding_str + val_pad_str
extra_list_pad_str = (
indentation_str + self.value_offset * " " + " " + self.extra_offset * " "
)
for prop in prop_group:
self.text += (
indentation_str
+ color.ljust_colored(self.name_color_func(prop.name) + ":", max_name_len)
+ padding_str
+ color.ljust_colored(self.name_color_func(prop.name) + ":", self.value_offset)
+ " "
)
if prop.is_addr:
@ -105,7 +100,7 @@ class PropertyPrinter:
if prop.alt_value is not None:
colored_alt_val = " (" + self.value_color_func(prop.alt_value) + ")"
self.text += color.ljust_colored(colored_val + colored_alt_val, max_value_len)
self.text += color.ljust_colored(colored_val + colored_alt_val, self.extra_offset)
if isinstance(prop.extra, str):
self.text += " " + prop.extra
@ -181,10 +176,3 @@ class PropertyPrinter:
End a section.
"""
self.unindent()
def set_padding(self, pad: int) -> None:
"""
Set the distance between the end of the longest
property name and the start of the value column.
"""
self.padding = pad

Loading…
Cancel
Save