Add mmap command that executes the mmap syscall in the inferior (#1952)

* Initial implementation of the mmap command
 - Additionally, moves syscall execution and general inferior-scoped code
   execution facilities into a single, new file, in 'pwndbg/gdblib/shellcode.py'

* Add warnings and fix a few nits

* Lint

* Update pwndbg/commands/mmap.py

* Update pwndbg/commands/mmap.py

* Update pwndbg/commands/mmap.py

* Update pwndbg/commands/mmap.py

* Update pwndbg/commands/mmap.py

* Update pwndbg/gdblib/shellcode.py

* Update pwndbg/commands/mmap.py

* Update pwndbg/commands/mmap.py

* Make mmap faster for `--force`

* Add initial tests for `mmap`

* Update tests/gdb-tests/tests/test_mmap.py

* Add a testcase for fd-backed mmap calls

---------

Co-authored-by: Disconnect3d <dominik.b.czarnota@gmail.com>
pull/1959/head
Matt 2 years ago committed by GitHub
parent 427bf8c96e
commit dfd5f95b56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -640,6 +640,7 @@ def load_commands() -> None:
import pwndbg.commands.leakfind
import pwndbg.commands.memoize
import pwndbg.commands.misc
import pwndbg.commands.mmap
import pwndbg.commands.mprotect
import pwndbg.commands.nearpc
import pwndbg.commands.next

@ -0,0 +1,253 @@
from __future__ import annotations
import argparse
import pwndbg.chain
import pwndbg.color.message as message
import pwndbg.commands
import pwndbg.enhance
import pwndbg.gdblib.file
import pwndbg.gdblib.shellcode
import pwndbg.lib.memory
import pwndbg.wrappers.checksec
import pwndbg.wrappers.readelf
from pwndbg.commands import CommandCategory
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description="""
Calls the mmap syscall and prints its resulting address.
Note that the mmap syscall may fail for various reasons
(see `man mmap`) and, in case of failure, its return value
will not be a valid pointer.
PROT values: NONE (0), READ (1), WRITE (2), EXEC (4)
MAP values: SHARED (1), PRIVATE (2), SHARED_VALIDATE (3), FIXED (0x10),
ANONYMOUS (0x20)
Flags and protection values can be either a string containing the names of the
flags or permissions or a single number corresponding to the bitwise OR of the
protection and flag numbers.
Examples:
mmap 0x0 4096 PROT_READ|PROT_WRITE|PROT_EXEC MAP_PRIVATE|MAP_ANONYMOUS -1 0
- Maps a new private+anonymous page with RWX permissions at a location
decided by the kernel.
mmap 0x0 4096 PROT_READ MAP_PRIVATE 10 0
- Maps 4096 bytes of the file pointed to by file descriptor number 10 with
read permission at a location decided by the kernel.
mmap 0xdeadbeef 0x1000
- Maps a new private+anonymous page with RWX permissions at a page boundary
near 0xdeadbeef.
""",
)
parser.add_argument(
"addr", help="Address hint to be given to mmap.", type=pwndbg.commands.sloppy_gdb_parse
)
parser.add_argument(
"length",
help="Length of the mapping, in bytes. Needs to be greater than zero.",
type=int,
)
parser.add_argument(
"prot",
help='Prot enum or int as in mmap(2). Eg. "PROT_READ|PROT_EXEC" or 7 (for RWX).',
type=str,
nargs="?",
default="7",
)
parser.add_argument(
"flags",
help='Flags enum or int as in mmap(2). Eg. "MAP_PRIVATE|MAP_ANONYMOUS" or 0x22.',
type=str,
nargs="?",
default="0x22",
)
parser.add_argument(
"fd",
help="File descriptor of the file to be mapped, or -1 if using MAP_ANONYMOUS.",
type=int,
nargs="?",
default=-1,
)
parser.add_argument(
"offset",
help="Offset from the start of the file, in bytes, if using file based mapping.",
type=int,
nargs="?",
default=0,
)
parser.add_argument(
"--quiet", "-q", help="Disable address validity warnings and hints", action="store_true"
)
parser.add_argument(
"--force", "-f", help="Force potentially unsafe actions to happen", action="store_true"
)
prot_dict = {
"PROT_NONE": 0x0,
"PROT_READ": 0x1,
"PROT_WRITE": 0x2,
"PROT_EXEC": 0x4,
}
flag_dict = {
"MAP_SHARED": 0x1,
"MAP_PRIVATE": 0x2,
"MAP_SHARED_VALIDATE": 0x3,
"MAP_FIXED": 0x10,
"MAP_ANONYMOUS": 0x20,
}
def prot_str_to_val(protstr):
"""Heuristic to convert PROT_EXEC|PROT_WRITE to integer value."""
prot_int = 0
for k, v in prot_dict.items():
if k in protstr:
prot_int |= v
return prot_int
def flag_str_to_val(flagstr):
"""Heuristic to convert MAP_SHARED|MAP_FIXED to integer value."""
flag_int = 0
for k, v in flag_dict.items():
if k in flagstr:
flag_int |= v
return flag_int
def parse_str_or_int(val, parser):
"""
Try parsing a string with one of the parsers above or by converting it to
an int, or passes the value through if it is already an integer.
"""
if type(val) is str:
candidate = parser(val)
if candidate != 0:
return candidate
return int(val, 0)
elif type(val) is int:
return val
else:
# Getting here is a bug, we shouldn't be seeing other types at all.
raise TypeError(f"invalid type for value: {type(val)}")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.MEMORY)
@pwndbg.commands.OnlyWhenRunning
def mmap(addr, length, prot=7, flags=0x22, fd=-1, offset=0, quiet=False, force=False) -> None:
try:
prot_int = parse_str_or_int(prot, prot_str_to_val)
except ValueError as e:
print(message.error(f'Invalid protection value "{prot}": {e}'))
return
try:
flag_int = parse_str_or_int(flags, flag_str_to_val)
except ValueError as e:
print(message.error(f'Invalid flags value "{flags}": {e}'))
return
aligned_addr = int(pwndbg.lib.memory.page_align(addr))
if flag_int & flag_dict["MAP_FIXED"] != 0:
# When using MAP_FIXED, it's only safe to call mmap(2) when the address
# overlaps no other maps. We want to make sure that, unless the user
# _really_ knows what they're doing, this call will be safe.
#
# Additionally, it's nice to highlight cases where the call is likely
# to fail because the address is not properly aligned.
addr = int(addr)
if addr != aligned_addr and not quiet:
print(
message.warn(
f"""\
Address {addr:#x} is not properly aligned. Calling mmap with MAP_FIXED and an
unaligned address is likely to fail. Consider using the address {aligned_addr:#x}
instead.\
"""
)
)
# Collision checking can get expensive for some combinations of number
# of existing mappings and size of maps. If the user is using `--force`,
# it's fair to assume they know what they're doing enough that we don't
# need to bother them with any of this information, and get a nice
# speedup as a bonus.
if not force:
page = pwndbg.lib.memory.Page(addr, int(length), 0, 0)
collisions = []
vm = pwndbg.gdblib.vmmap.get()
# FIXME: The ends of the maps are sorted. We could bisect the array
# in order to quickly reject all of the items we could never hit
# (all of those such that `vm[i].end < page.start`).
#
# The target Python version as of the writing (3.8) does not support
# `bissect.bissect_left(key=*)`, and cooking up our own
# implementation feels overkill for something that could just be
# fixed later with a version bump.
for i in range(len(vm)):
cand = vm[i]
if cand.end > page.start and cand.start < page.end:
collisions.append(cand)
if cand.start >= page.end:
# No more collisions are possible.
break
if len(collisions) > 0:
m = message.error
print(
m(
f"""\
Trying to mmap with MAP_FIXED for an address range that collides with {len(collisions)}
existing range{'s' if len(collisions) > 1 else ''}:\
"""
)
)
for c in collisions:
print(m(f" {c}"))
print(
m(
"""
This operation is destructive and will delete all of the listed mappings.\
"""
)
)
print(
m(
"Run this command again with `--force` if you still \
wish to proceed."
)
)
return
elif int(addr) != aligned_addr and not quiet:
# Highlight to the user that the address they've specified is likely to
# be changed by the kernel.
print(
message.warn(
f"""\
Address {addr:#x} is not properly aligned. It is likely to be changed to an
aligned address by the kernel automatically. If this is not desired, consider
using the address {aligned_addr:#x} instead.\
"""
)
)
pointer = pwndbg.gdblib.shellcode.exec_syscall(
"SYS_mmap",
int(pwndbg.lib.memory.page_align(addr)),
int(length),
prot_int,
flag_int,
int(fd),
int(offset),
)
print(f"{pointer:#x}")

@ -2,18 +2,14 @@ from __future__ import annotations
import argparse
import gdb
import pwnlib
from pwnlib import asm
import pwndbg.chain
import pwndbg.commands
import pwndbg.enhance
import pwndbg.gdblib.file
import pwndbg.gdblib.shellcode
import pwndbg.wrappers.checksec
import pwndbg.wrappers.readelf
from pwndbg.commands import CommandCategory
from pwndbg.lib.regs import reg_sets
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
@ -65,38 +61,7 @@ def prot_str_to_val(protstr):
def mprotect(addr, length, prot) -> None:
prot_int = prot_str_to_val(prot)
# generate a shellcode that executes the mprotect syscall
shellcode_asm = pwnlib.shellcraft.syscall(
ret = pwndbg.gdblib.shellcode.exec_syscall(
"SYS_mprotect", int(pwndbg.lib.memory.page_align(addr)), int(length), int(prot_int)
)
shellcode = asm.asm(shellcode_asm)
# obtain the registers that need to be saved for the current platform
# we save the registers that are used for arguments, return value and the program counter
current_regs = reg_sets[pwndbg.gdblib.arch.current]
regs_to_save = current_regs.args + (current_regs.retval, current_regs.pc)
# save the registers
saved_registers = {reg: pwndbg.gdblib.regs[reg] for reg in regs_to_save}
# save the memory which will be overwritten by the shellcode
saved_instruction_bytes = pwndbg.gdblib.memory.read(
saved_registers[current_regs.pc], len(shellcode)
)
pwndbg.gdblib.memory.write(saved_registers[current_regs.pc], shellcode)
# execute syscall
gdb.execute("nextsyscall")
gdb.execute("stepi")
# get the return value
ret = pwndbg.gdblib.regs[current_regs.retval]
print("mprotect returned %d (%s)" % (ret, current_regs.retval))
# restore registers and memory
pwndbg.gdblib.memory.write(saved_registers[current_regs.pc], saved_instruction_bytes)
# restore the registers
for register, value in saved_registers.items():
setattr(pwndbg.gdblib.regs, register, value)
print(f"mprotect returned {ret}")

@ -0,0 +1,108 @@
"""
Shellcode
This module implements functionality that allows for the execution of a small
amount of code in the context of the inferior.
"""
from __future__ import annotations
import gdb
import pwnlib.asm
import pwnlib.shellcraft
import pwndbg
def _get_syscall_return_value():
"""
Reads the value corresponding to the return value of a syscall that has
just returned.
"""
register_set = pwndbg.lib.regs.reg_sets[pwndbg.gdblib.arch.current]
return pwndbg.gdblib.regs[register_set.retval]
def exec_syscall(
syscall, arg0=None, arg1=None, arg2=None, arg3=None, arg4=None, arg5=None, arg6=None
):
"""
Tries executing the given syscall in the context of the inferior.
"""
# Build machine code that runs the requested syscall.
syscall_asm = pwnlib.shellcraft.syscall(syscall, arg0, arg1, arg2, arg3, arg4, arg5)
syscall_bin = pwnlib.asm.asm(syscall_asm)
# Run the syscall and pass its return value onward to the caller.
return exec_shellcode(syscall_bin, restore_context=True, capture=_get_syscall_return_value)
def exec_shellcode(blob, restore_context=True, capture=None):
"""
Tries executing the given blob of machine code in the current context of the
inferior, optionally restoring the values of the registers as they were
before the shellcode ran, as a means to allow for execution of the inferior
to continue uninterrupted. The value of the program counter is always
restored.
Additionally, the caller may specify an object to be called before the
context is restored, so that information stored in the registers after the
shellcode finishes can be retrieved. The return value of that call will be
returned by this function.
# Safety
Seeing as this function injects code directly into the inferior and runs it,
the caller must be careful to inject code that will (1) terminate and (2)
not cause the inferior to misbehave. Otherwise, it is fairly easy to crash
or currupt the memory in the inferior.
"""
register_set = pwndbg.lib.regs.reg_sets[pwndbg.gdblib.arch.current]
preserve_set = register_set.gpr + register_set.args + (register_set.pc, register_set.stack)
registers = {reg: pwndbg.gdblib.regs[reg] for reg in preserve_set}
starting_address = registers[register_set.pc]
# Make sure the blob fits in the rest of the space we have in this page.
#
# NOTE: Technically, we could actually use anything from the whole page to
# all of the pages currently mapped as executable for this. There is no
# technical limitation stopping us from doing that, but seeing as doing it
# is harder to make sure it works correctly, we don't (for now, at least).
page = pwndbg.gdblib.vmmap.find(starting_address)
assert page is not None
clearance = page.end - len(blob) - 1
if clearance < 0:
# The page isn't large enough to hold our shellcode.
raise RuntimeError(
f"Not enough space to execute code as inferior: \
need at least {len(blob)} bytes, have {clearance} bytes available"
)
# Swap the code in the range with our shellcode.
existing_code = pwndbg.gdblib.memory.read(starting_address, len(blob))
pwndbg.gdblib.memory.write(starting_address, blob)
# Execute.
bp = gdb.Breakpoint(f"*{starting_address+len(blob):#x}", internal=True, temporary=True)
gdb.execute("continue")
# Give the caller a chance to collect information from the environment
# before any of the context gets restored.
captured = None
if capture is not None:
captured = capture()
# Restore the code and the program counter and, if requested, the rest of
# the registers.
pwndbg.gdblib.memory.write(starting_address, existing_code)
setattr(pwndbg.gdblib.regs, register_set.pc, starting_address)
if restore_context:
for reg, val in registers.items():
setattr(pwndbg.gdblib.regs, reg, val)
return captured

@ -0,0 +1,83 @@
from __future__ import annotations
import gdb
import pwndbg
import tests
USE_FDS_BINARY = tests.binaries.get("use-fds.out")
def test_mmap_executes_properly(start_binary):
"""
Tests the mmap command
"""
start_binary(USE_FDS_BINARY)
pc = pwndbg.gdblib.regs.pc
page_size = pwndbg.lib.memory.PAGE_SIZE
# Checks for an mmap(2) error.
#
# mmap(2) is documented to only return a (void*) -1 on failure, but we are a
# little stricter and consider any value on the last page to be a mapping
# error. While technically we don't need to do this, we make the assumption
# that any mapping landing in the last page during a test should warrant
# manual investigation.
def is_mmap_error(ptr):
err = ((1 << pwndbg.gdblib.arch.ptrsize) - 1) & pwndbg.lib.memory.PAGE_MASK
return ptr & pwndbg.lib.memory.PAGE_MASK == err
# Checks whether permissions match.
def has_correct_perms(ptr, perm):
page = pwndbg.gdblib.vmmap.find(ptr)
return (
not (page.read ^ ("r" in perm))
and not (page.write ^ ("w" in perm))
and not (page.execute ^ ("x" in perm))
)
# Check basic private+anonymous page mmap.
ptr = int(gdb.execute(f"mmap 0x0 {page_size}", to_string=True), 0)
assert not is_mmap_error(ptr)
assert has_correct_perms(ptr, "rwx")
# Check basic fixed mapping.
base_addr = 0xDEADBEEF & pwndbg.lib.memory.PAGE_MASK
while True:
page = pwndbg.gdblib.vmmap.find(base_addr)
if page is None:
break
base_addr = page.end
ptr = int(
gdb.execute(
f"mmap {base_addr:#x} {page_size} 7 MAP_FIXED|MAP_ANONYMOUS|MAP_PRIVATE", to_string=True
),
0,
)
assert not is_mmap_error(ptr)
assert has_correct_perms(ptr, "rwx")
assert ptr == base_addr
# Continue the program until just before close(2) is called.
gdb.execute("break use-fds.c:16")
gdb.execute("continue")
# Retrieve the file descriptor number and map it to memory.
fd_num = int(gdb.newest_frame().read_var("fd"))
ptr = int(gdb.execute(f"mmap 0x0 16 PROT_READ MAP_PRIVATE {fd_num} 0", to_string=True), 0)
assert not is_mmap_error(ptr)
assert has_correct_perms(ptr, "r")
# Load the 16 bytes read in by the read() call in the program, as well as
# the first 16 bytes present in our newly created memory map, and compare
# them.
data_ptr = int(gdb.newest_frame().read_var("buf").address)
data_local = pwndbg.gdblib.memory.read(data_ptr, 16)
data_mapped = pwndbg.gdblib.memory.read(ptr, 16)
assert data_local == data_mapped
def test_cannot_run_mmap_when_not_running(start_binary):
# expect error message
assert "mmap: The program is not being run.\n" == gdb.execute("mmap 0x0 0x1000", to_string=True)
Loading…
Cancel
Save