mirror of https://github.com/Kkevsterrr/geneva
485 lines
16 KiB
Python
485 lines
16 KiB
Python
import copy
|
|
import tempfile
|
|
import traceback
|
|
import os
|
|
import pytest
|
|
import sys
|
|
# Include the root of the project
|
|
sys.path.append("..")
|
|
|
|
import library
|
|
import common
|
|
import evolve
|
|
import evaluator
|
|
import actions.utils
|
|
import layers.packet
|
|
from actions.tamper import TamperAction
|
|
from scapy.all import IP, TCP, UDP
|
|
import random
|
|
|
|
|
|
def test_evolve(logger):
|
|
"""
|
|
Work in Progress
|
|
"""
|
|
strategies = []
|
|
for strategy in library.LAB_STRATEGIES:
|
|
strategies.append(strategy["strategy"])
|
|
logger.setLevel("ERROR")
|
|
options = {}
|
|
options["non-unique-hall"] = False
|
|
options["hall-size"] = 100000
|
|
options["population_size"] = 500
|
|
options["in-trees"] = 0
|
|
options["out-trees"] = 1
|
|
options["in-actions"] = 0
|
|
options["out-actions"] = 3
|
|
options["force_cleanup"] = False
|
|
options["num_generations"] = 10
|
|
options["seed"] = None
|
|
options["elite_clones"] = 0
|
|
options["allowed_retries"] = 20
|
|
options["no-canary"] = True
|
|
options["load_from"] = False
|
|
options["disable_action"] = []
|
|
hall = evolve.genetic_solve(logger, options, None)
|
|
evolve.print_results(hall, logger)
|
|
|
|
|
|
def test_disable_single_action(logger):
|
|
"""
|
|
Tests disabling a single action
|
|
"""
|
|
layers.packet.Packet.reset_restrictions()
|
|
try:
|
|
logger.setLevel("ERROR")
|
|
actions.action.ACTION_CACHE={}
|
|
actions.action.ACTION_CACHE["in"] = {}
|
|
actions.action.ACTION_CACHE["out"] = {}
|
|
disable_actions=["fragment", "drop", "tamper", "duplicate"]
|
|
for action in disable_actions:
|
|
print("Testing disable %s" % (str(action)))
|
|
for i in range(0, 2000):
|
|
p = evolve.generate_strategy(logger, 0, 2, 0, 4, None, disabled=[action])
|
|
assert str(action) not in str(p)
|
|
actions.action.ACTION_CACHE={}
|
|
actions.action.ACTION_CACHE["in"] = {}
|
|
actions.action.ACTION_CACHE["out"] = {}
|
|
finally:
|
|
layers.packet.Packet.reset_restrictions()
|
|
|
|
|
|
def test_disable_multiple_actions(logger):
|
|
"""
|
|
Tests disabling multiple actions
|
|
"""
|
|
layers.packet.Packet.reset_restrictions()
|
|
try:
|
|
logger.setLevel("ERROR")
|
|
actions.action.ACTION_CACHE={}
|
|
actions.action.ACTION_CACHE["in"] = {}
|
|
actions.action.ACTION_CACHE["out"] = {}
|
|
disable_actions=["fragment", "drop", "tamper", "duplicate"]
|
|
for num in range(0,10):
|
|
action1 = disable_actions[random.randint(0,3)]
|
|
action2 = disable_actions[random.randint(0,3)]
|
|
action3 = disable_actions[random.randint(0,3)]
|
|
action_list = [action1, action2, action3]
|
|
print("Testing disable %s" % (str(action_list)))
|
|
for i in range(0, 1000):
|
|
p = evolve.generate_strategy(logger, 0, 2, 0, 4, None, disabled=action_list)
|
|
assert str(action1) not in str(p)
|
|
assert str(action2) not in str(p)
|
|
assert str(action3) not in str(p)
|
|
actions.action.ACTION_CACHE={}
|
|
actions.action.ACTION_CACHE["in"] = {}
|
|
actions.action.ACTION_CACHE["out"] = {}
|
|
finally:
|
|
layers.packet.Packet.reset_restrictions()
|
|
|
|
|
|
def assert_only(ind, field):
|
|
"""
|
|
Helper method to assert that the only tamper field in a given
|
|
individual is the given field.
|
|
"""
|
|
for forest in [ind.in_actions, ind.out_actions]:
|
|
for tree in forest:
|
|
for action in tree:
|
|
if isinstance(action, TamperAction):
|
|
assert action.field == field
|
|
|
|
|
|
def assert_not(ind, fields):
|
|
"""
|
|
Helper method to assert that the tamper field in a given
|
|
individual is not in the list of given fields.
|
|
"""
|
|
for forest in [ind.in_actions, ind.out_actions]:
|
|
for tree in forest:
|
|
for action in tree:
|
|
if isinstance(action, TamperAction):
|
|
assert action.field not in fields
|
|
|
|
|
|
@pytest.mark.parametrize("use_canary", [False, True], ids=["without_canary", "with_canary"])
|
|
def test_disable_fields(logger, use_canary):
|
|
"""
|
|
Tests disabling fields.
|
|
"""
|
|
# Restrict evolve to using ONLY the ack field in the TCP header
|
|
try:
|
|
evolve.restrict_headers(logger, "TCP", "ack", "")
|
|
population = []
|
|
print("Generating population pool")
|
|
canary_id = None
|
|
|
|
# Create an evaluator
|
|
if use_canary:
|
|
cmd = [
|
|
"--test-type", "http",
|
|
"--log", actions.utils.CONSOLE_LOG_LEVEL,
|
|
"--use-external-sites",
|
|
"--no-skip-empty",
|
|
"--bad-word", "facebook",
|
|
"--output-directory", actions.utils.RUN_DIRECTORY
|
|
]
|
|
tester = evaluator.Evaluator(cmd, logger)
|
|
canary_id = evolve.run_collection_phase(logger, tester)
|
|
assert canary_id and canary_id != -1
|
|
|
|
# Generate random strategies to initialize the population
|
|
for i in range(0, 2000):
|
|
p = evolve.generate_strategy(logger, 0, 2, 0, 4, None, environment_id=canary_id)
|
|
assert_only(p, "ack")
|
|
population.append(p)
|
|
|
|
for generation in range(0, 20):
|
|
print("Starting fake generation %d" % generation)
|
|
for p in population:
|
|
p.mutate(logger)
|
|
assert_only(p, "ack")
|
|
|
|
layers.packet.Packet.reset_restrictions()
|
|
|
|
# Restrict evolve to using NOT the dataofs or chksum field in the TCP header
|
|
evolve.restrict_headers(logger, "TCP,UDP", "", "dataofs,chksum",)
|
|
population = []
|
|
print("Generating population pool")
|
|
canary_id = None
|
|
# Create an evaluator
|
|
if use_canary:
|
|
cmd = [
|
|
"--test-type", "http",
|
|
"--log", actions.utils.CONSOLE_LOG_LEVEL,
|
|
"--use-external-sites",
|
|
"--no-skip-empty",
|
|
"--bad-word", "facebook",
|
|
"--output-directory", actions.utils.RUN_DIRECTORY
|
|
]
|
|
tester = evaluator.Evaluator(cmd, logger)
|
|
canary_id = evolve.run_collection_phase(logger, tester)
|
|
assert canary_id and canary_id != -1
|
|
|
|
# Generate random strategies to initialize the population
|
|
for i in range(0, 2000):
|
|
p = evolve.generate_strategy(logger, 0, 2, 0, 4, None, environment_id=canary_id)
|
|
assert_not(p, ["dataofs", "chksum"])
|
|
population.append(p)
|
|
|
|
for generation in range(0, 20):
|
|
print("Starting fake generation %d" % generation)
|
|
for p in population:
|
|
p.mutate(logger)
|
|
assert_not(p, ["dataofs", "chksum"])
|
|
|
|
finally:
|
|
layers.packet.Packet.reset_restrictions()
|
|
|
|
|
|
@pytest.mark.parametrize("use_canary", [True, False], ids=["with_canary", "without_canary"])
|
|
def test_population_pool(logger, use_canary):
|
|
"""
|
|
Creates a large population pool and runs them through packets.
|
|
The goal of this test is to basically fuzz the framework (and scapy) without having
|
|
to use the evaluator to do so to look for any exceptions/issues that may arise
|
|
to catch them early.
|
|
"""
|
|
logger.setLevel("ERROR")
|
|
canary_id = None
|
|
# Create an evaluator
|
|
if use_canary:
|
|
cmd = [
|
|
"--test-type", "echo",
|
|
"--censor", "censor2",
|
|
"--log", actions.utils.CONSOLE_LOG_LEVEL,
|
|
"--no-skip-empty",
|
|
"--bad-word", "facebook",
|
|
"--output-directory", actions.utils.RUN_DIRECTORY
|
|
]
|
|
tester = evaluator.Evaluator(cmd, logger)
|
|
canary_id = evolve.run_collection_phase(logger, tester)
|
|
|
|
layers.packet.Packet.reset_restrictions()
|
|
population = []
|
|
print("Generating population pool")
|
|
# Generate random strategies to initialize the population
|
|
for i in range(0, 2000):
|
|
p = evolve.generate_strategy(logger, 0, 2, 0, 4, None, environment_id=canary_id)
|
|
population.append(p)
|
|
print("Population pool generated")
|
|
packets = [
|
|
IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"),
|
|
IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"),
|
|
IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="PA"),
|
|
IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"),
|
|
IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="R"),
|
|
IP(src="127.0.0.1", dst="127.0.0.1")/UDP(sport=2222, dport=3333, chksum=0x4444),
|
|
IP(src="127.0.0.1", dst="127.0.0.1")/UDP(sport=2222, dport=3333, chksum=0x8888)
|
|
]
|
|
packets = [layers.packet.Packet(packet) for packet in packets]
|
|
for generation in range(0, 20):
|
|
print("Starting fake generation %d" % generation)
|
|
for ind in population:
|
|
for packet in packets:
|
|
try:
|
|
ind.act_on_packet(packet, logger)
|
|
except:
|
|
traceback.print_exc()
|
|
print(str(ind))
|
|
print(packet)
|
|
packet.show()
|
|
print(layers.packet.SUPPORTED_LAYERS)
|
|
raise
|
|
for p in population:
|
|
try:
|
|
p.mutate(logger)
|
|
except:
|
|
traceback.print_exc()
|
|
print(str(p))
|
|
print(packet)
|
|
print(str(ind))
|
|
raise
|
|
|
|
|
|
def test_eval_only(logger):
|
|
"""
|
|
Tests eval-only.
|
|
"""
|
|
cmd = [
|
|
"--test-type", "http",
|
|
"--censor", "censor2",
|
|
"--server", "http://facebook.com",
|
|
"--log", actions.utils.CONSOLE_LOG_LEVEL,
|
|
"--no-skip-empty",
|
|
"--bad-word", "facebook",
|
|
"--output-directory", actions.utils.RUN_DIRECTORY
|
|
]
|
|
tester = evaluator.Evaluator(cmd, logger)
|
|
|
|
strat = "\/"
|
|
success_rate = evolve.eval_only(logger, strat, tester, runs=1)
|
|
assert success_rate == 0
|
|
with tempfile.NamedTemporaryFile() as f:
|
|
f.write(str(strat).encode('utf-8'))
|
|
f.flush()
|
|
success_rate = evolve.eval_only(logger, f.name, tester, runs=1)
|
|
assert success_rate == 0
|
|
|
|
with tempfile.NamedTemporaryFile() as f:
|
|
# If the file is empty, we should just return None
|
|
assert not evolve.eval_only(logger, f.name, tester, runs=1)
|
|
|
|
# Eval only with two successful strategies that are in a file
|
|
strat = "\/ [TCP:flags:R]-drop-|"
|
|
with tempfile.NamedTemporaryFile() as f:
|
|
f.write(str(strat).encode('utf-8'))
|
|
f.write(str(strat).encode('utf-8'))
|
|
f.flush()
|
|
success_rate = evolve.eval_only(logger, f.name, tester, runs=1)
|
|
assert success_rate == 1
|
|
|
|
|
|
def test_mutation(logger):
|
|
"""
|
|
Tests mutation.
|
|
"""
|
|
|
|
layers.packet.Packet.reset_restrictions()
|
|
population = [actions.utils.parse("[TCP:flags:PA]-| \/", logger)]
|
|
population[0].in_enabled = False
|
|
assert population
|
|
assert str(actions.utils.parse(str(population[0]), logger)) == str(population[0])
|
|
# Create a hall with a strategy that has failed 5x, but not yet 10x
|
|
hall = { "[TCP:flags:PA]-drop-| \/ ": [-400] * 5 }
|
|
options = {
|
|
"cxpb": 0,
|
|
"mutpb": 1,
|
|
}
|
|
for i in range(0, 2000):
|
|
offspring = evolve.mutation_crossover(logger, population, hall, options)
|
|
assert offspring
|
|
print(offspring[0])
|
|
if str(offspring[0]).strip() == "[TCP:flags:PA]-drop-| \/":
|
|
print("Good mutation")
|
|
break
|
|
else:
|
|
pytest.fail("Never mutated to test strategy")
|
|
print("Rejecting future mutations to [TCP:flags:PA]-drop-| \\/ ")
|
|
stred = str(actions.utils.parse("[TCP:flags:PA]-drop-| \/", logger))
|
|
hall = { "[TCP:flags:PA]-drop-| \\/ ": [-400] * 11 }
|
|
assert stred in hall
|
|
for _ in range(0, 2000):
|
|
offspring = evolve.mutation_crossover(logger, population, hall, options)
|
|
assert offspring
|
|
assert str(offspring[0]).strip() != "[TCP:flags:PA]-drop-| \/" # should always reject this mutation
|
|
print("No rejected mutations found.")
|
|
|
|
|
|
def test_driver(logger):
|
|
"""
|
|
Tests evolve.py driver.
|
|
"""
|
|
cmd = [
|
|
"--no-lock-file",
|
|
"--eval-only", "\/",
|
|
"--test-type", "http",
|
|
"--port", "80",
|
|
"--censor", "censor2",
|
|
"--log", actions.utils.CONSOLE_LOG_LEVEL,
|
|
"--no-skip-empty",
|
|
"--bad-word", "facebook",
|
|
"--output-directory", actions.utils.RUN_DIRECTORY
|
|
]
|
|
try:
|
|
evolve.driver(cmd)
|
|
finally:
|
|
print("Test shutting down any lingering containers.")
|
|
common.clean_containers()
|
|
|
|
def test_driver_lock_file(logger):
|
|
"""
|
|
Tests driver with lock file
|
|
"""
|
|
# Try with lock file
|
|
cmd = [
|
|
"--population", "10",
|
|
"--generations", "1",
|
|
"--test-type", "http",
|
|
"--port", "80",
|
|
"--censor", "censor2",
|
|
"--log", actions.utils.CONSOLE_LOG_LEVEL,
|
|
"--no-skip-empty",
|
|
"--bad-word", "facebook",
|
|
"--force-cleanup",
|
|
"--output-directory", actions.utils.RUN_DIRECTORY
|
|
]
|
|
try:
|
|
evolve.driver(cmd)
|
|
finally:
|
|
print("Test shutting down any lingering containers.")
|
|
common.clean_containers()
|
|
|
|
|
|
def test_driver_failure_cases(logger):
|
|
"""
|
|
Tests driver error handling
|
|
"""
|
|
print("Testing --no-eval with --eval-only")
|
|
# Try --no-eval and --eval-only
|
|
cmd = [
|
|
"--population", "10",
|
|
"--generations", "1",
|
|
"--test-type", "http",
|
|
"--no-eval",
|
|
"--eval-only", "\/",
|
|
"--log", actions.utils.CONSOLE_LOG_LEVEL,
|
|
"--no-skip-empty",
|
|
]
|
|
assert not evolve.driver(cmd)
|
|
|
|
print("testing with unparseable seed")
|
|
# Try with unparseable seed
|
|
cmd = [
|
|
"--population", "10",
|
|
"--generations", "1",
|
|
"--test-type", "http",
|
|
"--seed", "<thiswillnotparse>",
|
|
"--log", actions.utils.CONSOLE_LOG_LEVEL,
|
|
"--no-skip-empty",
|
|
]
|
|
with pytest.raises(actions.tree.ActionTreeParseError):
|
|
evolve.driver(cmd)
|
|
|
|
print("testing with nonexistent field")
|
|
# Try with unparseable seed
|
|
cmd = [
|
|
"--population", "10",
|
|
"--generations", "1",
|
|
"--test-type", "http",
|
|
"--seed", "[TCP:thisdontexist:1]-drop-|",
|
|
"--log", actions.utils.CONSOLE_LOG_LEVEL,
|
|
"--no-skip-empty",
|
|
]
|
|
with pytest.raises(AssertionError):
|
|
evolve.driver(cmd)
|
|
|
|
|
|
def test_argparse():
|
|
"""
|
|
Normally we don't test argparsers, but the evolve.py argparser involves collecting
|
|
help strings and importing plugins, so we test it here.
|
|
"""
|
|
cmd = ["--help"]
|
|
# Should print multiple help messages and raise SystemExit
|
|
with pytest.raises(SystemExit):
|
|
evolve.get_args(cmd)
|
|
|
|
cmd = [
|
|
"--no-lock-file",
|
|
"--population", "10",
|
|
"--generations", "1",
|
|
"--test-type", "http",
|
|
"--port", "80",
|
|
"--censor", "censor2",
|
|
"--log", "debug",
|
|
"--no-skip-empty",
|
|
]
|
|
args = evolve.get_args(cmd)
|
|
assert args.no_lock_file
|
|
assert args.population == 10
|
|
assert args.test_type == "http"
|
|
assert args.log == "debug"
|
|
|
|
|
|
def test_genetic_solve():
|
|
"""
|
|
Normally we don't test argparsers, but the evolve.py argparser involves collecting
|
|
help strings and importing plugins, so we test it here.
|
|
"""
|
|
cmd = [
|
|
"--population", "3",
|
|
"--generations", "1",
|
|
"--test-type", "http",
|
|
"--port", "80",
|
|
"--censor", "censor2",
|
|
"--log", "info",
|
|
"--seed", "[TCP:flags:PA]-duplicate-|",
|
|
"--no-skip-empty",
|
|
]
|
|
print(evolve.driver(cmd))
|
|
print("testing without evaluator")
|
|
cmd = [
|
|
"--no-eval",
|
|
"--no-lock-file",
|
|
"--population", "3",
|
|
"--generations", "1",
|
|
"--test-type", "http",
|
|
"--port", "80",
|
|
"--censor", "censor2",
|
|
"--log", "info",
|
|
"--seed", "[TCP:flags:PA]-duplicate-|",
|
|
"--no-skip-empty",
|
|
]
|
|
print(evolve.driver(cmd))
|