Partial support for simultaneously debugging different kernel tasks (#3370)

* refactor

* refactored

* added files

* exported kfile

* kcurrent

* docs

* pagewalk

* cleaning up

* config detection

* tests

* improved arch symbol handlign

* cleaning up

* docs

* fix typo
pull/3364/head
jxuanli 1 month ago committed by GitHub
parent f13cd4654b
commit 217590668b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -80,8 +80,10 @@
- [kchecksec](kernel/kchecksec.md) - Checks for kernel hardening configuration options.
- [kcmdline](kernel/kcmdline.md) - Return the kernel commandline (/proc/cmdline).
- [kconfig](kernel/kconfig.md) - Outputs the kernel config.
- [kcurrent](kernel/kcurrent.md) - Displays the current kernel task debugged by the debugger (gdb/lldb) if pid == None
- [kdmabuf](kernel/kdmabuf.md) - Prints DMA buf info
- [kdmesg](kernel/kdmesg.md) - Displays the kernel ring buffer (dmesg) contents.
- [kfile](kernel/kfile.md) - Displays information about fds accessible by a kernel task.
- [klookup](kernel/klookup.md) - Lookup kernel symbols
- [kmod](kernel/kmod.md) - Displays the loaded Linux kernel modules.
- [knft-dump](kernel/knft-dump.md) - Dump all nftables: tables, chains, rules, expressions

@ -0,0 +1,25 @@
<!-- THIS PART OF THIS FILE IS AUTOGENERATED. DO NOT MODIFY IT. See scripts/generate-docs.sh -->
# kcurrent
```text
usage: kcurrent [-h] [--set] [pid]
```
Displays the current kernel task debugged by the debugger (gdb/lldb) if pid == None
Displays the task with pid if pid != None.
### Positional arguments
|Positional Argument|Help|
| :--- | :--- |
|pid||
### Optional arguments
|Short|Long|Help|
| :--- | :--- | :--- |
|-h|--help|show this help message and exit|
||--set|sets the kernel task used for supported pwndbg commands (kfile, pagewalk), this option does not change internal mem (purely effects how certain commands behaves)|
<!-- END OF AUTOGENERATED PART. Do not modify this line or the line below, they mark the end of the auto-generated part of the file. If you want to extend the documentation in a way which cannot easily be done by adding to the command help description, write below the following line. -->
<!-- ------------\>8---- ----\>8---- ----\>8------------ -->

@ -0,0 +1,24 @@
<!-- THIS PART OF THIS FILE IS AUTOGENERATED. DO NOT MODIFY IT. See scripts/generate-docs.sh -->
# kfile
```text
usage: kfile [-h] [--fd [FD]] [pid]
```
Displays information about fds accessible by a kernel task.
### Positional arguments
|Positional Argument|Help|
| :--- | :--- |
|pid||
### Optional arguments
|Short|Long|Help|
| :--- | :--- | :--- |
|-h|--help|show this help message and exit|
||--fd||
<!-- END OF AUTOGENERATED PART. Do not modify this line or the line below, they mark the end of the auto-generated part of the file. If you want to extend the documentation in a way which cannot easily be done by adding to the command help description, write below the following line. -->
<!-- ------------\>8---- ----\>8---- ----\>8------------ -->

@ -691,3 +691,9 @@ def map_idr() -> pwndbg.dbg_mod.Value:
if (syms := arch_symbols()) is not None:
return syms.map_idr()
return None
def current_task() -> pwndbg.dbg_mod.Value:
if (syms := arch_symbols()) is not None:
return syms.current_task()
return None

@ -259,11 +259,7 @@ class x86_64PagingInfo(ArchPagingInfo):
@property
@pwndbg.lib.cache.cache_until("stop")
def paging_level(self) -> int:
# CONFIG_X86_5LEVEL is only a hint -- whether 5lvl paging is used depends on the hardware
# see also: https://www.kernel.org/doc/html/next/x86/x86_64/mm.html
if first_kernel_page_start() < (0xFFF << (4 * 13)):
return 5
return 4
return 4 if (pwndbg.aglib.regs["cr4"] & (1 << 12)) == 0 else 5
@pwndbg.lib.cache.cache_until("stop")
def markers(self) -> Tuple[Tuple[str, int], ...]:

@ -273,6 +273,7 @@ class ArchSymbols:
)
self.bpf_prog_heuristic_func = "bpf_prog_free_id"
self.bpf_map_heuristic_func = "bpf_map_free_id"
self.current_task_heuristic_func = "common_cpu_up"
def disass(self, name, lines=None):
sym = pwndbg.aglib.symbol.lookup_symbol(name)
@ -353,6 +354,21 @@ class ArchSymbols:
prog_idr = self._prog_idr()
return pwndbg.aglib.memory.get_typed_pointer("unsigned long", prog_idr)
def current_task(self):
current_task = pwndbg.aglib.symbol.lookup_symbol("current_task")
if current_task:
current_task = pwndbg.aglib.kernel.per_cpu(current_task)
return current_task.dereference()
if pwndbg.aglib.arch.name == "aarch64":
current_task = self._current_task()
elif pwndbg.aglib.kernel.has_debug_symbols(self.current_task_heuristic_func):
current_task = self._current_task()
if current_task is not None:
current_task = pwndbg.aglib.kernel.per_cpu(current_task)
# current_task is int but needed here to make the linter happy
current_task = pwndbg.aglib.memory.read_pointer_width(int(current_task))
return pwndbg.aglib.memory.get_typed_pointer("unsigned long", current_task)
def _node_data(self):
raise NotImplementedError()
@ -374,29 +390,32 @@ class ArchSymbols:
def _prog_idr(self):
raise NotImplementedError()
def _current_task(self):
raise NotImplementedError()
class x86_64Symbols(ArchSymbols):
# mov reg, [... - 0x...]
# the ``-0x...` is a kernel address displayed as a negative number
# op ... [... +/- (0x...)]
# if negative, the `-0x...`` is a kernel address displayed as a negative number
# returns the first 0x... as an int if exists
def dword_mov_reg_memoff(self, disass, nth=0):
result = self.regex(disass, r".*?\bmov.*\[.*-.*(0x[0-9a-f]+)\]", nth)
def qword_op_reg_memoff(self, disass, op, sign="-", nth=0):
result = self.regex(disass, rf"{op}.*\[.*{re.escape(sign)}\s(0x[0-9a-f]+)\]", nth)
if result is not None:
return (1 << 64) - int(result.group(1), 16)
if sign == "-":
return (1 << 64) - int(result.group(1), 16)
else:
return int(result.group(1), 16)
return None
# add reg, [... - 0x...]
# the `-0x...`` is a kernel address displayed as a negative number
# returns the first 0x... as an int if exists
def dword_add_reg_memoff(self, disass, nth=0):
result = self.regex(disass, r".*?\badd.*\[.*-.*(0x[0-9a-f]+)\]", nth)
# mov reg, <kernel address as a constant>
def qword_mov_reg_const(self, disass, nth=0):
result = self.regex(disass, r"mov.*(0x[0-9a-f]{16})", nth)
if result is not None:
return (1 << 64) - int(result.group(1), 16)
return int(result.group(1), 16)
return None
# mov reg, <kernel address as a constant>
def qword_mov_reg_const(self, disass, nth=0):
result = self.regex(disass, r".*?\bmov.*(0x[0-9a-f]{16})", nth)
def dword_mov_reg_const(self, disass, nth=0):
result = self.regex(disass, r"mov.*(0x[0-9a-f]{1,8})\b(?!\])", nth)
if result is not None:
return int(result.group(1), 16)
return None
@ -413,7 +432,7 @@ class x86_64Symbols(ArchSymbols):
def _node_data(self):
disass = self.disass(self.node_data_heuristic_func)
result = self.dword_mov_reg_memoff(disass)
result = self.qword_op_reg_memoff(disass, op="mov", sign="-")
if result is not None:
return result
return self.qword_mov_reg_const(disass)
@ -424,7 +443,7 @@ class x86_64Symbols(ArchSymbols):
def _per_cpu_offset(self):
disass = self.disass(self.per_cpu_offset_heuristic_func)
result = self.dword_add_reg_memoff(disass)
result = self.qword_op_reg_memoff(disass, op="add", sign="-")
if result is not None:
return result
result = self.qword_mov_reg_const(disass)
@ -458,6 +477,14 @@ class x86_64Symbols(ArchSymbols):
return result
return self.qword_mov_reg_const(disass)
def _current_task(self):
disass = self.disass(self.current_task_heuristic_func)
result = self.dword_mov_reg_const(disass)
if result is not None:
return result
disass = self.disass(self.current_task_heuristic_func, lines=20)
return self.qword_op_reg_memoff(disass, op="mov", sign="+")
class Aarch64Symbols(ArchSymbols):
# adrp x?, <kernel address>
@ -546,3 +573,6 @@ class Aarch64Symbols(ArchSymbols):
if result is not None:
return result
return self.qword_adrp_add_const(disass)
def _current_task(self):
return pwndbg.aglib.regs["sp_el0"]

