mirror of https://github.com/Kkevsterrr/geneva
180 lines
6.8 KiB
Python
180 lines
6.8 KiB
Python
import sys
|
|
# Include the root of the project
|
|
sys.path.append("..")
|
|
|
|
import layers.packet
|
|
import actions.strategy
|
|
import actions.tamper
|
|
import actions.utils
|
|
import evolve
|
|
|
|
from scapy.all import IP, TCP
|
|
|
|
|
|
def test_mutate():
    """
    Smoke test for Trigger.mutate().

    Builds a Trigger("field", "flags", "TCP") and calls mutate() on it with
    None as the only argument. Nothing is asserted; the test passes as long
    as neither the constructor nor mutate() raises.
    """
    flags_trigger = actions.trigger.Trigger("field", "flags", "TCP")
    flags_trigger.mutate(None)
|
|
|
|
|
|
def test_init(logger):
    """
    Tests trigger initialization and the FIXED_TRIGGER override.

    First checks that a Trigger constructed with all-None arguments can be
    passed to is_applicable() without raising (the result is not checked).
    Then installs a trigger parsed from "TCP:flags:SA" as
    actions.trigger.FIXED_TRIGGER and checks that get_rand_trigger() returns
    the corresponding tuple.

    Args:
        logger: logger handed through to is_applicable() (presumably a
            pytest fixture -- confirm against conftest).
    """
    packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
    trigger = actions.trigger.Trigger(None, None, None)
    # Only verifies that the call does not raise; the return value is ignored.
    trigger.is_applicable(packet, logger)

    # FIXED_TRIGGER is module-level state. Remember whatever was there and
    # restore it afterwards so the override cannot leak into tests that run
    # after this one.
    previous_fixed_trigger = getattr(actions.trigger, "FIXED_TRIGGER", None)
    actions.trigger.FIXED_TRIGGER = actions.trigger.Trigger.parse("TCP:flags:SA")
    try:
        assert actions.trigger.Trigger.get_rand_trigger("test", 1) == ("field", "TCP", "flags", "SA", None)
    finally:
        actions.trigger.FIXED_TRIGGER = previous_fixed_trigger
|
|
|
|
|
|
def test_trigger_gas(logger):
    """
    Exercises a trigger created with positive gas and the gas-management
    calls (add_gas, disable_gas, set_gas, enable_gas) while it is in use.

    Every check runs against the same packet with TCP flags "SA", which
    matches the trigger's value.
    """
    pkt = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
    gas_trigger = actions.trigger.Trigger("field", "flags", "TCP", trigger_value="SA", gas=1)
    print(gas_trigger)

    # Created with gas=1: expected to apply once, then refuse.
    assert gas_trigger.is_applicable(pkt, logger)
    assert not gas_trigger.is_applicable(pkt, logger)
    print(gas_trigger)

    # Adding 3 gas: expected to apply three more times, then refuse.
    gas_trigger.add_gas(3)
    for _ in range(3):
        assert gas_trigger.is_applicable(pkt, logger)
    assert not gas_trigger.is_applicable(pkt, logger)

    # With gas disabled the trigger is expected to apply again, and to keep
    # applying for the three checks that follow set_gas(3).
    gas_trigger.disable_gas()
    assert gas_trigger.is_applicable(pkt, logger)
    gas_trigger.set_gas(3)
    for _ in range(3):
        assert gas_trigger.is_applicable(pkt, logger)

    # Gas re-enabled and set to 2: expected to apply twice, then refuse.
    gas_trigger.enable_gas()
    gas_trigger.set_gas(2)
    for _ in range(2):
        assert gas_trigger.is_applicable(pkt, logger)
    assert not gas_trigger.is_applicable(pkt, logger)
|
|
|
|
|
|
def test_bomb_trigger_gas(logger):
    """
    Exercises a "bomb" trigger (created with negative gas) and the
    gas-management calls while it is in use.

    The assertions expect a trigger with gas -N to refuse N matching packets
    and then apply. Every check uses the same packet with TCP flags "SA".
    """
    pkt = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
    bomb = actions.trigger.Trigger("field", "flags", "TCP", trigger_value="SA", gas=-1)
    print(bomb)

    # Created with gas=-1: refuses the first packet, applies to the second.
    assert not bomb.is_applicable(pkt, logger), "trigger should not fire on first run"
    assert bomb.is_applicable(pkt, logger), "trigger should fire on second run"
    print(bomb)

    # Adding -3 gas: expected to refuse three times, then apply.
    bomb.add_gas(-3)
    for _ in range(3):
        assert not bomb.is_applicable(pkt, logger)
    assert bomb.is_applicable(pkt, logger)

    # With gas disabled the trigger is expected to apply; after set_gas(-3)
    # it is expected to refuse three times and then apply.
    bomb.disable_gas()
    assert bomb.is_applicable(pkt, logger)
    bomb.set_gas(-3)
    for _ in range(3):
        assert not bomb.is_applicable(pkt, logger)
    assert bomb.is_applicable(pkt, logger)

    # Gas re-enabled and set to -2: refuses twice, then applies.
    bomb.enable_gas()
    bomb.set_gas(-2)
    for _ in range(2):
        assert not bomb.is_applicable(pkt, logger)
    assert bomb.is_applicable(pkt, logger)
