* start try_free_parser

* try_free_parser done, tests written

* add try_free to FEATURES.md

* rename current_heap to allocator
pull/747/head
Paweł Płatek 6 years ago committed by GitHub
parent 42815836cc
commit e3b910c5d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -155,7 +155,8 @@ Pwndbg enables introspection of the glibc allocator, ptmalloc2, via a handful of
![](caps/heap_heap2.png)
![](caps/heap_mallocchunk.png)
![](caps/heap_topchunk.png)
![](caps/fake_fast.png)
![](caps/heap_fake_fast.png)
![](caps/heap_try_free.png)
## IDA Pro Integration

Before

Width:  |  Height:  |  Size: 211 KiB

After

Width:  |  Height:  |  Size: 211 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

@ -0,0 +1,13 @@
## Command: try_free ##
```
usage: try_free [-h] [addr]
```
Check what would happen if free was called with given address
| Positional Argument | Info |
|---------------------|------|
| addr | Address passed to free |
| Optional Argument | Info |
|---------------------|------|
| --help | show this help message and exit |

@ -6,6 +6,7 @@ from __future__ import print_function
from __future__ import unicode_literals
import argparse
import ctypes
import struct
import gdb
@ -123,6 +124,11 @@ def heap(addr=None, verbose=False):
# in GLIBC versions >= 2.27.
cursor += (allocator.malloc_state.sizeof + ptr_size) & ~allocator.malloc_align_mask
# i686 alignment heuristic
first_chunk_size = pwndbg.arch.unpack(pwndbg.memory.read(cursor + ptr_size, ptr_size))
if first_chunk_size == 0:
cursor += ptr_size * 2
while cursor in heap_region:
old_cursor = cursor
size_field = pwndbg.memory.u(cursor + allocator.chunk_key_offset('size'))
@ -285,6 +291,9 @@ def malloc_chunk(addr,fake=False):
arena = allocator.get_heap(addr)['ar_ptr']
fastbins = [] if fake else allocator.fastbins(arena)
if not fastbins:
fastbins = []
header = M.get(addr)
if fake:
header += message.prompt(' FAKE')
@ -648,3 +657,326 @@ def bin_addrs(b, bins_type):
else: # normal bin
addrs, _, _ = b
return addrs
try_free_parser = argparse.ArgumentParser(description='Check what would happen if free was called with given address')
try_free_parser.add_argument('addr', nargs='?', help='Address passed to free')
@pwndbg.commands.ArgparsedCommand(try_free_parser)
@pwndbg.commands.OnlyWhenRunning
@pwndbg.commands.OnlyWhenHeapIsInitialized
def try_free(addr):
addr = int(addr)
# check hook
free_hook = pwndbg.symbol.address('__free_hook')
if free_hook is not None:
if pwndbg.memory.pvoid(free_hook) != 0:
message.success('__libc_free: will execute __free_hook')
# free(0) has no effect
if addr == 0:
message.success('__libc_free: addr is 0, nothing to do')
return
# constants
allocator = pwndbg.heap.current
arena = allocator.get_arena()
aligned_lsb = allocator.malloc_align_mask.bit_length()
size_sz = allocator.size_sz
malloc_alignment = allocator.malloc_alignment
malloc_align_mask = allocator.malloc_align_mask
chunk_minsize = allocator.minsize
ptr_size = pwndbg.arch.ptrsize
def unsigned_size(size):
# read_chunk()['size'] is signed in pwndbg ;/
# there may be better way to handle that
if ptr_size < 8:
return ctypes.c_uint32(size).value
x = ctypes.c_uint64(size).value
return x
def chunksize(chunk_size):
# maybe move this to ptmalloc.py
return chunk_size & (~7)
def finalize(errors_found, returned_before_error):
print('-'*10)
if returned_before_error:
print(message.success('Free should succeed!'))
elif errors_found > 0:
print(message.error('Errors found!'))
else:
print(message.success('All checks passed!'))
# mem2chunk
addr -= 2 * size_sz
# try to get the chunk
try:
chunk = read_chunk(addr)
except gdb.MemoryError as e:
print(message.error('Can\'t read chunk at address 0x{:x}, memory error'.format(addr)))
return
chunk_size = unsigned_size(chunk['size'])
chunk_size_unmasked = chunksize(chunk_size)
_, is_mmapped, _ = allocator.chunk_flags(chunk_size)
if is_mmapped:
print(message.notice('__libc_free: Doing munmap_chunk'))
return
errors_found = False
returned_before_error = False
# chunk doesn't overlap memory
print(message.notice('General checks'))
max_mem = (1 << (ptr_size*8)) - 1
if addr + chunk_size >= max_mem:
err = 'free(): invalid pointer -> &chunk + chunk->size > max memory\n'
err += ' 0x{:x} + 0x{:x} > 0x{:x}'
err = err.format(addr, chunk_size, max_mem)
print(message.error(err))
errors_found += 1
# chunk address is aligned
addr_tmp = addr
if malloc_alignment != 2 * size_sz:
addr_tmp = addr + 2 * size_sz
if addr_tmp & malloc_align_mask != 0:
err = 'free(): invalid pointer -> misaligned chunk\n'
err += ' LSB of 0x{:x} are 0b{}, should be 0b{}'
if addr_tmp != addr:
err += ' (0x{:x} was added to the address)'.format(2*size_sz)
err = err.format(addr_tmp, bin(addr_tmp)[-aligned_lsb:], '0'*aligned_lsb)
print(message.error(err))
errors_found += 1
# chunk's size is big enough
if chunk_size_unmasked < chunk_minsize:
err = 'free(): invalid size -> chunk\'s size smaller than MINSIZE\n'
err += ' size is 0x{:x}, MINSIZE is 0x{:x}'
err = err.format(chunk_size_unmasked, chunk_minsize)
print(message.error(err))
errors_found += 1
# chunk's size is aligned
if chunk_size_unmasked & malloc_align_mask != 0:
err = 'free(): invalid size -> chunk\'s size is not aligned\n'
err += ' LSB of size 0x{:x} are 0b{}, should be 0b{}'
err = err.format(chunk_size_unmasked, bin(chunk_size_unmasked)[-aligned_lsb:], '0'*aligned_lsb)
print(message.error(err))
errors_found += 1
# tcache
if allocator.has_tcache() and 'key' in allocator.tcache_entry.keys():
tc_idx = (chunk_size_unmasked - chunk_minsize + malloc_alignment - 1) // malloc_alignment
if tc_idx < allocator.mp['tcache_bins']:
print(message.notice('Tcache checks'))
e = addr + 2*size_sz
e += allocator.tcache_entry.keys().index('key') * ptr_size
e = pwndbg.memory.pvoid(e)
tcache_addr = int(allocator.thread_cache.address)
if e == tcache_addr:
# todo, actually do checks
print(message.error('Will do checks for tcache double-free (memory_tcache_double_free)'))
errors_found += 1
if int(allocator.get_tcache()['counts'][tc_idx]) < int(allocator.mp['tcache_count']):
print(message.success('Using tcache_put'))
if errors_found == 0:
returned_before_error = True
if errors_found > 0:
finalize(errors_found, returned_before_error)
return
# is fastbin
if chunk_size_unmasked <= allocator.global_max_fast:
print(message.notice('Fastbin checks'))
chunk_fastbin_idx = allocator.fastbin_index(chunk_size_unmasked)
fastbin_list = allocator.fastbins(int(arena.address))[(chunk_fastbin_idx+2)*(ptr_size*2)]
try:
next_chunk = read_chunk(addr + chunk_size_unmasked)
except gdb.MemoryError as e:
print(message.error('Can\'t read next chunk at address 0x{:x}, memory error'.format(chunk + chunk_size_unmasked)))
finalize(errors_found, returned_before_error)
return
# next chunk's size is big enough and small enough
next_chunk_size = unsigned_size(next_chunk['size'])
if next_chunk_size <= 2*size_sz or chunksize(next_chunk_size) >= int(arena['system_mem']):
err = 'free(): invalid next size (fast) -> next chunk\'s size not in [2*size_sz; av->system_mem]\n'
err += ' next chunk\'s size is 0x{:x}, 2*size_sz is 0x{:x}, system_mem is 0x{:x}'
err = err.format(next_chunk_size, 2*size_sz, int(arena['system_mem']))
print(message.error(err))
errors_found += 1
# chunk is not the same as the one on top of fastbin[idx]
if int(fastbin_list[0]) == addr:
err = 'double free or corruption (fasttop) -> chunk already is on top of fastbin list\n'
err += ' fastbin idx == {}'
err = err.format(chunk_fastbin_idx)
print(message.error(err))
errors_found += 1
# chunk's size is ~same as top chunk's size
fastbin_top_chunk = int(fastbin_list[0])
if fastbin_top_chunk != 0:
try:
fastbin_top_chunk = read_chunk(fastbin_top_chunk)
except gdb.MemoryError as e:
print(message.error('Can\'t read top fastbin chunk at address 0x{:x}, memory error'.format(fastbin_top_chunk)))
finalize(errors_found, returned_before_error)
return
fastbin_top_chunk_size = chunksize(unsigned_size(fastbin_top_chunk['size']))
if chunk_fastbin_idx != allocator.fastbin_index(fastbin_top_chunk_size):
err = 'invalid fastbin entry (free) -> chunk\'s size is not near top chunk\'s size\n'
err += ' chunk\'s size == {}, idx == {}\n'
err += ' top chunk\'s size == {}, idx == {}'
err += ' if `have_lock` is false then the error is invalid'
err = err.format(chunk['size'], chunk_fastbin_idx,
fastbin_top_chunk_size, allocator.fastbin_index(fastbin_top_chunk_size))
print(message.error(err))
errors_found += 1
# is not mapped
elif is_mmapped == 0:
print(message.notice('Not mapped checks'))
# chunks is not top chunk
if addr == int(arena['top']):
err = 'double free or corruption (top) -> chunk is top chunk'
print(message.error(err))
errors_found += 1
# next chunk is not beyond the boundaries of the arena
NONCONTIGUOUS_BIT = 2
top_chunk_addr = (int(arena['top']))
top_chunk = read_chunk(top_chunk_addr)
next_chunk_addr = addr + chunk_size_unmasked
# todo: in libc, addition may overflow
if (arena['flags'] & NONCONTIGUOUS_BIT == 0) and next_chunk_addr >= top_chunk_addr + chunksize(top_chunk['size']):
err = 'double free or corruption (out) -> next chunk is beyond arena and arena is contiguous\n'
err += 'next chunk at 0x{:x}, end of arena at 0x{:x}'
err = err.format(next_chunk_addr, top_chunk_addr + chunksize(unsigned_size(top_chunk['size'])))
print(message.error(err))
errors_found += 1
# now we need to dereference chunk
try :
next_chunk = read_chunk(next_chunk_addr)
next_chunk_size = chunksize(unsigned_size(next_chunk['size']))
except (OverflowError, gdb.MemoryError) as e:
print(message.error('Can\'t read next chunk at address 0x{:x}'.format(next_chunk_addr)))
finalize(errors_found, returned_before_error)
return
# next chunk's P bit is set
prev_inuse,_,_ = allocator.chunk_flags(next_chunk['size'])
if prev_inuse == 0:
err = 'double free or corruption (!prev) -> next chunk\'s previous-in-use bit is 0\n'
print(message.error(err))
errors_found += 1
# next chunk's size is big enough and small enough
if next_chunk_size <= 2*size_sz or next_chunk_size >= int(arena['system_mem']):
err = 'free(): invalid next size (normal) -> next chunk\'s size not in [2*size_sz; system_mem]\n'
err += 'next chunk\'s size is 0x{:x}, 2*size_sz is 0x{:x}, system_mem is 0x{:x}'
err = err.format(next_chunk_size, 2*size_sz, int(arena['system_mem']))
print(message.error(err))
errors_found += 1
# consolidate backward
prev_inuse,_,_ = allocator.chunk_flags(chunk['size'])
if prev_inuse == 0:
print(message.notice('Backward consolidation'))
prev_size = chunksize(unsigned_size(chunk['prev_size']))
prev_chunk_addr = addr - prev_size
try :
prev_chunk = read_chunk(prev_chunk_addr)
prev_chunk_size = chunksize(unsigned_size(prev_chunk['size']))
except (OverflowError, gdb.MemoryError) as e:
print(message.error('Can\'t read next chunk at address 0x{:x}'.format(prev_chunk_addr)))
finalize(errors_found, returned_before_error)
return
if unsigned_size(prev_chunk['size']) != prev_size:
err = 'corrupted size vs. prev_size while consolidating\n'
err += 'prev_size field is 0x{:x}, prev chunk at 0x{:x}, prev chunk size is 0x{:x}'
err = err.format(prev_size, prev_chunk_addr, unsigned_size(prev_chunk['size']))
print(message.error(err))
errors_found += 1
else:
addr = prev_chunk_addr
chunk_size += prev_size
chunk_size_unmasked += prev_size
try_unlink(addr)
# consolidate forward
if next_chunk_addr != top_chunk_addr:
print(message.notice('Next chunk is not top chunk'))
try :
next_next_chunk_addr = next_chunk_addr + next_chunk_size
next_next_chunk = read_chunk(next_next_chunk_addr)
except (OverflowError, gdb.MemoryError) as e:
print(message.error('Can\'t read next chunk at address 0x{:x}'.format(next_next_chunk_addr)))
finalize(errors_found, returned_before_error)
return
prev_inuse,_,_ = allocator.chunk_flags(next_next_chunk['size'])
if prev_inuse == 0:
print(message.notice('Forward consolidation'))
try_unlink(next_chunk_addr)
chunk_size += next_chunk_size
chunk_size_unmasked += next_chunk_size
else:
print(message.notice('Clearing next chunk\'s P bit'))
# unsorted bin fd->bk should be unsorted bean
unsorted_addr = int(arena['bins']) - 2*ptr_size
try:
unsorted = read_chunk(unsorted_addr)
try:
if read_chunk(unsorted['fd'])['bk'] != unsorted_addr:
err = 'free(): corrupted unsorted chunks -> unsorted_chunk->fd->bk != unsorted_chunk\n'
err += 'unsorted at 0x{:x}, unsorted->fd == 0x{:x}, unsorted->fd->bk == 0x{:x}'
err = err.format(unsorted_addr, unsorted['fd'], read_chunk(unsorted['fd'])['bk'])
print(message.error(err))
errors_found += 1
except (OverflowError, gdb.MemoryError) as e:
print(message.error('Can\'t read chunk at 0x{:x}, it is unsorted bin fd'.format(unsorted['fd'])))
errors_found += 1
except (OverflowError, gdb.MemoryError) as e:
print(message.error('Can\'t read unsorted bin chunk at 0x{:x}'.format(unsorted_addr)))
errors_found += 1
else:
print(message.notice('Next chunk is top chunk'))
chunk_size += next_chunk_size
chunk_size_unmasked += next_chunk_size
# todo: this may vary strongly
FASTBIN_CONSOLIDATION_THRESHOLD = 65536
if chunk_size_unmasked >= FASTBIN_CONSOLIDATION_THRESHOLD:
print(message.notice('Doing malloc_consolidate and systrim/heap_trim'))
#is mapped
else:
message.notice('Doing munmap_chunk')
finalize(errors_found, returned_before_error)
def try_unlink(addr):
pass

