Improving the slab commands (#3135)

* improving the slab command

* improved slab display

* displaying object inuse status

* removed verbose

* cleaned up

* updated test
pull/3137/head
jxuanli 6 months ago committed by GitHub
parent 309d4ebc96
commit c25c60875d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -91,7 +91,15 @@ class Freelist:
seen: set[int] = set()
current_object = self.start_addr
while current_object:
addr = int(current_object)
try:
addr = int(current_object)
except Exception:
print(
M.warn(
f"Corrupted slab freelist detected at {hex(current_object)} when length is {len(seen)}"
)
)
break
yield current_object
current_object = pwndbg.aglib.memory.read_pointer_width(addr + self.offset)
if self.random:
@ -163,6 +171,10 @@ class SlabCache:
def size(self) -> int:
return int(self._slab_cache["size"])
@property
def slab_size(self) -> int:
return 0x1000 << self.oo_order
@property
def object_size(self) -> int:
return int(self._slab_cache["object_size"])
@ -250,6 +262,20 @@ class SlabCache:
def oo_objects(self):
return oo_objects(self.__oo_x)
def find_containing_slab(self, address) -> Slab | None:
for cpu_cache in self.cpu_caches:
slab = cpu_cache.active_slab
if slab is not None and address in slab:
return slab
for slab in cpu_cache.partial_slabs:
if slab is not None and address in slab:
return slab
for node_cache in self.node_caches:
for slab in node_cache.partial_slabs:
if slab is not None and address in slab:
return slab
return None
class CpuCache:
def __init__(self, cpu_cache: pwndbg.dbg_mod.Value, slab_cache: SlabCache, cpu: int) -> None:
@ -275,7 +301,7 @@ class CpuCache:
_slab = self._cpu_cache[slab_key]
if not int(_slab):
return None
return Slab(_slab.dereference(), self, self.slab_cache)
return Slab(_slab.dereference(), self, None)
@property
def partial_slabs(self) -> List[Slab]:
@ -284,7 +310,7 @@ class CpuCache:
cur_slab_int = int(cur_slab)
while cur_slab_int:
_slab = cur_slab.dereference()
partial_slabs.append(Slab(_slab, self, self.slab_cache, is_partial=True))
partial_slabs.append(Slab(_slab, self, None, is_partial=True))
cur_slab = _slab["next"]
cur_slab_int = int(cur_slab)
return partial_slabs
@ -306,7 +332,7 @@ class NodeCache:
for slab in for_each_entry(
self._node_cache["partial"], f"struct {slab_struct_type()}", "slab_list"
):
ret.append(Slab(slab.dereference(), None, self.slab_cache, is_partial=True))
ret.append(Slab(slab.dereference(), None, self, is_partial=True))
return ret
@property
@ -323,13 +349,21 @@ class Slab:
self,
slab: pwndbg.dbg_mod.Value,
cpu_cache: CpuCache | None,
slab_cache: SlabCache,
node_cache: NodeCache | None,
is_partial: bool = False,
) -> None:
self._slab = slab
self.cpu_cache = cpu_cache
self.slab_cache = slab_cache
self.node_cache = node_cache
self.is_partial = is_partial
self.is_cpu = False
self.slab_cache = None
if cpu_cache is not None:
self.is_cpu = True
self.slab_cache = cpu_cache.slab_cache
assert node_cache is None
if node_cache is not None:
self.slab_cache = node_cache.slab_cache
@property
def slab_address(self) -> int:
@ -399,6 +433,9 @@ class Slab:
def free_objects(self) -> Set[int]:
return {obj for freelist in self.freelists for obj in freelist}
def __contains__(self, addr: int):
return self.virt_address <= addr < self.virt_address + self.slab_cache.slab_size
def find_containing_slab_cache(addr: int) -> SlabCache | None:
"""Find the slab cache associated with the provided address."""

