From 217590668b9a414f26899032a698232cd1563fef Mon Sep 17 00:00:00 2001 From: jxuanli <65455765+jxuanli@users.noreply.github.com> Date: Sat, 25 Oct 2025 02:26:50 -0700 Subject: [PATCH] Partial support for simultaneously debugging different kernel tasks (#3370) * refactor * refactored * added files * exported kfile * kcurrent * docs * pagewalk * cleaning up * config detection * tests * improved arch symbol handlign * cleaning up * docs * fix typo --- docs/commands/index.md | 2 + docs/commands/kernel/kcurrent.md | 25 +++ docs/commands/kernel/kfile.md | 24 +++ pwndbg/aglib/kernel/__init__.py | 6 + pwndbg/aglib/kernel/paging.py | 6 +- pwndbg/aglib/kernel/symbol.py | 62 +++++-- pwndbg/commands/__init__.py | 1 + pwndbg/commands/kcurrent.py | 103 ++++++++++++ pwndbg/commands/ktask.py | 158 +++++++++++------- pwndbg/commands/paging.py | 4 + pwndbg/lib/kernel/kconfig.py | 6 + .../qemu_system/tests/test_commands_kernel.py | 6 +- 12 files changed, 319 insertions(+), 84 deletions(-) create mode 100644 docs/commands/kernel/kcurrent.md create mode 100644 docs/commands/kernel/kfile.md create mode 100644 pwndbg/commands/kcurrent.py diff --git a/docs/commands/index.md b/docs/commands/index.md index 98a6feab8..e3f16e27f 100644 --- a/docs/commands/index.md +++ b/docs/commands/index.md @@ -80,8 +80,10 @@ - [kchecksec](kernel/kchecksec.md) - Checks for kernel hardening configuration options. - [kcmdline](kernel/kcmdline.md) - Return the kernel commandline (/proc/cmdline). - [kconfig](kernel/kconfig.md) - Outputs the kernel config. +- [kcurrent](kernel/kcurrent.md) - Displays the current kernel task debugged by the debugger (gdb/lldb) if pid == None - [kdmabuf](kernel/kdmabuf.md) - Prints DMA buf info - [kdmesg](kernel/kdmesg.md) - Displays the kernel ring buffer (dmesg) contents. +- [kfile](kernel/kfile.md) - Displays information about fds accessible by a kernel task. - [klookup](kernel/klookup.md) - Lookup kernel symbols - [kmod](kernel/kmod.md) - Displays the loaded Linux kernel modules. - [knft-dump](kernel/knft-dump.md) - Dump all nftables: tables, chains, rules, expressions diff --git a/docs/commands/kernel/kcurrent.md b/docs/commands/kernel/kcurrent.md new file mode 100644 index 000000000..47c9fa37c --- /dev/null +++ b/docs/commands/kernel/kcurrent.md @@ -0,0 +1,25 @@ + +# kcurrent + +```text +usage: kcurrent [-h] [--set] [pid] + +``` + +Displays the current kernel task debugged by the debugger (gdb/lldb) if pid == None + Displays the task with pid if pid != None. +### Positional arguments + +|Positional Argument|Help| +| :--- | :--- | +|pid|| + +### Optional arguments + +|Short|Long|Help| +| :--- | :--- | :--- | +|-h|--help|show this help message and exit| +||--set|sets the kernel task used for supported pwndbg commands (kfile, pagewalk), this option does not change internal mem (purely effects how certain commands behaves)| + + + diff --git a/docs/commands/kernel/kfile.md b/docs/commands/kernel/kfile.md new file mode 100644 index 000000000..0fa390134 --- /dev/null +++ b/docs/commands/kernel/kfile.md @@ -0,0 +1,24 @@ + +# kfile + +```text +usage: kfile [-h] [--fd [FD]] [pid] + +``` + +Displays information about fds accessible by a kernel task. +### Positional arguments + +|Positional Argument|Help| +| :--- | :--- | +|pid|| + +### Optional arguments + +|Short|Long|Help| +| :--- | :--- | :--- | +|-h|--help|show this help message and exit| +||--fd|| + + + diff --git a/pwndbg/aglib/kernel/__init__.py b/pwndbg/aglib/kernel/__init__.py index 48d4be165..54c9505eb 100644 --- a/pwndbg/aglib/kernel/__init__.py +++ b/pwndbg/aglib/kernel/__init__.py @@ -691,3 +691,9 @@ def map_idr() -> pwndbg.dbg_mod.Value: if (syms := arch_symbols()) is not None: return syms.map_idr() return None + + +def current_task() -> pwndbg.dbg_mod.Value: + if (syms := arch_symbols()) is not None: + return syms.current_task() + return None diff --git a/pwndbg/aglib/kernel/paging.py b/pwndbg/aglib/kernel/paging.py index 07d049db8..1de8c5eda 100644 --- a/pwndbg/aglib/kernel/paging.py +++ b/pwndbg/aglib/kernel/paging.py @@ -259,11 +259,7 @@ class x86_64PagingInfo(ArchPagingInfo): @property @pwndbg.lib.cache.cache_until("stop") def paging_level(self) -> int: - # CONFIG_X86_5LEVEL is only a hint -- whether 5lvl paging is used depends on the hardware - # see also: https://www.kernel.org/doc/html/next/x86/x86_64/mm.html - if first_kernel_page_start() < (0xFFF << (4 * 13)): - return 5 - return 4 + return 4 if (pwndbg.aglib.regs["cr4"] & (1 << 12)) == 0 else 5 @pwndbg.lib.cache.cache_until("stop") def markers(self) -> Tuple[Tuple[str, int], ...]: diff --git a/pwndbg/aglib/kernel/symbol.py b/pwndbg/aglib/kernel/symbol.py index 2252db44b..14e66f1a6 100644 --- a/pwndbg/aglib/kernel/symbol.py +++ b/pwndbg/aglib/kernel/symbol.py @@ -273,6 +273,7 @@ class ArchSymbols: ) self.bpf_prog_heuristic_func = "bpf_prog_free_id" self.bpf_map_heuristic_func = "bpf_map_free_id" + self.current_task_heuristic_func = "common_cpu_up" def disass(self, name, lines=None): sym = pwndbg.aglib.symbol.lookup_symbol(name) @@ -353,6 +354,21 @@ class ArchSymbols: prog_idr = self._prog_idr() return pwndbg.aglib.memory.get_typed_pointer("unsigned long", prog_idr) + def current_task(self): + current_task = pwndbg.aglib.symbol.lookup_symbol("current_task") + if current_task: + current_task = pwndbg.aglib.kernel.per_cpu(current_task) + return current_task.dereference() + if pwndbg.aglib.arch.name == "aarch64": + current_task = self._current_task() + elif pwndbg.aglib.kernel.has_debug_symbols(self.current_task_heuristic_func): + current_task = self._current_task() + if current_task is not None: + current_task = pwndbg.aglib.kernel.per_cpu(current_task) + # current_task is int but needed here to make the linter happy + current_task = pwndbg.aglib.memory.read_pointer_width(int(current_task)) + return pwndbg.aglib.memory.get_typed_pointer("unsigned long", current_task) + def _node_data(self): raise NotImplementedError() @@ -374,29 +390,32 @@ class ArchSymbols: def _prog_idr(self): raise NotImplementedError() + def _current_task(self): + raise NotImplementedError() + class x86_64Symbols(ArchSymbols): - # mov reg, [... - 0x...] - # the ``-0x...` is a kernel address displayed as a negative number + # op ... [... +/- (0x...)] + # if negative, the `-0x...`` is a kernel address displayed as a negative number # returns the first 0x... as an int if exists - def dword_mov_reg_memoff(self, disass, nth=0): - result = self.regex(disass, r".*?\bmov.*\[.*-.*(0x[0-9a-f]+)\]", nth) + def qword_op_reg_memoff(self, disass, op, sign="-", nth=0): + result = self.regex(disass, rf"{op}.*\[.*{re.escape(sign)}\s(0x[0-9a-f]+)\]", nth) if result is not None: - return (1 << 64) - int(result.group(1), 16) + if sign == "-": + return (1 << 64) - int(result.group(1), 16) + else: + return int(result.group(1), 16) return None - # add reg, [... - 0x...] - # the `-0x...`` is a kernel address displayed as a negative number - # returns the first 0x... as an int if exists - def dword_add_reg_memoff(self, disass, nth=0): - result = self.regex(disass, r".*?\badd.*\[.*-.*(0x[0-9a-f]+)\]", nth) + # mov reg, + def qword_mov_reg_const(self, disass, nth=0): + result = self.regex(disass, r"mov.*(0x[0-9a-f]{16})", nth) if result is not None: - return (1 << 64) - int(result.group(1), 16) + return int(result.group(1), 16) return None - # mov reg, - def qword_mov_reg_const(self, disass, nth=0): - result = self.regex(disass, r".*?\bmov.*(0x[0-9a-f]{16})", nth) + def dword_mov_reg_const(self, disass, nth=0): + result = self.regex(disass, r"mov.*(0x[0-9a-f]{1,8})\b(?!\])", nth) if result is not None: return int(result.group(1), 16) return None @@ -413,7 +432,7 @@ class x86_64Symbols(ArchSymbols): def _node_data(self): disass = self.disass(self.node_data_heuristic_func) - result = self.dword_mov_reg_memoff(disass) + result = self.qword_op_reg_memoff(disass, op="mov", sign="-") if result is not None: return result return self.qword_mov_reg_const(disass) @@ -424,7 +443,7 @@ class x86_64Symbols(ArchSymbols): def _per_cpu_offset(self): disass = self.disass(self.per_cpu_offset_heuristic_func) - result = self.dword_add_reg_memoff(disass) + result = self.qword_op_reg_memoff(disass, op="add", sign="-") if result is not None: return result result = self.qword_mov_reg_const(disass) @@ -458,6 +477,14 @@ class x86_64Symbols(ArchSymbols): return result return self.qword_mov_reg_const(disass) + def _current_task(self): + disass = self.disass(self.current_task_heuristic_func) + result = self.dword_mov_reg_const(disass) + if result is not None: + return result + disass = self.disass(self.current_task_heuristic_func, lines=20) + return self.qword_op_reg_memoff(disass, op="mov", sign="+") + class Aarch64Symbols(ArchSymbols): # adrp x?, @@ -546,3 +573,6 @@ class Aarch64Symbols(ArchSymbols): if result is not None: return result return self.qword_adrp_add_const(disass) + + def _current_task(self): + return pwndbg.aglib.regs["sp_el0"] diff --git a/pwndbg/commands/__init__.py b/pwndbg/commands/__init__.py index d24c1680a..d0baeee70 100644 --- a/pwndbg/commands/__init__.py +++ b/pwndbg/commands/__init__.py @@ -924,6 +924,7 @@ def load_commands() -> None: import pwndbg.commands.kchecksec import pwndbg.commands.kcmdline import pwndbg.commands.kconfig + import pwndbg.commands.kcurrent import pwndbg.commands.kdmabuf import pwndbg.commands.kdmesg import pwndbg.commands.klookup diff --git a/pwndbg/commands/kcurrent.py b/pwndbg/commands/kcurrent.py new file mode 100644 index 000000000..0a18d6dd3 --- /dev/null +++ b/pwndbg/commands/kcurrent.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +import argparse + +import pwndbg.color as C +import pwndbg.color.message as M +import pwndbg.commands +import pwndbg.lib +from pwndbg.lib.exception import IndentContextManager +from pwndbg.lib.regs import BitFlags + +indent = IndentContextManager() + +fmode_flags = BitFlags([("R", 0), ("W", 1), ("X", 5)]) +KCURRENT_PID = None +KCURRENT_PGD = None + +parser = argparse.ArgumentParser( + description="Displays information about fds accessible by a kernel task." +) +parser.add_argument("pid", nargs="?", type=int, help="") +parser.add_argument("--fd", nargs="?", type=int, help="") + + +@pwndbg.commands.Command(parser, category=pwndbg.commands.CommandCategory.KERNEL) +@pwndbg.commands.OnlyWhenQemuKernel +@pwndbg.commands.OnlyWhenPagingEnabled +@pwndbg.commands.OnlyWithKernelDebugInfo +def kfile(pid=None, fd=None): + if pid is None: + if KCURRENT_PID is None: + kcurrent(None, set_pid=True, verbose=False) + pid = KCURRENT_PID + if pid is None: + print(M.warn("no pid specified (either specify pid or set with kcurrent)")) + return + indent = IndentContextManager() + threads = [] + for task in pwndbg.commands.ktask.get_ktasks(): + threads += task.threads + for thread in threads: + if thread.pid != pid: + continue + indent.print(thread) + with indent: + for i, file in thread.files(): + if fd is not None and i != fd: + continue + addr = int(file) + ops = int(file["f_op"]) + prefix = indent.prefix(f"[fileno {i:03}]") + flags = C.context.format_flags(int(file["f_mode"]), fmode_flags) + desc = f"ops @ {C.red(pwndbg.chain.format(ops, limit=0))}" + indent.print(f"- {prefix} file @ {indent.addr_hex(addr)}: {desc}") + private_data = int(file["private_data"]) + with indent: + indent.print(f"private: {indent.addr_hex(private_data)}, fmode: {flags}") + + +parser = argparse.ArgumentParser( + description=""" + Displays the current kernel task debugged by the debugger (gdb/lldb) if pid == None + Displays the task with pid if pid != None. + """ +) +parser.add_argument("pid", nargs="?", type=int, help="") +parser.add_argument( + "--set", + dest="set_pid", + action="store_true", + help="sets the kernel task used for supported pwndbg commands (kfile, pagewalk), this option does not change internal mem (purely effects how certain commands behaves)", +) + + +@pwndbg.commands.Command(parser, category=pwndbg.commands.CommandCategory.KERNEL) +@pwndbg.commands.OnlyWhenQemuKernel +@pwndbg.commands.OnlyWhenPagingEnabled +@pwndbg.commands.OnlyWithKernelDebugInfo +def kcurrent(pid=None, set_pid=False, verbose=True): + global KCURRENT_PID, KCURRENT_PGD + kthread = None + if pid is None: + kcurrent = pwndbg.aglib.kernel.current_task() + kcurrent = pwndbg.aglib.memory.get_typed_pointer("struct task_struct", kcurrent) + if kcurrent: + pid = int(kcurrent["pid"]) + if pid is not None: + for task in pwndbg.commands.ktask.get_ktasks(): + for _kthread in task.threads: + if _kthread.pid == pid: + kthread = _kthread + if kthread is None: + print(M.warn("cannot find kernel task")) + return + if verbose: + indent.print(kthread) + if set_pid: + mm = kthread.mm + if not mm: + print(M.warn("current kernel task not set.")) + return + KCURRENT_PID = pid + KCURRENT_PGD = int(mm["pgd"]) diff --git a/pwndbg/commands/ktask.py b/pwndbg/commands/ktask.py index 08edabb37..23154af81 100644 --- a/pwndbg/commands/ktask.py +++ b/pwndbg/commands/ktask.py @@ -6,82 +6,116 @@ and prints details about each task, including its address, PID, user space statu from __future__ import annotations import argparse +from typing import Tuple +import pwndbg.color as C import pwndbg.color.message as message import pwndbg.commands -from pwndbg.aglib.kernel.macros import container_of +import pwndbg.lib +from pwndbg.aglib.kernel.macros import for_each_entry +from pwndbg.lib.exception import IndentContextManager parser = argparse.ArgumentParser(description="Displays information about kernel tasks.") - parser.add_argument("task_name", nargs="?", type=str, help="A task name to search for") - -@pwndbg.commands.Command(parser, category=pwndbg.commands.CommandCategory.KERNEL) -@pwndbg.commands.OnlyWhenQemuKernel -@pwndbg.commands.OnlyWhenPagingEnabled -@pwndbg.commands.OnlyWithKernelDebugInfo -def ktask(task_name=None) -> None: - print(f"{'Address':>18} {'PID':>6} {'User':>4} {'CPU':>4} {'UID':>4} {'GID':>4} {'Name'}") - +indent = IndentContextManager() + + +class Kthread: + def __init__(self, thread: pwndbg.dbg_mod.Value): + self.thread = thread + self.name = thread["comm"].string() + self.pid = int(thread["pid"]) + self.has_user_page = int(thread["mm"]) != 0 + krelease = pwndbg.aglib.kernel.krelease() + if krelease is None or "CONFIG_THREAD_INFO_IN_TASK" not in pwndbg.aglib.kernel.kconfig(): + self.cpu = "-" + elif krelease < (5, 16): + self.cpu = int(thread["cpu"]) + else: + self.cpu = int(thread["thread_info"]["cpu"]) + self.uid = int(thread["real_cred"]["uid"]["val"]) + self.gid = int(thread["real_cred"]["gid"]["val"]) + + @pwndbg.lib.cache.cache_until("stop") + def files(self): + fdt = self.thread["files"]["fdt"] + fds = fdt["fd"] + files = [] + for i in range(int(fdt["max_fds"])): + file = fds[i] + addr = int(file) + if addr == 0: + continue + files.append((i, file)) + return tuple(files) + + @property + def mm(self): + mm = self.thread["mm"] + if int(mm) != 0: + return mm + # for anonymous tasks + mm = self.thread["active_mm"] + if int(mm) != 0: + return mm + return None + + def __str__(self): + thread = C.blue(hex(int(self.thread))) + prefix = f"[pid {self.pid}]" + desc = " " + prefix = C.blue(f"{prefix:<9}") + f"task @ {thread}: {self.name:<16}" + user = ", has user pages" if self.has_user_page else "" + desc = C.red(f"cpu #{self.cpu} (uid: {self.uid}, gid: {self.gid}{user})") + return f"{prefix} {desc}" + + +class Ktask: + def __init__(self, task: pwndbg.dbg_mod.Value): + self.task = task + threads = [] + signal = task["signal"] + # Iterate through all threads in the task_struct's thread list. + for thread in for_each_entry(signal["thread_head"], "struct task_struct", "thread_node"): + kthread = Kthread(thread) + threads.append(kthread) + self.threads = threads + + +@pwndbg.lib.cache.cache_until("stop") +def get_ktasks() -> Tuple[Ktask, ...]: + tasks = [] # Look up the init_task symbol, which is the first task in the kernel's task list. - init_task = pwndbg.aglib.symbol.lookup_symbol_addr("init_task") + init_task = pwndbg.aglib.symbol.lookup_symbol("init_task") if init_task is None: print( "The init_task symbol was not found. This may indicate that the symbol is not available in the current build." ) - return - - curr_task = init_task + return None try: + tasks.append(Ktask(init_task)) # The task list is implemented a circular doubly linked list, so we traverse starting from init_task. - while True: - task_struct = pwndbg.aglib.memory.get_typed_pointer_value( - "struct task_struct", curr_task - ) - thread_head = task_struct["signal"]["thread_head"] - - curr_thread = thread_head["next"] - - # Iterate through all threads in the task_struct's thread list. - while True: - if int(thread_head.address) == int(curr_thread): - break - - thread = container_of(int(curr_thread), "struct task_struct", "thread_node") - - task_struct2 = pwndbg.aglib.memory.get_typed_pointer_value( - "struct task_struct", thread - ) - - comm = task_struct2["comm"].string() - - # Print task information if no specific task name is provided or if the current task matches the provided name. - if not task_name or task_name in comm: - curr_task_hex = hex(curr_task) - pid = int(task_struct2["pid"]) - user = "✓" if int(task_struct2["mm"]) != 0 else "✗" - cpu = int(task_struct2["thread_info"]["cpu"]) - - # Get UID and GID from the credentials structure - uid = int(task_struct2["real_cred"]["uid"]["val"]) - gid = int(task_struct2["real_cred"]["gid"]["val"]) - - print( - f"{curr_task_hex:>18} {pid:>6} {user:>4} {cpu:>4} {uid:>6} {gid:>6} {comm:<7}" - ) - - curr_thread = curr_thread["next"] - - next_task = container_of( - int(task_struct["tasks"]["next"]), "struct task_struct", "tasks" - ) - - if int(next_task) == init_task: - break - - curr_task = int(next_task) - + for task in for_each_entry(init_task["tasks"], "struct task_struct", "tasks"): + ktask = Ktask(task) + tasks.append(ktask) except pwndbg.dbg_mod.Error as e: print(message.error(f"ERROR: {e}")) - return + return None + return tuple(tasks) + + +@pwndbg.commands.Command(parser, category=pwndbg.commands.CommandCategory.KERNEL) +@pwndbg.commands.OnlyWhenQemuKernel +@pwndbg.commands.OnlyWhenPagingEnabled +@pwndbg.commands.OnlyWithKernelDebugInfo +def ktask(task_name=None) -> None: + threads = [] + for task in get_ktasks(): + for thread in task.threads: + if task_name is not None and task_name not in thread.name: + continue + threads.append(thread) + for thread in threads: + indent.print(thread) diff --git a/pwndbg/commands/paging.py b/pwndbg/commands/paging.py index 5cea39cef..2471c34c9 100644 --- a/pwndbg/commands/paging.py +++ b/pwndbg/commands/paging.py @@ -90,6 +90,10 @@ def page_info(page): def pagewalk(vaddr, entry=None): if entry is not None: entry = int(pwndbg.dbg.selected_frame().evaluate_expression(entry)) + else: + # did the user set pgd with kcurrent? + # safe because pagewalk fallbacks to control regs when entry==None + entry = pwndbg.commands.kcurrent.KCURRENT_PGD vaddr = int(pwndbg.dbg.selected_frame().evaluate_expression(vaddr)) levels = pwndbg.aglib.kernel.pagewalk(vaddr, entry) for i in range(len(levels) - 1, 0, -1): diff --git a/pwndbg/lib/kernel/kconfig.py b/pwndbg/lib/kernel/kconfig.py index c98570f55..93142cea2 100644 --- a/pwndbg/lib/kernel/kconfig.py +++ b/pwndbg/lib/kernel/kconfig.py @@ -66,6 +66,8 @@ class Kconfig(UserDict): # type: ignore[type-arg] self.data["CONFIG_DEBUG_FS"] = "y" if self.CONFIG_SECURITY: self.data["CONFIG_SECURITY"] = "y" + if self.CONFIG_THREAD_INFO_IN_TASK: + self.data["CONFIG_THREAD_INFO_IN_TASK"] = "y" def get_key(self, name: str) -> str | None: # First attempt to lookup the value assuming the user passed in a name @@ -183,6 +185,10 @@ class Kconfig(UserDict): # type: ignore[type-arg] def CONFIG_SECURITY(self) -> bool: return pwndbg.aglib.symbol.lookup_symbol("security_inode_init_security") is not None + @property + def CONFIG_THREAD_INFO_IN_TASK(self) -> bool: + return pwndbg.aglib.symbol.lookup_symbol("put_task_stack") is not None + def update_with_file(self, file_path): for line in open(file_path, "r").read().splitlines(): split = line.split("=") diff --git a/tests/library/qemu_system/tests/test_commands_kernel.py b/tests/library/qemu_system/tests/test_commands_kernel.py index f70bd6f97..293d4e4a6 100644 --- a/tests/library/qemu_system/tests/test_commands_kernel.py +++ b/tests/library/qemu_system/tests/test_commands_kernel.py @@ -73,7 +73,11 @@ def test_command_ktask(): assert "may only be run when debugging a Linux kernel with debug" in res return res = gdb.execute("ktask", to_string=True) - assert "Address" in res + assert "task @" in res + res = gdb.execute("kcurrent --set", to_string=True) + assert "task @" in res + res2 = gdb.execute("kfile", to_string=True) + assert res in res2 def test_command_kversion():