Reorganized layers into new directory.

This commit is contained in:
Michael Harrity 2020-06-24 05:20:51 -07:00
parent b27ea50522
commit 0796312589
45 changed files with 1203 additions and 1174 deletions

View File

@ -1,6 +1,6 @@
import random
from actions.action import Action
import actions.packet
import layers.packet
from scapy.all import IP, TCP, fragment
@ -88,8 +88,8 @@ class FragmentAction(Action):
else:
# packet can be fragmented as requested
frags = self.fragment(packet.copy().packet, fragsize=self.fragsize*8)
packet1 = actions.packet.Packet(frags[0])
packet2 = actions.packet.Packet(frags[1])
packet1 = layers.packet.Packet(frags[0])
packet2 = layers.packet.Packet(frags[1])
if self.correct_order:
return packet1, packet2
else:
@ -132,8 +132,8 @@ class FragmentAction(Action):
if not pkt2.haslayer("TCP"):
pkt2 = IP(packet["IP"])/TCP(bytes(pkt2["IP"].load))
packet1 = actions.packet.Packet(pkt1)
packet2 = actions.packet.Packet(pkt2)
packet1 = layers.packet.Packet(pkt1)
packet2 = layers.packet.Packet(pkt2)
# Reset packet2's SYN number
if packet2["TCP"].seq + fragsize > MAX_UINT:

View File

@ -1,854 +0,0 @@
import binascii
import copy
import random
import string
import os
import urllib.parse
from scapy.all import IP, RandIP, UDP, DNS, DNSQR, Raw, TCP, fuzz
class Layer():
"""
Base class defining a Geneva packet layer.
"""
protocol = None
def __init__(self, layer):
"""
Initializes this layer.
"""
self.layer = layer
# No custom setter, getters, generators, or parsers are needed by default
self.setters = {}
self.getters = {}
self.generators = {}
self.parsers = {}
@classmethod
def reset_restrictions(cls):
"""
Resets field restrictions placed on this layer.
"""
cls.fields = cls._fields
def get_next_layer(self):
"""
Given the current layer returns the next layer beneath us.
"""
if len(self.layer.layers()) == 1:
return None
return self.layer[1]
def get_random(self):
"""
Retreives a random field and value.
"""
field = random.choice(self.fields)
return field, self.get(field)
def gen_random(self):
"""
Generates a random field and value.
"""
assert self.fields, "Layer %s doesn't have any fields" % str(self)
field = random.choice(self.fields)
return field, self.gen(field)
@classmethod
def name_matches(cls, name):
"""
Checks if given name matches this layer name.
"""
return name.upper() == cls.name.upper()
def get(self, field):
"""
Retrieves the value from a given field.
"""
assert field in self.fields
if field in self.getters:
return self.getters[field](field)
# Dual field accessors are fields that require two pieces of information
# to retrieve them (for example, "options-eol"). These are delimited by
# a dash "-".
base = field.split("-")[0]
if "-" in field and base in self.getters:
return self.getters[base](field)
return getattr(self.layer, field)
def set(self, packet, field, value):
"""
Sets the value for a given field.
"""
assert field in self.fields
base = field.split("-")[0]
if field in self.setters:
self.setters[field](packet, field, value)
# Dual field accessors are fields that require two pieces of information
# to retrieve them (for example, "options-eol"). These are delimited by
# a dash "-".
elif "-" in field and base in self.setters:
self.setters[base](packet, field, value)
else:
setattr(self.layer, field, value)
# Request the packet be reparsed to confirm the value is stable
# XXX Temporarily disabling the reconstitution check due to scapy bug (#2034)
#assert bytes(self.protocol(bytes(self.layer))) == bytes(self.layer)
def gen(self, field):
"""
Generates a value for this field.
"""
assert field in self.fields
if field in self.generators:
return self.generators[field](field)
# Dual field accessors are fields that require two pieces of information
# to retrieve them (for example, "options-eol"). These are delimited by
# a dash "-".
base = field.split("-")[0]
if "-" in field and base in self.generators:
return self.generators[base](field)
sample = fuzz(self.protocol())
new_value = getattr(sample, field)
if new_value == None:
new_value = 0
elif type(new_value) != int:
new_value = new_value._fix()
return new_value
def parse(self, field, value):
"""
Parses the given value for a given field. This is useful for fields whose
value cannot be represented in a string type easily - it lets us define
a common string representation for the strategy, and parse it back into
a real value here.
"""
assert field in self.fields
if field in self.parsers:
return self.parsers[field](field, value)
# Dual field accessors are fields that require two pieces of information
# to retrieve them (for example, "options-eol"). These are delimited by
# a dash "-".
base = field.split("-")[0]
if "-" in field and base in self.parsers:
return self.parsers[base](field, value)
try:
parsed = int(value)
except ValueError:
parsed = value
return parsed
def get_load(self, field):
"""
Helper method to retrieve load, as scapy doesn't recognize 'load' as
a regular field properly.
"""
try:
load = self.layer.payload.load
except AttributeError:
pass
try:
load = self.layer.load
except AttributeError:
return ""
if not load:
return ""
return urllib.parse.quote(load.decode('utf-8', 'ignore'))
def set_load(self, packet, field, value):
"""
Helper method to retrieve load, as scapy doesn't recognize 'load' as
a field properly.
"""
if packet.haslayer("IP"):
del packet["IP"].len
value = urllib.parse.unquote(value)
value = value.encode('utf-8')
dns_payload = b"\x009ib\x81\x80\x00\x01\x00\x01\x00\x00\x00\x01\x08faceface\x03com\x00\x00\x01\x00\x01\xc0\x0c\x00\x01\x00\x01\x00\x00\x01+\x00\x04\xc7\xbf2I\x00\x00)\x02\x00\x00\x00\x00\x00\x00\x00"
http_payload = b"GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n"
value = value.replace(b"__DNS_REQUEST__", dns_payload)
value = value.replace(b"__HTTP_REQUEST__", http_payload)
self.layer.payload = Raw(value)
def gen_load(self, field):
"""
Helper method to generate a random load, as scapy doesn't recognize 'load'
as a field properly.
"""
load = ''.join([random.choice(string.ascii_lowercase + string.digits) for k in range(10)])
return random.choice(["", "__DNS_REQUEST__", "__HTTP_REQUEST__", urllib.parse.quote(load)])
class RawLayer(Layer):
"""
Defines an interface for the scapy Raw layer.
"""
name = "Raw"
protocol = Raw
_fields = []
fields = []
class IPLayer(Layer):
"""
Defines an interface to access IP header fields.
"""
name = "IP"
protocol = IP
_fields = [
'version',
'ihl',
'tos',
'len',
'id',
'flags',
'frag',
'ttl',
'proto',
'chksum',
'src',
'dst',
'load'
]
fields = _fields
def __init__(self, layer):
"""
Initializes the IP layer.
"""
Layer.__init__(self, layer)
self.getters = {
"flags" : self.get_flags,
"load" : self.get_load
}
self.setters = {
"flags" : self.set_flags,
"load" : self.set_load
}
self.generators = {
"src" : self.gen_ip,
"dst" : self.gen_ip,
"chksum" : self.gen_chksum,
"len" : self.gen_len,
"load" : self.gen_load,
"flags" : self.gen_flags
}
def gen_len(self, field):
"""
Generates a valid IP length. Scapy breaks if the length is set to 0, so
return a random int starting at 1.
"""
return random.randint(1, 500)
def gen_chksum(self, field):
"""
Generates a checksum.
"""
return random.randint(1, 65535)
def gen_ip(self, field):
"""
Generates an IP address.
"""
return RandIP()._fix()
def get_flags(self, field):
"""
Retrieves flags as a string.
"""
return str(self.layer.flags)
def set_flags(self, packet, field, value):
"""
Sets the flags field. There is a bug in scapy, if you retrieve an empty
flags field, it will return "", but you cannot set this value back.
To reproduce this bug:
.. code-block:: python
>>> setattr(IP(), "flags", str(IP().flags)) # raises a ValueError
To handle this case, this method converts empty string to zero so that
it can be safely stored.
"""
if value == "":
value = 0
self.layer.flags = value
def gen_flags(self, field):
"""
Generates random valid flags.
"""
sample = fuzz(self.protocol())
# Since scapy lazily evaluates fuzzing, we first must set a
# legitimate value for scapy to evaluate what combination of flags it is
sample.flags = sample.flags
return str(sample.flags)
class TCPLayer(Layer):
"""
Defines an interface to access TCP header fields.
"""
name = "TCP"
protocol = TCP
_fields = [
'sport',
'dport',
'seq',
'ack',
'dataofs',
'reserved',
'flags',
'window',
'chksum',
'urgptr',
'load',
'options-eol',
'options-nop',
'options-mss',
'options-wscale',
'options-sackok',
'options-sack',
'options-timestamp',
'options-altchksum',
'options-altchksumopt',
'options-md5header',
'options-uto'
]
fields = _fields
options_names = {
"eol": 0,
"nop": 1,
"mss": 2,
"wscale": 3,
"sackok": 4,
"sack": 5,
#"echo" : 6,
#"echo_reply" : 7,
"timestamp": 8,
"altchksum": 14,
"altchksumopt": 15,
"md5header": 19,
#"quick_start" : 27,
"uto": 28
#"authentication": 29,
#"experiment": 254
}
# Each entry is Kind: length
options_length = {
0: 0, # EOL
1: 0, # NOP
2: 2, # MSS
3: 1, # WScale
4: 0, # SAckOK
5: 0, # SAck
6: 4, # Echo
7: 4, # Echo Reply
8: 8, # Timestamp
14: 3, # AltChkSum
15: 0, # AltChkSumOpt
19: 16, # MD5header Option
27: 6, # Quick-Start response
28: 2, # User Timeout Option
29: 4, # TCP Authentication Option
254: 8, # Experiment
}
# Required by scapy
scapy_options = {
0: "EOL",
1: "NOP",
2: "MSS",
3: "WScale",
4: "SAckOK",
5: "SAck",
8: "Timestamp",
14: "AltChkSum",
15: "AltChkSumOpt",
28: "UTO",
# 254:"Experiment" # scapy has two versions of this, so it doesn't work
}
def __init__(self, layer):
"""
Initializes the TCP layer.
"""
Layer.__init__(self, layer)
# Special methods to help access fields that cannot be accessed normally
self.getters = {
'load' : self.get_load,
'options' : self.get_options
}
self.setters = {
'load' : self.set_load,
'options' : self.set_options
}
# Special methods to help generate fields that cannot be generated normally
self.generators = {
'load' : self.gen_load,
'dataofs' : self.gen_dataofs,
'flags' : self.gen_flags,
'chksum' : self.gen_chksum,
'options' : self.gen_options,
'window' : self.gen_window
}
def gen_window(self, field):
"""
Generates a window size.
"""
return random.choice(range(10, 200, 10))
def gen_chksum(self, field):
"""
Generates a checksum.
"""
return random.randint(1, 65535)
def gen_dataofs(self, field):
"""
Generates a valid value for the data offset field.
"""
# Dataofs is a 4 bit header, so a max of 15
return random.randint(1, 15)
def gen_flags(self, field):
"""
Generates a random set of flags. 50% of the time it picks randomly from
a list of real flags, otherwise it returns fuzzed flags.
"""
if random.random() < 0.5:
return random.choice(['S', 'A', 'SA', 'PA', 'FA', 'R', 'P', 'F', 'RA', ''])
else:
sample = fuzz(self.protocol())
# Since scapy lazily evaluates fuzzing, we first must set a
# legitimate value for scapy to evaluate what combination of flags it is
sample.flags = sample.flags
return str(sample.flags)
def get_options(self, field):
"""
Helper method to retrieve options.
"""
base, req_option = field.split("-")
assert base == "options", "get_options can only be used to fetch options."
option_type = self.option_str_to_int(req_option)
i = 0
# First, check if the option is already present in the packet
for option in self.layer.options:
# Scapy may try to be helpful and return the string of the option
next_option = self.option_str_to_int(option[0])
if option_type == next_option:
_name, value = self.layer.options[i]
# Some options (timestamp, checksums, nop) store their value in a
# tuple.
if isinstance(value, tuple):
# Scapy returns values in any of these types
if value in [None, b'', ()]:
return ''
value = value[0]
if value in [None, b'', ()]:
return ''
if req_option == "md5header":
return binascii.hexlify(value).decode("utf-8")
return value
i += 1
return ''
def set_options(self, packet, field, value):
"""
Helper method to set options.
"""
base, option = field.split("-")
assert base == "options", "Must use an options field with set_options"
option_type = self.option_str_to_int(option)
if type(value) == str:
# Prepare the value for storage in the packet
value = binascii.unhexlify(value)
# Scapy requires these options to be a tuple - since evaling this
# is not yet supported, for now, SAck will always be an empty tuple
if option in ["sack"]:
value = ()
# These options must be set as integers - if they didn't exist, they can
# be added like this
if option in ["timestamp", "mss", "wscale", "altchksum", "uto"] and not value:
value = 0
i = 0
# First, check if the option is already present in the packet
for option in self.layer.options:
# Scapy may try to be helpful and return the string of the option
next_option = self.option_str_to_int(option[0])
if option_type == next_option:
packet["TCP"].options[i] = self.format_option(option_type, value)
break
i += 1
# If we didn't break, the option doesn't exist in the packet currently.
else:
old_options_array = packet["TCP"].options
old_options_array.append(self.format_option(option_type, value))
packet["TCP"].options = old_options_array
# Let scapy recalculate the required values
del self.layer.chksum
del self.layer.dataofs
if packet.haslayer("IP"):
del packet["IP"].chksum
del packet["IP"].len
return True
def gen_options(self, field):
"""
Helper method to set options.
"""
_, option = field.split("-")
option_num = self.options_names[option]
length = self.options_length[option_num]
data = b''
if length > 0:
data = os.urandom(length)
data = binascii.hexlify(data).decode()
# MSS must be a 2-byte int
if option_num == 2:
data = random.randint(0, 65535)
# WScale must be a 1-byte int
elif option_num == 3:
data = random.randint(0, 255)
# Timestamp must be an int
elif option_num == 8:
data = random.randint(0, 4294967294)
elif option_num == 14:
data = random.randint(0, 255)
elif option_num == 28:
data = random.randint(0, 255)
return data
def option_str_to_int(self, option):
"""
Takes a string representation of an option and returns the option integer code.
"""
if type(option) == int:
return option
assert "-" not in option, "Must be given specific option: %s." % option
for val in self.scapy_options:
if self.scapy_options[val].lower() == option.lower():
return val
if " " in option:
option = option.replace(" ", "_").lower()
if option.lower() in self.options_names:
return self.options_names[option.lower()]
def format_option(self, options_int, value):
"""
Formats the options so they will work with scapy.
"""
# NOPs
if options_int == 1:
return (self.scapy_options[options_int], ())
elif options_int in [5]:
return (self.scapy_options[options_int], value)
# Timestamp
elif options_int in [8, 14]:
return (self.scapy_options[options_int], (value, 0))
elif options_int in self.scapy_options:
return (self.scapy_options[options_int], value)
else:
return (options_int, value)
class UDPLayer(Layer):
"""
Defines an interface to access UDP header fields.
"""
name = "UDP"
protocol = UDP
_fields = [
"sport",
"dport",
"chksum",
"len",
"load"
]
fields = _fields
def __init__(self, layer):
"""
Initializes the UDP layer.
"""
Layer.__init__(self, layer)
self.getters = {
'load' : self.get_load,
}
self.setters = {
'load' : self.set_load,
}
self.generators = {
'load' : self.gen_load,
}
class DNSLayer(Layer):
"""
Defines an interface to access DNS header fields.
"""
name = "DNS"
protocol = DNS
_fields = [
"id",
"qr",
"opcode",
"aa",
"tc",
"rd",
"ra",
"z",
"ad",
"cd",
"qd",
"rcode",
"qdcount",
"ancount",
"nscount",
"arcount"
]
fields = _fields
def __init__(self, layer):
"""
Initializes the DNS layer.
"""
Layer.__init__(self, layer)
self.getters = {
"qr" : self.get_bitfield,
"aa" : self.get_bitfield,
"tc" : self.get_bitfield,
"rd" : self.get_bitfield,
"ra" : self.get_bitfield,
"z" : self.get_bitfield,
"ad" : self.get_bitfield,
"cd" : self.get_bitfield
}
self.setters = {
"qr" : self.set_bitfield,
"aa" : self.set_bitfield,
"tc" : self.set_bitfield,
"rd" : self.set_bitfield,
"ra" : self.set_bitfield,
"z" : self.set_bitfield,
"ad" : self.set_bitfield,
"cd" : self.set_bitfield
}
self.generators = {
"id" : self.gen_id,
"qr" : self.gen_bitfield,
"opcode" : self.gen_opcode,
"aa" : self.gen_bitfield,
"tc" : self.gen_bitfield,
"rd" : self.gen_bitfield,
"ra" : self.gen_bitfield,
"z" : self.gen_bitfield,
"ad" : self.gen_bitfield,
"cd" : self.gen_bitfield,
"rcode" : self.gen_rcode,
"qdcount" : self.gen_count,
"ancount" : self.gen_count,
"nscount" : self.gen_count,
"arcount" : self.gen_count
}
def get_bitfield(self, field):
""""""
return int(getattr(self.layer, field))
def set_bitfield(self, packet, field, value):
""""""
return setattr(self.layer, field, int(value))
def gen_bitfield(self, field):
""""""
return random.choice([0,1])
def gen_id(self, field):
return random.randint(0, 65535)
def gen_opcode(self, field):
return random.randint(0, 15)
def gen_rcode(self, field):
return random.randint(0, 15)
def gen_count(self, field):
return random.randint(0, 65535)
@staticmethod
def dns_decompress(packet, logger):
"""
Performs DNS decompression on the given scapy packet, if applicable.
Note that DNS compression/decompression must be done on the boundaries
of a label, so DNS compression does not support arbitrary offsets.
"""
# If this is a TCP packet
if packet.haslayer("TCP"):
raise NotImplementedError
# Perform no action if this is not a DNS or DNSRQ packet
if not packet.haslayer("DNS") or not packet.haslayer("DNSQR"):
return packet
# Extract the query from the DNSQR layer
query = packet["DNSQR"].qname.decode()
if query[len(query) - 1] != '.':
query += '.'
# Split the query by label
labels = query.split(".")
# Collect the first and second half of the query
fhalf = labels[0]
shalf = ".".join(labels[1:])
# Build the first DNS query directly. The format of this a byte string like this:
# b'\x07minghui\xc0\x1a\x00\x01\x00\x01'
# \x07 = the length of the label in this DNSQR
# minghui = the portion of the domain we will request in the first DNSQR
# \xc0\x1a = offset into the DNS packet where the rest of the query will be. The actual offset
# here is the \x1a - DNS mandates that if compression is used, the first two bits be 11
# to differentiate them from the rest. \x1A = 26, which is the length of the DNS header
# plus the length of this DNSQR.
# \x00\x01 = type A record
# \x00\x01 = IN
length = bytes([len(fhalf)])
label = fhalf.encode()
# Since the domain will include an extra ".", add 1
# 2 * 6 is the DNS header
# 1 is the byte that determines the length of the label
# len(label) is the length of the label
# 2 is the offset pointer
# 4 - other record information (class, IN)
packet_offset = 2 * 6 + 1 + len(label) + 2 + 2 + 2
# The word must start with binary 11, so OR the offset with 0xC000.
offset = (0xc000 | packet_offset).to_bytes(2, byteorder='big')
request = b'\x00\x01\x00\x01'
dns_qr1 = length + label + offset + request
# Build the second DNS query directly. The format of the byte string is the same as above
# b'\x02ca\x00\x00\x01\x00\x01'
# \x02 = length of the remaining domain
# ca = portion of the domain in this DNSQR
# \x00 = null byte to signify the end of the query
# \x00\x01 = type A record
# \x00\x01 = IN
# Since the second half could potentially contain many labels, this is done in a list comprehension
dns_qr2 = b"".join([bytes([len(tld)]) + tld.encode() for tld in shalf.split(".")]) + b"\x00\x01\x00\x01"
# Next, we must rebuild the DNS packet itself. If we try to have scapy parse either dns_qr1 or dns_qr2, they
# will look malformed, since neither contains a complete request. Therefore, we must build the entire
# DNS packet at once. First, we must remove the original DNSQR, since this contains the original request
del packet["DNS"].qd
# Once the DNSQR is removed, scapy automatically sets the qdcount to 0. Adjust it to 2
packet["DNS"].qdcount = 2
# Extract the DNS header standalone now for building
dns_header = bytes(packet["DNS"])
dns_packet = DNS(dns_header + dns_qr1 + dns_qr2)
del packet["DNS"]
packet = packet / dns_packet
# Since the size and data of the packet have changed, force scapy to recalculate the important fields
# in below layers, if applicable
if packet.haslayer("IP"):
del packet["IP"].chksum
del packet["IP"].len
if packet.haslayer("UDP"):
del packet["UDP"].chksum
del packet["UDP"].len
return packet
class DNSQRLayer(Layer):
"""
Defines an interface to access DNSQR header fields.
"""
name = "DNSQR"
protocol = DNSQR
_fields = [
"qname",
"qtype",
"qclass"
]
fields = _fields
def __init__(self, layer):
"""
Initializes the DNS layer.
"""
Layer.__init__(self, layer)
self.getters = {
"qname" : self.get_qname
}
self.generators = {
"qname" : self.gen_qname
}
def get_qname(self, field):
"""
Returns decoded qname from packet.
"""
return self.layer.qname.decode('utf-8')
def gen_qname(self, field):
"""
Generates domain name.
"""
return "example.com."
@classmethod
def name_matches(cls, name):
"""
Scapy returns the name of DNSQR as _both_ DNSQR and "DNS Question Record",
which breaks parsing. Override the name_matches method to handle that case
here.
"""
return name.upper() in ["DNSQR", "DNS QUESTION RECORD"]