@ -95,7 +95,7 @@ def slab(
slab_contains(addr)
def print_slab(slab: Slab, indent, verbose: bool, freelist: Freelist = None) -> None:
def print_slab(slab: Slab, indent, verbose: bool, cpu_freelist: Freelist = None) -> None:
indent.print(
f"- {indent.prefix('Slab')} @ {indent.addr_hex(slab.virt_address)} [{indent.aux_hex(slab.slab_address)}]:"
)
@ -107,30 +107,45 @@ def print_slab(slab: Slab, indent, verbose: bool, freelist: Freelist = None) ->
idx = 0
indexes = {}
if freelist is None:
freelist = slab.freelist
freelist = slab.freelist
for addr in freelist:
if addr in indexes:
break
indexes[addr] = idx
idx += 1
if cpu_freelist is not None:
for idx, addr in enumerate(cpu_freelist):
if addr in indexes:
break
indexes[addr] = idx
if verbose:
with indent:
free_objects = slab.free_objects
for addr in slab.objects:
index = "0x--"
prefix = f"- {indent.prefix('[0x--]')} {hex(addr)}"
if addr in indexes:
index = f"0x{indexes[addr]:02}"
prefix = f"- {indent.prefix(f'[{index}]')} {indent.addr_hex(addr)}"
prefix = (
f"- {indent.prefix(f'[0x{indexes[addr]:02}]')} {indent.addr_hex(addr)}"
)
if addr not in free_objects:
indent.print(f"{prefix} (in-use)")
continue
next_free = freelist.find_next(addr)
if next_free:
indent.print(f"{prefix} (next: {indent.aux_hex(next_free)})")
else:
indent.print(f"{prefix} (no next)")
continue
if cpu_freelist is not None:
next_free = cpu_freelist.find_next(addr)
if next_free:
indent.print(
f"{prefix} (next: {indent.aux_hex(next_free)}) [CPU cache]"
)
continue
if addr in cpu_freelist:
indent.print(f"{prefix} (no next) [CPU cache]")
continue
indent.print(f"{prefix} (no next)")
def print_cpu_cache(
@ -215,15 +230,13 @@ def slab_info(name: str, verbose: bool, cpu: int, node: int, active: bool, parti
indent.print(f"{indent.prefix('Flags')}: (none)")
indent.print(f"{indent.prefix('Offset')}: {indent.aux_hex(slab_cache.offset)}")
indent.print(
f"{indent.prefix('Slab size')}: {indent.aux_hex(0x1000 << slab_cache.oo_order)}"
)
indent.print(f"{indent.prefix('Slab size')}: {indent.aux_hex(slab_cache.slab_size)}")
indent.print(
f"{indent.prefix('Size (without metadata)')}: {indent.aux_hex(slab_cache.size)}"
)
indent.print(f"{indent.prefix('Align')}: {indent.aux_hex(slab_cache.align)}")
indent.print(f"{indent.prefix('Object Size')}: {indent.aux_hex(slab_cache.object_size)}")
useroffset, usersize = slab_cache.useroffset, slab_cache.useroffset
useroffset, usersize = slab_cache.useroffset, slab_cache.usersize
if useroffset is not None and usersize is not None:
indent.print(f"{indent.prefix('Usercopy region offset')}: {useroffset}")
indent.print(f"{indent.prefix('Usercopy region size')}: {usersize}")
@ -262,16 +275,37 @@ def slab_list(filter_) -> None:
def slab_contains(address: str) -> None:
"""prints the slab_cache associated with the provided address"""
addr = None
try:
parsed_addr = pwndbg.dbg.selected_frame().evaluate_expression(address)
addr = int(pwndbg.dbg.selected_frame().evaluate_expression(address))
except pwndbg.dbg_mod.Error as e:
print(M.error(f"Could not parse '{address}'"))
print(M.error(f"Message: {e}"))
return
addr = int(pwndbg.aglib.memory.get_typed_pointer("void", parsed_addr))
try:
slab_cache = find_containing_slab_cache(addr)
print(f"{addr:#x} @", M.hint(f"{slab_cache.name}"))
slab = slab_cache.find_containing_slab(addr)
if slab is None:
print(M.warn("Did not finding containing slab."))
return
desc = "[something went wrong]"
inuse = desc
try:
if addr in slab.free_objects:
inuse = "free"
elif addr in slab.objects:
inuse = "in-use"
if slab.is_cpu and not slab.is_partial:
desc = f"[active, cpu {slab.cpu_cache.cpu}]"
elif slab.is_cpu and slab.is_partial:
desc = f"[partial, cpu {slab.cpu_cache.cpu}]"
elif not slab.is_cpu and slab.is_partial:
desc = f"[partial, node {slab.node_cache.node}]"
except Exception:
pass
print("slab:", M.hint(f"{hex(slab.virt_address)}"), desc)
print("status:", M.hint(inuse))
except Exception:
print(M.warn("address does not belong to a SLUB cache"))

@ -111,6 +111,7 @@ def test_command_slab_contains():
res = gdb.execute(f"slab contains {addr}", to_string=True)
assert f"{addr} @ {slab_cache}" in res
assert "cpu" in res or "node" in res
@pytest.mark.skipif(

Loading…
Cancel
Save