@ -924,6 +924,7 @@ def load_commands() -> None:
import pwndbg.commands.kchecksec
import pwndbg.commands.kcmdline
import pwndbg.commands.kconfig
import pwndbg.commands.kcurrent
import pwndbg.commands.kdmabuf
import pwndbg.commands.kdmesg
import pwndbg.commands.klookup

@ -0,0 +1,103 @@
from __future__ import annotations
import argparse
import pwndbg.color as C
import pwndbg.color.message as M
import pwndbg.commands
import pwndbg.lib
from pwndbg.lib.exception import IndentContextManager
from pwndbg.lib.regs import BitFlags
indent = IndentContextManager()
fmode_flags = BitFlags([("R", 0), ("W", 1), ("X", 5)])
KCURRENT_PID = None
KCURRENT_PGD = None
parser = argparse.ArgumentParser(
description="Displays information about fds accessible by a kernel task."
)
parser.add_argument("pid", nargs="?", type=int, help="")
parser.add_argument("--fd", nargs="?", type=int, help="")
@pwndbg.commands.Command(parser, category=pwndbg.commands.CommandCategory.KERNEL)
@pwndbg.commands.OnlyWhenQemuKernel
@pwndbg.commands.OnlyWhenPagingEnabled
@pwndbg.commands.OnlyWithKernelDebugInfo
def kfile(pid=None, fd=None):
if pid is None:
if KCURRENT_PID is None:
kcurrent(None, set_pid=True, verbose=False)
pid = KCURRENT_PID
if pid is None:
print(M.warn("no pid specified (either specify pid or set with kcurrent)"))
return
indent = IndentContextManager()
threads = []
for task in pwndbg.commands.ktask.get_ktasks():
threads += task.threads
for thread in threads:
if thread.pid != pid:
continue
indent.print(thread)
with indent:
for i, file in thread.files():
if fd is not None and i != fd:
continue
addr = int(file)
ops = int(file["f_op"])
prefix = indent.prefix(f"[fileno {i:03}]")
flags = C.context.format_flags(int(file["f_mode"]), fmode_flags)
desc = f"ops @ {C.red(pwndbg.chain.format(ops, limit=0))}"
indent.print(f"- {prefix} file @ {indent.addr_hex(addr)}: {desc}")
private_data = int(file["private_data"])
with indent:
indent.print(f"private: {indent.addr_hex(private_data)}, fmode: {flags}")
parser = argparse.ArgumentParser(
description="""
Displays the current kernel task debugged by the debugger (gdb/lldb) if pid == None
Displays the task with pid if pid != None.
"""
)
parser.add_argument("pid", nargs="?", type=int, help="")
parser.add_argument(
"--set",
dest="set_pid",
action="store_true",
help="sets the kernel task used for supported pwndbg commands (kfile, pagewalk), this option does not change internal mem (purely effects how certain commands behaves)",
)
@pwndbg.commands.Command(parser, category=pwndbg.commands.CommandCategory.KERNEL)
@pwndbg.commands.OnlyWhenQemuKernel
@pwndbg.commands.OnlyWhenPagingEnabled
@pwndbg.commands.OnlyWithKernelDebugInfo
def kcurrent(pid=None, set_pid=False, verbose=True):
global KCURRENT_PID, KCURRENT_PGD
kthread = None
if pid is None:
kcurrent = pwndbg.aglib.kernel.current_task()
kcurrent = pwndbg.aglib.memory.get_typed_pointer("struct task_struct", kcurrent)
if kcurrent:
pid = int(kcurrent["pid"])
if pid is not None:
for task in pwndbg.commands.ktask.get_ktasks():
for _kthread in task.threads:
if _kthread.pid == pid:
kthread = _kthread
if kthread is None:
print(M.warn("cannot find kernel task"))
return
if verbose:
indent.print(kthread)
if set_pid:
mm = kthread.mm
if not mm:
print(M.warn("current kernel task not set."))
return
KCURRENT_PID = pid
KCURRENT_PGD = int(mm["pgd"])

