diff --git a/pwndbg/aglib/kernel/slab.py b/pwndbg/aglib/kernel/slab.py index f24a0c859..f085eebc3 100644 --- a/pwndbg/aglib/kernel/slab.py +++ b/pwndbg/aglib/kernel/slab.py @@ -8,6 +8,7 @@ import pwndbg import pwndbg.aglib.memory import pwndbg.aglib.symbol import pwndbg.aglib.typeinfo +import pwndbg.color.message as M from pwndbg.aglib import kernel from pwndbg.aglib.kernel.macros import compound_head from pwndbg.aglib.kernel.macros import for_each_entry @@ -87,6 +88,7 @@ class Freelist: self.random = random def __iter__(self) -> Generator[int, None, None]: + seen: set[int] = set() current_object = self.start_addr while current_object: addr = int(current_object) @@ -94,12 +96,32 @@ class Freelist: current_object = pwndbg.aglib.memory.pvoid(addr + self.offset) if self.random: current_object ^= self.random ^ swab(addr + self.offset) + if addr in seen: + # this can happen during exploit dev + print( + M.warn( + f"Cyclic slab freelist detected at {hex(addr)} when length is {len(seen)}" + ) + ) + break + seen.add(addr) def __int__(self) -> int: return self.start_addr def __len__(self) -> int: - return sum(1 for _ in self) + seen: set[int] = set() + for addr in self: + if addr in seen: + # this can happen during exploit dev + print( + M.warn( + f"Cyclic slab freelist detected at {hex(addr)} when length is {len(seen)}" + ) + ) + break + seen.add(addr) + return len(seen) def find_next(self, addr: int) -> int: freelist_iter = iter(self) @@ -177,9 +199,44 @@ class SlabCache: def cpu_partial(self) -> int: return int(self._slab_cache["cpu_partial"]) + @property + def cpu_partial_slabs(self) -> int: + if self._slab_cache.dereference().type.has_field("cpu_partial_slabs"): + return int(self._slab_cache["cpu_partial_slabs"]) + return None + + @property + def min_partial(self) -> int: + return int(self._slab_cache["min_partial"]) + @property def inuse(self) -> int: - return int(self._slab_cache["inuse"]) + # somewhat mirrors libslub's implementation + # looks for per_cpu active lists and per_cpu and node partial lists + # no good way to track full slabs unless CONFIG_SLUB_DEBUG is enabled + # which is typically not from what I have seen + cnt = 0 + for cpu_cache in self.cpu_caches: + if cpu_cache.active_slab is not None: + cnt += cpu_cache.active_slab.inuse + for partial_slab in cpu_cache.partial_slabs: + cnt += partial_slab.inuse + for node_cache in self.node_caches: + for partial_slab in node_cache.partial_slabs: + cnt += partial_slab.inuse + return cnt + + @property + def useroffset(self) -> int: + if not self._slab_cache.dereference().type.has_field("useroffset"): + return None + return int(self._slab_cache["useroffset"]) + + @property + def usersize(self) -> int: + if not self._slab_cache.dereference().type.has_field("usersize"): + return None + return int(self._slab_cache["usersize"]) @property def __oo_x(self) -> int: @@ -252,6 +309,14 @@ class NodeCache: ret.append(Slab(slab.dereference(), None, self.slab_cache, is_partial=True)) return ret + @property + def nr_partial(self) -> int: + return int(self._node_cache["nr_partial"]) + + @property + def min_partial(self) -> int: + return self.slab_cache.min_partial + class Slab: def __init__( diff --git a/pwndbg/commands/slab.py b/pwndbg/commands/slab.py index eb413365f..1b1816833 100644 --- a/pwndbg/commands/slab.py +++ b/pwndbg/commands/slab.py @@ -12,11 +12,13 @@ import sys from tabulate import tabulate +import pwndbg import pwndbg.aglib.kernel.slab -import pwndbg.color as C +import pwndbg.aglib.memory import pwndbg.color.message as M import pwndbg.commands from pwndbg.aglib.kernel.slab import CpuCache +from pwndbg.aglib.kernel.slab import Freelist from pwndbg.aglib.kernel.slab import NodeCache from pwndbg.aglib.kernel.slab import Slab from pwndbg.aglib.kernel.slab import find_containing_slab_cache @@ -37,14 +39,22 @@ parser_list.add_argument( "filter_", metavar="filter", type=str, + default=None, nargs="?", help="Only show caches that contain the given filter string", ) -# TODO: --cpu, --node, --partial, --active parser_info = subparsers.add_parser("info", prog="slab info") parser_info.add_argument("names", metavar="name", type=str, nargs="+", help="") parser_info.add_argument("-v", "--verbose", action="store_true", help="") +parser_info.add_argument("-c", "--cpu", type=int, default=False, help="CPU to display") +parser_info.add_argument("-n", "--node", type=int, help="") +parser_info.add_argument( + "-p", "--partial-only", action="store_true", help="only displays partial lists" +) +parser_info.add_argument( + "-a", "--active-only", action="store_true", help="only displays the active list" +) parser_contains = subparsers.add_parser("contains", prog="slab contains") parser_contains.add_argument("addresses", metavar="addr", type=str, nargs="+", help="") @@ -54,73 +64,121 @@ parser_contains.add_argument("addresses", metavar="addr", type=str, nargs="+", h @pwndbg.commands.OnlyWhenQemuKernel @pwndbg.commands.OnlyWithKernelDebugSyms @pwndbg.commands.OnlyWhenPagingEnabled -def slab(command, filter_=None, names=None, verbose=False, addresses=None) -> None: +def slab( + command, + filter_=None, + names=None, + verbose=False, + addresses=None, + cpu=None, + node=None, + partial_only=False, + active_only=False, +) -> None: if command == "list": slab_list(filter_) elif command == "info": + partial, active = True, True + if partial_only and active_only: + print(M.warn("partial_only and active_only are both specified")) + return + if partial_only: + active = False + if active_only: + partial = False for name in names: - slab_info(name, verbose) + slab_info(name, verbose, cpu, node, active, partial) elif command == "contains": for addr in addresses: slab_contains(addr) -def print_slab(slab: Slab, indent, verbose: bool) -> None: +def print_slab(slab: Slab, indent, verbose: bool, freelist: Freelist = None) -> None: indent.print( - f"- {C.green('Slab')} @ {indent.addr_hex(slab.virt_address)} [{indent.aux_hex(slab.slab_address)}]:" + f"- {indent.prefix('Slab')} @ {indent.addr_hex(slab.virt_address)} [{indent.aux_hex(slab.slab_address)}]:" ) with indent: - indent.print(f"{C.blue('In-Use')}: {slab.inuse}/{slab.object_count}") - indent.print(f"{C.blue('Frozen')}: {slab.frozen}") - indent.print(f"{C.blue('Freelist')}: {indent.addr_hex(int(slab.freelist))}") + indent.print(f"{indent.prefix('In-Use')}: {slab.inuse}/{slab.object_count}") + indent.print(f"{indent.prefix('Frozen')}: {slab.frozen}") + indent.print(f"{indent.prefix('Freelist')}: {indent.addr_hex(int(slab.freelist))}") + + idx = 0 + indexes = {} + if freelist is None: + freelist = slab.freelist + for addr in freelist: + if addr in indexes: + break + indexes[addr] = idx + idx += 1 if verbose: with indent: free_objects = slab.free_objects for addr in slab.objects: + index = "0x--" + if addr in indexes: + index = f"0x{indexes[addr]:02}" + prefix = f"- {indent.prefix(f'[{index}]')} {indent.addr_hex(addr)}" if addr not in free_objects: - indent.print(f"- {addr:#x} (in-use)") + indent.print(f"{prefix} (in-use)") continue - for freelist in slab.freelists: - next_free = freelist.find_next(addr) - if next_free: - indent.print(f"- {indent.addr_hex(addr)} (next: {next_free:#x})") - break + next_free = freelist.find_next(addr) + if next_free: + indent.print(f"{prefix} (next: {indent.aux_hex(next_free)})") else: - indent.print(f"- {indent.addr_hex(addr)} (no next)") + indent.print(f"{prefix} (no next)") -def print_cpu_cache(cpu_cache: CpuCache, verbose: bool, indent) -> None: +def print_cpu_cache( + cpu_cache: CpuCache, verbose: bool, active: bool, partial: bool, indent +) -> None: indent.print( - f"{C.green('kmem_cache_cpu')} @ {indent.addr_hex(cpu_cache.address)} [CPU {cpu_cache.cpu}]:" + f"{indent.prefix('kmem_cache_cpu')} @ {indent.addr_hex(cpu_cache.address)} [CPU {cpu_cache.cpu}]:" ) with indent: - indent.print(f"{C.blue('Freelist')}:", indent.addr_hex(int(cpu_cache.freelist))) - - active_slab = cpu_cache.active_slab - if active_slab: - indent.print(f"{C.green('Active Slab')}:") - with indent: - print_slab(active_slab, indent, verbose) - else: - indent.print("Active Slab: (none)") - + if active: + indent.print(f"{indent.prefix('Freelist')}:", indent.addr_hex(int(cpu_cache.freelist))) + active_slab = cpu_cache.active_slab + if active_slab: + indent.print(f"{indent.prefix('Active Slab')}:") + with indent: + print_slab(active_slab, indent, verbose, cpu_cache.freelist) + else: + indent.print("Active Slab: (none)") + + if not partial: + return partial_slabs = cpu_cache.partial_slabs if not partial_slabs: indent.print("Partial Slabs: (none)") return slabs = partial_slabs[0].slabs - pobjects = partial_slabs[0].pobjects - cpu_partial = partial_slabs[0].slab_cache.cpu_partial - indent.print(f"{C.green('Partial Slabs')} [{slabs}] [PO: ~{pobjects}/{cpu_partial}]:") - for partial_slab in partial_slabs: - print_slab(partial_slab, indent, verbose) + # the kernel checks cpu_partial_slabs to determine whether partial slabs are to be flushed + # see: https://elixir.bootlin.com/linux/v6.13/source/mm/slub.c#L3209 + cpu_partial_slabs = partial_slabs[0].slab_cache.cpu_partial_slabs + if cpu_partial_slabs is None: + # legacy + cpu_partial_slabs = partial_slabs[0].pobjects + indent.print( + f"{indent.prefix('Partial Slabs')} [nr_slabs/cpu_partial_slabs: {indent.aux_hex(slabs)}/{indent.aux_hex(cpu_partial_slabs)}]" + ) + with indent: + for partial_slab in partial_slabs: + print_slab(partial_slab, indent, verbose) def print_node_cache(node_cache: NodeCache, verbose: bool, indent) -> None: + address, nr_partial, min_partial, node = ( + node_cache.address, + node_cache.nr_partial, + node_cache.min_partial, + node_cache.node, + ) + # https://elixir.bootlin.com/linux/v6.13/source/mm/slub.c#L3140 indent.print( - f"{C.green('kmem_cache_node')} @ {indent.addr_hex(node_cache.address)} [NUMA node {node_cache.node}]:" + f"{indent.prefix('kmem_cache_node')} @ {indent.addr_hex(address)} [NUMA node {node}, nr_partial/min_partial: {indent.aux_hex(nr_partial)}/{indent.aux_hex(min_partial)}]:" ) with indent: partial_slabs = node_cache.partial_slabs @@ -128,12 +186,15 @@ def print_node_cache(node_cache: NodeCache, verbose: bool, indent) -> None: indent.print("Partial Slabs: (none)") return - indent.print(f"{C.green('Partial Slabs')}:") - for slab in partial_slabs: - print_slab(slab, indent, verbose) + indent.print( + f"{indent.prefix('Partial Slabs')} [nr_partial: {indent.aux_hex(len(partial_slabs))}]" + ) + with indent: + for slab in partial_slabs: + print_slab(slab, indent, verbose) -def slab_info(name: str, verbose: bool) -> None: +def slab_info(name: str, verbose: bool, cpu: int, node: int, active: bool, partial: bool) -> None: slab_cache = pwndbg.aglib.kernel.slab.get_cache(name) if slab_cache is None: @@ -142,24 +203,40 @@ def slab_info(name: str, verbose: bool) -> None: indent = IndentContextManager() - indent.print(f"{C.green('Slab Cache')} @ {indent.addr_hex(slab_cache.address)}") + indent.print(f"{indent.prefix('Slab Cache')} @ {indent.addr_hex(slab_cache.address)}") with indent: - indent.print(f"{C.blue('Name')}: {slab_cache.name}") + indent.print(f"{indent.prefix('Name')}: {slab_cache.name}") flags_list = slab_cache.flags if flags_list: - indent.print(f"{C.blue('Flags')}: {' | '.join(flags_list)}") + indent.print(f"{indent.prefix('Flags')}: {' | '.join(flags_list)}") else: - indent.print(f"{C.blue('Flags')}: (none)") - - indent.print(f"{C.blue('Offset')}: {slab_cache.offset}") - indent.print(f"{C.blue('Size')}: {slab_cache.size}") - indent.print(f"{C.blue('Align')}: {slab_cache.align}") - indent.print(f"{C.blue('Object Size')}: {slab_cache.object_size}") + indent.print(f"{indent.prefix('Flags')}: (none)") + + indent.print(f"{indent.prefix('Offset')}: {indent.aux_hex(slab_cache.offset)}") + indent.print( + f"{indent.prefix('Slab size')}: {indent.aux_hex(0x1000 << slab_cache.oo_order)}" + ) + indent.print( + f"{indent.prefix('Size (without metadata)')}: {indent.aux_hex(slab_cache.size)}" + ) + indent.print(f"{indent.prefix('Align')}: {indent.aux_hex(slab_cache.align)}") + indent.print(f"{indent.prefix('Object Size')}: {indent.aux_hex(slab_cache.object_size)}") + useroffset, usersize = slab_cache.useroffset, slab_cache.useroffset + if useroffset is not None and usersize is not None: + indent.print(f"{indent.prefix('Usercopy region offset')}: {useroffset}") + indent.print(f"{indent.prefix('Usercopy region size')}: {usersize}") for cpu_cache in slab_cache.cpu_caches: - print_cpu_cache(cpu_cache, verbose, indent) + if cpu_cache.cpu is not None and cpu_cache.cpu != cpu: + continue + print_cpu_cache(cpu_cache, verbose, active, partial, indent) + + if not partial: + return for node_cache in slab_cache.node_caches: + if node is not None and node != node_cache.node: + continue print_node_cache(node_cache, verbose, indent) @@ -190,6 +267,9 @@ def slab_contains(address: str) -> None: print(M.error(f"Message: {e}")) return - addr = int(parsed_addr) - slab_cache = find_containing_slab_cache(addr) - print(f"{addr:#x} @", M.hint(f"{slab_cache.name}")) + addr = int(pwndbg.aglib.memory.get_typed_pointer("void", parsed_addr)) + try: + slab_cache = find_containing_slab_cache(addr) + print(f"{addr:#x} @", M.hint(f"{slab_cache.name}")) + except Exception: + print(M.warn("address does not belong to a SLUB cache")) diff --git a/tests/qemu-tests/tests/system/test_commands_kernel.py b/tests/qemu-tests/tests/system/test_commands_kernel.py index b72dafbb0..3c5922aa4 100644 --- a/tests/qemu-tests/tests/system/test_commands_kernel.py +++ b/tests/qemu-tests/tests/system/test_commands_kernel.py @@ -96,7 +96,9 @@ def get_slab_object_address(): for cache in caches: cache_name = cache.name info = gdb.execute(f"slab info -v {cache_name}", to_string=True) - matches = re.findall(r"- (0x[0-9a-fA-F]+)", info) + ansi_escape = re.compile(r"\x1b\[[0-9;]*m") + info = ansi_escape.sub("", info) + matches = re.findall(r"- \[0x[0-9a-fA-F\-]{2}\] (0x[0-9a-fA-F]+)", info) if len(matches) > 0: return (matches[0], cache_name) raise ValueError("Could not find any slab objects")