mirror of https://github.com/pwndbg/pwndbg.git
* Add tests for issue #2621 https://github.com/pwndbg/pwndbg/issues/2621#issuecomment-2595162380 * Update to use xuntil, use threading event, and reduce time for timeout * fix f-string mistakepull/3157/head
parent
e5530ca8f3
commit
0abe0a8066
@ -0,0 +1,301 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import socket
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
|
||||
import gdb
|
||||
import pytest
|
||||
|
||||
import tests
|
||||
|
||||
REFERENCE_BINARY = tests.get_binary("reference-binary.out")
|
||||
USE_FDS_BINARY = tests.get_binary("use-fds.out")
|
||||
|
||||
|
||||
class TCPServerThread(threading.Thread):
|
||||
def __init__(self, *, ip: str, port: int):
|
||||
super().__init__(daemon=True)
|
||||
self.sock = socket.socket(
|
||||
socket.AF_INET6 if ":" in ip else socket.AF_INET, socket.SOCK_STREAM
|
||||
)
|
||||
try:
|
||||
self.sock.bind((ip, port))
|
||||
except OSError:
|
||||
pytest.skip(f"Could not bind to {ip}:{port}.")
|
||||
self.port = self.sock.getsockname()[1]
|
||||
self.sock.listen(1)
|
||||
self.stop_event = threading.Event()
|
||||
|
||||
def stop(self):
|
||||
self.stop_event.set()
|
||||
self.sock.close()
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
# Accept one conn and wait for stop event
|
||||
conn, addr = self.sock.accept()
|
||||
while not self.stop_event.is_set():
|
||||
time.sleep(0.1)
|
||||
except OSError:
|
||||
pass # Socket closed
|
||||
|
||||
|
||||
class UDPServerThread(threading.Thread):
|
||||
def __init__(self, *, ip: str, port: int):
|
||||
super().__init__(daemon=True)
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
try:
|
||||
self.sock.bind((ip, port))
|
||||
except OSError:
|
||||
pytest.skip(f"Could not bind UDP to {ip}:{port}.")
|
||||
self.port = self.sock.getsockname()[1]
|
||||
self.received_data = None
|
||||
|
||||
def stop(self):
|
||||
self.sock.close()
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
# Wait for data with timeout
|
||||
self.sock.settimeout(1)
|
||||
data, addr = self.sock.recvfrom(1024)
|
||||
self.received_data = data
|
||||
except socket.timeout:
|
||||
pass
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def test_hijack_fd_file_redirection(start_binary):
|
||||
"""
|
||||
Test hijack_fd command with file redirection
|
||||
"""
|
||||
start_binary(REFERENCE_BINARY)
|
||||
|
||||
# Create a temporary file for testing
|
||||
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
|
||||
temp_file.write("test content for hijack_fd")
|
||||
temp_file_path = temp_file.name
|
||||
|
||||
try:
|
||||
# Run until break_here
|
||||
gdb.execute("xuntil &break_here")
|
||||
|
||||
# Hijack stdout (fd 1) to point to our temporary file
|
||||
result = gdb.execute(f"hijack-fd 1 {temp_file_path}", to_string=True)
|
||||
assert "Operation succeeded" in result
|
||||
|
||||
# Actually write to the hijacked file descriptor to validate it works
|
||||
gdb.execute('call write(1, "hello\\n", 6)')
|
||||
|
||||
# Check the file content to verify the write went to our file
|
||||
with open(temp_file_path, "r") as f:
|
||||
content = f.read()
|
||||
assert "hello" in content
|
||||
|
||||
# Verify the file descriptor change using procinfo
|
||||
final_procinfo = gdb.execute("procinfo", to_string=True)
|
||||
|
||||
# Check that fd[1] now points to our temporary file
|
||||
assert temp_file_path in final_procinfo
|
||||
|
||||
finally:
|
||||
# Clean up temporary file
|
||||
if os.path.exists(temp_file_path):
|
||||
os.unlink(temp_file_path)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("ip_connect", ["127.0.0.1", "::1"])
|
||||
def test_hijack_fd_socket_redirection(start_binary, ip_connect):
|
||||
"""
|
||||
Test hijack_fd command with TCP socket redirection (IPv4 and IPv6)
|
||||
"""
|
||||
# Start TCP server
|
||||
server = TCPServerThread(ip=ip_connect, port=0)
|
||||
server.start()
|
||||
|
||||
try:
|
||||
start_binary(REFERENCE_BINARY)
|
||||
|
||||
gdb.execute("xuntil break_here")
|
||||
|
||||
# Use URI style for IPv6, plain for IPv4
|
||||
if ":" in ip_connect:
|
||||
uri = f"tcp://[{ip_connect}]:{server.port}"
|
||||
else:
|
||||
uri = f"{ip_connect}:{server.port}"
|
||||
|
||||
result = gdb.execute(f"hijack-fd 2 {uri}", to_string=True)
|
||||
assert "Operation succeeded" in result
|
||||
|
||||
# Verify the file descriptor change using procinfo
|
||||
final_procinfo = gdb.execute("procinfo", to_string=True)
|
||||
|
||||
# Check that fd[2] now shows a socket connection
|
||||
if ":" in ip_connect:
|
||||
assert f"[{ip_connect}]:{server.port}" in final_procinfo
|
||||
else:
|
||||
assert f"{ip_connect}:{server.port}" in final_procinfo
|
||||
|
||||
finally:
|
||||
server.stop()
|
||||
server.join(timeout=2) # Wait for thread to finish
|
||||
|
||||
|
||||
def test_hijack_fd_udp_socket_redirection(start_binary):
|
||||
"""
|
||||
Test hijack_fd command with UDP socket redirection
|
||||
"""
|
||||
# Start UDP server
|
||||
server = UDPServerThread(ip="127.0.0.1", port=0)
|
||||
server.start()
|
||||
|
||||
try:
|
||||
start_binary(REFERENCE_BINARY)
|
||||
|
||||
gdb.execute("xuntil break_here")
|
||||
|
||||
# Hijack stderr (fd 2) to point to our UDP socket
|
||||
result = gdb.execute(f"hijack-fd 2 udp://127.0.0.1:{server.port}", to_string=True)
|
||||
assert "Operation succeeded" in result
|
||||
|
||||
# Verify the file descriptor change using procinfo
|
||||
final_procinfo = gdb.execute("procinfo", to_string=True)
|
||||
|
||||
# Check that fd[2] is now a socket (UDP sockets may not show IP:port in procinfo)
|
||||
# The important thing is that it's a socket, not the original stderr
|
||||
assert "socket:" in final_procinfo
|
||||
|
||||
# Verify that fd[2] specifically is not pointing to /dev/pts/ (original stderr)
|
||||
# We need to check that the line containing fd[2] doesn't contain /dev/pts/
|
||||
lines = final_procinfo.split("\n")
|
||||
fd2_line = None
|
||||
for line in lines:
|
||||
if line.strip().startswith("fd[2]"):
|
||||
fd2_line = line
|
||||
break
|
||||
|
||||
assert fd2_line is not None, "fd[2] not found in procinfo output"
|
||||
assert "/dev/pts/" not in fd2_line, f"fd[2] still points to /dev/pts/: {fd2_line}"
|
||||
|
||||
finally:
|
||||
server.stop()
|
||||
server.join(timeout=2) # Wait for thread to finish
|
||||
|
||||
|
||||
def test_hijack_fd_invalid_fd(start_binary):
|
||||
"""
|
||||
Test hijack_fd command with invalid file descriptor
|
||||
"""
|
||||
start_binary(REFERENCE_BINARY)
|
||||
|
||||
# Run until break_here
|
||||
gdb.execute("xuntil break_here")
|
||||
|
||||
# Try to hijack an invalid file descriptor (negative number)
|
||||
result = gdb.execute("hijack-fd -1 /dev/null", to_string=True)
|
||||
# The command should execute but may not work as expected
|
||||
# We verify it doesn't crash and returns some response
|
||||
assert result is not None
|
||||
|
||||
|
||||
def test_hijack_fd_nonexistent_file(start_binary):
|
||||
"""
|
||||
Test hijack_fd command with nonexistent file
|
||||
"""
|
||||
start_binary(REFERENCE_BINARY)
|
||||
|
||||
gdb.execute("xuntil break_here")
|
||||
|
||||
# Use a path that can be created without root permissions
|
||||
test_file_path = "/tmp/nonexistent_test_file"
|
||||
|
||||
# Try to hijack to a nonexistent file
|
||||
result = gdb.execute(f"hijack-fd 1 {test_file_path}", to_string=True)
|
||||
# The command should succeed in creating the file descriptor,
|
||||
# even if the file doesn't exist initially
|
||||
assert "Operation succeeded" in result
|
||||
|
||||
# Verify the file was actually created
|
||||
assert os.path.exists(test_file_path)
|
||||
|
||||
# Clean up
|
||||
if os.path.exists(test_file_path):
|
||||
os.unlink(test_file_path)
|
||||
|
||||
|
||||
def test_hijack_fd_invalid_socket_address(start_binary):
|
||||
"""
|
||||
Test hijack_fd command with invalid socket address
|
||||
"""
|
||||
start_binary(REFERENCE_BINARY)
|
||||
|
||||
gdb.execute("xuntil break_here")
|
||||
|
||||
# Try to hijack to an invalid socket address
|
||||
result = gdb.execute("hijack-fd 2 invalid://address:port", to_string=True)
|
||||
# The command should handle the error gracefully
|
||||
# We verify it doesn't crash the debugger and returns some response
|
||||
assert result is not None
|
||||
|
||||
|
||||
def test_hijack_fd_before_binary_start():
|
||||
"""
|
||||
Test hijack_fd command before binary is started
|
||||
"""
|
||||
# Try to use hijack_fd before starting any binary
|
||||
result = gdb.execute("hijack-fd 1 /dev/null", to_string=True)
|
||||
assert "The program is not being run" in result
|
||||
|
||||
|
||||
def test_hijack_fd_help():
|
||||
"""
|
||||
Test hijack_fd command help
|
||||
"""
|
||||
result = gdb.execute("hijack-fd --help", to_string=True)
|
||||
assert "usage: hijack-fd" in result
|
||||
assert "Replace a file descriptor" in result
|
||||
assert "fdnum" in result
|
||||
assert "newfile" in result
|
||||
|
||||
|
||||
def test_hijack_fd_with_use_fds_binary(start_binary):
|
||||
"""
|
||||
Test hijack_fd command with the use-fds binary which opens a file
|
||||
"""
|
||||
start_binary(USE_FDS_BINARY)
|
||||
|
||||
# Run until main
|
||||
gdb.execute("start")
|
||||
|
||||
# Stop after the open() call
|
||||
gdb.execute("nextcall")
|
||||
gdb.execute("nextcall")
|
||||
|
||||
# Get the file descriptor number
|
||||
fd_var = gdb.newest_frame().read_var("fd")
|
||||
fd_num = int(fd_var)
|
||||
|
||||
# Create a temporary file for testing
|
||||
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
|
||||
temp_file.write("hijacked content")
|
||||
temp_file_path = temp_file.name
|
||||
|
||||
try:
|
||||
# Hijack the opened file descriptor to our temporary file
|
||||
result = gdb.execute(f"hijack-fd {fd_num} {temp_file_path}", to_string=True)
|
||||
assert "Operation succeeded" in result
|
||||
|
||||
# Verify the file descriptor change using procinfo
|
||||
final_procinfo = gdb.execute("procinfo", to_string=True)
|
||||
|
||||
# Check that the hijacked fd now points to our temporary file
|
||||
assert temp_file_path in final_procinfo
|
||||
|
||||
finally:
|
||||
# Clean up temporary file
|
||||
if os.path.exists(temp_file_path):
|
||||
os.unlink(temp_file_path)
|
||||
Loading…
Reference in new issue