geneva/tests/test_trigger.py

180 lines
6.8 KiB
Python
Raw Permalink Normal View History

2019-12-13 04:11:21 +01:00
import sys
# Include the root of the project
sys.path.append("..")
2020-06-24 14:20:51 +02:00
import layers.packet
2019-12-13 04:11:21 +01:00
import actions.strategy
import actions.tamper
import actions.utils
import evolve
2019-12-13 04:11:21 +01:00
from scapy.all import IP, TCP
def test_mutate():
"""
Tests the tamper 'replace' primitive.
"""
trigger = actions.trigger.Trigger("field", "flags", "TCP")
trigger.mutate(None)
2019-12-13 04:11:21 +01:00
def test_init(logger):
2019-12-13 04:11:21 +01:00
"""
Tests initialization.
2019-12-13 04:11:21 +01:00
"""
2020-06-24 14:20:51 +02:00
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
trigger = actions.trigger.Trigger(None, None, None)
trigger.is_applicable(packet, logger)
2019-12-13 04:11:21 +01:00
actions.trigger.FIXED_TRIGGER = actions.trigger.Trigger.parse("TCP:flags:SA")
assert actions.trigger.Trigger.get_rand_trigger("test", 1) == ("field", "TCP", "flags", "SA", None)
def test_trigger_gas(logger):
"""
Tests triggers having gas, including changing that gas while in use
"""
2020-06-24 14:20:51 +02:00
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
2019-12-13 04:11:21 +01:00
trigger = actions.trigger.Trigger("field", "flags", "TCP", trigger_value="SA", gas=1)
print(trigger)
assert trigger.is_applicable(packet, logger)
assert not trigger.is_applicable(packet, logger)
print(trigger)
# test add gas #
trigger.add_gas(3)
assert trigger.is_applicable(packet, logger)
assert trigger.is_applicable(packet, logger)
assert trigger.is_applicable(packet, logger)
assert not trigger.is_applicable(packet, logger)
# Test disable, set, and enable gas #
trigger.disable_gas()
assert trigger.is_applicable(packet, logger)
trigger.set_gas(3)
assert trigger.is_applicable(packet, logger)
assert trigger.is_applicable(packet, logger)
assert trigger.is_applicable(packet, logger)
trigger.enable_gas()
trigger.set_gas(2)
assert trigger.is_applicable(packet, logger)
assert trigger.is_applicable(packet, logger)
assert not trigger.is_applicable(packet, logger)
def test_bomb_trigger_gas(logger):
2019-12-13 04:11:21 +01:00
"""
Tests triggers having bomb gas, including changing that gas while in use
"""
2020-06-24 14:20:51 +02:00
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
2019-12-13 04:11:21 +01:00
trigger = actions.trigger.Trigger("field", "flags", "TCP", trigger_value="SA", gas=-1)
print(trigger)
assert not trigger.is_applicable(packet, logger), "trigger should not fire on first run"
assert trigger.is_applicable(packet, logger), "trigger should fire on second run"
print(trigger)
# test add gas #
trigger.add_gas(-3)
assert not trigger.is_applicable(packet, logger)
assert not trigger.is_applicable(packet, logger)
assert not trigger.is_applicable(packet, logger)
assert trigger.is_applicable(packet, logger)
# Test disable, set, and enable gas #
trigger.disable_gas()
assert trigger.is_applicable(packet, logger)
trigger.set_gas(-3)
assert not trigger.is_applicable(packet, logger)
assert not trigger.is_applicable(packet, logger)
assert not trigger.is_applicable(packet, logger)
assert trigger.is_applicable(packet, logger)
trigger.enable_gas()
trigger.set_gas(-2)
assert not trigger.is_applicable(packet, logger)
assert not trigger.is_applicable(packet, logger)
assert trigger.is_applicable(packet, logger)
def test_trigger_parse_gas(logger):
2019-12-13 04:11:21 +01:00
"""
Tests triggers having gas, including changing that gas while in use
"""
2020-06-24 14:20:51 +02:00
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
2019-12-13 04:11:21 +01:00
# parse a trigger with 1 gas
trigger = actions.trigger.Trigger.parse("TCP:flags:SA:1")
assert trigger.is_applicable(packet, logger)
assert not trigger.is_applicable(packet, logger)
# parse a trigger with no gas left
trigger = actions.trigger.Trigger.parse("TCP:flags:SA:0")
assert not trigger.is_applicable(packet, logger)
# parse a trigger not using gas
trigger = actions.trigger.Trigger.parse("TCP:flags:SA")
assert trigger.is_applicable(packet, logger)
# Check that adding gas while gas is disabled does not work
trigger.add_gas(10)
assert trigger.gas_remaining == None
trigger.enable_gas()
trigger.set_gas(2)
assert trigger.is_applicable(packet, logger)
assert trigger.is_applicable(packet, logger)
assert not trigger.is_applicable(packet, logger)
# Test that it can handle leading/trailing []
trigger = actions.trigger.Trigger.parse("[TCP:flags:SA]")
assert trigger.is_applicable(packet, logger)
def test_bomb_trigger_parse_gas(logger):
2019-12-13 04:11:21 +01:00
"""
Tests bomb triggers having gas, including changing that gas while in use
"""
2020-06-24 14:20:51 +02:00
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
2019-12-13 04:11:21 +01:00
# parse a bomb trigger with 1 gas
trigger = actions.trigger.Trigger.parse("TCP:flags:SA:-1")
assert not trigger.is_applicable(packet, logger)
assert trigger.is_applicable(packet, logger)
# parse a trigger with no gas left
trigger = actions.trigger.Trigger.parse("TCP:flags:SA:0")
assert not trigger.is_applicable(packet, logger)
trigger = actions.trigger.Trigger.parse("TCP:flags:SA:-1")
assert not trigger.is_applicable(packet, logger)
# parse a trigger not using gas
trigger = actions.trigger.Trigger.parse("TCP:flags:SA")
assert trigger.is_applicable(packet, logger)
# Check that adding gas while gas is disabled does not work
trigger.add_gas(10)
assert trigger.gas_remaining == None
trigger.enable_gas()
trigger.set_gas(2)
assert trigger.is_applicable(packet, logger)
assert trigger.is_applicable(packet, logger)
assert not trigger.is_applicable(packet, logger)
# Test that it can handle leading/trailing []
trigger = actions.trigger.Trigger.parse("[TCP:flags:SA]")
assert trigger.is_applicable(packet, logger)
def test_wildcard(logger):
"""
Test wildcard trigger value
"""
2020-06-24 14:20:51 +02:00
packet_1 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="A"))
packet_2 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
packet_3 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="RA"))
packet_4 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="P"))
trigger = actions.trigger.Trigger("field", "flags", "TCP", trigger_value="A*", gas=None)
assert trigger.is_applicable(packet_1, logger)
assert trigger.is_applicable(packet_2, logger)
assert trigger.is_applicable(packet_3, logger)
assert not trigger.is_applicable(packet_4, logger)