* added options to specify the num of lines to disass and heuristics for map/prog_idr

* partial recovery for structs relevant to bpf

* added kbpf command

* added array offset recovery

* added verbose option

* added disass support

* refactored

* added flags

* docs

* typos
pull/3360/head
jxuanli 2 months ago committed by GitHub
parent 2318adae78
commit 4008901e66
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -76,6 +76,7 @@
- [binder](kernel/binder.md) - Show Android Binder information
- [buddydump](kernel/buddydump.md) - Displays metadata and freelists of the buddy allocator.
- [kbase](kernel/kbase.md) - Finds the kernel virtual base address.
- [kbpf](kernel/kbpf.md) - Prints information about the linux kernel bpf progs and maps.
- [kchecksec](kernel/kchecksec.md) - Checks for kernel hardening configuration options.
- [kcmdline](kernel/kcmdline.md) - Return the kernel commandline (/proc/cmdline).
- [kconfig](kernel/kconfig.md) - Outputs the kernel config.

@ -0,0 +1,20 @@
<!-- THIS PART OF THIS FILE IS AUTOGENERATED. DO NOT MODIFY IT. See scripts/generate-docs.sh -->
# kbpf
```text
usage: kbpf [-h] [-v] [-p] [-m]
```
Prints information about the linux kernel bpf progs and maps.
### Optional arguments
|Short|Long|Help|
| :--- | :--- | :--- |
|-h|--help|show this help message and exit|
|-v|--verbose| (default: 0)|
|-p|--progs||
|-m|--maps||
<!-- END OF AUTOGENERATED PART. Do not modify this line or the line below, they mark the end of the auto-generated part of the file. If you want to extend the documentation in a way which cannot easily be done by adding to the command help description, write below the following line. -->
<!-- ------------\>8---- ----\>8---- ----\>8------------ -->

@ -679,3 +679,15 @@ def db_list() -> pwndbg.dbg_mod.Value:
if (syms := arch_symbols()) is not None:
return syms.db_list()
return None
def prog_idr() -> pwndbg.dbg_mod.Value:
if (syms := arch_symbols()) is not None:
return syms.prog_idr()
return None
def map_idr() -> pwndbg.dbg_mod.Value:
if (syms := arch_symbols()) is not None:
return syms.map_idr()
return None