@ -333,7 +333,11 @@ class Heap(pwndbg.heap.heap.BaseHeap):
return self.main_arena
return pwndbg.memory.poi(self.malloc_state, arena_addr)
try:
return pwndbg.memory.poi(self.malloc_state, arena_addr)
except gdb.MemoryError:
# print(message.warn('Bad arena address {}'.format(arena_addr.address)))
return None
def get_arena_for_chunk(self, addr):
chunk = pwndbg.commands.heap.read_chunk(addr)

@ -25,7 +25,7 @@ return_code = pytest.main(args)
if return_code != 0:
print('-' * 80)
print('If you want to debug tests locally, modify tests_launcher.py and add --pdb to its args')
print('If you want to debug tests locally, modify {} and add --pdb to its args'.format(__file__))
print('-' * 80)
sys.exit(return_code)

@ -0,0 +1,247 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* copy-paste from malloc.c */
# define INTERNAL_SIZE_T size_t
#define SIZE_SZ (sizeof (INTERNAL_SIZE_T))
#define MALLOC_ALIGNMENT (2 * SIZE_SZ < __alignof__ (long double) \
? __alignof__ (long double) : 2 * SIZE_SZ)
typedef struct malloc_chunk {
INTERNAL_SIZE_T prev_size; /* Size of previous chunk (if P == 0). */
INTERNAL_SIZE_T size; /* Size in bytes, including overhead. 3LSB: N,M,P*/
/* A(NON_MAIN_ARENA), M(IS_MMAPPED), P(PREV_INUSE) */
struct malloc_chunk* fd; /* double links -- used only if free. */
struct malloc_chunk* bk;
/* Only used for large blocks: pointer to next larger size. */
struct malloc_chunk* fd_nextsize; /* double links -- used only if free. */
struct malloc_chunk* bk_nextsize;
} malloc_chunk;
/* common heap setup */
#define setup_heap char *a = malloc(50); \
char *b = malloc(50); \
char *c = malloc(50); \
char *d = malloc(4000); \
char *e = malloc(4000); \
char *f = malloc(4000); \
char *g = malloc(4000); \
malloc_chunk *a_real = (malloc_chunk*)(a - 2*SIZE_SZ); \
malloc_chunk *b_real = (malloc_chunk*)(b - 2*SIZE_SZ); \
malloc_chunk *c_real = (malloc_chunk*)(c - 2*SIZE_SZ); \
malloc_chunk *d_real = (malloc_chunk*)(d - 2*SIZE_SZ); \
malloc_chunk *e_real = (malloc_chunk*)(e - 2*SIZE_SZ); \
malloc_chunk *f_real = (malloc_chunk*)(f - 2*SIZE_SZ); \
malloc_chunk *g_real = (malloc_chunk*)(g - 2*SIZE_SZ); \
int *tmp; \
int tmp2, tmp3; \
printf("a=%p\nb=%p\nc=%p\nd=%p\ne=%p\nf=%p\ng=%p\n", a, b, c, d, e, f, g);
/*
Every function MUST have two comments: "break1" and "break2"
One after setup, second just before line triggering the bug
*/
void invalid_pointer_overflow() {
// free(): invalid pointer
setup_heap
// break1
tmp2 = a_real->size;
a_real->size = 0xffffffffffffff00;
// break2
free(a);
a_real->size = tmp2;
}
void invalid_pointer_misaligned() {
// free(): invalid pointer
setup_heap
// break1
// break2
free(a+2);
}
void invalid_size_minsize() {
// free(): invalid size
setup_heap
// break1
tmp2 = a_real->size;
a_real->size = 8;
// break2
free(a);
a_real->size = tmp2;
}
void invalid_size_misaligned() {
// free(): invalid size
setup_heap
// break1
tmp2 = a_real->size;
a_real->size = 24;
// break2
free(a);
a_real->size = tmp2;
}
void invalid_next_size_fast() {
// free(): invalid next size (fast)
setup_heap
// break1
tmp2 = a_real->size;
tmp3 = a[32 - 2*SIZE_SZ + SIZE_SZ];
a_real->size = 32;
a[32 - 2*SIZE_SZ + SIZE_SZ] = (size_t*)3;
// break2
free(a);
a[32 - 2*SIZE_SZ + SIZE_SZ] = (size_t*)tmp3;
a_real->size = tmp2;
}
void double_free_tcache() {
// free(): double free detected in tcache 2
setup_heap
// break1
free(a);
// break2
free(a);
}
void double_free_fastbin() {
// double free or corruption (fasttop)
setup_heap
// break1
void *ptrs[10];
for (int i = 0; i < 10; ++i)
ptrs[i] = malloc(50);
for (int i = 0; i < 10; ++i)
free(ptrs[i]);
free(a);
// break2
free(a);
}
void invalid_fastbin_entry() {
// invalid fastbin entry (free)
// not working, dunno why
// maybe because of 'have_lock == 0'
setup_heap
// break1
void *ptrs[10];
for (int i = 0; i < 10; ++i)
ptrs[i] = malloc(50);
for (int i = 0; i < 10; ++i)
free(ptrs[i]);
free(a);
a_real->size = 88;
// break2
free(c);
}
void double_free_or_corruption_top() {
// double free or corruption (top)
setup_heap
// break1
malloc_chunk *top_chunk_real = (malloc_chunk*) (((size_t)c_real + c_real->size) & (~7));
char *top_chunk = (char*) ((size_t)top_chunk_real + 2*SIZE_SZ);
// break2
free(top_chunk);
}
void double_free_or_corruption_out() {
// double free or corruption (out)
setup_heap
// break1
d_real->size = 0xffffff00;
// break2
free(d);
}
void double_free_or_corruption_prev() {
// double free or corruption (!prev)
setup_heap
// break1
e_real->size &= ~1;
// break2
free(d);
}
void invalid_next_size_normal() {
// free(): invalid next size (normal)
setup_heap
// break1
e_real->size = 1;
// break2
free(d);
}
void corrupted_consolidate_backward() {
// corrupted size vs. prev_size while consolidating
setup_heap
// break1
free(d);
d_real->size = 0xaa;
// break2
free(e);
}
void corrupted_unsorted_chunks() {
// free(): corrupted unsorted chunks
setup_heap
// break1
free(d); // it goes to unsorted
d_real->bk = a_real;
// break2
free(f);
}
int main(int argc, char const *argv[]) {
setvbuf(stdout, NULL, _IONBF, 0);
if (argc < 2) {
printf("Usage: %s bug_to_trigger\n", argv[0]);
return 1;
}
int choice;
sscanf(argv[1], "%d", &choice);
switch(choice) {
case 1: invalid_pointer_overflow(); break;
case 2: invalid_pointer_misaligned(); break;
case 3: invalid_size_minsize(); break;
case 4: invalid_size_misaligned(); break;
case 5: double_free_tcache(); break;
case 6: invalid_next_size_fast(); break;
case 7: double_free_fastbin(); break;
case 8: invalid_fastbin_entry(); break;
case 9: double_free_or_corruption_top(); break;
case 10: double_free_or_corruption_out(); break;
case 11: double_free_or_corruption_prev(); break;
case 12: invalid_next_size_normal(); break;
case 13: corrupted_consolidate_backward(); break;
case 14: corrupted_unsorted_chunks(); break;
default: printf("Unknown\n");
}
puts("END");
return 0;
}

