Handling `kmem_cache` recovery edge case + improving SLUB corrupted list handling (#3311)

* gracefully handle the case when `char_ptr` points to a string that contains none utf-8 chars

* handling `struct kmem_cache` recovery edge case

* improved slub warning message handling

* added emphasize

* slab free obj addr bound checking

* refactored a bit

* linting

* added head of list error handling

* updated error message

* cleaning up

* cleaning up
pull/3319/head
jxuanli 2 months ago committed by GitHub
parent 962e11ef8a
commit c2c31fc01e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -10,7 +10,6 @@ import pwndbg.aglib.kernel.symbol
import pwndbg.aglib.memory
import pwndbg.aglib.symbol
import pwndbg.aglib.typeinfo
import pwndbg.color.message as M
from pwndbg.aglib import kernel
from pwndbg.aglib.kernel.macros import compound_head
from pwndbg.aglib.kernel.macros import for_each_entry
@ -82,61 +81,58 @@ def get_flags_list(flags: int) -> List[str]:
class Freelist:
def __init__(self, start_addr: int, offset: int, random: int = 0) -> None:
def __init__(self, start_addr: int, slab: Slab) -> None:
self.start_addr = start_addr
self.offset = offset
self.random = random
self.slab = slab
if not self.slab:
return
self.offset = slab.slab_cache.offset
self.random = slab.slab_cache.random
self.cyclic = None
def __iter__(self) -> Generator[int, None, None]:
if not self.slab:
return
seen: set[int] = set()
current_object = self.start_addr
while current_object:
try:
addr = int(current_object)
except Exception:
print(
M.warn(
f"Corrupted slab freelist detected at {hex(current_object)} when length is {len(seen)}"
)
)
curr = None
next = self.start_addr
while next:
if next in seen:
self.cyclic = curr
return
if not pwndbg.aglib.memory.is_kernel(next + self.offset):
break
yield current_object
current_object = pwndbg.aglib.memory.read_pointer_width(addr + self.offset)
if self.random:
current_object ^= self.random ^ swab(addr + self.offset)
if addr in seen:
# this can happen during exploit dev
print(
M.warn(
f"Cyclic slab freelist detected at {hex(addr)} when length is {len(seen)}"
)
)
if next not in self.slab or not self.is_valid_obj(next):
break
seen.add(addr)
curr = next
next = self.find_next(curr)
yield curr
seen.add(curr)
# reaching here means the freelist is not cyclic (prior to detections of other corruptions)
self.cyclic = None
def __int__(self) -> int:
return self.start_addr
def __len__(self) -> int:
seen: set[int] = set()
for addr in self:
if addr in seen:
# this can happen during exploit dev
print(
M.warn(
f"Cyclic slab freelist detected at {hex(addr)} when length is {len(seen)}"
)
)
break
seen.add(addr)
return len(seen)
return sum(1 for _ in self)
def find_next(self, addr: int) -> int:
freelist_iter = iter(self)
for obj in freelist_iter:
if obj == addr:
return next(freelist_iter, 0)
return 0
# assumes addr is in this freelist -> assert(addr in self)
# caller should assert this behaviour to avoid traversing the list unnecessarily
if not self.slab:
raise ValueError("slab freelist must belong to a slab")
next = pwndbg.aglib.memory.read_pointer_width(addr + self.offset)
if self.random:
next ^= self.random ^ swab(addr + self.offset)
return next
def is_valid_obj(self, addr):
if not self.slab:
return
diff = addr - self.slab.virt_address
sz = self.slab.slab_cache.size
return diff % sz == 0 and 0 <= (diff // sz) < self.slab.object_count
class SlabCache:
@ -289,11 +285,7 @@ class CpuCache:
@property
def freelist(self) -> Freelist:
return Freelist(
int(self._cpu_cache["freelist"]),
self.slab_cache.offset,
self.slab_cache.random,
)
return Freelist(int(self._cpu_cache["freelist"]), self.active_slab)
@property
def active_slab(self) -> Slab | None:
@ -301,7 +293,7 @@ class CpuCache:
_slab = self._cpu_cache[slab_key]
if not int(_slab):
return None
return Slab(_slab.dereference(), self, None)
return Slab(_slab.dereference(), cpu_cache=self, is_active=True)
@property
def partial_slabs(self) -> List[Slab]:
@ -312,7 +304,7 @@ class CpuCache:
cur_slab_int = int(cur_slab)
while cur_slab_int:
_slab = cur_slab.dereference()
partial_slabs.append(Slab(_slab, self, None, is_partial=True))
partial_slabs.append(Slab(_slab, cpu_cache=self))
cur_slab = _slab["next"]
cur_slab_int = int(cur_slab)
return partial_slabs
@ -334,7 +326,7 @@ class NodeCache:
for slab in for_each_entry(
self._node_cache["partial"], f"struct {slab_struct_type()}", "slab_list"
):
ret.append(Slab(slab.dereference(), None, self, is_partial=True))
ret.append(Slab(slab.dereference(), node_cache=self))
return ret
@property
@ -350,21 +342,19 @@ class Slab:
def __init__(
self,
slab: pwndbg.dbg_mod.Value,
cpu_cache: CpuCache | None,
node_cache: NodeCache | None,
is_partial: bool = False,
cpu_cache: CpuCache = None,
node_cache: NodeCache = None,
is_active: bool = False,
) -> None:
self._slab = slab
self.cpu_cache = cpu_cache
self.node_cache = node_cache
self.is_partial = is_partial
self.is_cpu = False
self.slab_cache = None
self.is_active = is_active
if cpu_cache is not None:
self.cpu_cache = cpu_cache
self.is_cpu = True
self.slab_cache = cpu_cache.slab_cache
assert node_cache is None
if node_cache is not None:
elif node_cache is not None:
self.node_cache = node_cache
self.is_cpu = False
self.slab_cache = node_cache.slab_cache
@property
@ -372,6 +362,7 @@ class Slab:
return int(self._slab.address)
@property
@pwndbg.lib.cache.cache_until("stop")
def virt_address(self) -> int:
return kernel.page_to_virt(self.slab_address)
@ -393,8 +384,8 @@ class Slab:
@property
def inuse(self) -> int:
inuse = int(self._slab["inuse"])
if not self.is_partial:
# I believe only the cpu freelist is considered "inuse" similar to glibc's tcache
if self.is_active:
# only the cpu freelist is considered "inuse" similar to glibc's tcache
inuse -= len(self.cpu_cache.freelist)
return inuse
@ -404,7 +395,7 @@ class Slab:
@property
def pobjects(self) -> int:
if not self.is_partial:
if self.is_active:
return 0
if self._slab.type.has_field("pobjects"):
return int(self._slab["pobjects"])
@ -416,22 +407,17 @@ class Slab:
@property
def freelist(self) -> Freelist:
return Freelist(
int(self._slab["freelist"]),
self.slab_cache.offset,
self.slab_cache.random,
)
@property
def freelists(self) -> List[Freelist]:
freelists = [self.freelist]
if not self.is_partial:
freelists.append(self.cpu_cache.freelist)
return freelists
return Freelist(int(self._slab["freelist"]), self)
@property
def free_objects(self) -> Set[int]:
return {obj for freelist in self.freelists for obj in freelist}
result = set()
for obj in self.freelist:
result.add(obj)
if self.is_active and self.cpu_cache.freelist:
for obj in self.cpu_cache.freelist:
result.add(obj)
return result
def __contains__(self, addr: int):
return self.virt_address <= addr < self.virt_address + self.slab_cache.slab_size
@ -484,6 +470,7 @@ def kmem_cache_pad_sz(kconfig) -> Tuple[int, int]:
name_off = i * 8
break
assert name_off, "can't determine kmem_cache name offset"
distance, node_cache_pad = None, None
if pwndbg.aglib.kernel.krelease() >= (6, 2) and all(
config not in kconfig
for config in (
@ -502,31 +489,36 @@ def kmem_cache_pad_sz(kconfig) -> Tuple[int, int]:
node_cache_pad = kmem_cache_node_pad_sz(
kmem_cache + name_off + 0x8 * 3
) # name ptr + 2 list ptrs
assert node_cache_pad, "can't determine kmem cache node padding size"
assert node_cache_pad, "can't find kmem_cache node"
distance = 8 if "CONFIG_SLAB_FREELIST_RANDOM" in kconfig else 0
return distance, node_cache_pad
elif "CONFIG_SLAB_FREELIST_RANDOM" in kconfig:
for i in range(3, 0x20):
ptr = kmem_cache + name_off + i * 8
val = pwndbg.aglib.memory.u64(ptr)
if pwndbg.aglib.memory.is_kernel(val):
distance = (i + 1) * 8
node_cache_pad = kmem_cache_node_pad_sz(kmem_cache + name_off + distance)
assert node_cache_pad, "can't determine kmem cache node padding size"
return distance, node_cache_pad
distance, node_cache_pad = None, None
for i in range(3, 0x20):
ptr = kmem_cache + name_off + i * 8
val = pwndbg.aglib.memory.u64(ptr - 8)
if pwndbg.aglib.memory.peek(val) is not None:
continue
val = pwndbg.aglib.memory.u64(ptr)
if pwndbg.aglib.memory.peek(val) is None:
continue
node_cache_pad = kmem_cache_node_pad_sz(val)
if node_cache_pad is not None:
distance = i * 8
break
if pwndbg.aglib.memory.is_kernel(val) and all(
pwndbg.aglib.memory.u32(val + i * 4) < 0x10000 for i in range(10)
):
_distance = (i + 1) * 8
val = pwndbg.aglib.memory.u64(kmem_cache + name_off + _distance)
node_cache_pad = kmem_cache_node_pad_sz(val)
if node_cache_pad is not None:
distance = _distance
break
assert distance, "can't find kmem_cache node"
if distance is None:
for i in range(3, 0x20):
ptr = kmem_cache + name_off + i * 8
val = pwndbg.aglib.memory.u64(ptr - 8)
if pwndbg.aglib.memory.peek(val) is not None:
continue
val = pwndbg.aglib.memory.u64(ptr)
if pwndbg.aglib.memory.peek(val) is None:
continue
node_cache_pad = kmem_cache_node_pad_sz(val)
if node_cache_pad is not None:
distance = i * 8
break
assert distance, "can't find kmem_cache node"
distance -= 0x18 # the name ptr + list_head
configs = (

@ -98,7 +98,40 @@ def slab(
slab_contains(addr)
def print_slab(slab: Slab, indent, verbose: bool, cpu_freelist: Freelist = None) -> None:
def emphasize(s):
return pwndbg.color.underline(pwndbg.color.bold(pwndbg.color.red(s)))
def handle_next(curr: int, freelist: Freelist, indent):
next = freelist.find_next(curr)
if next == 0:
return "no next"
desc = f"next: {indent.aux_hex(next)}"
if not pwndbg.aglib.memory.is_kernel(next + freelist.offset):
desc = emphasize("invalid address") + " " + desc
elif freelist.cyclic is not None and freelist.cyclic == curr:
desc = emphasize("cyclic list detected") + ", " + desc
elif next not in freelist.slab:
desc = emphasize("next is not within the slab") + ", " + desc
elif not freelist.is_valid_obj(next):
desc = emphasize("unaligned or out-of-range") + " " + desc
return desc
def freelist_desc(freelist: Freelist, indent):
head = int(freelist)
desc = None
if head:
if not pwndbg.aglib.memory.is_kernel(head):
desc = "invalid address"
elif head not in freelist.slab:
desc = "not within the slab"
elif not freelist.is_valid_obj(head):
desc = "unaligned or out-of-range"
return indent.addr_hex(head) + (f" [{emphasize(desc)}]" if desc else "")
def print_slab(slab: Slab, indent, verbose: bool) -> None:
indent.print(
f"- {indent.prefix('Slab')} @ {indent.addr_hex(slab.virt_address)} [{indent.aux_hex(slab.slab_address)}]:"
)
@ -106,12 +139,12 @@ def print_slab(slab: Slab, indent, verbose: bool, cpu_freelist: Freelist = None)
with indent:
indent.print(f"{indent.prefix('In-Use')}: {slab.inuse}/{slab.object_count}")
indent.print(f"{indent.prefix('Frozen')}: {slab.frozen}")
indent.print(f"{indent.prefix('Freelist')}: {indent.addr_hex(int(slab.freelist))}")
indent.print(f"{indent.prefix('Freelist')}: {freelist_desc(slab.freelist, indent)}")
idx = 0
cpu_freelist = slab.cpu_cache.freelist if slab.is_active else None
indexes = {}
freelist = slab.freelist
for addr in freelist:
for idx, addr in enumerate(freelist):
if addr in indexes:
break
indexes[addr] = idx
@ -127,28 +160,26 @@ def print_slab(slab: Slab, indent, verbose: bool, cpu_freelist: Freelist = None)
free_objects = slab.free_objects
for addr in slab.objects:
prefix = f"- {indent.prefix('[0x--]')} {hex(addr)}"
if addr in indexes:
prefix = (
f"- {indent.prefix(f'[0x{indexes[addr]:02}]')} {indent.addr_hex(addr)}"
)
if addr not in free_objects:
indent.print(f"{prefix} (in-use)")
continue
next_free = freelist.find_next(addr)
if next_free:
indent.print(f"{prefix} (next: {indent.aux_hex(next_free)})")
index = indexes[addr]
if addr in indexes:
prefix = f"- {indent.prefix(f'[0x{index:02x}]')} {indent.addr_hex(addr)}"
desc = None
in_cpu_freelist = False
if addr in freelist:
desc = handle_next(addr, freelist, indent)
elif cpu_freelist is not None and addr in cpu_freelist:
# need to traverse the list to catch potential freelist.cyclic
desc = handle_next(addr, cpu_freelist, indent)
in_cpu_freelist = True
if desc is None:
desc = "something went wrong"
if in_cpu_freelist:
indent.print(f"{prefix} ({desc}) [CPU cache]")
continue
if cpu_freelist is not None:
next_free = cpu_freelist.find_next(addr)
if next_free:
indent.print(
f"{prefix} (next: {indent.aux_hex(next_free)}) [CPU cache]"
)
continue
if addr in cpu_freelist:
indent.print(f"{prefix} (no next) [CPU cache]")
continue
indent.print(f"{prefix} (no next)")
indent.print(f"{prefix} ({desc})")
def print_cpu_cache(
@ -159,12 +190,12 @@ def print_cpu_cache(
)
with indent:
if active:
indent.print(f"{indent.prefix('Freelist')}:", indent.addr_hex(int(cpu_cache.freelist)))
indent.print(f"{indent.prefix('Freelist')}:", freelist_desc(cpu_cache.freelist, indent))
active_slab = cpu_cache.active_slab
if active_slab:
indent.print(f"{indent.prefix('Active Slab')}:")
with indent:
print_slab(active_slab, indent, verbose, cpu_cache.freelist)
print_slab(active_slab, indent, verbose)
else:
indent.print("Active Slab: (none)")
@ -291,7 +322,7 @@ def slab_contains(address: str) -> None:
print(f"{addr:#x} @", M.hint(f"{slab_cache.name}"))
slab = slab_cache.find_containing_slab(addr)
if slab is None:
print(M.warn("Did not finding containing slab."))
print(M.warn("Did not find containing slab."))
return
desc = "[something went wrong]"
inuse = desc
@ -300,12 +331,14 @@ def slab_contains(address: str) -> None:
inuse = "free"
elif addr in slab.objects:
inuse = "in-use"
if slab.is_cpu and not slab.is_partial:
desc = f"[active, cpu {slab.cpu_cache.cpu}]"
elif slab.is_cpu and slab.is_partial:
desc = f"[partial, cpu {slab.cpu_cache.cpu}]"
elif not slab.is_cpu and slab.is_partial:
desc = f"[partial, node {slab.node_cache.node}]"
if slab.is_active:
if slab.is_cpu:
desc = f"[active, cpu {slab.cpu_cache.cpu}]"
else:
if slab.is_cpu:
desc = f"[partial, cpu {slab.cpu_cache.cpu}]"
else:
desc = f"[partial, node {slab.node_cache.node}]"
except Exception:
pass
print("slab:", M.hint(f"{hex(slab.virt_address)}"), desc)

@ -100,7 +100,7 @@ def test_command_slab_info():
pwndbg.aglib.kernel.slab.load_slab_typeinfo()
for cache in pwndbg.aglib.kernel.slab.caches():
cache_name = cache.name
res = gdb.execute(f"slab info -v {cache_name}", to_string=True)
res = gdb.execute(f"slab info {cache_name}", to_string=True)
assert cache_name in res
assert "Freelist" in res
for cpu in range(pwndbg.aglib.kernel.nproc()):

Loading…
Cancel
Save