@ -6,82 +6,116 @@ and prints details about each task, including its address, PID, user space statu
from __future__ import annotations
import argparse
from typing import Tuple
import pwndbg.color as C
import pwndbg.color.message as message
import pwndbg.commands
from pwndbg.aglib.kernel.macros import container_of
import pwndbg.lib
from pwndbg.aglib.kernel.macros import for_each_entry
from pwndbg.lib.exception import IndentContextManager
parser = argparse.ArgumentParser(description="Displays information about kernel tasks.")
parser.add_argument("task_name", nargs="?", type=str, help="A task name to search for")
@pwndbg.commands.Command(parser, category=pwndbg.commands.CommandCategory.KERNEL)
@pwndbg.commands.OnlyWhenQemuKernel
@pwndbg.commands.OnlyWhenPagingEnabled
@pwndbg.commands.OnlyWithKernelDebugInfo
def ktask(task_name=None) -> None:
print(f"{'Address':>18} {'PID':>6} {'User':>4} {'CPU':>4} {'UID':>4} {'GID':>4} {'Name'}")
indent = IndentContextManager()
class Kthread:
def __init__(self, thread: pwndbg.dbg_mod.Value):
self.thread = thread
self.name = thread["comm"].string()
self.pid = int(thread["pid"])
self.has_user_page = int(thread["mm"]) != 0
krelease = pwndbg.aglib.kernel.krelease()
if krelease is None or "CONFIG_THREAD_INFO_IN_TASK" not in pwndbg.aglib.kernel.kconfig():
self.cpu = "-"
elif krelease < (5, 16):
self.cpu = int(thread["cpu"])
else:
self.cpu = int(thread["thread_info"]["cpu"])
self.uid = int(thread["real_cred"]["uid"]["val"])
self.gid = int(thread["real_cred"]["gid"]["val"])
@pwndbg.lib.cache.cache_until("stop")
def files(self):
fdt = self.thread["files"]["fdt"]
fds = fdt["fd"]
files = []
for i in range(int(fdt["max_fds"])):
file = fds[i]
addr = int(file)
if addr == 0:
continue
files.append((i, file))
return tuple(files)
@property
def mm(self):
mm = self.thread["mm"]
if int(mm) != 0:
return mm
# for anonymous tasks
mm = self.thread["active_mm"]
if int(mm) != 0:
return mm
return None
def __str__(self):
thread = C.blue(hex(int(self.thread)))
prefix = f"[pid {self.pid}]"
desc = " "
prefix = C.blue(f"{prefix:<9}") + f"task @ {thread}: {self.name:<16}"
user = ", has user pages" if self.has_user_page else ""
desc = C.red(f"cpu #{self.cpu} (uid: {self.uid}, gid: {self.gid}{user})")
return f"{prefix} {desc}"
class Ktask:
def __init__(self, task: pwndbg.dbg_mod.Value):
self.task = task
threads = []
signal = task["signal"]
# Iterate through all threads in the task_struct's thread list.
for thread in for_each_entry(signal["thread_head"], "struct task_struct", "thread_node"):
kthread = Kthread(thread)
threads.append(kthread)
self.threads = threads
@pwndbg.lib.cache.cache_until("stop")
def get_ktasks() -> Tuple[Ktask, ...]:
tasks = []
# Look up the init_task symbol, which is the first task in the kernel's task list.
init_task = pwndbg.aglib.symbol.lookup_symbol_addr("init_task")
init_task = pwndbg.aglib.symbol.lookup_symbol("init_task")
if init_task is None:
print(
"The init_task symbol was not found. This may indicate that the symbol is not available in the current build."
)
return
curr_task = init_task
return None
try:
tasks.append(Ktask(init_task))
# The task list is implemented a circular doubly linked list, so we traverse starting from init_task.
while True:
task_struct = pwndbg.aglib.memory.get_typed_pointer_value(
"struct task_struct", curr_task
)
thread_head = task_struct["signal"]["thread_head"]
curr_thread = thread_head["next"]
# Iterate through all threads in the task_struct's thread list.
while True:
if int(thread_head.address) == int(curr_thread):
break
thread = container_of(int(curr_thread), "struct task_struct", "thread_node")
task_struct2 = pwndbg.aglib.memory.get_typed_pointer_value(
"struct task_struct", thread
)
comm = task_struct2["comm"].string()
# Print task information if no specific task name is provided or if the current task matches the provided name.
if not task_name or task_name in comm:
curr_task_hex = hex(curr_task)
pid = int(task_struct2["pid"])
user = "" if int(task_struct2["mm"]) != 0 else ""
cpu = int(task_struct2["thread_info"]["cpu"])
# Get UID and GID from the credentials structure
uid = int(task_struct2["real_cred"]["uid"]["val"])
gid = int(task_struct2["real_cred"]["gid"]["val"])
print(
f"{curr_task_hex:>18} {pid:>6} {user:>4} {cpu:>4} {uid:>6} {gid:>6} {comm:<7}"
)
curr_thread = curr_thread["next"]
next_task = container_of(
int(task_struct["tasks"]["next"]), "struct task_struct", "tasks"
)
if int(next_task) == init_task:
break
curr_task = int(next_task)
for task in for_each_entry(init_task["tasks"], "struct task_struct", "tasks"):
ktask = Ktask(task)
tasks.append(ktask)
except pwndbg.dbg_mod.Error as e:
print(message.error(f"ERROR: {e}"))
return
return None
return tuple(tasks)
@pwndbg.commands.Command(parser, category=pwndbg.commands.CommandCategory.KERNEL)
@pwndbg.commands.OnlyWhenQemuKernel
@pwndbg.commands.OnlyWhenPagingEnabled
@pwndbg.commands.OnlyWithKernelDebugInfo
def ktask(task_name=None) -> None:
threads = []
for task in get_ktasks():
for thread in task.threads:
if task_name is not None and task_name not in thread.name:
continue
threads.append(thread)
for thread in threads:
indent.print(thread)