Binary file not shown.

@ -28,6 +28,7 @@ else
CFLAGS += -O1
endif
GLIBC=/glibc_versions/2.29/tcache_x64
.PHONY : all clean
@ -56,6 +57,21 @@ all: $(LINKED) $(LINKED_ASM) $(COMPILED_GO)
@GOARCH=amd64 $(GO) build $?
@# Not stripped on purpose
heap_bugs: heap_bugs.c
${CC} \
-Wl,-rpath=${GLIBC}:\
${GLIBC}/math:\
${GLIBC}/elf:\
${GLIBC}/dlfcn:\
${GLIBC}/nss:\
${GLIBC}/nis:\
${GLIBC}/rt:\
${GLIBC}/resolv:\
${GLIBC}/crypt:\
${GLIBC}/nptl_db:\
${GLIBC}/nptl:\
-Wl,--dynamic-linker=${GLIBC}/elf/ld.so \
-g -o heap_bugs.out heap_bugs.c
clean :
@echo "[+] Cleaning stuff"

@ -17,13 +17,14 @@ def start_binary():
"""
Returns function that launches given binary with 'start' command
"""
def _start_binary(path):
def _start_binary(path, *args):
gdb.execute('file ' + path)
gdb.execute('start')
gdb.execute('break _start')
gdb.execute('run ' + ' '.join(args))
global _start_binary_called
if _start_binary_called:
raise Exception('Starting more than one binary is not supported in pwndbg tests.')
# if _start_binary_called:
# raise Exception('Starting more than one binary is not supported in pwndbg tests.')
_start_binary_called = True