View File

@ -1,7 +1,7 @@
import threading
import os
import actions.packet
import layers.packet
from scapy.all import sniff
from scapy.utils import PcapWriter
@ -33,7 +33,7 @@ class Sniffer():
This callback is called whenever a packet is applied.
Returns true if it should finish, otherwise, returns false.
"""
packet = actions.packet.Packet(scapy_packet)
packet = layers.packet.Packet(scapy_packet)
for proto in ["TCP", "UDP"]:
if(packet.haslayer(proto) and ((packet[proto].sport == self.port) or (packet[proto].dport == self.port))):
break

View File

@ -12,7 +12,7 @@ modifications (particularly header modifications). It supports the following pri
from actions.action import Action
import actions.utils
from actions.layer import DNSLayer
from layers.dns_layer import DNSLayer
import random

View File

@ -1,6 +1,7 @@
import actions.utils
import random
import re
import layers.packet
FIXED_TRIGGER = None
GAS_ENABLED = True
@ -177,7 +178,7 @@ class Trigger(object):
value = m.group(3)
# Parse out the given value if necessary
value = actions.packet.Packet.parse(proto, field, value)
value = layers.packet.Packet.parse(proto, field, value)
# Trigger gas is set to None if it is disabled
trigger_gas = None

View File

@ -12,7 +12,7 @@ import urllib.parse
import actions.action
import actions.trigger
import actions.packet
import layers.packet
import plugins.plugin_client
import plugins.plugin_server
@ -309,7 +309,7 @@ def get_from_fuzzed_or_real_packet(environment_id, real_packet_probability, enab
if packets and random.random() < real_packet_probability:
packet = random.choice(packets)
return packet.get_random()
return actions.packet.Packet().gen_random()
return layers.packet.Packet().gen_random()
def read_packets(environment_id):
@ -327,7 +327,7 @@ def read_packets(environment_id):
parsed = []
try:
packets = rdpcap(packets_path)
parsed = [actions.packet.Packet(p) for p in packets]
parsed = [layers.packet.Packet(p) for p in packets]
except Exception as e:
print(e)
print("FAILED TO PARSE!")

View File

@ -4,7 +4,7 @@ import logging
import random
import os
import actions.packet
import layers.packet
import actions.utils
# Squelch annoying scapy ::1 runtime errors
@ -104,7 +104,7 @@ class Censor(object):
Sends a packet with scapy.
"""
if "TCP" in packet:
self.logger.debug(actions.packet.Packet._str_packet(packet))
self.logger.debug(layers.packet.Packet._str_packet(packet))
send(packet, verbose=False)
return

View File

@ -8,7 +8,7 @@ drops all packets after a TCP forbidden keyword is detected.
"""
import logging
import actions.packet
import layers.packet
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
from scapy.all import IP, TCP
@ -33,7 +33,7 @@ class Censor1(Censor):
Check if the censor should run against this packet. Returns true or false.
"""
try:
self.logger.debug("Inbound packet to censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: " + layers.packet.Packet._str_packet(packet))
if self.drop_all_from == packet["IP"].src:
self.logger.debug("Dropping all from this IP %s..." % self.drop_all_from)
return True
@ -45,20 +45,20 @@ class Censor1(Censor):
# Initial TCP synchronization
if packet["TCP"].sprintf('%TCP.flags%') == "S":
self.tcb = packet["TCP"].seq + 1
self.logger.debug(("Synchronizing TCB (%d) on packet " + actions.packet.Packet._str_packet(packet)) % self.tcb)
self.logger.debug(("Synchronizing TCB (%d) on packet " + layers.packet.Packet._str_packet(packet)) % self.tcb)
return False
# If we're tracking this packet stream
if packet["TCP"].seq == self.tcb:
self.tcb += len(self.get_payload(packet))
else:
self.logger.debug("Ignoring packet: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Ignoring packet: " + layers.packet.Packet._str_packet(packet))
return False
# Check if any forbidden words appear in the packet payload
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: " + layers.packet.Packet._str_packet(packet))
return True
return False

View File

