Drop external `ps` calls in favor of `psutil` (#3179)

* Drop external `ps` calls in favor of `psutil`

* if pid is None, return

* fix test
pull/3184/head
patryk4815 4 months ago committed by GitHub
parent bc3b5ec5dd
commit 72dc157686
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,10 +1,12 @@
from __future__ import annotations
import argparse
import datetime
import os
import stat
from subprocess import CalledProcessError
from subprocess import check_output
import time
from typing import List
from typing import NamedTuple
from typing import Union
import gdb
@ -77,48 +79,45 @@ parser.add_argument(
)
def find_pids(target, user, exact, all):
import psutil
def find_pids(target: str, user: str | None, exact: bool, all: bool) -> List[int]:
# Note: we can't use `ps -C <target>` because this does not accept process names with spaces
# so target='a b' would actually match process names 'a' and 'b' here
# so instead, we will filter by process name or full cmdline later on
# if provided, filter by effective username or uid; otherwise, select all processes
ps_filter = ["-u", user] if user is not None else ["-e"]
ps_cmd = ["ps", "-o", "pid,args"] + ps_filter
try:
ps_output = check_output(ps_cmd, universal_newlines=True)
except FileNotFoundError:
print(message.error("Error: did not find `ps` command"))
return
except CalledProcessError:
print(message.error(f"The `{' '.join(ps_cmd)}` command returned non-zero status"))
return
current_pid = os.getpid()
pids_exact_match_cmd = []
pids_partial_match_cmd = []
pids_partial_match_args = []
# Skip header line
for line in ps_output.strip().splitlines()[1:]:
process_info = line.split(maxsplit=1)
iter_process = psutil.process_iter(["pid", "name", "cmdline", "username"])
iter_process = filter(lambda p: p.pid != current_pid, iter_process)
if user is not None:
iter_process = filter(lambda p: proc.username() == user, iter_process)
if len(process_info) <= 1:
# No command name?
for proc in iter_process:
try:
cmdline = proc.cmdline()
except (psutil.NoSuchProcess, psutil.AccessDenied):
# The process no longer exists or we don't have permission
continue
else:
if not cmdline:
continue
pid, args = process_info
cmd = args.split(maxsplit=1)[0]
cmd = cmdline[0]
args = " ".join(cmdline)
# Cannot attach to gdb itself.
if int(pid) == os.getpid():
continue
if target == cmd:
pids_exact_match_cmd.append(pid)
elif target in cmd:
pids_partial_match_cmd.append(pid)
elif target in args:
pids_partial_match_args.append(pid)
if target == cmd:
pids_exact_match_cmd.append(proc.pid)
elif target in cmd:
pids_partial_match_cmd.append(proc.pid)
elif target in args:
pids_partial_match_args.append(proc.pid)
if exact and all:
return pids_exact_match_cmd + pids_partial_match_cmd + pids_partial_match_args
@ -130,8 +129,105 @@ def find_pids(target, user, exact, all):
return pids_exact_match_cmd or pids_partial_match_cmd or pids_partial_match_args
class ProcessInfo(NamedTuple):
pid: int
user: str
elapsed: str
command: str
start_time: float
def resolve_target_process(
pids: List[int],
method: str,
no_truncate: bool,
) -> int | None:
proc_infos: List[ProcessInfo] = []
for pid in pids:
try:
proc = psutil.Process(pid)
start_time = proc.create_time()
proc_infos.append(
ProcessInfo(
proc.pid,
proc.username(),
str(datetime.timedelta(seconds=int(time.time() - start_time))),
" ".join(proc.cmdline()),
start_time,
)
)
except (psutil.NoSuchProcess, psutil.AccessDenied, ValueError):
continue
if not proc_infos:
print(message.error("No accessible processes found."))
return None
# Sort by start time (oldest first)
proc_infos.sort(key=lambda x: x.start_time)
if method == _OLDEST:
return proc_infos[0].pid
elif method == _NEWEST:
return proc_infos[-1].pid
else:
print(
message.warn(
f'Multiple processes found. Current resolution method is "{method}". '
f"Run `config attachp-resolution-method` to see more information."
)
)
headers = ["pid", "user", "elapsed", "command"]
showindex: Union[bool, range] = False if method == _NONE else range(1, len(proc_infos) + 1)
# Cast proc_infos for printing
table_proc_infos = [
[str(o.pid), str(o.user), str(o.elapsed), str(o.command)] for o in proc_infos
]
# Calculate column width for truncation
test_table = tabulate(table_proc_infos, headers=headers, showindex=showindex)
table_width = len(test_table.splitlines()[1])
max_command_width = max(len(row.command) for row in proc_infos)
max_col_widths = max(max_command_width - (table_width - get_window_size()[1]), 10)
# Truncate commands
if not no_truncate:
for row in table_proc_infos:
row[3] = _truncate_string(row[3], max_col_widths)
# Show the table
msg = tabulate(
table_proc_infos, headers=headers, showindex=showindex, maxcolwidths=max_col_widths
)
print(message.notice(msg))
if method == _NONE:
print(message.warn("Use `attach <pid>` to attach"))
return None
elif method == _ASK:
while True:
prompt = message.notice(f"Which process to attach to? (1-{len(proc_infos)}) ")
try:
inp = input(prompt).strip()
except EOFError:
return None
try:
choice = int(inp)
if 1 <= choice <= len(proc_infos):
return proc_infos[choice - 1].pid
except ValueError:
continue
else:
raise Exception("unreachable")
@pwndbg.commands.Command(parser, category=CommandCategory.START)
def attachp(target, no_truncate, retry, exact, all, user=None) -> None:
def attachp(
target: str, no_truncate: bool, retry: bool, exact: bool, all: bool, user: str | None = None
) -> None:
# As a default, the user may want to attach to a binary name taken from currently loaded file name
if target is None:
bin_path = pwndbg.dbg.selected_inferior().main_module_name()
@ -181,98 +277,12 @@ def attachp(target, no_truncate, retry, exact, all, user=None) -> None:
if len(pids) > 1:
method = pwndbg.config.attachp_resolution_method
try:
ps_output = check_output(
[
"ps",
"--no-headers",
"-ww",
"-p",
",".join(pids),
"-o",
"pid,ruser,etime,args",
"--sort",
"+lstart",
]
).decode()
except FileNotFoundError:
print(message.error("Error: did not find `ps` command"))
print(
message.warn(f"Use `attach <pid>` instead (found pids: {', '.join(pids)})")
)
return
except CalledProcessError:
print(message.error("Error: failed to get process details"))
print(
message.warn(f"Use `attach <pid>` instead (found pids: {', '.join(pids)})")
)
return
print(
message.warn(
f'Multiple processes found. Current resolution method is "{method}". Run the command `config attachp-resolution-method` to see more informations.'
)
)
# Here, we can safely use split to capture each field
# since none of the columns except args can contain spaces
proc_infos = [row.split(maxsplit=3) for row in ps_output.splitlines()]
if method == _OLDEST:
resolved_target = int(proc_infos[0][0])
elif method == _NEWEST:
resolved_target = int(proc_infos[-1][0])
else:
headers = ["pid", "user", "elapsed", "command"]
showindex: Union[bool, range] = (
False if method == _NONE else range(1, len(proc_infos) + 1)
)
# calculate max_col_widths to fit window width
test_table = tabulate(proc_infos, headers=headers, showindex=showindex)
table_orig_width = len(test_table.splitlines()[1])
max_command_width = max(len(command) for _, _, _, command in proc_infos)
max_col_widths = max(
max_command_width - (table_orig_width - get_window_size()[1]), 10
)
# truncation
if not no_truncate:
for info in proc_infos:
info[-1] = _truncate_string(info[-1], max_col_widths)
msg = tabulate(
proc_infos,
headers=headers,
showindex=showindex,
maxcolwidths=max_col_widths,
)
print(message.notice(msg))
if method == _NONE:
print(message.warn("use `attach <pid>` to attach"))
return
elif method == _ASK:
while True:
msg = message.notice(
f"Which process to attach to? (1-{len(proc_infos)}) "
)
try:
inp = input(msg).strip()
except EOFError:
return
try:
choice = int(inp)
if not (1 <= choice <= len(proc_infos)):
continue
except ValueError:
continue
break
resolved_target = int(proc_infos[choice - 1][0])
else:
raise Exception("unreachable")
resolved_target = resolve_target_process(pids, method.value, no_truncate)
else:
resolved_target = int(pids[0])
resolved_target = pids[0]
if resolved_target is None:
return
print(message.on(f"Attaching to {resolved_target}"))
try:

@ -96,7 +96,7 @@ def test_attachp_command_attaches_to_procname_resolve_none(launched_sleep_binary
regex += r"-+ -+ -+ -+\n"
regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n"
regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n"
regex += r"use `attach \<pid\>` to attach\n"
regex += r"Use `attach \<pid\>` to attach\n"
matches = re.search(regex, result).groups()
expected = (
@ -131,7 +131,7 @@ def test_attachp_command_attaches_to_procname_resolve_none_no_truncate(launched_
regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n"
regex += r" *([0-9]+) +(\S+) +[0-9:-]+ +(.*)\n"
regex += rf"(?: +-?(?: {FLAG})+(?: | -)?\n)+"
regex += r"use `attach \<pid\>` to attach\n"
regex += r"Use `attach \<pid\>` to attach\n"
matches = re.search(regex, result).groups()
expected = (

Loading…
Cancel
Save