Replace pwnlib.asm.asm with pwndbg.lib.zig.asm (#3207)

* Replace pwnlib.asm.asm with pwndbg.lib.zig.asm

* fix search

* move unit-tests to ci

* include pwnlib

* fix test

* fix docs

* fix comment

* fix import

* fixy
pull/3217/head
patryk4815 4 months ago committed by GitHub
parent ca0b86d04c
commit f9f90d9f02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -20,7 +20,7 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-24.04] os: [ubuntu-24.04]
type: [qemu-user-tests, qemu-system-tests, tests] type: [qemu-user-tests, qemu-system-tests, tests, unit-tests]
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
timeout-minutes: 40 timeout-minutes: 40
env: env:
@ -47,6 +47,11 @@ jobs:
run: | run: |
sudo sysctl -w kernel.yama.ptrace_scope=0 sudo sysctl -w kernel.yama.ptrace_scope=0
- name: Run tests
if: matrix.type == 'unit-tests'
run: |
./unit-tests.sh
- name: Run tests - name: Run tests
if: matrix.type == 'tests' if: matrix.type == 'tests'
run: | run: |
@ -111,7 +116,6 @@ jobs:
run: | run: |
sudo sysctl -w kernel.yama.ptrace_scope=0 sudo sysctl -w kernel.yama.ptrace_scope=0
./tests.sh -d gdb -g gdb ./tests.sh -d gdb -g gdb
./unit-tests.sh
qemu-user-tests: qemu-user-tests:
runs-on: [ubuntu-24.04] runs-on: [ubuntu-24.04]

@ -3,10 +3,9 @@
```text ```text
usage: search [-h] [-t {byte,short,word,dword,qword,pointer,string,bytes,asm}] usage: search [-h] [-t {byte,short,word,dword,qword,pointer,string,bytes,asm}]
[-1] [-2] [-4] [-8] [-p] [--asm] [-1] [-2] [-4] [-8] [-p] [--asm] [--asmbp] [-x] [-e] [-w]
[--arch {powerpc64,aarch64,powerpc,riscv32,riscv64,sparc64,mips64,msp430,alpha,amd64,sparc,thumb,cris,i386,ia64,m68k,mips,s390,none,avr,arm,vax}] [-s STEP] [-l LIMIT] [-a ALIGNED] [--save] [--no-save] [-n]
[--asmbp] [-x] [-e] [-w] [-s STEP] [-l LIMIT] [-a ALIGNED] [--trunc-out]
[--save] [--no-save] [-n] [--trunc-out]
value [mapping_name] value [mapping_name]
``` ```
@ -33,7 +32,6 @@ By default search results are cached. If you want to cache all results, but only
|-8|--qword|Search for an 8-byte integer| |-8|--qword|Search for an 8-byte integer|
|-p|--pointer|Search for a pointer-width integer| |-p|--pointer|Search for a pointer-width integer|
||--asm|Search for an assembly instruction| ||--asm|Search for an assembly instruction|
||--arch|Target architecture|
||--asmbp|Set breakpoint for found assembly instruction| ||--asmbp|Set breakpoint for found assembly instruction|
|-x|--hex|Target is a hex-encoded (for bytes/strings)| |-x|--hex|Target is a hex-encoded (for bytes/strings)|
|-e|--executable|Search executable segments only| |-e|--executable|Search executable segments only|

@ -0,0 +1,26 @@
from __future__ import annotations
import pathlib
from typing import List
import pwnlib.context
import pwnlib.data
import pwndbg.aglib.arch
import pwndbg.lib.zig
def _get_pwntools_includes() -> List[pathlib.Path]:
include = (
pathlib.Path(pwnlib.data.path)
/ "includes"
/ str(pwnlib.context.context.os)
/ f"{pwnlib.context.context.arch}.h"
)
if not include.exists():
return []
return [include]
def asm(data: str) -> bytes:
return pwndbg.lib.zig.asm(pwndbg.aglib.arch, data, includes=_get_pwntools_includes())

