diff --git a/DEVELOPING.md b/DEVELOPING.md index ca17ba88d..224ab54ec 100644 --- a/DEVELOPING.md +++ b/DEVELOPING.md @@ -108,3 +108,4 @@ Feel free to update the list below! * We would like to add proper tests for pwndbg - see tests framework PR if you want to help on that. +* If you want to use `gdb.parse_and_eval("a_function_name()")` or something similar that call a function, please remember this might cause another thread to continue execution without `set scheduler-locking on`. If you didn't expect that, you should use `parse_and_eval_with_scheduler_lock` from `pwndbg.gdblib.scheduler` instead. diff --git a/pwndbg/commands/misc.py b/pwndbg/commands/misc.py index 764af9cbc..f9aa51b60 100644 --- a/pwndbg/commands/misc.py +++ b/pwndbg/commands/misc.py @@ -10,6 +10,7 @@ import pwndbg.commands import pwndbg.gdblib.regs import pwndbg.gdblib.symbol from pwndbg.commands import CommandCategory +from pwndbg.gdblib.scheduler import parse_and_eval_with_scheduler_lock errno.errorcode[0] = "OK" # type: ignore # manually add error code 0 for "OK" @@ -44,7 +45,11 @@ def errno_(err) -> None: if errno_loc_gotplt is None or pwndbg.gdblib.vmmap.find( pwndbg.gdblib.memory.pvoid(errno_loc_gotplt) ): - err = int(gdb.parse_and_eval("*((int *(*) (void)) __errno_location) ()")) + err = int( + parse_and_eval_with_scheduler_lock( + "*((int *(*) (void)) __errno_location) ()" + ) + ) else: print( "Could not determine error code automatically: the __errno_location@got.plt has no valid address yet (perhaps libc.so hasn't been loaded yet?)" diff --git a/pwndbg/gdblib/hooks.py b/pwndbg/gdblib/hooks.py index 1c4ac4073..ab62cee0e 100644 --- a/pwndbg/gdblib/hooks.py +++ b/pwndbg/gdblib/hooks.py @@ -45,7 +45,6 @@ def on_start() -> None: @pwndbg.gdblib.events.exit def on_exit() -> None: - pwndbg.gdblib.tls.reset() pwndbg.gdblib.file.reset_remote_files() pwndbg.gdblib.next.clear_temp_breaks() diff --git a/pwndbg/gdblib/scheduler.py b/pwndbg/gdblib/scheduler.py new file mode 100644 index 000000000..eff46be9e --- /dev/null +++ b/pwndbg/gdblib/scheduler.py @@ -0,0 +1,30 @@ +from contextlib import contextmanager + +import gdb + + +@contextmanager +def lock_scheduler(): + """ + This context manager can be used to run GDB commands with threads scheduling + being locked which means that other threads will be stopped during execution. + + This is useful to prevent bugs where e.g.: gdb.parse_and_eval("(int)foo()") + would execute foo() on the current debugee thread but would also unlock other + threads for being executed and those other threads may for example hit a + breakpoint we set previously which would be confusing for the user. + + See also: https://sourceware.org/gdb/onlinedocs/gdb/All_002dStop-Mode.html + """ + old_config = gdb.parameter("scheduler-locking") + if old_config != "on": + gdb.execute("set scheduler-locking on") + yield + gdb.execute("set scheduler-locking %s" % old_config) + else: + yield + + +def parse_and_eval_with_scheduler_lock(expr: str) -> gdb.Value: + with lock_scheduler(): + return gdb.parse_and_eval(expr) diff --git a/pwndbg/gdblib/tls.py b/pwndbg/gdblib/tls.py index cca877886..734d6efa2 100644 --- a/pwndbg/gdblib/tls.py +++ b/pwndbg/gdblib/tls.py @@ -12,76 +12,42 @@ import pwndbg.gdblib.memory import pwndbg.gdblib.regs import pwndbg.gdblib.symbol import pwndbg.gdblib.vmmap +from pwndbg.gdblib.scheduler import parse_and_eval_with_scheduler_lock class module(ModuleType): """Getting Thread Local Storage (TLS) information.""" - _errno_offset = None + def is_thread_local_variable_offset(self, offset: int) -> bool: + """Check if the offset to TLS is a valid offset for the heap heuristics.""" + if pwndbg.gdblib.arch.current in ("x86-64", "i386"): + is_valid = 0 < -offset < 0x250 + else: # elif pwndbg.gdblib.arch.current in ("aarch64", "arm"): + is_valid = 0 < offset < 0x250 + # check alignment + return is_valid and offset % pwndbg.gdblib.arch.ptrsize == 0 - def get_tls_base_via_errno_location(self) -> int: - """Heuristically determine the base address of the TLS.""" - if pwndbg.gdblib.symbol.address( - "__errno_location" - ) is None or pwndbg.gdblib.arch.current not in ( - "x86-64", - "i386", - "arm", - ): - # Note: We doesn't implement this for aarch64 because its TPIDR_EL0 register seems always work - # If oneday we can't get TLS base via TPIDR_EL0, we should implement this for aarch64 - return 0 - already_lock = gdb.parameter("scheduler-locking") == "on" - old_config = gdb.parameter("scheduler-locking") - if not already_lock: - gdb.execute("set scheduler-locking on") - errno_addr = int(gdb.parse_and_eval("(int *)__errno_location()")) - if not already_lock: - gdb.execute("set scheduler-locking %s" % old_config) + def is_thread_local_variable(self, addr: int) -> bool: + """Check if the address is a valid thread local variable's address for the heap heuristics.""" + if not self.address: + # Since we can not get the TLS base address, we trust that the address is valid. + return True + return self.is_thread_local_variable_offset( + addr - self.address + ) and addr in pwndbg.gdblib.vmmap.find(self.address) - if not self._errno_offset: - __errno_location_instr = pwndbg.disasm.near( - pwndbg.gdblib.symbol.address("__errno_location"), 5, show_prev_insns=False - ) - if pwndbg.gdblib.arch.current == "x86-64": - for instr in __errno_location_instr: - # Find something like: mov rax, qword ptr [rip + disp] - if instr.mnemonic == "mov": - self._errno_offset = pwndbg.gdblib.memory.s64(instr.next + instr.disp) - break - elif pwndbg.gdblib.arch.current == "i386": - for instr in __errno_location_instr: - # Find something like: mov eax, dword ptr [eax + disp] - # (disp is a negative value) - if instr.mnemonic == "mov": - # base offset is from the first `add eax` after `call __x86.get_pc_thunk.bx` - base_offset_instr = next( - instr for instr in __errno_location_instr if instr.mnemonic == "add" - ) - base_offset = base_offset_instr.address + base_offset_instr.operands[1].int - self._errno_offset = pwndbg.gdblib.memory.s32(base_offset + instr.disp) - break - elif pwndbg.gdblib.arch.current == "arm": - ldr_instr = None - for instr in __errno_location_instr: - if not ldr_instr and instr.mnemonic == "ldr": - ldr_instr = instr - elif ldr_instr and instr.mnemonic == "add": - offset = ldr_instr.operands[1].mem.disp - offset = pwndbg.gdblib.memory.s32((ldr_instr.address + 4 & -4) + offset) - self._errno_offset = pwndbg.gdblib.memory.s32(instr.address + 4 + offset) - break - if not self._errno_offset: - raise OSError("Can not find tls base") - return errno_addr - self._errno_offset + def call_pthread_self(self) -> int: + """Get the address of TLS by calling pthread_self().""" + if pwndbg.gdblib.symbol.address("pthread_self") is None: + return 0 + try: + return int(parse_and_eval_with_scheduler_lock("(void *)pthread_self()")) + except gdb.error: + return 0 @property def address(self) -> int: """Get the base address of TLS.""" - if pwndbg.gdblib.arch.current not in ("x86-64", "i386", "aarch64", "arm"): - # Not supported yet - return 0 - tls_base = 0 if pwndbg.gdblib.arch.current == "x86-64": @@ -91,20 +57,12 @@ class module(ModuleType): elif pwndbg.gdblib.arch.current == "aarch64": tls_base = int(pwndbg.gdblib.regs.TPIDR_EL0) - # Sometimes, we need to get TLS base via errno location for the following reason: + # Sometimes, we need to get TLS base via pthread_self() for the following reason: # For x86-64, fsbase might be 0 if we are remotely debugging and the GDB version <= 8.X # For i386, gsbase might be 0 if we are remotely debugging - # For arm (32-bit), we doesn't have other choice + # For other archs, we can't get the TLS base address via register # Note: aarch64 seems doesn't have this issue - is_valid_tls_base = ( - pwndbg.gdblib.vmmap.find(tls_base) is not None - and tls_base % pwndbg.gdblib.arch.ptrsize == 0 - ) - return tls_base if is_valid_tls_base else self.get_tls_base_via_errno_location() - - def reset(self) -> None: - # We should reset the offset when we attach to a new process - self._errno_offset = None + return tls_base if tls_base else self.call_pthread_self() # To prevent garbage collection diff --git a/pwndbg/heap/ptmalloc.py b/pwndbg/heap/ptmalloc.py index 939d59d58..0866f399c 100644 --- a/pwndbg/heap/ptmalloc.py +++ b/pwndbg/heap/ptmalloc.py @@ -1136,7 +1136,7 @@ class DebugSymsHeap(GlibcMemoryAllocator): thread's tcache. """ if self.has_tcache(): - tcache = self.mp["sbrk_base"] + 0x10 + tcache = self.get_sbrk_heap_region().vaddr + 0x10 if self.multithreaded: tcache_addr = pwndbg.gdblib.memory.pvoid( pwndbg.gdblib.symbol.static_linkage_symbol_address("tcache") @@ -1440,27 +1440,12 @@ class HeuristicHeap(GlibcMemoryAllocator): if thread_arena_via_config > 0: return Arena(thread_arena_via_config) elif thread_arena_via_symbol: - if pwndbg.gdblib.symbol.static_linkage_symbol_address("thread_arena"): - # If the symbol is static-linkage symbol, we trust it. - return Arena(pwndbg.gdblib.memory.u(thread_arena_via_symbol)) # Check &thread_arena is nearby TLS base or not to avoid false positive. - tls_base = pwndbg.gdblib.tls.address - if tls_base: - if pwndbg.gdblib.arch.current in ("x86-64", "i386"): - is_valid_address = 0 < tls_base - thread_arena_via_symbol < 0x250 - else: # elif pwndbg.gdblib.arch.current in ("aarch64", "arm"): - is_valid_address = 0 < thread_arena_via_symbol - tls_base < 0x250 - - is_valid_address = ( - is_valid_address - and thread_arena_via_symbol in pwndbg.gdblib.vmmap.find(tls_base) - ) - - if is_valid_address: - thread_arena_struct_addr = pwndbg.gdblib.memory.u(thread_arena_via_symbol) - # Check &thread_arena is a valid address or not to avoid false positive. - if pwndbg.gdblib.vmmap.find(thread_arena_struct_addr): - return Arena(thread_arena_struct_addr) + if pwndbg.gdblib.tls.is_thread_local_variable(thread_arena_via_symbol): + thread_arena_struct_addr = pwndbg.gdblib.memory.u(thread_arena_via_symbol) + # Check &thread_arena is a valid address or not to avoid false positive. + if pwndbg.gdblib.vmmap.find(thread_arena_struct_addr): + return Arena(thread_arena_struct_addr) if not self._thread_arena_offset and pwndbg.gdblib.symbol.address("__libc_calloc"): # TODO/FIXME: This method should be updated if we find a better way to find the target assembly code @@ -1554,7 +1539,6 @@ class HeuristicHeap(GlibcMemoryAllocator): base_offset + offset ) break - elif pwndbg.gdblib.arch.current == "arm": # We need to find something near the first `mrc 15, ......` # The flow of assembly code will like: @@ -1584,7 +1568,9 @@ class HeuristicHeap(GlibcMemoryAllocator): ) break - if self._thread_arena_offset: + if self._thread_arena_offset and pwndbg.gdblib.tls.is_thread_local_variable_offset( + self._thread_arena_offset + ): tls_base = pwndbg.gdblib.tls.address if tls_base: thread_arena_struct_addr = tls_base + self._thread_arena_offset @@ -1600,6 +1586,9 @@ class HeuristicHeap(GlibcMemoryAllocator): """Locate a thread's tcache struct. We try to find its address in Thread Local Storage (TLS) first, and if that fails, we guess it's at the first chunk of the heap. """ + if not self.has_tcache(): + print(message.warn("This version of GLIBC was not compiled with tcache support.")) + return None thread_cache_via_config = int(str(pwndbg.gdblib.config.tcache), 0) thread_cache_via_symbol = pwndbg.gdblib.symbol.static_linkage_symbol_address( "tcache" @@ -1608,218 +1597,189 @@ class HeuristicHeap(GlibcMemoryAllocator): self._thread_cache = self.tcache_perthread_struct(thread_cache_via_config) return self._thread_cache elif thread_cache_via_symbol: - if pwndbg.gdblib.symbol.static_linkage_symbol_address("tcache"): - # If the symbol is static-linkage symbol, we trust it. - thread_cache_struct_addr = pwndbg.gdblib.memory.u(thread_cache_via_symbol) - self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr) - return self._thread_cache # Check &tcache is nearby TLS base or not to avoid false positive. - tls_base = pwndbg.gdblib.tls.address - if tls_base: - if pwndbg.gdblib.arch.current in ("x86-64", "i386"): - is_valid_address = 0 < tls_base - thread_cache_via_symbol < 0x250 - else: # elif pwndbg.gdblib.arch.current in ("aarch64", "arm"): - is_valid_address = 0 < thread_cache_via_symbol - tls_base < 0x250 - - is_valid_address = ( - is_valid_address - and thread_cache_via_symbol in pwndbg.gdblib.vmmap.find(tls_base) - ) - - if is_valid_address: - thread_cache_struct_addr = pwndbg.gdblib.memory.u(thread_cache_via_symbol) + if pwndbg.gdblib.tls.is_thread_local_variable(thread_cache_via_symbol): + thread_cache_struct_addr = pwndbg.gdblib.memory.u(thread_cache_via_symbol) + if pwndbg.gdblib.vmmap.find(thread_cache_struct_addr): self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr) return self._thread_cache - if self.has_tcache(): - # Each thread has a tcache struct, and the address of the tcache struct is stored in the TLS. + # If target is single-threaded, then the tcache struct is at the first chunk of the heap. + # We try to find the address by using mp_.srck_base + 0x10 first since it's more reliable than other methods. + if not self.multithreaded: + try: + thread_cache_struct_addr = self.get_sbrk_heap_region().vaddr + 0x10 + if pwndbg.gdblib.vmmap.find(thread_cache_struct_addr): + self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr) + return self._thread_cache + except SymbolUnresolvableError: + # mp_ is not available + pass - # Try to find tcache in TLS, so first we need to find the offset of tcache to TLS base - if not self._thread_cache_offset and pwndbg.gdblib.symbol.address("__libc_malloc"): - # TODO/FIXME: This method should be updated if we find a better way to find the target assembly code - __libc_malloc_instruction = pwndbg.disasm.near( - pwndbg.gdblib.symbol.address("__libc_malloc"), 100, show_prev_insns=False - )[10:] - # Try to find the reference to tcache in __libc_malloc, the target C code is like this: - # `if (tc_idx < mp_.tcache_bins && tcache && ......` - if pwndbg.gdblib.arch.current == "x86-64": - # Find the last `mov reg1, qword ptr [rip + disp]` before the first `mov reg2, fs:[reg1]` - # In other words, find the first __thread variable + # Each thread has a tcache struct, and the address of the tcache struct is stored in the TLS. + # Try to find tcache in TLS, so first we need to find the offset of tcache to TLS base + if not self._thread_cache_offset and pwndbg.gdblib.symbol.address("__libc_malloc"): + # TODO/FIXME: This method should be updated if we find a better way to find the target assembly code + __libc_malloc_instruction = pwndbg.disasm.near( + pwndbg.gdblib.symbol.address("__libc_malloc"), 100, show_prev_insns=False + )[10:] + # Try to find the reference to tcache in __libc_malloc, the target C code is like this: + # `if (tc_idx < mp_.tcache_bins && tcache && ......` + if pwndbg.gdblib.arch.current == "x86-64": + # Find the last `mov reg1, qword ptr [rip + disp]` before the first `mov reg2, fs:[reg1]` + # In other words, find the first __thread variable - get_offset_instruction = None + get_offset_instruction = None - for instr in __libc_malloc_instruction: - if ", qword ptr [rip +" in instr.op_str: - get_offset_instruction = instr - if ", qword ptr fs:[r" in instr.op_str: - break + for instr in __libc_malloc_instruction: + if ", qword ptr [rip +" in instr.op_str: + get_offset_instruction = instr + if ", qword ptr fs:[r" in instr.op_str: + break - if get_offset_instruction: - # rip + disp - self._thread_cache_offset = pwndbg.gdblib.memory.s64( - get_offset_instruction.next + get_offset_instruction.disp - ) - elif pwndbg.gdblib.arch.current == "i386" and self.possible_page_of_symbols: - # We still need to find the first __thread variable like we did for x86-64 But the assembly code - # of i386 is a little bit unstable sometimes(idk why), there are two versions of the code: - # 1. Find the last `mov reg1, dword ptr [reg0 + disp]` before the first `mov reg2, gs:[reg1]`(disp - # is a negative value) - # 2. Find the first `mov reg1, dword ptr [reg0 + disp]` after `mov reg3, - # [reg1 + reg2]` (disp is a negative value), and reg2 is from `mov reg2, gs:[0]` + if get_offset_instruction: + # rip + disp + self._thread_cache_offset = pwndbg.gdblib.memory.s64( + get_offset_instruction.next + get_offset_instruction.disp + ) + elif pwndbg.gdblib.arch.current == "i386" and self.possible_page_of_symbols: + # We still need to find the first __thread variable like we did for x86-64 But the assembly code + # of i386 is a little bit unstable sometimes(idk why), there are two versions of the code: + # 1. Find the last `mov reg1, dword ptr [reg0 + disp]` before the first `mov reg2, gs:[reg1]`(disp + # is a negative value) + # 2. Find the first `mov reg1, dword ptr [reg0 + disp]` after `mov reg3, + # [reg1 + reg2]` (disp is a negative value), and reg2 is from `mov reg2, gs:[0]` - get_offset_instruction = None - find_after = False + get_offset_instruction = None + find_after = False - for instr in __libc_malloc_instruction: - if ( - instr.disp < 0 - and instr.mnemonic == "mov" - and ", dword ptr [e" in instr.op_str - ): - get_offset_instruction = instr - if find_after: - break - if ", dword ptr gs:[e" in instr.op_str: + for instr in __libc_malloc_instruction: + if ( + instr.disp < 0 + and instr.mnemonic == "mov" + and ", dword ptr [e" in instr.op_str + ): + get_offset_instruction = instr + if find_after: break - elif instr.op_str.endswith("gs:[0]") and instr.mnemonic == "mov": - find_after = True - - if get_offset_instruction: - # reg + disp (value of reg is the page start of the last libc page) - base_offset = self.possible_page_of_symbols.vaddr - self._thread_cache_offset = pwndbg.gdblib.memory.s32( - base_offset + get_offset_instruction.disp - ) - elif pwndbg.gdblib.arch.current == "aarch64": - # The logic is the same as the previous one.. - # The assembly code to access tcache is sth like: - # `mrs reg1, tpidr_el0; - # adrp reg2, #base_offset; - # ldr reg2, [reg2, #offset] - # ... - # add reg3, reg1, reg2; - # ldr reg3, [reg3, #8]` - # Or: - # `adrp reg2, #base_offset; - # mrs reg1, tpidr_el0; - # ldr reg2, [reg2, #offset] - # ... - # add reg3, reg1, reg2; - # ldr reg3, [reg3, #8]` - # , then reg3 will be &tcache - mrs_instr = next( - instr for instr in __libc_malloc_instruction if instr.mnemonic == "mrs" + if ", dword ptr gs:[e" in instr.op_str: + break + elif instr.op_str.endswith("gs:[0]") and instr.mnemonic == "mov": + find_after = True + + if get_offset_instruction: + # reg + disp (value of reg is the page start of the last libc page) + base_offset = self.possible_page_of_symbols.vaddr + self._thread_cache_offset = pwndbg.gdblib.memory.s32( + base_offset + get_offset_instruction.disp ) - min_adrp_distance = 0x1000 # just a big enough number - nearest_adrp = None - nearest_adrp_idx = 0 - for i, instr in enumerate(__libc_malloc_instruction): - if ( - instr.mnemonic == "adrp" - and abs(mrs_instr.address - instr.address) < min_adrp_distance - ): - reg = instr.operands[0].str - nearest_adrp = instr - nearest_adrp_idx = i - min_adrp_distance = abs(mrs_instr.address - instr.address) - if instr.address - mrs_instr.address > min_adrp_distance: - break - for instr in __libc_malloc_instruction[nearest_adrp_idx + 1 :]: - if instr.mnemonic == "ldr": - base_offset = nearest_adrp.operands[1].int - offset = instr.operands[1].mem.disp + elif pwndbg.gdblib.arch.current == "aarch64": + # The logic is the same as the previous one.. + # The assembly code to access tcache is sth like: + # `mrs reg1, tpidr_el0; + # adrp reg2, #base_offset; + # ldr reg2, [reg2, #offset] + # ... + # add reg3, reg1, reg2; + # ldr reg3, [reg3, #8]` + # Or: + # `adrp reg2, #base_offset; + # mrs reg1, tpidr_el0; + # ldr reg2, [reg2, #offset] + # ... + # add reg3, reg1, reg2; + # ldr reg3, [reg3, #8]` + # , then reg3 will be &tcache + mrs_instr = next( + instr for instr in __libc_malloc_instruction if instr.mnemonic == "mrs" + ) + min_adrp_distance = 0x1000 # just a big enough number + nearest_adrp = None + nearest_adrp_idx = 0 + for i, instr in enumerate(__libc_malloc_instruction): + if ( + instr.mnemonic == "adrp" + and abs(mrs_instr.address - instr.address) < min_adrp_distance + ): + reg = instr.operands[0].str + nearest_adrp = instr + nearest_adrp_idx = i + min_adrp_distance = abs(mrs_instr.address - instr.address) + if instr.address - mrs_instr.address > min_adrp_distance: + break + for instr in __libc_malloc_instruction[nearest_adrp_idx + 1 :]: + if instr.mnemonic == "ldr": + base_offset = nearest_adrp.operands[1].int + offset = instr.operands[1].mem.disp + self._thread_cache_offset = ( + pwndbg.gdblib.memory.s64(base_offset + offset) + 8 + ) + break + elif pwndbg.gdblib.arch.current == "arm": + # We need to find something near the first `mrc 15, ......` + # The flow of assembly code will like: + # `ldr reg1, [pc, #offset]; + # ... + # mrc 15, 0, reg2, cr13, cr0, {3}; + # ... + # add reg1, pc; + # ldr reg1, [reg1]; + # ... + # add reg1, reg2 + # ... + # ldr reg3, [reg1, #4]` + # , then reg3 will be tcache address + found_mrc = False + ldr_instr = None + for instr in __libc_malloc_instruction: + if not found_mrc: + if instr.mnemonic == "mrc": + found_mrc = True + elif instr.mnemonic == "ldr": + ldr_instr = instr + else: + reg = ldr_instr.operands[0].str + if instr.mnemonic == "add" and instr.op_str == reg + ", pc": + offset = ldr_instr.operands[1].mem.disp + offset = pwndbg.gdblib.memory.s32((ldr_instr.address + 4 & -4) + offset) self._thread_cache_offset = ( - pwndbg.gdblib.memory.s64(base_offset + offset) + 8 + pwndbg.gdblib.memory.s32(instr.address + 4 + offset) + 4 ) break - elif pwndbg.gdblib.arch.current == "arm": - # We need to find something near the first `mrc 15, ......` - # The flow of assembly code will like: - # `ldr reg1, [pc, #offset]; - # ... - # mrc 15, 0, reg2, cr13, cr0, {3}; - # ... - # add reg1, pc; - # ldr reg1, [reg1]; - # ... - # add reg1, reg2 - # ... - # ldr reg3, [reg1, #4]` - # , then reg3 will be tcache address - found_mrc = False - ldr_instr = None - for instr in __libc_malloc_instruction: - if not found_mrc: - if instr.mnemonic == "mrc": - found_mrc = True - elif instr.mnemonic == "ldr": - ldr_instr = instr - else: - reg = ldr_instr.operands[0].str - if instr.mnemonic == "add" and instr.op_str == reg + ", pc": - offset = ldr_instr.operands[1].mem.disp - offset = pwndbg.gdblib.memory.s32( - (ldr_instr.address + 4 & -4) + offset - ) - self._thread_cache_offset = ( - pwndbg.gdblib.memory.s32(instr.address + 4 + offset) + 4 - ) - break - # Validate the the offset we found - is_offset_valid = False - - if pwndbg.gdblib.arch.current in ("x86-64", "i386"): - # The offset to tls should be a negative integer for x86/x64, but it can't be too small - # If it is too small, we find a wrong value - is_offset_valid = ( - self._thread_cache_offset and -0x250 < self._thread_cache_offset < 0 - ) - elif pwndbg.gdblib.arch.current in ("aarch64", "arm"): - # The offset to tls should be a positive integer for aarch64, but it can't be too big - # If it is too big, we find a wrong value - is_offset_valid = ( - self._thread_cache_offset and 0 < self._thread_cache_offset < 0x250 + # If the offset is valid, we add the offset to TLS base to locate the tcache struct + # Note: We do a lot of checks here to make sure the offset and address we found is valid, + # so we can use our fallback if they're invalid + if self._thread_cache_offset and pwndbg.gdblib.tls.is_thread_local_variable_offset( + self._thread_cache_offset + ): + tls_base = pwndbg.gdblib.tls.address + if tls_base: + thread_cache_struct_addr = pwndbg.gdblib.memory.pvoid( + tls_base + self._thread_cache_offset ) + if pwndbg.gdblib.vmmap.find(thread_cache_struct_addr): + self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr) + return self._thread_cache - is_offset_valid = ( - is_offset_valid and self._thread_cache_offset % pwndbg.gdblib.arch.ptrsize == 0 - ) - - # If the offset is valid, we add the offset to TLS base to locate the tcache struct - # Note: We do a lot of checks here to make sure the offset and address we found is valid, - # so we can use our fallback if they're invalid - if is_offset_valid: - tls_base = pwndbg.gdblib.tls.address - if tls_base: - thread_cache_struct_addr = pwndbg.gdblib.memory.pvoid( - tls_base + self._thread_cache_offset - ) - if pwndbg.gdblib.vmmap.find(thread_cache_struct_addr): - self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr) - return self._thread_cache - - # If we still can't find the tcache, we guess tcache is in the first chunk of the heap - # Note: The result might be wrong if the arena is being shared by multiple threads - # And that's why we need to find the tcache address in TLS first - arena = self.thread_arena - ptr_size = pwndbg.gdblib.arch.ptrsize - - cursor = arena.active_heap.start + # If we still can't find the tcache, we guess tcache is in the first chunk of the heap + # Note: The result might be wrong if the arena is being shared by multiple threads + # And that's why we need to find the tcache address in TLS first + arena = self.thread_arena + ptr_size = pwndbg.gdblib.arch.ptrsize - # i686 alignment heuristic - first_chunk_size = pwndbg.gdblib.arch.unpack( - pwndbg.gdblib.memory.read(cursor + ptr_size, ptr_size) - ) - if first_chunk_size == 0: - cursor += ptr_size * 2 + cursor = arena.active_heap.start - self._thread_cache = self.tcache_perthread_struct(cursor + ptr_size * 2) + # i686 alignment heuristic + first_chunk_size = pwndbg.gdblib.arch.unpack( + pwndbg.gdblib.memory.read(cursor + ptr_size, ptr_size) + ) + if first_chunk_size == 0: + cursor += ptr_size * 2 - return self._thread_cache + self._thread_cache = self.tcache_perthread_struct(cursor + ptr_size * 2) - print(message.warn("This version of GLIBC was not compiled with tcache support.")) - return None + return self._thread_cache @property def mp(self): diff --git a/tests/gdb-tests/tests/heap/test_heap.py b/tests/gdb-tests/tests/heap/test_heap.py index e777909dd..cb25da8cc 100644 --- a/tests/gdb-tests/tests/heap/test_heap.py +++ b/tests/gdb-tests/tests/heap/test_heap.py @@ -1,4 +1,5 @@ import gdb +import pytest import pwndbg import pwndbg.gdblib.arch @@ -260,9 +261,13 @@ def test_main_arena_heuristic(start_binary): pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg # Level 3: We check we can get the address of `main_arena` by parsing the memory - with mock_for_heuristic(mock_all=True): - # Check the address of `main_arena` is correct - assert pwndbg.heap.current.main_arena.address == main_arena_addr_via_debug_symbol + for _ in range(2): + with mock_for_heuristic(mock_all=True): + # Check the address of `main_arena` is correct + assert pwndbg.heap.current.main_arena.address == main_arena_addr_via_debug_symbol + # Check if it works when there's more than one arena + gdb.execute("continue") + assert gdb.selected_thread().num == 2 def test_mp_heuristic(start_binary): @@ -328,12 +333,18 @@ def test_global_max_fast_heuristic(start_binary): assert pwndbg.heap.current._global_max_fast_addr == global_max_fast_addr_via_debug_symbol -def test_thread_cache_heuristic(start_binary): +@pytest.mark.parametrize( + "is_multi_threaded", [False, True], ids=["single-threaded", "multi-threaded"] +) +def test_thread_cache_heuristic(start_binary, is_multi_threaded): # TODO: Support other architectures or different libc versions start_binary(HEAP_MALLOC_CHUNK) gdb.execute("set resolve-heap-via-heuristic force") gdb.execute("break break_here") gdb.execute("continue") + if is_multi_threaded: + gdb.execute("continue") + assert gdb.selected_thread().num == 2 # Use the debug symbol to find the address of `thread_cache` tcache_addr_via_debug_symbol = pwndbg.gdblib.symbol.static_linkage_symbol_address( @@ -363,12 +374,18 @@ def test_thread_cache_heuristic(start_binary): assert pwndbg.heap.current.thread_cache.address == thread_cache_addr_via_debug_symbol -def test_thread_arena_heuristic(start_binary): +@pytest.mark.parametrize( + "is_multi_threaded", [False, True], ids=["single-threaded", "multi-threaded"] +) +def test_thread_arena_heuristic(start_binary, is_multi_threaded): # TODO: Support other architectures or different libc versions start_binary(HEAP_MALLOC_CHUNK) gdb.execute("set resolve-heap-via-heuristic force") gdb.execute("break break_here") gdb.execute("continue") + if is_multi_threaded: + gdb.execute("continue") + assert gdb.selected_thread().num == 2 # Use the debug symbol to find the value of `thread_arena` thread_arena_via_debug_symbol = pwndbg.gdblib.symbol.static_linkage_symbol_address( @@ -392,12 +409,18 @@ def test_thread_arena_heuristic(start_binary): assert pwndbg.heap.current.thread_arena.address == thread_arena_via_debug_symbol -def test_heuristic_fail_gracefully(start_binary): +@pytest.mark.parametrize( + "is_multi_threaded", [False, True], ids=["single-threaded", "multi-threaded"] +) +def test_heuristic_fail_gracefully(start_binary, is_multi_threaded): # TODO: Support other architectures or different libc versions start_binary(HEAP_MALLOC_CHUNK) gdb.execute("set resolve-heap-via-heuristic force") gdb.execute("break break_here") gdb.execute("continue") + if is_multi_threaded: + gdb.execute("continue") + assert gdb.selected_thread().num == 2 def _test_heuristic_fail_gracefully(name): try: