Start migrating to Python logging and add log_level command (#2230)

* Start migrating to Python logging and add log_level command

* Add debug log messages to gdbinit.py
pull/2272/head
Gulshan Singh 1 year ago committed by GitHub
parent 83cc8c57cf
commit 95dd553ab7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -7,6 +7,7 @@ Pwndbg relies on several environment variables to customize its behavior. Below
- `HOME`, `XDG_CACHE_HOME`: Used by `lib.tempfile` to determine temporary file locations. - `HOME`, `XDG_CACHE_HOME`: Used by `lib.tempfile` to determine temporary file locations.
- `PWNDBG_VENV_PATH`: Specifies the virtual environment path for Pwndbg. - `PWNDBG_VENV_PATH`: Specifies the virtual environment path for Pwndbg.
- `PWNDBG_DISABLE_COLORS`: Disables colored output in Pwndbg. - `PWNDBG_DISABLE_COLORS`: Disables colored output in Pwndbg.
- `PWNDBG_LOGLEVEL`: Initial log level to use for log messages.
- `OPENAI_API_KEY`, `ANTHROPIC_API_KEY`: Used by the `ai` command for accessing respective AI APIs. - `OPENAI_API_KEY`, `ANTHROPIC_API_KEY`: Used by the `ai` command for accessing respective AI APIs.
- `GITHUB_ACTIONS`, `RUN_FLAKY`: Used by `tests_commands.py` to determine the test environment. - `GITHUB_ACTIONS`, `RUN_FLAKY`: Used by `tests_commands.py` to determine the test environment.
- `PWNDBG_PROFILE`: Enables profiling for benchmarking. - `PWNDBG_PROFILE`: Enables profiling for benchmarking.

@ -2,6 +2,7 @@ from __future__ import annotations
import cProfile import cProfile
import hashlib import hashlib
import logging
import os import os
import shutil import shutil
import site import site
@ -29,9 +30,10 @@ def hash_file(file_path: str | Path) -> str:
def run_poetry_install(poetry_path: os.PathLike[str], dev: bool = False) -> Tuple[str, str, int]: def run_poetry_install(poetry_path: os.PathLike[str], dev: bool = False) -> Tuple[str, str, int]:
command: List[str | os.PathLike[str]] = [poetry_path, "install"] command: List[str] = [str(poetry_path), "install"]
if dev: if dev:
command.extend(("--with", "dev")) command.extend(("--with", "dev"))
logging.debug(f"Updating deps with command: {' '.join(command)}")
result = subprocess.run(command, capture_output=True, text=True) result = subprocess.run(command, capture_output=True, text=True)
return result.stdout.strip(), result.stderr.strip(), result.returncode return result.stdout.strip(), result.stderr.strip(), result.returncode
@ -60,9 +62,14 @@ def update_deps(src_root: Path, venv_path: Path) -> None:
poetry_lock_hash_path = venv_path / "poetry.lock.hash" poetry_lock_hash_path = venv_path / "poetry.lock.hash"
current_hash = hash_file(src_root / "poetry.lock") current_hash = hash_file(src_root / "poetry.lock")
logging.debug(f"Current poetry.lock hash: {current_hash}")
stored_hash = None stored_hash = None
if poetry_lock_hash_path.exists(): if poetry_lock_hash_path.exists():
stored_hash = poetry_lock_hash_path.read_text().strip() stored_hash = poetry_lock_hash_path.read_text().strip()
logging.debug(f"Stored poetry.lock hash: {stored_hash}")
else:
logging.debug("No stored hash found")
# If the hashes don't match, update the dependencies # If the hashes don't match, update the dependencies
if current_hash != stored_hash: if current_hash != stored_hash:
@ -128,6 +135,21 @@ def skip_venv(src_root) -> bool:
) )
def init_logger():
log_level_env = os.environ.get("PWNDBG_LOGLEVEL", "WARNING")
log_level = getattr(logging, log_level_env.upper())
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
# Add a custom StreamHandler we will use to customize log message formatting. We
# configure the handler later, after pwndbg has been imported.
handler = logging.StreamHandler()
root_logger.addHandler(handler)
return handler
def main() -> None: def main() -> None:
profiler = cProfile.Profile() profiler = cProfile.Profile()
@ -136,6 +158,8 @@ def main() -> None:
start_time = time.time() start_time = time.time()
profiler.enable() profiler.enable()
handler = init_logger()
src_root = Path(__file__).parent.resolve() src_root = Path(__file__).parent.resolve()
if not skip_venv(src_root): if not skip_venv(src_root):
venv_path = get_venv_path(src_root) venv_path = get_venv_path(src_root)
@ -156,8 +180,12 @@ def main() -> None:
pwndbg.dbg = pwndbg.dbg_mod.gdb.GDB() pwndbg.dbg = pwndbg.dbg_mod.gdb.GDB()
pwndbg.dbg.setup() pwndbg.dbg.setup()
import pwndbg.log
import pwndbg.profiling import pwndbg.profiling
# ColorFormatter relies on pwndbg being loaded, so we can't set it up until now
handler.setFormatter(pwndbg.log.ColorFormatter())
pwndbg.profiling.init(profiler, start_time) pwndbg.profiling.init(profiler, start_time)
if os.environ.get("PWNDBG_PROFILE") == "1": if os.environ.get("PWNDBG_PROFILE") == "1":
pwndbg.profiling.profiler.stop("pwndbg-load.pstats") pwndbg.profiling.profiler.stop("pwndbg-load.pstats")

