From c2c31fc01ebab82e280837074ea048846f18722e Mon Sep 17 00:00:00 2001 From: jxuanli <65455765+jxuanli@users.noreply.github.com> Date: Mon, 29 Sep 2025 12:53:17 -0700 Subject: [PATCH] Handling `kmem_cache` recovery edge case + improving SLUB corrupted list handling (#3311) * gracefully handle the case when `char_ptr` points to a string that contains non-UTF-8 chars * handling `struct kmem_cache` recovery edge case * improved slub warning message handling * added emphasize * slab free obj addr bound checking * refactored a bit * linting * added head of list error handling * updated error message * cleaning up * cleaning up --- pwndbg/aglib/kernel/slab.py | 184 +++++++++--------- pwndbg/commands/slab.py | 95 ++++++--- .../qemu_system/tests/test_commands_kernel.py | 2 +- 3 files changed, 153 insertions(+), 128 deletions(-) diff --git a/pwndbg/aglib/kernel/slab.py b/pwndbg/aglib/kernel/slab.py index 2db4b03da..4fe495558 100644 --- a/pwndbg/aglib/kernel/slab.py +++ b/pwndbg/aglib/kernel/slab.py @@ -10,7 +10,6 @@ import pwndbg.aglib.kernel.symbol import pwndbg.aglib.memory import pwndbg.aglib.symbol import pwndbg.aglib.typeinfo -import pwndbg.color.message as M from pwndbg.aglib import kernel from pwndbg.aglib.kernel.macros import compound_head from pwndbg.aglib.kernel.macros import for_each_entry @@ -82,61 +81,58 @@ def get_flags_list(flags: int) -> List[str]: class Freelist: - def __init__(self, start_addr: int, offset: int, random: int = 0) -> None: + def __init__(self, start_addr: int, slab: Slab) -> None: self.start_addr = start_addr - self.offset = offset - self.random = random + self.slab = slab + if not self.slab: + return + self.offset = slab.slab_cache.offset + self.random = slab.slab_cache.random + self.cyclic = None def __iter__(self) -> Generator[int, None, None]: + if not self.slab: + return seen: set[int] = set() - current_object = self.start_addr - while current_object: - try: - addr = int(current_object) - except Exception: - print( - M.warn( -
f"Corrupted slab freelist detected at {hex(current_object)} when length is {len(seen)}" - ) - ) + curr = None + next = self.start_addr + while next: + if next in seen: + self.cyclic = curr + return + if not pwndbg.aglib.memory.is_kernel(next + self.offset): break - yield current_object - current_object = pwndbg.aglib.memory.read_pointer_width(addr + self.offset) - if self.random: - current_object ^= self.random ^ swab(addr + self.offset) - if addr in seen: - # this can happen during exploit dev - print( - M.warn( - f"Cyclic slab freelist detected at {hex(addr)} when length is {len(seen)}" - ) - ) + if next not in self.slab or not self.is_valid_obj(next): break - seen.add(addr) + curr = next + next = self.find_next(curr) + yield curr + seen.add(curr) + # reaching here means the freelist is not cyclic (prior to detections of other corruptions) + self.cyclic = None def __int__(self) -> int: return self.start_addr def __len__(self) -> int: - seen: set[int] = set() - for addr in self: - if addr in seen: - # this can happen during exploit dev - print( - M.warn( - f"Cyclic slab freelist detected at {hex(addr)} when length is {len(seen)}" - ) - ) - break - seen.add(addr) - return len(seen) + return sum(1 for _ in self) def find_next(self, addr: int) -> int: - freelist_iter = iter(self) - for obj in freelist_iter: - if obj == addr: - return next(freelist_iter, 0) - return 0 + # assumes addr is in this freelist -> assert(addr in self) + # caller should assert this behaviour to avoid traversing the list unnecessarily + if not self.slab: + raise ValueError("slab freelist must belong to a slab") + next = pwndbg.aglib.memory.read_pointer_width(addr + self.offset) + if self.random: + next ^= self.random ^ swab(addr + self.offset) + return next + + def is_valid_obj(self, addr): + if not self.slab: + return + diff = addr - self.slab.virt_address + sz = self.slab.slab_cache.size + return diff % sz == 0 and 0 <= (diff // sz) < self.slab.object_count class SlabCache: @@ -289,11 +285,7 
@@ class CpuCache: @property def freelist(self) -> Freelist: - return Freelist( - int(self._cpu_cache["freelist"]), - self.slab_cache.offset, - self.slab_cache.random, - ) + return Freelist(int(self._cpu_cache["freelist"]), self.active_slab) @property def active_slab(self) -> Slab | None: @@ -301,7 +293,7 @@ class CpuCache: _slab = self._cpu_cache[slab_key] if not int(_slab): return None - return Slab(_slab.dereference(), self, None) + return Slab(_slab.dereference(), cpu_cache=self, is_active=True) @property def partial_slabs(self) -> List[Slab]: @@ -312,7 +304,7 @@ class CpuCache: cur_slab_int = int(cur_slab) while cur_slab_int: _slab = cur_slab.dereference() - partial_slabs.append(Slab(_slab, self, None, is_partial=True)) + partial_slabs.append(Slab(_slab, cpu_cache=self)) cur_slab = _slab["next"] cur_slab_int = int(cur_slab) return partial_slabs @@ -334,7 +326,7 @@ class NodeCache: for slab in for_each_entry( self._node_cache["partial"], f"struct {slab_struct_type()}", "slab_list" ): - ret.append(Slab(slab.dereference(), None, self, is_partial=True)) + ret.append(Slab(slab.dereference(), node_cache=self)) return ret @property @@ -350,21 +342,19 @@ class Slab: def __init__( self, slab: pwndbg.dbg_mod.Value, - cpu_cache: CpuCache | None, - node_cache: NodeCache | None, - is_partial: bool = False, + cpu_cache: CpuCache = None, + node_cache: NodeCache = None, + is_active: bool = False, ) -> None: self._slab = slab - self.cpu_cache = cpu_cache - self.node_cache = node_cache - self.is_partial = is_partial - self.is_cpu = False - self.slab_cache = None + self.is_active = is_active if cpu_cache is not None: + self.cpu_cache = cpu_cache self.is_cpu = True self.slab_cache = cpu_cache.slab_cache - assert node_cache is None - if node_cache is not None: + elif node_cache is not None: + self.node_cache = node_cache + self.is_cpu = False self.slab_cache = node_cache.slab_cache @property @@ -372,6 +362,7 @@ class Slab: return int(self._slab.address) @property + 
@pwndbg.lib.cache.cache_until("stop") def virt_address(self) -> int: return kernel.page_to_virt(self.slab_address) @@ -393,8 +384,8 @@ class Slab: @property def inuse(self) -> int: inuse = int(self._slab["inuse"]) - if not self.is_partial: - # I believe only the cpu freelist is considered "inuse" similar to glibc's tcache + if self.is_active: + # only the cpu freelist is considered "inuse" similar to glibc's tcache inuse -= len(self.cpu_cache.freelist) return inuse @@ -404,7 +395,7 @@ class Slab: @property def pobjects(self) -> int: - if not self.is_partial: + if self.is_active: return 0 if self._slab.type.has_field("pobjects"): return int(self._slab["pobjects"]) @@ -416,22 +407,17 @@ class Slab: @property def freelist(self) -> Freelist: - return Freelist( - int(self._slab["freelist"]), - self.slab_cache.offset, - self.slab_cache.random, - ) - - @property - def freelists(self) -> List[Freelist]: - freelists = [self.freelist] - if not self.is_partial: - freelists.append(self.cpu_cache.freelist) - return freelists + return Freelist(int(self._slab["freelist"]), self) @property def free_objects(self) -> Set[int]: - return {obj for freelist in self.freelists for obj in freelist} + result = set() + for obj in self.freelist: + result.add(obj) + if self.is_active and self.cpu_cache.freelist: + for obj in self.cpu_cache.freelist: + result.add(obj) + return result def __contains__(self, addr: int): return self.virt_address <= addr < self.virt_address + self.slab_cache.slab_size @@ -484,6 +470,7 @@ def kmem_cache_pad_sz(kconfig) -> Tuple[int, int]: name_off = i * 8 break assert name_off, "can't determine kmem_cache name offset" + distance, node_cache_pad = None, None if pwndbg.aglib.kernel.krelease() >= (6, 2) and all( config not in kconfig for config in ( @@ -502,31 +489,36 @@ def kmem_cache_pad_sz(kconfig) -> Tuple[int, int]: node_cache_pad = kmem_cache_node_pad_sz( kmem_cache + name_off + 0x8 * 3 ) # name ptr + 2 list ptrs - assert node_cache_pad, "can't determine kmem 
cache node padding size" + assert node_cache_pad, "can't find kmem_cache node" distance = 8 if "CONFIG_SLAB_FREELIST_RANDOM" in kconfig else 0 return distance, node_cache_pad elif "CONFIG_SLAB_FREELIST_RANDOM" in kconfig: for i in range(3, 0x20): ptr = kmem_cache + name_off + i * 8 val = pwndbg.aglib.memory.u64(ptr) - if pwndbg.aglib.memory.is_kernel(val): - distance = (i + 1) * 8 - node_cache_pad = kmem_cache_node_pad_sz(kmem_cache + name_off + distance) - assert node_cache_pad, "can't determine kmem cache node padding size" - return distance, node_cache_pad - distance, node_cache_pad = None, None - for i in range(3, 0x20): - ptr = kmem_cache + name_off + i * 8 - val = pwndbg.aglib.memory.u64(ptr - 8) - if pwndbg.aglib.memory.peek(val) is not None: - continue - val = pwndbg.aglib.memory.u64(ptr) - if pwndbg.aglib.memory.peek(val) is None: - continue - node_cache_pad = kmem_cache_node_pad_sz(val) - if node_cache_pad is not None: - distance = i * 8 - break + if pwndbg.aglib.memory.is_kernel(val) and all( + pwndbg.aglib.memory.u32(val + i * 4) < 0x10000 for i in range(10) + ): + _distance = (i + 1) * 8 + val = pwndbg.aglib.memory.u64(kmem_cache + name_off + _distance) + node_cache_pad = kmem_cache_node_pad_sz(val) + if node_cache_pad is not None: + distance = _distance + break + assert distance, "can't find kmem_cache node" + if distance is None: + for i in range(3, 0x20): + ptr = kmem_cache + name_off + i * 8 + val = pwndbg.aglib.memory.u64(ptr - 8) + if pwndbg.aglib.memory.peek(val) is not None: + continue + val = pwndbg.aglib.memory.u64(ptr) + if pwndbg.aglib.memory.peek(val) is None: + continue + node_cache_pad = kmem_cache_node_pad_sz(val) + if node_cache_pad is not None: + distance = i * 8 + break assert distance, "can't find kmem_cache node" distance -= 0x18 # the name ptr + list_head configs = ( diff --git a/pwndbg/commands/slab.py b/pwndbg/commands/slab.py index 6f989e0a8..aa2ccd91e 100644 --- a/pwndbg/commands/slab.py +++ b/pwndbg/commands/slab.py @@ -98,7 
+98,40 @@ def slab( slab_contains(addr) -def print_slab(slab: Slab, indent, verbose: bool, cpu_freelist: Freelist = None) -> None: +def emphasize(s): + return pwndbg.color.underline(pwndbg.color.bold(pwndbg.color.red(s))) + + +def handle_next(curr: int, freelist: Freelist, indent): + next = freelist.find_next(curr) + if next == 0: + return "no next" + desc = f"next: {indent.aux_hex(next)}" + if not pwndbg.aglib.memory.is_kernel(next + freelist.offset): + desc = emphasize("invalid address") + " " + desc + elif freelist.cyclic is not None and freelist.cyclic == curr: + desc = emphasize("cyclic list detected") + ", " + desc + elif next not in freelist.slab: + desc = emphasize("next is not within the slab") + ", " + desc + elif not freelist.is_valid_obj(next): + desc = emphasize("unaligned or out-of-range") + " " + desc + return desc + + +def freelist_desc(freelist: Freelist, indent): + head = int(freelist) + desc = None + if head: + if not pwndbg.aglib.memory.is_kernel(head): + desc = "invalid address" + elif head not in freelist.slab: + desc = "not within the slab" + elif not freelist.is_valid_obj(head): + desc = "unaligned or out-of-range" + return indent.addr_hex(head) + (f" [{emphasize(desc)}]" if desc else "") + + +def print_slab(slab: Slab, indent, verbose: bool) -> None: indent.print( f"- {indent.prefix('Slab')} @ {indent.addr_hex(slab.virt_address)} [{indent.aux_hex(slab.slab_address)}]:" ) @@ -106,12 +139,12 @@ def print_slab(slab: Slab, indent, verbose: bool, cpu_freelist: Freelist = None) with indent: indent.print(f"{indent.prefix('In-Use')}: {slab.inuse}/{slab.object_count}") indent.print(f"{indent.prefix('Frozen')}: {slab.frozen}") - indent.print(f"{indent.prefix('Freelist')}: {indent.addr_hex(int(slab.freelist))}") + indent.print(f"{indent.prefix('Freelist')}: {freelist_desc(slab.freelist, indent)}") - idx = 0 + cpu_freelist = slab.cpu_cache.freelist if slab.is_active else None indexes = {} freelist = slab.freelist - for addr in freelist: + for idx, addr 
in enumerate(freelist): if addr in indexes: break indexes[addr] = idx @@ -127,28 +160,26 @@ def print_slab(slab: Slab, indent, verbose: bool, cpu_freelist: Freelist = None) free_objects = slab.free_objects for addr in slab.objects: prefix = f"- {indent.prefix('[0x--]')} {hex(addr)}" - if addr in indexes: - prefix = ( - f"- {indent.prefix(f'[0x{indexes[addr]:02}]')} {indent.addr_hex(addr)}" - ) if addr not in free_objects: indent.print(f"{prefix} (in-use)") continue - next_free = freelist.find_next(addr) - if next_free: - indent.print(f"{prefix} (next: {indent.aux_hex(next_free)})") + index = indexes[addr] + if addr in indexes: + prefix = f"- {indent.prefix(f'[0x{index:02x}]')} {indent.addr_hex(addr)}" + desc = None + in_cpu_freelist = False + if addr in freelist: + desc = handle_next(addr, freelist, indent) + elif cpu_freelist is not None and addr in cpu_freelist: + # need to traverse the list to catch potential freelist.cyclic + desc = handle_next(addr, cpu_freelist, indent) + in_cpu_freelist = True + if desc is None: + desc = "something went wrong" + if in_cpu_freelist: + indent.print(f"{prefix} ({desc}) [CPU cache]") continue - if cpu_freelist is not None: - next_free = cpu_freelist.find_next(addr) - if next_free: - indent.print( - f"{prefix} (next: {indent.aux_hex(next_free)}) [CPU cache]" - ) - continue - if addr in cpu_freelist: - indent.print(f"{prefix} (no next) [CPU cache]") - continue - indent.print(f"{prefix} (no next)") + indent.print(f"{prefix} ({desc})") def print_cpu_cache( @@ -159,12 +190,12 @@ def print_cpu_cache( ) with indent: if active: - indent.print(f"{indent.prefix('Freelist')}:", indent.addr_hex(int(cpu_cache.freelist))) + indent.print(f"{indent.prefix('Freelist')}:", freelist_desc(cpu_cache.freelist, indent)) active_slab = cpu_cache.active_slab if active_slab: indent.print(f"{indent.prefix('Active Slab')}:") with indent: - print_slab(active_slab, indent, verbose, cpu_cache.freelist) + print_slab(active_slab, indent, verbose) else: 
indent.print("Active Slab: (none)") @@ -291,7 +322,7 @@ def slab_contains(address: str) -> None: print(f"{addr:#x} @", M.hint(f"{slab_cache.name}")) slab = slab_cache.find_containing_slab(addr) if slab is None: - print(M.warn("Did not finding containing slab.")) + print(M.warn("Did not find containing slab.")) return desc = "[something went wrong]" inuse = desc @@ -300,12 +331,14 @@ def slab_contains(address: str) -> None: inuse = "free" elif addr in slab.objects: inuse = "in-use" - if slab.is_cpu and not slab.is_partial: - desc = f"[active, cpu {slab.cpu_cache.cpu}]" - elif slab.is_cpu and slab.is_partial: - desc = f"[partial, cpu {slab.cpu_cache.cpu}]" - elif not slab.is_cpu and slab.is_partial: - desc = f"[partial, node {slab.node_cache.node}]" + if slab.is_active: + if slab.is_cpu: + desc = f"[active, cpu {slab.cpu_cache.cpu}]" + else: + if slab.is_cpu: + desc = f"[partial, cpu {slab.cpu_cache.cpu}]" + else: + desc = f"[partial, node {slab.node_cache.node}]" except Exception: pass print("slab:", M.hint(f"{hex(slab.virt_address)}"), desc) diff --git a/tests/library/qemu_system/tests/test_commands_kernel.py b/tests/library/qemu_system/tests/test_commands_kernel.py index 9962a6893..bbf551800 100644 --- a/tests/library/qemu_system/tests/test_commands_kernel.py +++ b/tests/library/qemu_system/tests/test_commands_kernel.py @@ -100,7 +100,7 @@ def test_command_slab_info(): pwndbg.aglib.kernel.slab.load_slab_typeinfo() for cache in pwndbg.aglib.kernel.slab.caches(): cache_name = cache.name - res = gdb.execute(f"slab info -v {cache_name}", to_string=True) + res = gdb.execute(f"slab info {cache_name}", to_string=True) assert cache_name in res assert "Freelist" in res for cpu in range(pwndbg.aglib.kernel.nproc()):