@ -90,6 +90,10 @@ def page_info(page):
def pagewalk(vaddr, entry=None):
if entry is not None:
entry = int(pwndbg.dbg.selected_frame().evaluate_expression(entry))
else:
# did the user set pgd with kcurrent?
# safe because pagewalk fallbacks to control regs when entry==None
entry = pwndbg.commands.kcurrent.KCURRENT_PGD
vaddr = int(pwndbg.dbg.selected_frame().evaluate_expression(vaddr))
levels = pwndbg.aglib.kernel.pagewalk(vaddr, entry)
for i in range(len(levels) - 1, 0, -1):

@ -66,6 +66,8 @@ class Kconfig(UserDict): # type: ignore[type-arg]
self.data["CONFIG_DEBUG_FS"] = "y"
if self.CONFIG_SECURITY:
self.data["CONFIG_SECURITY"] = "y"
if self.CONFIG_THREAD_INFO_IN_TASK:
self.data["CONFIG_THREAD_INFO_IN_TASK"] = "y"
def get_key(self, name: str) -> str | None:
# First attempt to lookup the value assuming the user passed in a name
@ -183,6 +185,10 @@ class Kconfig(UserDict): # type: ignore[type-arg]
def CONFIG_SECURITY(self) -> bool:
return pwndbg.aglib.symbol.lookup_symbol("security_inode_init_security") is not None
@property
def CONFIG_THREAD_INFO_IN_TASK(self) -> bool:
return pwndbg.aglib.symbol.lookup_symbol("put_task_stack") is not None
def update_with_file(self, file_path):
for line in open(file_path, "r").read().splitlines():
split = line.split("=")

@ -73,7 +73,11 @@ def test_command_ktask():
assert "may only be run when debugging a Linux kernel with debug" in res
return
res = gdb.execute("ktask", to_string=True)
assert "Address" in res
assert "task @" in res
res = gdb.execute("kcurrent --set", to_string=True)
assert "task @" in res
res2 = gdb.execute("kfile", to_string=True)
assert res in res2
def test_command_kversion():

Loading…
Cancel
Save