Implement fzf completion in lldb like GEP (#3075)

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP

* Implement fzf completion in lldb like GEP
pull/3105/head
Zhi-Qiang Zhou 6 months ago committed by GitHub
parent 4769476b7b
commit ac7813bb21
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -40,6 +40,7 @@ import argparse
import asyncio
import os
import re
import shutil
import signal
import sys
import threading
@ -70,12 +71,19 @@ from pwndbg.dbg.lldb.repl.io import IODriver
from pwndbg.dbg.lldb.repl.io import get_io_driver
from pwndbg.dbg.lldb.repl.proc import EventHandler
from pwndbg.dbg.lldb.repl.proc import ProcessDriver
from pwndbg.dbg.lldb.repl.readline import PROMPT
from pwndbg.dbg.lldb.repl.readline import enable_readline
from pwndbg.dbg.lldb.repl.readline import wrap_with_history
from pwndbg.lib.tips import color_tip
from pwndbg.lib.tips import get_tip_of_the_day
HAS_FZF = shutil.which("fzf") is not None
if HAS_FZF:
from pwndbg.dbg.lldb.repl.fuzzy import PROMPT
from pwndbg.dbg.lldb.repl.fuzzy import get_prompt_session
from pwndbg.dbg.lldb.repl.fuzzy import wrap_with_history
else:
from pwndbg.dbg.lldb.repl.readline import PROMPT
from pwndbg.dbg.lldb.repl.readline import enable_readline
from pwndbg.dbg.lldb.repl.readline import wrap_with_history
show_tip = pwndbg.config.add_param(
"show-tips", True, "whether to display the tip of the day on startup"
)
@ -284,7 +292,10 @@ def run(
assert isinstance(pwndbg.dbg, LLDB)
dbg: LLDB = pwndbg.dbg
enable_readline(dbg)
if HAS_FZF:
session = get_prompt_session(dbg)
else:
enable_readline(dbg)
# We're gonna be dealing with process events ourselves, so we'll want to run
# LLDB in asynchronous mode.
@ -335,7 +346,13 @@ def run(
print("[-] REPL: Prompt next command from user interactively")
try:
line = input(PROMPT)
if HAS_FZF:
try:
line = session.prompt(message=PROMPT)
except KeyboardInterrupt:
continue
else:
line = input(PROMPT)
# If the input is empty (i.e., 'Enter'), use the previous command
if line:
last_command = line

@ -0,0 +1,332 @@
# Licensed under the MIT License
# Modified from https://github.com/lebr0nli/GEP
# Copyright (c) 2022 Alan Li
# Copyright (c) 2025 Zhi-Qiang Zhou
from __future__ import annotations
import atexit
import functools
import os
import re
import shutil
import sys
import tempfile
import threading
from subprocess import PIPE
from subprocess import Popen
from typing import Callable
from typing import Iterator
from typing import ParamSpec
from typing import TypeVar
import lldb
from prompt_toolkit import ANSI
from prompt_toolkit import PromptSession
from prompt_toolkit.application import run_in_terminal
from prompt_toolkit.completion import CompleteEvent
from prompt_toolkit.completion import Completer
from prompt_toolkit.completion import Completion
from prompt_toolkit.document import Document
from prompt_toolkit.history import FileHistory
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.key_binding import KeyPressEvent
from prompt_toolkit.output import create_output
from pwndbg.dbg.lldb import LLDB
# global variables
P = ParamSpec("P")
T = TypeVar("T")
PROMPT = ANSI("\x1b[34mpwndbg-lldb> ")
HISTORY_FILE = os.path.expanduser("~/.pwndbg_history")
FZF_RUN_CMD = (
"fzf",
"--select-1",
"--exit-0",
"--tiebreak=index",
"--no-multi",
"--height=40%",
"--layout=reverse",
)
FZF_PRVIEW_WINDOW_ARGS = (
"--preview-window",
"right:55%:wrap",
)
def get_lldb_completes(dbg: LLDB, query: str = "") -> list[str]:
interp: lldb.SBCommandInterpreter = dbg.debugger.GetCommandInterpreter()
matches = lldb.SBStringList()
_num_matches = interp.HandleCompletion(query, len(query), 0, -1, matches)
proposals = [matches.GetStringAtIndex(i) for i in range(1, matches.GetSize())]
return proposals
def safe_get_help_docs(dbg: LLDB, command: str) -> str | None:
interp: lldb.SBCommandInterpreter = dbg.debugger.GetCommandInterpreter()
add_to_history = False
result = lldb.SBCommandReturnObject()
interp.HandleCommand(f"help {command}", result, add_to_history)
return result.GetOutput()
def should_get_help_docs(dbg: LLDB, completion: str) -> bool:
"""
Check if we need to get help docs for another completion that generated by same command.
"""
if " " not in completion.strip():
return True
parent_command, _ = completion.rsplit(maxsplit=1)
return safe_get_help_docs(dbg, parent_command) != safe_get_help_docs(dbg, completion)
def get_lldb_completion_and_status(dbg: LLDB, query: str) -> tuple[list[str], bool]:
"""
Return all possible completions and whether we need to get help docs for all completions.
"""
all_completions = get_lldb_completes(dbg, query)
# peek the first completion
should_get_all_help_docs = False
if all_completions:
should_get_all_help_docs = should_get_help_docs(dbg, all_completions[0])
return all_completions, should_get_all_help_docs
def create_fzf_process(query: str, preview: str = "", pre_cmd: str = "") -> Popen[str]:
"""
Create a fzf process with given query and preview command.
"""
if query.startswith("!"):
# ! in the beginning of query means we want to run the command directly for fzf
query = "^" + query
cmd: tuple[str, ...] = (
FZF_RUN_CMD + ("--query", query) + ("--prompt", "> \x1b[35m" + pre_cmd + "\x1b[0m")
)
if preview:
cmd += FZF_PRVIEW_WINDOW_ARGS
cmd += ("--preview", preview)
return Popen(cmd, stdin=PIPE, stdout=PIPE, text=True)
def create_preview_fifos() -> tuple[str, str]:
"""
Create a temporary directory and two FIFOs in it, return the paths of these FIFOs.
This is modified from:
https://github.com/infokiller/config-public/blob/652b4638a0a0ffed9743fa9e0ad2a8d4e4e90572/.config/ipython/profile_default/startup/ext/fzf_history.py#L128
"""
fifo_dir = tempfile.mkdtemp(prefix="pwndbg_lldb_tab_fzf_")
fifo_input_path = os.path.join(fifo_dir, "input")
fifo_output_path = os.path.join(fifo_dir, "output")
os.mkfifo(fifo_input_path)
os.mkfifo(fifo_output_path)
atexit.register(shutil.rmtree, fifo_dir)
return fifo_input_path, fifo_output_path
def fzf_reverse_search(event: KeyPressEvent) -> None:
"""Reverse search history with fzf."""
def _fzf_reverse_search() -> None:
if not os.path.exists(HISTORY_FILE):
# just create an empty file
with open(HISTORY_FILE, "w"):
pass
p = create_fzf_process(event.app.current_buffer.document.text_before_cursor)
with open(HISTORY_FILE) as f:
visited = set()
# Reverse the history, and only keep the youngest and unique one
for line in f.read().strip().split("\n")[::-1]:
if line and line not in visited:
visited.add(line)
p.stdin.write(line + "\n")
stdout, _ = p.communicate()
if stdout:
event.app.current_buffer.document = Document() # clear buffer
event.app.current_buffer.insert_text(stdout.strip())
run_in_terminal(_fzf_reverse_search)
def fzf_tab_autocomplete(
event: KeyPressEvent, dbg: LLDB, preview: str, fifo_in: str, fifo_out: str
) -> None:
"""
Tab autocomplete with fzf.
"""
def _fzf_tab_autocomplete() -> None:
target_text = (
event.app.current_buffer.document.text_before_cursor.lstrip()
) # Ignore leading whitespaces
if " " in target_text:
pre_cmd = " ".join([i for i in target_text.split(" ")[:-1] if i != " "]) + " "
else:
pre_cmd = ""
all_completions, should_get_all_help_docs = get_lldb_completion_and_status(dbg, pre_cmd)
if not all_completions:
return
query = target_text[len(pre_cmd) :]
p = create_fzf_process(
query, preview=preview if should_get_all_help_docs else None, pre_cmd=pre_cmd
)
completion_help_docs = {}
for i, completion in enumerate(all_completions):
p.stdin.write(completion + "\n")
if should_get_all_help_docs:
completion_help_docs[i] = safe_get_help_docs(dbg, pre_cmd + completion)
t = FzfTabCompletePreviewThread(fifo_in, fifo_out, completion_help_docs)
t.start()
stdout, _ = p.communicate()
t.stop()
if stdout:
event.app.current_buffer.delete_before_cursor(len(query))
event.app.current_buffer.insert_text(stdout.rstrip() + " ")
run_in_terminal(_fzf_tab_autocomplete)
class FzfTabCompletePreviewThread(threading.Thread):
"""
A thread for previewing help docs of selected completion with fzf.
This is modified from:
https://github.com/infokiller/config-public/blob/master/.config/ipython/profile_default/startup/ext/fzf_history.py#L72
"""
def __init__(
self,
fifo_input_path: str,
fifo_output_path: str,
completion_help_docs: dict[int, str],
**kwargs,
) -> None:
super().__init__(**kwargs)
self.fifo_input_path = fifo_input_path
self.fifo_output_path = fifo_output_path
self.completion_help_docs = completion_help_docs
self.is_done = threading.Event()
def run(self) -> None:
while not self.is_done.is_set():
with open(self.fifo_input_path, encoding="utf-8") as fifo_input:
while not self.is_done.is_set():
data = fifo_input.read()
if len(data) == 0:
break
with open(self.fifo_output_path, "w", encoding="utf-8") as fifo_output:
try:
idx = int(data)
except ValueError:
continue
help_doc = self.completion_help_docs.get(idx)
if help_doc is not None:
fifo_output.write(help_doc)
def stop(self) -> None:
self.is_done.set()
with open(self.fifo_input_path, "w", encoding="utf-8") as f:
f.close()
self.join()
class LLDBHistory(FileHistory):
"""
Manage your LLDB History
"""
def __init__(self, filename: str, ignore_duplicates: bool = False) -> None:
self.ignore_duplicates = ignore_duplicates
super().__init__(filename=filename)
def load_history_strings(self) -> list[str]:
strings = []
if os.path.exists(self.filename):
with open(self.filename) as f:
for string in reversed(f.read().splitlines()):
if self.ignore_duplicates and string in strings:
continue
if string:
strings.append(string)
return strings
def store_string(self, string: str) -> None:
with open(self.filename, "a") as f:
f.write(string.strip() + "\n")
class LLDBCompleter(Completer):
"""
Completer of LLDB
"""
def __init__(self, dbg):
super().__init__()
self.dbg = dbg
def get_completions(
self, document: Document, complete_event: CompleteEvent
) -> Iterator[Completion]:
target_text = document.text_before_cursor.lstrip() # Ignore leading whitespaces
cursor_idx_in_completion = len(target_text)
all_completions, should_get_all_help_docs = get_lldb_completion_and_status(
self.dbg, target_text
)
if not all_completions:
return
for completion in all_completions:
if not completion.startswith(target_text):
continue
display_meta = (
None
if not should_get_all_help_docs
else safe_get_help_docs(self.dbg, completion) or None
)
# remove some prefix of raw completion
completion = completion[cursor_idx_in_completion:]
# display readable completion based on the text before cursor
display = re.split(r"\W+", target_text)[-1] + completion
yield Completion(completion, display=display, display_meta=display_meta)
def wrap_with_history(function: Callable[P, T]) -> Callable[P, T]:
@functools.wraps(function)
def _wrapped(*a: P.args, **kw: P.kwargs) -> T:
return function(*a, **kw)
return _wrapped
def get_prompt_session(dbg):
bindings = KeyBindings()
# key binding for fzf history search
bindings.add("c-r")(fzf_reverse_search)
# key binding for fzf tab completion
fifo_in, fifo_out = create_preview_fifos()
preview = "echo {n} > %s\ncat %s" % (fifo_in, fifo_out)
bindings.add("c-i")(
functools.partial(
fzf_tab_autocomplete,
dbg=dbg,
preview=preview,
fifo_in=fifo_in,
fifo_out=fifo_out,
)
)
return PromptSession(
history=LLDBHistory(HISTORY_FILE, ignore_duplicates=True),
completer=LLDBCompleter(dbg),
complete_while_typing=False,
key_bindings=bindings,
output=create_output(stdout=sys.__stdout__),
)
Loading…
Cancel
Save