From b49b95487e08b71f07094a55c1aad8fb1e14596a Mon Sep 17 00:00:00 2001 From: jxuanli <65455765+jxuanli@users.noreply.github.com> Date: Fri, 12 Dec 2025 19:04:55 -0800 Subject: [PATCH] Scanning page tables but on steroids (#3394) * from stash * added pagetable scan func * abstracted to a class * finished the pagescan func * pagetable scan aarch64 * added if * refactored kernel/vmmap * Squashed commit of the following: commit cf4a658a7002daa0c43dea5a20b2853bc884b4de Author: jxuanli Date: Wed Nov 5 13:06:47 2025 -0800 addressed comments commit 32f46afb9246ffaf6409d9945e46ce7b846b1d60 Author: jxuanli Date: Wed Nov 5 13:03:45 2025 -0800 handle none value commit b958d620552abb724379b0755ae5f55267f5fec9 Author: jxuanli Date: Wed Nov 5 12:41:56 2025 -0800 addressed comments commit ac34ad5d376b1f53b1b0abf41d3bef08266338ca Author: jxuanli Date: Wed Nov 5 11:58:53 2025 -0800 further optimizations commit 6fde305fbccf760740c0397f981b2bda1a1b1351 Author: jxuanli Date: Wed Nov 5 00:57:07 2025 -0800 fixing tests commit 4077f7a7f36a8dac977825314fa51c34f06b1fde Author: jxuanli Date: Wed Nov 5 00:39:46 2025 -0800 restore commit 1e62c62c42cc3bd2469cfa4468268304b15d8b8a Merge: 696dc6d0 30299571 Author: jxuanli Date: Wed Nov 5 00:38:57 2025 -0800 Merge branch 'dev' of https://github.com/pwndbg/pwndbg into vmmap-opt commit 696dc6d0c39e37775a5a885f9cff586ae1412cfb Author: jxuanli Date: Wed Nov 5 00:27:09 2025 -0800 fix x64 vmmap perf issue * using physmem mode * Squashed commit of the following: commit 3d7bd9b7845d6b906c8cb1f6c7c2683371f81f92 Author: jxuanli Date: Thu Nov 6 12:29:33 2025 -0800 handle `kbase == None` commit 17979d4c1a439a1b41b4c265bd122ea3c6990b18 Author: jxuanli <65455765+jxuanli@users.noreply.github.com> Date: Thu Nov 6 07:16:43 2025 -0800 Tracing kernel memory management (#3379) * added kmemtrace class * added ret trace handler * added lldb ret trace handler * making the output more colourful * added the actual command * storing output * temp suspend ctx output * tracing with mutex * add option to only trace relevant allocations and frees * cleaned up * renaming * docs * format * refactored + addressing comments commit 03dfc4d929ff50c1b301521fbe1801549fe0f896 Author: jxuanli <65455765+jxuanli@users.noreply.github.com> Date: Thu Nov 6 07:15:31 2025 -0800 Fixing `kernel_vmmap` perf issue when symbol file is not added (#3390) * fix x64 vmmap perf issue * restore * fixing tests * further optimizations * addressed comments * handle none value * addressed comments commit 0488970cfdedf3250ba998dbbcf49c2b6ed69529 Author: OBarronCS <55004530+OBarronCS@users.noreply.github.com> Date: Thu Nov 6 10:14:01 2025 -0500 Update one-liner install (#3389) * opt * more optimizations * docs * comments * revert back * opt * allow debugging the vmmap of different tasks * docs and comments * catching pagetable scan errors * checks for kernelland pc * stuff * changed access * defaulting riscv handling * fixing kconfig * fixing disass * fixing pagetable scan x kcurrent --set * del check * proper cache invalidation * moved pagewalk def * caching * updated test * cleaning up * cleaning up * improving nearpc * making linter complain less * cleaning up * cleaning up * cleaning up * cleaning up --- docs/commands/kernel/kcurrent.md | 2 +- docs/configuration/config.md | 5 +- pwndbg/aglib/kernel/__init__.py | 15 +- pwndbg/aglib/kernel/paging.py | 462 ++++++++++++------ pwndbg/aglib/kernel/symbol.py | 2 +- pwndbg/aglib/kernel/vmmap.py | 61 ++- pwndbg/aglib/memory.py | 4 +- pwndbg/aglib/nearpc.py | 5 +- pwndbg/commands/kcurrent.py | 6 +- pwndbg/commands/paging.py | 2 +- pwndbg/lib/memory.py | 19 +- .../qemu_system/tests/test_commands_kernel.py | 20 +- 12 files changed, 404 insertions(+), 199 deletions(-) diff --git a/docs/commands/kernel/kcurrent.md b/docs/commands/kernel/kcurrent.md index 47c9fa37c..42d4f59e1 100644 --- a/docs/commands/kernel/kcurrent.md +++ b/docs/commands/kernel/kcurrent.md @@ -19,7 +19,7 @@ Displays the current kernel task debugged by the debugger (gdb/lldb) if pid == N |Short|Long|Help| | :--- | :--- | :--- | |-h|--help|show this help message and exit| -||--set|sets the kernel task used for supported pwndbg commands (kfile, pagewalk), this option does not change internal mem (purely effects how certain commands behaves)| +||--set|sets the kernel task used for supported pwndbg commands (kfile, pagewalk, vmmap), this option does not change internal mem (purely effects how certain commands behaves)| diff --git a/docs/configuration/config.md b/docs/configuration/config.md index d5f56ab4b..77d4b4d34 100644 --- a/docs/configuration/config.md +++ b/docs/configuration/config.md @@ -752,14 +752,15 @@ The method to get vmmap information when debugging via QEMU kernel. Values explained: -+ `page-tables` - read /proc/$qemu-pid/mem to parse kernel page tables to render vmmap ++ `page-tables` - walk page tables to render vmmap ++ `pt-dump` - read /proc/$qemu-pid/mem to parse kernel page tables to render vmmap + `monitor` - use QEMU's `monitor info mem` to render vmmap + `none` - disable vmmap rendering; useful if rendering is particularly slow Note that the page-tables method will require the QEMU kernel process to be on the same machine and within the same PID namespace. Running QEMU kernel and GDB in different Docker containers will not work. Consider running both containers with --pid=host (meaning they will see and so be able to interact with all processes on the machine). **Default:** 'page-tables' -**Valid values:** 'page-tables', 'monitor', 'none' +**Valid values:** 'page-tables', 'pt-dump', 'monitor', 'none' ---------- diff --git a/pwndbg/aglib/kernel/__init__.py b/pwndbg/aglib/kernel/__init__.py index 42c58aec6..0c0ce2eb2 100644 --- a/pwndbg/aglib/kernel/__init__.py +++ b/pwndbg/aglib/kernel/__init__.py @@ -160,7 +160,11 @@ def kconfig() -> pwndbg.lib.kernel.kconfig.Kconfig | None: if result is not None: config_start = result + len("IKCFG_ST") config_end = next(pwndbg.search.search(b"IKCFG_ED", start=config_start), None) - if config_start is None or config_end is None: + if ( + not pwndbg.aglib.memory.is_kernel(config_start) + or not pwndbg.aglib.memory.is_kernel(config_end) + or config_start >= config_end + ): _kconfig = pwndbg.lib.kernel.kconfig.Kconfig(None) return _kconfig @@ -623,6 +627,15 @@ def pagewalk(addr, entry=None) -> Tuple[PageTableLevel, ...]: raise NotImplementedError() +@pwndbg.lib.cache.cache_until("stop") +def pagetable_scan(entry=None) -> Tuple[pwndbg.lib.memory.Page, ...]: + pi = arch_paginginfo() + if pi: + return tuple(pi.pagetable_scan(entry)) + else: + raise NotImplementedError() + + def paging_enabled() -> bool: arch_name = pwndbg.aglib.arch.name if arch_name == "i386": diff --git a/pwndbg/aglib/kernel/paging.py b/pwndbg/aglib/kernel/paging.py index f6f54fcd5..61a31280d 100644 --- a/pwndbg/aglib/kernel/paging.py +++ b/pwndbg/aglib/kernel/paging.py @@ -2,8 +2,10 @@ from __future__ import annotations import math import re +import struct from dataclasses import dataclass from typing import Dict +from typing import List from typing import Tuple import pwndbg @@ -12,11 +14,11 @@ import pwndbg.aglib.memory import pwndbg.aglib.symbol import pwndbg.aglib.typeinfo import pwndbg.aglib.vmmap_custom -import pwndbg.color.message as M import pwndbg.lib.cache import pwndbg.lib.memory import pwndbg.lib.regs from pwndbg.aglib.kernel.vmmap import kernel_vmmap_pages +from pwndbg.lib.memory import Page from pwndbg.lib.regs import BitFlags # don't return None but rather an invalid value for address markers @@ -25,7 +27,7 @@ INVALID_ADDR = 1 << 64 @pwndbg.lib.cache.cache_until("stop") -def first_kernel_page_start(): +def first_kernel_page_start() -> int: for page in kernel_vmmap_pages(): if page.start and pwndbg.aglib.memory.is_kernel(page.start): return page.start @@ -40,6 +42,126 @@ class PageTableLevel: idx: int +class PageTableScan: + MAX_SAME_PG_TABLE_ENTRY = 0x10 + + # is_kernel is used only for Aarch64 + def __init__(self, pi: ArchPagingInfo, is_kernel: bool) -> None: + # from ArchPagingInfo: + self.paging_level = pi.paging_level + self.PAGE_ENTRY_MASK = pi.PAGE_ENTRY_MASK + self.PAGE_INDEX_LEN = pi.PAGE_INDEX_LEN + self.PAGE_INDEX_MASK = pi.PAGE_INDEX_MASK + self.page_shift = pi.page_shift + self.pageentry_flags = pi.pageentry_flags + self.should_stop_pagewalk = pi.should_stop_pagewalk + # for scanning + self.result: List[Page] = [] + self.pagesz = 1 << self.page_shift + self.counters: Dict[int, int] = {} + self.ptrsize = pwndbg.aglib.arch.ptrsize + self.inf = pwndbg.dbg.selected_inferior() + self.fmt = "<" + ("Q" if self.ptrsize == 8 else "I") * (self.pagesz // self.ptrsize) + self.cache: Dict[int, List[int]] = {} + # below are info relating to the current page chunks being coalesced + self.level_idxes = [0] * (self.paging_level + 1) + self.curr = None + self.is_kernel = is_kernel + self.arch = pwndbg.aglib.arch.name + + def scan(self, entry: int, level_remaining: int) -> None: + # this needs to be EXTREMELY optimized as it is used to display context + # making as few functions calls or memory reads as possible + # avoid unnecessary python pointer deferences or repetative computations whenever possible + # on average takes less than 0.09 seconds to complete for x64 and 0.12 for aarch64 + # around 25% of the time is used to read qemu system memory + # in comparison, gdb-pt-dump takes ~0.12 for x64 and a few seconds for aarch64 + # --> 25% speed up for x64 and more than 10x speed up for aarch64 + pagesz = self.pagesz + addr = entry & self.PAGE_ENTRY_MASK + entries = self.cache.get(addr, None) + if not entries: + self.cache[addr] = entries = struct.unpack(self.fmt, self.inf.read_memory(addr, pagesz)) + for i, entry in enumerate(entries): + if entry == 0: + if self.curr: + self.result.append(self.curr) + self.curr = None + elif level_remaining == 1 or self.should_stop_pagewalk(entry): + curr = self.curr + cnt = self.counters.get(entry, 0) + if cnt > self.MAX_SAME_PG_TABLE_ENTRY and not curr: + continue + + self.counters[entry] = cnt + 1 + flags = self.pageentry_flags(entry) + if flags == 0: # only append present pages + continue + + # len(entries) == self.pagesz // self.ptrsize, try not to do division here + size = pagesz * (len(entries) ** (level_remaining - 1)) + if curr: + if flags != 0 and flags == curr.flags: + curr.memsz += size + continue + self.result.append(curr) + self.curr = None + + # creating a new page + self.level_idxes[level_remaining] = i + match self.arch: + case "x86-64": + bit = self.level_idxes[-1] >> (self.PAGE_INDEX_LEN - 1) # highest bit + case "aarch64": + bit = 1 if self.is_kernel else 0 + case _: + raise NotImplementedError() + nbits = self.ptrsize * 8 - ( + self.paging_level * self.PAGE_INDEX_LEN + self.page_shift + ) + addr = bit * ((1 << nbits) - 1) + for i in range(self.paging_level, 0, -1): + addr <<= self.PAGE_INDEX_LEN + addr += 0 if i < level_remaining else self.level_idxes[i] + addr <<= self.page_shift + self.curr = Page(addr, size, flags, 0) + else: # only call when should keep scanning the page tree + self.level_idxes[level_remaining] = i + # we need to reduce this recursive call as much as possible + # each time the level_remaining decremented, garanteed to terminate + self.scan(entry, level_remaining - 1) + if level_remaining == self.paging_level and self.curr: + self.result.append(self.curr) + self.curr = None + + def walk(self, target: int, entry: int) -> List[PageTableLevel]: + page_shift = self.page_shift + result = [PageTableLevel(None, None, None, None) for _ in range(self.paging_level + 1)] + resolved = offset_mask = None + for i in range(self.paging_level, 0, -1): + resolved = None + shift = page_shift + self.PAGE_INDEX_LEN * (i - 1) + idx = (target >> shift) & self.PAGE_INDEX_MASK + addr = entry & self.PAGE_ENTRY_MASK + if addr not in self.cache: + break + entry = self.cache[addr][idx] + if not entry: + break + result[i].virt = addr # phys addr at this point + result[i].idx = idx + result[i].entry = entry + offset_mask = (1 << shift) - 1 + resolved = (entry & self.PAGE_ENTRY_MASK, offset_mask) + if self.should_stop_pagewalk(entry): + break + if resolved and offset_mask is not None: + addr, offset_mask = resolved + result[0].virt = addr + (target & offset_mask) + result[0].entry = entry + return result + + class ArchPagingInfo: USERLAND = "userland" KERNELLAND = "kernel [.text]" @@ -51,15 +173,9 @@ class ArchPagingInfo: VMALLOC = "vmalloc" VMEMMAP = "vmemmap" - addr_marker_sz: int - va_bits: int - pagetable_cache: Dict[pwndbg.dbg_mod.Value, Dict[int, int]] = {} - pagetableptr_cache: Dict[int, pwndbg.dbg_mod.Value] = {} - pagetable_level_names: Tuple[str, ...] - @property @pwndbg.lib.cache.cache_until("objfile") - def STRUCT_PAGE_SIZE(self): + def STRUCT_PAGE_SIZE(self) -> int: a = pwndbg.aglib.typeinfo.load("struct page") if a is None: # true with the most common set of configurations @@ -70,7 +186,7 @@ class ArchPagingInfo: @property @pwndbg.lib.cache.cache_until("objfile") - def STRUCT_PAGE_SHIFT(self): + def STRUCT_PAGE_SHIFT(self) -> int: return int(math.log2(self.STRUCT_PAGE_SIZE)) @property @@ -103,11 +219,11 @@ class ArchPagingInfo: def markers(self) -> Tuple[Tuple[str, int], ...]: raise NotImplementedError() - def handle_kernel_pages(self, pages): + def handle_kernel_pages(self, pages: Tuple[Page, ...]) -> None: # this is arch dependent raise NotImplementedError() - def kbase_helper(self, address): + def kbase_helper(self, address: int) -> int | None: if address is None: return None for mapping in kernel_vmmap_pages(): @@ -123,94 +239,101 @@ class ArchPagingInfo: return None - def pagewalk(self, target, entry) -> Tuple[PageTableLevel, ...]: + def pagewalk(self, target: int, entry: int | None) -> Tuple[PageTableLevel, ...]: raise NotImplementedError() - def pagewalk_helper(self, target, entry) -> Tuple[PageTableLevel, ...]: + def pagetable_scan(self, entry: int | None = None) -> List[Page]: + raise NotImplementedError() + + @property + def PAGE_ENTRY_MASK(self) -> int: + return ~((1 << self.page_shift) - 1) & ((1 << self.va_bits) - 1) + + @property + def PAGE_INDEX_LEN(self) -> int: + return self.page_shift - math.ceil(math.log2(pwndbg.aglib.arch.ptrsize)) + + @property + def PAGE_INDEX_MASK(self) -> int: + return (1 << (self.PAGE_INDEX_LEN)) - 1 + + @pwndbg.lib.cache.cache_until("stop") + def scan_pagetable(self, entry: int, is_kernel: bool) -> PageTableScan | None: + # only two possible return values: https://qemu-project.gitlab.io/qemu/system/gdb.html + oldval = pwndbg.dbg.selected_inferior().send_remote("qqemu.PhyMemMode").decode() + pwndbg.dbg.selected_inferior().send_remote("Qqemu.PhyMemMode:1") + if pwndbg.dbg.selected_inferior().send_remote("qqemu.PhyMemMode") != b"1": + return None + try: + scan = PageTableScan(self, is_kernel) + scan.scan(entry, self.paging_level) + finally: # so that the PhyMemMode value is always restored + pwndbg.dbg.selected_inferior().send_remote(f"Qqemu.PhyMemMode:{oldval}") + return scan + + def pagewalk_helper(self, target: int, entry: int) -> Tuple[PageTableLevel, ...]: base = self.physmap if entry > base: # user inputted a physmap address as pointer to pgd entry -= base - level = self.paging_level - result = [PageTableLevel(None, None, None, None)] * (level + 1) - page_shift = self.page_shift - ENTRYMASK = ~((1 << page_shift) - 1) & ((1 << self.va_bits) - 1) - IDXMASK = (1 << (page_shift - math.ceil(math.log2(pwndbg.aglib.arch.ptrsize)))) - 1 - for i in range(level, 0, -1): - vaddr = (entry & ENTRYMASK) + base - self.phys_offset - if self.should_stop_pagewalk(entry): - break - shift = (i - 1) * (page_shift - 3) + page_shift - offset = target & ((1 << shift) - 1) - idx = (target & (IDXMASK << shift)) >> shift - entry = 0 - try: - # with this optimization, roughly x2 as fast on average - # especially useful when parsing a large number of pages, e.g. set kernel-vmmap monitor - if vaddr not in self.pagetableptr_cache: - self.pagetableptr_cache[vaddr] = pwndbg.aglib.memory.get_typed_pointer( - "unsigned long", vaddr - ) - table = self.pagetableptr_cache[vaddr] - if table not in self.pagetable_cache: - self.pagetable_cache[table] = {} - table_cache = self.pagetable_cache[table] - if idx not in table_cache: - table_cache[idx] = int(table[idx]) - entry = table_cache[idx] - # Prior to optimization: - # table = pwndbg.aglib.memory.get_typed_pointer("unsigned long", vaddr) - # entry = int(table[idx]) - except Exception as e: - print(M.warn(f"Exception while page walking: {e}")) - entry = 0 - if entry == 0: - return tuple(result) - result[i] = PageTableLevel(self.pagetable_level_names[i], entry, vaddr, idx) - result[0] = PageTableLevel( - self.pagetable_level_names[0], - entry, - (entry & ENTRYMASK) + base + offset - self.phys_offset, - None, - ) + scan = self.scan_pagetable(entry, pwndbg.aglib.memory.is_kernel(target)) + if scan is None: + return () + result = scan.walk(target, entry) + for i, level in enumerate(result): + if level.virt is None: + continue + level.virt = level.virt + base - self.phys_offset + level.name = self.pagetable_level_names[i] return tuple(result) - def pageentry_flags(self, level) -> BitFlags: + def pagetable_scan_helper(self, entry: int, is_kernel: bool = False) -> List[Page]: + scan = self.scan_pagetable(entry, is_kernel) + if scan is None: + return [] + return scan.result + + def pageentry_bitflags(self, level: int) -> BitFlags: raise NotImplementedError() - def should_stop_pagewalk(self, is_last): + def should_stop_pagewalk(self, level: int) -> bool: raise NotImplementedError() @property - def phys_offset(self): + def phys_offset(self) -> int: return 0 + @property + def va_bits(self) -> int: + raise NotImplementedError() + + @property + def pagetable_level_names(self) -> Tuple[str, ...]: + raise NotImplementedError() + + def pageentry_flags(self, entry: int) -> int: + raise NotImplementedError() + class x86_64PagingInfo(ArchPagingInfo): - def __init__(self): - self.va_bits = 48 if self.paging_level == 4 else 51 + @property + @pwndbg.lib.cache.cache_until("stop") + def pagetable_level_names(self) -> Tuple[str, ...]: # https://blog.zolutal.io/understanding-paging/ - self.pagetable_level_names = ( - ( - "Page", - "PT", - "PMD", - "PUD", - "PGD", - ) - if self.paging_level == 4 - else ( - "Page", - "PT", - "PMD", - "P4D", - "PUD", - "PGD", - ) - ) + match self.paging_level: + case 4: + return ("Page", "PT", "PMD", "PUD", "PGD") + case 5: + return ("Page", "PT", "PMD", "P4D", "PUD", "PGD") + return () + @property @pwndbg.lib.cache.cache_until("stop") - def get_vmalloc_vmemmap_bases(self): + def va_bits(self) -> int: + return 48 if self.paging_level == 4 else 51 + + @pwndbg.lib.cache.cache_until("stop") + def get_vmalloc_vmemmap_bases(self) -> Tuple[int, int]: result = None try: target = self.physmap.to_bytes(8, byteorder="little") @@ -230,7 +353,7 @@ class x86_64PagingInfo(ArchPagingInfo): @property @pwndbg.lib.cache.cache_until("stop") - def physmap(self): + def physmap(self) -> int: result = pwndbg.aglib.kernel.symbol.try_usymbol("page_offset_base") if result is None: result = first_kernel_page_start() @@ -238,7 +361,7 @@ class x86_64PagingInfo(ArchPagingInfo): @property @pwndbg.lib.cache.cache_until("stop") - def kbase(self): + def kbase(self) -> int | None: idt_entries = pwndbg.aglib.kernel.get_idt_entries() if len(idt_entries) == 0: return None @@ -250,7 +373,7 @@ class x86_64PagingInfo(ArchPagingInfo): @property @pwndbg.lib.cache.cache_until("stop") - def vmalloc(self): + def vmalloc(self) -> int: result = pwndbg.aglib.kernel.symbol.try_usymbol("vmalloc_base") if result is not None: return result @@ -262,7 +385,7 @@ class x86_64PagingInfo(ArchPagingInfo): @property @pwndbg.lib.cache.cache_until("stop") - def vmemmap(self): + def vmemmap(self) -> int: result = pwndbg.aglib.kernel.symbol.try_usymbol("vmemmap_base") if result is not None: return result @@ -296,7 +419,7 @@ class x86_64PagingInfo(ArchPagingInfo): (None, 0xFFFFFFFFFFFFFFFF), ) - def adjust(self, name): + def adjust(self, name: str) -> str: name = name.lower() if "low kernel" in name: return self.PHYSMAP @@ -310,7 +433,7 @@ class x86_64PagingInfo(ArchPagingInfo): return name[:-5] return name - def handle_kernel_pages(self, pages): + def handle_kernel_pages(self, pages: Tuple[Page, ...]) -> None: kernel_idx = None kbase = self.kbase for i, page in enumerate(pages): @@ -338,78 +461,97 @@ class x86_64PagingInfo(ArchPagingInfo): if pwndbg.aglib.regs.read_reg(pwndbg.aglib.regs.stack) in page: page.objfile = "kernel [stack]" - def pagewalk(self, target, entry) -> Tuple[PageTableLevel, ...]: + def pagewalk(self, target: int, entry: int | None) -> Tuple[PageTableLevel, ...]: if entry is None: entry = pwndbg.aglib.regs.read_reg("cr3") return self.pagewalk_helper(target, entry) - def pageentry_flags(self, is_last) -> BitFlags: + def pagetable_scan(self, entry: int | None = None) -> List[Page]: + if entry is None: + entry = pwndbg.aglib.regs.read_reg("cr3") + return self.pagetable_scan_helper(entry) + + def pageentry_bitflags(self, _: int) -> BitFlags: return BitFlags([("NX", 63), ("PS", 7), ("A", 5), ("U", 2), ("W", 1), ("P", 0)]) - def should_stop_pagewalk(self, entry): + def should_stop_pagewalk(self, entry: int) -> bool: return entry & (1 << 7) > 0 + def pageentry_flags(self, entry: int) -> int: + if entry & 1 == 0: # not present + return 0 + flags = Page.R_OK + if entry & (1 << 1): + flags |= Page.W_OK + if entry & (1 << 63) == 0: + flags |= Page.X_OK + return flags + class Aarch64PagingInfo(ArchPagingInfo): - def __init__(self): - self.tcr_el1 = pwndbg.lib.regs.aarch64_tcr_flags - self.tcr_el1.value = pwndbg.aglib.regs.read_reg("TCR_EL1") + def __init__(self) -> None: + self.VMEMMAP_START = self.VMEMMAP_SIZE = self.PAGE_OFFSET = None + + @property + @pwndbg.lib.cache.cache_until("stop") + def pagetable_level_names(self) -> Tuple[str, ...]: + match self.paging_level: + case 4: + return ("Page", "L3", "L2", "L1", "L0") + case 3: + return ("Page", "L3", "L2", "L1") + case 2: + return ("Page", "L3", "L2") + return () + + @property + @pwndbg.lib.cache.cache_until("stop") + def tcr_el1(self) -> BitFlags: + tcr = pwndbg.lib.regs.aarch64_tcr_flags + tcr.value = pwndbg.aglib.regs.read_reg("TCR_EL1") + return tcr + + @property + @pwndbg.lib.cache.cache_until("stop") + def va_bits(self) -> int: id_aa64mmfr2_el1 = pwndbg.lib.regs.aarch64_mmfr_flags id_aa64mmfr2_el1.value = pwndbg.aglib.regs.read_reg("ID_AA64MMFR2_EL1") feat_lva = id_aa64mmfr2_el1.value is not None and id_aa64mmfr2_el1["VARange"] == 0b0001 - self.va_bits = 64 - self.tcr_el1["T1SZ"] # this is prob only `vabits_actual` - self.PAGE_OFFSET = self._PAGE_OFFSET(self.va_bits) # physmap base address without KASLR + va_bits: int = 64 - self.tcr_el1["T1SZ"] # this is prob only `vabits_actual` + self.PAGE_OFFSET = self._PAGE_OFFSET(va_bits) # physmap base address without KASLR if feat_lva: - self.va_bits = min(52, self.va_bits) - self.va_bits_min = 48 if self.va_bits > 48 else self.va_bits - self._vmalloc = self._PAGE_END( - self.va_bits_min - ) # also includes KASAN and kernel module regions - if self.paging_level == 4: - self.pagetable_level_names = ( - "Page", - "L3", - "L2", - "L1", - "L0", - ) - elif self.paging_level == 3: - self.pagetable_level_names = ( - "Page", - "L3", - "L2", - "L1", - ) + va_bits = min(52, va_bits) + return va_bits - elif self.paging_level == 2: - self.pagetable_level_names = ( - "Page", - "L3", - "L2", - ) + @property + @pwndbg.lib.cache.cache_until("stop") + def va_bits_min(self) -> int: + return 48 if self.va_bits > 48 else self.va_bits @property + @pwndbg.lib.cache.cache_until("stop") def vmalloc(self) -> int: - return self._vmalloc + # also includes KASAN and kernel module regions + return self._PAGE_END(self.va_bits_min) @property @pwndbg.lib.cache.cache_until("stop") - def physmap(self): + def physmap(self) -> int: return first_kernel_page_start() @property @pwndbg.lib.cache.cache_until("stop") - def kbase(self): + def kbase(self) -> int: return self.kbase_helper(pwndbg.aglib.regs.read_reg("vbar")) @property @pwndbg.lib.cache.cache_until("stop") - def kversion(self): + def kversion(self) -> Tuple[int, ...] | None: return pwndbg.aglib.kernel.krelease() @property @pwndbg.lib.cache.cache_until("stop") - def module_start(self): + def module_start(self) -> int: if self.kbase is None: return None res = None @@ -423,18 +565,20 @@ class Aarch64PagingInfo(ArchPagingInfo): break return res - def _PAGE_OFFSET(self, va): # aka PAGE_START + def _PAGE_OFFSET(self, va: int) -> int: # aka PAGE_START return (-(1 << va)) & 0xFFFFFFFFFFFFFFFF - def _PAGE_END(self, va): + def _PAGE_END(self, va: int) -> int: return (-(1 << (va - 1))) & 0xFFFFFFFFFFFFFFFF @property @pwndbg.lib.cache.cache_until("stop") - def vmemmap(self): - if self.kversion is None: + def vmemmap(self) -> int: + _ = self.va_bits_min + if self.kversion is None or self.PAGE_OFFSET is None: return INVALID_ADDR vmemmap_shift = self.page_shift - self.STRUCT_PAGE_SHIFT + # self.PAGE_OFFSET is set by self.va_bits(_min) so must exist if self.kversion < (5, 4): self.VMEMMAP_SIZE = 1 << (self.va_bits - self.page_shift - 1 + self.STRUCT_PAGE_SHIFT) self.VMEMMAP_START = self.PAGE_OFFSET - self.VMEMMAP_SIZE @@ -459,8 +603,8 @@ class Aarch64PagingInfo(ArchPagingInfo): @property @pwndbg.lib.cache.cache_until("stop") - def pci(self): - if self.kversion is None: + def pci(self) -> int: + if self.kversion is None or self.VMEMMAP_START is None or self.VMEMMAP_SIZE is None: return None self.pci_end = INVALID_ADDR if self.kversion >= (6, 9): @@ -475,7 +619,7 @@ class Aarch64PagingInfo(ArchPagingInfo): @property @pwndbg.lib.cache.cache_until("stop") - def fixmap(self): + def fixmap(self) -> int: if self.kversion is None: return INVALID_ADDR if self.kversion < (5, 11): @@ -491,7 +635,7 @@ class Aarch64PagingInfo(ArchPagingInfo): @property @pwndbg.lib.cache.cache_until("stop") - def ksize(self): + def ksize(self) -> int: start = pwndbg.aglib.symbol.lookup_symbol_addr("_text") end = pwndbg.aglib.symbol.lookup_symbol_addr("_end") if start is not None and end is not None: @@ -535,8 +679,8 @@ class Aarch64PagingInfo(ArchPagingInfo): return self.page_shift_heuristic @property - @pwndbg.lib.cache.cache_until("forever") - def paging_level(self): + @pwndbg.lib.cache.cache_until("stop") + def paging_level(self) -> int: # https://www.kernel.org/doc/html/v5.3/arm64/memory.html if self.page_shift == 16: return 2 @@ -565,6 +709,8 @@ class Aarch64PagingInfo(ArchPagingInfo): vmalloc_end = None if self.vmemmap and self.pci and self.fixmap: vmalloc_end = min(self.vmemmap, self.pci, self.fixmap) + if self.VMEMMAP_START is None or self.VMEMMAP_SIZE is None or self.PAGE_OFFSET is None: + return () return ( (self.USERLAND, 0), (None, self.PAGE_OFFSET), @@ -580,7 +726,7 @@ class Aarch64PagingInfo(ArchPagingInfo): (None, 0xFFFFFFFFFFFFFFFF), ) - def adjust(self, name): + def adjust(self, name: str) -> str: name = name.lower() if "end" in name: return None @@ -594,7 +740,7 @@ class Aarch64PagingInfo(ArchPagingInfo): return self.VMALLOC return " ".join(name.strip().split()[:-1]) - def handle_kernel_pages(self, pages): + def handle_kernel_pages(self, pages: Tuple[Page, ...]) -> None: if self.kbase is None: return for i in range(len(pages)): @@ -617,7 +763,7 @@ class Aarch64PagingInfo(ArchPagingInfo): @property @pwndbg.lib.cache.cache_until("start") - def phys_offset(self): + def phys_offset(self) -> int: found_system = False try: for line in pwndbg.dbg.selected_inferior().send_monitor("info mtree -f").splitlines(): @@ -632,20 +778,42 @@ class Aarch64PagingInfo(ArchPagingInfo): pass return 0x40000000 # default - def pagewalk(self, target, entry) -> Tuple[PageTableLevel, ...]: + def pagewalk(self, target: int, entry: int | None) -> Tuple[PageTableLevel, ...]: if entry is None: if pwndbg.aglib.memory.is_kernel(target): entry = pwndbg.aglib.regs.read_reg("TTBR1_EL1") else: entry = pwndbg.aglib.regs.read_reg("TTBR0_EL1") - self.entry = entry + entry |= 3 # marks the entry as a table return self.pagewalk_helper(target, entry) - def pageentry_flags(self, is_last) -> BitFlags: - if is_last: - return BitFlags([("UNX", 54), ("PNX", 53), ("AP", (6, 7))]) - return BitFlags([("UNX", 60), ("PNX", 59), ("AP", (6, 7))]) + def pagetable_scan(self, entry: int | None = None) -> List[Page]: + # assumes entry should be from `kcurrent --set` and should be TTBR0_EL1 for a task + if entry is None: + entry = pwndbg.aglib.regs.read_reg("TTBR0_EL1") + result = self.pagetable_scan_helper(entry | 3, is_kernel=False) + if pwndbg.aglib.memory.is_kernel(pwndbg.aglib.regs.pc): + result += self.pagetable_scan_helper( + pwndbg.aglib.regs.read_reg("TTBR1_EL1") | 3, is_kernel=True + ) + return result - def should_stop_pagewalk(self, entry): - # self.entry is set because the call chain - return (((entry & 1) == 0) or ((entry & 3) == 1)) and entry != self.entry + def pageentry_bitflags(self, level: int) -> BitFlags: + if level != 0: + # block or page + return BitFlags([("UNX", 54), ("PNX", 53), ("AP", (6, 7))]) + return BitFlags([("UNX", 60), ("PNX", 59), ("AP", (61, 62))]) + + def should_stop_pagewalk(self, entry: int) -> bool: + return (entry & 1) == 0 or (entry & 3) == 1 + + def pageentry_flags(self, entry: int) -> int: + if entry & 1 == 0: + return 0 + flags = Page.R_OK + if (entry >> 53) & 3 != 3: + flags |= Page.X_OK + ap = (entry >> 6) & 3 + if ap == 1 or ap == 0: + flags |= Page.W_OK + return flags diff --git a/pwndbg/aglib/kernel/symbol.py b/pwndbg/aglib/kernel/symbol.py index 2323e470f..5dfab93a4 100644 --- a/pwndbg/aglib/kernel/symbol.py +++ b/pwndbg/aglib/kernel/symbol.py @@ -281,7 +281,7 @@ class ArchSymbols: self.bpf_map_heuristic_func = "bpf_map_free_id" self.current_task_heuristic_func = "common_cpu_up" - def disass(self, name, lines=5): + def disass(self, name, lines=10): sym = pwndbg.aglib.symbol.lookup_symbol(name) if sym is None: return None diff --git a/pwndbg/aglib/kernel/vmmap.py b/pwndbg/aglib/kernel/vmmap.py index a33a66d07..a10b33fdd 100644 --- a/pwndbg/aglib/kernel/vmmap.py +++ b/pwndbg/aglib/kernel/vmmap.py @@ -25,10 +25,11 @@ import pwndbg.aglib.vmmap import pwndbg.color.message as M import pwndbg.lib.cache import pwndbg.lib.memory +from pwndbg.lib.memory import Page class KernelVmmap: - def __init__(self, pages: Tuple[pwndbg.lib.memory.Page, ...]): + def __init__(self, pages: Tuple[Page, ...]): self.pages = pages self.sections = None self.pi = pwndbg.aglib.kernel.arch_paginginfo() @@ -176,7 +177,7 @@ class QemuMachine(Machine): @pwndbg.lib.cache.cache_until("stop") -def kernel_vmmap_via_page_tables() -> Tuple[pwndbg.lib.memory.Page, ...]: +def kernel_vmmap_via_page_tables() -> Tuple[Page, ...]: if not pwndbg.aglib.qemu.is_qemu_kernel(): return () @@ -231,7 +232,7 @@ def kernel_vmmap_via_page_tables() -> Tuple[pwndbg.lib.memory.Page, ...]: p = PageTableDump(machine_backend, arch_backend) pages = p.arch_backend.parse_tables(p.cache, p.parser.parse_args("")) - retpages: List[pwndbg.lib.memory.Page] = [] + retpages: List[Page] = [] for page in pages: start = page.va size = page.page_size @@ -241,14 +242,14 @@ def kernel_vmmap_via_page_tables() -> Tuple[pwndbg.lib.memory.Page, ...]: if page.pwndbg_is_executable(): flags |= 1 objfile = f"[pt_{hex(start)[2:-3]}]" - retpages.append(pwndbg.lib.memory.Page(start, size, flags, 0, objfile)) + retpages.append(Page(start, size, flags, 0, objfile)) return tuple(retpages) monitor_info_mem_not_warned = True -def _parser_mem_info_line_x86(line: str) -> pwndbg.lib.memory.Page | None: +def _parser_mem_info_line_x86(line: str) -> Page | None: """ Example response from `info mem`: ``` @@ -270,11 +271,11 @@ def _parser_mem_info_line_x86(line: str) -> pwndbg.lib.memory.Page | None: flags = 0 if "r" in perm: - flags |= 4 + flags |= Page.R_OK if "w" in perm: - flags |= 2 + flags |= Page.W_OK if "x" in perm: - flags |= 1 + flags |= Page.X_OK global monitor_info_mem_not_warned if end - start != size and monitor_info_mem_not_warned: @@ -291,10 +292,10 @@ def _parser_mem_info_line_x86(line: str) -> pwndbg.lib.memory.Page | None: ) monitor_info_mem_not_warned = False - return pwndbg.lib.memory.Page(start, size, flags, 0, "") + return Page(start, size, flags, 0, "") -def _parser_mem_info_line_riscv64(line: str) -> pwndbg.lib.memory.Page | None: +def _parser_mem_info_line_riscv64(line: str) -> Page | None: """ Example response from `info mem`: ``` @@ -317,17 +318,17 @@ def _parser_mem_info_line_riscv64(line: str) -> pwndbg.lib.memory.Page | None: flags = 0 if "r" in perm: - flags |= 4 + flags |= Page.R_OK if "w" in perm: - flags |= 2 + flags |= Page.W_OK if "x" in perm: - flags |= 1 + flags |= Page.X_OK - return pwndbg.lib.memory.Page(start, size, flags, 0, "") + return Page(start, size, flags, 0, "") @pwndbg.lib.cache.cache_until("stop") -def kernel_vmmap_via_monitor_info_mem() -> Tuple[pwndbg.lib.memory.Page, ...]: +def kernel_vmmap_via_monitor_info_mem() -> Tuple[Page, ...]: """ Returns Linux memory maps information by parsing `monitor info mem` output from QEMU kernel GDB stub. @@ -367,7 +368,7 @@ def kernel_vmmap_via_monitor_info_mem() -> Tuple[pwndbg.lib.memory.Page, ...]: ) return () - pages: List[pwndbg.lib.memory.Page] = [] + pages: List[Page] = [] for line in monitor_info_mem.splitlines(): try: page = parser_func(line) @@ -386,23 +387,37 @@ kernel_vmmap_mode = pwndbg.config.add_param( help_docstring="""\ Values explained: -+ `page-tables` - read /proc/$qemu-pid/mem to parse kernel page tables to render vmmap ++ `page-tables` - walk page tables to render vmmap ++ `pt-dump` - read /proc/$qemu-pid/mem to parse kernel page tables to render vmmap + `monitor` - use QEMU's `monitor info mem` to render vmmap + `none` - disable vmmap rendering; useful if rendering is particularly slow Note that the page-tables method will require the QEMU kernel process to be on the same machine and within the same PID namespace. Running QEMU kernel and GDB in different Docker containers will not work. Consider running both containers with --pid=host (meaning they will see and so be able to interact with all processes on the machine). """, param_class=pwndbg.lib.config.PARAM_ENUM, - enum_sequence=["page-tables", "monitor", "none"], + enum_sequence=["page-tables", "pt-dump", "monitor", "none"], ) @pwndbg.lib.cache.cache_until("stop") -def kernel_vmmap_pages() -> Tuple[pwndbg.lib.memory.Page, ...]: - if kernel_vmmap_mode == "page-tables": - return kernel_vmmap_via_page_tables() - elif kernel_vmmap_mode == "monitor": - return kernel_vmmap_via_monitor_info_mem() +def kernel_vmmap_pages() -> Tuple[Page, ...]: + mode = kernel_vmmap_mode + if mode == "page-tables" and pwndbg.aglib.arch.name in ("rv32", "rv64"): + # TODO: remove this by implementing `RiscvPagingInfo`, `RiscvOps`, etc + print(M.warn("`page-tables` unsupported for riscv, defaulting to `monitor info mem`")) + mode = "monitor" + match mode: + case "page-tables": + # has the user set the pgd with kcurrent? + # None if not which gets properly handled + entry = pwndbg.commands.kcurrent.KCURRENT_PGD + if entry and pwndbg.aglib.memory.is_kernel(entry): + entry = pwndbg.aglib.kernel.virt_to_phys(entry) + return pwndbg.aglib.kernel.pagetable_scan(entry) + case "pt-dump": + return kernel_vmmap_via_page_tables() + case "monitor": + return kernel_vmmap_via_monitor_info_mem() return () diff --git a/pwndbg/aglib/memory.py b/pwndbg/aglib/memory.py index 5deb67b68..9ae80f3c2 100644 --- a/pwndbg/aglib/memory.py +++ b/pwndbg/aglib/memory.py @@ -445,5 +445,5 @@ def is_pagefault_supported() -> bool: return pwndbg.dbg.selected_inferior().is_linux() -def is_kernel(addr: int): - return (addr >> 63 == 1) and peek(addr) is not None +def is_kernel(addr: int | None): + return addr is not None and (addr >> 63 == 1) and peek(addr) is not None diff --git a/pwndbg/aglib/nearpc.py b/pwndbg/aglib/nearpc.py index 055617ecf..b248ba399 100644 --- a/pwndbg/aglib/nearpc.py +++ b/pwndbg/aglib/nearpc.py @@ -88,7 +88,7 @@ opcode_separator_bytes = pwndbg.config.add_param( def nearpc( pc: int = None, - lines: int = 5, # consistent with previous nearpc_lines + lines: int = None, back_lines: int = 0, total_lines: int = None, emulate=False, @@ -123,6 +123,9 @@ def nearpc( if not pwndbg.aglib.memory.peek(pc): result.append(message.error("Invalid address %#x" % pc)) + if lines is None: + lines = int(pwndbg.config.nearpc_lines) + # # Load source data if it's available # pc_to_linenos = collections.defaultdict(lambda: []) # lineno_to_src = {} diff --git a/pwndbg/commands/kcurrent.py b/pwndbg/commands/kcurrent.py index 0a18d6dd3..598463b70 100644 --- a/pwndbg/commands/kcurrent.py +++ b/pwndbg/commands/kcurrent.py @@ -68,7 +68,7 @@ parser.add_argument( "--set", dest="set_pid", action="store_true", - help="sets the kernel task used for supported pwndbg commands (kfile, pagewalk), this option does not change internal mem (purely effects how certain commands behaves)", + help="sets the kernel task used for supported pwndbg commands (kfile, pagewalk, vmmap), this option does not change internal mem (purely effects how certain commands behaves)", ) @@ -82,7 +82,7 @@ def kcurrent(pid=None, set_pid=False, verbose=True): if pid is None: kcurrent = pwndbg.aglib.kernel.current_task() kcurrent = pwndbg.aglib.memory.get_typed_pointer("struct task_struct", kcurrent) - if kcurrent: + if kcurrent and pwndbg.aglib.memory.is_kernel(int(kcurrent)): pid = int(kcurrent["pid"]) if pid is not None: for task in pwndbg.commands.ktask.get_ktasks(): @@ -97,7 +97,7 @@ def kcurrent(pid=None, set_pid=False, verbose=True): if set_pid: mm = kthread.mm if not mm: - print(M.warn("current kernel task not set.")) + print(M.warn("mm not found, current kernel task not set.")) return KCURRENT_PID = pid KCURRENT_PGD = int(mm["pgd"]) diff --git a/pwndbg/commands/paging.py b/pwndbg/commands/paging.py index aea213de9..7324060f1 100644 --- a/pwndbg/commands/paging.py +++ b/pwndbg/commands/paging.py @@ -31,7 +31,7 @@ PAGETYPES = ( def print_pagetable_entry(ptl: PageTableLevel, level: int, is_last: bool): - pageflags = pwndbg.aglib.kernel.arch_paginginfo().pageentry_flags(is_last) + pageflags = pwndbg.aglib.kernel.arch_paginginfo().pageentry_bitflags(is_last) flags = "" arrow_right = pwndbg.chain.c.arrow(f"{pwndbg.chain.config_arrow_right}") name, entry, vaddr, idx = ptl.name, ptl.entry, ptl.virt, ptl.idx diff --git a/pwndbg/lib/memory.py b/pwndbg/lib/memory.py index a96d7f586..85c032eae 100644 --- a/pwndbg/lib/memory.py +++ b/pwndbg/lib/memory.py @@ -67,6 +67,13 @@ class Page: one page of memory. """ + """ + consts + """ + R_OK = os.R_OK + W_OK = os.W_OK + X_OK = os.X_OK + vaddr = 0 #: Starting virtual address memsz = 0 #: Size of the address space, in bytes flags = 0 #: Flags set by the ELF file, see PF_X, PF_R, PF_W @@ -131,15 +138,15 @@ class Page: @property def read(self) -> bool: - return bool(self.flags & os.R_OK) + return bool(self.flags & self.R_OK) @property def write(self) -> bool: - return bool(self.flags & os.W_OK) + return bool(self.flags & self.W_OK) @property def execute(self) -> bool: - return bool(self.flags & os.X_OK) + return bool(self.flags & self.X_OK) @property def rw(self) -> bool: @@ -162,9 +169,9 @@ class Page: flags = self.flags return "".join( [ - "r" if flags & os.R_OK else "-", - "w" if flags & os.W_OK else "-", - "x" if flags & os.X_OK else "-", + "r" if flags & self.R_OK else "-", + "w" if flags & self.W_OK else "-", + "x" if flags & self.X_OK else "-", "p", ] ) diff --git a/tests/library/qemu_system/tests/test_commands_kernel.py b/tests/library/qemu_system/tests/test_commands_kernel.py index cf1582176..849bebcb2 100644 --- a/tests/library/qemu_system/tests/test_commands_kernel.py +++ b/tests/library/qemu_system/tests/test_commands_kernel.py @@ -74,10 +74,11 @@ def test_command_ktask(): return res = gdb.execute("ktask", to_string=True) assert "task @" in res - res = gdb.execute("kcurrent --set", to_string=True) + res = gdb.execute("kcurrent --set 1", to_string=True) assert "task @" in res - res2 = gdb.execute("kfile", to_string=True) - assert res in res2 + if "not found" not in res: + res2 = gdb.execute("kfile", to_string=True) + assert res in res2 def test_command_kversion(): @@ -122,7 +123,8 @@ def test_command_slab_contains(): pwndbg.aglib.kernel.slab.load_slab_typeinfo() # retrieve a valid slab object address (first address from freelist) - addr, slab_cache = get_slab_object_address() + addrs, slab_cache = get_slab_object_address() + addr = addrs[0] res = gdb.execute(f"slab contains {addr}", to_string=True) assert f"{addr} @ {slab_cache}" in res @@ -142,11 +144,6 @@ def test_x64_extra_registers_under_kernel_mode(): assert flag in res or flag.upper() in res -def get_slab_freelist_elements(out): - out = pwndbg.color.strip(out) - return re.findall(r"- \[0x[0-9a-fA-F\-]{2}\] (0x[0-9a-fA-F]+)", out) - - def get_slab_object_address(): """helper function to get the address of some kmalloc slab object and the associated slab cache name""" @@ -154,9 +151,10 @@ def get_slab_object_address(): for cache in caches: cache_name = cache.name info = gdb.execute(f"slab info -v {cache_name}", to_string=True) - matches = get_slab_freelist_elements(info) + info = pwndbg.color.strip(info) + matches = re.findall(r"- \[0x[0-9a-fA-F\-]{2}\] (0x[0-9a-fA-F]+)", info) if len(matches) > 0: - return (matches[0], cache_name) + return (matches, cache_name) raise ValueError("Could not find any slab objects")