|
|
|
|
|
|
def test_trigger_parse_gas(logger):
    """
    Tests Trigger.parse() on trigger strings with and without a gas suffix.

    Covers a trigger parsed with 1 gas, with 0 gas, with no gas field at
    all, and one wrapped in square brackets. Every check uses the same
    packet with TCP flags "SA".

    Args:
        logger: logger handed through to is_applicable().
    """
    packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))

    # Parse a trigger with 1 gas: applies once, then refuses.
    trigger = actions.trigger.Trigger.parse("TCP:flags:SA:1")
    assert trigger.is_applicable(packet, logger)
    assert not trigger.is_applicable(packet, logger)

    # Parse a trigger with no gas left: refuses immediately.
    trigger = actions.trigger.Trigger.parse("TCP:flags:SA:0")
    assert not trigger.is_applicable(packet, logger)

    # Parse a trigger not using gas.
    trigger = actions.trigger.Trigger.parse("TCP:flags:SA")
    assert trigger.is_applicable(packet, logger)
    # Check that adding gas while gas is disabled does not work.
    trigger.add_gas(10)
    # Identity check: None is a singleton, so compare with `is`, not `==`.
    assert trigger.gas_remaining is None

    # Once gas is enabled and set to 2: applies twice, then refuses.
    trigger.enable_gas()
    trigger.set_gas(2)

    assert trigger.is_applicable(packet, logger)
    assert trigger.is_applicable(packet, logger)
    assert not trigger.is_applicable(packet, logger)

    # Test that it can handle leading/trailing [].
    trigger = actions.trigger.Trigger.parse("[TCP:flags:SA]")
    assert trigger.is_applicable(packet, logger)
|
|
|
|
def test_bomb_trigger_parse_gas(logger):
    """
    Tests Trigger.parse() on bomb triggers (negative gas suffix), alongside
    zero-gas, gas-less and bracketed trigger strings.

    Every check uses the same packet with TCP flags "SA".

    Args:
        logger: logger handed through to is_applicable().
    """
    packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))

    # Parse a bomb trigger with -1 gas: refuses the first packet, applies to
    # the second.
    trigger = actions.trigger.Trigger.parse("TCP:flags:SA:-1")
    assert not trigger.is_applicable(packet, logger)
    assert trigger.is_applicable(packet, logger)

    # Parse a trigger with no gas left: refuses immediately.
    trigger = actions.trigger.Trigger.parse("TCP:flags:SA:0")
    assert not trigger.is_applicable(packet, logger)

    # A freshly parsed -1 bomb trigger must again refuse its first packet.
    trigger = actions.trigger.Trigger.parse("TCP:flags:SA:-1")
    assert not trigger.is_applicable(packet, logger)

    # Parse a trigger not using gas.
    trigger = actions.trigger.Trigger.parse("TCP:flags:SA")
    assert trigger.is_applicable(packet, logger)
    # Check that adding gas while gas is disabled does not work.
    trigger.add_gas(10)
    # Identity check: None is a singleton, so compare with `is`, not `==`.
    assert trigger.gas_remaining is None

    # Once gas is enabled and set to 2: applies twice, then refuses.
    trigger.enable_gas()
    trigger.set_gas(2)

    assert trigger.is_applicable(packet, logger)
    assert trigger.is_applicable(packet, logger)
    assert not trigger.is_applicable(packet, logger)

    # Test that it can handle leading/trailing [].
    trigger = actions.trigger.Trigger.parse("[TCP:flags:SA]")
    assert trigger.is_applicable(packet, logger)
|
|
|
|
def test_wildcard(logger):
    """
    Checks a trigger whose value is the wildcard pattern "A*".

    Packets with TCP flags "A", "SA" and "RA" are expected to match; a
    packet with TCP flags "P" is expected not to.
    """
    # One packet per flag string, built in the order listed.
    pkt_a, pkt_sa, pkt_ra, pkt_p = (
        layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags=tcp_flags))
        for tcp_flags in ("A", "SA", "RA", "P")
    )
    wildcard_trigger = actions.trigger.Trigger("field", "flags", "TCP", trigger_value="A*", gas=None)

    for matching_pkt in (pkt_a, pkt_sa, pkt_ra):
        assert wildcard_trigger.is_applicable(matching_pkt, logger)
    assert not wildcard_trigger.is_applicable(pkt_p, logger)
|