@ -9,7 +9,7 @@ work.
"""
import netifaces
import actions.packet
import layers.packet
from censors.censor import Censor
from scapy.all import raw, IP, TCP
@ -33,7 +33,7 @@ class Censor10(Censor):
Returns true or false.
"""
try:
self.logger.debug("Inbound packet to censor: %s" % actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: %s" % layers.packet.Packet._str_packet(packet))
if packet["IP"].src in self.flagged_ips:
self.logger.debug("Content from a flagged IP detected %s..." % packet["IP"].src)
return True
@ -65,7 +65,7 @@ class Censor10(Censor):
if not tcb:
tcb = self.get_partial_tcb(packet)
if tcb is None:
self.logger.debug("Making a new TCB for packet %s" % actions.packet.Packet._str_packet(packet))
self.logger.debug("Making a new TCB for packet %s" % layers.packet.Packet._str_packet(packet))
tcb = {}
tcb["src"] = packet["IP"].src
@ -81,13 +81,13 @@ class Censor10(Censor):
self.tcbs.append(tcb)
self.resynchronize[(tcb["src"], tcb["dst"], tcb["sport"], tcb["dport"])] = False
self.logger.debug("Synchronizing a TCB (%s) on packet %s " % (str(tcb), actions.packet.Packet._str_packet(packet)))
self.logger.debug("Synchronizing a TCB (%s) on packet %s " % (str(tcb), layers.packet.Packet._str_packet(packet)))
return False
# If connection is getting torn down
elif tcb and packet["TCP"].sprintf('%TCP.flags%') in ["R", "F"]:
self.resynchronize[(tcb["src"], tcb["dst"], tcb["sport"], tcb["dport"])] = True
self.logger.debug(("Entering resynchronization state on packet %s" % actions.packet.Packet._str_packet(packet)))
self.logger.debug(("Entering resynchronization state on packet %s" % layers.packet.Packet._str_packet(packet)))
if not tcb:
self.logger.debug("No TCB matches packet.")
@ -99,7 +99,7 @@ class Censor10(Censor):
# Check if any forbidden words appear in the packet payload
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: %s" % actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: %s" % layers.packet.Packet._str_packet(packet))
return True
return False
@ -150,7 +150,7 @@ class Censor10(Censor):
Checks if the packet matches the stored TCB.
"""
for tcb in self.tcbs:
self.logger.debug("Checking %s against packet %s" % (str(tcb), actions.packet.Packet._str_packet(packet)))
self.logger.debug("Checking %s against packet %s" % (str(tcb), layers.packet.Packet._str_packet(packet)))
if (packet["IP"].src == tcb["src"] and \
packet["IP"].dst == tcb["dst"] and \
@ -166,7 +166,7 @@ class Censor10(Censor):
are correct.
"""
for tcb in self.tcbs:
self.logger.debug("Checking %s against packet %s for partial match" % (str(tcb), actions.packet.Packet._str_packet(packet)))
self.logger.debug("Checking %s against packet %s for partial match" % (str(tcb), layers.packet.Packet._str_packet(packet)))
if (packet["IP"].src == tcb["src"] and \
packet["IP"].dst == tcb["dst"] and \

View File

@ -9,7 +9,7 @@ work. It also resynchronizes on both S and ACK, defeating strategies that trigge
before the 3-way handshake has finished.
"""
import actions.packet
import layers.packet
import netifaces
from censors.censor import Censor
from scapy.all import raw, IP, TCP
@ -33,7 +33,7 @@ class Censor11(Censor):
Returns true or false.
"""
try:
self.logger.debug("Inbound packet to censor: %s" % actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: %s" % layers.packet.Packet._str_packet(packet))
if packet["IP"].src in self.flagged_ips:
self.logger.debug("Content from a flagged IP detected %s..." % packet["IP"].src)
return True
@ -65,7 +65,7 @@ class Censor11(Censor):
# so we can just replace that tcb with updated info
tcb = self.get_partial_tcb(packet)
if tcb is None:
self.logger.debug("Making a new TCB for packet %s" % actions.packet.Packet._str_packet(packet))
self.logger.debug("Making a new TCB for packet %s" % layers.packet.Packet._str_packet(packet))
tcb = {}
tcb["src"] = packet["IP"].src
@ -81,13 +81,13 @@ class Censor11(Censor):
self.tcbs.append(tcb)
self.resynchronize[(tcb["src"], tcb["dst"], tcb["sport"], tcb["dport"])] = False
self.logger.debug("Synchronizing a TCB (%s) on packet %s " % (str(tcb), actions.packet.Packet._str_packet(packet)))
self.logger.debug("Synchronizing a TCB (%s) on packet %s " % (str(tcb), layers.packet.Packet._str_packet(packet)))
return False
# If connection is getting torn down
elif tcb and packet["TCP"].sprintf('%TCP.flags%') in ["R", "F", "RA", "FA"]:
self.resynchronize[(tcb["src"], tcb["dst"], tcb["sport"], tcb["dport"])] = True
self.logger.debug(("Entering resynchronization state on packet %s" % actions.packet.Packet._str_packet(packet)))
self.logger.debug(("Entering resynchronization state on packet %s" % layers.packet.Packet._str_packet(packet)))
if not tcb:
self.logger.debug("No TCB matches packet.")
@ -99,7 +99,7 @@ class Censor11(Censor):
# Check if any forbidden words appear in the packet payload
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: %s" % actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: %s" % layers.packet.Packet._str_packet(packet))
return True
return False
@ -150,7 +150,7 @@ class Censor11(Censor):
Checks if the packet matches the stored TCB.
"""
for tcb in self.tcbs:
self.logger.debug("Checking %s against packet %s" % (str(tcb), actions.packet.Packet._str_packet(packet)))
self.logger.debug("Checking %s against packet %s" % (str(tcb), layers.packet.Packet._str_packet(packet)))
if (packet["IP"].src == tcb["src"] and \
packet["IP"].dst == tcb["dst"] and \
@ -166,7 +166,7 @@ class Censor11(Censor):
are correct.
"""
for tcb in self.tcbs:
self.logger.debug("Checking %s against packet %s for partial match" % (str(tcb), actions.packet.Packet._str_packet(packet)))
self.logger.debug("Checking %s against packet %s for partial match" % (str(tcb), layers.packet.Packet._str_packet(packet)))
if (packet["IP"].src == tcb["src"] and \
packet["IP"].dst == tcb["dst"] and \

View File

@ -6,7 +6,7 @@ Designed to be run by the evaluator.
TCP Censor that synchronizes on first SYN only, works 100% of the time, sends 5 RSTs to client.
"""
import actions.packet
import layers.packet
import logging
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
from scapy.all import IP, TCP
@ -26,7 +26,7 @@ class Censor2(Censor):
Check if the censor should run against this packet. Returns true or false.
"""
try:
self.logger.debug("Inbound packet to censor: %s", actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: %s", layers.packet.Packet._str_packet(packet))
# Only censor TCP packets for now
if "TCP" not in packet:
@ -34,19 +34,19 @@ class Censor2(Censor):
if packet["TCP"].sprintf('%TCP.flags%') == "S":
self.tcb = packet["TCP"].seq + 1
self.logger.debug("Synchronizing TCB on packet: %s", actions.packet.Packet._str_packet(packet))
self.logger.debug("Synchronizing TCB on packet: %s", layers.packet.Packet._str_packet(packet))
return False
if packet["TCP"].seq == self.tcb:
self.tcb += len(self.get_payload(packet))
else:
self.logger.debug("Ignoring packet: %s", actions.packet.Packet._str_packet(packet))
self.logger.debug("Ignoring packet: %s", layers.packet.Packet._str_packet(packet))
return False
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: %s", actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: %s", layers.packet.Packet._str_packet(packet))
return True
return False

View File

@ -9,7 +9,7 @@ server AND client.
import logging
import netifaces
import actions.packet
import layers.packet
# Disable scapy ::1 warnings
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
@ -42,25 +42,25 @@ class Censor3(Censor):
self.num += 1
# Only censor TCP packets for now
self.logger.debug("Inbound packet to censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: " + layers.packet.Packet._str_packet(packet))
if "TCP" not in packet:
return False
if packet["TCP"].sprintf('%TCP.flags%') == "S":
self.tcb = packet["TCP"].seq + 1
self.logger.debug("Synchronizing TCB on packet " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Synchronizing TCB on packet " + layers.packet.Packet._str_packet(packet))
return False
if packet["TCP"].seq == self.tcb:
self.tcb += len(self.get_payload(packet))
else:
self.logger.debug("Ignoring packet: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Ignoring packet: " + layers.packet.Packet._str_packet(packet))
return False
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: " + layers.packet.Packet._str_packet(packet))
return True
return False

View File

@ -4,7 +4,7 @@ Censor 4
Dropping censor that synchronizes TCB on all SYN and ACK packets.
"""
import actions.packet
import layers.packet
import logging
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
from scapy.all import IP, TCP
@ -28,7 +28,7 @@ class Censor4(Censor):
self.num += 1
self.logger.debug("Inbound packet to censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: " + layers.packet.Packet._str_packet(packet))
if self.drop_all_from == packet["IP"].src:
self.logger.debug("Dropping all from this IP %s..." % self.drop_all_from)
return True
@ -40,25 +40,25 @@ class Censor4(Censor):
# Initial TCP synchronization
if "S" == packet["TCP"].sprintf('%TCP.flags%'):
self.tcb = packet["TCP"].seq + 1
self.logger.debug(("Synchronizing TCB (%d) on S packet " + actions.packet.Packet._str_packet(packet)) % self.tcb)
self.logger.debug(("Synchronizing TCB (%d) on S packet " + layers.packet.Packet._str_packet(packet)) % self.tcb)
return False
if "A" == packet["TCP"].sprintf('%TCP.flags%'):
self.tcb = packet["TCP"].seq
self.logger.debug(("Synchronizing TCB (%d) on A packet " + actions.packet.Packet._str_packet(packet)) % self.tcb)
self.logger.debug(("Synchronizing TCB (%d) on A packet " + layers.packet.Packet._str_packet(packet)) % self.tcb)
return False
# If we're tracking this packet stream
if packet["TCP"].seq == self.tcb:
self.tcb += len(self.get_payload(packet))
else:
self.logger.debug("Ignoring packet: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Ignoring packet: " + layers.packet.Packet._str_packet(packet))
return False
# Check if any forbidden words appear in the packet payload
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: " + layers.packet.Packet._str_packet(packet))
return True
return False

View File

@ -7,7 +7,7 @@ TCP Censor that synchronizes on first SYN only, works 100% of the time, sends 5
the server only.
"""
import actions.packet
import layers.packet
import logging
import netifaces
# Disable scapy ::1 warnings
@ -42,25 +42,25 @@ class Censor5(Censor):
self.num += 1
# Only censor TCP packets for now
self.logger.debug("Inbound packet to censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: " + layers.packet.Packet._str_packet(packet))
if "TCP" not in packet:
return False
if packet["TCP"].sprintf('%TCP.flags%') == "S":
self.tcb = packet["TCP"].seq + 1
self.logger.debug("Synchronizing TCB on packet " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Synchronizing TCB on packet " + layers.packet.Packet._str_packet(packet))
return False
if packet["TCP"].seq == self.tcb:
self.tcb += len(self.get_payload(packet))
else:
self.logger.debug("Ignoring packet: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Ignoring packet: " + layers.packet.Packet._str_packet(packet))
return False
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: " + layers.packet.Packet._str_packet(packet))
return True
return False

View File

@ -4,7 +4,7 @@ FIN or RST packet.
Does not check if the ports are correct for the FIN/RST.
"""
import actions.packet
import layers.packet
import logging
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
from scapy.all import IP, TCP
@ -24,7 +24,7 @@ class Censor6(Censor):
Check if the censor should run against this packet. Returns true or false.
"""
try:
self.logger.debug("Inbound packet to censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: " + layers.packet.Packet._str_packet(packet))
if self.drop_all_from == packet["IP"].src:
self.logger.debug("Dropping all from this IP %s..." % self.drop_all_from)
return True
@ -36,17 +36,17 @@ class Censor6(Censor):
# Some stacks send RA to tear down a connection
if packet["TCP"].sprintf('%TCP.flags%') in ["R", "RA", "F"]:
self.tcb = None
self.logger.debug(("Tearing down TCB on packet " + actions.packet.Packet._str_packet(packet)))
self.logger.debug(("Tearing down TCB on packet " + layers.packet.Packet._str_packet(packet)))
return False
if self.tcb is None:
self.logger.debug("Ignoring packet: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Ignoring packet: " + layers.packet.Packet._str_packet(packet))
return False
# Check if any forbidden words appear in the packet payload
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: " + layers.packet.Packet._str_packet(packet))
return True
return False

View File

@ -5,7 +5,7 @@ Does not check if the SEQ/ACK are in window for the FIN/RST.
"""
import logging
import actions.packet
import layers.packet
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
from scapy.all import IP, TCP
@ -24,7 +24,7 @@ class Censor7(Censor):
Check if the censor should run against this packet. Returns true or false.
"""
try:
self.logger.debug("Inbound packet to censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: " + layers.packet.Packet._str_packet(packet))
if self.drop_all_from == packet["IP"].src:
self.logger.debug("Dropping all from this IP %s..." % self.drop_all_from)
return True
@ -43,7 +43,7 @@ class Censor7(Censor):
packet["TCP"].sport in self.tcb["ports"]:
self.tcb = None
self.logger.debug(("Tearing down TCB on packet " + actions.packet.Packet._str_packet(packet)))
self.logger.debug(("Tearing down TCB on packet " + layers.packet.Packet._str_packet(packet)))
return False
elif not self.tcb and self.tcb is not None:
@ -51,13 +51,13 @@ class Censor7(Censor):
self.tcb["ports"] = [packet["TCP"].sport, packet["TCP"].dport]
if self.tcb is None:
self.logger.debug("Ignoring packet: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Ignoring packet: " + layers.packet.Packet._str_packet(packet))
return False
# Check if any forbidden words appear in the packet payload
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: " + layers.packet.Packet._str_packet(packet))
return True
return False

View File

@ -4,7 +4,7 @@ if the full tuple of the TCB matches (src, dst, sport, dport, seq).
"""
import logging
import actions.packet
import layers.packet
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
from scapy.all import IP, TCP
@ -23,7 +23,7 @@ class Censor8(Censor):
Check if the censor should run against this packet. Returns true or false.
"""
try:
self.logger.debug("Inbound packet to censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: " + layers.packet.Packet._str_packet(packet))
if self.drop_all_from == packet["IP"].src:
self.logger.debug("Dropping all from this IP %s..." % self.drop_all_from)
return True
@ -36,7 +36,7 @@ class Censor8(Censor):
self.tcb["ips"] = [packet["IP"].src, packet["IP"].dst]
self.tcb["ports"] = [packet["TCP"].sport, packet["TCP"].dport]
self.tcb["seq"] = packet["TCP"].seq + 1
self.logger.debug("Synchronizing TCB on packet " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Synchronizing TCB on packet " + layers.packet.Packet._str_packet(packet))
return False
# TCB teardown
elif packet["TCP"].sprintf('%TCP.flags%') == "R" or packet["TCP"].sprintf('%TCP.flags%') == "F":
@ -49,11 +49,11 @@ class Censor8(Censor):
packet["TCP"].seq == self.tcb["seq"]:
self.tcb = None
self.logger.debug(("Tearing down TCB on packet " + actions.packet.Packet._str_packet(packet)))
self.logger.debug(("Tearing down TCB on packet " + layers.packet.Packet._str_packet(packet)))
return False
if self.tcb is None:
self.logger.debug("Ignoring packet: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Ignoring packet: " + layers.packet.Packet._str_packet(packet))
return False
elif "seq" in self.tcb and packet["TCP"].seq == self.tcb["seq"]:
self.tcb["seq"] += len(self.get_payload(packet))
@ -61,7 +61,7 @@ class Censor8(Censor):
# Check if any forbidden words appear in the packet payload
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: " + layers.packet.Packet._str_packet(packet))
return True
return False

View File

@ -5,7 +5,7 @@ state if a RST or FIN, it simply tears down. It creates new TCBs for connections
yet aware of, but does not check the checksums of incoming packets.
"""
import actions.packet
import layers.packet
import netifaces
from censors.censor import Censor
from scapy.all import IP, TCP
@ -28,7 +28,7 @@ class Censor8b(Censor):
Returns true or false.
"""
try:
self.logger.debug("Inbound packet to censor: %s" % actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: %s" % layers.packet.Packet._str_packet(packet))
if packet["IP"].src in self.flagged_ips:
self.logger.debug("Content from a flagged IP detected %s..." % packet["IP"].src)
return True
@ -49,7 +49,7 @@ class Censor8b(Censor):
# so we can just replace that tcb with updated info
tcb = self.get_partial_tcb(packet)
if tcb is None:
self.logger.debug("Making a new TCB for packet %s" % actions.packet.Packet._str_packet(packet))
self.logger.debug("Making a new TCB for packet %s" % layers.packet.Packet._str_packet(packet))
tcb = {}
tcb["src"] = packet["IP"].src
@ -64,12 +64,12 @@ class Censor8b(Censor):
tcb["seq"] += len(self.get_payload(packet))
self.tcbs.append(tcb)
self.logger.debug("Synchronizing a TCB (%s) on packet %s " % (str(tcb), actions.packet.Packet._str_packet(packet)))
self.logger.debug("Synchronizing a TCB (%s) on packet %s " % (str(tcb), layers.packet.Packet._str_packet(packet)))
return False
# If connection is getting torn down
elif tcb and packet["TCP"].sprintf('%TCP.flags%') in ["R", "RA"]:
self.tcbs.remove(tcb)
self.logger.debug(("Deleting TCB for packet %s" % actions.packet.Packet._str_packet(packet)))
self.logger.debug(("Deleting TCB for packet %s" % layers.packet.Packet._str_packet(packet)))
return False
if not tcb:
@ -82,7 +82,7 @@ class Censor8b(Censor):
# Check if any forbidden words appear in the packet payload
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: %s" % actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: %s" % layers.packet.Packet._str_packet(packet))
return True
return False
@ -133,7 +133,7 @@ class Censor8b(Censor):
Checks if the packet matches the stored TCB.
"""
for tcb in self.tcbs:
self.logger.debug("Checking %s against packet %s" % (str(tcb), actions.packet.Packet._str_packet(packet)))
self.logger.debug("Checking %s against packet %s" % (str(tcb), layers.packet.Packet._str_packet(packet)))
if (packet["IP"].src == tcb["src"] and \
packet["IP"].dst == tcb["dst"] and \
@ -149,7 +149,7 @@ class Censor8b(Censor):
are correct.
"""
for tcb in self.tcbs:
self.logger.debug("Checking %s against packet %s for partial match" % (str(tcb), actions.packet.Packet._str_packet(packet)))
self.logger.debug("Checking %s against packet %s for partial match" % (str(tcb), layers.packet.Packet._str_packet(packet)))
if (packet["IP"].src == tcb["src"] and \
packet["IP"].dst == tcb["dst"] and \

View File

@ -7,7 +7,7 @@ More closely mimics GFW behavior.
"""
import logging
import actions.packet
import layers.packet
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
from scapy.all import IP, TCP
@ -27,7 +27,7 @@ class Censor9(Censor):
Check if the censor should run against this packet. Returns true or false.
"""
try:
self.logger.debug("Inbound packet to censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Inbound packet to censor: " + layers.packet.Packet._str_packet(packet))
if self.drop_all_from == packet["IP"].src:
self.logger.debug("Dropping all from this IP %s..." % self.drop_all_from)
return True
@ -51,7 +51,7 @@ class Censor9(Censor):
self.tcb["seq"] += len(self.get_payload(packet))
self.resynchronize = False
self.logger.debug("Synchronizing TCB on packet " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Synchronizing TCB on packet " + layers.packet.Packet._str_packet(packet))
return self.check_forbidden(packet)
# If connection is getting torn down
@ -59,7 +59,7 @@ class Censor9(Censor):
(packet["TCP"].sprintf('%TCP.flags%') == "R" or \
packet["TCP"].sprintf('%TCP.flags%') == "F"):
self.resynchronize = True
self.logger.debug(("Entering resynchronization state on packet " + actions.packet.Packet._str_packet(packet)))
self.logger.debug(("Entering resynchronization state on packet " + layers.packet.Packet._str_packet(packet)))
if not self.tcb_matches(packet):
self.logger.debug("TCB does not match packet.")
@ -89,7 +89,7 @@ class Censor9(Censor):
# Check if any forbidden words appear in the packet payload
for keyword in self.forbidden:
if keyword in self.get_payload(packet):
self.logger.debug("Packet triggered censor: " + actions.packet.Packet._str_packet(packet))
self.logger.debug("Packet triggered censor: " + layers.packet.Packet._str_packet(packet))
return True
return False

View File

@ -1,4 +1,4 @@
geneva.actions.layer
geneva.layers.layer
====================
.. automodule:: layer

View File

@ -1,4 +1,4 @@
geneva.actions.packet
geneva.layers.packet
=====================
.. automodule:: packet

View File

@ -25,7 +25,7 @@ from scapy.config import conf
socket.setdefaulttimeout(1)
import actions.packet
import layers.packet
import actions.strategy
import actions.utils
@ -141,10 +141,10 @@ class Engine():
configured route, and clears the checksums for recalculating
Args:
packet (Actions.packet.Packet): packet to modify before sending
packet (layers.packet.Packet): packet to modify before sending
Returns:
Actions.packet.Packet: the modified packet
layers.packet.Packet: the modified packet
"""
if packet["IP"].src == self.sender_ip:
packet["IP"].dst = self.forward_ip
@ -329,7 +329,7 @@ class Engine():
if not self.running_nfqueue:
return
packet = actions.packet.Packet(IP(nfpacket.get_payload()))
packet = layers.packet.Packet(IP(nfpacket.get_payload()))
self.logger.debug("Received outbound packet %s", str(packet))
# Record this packet for a .pacp later
@ -365,7 +365,7 @@ class Engine():
"""
if not self.running_nfqueue:
return
packet = actions.packet.Packet(IP(nfpacket.get_payload()))
packet = layers.packet.Packet(IP(nfpacket.get_payload()))
if self.save_seen_packets:
self.seen_packets.append(packet)

View File

@ -16,6 +16,7 @@ import actions.strategy
import actions.tree
import actions.trigger
import evaluator
import layers.packet
# Grab the terminal size for printing
try:
@ -715,7 +716,7 @@ def restrict_headers(logger, protos, filter_fields, disabled_fields):
if disabled_fields:
disabled_fields = disabled_fields.split(",")
actions.packet.Packet.restrict_fields(logger, protos, filter_fields, disabled_fields)
layers.packet.Packet.restrict_fields(logger, protos, filter_fields, disabled_fields)
def driver(cmd):

189
layers/dns_layer.py Normal file
View File

@ -0,0 +1,189 @@
import random
from layers.layer import Layer
from scapy.all import DNS
class DNSLayer(Layer):
"""
Defines an interface to access DNS header fields.
"""
name = "DNS"
protocol = DNS
_fields = [
"id",
"qr",
"opcode",
"aa",
"tc",
"rd",
"ra",
"z",
"ad",
"cd",
"qd",
"rcode",
"qdcount",
"ancount",
"nscount",
"arcount"
]
fields = _fields
def __init__(self, layer):
"""
Initializes the DNS layer.
"""
Layer.__init__(self, layer)
self.getters = {
"qr" : self.get_bitfield,
"aa" : self.get_bitfield,
"tc" : self.get_bitfield,
"rd" : self.get_bitfield,
"ra" : self.get_bitfield,
"z" : self.get_bitfield,
"ad" : self.get_bitfield,
"cd" : self.get_bitfield
}
self.setters = {
"qr" : self.set_bitfield,
"aa" : self.set_bitfield,
"tc" : self.set_bitfield,
"rd" : self.set_bitfield,
"ra" : self.set_bitfield,
"z" : self.set_bitfield,
"ad" : self.set_bitfield,
"cd" : self.set_bitfield
}
self.generators = {
"id" : self.gen_id,
"qr" : self.gen_bitfield,
"opcode" : self.gen_opcode,
"aa" : self.gen_bitfield,
"tc" : self.gen_bitfield,
"rd" : self.gen_bitfield,
"ra" : self.gen_bitfield,
"z" : self.gen_bitfield,
"ad" : self.gen_bitfield,
"cd" : self.gen_bitfield,
"rcode" : self.gen_rcode,
"qdcount" : self.gen_count,
"ancount" : self.gen_count,
"nscount" : self.gen_count,
"arcount" : self.gen_count
}
def get_bitfield(self, field):
""""""
return int(getattr(self.layer, field))
def set_bitfield(self, packet, field, value):
""""""
return setattr(self.layer, field, int(value))
def gen_bitfield(self, field):
""""""
return random.choice([0,1])
def gen_id(self, field):
return random.randint(0, 65535)
def gen_opcode(self, field):
return random.randint(0, 15)
def gen_rcode(self, field):
return random.randint(0, 15)
def gen_count(self, field):
return random.randint(0, 65535)
@staticmethod
def dns_decompress(packet, logger):
"""
Performs DNS decompression on the given scapy packet, if applicable.
Note that DNS compression/decompression must be done on the boundaries
of a label, so DNS compression does not support arbitrary offsets.
"""
# If this is a TCP packet
if packet.haslayer("TCP"):
raise NotImplementedError
# Perform no action if this is not a DNS or DNSRQ packet
if not packet.haslayer("DNS") or not packet.haslayer("DNSQR"):
return packet
# Extract the query from the DNSQR layer
query = packet["DNSQR"].qname.decode()
if query[len(query) - 1] != '.':
query += '.'
# Split the query by label
labels = query.split(".")
# Collect the first and second half of the query
fhalf = labels[0]
shalf = ".".join(labels[1:])
# Build the first DNS query directly. The format of this a byte string like this:
# b'\x07minghui\xc0\x1a\x00\x01\x00\x01'
# \x07 = the length of the label in this DNSQR
# minghui = the portion of the domain we will request in the first DNSQR
# \xc0\x1a = offset into the DNS packet where the rest of the query will be. The actual offset
# here is the \x1a - DNS mandates that if compression is used, the first two bits be 11
# to differentiate them from the rest. \x1A = 26, which is the length of the DNS header
# plus the length of this DNSQR.
# \x00\x01 = type A record
# \x00\x01 = IN
length = bytes([len(fhalf)])
label = fhalf.encode()
# Since the domain will include an extra ".", add 1
# 2 * 6 is the DNS header
# 1 is the byte that determines the length of the label
# len(label) is the length of the label
# 2 is the offset pointer
# 4 - other record information (class, IN)
packet_offset = 2 * 6 + 1 + len(label) + 2 + 2 + 2
# The word must start with binary 11, so OR the offset with 0xC000.
offset = (0xc000 | packet_offset).to_bytes(2, byteorder='big')
request = b'\x00\x01\x00\x01'
dns_qr1 = length + label + offset + request
# Build the second DNS query directly. The format of the byte string is the same as above
# b'\x02ca\x00\x00\x01\x00\x01'
# \x02 = length of the remaining domain
# ca = portion of the domain in this DNSQR
# \x00 = null byte to signify the end of the query
# \x00\x01 = type A record
# \x00\x01 = IN
# Since the second half could potentially contain many labels, this is done in a list comprehension
dns_qr2 = b"".join([bytes([len(tld)]) + tld.encode() for tld in shalf.split(".")]) + b"\x00\x01\x00\x01"
# Next, we must rebuild the DNS packet itself. If we try to have scapy parse either dns_qr1 or dns_qr2, they
# will look malformed, since neither contains a complete request. Therefore, we must build the entire
# DNS packet at once. First, we must remove the original DNSQR, since this contains the original request
del packet["DNS"].qd
# Once the DNSQR is removed, scapy automatically sets the qdcount to 0. Adjust it to 2
packet["DNS"].qdcount = 2
# Extract the DNS header standalone now for building
dns_header = bytes(packet["DNS"])
dns_packet = DNS(dns_header + dns_qr1 + dns_qr2)
del packet["DNS"]
packet = packet / dns_packet
# Since the size and data of the packet have changed, force scapy to recalculate the important fields
# in below layers, if applicable
if packet.haslayer("IP"):
del packet["IP"].chksum
del packet["IP"].len
if packet.haslayer("UDP"):
del packet["UDP"].chksum
del packet["UDP"].len
return packet

48
layers/dnsqr_layer.py Normal file
View File

@ -0,0 +1,48 @@
from layers.layer import Layer
from scapy.all import DNSQR
class DNSQRLayer(Layer):
"""
Defines an interface to access DNSQR header fields.
"""
name = "DNSQR"
protocol = DNSQR
_fields = [
"qname",
"qtype",
"qclass"
]
fields = _fields
def __init__(self, layer):
"""
Initializes the DNS layer.
"""
Layer.__init__(self, layer)
self.getters = {
"qname" : self.get_qname
}
self.generators = {
"qname" : self.gen_qname
}
def get_qname(self, field):
"""
Returns decoded qname from packet.
"""
return self.layer.qname.decode('utf-8')
def gen_qname(self, field):
"""
Generates domain name.
"""
return "example.com."
@classmethod
def name_matches(cls, name):
"""
Scapy returns the name of DNSQR as _both_ DNSQR and "DNS Question Record",
which breaks parsing. Override the name_matches method to handle that case
here.
"""
return name.upper() in ["DNSQR", "DNS QUESTION RECORD"]

103
layers/ip_layer.py Normal file
View File

@ -0,0 +1,103 @@
import random
from layers.layer import Layer
from scapy.all import IP, fuzz, RandIP
class IPLayer(Layer):
"""
Defines an interface to access IP header fields.
"""
name = "IP"
protocol = IP
_fields = [
'version',
'ihl',
'tos',
'len',
'id',
'flags',
'frag',
'ttl',
'proto',
'chksum',
'src',
'dst',
'load'
]
fields = _fields
def __init__(self, layer):
"""
Initializes the IP layer.
"""
Layer.__init__(self, layer)
self.getters = {
"flags" : self.get_flags,
"load" : self.get_load
}
self.setters = {
"flags" : self.set_flags,
"load" : self.set_load
}
self.generators = {
"src" : self.gen_ip,
"dst" : self.gen_ip,
"chksum" : self.gen_chksum,
"len" : self.gen_len,
"load" : self.gen_load,
"flags" : self.gen_flags
}
def gen_len(self, field):
"""
Generates a valid IP length. Scapy breaks if the length is set to 0, so
return a random int starting at 1.
"""
return random.randint(1, 500)
def gen_chksum(self, field):
"""
Generates a checksum.
"""
return random.randint(1, 65535)
def gen_ip(self, field):
"""
Generates an IP address.
"""
return RandIP()._fix()
def get_flags(self, field):
"""
Retrieves flags as a string.
"""
return str(self.layer.flags)
def set_flags(self, packet, field, value):
"""
Sets the flags field. There is a bug in scapy, if you retrieve an empty
flags field, it will return "", but you cannot set this value back.
To reproduce this bug:
.. code-block:: python
>>> setattr(IP(), "flags", str(IP().flags)) # raises a ValueError
To handle this case, this method converts empty string to zero so that
it can be safely stored.
"""
if value == "":
value = 0
self.layer.flags = value
def gen_flags(self, field):
"""
Generates random valid flags.
"""
sample = fuzz(self.protocol())
# Since scapy lazily evaluates fuzzing, we first must set a
# legitimate value for scapy to evaluate what combination of flags it is
sample.flags = sample.flags
return str(sample.flags)

197
layers/layer.py Normal file
View File

@ -0,0 +1,197 @@
import binascii
import copy
import random
import string
import os
import urllib.parse
from scapy.all import IP, RandIP, UDP, DNS, DNSQR, Raw, TCP, fuzz
class Layer():
"""
Base class defining a Geneva packet layer.
"""
protocol = None
def __init__(self, layer):
"""
Initializes this layer.
"""
self.layer = layer
# No custom setter, getters, generators, or parsers are needed by default
self.setters = {}
self.getters = {}
self.generators = {}
self.parsers = {}
@classmethod
def reset_restrictions(cls):
"""
Resets field restrictions placed on this layer.
"""
cls.fields = cls._fields
def get_next_layer(self):
"""
Given the current layer returns the next layer beneath us.
"""
if len(self.layer.layers()) == 1:
return None
return self.layer[1]
def get_random(self):
"""
Retreives a random field and value.
"""
field = random.choice(self.fields)
return field, self.get(field)
def gen_random(self):
"""
Generates a random field and value.
"""
assert self.fields, "Layer %s doesn't have any fields" % str(self)
field = random.choice(self.fields)
return field, self.gen(field)
@classmethod
def name_matches(cls, name):
"""
Checks if given name matches this layer name.
"""
return name.upper() == cls.name.upper()
def get(self, field):
"""
Retrieves the value from a given field.
"""
assert field in self.fields
if field in self.getters:
return self.getters[field](field)
# Dual field accessors are fields that require two pieces of information
# to retrieve them (for example, "options-eol"). These are delimited by
# a dash "-".
base = field.split("-")[0]
if "-" in field and base in self.getters:
return self.getters[base](field)
return getattr(self.layer, field)
def set(self, packet, field, value):
"""
Sets the value for a given field.
"""
assert field in self.fields
base = field.split("-")[0]
if field in self.setters:
self.setters[field](packet, field, value)
# Dual field accessors are fields that require two pieces of information
# to retrieve them (for example, "options-eol"). These are delimited by
# a dash "-".
elif "-" in field and base in self.setters:
self.setters[base](packet, field, value)
else:
setattr(self.layer, field, value)
# Request the packet be reparsed to confirm the value is stable
# XXX Temporarily disabling the reconstitution check due to scapy bug (#2034)
#assert bytes(self.protocol(bytes(self.layer))) == bytes(self.layer)
def gen(self, field):
"""
Generates a value for this field.
"""
assert field in self.fields
if field in self.generators:
return self.generators[field](field)
# Dual field accessors are fields that require two pieces of information
# to retrieve them (for example, "options-eol"). These are delimited by
# a dash "-".
base = field.split("-")[0]
if "-" in field and base in self.generators:
return self.generators[base](field)
sample = fuzz(self.protocol())
new_value = getattr(sample, field)
if new_value == None:
new_value = 0
elif type(new_value) != int:
new_value = new_value._fix()
return new_value
def parse(self, field, value):
"""
Parses the given value for a given field. This is useful for fields whose
value cannot be represented in a string type easily - it lets us define
a common string representation for the strategy, and parse it back into
a real value here.
"""
assert field in self.fields
if field in self.parsers:
return self.parsers[field](field, value)
# Dual field accessors are fields that require two pieces of information
# to retrieve them (for example, "options-eol"). These are delimited by
# a dash "-".
base = field.split("-")[0]
if "-" in field and base in self.parsers:
return self.parsers[base](field, value)
try:
parsed = int(value)
except ValueError:
parsed = value
return parsed
def get_load(self, field):
"""
Helper method to retrieve load, as scapy doesn't recognize 'load' as
a regular field properly.
"""
try:
load = self.layer.payload.load
except AttributeError:
pass
try:
load = self.layer.load
except AttributeError:
return ""
if not load:
return ""
return urllib.parse.quote(load.decode('utf-8', 'ignore'))
def set_load(self, packet, field, value):
"""
Helper method to retrieve load, as scapy doesn't recognize 'load' as
a field properly.
"""
if packet.haslayer("IP"):
del packet["IP"].len
value = urllib.parse.unquote(value)
value = value.encode('utf-8')
dns_payload = b"\x009ib\x81\x80\x00\x01\x00\x01\x00\x00\x00\x01\x08faceface\x03com\x00\x00\x01\x00\x01\xc0\x0c\x00\x01\x00\x01\x00\x00\x01+\x00\x04\xc7\xbf2I\x00\x00)\x02\x00\x00\x00\x00\x00\x00\x00"
http_payload = b"GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n"
value = value.replace(b"__DNS_REQUEST__", dns_payload)
value = value.replace(b"__HTTP_REQUEST__", http_payload)
self.layer.payload = Raw(value)
def gen_load(self, field):
"""
Helper method to generate a random load, as scapy doesn't recognize 'load'
as a field properly.
"""
load = ''.join([random.choice(string.ascii_lowercase + string.digits) for k in range(10)])
return random.choice(["", "__DNS_REQUEST__", "__HTTP_REQUEST__", urllib.parse.quote(load)])

View File

@ -1,15 +1,19 @@
import copy
import random
import actions.layer
import layers.layer
import layers.ip_layer
import layers.tcp_layer
import layers.udp_layer
import layers.dns_layer
import layers.dnsqr_layer
_SUPPORTED_LAYERS = [
actions.layer.IPLayer,
actions.layer.TCPLayer,
actions.layer.UDPLayer,
actions.layer.DNSLayer,
actions.layer.DNSQRLayer
layers.ip_layer.IPLayer,
layers.tcp_layer.TCPLayer,
layers.udp_layer.UDPLayer,
layers.dns_layer.DNSLayer,
layers.dnsqr_layer.DNSQRLayer
]
SUPPORTED_LAYERS = _SUPPORTED_LAYERS
@ -326,6 +330,6 @@ class Packet():
"""
Performs DNS decompression, if applicable. Returns a new packet.
"""
self.packet = actions.layer.DNSLayer.dns_decompress(self.packet, logger)
self.packet = layers.dns_layer.DNSLayer.dns_decompress(self.packet, logger)
self.layers = self.setup_layers()
return self

10
layers/raw_layer.py Normal file
View File

@ -0,0 +1,10 @@
from layers.layer import Layer
class RawLayer(Layer):
"""
Defines an interface for the scapy Raw layer.
"""
name = "Raw"
protocol = Raw
_fields = []
fields = []

285
layers/tcp_layer.py Normal file
View File

@ -0,0 +1,285 @@
import random
import binascii
from layers.layer import Layer
from scapy.all import TCP, fuzz
class TCPLayer(Layer):
"""
Defines an interface to access TCP header fields.
"""
name = "TCP"
protocol = TCP
_fields = [
'sport',
'dport',
'seq',
'ack',
'dataofs',
'reserved',
'flags',
'window',
'chksum',
'urgptr',
'load',
'options-eol',
'options-nop',
'options-mss',
'options-wscale',
'options-sackok',
'options-sack',
'options-timestamp',
'options-altchksum',
'options-altchksumopt',
'options-md5header',
'options-uto'
]
fields = _fields
options_names = {
"eol": 0,
"nop": 1,
"mss": 2,
"wscale": 3,
"sackok": 4,
"sack": 5,
#"echo" : 6,
#"echo_reply" : 7,
"timestamp": 8,
"altchksum": 14,
"altchksumopt": 15,
"md5header": 19,
#"quick_start" : 27,
"uto": 28
#"authentication": 29,
#"experiment": 254
}
# Each entry is Kind: length
options_length = {
0: 0, # EOL
1: 0, # NOP
2: 2, # MSS
3: 1, # WScale
4: 0, # SAckOK
5: 0, # SAck
6: 4, # Echo
7: 4, # Echo Reply
8: 8, # Timestamp
14: 3, # AltChkSum
15: 0, # AltChkSumOpt
19: 16, # MD5header Option
27: 6, # Quick-Start response
28: 2, # User Timeout Option
29: 4, # TCP Authentication Option
254: 8, # Experiment
}
# Required by scapy
scapy_options = {
0: "EOL",
1: "NOP",
2: "MSS",
3: "WScale",
4: "SAckOK",
5: "SAck",
8: "Timestamp",
14: "AltChkSum",
15: "AltChkSumOpt",
28: "UTO",
# 254:"Experiment" # scapy has two versions of this, so it doesn't work
}
def __init__(self, layer):
"""
Initializes the TCP layer.
"""
Layer.__init__(self, layer)
# Special methods to help access fields that cannot be accessed normally
self.getters = {
'load' : self.get_load,
'options' : self.get_options
}
self.setters = {
'load' : self.set_load,
'options' : self.set_options
}
# Special methods to help generate fields that cannot be generated normally
self.generators = {
'load' : self.gen_load,
'dataofs' : self.gen_dataofs,
'flags' : self.gen_flags,
'chksum' : self.gen_chksum,
'options' : self.gen_options,
'window' : self.gen_window
}
def gen_window(self, field):
"""
Generates a window size.
"""
return random.choice(range(10, 200, 10))
def gen_chksum(self, field):
"""
Generates a checksum.
"""
return random.randint(1, 65535)
def gen_dataofs(self, field):
"""
Generates a valid value for the data offset field.
"""
# Dataofs is a 4 bit header, so a max of 15
return random.randint(1, 15)
def gen_flags(self, field):
"""
Generates a random set of flags. 50% of the time it picks randomly from
a list of real flags, otherwise it returns fuzzed flags.
"""
if random.random() < 0.5:
return random.choice(['S', 'A', 'SA', 'PA', 'FA', 'R', 'P', 'F', 'RA', ''])
else:
sample = fuzz(self.protocol())
# Since scapy lazily evaluates fuzzing, we first must set a
# legitimate value for scapy to evaluate what combination of flags it is
sample.flags = sample.flags
return str(sample.flags)
def get_options(self, field):
"""
Helper method to retrieve options.
"""
base, req_option = field.split("-")
assert base == "options", "get_options can only be used to fetch options."
option_type = self.option_str_to_int(req_option)
i = 0
# First, check if the option is already present in the packet
for option in self.layer.options:
# Scapy may try to be helpful and return the string of the option
next_option = self.option_str_to_int(option[0])
if option_type == next_option:
_name, value = self.layer.options[i]
# Some options (timestamp, checksums, nop) store their value in a
# tuple.
if isinstance(value, tuple):
# Scapy returns values in any of these types
if value in [None, b'', ()]:
return ''
value = value[0]
if value in [None, b'', ()]:
return ''
if req_option == "md5header":
return binascii.hexlify(value).decode("utf-8")
return value
i += 1
return ''
def set_options(self, packet, field, value):
"""
Helper method to set options.
"""
base, option = field.split("-")
assert base == "options", "Must use an options field with set_options"
option_type = self.option_str_to_int(option)
if type(value) == str:
# Prepare the value for storage in the packet
value = binascii.unhexlify(value)
# Scapy requires these options to be a tuple - since evaling this
# is not yet supported, for now, SAck will always be an empty tuple
if option in ["sack"]:
value = ()
# These options must be set as integers - if they didn't exist, they can
# be added like this
if option in ["timestamp", "mss", "wscale", "altchksum", "uto"] and not value:
value = 0
i = 0
# First, check if the option is already present in the packet
for option in self.layer.options:
# Scapy may try to be helpful and return the string of the option
next_option = self.option_str_to_int(option[0])
if option_type == next_option:
packet["TCP"].options[i] = self.format_option(option_type, value)
break
i += 1
# If we didn't break, the option doesn't exist in the packet currently.
else:
old_options_array = packet["TCP"].options
old_options_array.append(self.format_option(option_type, value))
packet["TCP"].options = old_options_array
# Let scapy recalculate the required values
del self.layer.chksum
del self.layer.dataofs
if packet.haslayer("IP"):
del packet["IP"].chksum
del packet["IP"].len
return True
def gen_options(self, field):
"""
Helper method to set options.
"""
_, option = field.split("-")
option_num = self.options_names[option]
length = self.options_length[option_num]
data = b''
if length > 0:
data = os.urandom(length)
data = binascii.hexlify(data).decode()
# MSS must be a 2-byte int
if option_num == 2:
data = random.randint(0, 65535)
# WScale must be a 1-byte int
elif option_num == 3:
data = random.randint(0, 255)
# Timestamp must be an int
elif option_num == 8:
data = random.randint(0, 4294967294)
elif option_num == 14:
data = random.randint(0, 255)
elif option_num == 28:
data = random.randint(0, 255)
return data
def option_str_to_int(self, option):
"""
Takes a string representation of an option and returns the option integer code.
"""
if type(option) == int:
return option
assert "-" not in option, "Must be given specific option: %s." % option
for val in self.scapy_options:
if self.scapy_options[val].lower() == option.lower():
return val
if " " in option:
option = option.replace(" ", "_").lower()
if option.lower() in self.options_names:
return self.options_names[option.lower()]
def format_option(self, options_int, value):
"""
Formats the options so they will work with scapy.
"""
# NOPs
if options_int == 1:
return (self.scapy_options[options_int], ())
elif options_int in [5]:
return (self.scapy_options[options_int], value)
# Timestamp
elif options_int in [8, 14]:
return (self.scapy_options[options_int], (value, 0))
elif options_int in self.scapy_options:
return (self.scapy_options[options_int], value)
else:
return (options_int, value)

32
layers/udp_layer.py Normal file
View File

@ -0,0 +1,32 @@
from layers.layer import Layer
from scapy.all import UDP
class UDPLayer(Layer):
"""
Defines an interface to access UDP header fields.
"""
name = "UDP"
protocol = UDP
_fields = [
"sport",
"dport",
"chksum",
"len",
"load"
]
fields = _fields
def __init__(self, layer):
"""
Initializes the UDP layer.
"""
Layer.__init__(self, layer)
self.getters = {
'load' : self.get_load,
}
self.setters = {
'load' : self.set_load,
}
self.generators = {
'load' : self.gen_load,
}

View File

@ -7,7 +7,7 @@ import os
import shutil
import evolve
import actions.packet
import layers.packet
def pytest_addoption(parser):
@ -39,7 +39,7 @@ def reset_packet_restrictions():
"""
Autouse feature to make sure tests have a clean slate for processing.
"""
actions.packet.Packet.reset_restrictions()
layers.packet.Packet.reset_restrictions()
@pytest.mark.tryfirst

View File

@ -3,7 +3,7 @@ import sys
sys.path.append("..")
import actions.duplicate
import actions.packet
import layers.packet
import actions.strategy
import actions.utils
import evolve
@ -18,7 +18,7 @@ def test_duplicate(logger):
duplicate = actions.duplicate.DuplicateAction()
assert str(duplicate) == "duplicate", "Duplicate returned incorrect string representation: %s" % str(duplicate)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet1, packet2 = duplicate.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
duplicate.mutate()

View File

@ -8,7 +8,7 @@ from scapy.all import *
BASEPATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASEPATH)
import actions.packet
import layers.packet
import engine
def test_engine():
@ -55,7 +55,7 @@ def test_nat_unit():
"forward_ip": "3.3.3.3"
}
pkt = IP(src="1.1.1.1", dst="2.2.2.2")/TCP()/Raw("test")
packet = actions.packet.Packet(pkt)
packet = layers.packet.Packet(pkt)
eng = engine.Engine(80, "", forwarder=forwarder)
eng.do_nat(packet)
packet[IP].src == "2.2.2.2"

View File

@ -12,7 +12,7 @@ import common
import evolve
import evaluator
import actions.utils
import actions.packet
import layers.packet
from actions.tamper import TamperAction
from scapy.all import IP, TCP, UDP
import random
@ -50,7 +50,7 @@ def test_disable_single_action(logger):
"""
Tests disabling a single action
"""
actions.packet.Packet.reset_restrictions()
layers.packet.Packet.reset_restrictions()
try:
logger.setLevel("ERROR")
actions.action.ACTION_CACHE={}
@ -66,14 +66,14 @@ def test_disable_single_action(logger):
actions.action.ACTION_CACHE["in"] = {}
actions.action.ACTION_CACHE["out"] = {}
finally:
actions.packet.Packet.reset_restrictions()
layers.packet.Packet.reset_restrictions()
def test_disable_multiple_actions(logger):
"""
Tests disabling multiple actions
"""
actions.packet.Packet.reset_restrictions()
layers.packet.Packet.reset_restrictions()
try:
logger.setLevel("ERROR")
actions.action.ACTION_CACHE={}
@ -95,7 +95,7 @@ def test_disable_multiple_actions(logger):
actions.action.ACTION_CACHE["in"] = {}
actions.action.ACTION_CACHE["out"] = {}
finally:
actions.packet.Packet.reset_restrictions()
layers.packet.Packet.reset_restrictions()
def assert_only(ind, field):
@ -160,7 +160,7 @@ def test_disable_fields(logger, use_canary):
p.mutate(logger)
assert_only(p, "ack")
actions.packet.Packet.reset_restrictions()
layers.packet.Packet.reset_restrictions()
# Restrict evolve to using NOT the dataofs or chksum field in the TCP header
evolve.restrict_headers(logger, "TCP,UDP", "", "dataofs,chksum",)
@ -194,7 +194,7 @@ def test_disable_fields(logger, use_canary):
assert_not(p, ["dataofs", "chksum"])
finally:
actions.packet.Packet.reset_restrictions()
layers.packet.Packet.reset_restrictions()
@pytest.mark.parametrize("use_canary", [True, False], ids=["with_canary", "without_canary"])
@ -220,7 +220,7 @@ def test_population_pool(logger, use_canary):
tester = evaluator.Evaluator(cmd, logger)
canary_id = evolve.run_collection_phase(logger, tester)
actions.packet.Packet.reset_restrictions()
layers.packet.Packet.reset_restrictions()
population = []
print("Generating population pool")
# Generate random strategies to initialize the population
@ -237,7 +237,7 @@ def test_population_pool(logger, use_canary):
IP(src="127.0.0.1", dst="127.0.0.1")/UDP(sport=2222, dport=3333, chksum=0x4444),
IP(src="127.0.0.1", dst="127.0.0.1")/UDP(sport=2222, dport=3333, chksum=0x8888)
]
packets = [actions.packet.Packet(packet) for packet in packets]
packets = [layers.packet.Packet(packet) for packet in packets]
for generation in range(0, 20):
print("Starting fake generation %d" % generation)
for ind in population:
@ -249,7 +249,7 @@ def test_population_pool(logger, use_canary):
print(str(ind))
print(packet)
packet.show()
print(actions.packet.SUPPORTED_LAYERS)
print(layers.packet.SUPPORTED_LAYERS)
raise
for p in population:
try:
@ -305,7 +305,7 @@ def test_mutation(logger):
Tests mutation.
"""
actions.packet.Packet.reset_restrictions()
layers.packet.Packet.reset_restrictions()
population = [actions.utils.parse("[TCP:flags:PA]-| \/", logger)]
population[0].in_enabled = False
assert population

View File

@ -5,7 +5,7 @@ import sys
sys.path.append("..")
import actions.fragment
import actions.packet
import layers.packet
import actions.strategy
import actions.utils
import evolve
@ -22,7 +22,7 @@ def test_segment(logger):
fragment = actions.fragment.FragmentAction(correct_order=True)
assert str(fragment) == "fragment{tcp:-1:True}", "Fragment returned incorrect string representation: %s" % str(fragment)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
@ -38,7 +38,7 @@ def test_segment_wrap(logger):
fragment = actions.fragment.FragmentAction(correct_order=True)
assert str(fragment) == "fragment{tcp:-1:True}", "Fragment returned incorrect string representation: %s" % str(fragment)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet["TCP"].seq = MAX_UINT-1
packet1, packet2 = fragment.run(packet, logger)
@ -57,7 +57,7 @@ def test_segment_wrap2(logger):
fragment = actions.fragment.FragmentAction(correct_order=True)
assert str(fragment) == "fragment{tcp:-1:True}", "Fragment returned incorrect string representation: %s" % str(fragment)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet["TCP"].seq = MAX_UINT
packet1, packet2 = fragment.run(packet, logger)
@ -77,7 +77,7 @@ def test_segment_wrap3(logger):
fragment = actions.fragment.FragmentAction(correct_order=True)
assert str(fragment) == "fragment{tcp:-1:True}", "Fragment returned incorrect string representation: %s" % str(fragment)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet["TCP"].seq = MAX_UINT-2
packet1, packet2 = fragment.run(packet, logger)
@ -97,7 +97,7 @@ def test_segment_reverse(logger):
fragment = actions.fragment.FragmentAction(correct_order=False)
assert str(fragment) == "fragment{tcp:-1:False}", "Fragment returned incorrect string representation: %s" % str(fragment)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
@ -114,7 +114,7 @@ def test_odd_fragment(logger):
fragment = actions.fragment.FragmentAction(correct_order=True, segment=False)
assert str(fragment) == "fragment{ip:-1:True}", "Fragment returned incorrect string representation: %s" % str(fragment)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("dataisodd"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("dataisodd"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
@ -132,7 +132,7 @@ def test_custom_fragment(logger):
fragment = actions.fragment.FragmentAction(correct_order=True, fragsize=3, segment=False)
assert str(fragment) == "fragment{ip:3:True}", "Fragment returned incorrect string representation: %s" % str(fragment)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("thisissomedata"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("thisissomedata"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
@ -149,7 +149,7 @@ def test_reverse_fragment(logger):
fragment = actions.fragment.FragmentAction(correct_order=False, fragsize=3, segment=False)
assert str(fragment) == "fragment{ip:3:False}", "Fragment returned incorrect string representation: %s" % str(fragment)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("thisissomedata"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("thisissomedata"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
@ -166,7 +166,7 @@ def test_udp_fragment(logger):
fragment = actions.fragment.FragmentAction(correct_order=False, fragsize=2, segment=False)
assert str(fragment) == "fragment{ip:2:False}", "Fragment returned incorrect string representation: %s" % str(fragment)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/UDP(sport=2222, dport=3333, chksum=0x4444)/("thisissomedata"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/UDP(sport=2222, dport=3333, chksum=0x4444)/("thisissomedata"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
@ -183,7 +183,7 @@ def test_mutate(logger):
for _ in range(0, 200):
fragment.mutate()
fragment.parse(str(fragment), logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, chksum=0x4444)/("thisissomedata"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, chksum=0x4444)/("thisissomedata"))
packet1, packet2 = fragment.run(packet, logger)
@ -211,16 +211,16 @@ def test_parse(logger):
fragment = actions.fragment.FragmentAction()
assert fragment.correct_order in [True, False]
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
strat = actions.utils.parse("[IP:proto:6:0]-tamper{IP:proto:replace:6}(fragment{ip:-1:True}(tamper{TCP:dataofs:replace:8}(duplicate,),tamper{IP:frag:replace:0}),)-| [IP:tos:0:0]-duplicate-| \/", logger)
strat.act_on_packet(packet, logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/UDP(sport=2222, dport=3333, chksum=0x4444))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/UDP(sport=2222, dport=3333, chksum=0x4444))
strat = actions.utils.parse("[IP:proto:6:0]-tamper{IP:proto:replace:6}(fragment{ip:-1:True}(tamper{TCP:dataofs:replace:8}(duplicate,),tamper{IP:frag:replace:0}),)-| [IP:tos:0:0]-duplicate-| \/", logger)
strat.act_on_packet(packet, logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, chksum=0x4444))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, chksum=0x4444))
strat = actions.utils.parse("[TCP:urgptr:0]-tamper{TCP:options-altchksumopt:corrupt}(fragment{tcp:-1:True}(tamper{IP:proto:corrupt},tamper{TCP:seq:replace:654077552}),)-| \/", logger)
strat.act_on_packet(packet, logger)
@ -245,25 +245,25 @@ def test_fallback(logger):
assert str(fragment) == "fragment{ip:2:False}", "Fragment returned incorrect string representation: %s" % str(fragment)
fragment.parse("fragment{ip:0:False}", logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/UDP(sport=2222, dport=3333, chksum=0x4444)/("thisissomedata"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/UDP(sport=2222, dport=3333, chksum=0x4444)/("thisissomedata"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
assert str(packet1) == str(packet2)
fragment.parse("fragment{tcp:-1:False}", logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/UDP(sport=2222, dport=3333, chksum=0x4444)/("thisissomedata"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/UDP(sport=2222, dport=3333, chksum=0x4444)/("thisissomedata"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
assert str(packet1) == str(packet2)
fragment.parse("fragment{tcp:-1:False}", logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, chksum=0x4444))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06)/TCP(sport=2222, dport=3333, chksum=0x4444))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
assert str(packet1) == str(packet2)
fragment.parse("fragment{ip:-1:False}", logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1", proto=0x06))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
assert str(packet1) == str(packet2)
@ -276,7 +276,7 @@ def test_ip_only_fragment(logger):
fragment = actions.fragment.FragmentAction(correct_order=True)
fragment.parse("fragment{ip:-1:True}", logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/("datadata11datadata"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/("datadata11datadata"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
@ -293,7 +293,7 @@ def test_overlapping_segment():
fragment = actions.fragment.FragmentAction(correct_order=True)
fragment.parse("fragment{tcp:-1:True:4}", logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(seq=100)/("datadata11datadata"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(seq=100)/("datadata11datadata"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
@ -312,7 +312,7 @@ def test_overlapping_segment_no_overlap():
fragment = actions.fragment.FragmentAction(correct_order=True)
fragment.parse("fragment{tcp:-1:True:0}", logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(seq=100)/("datadata11datadata"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(seq=100)/("datadata11datadata"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
@ -331,7 +331,7 @@ def test_overlapping_segment_entire_packet():
fragment = actions.fragment.FragmentAction(correct_order=True)
fragment.parse("fragment{tcp:-1:True:9}", logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(seq=100)/("datadata11datadata"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(seq=100)/("datadata11datadata"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"
@ -350,7 +350,7 @@ def test_overlapping_segment_out_of_bounds():
fragment = actions.fragment.FragmentAction(correct_order=True)
fragment.parse("fragment{tcp:-1:True:20}", logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(seq=100)/("datadata11datadata"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(seq=100)/("datadata11datadata"))
packet1, packet2 = fragment.run(packet, logger)
assert id(packet1) != id(packet2), "Duplicate aliased packet objects"

View File

@ -7,6 +7,8 @@ sys.path.append("..")
import actions.strategy
import actions.utils
import actions.tamper
import layers.layer
import layers.tcp_layer
from scapy.all import IP, TCP, Raw, send
@ -15,7 +17,7 @@ def test_append_options(logger):
"""
Tests appending a given option
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(tamper_proto="TCP", field="options-wscale", tamper_value=50, tamper_type="replace")
lpacket, rpacket = tamper.run(packet, logger)
lpacket.show()
@ -26,7 +28,7 @@ def test_append_random_options(logger):
"""
Tests appending a given option with a random value
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(None, field="options-mss", tamper_type="corrupt")
lpacket, rpacket = tamper.run(packet, logger)
assert lpacket["TCP"].options[0][0] == 'MSS'
@ -36,7 +38,7 @@ def test_tamper_options(logger):
"""
Tests tampering a given option with a given value
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(None, field="options-timestamp", tamper_type="replace", tamper_value=3433)
lpacket, rpacket = tamper.run(packet, logger)
assert lpacket["TCP"].options[0][0] == "Timestamp"
@ -46,7 +48,7 @@ def test_random_tamper_options(logger):
"""
Tests tampering a given option with a random value (corrupt)
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(None, field="options-mss", tamper_type="corrupt")
lpacket, rpacket = tamper.run(packet, logger)
assert lpacket["TCP"].options[0][0] == "MSS"
@ -59,9 +61,9 @@ def test_correct_assignment(logger):
"""
Tests that all options can be assigned
"""
for option in actions.layer.TCPLayer.scapy_options.values():
for option in layers.tcp_layer.TCPLayer.scapy_options.values():
print(option)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(None, field="options-" + str(option.lower()), tamper_type="corrupt")
lpacket, rpacket = tamper.run(packet, logger)
assert lpacket["TCP"].options[0][0] == option
@ -70,7 +72,7 @@ def test_str(logger):
"""
Tests the string representation of each
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(None, field="options-mss", tamper_value=39584, tamper_type="replace")
assert str(tamper) == "tamper{TCP:options-mss:replace:39584}"
@ -79,7 +81,7 @@ def test_parse(logger):
"""
Tests the ability to parse
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(None, field="options-mss")
assert tamper.parse("TCP:options-mss:corrupt", logger)
assert str(tamper) == "tamper{TCP:options-mss:corrupt}"
@ -88,7 +90,7 @@ def test_parse_run(logger):
"""
Tests the ability to parse
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(None)
assert tamper.parse("TCP:options-mss:corrupt", logger)
@ -99,7 +101,7 @@ def test_parse_num(logger):
"""
Tests parsing integers
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(None, tamper_type="options")
assert tamper.parse("TCP:options-mss:replace:1440", logger)
@ -110,7 +112,7 @@ def test_option_8(logger):
"""
Tests options 7
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(None)
assert tamper.parse("TCP:options-timestamp:replace:40000", logger)
@ -121,7 +123,7 @@ def test_option_1(logger):
"""
Tests option 1
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(None, tamper_type="options")
assert tamper.parse("TCP:options-nop:corrupt", logger)
@ -132,7 +134,7 @@ def test_md5options(logger):
"""
Tests appending a given option - the md5header
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/("data"))
tamper = actions.tamper.TamperAction(None, field="options-md5header", tamper_value=b'\xee\xee\xee\xee\xee\xee\xee\xee', tamper_type="replace")
lpacket, rpacket = tamper.run(packet, logger)
assert lpacket["TCP"].options == [(19, b'\xee\xee\xee\xee\xee\xee\xee\xee')]

View File

@ -1,7 +1,14 @@
import pytest
import actions.packet
import actions.layer
import actions.utils
import actions.trigger
import layers.packet
import layers.layer
import layers.tcp_layer
import layers.dns_layer
import layers.dnsqr_layer
import layers.udp_layer
import layers.ip_layer
import evolve
from scapy.all import IP, TCP, UDP, DNS, DNSQR, Raw, DNSRR
@ -12,10 +19,10 @@ def test_parse_layers():
Tests layer parsing.
"""
pkt = IP()/TCP()/Raw("")
packet = actions.packet.Packet(pkt)
layers = list(packet.read_layers())
assert layers[0].name == "IP"
assert layers[1].name == "TCP"
packet = layers.packet.Packet(pkt)
layers_l = list(packet.read_layers())
assert layers_l[0].name == "IP"
assert layers_l[1].name == "TCP"
layers_dict = packet.setup_layers()
assert layers_dict["IP"]
@ -27,9 +34,9 @@ def test_get_random():
Tests get random
"""
tcplayer = actions.layer.TCPLayer(TCP())
tcplayer = layers.tcp_layer.TCPLayer(TCP())
field, value = tcplayer.get_random()
assert field in actions.layer.TCPLayer.fields
assert field in layers.tcp_layer.TCPLayer.fields
def test_gen_random():
@ -37,7 +44,7 @@ def test_gen_random():
Tests gen random
"""
for i in range(0, 2000):
layer, field, value = actions.packet.Packet().gen_random()
layer, field, value = layers.packet.Packet().gen_random()
assert layer in [DNS, TCP, UDP, IP, DNSQR]
@ -47,14 +54,14 @@ def test_dnsqr():
"""
pkt = UDP()/DNS(ancount=1)/DNSQR()
pkt.show()
packet = actions.packet.Packet(pkt)
packet = layers.packet.Packet(pkt)
packet.show()
assert len(packet.layers) == 3
assert "UDP" in packet.layers
assert "DNS" in packet.layers
assert "DNSQR" in packet.layers
pkt = IP()/UDP()/DNS()/DNSQR()
packet = actions.packet.Packet(pkt)
packet = layers.packet.Packet(pkt)
assert str(packet)
@ -62,12 +69,12 @@ def test_load():
"""
Tests loads.
"""
tcp = actions.layer.TCPLayer(TCP())
tcp = layers.tcp_layer.TCPLayer(TCP())
load = tcp.gen("load")
pkt = IP()/"datadata"
p = actions.packet.Packet(pkt)
p = layers.packet.Packet(pkt)
assert p.get("IP", "load") == "datadata"
p2 = actions.packet.Packet(IP(bytes(p)))
p2 = layers.packet.Packet(IP(bytes(p)))
assert p2.get("IP", "load") == "datadata"
p2.set("IP", "load", "data2")
# Check p is unchanged
@ -81,9 +88,9 @@ def test_load():
assert p2.get("IP", "chksum") == None
pkt = IP()/TCP()/"datadata"
p = actions.packet.Packet(pkt)
p = layers.packet.Packet(pkt)
assert p.get("TCP", "load") == "datadata"
p2 = actions.packet.Packet(IP(bytes(p)))
p2 = layers.packet.Packet(IP(bytes(p)))
assert p2.get("TCP", "load") == "datadata"
p2.set("TCP", "load", "data2")
# Check p is unchanged
@ -97,7 +104,7 @@ def test_parse_load(logger):
"""
Tests load parsing.
"""
pkt = actions.packet.Packet(IP()/TCP()/"TYPE A\r\n")
pkt = layers.packet.Packet(IP()/TCP()/"TYPE A\r\n")
print("Parsed: %s" % pkt.get("TCP", "load"))
strat = actions.utils.parse("[TCP:load:TYPE%20A%0D%0A]-drop-| \/", logger)
@ -113,12 +120,12 @@ def test_dns():
"""
Tests DNS layer.
"""
dns = actions.layer.DNSLayer(DNS())
dns = layers.dns_layer.DNSLayer(DNS())
print(dns.gen("id"))
assert dns.gen("id")
p = actions.packet.Packet(DNS(id=0xabcd))
p2 = actions.packet.Packet(DNS(bytes(p)))
p = layers.packet.Packet(DNS(id=0xabcd))
p2 = layers.packet.Packet(DNS(bytes(p)))
assert p.get("DNS", "id") == 0xabcd
assert p2.get("DNS", "id") == 0xabcd
@ -126,13 +133,13 @@ def test_dns():
assert p.get("DNS", "id") == 0xabcd # Check p is unchanged
assert p2.get("DNS", "id") == 0x4321
dns = actions.packet.Packet(DNS(aa=1))
dns = layers.packet.Packet(DNS(aa=1))
assert dns.get("DNS", "aa") == 1
aa = dns.gen("DNS", "aa")
assert aa == 0 or aa == 1
assert dns.get("DNS", "aa") == 1 # Original value unchanged
dns = actions.packet.Packet(DNS(opcode=15))
dns = layers.packet.Packet(DNS(opcode=15))
assert dns.get("DNS", "opcode") == 15
opcode = dns.gen("DNS", "opcode")
assert opcode >= 0 and opcode <= 15
@ -141,7 +148,7 @@ def test_dns():
dns.set("DNS", "opcode", 3)
assert dns.get("DNS", "opcode") == 3
dns = actions.packet.Packet(DNS(qr=0))
dns = layers.packet.Packet(DNS(qr=0))
assert dns.get("DNS", "qr") == 0
qr = dns.gen("DNS", "qr")
assert qr == 0 or qr == 1
@ -150,7 +157,7 @@ def test_dns():
dns.set("DNS", "qr", 1)
assert dns.get("DNS", "qr") == 1
dns = actions.packet.Packet(DNS(arcount=0xAABB))
dns = layers.packet.Packet(DNS(arcount=0xAABB))
assert dns.get("DNS", "arcount") == 0xAABB
arcount = dns.gen("DNS", "arcount")
assert arcount >= 0 and arcount <= 0xffff
@ -159,13 +166,13 @@ def test_dns():
dns.set("DNS", "arcount", 65432)
assert dns.get("DNS", "arcount") == 65432
dns = actions.layer.DNSLayer(DNS()/DNSQR(qname="example.com"))
dns = layers.dns_layer.DNSLayer(DNS()/DNSQR(qname="example.com"))
assert isinstance(dns.get_next_layer(), DNSQR)
print(dns.gen("id"))
assert dns.gen("id")
p = actions.packet.Packet(DNS(id=0xabcd))
p2 = actions.packet.Packet(DNS(bytes(p)))
p = layers.packet.Packet(DNS(id=0xabcd))
p2 = layers.packet.Packet(DNS(bytes(p)))
assert p.get("DNS", "id") == 0xabcd
assert p2.get("DNS", "id") == 0xabcd
@ -175,27 +182,27 @@ def test_read_layers():
Tests the ability to read each layer
"""
packet = IP() / UDP() / TCP() / DNS() / DNSQR(qname="example.com") / DNSQR(qname="example2.com") / DNSQR(qname="example3.com")
packet_geneva = actions.packet.Packet(packet)
packet_geneva = layers.packet.Packet(packet)
packet_geneva.setup_layers()
i = 0
for layer in packet_geneva.read_layers():
if i == 0:
assert isinstance(layer, actions.layer.IPLayer)
assert isinstance(layer, layers.ip_layer.IPLayer)
elif i == 1:
assert isinstance(layer, actions.layer.UDPLayer)
assert isinstance(layer, layers.udp_layer.UDPLayer)
elif i == 2:
assert isinstance(layer, actions.layer.TCPLayer)
assert isinstance(layer, layers.tcp_layer.TCPLayer)
elif i == 3:
assert isinstance(layer, actions.layer.DNSLayer)
assert isinstance(layer, layers.dns_layer.DNSLayer)
elif i == 4:
assert isinstance(layer, actions.layer.DNSQRLayer)
assert isinstance(layer, layers.dnsqr_layer.DNSQRLayer)
assert layer.layer.qname == b"example.com"
elif i == 5:
assert isinstance(layer, actions.layer.DNSQRLayer)
assert isinstance(layer, layers.dnsqr_layer.DNSQRLayer)
assert layer.layer.qname == b"example2.com"
elif i == 6:
assert isinstance(layer, actions.layer.DNSQRLayer)
assert isinstance(layer, layers.dnsqr_layer.DNSQRLayer)
assert layer.layer.qname == b"example3.com"
i += 1
@ -204,7 +211,7 @@ def test_multi_opts():
Tests various option getting/setting.
"""
pkt = IP()/TCP(options=[('MSS', 1460), ('SAckOK', b''), ('Timestamp', (4154603075, 0)), ('NOP', None), ('WScale', 7), ('md5header', b'abcd' * 8)])
packet = actions.packet.Packet(pkt)
packet = layers.packet.Packet(pkt)
assert packet.get("TCP", "options-sackok") == ''
assert packet.get("TCP", "options-mss") == 1460
assert packet.get("TCP", "options-timestamp") == 4154603075
@ -215,7 +222,7 @@ def test_multi_opts():
assert packet.get("TCP", "options-timestamp") == 400000000
assert packet.get("TCP", "options-wscale") == 7
pkt = IP()/TCP(options=[('SAckOK', b''), ('Timestamp', (4154603075, 0)), ('NOP', None), ('WScale', 7)])
packet = actions.packet.Packet(pkt)
packet = layers.packet.Packet(pkt)
# If the option isn't present, it will be returned as an empty string
assert packet.get("TCP", "options-mss") == ''
packet.set("TCP", "options-mss", "")
@ -227,11 +234,11 @@ def test_options_eol():
Tests options-eol.
"""
pkt = TCP(options=[("EOL", None)])
p = actions.packet.Packet(pkt)
p = layers.packet.Packet(pkt)
assert p.get("TCP", "options-eol") == ""
p2 = actions.packet.Packet(TCP(bytes(p)))
p2 = layers.packet.Packet(TCP(bytes(p)))
assert p2.get("TCP", "options-eol") == ""
p = actions.packet.Packet(IP()/TCP(options=[]))
p = layers.packet.Packet(IP()/TCP(options=[]))
assert p.get("TCP", "options-eol") == ""
p.set("TCP", "options-eol", "")
p.show()
@ -249,8 +256,8 @@ def test_compression_fallback(logger):
Test that compression does not touch a packet without DNS in it packet
"""
pkt = UDP()
p = actions.packet.Packet(pkt)
p2 = actions.layer.DNSLayer.dns_decompress(p, logger)
p = layers.packet.Packet(pkt)
p2 = layers.dns_layer.DNSLayer.dns_decompress(p, logger)
assert p2 == p, "dns_decompress changed a non DNS packet"
@ -259,11 +266,11 @@ def test_options_mss():
Tests options-eol.
"""
pkt = TCP(options=[("MSS", 1440)])
p = actions.packet.Packet(pkt)
p = layers.packet.Packet(pkt)
assert p.get("TCP", "options-mss") == 1440
p2 = actions.packet.Packet(TCP(bytes(p)))
p2 = layers.packet.Packet(TCP(bytes(p)))
assert p2.get("TCP", "options-mss") == 1440
p = actions.packet.Packet(TCP(options=[]))
p = layers.packet.Packet(TCP(options=[]))
assert p.get("TCP", "options-mss") == ""
p.set("TCP", "options-mss", 2880)
p.show()
@ -281,7 +288,7 @@ def check_get(protocol, field, value):
"""
pkt = protocol()
setattr(pkt, field, value)
packet = actions.packet.Packet(pkt)
packet = layers.packet.Packet(pkt)
assert packet.get(protocol.__name__, field) == value
@ -374,11 +381,11 @@ def check_set_get(protocol, field, value):
"""
Checks if the get method worked for this protocol, field, and value.
"""
pkt = actions.packet.Packet(protocol())
pkt = layers.packet.Packet(protocol())
pkt.set(protocol.__name__, field, value)
assert pkt.get(protocol.__name__, field) == value
# Rebuild the packet to confirm the type survived
pkt2 = actions.packet.Packet(protocol(bytes(pkt)))
pkt2 = layers.packet.Packet(protocol(bytes(pkt)))
assert pkt2.get(protocol.__name__, field) == value, "Value %s for header %s didn't survive packet parsing." % (value, field)
@ -396,12 +403,12 @@ def check_gen_set_get(protocol, field):
"""
Checks if the get method worked for this protocol, field, and value.
"""
pkt = actions.packet.Packet(protocol())
pkt = layers.packet.Packet(protocol())
new_value = pkt.gen(protocol.__name__, field)
pkt.set(protocol.__name__, field, new_value)
assert pkt.get(protocol.__name__, field) == new_value
# Rebuild the packet to confirm the type survived
pkt2 = actions.packet.Packet(protocol(bytes(pkt)))
pkt2 = layers.packet.Packet(protocol(bytes(pkt)))
assert pkt2.get(protocol.__name__, field) == new_value
@ -422,7 +429,7 @@ def test_custom_get():
Tests value retrieval for custom getters.
"""
pkt = IP()/TCP()/Raw(load="AAAA")
tcp = actions.packet.Packet(pkt)
tcp = layers.packet.Packet(pkt)
assert tcp.get("TCP", "load") == "AAAA"
@ -430,51 +437,51 @@ def test_restrict_fields(logger):
"""
Tests packet field restriction.
"""
actions.packet.SUPPORTED_LAYERS = [
actions.layer.IPLayer,
actions.layer.TCPLayer,
actions.layer.UDPLayer
layers.packet.SUPPORTED_LAYERS = [
layers.ip_layer.IPLayer,
layers.tcp_layer.TCPLayer,
layers.udp_layer.UDPLayer
]
tcpfields = actions.layer.TCPLayer.fields
udpfields = actions.layer.UDPLayer.fields
ipfields = actions.layer.IPLayer.fields
tcpfields = layers.tcp_layer.TCPLayer.fields
udpfields = layers.udp_layer.UDPLayer.fields
ipfields = layers.ip_layer.IPLayer.fields
actions.packet.Packet.restrict_fields(logger, ["TCP", "UDP"], [], [])
assert len(actions.packet.SUPPORTED_LAYERS) == 2
assert actions.layer.TCPLayer in actions.packet.SUPPORTED_LAYERS
assert actions.layer.UDPLayer in actions.packet.SUPPORTED_LAYERS
assert not actions.layer.IPLayer in actions.packet.SUPPORTED_LAYERS
layers.packet.Packet.restrict_fields(logger, ["TCP", "UDP"], [], [])
assert len(layers.packet.SUPPORTED_LAYERS) == 2
assert layers.tcp_layer.TCPLayer in layers.packet.SUPPORTED_LAYERS
assert layers.udp_layer.UDPLayer in layers.packet.SUPPORTED_LAYERS
assert not layers.ip_layer.IPLayer in layers.packet.SUPPORTED_LAYERS
pkt = IP()/TCP()
packet = actions.packet.Packet(pkt)
packet = layers.packet.Packet(pkt)
assert "TCP" in packet.layers
assert not "IP" in packet.layers
assert len(packet.layers) == 1
for i in range(0, 2000):
layer, proto, field = actions.packet.Packet().gen_random()
layer, proto, field = layers.packet.Packet().gen_random()
assert layer in [TCP, UDP]
# Check we can't retrieve any IP fields
for field in actions.layer.IPLayer.fields:
for field in layers.ip_layer.IPLayer.fields:
with pytest.raises(AssertionError):
packet.get("IP", field)
# Check we can get all the TCP fields
for field in actions.layer.TCPLayer.fields:
for field in layers.tcp_layer.TCPLayer.fields:
packet.get("TCP", field)
actions.packet.Packet.restrict_fields(logger, ["TCP", "UDP"], ["flags"], [])
packet = actions.packet.Packet(pkt)
assert len(actions.packet.SUPPORTED_LAYERS) == 1
assert actions.layer.TCPLayer in actions.packet.SUPPORTED_LAYERS
assert not actions.layer.UDPLayer in actions.packet.SUPPORTED_LAYERS
assert not actions.layer.IPLayer in actions.packet.SUPPORTED_LAYERS
assert actions.layer.TCPLayer.fields == ["flags"]
assert not actions.layer.UDPLayer.fields
layers.packet.Packet.restrict_fields(logger, ["TCP", "UDP"], ["flags"], [])
packet = layers.packet.Packet(pkt)
assert len(layers.packet.SUPPORTED_LAYERS) == 1
assert layers.tcp_layer.TCPLayer in layers.packet.SUPPORTED_LAYERS
assert not layers.udp_layer.UDPLayer in layers.packet.SUPPORTED_LAYERS
assert not layers.ip_layer.IPLayer in layers.packet.SUPPORTED_LAYERS
assert layers.tcp_layer.TCPLayer.fields == ["flags"]
assert not layers.udp_layer.UDPLayer.fields
# Check we can't retrieve any IP fields
for field in actions.layer.IPLayer.fields:
for field in layers.ip_layer.IPLayer.fields:
with pytest.raises(AssertionError):
packet.get("IP", field)
@ -487,37 +494,37 @@ def test_restrict_fields(logger):
packet.get("TCP", field)
for i in range(0, 2000):
layer, field, value = actions.packet.Packet().gen_random()
layer, field, value = layers.packet.Packet().gen_random()
assert layer == TCP
assert field == "flags"
_, proto, field, value, _ = actions.trigger.Trigger.get_rand_trigger(None, 0)
assert proto == 'TCP'
assert field == "flags"
actions.packet.Packet.reset_restrictions()
actions.packet.SUPPORTED_LAYERS = [
actions.layer.IPLayer,
actions.layer.TCPLayer,
actions.layer.UDPLayer
layers.packet.Packet.reset_restrictions()
layers.packet.SUPPORTED_LAYERS = [
layers.ip_layer.IPLayer,
layers.tcp_layer.TCPLayer,
layers.udp_layer.UDPLayer
]
with pytest.raises(AssertionError):
actions.packet.Packet.restrict_fields(logger, ["TCP", "IP"], ["notathing"], ["notathing"])
actions.packet.Packet.reset_restrictions()
layers.packet.Packet.restrict_fields(logger, ["TCP", "IP"], ["notathing"], ["notathing"])
layers.packet.Packet.reset_restrictions()
actions.packet.Packet.restrict_fields(logger, ["TCP", "IP"], [], ["sport", "dport", "seq", "src"])
packet = actions.packet.Packet(pkt)
layers.packet.Packet.restrict_fields(logger, ["TCP", "IP"], [], ["sport", "dport", "seq", "src"])
packet = layers.packet.Packet(pkt)
packet = packet.copy()
assert packet.has_supported_layers()
assert len(actions.packet.SUPPORTED_LAYERS) == 2
assert actions.layer.TCPLayer in actions.packet.SUPPORTED_LAYERS
assert not actions.layer.UDPLayer in actions.packet.SUPPORTED_LAYERS
assert actions.layer.IPLayer in actions.packet.SUPPORTED_LAYERS
assert set(actions.layer.TCPLayer.fields) == set([f for f in tcpfields if f not in ["sport", "dport", "seq"]])
assert set(actions.layer.IPLayer.fields) == set([f for f in ipfields if f not in ["src"]])
assert len(layers.packet.SUPPORTED_LAYERS) == 2
assert layers.tcp_layer.TCPLayer in layers.packet.SUPPORTED_LAYERS
assert not layers.udp_layer.UDPLayer in layers.packet.SUPPORTED_LAYERS
assert layers.ip_layer.IPLayer in layers.packet.SUPPORTED_LAYERS
assert set(layers.tcp_layer.TCPLayer.fields) == set([f for f in tcpfields if f not in ["sport", "dport", "seq"]])
assert set(layers.ip_layer.IPLayer.fields) == set([f for f in ipfields if f not in ["src"]])
# Check we can't retrieve any IP fields
for field in actions.layer.IPLayer.fields:
for field in layers.ip_layer.IPLayer.fields:
if field == "src":
with pytest.raises(AssertionError):
packet.get("IP", field)
@ -533,7 +540,7 @@ def test_restrict_fields(logger):
packet.get("TCP", field)
for i in range(0, 2000):
layer, field, value = actions.packet.Packet().gen_random()
layer, field, value = layers.packet.Packet().gen_random()
assert layer in [TCP, IP]
assert field not in ["sport", "dport", "seq", "src"]
@ -541,24 +548,24 @@ def test_restrict_fields(logger):
assert proto in ['TCP', 'IP']
assert field not in ["sport", "dport", "seq", "src"]
actions.packet.Packet.reset_restrictions()
actions.packet.SUPPORTED_LAYERS = [
actions.layer.IPLayer,
actions.layer.TCPLayer,
actions.layer.UDPLayer
layers.packet.Packet.reset_restrictions()
layers.packet.SUPPORTED_LAYERS = [
layers.ip_layer.IPLayer,
layers.tcp_layer.TCPLayer,
layers.udp_layer.UDPLayer
]
evolve.restrict_headers(logger, "ip,udp,dns", "", "version")
packet = actions.packet.Packet(pkt)
packet = layers.packet.Packet(pkt)
proto, field, value = packet.get_random()
assert proto.__name__ in ["IP", "UDP"]
assert len(actions.packet.SUPPORTED_LAYERS) == 2
assert not actions.layer.TCPLayer in actions.packet.SUPPORTED_LAYERS
assert actions.layer.UDPLayer in actions.packet.SUPPORTED_LAYERS
assert actions.layer.IPLayer in actions.packet.SUPPORTED_LAYERS
assert set(actions.layer.IPLayer.fields) == set([f for f in ipfields if f not in ["version"]])
assert set(actions.layer.UDPLayer.fields) == set(udpfields)
assert len(layers.packet.SUPPORTED_LAYERS) == 2
assert not layers.tcp_layer.TCPLayer in layers.packet.SUPPORTED_LAYERS
assert layers.udp_layer.UDPLayer in layers.packet.SUPPORTED_LAYERS
assert layers.ip_layer.IPLayer in layers.packet.SUPPORTED_LAYERS
assert set(layers.ip_layer.IPLayer.fields) == set([f for f in ipfields if f not in ["version"]])
assert set(layers.udp_layer.UDPLayer.fields) == set(udpfields)
actions.packet.Packet.reset_restrictions()
for layer in actions.packet.SUPPORTED_LAYERS:
layers.packet.Packet.reset_restrictions()
for layer in layers.packet.SUPPORTED_LAYERS:
assert layer.fields, '%s has no fields - reset failed!' % str(layer)

View File

@ -4,6 +4,7 @@ sys.path.append("..") # Include the root of the project
import evolve
import os
import actions.utils
import layers.layer
# Test Files Directory Setup
test_files_directory = os.path.join("test_files")
@ -97,7 +98,7 @@ def test_evolve_load_generation(logger):
"""
generations = 2
actions.packet.Packet.reset_restrictions()
layers.packet.Packet.reset_restrictions()
options = {}
options["population_size"] = 2

View File

@ -2,7 +2,7 @@ from scapy.all import IP, TCP
import evolve
import actions.utils
import actions.strategy
import actions.packet
import layers.packet
import actions.sleep
import sys
# Include the root of the project
@ -16,7 +16,7 @@ def test_basic_sleep(logger):
sleep = actions.sleep.SleepAction(.5)
assert str(sleep) == "sleep{0.5}", "Sleep returned incorrect string representation: %s" % str(sleep)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP()/("data"))
packet1, packet2 = sleep.run(packet, logger)
assert packet1.sleep == .5, "Packet had wrong sleep value"

View File

@ -10,6 +10,7 @@ import actions.utils
import actions.strategy
import evaluator
import evolve
import layers.layer
from scapy.all import IP, TCP, Raw
@ -67,24 +68,24 @@ def test_run(logger):
strat3 = actions.utils.parse("[TCP:flags:A]-duplicate(tamper{TCP:dataofs:replace:0},)-| \/", logger)
strat4 = actions.utils.parse("[TCP:flags:A]-duplicate(tamper{TCP:flags:replace:R}(tamper{TCP:chksum:replace:15239},),duplicate(tamper{TCP:flags:replace:S}(tamper{TCP:chksum:replace:14539}(tamper{TCP:seq:corrupt},),),))-| \/", logger)
p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
p1 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packets = strat1.act_on_packet(p1, logger, direction="out")
assert packets, "Strategy dropped SYN packets"
assert len(packets) == 1
assert packets[0]["TCP"].flags == "S"
p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
p1 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packets = strat2.act_on_packet(p1, logger, direction="out")
assert not packets, "Strategy failed to drop SYN packets"
p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="A", dataofs=5))
p1 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="A", dataofs=5))
packets = strat3.act_on_packet(p1, logger, direction="out")
assert packets, "Strategy dropped packets"
assert len(packets) == 2, "Incorrect number of packets emerged from forest"
assert packets[0]["TCP"].dataofs == 0, "Packet tamper failed"
assert packets[1]["TCP"].dataofs == 5, "Duplicate packet was tampered"
p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="A", dataofs=5, chksum=100))
p1 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="A", dataofs=5, chksum=100))
packets = strat4.act_on_packet(p1, logger, direction="out")
assert packets, "Strategy dropped packets"
assert len(packets) == 3, "Incorrect number of packets emerged from forest"
@ -96,12 +97,12 @@ def test_run(logger):
assert packets[2]["TCP"].flags == "A", "Duplicate failed"
strat4 = actions.utils.parse("[TCP:load:]-tamper{TCP:load:replace:mhe76jm0bd}(fragment{ip:-1:True}(tamper{IP:load:corrupt},drop),)-| \/ ", logger)
p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
p1 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packets = strat4.act_on_packet(p1, logger)
# Will fail with scapy 2.4.2 if packet is reparsed
strat5 = actions.utils.parse("[TCP:options-eol:]-tamper{TCP:load:replace:o}(tamper{TCP:dataofs:replace:11},)-| \/", logger)
p1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
p1 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packets = strat5.act_on_packet(p1, logger)
@ -205,5 +206,5 @@ def test_fail_cases(logger):
s = "[IP:proto:6]-tamper{IP:proto:replace:125}(fragment{tcp:48:True:26}(tamper{TCP:options-md5header:replace:37f0e737da65224ea03d46c713ed6fd2},),)-| \/ "
s = actions.utils.parse(s, logger)
p = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/Raw("aaaaaaaaaa"))
p = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S")/Raw("aaaaaaaaaa"))
s.act_on_packet(p, logger)

