From aa2045077b457d70fcd800556b8353faa5349360 Mon Sep 17 00:00:00 2001 From: Kkevsterrr Date: Thu, 12 Dec 2019 22:07:51 -0500 Subject: [PATCH] Added tests for strategy --- tests/test_strategy.py | 82 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 tests/test_strategy.py diff --git a/tests/test_strategy.py b/tests/test_strategy.py new file mode 100644 index 0000000..3cdafaa --- /dev/null +++ b/tests/test_strategy.py @@ -0,0 +1,82 @@ +import logging +import pytest + +import actions.tree +import actions.drop +import actions.tamper +import actions.duplicate +import actions.sleep +import actions.utils +import actions.strategy + +from scapy.all import IP, TCP + +logger = logging.getLogger("test") + + +def test_run(): + """ + Tests strategy execution. + """ + strat1 = actions.utils.parse("[TCP:flags:R]-duplicate-| \/", logger) + strat2 = actions.utils.parse("[TCP:flags:S]-drop-| \/", logger) + strat3 = actions.utils.parse("[TCP:flags:A]-duplicate(tamper{TCP:dataofs:replace:0},)-| \/", logger) + strat4 = actions.utils.parse("[TCP:flags:A]-duplicate(tamper{TCP:flags:replace:R}(tamper{TCP:chksum:replace:15239},),duplicate(tamper{TCP:flags:replace:S}(tamper{TCP:chksum:replace:14539}(tamper{TCP:seq:corrupt},),),))-| \/", logger) + + p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")) + packets = strat1.act_on_packet(p1, logger, direction="out") + assert packets, "Strategy dropped SYN packets" + assert len(packets) == 1 + assert packets[0]["TCP"].flags == "S" + + p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")) + packets = strat2.act_on_packet(p1, logger, direction="out") + assert not packets, "Strategy failed to drop SYN packets" + + p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="A", dataofs=5)) + packets = strat3.act_on_packet(p1, logger, direction="out") + assert packets, "Strategy dropped packets" + assert len(packets) == 2, "Incorrect number of packets emerged from forest" + assert packets[0]["TCP"].dataofs == 0, "Packet tamper failed" + assert packets[1]["TCP"].dataofs == 5, "Duplicate packet was tampered" + + p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="A", dataofs=5, chksum=100)) + packets = strat4.act_on_packet(p1, logger, direction="out") + assert packets, "Strategy dropped packets" + assert len(packets) == 3, "Incorrect number of packets emerged from forest" + assert packets[0]["TCP"].flags == "R", "Packet tamper failed" + assert packets[0]["TCP"].chksum != p1["TCP"].chksum, "Packet tamper failed" + assert packets[1]["TCP"].flags == "S", "Packet tamper failed" + assert packets[1]["TCP"].chksum != p1["TCP"].chksum, "Packet tamper failed" + assert packets[1]["TCP"].seq != p1["TCP"].seq, "Packet tamper failed" + assert packets[2]["TCP"].flags == "A", "Duplicate failed" + + strat4 = actions.utils.parse("[TCP:load:]-tamper{TCP:load:replace:mhe76jm0bd}(fragment{ip:-1:True}(tamper{IP:load:corrupt},drop),)-| \/ ", logger) + p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")) + packets = strat4.act_on_packet(p1, logger) + + # Will fail with scapy 2.4.2 if packet is reparsed + strat5 = actions.utils.parse("[TCP:options-eol:]-tamper{TCP:load:replace:o}(tamper{TCP:dataofs:replace:11},)-| \/", logger) + p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")) + packets = strat5.act_on_packet(p1, logger) + + +def test_pretty_print(): + """ + Tests if the string representation of this strategy is correct + """ + logger = logging.getLogger("test") + strat = actions.utils.parse("[TCP:flags:A]-duplicate(tamper{TCP:flags:replace:R}(tamper{TCP:chksum:corrupt},),)-| \/ ", logger) + correct = "TCP:flags:A\nduplicate\n├── tamper{TCP:flags:replace:R}\n│ └── tamper{TCP:chksum:corrupt}\n│ └── ===> \n└── ===> \n \n \/ \n " + assert strat.pretty_print() == correct + + +def test_sleep_parse_handling(): + """ + Tests that the sleep action handles bad parsing. + """ + + print("Testing incorrect parsing:") + assert not actions.sleep.SleepAction().parse("THISHSOULDFAIL", logger) + + assert actions.sleep.SleepAction().parse("10.5", logger)