@ -12,11 +12,11 @@ import contextlib
from asyncio import CancelledError from asyncio import CancelledError
from typing import Iterator from typing import Iterator
import pwnlib.asm
import pwnlib.shellcraft import pwnlib.shellcraft
import pwndbg import pwndbg
import pwndbg.aglib.arch import pwndbg.aglib.arch
import pwndbg.aglib.asm
import pwndbg.aglib.memory import pwndbg.aglib.memory
import pwndbg.aglib.regs import pwndbg.aglib.regs
import pwndbg.aglib.vmmap import pwndbg.aglib.vmmap
@ -51,7 +51,7 @@ async def exec_syscall(
# Build machine code that runs the requested syscall. # Build machine code that runs the requested syscall.
syscall_asm = pwnlib.shellcraft.syscall(syscall, arg0, arg1, arg2, arg3, arg4, arg5) syscall_asm = pwnlib.shellcraft.syscall(syscall, arg0, arg1, arg2, arg3, arg4, arg5)
syscall_bin = pwnlib.asm.asm(syscall_asm) syscall_bin = pwndbg.aglib.asm.asm(syscall_asm)
# Run the syscall and pass its return value onward to the caller. # Run the syscall and pass its return value onward to the caller.
async with exec_shellcode( async with exec_shellcode(

@ -4,8 +4,7 @@ import argparse
from typing import Optional from typing import Optional
from typing import Tuple from typing import Tuple
import pwnlib.asm import pwndbg.aglib.asm
import pwndbg.aglib.file import pwndbg.aglib.file
from pwndbg.commands import CommandCategory from pwndbg.commands import CommandCategory
@ -58,7 +57,7 @@ def parse_range(msr_range: str, arch: str) -> Optional[Tuple[int, int]]:
def x86_msr_read(msr: int) -> None: def x86_msr_read(msr: int) -> None:
async def ctrl(ec: pwndbg.dbg_mod.ExecutionController): async def ctrl(ec: pwndbg.dbg_mod.ExecutionController):
sc = pwnlib.asm.asm(f"mov ecx, {msr}; rdmsr") sc = pwndbg.aglib.asm.asm(f"mov ecx, {msr}; rdmsr")
async with pwndbg.aglib.shellcode.exec_shellcode(ec, sc): async with pwndbg.aglib.shellcode.exec_shellcode(ec, sc):
edx = int(pwndbg.aglib.regs["edx"]) << 32 edx = int(pwndbg.aglib.regs["edx"]) << 32
eax = int(pwndbg.aglib.regs["eax"]) eax = int(pwndbg.aglib.regs["eax"])
@ -72,7 +71,7 @@ def x86_msr_write(msr: int, write_value: int) -> None:
async def ctrl(ec: pwndbg.dbg_mod.ExecutionController): async def ctrl(ec: pwndbg.dbg_mod.ExecutionController):
eax = write_value & 0xFFFFFFFF eax = write_value & 0xFFFFFFFF
edx = write_value >> 32 edx = write_value >> 32
sc = pwnlib.asm.asm(f"mov ecx, {msr}; mov eax, {eax}; mov edx, {edx}; wrmsr") sc = pwndbg.aglib.asm.asm(f"mov ecx, {msr}; mov eax, {eax}; mov edx, {edx}; wrmsr")
async with pwndbg.aglib.shellcode.exec_shellcode(ec, sc): async with pwndbg.aglib.shellcode.exec_shellcode(ec, sc):
return return

@ -7,10 +7,9 @@ import os
import struct import struct
from typing import Set from typing import Set
import pwnlib
import pwndbg import pwndbg
import pwndbg.aglib.arch import pwndbg.aglib.arch
import pwndbg.aglib.asm
import pwndbg.aglib.disasm.disassembly import pwndbg.aglib.disasm.disassembly
import pwndbg.aglib.vmmap import pwndbg.aglib.vmmap
import pwndbg.color.memory as M import pwndbg.color.memory as M
@ -115,12 +114,6 @@ parser.add_argument(
const="asm", const="asm",
help="Search for an assembly instruction", help="Search for an assembly instruction",
) )
parser.add_argument(
"--arch",
choices=pwnlib.context.context.architectures.keys(),
type=str,
help="Target architecture",
)
parser.add_argument( parser.add_argument(
"--asmbp", action="store_true", help="Set breakpoint for found assembly instruction" "--asmbp", action="store_true", help="Set breakpoint for found assembly instruction"
) )
@ -180,7 +173,6 @@ parser.add_argument(
@pwndbg.commands.OnlyWhenRunning @pwndbg.commands.OnlyWhenRunning
def search( def search(
type, type,
arch,
asmbp, asmbp,
hex, hex,
executable, executable,
@ -202,9 +194,6 @@ def search(
next = False next = False
save = True save = True
if not arch:
arch = pwnlib.context.context.arch
# Initialize is_pointer to track whether the search type is a pointer # Initialize is_pointer to track whether the search type is a pointer
is_pointer = None is_pointer = None
# Adjust pointer sizes to the local architecture # Adjust pointer sizes to the local architecture
@ -254,8 +243,7 @@ def search(
value += b"\x00" value += b"\x00"
elif type == "asm" or asmbp: elif type == "asm" or asmbp:
bits_for_arch = pwnlib.context.context.architectures.get(arch, {}).get("bits") value = pwndbg.aglib.asm.asm(value)
value = pwnlib.asm.asm(value, arch=arch, bits=bits_for_arch)
# `pwndbg.search.search` expects a `bytes` object for its pattern. Convert the string pattern we # `pwndbg.search.search` expects a `bytes` object for its pattern. Convert the string pattern we
# were given to a bytes object by encoding it as an UTF-8 byte sequence. This matches the behavior # were given to a bytes object by encoding it as an UTF-8 byte sequence. This matches the behavior

@ -9,11 +9,11 @@ amount of code in the context of the inferior.
from __future__ import annotations from __future__ import annotations
import gdb import gdb
import pwnlib.asm
import pwnlib.shellcraft import pwnlib.shellcraft
import pwndbg import pwndbg
import pwndbg.aglib.arch import pwndbg.aglib.arch
import pwndbg.aglib.asm
import pwndbg.aglib.memory import pwndbg.aglib.memory
import pwndbg.aglib.regs import pwndbg.aglib.regs
import pwndbg.aglib.vmmap import pwndbg.aglib.vmmap
@ -48,7 +48,7 @@ def exec_syscall(
# Build machine code that runs the requested syscall. # Build machine code that runs the requested syscall.
syscall_asm = pwnlib.shellcraft.syscall(syscall, arg0, arg1, arg2, arg3, arg4, arg5) syscall_asm = pwnlib.shellcraft.syscall(syscall, arg0, arg1, arg2, arg3, arg4, arg5)
syscall_bin = pwnlib.asm.asm(syscall_asm) syscall_bin = pwndbg.aglib.asm.asm(syscall_asm)
# Run the syscall and pass its return value onward to the caller. # Run the syscall and pass its return value onward to the caller.
return exec_shellcode( return exec_shellcode(

@ -1,6 +1,9 @@
from __future__ import annotations from __future__ import annotations
import os.path import os.path
import pathlib
import subprocess
import tempfile
from typing import Dict from typing import Dict
from typing import List from typing import List
from typing import Literal from typing import Literal
@ -36,6 +39,37 @@ _arch_mapping: Dict[Tuple[PWNDBG_SUPPORTED_ARCHITECTURES_TYPE, Literal["little",
("s390x", "big", 8): "s390x", ("s390x", "big", 8): "s390x",
} }
_prefix_header = ".global _start\n.global __start\n.section .text\n_start:\n__start:\n"
_asm_header: Dict[str, str] = {
# `.intel_syntax noprefix` forces the use of Intel assembly syntax instead of AT&T
"x86_64": _prefix_header + ".intel_syntax noprefix\n",
"x86": _prefix_header + ".intel_syntax noprefix\n",
# `.set noreorder` disables instruction reordering for MIPS to handle delay slots correctly
"mips": _prefix_header + ".set noreorder\n",
"mipsel": _prefix_header + ".set noreorder\n",
"mips64": _prefix_header + ".set noreorder\n",
"mips64el": _prefix_header + ".set noreorder\n",
"aarch64": _prefix_header,
"aarch64_be": _prefix_header,
# `.syntax unified` enables the unified assembly syntax for ARM/Thumb
"arm": _prefix_header + ".syntax unified\n",
"armeb": _prefix_header + ".syntax unified\n",
"thumb": _prefix_header + ".syntax unified\n",
"thumbeb": _prefix_header + ".syntax unified\n",
"riscv32": _prefix_header,
"riscv64": _prefix_header,
"sparc": _prefix_header,
"sparc64": _prefix_header,
"powerpc": _prefix_header,
"powerpcle": _prefix_header,
"powerpc64": _prefix_header,
"powerpc64le": _prefix_header,
"loongarch64": _prefix_header,
"s390x": _prefix_header,
}
def _get_zig_target(arch: ArchDefinition) -> str | None: def _get_zig_target(arch: ArchDefinition) -> str | None:
if arch.platform == Platform.LINUX: if arch.platform == Platform.LINUX:
# "gnu", "gnuabin32", "gnuabi64", "gnueabi", "gnueabihf", # "gnu", "gnuabin32", "gnuabi64", "gnueabi", "gnueabihf",
@ -54,7 +88,7 @@ def _get_zig_target(arch: ArchDefinition) -> str | None:
return f"{arch_mapping}-{osabi}" return f"{arch_mapping}-{osabi}"
def flags(arch: ArchDefinition) -> List[str] | None: def flags(arch: ArchDefinition) -> List[str]:
try: try:
import ziglang # type: ignore[import-untyped] import ziglang # type: ignore[import-untyped]
except ImportError: except ImportError:
@ -70,3 +104,79 @@ def flags(arch: ArchDefinition) -> List[str] | None:
"-target", "-target",
zig_target, zig_target,
] ]
def asm(arch: ArchDefinition, data: str, includes: List[pathlib.Path] | None=None) -> bytes:
arch_mapping = _arch_mapping.get((arch.name, arch.endian, arch.ptrsize), None)
if arch_mapping is None:
raise ValueError(f"Can't find ziglang target for ({(arch.name, arch.endian, arch.ptrsize)})")
return _asm(arch_mapping, data, includes)
def _asm(arch_mapping: str, data: str, includes: List[pathlib.Path] | None=None) -> bytes:
try:
import ziglang
except ImportError:
raise ValueError("Can't import ziglang")
header = _asm_header.get(arch_mapping, None)
if header is None:
raise ValueError(f"Can't find asm header for target {arch_mapping}")
if includes is None:
includes = []
includes = ''.join((f'#include "{path}"\n' for path in includes))
target = f'{arch_mapping}-freestanding'
with tempfile.TemporaryDirectory() as tmpdir:
asm_file = os.path.join(tmpdir, "input.S")
compiled_file = os.path.join(tmpdir, "out.elf")
bytecode_file = os.path.join(tmpdir, "out.bytecode")
with open(asm_file, "w") as f:
f.write(includes)
f.write(header)
f.write(data)
# Build the binary with Zig
compile_process = subprocess.run(
[
os.path.join(os.path.dirname(ziglang.__file__), "zig"),
"cc",
"-target",
target,
asm_file,
"-o",
compiled_file,
],
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
if compile_process.returncode != 0:
raise Exception("Compilation error", compile_process.stdout, compile_process.stderr)
# Extract bytecode
objcopy_process = subprocess.run(
[
os.path.join(os.path.dirname(ziglang.__file__), "zig"),
"objcopy",
"-O",
"binary",
"--only-section=.text",
compiled_file,
bytecode_file,
],
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
if objcopy_process.returncode != 0:
raise Exception("Extracting bytecode error", objcopy_process.stdout, objcopy_process.stderr)
with open(bytecode_file, "rb") as f:
return f.read()

@ -28,7 +28,8 @@ dependencies = [
"rich>=13.7.1,<14", "rich>=13.7.1,<14",
# Optional? only for qemu-system vmmap # Optional? only for qemu-system vmmap
"pt", "pt",
# Optional? only for 'cymbol' command # Optional? 'ziglang' must be optional, as it is not available on every platform.
# It is only used for the 'cymbol' command and some `asm` functionality.
"ziglang==0.14.1", "ziglang==0.14.1",
] ]

@ -0,0 +1,206 @@
from __future__ import annotations
import pathlib
import tempfile
import pytest
import unicorn as uc
from unicorn import arm64_const
from unicorn import arm_const
from unicorn import mips_const
from unicorn import ppc_const
from unicorn import riscv_const
from unicorn import s390x_const
from unicorn import sparc_const
from unicorn import x86_const
import pwndbg.lib.zig
expected_value = 60
include_text = f"""
#define FROM_INCLUDE_VALUE {expected_value}
"""
regs_and_instr = {
"x86": (
"mov eax, FROM_INCLUDE_VALUE",
uc.UC_ARCH_X86,
uc.UC_MODE_32,
None,
x86_const.UC_X86_REG_EAX,
),
"x86_64": (
"mov rax, FROM_INCLUDE_VALUE",
uc.UC_ARCH_X86,
uc.UC_MODE_64,
None,
x86_const.UC_X86_REG_RAX,
),
"mips": (
"li $a0, FROM_INCLUDE_VALUE",
uc.UC_ARCH_MIPS,
uc.UC_MODE_MIPS32 | uc.UC_MODE_BIG_ENDIAN,
None,
mips_const.UC_MIPS_REG_A0,
),
"mipsel": (
"li $a0, FROM_INCLUDE_VALUE",
uc.UC_ARCH_MIPS,
uc.UC_MODE_MIPS32 | uc.UC_MODE_LITTLE_ENDIAN,
None,
mips_const.UC_MIPS_REG_A0,
),
"mips64": (
"li $a0, FROM_INCLUDE_VALUE",
uc.UC_ARCH_MIPS,
uc.UC_MODE_MIPS64 | uc.UC_MODE_BIG_ENDIAN,
None,
mips_const.UC_MIPS_REG_A0,
),
"mips64el": (
"li $a0, FROM_INCLUDE_VALUE",
uc.UC_ARCH_MIPS,
uc.UC_MODE_MIPS64 | uc.UC_MODE_LITTLE_ENDIAN,
None,
mips_const.UC_MIPS_REG_A0,
),
"arm": (
"mov r0, #FROM_INCLUDE_VALUE",
uc.UC_ARCH_ARM,
uc.UC_MODE_ARM,
None,
arm_const.UC_ARM_REG_R0,
),
"armeb": (
"mov r0, #FROM_INCLUDE_VALUE",
uc.UC_ARCH_ARM,
uc.UC_MODE_ARM | uc.UC_MODE_BIG_ENDIAN,
None,
arm_const.UC_ARM_REG_R0,
),
"thumb": (
"mov r0, #FROM_INCLUDE_VALUE",
uc.UC_ARCH_ARM,
uc.UC_MODE_THUMB,
None,
arm_const.UC_ARM_REG_R0,
),
"thumbeb": (
"mov r0, #FROM_INCLUDE_VALUE",
uc.UC_ARCH_ARM,
uc.UC_MODE_THUMB | uc.UC_MODE_BIG_ENDIAN,
None,
arm_const.UC_ARM_REG_R0,
),
"aarch64": (
"mov x0, #FROM_INCLUDE_VALUE",
uc.UC_ARCH_ARM64,
uc.UC_MODE_ARM,
None,
arm64_const.UC_ARM64_REG_X0,
),
"aarch64_be": (
"mov x0, #FROM_INCLUDE_VALUE",
uc.UC_ARCH_ARM64,
uc.UC_MODE_ARM | uc.UC_MODE_BIG_ENDIAN,
None,
arm64_const.UC_ARM64_REG_X0,
),
"riscv32": (
"li a0, FROM_INCLUDE_VALUE",
uc.UC_ARCH_RISCV,
uc.UC_MODE_RISCV32,
None,
riscv_const.UC_RISCV_REG_A0,
),
"riscv64": (
"li a0, FROM_INCLUDE_VALUE",
uc.UC_ARCH_RISCV,
uc.UC_MODE_RISCV64,
None,
riscv_const.UC_RISCV_REG_A0,
),
"s390x": (
"lghi %r2, FROM_INCLUDE_VALUE",
uc.UC_ARCH_S390X,
uc.UC_MODE_BIG_ENDIAN,
s390x_const.UC_CPU_S390X_Z14,
s390x_const.UC_S390X_REG_R2,
),
# FIXME: upstream bug, https://github.com/ziglang/zig/issues/23674
# 'sparc': ('mov 60,%i0', uc.UC_ARCH_SPARC, uc.UC_MODE_SPARC32 | uc.UC_MODE_BIG_ENDIAN, None, sparc_const.UC_SPARC_REG_I0),
"sparc64": (
"mov FROM_INCLUDE_VALUE,%i0",
uc.UC_ARCH_SPARC,
uc.UC_MODE_SPARC64 | uc.UC_MODE_BIG_ENDIAN,
None,
sparc_const.UC_SPARC_REG_I0,
),
"powerpc": (
"li %r1, FROM_INCLUDE_VALUE",
uc.UC_ARCH_PPC,
uc.UC_MODE_32 | uc.UC_MODE_BIG_ENDIAN,
ppc_const.UC_CPU_PPC32_7457A_V1_2,
ppc_const.UC_PPC_REG_1,
),
"powerpc64": (
"li %r1, FROM_INCLUDE_VALUE",
uc.UC_ARCH_PPC,
uc.UC_MODE_64 | uc.UC_MODE_BIG_ENDIAN,
ppc_const.UC_CPU_PPC64_970_V2_2,
ppc_const.UC_PPC_REG_1,
),
"powerpcle": (
"li %r1, FROM_INCLUDE_VALUE",
None,
None,
None,
None,
), # FIXME: UC_MODE_LITTLE_ENDIAN, Not supported by Unicorn
"powerpc64le": (
"li %r1, FROM_INCLUDE_VALUE",
None,
None,
None,
None,
), # FIXME: UC_MODE_LITTLE_ENDIAN, Not supported by Unicorn
"loongarch64": (
"addi.d $r1, $r1, FROM_INCLUDE_VALUE",
None,
None,
None,
None,
), # FIXME: Not supported by Unicorn
}
test_cases = list(regs_and_instr.keys())
@pytest.mark.parametrize("arch", test_cases)
def test_zig_asm_compiles(arch):
asm_line, uc_arch, uc_mode, uc_cpu, reg_id = regs_and_instr[arch]
with tempfile.NamedTemporaryFile(mode="wt", suffix="test.h", delete=False) as example_h:
example_h.write(include_text)
bytecode = pwndbg.lib.zig._asm(arch, asm_line, includes=[pathlib.Path(example_h.name)])
assert len(bytecode) > 0, "Bytecode too short"
if uc_arch is None:
pytest.skip("unsupported by Unicorn")
mu = uc.Uc(uc_arch, uc_mode, uc_cpu)
# Map 4KB memory at 0x20000
ADDRESS = 0x20000
mu.mem_map(ADDRESS, 0x2000)
mu.mem_write(ADDRESS, bytes(bytecode))
# Zero the register
mu.reg_write(reg_id, 0)
# Run the code
mu.emu_start(ADDRESS, ADDRESS + len(bytecode), count=1)
# Read result
value = mu.reg_read(reg_id)
assert value == expected_value, "Value mismatch"
Loading…
Cancel
Save