View File

@ -8,10 +8,11 @@ sys.path.append("..")
import evolve
import evaluator
import actions.strategy
import actions.packet
import layers.packet
import actions.utils
import actions.tamper
import actions.layer
import layers.layer
import layers.ip_layer
from scapy.all import IP, TCP, UDP, DNS, DNSQR, sr1
@ -20,7 +21,7 @@ def test_tamper(logger):
"""
Tests tampering with replace
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper = actions.tamper.TamperAction(None, field="flags", tamper_type="replace", tamper_value="R")
lpacket, rpacket = tamper.run(packet, logger)
@ -48,7 +49,7 @@ def test_tamper_ip(logger):
"""
Tests tampering with IP
"""
packet = actions.packet.Packet(IP(src='127.0.0.1', dst='127.0.0.1')/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src='127.0.0.1', dst='127.0.0.1')/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper = actions.tamper.TamperAction(None, field="src", tamper_type="replace", tamper_value="192.168.1.1", tamper_proto="IP")
lpacket, rpacket = tamper.run(packet, logger)
@ -70,7 +71,7 @@ def test_tamper_udp(logger):
"""
Tests tampering with UDP
"""
packet = actions.packet.Packet(IP(src='127.0.0.1', dst='127.0.0.1')/UDP(sport=2222, dport=53))
packet = layers.packet.Packet(IP(src='127.0.0.1', dst='127.0.0.1')/UDP(sport=2222, dport=53))
original = copy.deepcopy(packet)
tamper = actions.tamper.TamperAction(None, field="chksum", tamper_type="replace", tamper_value=4444, tamper_proto="UDP")
lpacket, rpacket = tamper.run(packet, logger)
@ -93,7 +94,7 @@ def test_tamper_ip_ident(logger):
Tests tampering with IP and that the checksum is correctly changed
"""
packet = actions.packet.Packet(IP(src='127.0.0.1', dst='127.0.0.1')/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src='127.0.0.1', dst='127.0.0.1')/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper = actions.tamper.TamperAction(None, field='id', tamper_type='replace', tamper_value=3333, tamper_proto="IP")
lpacket, rpacket = tamper.run(packet, logger)
@ -161,9 +162,9 @@ def test_mutate(logger, use_canary):
assert tamper.tamper_value == val, "Tamper value is not stable."
# Create a test packet to ensure the field/proto choice was safe
if random.random() < 0.5:
test_packet = actions.packet.Packet(IP()/TCP())
test_packet = layers.packet.Packet(IP()/TCP())
else:
test_packet = actions.packet.Packet(IP()/UDP())
test_packet = layers.packet.Packet(IP()/UDP())
# Check that tamper can run safely after mutation
try:
@ -199,7 +200,7 @@ def test_corrupt(logger):
assert tamper.tamper_type == "corrupt", "Tamper action changed types."
assert str(tamper) == "tamper{TCP:flags:corrupt}", "Tamper returned incorrect string representation: %s" % str(tamper)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper.tamper(packet, logger)
@ -226,7 +227,7 @@ def test_add(logger):
assert tamper.tamper_type == "add", "Tamper action changed types."
assert str(tamper) == "tamper{TCP:seq:add:10}", "Tamper returned incorrect string representation: %s" % str(tamper)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper.tamper(packet, logger)
@ -254,7 +255,7 @@ def test_decompress(logger):
assert tamper.tamper_type == "compress", "Tamper action changed types."
assert str(tamper) == "tamper{DNS:qd:compress}", "Tamper returned incorrect string representation: %s" % str(tamper)
packet = actions.packet.Packet(IP(dst="8.8.8.8")/UDP(dport=53)/DNS(qd=DNSQR(qname="minghui.ca.")))
packet = layers.packet.Packet(IP(dst="8.8.8.8")/UDP(dport=53)/DNS(qd=DNSQR(qname="minghui.ca.")))
original = packet.copy()
tamper.tamper(packet, logger)
assert bytes(packet["DNS"]) == b'\x00\x00\x01\x00\x00\x02\x00\x00\x00\x00\x00\x00\x07minghui\xc0\x1a\x00\x01\x00\x01\x02ca\x00\x00\x01\x00\x01'
@ -266,7 +267,7 @@ def test_decompress(logger):
assert confirm_unchanged(packet, original, IP, ["len"])
print(resp.summary())
packet = actions.packet.Packet(IP(dst="8.8.8.8")/UDP(dport=53)/DNS(qd=DNSQR(qname="maps.google.com")))
packet = layers.packet.Packet(IP(dst="8.8.8.8")/UDP(dport=53)/DNS(qd=DNSQR(qname="maps.google.com")))
original = packet.copy()
tamper.tamper(packet, logger)
assert bytes(packet["DNS"]) == b'\x00\x00\x01\x00\x00\x02\x00\x00\x00\x00\x00\x00\x04maps\xc0\x17\x00\x01\x00\x01\x06google\x03com\x00\x00\x01\x00\x01'
@ -279,7 +280,7 @@ def test_decompress(logger):
print(resp.summary())
# Confirm this is a NOP on normal packets
packet = actions.packet.Packet(IP()/UDP())
packet = layers.packet.Packet(IP()/UDP())
original = packet.copy()
tamper.tamper(packet, logger)
assert packet.packet.summary() == original.packet.summary()
@ -302,7 +303,7 @@ def test_corrupt_chksum(logger):
assert tamper.tamper_type == "corrupt", "Tamper action changed types."
assert str(tamper) == "tamper{TCP:chksum:corrupt}", "Tamper returned incorrect string representation: %s" % str(tamper)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper.tamper(packet, logger)
@ -326,7 +327,7 @@ def test_corrupt_dataofs(logger):
"""
Tests the tamper 'replace' primitive.
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S", dataofs="6L"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S", dataofs="6L"))
original = copy.deepcopy(packet)
tamper = actions.tamper.TamperAction(None, field="dataofs", tamper_type="corrupt")
@ -357,7 +358,7 @@ def test_replace(logger):
assert tamper.field == "flags", "Tamper action changed fields."
assert tamper.tamper_type == "replace", "Tamper action changed types."
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
original = copy.deepcopy(packet)
tamper.tamper(packet, logger)
@ -400,7 +401,7 @@ def test_parse_flags(logger):
assert tamper.tamper_type == "replace", "Tamper action changed types."
assert str(tamper) == "tamper{TCP:flags:replace:FRAPUN}", "Tamper returned incorrect string representation: %s" % str(tamper)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
tamper.tamper(packet, logger)
assert packet[TCP].flags == "FRAPUN", "Tamper failed to change flags."
@ -417,21 +418,21 @@ def test_options(logger, value, test_type):
tamper = actions.tamper.TamperAction(None)
assert tamper.parse("TCP:options-%s:corrupt" % value.lower(), logger)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
tamper.run(packet, logger)
opts_dict_lookup = value.lower().replace(" ", "_")
for optname, optval in packet["TCP"].options:
if optname == value:
break
elif optname == actions.layer.TCPLayer.options_names[opts_dict_lookup]:
elif optname == layers.ip_layer.TCPLayer.options_names[opts_dict_lookup]:
break
else:
pytest.fail("Failed to find %s in options" % value)
assert len(packet["TCP"].options) == 1
raw_p = bytes(packet)
assert raw_p, "options broke scapy bytes"
p2 = actions.packet.Packet(IP(bytes(raw_p)))
p2 = layers.packet.Packet(IP(bytes(raw_p)))
assert p2.haslayer("IP")
assert p2.haslayer("TCP")
# EOLs might be added for padding, so just check >= 1
@ -439,7 +440,7 @@ def test_options(logger, value, test_type):
for optname, optval in p2["TCP"].options:
if optname == value:
break
elif optname == actions.layer.TCPLayer.options_names[opts_dict_lookup]:
elif optname == layers.ip_layer.TCPLayer.options_names[opts_dict_lookup]:
break
else:
pytest.fail("Failed to find %s in options" % value)
@ -458,7 +459,7 @@ def test_tamper_mutate_compress(logger):
assert tamper.tamper_type == "compress"
assert tamper.tamper_proto_str == "DNS"
assert tamper.field == "qd"
packet = actions.packet.Packet(IP()/TCP()/DNS()/DNSQR())
packet = layers.packet.Packet(IP()/TCP()/DNS()/DNSQR())
packet2 = tamper.tamper(packet, logger)
assert packet2 == packet
finally:

View File

@ -4,7 +4,7 @@ import pytest
sys.path.append("..")
import actions.trace
import actions.packet
import layers.packet
import actions.strategy
import actions.utils
import evolve
@ -19,14 +19,14 @@ def test_trace(logger):
trace = actions.trace.TraceAction(start_ttl=1, end_ttl=3)
assert str(trace) == "trace{1:3}", "Trace returned incorrect string representation: %s" % str(trace)
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
trace.run(packet, logger)
print("Testing that trace will not run twice:")
assert trace.run(packet, logger) == (None, None)
trace = actions.trace.TraceAction(start_ttl=1, end_ttl=3)
packet = actions.packet.Packet(TCP())
packet = layers.packet.Packet(TCP())
assert trace.run(packet, logger) == (packet, None)
s = "[TCP:flags:PA]-trace{1:3}-| \/ "

View File

@ -7,6 +7,7 @@ import actions.drop
import actions.tamper
import actions.duplicate
import actions.utils
import layers.packet
def test_init():
@ -46,9 +47,9 @@ def test_check():
a = actions.tree.ActionTree("out")
logger = logging.getLogger("test")
a.parse("[TCP:flags:RA]-tamper{TCP:flags:replace:S}-|", logger)
p = actions.packet.Packet(IP()/TCP(flags="A"))
p = layers.packet.Packet(IP()/TCP(flags="A"))
assert not a.check(p, logger)
p = actions.packet.Packet(IP(ttl=64)/TCP(flags="RA"))
p = layers.packet.Packet(IP(ttl=64)/TCP(flags="RA"))
assert a.check(p, logger)
assert a.remove_one()
assert a.check(p, logger)
@ -56,7 +57,7 @@ def test_check():
assert a.check(p, logger)
a.parse("[IP:ttl:64]-tamper{TCP:flags:replace:S}-|", logger)
assert a.check(p, logger)
p = actions.packet.Packet(IP(ttl=15)/TCP(flags="RA"))
p = layers.packet.Packet(IP(ttl=15)/TCP(flags="RA"))
assert not a.check(p, logger)
@ -67,11 +68,11 @@ def test_scapy():
a = actions.tree.ActionTree("out")
logger = logging.getLogger("test")
a.parse("[TCP:reserved:0]-tamper{TCP:flags:replace:S}-|", logger)
p = actions.packet.Packet(IP()/TCP(flags="A"))
p = layers.packet.Packet(IP()/TCP(flags="A"))
assert a.check(p, logger)
packets = a.run(p, logger)
assert packets[0][TCP].flags == "S"
p = actions.packet.Packet(IP()/TCP(flags="A"))
p = layers.packet.Packet(IP()/TCP(flags="A"))
assert a.check(p, logger)
a.parse("[TCP:reserved:0]-tamper{TCP:chksum:corrupt}-|", logger)
packets = a.run(p, logger)
@ -424,7 +425,7 @@ def test_run():
duplicate2 = actions.duplicate.DuplicateAction()
drop = actions.drop.DropAction()
packet = actions.packet.Packet(IP()/TCP())
packet = layers.packet.Packet(IP()/TCP())
a.add_action(tamper)
packets = a.run(packet, logging.getLogger("test"))
assert len(packets) == 1
@ -433,7 +434,7 @@ def test_run():
a.add_action(tamper2)
print(str(a))
packet = actions.packet.Packet(IP()/TCP())
packet = layers.packet.Packet(IP()/TCP())
assert not a.add_action(tamper), "tree added duplicate action"
packets = a.run(packet, logging.getLogger("test"))
assert len(packets) == 1
@ -444,7 +445,7 @@ def test_run():
a.remove_action(tamper2)
a.remove_action(tamper)
a.add_action(duplicate)
packet = actions.packet.Packet(IP()/TCP(flags="RA"))
packet = layers.packet.Packet(IP()/TCP(flags="RA"))
packets = a.run(packet, logging.getLogger("test"))
assert len(packets) == 2
assert None not in packets
@ -454,7 +455,7 @@ def test_run():
duplicate.left = tamper
duplicate.right = tamper2
packet = actions.packet.Packet(IP()/TCP(flags="RA"))
packet = layers.packet.Packet(IP()/TCP(flags="RA"))
print("ABUT TO RUN")
packets = a.run(packet, logging.getLogger("test"))
assert len(packets) == 2
@ -467,7 +468,7 @@ def test_run():
print(str(a))
tamper.left = duplicate2
packet = actions.packet.Packet(IP()/TCP(flags="RA"))
packet = layers.packet.Packet(IP()/TCP(flags="RA"))
packets = a.run(packet, logging.getLogger("test"))
assert len(packets) == 3
assert None not in packets
@ -477,7 +478,7 @@ def test_run():
print(str(a))
tamper2.left = drop
packet = actions.packet.Packet(IP()/TCP(flags="RA"))
packet = layers.packet.Packet(IP()/TCP(flags="RA"))
packets = a.run(packet, logging.getLogger("test"))
assert len(packets) == 2
assert None not in packets
@ -487,13 +488,13 @@ def test_run():
assert a.remove_action(duplicate2)
tamper.left = actions.drop.DropAction()
packet = actions.packet.Packet(IP()/TCP(flags="RA"))
packet = layers.packet.Packet(IP()/TCP(flags="RA"))
packets = a.run(packet, logger )
assert len(packets) == 0
print(str(a))
a.parse("[TCP:flags:A]-duplicate(tamper{TCP:flags:replace:R}(tamper{TCP:chksum:replace:14239},),duplicate(tamper{TCP:flags:replace:S},))-|", logger)
packet = actions.packet.Packet(IP()/TCP(flags="A"))
packet = layers.packet.Packet(IP()/TCP(flags="A"))
assert a.check(packet, logger)
packets = a.run(packet, logger)
assert len(packets) == 3

View File

@ -2,7 +2,7 @@ import sys
# Include the root of the project
sys.path.append("..")
import actions.packet
import layers.packet
import actions.strategy
import actions.tamper
import actions.utils
@ -23,7 +23,7 @@ def test_init(logger):
"""
Tests initialization.
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="S"))
trigger = actions.trigger.Trigger(None, None, None)
trigger.is_applicable(packet, logger)
@ -35,7 +35,7 @@ def test_trigger_gas(logger):
"""
Tests triggers having gas, including changing that gas while in use
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
trigger = actions.trigger.Trigger("field", "flags", "TCP", trigger_value="SA", gas=1)
print(trigger)
assert trigger.is_applicable(packet, logger)
@ -66,7 +66,7 @@ def test_bomb_trigger_gas(logger):
"""
Tests triggers having bomb gas, including changing that gas while in use
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
trigger = actions.trigger.Trigger("field", "flags", "TCP", trigger_value="SA", gas=-1)
print(trigger)
assert not trigger.is_applicable(packet, logger), "trigger should not fire on first run"
@ -98,7 +98,7 @@ def test_trigger_parse_gas(logger):
"""
Tests triggers having gas, including changing that gas while in use
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
# parse a trigger with 1 gas
@ -132,7 +132,7 @@ def test_bomb_trigger_parse_gas(logger):
"""
Tests bomb triggers having gas, including changing that gas while in use
"""
packet = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
packet = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
# parse a bomb trigger with 1 gas
trigger = actions.trigger.Trigger.parse("TCP:flags:SA:-1")
@ -168,10 +168,10 @@ def test_wildcard(logger):
"""
Test wildcard trigger value
"""
packet_1 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="A"))
packet_2 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
packet_3 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="RA"))
packet_4 = actions.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="P"))
packet_1 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="A"))
packet_2 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="SA"))
packet_3 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="RA"))
packet_4 = layers.packet.Packet(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(sport=2222, dport=3333, seq=100, ack=100, flags="P"))
trigger = actions.trigger.Trigger("field", "flags", "TCP", trigger_value="A*", gas=None)
assert trigger.is_applicable(packet_1, logger)
assert trigger.is_applicable(packet_2, logger)