@ -21,6 +21,8 @@ config_hint_color = theme.add_color_param(
config_success_color = theme.add_color_param( config_success_color = theme.add_color_param(
"message-success-color", "green", "color of success messages" "message-success-color", "green", "color of success messages"
) )
config_debug_color = theme.add_color_param("message-debug-color", "blue", "color of debug messages")
config_info_color = theme.add_color_param("message-info-color", "white", "color of info messages")
config_warning_color = theme.add_color_param( config_warning_color = theme.add_color_param(
"message-warning-color", "yellow", "color of warning messages" "message-warning-color", "yellow", "color of warning messages"
) )
@ -62,6 +64,14 @@ def success(msg: object) -> str:
return generateColorFunction(config.message_success_color)(msg) return generateColorFunction(config.message_success_color)(msg)
def debug(msg: object) -> str:
return generateColorFunction(config.message_warning_color)(msg)
def info(msg: object) -> str:
return generateColorFunction(config.message_warning_color)(msg)
def warn(msg: object) -> str: def warn(msg: object) -> str:
return generateColorFunction(config.message_warning_color)(msg) return generateColorFunction(config.message_warning_color)(msg)

@ -3,6 +3,7 @@ from __future__ import annotations
import argparse import argparse
import functools import functools
import io import io
import logging
from enum import Enum from enum import Enum
from typing import Any from typing import Any
from typing import Callable from typing import Callable
@ -22,12 +23,13 @@ import pwndbg.gdblib.kernel
import pwndbg.gdblib.proc import pwndbg.gdblib.proc
import pwndbg.gdblib.qemu import pwndbg.gdblib.qemu
import pwndbg.gdblib.regs import pwndbg.gdblib.regs
from pwndbg.color import message
from pwndbg.gdblib.heap.ptmalloc import DebugSymsHeap from pwndbg.gdblib.heap.ptmalloc import DebugSymsHeap
from pwndbg.gdblib.heap.ptmalloc import GlibcMemoryAllocator from pwndbg.gdblib.heap.ptmalloc import GlibcMemoryAllocator
from pwndbg.gdblib.heap.ptmalloc import HeuristicHeap from pwndbg.gdblib.heap.ptmalloc import HeuristicHeap
from pwndbg.gdblib.heap.ptmalloc import SymbolUnresolvableError from pwndbg.gdblib.heap.ptmalloc import SymbolUnresolvableError
log = logging.getLogger(__name__)
T = TypeVar("T") T = TypeVar("T")
P = ParamSpec("P") P = ParamSpec("P")
@ -266,9 +268,9 @@ def OnlyWithFile(function: Callable[P, T]) -> Callable[P, Optional[T]]:
return function(*a, **kw) return function(*a, **kw)
else: else:
if pwndbg.gdblib.qemu.is_qemu(): if pwndbg.gdblib.qemu.is_qemu():
print(message.error("Could not determine the target binary on QEMU.")) log.error("Could not determine the target binary on QEMU.")
else: else:
print(message.error(f"{function.__name__}: There is no file loaded.")) log.error(f"{function.__name__}: There is no file loaded.")
return None return None
return _OnlyWithFile return _OnlyWithFile
@ -280,7 +282,7 @@ def OnlyWhenQemuKernel(function: Callable[P, T]) -> Callable[P, Optional[T]]:
if pwndbg.gdblib.qemu.is_qemu_kernel(): if pwndbg.gdblib.qemu.is_qemu_kernel():
return function(*a, **kw) return function(*a, **kw)
else: else:
print( log.error(
f"{function.__name__}: This command may only be run when debugging the Linux kernel in QEMU." f"{function.__name__}: This command may only be run when debugging the Linux kernel in QEMU."
) )
return None return None
@ -294,7 +296,7 @@ def OnlyWhenUserspace(function: Callable[P, T]) -> Callable[P, Optional[T]]:
if not pwndbg.gdblib.qemu.is_qemu_kernel(): if not pwndbg.gdblib.qemu.is_qemu_kernel():
return function(*a, **kw) return function(*a, **kw)
else: else:
print( log.error(
f"{function.__name__}: This command may only be run when not debugging a QEMU kernel target." f"{function.__name__}: This command may only be run when not debugging a QEMU kernel target."
) )
return None return None
@ -317,9 +319,8 @@ def OnlyWithArch(arch_names: List[str]) -> Callable[[Callable[P, T]], Callable[P
return function(*a, **kw) return function(*a, **kw)
else: else:
arches_str = ", ".join(arch_names) arches_str = ", ".join(arch_names)
print( log.error(
f"%s: This command may only be run on the {arches_str} architecture(s)" f"{function.__name__}: This command may only be run on the {arches_str} architecture(s)"
% function.__name__
) )
return None return None
@ -334,7 +335,7 @@ def OnlyWithKernelDebugSyms(function: Callable[P, T]) -> Callable[P, Optional[T]
if pwndbg.gdblib.kernel.has_debug_syms(): if pwndbg.gdblib.kernel.has_debug_syms():
return function(*a, **kw) return function(*a, **kw)
else: else:
print( log.error(
f"{function.__name__}: This command may only be run when debugging a Linux kernel with debug symbols." f"{function.__name__}: This command may only be run when debugging a Linux kernel with debug symbols."
) )
return None return None
@ -348,7 +349,7 @@ def OnlyWhenPagingEnabled(function: Callable[P, T]) -> Callable[P, Optional[T]]:
if pwndbg.gdblib.kernel.paging_enabled(): if pwndbg.gdblib.kernel.paging_enabled():
return function(*a, **kw) return function(*a, **kw)
else: else:
print(f"{function.__name__}: This command may only be run when paging is enabled.") log.error(f"{function.__name__}: This command may only be run when paging is enabled.")
return None return None
return _OnlyWhenPagingEnabled return _OnlyWhenPagingEnabled
@ -360,7 +361,7 @@ def OnlyWhenRunning(function: Callable[P, T]) -> Callable[P, Optional[T]]:
if pwndbg.gdblib.proc.alive: if pwndbg.gdblib.proc.alive:
return function(*a, **kw) return function(*a, **kw)
else: else:
print(f"{function.__name__}: The program is not being run.") log.error(f"{function.__name__}: The program is not being run.")
return None return None
return _OnlyWhenRunning return _OnlyWhenRunning
@ -373,7 +374,7 @@ def OnlyWithTcache(function: Callable[P, T]) -> Callable[P, Optional[T]]:
if pwndbg.gdblib.heap.current.has_tcache(): if pwndbg.gdblib.heap.current.has_tcache():
return function(*a, **kw) return function(*a, **kw)
else: else:
print( log.error(
f"{function.__name__}: This version of GLIBC was not compiled with tcache support." f"{function.__name__}: This version of GLIBC was not compiled with tcache support."
) )
return None return None
@ -387,7 +388,7 @@ def OnlyWhenHeapIsInitialized(function: Callable[P, T]) -> Callable[P, Optional[
if pwndbg.gdblib.heap.current is not None and pwndbg.gdblib.heap.current.is_initialized(): if pwndbg.gdblib.heap.current is not None and pwndbg.gdblib.heap.current.is_initialized():
return function(*a, **kw) return function(*a, **kw)
else: else:
print(f"{function.__name__}: Heap is not initialized yet.") log.error(f"{function.__name__}: Heap is not initialized yet.")
return None return None
return _OnlyWhenHeapIsInitialized return _OnlyWhenHeapIsInitialized
@ -400,8 +401,8 @@ def _is_statically_linked() -> bool:
def _try2run_heap_command(function: Callable[P, T], *a: P.args, **kw: P.kwargs) -> T | None: def _try2run_heap_command(function: Callable[P, T], *a: P.args, **kw: P.kwargs) -> T | None:
e = lambda s: print(message.error(s)) e = log.error
w = lambda s: print(message.warn(s)) w = log.warning
# Note: We will still raise the error for developers when exception-* is set to "on" # Note: We will still raise the error for developers when exception-* is set to "on"
try: try:
return function(*a, **kw) return function(*a, **kw)
@ -438,8 +439,8 @@ def _try2run_heap_command(function: Callable[P, T], *a: P.args, **kw: P.kwargs)
def OnlyWithResolvedHeapSyms(function: Callable[P, T]) -> Callable[P, T | None]: def OnlyWithResolvedHeapSyms(function: Callable[P, T]) -> Callable[P, T | None]:
@functools.wraps(function) @functools.wraps(function)
def _OnlyWithResolvedHeapSyms(*a: P.args, **kw: P.kwargs) -> T | None: def _OnlyWithResolvedHeapSyms(*a: P.args, **kw: P.kwargs) -> T | None:
e = lambda s: print(message.error(s)) e = log.error
w = lambda s: print(message.warn(s)) w = log.warn
if ( if (
isinstance(pwndbg.gdblib.heap.current, HeuristicHeap) isinstance(pwndbg.gdblib.heap.current, HeuristicHeap)
and pwndbg.config.resolve_heap_via_heuristic == "auto" and pwndbg.config.resolve_heap_via_heuristic == "auto"

@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import logging
import pwndbg.color.message as MessageColor import pwndbg.color.message as MessageColor
import pwndbg.commands import pwndbg.commands
@ -63,3 +64,20 @@ def dev_dump_instruction(address=None, force_emulate=False, no_emulate=False) ->
if instructions: if instructions:
insn = instructions[0] insn = instructions[0]
print(repr(insn)) print(repr(insn))
parser = argparse.ArgumentParser(description="Set the log level.")
parser.add_argument(
"level",
type=str,
nargs="?",
choices=["debug", "info", "warning", "error", "critical"],
default="warning",
help="The log level to set.",
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.DEV)
def log_level(level: str) -> None:
logging.getLogger().setLevel(getattr(logging, level.upper()))
print(f"Log level set to {level}")

@ -0,0 +1,20 @@
from __future__ import annotations
import logging
import pwndbg.color
class ColorFormatter(logging.Formatter):
log_funcs = {
logging.DEBUG: pwndbg.color.message.debug,
logging.INFO: pwndbg.color.message.info,
logging.WARNING: pwndbg.color.message.warn,
logging.ERROR: pwndbg.color.message.error,
logging.CRITICAL: pwndbg.color.message.error,
}
def format(self, record):
log_func = self.log_funcs.get(record.levelno)
formatter = logging.Formatter(log_func("%(message)s"))
return formatter.format(record)
Loading…
Cancel
Save