mirror of https://github.com/Kkevsterrr/geneva
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
476 lines
20 KiB
476 lines
20 KiB
"""
|
|
Geneva Strategy Engine
|
|
|
|
Given a strategy and a server port, the engine configures NFQueue to capture all traffic
|
|
into and out of that port so the strategy can run over the connection.
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
|
|
try:
|
|
import netfilterqueue
|
|
except ImportError:
|
|
pass
|
|
|
|
from scapy.layers.inet import IP
|
|
from scapy.utils import wrpcap
|
|
from scapy.config import conf
|
|
|
|
socket.setdefaulttimeout(1)
|
|
|
|
import layers.packet
|
|
import actions.strategy
|
|
import actions.utils
|
|
|
|
BASEPATH = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
|
class Engine():
|
|
def __init__(self, server_port,
|
|
string_strategy,
|
|
environment_id=None,
|
|
server_side=False,
|
|
output_directory="trials",
|
|
log_level="info",
|
|
file_log_level="info",
|
|
enabled=True,
|
|
in_queue_num=None,
|
|
out_queue_num=None,
|
|
forwarder=None,
|
|
save_seen_packets=True,
|
|
demo_mode=False):
|
|
"""
|
|
Args:
|
|
server_port (str): The port(s) the engine will monitor
|
|
string_strategy (str): String representation of strategy DNA to apply to the network
|
|
environment_id (str, None): ID of the given strategy
|
|
server_side (bool, False): Whether or not the engine is running on the server side of the connection
|
|
output_directory (str, 'trials'): The path logs and packet captures should be written to
|
|
enabled (bool, True): whether or not the engine should be started (used for conditional context managers)
|
|
in_queue_num (int, None): override the netfilterqueue number used for inbound packets. Used for running multiple instances of the engine at the same time. Defaults to None.
|
|
out_queue_num (int, None): override the netfilterqueue number used for outbound packets. Used for running multiple instances of the engine at the same time. Defaults to None.
|
|
save_seen_packets (bool, True): whether or not the engine should record and save packets it sees while running. Defaults to True, but it is recommended this be disabled on higher throughput systems.
|
|
demo_mode (bool, False): whether to replace IPs in log messages with random IPs to hide sensitive IP addresses.
|
|
"""
|
|
self.server_port = server_port
|
|
# whether the engine is running on the server or client side.
|
|
# this affects which direction each out/in tree is attached to the
|
|
# source and destination port.
|
|
self.server_side = server_side
|
|
self.overhead = 0
|
|
self.seen_packets = []
|
|
self.environment_id = environment_id
|
|
self.forwarder = forwarder
|
|
self.save_seen_packets = save_seen_packets
|
|
if forwarder:
|
|
self.sender_ip = forwarder["sender_ip"]
|
|
self.routing_ip = forwarder["routing_ip"]
|
|
self.forward_ip = forwarder["forward_ip"]
|
|
|
|
# Set up the directory and ID for logging
|
|
if not output_directory:
|
|
self.output_directory = "trials"
|
|
else:
|
|
self.output_directory = output_directory
|
|
actions.utils.setup_dirs(self.output_directory)
|
|
if not environment_id:
|
|
self.environment_id = actions.utils.get_id()
|
|
|
|
# Set up a logger
|
|
self.logger = actions.utils.get_logger(BASEPATH,
|
|
self.output_directory,
|
|
__name__,
|
|
"engine",
|
|
self.environment_id,
|
|
log_level=log_level,
|
|
file_log_level=file_log_level,
|
|
demo_mode=demo_mode)
|
|
# Warn if these are not provided
|
|
if not environment_id:
|
|
self.logger.warning("No environment ID given, one has been generated (%s)", self.environment_id)
|
|
if not output_directory:
|
|
self.logger.warning("No output directory specified, using the default (%s)" % self.output_directory)
|
|
|
|
# Used for conditional context manager usage
|
|
self.enabled = enabled
|
|
|
|
# Parse the given strategy
|
|
self.strategy = actions.utils.parse(string_strategy, self.logger)
|
|
|
|
# Setup variables used by the NFQueue system
|
|
self.in_queue_num = in_queue_num or 1
|
|
self.out_queue_num = out_queue_num or self.in_queue_num + 1
|
|
self.out_nfqueue_started = False
|
|
self.in_nfqueue_started = False
|
|
self.running_nfqueue = False
|
|
self.out_nfqueue = None
|
|
self.in_nfqueue = None
|
|
self.out_nfqueue_socket = None
|
|
self.in_nfqueue_socket = None
|
|
self.out_nfqueue_thread = None
|
|
self.in_nfqueue_thread = None
|
|
self.censorship_detected = False
|
|
# Specifically define an L3Socket to send our packets. This is an optimization
|
|
# for scapy to send packets more quickly than using just send(), as under the hood
|
|
# send() creates and then destroys a socket each time, imparting a large amount
|
|
# of overhead.
|
|
self.socket = conf.L3socket(iface=actions.utils.get_interface())
|
|
|
|
def __enter__(self):
|
|
"""
|
|
Allows the engine to be used as a context manager; simply launches the
|
|
engine if enabled.
|
|
"""
|
|
if self.enabled:
|
|
self.initialize_nfqueue()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, tb):
|
|
"""
|
|
Allows the engine to be used as a context manager
|
|
Stops the engine if enabled and closes loggers.
|
|
"""
|
|
for handler in self.logger.handlers:
|
|
handler.close()
|
|
if self.enabled:
|
|
self.shutdown_nfqueue()
|
|
|
|
def do_nat(self, packet):
|
|
"""
|
|
NATs packet: changes the sources and destination IP if it matches the
|
|
configured route, and clears the checksums for recalculating
|
|
|
|
Args:
|
|
packet (layers.packet.Packet): packet to modify before sending
|
|
|
|
Returns:
|
|
layers.packet.Packet: the modified packet
|
|
"""
|
|
if packet["IP"].src == self.sender_ip:
|
|
packet["IP"].dst = self.forward_ip
|
|
packet["IP"].src = self.routing_ip
|
|
del packet["TCP"].chksum
|
|
del packet["IP"].chksum
|
|
elif packet["IP"].src == self.forward_ip:
|
|
packet["IP"].dst = self.sender_ip
|
|
packet["IP"].src = self.routing_ip
|
|
del packet["TCP"].chksum
|
|
del packet["IP"].chksum
|
|
return packet
|
|
|
|
def mysend(self, packet):
|
|
"""
|
|
Helper scapy sending method. Expects a Geneva Packet input.
|
|
"""
|
|
try:
|
|
if self.forwarder:
|
|
self.logger.debug("NAT-ing packet.")
|
|
packet = self.do_nat(packet)
|
|
self.logger.debug("Sending packet %s", str(packet))
|
|
self.socket.send(packet.packet)
|
|
except Exception:
|
|
self.logger.exception("Error in engine mysend.")
|
|
|
|
def delayed_send(self, packet, delay):
|
|
"""
|
|
Method to be started by a thread to delay the sending of a packet without blocking the main thread.
|
|
"""
|
|
self.logger.debug("Sleeping for %f seconds." % delay)
|
|
time.sleep(delay)
|
|
self.mysend(packet)
|
|
|
|
def run_nfqueue(self, nfqueue, nfqueue_socket, direction):
|
|
"""
|
|
Handles running the outbound nfqueue socket with the socket timeout.
|
|
"""
|
|
try:
|
|
while self.running_nfqueue:
|
|
try:
|
|
if direction == "out":
|
|
self.out_nfqueue_started = True
|
|
else:
|
|
self.in_nfqueue_started = True
|
|
|
|
nfqueue.run_socket(nfqueue_socket)
|
|
# run_socket can raise an OSError on shutdown for some builds of netfilterqueue
|
|
except (socket.timeout, OSError):
|
|
pass
|
|
except Exception:
|
|
self.logger.exception("Exception out of run_nfqueue() (direction=%s)", direction)
|
|
|
|
def configure_iptables(self, remove=False):
|
|
"""
|
|
Handles setting up ipables for this run
|
|
"""
|
|
self.logger.debug("Configuring iptables rules")
|
|
|
|
# Switch source and destination ports if this evaluator is to run from the server side
|
|
port1, port2 = "sport", "dport"
|
|
if not self.server_side:
|
|
port1, port2 = "dport", "sport"
|
|
|
|
out_chain = "OUTPUT"
|
|
in_chain = "INPUT"
|
|
|
|
# Switch whether the command should add or delete the rules
|
|
add_or_remove = "A"
|
|
if remove:
|
|
add_or_remove = "D"
|
|
cmds = []
|
|
for proto in ["tcp", "udp"]:
|
|
# Need to change the match rule if multiple ports are specified
|
|
# Default match policy is the protocol
|
|
match_policy = proto
|
|
# Don't need to do any checking on the port since the iptables command can error, closing the engine
|
|
# Change server port to str for backwards compatibility calling engine directly with an int
|
|
if any(x in str(self.server_port) for x in [":", ","]):
|
|
match_policy = "multiport"
|
|
|
|
cmds += ["iptables -%s %s -p %s --match %s --%s %s -j NFQUEUE --queue-num %d" %
|
|
(add_or_remove, out_chain, proto, match_policy, port1, self.server_port, self.out_queue_num),
|
|
"iptables -%s %s -p %s --match %s --%s %s -j NFQUEUE --queue-num %d" %
|
|
(add_or_remove, in_chain, proto, match_policy, port2, self.server_port, self.in_queue_num)]
|
|
# If this machine is acting as a middlebox, we need to add the same rules again
|
|
# in the opposite direction so that we can pass packets back and forth
|
|
if self.forwarder:
|
|
cmds += ["iptables -%s %s -p %s --match %s --%s %s -j NFQUEUE --queue-num %d" %
|
|
(add_or_remove, out_chain, proto, match_policy, port2, self.server_port, self.out_queue_num),
|
|
"iptables -%s %s -p %s --match %s --%s %s -j NFQUEUE --queue-num %d" %
|
|
(add_or_remove, in_chain, proto, match_policy, port1, self.server_port, self.in_queue_num)]
|
|
|
|
for cmd in cmds:
|
|
self.logger.debug(cmd)
|
|
# If we're logging at debug mode, keep stderr/stdout piped to us
|
|
# Otherwise, pipe them both to DEVNULL
|
|
if actions.utils.get_console_log_level() == "debug":
|
|
subprocess.check_call(cmd.split(), timeout=60)
|
|
else:
|
|
subprocess.check_call(cmd.split(), stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, timeout=60)
|
|
return cmds
|
|
|
|
def initialize_nfqueue(self):
|
|
"""
|
|
Initializes the nfqueue for input and output forests.
|
|
"""
|
|
self.logger.debug("Engine created with strategy %s (ID %s) to port %s",
|
|
str(self.strategy).strip(), self.environment_id, self.server_port)
|
|
self.configure_iptables()
|
|
|
|
self.out_nfqueue_started = False
|
|
self.in_nfqueue_started = False
|
|
self.running_nfqueue = True
|
|
# Create our NFQueues
|
|
self.out_nfqueue = netfilterqueue.NetfilterQueue()
|
|
self.in_nfqueue = netfilterqueue.NetfilterQueue()
|
|
# Bind them
|
|
self.out_nfqueue.bind(self.out_queue_num, self.out_callback)
|
|
self.in_nfqueue.bind(self.in_queue_num, self.in_callback)
|
|
# Create our nfqueue sockets to allow for non-blocking usage
|
|
self.out_nfqueue_socket = socket.fromfd(self.out_nfqueue.get_fd(),
|
|
socket.AF_UNIX,
|
|
socket.SOCK_STREAM)
|
|
self.in_nfqueue_socket = socket.fromfd(self.in_nfqueue.get_fd(),
|
|
socket.AF_UNIX,
|
|
socket.SOCK_STREAM)
|
|
# Create our handling threads for packets
|
|
self.out_nfqueue_thread = threading.Thread(target=self.run_nfqueue,
|
|
args=(self.out_nfqueue, self.out_nfqueue_socket, "out"))
|
|
|
|
self.in_nfqueue_thread = threading.Thread(target=self.run_nfqueue,
|
|
args=(self.in_nfqueue, self.in_nfqueue_socket, "in"))
|
|
# Start each thread
|
|
self.in_nfqueue_thread.start()
|
|
self.out_nfqueue_thread.start()
|
|
|
|
maxwait = 100 # 100 time steps of 0.01 seconds for a max wait of 10 seconds
|
|
i = 0
|
|
# Give NFQueue time to startup, since it's running in background threads
|
|
# Block the main thread until this is done
|
|
while (not self.in_nfqueue_started or not self.out_nfqueue_started) and i < maxwait:
|
|
time.sleep(0.1)
|
|
i += 1
|
|
self.logger.debug("NFQueue Initialized after %d", int(i))
|
|
|
|
def shutdown_nfqueue(self):
|
|
"""
|
|
Shutdown nfqueue.
|
|
"""
|
|
self.logger.debug("Shutting down NFQueue")
|
|
self.out_nfqueue_started = False
|
|
self.in_nfqueue_started = False
|
|
self.running_nfqueue = False
|
|
# Give the handlers two seconds to leave the callbacks before we forcibly unbind
|
|
# the queues.
|
|
time.sleep(2)
|
|
if self.in_nfqueue:
|
|
self.in_nfqueue.unbind()
|
|
if self.out_nfqueue:
|
|
self.out_nfqueue.unbind()
|
|
self.configure_iptables(remove=True)
|
|
self.socket.close()
|
|
self.out_nfqueue_socket.close()
|
|
self.in_nfqueue_socket.close()
|
|
|
|
packets_path = os.path.join(BASEPATH,
|
|
self.output_directory,
|
|
"packets",
|
|
"original_%s.pcap" % self.environment_id)
|
|
|
|
# Write to disk the original packets we captured
|
|
if self.save_seen_packets:
|
|
wrpcap(packets_path, [p.packet for p in self.seen_packets])
|
|
|
|
# If the engine exits before it initializes for any reason, these threads may not be set
|
|
# Only join them if they are defined
|
|
if self.out_nfqueue_thread:
|
|
self.out_nfqueue_thread.join()
|
|
if self.in_nfqueue_thread:
|
|
self.in_nfqueue_thread.join()
|
|
|
|
# Shutdown the logger
|
|
actions.utils.close_logger(self.logger)
|
|
|
|
def out_callback(self, nfpacket):
|
|
"""
|
|
Callback bound to the outgoing nfqueue rule to run the outbound strategy.
|
|
"""
|
|
if not self.running_nfqueue:
|
|
return
|
|
|
|
packet = layers.packet.Packet(IP(nfpacket.get_payload()))
|
|
self.logger.debug("Received outbound packet %s", str(packet))
|
|
|
|
# Record this packet for a .pacp later
|
|
if self.save_seen_packets:
|
|
self.seen_packets.append(packet)
|
|
|
|
# Drop the packet in NFQueue so the strategy can handle it
|
|
nfpacket.drop()
|
|
|
|
self.handle_packet(packet)
|
|
|
|
def handle_packet(self, packet):
|
|
"""
|
|
Handles processing an outbound packet through the engine.
|
|
"""
|
|
packets_to_send = self.strategy.act_on_packet(packet, self.logger, direction="out")
|
|
if packets_to_send:
|
|
self.overhead += (len(packets_to_send) - 1)
|
|
|
|
# Send all of the packets we've collected to send
|
|
for out_packet in packets_to_send:
|
|
# If the strategy requested us to sleep before sending this packet, do so here
|
|
if out_packet.sleep:
|
|
# We can't block the main sending thread, so instead spin off a new thread to handle sleeping
|
|
threading.Thread(target=self.delayed_send, args=(out_packet, out_packet.sleep)).start()
|
|
else:
|
|
self.mysend(out_packet)
|
|
|
|
def in_callback(self, nfpacket):
|
|
"""
|
|
Callback bound to the incoming nfqueue rule. Since we can't
|
|
manually send packets to ourself, process the given packet here.
|
|
"""
|
|
if not self.running_nfqueue:
|
|
return
|
|
packet = layers.packet.Packet(IP(nfpacket.get_payload()))
|
|
|
|
if self.save_seen_packets:
|
|
self.seen_packets.append(packet)
|
|
|
|
self.logger.debug("Received packet: %s", str(packet))
|
|
|
|
# Run the given strategy
|
|
packets = self.strategy.act_on_packet(packet, self.logger, direction="in")
|
|
|
|
# GFW will send RA packets to disrupt a TCP stream
|
|
if packet.haslayer("TCP") and packet.get("TCP", "flags") == "RA":
|
|
self.censorship_detected = True
|
|
|
|
# Branching is disabled for the in direction, so we can only ever get
|
|
# back 1 or 0 packets. If zero, drop the packet.
|
|
if not packets:
|
|
nfpacket.drop()
|
|
return
|
|
|
|
if self.forwarder:
|
|
nfpacket.drop()
|
|
self.handle_packet(packet)
|
|
return
|
|
|
|
# Otherwise, overwrite this packet with the packet the action trees gave back
|
|
nfpacket.set_payload(bytes(packets[0]))
|
|
|
|
# If the strategy requested us to sleep before accepting on this packet, do so here
|
|
if packets[0].sleep:
|
|
time.sleep(packets[0].sleep)
|
|
|
|
# Accept the modified packet
|
|
nfpacket.accept()
|
|
|
|
|
|
def get_args():
|
|
"""
|
|
Sets up argparse and collects arguments.
|
|
"""
|
|
parser = argparse.ArgumentParser(description='The engine that runs a given strategy.')
|
|
# Store a string, not int, in case of port ranges/lists. The iptables command checks the port var
|
|
parser.add_argument('--server-port', action='store', required=True)
|
|
parser.add_argument('--environment-id', action='store', help="ID of the current strategy under test")
|
|
parser.add_argument('--sender-ip', action='store', help="IP address of sending machine, used for NAT")
|
|
parser.add_argument('--routing-ip', action='store', help="Public IP of this machine, used for NAT")
|
|
parser.add_argument('--forward-ip', action='store', help="IP address to forward traffic to")
|
|
parser.add_argument('--strategy', action='store', help="Strategy to deploy")
|
|
parser.add_argument('--output-directory', default="trials", action='store', help="Where to output logs, captures, and results. Defaults to trials/.")
|
|
parser.add_argument('--forward', action='store_true', help='Enable if this is forwarding traffic')
|
|
parser.add_argument('--server-side', action='store_true', help='Enable if this is running on the server side')
|
|
parser.add_argument('--log', action='store', default="debug",
|
|
choices=("debug", "info", "warning", "critical", "error"),
|
|
help="Sets the log level for the console")
|
|
parser.add_argument('--file-log', action='store', default="debug",
|
|
choices=("debug", "info", "warning", "critical", "error"),
|
|
help="Sets the log level for the log file")
|
|
parser.add_argument('--no-save-packets', action='store_false', help='Disables recording captured packets')
|
|
parser.add_argument("--in-queue-num", action="store", help="NfQueue number for incoming packets", default=1, type=int)
|
|
parser.add_argument("--out-queue-num", action="store", help="NfQueue number for outgoing packets", default=None, type=int)
|
|
parser.add_argument("--demo-mode", action='store_true', help="Replaces all IPs with dummy IPs in log messages so as not to reveal sensitive IP addresses")
|
|
|
|
args = parser.parse_args()
|
|
return args
|
|
|
|
|
|
def main(args):
|
|
"""
|
|
Kicks off the engine with the given arguments.
|
|
"""
|
|
nat_config = {}
|
|
if args.get("sender_ip") and args.get("routing_ip") and args.get("forward_ip"):
|
|
nat_config = {"sender_ip": args["sender_ip"],
|
|
"routing_ip": args["routing_ip"],
|
|
"forward_ip": args["forward_ip"]}
|
|
|
|
with Engine(args["server_port"],
|
|
args["strategy"],
|
|
environment_id=args["environment_id"],
|
|
server_side=args["server_side"],
|
|
output_directory=args["output_directory"],
|
|
forwarder=nat_config,
|
|
log_level=args["log"],
|
|
file_log_level=args["file_log"],
|
|
in_queue_num=args["in_queue_num"],
|
|
out_queue_num=args["out_queue_num"],
|
|
save_seen_packets=args["no_save_packets"],
|
|
demo_mode=args["demo_mode"]):
|
|
|
|
threading.Event().wait() # Wait forever
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main(vars(get_args()))
|
|
|