Port to aglib: ropgadget (#2563)

* Port to aglib: ropgadget, rop

* rop: fix capstone mode

* rop: fix msg
pull/2508/head^2
patryk4815 1 year ago committed by GitHub
parent 921ba71cf4
commit 64bd3fee8e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -14,10 +14,6 @@ let
]
++ pkgs.lib.optionals pkgs.stdenv.isLinux [
python3.pkgs.ropper # ref: https://github.com/pwndbg/pwndbg/blob/2023.07.17/pwndbg/commands/ropper.py#L30
python3.pkgs.ropgadget # ref: https://github.com/pwndbg/pwndbg/blob/2023.07.17/pwndbg/commands/rop.py#L34
]
++ pkgs.lib.optionals isLLDB [
python3.pkgs.gnureadline
]
);

@ -19,6 +19,7 @@ from typing_extensions import ParamSpec
import pwndbg.aglib.heap
import pwndbg.aglib.proc
import pwndbg.aglib.qemu
import pwndbg.aglib.regs
import pwndbg.exception
from pwndbg.aglib.heap.ptmalloc import DebugSymsHeap
from pwndbg.aglib.heap.ptmalloc import GlibcMemoryAllocator
@ -31,7 +32,6 @@ from pwndbg.aglib.heap.ptmalloc import SymbolUnresolvableError
# TODO: Replace these with uses of the Debugger API.
if pwndbg.dbg.is_gdblib_available():
import pwndbg.gdblib.kernel
import pwndbg.gdblib.regs
log = logging.getLogger(__name__)
@ -709,7 +709,6 @@ def load_commands() -> None:
import pwndbg.commands.pcplist
import pwndbg.commands.peda
import pwndbg.commands.reload
import pwndbg.commands.rop
import pwndbg.commands.ropper
import pwndbg.commands.segments
import pwndbg.commands.shell
@ -758,6 +757,7 @@ def load_commands() -> None:
import pwndbg.commands.radare2
import pwndbg.commands.retaddr
import pwndbg.commands.rizin
import pwndbg.commands.rop
import pwndbg.commands.search
import pwndbg.commands.sigreturn
import pwndbg.commands.spray

