Rewrite some unreliable methods for the heap heuristics (#1579)

* Refactor `pwndbg.glibc`

- Add type hints
- Use `info sharedlibrary` to find libc
- Update the regex of libc filename
- Rename `get_data_address()` to `get_data_section_address()`

* Add a function to dump libc ELF file's .data section

* Use the new methods to find `main_arena` and `mp_`

With the libc ELF file, we can use the default values of `main_arena` and `mp_` to find their addresses
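
A rough sketch of this idea, not the code from this commit: search the dumped `.data` bytes for the packed default values, then translate the offset into a runtime address. The four-field layout, `find_default_struct`, and `load_base` are hypothetical names and values chosen for illustration only.

```python
import struct
from typing import Optional, Tuple


def find_default_struct(
    data_section: Tuple[int, int, bytes], pattern: bytes, load_base: int
) -> Optional[int]:
    """Return the runtime address of `pattern` inside a dumped .data section."""
    sh_addr, _sh_size, content = data_section
    offset = content.find(pattern)
    if offset == -1:
        return None
    # sh_addr is the section address recorded in the ELF file; for a shared
    # object we add the (assumed) base address at which it was loaded.
    return load_base + sh_addr + offset


# Hypothetical default values packed as four little-endian 64-bit fields.
pattern = struct.pack("<4Q", 128 * 1024, 131072, 128 * 1024, 8)
fake_data = b"\x00" * 0x40 + pattern + b"\x00" * 0x20
section = (0x1000, len(fake_data), fake_data)

addr = find_default_struct(section, pattern, load_base=0x7FFFF7C00000)
missing = find_default_struct(section, b"\xAA" * 8, load_base=0)
print(hex(addr))
```

This only works while the struct still holds its initial values in the file image, which is why the search runs on the ELF file rather than on live memory.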

* Drop some unreliable methods for the heap heuristics

* Update the tests for the heap heuristics

* Show `main_arena` address in the `arenas` command output

* Make the heap heuristics support statically linked targets

* Drop some deprecated TLS functions and refactor the command

- Drop some deprecated TLS functions for the deprecated heap heuristics
- Don't call `pthread_self()` in the `tls` command without `-p` option
- Show the page of TLS in the `tls` command output

* Update the heap heuristics hint for multi-threaded targets

* Fix the wrong usage of the exception

* Fix the outdated description

* Return the default global_max_fast when we cannot find the address

* Enhance the output of `arena` and `mp`

- Show the address of the printed arena in the output of the `arena` command when no address is specified explicitly.
- Avoid an error in the `arena` command when the arena for the current thread has not been allocated yet.
- Show the address of `mp_` in the output of the `mp` command

* Remove wrong hint

* Support using brute-force to find the address of main_arena

If the user allows it, brute-force search on both sides of the TLS address and pick the candidate value closest to the TLS address.
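
A minimal sketch of a nearest-first search, under stated assumptions: memory is modelled as a plain dict, the target is 64-bit, and `search_around` and `is_candidate` are hypothetical names. How the real heuristic validates a candidate is not shown in this message.

```python
from typing import Callable, Dict, Optional

PTR_SIZE = 8  # assumption: 64-bit target


def search_around(
    memory: Dict[int, int],
    tls_base: int,
    is_candidate: Callable[[int], bool],
    max_slots: int = 64,
) -> Optional[int]:
    """Scan pointer-sized slots on both sides of tls_base, nearest first."""
    for distance in range(1, max_slots + 1):
        left = tls_base - distance * PTR_SIZE
        right = tls_base + distance * PTR_SIZE
        for addr in (left, right):
            value = memory.get(addr)  # None models an unreadable slot
            if value is not None and is_candidate(value):
                return addr
    return None


# Toy memory: two slots hold the value we are looking for.
TARGET = 0xDEAD0000
tls_base = 0x7000
memory = {tls_base - 8: 0, tls_base - 0x30: TARGET, tls_base + 0x100: TARGET}

found = search_around(memory, tls_base, lambda v: v == TARGET)
not_found = search_around(memory, tls_base, lambda v: False)
print(hex(found))
```

Walking outward in both directions means the first hit is also the closest one, so no sorting of candidates is needed.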

* Refactor the code about thread_arena and add the new brute-force strategy

Brute-force search the .got section for values that look like TLS references, and use them to find candidate thread_arena locations.
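
A minimal sketch of the filtering step, assuming an x86-64 target where TLS offsets are small negative values. The bounds follow the `0 < -offset < 0x250` and pointer-alignment check that appears in the TLS diff of this commit; `tls_offset_candidates` and the sample bytes are hypothetical.

```python
import struct
from typing import List

PTR_SIZE = 8  # assumption: x86-64


def tls_offset_candidates(got: bytes) -> List[int]:
    """Return signed .got entries that look like offsets from the TLS base."""
    count = len(got) // PTR_SIZE
    entries = struct.unpack(f"<{count}q", got[: count * PTR_SIZE])
    # Keep small, negative, pointer-aligned values only.
    return [off for off in entries if 0 < -off < 0x250 and off % PTR_SIZE == 0]


# Toy .got: one code pointer, one plausible TLS offset, two rejected values.
got = struct.pack("<4q", 0x7FFFF7E01234, -0x38, -0x1000, -0x13)
tls_base = 0x7FFFF7D8A740

offsets = tls_offset_candidates(got)
candidates = [tls_base + off for off in offsets]
print([hex(c) for c in candidates])
```

Each surviving address is only a candidate location; the value stored there still has to be checked before it is trusted as `thread_arena`.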

* Add tests for thread_arena and global_max_fast

- Check if we can get default global_max_fast
- Check if we can use brute-force to find thread_arena

* Update the output of `arenas`

* Add the test for the `tls` command

Add two tests for the `tls` command:

```
test_tls_address_and_command[x86-64]                                   PASSED
test_tls_address_and_command[i386]                                     PASSED
```

* Update and refactor the heuristics for `thread_arena` and `tcache`

- Provide an option to brute-force `tcache`, as we did for `thread_arena`
- Cache `thread_arena` even when we are single-threaded
- Refactor the code for `thread_arena`, to make it work for `tcache` as well
- Update the tests for `tcache`
- Remove some redundant hints

* Fix the wrong cache mechanism

Cache the address of the arena instead of the `Arena` instance, because `Arena` caches its field values, so reusing the instance would return a stale value the next time the same property is read
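
The stale-value problem can be reproduced with a toy class. This is an illustration only: the `Arena` below is a stand-in written for this example, and the dict plays the role of inferior memory.

```python
from functools import cached_property

memory = {0x1000: 1}  # stand-in for the debuggee's memory


class Arena:
    """Toy object whose field is read once and then cached on the instance."""

    def __init__(self, address: int) -> None:
        self.address = address

    @cached_property
    def top(self) -> int:
        return memory[self.address]


cached_instance = Arena(0x1000)  # caching the instance
cached_address = 0x1000          # caching only the address

first = cached_instance.top      # reads memory and caches the value
memory[0x1000] = 2               # the debuggee runs and the field changes

stale = cached_instance.top        # still the old cached value
fresh = Arena(cached_address).top  # a new instance re-reads memory
print(first, stale, fresh)
```

Keeping only the address costs one object construction per lookup but guarantees that every property read reflects the current memory contents.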

* Update the description of some configs about heap heuristics

* Handle the case when tcache is NULL

* Handle the case when thread_arena is NULL

* Fix a bug that occurred when the TLS address could not be found

* Fix #1550

* Show tid only if no address is specified

* Update pwndbg/commands/__init__.py

* Update pwndbg/commands/heap.py

* Update pwndbg/commands/heap.py

* Update pwndbg/commands/heap.py

* Update pwndbg/commands/heap.py

* Update pwndbg/commands/heap.py

* Update pwndbg/commands/heap.py

Co-authored-by: Disconnect3d <dominik.b.czarnota@gmail.com>

* Fix lint

* Move some code into `pwndbg.gdblib.elf`

---------

Co-authored-by: Disconnect3d <dominik.b.czarnota@gmail.com>
pull/1590/head
Alan Li 3 years ago committed by GitHub
parent 8b7dd56e97
commit 449070557d

@ -363,9 +363,15 @@ def _try2run_heap_command(function, a, kw):
return function(*a, **kw)
except SymbolUnresolvableError as err:
e(f"{function.__name__}: Fail to resolve the symbol: `{err.symbol}`")
w(
f"You can try to determine the libc symbols addresses manually and set them appropriately. For this, see the `heap_config` command output and set the config about `{err.symbol}`."
)
if "thread_arena" == err.symbol:
w(
"You are probably debugging a multi-threaded target without debug symbols, so we failed to determine which arena is used by the current thread.\n"
"To resolve this issue, you can use the `arenas` command to list all arenas, and use `set thread-arena <addr>` to set the current thread's arena address you think is correct.\n"
)
else:
w(
f"You can try to determine the libc symbols addresses manually and set them appropriately. For this, see the `heap_config` command output and set the config for `{err.symbol}`."
)
if pwndbg.gdblib.config.exception_verbose or pwndbg.gdblib.config.exception_debugger:
raise err
else:

@ -141,6 +141,14 @@ def heap(addr=None, verbose=False, simple=False) -> None:
chunk = chunk.next_chunk()
else:
arena = allocator.thread_arena
# arena might be None if the current thread doesn't allocate the arena
if arena is None:
print(
message.notice(
"No arena found for current thread (the thread hasn't performed any allocations)."
)
)
return
h = arena.active_heap
for chunk in h:
@ -168,9 +176,21 @@ def arena(addr=None) -> None:
arena = Arena(addr)
else:
arena = allocator.thread_arena
tid = pwndbg.gdblib.proc.thread_id
# arena might be None if the current thread doesn't allocate the arena
if arena is None:
print(
message.notice(
f"No arena found for thread {message.hint(tid)} (the thread hasn't performed any allocations)."
)
)
return
print(
message.notice(
f"Arena for thread {message.hint(tid)} is located at: {message.hint(hex(arena.address))}"
)
)
tid = pwndbg.gdblib.proc.thread_id
print(message.hint(f"Arena for thread {tid}:"))
print(arena._gdbValue) # Breaks encapsulation, find a better way.
@ -184,8 +204,13 @@ parser = argparse.ArgumentParser(description="List this process's arenas.")
def arenas() -> None:
"""Lists this process's arenas."""
allocator = pwndbg.heap.current
for ar in allocator.arenas:
print(ar)
arenas = allocator.arenas
print("main_arena:")
print(arenas[0])
if arenas[1:]:
print("non-main arena:")
for arena in arenas[1:]:
print(arena)
parser = argparse.ArgumentParser(
@ -208,7 +233,13 @@ def tcache(addr=None) -> None:
"""
allocator = pwndbg.heap.current
tcache = allocator.get_tcache(addr)
print(tcache)
# if the current thread doesn't allocate the arena, tcache will be NULL
print(
message.notice("tcache is pointing to: ")
+ message.hint(hex(tcache.address) if tcache else "NULL")
)
if tcache:
print(tcache)
parser = argparse.ArgumentParser(description="Print the mp_ struct's contents.")
@ -221,6 +252,7 @@ parser = argparse.ArgumentParser(description="Print the mp_ struct's contents.")
def mp() -> None:
"""Print the mp_ struct's contents."""
allocator = pwndbg.heap.current
print(message.notice("mp_ struct at: ") + message.hint(hex(allocator.mp.address)))
print(allocator.mp)
@ -247,6 +279,14 @@ def top_chunk(addr=None) -> None:
arena = Arena(addr)
else:
arena = allocator.thread_arena
# arena might be None if the current thread doesn't allocate the arena
if arena is None:
print(
message.notice(
"No arena found for current thread (the thread hasn't performed any allocations)"
)
)
return
malloc_chunk(arena.top)
@ -295,7 +335,6 @@ def malloc_chunk(addr, fake=False, verbose=False, simple=False) -> None:
headers_to_print.append(message.off("Top chunk"))
if not chunk.is_top_chunk and arena:
bins_list = [
allocator.fastbins(arena.address),
allocator.smallbins(arena.address),
@ -672,6 +711,14 @@ def vis_heap_chunks(addr=None, count=None, naive=None, display_all=None) -> None
arena = heap_region.arena
else:
arena = allocator.thread_arena
# arena might be None if the current thread doesn't allocate the arena
if arena is None:
print(
message.notice(
"No arena found for current thread (the thread hasn't performed any allocations)"
)
)
return
heap_region = arena.active_heap
cursor = heap_region.start
@ -852,6 +899,14 @@ def try_free(addr) -> None:
# constants
allocator = pwndbg.heap.current
arena = allocator.thread_arena
# arena might be None if the current thread doesn't allocate the arena
if arena is None:
print(
message.notice(
"No arena found for current thread (the thread hasn't performed any allocations)"
)
)
return
aligned_lsb = allocator.malloc_align_mask.bit_length()
size_sz = allocator.size_sz
@ -1215,6 +1270,6 @@ def heap_config(filter_pattern) -> None:
print(
message.hint(
"Some config(e.g. main_arena) will only working when resolve-heap-via-heuristic is `True`"
"Some config values (e.g. main_arena) will be used only when resolve-heap-via-heuristic is `auto` or `force`"
)
)

@ -1,21 +1,45 @@
"""
Command to print the information of the current Thread Local Storage (TLS).
"""
import argparse
import pwndbg.commands
import pwndbg.gdblib.tls
from pwndbg.color import message
from pwndbg.commands import CommandCategory
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description="Print out base address of the current Thread Local Storage (TLS).",
)
@pwndbg.commands.ArgparsedCommand(
"Print out base address of the current Thread Local Storage (TLS).",
category=CommandCategory.LINUX,
parser.add_argument(
"-p",
"--pthread-self",
action="store_true",
default=False,
help="Try to get the address of TLS by calling pthread_self().",
)
@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.LINUX)
@pwndbg.commands.OnlyWhenRunning
def tls() -> None:
tls_base = pwndbg.gdblib.tls.address
if tls_base:
def tls(pthread_self=False) -> None:
tls_base = (
pwndbg.gdblib.tls.find_address_with_register()
if not pthread_self
else pwndbg.gdblib.tls.find_address_with_pthread_self()
)
if pwndbg.gdblib.memory.is_readable_address(tls_base):
print(message.success("Thread Local Storage (TLS) base: %#x" % tls_base))
else:
print(message.error("Couldn't find Thread Local Storage (TLS) base."))
print(message.success("TLS is located at:"))
print(message.notice(pwndbg.gdblib.vmmap.find(tls_base)))
return
print(message.error("Couldn't find Thread Local Storage (TLS) base."))
if not pthread_self:
print(
message.notice(
"You can try to use -p/--pthread-self option to get the address of TLS by calling pthread_self().\n"
"(This might cause problems if the pthread_self() is not in libc or not initialized yet.)"
)
)

@ -11,6 +11,8 @@ import importlib
import sys
from collections import namedtuple
from typing import List
from typing import Optional
from typing import Tuple
import gdb
from elftools.elf.constants import SH_FLAGS
@ -171,6 +173,19 @@ def get_containing_sections(elf_filepath, elf_loadaddr, vaddr):
return sections
def dump_section_by_name(filepath: str, section_name: str) -> Optional[Tuple[int, int, bytes]]:
"""
Dump the content of a section from an ELF file, return the start address, size and content.
"""
# TODO: We should have some cache mechanism or something at `pwndbg.gdblib.file.get_file()` in the future to avoid downloading the same file multiple times when we are debugging a remote process
local_path = pwndbg.gdblib.file.get_file(filepath)
with open(local_path, "rb") as f:
elffile = ELFFile(f)
section = elffile.get_section_by_name(section_name)
return (section["sh_addr"], section["sh_size"], section.data()) if section else None
@pwndbg.gdblib.proc.OnlyWhenRunning
@pwndbg.lib.memoize.reset_on_start
def exe():

@ -46,7 +46,9 @@ def get_file(path: str) -> str:
Returns:
The local path to the file
"""
assert path.startswith("/") or path.startswith("target:"), "get_file called with incorrect path"
assert path.startswith(("/", "./")) or path.startswith(
"target:"
), "get_file called with incorrect path"
if path.startswith("target:"):
path = path[7:] # len('target:') == 7
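
For reference, the path check above relies on `str.startswith` accepting a tuple of prefixes. A small standalone sketch of the same check and the prefix stripping; `is_supported_path` and `strip_target_prefix` are hypothetical helper names, not part of pwndbg.

```python
TARGET_PREFIX = "target:"


def is_supported_path(path: str) -> bool:
    """Accept absolute paths, ./-relative paths and GDB 'target:' paths."""
    return path.startswith(("/", "./", TARGET_PREFIX))


def strip_target_prefix(path: str) -> str:
    """Drop a leading 'target:' without hard-coding its length."""
    if path.startswith(TARGET_PREFIX):
        return path[len(TARGET_PREFIX):]
    return path


checks = [is_supported_path(p) for p in ("/lib/libc.so.6", "./a.out", "target:/lib/x", "libc.so.6")]
stripped = strip_target_prefix("target:/lib/libc.so.6")
print(checks, stripped)
```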

@ -9,6 +9,7 @@ import sys
from types import ModuleType
from typing import Any
from typing import Callable
from typing import Optional
from typing import Tuple
import gdb
@ -103,6 +104,38 @@ class module(ModuleType):
def binary_vmmap(self) -> Tuple[pwndbg.lib.memory.Page, ...]:
return tuple(p for p in pwndbg.gdblib.vmmap.get() if p.objfile == self.exe)
@pwndbg.lib.memoize.reset_on_start
@pwndbg.lib.memoize.reset_on_objfile
def dump_elf_data_section(self) -> Optional[Tuple[int, int, bytes]]:
"""
Dump .data section of current process's ELF file
"""
return pwndbg.gdblib.elf.dump_section_by_name(self.exe, ".data")
@pwndbg.lib.memoize.reset_on_start
@pwndbg.lib.memoize.reset_on_objfile
def get_data_section_address(self) -> int:
"""
Find .data section address of current process.
"""
out = pwndbg.gdblib.info.files()
for line in out.splitlines():
if line.endswith(" is .data"):
return int(line.split()[0], 16)
return 0
@pwndbg.lib.memoize.reset_on_start
@pwndbg.lib.memoize.reset_on_objfile
def get_got_section_address(self) -> int:
"""
Find .got section address of current process.
"""
out = pwndbg.gdblib.info.files()
for line in out.splitlines():
if line.endswith(" is .got"):
return int(line.split()[0], 16)
return 0
def OnlyWhenRunning(self, func):
@functools.wraps(func)
def wrapper(*a, **kw):

@ -1,8 +1,6 @@
"""
Getting Thread Local Storage (TLS) information.
"""
import sys
from types import ModuleType
import gdb
@ -15,56 +13,48 @@ import pwndbg.gdblib.vmmap
from pwndbg.gdblib.scheduler import parse_and_eval_with_scheduler_lock
class module(ModuleType):
"""Getting Thread Local Storage (TLS) information."""
def is_thread_local_variable_offset(self, offset: int) -> bool:
"""Check if the offset to TLS is a valid offset for the heap heuristics."""
if pwndbg.gdblib.arch.current in ("x86-64", "i386"):
is_valid = 0 < -offset < 0x250
else: # elif pwndbg.gdblib.arch.current in ("aarch64", "arm"):
is_valid = 0 < offset < 0x250
# check alignment
return is_valid and offset % pwndbg.gdblib.arch.ptrsize == 0
def is_thread_local_variable(self, addr: int) -> bool:
"""Check if the address is a valid thread local variable's address for the heap heuristics."""
if not self.address:
# Since we can not get the TLS base address, we trust that the address is valid.
return True
return self.is_thread_local_variable_offset(
addr - self.address
) and addr in pwndbg.gdblib.vmmap.find(self.address)
def call_pthread_self(self) -> int:
"""Get the address of TLS by calling pthread_self()."""
if pwndbg.gdblib.symbol.address("pthread_self") is None:
return 0
try:
return int(parse_and_eval_with_scheduler_lock("(void *)pthread_self()"))
except gdb.error:
return 0
@property
def address(self) -> int:
"""Get the base address of TLS."""
tls_base = 0
if pwndbg.gdblib.arch.current == "x86-64":
tls_base = int(pwndbg.gdblib.regs.fsbase)
elif pwndbg.gdblib.arch.current == "i386":
tls_base = int(pwndbg.gdblib.regs.gsbase)
elif pwndbg.gdblib.arch.current == "aarch64":
tls_base = int(pwndbg.gdblib.regs.TPIDR_EL0)
# Sometimes, we need to get TLS base via pthread_self() for the following reason:
# For x86-64, fsbase might be 0 if we are remotely debugging and the GDB version <= 8.X
# For i386, gsbase might be 0 if we are remotely debugging
# For other archs, we can't get the TLS base address via register
# Note: aarch64 seems doesn't have this issue
return tls_base if tls_base else self.call_pthread_self()
# To prevent garbage collection
tether = sys.modules[__name__]
sys.modules[__name__] = module(__name__, "")
def __call_pthread_self() -> int:
"""Get the address of TLS by calling pthread_self()."""
if pwndbg.gdblib.symbol.address("pthread_self") is None:
return 0
try:
return int(parse_and_eval_with_scheduler_lock("(void *)pthread_self()"))
except gdb.error:
return 0
def find_address_with_pthread_self() -> int:
"""Get the address of TLS with pthread_self()."""
if pwndbg.gdblib.arch.current not in ("x86-64", "i386", "arm"):
# Note: we should support aarch64 if it's possible that TPIDR_EL0 register can not be accessed.
return 0
result = __call_pthread_self()
if result <= 0:
# pthread_self() is not valid
return 0
# pthread_self() is defined as: https://elixir.bootlin.com/glibc/glibc-2.37/source/nptl/pthread_self.c#L22
# THREAD_SELF is defined as:
# i386: https://elixir.bootlin.com/glibc/glibc-2.37/source/sysdeps/i386/nptl/tls.h#L234
# x86-64: https://elixir.bootlin.com/glibc/glibc-2.37/source/sysdeps/x86_64/nptl/tls.h#L181
# arm: https://elixir.bootlin.com/glibc/latest/source/sysdeps/arm/nptl/tls.h#L76
# For i386 and x86-64, the return value of the pthread_self() is the address of TLS, because the value is self reference of the TLS: https://elixir.bootlin.com/glibc/glibc-2.37/source/nptl/pthread_create.c#L671
# But for arm, the implementation of THREAD_SELF is different, we need to add sizeof(struct pthread) to the result to get the address of TLS.
if pwndbg.gdblib.arch.current == "arm":
# 0x4c0 is sizeof(struct pthread)
# TODO: we might need to adjust the value if the size of struct pthread is changed in the future.
result += 0x4C0
return result
def find_address_with_register() -> int:
"""Get the address of TLS with register."""
if pwndbg.gdblib.arch.current == "x86-64":
return int(pwndbg.gdblib.regs.fsbase)
elif pwndbg.gdblib.arch.current == "i386":
return int(pwndbg.gdblib.regs.gsbase)
elif pwndbg.gdblib.arch.current == "aarch64":
return int(pwndbg.gdblib.regs.TPIDR_EL0)
# TODO: is it possible that we can get the address of TLS with register on arm?
return 0

@ -5,10 +5,14 @@ Get information about the GLibc
import functools
import os
import re
from typing import Optional
from typing import Tuple
import gdb
import pwndbg.gdblib.config
import pwndbg.gdblib.elf
import pwndbg.gdblib.file
import pwndbg.gdblib.info
import pwndbg.gdblib.memory
import pwndbg.gdblib.proc
@ -30,7 +34,7 @@ glibc_version = pwndbg.gdblib.config.add_param(
@pwndbg.gdblib.proc.OnlyWhenRunning
def get_version():
def get_version() -> Optional[Tuple[int, ...]]:
if glibc_version.value:
ret = re.search(r"(\d+)\.(\d+)", glibc_version.value)
if ret:
@ -46,7 +50,7 @@ def get_version():
@pwndbg.gdblib.proc.OnlyWhenRunning
@pwndbg.lib.memoize.reset_on_start
@pwndbg.lib.memoize.reset_on_objfile
def _get_version():
def _get_version() -> Optional[Tuple[int, ...]]:
if pwndbg.heap.current.libc_has_debug_syms():
addr = pwndbg.gdblib.symbol.address("__libc_version")
if addr is not None:
@ -63,21 +67,62 @@ def _get_version():
@pwndbg.gdblib.proc.OnlyWhenRunning
@pwndbg.lib.memoize.reset_on_start
@pwndbg.lib.memoize.reset_on_objfile
def get_data_address():
def get_libc_filename_from_info_sharedlibrary() -> Optional[str]:
for line in pwndbg.gdblib.info.sharedlibrary().splitlines()[1:]:
filename = line.split(maxsplit=3)[-1].lstrip("(*)").lstrip()
# Is it possible that the libc is not called `libc.so.6`?
if os.path.basename(filename) == "libc.so.6":
return filename
return None
@pwndbg.gdblib.proc.OnlyWhenRunning
def dump_elf_data_section() -> Optional[Tuple[int, int, bytes]]:
"""
Dump .data section of libc ELF file
"""
libc_filename = get_libc_filename_from_info_sharedlibrary()
if not libc_filename:
# libc not loaded yet, or it's static linked
return None
return pwndbg.gdblib.elf.dump_section_by_name(libc_filename, ".data")
@pwndbg.gdblib.proc.OnlyWhenRunning
@pwndbg.lib.memoize.reset_on_start
@pwndbg.lib.memoize.reset_on_objfile
def get_data_section_address() -> int:
"""
Find .data section address of libc
"""
# Try every possible object file, to find which one has `.data` section showed in `info files`
for libc_filename in (
objfile.filename
for objfile in gdb.objfiles()
if re.search(r"^libc(\.|-.+\.)so", os.path.basename(objfile.filename))
):
# Will `info files` always work? If not, we should probably use `ELFFile` to parse libc file directly
out = pwndbg.gdblib.info.files()
for line in out.splitlines():
if libc_filename in line and " is .data in " in line:
return int(line.strip().split()[0], 16)
libc_filename = get_libc_filename_from_info_sharedlibrary()
if not libc_filename:
# libc not loaded yet, or it's static linked
return 0
# TODO: If we are debugging a remote process, this might not work if GDB cannot load the so file
out = pwndbg.gdblib.info.files()
for line in out.splitlines():
if line.endswith(" is .data in " + libc_filename):
return int(line.split()[0], 16)
return 0
@pwndbg.gdblib.proc.OnlyWhenRunning
@pwndbg.lib.memoize.reset_on_start
@pwndbg.lib.memoize.reset_on_objfile
def get_got_section_address() -> int:
"""
Find .got section address of libc
"""
libc_filename = get_libc_filename_from_info_sharedlibrary()
if not libc_filename:
# libc not loaded yet, or it's static linked
return 0
# TODO: If we are debugging a remote process, this might not work if GDB cannot load the so file
out = pwndbg.gdblib.info.files()
for line in out.splitlines():
if line.endswith(" is .got in " + libc_filename):
return int(line.split()[0], 16)
return 0
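
The section lookups above all parse `info files` output the same way. A standalone sketch of that parsing, run against a hand-written sample that only approximates GDB's output format; `section_address` and the sample addresses are made up for illustration.

```python
from typing import Optional

# Hand-written approximation of `info files` output (not captured from GDB).
SAMPLE_INFO_FILES = """\
Local exec file:
\t`/tmp/a.out', file type elf64-x86-64.
\t0x0000555555558000 - 0x0000555555558010 is .data
\t0x00007ffff7e17000 - 0x00007ffff7e18000 is .got in /lib/x86_64-linux-gnu/libc.so.6
\t0x00007ffff7e1a000 - 0x00007ffff7e1b000 is .data in /lib/x86_64-linux-gnu/libc.so.6
"""


def section_address(info_files: str, section: str, objfile: Optional[str] = None) -> int:
    """Return the start address of `section`, or 0 when it is not listed."""
    suffix = f" is {section}"
    if objfile:
        suffix += f" in {objfile}"
    for line in info_files.splitlines():
        # endswith() keeps `.got` from matching `.got.plt` and keeps the
        # main binary's `.data` apart from a library's `.data in <path>`.
        if line.endswith(suffix):
            return int(line.split()[0], 16)
    return 0


LIBC = "/lib/x86_64-linux-gnu/libc.so.6"
exe_data = section_address(SAMPLE_INFO_FILES, ".data")
libc_data = section_address(SAMPLE_INFO_FILES, ".data", LIBC)
libc_got = section_address(SAMPLE_INFO_FILES, ".got", LIBC)
print(hex(exe_data), hex(libc_data), hex(libc_got))
```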

@ -22,15 +22,15 @@ def add_heap_param(
)
main_arena = add_heap_param("main-arena", "0", "&main_arena for heuristics")
main_arena = add_heap_param("main-arena", "0", "the address of main_arena")
thread_arena = add_heap_param("thread-arena", "0", "*thread_arena for heuristics")
thread_arena = add_heap_param("thread-arena", "0", "the address pointed by thread_arena")
mp_ = add_heap_param("mp", "0", "&mp_ for heuristics")
mp_ = add_heap_param("mp", "0", "the address of mp_")
tcache = add_heap_param("tcache", "0", "*tcache for heuristics")
tcache = add_heap_param("tcache", "0", "the address pointed by tcache")
global_max_fast = add_heap_param("global-max-fast", "0", "&global_max_fast for heuristics")
global_max_fast = add_heap_param("global-max-fast", "0", "the address of global_max_fast")
symbol_list = [main_arena, thread_arena, mp_, tcache, global_max_fast]

File diff suppressed because it is too large

@ -6,6 +6,7 @@ import pwndbg.gdblib.arch
import pwndbg.gdblib.memory
import pwndbg.gdblib.typeinfo
import pwndbg.glibc
from pwndbg.gdblib.ctypes import Structure
def request2size(req):
@ -14,7 +15,7 @@ def request2size(req):
return (req + SIZE_SZ + MALLOC_ALIGN_MASK) & ~MALLOC_ALIGN_MASK
def fastbin_index(size):
def fastbin_index(size: int) -> int:
if pwndbg.gdblib.arch.ptrsize == 8:
return (size >> 4) - 2
else:
@ -44,6 +45,12 @@ else:
PTR = ctypes.c_uint64 # type: ignore[misc]
SIZE_T = ctypes.c_uint64 # type: ignore[misc]
DEFAULT_TOP_PAD = 131072
DEFAULT_MMAP_MAX = 65536
DEFAULT_MMAP_THRESHOLD = 128 * 1024
DEFAULT_TRIM_THRESHOLD = 128 * 1024
TCACHE_FILL_COUNT = 7
class c_pvoid(PTR):
"""
@ -201,7 +208,7 @@ class CStruct2GDB:
return tuple((field[0], getattr(self, field[0])) for field in self._c_struct._fields_)
class c_malloc_state_2_26(ctypes.LittleEndianStructure):
class c_malloc_state_2_26(Structure):
"""
This class represents malloc_state struct for GLIBC < 2.27 as a ctypes struct.
@ -264,7 +271,7 @@ class c_malloc_state_2_26(ctypes.LittleEndianStructure):
]
class c_malloc_state_2_27(ctypes.LittleEndianStructure):
class c_malloc_state_2_27(Structure):
"""
This class represents malloc_state struct for GLIBC >= 2.27 as a ctypes struct.
@ -345,7 +352,7 @@ class MallocState(CStruct2GDB):
sizeof = ctypes.sizeof(_c_struct)
class c_heap_info(ctypes.LittleEndianStructure):
class c_heap_info(Structure):
"""
This class represents heap_info struct as a ctypes struct.
@ -383,7 +390,7 @@ class HeapInfo(CStruct2GDB):
sizeof = ctypes.sizeof(_c_struct)
class c_malloc_chunk(ctypes.LittleEndianStructure):
class c_malloc_chunk(Structure):
"""
This class represents malloc_chunk struct as a ctypes struct.
@ -422,7 +429,7 @@ class MallocChunk(CStruct2GDB):
sizeof = ctypes.sizeof(_c_struct)
class c_tcache_perthread_struct_2_29(ctypes.LittleEndianStructure):
class c_tcache_perthread_struct_2_29(Structure):
"""
This class represents tcache_perthread_struct for GLIBC < 2.30 as a ctypes struct.
@ -441,7 +448,7 @@ class c_tcache_perthread_struct_2_29(ctypes.LittleEndianStructure):
]
class c_tcache_perthread_struct_2_30(ctypes.LittleEndianStructure):
class c_tcache_perthread_struct_2_30(Structure):
"""
This class represents the tcache_perthread_struct for GLIBC >= 2.30 as a ctypes struct.
@ -472,7 +479,7 @@ class TcachePerthreadStruct(CStruct2GDB):
sizeof = ctypes.sizeof(_c_struct)
class c_tcache_entry_2_28(ctypes.LittleEndianStructure):
class c_tcache_entry_2_28(Structure):
"""
This class represents the tcache_entry struct for GLIBC < 2.29 as a ctypes struct.
@ -487,7 +494,7 @@ class c_tcache_entry_2_28(ctypes.LittleEndianStructure):
_fields_ = [("next", c_pvoid)]
class c_tcache_entry_2_29(ctypes.LittleEndianStructure):
class c_tcache_entry_2_29(Structure):
"""
This class represents the tcache_entry struct for GLIBC >= 2.29 as a ctypes struct.
@ -516,7 +523,7 @@ class TcacheEntry(CStruct2GDB):
sizeof = ctypes.sizeof(_c_struct)
class c_malloc_par_2_23(ctypes.LittleEndianStructure):
class c_malloc_par_2_23(Structure):
"""
This class represents the malloc_par struct for GLIBC < 2.24 as a ctypes struct.
@ -569,7 +576,7 @@ class c_malloc_par_2_23(ctypes.LittleEndianStructure):
]
class c_malloc_par_2_24(ctypes.LittleEndianStructure):
class c_malloc_par_2_24(Structure):
"""
This class represents the malloc_par struct for GLIBC >= 2.24 as a ctypes struct.
@ -619,7 +626,7 @@ class c_malloc_par_2_24(ctypes.LittleEndianStructure):
]
class c_malloc_par_2_26(ctypes.LittleEndianStructure):
class c_malloc_par_2_26(Structure):
"""
This class represents the malloc_par struct for GLIBC >= 2.26 as a ctypes struct.
@ -683,7 +690,7 @@ class c_malloc_par_2_26(ctypes.LittleEndianStructure):
]
class c_malloc_par_2_35(ctypes.LittleEndianStructure):
class c_malloc_par_2_35(Structure):
"""
This class represents the malloc_par struct for GLIBC >= 2.35 as a ctypes struct.
@ -773,3 +780,33 @@ class MallocPar(CStruct2GDB):
else:
_c_struct = c_malloc_par_2_23
sizeof = ctypes.sizeof(_c_struct)
# https://github.com/bminor/glibc/blob/glibc-2.37/malloc/malloc.c#L1911-L1926
# static struct malloc_par mp_ =
# {
# .top_pad = DEFAULT_TOP_PAD,
# .n_mmaps_max = DEFAULT_MMAP_MAX,
# .mmap_threshold = DEFAULT_MMAP_THRESHOLD,
# .trim_threshold = DEFAULT_TRIM_THRESHOLD,
# #define NARENAS_FROM_NCORES(n) ((n) * (sizeof (long) == 4 ? 2 : 8))
# .arena_test = NARENAS_FROM_NCORES (1)
# #if USE_TCACHE
# ,
# .tcache_count = TCACHE_FILL_COUNT,
# .tcache_bins = TCACHE_MAX_BINS,
# .tcache_max_bytes = tidx2usize (TCACHE_MAX_BINS-1),
# .tcache_unsorted_limit = 0 /* No limit. */
# #endif
# };
DEFAULT_MP_ = MallocPar._c_struct()
DEFAULT_MP_.top_pad = DEFAULT_TOP_PAD
DEFAULT_MP_.n_mmaps_max = DEFAULT_MMAP_MAX
DEFAULT_MP_.mmap_threshold = DEFAULT_MMAP_THRESHOLD
DEFAULT_MP_.trim_threshold = DEFAULT_TRIM_THRESHOLD
DEFAULT_MP_.arena_test = 2 if pwndbg.gdblib.arch.ptrsize == 4 else 8
if MallocPar._c_struct != c_malloc_par_2_23:
# the only difference between 2.23 and the rest is the lack of tcache
DEFAULT_MP_.tcache_count = TCACHE_FILL_COUNT
DEFAULT_MP_.tcache_bins = TCACHE_MAX_BINS
DEFAULT_MP_.tcache_max_bytes = (TCACHE_MAX_BINS - 1) * MALLOC_ALIGN + MINSIZE - SIZE_SZ
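
As a worked check of the `tcache_max_bytes` formula above, here is the arithmetic for a 64-bit target. The constant values are assumptions stated for x86-64 glibc; they are not defined in this hunk.

```python
# Assumed x86-64 glibc constants (stated here for illustration only).
SIZE_SZ = 8                 # sizeof(size_t)
MALLOC_ALIGN = 2 * SIZE_SZ  # 16-byte chunk alignment
MINSIZE = 32                # smallest possible chunk
TCACHE_MAX_BINS = 64

# Same expression as the assignment to DEFAULT_MP_.tcache_max_bytes.
tcache_max_bytes = (TCACHE_MAX_BINS - 1) * MALLOC_ALIGN + MINSIZE - SIZE_SZ

# Same expression as the assignment to DEFAULT_MP_.arena_test.
arena_test = 2 if SIZE_SZ == 4 else 8

print(tcache_max_bytes, hex(tcache_max_bytes), arena_test)
```

With these values the largest request served from the tcache is 63 * 16 + 32 - 8 = 1032 bytes (0x408).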

@ -93,6 +93,20 @@ heap_malloc_chunk.out: heap_malloc_chunk.c
@echo "[+] Building heap_malloc_chunk.out"
${CC} -g -O0 -Wno-nonnull -Wno-unused-result -o heap_malloc_chunk.out heap_malloc_chunk.c -pthread -lpthread
tls.x86-64.out: tls.x86-64.c
@echo "[+] Building tls.x86-64.c"
${ZIGCC} \
${CFLAGS} \
-target x86_64-linux-gnu \
-o tls.x86-64.out tls.x86-64.c
tls.i386.out: tls.i386.c
@echo "[+] Building tls.i386.c"
${ZIGCC} \
${CFLAGS} \
-target i386-linux-gnu \
-o tls.i386.out tls.i386.c
clean :
@echo "[+] Cleaning stuff"
@rm -f $(COMPILED) $(LINKED) $(COMPILED_ASM) $(LINKED_ASM) $(COMPILED_GO)

@ -0,0 +1,10 @@
void *tls_address;
void break_here(void) {}
int main(){
// TODO: This only works for i386, we should support arm/aarch64 in the future
asm("movl %%gs:0, %0" : "=r" (tls_address));
break_here();
return 0;
}

@ -0,0 +1,10 @@
void *tls_address;
void break_here(void) {}
int main(){
// TODO: This only works for x86-64, we should support arm/aarch64 in the future
asm("movq %%fs:0, %0" : "=r" (tls_address));
break_here();
return 0;
}

@ -2,16 +2,12 @@ import gdb
import pytest
import pwndbg
import pwndbg.gdblib.arch
import pwndbg.gdblib.memory
import pwndbg.gdblib.symbol
import pwndbg.gdblib.typeinfo
import pwndbg.gdblib.vmmap
import pwndbg.glibc
import pwndbg.heap
import tests
from pwndbg.heap.ptmalloc import SymbolUnresolvableError
from pwndbg.lib.memory import Page
HEAP_MALLOC_CHUNK = tests.binaries.get("heap_malloc_chunk.out")
@ -177,9 +173,7 @@ def test_malloc_chunk_command_heuristic(start_binary):
class mock_for_heuristic:
def __init__(
self, mock_symbols=[], mock_all=False, mess_up_memory=False, test_memory_parsing=False
):
def __init__(self, mock_symbols=[], mock_all=False):
self.mock_symbols = (
mock_symbols # every symbol's address in the list will be mocked to `None`
)
@ -189,25 +183,6 @@ class mock_for_heuristic:
self.saved_static_linkage_symbol_address_func = (
pwndbg.gdblib.symbol.static_linkage_symbol_address
)
# We mess up the memory in the page of the symbols, to make sure that the heuristic will not succeed by parsing the memory
self.mess_up_memory = mess_up_memory
# Some addresses can be found by `pwndbg.gdblib.vmmap.find()`, but it is not a valid memory address to access (e.g. the address in [vsyscall])
# This option is to make sure that the heuristic will not affect by this
self.test_memory_parsing = test_memory_parsing
if mess_up_memory:
# Save all the memory before we mess it up
self.page = pwndbg.heap.current.possible_page_of_symbols
self.saved_memory = pwndbg.gdblib.memory.read(self.page.vaddr, self.page.memsz)
if test_memory_parsing:
def fake_vmmap_find(addr):
# The heuristics should work without vmmap working
return Page(0, 0xFFFFFFFFFFFFFFFF, 4, 0, "[deadbeaf]")
fake_vmmap_find.original = pwndbg.gdblib.vmmap.find
pwndbg.gdblib.vmmap.find = fake_vmmap_find
def __enter__(self):
def mock(original):
@ -226,11 +201,6 @@ class mock_for_heuristic:
pwndbg.gdblib.symbol.static_linkage_symbol_address = mock(
pwndbg.gdblib.symbol.static_linkage_symbol_address
)
if self.mess_up_memory:
# Fill the page with `0xff`
pwndbg.gdblib.memory.write(self.page.vaddr, b"\xff" * self.page.memsz)
if self.test_memory_parsing:
pwndbg.gdblib.vmmap.find = pwndbg.gdblib.vmmap.find.original
def __exit__(self, exc_type, exc_value, traceback):
# Restore `pwndbg.gdblib.symbol.address` and `pwndbg.gdblib.symbol.static_linkage_symbol_address`
@ -238,9 +208,6 @@ class mock_for_heuristic:
pwndbg.gdblib.symbol.static_linkage_symbol_address = (
self.saved_static_linkage_symbol_address_func
)
if self.mess_up_memory:
# Restore the memory
pwndbg.gdblib.memory.write(self.page.vaddr, self.saved_memory)
def test_main_arena_heuristic(start_binary):
@ -254,7 +221,7 @@ def test_main_arena_heuristic(start_binary):
"main_arena"
) or pwndbg.gdblib.symbol.address("main_arena")
# Level 1: We check we can get the address of `main_arena` from debug symbols and the struct of `main_arena` is correct
# Check if we can get the address of `main_arena` from debug symbols and the struct of `main_arena` is correct
assert pwndbg.heap.current.main_arena is not None
# Check the address of `main_arena` is correct
assert pwndbg.heap.current.main_arena.address == main_arena_addr_via_debug_symbol
@ -265,30 +232,11 @@ def test_main_arena_heuristic(start_binary):
)
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Level 2.1: We check we can get the address of `main_arena` by parsing the assembly code of `malloc_trim`
with mock_for_heuristic(["main_arena"], mess_up_memory=True):
assert pwndbg.heap.current.main_arena is not None
# Check the address of `main_arena` is correct
assert pwndbg.heap.current.main_arena.address == main_arena_addr_via_debug_symbol
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Level 2.2: No `__malloc_hook` this time, because it's possible to find `main_arena` by some magic about it
with mock_for_heuristic(["main_arena", "__malloc_hook"], mess_up_memory=True):
# Check if we can get the address of `main_arena` by parsing the .data section of the ELF of libc
with mock_for_heuristic(["main_arena"]):
assert pwndbg.heap.current.main_arena is not None
# Check the address of `main_arena` is correct
assert pwndbg.heap.current.main_arena.address == main_arena_addr_via_debug_symbol
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Level 3: We check we can get the address of `main_arena` by parsing the memory
with mock_for_heuristic(mock_all=True, test_memory_parsing=True):
# Check the address of `main_arena` is correct
assert pwndbg.heap.current.main_arena.address == main_arena_addr_via_debug_symbol
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Check if it works when there's more than one arena
gdb.execute("continue")
assert gdb.selected_thread().num == 2
assert pwndbg.heap.current.main_arena.address == main_arena_addr_via_debug_symbol
def test_mp_heuristic(start_binary):
@@ -302,7 +250,7 @@ def test_mp_heuristic(start_binary):
"mp_"
) or pwndbg.gdblib.symbol.address("mp_")
# Level 1: We check we can get the address of `mp_` from debug symbols and the struct of `mp_` is correct
# Check if we can get the address of `mp_` from debug symbols and the struct of `mp_` is correct
assert pwndbg.heap.current.mp is not None
# Check the address of `mp_` is correct
assert pwndbg.heap.current.mp.address == mp_addr_via_debug_symbol
@@ -313,45 +261,11 @@ def test_mp_heuristic(start_binary):
)
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Level 2: We check we can get the address of `mp_` by parsing the assembly code of `__libc_free`
with mock_for_heuristic(["mp_"], mess_up_memory=True):
# Check if we can get the address of `mp_` by parsing the .data section of the ELF of libc
with mock_for_heuristic(["mp_"]):
assert pwndbg.heap.current.mp is not None
# Check the address of `mp_` is correct
assert pwndbg.heap.current.mp.address == mp_addr_via_debug_symbol
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Level 3: We check we can get the address of `mp_` by parsing the memory
with mock_for_heuristic(mock_all=True, test_memory_parsing=True):
# Check the address of `mp_` is correct
assert pwndbg.heap.current.mp.address == mp_addr_via_debug_symbol
def test_global_max_fast_heuristic(start_binary):
# TODO: Support other architectures or different libc versions
start_binary(HEAP_MALLOC_CHUNK)
gdb.execute("set resolve-heap-via-heuristic force")
gdb.execute("break break_here")
gdb.execute("continue")
# Use the debug symbol to find the address of `global_max_fast`
global_max_fast_addr_via_debug_symbol = pwndbg.gdblib.symbol.static_linkage_symbol_address(
"global_max_fast"
) or pwndbg.gdblib.symbol.address("global_max_fast")
assert global_max_fast_addr_via_debug_symbol is not None
# Level 1: We check we can get the address of `global_max_fast` from debug symbols and the value of `global_max_fast` is correct
assert pwndbg.heap.current.global_max_fast is not None
# Check the address of `global_max_fast` is correct
assert pwndbg.heap.current._global_max_fast_addr == global_max_fast_addr_via_debug_symbol
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Level 2: We check we can get the address of `global_max_fast` by parsing the assembly code of `__libc_free`
# Mock the address of `global_max_fast` to None
with mock_for_heuristic(["global_max_fast"]):
# Use heuristic to find `global_max_fast`
assert pwndbg.heap.current.global_max_fast is not None
# Check the address of `global_max_fast` is correct
assert pwndbg.heap.current._global_max_fast_addr == global_max_fast_addr_via_debug_symbol
@pytest.mark.parametrize(
@@ -373,7 +287,7 @@ def test_thread_cache_heuristic(start_binary, is_multi_threaded):
) or pwndbg.gdblib.symbol.address("tcache")
thread_cache_addr_via_debug_symbol = pwndbg.gdblib.memory.u(tcache_addr_via_debug_symbol)
# Level 1: We check we can get the address of `thread_cache` from debug symbols and the struct of `thread_cache` is correct
# Check if we can get the address of `thread_cache` from debug symbols and the struct of `thread_cache` is correct
assert pwndbg.heap.current.thread_cache is not None
# Check the address of `thread_cache` is correct
assert pwndbg.heap.current.thread_cache.address == thread_cache_addr_via_debug_symbol
@@ -384,14 +298,15 @@ def test_thread_cache_heuristic(start_binary, is_multi_threaded):
)
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Level 2: We check we can get the address of `thread_cache` by parsing the assembly code of `__libc_malloc`
# TODO: Find a good way to check we successfully get the address of `thread_cache` by parsing the assembly code instead of using the first chunk of `thread_cache`
# Note: This is only useful when we can NOT find the heap boundaries and the arena is shared; it should not be a big problem in most cases
# Level 3: We check we can get the address of `thread_cache` by using the first chunk
# Note: This will NOT work when we can NOT find the heap boundaries or the arena is shared
with mock_for_heuristic(["tcache", "__libc_malloc"]):
# Check the address of `thread_cache` is correct
# Check if we can get the address of `tcache` by using the first chunk or by brute force
with mock_for_heuristic(["tcache"]):
# Check if we can find tcache by brute force
pwndbg.heap.current.prompt_for_brute_force_thread_cache_permission = lambda: True
assert pwndbg.heap.current.thread_cache.address == thread_cache_addr_via_debug_symbol
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Check if we can find tcache by using the first chunk
# Note: This will NOT work when we can NOT find the heap boundaries or the arena is shared
pwndbg.heap.current.prompt_for_brute_force_thread_cache_permission = lambda: False
assert pwndbg.heap.current.thread_cache.address == thread_cache_addr_via_debug_symbol
@@ -416,20 +331,47 @@ def test_thread_arena_heuristic(start_binary, is_multi_threaded):
thread_arena_via_debug_symbol = pwndbg.gdblib.memory.u(thread_arena_via_debug_symbol)
assert thread_arena_via_debug_symbol > 0
# Level 1: We check we can get the address of `thread_arena` from debug symbols and the value of `thread_arena` is correct
# Check if we can get the address of `thread_arena` from debug symbols and the value of `thread_arena` is correct
assert pwndbg.heap.current.thread_arena is not None
# Check the address of `thread_arena` is correct
assert pwndbg.heap.current.thread_arena.address == thread_arena_via_debug_symbol
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Level 2: We check we can get the address of `thread_arena` by parsing the assembly code of `__libc_calloc`
# Mock the address of `thread_arena` to None
# Check if we can use brute-force to find the `thread_arena` when multi-threaded, and if we can use the `main_arena` as the `thread_arena` when single-threaded
with mock_for_heuristic(["thread_arena"]):
assert pwndbg.gdblib.symbol.address("thread_arena") is None
# mock the prompt to avoid input
pwndbg.heap.current.prompt_for_brute_force_thread_arena_permission = lambda: True
assert pwndbg.heap.current.thread_arena is not None
# Check the value of `thread_arena` is correct
assert pwndbg.heap.current.thread_arena.address == thread_arena_via_debug_symbol
def test_global_max_fast_heuristic(start_binary):
# TODO: Support other architectures or different libc versions
start_binary(HEAP_MALLOC_CHUNK)
gdb.execute("set resolve-heap-via-heuristic force")
gdb.execute("break break_here")
gdb.execute("continue")
# Use the debug symbol to find the address of `global_max_fast`
global_max_fast_addr_via_debug_symbol = pwndbg.gdblib.symbol.static_linkage_symbol_address(
"global_max_fast"
) or pwndbg.gdblib.symbol.address("global_max_fast")
assert global_max_fast_addr_via_debug_symbol is not None
# Check if we can get the address of `global_max_fast` from debug symbols and the value of `global_max_fast` is correct
assert pwndbg.heap.current.global_max_fast is not None
# Check the address of `global_max_fast` is correct
assert pwndbg.heap.current._global_max_fast_addr == global_max_fast_addr_via_debug_symbol
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Check if we can return the default value even if we can NOT find the address of `global_max_fast`
with mock_for_heuristic(["global_max_fast"]):
assert pwndbg.heap.current.global_max_fast == pwndbg.gdblib.memory.u(
global_max_fast_addr_via_debug_symbol
)
@pytest.mark.parametrize(
"is_multi_threaded", [False, True], ids=["single-threaded", "multi-threaded"]
)
@@ -446,16 +388,15 @@ def test_heuristic_fail_gracefully(start_binary, is_multi_threaded):
def _test_heuristic_fail_gracefully(name):
try:
getattr(pwndbg.heap.current, name)
raise AssertionError(
"The heuristic for pwndbg.heap.current.%s should fail with SymbolUnresolvableError"
% name
)
except SymbolUnresolvableError as e:
# That's the only exception we expect
assert e.symbol # we should show what symbol we failed to resolve
# Mock all address and mess up the memory
with mock_for_heuristic(mock_all=True, mess_up_memory=True):
with mock_for_heuristic(mock_all=True):
# mock the prompt to avoid input
pwndbg.heap.current.prompt_for_brute_force_thread_arena_permission = lambda: False
pwndbg.heap.current.prompt_for_brute_force_thread_cache_permission = lambda: False
_test_heuristic_fail_gracefully("main_arena")
_test_heuristic_fail_gracefully("mp")
_test_heuristic_fail_gracefully("global_max_fast")
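The "fail gracefully" check above expects exactly one exception type and treats any other outcome as a failure. A minimal sketch of the same idea using `try`/`except`/`else` follows. The `SymbolUnresolvableError` class and `FakeHeap` here are simplified stand-ins written for this example, not pwndbg's real classes, and `fails_gracefully` is a hypothetical helper name.

```python
class SymbolUnresolvableError(Exception):
    """Stand-in exception; only the `symbol` attribute is modelled."""

    def __init__(self, symbol):
        super().__init__(f"cannot resolve {symbol}")
        self.symbol = symbol


class FakeHeap:
    """Stand-in heap object: one attribute fails, one resolves."""

    @property
    def main_arena(self):
        raise SymbolUnresolvableError("main_arena")

    @property
    def mp(self):
        return object()


def fails_gracefully(obj, name):
    """Return True only if reading `name` raises SymbolUnresolvableError
    and the exception reports which symbol could not be resolved."""
    try:
        getattr(obj, name)
    except SymbolUnresolvableError as e:
        return bool(e.symbol)
    else:
        # The attribute resolved, so the heuristic did not fail at all.
        return False
```

Any other exception type propagates out of `fails_gracefully`, which matches the intent of the test: only `SymbolUnresolvableError` counts as a graceful failure.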

@@ -0,0 +1,40 @@
import gdb
import pytest
import pwndbg.gdblib.tls
import pwndbg.gdblib.vmmap
import tests
TLS_X86_64_BINARY = tests.binaries.get("tls.x86-64.out")
TLS_I386_BINARY = tests.binaries.get("tls.i386.out")
# TODO: Support other architectures
@pytest.mark.parametrize("binary", [TLS_X86_64_BINARY, TLS_I386_BINARY], ids=["x86-64", "i386"])
def test_tls_address_and_command(start_binary, binary):
try:
start_binary(binary)
except gdb.error:
pytest.skip("This device does not support this test")
gdb.execute("break break_here")
gdb.execute("continue")
expected_tls_address = int(gdb.parse_and_eval("(void *)tls_address"))
assert pwndbg.gdblib.tls.find_address_with_register() == expected_tls_address
assert pwndbg.gdblib.tls.find_address_with_pthread_self() == expected_tls_address
assert (
gdb.execute("tls", to_string=True)
== f"""Thread Local Storage (TLS) base: {expected_tls_address:#x}
TLS is located at:
{pwndbg.gdblib.vmmap.find(expected_tls_address)}\n"""
)
assert (
gdb.execute("tls --pthread-self", to_string=True)
== f"""Thread Local Storage (TLS) base: {expected_tls_address:#x}
TLS is located at:
{pwndbg.gdblib.vmmap.find(expected_tls_address)}\n"""
)