Improve unsorted,small,large bins corruption check (#2289)

* Improve bin corruption checks

* Update pwndbg/heap/ptmalloc.py

* factor out and clean up bin corruption check

* check chunks even if bin is longer than limit

* add empty bin check

* lint.sh, remove testing if

* dont modify chain in check, allow corruption=0, cleanup

* typing, more reliable empty bin check

* cast params to int, otherwise not detected properly

* add regression test for corruption check

* lint.sh

---------

Co-authored-by: Gulshan Singh <gsingh2011@gmail.com>
Co-authored-by: Disconnect3d <dominik.b.czarnota@gmail.com>
pull/2315/head
k4lizen 1 year ago committed by GitHub
parent 77fcf7a9e5
commit 0dfcf7c0fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -52,6 +52,12 @@ heap_chain_limit = add_heap_param(
"heap-dereference-limit", 8, "number of chunks to dereference in each bin"
)
heap_corruption_check_limit = add_heap_param(
"heap-corruption-check-limit",
64,
"amount of chunks to traverse (forwards and backwards) for the bin corruption check",
)
resolve_heap_via_heuristic = add_heap_param(
"resolve-heap-via-heuristic",
"auto",

@ -1254,6 +1254,67 @@ class GlibcMemoryAllocator(pwndbg.gdblib.heap.heap.MemoryAllocator, Generic[TheT
result.bins[size] = Bin(chain, count=count)
return result
def check_chain_corrupted(self, chain_fd: List[int], chain_bk: List[int]) -> bool:
"""
Checks if the doubly linked list (of a {unsorted, small, large} bin)
defined by chain_fd, chain_bk is corrupted.
Even if the chains do not cover the whole bin, they still are expected
to be of the same length.
Returns True if the bin is certainly corrupted, otherwise False.
"""
if len(chain_fd) != len(chain_bk):
# If the chain lengths aren't equal, the chain is corrupted
# The vast majority of corruptions will be caught here
return True
elif len(chain_fd) < 2 or len(chain_bk) < 2:
# Chains containing less than two entries are corrupted, as the smallest
# chain (an empty bin) would look something like `[main_arena+88, 0]`.
return True
elif len(chain_fd) == len(chain_bk) == 2:
# Check if the bin points to itself (is empty)
if chain_fd != chain_bk:
return True
elif chain_fd[-1] != 0:
return True
else:
bin_chk = Chunk(chain_fd[0])
if not (bin_chk.fd == bin_chk.bk == chain_fd[0]):
return True
else:
chain_sz = len(chain_fd) - (1 if chain_fd[-1] == 0 else 0)
# Forward and backward chains may have some overlap, we don't need to recheck those chunks
checked = set()
# Check connections in all chunks from the forward chain
for i in range(chain_sz):
chunk_addr = chain_fd[i]
chunk = Chunk(chunk_addr)
if chunk.fd is None or Chunk(chunk.fd).bk != chunk_addr:
return True
if chunk.bk is None or Chunk(chunk.bk).fd != chunk_addr:
return True
checked.add(chunk_addr)
# Check connections in unchecked chunks from the backward chain
for i in range(chain_sz):
chunk_addr = chain_bk[i]
if chunk_addr in checked:
# We don't need to check any more chunks
break
chunk = Chunk(chunk_addr)
if chunk.fd is None or Chunk(chunk.fd).bk != chunk_addr:
return True
if chunk.bk is None or Chunk(chunk.bk).fd != chunk_addr:
return True
return False
def bin_at(
self, index: int, arena_addr: int | None = None
) -> Tuple[List[int], List[int], bool] | None:
@ -1284,29 +1345,36 @@ class GlibcMemoryAllocator(pwndbg.gdblib.heap.heap.MemoryAllocator, Generic[TheT
bins_base = int(normal_bins.address) - (pwndbg.gdblib.arch.ptrsize * 2)
current_base = bins_base + (index * pwndbg.gdblib.arch.ptrsize * 2)
# check whether the bin is empty
bin_chunk = Chunk(current_base)
if bin_chunk.fd == bin_chunk.bk == current_base:
return ([0], [0], False)
front, back = normal_bins[index * 2], normal_bins[index * 2 + 1]
fd_offset = self.chunk_key_offset("fd")
bk_offset = self.chunk_key_offset("bk")
is_chain_corrupted = False
chain_size = int(pwndbg.gdblib.heap.heap_chain_limit)
corrupt_chain_size = int(pwndbg.gdblib.heap.heap_corruption_check_limit)
get_chain = lambda bin, offset: pwndbg.chain.get(
int(bin),
offset=offset,
hard_stop=current_base,
limit=pwndbg.gdblib.heap.heap_chain_limit,
limit=max(chain_size, corrupt_chain_size),
include_start=True,
)
chain_fd = get_chain(front, fd_offset)
chain_bk = get_chain(back, bk_offset)
# check if bin[index] points to itself (is empty)
if len(chain_fd) == len(chain_bk) == 2 and chain_fd[0] == chain_bk[0]:
chain_fd = [0]
chain_bk = [0]
full_chain_fd = get_chain(front, fd_offset)
full_chain_bk = get_chain(back, bk_offset)
chain_fd = full_chain_fd[: (chain_size + 1)]
chain_bk = full_chain_bk[: (chain_size + 1)]
corrupt_chain_fd = full_chain_fd[: (corrupt_chain_size + 1)]
corrupt_chain_bk = full_chain_bk[: (corrupt_chain_size + 1)]
# check if corrupted
elif chain_fd[:-1] != chain_bk[:-2][::-1] + [chain_bk[-2]]:
is_chain_corrupted = True
is_chain_corrupted = False
if corrupt_chain_size > 0:
is_chain_corrupted = self.check_chain_corrupted(corrupt_chain_fd, corrupt_chain_bk)
return (chain_fd, chain_bk, is_chain_corrupted)

@ -486,3 +486,27 @@ def test_smallbins_sizes_32bit_big(start_binary):
for bin_index, bin_size in enumerate(command_output):
assert bin_size.split(":")[0] == expected[bin_index]
def test_heap_corruption_low_dereference(start_binary):
"""
Tests that the bins corruption check doesn't report
corrupted bins when heap-dereference-limit is less
than the number of chunks in a bin.
"""
start_binary(BINARY)
gdb.execute("set context-output /dev/null")
gdb.execute("b breakpoint", to_string=True)
gdb.execute("continue")
gdb.execute("continue")
gdb.execute("continue")
gdb.execute("continue")
# unsorted bin now has 3 chunks
gdb.execute("set heap-dereference-limit 1")
bins_output = gdb.execute("bins", to_string=True)
assert "corrupted" not in bins_output

Loading…
Cancel
Save