@ -0,0 +1,266 @@
from __future__ import annotations
import pwndbg
import pwndbg.aglib.kernel.symbol
import pwndbg.aglib.memory
import pwndbg.aglib.symbol
import pwndbg.aglib.typeinfo
import pwndbg.color.message as M
def get_struct_bpf_prog():
result = f"#define KVERSION {pwndbg.aglib.kernel.symbol.kversion_cint()}\n"
result += """
/* the enum types (probably) have been added to the kernel in decending order */
enum bpf_prog_type {
BPF_PROG_TYPE_UNSPEC,
BPF_PROG_TYPE_SOCKET_FILTER,
BPF_PROG_TYPE_KPROBE,
BPF_PROG_TYPE_SCHED_CLS,
BPF_PROG_TYPE_SCHED_ACT,
BPF_PROG_TYPE_TRACEPOINT,
BPF_PROG_TYPE_XDP,
BPF_PROG_TYPE_PERF_EVENT,
BPF_PROG_TYPE_CGROUP_SKB,
BPF_PROG_TYPE_CGROUP_SOCK,
BPF_PROG_TYPE_LWT_IN,
BPF_PROG_TYPE_LWT_OUT,
BPF_PROG_TYPE_LWT_XMIT,
BPF_PROG_TYPE_SOCK_OPS,
BPF_PROG_TYPE_SK_SKB,
BPF_PROG_TYPE_CGROUP_DEVICE,
BPF_PROG_TYPE_SK_MSG,
BPF_PROG_TYPE_RAW_TRACEPOINT,
BPF_PROG_TYPE_CGROUP_SOCK_ADDR,
BPF_PROG_TYPE_LWT_SEG6LOCAL,
BPF_PROG_TYPE_LIRC_MODE2,
BPF_PROG_TYPE_SK_REUSEPORT,
BPF_PROG_TYPE_FLOW_DISSECTOR,
BPF_PROG_TYPE_CGROUP_SYSCTL,
BPF_PROG_TYPE_RAW_TRACEPOINT_WRITABLE,
BPF_PROG_TYPE_CGROUP_SOCKOPT,
BPF_PROG_TYPE_TRACING,
BPF_PROG_TYPE_STRUCT_OPS,
BPF_PROG_TYPE_EXT,
BPF_PROG_TYPE_LSM,
BPF_PROG_TYPE_SK_LOOKUP,
BPF_PROG_TYPE_SYSCALL, /* a program that can execute syscalls */
BPF_PROG_TYPE_NETFILTER,
__MAX_BPF_PROG_TYPE
};
enum bpf_attach_type {
BPF_CGROUP_INET_INGRESS,
BPF_CGROUP_INET_EGRESS,
BPF_CGROUP_INET_SOCK_CREATE,
BPF_CGROUP_SOCK_OPS,
BPF_SK_SKB_STREAM_PARSER,
BPF_SK_SKB_STREAM_VERDICT,
BPF_CGROUP_DEVICE,
BPF_SK_MSG_VERDICT,
BPF_CGROUP_INET4_BIND,
BPF_CGROUP_INET6_BIND,
BPF_CGROUP_INET4_CONNECT,
BPF_CGROUP_INET6_CONNECT,
BPF_CGROUP_INET4_POST_BIND,
BPF_CGROUP_INET6_POST_BIND,
BPF_CGROUP_UDP4_SENDMSG,
BPF_CGROUP_UDP6_SENDMSG,
BPF_LIRC_MODE2,
BPF_FLOW_DISSECTOR,
BPF_CGROUP_SYSCTL,
BPF_CGROUP_UDP4_RECVMSG,
BPF_CGROUP_UDP6_RECVMSG,
BPF_CGROUP_GETSOCKOPT,
BPF_CGROUP_SETSOCKOPT,
BPF_TRACE_RAW_TP,
BPF_TRACE_FENTRY,
BPF_TRACE_FEXIT,
BPF_MODIFY_RETURN,
BPF_LSM_MAC,
BPF_TRACE_ITER,
BPF_CGROUP_INET4_GETPEERNAME,
BPF_CGROUP_INET6_GETPEERNAME,
BPF_CGROUP_INET4_GETSOCKNAME,
BPF_CGROUP_INET6_GETSOCKNAME,
BPF_XDP_DEVMAP,
BPF_CGROUP_INET_SOCK_RELEASE,
BPF_XDP_CPUMAP,
BPF_SK_LOOKUP,
BPF_XDP,
BPF_SK_SKB_VERDICT,
BPF_SK_REUSEPORT_SELECT,
BPF_SK_REUSEPORT_SELECT_OR_MIGRATE,
BPF_PERF_EVENT,
BPF_TRACE_KPROBE_MULTI,
BPF_LSM_CGROUP,
BPF_STRUCT_OPS,
BPF_NETFILTER,
BPF_TCX_INGRESS,
BPF_TCX_EGRESS,
BPF_TRACE_UPROBE_MULTI,
BPF_CGROUP_UNIX_CONNECT,
BPF_CGROUP_UNIX_SENDMSG,
BPF_CGROUP_UNIX_RECVMSG,
BPF_CGROUP_UNIX_GETPEERNAME,
BPF_CGROUP_UNIX_GETSOCKNAME,
BPF_NETKIT_PRIMARY,
BPF_NETKIT_PEER,
BPF_TRACE_KPROBE_SESSION,
BPF_TRACE_UPROBE_SESSION,
__MAX_BPF_ATTACH_TYPE
};
#define BPF_TAG_SIZE 8 // true for v5.0 - 6.17
struct bpf_prog {
u16 pages; /* Number of allocated pages */
u16 fields; /* bit fields */
enum bpf_prog_type type; /* Type of BPF program */
enum bpf_attach_type expected_attach_type; /* For some prog types */
u32 len; /* Number of filter blocks */
u32 jited_len; /* Size of jited insns in bytes */
u8 tag[BPF_TAG_SIZE];
#if KVERSION >= KERNEL_VERSION(5, 12, 0)
void *stats; // bpf_prog_stats
int *active;
unsigned int (*bpf_func)(void *ctx, void *insn);
#endif
void *aux; /* Auxiliary fields */
void *orig_prog; /* Original BPF program */
#if KVERSION < KERNEL_VERSION(5, 12, 0)
unsigned int (*bpf_func)(void *ctx, void *insn);
#endif
char insns[];
};
"""
return result
def get_struct_bpf_map():
result = ""
if "CONFIG_SECURITY" in pwndbg.aglib.kernel.kconfig():
result += "#define CONFIG_SECURITY\n"
result += """
enum bpf_map_type {
BPF_MAP_TYPE_UNSPEC,
BPF_MAP_TYPE_HASH,
BPF_MAP_TYPE_ARRAY,
BPF_MAP_TYPE_PROG_ARRAY,
BPF_MAP_TYPE_PERF_EVENT_ARRAY,
BPF_MAP_TYPE_PERCPU_HASH,
BPF_MAP_TYPE_PERCPU_ARRAY,
BPF_MAP_TYPE_STACK_TRACE,
BPF_MAP_TYPE_CGROUP_ARRAY,
BPF_MAP_TYPE_LRU_HASH,
BPF_MAP_TYPE_LRU_PERCPU_HASH,
BPF_MAP_TYPE_LPM_TRIE,
BPF_MAP_TYPE_ARRAY_OF_MAPS,
BPF_MAP_TYPE_HASH_OF_MAPS,
BPF_MAP_TYPE_DEVMAP,
BPF_MAP_TYPE_SOCKMAP,
BPF_MAP_TYPE_CPUMAP,
BPF_MAP_TYPE_XSKMAP,
BPF_MAP_TYPE_SOCKHASH,
BPF_MAP_TYPE_CGROUP_STORAGE,
BPF_MAP_TYPE_REUSEPORT_SOCKARRAY,
BPF_MAP_TYPE_PERCPU_CGROUP_STORAGE,
BPF_MAP_TYPE_QUEUE,
BPF_MAP_TYPE_STACK,
BPF_MAP_TYPE_SK_STORAGE,
BPF_MAP_TYPE_DEVMAP_HASH,
BPF_MAP_TYPE_STRUCT_OPS,
BPF_MAP_TYPE_RINGBUF,
BPF_MAP_TYPE_INODE_STORAGE,
BPF_MAP_TYPE_TASK_STORAGE,
BPF_MAP_TYPE_BLOOM_FILTER,
BPF_MAP_TYPE_USER_RINGBUF,
BPF_MAP_TYPE_CGRP_STORAGE,
BPF_MAP_TYPE_ARENA,
__MAX_BPF_MAP_TYPE
};
"""
result += """
struct bpf_map {
const void *ops; // struct bpf_map_ops
struct bpf_map *inner_map_meta;
#ifdef CONFIG_SECURITY
void *security;
#endif
enum bpf_map_type map_type;
u32 key_size;
u32 value_size;
u32 max_entries;
// char _pad[{padsz}];
};
struct bpf_array {
struct bpf_map map;
/* ignore the rest of the fields for now */
};
"""
return result
def get_bpf_struct_offsets(prog_idr, map_idr) -> int:
xarray_pad_sz = None
map_idr = int(map_idr)
prog_idr = int(prog_idr)
ptrsize = pwndbg.aglib.arch.ptrsize
max_idr_sz = abs(map_idr - prog_idr)
xa_node = None
for i in range(0, max_idr_sz, ptrsize):
xa_node = pwndbg.aglib.memory.read_pointer_width(prog_idr + i) & ~3 # remove tag
if pwndbg.aglib.memory.is_kernel(xa_node):
xarray_pad_sz = i
if xarray_pad_sz:
return xarray_pad_sz
for i in range(0, max_idr_sz, ptrsize):
xa_node = pwndbg.aglib.memory.read_pointer_width(map_idr + i) & ~3 # remove tag
if pwndbg.aglib.memory.is_kernel(xa_node):
xarray_pad_sz = i
return xarray_pad_sz
def load_bpf_typeinfo():
if pwndbg.aglib.typeinfo.lookup_types("struct bpf_map") is not None:
return
if pwndbg.aglib.kernel.symbol.kversion_cint() is None:
return
prog_idr = pwndbg.aglib.kernel.prog_idr()
map_idr = pwndbg.aglib.kernel.map_idr()
if not prog_idr or not map_idr:
print(M.warn("cannot find either prog_idr or map_idr"))
return
xarray_pad_sz = get_bpf_struct_offsets(prog_idr, map_idr)
if not xarray_pad_sz:
print(M.warn("cannot find xa_head -- might be uninitialized (add a bpf prog/map first!)"))
return
result = pwndbg.aglib.kernel.symbol.COMMON_TYPES
result += f"""
struct xarray {{
char _xarray_pad[{xarray_pad_sz}];
void *xa_head;
}};
"""
result += """
struct idr {
struct xarray idr_rt;
unsigned int idr_base;
unsigned int idr_next;
};
struct xa_node {
unsigned char shift; /* Bits remaining in each slot */
unsigned char offset; /* Slot offset in parent */
unsigned char count; /* Total entry count */
unsigned char nr_values; /* Value entry count */
struct xa_node *parent; /* NULL at top of tree */
struct xarray *array; /* The array we belong to */
union {
struct list_head private_list; /* For tree user */
// struct rcu_head rcu_head; /* Used when freeing node */
};
void *slots[64]; // 16 or 64
/* the rest is not relevant */
};
"""
result += get_struct_bpf_prog()
result += get_struct_bpf_map()
header_file_path = pwndbg.commands.cymbol.create_temp_header_file(result)
pwndbg.commands.cymbol.add_structure_from_header(header_file_path, "bpf_structs", True)

@ -157,7 +157,6 @@ enum pageflags {
PG_unevictable, /* Page is "unevictable" */
PG_dropbehind, /* drop pages on IO completion */
};
"""
@ -272,17 +271,24 @@ class ArchSymbols:
if pwndbg.aglib.kernel.krelease() >= (5, 10)
else "dma_buf_release"
)
self.bpf_prog_heuristic_func = "bpf_prog_free_id"
self.bpf_map_heuristic_func = "bpf_map_free_id"
def disass(self, name):
def disass(self, name, lines=None):
sym = pwndbg.aglib.symbol.lookup_symbol(name)
if sym is None:
return None
disass = "\n".join(pwndbg.aglib.nearpc.nearpc(int(sym)))
disass = "\n".join(pwndbg.aglib.nearpc.nearpc(int(sym), lines=lines))
return pwndbg.color.strip(disass)
def regex(self, s, pattern):
def regex(self, s, pattern, nth):
pattern = re.compile(pattern)
if nth == 0:
return pattern.search(s)
matches = list(pattern.finditer(s))
if nth < len(matches):
return matches[nth]
return None
def node_data(self):
node_data = pwndbg.aglib.symbol.lookup_symbol("node_data")
@ -331,6 +337,22 @@ class ArchSymbols:
db_list = self._db_list()
return pwndbg.aglib.memory.get_typed_pointer("struct list_head", db_list)
def map_idr(self):
map_idr = pwndbg.aglib.symbol.lookup_symbol("map_idr")
if map_idr:
return map_idr
if pwndbg.aglib.kernel.has_debug_symbols(self.bpf_map_heuristic_func):
map_idr = self._map_idr()
return pwndbg.aglib.memory.get_typed_pointer("unsigned long", map_idr)
def prog_idr(self):
prog_idr = pwndbg.aglib.symbol.lookup_symbol("prog_idr")
if prog_idr:
return prog_idr
if pwndbg.aglib.kernel.has_debug_symbols(self.bpf_prog_heuristic_func):
prog_idr = self._prog_idr()
return pwndbg.aglib.memory.get_typed_pointer("unsigned long", prog_idr)
def _node_data(self):
raise NotImplementedError()
@ -346,13 +368,19 @@ class ArchSymbols:
def _db_list(self):
raise NotImplementedError()
def _map_idr(self):
raise NotImplementedError()
def _prog_idr(self):
raise NotImplementedError()
class x86_64Symbols(ArchSymbols):
# mov reg, [... - 0x...]
# the ``-0x...` is a kernel address displayed as a negative number
# returns the first 0x... as an int if exists
def dword_mov_reg_memoff(self, disass):
result = self.regex(disass, r".*?\bmov.*\[.*-.*(0x[0-9a-f]+)\]")
def dword_mov_reg_memoff(self, disass, nth=0):
result = self.regex(disass, r".*?\bmov.*\[.*-.*(0x[0-9a-f]+)\]", nth)
if result is not None:
return (1 << 64) - int(result.group(1), 16)
return None
@ -360,23 +388,24 @@ class x86_64Symbols(ArchSymbols):
# add reg, [... - 0x...]
# the `-0x...`` is a kernel address displayed as a negative number
# returns the first 0x... as an int if exists
def dword_add_reg_memoff(self, disass):
result = self.regex(disass, r".*?\badd.*\[.*-.*(0x[0-9a-f]+)\]")
def dword_add_reg_memoff(self, disass, nth=0):
result = self.regex(disass, r".*?\badd.*\[.*-.*(0x[0-9a-f]+)\]", nth)
if result is not None:
return (1 << 64) - int(result.group(1), 16)
return None
# mov reg, <kernel address as a constant>
def qword_mov_reg_const(self, disass):
result = self.regex(disass, r".*?\bmov.*(0x[0-9a-f]{16})")
def qword_mov_reg_const(self, disass, nth=0):
result = self.regex(disass, r".*?\bmov.*(0x[0-9a-f]{16})", nth)
if result is not None:
return int(result.group(1), 16)
return None
def qword_mov_reg_ripoff(self, disass):
def qword_mov_reg_ripoff(self, disass, nth=0):
result = self.regex(
"".join(disass.splitlines()),
r".*?\bmov.*\[rip\s\+\s(0x[0-9a-f]+)\].*?(0x[0-9a-f]{16})\s\<",
nth,
)
if result is not None:
return int(result.group(1), 16) + int(result.group(2), 16)
@ -415,22 +444,37 @@ class x86_64Symbols(ArchSymbols):
return result - offset
return None
def _map_idr(self):
disass = self.disass(self.bpf_map_heuristic_func, lines=50)
result = self.qword_mov_reg_const(disass, nth=1)
if result is not None:
return result
return self.qword_mov_reg_const(disass)
def _prog_idr(self):
disass = self.disass(self.bpf_prog_heuristic_func, lines=50)
result = self.qword_mov_reg_const(disass, nth=1)
if result is not None:
return result
return self.qword_mov_reg_const(disass)
class Aarch64Symbols(ArchSymbols):
# adrp x?, <kernel address>
# add x?, x?, #0x...
def qword_adrp_add_const(self, disass):
def qword_adrp_add_const(self, disass, nth=0):
prev = ""
for line in disass.splitlines():
if "adrp" in prev and "add" in line:
result = self.regex(prev, r"\,\s*0x([0-9a-f]+)")
if result is None:
return None
result = self.regex(prev, r"\,\s*0x([0-9a-f]+)", nth=0)
tmp = None
if result is not None:
tmp = int(result.group(1), 16)
result = self.regex(line, r"#0x([0-9a-f]+)")
if result is None:
return None
result = self.regex(line, r"#0x([0-9a-f]+)", nth=0)
if result is not None and tmp is not None:
if nth == 0:
return tmp + int(result.group(1), 16)
nth -= 1
prev = line
return None
@ -488,3 +532,17 @@ class Aarch64Symbols(ArchSymbols):
if result is not None:
return result - offset
return None
def _map_idr(self):
disass = self.disass(self.bpf_map_heuristic_func, lines=50)
result = self.qword_adrp_add_const(disass, nth=1)
if result is not None:
return result
return self.qword_adrp_add_const(disass)
def _prog_idr(self):
disass = self.disass(self.bpf_prog_heuristic_func, lines=50)
result = self.qword_adrp_add_const(disass, nth=1)
if result is not None:
return result
return self.qword_adrp_add_const(disass)

@ -920,6 +920,7 @@ def load_commands() -> None:
import pwndbg.commands.integration
import pwndbg.commands.jemalloc
import pwndbg.commands.kbase
import pwndbg.commands.kbpf
import pwndbg.commands.kchecksec
import pwndbg.commands.kcmdline
import pwndbg.commands.kconfig

@ -0,0 +1,241 @@
from __future__ import annotations
import argparse
import math
import re
import capstone
import pwndbg
import pwndbg.aglib.kernel
import pwndbg.aglib.kernel.bpf
import pwndbg.aglib.memory
import pwndbg.color.message as M
import pwndbg.commands
from pwndbg.commands import CommandCategory
from pwndbg.lib.exception import IndentContextManager
parser = argparse.ArgumentParser(
description="Prints information about the linux kernel bpf progs and maps."
)
parser.add_argument("-v", "--verbose", action="count", default=0)
parser.add_argument("-p", "--progs", dest="print_progs", action="store_true", default=False)
parser.add_argument("-m", "--maps", dest="print_maps", action="store_true", default=False)
_bpf_map_array_off = None
MAX_PRINTED_VALUE_SIZE = 0x20
MAX_BPF_VERBOSE_LEVEL1_OUTPUT_LEN = 0x10
BPF_FIRST_REG, BPF_SECOND_REG = 1 << 0, 1 << 1
BPF_AUX_REG_STRING = "ax"
BPF_MAP_ARRAY_TYPES = (
"BPF_MAP_TYPE_ARRAY",
"BPF_MAP_TYPE_PROG_ARRAY",
)
def handle_bpf_aux_reg_for_insns_bytes(insns_bytes):
# https://elixir.bootlin.com/linux/v6.17.1/source/include/linux/filter.h#L62
sz = len(insns_bytes)
result = [0] * (len(insns_bytes) // 8)
for i in range(1, sz, 8):
b = insns_bytes[i]
if b & 0xF == 0xB:
result[i // 8] |= BPF_FIRST_REG
insns_bytes[i] &= ~0xF
if b & 0xF0 == 0xB0:
result[i // 8] |= BPF_SECOND_REG
insns_bytes[i] &= ~0xF0
return result
def handle_bpf_aux_reg_for_opstr(opstr, regflag):
if regflag == 0:
return opstr
pattern = re.compile(r"r0")
matches = list(pattern.finditer(opstr))
if regflag & BPF_FIRST_REG:
start, end = matches[0].span()
opstr = opstr[:start] + BPF_AUX_REG_STRING + opstr[end:]
if regflag & BPF_SECOND_REG:
start, end = matches[-1].span()
opstr = opstr[:start] + BPF_AUX_REG_STRING + opstr[end:]
return opstr
def bpf_map_array_offset(bpf_array, t, max_entries, value_size):
global _bpf_map_array_off
if _bpf_map_array_off:
# pwndbg.lib.cache is not used here because it would also cache None
return _bpf_map_array_off
if t in BPF_MAP_ARRAY_TYPES:
expected_elem_size = math.ceil(value_size / 8) * 8
expected_index_mask = (1 << math.ceil(math.log2(max_entries))) - 1
for i in range(200):
elem_size = pwndbg.aglib.memory.u32(bpf_array + 4 * i)
index_mask = pwndbg.aglib.memory.u32(bpf_array + 4 * (i + 1))
if elem_size == expected_elem_size and index_mask == expected_index_mask:
"""
struct bpf_array {
struct bpf_map map;
u32 elem_size; // i points here
u32 index_mask;
struct bpf_array_aux *aux;
union {
DECLARE_FLEX_ARRAY(char, value) __aligned(8);
DECLARE_FLEX_ARRAY(void *, ptrs) __aligned(8);
DECLARE_FLEX_ARRAY(void __percpu *, pptrs) __aligned(8);
};
};
"""
_bpf_map_array_off = (i + 2) * 4 + pwndbg.aglib.arch.ptrsize
break
return _bpf_map_array_off
def parse_xa_node(xa_node):
xa_node = int(xa_node) & ~3
if xa_node == 0 or not pwndbg.aglib.memory.is_kernel(xa_node):
return []
xa_node = pwndbg.aglib.memory.get_typed_pointer("struct xa_node", xa_node)
result = []
shift = int(xa_node["shift"])
count = int(xa_node["count"])
for i in range(64):
slot = int(xa_node["slots"][i])
if slot == 0:
continue
if shift:
result += parse_xa_node(slot)
else:
result.append(slot)
count -= 1
if count == 0:
break
return result
def print_bpf_progs(verbose):
indent = IndentContextManager()
prog_idr = pwndbg.aglib.kernel.prog_idr()
if int(prog_idr) == 0:
print(M.warn("cannot find prog_idr"))
return
prog_idr = pwndbg.aglib.memory.get_typed_pointer("struct idr", prog_idr)
xa_node = prog_idr["idr_rt"]["xa_head"]
indent.print(indent.prefix("bpf progs") + f": prog_idr @ {indent.addr_hex(int(prog_idr))}")
if int(xa_node) == 0:
return
slots = parse_xa_node(xa_node)
with indent:
for idx, slot in enumerate(slots):
bpf_prog = pwndbg.aglib.memory.get_typed_pointer("struct bpf_prog", slot)
t = bpf_prog["type"].value_to_human_readable()
attach_t = bpf_prog["expected_attach_type"].value_to_human_readable()
prefix = indent.prefix(f"[0x{idx:02x}] {indent.addr_hex(slot)}")
indent.print(f"{prefix} (type: {M.success(t)}, attach: {M.success(attach_t)})")
with indent:
func = int(bpf_prog["bpf_func"])
aux = int(bpf_prog["aux"])
jited_len = int(bpf_prog["jited_len"])
desc = f"func @ {indent.aux_hex(func)} (jited_len: {indent.aux_hex(jited_len)}), aux @ {indent.aux_hex(aux)}"
indent.print(desc)
if verbose > 0:
cs = capstone.Cs(
capstone.CS_ARCH_BPF,
capstone.CS_MODE_LITTLE_ENDIAN | capstone.CS_MODE_BPF_EXTENDED,
)
num_insns = int(bpf_prog["len"])
insns = int(bpf_prog["insns"].address)
insns_bytes = pwndbg.aglib.memory.read(insns, num_insns * 8)
aux_regs = handle_bpf_aux_reg_for_insns_bytes(insns_bytes)
with indent:
indent.print(indent.prefix(f"{num_insns} insns") + ":")
for i in range(num_insns):
if i == MAX_BPF_VERBOSE_LEVEL1_OUTPUT_LEN and verbose == 1:
indent.print("... (truncated)")
indent.print(
M.warn("max output len reached, use -vv for full output")
)
break
off = i * 8
address = insns + off
disass = list(
cs.disasm(bytes(insns_bytes[off : off + 8]), insns + address)
)
if len(disass) == 0:
bytecode = ""
for b in insns_bytes[off : off + 8]:
bytecode += f"{b:02x} "
desc = M.error(f"invalid insn: {bytecode}")
indent.print(f"{indent.addr_hex(address)}\t{desc}")
continue
insn = disass[0]
mnemonic = insn.mnemonic
opstr = insn.op_str
opstr = handle_bpf_aux_reg_for_opstr(opstr, aux_regs[i])
indent.print(f"{indent.addr_hex(address)}\t{mnemonic}\t{opstr}")
def print_bpf_maps(verbose):
indent = IndentContextManager()
map_idr = pwndbg.aglib.kernel.map_idr()
if int(map_idr) == 0:
print(M.warn("cannot find map_idr"))
return
map_idr = pwndbg.aglib.memory.get_typed_pointer("struct idr", map_idr)
xa_node = map_idr["idr_rt"]["xa_head"]
if int(xa_node) == 0:
return
indent.print(indent.prefix("bpf maps") + f": map_idr @ {indent.addr_hex(int(map_idr))}")
slots = parse_xa_node(xa_node)
with indent:
for idx, slot in enumerate(slots):
bpf_array = pwndbg.aglib.memory.get_typed_pointer("struct bpf_array", slot)
prefix = indent.prefix(f"[0x{idx:02x}] {indent.addr_hex(slot)}")
t = bpf_array["map"]["map_type"].value_to_human_readable()
indent.print(f"{prefix} (type: {M.success(t)})")
with indent:
key_size = int(bpf_array["map"]["key_size"])
value_size = int(bpf_array["map"]["value_size"])
max_entries = int(bpf_array["map"]["max_entries"])
bpf_array = int(bpf_array)
off = bpf_map_array_offset(bpf_array, t, max_entries, value_size)
content = indent.aux_hex(bpf_array + off) if off else "unknown"
desc = f"array @ {content} (key_size: {indent.aux_hex(key_size)}, value_size: {indent.aux_hex(value_size)}, max_entries: {indent.aux_hex(max_entries)})"
indent.print(desc)
# TODO: what about types other than array
if off is not None and verbose > 0 and t in BPF_MAP_ARRAY_TYPES:
with indent:
entrysz = math.ceil(value_size / 8) * 8
for i in range(max_entries):
if i == MAX_BPF_VERBOSE_LEVEL1_OUTPUT_LEN and verbose == 1:
indent.print("... (truncated)")
indent.print(
M.warn("max output len reached, use -vv for full output")
)
break
idxfmt = f"[0x{i:02x}]"
sz = min(value_size, MAX_PRINTED_VALUE_SIZE)
value = ""
for b in pwndbg.aglib.memory.read(bpf_array + off + i * entrysz, sz):
value += f"{b:02x} "
if sz < value_size:
value += "... (" + M.warn("truncated") + ")"
indent.print(f"- {indent.prefix(idxfmt)} {value}")
@pwndbg.commands.Command(parser, category=CommandCategory.KERNEL)
@pwndbg.commands.OnlyWhenQemuKernel
@pwndbg.commands.OnlyWithKernelDebugSymbols
@pwndbg.commands.OnlyWhenPagingEnabled
def kbpf(verbose: int, print_progs: bool, print_maps: bool):
if not pwndbg.aglib.kernel.has_debug_info():
pwndbg.aglib.kernel.bpf.load_bpf_typeinfo()
if pwndbg.aglib.typeinfo.load("struct idr") is None:
return
if not print_progs and not print_maps:
print_progs = print_maps = True
if print_progs:
print_bpf_progs(verbose)
if print_maps:
print_bpf_maps(verbose)

@ -64,6 +64,8 @@ class Kconfig(UserDict): # type: ignore[type-arg]
self.data["CONFIG_SYSFS"] = "y"
if self.CONFIG_DEBUG_FS:
self.data["CONFIG_DEBUG_FS"] = "y"
if self.CONFIG_SECURITY:
self.data["CONFIG_SECURITY"] = "y"
def get_key(self, name: str) -> str | None:
# First attempt to lookup the value assuming the user passed in a name
@ -177,6 +179,10 @@ class Kconfig(UserDict): # type: ignore[type-arg]
def CONFIG_DEBUG_FS(self) -> bool:
return pwndbg.aglib.symbol.lookup_symbol("debugfs_attr_read") is not None
@property
def CONFIG_SECURITY(self) -> bool:
return pwndbg.aglib.symbol.lookup_symbol("security_inode_init_security") is not None
def update_with_file(self, file_path):
for line in open(file_path, "r").read().splitlines():
split = line.split("=")

Loading…
Cancel
Save