From c07d843d686d90844b03eabc4c29e64019f2bcd1 Mon Sep 17 00:00:00 2001 From: jxuanli <65455765+jxuanli@users.noreply.github.com> Date: Tue, 20 May 2025 03:22:21 -0700 Subject: [PATCH] Adding full buddy allocator support when debugging x86-64 linux kernels (#2980) * added/modified registers for kernel pwning * added a RegisterContext class for more complex register context handling * cleaned up register context selection and flag bits * further cleaned up register context selection * fixing None deref issue * handling NoneType registers * linting * removed most of the extra register classes * fully removed extra register classes in commands/context.py * renamed var so that the linter doesn't confuse the var name with dataclass type name * some comments on newly added classes * fixed issues based on suggestions * fixed issues when debug symbols are not present in x64 kernel * added full buddy allocator debugging support and abstracted indent context * added options for pcplist * added dynamic arg checking and implemented __len__ for GDBValue * added new ParsedBuddyArgs class and THBs support and improved overall handling * handling function params using a class to cleanly pass values around such that can find free pages * added help info * added comments for newly added classes * changed cmd name and added test * added reference and linting * added docs * fixed typo * fixed quotes * supporting filter by numa node index * actually filtering by node index --- docs/commands/index.md | 2 +- docs/commands/kernel/buddydump.md | 33 ++ docs/commands/kernel/pcplist.md | 29 -- pwndbg/aglib/kernel/__init__.py | 57 ++- pwndbg/commands/__init__.py | 2 +- pwndbg/commands/buddydump.py | 347 ++++++++++++++++++ pwndbg/commands/pcplist.py | 62 ---- pwndbg/commands/slab.py | 52 +-- pwndbg/dbg/__init__.py | 6 + pwndbg/dbg/gdb/__init__.py | 9 + pwndbg/lib/exception.py | 38 ++ .../tests/system/test_commands_kernel.py | 34 +- 12 files changed, 523 insertions(+), 148 deletions(-) create mode 100644 docs/commands/kernel/buddydump.md delete mode 100644 docs/commands/kernel/pcplist.md create mode 100644 pwndbg/commands/buddydump.py delete mode 100644 pwndbg/commands/pcplist.py create mode 100644 pwndbg/lib/exception.py diff --git a/docs/commands/index.md b/docs/commands/index.md index 5beb3966b..9f7c43146 100644 --- a/docs/commands/index.md +++ b/docs/commands/index.md @@ -70,6 +70,7 @@ ## Kernel - [binder](kernel/binder.md) - Show Android Binder information +- [buddydump](kernel/buddydump.md) - Displays metadata and freelists of the buddy allocator. - [kbase](kernel/kbase.md) - Finds the kernel virtual base address. - [kchecksec](kernel/kchecksec.md) - Checks for kernel hardening configuration options. - [kcmdline](kernel/kcmdline.md) - Return the kernel commandline (/proc/cmdline). @@ -85,7 +86,6 @@ - [knft-list-tables](kernel/knft-list-tables.md) - Dump netfliter tables from a specific network namespace - [kversion](kernel/kversion.md) - Outputs the kernel version (/proc/version). - [msr](kernel/msr.md) - Read or write to Model Specific Register (MSR) -- [pcplist](kernel/pcplist.md) - Print Per-CPU page list - [slab](kernel/slab.md) - Prints information about the slab allocator ## Linux/libc/ELF diff --git a/docs/commands/kernel/buddydump.md b/docs/commands/kernel/buddydump.md new file mode 100644 index 000000000..fd91ea4a8 --- /dev/null +++ b/docs/commands/kernel/buddydump.md @@ -0,0 +1,33 @@ + + + + + +# buddydump + + +```text +usage: buddydump [-h] [-z {DMA,DMA32,Normal,HighMem,Movable,Device}] + [-o ORDER] + [-m {Unmovable,Movable,Reclaimable,HighAtomic,CMA,Isolate}] + [-p] [-c CPU] [-n NODE] [-f FIND] + +``` + +Displays metadata and freelists of the buddy allocator. +### Optional arguments + +|Short|Long|Help| +| :--- | :--- | :--- | +|-h|--help|show this help message and exit| +|-z|--zone|Displays/searches lists only in the specified zone.| +|-o|--order|Displays/searches lists only with the specified order.| +|-m|--mtype|Displays/searches lists only with the specified mtype.| +|-p|--pcp-only|Displays/searches only PCP lists.| +|-c|--cpu|CPU nr for searching PCP.| +|-n|--node| (default: 0)| +|-f|--find|The address to find in page free lists.| + + + + diff --git a/docs/commands/kernel/pcplist.md b/docs/commands/kernel/pcplist.md deleted file mode 100644 index ec04f2bd3..000000000 --- a/docs/commands/kernel/pcplist.md +++ /dev/null @@ -1,29 +0,0 @@ - - - - - -# pcplist - - -```text -usage: pcplist [-h] [zone] - -``` - -Print Per-CPU page list -### Positional arguments - -|Positional Argument|Help| -| :--- | :--- | -|zone|| - -### Optional arguments - -|Short|Long|Help| -| :--- | :--- | :--- | -|-h|--help|show this help message and exit| - - - - diff --git a/pwndbg/aglib/kernel/__init__.py b/pwndbg/aglib/kernel/__init__.py index f2299075f..4d7ac0f85 100644 --- a/pwndbg/aglib/kernel/__init__.py +++ b/pwndbg/aglib/kernel/__init__.py @@ -92,6 +92,21 @@ def nproc() -> int: return val +@requires_debug_syms(default=12) +def npcplist() -> int: + """returns NR_PCP_LISTS (https://elixir.bootlin.com/linux/v6.13/source/include/linux/mmzone.h#L671)""" + node_data = pwndbg.aglib.symbol.lookup_symbol("node_data") + zone = node_data.dereference()[0]["node_zones"][0] + # index 0 should always exist + if zone.type.has_field("per_cpu_pageset"): + lists = zone["per_cpu_pageset"]["lists"] + return len(lists) + if zone.type.has_field("pageset"): + lists = zone["pageset"]["pcp"]["lists"] + return len(lists) + return 0 + + def get_first_kernel_ro() -> pwndbg.lib.memory.Page | None: """Returns the first kernel mapping which contains the linux_banner""" base = kbase() @@ -274,6 +289,10 @@ class ArchOps(ABC): def page_to_pfn(self, page: int) -> int: raise NotImplementedError() + @property + def page_offset(self) -> int: + raise NotImplementedError() + def virt_to_pfn(self, virt: int) -> int: return phys_to_pfn(virt_to_phys(virt)) @@ -286,6 +305,9 @@ class ArchOps(ABC): def page_to_phys(self, page: int) -> int: return pfn_to_phys(page_to_pfn(page)) + def page_to_physmap(self, page: int) -> int: + return page_to_phys(page) + self.page_offset + def virt_to_page(self, virt: int) -> int: return pfn_to_page(virt_to_pfn(virt)) @@ -361,6 +383,27 @@ class i386Ops(x86Ops): class x86_64Ops(x86Ops): def __init__(self) -> None: + self.STRUCT_PAGE_SIZE = pwndbg.aglib.typeinfo.load("struct page").sizeof + self.STRUCT_PAGE_SHIFT = int(math.log2(self.STRUCT_PAGE_SIZE)) + self.phys_base = 0x1000000 + + try: + self.START_KERNEL_map = pwndbg.aglib.kernel.kbase() + except Exception: + print("WARNING: an error ocurred when retrieving kbase") + self.START_KERNEL_map = None + + if self.START_KERNEL_map is None: + # put this here in case kbase also returns None + self.START_KERNEL_map = 0xFFFFFFFF80000000 + + if pwndbg.aglib.kernel.has_debug_syms(): + # if there are debug symbols + self._PAGE_OFFSET = pwndbg.aglib.symbol.lookup_symbol_value("page_offset_base") + self.VMEMMAP_START = pwndbg.aglib.symbol.lookup_symbol_value("vmemmap_base") + if self._PAGE_OFFSET is not None and self.VMEMMAP_START is not None: + return + if self.uses_5lvl_paging(): # https://elixir.bootlin.com/linux/v6.2/source/arch/x86/include/asm/page_64_types.h#L41 self._PAGE_OFFSET = 0xFF11000000000000 @@ -372,12 +415,6 @@ class x86_64Ops(x86Ops): # https://elixir.bootlin.com/linux/v6.2/source/arch/x86/include/asm/pgtable_64_types.h#L130 self.VMEMMAP_START = 0xFFFFEA0000000000 - self.STRUCT_PAGE_SIZE = pwndbg.aglib.typeinfo.load("struct page").sizeof - self.STRUCT_PAGE_SHIFT = int(math.log2(self.STRUCT_PAGE_SIZE)) - - self.START_KERNEL_map = 0xFFFFFFFF80000000 - self.phys_base = 0x1000000 - @property def ptr_size(self) -> int: return 64 @@ -613,6 +650,14 @@ def page_to_phys(page: int) -> int: raise NotImplementedError() +def page_to_physmap(page: int) -> int: + ops = arch_ops() + if ops: + return ops.page_to_physmap(page) + else: + raise NotImplementedError() + + def virt_to_page(virt: int) -> int: ops = arch_ops() if ops: diff --git a/pwndbg/commands/__init__.py b/pwndbg/commands/__init__.py index ff19448e3..776c9b7e8 100644 --- a/pwndbg/commands/__init__.py +++ b/pwndbg/commands/__init__.py @@ -853,6 +853,7 @@ def load_commands() -> None: import pwndbg.commands.auxv import pwndbg.commands.binder import pwndbg.commands.binja + import pwndbg.commands.buddydump import pwndbg.commands.canary import pwndbg.commands.checksec import pwndbg.commands.comments @@ -894,7 +895,6 @@ def load_commands() -> None: import pwndbg.commands.onegadget import pwndbg.commands.p2p import pwndbg.commands.patch - import pwndbg.commands.pcplist import pwndbg.commands.pie import pwndbg.commands.plist import pwndbg.commands.probeleak diff --git a/pwndbg/commands/buddydump.py b/pwndbg/commands/buddydump.py new file mode 100644 index 000000000..a5a1447e5 --- /dev/null +++ b/pwndbg/commands/buddydump.py @@ -0,0 +1,347 @@ +from __future__ import annotations + +import argparse +import logging +from dataclasses import dataclass +from typing import List +from typing import Tuple + +import pwndbg +import pwndbg.aglib.memory +import pwndbg.aglib.symbol +import pwndbg.commands +from pwndbg.aglib import kernel +from pwndbg.aglib.kernel import per_cpu +from pwndbg.aglib.kernel.macros import for_each_entry +from pwndbg.commands import CommandCategory +from pwndbg.lib.exception import IndentContextManager + +log = logging.getLogger(__name__) + + +MAX_PG_FREE_LIST_STR_RESULT_CNT = 0x10 +MAX_PG_FREE_LIST_CNT = 0x1000 +NONE_TUPLE = (None, None) +# https://elixir.bootlin.com/linux/v6.13.12/source/include/linux/mmzone.h#L52 +MIGRATE_PCPTYPES = 3 + + +@dataclass +class ParsedBuddyArgs: + # stores the input options + zone: pwndbg.dbg_mod.Value | None + order: int | None + mtype: str | None + cpu: int | None + find: int | None + + +@dataclass +class CurrentBuddyParams: + # stores the current properties of the freelist being/to be traversed + # this is so that values can be cleanly passed around + sections: List[Tuple[str, str]] + indent: IndentContextManager + order: int + mtype: str | None + freelists: pwndbg.dbg_mod.Value | None + freelist: pwndbg.dbg_mod.Value | None + nr_types: int | None + found: bool + + +def cpu_limitcheck(cpu: str): + if cpu is None: + return None + nr_cpus = pwndbg.aglib.kernel.nproc() + if cpu.isdigit() and int(cpu) < nr_cpus: + return int(cpu) + raise argparse.ArgumentTypeError( + f"The --cpu option takes in a number less than nr_cpu_ids ({nr_cpus})." + ) + + +parser = argparse.ArgumentParser( + description="Displays metadata and freelists of the buddy allocator." +) +parser.add_argument( + "-z", + "--zone", + type=str, + dest="zone", + choices=["DMA", "DMA32", "Normal", "HighMem", "Movable", "Device"], + default=None, + help="Displays/searches lists only in the specified zone.", +) +parser.add_argument( + "-o", + "--order", + type=int, + dest="order", + help="Displays/searches lists only with the specified order.", +) +parser.add_argument( + "-m", + "--mtype", + type=str, + dest="mtype", + choices=["Unmovable", "Movable", "Reclaimable", "HighAtomic", "CMA", "Isolate"], + default=None, + help="Displays/searches lists only with the specified mtype.", +) +parser.add_argument( + "-p", + "--pcp-only", + action="store_true", + dest="pcp_only", + default=False, + help="Displays/searches only PCP lists.", +) +parser.add_argument( + "-c", "--cpu", type=cpu_limitcheck, dest="cpu", default=None, help="CPU nr for searching PCP." +) +parser.add_argument("-n", "--node", type=int, dest="node", default=0, help="") +parser.add_argument( + "-f", + "--find", + type=int, + dest="find", + default=None, + help="The address to find in page free lists.", +) + + +def static_str_arr(name: str) -> List[str]: + arr = pwndbg.aglib.symbol.lookup_symbol(name).dereference() + return [arr[i].string() for i in range(len(arr))] + + +def check_find(counter: int, physmap_addr: int, pba: ParsedBuddyArgs, cbp: CurrentBuddyParams): + if counter < MAX_PG_FREE_LIST_STR_RESULT_CNT and pba.find is None: + return True + if pba.find is None: + return False + start = physmap_addr + end = physmap_addr + (1 << cbp.order) + return pba.find >= start and pba.find < end + + +def traverse_pglist( + pba: ParsedBuddyArgs, cbp: CurrentBuddyParams +) -> Tuple[List[Tuple[int, str]], int, List[str]]: + freelist = cbp.freelist + if freelist is None or int(freelist["next"]) == 0: + return None, 0, None + indent = cbp.indent + seen_pages = set() + results = [] + counter = 0 + msgs = [] + for e in for_each_entry(freelist, "struct page", "lru"): + page = int(e) + phys_addr = pwndbg.aglib.kernel.page_to_phys(page) + physmap_addr = pwndbg.aglib.kernel.page_to_physmap(page) + if check_find(counter, physmap_addr, pba, cbp): + results.append( + ( + counter, + f"{indent.addr_hex(physmap_addr)} [page: {indent.aux_hex(page)}, phys: {indent.aux_hex(phys_addr)}]", + ) + ) + cbp.found = True + if counter == MAX_PG_FREE_LIST_STR_RESULT_CNT: + msgs.append(f"{indent.prefix('... (truncated)')}") + msgs.append( + f"This doubly linked list reached size {indent.aux_hex(MAX_PG_FREE_LIST_STR_RESULT_CNT)}" + ) + counter += 1 + if page in seen_pages: + msgs.append(f"Cyclic doubly linked list detected: {results[-1]}") + break + seen_pages.add(page) + if counter == MAX_PG_FREE_LIST_CNT: + msgs.append( + f"This doubly link list exceeds size {indent.aux_hex(MAX_PG_FREE_LIST_CNT)}" + ) + break + return results, counter, msgs + + +def print_section(section: Tuple[str, str], indent: IndentContextManager): + prefix, desc = section + if prefix is not None: + title = indent.prefix(prefix) + if desc is not None: + title = f"{title} ({desc}):" + indent.print(title) + + +def print_pglist(pba: ParsedBuddyArgs, cbp: CurrentBuddyParams): + sections, indent = cbp.sections, cbp.indent + if len(sections) != 3: + log.warning(f"The number ({len(sections)}) of sections is not 2!") + return + results, counter, msgs = traverse_pglist(pba, cbp) + if not results or len(results) == 0 or counter == 0: + return + print_section(sections[0], indent) + sections[0] = NONE_TUPLE # so that the header info is not reprinted + with indent: + print_section(sections[1], indent) + sections[1] = NONE_TUPLE + with indent: + print_section(sections[2], indent) + sections[2] = NONE_TUPLE + with indent: + indent.print( + f"- {indent.prefix(cbp.mtype)} (contains {indent.aux_hex(counter)} elements)" + ) + with indent: + for i, result in results: + indent.print(indent.prefix(f"[0x{i:02x}] ") + result) + if msgs is not None: + for msg in msgs: + indent.print(msg) + print() + + +def print_mtypes(pba: ParsedBuddyArgs, cbp: CurrentBuddyParams): + freelists, nr_types = cbp.freelists, cbp.nr_types + mtypes = static_str_arr("migratetype_names") + if nr_types is None: + nr_types = len(mtypes) + for i in range(nr_types): + cbp.mtype = mtypes[i] + if pba.mtype is not None and cbp.mtype != pba.mtype: + continue + cbp.freelist = freelists[i] + print_pglist(pba, cbp) + + +def print_pcp_set(pba: ParsedBuddyArgs, cbp: CurrentBuddyParams): + pcp = None + pcp_lists = None + if pba.zone.type.has_field("per_cpu_pageset"): + pcp = per_cpu(pba.zone["per_cpu_pageset"], pba.cpu) + pcp_lists = pcp["lists"] + cbp.sections[1] = ( + "per_cpu_pageset", + f"number of pages {cbp.indent.aux_hex(int(pcp['count']))}", + ) + elif pba.zone.type.has_field("pageset"): + pcp = per_cpu(pba.zone["pageset"], pba.cpu) + pcp_lists = pcp["pcp"]["lists"] + cbp.sections[1] = ("per_cpu_pageset", None) + if pcp is None or pcp_lists is None: + log.warning("cannot find pcplist") + return + nr_pcp_lists = pwndbg.aglib.kernel.npcplist() + for i in range(0, nr_pcp_lists, MIGRATE_PCPTYPES): + # https://elixir.bootlin.com/linux/v6.13.12/source/include/linux/mmzone.h#L660 + order = i // MIGRATE_PCPTYPES + if pba.order is not None and pba.order != order: + continue + cbp.freelists = pcp_lists[order * MIGRATE_PCPTYPES].address + cbp.nr_types = MIGRATE_PCPTYPES + if order == 4: + # https://elixir.bootlin.com/linux/v6.13/source/arch/x86/include/asm/page_types.h#L20 + order = 21 - 12 # HPAGE_SHIFT - PAGE_SHIFT + cbp.nr_types = nr_pcp_lists % MIGRATE_PCPTYPES + cbp.sections[2] = ( + f"Order {order}", + f"size: {cbp.indent.aux_hex(0x1000 * (1 << order))}", + ) + cbp.order = order + print_mtypes(pba, cbp) + + +def print_free_area(pba: ParsedBuddyArgs, cbp: CurrentBuddyParams): + free_area = pba.zone["free_area"] + cbp.sections[1] = ("free_area", None) + for order in range(len(free_area)): + if pba.order is not None and pba.order != order: + continue + cbp.freelists = free_area[order]["free_list"] + nr_free = int(free_area[order]["nr_free"]) + cbp.sections[2] = ( + f"Order {order}", + f"nr_free: {cbp.indent.aux_hex(nr_free)}, size: {cbp.indent.aux_hex(0x1000 * (1 << order))}", + ) + cbp.order = order + print_mtypes(pba, cbp) + + +""" +Based off https://github.com/bata24/gef and https://elixir.bootlin.com/linux/v6.13/source + +Simplified visualization from bata24/gef: + ++-node_data[MAX_NUMNODES]-+ +| *pglist_data (node 0) |--+ +| *pglist_data (node 1) | | +| *pglist_data (node 2) | | +| ... | | ++-------------------------+ | + | ++----------------------------+ +| +v ++-pglist_data------------------------------+ +| node_zones[MAX_NR_ZONES] | +| +-node_zones[0]----------------------+ | +--->+-per_cpu_pages--------+ +| | ... | | | | ... | +| | per_cpu_pageset |-----+ | lists[NR_PCP_LISTS] | +-page-----+ +| | ... | | | +-lists[0]-------+ | | flags | +| | name | | | | next |----->| lru.next |->..." +| | ... | | | | prev | | | lru.prev | +| | free_area[MAX_ORDER] | | | +-lists[1]-------+ | | ... | +| | +-free_area[0]----------------+ | | | | ... | | +----------+ +| | | free_list[MIGRATE_TYPES] | | | | +----------------+ | +| | | +-free_list[0]----------+ | | | +----------------------+ +| | | | next |---------+ +| | | | prev | | | | | +| | | +-free_list[1]----------+ | | | | +-page-----+ +-page-----+ +-page-----+ +| | | | ... | | | | | | flags | | flags | | flags | +| | | +-----------------------+ | | | +--->| lru.next |--->| lru.next |--->| lru.next |->..." +| | | nr_free | | | | lru.prev | | lru.prev | | lru.prev | +| | +-free_area[1]----------------+ | | | ... | | ... | | ... | +| | | ... | | | +----------+ +----------+ +----------+ +| | +-----------------------------+ | | +| +-node_zones[1]----------------------+ | +| | ... | | +| +------------------------------------+ | +| ... | ++------------------------------------------+ +""" + + +@pwndbg.commands.Command(parser, category=CommandCategory.KERNEL) +@pwndbg.commands.OnlyWhenQemuKernel +@pwndbg.commands.OnlyWithKernelDebugSyms +@pwndbg.commands.OnlyWhenPagingEnabled +def buddydump( + zone: str, pcp_only: bool, order: int, mtype: str, cpu: int, node: int, find: int +) -> None: + node_data = pwndbg.aglib.symbol.lookup_symbol("node_data") + if not node_data: + log.warning("WARNING: Symbol 'node_data' not found") + return + pba = ParsedBuddyArgs(None, order, mtype, cpu, find) + cbp = CurrentBuddyParams( + [NONE_TUPLE] * 3, IndentContextManager(), None, None, None, None, None, False + ) + for node_idx in range(kernel.num_numa_nodes()): + # only display one node per invocation is probably sufficient under most use cases + if node is not None and node_idx != node: + continue + zones = node_data.dereference()[node_idx]["node_zones"] + for i, name in enumerate(static_str_arr("zone_names")): + if zone is not None and zone != name: + continue + pba.zone = zones[i] + cbp.sections[0] = (f"Zone {name}", None) + print_pcp_set(pba, cbp) + if not pcp_only: + print_free_area(pba, cbp) + if not cbp.found: + log.warning("No free pages with specified filters found.") diff --git a/pwndbg/commands/pcplist.py b/pwndbg/commands/pcplist.py deleted file mode 100644 index f2afa005e..000000000 --- a/pwndbg/commands/pcplist.py +++ /dev/null @@ -1,62 +0,0 @@ -from __future__ import annotations - -import argparse -import logging - -import pwndbg -import pwndbg.aglib.memory -import pwndbg.aglib.symbol -import pwndbg.commands -from pwndbg.aglib.kernel import per_cpu -from pwndbg.aglib.kernel.macros import for_each_entry -from pwndbg.commands import CommandCategory - -log = logging.getLogger(__name__) - -parser = argparse.ArgumentParser(description="Print Per-CPU page list") - -parser.add_argument("zone", type=int, nargs="?", help="") -# parser.add_argument("list_num", type=int, help="") - - -def print_zone(zone: int, list_num=None) -> None: - contig_value = pwndbg.aglib.symbol.lookup_symbol("contig_page_data") - if not contig_value: - print("WARNING: Symbol 'contig_page_data' not found") - return - - print(f"Zone {zone}") - contig_value = contig_value.dereference() - pageset_addr = per_cpu(contig_value["node_zones"][zone]["pageset"]) - pageset = pwndbg.aglib.memory.get_typed_pointer_value("struct per_cpu_pageset", pageset_addr) - pcp = pageset["pcp"] - print("count: ", int(pcp["count"])) - print("high: ", int(pcp["high"])) - print("") - for i in range(4): - print(f"pcp.lists[{i}]:") - - count = 0 - for e in for_each_entry(pcp["lists"][i], "struct page", "lru"): - count += 1 - print(e.value_to_human_readable()) - - if count == 0: - print("EMPTY") - else: - print(f"{count} entries") - - print("") - - -@pwndbg.commands.Command(parser, category=CommandCategory.KERNEL) -@pwndbg.commands.OnlyWhenQemuKernel -@pwndbg.commands.OnlyWithKernelDebugSyms -@pwndbg.commands.OnlyWhenPagingEnabled -def pcplist(zone: int = None, list_num: int = None) -> None: - log.warning("This command is a work in progress and may not work as expected.") - if zone: - print_zone(zone, list_num) - else: - for i in range(3): - print_zone(i) diff --git a/pwndbg/commands/slab.py b/pwndbg/commands/slab.py index 74c5776a7..eb413365f 100644 --- a/pwndbg/commands/slab.py +++ b/pwndbg/commands/slab.py @@ -9,9 +9,6 @@ from __future__ import annotations import argparse import sys -from types import TracebackType -from typing import Optional -from typing import Type from tabulate import tabulate @@ -24,6 +21,7 @@ from pwndbg.aglib.kernel.slab import NodeCache from pwndbg.aglib.kernel.slab import Slab from pwndbg.aglib.kernel.slab import find_containing_slab_cache from pwndbg.commands import CommandCategory +from pwndbg.lib.exception import IndentContextManager parser = argparse.ArgumentParser(description="Prints information about the slab allocator") subparsers = parser.add_subparsers(dest="command") @@ -67,41 +65,15 @@ def slab(command, filter_=None, names=None, verbose=False, addresses=None) -> No slab_contains(addr) -class IndentContextManager: - def __init__(self) -> None: - self.indent = 0 - - def __enter__(self) -> None: - self.indent += 1 - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_value: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self.indent -= 1 - assert self.indent >= 0 - - def print(self, *a, **kw) -> None: - print(" " * self.indent, *a, **kw) - - -def _yx(val: int) -> str: - return C.yellow(hex(val)) - - -def _rx(val: int) -> str: - return C.red(hex(val)) - - def print_slab(slab: Slab, indent, verbose: bool) -> None: - indent.print(f"- {C.green('Slab')} @ {_yx(slab.virt_address)} [{_rx(slab.slab_address)}]:") + indent.print( + f"- {C.green('Slab')} @ {indent.addr_hex(slab.virt_address)} [{indent.aux_hex(slab.slab_address)}]:" + ) with indent: indent.print(f"{C.blue('In-Use')}: {slab.inuse}/{slab.object_count}") indent.print(f"{C.blue('Frozen')}: {slab.frozen}") - indent.print(f"{C.blue('Freelist')}: {_yx(int(slab.freelist))}") + indent.print(f"{C.blue('Freelist')}: {indent.addr_hex(int(slab.freelist))}") if verbose: with indent: @@ -113,16 +85,18 @@ def print_slab(slab: Slab, indent, verbose: bool) -> None: for freelist in slab.freelists: next_free = freelist.find_next(addr) if next_free: - indent.print(f"- {_yx(addr)} (next: {next_free:#x})") + indent.print(f"- {indent.addr_hex(addr)} (next: {next_free:#x})") break else: - indent.print(f"- {_yx(addr)} (no next)") + indent.print(f"- {indent.addr_hex(addr)} (no next)") def print_cpu_cache(cpu_cache: CpuCache, verbose: bool, indent) -> None: - indent.print(f"{C.green('kmem_cache_cpu')} @ {_yx(cpu_cache.address)} [CPU {cpu_cache.cpu}]:") + indent.print( + f"{C.green('kmem_cache_cpu')} @ {indent.addr_hex(cpu_cache.address)} [CPU {cpu_cache.cpu}]:" + ) with indent: - indent.print(f"{C.blue('Freelist')}:", _yx(int(cpu_cache.freelist))) + indent.print(f"{C.blue('Freelist')}:", indent.addr_hex(int(cpu_cache.freelist))) active_slab = cpu_cache.active_slab if active_slab: @@ -146,7 +120,7 @@ def print_cpu_cache(cpu_cache: CpuCache, verbose: bool, indent) -> None: def print_node_cache(node_cache: NodeCache, verbose: bool, indent) -> None: indent.print( - f"{C.green('kmem_cache_node')} @ {_yx(node_cache.address)} [NUMA node {node_cache.node}]:" + f"{C.green('kmem_cache_node')} @ {indent.addr_hex(node_cache.address)} [NUMA node {node_cache.node}]:" ) with indent: partial_slabs = node_cache.partial_slabs @@ -168,7 +142,7 @@ def slab_info(name: str, verbose: bool) -> None: indent = IndentContextManager() - indent.print(f"{C.green('Slab Cache')} @ {_yx(slab_cache.address)}") + indent.print(f"{C.green('Slab Cache')} @ {indent.addr_hex(slab_cache.address)}") with indent: indent.print(f"{C.blue('Name')}: {slab_cache.name}") flags_list = slab_cache.flags diff --git a/pwndbg/dbg/__init__.py b/pwndbg/dbg/__init__.py index 36e657e3a..b4add5a21 100644 --- a/pwndbg/dbg/__init__.py +++ b/pwndbg/dbg/__init__.py @@ -955,6 +955,12 @@ class Value: """ raise NotImplementedError() + def __len__(self) -> int: + """ + Return len(self). + """ + raise NotImplementedError() + class CommandHandle: """ diff --git a/pwndbg/dbg/gdb/__init__.py b/pwndbg/dbg/gdb/__init__.py index b560e57d5..b392c81ff 100644 --- a/pwndbg/dbg/gdb/__init__.py +++ b/pwndbg/dbg/gdb/__init__.py @@ -1271,6 +1271,15 @@ class GDBValue(pwndbg.dbg_mod.Value): except gdb.error as e: raise pwndbg.dbg_mod.Error(e) + @override + def __len__(self): + try: + if self.type.code == pwndbg.dbg_mod.TypeCode.ARRAY: + return self.type.sizeof // self.type.target().sizeof + return self.type.sizeof + except gdb.error as e: + raise pwndbg.dbg_mod.Error(e) + def _gdb_event_class_from_event_type(ty: pwndbg.dbg_mod.EventType) -> Any: """ diff --git a/pwndbg/lib/exception.py b/pwndbg/lib/exception.py new file mode 100644 index 000000000..07f90f24e --- /dev/null +++ b/pwndbg/lib/exception.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from types import TracebackType +from typing import Optional +from typing import Type + +import pwndbg.color as C + + +class IndentContextManager: + def __init__(self) -> None: + self.indent = 0 + + def __enter__(self) -> None: + self.indent += 1 + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.indent -= 1 + assert self.indent >= 0 + + def print(self, *a, **kw) -> None: + print(" " * self.indent, *a, **kw) + + def addr_hex(self, val: int) -> str: + return C.yellow(hex(val)) + + def aux_hex(self, val: int) -> str: + return C.red(hex(val)) + + def prefix(self, s: str): + if self.indent % 2 == 0: + return C.blue(s) + return C.green(s) diff --git a/tests/qemu-tests/tests/system/test_commands_kernel.py b/tests/qemu-tests/tests/system/test_commands_kernel.py index fad40e68a..b72dafbb0 100644 --- a/tests/qemu-tests/tests/system/test_commands_kernel.py +++ b/tests/qemu-tests/tests/system/test_commands_kernel.py @@ -74,12 +74,12 @@ def test_command_slab_contains(): assert f"{addr} @ {slab_cache}" in res +@pytest.mark.skipif( + pwndbg.aglib.arch.name not in ["x86", "x86-64"], + reason="function page_offset is only implemented for x86", +) def test_x64_extra_registers_under_kernel_mode(): res = gdb.execute("context", to_string=True) - if "RAX" not in res or "RSP" not in res: - # we are not debugging x64 - # there's probably a better way to check this but good enough - return for reg in ["cr0", "cr3", "cr4", "fs_base", "gs_base", "efer", "ss", "cs"]: assert reg.upper() in res # those are the most important ones, and their presence should indicate it's working as intended @@ -102,19 +102,21 @@ def get_slab_object_address(): raise ValueError("Could not find any slab objects") +@pytest.mark.skipif( + pwndbg.aglib.arch.name not in ["x86", "x86-64"], + reason="Unsupported architecture: msr tests only work on x86 and x86-64", +) def test_command_msr_read(): - if pwndbg.aglib.arch.name not in ["x86", "x86-64"]: - pytest.skip("Unsupported architecture: msr tests only work on x86 and x86-64") - msr_lstar_literal = int(gdb.execute("msr MSR_LSTAR", to_string=True).split(":\t")[1], 16) msr_lstar = int(gdb.execute("msr 0xc0000082", to_string=True).split(":\t")[1], 16) assert msr_lstar == msr_lstar_literal +@pytest.mark.skipif( + pwndbg.aglib.arch.name not in ["x86", "x86-64"], + reason="Unsupported architecture: msr tests only work on x86 and x86-64", +) def test_command_msr_write(): - if pwndbg.aglib.arch.name not in ["x86", "x86-64"]: - pytest.skip("Unsupported architecture: msr tests only work on x86 and x86-64") - prev_msr_lstar = int(gdb.execute("msr MSR_LSTAR", to_string=True).split(":\t")[1], 16) new_val = 0x4141414142424242 @@ -122,3 +124,15 @@ def test_command_msr_write(): new_msr_lstar = int(gdb.execute("msr 0xc0000082", to_string=True).split(":\t")[1], 16) assert new_msr_lstar == new_val gdb.execute(f"msr MSR_LSTAR -w {prev_msr_lstar}") + + +@pytest.mark.skipif(not pwndbg.aglib.kernel.has_debug_syms(), reason="test requires debug symbols") +@pytest.mark.skipif( + pwndbg.aglib.arch.name not in ["x86", "x86-64"], + reason="function page_offset is only implemented for x86", +) +def test_command_buddydump(): + res = gdb.execute("buddydump", to_string=True) + assert ( + "Order" in res and "Zone" in res and ("per_cpu_pageset" in res or "free_area" in res) + ) or res == "WARNING: Symbol 'node_data' not found\n"