From 6ec0a4dc4178e03276cb1d271f91357251f9eb35 Mon Sep 17 00:00:00 2001 From: Kkevsterrr Date: Thu, 12 Dec 2019 22:01:32 -0500 Subject: [PATCH] tests for fragment --- tests/test_fragment.py | 220 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 tests/test_fragment.py diff --git a/tests/test_fragment.py b/tests/test_fragment.py new file mode 100644 index 0000000..3e7bcee --- /dev/null +++ b/tests/test_fragment.py @@ -0,0 +1,220 @@ +import logging +import pytest +import sys +# Include the root of the project +sys.path.append("..") + +import actions.fragment +import actions.packet +import actions.strategy +import actions.utils + +from scapy.all import IP, TCP, UDP + +logger = logging.getLogger("test") + + +def test_segment(): + """ + Tests the duplicate action primitive. + """ + fragment = actions.fragment.FragmentAction(correct_order=True) + assert str(fragment) == "fragment{tcp:-1:True}", "Fragment returned incorrect string representation: %s" % str(fragment) + + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data")) + packet1, packet2 = fragment.run(packet, logger) + + assert id(packet1) != id(packet2), "Duplicate aliased packet objects" + + assert packet1["Raw"].load != packet2["Raw"].load, "Packets were not different" + assert packet1["Raw"].load == b'da', "Left packet incorrectly fragmented" + assert packet2["Raw"].load == b"ta", "Right packet incorrectly fragmented" + + +def test_segment_reverse(): + """ + Tests the duplicate action primitive in reverse! + """ + fragment = actions.fragment.FragmentAction(correct_order=False) + assert str(fragment) == "fragment{tcp:-1:False}", "Fragment returned incorrect string representation: %s" % str(fragment) + + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data")) + packet1, packet2 = fragment.run(packet, logger) + + assert id(packet1) != id(packet2), "Duplicate aliased packet objects" + + assert packet1["Raw"].load != packet2["Raw"].load, "Packets were not different" + assert packet1["Raw"].load == b'ta', "Left packet incorrectly fragmented" + assert packet2["Raw"].load == b"da", "Right packet incorrectly fragmented" + + +def test_odd_fragment(): + """ + Tests long IP fragmentation + """ + + fragment = actions.fragment.FragmentAction(correct_order=True, segment=False) + assert str(fragment) == "fragment{ip:-1:True}", "Fragment returned incorrect string representation: %s" % str(fragment) + + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("dataisodd")) + packet1, packet2 = fragment.run(packet, logger) + + assert id(packet1) != id(packet2), "Duplicate aliased packet objects" + + assert str(packet1["Raw"].load) != str(packet2["Raw"].load), "Packets were not different" + assert packet1["Raw"].load == b'\x08\xae\r\x05\x00\x00\x00d', "Left packet incorrectly fragmented" + assert packet2["Raw"].load == b'\x00\x00\x00dP\x02 \x00e\xc1\x00\x00dataisodd', "Right packet incorrectly fragmented" + assert packet1["Raw"].load + packet2["Raw"].load == b'\x08\xae\r\x05\x00\x00\x00d\x00\x00\x00dP\x02 \x00e\xc1\x00\x00dataisodd', "Packets fragmentation was incorrect" + + +def test_custom_fragment(): + """ + Tests IP fragments with custom sized lengths + """ + + fragment = actions.fragment.FragmentAction(correct_order=True, fragsize=3, segment=False) + assert str(fragment) == "fragment{ip:3:True}", "Fragment returned incorrect string representation: %s" % str(fragment) + + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("thisissomedata")) + packet1, packet2 = fragment.run(packet, logger) + + assert id(packet1) != id(packet2), "Duplicate aliased packet objects" + assert str(packet1["Raw"].load) != str(packet2["Raw"].load), "Packets were not different" + assert packet1["Raw"].load == b'\x08\xae\r\x05\x00\x00\x00d\x00\x00\x00dP\x02 \x00zp\x00\x00this', "Left packet incorrectly fragmented" + assert packet2["Raw"].load == b'issomedata', "Right packet incorrectly fragmented" + assert packet1["Raw"].load + packet2["Raw"].load == b'\x08\xae\r\x05\x00\x00\x00d\x00\x00\x00dP\x02 \x00zp\x00\x00thisissomedata', "Packets fragmentation was incorrect" + + +def test_reverse_fragment(): + """ + Tests fragmentation with reversed packets + """ + + fragment = actions.fragment.FragmentAction(correct_order=False, fragsize=3, segment=False) + assert str(fragment) == "fragment{ip:3:False}", "Fragment returned incorrect string representation: %s" % str(fragment) + + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("thisissomedata")) + packet1, packet2 = fragment.run(packet, logger) + + assert id(packet1) != id(packet2), "Duplicate aliased packet objects" + assert str(packet1["Raw"].load) != str(packet2["Raw"].load), "Packets were not different" + assert packet2["Raw"].load == b'\x08\xae\r\x05\x00\x00\x00d\x00\x00\x00dP\x02 \x00zp\x00\x00this', "Left packet incorrectly fragmented" + assert packet1["Raw"].load == b'issomedata', "Right packet incorrectly fragmented" + assert packet2["Raw"].load + packet1["Raw"].load == b'\x08\xae\r\x05\x00\x00\x00d\x00\x00\x00dP\x02 \x00zp\x00\x00thisissomedata', "Packets fragmentation was incorrect" + + +def test_udp_fragment(): + """ + Tests fragmentation with reversed packets + """ + + fragment = actions.fragment.FragmentAction(correct_order=False, fragsize=2, segment=False) + assert str(fragment) == "fragment{ip:2:False}", "Fragment returned incorrect string representation: %s" % str(fragment) + + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/UDP(sport=2222, dport=3333, chksum=0x4444)/("thisissomedata")) + packet1, packet2 = fragment.run(packet, logger) + + assert id(packet1) != id(packet2), "Duplicate aliased packet objects" + assert str(packet1["Raw"].load) != str(packet2["Raw"].load), "Packets were not different" + + +def test_parse(): + """ + Tests parsing. + """ + fragment = actions.fragment.FragmentAction(correct_order=False, fragsize=2, segment=False) + assert str(fragment) == "fragment{ip:2:False}", "Fragment returned incorrect string representation: %s" % str(fragment) + + fragment.parse("fragment{tcp:5:False}", logger) + assert fragment.correct_order == False + assert fragment.fragsize == 5 + assert fragment.segment == True + + with pytest.raises(Exception): + fragment.parse("fragment{tcp:5}", logger) + + with pytest.raises(Exception): + fragment.parse("fragment{tcp:a:True}", logger) + + assert fragment.correct_order == False + assert fragment.fragsize == 5 + assert fragment.segment == True + + fragment = actions.fragment.FragmentAction() + assert fragment.correct_order in [True, False] + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")) + + strat = actions.utils.parse("[IP:proto:6:0]-tamper{IP:proto:replace:6}(fragment{ip:-1:True}(tamper{TCP:dataofs:replace:8}(duplicate,),tamper{IP:frag:replace:0}),)-| [IP:tos:0:0]-duplicate-| \/", logger) + strat.act_on_packet(packet, logger) + + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/UDP(sport=2222, dport=3333, chksum=0x4444)) + strat = actions.utils.parse("[IP:proto:6:0]-tamper{IP:proto:replace:6}(fragment{ip:-1:True}(tamper{TCP:dataofs:replace:8}(duplicate,),tamper{IP:frag:replace:0}),)-| [IP:tos:0:0]-duplicate-| \/", logger) + strat.act_on_packet(packet, logger) + + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, chksum=0x4444)) + strat = actions.utils.parse("[TCP:urgptr:0]-tamper{TCP:options-altchksumopt:corrupt}(fragment{tcp:-1:True}(tamper{IP:proto:corrupt},tamper{TCP:seq:replace:654077552}),)-| \/", logger) + strat.act_on_packet(packet, logger) + + strat = actions.utils.parse("[TCP:options-mss:]-tamper{TCP:load:replace:}(fragment{tcp:-1:True},)-| \/", logger) + strat.act_on_packet(packet, logger) + + strat = actions.utils.parse("[TCP:options-mss:]-tamper{IP:frag:replace:1353}(tamper{TCP:load:replace:}(fragment{tcp:-1:True},),)-| \/", logger) + strat.act_on_packet(packet, logger) + + strat = actions.utils.parse("[IP:ihl:5]-duplicate-| [TCP:options-mss:]-tamper{IP:frag:replace:1353}(fragment{tcp:-1:True}(tamper{TCP:load:replace:}(fragment{tcp:-1:False},),tamper{DNSQR:qtype:replace:45416}),)-| \/", logger) + strat.act_on_packet(packet, logger) + + strat = actions.utils.parse("[DNSQR:qclass:25989]-duplicate(duplicate(tamper{DNSQR:qtype:replace:30882},),tamper{UDP:sport:replace:42042})-| [TCP:options-nop:]-tamper{TCP:options-nop:corrupt}(tamper{TCP:load:replace:mjkuskjzgy}(tamper{IP:frag:replace:410}(fragment{tcp:-1:True},),),)-| \/", logger) + strat.act_on_packet(packet, logger) + + +def test_fallback(): + """ + Tests fallback behavior. + """ + fragment = actions.fragment.FragmentAction(correct_order=False, fragsize=2, segment=False) + assert str(fragment) == "fragment{ip:2:False}", "Fragment returned incorrect string representation: %s" % str(fragment) + + fragment.parse("fragment{ip:0:False}", logger) + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/UDP(sport=2222, dport=3333, chksum=0x4444)/("thisissomedata")) + packet1, packet2 = fragment.run(packet, logger) + assert id(packet1) != id(packet2), "Duplicate aliased packet objects" + assert str(packet1) == str(packet2) + + fragment.parse("fragment{tcp:-1:False}", logger) + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/UDP(sport=2222, dport=3333, chksum=0x4444)/("thisissomedata")) + packet1, packet2 = fragment.run(packet, logger) + assert id(packet1) != id(packet2), "Duplicate aliased packet objects" + assert str(packet1) == str(packet2) + + fragment.parse("fragment{tcp:-1:False}", logger) + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, chksum=0x4444)) + packet1, packet2 = fragment.run(packet, logger) + assert id(packet1) != id(packet2), "Duplicate aliased packet objects" + assert str(packet1) == str(packet2) + + fragment.parse("fragment{ip:-1:False}", logger) + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)) + packet1, packet2 = fragment.run(packet, logger) + assert id(packet1) != id(packet2), "Duplicate aliased packet objects" + assert str(packet1) == str(packet2) + + +def test_ip_only_fragment(): + """ + Tests fragmentation without higher protocols. + """ + + fragment = actions.fragment.FragmentAction(correct_order=True) + fragment.parse("fragment{ip:-1:True}", logger) + + packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/("datadata11datadata")) + packet1, packet2 = fragment.run(packet, logger) + + assert id(packet1) != id(packet2), "Duplicate aliased packet objects" + + assert packet1["Raw"].load != packet2["Raw"].load, "Packets were not different" + assert packet1["Raw"].load == b'datadata', "Left packet incorrectly fragmented" + assert packet2["Raw"].load == b"11datadata", "Right packet incorrectly fragmented" + +