@ -0,0 +1,213 @@
import gdb
import pwndbg
import tests
import stat
import os
import tempfile
HEAP_BINARY = tests.binaries.get('heap_bugs.out')
HEAP_CODE = tests.binaries.get('heap_bugs.c')
_, OUTPUT_FILE = tempfile.mkstemp()
def binary_parse_breakpoints(binary_code):
"""
Find comments with breakpoints in binary code
and map them to function's cmd line ids
"""
# map bug id to function name (f.e: 2 -> invalid_pointer_misaligned())
with open(binary_code, 'r') as f:
func_names = {}
for line in f:
if 'case ' in line:
bug_id = int(line.split(':')[0].split()[-1])
func_name = line.split(':')[1].split(';')[0].strip()
func_names[bug_id] = func_name
# map bug id to breakpoint line numbers
with open(binary_code, 'r') as f:
breakpoints = {}
lines = f.readlines()
line_no = 0
# find functions
while line_no < len(lines) and len(breakpoints) < len(func_names):
line = lines[line_no]
line_no += 1
for bug_id, func_name in func_names.items():
if 'void {}'.format(func_name) in line:
# find break1 and break2 inside function
b1, b2 = None, None
while line_no < len(lines) and (b1 is None or b2 is None):
line = lines[line_no]
line_no += 1
if 'break1' in line:
b1 = line_no
if 'break2' in line:
b2 = line_no
breakpoints[bug_id] = (b1, b2)
return breakpoints
# breakpoints: (line after setup_heap, line before the one triggering the bug)
breakpoints = binary_parse_breakpoints(HEAP_CODE)
def setup_heap(start_binary, bug_no):
"""
Start binary
Pause after (valid) heap is set-up
Save valid chunks
Continue up until buggy code line
"""
global breakpoints
# for communication python<->HEAP_BINARY
try:
os.remove(OUTPUT_FILE)
except FileNotFoundError:
pass
start_binary(HEAP_BINARY, str(bug_no), '> {}'.format(OUTPUT_FILE))
gdb.execute('break ' + str(breakpoints[bug_no][0]))
gdb.execute('break ' + str(breakpoints[bug_no][1]))
gdb.execute('continue')
gdb.execute('continue')
chunks = {}
with open(OUTPUT_FILE, 'r') as f:
chunk_id = 'a'
for _ in range(7):
chunk = int(f.readline().split('=')[1], 16)
chunks[chunk_id] = chunk
chunk_id = chr(ord(chunk_id) + 1)
return chunks
def test_try_free_invalid_overflow(start_binary):
chunks = setup_heap(start_binary, 1)
result = gdb.execute('try_free {}'.format(hex(chunks['a'])), to_string=True)
assert 'free(): invalid pointer -> &chunk + chunk->size > max memory' in result
os.remove(OUTPUT_FILE)
# def test_try_free_invalid_misaligned(start_binary):
chunks = setup_heap(start_binary, 2)
result = gdb.execute('try_free {}'.format(hex(chunks['a']+2)), to_string=True)
assert 'free(): invalid pointer -> misaligned chunk' in result
os.remove(OUTPUT_FILE)
def test_try_free_invalid_size_minsize(start_binary):
chunks = setup_heap(start_binary, 3)
result = gdb.execute('try_free {}'.format(hex(chunks['a'])), to_string=True)
assert 'free(): invalid size -> chunk\'s size smaller than MINSIZE' in result
os.remove(OUTPUT_FILE)
def test_try_free_invalid_size_misaligned(start_binary):
chunks = setup_heap(start_binary, 4)
result = gdb.execute('try_free {}'.format(hex(chunks['a'])), to_string=True)
assert 'free(): invalid size -> chunk\'s size is not aligned' in result
os.remove(OUTPUT_FILE)
def test_try_free_double_free_tcache(start_binary):
chunks = setup_heap(start_binary, 5)
result = gdb.execute('try_free {}'.format(hex(chunks['a'])), to_string=True)
assert 'Will do checks for tcache double-free' in result
os.remove(OUTPUT_FILE)
def test_try_free_invalid_next_size_fast(start_binary):
chunks = setup_heap(start_binary, 6)
result = gdb.execute('try_free {}'.format(hex(chunks['a'])), to_string=True)
assert 'free(): invalid next size (fast)' in result
os.remove(OUTPUT_FILE)
def test_try_free_double_free(start_binary):
chunks = setup_heap(start_binary, 7)
result = gdb.execute('try_free {}'.format(hex(chunks['a'])), to_string=True)
assert 'double free or corruption (fasttop)' in result
os.remove(OUTPUT_FILE)
def test_try_free_invalid_fastbin_entry(start_binary):
chunks = setup_heap(start_binary, 8)
result = gdb.execute('try_free {}'.format(hex(chunks['c'])), to_string=True)
assert 'invalid fastbin entry (free)' in result
os.remove(OUTPUT_FILE)
def test_try_free_double_free_or_corruption_top(start_binary):
setup_heap(start_binary, 9)
ptr_size = pwndbg.arch.ptrsize
top_chunk = int(pwndbg.heap.current.get_arena()['top']) + 2*ptr_size
result = gdb.execute('try_free {}'.format(hex(top_chunk)), to_string=True)
assert 'double free or corruption (top)' in result
os.remove(OUTPUT_FILE)
def test_try_free_double_free_or_corruption_out(start_binary):
chunks = setup_heap(start_binary, 10)
result = gdb.execute('try_free {}'.format(hex(chunks['d'])), to_string=True)
assert 'double free or corruption (out)' in result
os.remove(OUTPUT_FILE)
def test_try_free_double_free_or_corruption_prev(start_binary):
chunks = setup_heap(start_binary, 11)
result = gdb.execute('try_free {}'.format(hex(chunks['d'])), to_string=True)
assert 'double free or corruption (!prev)' in result
os.remove(OUTPUT_FILE)
def test_try_free_invalid_next_size_normal(start_binary):
chunks = setup_heap(start_binary, 12)
result = gdb.execute('try_free {}'.format(hex(chunks['d'])), to_string=True)
assert 'free(): invalid next size (normal)' in result
os.remove(OUTPUT_FILE)
def test_try_free_corrupted_consolidate_backward(start_binary):
chunks = setup_heap(start_binary, 13)
result = gdb.execute('try_free {}'.format(hex(chunks['e'])), to_string=True)
assert 'corrupted size vs. prev_size while consolidating' in result
os.remove(OUTPUT_FILE)
def test_try_free_corrupted_consolidate_backward(start_binary):
chunks = setup_heap(start_binary, 13)
result = gdb.execute('try_free {}'.format(hex(chunks['e'])), to_string=True)
assert 'corrupted size vs. prev_size while consolidating' in result
os.remove(OUTPUT_FILE)
def test_try_free_corrupted_unsorted_chunks(start_binary):
chunks = setup_heap(start_binary, 14)
result = gdb.execute('try_free {}'.format(hex(chunks['f'])), to_string=True)
assert 'free(): corrupted unsorted chunks' in result
os.remove(OUTPUT_FILE)
Loading…
Cancel
Save