@ -1,22 +1,211 @@
from __future__ import annotations
import argparse
import binascii
import re
import subprocess
import tempfile
from typing import Iterator
from typing import List
from typing import Tuple
import gdb
import pwndbg.aglib.arch
import pwndbg.aglib.memory
import pwndbg.aglib.proc
import pwndbg.aglib.vmmap
import pwndbg.color.message as M
import pwndbg.commands
import pwndbg.lib.memory
from pwndbg.aglib.disasm import get_disassembler
from pwndbg.commands import CommandCategory
class RawMemoryBinary(object):
def __init__(self, options, start_addr: int):
self.start_addr = start_addr
self.__fileName = options.binary
self.__rawBinary = None
self.cs = get_disassembler(pwndbg.aglib.regs.pc)
with open(self.__fileName, "rb") as fp:
self.__rawBinary = fp.read()
def getBinary(self):
return self
def getFileName(self):
return self.__fileName
def getRawBinary(self):
return self.__rawBinary
def getEntryPoint(self):
raise NotImplementedError()
def getExecSections(self):
return [
{
"name": "raw",
"offset": 0,
"size": len(self.__rawBinary),
"vaddr": self.start_addr,
"opcodes": bytes(self.__rawBinary),
}
]
def getDataSections(self):
raise NotImplementedError()
def getArch(self):
return self.cs.arch
def getArchMode(self):
return self.cs.mode
def getEndian(self):
# Already returned in `getArchMode` func
return 0
def getFormat(self):
return "Raw"
def _rop(
file_path: str, grep: str | None, argument: List[str], start_addr: int | None = None
) -> None:
from ropgadget.args import Args
from ropgadget.core import Core
try:
args = Args(
arguments=[
"--binary",
file_path,
*argument,
]
)
except ValueError as e:
print(M.error(f"rop invalid args: {e}"))
return
options = args.getArgs()
c = Core(options)
if start_addr is not None:
# HACK: to load from our class
c._Core__binary = RawMemoryBinary(options, start_addr=start_addr)
else:
c.do_binary(file_path, silent=True)
# Find gadgets
c.do_load(0, silent=True)
print("Gadgets information\n============================================================")
for gadget in c.gadgets():
insts = gadget.get("gadget", "")
if not insts:
continue
if grep:
# grep search
if not re.search(grep, insts):
continue
vaddr = gadget["vaddr"]
bytesStr = " // " + binascii.hexlify(gadget["bytes"]).decode("utf8") if options.dump else ""
print(
"0x{{0:0{}x}} : {{1}}{{2}}".format(pwndbg.aglib.arch.ptrsize).format(
vaddr, insts, bytesStr
)
)
print("\nUnique gadgets found: %d\n\n" % (len(c.gadgets())))
def split_range_to_chunks(
range_start: int, range_end: int, chunk_size: int = 10 * 1024 * 1024
) -> Iterator[Tuple[int, int, int, int]]:
total_parts = ((range_end - range_start) + chunk_size - 1) // chunk_size
for current_part, range_start_chunk in enumerate(range(range_start, range_end, chunk_size), 1):
range_end_chunk = min(range_start_chunk + chunk_size, range_end)
range_size = range_end_chunk - range_start_chunk
yield (
range_start_chunk,
range_size,
current_part,
total_parts,
)
def parse_size(size_str: str) -> int:
unit_multipliers = {
"b": 1,
"kb": 1024,
"mb": 1024**2,
"gb": 1024**3,
"tb": 1024**4,
"kib": 1024,
"mib": 1024**2,
"gib": 1024**3,
"tib": 1024**4,
}
size_str = size_str.strip().lower()
match = re.match(r"(\d+)\s*(b|kb|mb|gb|tb|kib|mib|gib|tib)", size_str)
if not match:
raise ValueError(f"Invalid size string: {size_str}")
value = int(match.group(1))
unit = match.group(2)
return value * unit_multipliers[unit]
def iterate_over_pages(mem_limit: int) -> Iterator[Tuple[str, pwndbg.lib.memory.Page | None]]:
if not pwndbg.aglib.proc.alive:
yield pwndbg.aglib.proc.exe, None
return
proc = pwndbg.dbg.selected_inferior()
for page in proc.vmmap().ranges():
if not page.execute:
continue
print(M.info(f"Searching in {hex(page.start)} {hex(page.end)} {page.objfile}"))
if page.memsz > mem_limit:
print(
M.hint(
"WARNING: The memory page size is too large to dump.\n"
"WARNING: Parsing this large memory page might take an excessive amount of time...\n"
"WARNING: To process larger pages, increase the `--memlimit` parameter (e.g., `--memlimit 100MB`)."
)
)
continue
with tempfile.NamedTemporaryFile(mode="a+b") as fmem:
try:
for start, size, progress_cur, progress_max in split_range_to_chunks(
page.start, page.end
):
if progress_max > 1:
print(M.hint(f"Dumping memory... {progress_cur} / {progress_max}"))
mem_data = proc.read_memory(address=start, size=size)
fmem.write(mem_data)
except pwndbg.dbg_mod.Error as e:
print(M.error(f"WARNING: failed to read page: {e}"))
continue
fmem.flush()
yield fmem.name, page
parser = argparse.ArgumentParser(
description="Dump ROP gadgets with Jon Salwan's ROPgadget tool.",
epilog="Example: rop --grep 'pop rdi' -- --nojop",
)
parser.add_argument("--grep", type=str, help="String to grep the output for")
parser.add_argument("--memlimit", type=str, default="50MB", help="String to grep the output for")
parser.add_argument("argument", nargs="*", type=str, help="Arguments to pass to ROPgadget")
@ -24,33 +213,8 @@ parser.add_argument("argument", nargs="*", type=str, help="Arguments to pass to
parser, aliases=["ropgadget"], category=CommandCategory.INTEGRATIONS
)
@pwndbg.commands.OnlyWithFile
def rop(grep, argument) -> None:
with tempfile.NamedTemporaryFile() as corefile:
# If the process is running, dump a corefile so we get actual addresses.
if pwndbg.aglib.proc.alive:
filename = corefile.name
gdb.execute(f"gcore {filename}")
else:
filename = pwndbg.aglib.proc.exe
# Build up the command line to run
cmd = ["ROPgadget", "--binary", filename]
cmd += argument
try:
io = subprocess.Popen(cmd, stdout=subprocess.PIPE)
except Exception:
print("Could not run ROPgadget. Please ensure it's installed and in $PATH.")
return
(stdout, stderr) = io.communicate()
stdout = stdout.decode("latin-1")
if not grep:
print(stdout)
return
for line in stdout.splitlines():
if re.search(grep, line):
print(line)
def rop(grep: str | None, memlimit: str, argument: List[str]) -> None:
memlimit = parse_size(memlimit)
for file_path, page in iterate_over_pages(memlimit):
_rop(file_path, grep, argument, start_addr=page.start if page else None)

@ -124,7 +124,7 @@ module = [
disable_error_code = ["index", "name-defined", "attr-defined"]
[[tool.mypy.overrides]]
module = ["capstone.*", "unicorn.*", "pwnlib.*", "elftools.*", "ipdb.*", "r2pipe", "rzpipe", "rich.*", "pt_gdb", "lldb.*", "gnureadline"]
module = ["capstone.*", "unicorn.*", "pwnlib.*", "ropgadget.*", "elftools.*", "ipdb.*", "r2pipe", "rzpipe", "rich.*", "pt_gdb", "lldb.*", "gnureadline"]
ignore_missing_imports = true
[tool.isort]

Loading…
Cancel
Save