Implement a way to resolve multiple process in attachp (#1956)

* implement various resolve method to attachp

* add tests

* lint tests

* fix resolve_none test

* fix procps issue

* remove unused function

* Update pwndbg/commands/attachp.py

* Update pwndbg/commands/attachp.py

* first/last -> oldest/newest

* change default value to ask

* Provide informations about the config

* fix test

* fix lint

* catch eoferror

---------

Co-authored-by: Disconnect3d <dominik.b.czarnota@gmail.com>
pull/1960/head
keymoon 2 years ago committed by GitHub
parent 40b7928d9e
commit 03e97e9ea8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -5,12 +5,27 @@ import os
import stat
from subprocess import CalledProcessError
from subprocess import check_output
from typing import Union
import gdb
from tabulate import tabulate
import pwndbg.commands
from pwndbg.color import message
from pwndbg.commands import CommandCategory
from pwndbg.ui import get_window_size
_NONE = "none"
_OLDEST = "oldest"
_NEWEST = "newest"
_ASK = "ask"
_OPTIONS = [_NONE, _OLDEST, _NEWEST, _ASK]
pwndbg.gdblib.config.add_param(
"attachp-resolution-method",
_ASK,
f'how to determine the process to attach when multiple candidates exists ("{_OLDEST}", "{_NEWEST}", "{_NONE}" or "{_ASK}"(default))',
)
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
@ -34,11 +49,12 @@ Original GDB attach command help:
to specify the program, and to load its symbol table.""",
)
parser.add_argument("--no-truncate", action="store_true", help="dont truncate command args")
parser.add_argument("target", type=str, help="pid, process name or device file to attach to")
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.START)
def attachp(target) -> None:
def attachp(no_truncate, target) -> None:
try:
resolved_target = int(target)
except ValueError:
@ -71,9 +87,104 @@ def attachp(target) -> None:
return
if len(pids) > 1:
print(message.warn(f"Found pids: {', '.join(pids)} (use `attach <pid>`)"))
method = pwndbg.gdblib.config.attachp_resolution_method
if method not in _OPTIONS:
print(
message.warn(
f'Invalid value for `attachp-resolution-method` config. Fallback to default value("{_ASK}").'
)
)
method = _ASK
try:
ps_output = check_output(
[
"ps",
"--no-headers",
"-ww",
"-p",
",".join(pids),
"-o",
"pid,ruser,etime,args",
"--sort",
"+lstart",
]
).decode()
except FileNotFoundError:
print(message.error("Error: did not find `ps` command"))
print(
message.warn(f"Use `attach <pid>` instead (found pids: {', '.join(pids)})")
)
return
except CalledProcessError:
print(message.error("Error: failed to get process details"))
print(
message.warn(f"Use `attach <pid>` instead (found pids: {', '.join(pids)})")
)
return
print(
message.warn(
f'Multiple processes found. Current resolution method is "{method}". Run the command `config attachp-resolution-method` to see more informations.'
)
)
# Here, we can safely use split to capture each field
# since none of the columns except args can contain spaces
proc_infos = [row.split(maxsplit=3) for row in ps_output.splitlines()]
if method == _OLDEST:
resolved_target = int(proc_infos[0][0])
elif method == _NEWEST:
resolved_target = int(proc_infos[-1][0])
else:
headers = ["pid", "user", "elapsed", "command"]
showindex: Union[bool, range] = (
False if method == _NONE else range(1, len(proc_infos) + 1)
)
# calculate max_col_widths to fit window width
test_table = tabulate(proc_infos, headers=headers, showindex=showindex)
table_orig_width = len(test_table.splitlines()[1])
max_command_width = max(len(command) for _, _, _, command in proc_infos)
max_col_widths = max(
max_command_width - (table_orig_width - get_window_size()[1]), 10
)
# truncation
if not no_truncate:
for info in proc_infos:
info[-1] = _truncate_string(info[-1], max_col_widths)
msg = tabulate(
proc_infos,
headers=headers,
showindex=showindex,
maxcolwidths=max_col_widths,
)
print(message.notice(msg))
if method == _NONE:
print(message.warn("use `attach <pid>` to attach"))
return
elif method == _ASK:
while True:
msg = message.notice(f"which process to attach?(1-{len(proc_infos)}) ")
try:
inp = input(msg).strip()
except EOFError:
return
try:
choice = int(inp)
if not (1 <= choice <= len(proc_infos)):
continue
except ValueError:
continue
break
resolved_target = int(proc_infos[choice - 1][0])
else:
raise Exception("unreachable")
else:
resolved_target = int(pids[0])
print(message.on(f"Attaching to {resolved_target}"))
@ -81,6 +192,7 @@ def attachp(target) -> None:
gdb.execute(f"attach {resolved_target}")
except gdb.error as e:
print(message.error(f"Error: {e}"))
return
def _is_device(path) -> bool:
@ -93,3 +205,14 @@ def _is_device(path) -> bool:
return True
return False
def _truncate_string(s: str, length: int):
TRUNCATE_FILLER = " ... "
if len(s) < length:
return s
truncate_point = (length - len(TRUNCATE_FILLER)) // 2
result = s[:truncate_point]
result += TRUNCATE_FILLER
result += s[-(length - len(result)) :]
return result

@ -1,5 +1,6 @@
from __future__ import annotations
import getpass
import os
import re
import subprocess
@ -65,24 +66,143 @@ def test_attachp_command_attaches_to_pid(launched_bash_binary):
@pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH)
def test_attachp_command_attaches_to_procname_too_many_pids(launched_bash_binary):
def test_attachp_command_attaches_to_procname_resolve_none(launched_bash_binary):
pid, binary_path = launched_bash_binary
process = subprocess.Popen([binary_path], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
process = subprocess.Popen(
[binary_path] + ["-i"] * 1000, stdout=subprocess.PIPE, stdin=subprocess.PIPE
)
binary_name = binary_path.split("/")[-1]
result = run_gdb_with_script(pyafter=f"attachp {binary_name}")
result = run_gdb_with_script(
pyafter=["set attachp-resolution-method none", f"attachp {binary_name}"]
)
process.kill()
regex = r"pid +user +elapsed +command\n"
regex += r"-+ -+ -+ -+\n"
regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n"
regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n"
regex += r"use `attach \<pid\>` to attach\n"
matches = re.search(regex, result).groups()
expected = (str(pid), getpass.getuser(), binary_path, str(process.pid), getpass.getuser())
assert matches[:-1] == expected
assert matches[-1].startswith(f"{binary_path} -i -i") and " ... " in matches[-1]
@pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH)
def test_attachp_command_attaches_to_procname_resolve_none_no_truncate(launched_bash_binary):
pid, binary_path = launched_bash_binary
process = subprocess.Popen(
[binary_path] + ["-i"] * 1000, stdout=subprocess.PIPE, stdin=subprocess.PIPE
)
binary_name = binary_path.split("/")[-1]
result = run_gdb_with_script(
pyafter=["set attachp-resolution-method none", f"attachp --no-truncate {binary_name}"]
)
process.kill()
matches = re.search(r"Found pids: ([0-9]+), ([0-9]+) \(use `attach <pid>`\)", result).groups()
matches = list(map(int, matches))
matches.sort()
regex = r"pid +user +elapsed +command\n"
regex += r"-+ -+ -+ -+\n"
regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n"
regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n"
regex += r"(?: +-?(?: -i)+(?: | -)?\n)+"
regex += r"use `attach \<pid\>` to attach\n"
matches = re.search(regex, result).groups()
expected_pids = [pid, process.pid]
expected_pids.sort()
expected = (str(pid), getpass.getuser(), binary_path, str(process.pid), getpass.getuser())
assert matches[:-1] == expected
assert matches[-1].startswith(f"{binary_path} -i -i")
@pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH)
def test_attachp_command_attaches_to_procname_resolve_ask(launched_bash_binary):
pid, binary_path = launched_bash_binary
process = subprocess.Popen(
[binary_path] + ["-i"] * 1000, stdout=subprocess.PIPE, stdin=subprocess.PIPE
)
binary_name = binary_path.split("/")[-1]
result = run_gdb_with_script(
pyafter=["set attachp-resolution-method ask", f"attachp {binary_name}"],
stdin_input=b"0\n1\n",
)
process.kill()
regex = r"pid +user +elapsed +command\n"
regex += r"-+ -+ -+ -+ -+\n"
regex += r" 1 +([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n"
regex += r" 2 +([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n"
regex += r"which process to attach\?\(1-2\) "
regex += r"which process to attach\?\(1-2\) "
matches = re.search(regex, result).groups()
expected = (
str(pid),
getpass.getuser(),
binary_path,
str(process.pid),
getpass.getuser(),
)
assert matches[:-1] == expected
assert matches[-1].startswith(f"{binary_path} -i -i") and " ... " in matches[-1]
matches = re.search(r"Attaching to ([0-9]+)", result).groups()
assert matches == (str(pid),)
assert re.search(rf"Detaching from program: {binary_path}, process {pid}", result)
@pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH)
def test_attachp_command_attaches_to_procname_resolve_oldest(launched_bash_binary):
pid, binary_path = launched_bash_binary
process = subprocess.Popen(
[binary_path] + ["-i"] * 1000, stdout=subprocess.PIPE, stdin=subprocess.PIPE
)
binary_name = binary_path.split("/")[-1]
result = run_gdb_with_script(
pyafter=["set attachp-resolution-method oldest", f"attachp {binary_name}"]
)
process.kill()
matches = re.search(r"Attaching to ([0-9]+)", result).groups()
assert matches == (str(pid),)
assert re.search(rf"Detaching from program: {binary_path}, process {pid}", result)
@pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH)
def test_attachp_command_attaches_to_procname_resolve_newest(launched_bash_binary):
pid, binary_path = launched_bash_binary
process = subprocess.Popen(
[binary_path] + ["-i"] * 1000, stdout=subprocess.PIPE, stdin=subprocess.PIPE
)
binary_name = binary_path.split("/")[-1]
result = run_gdb_with_script(
pyafter=["set attachp-resolution-method newest", f"attachp {binary_name}"]
)
process.kill()
matches = re.search(r"Attaching to ([0-9]+)", result).groups()
assert matches == (str(process.pid),)
assert matches == expected_pids
assert re.search(rf"Detaching from program: {binary_path}, process {process.pid}", result)
@pytest.mark.skipif(can_attach is False, reason=REASON_CANNOT_ATTACH)

@ -5,7 +5,9 @@ import re
import subprocess
def run_gdb_with_script(binary="", core="", pybefore=None, pyafter=None, timeout=None):
def run_gdb_with_script(
binary="", core="", stdin_input=None, pybefore=None, pyafter=None, timeout=None
):
"""
Runs GDB with given commands launched before and after loading of gdbinit.py
Returns GDB output.
@ -32,7 +34,9 @@ def run_gdb_with_script(binary="", core="", pybefore=None, pyafter=None, timeout
command += ["--eval-command", "quit"]
print(f"Launching command: {command}")
output = subprocess.check_output(command, stderr=subprocess.STDOUT, timeout=timeout)
output = subprocess.check_output(
command, stderr=subprocess.STDOUT, timeout=timeout, input=stdin_input
)
# Python 3 returns bytes-like object so lets have it consistent
output = codecs.decode(output, "utf8")

Loading…
Cancel
Save