diff --git a/pwndbg/commands/attachp.py b/pwndbg/commands/attachp.py index 3b14d6fce..c8fabdf81 100644 --- a/pwndbg/commands/attachp.py +++ b/pwndbg/commands/attachp.py @@ -5,12 +5,27 @@ import os import stat from subprocess import CalledProcessError from subprocess import check_output +from typing import Union import gdb +from tabulate import tabulate import pwndbg.commands from pwndbg.color import message from pwndbg.commands import CommandCategory +from pwndbg.ui import get_window_size + +_NONE = "none" +_OLDEST = "oldest" +_NEWEST = "newest" +_ASK = "ask" +_OPTIONS = [_NONE, _OLDEST, _NEWEST, _ASK] + +pwndbg.gdblib.config.add_param( + "attachp-resolution-method", + _ASK, + f'how to determine the process to attach when multiple candidates exists ("{_OLDEST}", "{_NEWEST}", "{_NONE}" or "{_ASK}"(default))', +) parser = argparse.ArgumentParser( formatter_class=argparse.RawTextHelpFormatter, @@ -34,11 +49,12 @@ Original GDB attach command help: to specify the program, and to load its symbol table.""", ) +parser.add_argument("--no-truncate", action="store_true", help="dont truncate command args") parser.add_argument("target", type=str, help="pid, process name or device file to attach to") @pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.START) -def attachp(target) -> None: +def attachp(no_truncate, target) -> None: try: resolved_target = int(target) except ValueError: @@ -71,16 +87,112 @@ def attachp(target) -> None: return if len(pids) > 1: - print(message.warn(f"Found pids: {', '.join(pids)} (use `attach `)")) - return - - resolved_target = int(pids[0]) + method = pwndbg.gdblib.config.attachp_resolution_method + + if method not in _OPTIONS: + print( + message.warn( + f'Invalid value for `attachp-resolution-method` config. Fallback to default value("{_ASK}").' + ) + ) + method = _ASK + + try: + ps_output = check_output( + [ + "ps", + "--no-headers", + "-ww", + "-p", + ",".join(pids), + "-o", + "pid,ruser,etime,args", + "--sort", + "+lstart", + ] + ).decode() + except FileNotFoundError: + print(message.error("Error: did not find `ps` command")) + print( + message.warn(f"Use `attach ` instead (found pids: {', '.join(pids)})") + ) + return + except CalledProcessError: + print(message.error("Error: failed to get process details")) + print( + message.warn(f"Use `attach ` instead (found pids: {', '.join(pids)})") + ) + return + + print( + message.warn( + f'Multiple processes found. Current resolution method is "{method}". Run the command `config attachp-resolution-method` to see more informations.' + ) + ) + + # Here, we can safely use split to capture each field + # since none of the columns except args can contain spaces + proc_infos = [row.split(maxsplit=3) for row in ps_output.splitlines()] + if method == _OLDEST: + resolved_target = int(proc_infos[0][0]) + elif method == _NEWEST: + resolved_target = int(proc_infos[-1][0]) + else: + headers = ["pid", "user", "elapsed", "command"] + showindex: Union[bool, range] = ( + False if method == _NONE else range(1, len(proc_infos) + 1) + ) + + # calculate max_col_widths to fit window width + test_table = tabulate(proc_infos, headers=headers, showindex=showindex) + table_orig_width = len(test_table.splitlines()[1]) + max_command_width = max(len(command) for _, _, _, command in proc_infos) + max_col_widths = max( + max_command_width - (table_orig_width - get_window_size()[1]), 10 + ) + + # truncation + if not no_truncate: + for info in proc_infos: + info[-1] = _truncate_string(info[-1], max_col_widths) + + msg = tabulate( + proc_infos, + headers=headers, + showindex=showindex, + maxcolwidths=max_col_widths, + ) + print(message.notice(msg)) + + if method == _NONE: + print(message.warn("use `attach ` to attach")) + return + elif method == _ASK: + while True: + msg = message.notice(f"which process to attach?(1-{len(proc_infos)}) ") + try: + inp = input(msg).strip() + except EOFError: + return + try: + choice = int(inp) + if not (1 <= choice <= len(proc_infos)): + continue + except ValueError: + continue + break + resolved_target = int(proc_infos[choice - 1][0]) + else: + raise Exception("unreachable") + else: + resolved_target = int(pids[0]) print(message.on(f"Attaching to {resolved_target}")) try: gdb.execute(f"attach {resolved_target}") except gdb.error as e: print(message.error(f"Error: {e}")) + return def _is_device(path) -> bool: @@ -93,3 +205,14 @@ def _is_device(path) -> bool: return True return False + + +def _truncate_string(s: str, length: int): + TRUNCATE_FILLER = " ... " + if len(s) < length: + return s + truncate_point = (length - len(TRUNCATE_FILLER)) // 2 + result = s[:truncate_point] + result += TRUNCATE_FILLER + result += s[-(length - len(result)) :] + return result diff --git a/tests/gdb-tests/tests/test_attachp.py b/tests/gdb-tests/tests/test_attachp.py index b57c3e245..fdd6e8140 100644 --- a/tests/gdb-tests/tests/test_attachp.py +++ b/tests/gdb-tests/tests/test_attachp.py @@ -1,5 +1,6 @@ from __future__ import annotations +import getpass import os import re import subprocess @@ -65,24 +66,143 @@ def test_attachp_command_attaches_to_pid(launched_bash_binary): @pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH) -def test_attachp_command_attaches_to_procname_too_many_pids(launched_bash_binary): +def test_attachp_command_attaches_to_procname_resolve_none(launched_bash_binary): pid, binary_path = launched_bash_binary - process = subprocess.Popen([binary_path], stdout=subprocess.PIPE, stdin=subprocess.PIPE) + process = subprocess.Popen( + [binary_path] + ["-i"] * 1000, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ) binary_name = binary_path.split("/")[-1] - result = run_gdb_with_script(pyafter=f"attachp {binary_name}") + result = run_gdb_with_script( + pyafter=["set attachp-resolution-method none", f"attachp {binary_name}"] + ) + + process.kill() + + regex = r"pid +user +elapsed +command\n" + regex += r"-+ -+ -+ -+\n" + regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n" + regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n" + regex += r"use `attach \` to attach\n" + matches = re.search(regex, result).groups() + + expected = (str(pid), getpass.getuser(), binary_path, str(process.pid), getpass.getuser()) + + assert matches[:-1] == expected + assert matches[-1].startswith(f"{binary_path} -i -i") and " ... " in matches[-1] + + +@pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH) +def test_attachp_command_attaches_to_procname_resolve_none_no_truncate(launched_bash_binary): + pid, binary_path = launched_bash_binary + + process = subprocess.Popen( + [binary_path] + ["-i"] * 1000, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ) + + binary_name = binary_path.split("/")[-1] + result = run_gdb_with_script( + pyafter=["set attachp-resolution-method none", f"attachp --no-truncate {binary_name}"] + ) process.kill() - matches = re.search(r"Found pids: ([0-9]+), ([0-9]+) \(use `attach `\)", result).groups() - matches = list(map(int, matches)) - matches.sort() + regex = r"pid +user +elapsed +command\n" + regex += r"-+ -+ -+ -+\n" + regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n" + regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n" + regex += r"(?: +-?(?: -i)+(?: | -)?\n)+" + regex += r"use `attach \` to attach\n" + matches = re.search(regex, result).groups() - expected_pids = [pid, process.pid] - expected_pids.sort() + expected = (str(pid), getpass.getuser(), binary_path, str(process.pid), getpass.getuser()) + + assert matches[:-1] == expected + assert matches[-1].startswith(f"{binary_path} -i -i") + + +@pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH) +def test_attachp_command_attaches_to_procname_resolve_ask(launched_bash_binary): + pid, binary_path = launched_bash_binary + + process = subprocess.Popen( + [binary_path] + ["-i"] * 1000, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ) + + binary_name = binary_path.split("/")[-1] + result = run_gdb_with_script( + pyafter=["set attachp-resolution-method ask", f"attachp {binary_name}"], + stdin_input=b"0\n1\n", + ) + + process.kill() + + regex = r"pid +user +elapsed +command\n" + regex += r"-+ -+ -+ -+ -+\n" + regex += r" 1 +([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n" + regex += r" 2 +([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n" + regex += r"which process to attach\?\(1-2\) " + regex += r"which process to attach\?\(1-2\) " + matches = re.search(regex, result).groups() + + expected = ( + str(pid), + getpass.getuser(), + binary_path, + str(process.pid), + getpass.getuser(), + ) + + assert matches[:-1] == expected + assert matches[-1].startswith(f"{binary_path} -i -i") and " ... " in matches[-1] + + matches = re.search(r"Attaching to ([0-9]+)", result).groups() + assert matches == (str(pid),) + + assert re.search(rf"Detaching from program: {binary_path}, process {pid}", result) + + +@pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH) +def test_attachp_command_attaches_to_procname_resolve_oldest(launched_bash_binary): + pid, binary_path = launched_bash_binary + + process = subprocess.Popen( + [binary_path] + ["-i"] * 1000, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ) + + binary_name = binary_path.split("/")[-1] + result = run_gdb_with_script( + pyafter=["set attachp-resolution-method oldest", f"attachp {binary_name}"] + ) + + process.kill() + + matches = re.search(r"Attaching to ([0-9]+)", result).groups() + assert matches == (str(pid),) + + assert re.search(rf"Detaching from program: {binary_path}, process {pid}", result) + + +@pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH) +def test_attachp_command_attaches_to_procname_resolve_newest(launched_bash_binary): + pid, binary_path = launched_bash_binary + + process = subprocess.Popen( + [binary_path] + ["-i"] * 1000, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ) + + binary_name = binary_path.split("/")[-1] + result = run_gdb_with_script( + pyafter=["set attachp-resolution-method newest", f"attachp {binary_name}"] + ) + + process.kill() + + matches = re.search(r"Attaching to ([0-9]+)", result).groups() + assert matches == (str(process.pid),) - assert matches == expected_pids + assert re.search(rf"Detaching from program: {binary_path}, process {process.pid}", result) @pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH) diff --git a/tests/gdb-tests/tests/utils.py b/tests/gdb-tests/tests/utils.py index 7ede89d62..249bfddc8 100644 --- a/tests/gdb-tests/tests/utils.py +++ b/tests/gdb-tests/tests/utils.py @@ -5,7 +5,9 @@ import re import subprocess -def run_gdb_with_script(binary="", core="", pybefore=None, pyafter=None, timeout=None): +def run_gdb_with_script( + binary="", core="", stdin_input=None, pybefore=None, pyafter=None, timeout=None +): """ Runs GDB with given commands launched before and after loading of gdbinit.py Returns GDB output. @@ -32,7 +34,9 @@ def run_gdb_with_script(binary="", core="", pybefore=None, pyafter=None, timeout command += ["--eval-command", "quit"] print(f"Launching command: {command}") - output = subprocess.check_output(command, stderr=subprocess.STDOUT, timeout=timeout) + output = subprocess.check_output( + command, stderr=subprocess.STDOUT, timeout=timeout, input=stdin_input + ) # Python 3 returns bytes-like object so lets have it consistent output = codecs.decode(output, "utf8")