#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ This takes a crashing qtest trace and tries to remove superflous operations """ import sys import os import subprocess import time import struct QEMU_ARGS = None QEMU_PATH = None TIMEOUT = 5 CRASH_TOKEN = None # Minimization levels M1 = False # try removing IO commands iteratively M2 = False # try setting bits in operand of write/out to zero write_suffix_lookup = {"b": (1, "B"), "w": (2, "H"), "l": (4, "L"), "q": (8, "Q")} def usage(): sys.exit("""\ Usage: QEMU_PATH="/path/to/qemu" QEMU_ARGS="args" {} [Options] input_trace output_trace By default, will try to use the second-to-last line in the output to identify whether the crash occred. Optionally, manually set a string that idenitifes the crash by setting CRASH_TOKEN= Options: -M1: enable a loop around the remove minimizer, which may help decrease some timing dependant instructions. Off by default. -M2: try setting bits in operand of write/out to zero. Off by default. """.format((sys.argv[0]))) deduplication_note = """\n\ Note: While trimming the input, sometimes the mutated trace triggers a different type crash but indicates the same bug. Under this situation, our minimizer is incapable of recognizing and stopped from removing it. In the future, we may use a more sophisticated crash case deduplication method. \n""" def check_if_trace_crashes(trace, path): with open(path, "w") as tracefile: tracefile.write("".join(trace)) rc = subprocess.Popen("timeout -s 9 {timeout}s {qemu_path} {qemu_args} 2>&1\ < {trace_path}".format(timeout=TIMEOUT, qemu_path=QEMU_PATH, qemu_args=QEMU_ARGS, trace_path=path), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, encoding="utf-8") global CRASH_TOKEN if CRASH_TOKEN is None: try: outs, _ = rc.communicate(timeout=5) CRASH_TOKEN = " ".join(outs.splitlines()[-2].split()[0:3]) except subprocess.TimeoutExpired: print("subprocess.TimeoutExpired") return False print("Identifying Crashes by this string: {}".format(CRASH_TOKEN)) global deduplication_note print(deduplication_note) return True for line in iter(rc.stdout.readline, ""): if "CLOSED" in line: return False if CRASH_TOKEN in line: return True print("\nWarning:") print(" There is no 'CLOSED'or CRASH_TOKEN in the stdout of subprocess.") print(" Usually this indicates a different type of crash.\n") return False # If previous write commands write the same length of data at the same # interval, we view it as a hint. def split_write_hint(newtrace, i): HINT_LEN = 3 # > 2 if i <=(HINT_LEN-1): return None #find previous continuous write traces k = 0 l = i-1 writes = [] while (k != HINT_LEN and l >= 0): if newtrace[l].startswith("write "): writes.append(newtrace[l]) k += 1 l -= 1 elif newtrace[l] == "": l -= 1 else: return None if k != HINT_LEN: return None length = int(writes[0].split()[2], 16) for j in range(1, HINT_LEN): if length != int(writes[j].split()[2], 16): return None step = int(writes[0].split()[1], 16) - int(writes[1].split()[1], 16) for j in range(1, HINT_LEN-1): if step != int(writes[j].split()[1], 16) - \ int(writes[j+1].split()[1], 16): return None return (int(writes[0].split()[1], 16)+step, length) def remove_lines(newtrace, outpath): remove_step = 1 i = 0 while i < len(newtrace): # 1.) Try to remove lines completely and reproduce the crash. # If it works, we're done. if (i+remove_step) >= len(newtrace): remove_step = 1 prior = newtrace[i:i+remove_step] for j in range(i, i+remove_step): newtrace[j] = "" print("Removing {lines} ...\n".format(lines=prior)) if check_if_trace_crashes(newtrace, outpath): i += remove_step # Double the number of lines to remove for next round remove_step *= 2 continue # Failed to remove multiple IOs, fast recovery if remove_step > 1: for j in range(i, i+remove_step): newtrace[j] = prior[j-i] remove_step = 1 continue newtrace[i] = prior[0] # remove_step = 1 # 2.) Try to replace write{bwlq} commands with a write addr, len # command. Since this can require swapping endianness, try both LE and # BE options. We do this, so we can "trim" the writes in (3) if (newtrace[i].startswith("write") and not newtrace[i].startswith("write ")): suffix = newtrace[i].split()[0][-1] assert(suffix in write_suffix_lookup) addr = int(newtrace[i].split()[1], 16) value = int(newtrace[i].split()[2], 16) for endianness in ['<', '>']: data = struct.pack("{end}{size}".format(end=endianness, size=write_suffix_lookup[suffix][1]), value) newtrace[i] = "write {addr} {size} 0x{data}\n".format( addr=hex(addr), size=hex(write_suffix_lookup[suffix][0]), data=data.hex()) if(check_if_trace_crashes(newtrace, outpath)): break else: newtrace[i] = prior[0] # 3.) If it is a qtest write command: write addr len data, try to split # it into two separate write commands. If splitting the data operand # from length/2^n bytes to the left does not work, try to move the pivot # to the right side, then add one to n, until length/2^n == 0. The idea # is to prune unneccessary bytes from long writes, while accommodating # arbitrary MemoryRegion access sizes and alignments. # This algorithm will fail under some rare situations. # e.g., xxxxxxxxxuxxxxxx (u is the unnecessary byte) if newtrace[i].startswith("write "): addr = int(newtrace[i].split()[1], 16) length = int(newtrace[i].split()[2], 16) data = newtrace[i].split()[3][2:] if length > 1: # Can we get a hint from previous writes? hint = split_write_hint(newtrace, i) if hint is not None: hint_addr = hint[0] hint_len = hint[1] if hint_addr >= addr and hint_addr+hint_len <= addr+length: newtrace[i] = "write {addr} {size} 0x{data}\n".format( addr=hex(hint_addr), size=hex(hint_len), data=data[(hint_addr-addr)*2:\ (hint_addr-addr)*2+hint_len*2]) if check_if_trace_crashes(newtrace, outpath): # next round i += 1 continue newtrace[i] = prior[0] # Try splitting it using a binary approach leftlength = int(length/2) rightlength = length - leftlength newtrace.insert(i+1, "") power = 1 while leftlength > 0: newtrace[i] = "write {addr} {size} 0x{data}\n".format( addr=hex(addr), size=hex(leftlength), data=data[:leftlength*2]) newtrace[i+1] = "write {addr} {size} 0x{data}\n".format( addr=hex(addr+leftlength), size=hex(rightlength), data=data[leftlength*2:]) if check_if_trace_crashes(newtrace, outpath): break # move the pivot to right side if leftlength < rightlength: rightlength, leftlength = leftlength, rightlength continue power += 1 leftlength = int(length/pow(2, power)) rightlength = length - leftlength if check_if_trace_crashes(newtrace, outpath): i -= 1 else: newtrace[i] = prior[0] del newtrace[i+1] i += 1 def clear_bits(newtrace, outpath): # try setting bits in operands of out/write to zero i = 0 while i < len(newtrace): if (not newtrace[i].startswith("write ") and not newtrace[i].startswith("out")): i += 1 continue # write ADDR SIZE DATA # outx ADDR VALUE print("\nzero setting bits: {}".format(newtrace[i])) prefix = " ".join(newtrace[i].split()[:-1]) data = newtrace[i].split()[-1] data_bin = bin(int(data, 16)) data_bin_list = list(data_bin) for j in range(2, len(data_bin_list)): prior = newtrace[i] if (data_bin_list[j] == '1'): data_bin_list[j] = '0' data_try = hex(int("".join(data_bin_list), 2)) # It seems qtest only accepts padded hex-values. if len(data_try) % 2 == 1: data_try = data_try[:2] + "0" + data_try[2:] newtrace[i] = "{prefix} {data_try}\n".format( prefix=prefix, data_try=data_try) if not check_if_trace_crashes(newtrace, outpath): data_bin_list[j] = '1' newtrace[i] = prior i += 1 def minimize_trace(inpath, outpath): global TIMEOUT with open(inpath) as f: trace = f.readlines() start = time.time() if not check_if_trace_crashes(trace, outpath): sys.exit("The input qtest trace didn't cause a crash...") end = time.time() print("Crashed in {} seconds".format(end-start)) TIMEOUT = (end-start)*5 print("Setting the timeout for {} seconds".format(TIMEOUT)) newtrace = trace[:] global M1, M2 # remove lines old_len = len(newtrace) + 1 while(old_len > len(newtrace)): old_len = len(newtrace) print("trace lenth = ", old_len) remove_lines(newtrace, outpath) if not M1 and not M2: break newtrace = list(filter(lambda s: s != "", newtrace)) assert(check_if_trace_crashes(newtrace, outpath)) # set bits to zero if M2: clear_bits(newtrace, outpath) assert(check_if_trace_crashes(newtrace, outpath)) if __name__ == '__main__': if len(sys.argv) < 3: usage() if "-M1" in sys.argv: M1 = True if "-M2" in sys.argv: M2 = True QEMU_PATH = os.getenv("QEMU_PATH") QEMU_ARGS = os.getenv("QEMU_ARGS") if QEMU_PATH is None or QEMU_ARGS is None: usage() # if "accel" not in QEMU_ARGS: # QEMU_ARGS += " -accel qtest" CRASH_TOKEN = os.getenv("CRASH_TOKEN") QEMU_ARGS += " -qtest stdio -monitor none -serial none " minimize_trace(sys.argv[-2], sys.argv[-1])