Improving `slab` commands (#2988)

* added/modified registers for kernel pwning

* added full buddy allocator debugging support and abstracted indent context

* fixed slub info/contains bugs + improved slub print handling

* improved slab info outputs

* debugging support when hardened usercopy is enabled

* fixed slab cyclic freelist infinite loop issue

* fixed inuse for `slab list`

* supporting filter by numa node

* fixed test
pull/2989/head^2
jxuanli 7 months ago committed by GitHub
parent d0c9f690ca
commit b2465743de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -8,6 +8,7 @@ import pwndbg
import pwndbg.aglib.memory
import pwndbg.aglib.symbol
import pwndbg.aglib.typeinfo
import pwndbg.color.message as M
from pwndbg.aglib import kernel
from pwndbg.aglib.kernel.macros import compound_head
from pwndbg.aglib.kernel.macros import for_each_entry
@ -87,6 +88,7 @@ class Freelist:
self.random = random
def __iter__(self) -> Generator[int, None, None]:
seen: set[int] = set()
current_object = self.start_addr
while current_object:
addr = int(current_object)
@ -94,12 +96,32 @@ class Freelist:
current_object = pwndbg.aglib.memory.pvoid(addr + self.offset)
if self.random:
current_object ^= self.random ^ swab(addr + self.offset)
if addr in seen:
# this can happen during exploit dev
print(
M.warn(
f"Cyclic slab freelist detected at {hex(addr)} when length is {len(seen)}"
)
)
break
seen.add(addr)
def __int__(self) -> int:
return self.start_addr
def __len__(self) -> int:
return sum(1 for _ in self)
seen: set[int] = set()
for addr in self:
if addr in seen:
# this can happen during exploit dev
print(
M.warn(
f"Cyclic slab freelist detected at {hex(addr)} when length is {len(seen)}"
)
)
break
seen.add(addr)
return len(seen)
def find_next(self, addr: int) -> int:
freelist_iter = iter(self)
@ -177,9 +199,44 @@ class SlabCache:
def cpu_partial(self) -> int:
return int(self._slab_cache["cpu_partial"])
@property
def cpu_partial_slabs(self) -> int:
if self._slab_cache.dereference().type.has_field("cpu_partial_slabs"):
return int(self._slab_cache["cpu_partial_slabs"])
return None
@property
def min_partial(self) -> int:
return int(self._slab_cache["min_partial"])
@property
def inuse(self) -> int:
return int(self._slab_cache["inuse"])
# somewhat mirrors libslub's implementation
# looks for per_cpu active lists and per_cpu and node partial lists
# no good way to track full slabs unless CONFIG_SLUB_DEBUG is enabled
# which is typically not from what I have seen
cnt = 0
for cpu_cache in self.cpu_caches:
if cpu_cache.active_slab is not None:
cnt += cpu_cache.active_slab.inuse
for partial_slab in cpu_cache.partial_slabs:
cnt += partial_slab.inuse
for node_cache in self.node_caches:
for partial_slab in node_cache.partial_slabs:
cnt += partial_slab.inuse
return cnt
@property
def useroffset(self) -> int:
if not self._slab_cache.dereference().type.has_field("useroffset"):
return None
return int(self._slab_cache["useroffset"])
@property
def usersize(self) -> int:
if not self._slab_cache.dereference().type.has_field("usersize"):
return None
return int(self._slab_cache["usersize"])
@property
def __oo_x(self) -> int:
@ -252,6 +309,14 @@ class NodeCache:
ret.append(Slab(slab.dereference(), None, self.slab_cache, is_partial=True))
return ret
@property
def nr_partial(self) -> int:
return int(self._node_cache["nr_partial"])
@property
def min_partial(self) -> int:
return self.slab_cache.min_partial
class Slab:
def __init__(

@ -12,11 +12,13 @@ import sys
from tabulate import tabulate
import pwndbg
import pwndbg.aglib.kernel.slab
import pwndbg.color as C
import pwndbg.aglib.memory
import pwndbg.color.message as M
import pwndbg.commands
from pwndbg.aglib.kernel.slab import CpuCache
from pwndbg.aglib.kernel.slab import Freelist
from pwndbg.aglib.kernel.slab import NodeCache
from pwndbg.aglib.kernel.slab import Slab
from pwndbg.aglib.kernel.slab import find_containing_slab_cache
@ -37,14 +39,22 @@ parser_list.add_argument(
"filter_",
metavar="filter",
type=str,
default=None,
nargs="?",
help="Only show caches that contain the given filter string",
)
# TODO: --cpu, --node, --partial, --active
parser_info = subparsers.add_parser("info", prog="slab info")
parser_info.add_argument("names", metavar="name", type=str, nargs="+", help="")
parser_info.add_argument("-v", "--verbose", action="store_true", help="")
parser_info.add_argument("-c", "--cpu", type=int, default=False, help="CPU to display")
parser_info.add_argument("-n", "--node", type=int, help="")
parser_info.add_argument(
"-p", "--partial-only", action="store_true", help="only displays partial lists"
)
parser_info.add_argument(
"-a", "--active-only", action="store_true", help="only displays the active list"
)
parser_contains = subparsers.add_parser("contains", prog="slab contains")
parser_contains.add_argument("addresses", metavar="addr", type=str, nargs="+", help="")
@ -54,73 +64,121 @@ parser_contains.add_argument("addresses", metavar="addr", type=str, nargs="+", h
@pwndbg.commands.OnlyWhenQemuKernel
@pwndbg.commands.OnlyWithKernelDebugSyms
@pwndbg.commands.OnlyWhenPagingEnabled
def slab(command, filter_=None, names=None, verbose=False, addresses=None) -> None:
def slab(
command,
filter_=None,
names=None,
verbose=False,
addresses=None,
cpu=None,
node=None,
partial_only=False,
active_only=False,
) -> None:
if command == "list":
slab_list(filter_)
elif command == "info":
partial, active = True, True
if partial_only and active_only:
print(M.warn("partial_only and active_only are both specified"))
return
if partial_only:
active = False
if active_only:
partial = False
for name in names:
slab_info(name, verbose)
slab_info(name, verbose, cpu, node, active, partial)
elif command == "contains":
for addr in addresses:
slab_contains(addr)
def print_slab(slab: Slab, indent, verbose: bool) -> None:
def print_slab(slab: Slab, indent, verbose: bool, freelist: Freelist = None) -> None:
indent.print(
f"- {C.green('Slab')} @ {indent.addr_hex(slab.virt_address)} [{indent.aux_hex(slab.slab_address)}]:"
f"- {indent.prefix('Slab')} @ {indent.addr_hex(slab.virt_address)} [{indent.aux_hex(slab.slab_address)}]:"
)
with indent:
indent.print(f"{C.blue('In-Use')}: {slab.inuse}/{slab.object_count}")
indent.print(f"{C.blue('Frozen')}: {slab.frozen}")
indent.print(f"{C.blue('Freelist')}: {indent.addr_hex(int(slab.freelist))}")
indent.print(f"{indent.prefix('In-Use')}: {slab.inuse}/{slab.object_count}")
indent.print(f"{indent.prefix('Frozen')}: {slab.frozen}")
indent.print(f"{indent.prefix('Freelist')}: {indent.addr_hex(int(slab.freelist))}")
idx = 0
indexes = {}
if freelist is None:
freelist = slab.freelist
for addr in freelist:
if addr in indexes:
break
indexes[addr] = idx
idx += 1
if verbose:
with indent:
free_objects = slab.free_objects
for addr in slab.objects:
index = "0x--"
if addr in indexes:
index = f"0x{indexes[addr]:02}"
prefix = f"- {indent.prefix(f'[{index}]')} {indent.addr_hex(addr)}"
if addr not in free_objects:
indent.print(f"- {addr:#x} (in-use)")
indent.print(f"{prefix} (in-use)")
continue
for freelist in slab.freelists:
next_free = freelist.find_next(addr)
if next_free:
indent.print(f"- {indent.addr_hex(addr)} (next: {next_free:#x})")
break
next_free = freelist.find_next(addr)
if next_free:
indent.print(f"{prefix} (next: {indent.aux_hex(next_free)})")
else:
indent.print(f"- {indent.addr_hex(addr)} (no next)")
indent.print(f"{prefix} (no next)")
def print_cpu_cache(cpu_cache: CpuCache, verbose: bool, indent) -> None:
def print_cpu_cache(
cpu_cache: CpuCache, verbose: bool, active: bool, partial: bool, indent
) -> None:
indent.print(
f"{C.green('kmem_cache_cpu')} @ {indent.addr_hex(cpu_cache.address)} [CPU {cpu_cache.cpu}]:"
f"{indent.prefix('kmem_cache_cpu')} @ {indent.addr_hex(cpu_cache.address)} [CPU {cpu_cache.cpu}]:"
)
with indent:
indent.print(f"{C.blue('Freelist')}:", indent.addr_hex(int(cpu_cache.freelist)))
active_slab = cpu_cache.active_slab
if active_slab:
indent.print(f"{C.green('Active Slab')}:")
with indent:
print_slab(active_slab, indent, verbose)
else:
indent.print("Active Slab: (none)")
if active:
indent.print(f"{indent.prefix('Freelist')}:", indent.addr_hex(int(cpu_cache.freelist)))
active_slab = cpu_cache.active_slab
if active_slab:
indent.print(f"{indent.prefix('Active Slab')}:")
with indent:
print_slab(active_slab, indent, verbose, cpu_cache.freelist)
else:
indent.print("Active Slab: (none)")
if not partial:
return
partial_slabs = cpu_cache.partial_slabs
if not partial_slabs:
indent.print("Partial Slabs: (none)")
return
slabs = partial_slabs[0].slabs
pobjects = partial_slabs[0].pobjects
cpu_partial = partial_slabs[0].slab_cache.cpu_partial
indent.print(f"{C.green('Partial Slabs')} [{slabs}] [PO: ~{pobjects}/{cpu_partial}]:")
for partial_slab in partial_slabs:
print_slab(partial_slab, indent, verbose)
# the kernel checks cpu_partial_slabs to determine whether partial slabs are to be flushed
# see: https://elixir.bootlin.com/linux/v6.13/source/mm/slub.c#L3209
cpu_partial_slabs = partial_slabs[0].slab_cache.cpu_partial_slabs
if cpu_partial_slabs is None:
# legacy
cpu_partial_slabs = partial_slabs[0].pobjects
indent.print(
f"{indent.prefix('Partial Slabs')} [nr_slabs/cpu_partial_slabs: {indent.aux_hex(slabs)}/{indent.aux_hex(cpu_partial_slabs)}]"
)
with indent:
for partial_slab in partial_slabs:
print_slab(partial_slab, indent, verbose)
def print_node_cache(node_cache: NodeCache, verbose: bool, indent) -> None:
address, nr_partial, min_partial, node = (
node_cache.address,
node_cache.nr_partial,
node_cache.min_partial,
node_cache.node,
)
# https://elixir.bootlin.com/linux/v6.13/source/mm/slub.c#L3140
indent.print(
f"{C.green('kmem_cache_node')} @ {indent.addr_hex(node_cache.address)} [NUMA node {node_cache.node}]:"
f"{indent.prefix('kmem_cache_node')} @ {indent.addr_hex(address)} [NUMA node {node}, nr_partial/min_partial: {indent.aux_hex(nr_partial)}/{indent.aux_hex(min_partial)}]:"
)
with indent:
partial_slabs = node_cache.partial_slabs
@ -128,12 +186,15 @@ def print_node_cache(node_cache: NodeCache, verbose: bool, indent) -> None:
indent.print("Partial Slabs: (none)")
return
indent.print(f"{C.green('Partial Slabs')}:")
for slab in partial_slabs:
print_slab(slab, indent, verbose)
indent.print(
f"{indent.prefix('Partial Slabs')} [nr_partial: {indent.aux_hex(len(partial_slabs))}]"
)
with indent:
for slab in partial_slabs:
print_slab(slab, indent, verbose)
def slab_info(name: str, verbose: bool) -> None:
def slab_info(name: str, verbose: bool, cpu: int, node: int, active: bool, partial: bool) -> None:
slab_cache = pwndbg.aglib.kernel.slab.get_cache(name)
if slab_cache is None:
@ -142,24 +203,40 @@ def slab_info(name: str, verbose: bool) -> None:
indent = IndentContextManager()
indent.print(f"{C.green('Slab Cache')} @ {indent.addr_hex(slab_cache.address)}")
indent.print(f"{indent.prefix('Slab Cache')} @ {indent.addr_hex(slab_cache.address)}")
with indent:
indent.print(f"{C.blue('Name')}: {slab_cache.name}")
indent.print(f"{indent.prefix('Name')}: {slab_cache.name}")
flags_list = slab_cache.flags
if flags_list:
indent.print(f"{C.blue('Flags')}: {' | '.join(flags_list)}")
indent.print(f"{indent.prefix('Flags')}: {' | '.join(flags_list)}")
else:
indent.print(f"{C.blue('Flags')}: (none)")
indent.print(f"{C.blue('Offset')}: {slab_cache.offset}")
indent.print(f"{C.blue('Size')}: {slab_cache.size}")
indent.print(f"{C.blue('Align')}: {slab_cache.align}")
indent.print(f"{C.blue('Object Size')}: {slab_cache.object_size}")
indent.print(f"{indent.prefix('Flags')}: (none)")
indent.print(f"{indent.prefix('Offset')}: {indent.aux_hex(slab_cache.offset)}")
indent.print(
f"{indent.prefix('Slab size')}: {indent.aux_hex(0x1000 << slab_cache.oo_order)}"
)
indent.print(
f"{indent.prefix('Size (without metadata)')}: {indent.aux_hex(slab_cache.size)}"
)
indent.print(f"{indent.prefix('Align')}: {indent.aux_hex(slab_cache.align)}")
indent.print(f"{indent.prefix('Object Size')}: {indent.aux_hex(slab_cache.object_size)}")
useroffset, usersize = slab_cache.useroffset, slab_cache.useroffset
if useroffset is not None and usersize is not None:
indent.print(f"{indent.prefix('Usercopy region offset')}: {useroffset}")
indent.print(f"{indent.prefix('Usercopy region size')}: {usersize}")
for cpu_cache in slab_cache.cpu_caches:
print_cpu_cache(cpu_cache, verbose, indent)
if cpu_cache.cpu is not None and cpu_cache.cpu != cpu:
continue
print_cpu_cache(cpu_cache, verbose, active, partial, indent)
if not partial:
return
for node_cache in slab_cache.node_caches:
if node is not None and node != node_cache.node:
continue
print_node_cache(node_cache, verbose, indent)
@ -190,6 +267,9 @@ def slab_contains(address: str) -> None:
print(M.error(f"Message: {e}"))
return
addr = int(parsed_addr)
slab_cache = find_containing_slab_cache(addr)
print(f"{addr:#x} @", M.hint(f"{slab_cache.name}"))
addr = int(pwndbg.aglib.memory.get_typed_pointer("void", parsed_addr))
try:
slab_cache = find_containing_slab_cache(addr)
print(f"{addr:#x} @", M.hint(f"{slab_cache.name}"))
except Exception:
print(M.warn("address does not belong to a SLUB cache"))

@ -96,7 +96,9 @@ def get_slab_object_address():
for cache in caches:
cache_name = cache.name
info = gdb.execute(f"slab info -v {cache_name}", to_string=True)
matches = re.findall(r"- (0x[0-9a-fA-F]+)", info)
ansi_escape = re.compile(r"\x1b\[[0-9;]*m")
info = ansi_escape.sub("", info)
matches = re.findall(r"- \[0x[0-9a-fA-F\-]{2}\] (0x[0-9a-fA-F]+)", info)
if len(matches) > 0:
return (matches[0], cache_name)
raise ValueError("Could not find any slab objects")

Loading…
Cancel
Save