geneva/tests/test_tamper.py
2020-06-24 05:20:51 -07:00

467 lines
19 KiB
Python

import copy
import sys
import pytest
import random
# Include the root of the project
sys.path.append("..")
import evolve
import evaluator
import actions.strategy
import layers.packet
import actions.utils
import actions.tamper
import layers.layer
import layers.ip_layer
from scapy.all import IP, TCP, UDP, DNS, DNSQR, sr1
def test_tamper(logger):
"""
Tests tampering with replace
"""
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper = actions.tamper.TamperAction(None, field="flags", tamper_type="replace", tamper_value="R")
lpacket, rpacket = tamper.run(packet, logger)
assert not rpacket, "Tamper must not return right child"
assert lpacket, "Tamper must give a left child"
assert id(lpacket) == id(packet), "Tamper must edit in place"
# Confirm tamper replaced the field it was supposed to
assert packet[TCP].flags == "R", "Tamper did not replace flags."
new_value = packet[TCP].flags
# Must run this check repeatedly - if a scapy fuzz-ed value is not properly
# ._fix()-ed, it will return different values each time it's requested
for _ in range(0, 5):
assert packet[TCP].flags == new_value, "Replaced value is not stable"
# Confirm tamper didn't corrupt anything else in the TCP header
assert confirm_unchanged(packet, original, TCP, ["flags"])
# Confirm tamper didn't corrupt anything in the IP header
assert confirm_unchanged(packet, original, IP, [])
def test_tamper_ip(logger):
"""
Tests tampering with IP
"""
packet = layers.packet.Packet(IP(src='127.0.0.1', dst='127.0.0.1')/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper = actions.tamper.TamperAction(None, field="src", tamper_type="replace", tamper_value="192.168.1.1", tamper_proto="IP")
lpacket, rpacket = tamper.run(packet, logger)
assert not rpacket, "Tamper must not return right child"
assert lpacket, "Tamper must give a left child"
assert id(lpacket) == id(packet), "Tamper must edit in place"
# Confirm tamper replaced the field it was supposed to
assert packet[IP].src == "192.168.1.1", "Tamper did not replace flags."
# Confirm tamper didn't corrupt anything in the TCP header
assert confirm_unchanged(packet, original, TCP, [])
# Confirm tamper didn't corrupt anything else in the IP header
assert confirm_unchanged(packet, original, IP, ["src"])
def test_tamper_udp(logger):
"""
Tests tampering with UDP
"""
packet = layers.packet.Packet(IP(src='127.0.0.1', dst='127.0.0.1')/UDP(sport=2222, dport=53))
original = copy.deepcopy(packet)
tamper = actions.tamper.TamperAction(None, field="chksum", tamper_type="replace", tamper_value=4444, tamper_proto="UDP")
lpacket, rpacket = tamper.run(packet, logger)
assert not rpacket, "Tamper must not return right child"
assert lpacket, "Tamper must give a left child"
assert id(lpacket) == id(packet), "Tamper must edit in place"
# Confirm tamper replaced the field it was supposed to
assert packet[UDP].chksum == 4444, "Tamper did not replace flags."
# Confirm tamper didn't corrupt anything in the TCP header
assert confirm_unchanged(packet, original, UDP, ["chksum"])
# Confirm tamper didn't corrupt anything else in the IP header
assert confirm_unchanged(packet, original, IP, [])
def test_tamper_ip_ident(logger):
"""
Tests tampering with IP and that the checksum is correctly changed
"""
packet = layers.packet.Packet(IP(src='127.0.0.1', dst='127.0.0.1')/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper = actions.tamper.TamperAction(None, field='id', tamper_type='replace', tamper_value=3333, tamper_proto="IP")
lpacket, rpacket = tamper.run(packet, logger)
assert not rpacket, "Tamper must not return right child"
assert lpacket, "Tamper must give a left child"
assert id(lpacket) == id(packet), "Tamper must edit in place"
# Confirm tamper replaced the field it was supposed to
assert packet[IP].id == 3333, "Tamper did not replace flags."
# Confirm tamper didn't corrupt anything in the TCP header
assert confirm_unchanged(packet, original, TCP, [])
# Confirm tamper didn't corrupt anything else in the IP header
assert confirm_unchanged(packet, original, IP, ["id"])
def confirm_unchanged(packet, original, protocol, changed):
"""
Checks that no other field besides the given array of changed fields
are different between these two packets.
"""
for header in packet.layers:
if packet.layers[header].protocol != protocol:
continue
for field in packet.layers[header].fields:
# Skip checking the field we just changed
if field in changed or field == "load":
continue
assert packet.get(protocol.__name__, field) == original.get(protocol.__name__, field), "Tamper changed %s field %s." % (str(protocol), field)
return True
@pytest.mark.parametrize("use_canary", [False, True], ids=["without_canary", "with_canary"])
def test_mutate(logger, use_canary):
"""
Tests the tamper 'replace' primitive.
"""
logger.setLevel("ERROR")
canary_id = None
# Create an evaluator
if use_canary:
cmd = [
"--test-type", "echo",
"--censor", "censor2",
"--log", actions.utils.CONSOLE_LOG_LEVEL,
"--no-skip-empty",
"--bad-word", "facebook",
"--output-directory", actions.utils.RUN_DIRECTORY
]
tester = evaluator.Evaluator(cmd, logger)
canary_id = evolve.run_collection_phase(logger, tester)
for _ in range(0, 25):
tamper = actions.tamper.TamperAction(None, field="flags", tamper_type="replace", tamper_value="R", tamper_proto="TCP")
# Test mutation 200 times to ensure it remains stable
for _ in range(0, 200):
tamper._mutate(canary_id)
tamper2 = actions.tamper.TamperAction(None)
# Confirm tamper value was properly ._fix()-ed
val = tamper.tamper_value
for _ in range(0, 5):
assert tamper.tamper_value == val, "Tamper value is not stable."
# Create a test packet to ensure the field/proto choice was safe
if random.random() < 0.5:
test_packet = layers.packet.Packet(IP()/TCP())
else:
test_packet = layers.packet.Packet(IP()/UDP())
# Check that tamper can run safely after mutation
try:
tamper.run(test_packet, logger)
except:
print(str(tamper))
raise
tamper._mutate_tamper_type()
# Test that parsing tamper works - note we have to remove the tamper{} to make a call directly using tamper's parse.
tamper2.parse(str(tamper)[7:-1], logger)
assert str(tamper2) == str(tamper)
def test_parse_parameters(logger):
"""
Tests that tamper properly rejects malformed tamper actions
"""
with pytest.raises(Exception):
actions.tamper.TamperAction().parse("this:has:too:many:parameters", logger)
with pytest.raises(Exception):
actions.tamper.TamperAction().parse("not:enough", logger)
def test_corrupt(logger):
"""
Tests the tamper 'corrupt' primitive.
"""
tamper = actions.tamper.TamperAction(None, field="flags", tamper_type="corrupt", tamper_value="R")
assert tamper.field == "flags", "Tamper action changed fields."
assert tamper.tamper_type == "corrupt", "Tamper action changed types."
assert str(tamper) == "tamper{TCP:flags:corrupt}", "Tamper returned incorrect string representation: %s" % str(tamper)
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper.tamper(packet, logger)
new_value = packet[TCP].flags
# Must run this check repeatedly - if a scapy fuzz-ed value is not properly
# ._fix()-ed, it will return different values each time it's requested
for _ in range(0, 5):
assert packet[TCP].flags == new_value, "Corrupted value is not stable"
# Confirm tamper didn't corrupt anything else in the TCP header
assert confirm_unchanged(packet, original, TCP, ["flags"])
# Confirm tamper didn't corrupt anything else in the IP header
assert confirm_unchanged(packet, original, IP, [])
def test_add(logger):
"""
Tests the tamper 'add' primitive.
"""
tamper = actions.tamper.TamperAction(None, field="seq", tamper_type="add", tamper_value=10)
assert tamper.field == "seq", "Tamper action changed fields."
assert tamper.tamper_type == "add", "Tamper action changed types."
assert str(tamper) == "tamper{TCP:seq:add:10}", "Tamper returned incorrect string representation: %s" % str(tamper)
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper.tamper(packet, logger)
new_value = packet[TCP].seq
assert new_value == 110, "Tamper did not add"
# Must run this check repeatedly - if a scapy fuzz-ed value is not properly
# ._fix()-ed, it will return different values each time it's requested
for _ in range(0, 5):
assert packet[TCP].seq == new_value, "Corrupted value is not stable"
# Confirm tamper didn't corrupt anything else in the TCP header
assert confirm_unchanged(packet, original, TCP, ["seq"])
# Confirm tamper didn't corrupt anything else in the IP header
assert confirm_unchanged(packet, original, IP, [])
def test_decompress(logger):
"""
Tests the tamper 'decompress' primitive.
"""
tamper = actions.tamper.TamperAction(None, field="qd", tamper_type="compress", tamper_value=10, tamper_proto="DNS")
assert tamper.field == "qd", "Tamper action changed fields."
assert tamper.tamper_type == "compress", "Tamper action changed types."
assert str(tamper) == "tamper{DNS:qd:compress}", "Tamper returned incorrect string representation: %s" % str(tamper)
packet = layers.packet.Packet(IP(dst="8.8.8.8")/UDP(dport=53)/DNS(qd=DNSQR(qname="minghui.ca.")))
original = packet.copy()
tamper.tamper(packet, logger)
assert bytes(packet["DNS"]) == b'\x00\x00\x01\x00\x00\x02\x00\x00\x00\x00\x00\x00\x07minghui\xc0\x1a\x00\x01\x00\x01\x02ca\x00\x00\x01\x00\x01'
resp = sr1(packet.packet)
assert resp["DNS"]
assert resp["DNS"].rcode != 1
assert resp["DNSQR"]
assert resp["DNSRR"].rdata
assert confirm_unchanged(packet, original, IP, ["len"])
print(resp.summary())
packet = layers.packet.Packet(IP(dst="8.8.8.8")/UDP(dport=53)/DNS(qd=DNSQR(qname="maps.google.com")))
original = packet.copy()
tamper.tamper(packet, logger)
assert bytes(packet["DNS"]) == b'\x00\x00\x01\x00\x00\x02\x00\x00\x00\x00\x00\x00\x04maps\xc0\x17\x00\x01\x00\x01\x06google\x03com\x00\x00\x01\x00\x01'
resp = sr1(packet.packet)
assert resp["DNS"]
assert resp["DNS"].rcode != 1
assert resp["DNSQR"]
assert resp["DNSRR"].rdata
assert confirm_unchanged(packet, original, IP, ["len"])
print(resp.summary())
# Confirm this is a NOP on normal packets
packet = layers.packet.Packet(IP()/UDP())
original = packet.copy()
tamper.tamper(packet, logger)
assert packet.packet.summary() == original.packet.summary()
# Confirm tamper didn't corrupt anything else in the TCP header
assert confirm_unchanged(packet, original, UDP, [])
# Confirm tamper didn't corrupt anything else in the IP header
assert confirm_unchanged(packet, original, IP, [])
def test_corrupt_chksum(logger):
"""
Tests the tamper 'replace' primitive.
"""
tamper = actions.tamper.TamperAction(None, field="chksum", tamper_type="corrupt", tamper_value="R")
assert tamper.field == "chksum", "Tamper action changed checksum."
assert tamper.tamper_type == "corrupt", "Tamper action changed types."
assert str(tamper) == "tamper{TCP:chksum:corrupt}", "Tamper returned incorrect string representation: %s" % str(tamper)
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper.tamper(packet, logger)
# Confirm tamper actually corrupted the checksum
assert packet[TCP].chksum != 0
new_value = packet[TCP].chksum
# Must run this check repeatedly - if a scapy fuzz-ed value is not properly
# ._fix()-ed, it will return different values each time it's requested
for _ in range(0, 5):
assert packet[TCP].chksum == new_value, "Corrupted value is not stable"
# Confirm tamper didn't corrupt anything else in the TCP header
assert confirm_unchanged(packet, original, TCP, ["chksum"])
# Confirm tamper didn't corrupt anything else in the IP header
assert confirm_unchanged(packet, original, IP, [])
def test_corrupt_dataofs(logger):
"""
Tests the tamper 'replace' primitive.
"""
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S", dataofs="6L"))
original = copy.deepcopy(packet)
tamper = actions.tamper.TamperAction(None, field="dataofs", tamper_type="corrupt")
tamper.tamper(packet, logger)
# Confirm tamper actually corrupted the checksum
assert packet[TCP].dataofs != "0"
new_value = packet[TCP].dataofs
# Must run this check repeatedly - if a scapy fuzz-ed value is not properly
# ._fix()-ed, it will return different values each time it's requested
for _ in range(0, 5):
assert packet[TCP].dataofs == new_value, "Corrupted value is not stable"
# Confirm tamper didn't corrupt anything else in the TCP header
assert confirm_unchanged(packet, original, TCP, ["dataofs"])
# Confirm tamper didn't corrupt anything in the IP header
assert confirm_unchanged(packet, original, IP, [])
def test_replace(logger):
"""
Tests the tamper 'replace' primitive.
"""
tamper = actions.tamper.TamperAction(None, field="flags", tamper_type="replace", tamper_value="R")
assert tamper.field == "flags", "Tamper action changed fields."
assert tamper.tamper_type == "replace", "Tamper action changed types."
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper.tamper(packet, logger)
# Confirm tamper replaced the field it was supposed to
assert packet[TCP].flags == "R", "Tamper did not replace flags."
# Confirm tamper didn't replace anything else in the TCP header
assert confirm_unchanged(packet, original, TCP, ["flags"])
# Confirm tamper didn't replace anything else in the IP header
assert confirm_unchanged(packet, original, IP, [])
# chksums must be handled specially by tamper, so run a second check on this value
tamper.field = "chksum"
tamper.tamper_value = 0x4444
original = copy.deepcopy(packet)
tamper.tamper(packet, logger)
assert packet[TCP].chksum == 0x4444, "Tamper failed to change chksum."
# Confirm tamper didn't replace anything else in the TCP header
assert confirm_unchanged(packet, original, TCP, ["chksum"])
# Confirm tamper didn't replace anything else in the IP header
assert confirm_unchanged(packet, original, IP, [])
def test_init():
"""
Tests initializing with no parameters
"""
tamper = actions.tamper.TamperAction(None)
assert tamper.field
assert tamper.tamper_proto
assert tamper.tamper_value is not None
def test_parse_flags(logger):
"""
Tests the tamper 'replace' primitive.
"""
tamper = actions.tamper.TamperAction(None, field="flags", tamper_type="replace", tamper_value="FRAPUN")
assert tamper.field == "flags", "Tamper action changed checksum."
assert tamper.tamper_type == "replace", "Tamper action changed types."
assert str(tamper) == "tamper{TCP:flags:replace:FRAPUN}", "Tamper returned incorrect string representation: %s" % str(tamper)
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
tamper.tamper(packet, logger)
assert packet[TCP].flags == "FRAPUN", "Tamper failed to change flags."
@pytest.mark.parametrize("test_type", ["parsed", "direct"])
@pytest.mark.parametrize("value", ["EOL", "NOP", "Timestamp", "MSS", "WScale", "SAckOK", "SAck", "Timestamp", "AltChkSum", "AltChkSumOpt", "UTO"])
def test_options(logger, value, test_type):
"""
Tests tampering options
"""
if test_type == "direct":
tamper = actions.tamper.TamperAction(None, field="options-%s" % value.lower(), tamper_type="corrupt", tamper_value=bytes([12]))
else:
tamper = actions.tamper.TamperAction(None)
assert tamper.parse("TCP:options-%s:corrupt" % value.lower(), logger)
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
tamper.run(packet, logger)
opts_dict_lookup = value.lower().replace(" ", "_")
for optname, optval in packet["TCP"].options:
if optname == value:
break
elif optname == layers.ip_layer.TCPLayer.options_names[opts_dict_lookup]:
break
else:
pytest.fail("Failed to find %s in options" % value)
assert len(packet["TCP"].options) == 1
raw_p = bytes(packet)
assert raw_p, "options broke scapy bytes"
p2 = layers.packet.Packet(IP(bytes(raw_p)))
assert p2.haslayer("IP")
assert p2.haslayer("TCP")
# EOLs might be added for padding, so just check >= 1
assert len(p2["TCP"].options) >= 1
for optname, optval in p2["TCP"].options:
if optname == value:
break
elif optname == layers.ip_layer.TCPLayer.options_names[opts_dict_lookup]:
break
else:
pytest.fail("Failed to find %s in options" % value)
def test_tamper_mutate_compress(logger):
"""
Tests that compress is handled right if its enabled
"""
backup = copy.deepcopy(actions.tamper.ACTIVATED_PRIMITIVES)
actions.tamper.ACTIVATED_PRIMITIVES = ["compress"]
try:
tamper = actions.tamper.TamperAction(None)
assert tamper.parse("TCP:flags:corrupt", logger)
tamper._mutate_tamper_type()
assert tamper.tamper_type == "compress"
assert tamper.tamper_proto_str == "DNS"
assert tamper.field == "qd"
packet = layers.packet.Packet(IP()/TCP()/DNS()/DNSQR())
packet2 = tamper.tamper(packet, logger)
assert packet2 == packet
finally:
actions.tamper.ACTIVATED_PRIMITIVES = backup