2019-11-15 03:38:40 +01:00
"""
2020-05-17 16:15:16 +02:00
Geneva Strategy Engine
2019-11-15 03:38:40 +01:00
2020-05-17 16:15:16 +02:00
Given a strategy and a server port , the engine configures NFQueue to capture all traffic
into and out of that port so the strategy can run over the connection .
2019-11-15 03:38:40 +01:00
"""
import argparse
import logging
logging . getLogger ( " scapy.runtime " ) . setLevel ( logging . ERROR )
import os
import socket
import subprocess
import threading
import time
2020-05-17 16:15:16 +02:00
try :
import netfilterqueue
except ImportError :
pass
2019-11-15 03:38:40 +01:00
from scapy . layers . inet import IP
from scapy . utils import wrpcap
from scapy . config import conf
socket . setdefaulttimeout ( 1 )
2020-06-24 14:20:51 +02:00
import layers . packet
2019-11-15 03:38:40 +01:00
import actions . strategy
import actions . utils
BASEPATH = os . path . dirname ( os . path . abspath ( __file__ ) )
class Engine ( ) :
2020-05-17 16:15:16 +02:00
def __init__ ( self , server_port ,
string_strategy ,
environment_id = None ,
server_side = False ,
output_directory = " trials " ,
log_level = " info " ,
enabled = True ,
in_queue_num = None ,
out_queue_num = None ,
forwarder = None ,
2020-07-06 22:43:24 +02:00
save_seen_packets = True ,
demo_mode = False ) :
2020-05-17 16:15:16 +02:00
"""
Args :
2021-09-05 16:58:38 +02:00
server_port ( str ) : The port ( s ) the engine will monitor
2020-05-17 16:15:16 +02:00
string_strategy ( str ) : String representation of strategy DNA to apply to the network
environment_id ( str , None ) : ID of the given strategy
server_side ( bool , False ) : Whether or not the engine is running on the server side of the connection
output_directory ( str , ' trials ' ) : The path logs and packet captures should be written to
enabled ( bool , True ) : whether or not the engine should be started ( used for conditional context managers )
in_queue_num ( int , None ) : override the netfilterqueue number used for inbound packets . Used for running multiple instances of the engine at the same time . Defaults to None .
out_queue_num ( int , None ) : override the netfilterqueue number used for outbound packets . Used for running multiple instances of the engine at the same time . Defaults to None .
save_seen_packets ( bool , True ) : whether or not the engine should record and save packets it sees while running . Defaults to True , but it is recommended this be disabled on higher throughput systems .
2020-07-06 22:43:24 +02:00
demo_mode ( bool , False ) : whether to replace IPs in log messages with random IPs to hide sensitive IP addresses .
2020-05-17 16:15:16 +02:00
"""
2019-11-15 03:38:40 +01:00
self . server_port = server_port
2020-05-17 16:15:16 +02:00
# whether the engine is running on the server or client side.
# this affects which direction each out/in tree is attached to the
# source and destination port.
self . server_side = server_side
self . overhead = 0
2019-11-15 03:38:40 +01:00
self . seen_packets = [ ]
2020-05-17 16:15:16 +02:00
self . environment_id = environment_id
self . forwarder = forwarder
self . save_seen_packets = save_seen_packets
if forwarder :
self . sender_ip = forwarder [ " sender_ip " ]
self . routing_ip = forwarder [ " routing_ip " ]
self . forward_ip = forwarder [ " forward_ip " ]
2019-11-15 03:38:40 +01:00
# Set up the directory and ID for logging
2020-05-17 16:15:16 +02:00
if not output_directory :
self . output_directory = " trials "
else :
self . output_directory = output_directory
actions . utils . setup_dirs ( self . output_directory )
2019-11-15 03:38:40 +01:00
if not environment_id :
2020-05-17 16:15:16 +02:00
self . environment_id = actions . utils . get_id ( )
2019-11-15 03:38:40 +01:00
# Set up a logger
self . logger = actions . utils . get_logger ( BASEPATH ,
2020-05-17 16:15:16 +02:00
self . output_directory ,
2019-11-15 03:38:40 +01:00
__name__ ,
" engine " ,
2020-05-17 16:15:16 +02:00
self . environment_id ,
2020-07-01 20:41:06 +02:00
log_level = log_level ,
demo_mode = demo_mode )
2020-05-17 16:15:16 +02:00
# Warn if these are not provided
if not environment_id :
self . logger . warning ( " No environment ID given, one has been generated ( %s ) " , self . environment_id )
if not output_directory :
self . logger . warning ( " No output directory specified, using the default ( %s ) " % self . output_directory )
2019-11-15 03:38:40 +01:00
# Used for conditional context manager usage
2020-05-17 16:15:16 +02:00
self . enabled = enabled
# Parse the given strategy
2019-11-15 03:38:40 +01:00
self . strategy = actions . utils . parse ( string_strategy , self . logger )
2020-05-17 16:15:16 +02:00
2019-11-15 03:38:40 +01:00
# Setup variables used by the NFQueue system
2020-05-17 16:15:16 +02:00
self . in_queue_num = in_queue_num or 1
self . out_queue_num = out_queue_num or self . in_queue_num + 1
2019-11-15 03:38:40 +01:00
self . out_nfqueue_started = False
self . in_nfqueue_started = False
self . running_nfqueue = False
self . out_nfqueue = None
self . in_nfqueue = None
self . out_nfqueue_socket = None
self . in_nfqueue_socket = None
self . out_nfqueue_thread = None
self . in_nfqueue_thread = None
self . censorship_detected = False
# Specifically define an L3Socket to send our packets. This is an optimization
# for scapy to send packets more quickly than using just send(), as under the hood
# send() creates and then destroys a socket each time, imparting a large amount
# of overhead.
self . socket = conf . L3socket ( iface = actions . utils . get_interface ( ) )
def __enter__ ( self ) :
"""
Allows the engine to be used as a context manager ; simply launches the
2020-05-17 16:15:16 +02:00
engine if enabled .
2019-11-15 03:38:40 +01:00
"""
2020-05-17 16:15:16 +02:00
if self . enabled :
self . initialize_nfqueue ( )
2019-11-15 03:38:40 +01:00
return self
def __exit__ ( self , exc_type , exc_value , tb ) :
"""
Allows the engine to be used as a context manager ; simply stops the engine
2020-05-17 16:15:16 +02:00
if enabled .
2019-11-15 03:38:40 +01:00
"""
2020-05-17 16:15:16 +02:00
if self . enabled :
self . shutdown_nfqueue ( )
def do_nat ( self , packet ) :
"""
NATs packet : changes the sources and destination IP if it matches the
configured route , and clears the checksums for recalculating
Args :
2020-06-24 14:20:51 +02:00
packet ( layers . packet . Packet ) : packet to modify before sending
2020-05-17 16:15:16 +02:00
Returns :
2020-06-24 14:20:51 +02:00
layers . packet . Packet : the modified packet
2020-05-17 16:15:16 +02:00
"""
if packet [ " IP " ] . src == self . sender_ip :
packet [ " IP " ] . dst = self . forward_ip
packet [ " IP " ] . src = self . routing_ip
del packet [ " TCP " ] . chksum
del packet [ " IP " ] . chksum
elif packet [ " IP " ] . src == self . forward_ip :
packet [ " IP " ] . dst = self . sender_ip
packet [ " IP " ] . src = self . routing_ip
del packet [ " TCP " ] . chksum
del packet [ " IP " ] . chksum
return packet
2019-11-15 03:38:40 +01:00
def mysend ( self , packet ) :
"""
Helper scapy sending method . Expects a Geneva Packet input .
"""
try :
2020-05-17 16:15:16 +02:00
if self . forwarder :
self . logger . debug ( " NAT-ing packet. " )
packet = self . do_nat ( packet )
2019-11-15 03:38:40 +01:00
self . logger . debug ( " Sending packet %s " , str ( packet ) )
self . socket . send ( packet . packet )
except Exception :
self . logger . exception ( " Error in engine mysend. " )
def delayed_send ( self , packet , delay ) :
"""
Method to be started by a thread to delay the sending of a packet without blocking the main thread .
"""
self . logger . debug ( " Sleeping for %f seconds. " % delay )
time . sleep ( delay )
self . mysend ( packet )
def run_nfqueue ( self , nfqueue , nfqueue_socket , direction ) :
"""
Handles running the outbound nfqueue socket with the socket timeout .
"""
try :
while self . running_nfqueue :
try :
if direction == " out " :
self . out_nfqueue_started = True
else :
self . in_nfqueue_started = True
nfqueue . run_socket ( nfqueue_socket )
2020-05-17 16:15:16 +02:00
# run_socket can raise an OSError on shutdown for some builds of netfilterqueue
except ( socket . timeout , OSError ) :
2019-11-15 03:38:40 +01:00
pass
except Exception :
self . logger . exception ( " Exception out of run_nfqueue() (direction= %s ) " , direction )
def configure_iptables ( self , remove = False ) :
"""
Handles setting up ipables for this run
"""
self . logger . debug ( " Configuring iptables rules " )
2020-05-17 16:15:16 +02:00
# Switch source and destination ports if this evaluator is to run from the server side
port1 , port2 = " sport " , " dport "
if not self . server_side :
port1 , port2 = " dport " , " sport "
2019-11-15 03:38:40 +01:00
out_chain = " OUTPUT "
in_chain = " INPUT "
# Switch whether the command should add or delete the rules
add_or_remove = " A "
if remove :
add_or_remove = " D "
cmds = [ ]
for proto in [ " tcp " , " udp " ] :
2021-09-02 19:42:15 +02:00
# Need to change the match rule if multiple ports are specified
# Default match policy is the protocol
match_policy = proto
2021-09-07 13:57:47 +02:00
# Don't need to do any checking on the port since the iptables command can error, closing the engine
# Change server port to str for backwards compatibility calling engine directly with an int
if any ( x in str ( self . server_port ) for x in [ " : " , " , " ] ) :
2021-09-02 19:42:15 +02:00
match_policy = " multiport "
cmds + = [ " iptables - %s %s -p %s --match %s -- %s %s -j NFQUEUE --queue-num %d " %
( add_or_remove , out_chain , proto , match_policy , port1 , self . server_port , self . out_queue_num ) ,
" iptables - %s %s -p %s --match %s -- %s %s -j NFQUEUE --queue-num %d " %
( add_or_remove , in_chain , proto , match_policy , port2 , self . server_port , self . in_queue_num ) ]
2020-05-17 16:15:16 +02:00
# If this machine is acting as a middlebox, we need to add the same rules again
# in the opposite direction so that we can pass packets back and forth
if self . forwarder :
2021-09-02 19:42:15 +02:00
cmds + = [ " iptables - %s %s -p %s --match %s -- %s %s -j NFQUEUE --queue-num %d " %
( add_or_remove , out_chain , proto , match_policy , port2 , self . server_port , self . out_queue_num ) ,
" iptables - %s %s -p %s --match %s -- %s %s -j NFQUEUE --queue-num %d " %
( add_or_remove , in_chain , proto , match_policy , port1 , self . server_port , self . in_queue_num ) ]
2019-11-15 03:38:40 +01:00
for cmd in cmds :
self . logger . debug ( cmd )
2020-05-17 16:15:16 +02:00
# If we're logging at debug mode, keep stderr/stdout piped to us
2019-11-15 03:38:40 +01:00
# Otherwise, pipe them both to DEVNULL
2020-05-17 16:15:16 +02:00
if actions . utils . get_console_log_level ( ) == " debug " :
2019-11-15 03:38:40 +01:00
subprocess . check_call ( cmd . split ( ) , timeout = 60 )
else :
subprocess . check_call ( cmd . split ( ) , stderr = subprocess . DEVNULL , stdout = subprocess . DEVNULL , timeout = 60 )
return cmds
def initialize_nfqueue ( self ) :
"""
Initializes the nfqueue for input and output forests .
"""
self . logger . debug ( " Engine created with strategy %s (ID %s ) to port %s " ,
str ( self . strategy ) . strip ( ) , self . environment_id , self . server_port )
self . configure_iptables ( )
self . out_nfqueue_started = False
self . in_nfqueue_started = False
self . running_nfqueue = True
# Create our NFQueues
self . out_nfqueue = netfilterqueue . NetfilterQueue ( )
self . in_nfqueue = netfilterqueue . NetfilterQueue ( )
# Bind them
2020-05-17 16:15:16 +02:00
self . out_nfqueue . bind ( self . out_queue_num , self . out_callback )
self . in_nfqueue . bind ( self . in_queue_num , self . in_callback )
2019-11-15 03:38:40 +01:00
# Create our nfqueue sockets to allow for non-blocking usage
self . out_nfqueue_socket = socket . fromfd ( self . out_nfqueue . get_fd ( ) ,
socket . AF_UNIX ,
socket . SOCK_STREAM )
self . in_nfqueue_socket = socket . fromfd ( self . in_nfqueue . get_fd ( ) ,
socket . AF_UNIX ,
socket . SOCK_STREAM )
# Create our handling threads for packets
self . out_nfqueue_thread = threading . Thread ( target = self . run_nfqueue ,
args = ( self . out_nfqueue , self . out_nfqueue_socket , " out " ) )
self . in_nfqueue_thread = threading . Thread ( target = self . run_nfqueue ,
args = ( self . in_nfqueue , self . in_nfqueue_socket , " in " ) )
# Start each thread
self . in_nfqueue_thread . start ( )
self . out_nfqueue_thread . start ( )
maxwait = 100 # 100 time steps of 0.01 seconds for a max wait of 10 seconds
i = 0
# Give NFQueue time to startup, since it's running in background threads
# Block the main thread until this is done
while ( not self . in_nfqueue_started or not self . out_nfqueue_started ) and i < maxwait :
time . sleep ( 0.1 )
i + = 1
2020-07-01 20:34:49 +02:00
self . logger . debug ( " NFQueue Initialized after %d " , int ( i ) )
2019-11-15 03:38:40 +01:00
def shutdown_nfqueue ( self ) :
"""
Shutdown nfqueue .
"""
self . logger . debug ( " Shutting down NFQueue " )
self . out_nfqueue_started = False
self . in_nfqueue_started = False
self . running_nfqueue = False
# Give the handlers two seconds to leave the callbacks before we forcibly unbind
# the queues.
time . sleep ( 2 )
if self . in_nfqueue :
self . in_nfqueue . unbind ( )
if self . out_nfqueue :
self . out_nfqueue . unbind ( )
self . configure_iptables ( remove = True )
2020-05-17 16:15:16 +02:00
self . socket . close ( )
self . out_nfqueue_socket . close ( )
self . in_nfqueue_socket . close ( )
2019-11-15 03:38:40 +01:00
packets_path = os . path . join ( BASEPATH ,
self . output_directory ,
" packets " ,
" original_ %s .pcap " % self . environment_id )
# Write to disk the original packets we captured
2020-05-17 16:15:16 +02:00
if self . save_seen_packets :
wrpcap ( packets_path , [ p . packet for p in self . seen_packets ] )
2019-11-15 03:38:40 +01:00
# If the engine exits before it initializes for any reason, these threads may not be set
# Only join them if they are defined
if self . out_nfqueue_thread :
self . out_nfqueue_thread . join ( )
if self . in_nfqueue_thread :
self . in_nfqueue_thread . join ( )
# Shutdown the logger
actions . utils . close_logger ( self . logger )
def out_callback ( self , nfpacket ) :
"""
Callback bound to the outgoing nfqueue rule to run the outbound strategy .
"""
if not self . running_nfqueue :
return
2020-06-24 14:20:51 +02:00
packet = layers . packet . Packet ( IP ( nfpacket . get_payload ( ) ) )
2019-11-15 03:38:40 +01:00
self . logger . debug ( " Received outbound packet %s " , str ( packet ) )
# Record this packet for a .pacp later
2020-05-17 16:15:16 +02:00
if self . save_seen_packets :
self . seen_packets . append ( packet )
2019-11-15 03:38:40 +01:00
# Drop the packet in NFQueue so the strategy can handle it
nfpacket . drop ( )
self . handle_packet ( packet )
def handle_packet ( self , packet ) :
"""
Handles processing an outbound packet through the engine .
"""
packets_to_send = self . strategy . act_on_packet ( packet , self . logger , direction = " out " )
2020-05-17 16:15:16 +02:00
if packets_to_send :
self . overhead + = ( len ( packets_to_send ) - 1 )
2019-11-15 03:38:40 +01:00
# Send all of the packets we've collected to send
for out_packet in packets_to_send :
# If the strategy requested us to sleep before sending this packet, do so here
if out_packet . sleep :
# We can't block the main sending thread, so instead spin off a new thread to handle sleeping
threading . Thread ( target = self . delayed_send , args = ( out_packet , out_packet . sleep ) ) . start ( )
else :
self . mysend ( out_packet )
def in_callback ( self , nfpacket ) :
"""
Callback bound to the incoming nfqueue rule . Since we can ' t
manually send packets to ourself , process the given packet here .
"""
if not self . running_nfqueue :
return
2020-06-24 14:20:51 +02:00
packet = layers . packet . Packet ( IP ( nfpacket . get_payload ( ) ) )
2019-11-15 03:38:40 +01:00
2020-05-17 16:15:16 +02:00
if self . save_seen_packets :
self . seen_packets . append ( packet )
2019-11-15 03:38:40 +01:00
self . logger . debug ( " Received packet: %s " , str ( packet ) )
# Run the given strategy
packets = self . strategy . act_on_packet ( packet , self . logger , direction = " in " )
2020-05-17 16:15:16 +02:00
# GFW will send RA packets to disrupt a TCP stream
2019-11-15 03:38:40 +01:00
if packet . haslayer ( " TCP " ) and packet . get ( " TCP " , " flags " ) == " RA " :
self . censorship_detected = True
# Branching is disabled for the in direction, so we can only ever get
# back 1 or 0 packets. If zero, drop the packet.
if not packets :
nfpacket . drop ( )
return
2020-05-17 16:15:16 +02:00
if self . forwarder :
nfpacket . drop ( )
self . handle_packet ( packet )
return
2019-11-15 03:38:40 +01:00
# Otherwise, overwrite this packet with the packet the action trees gave back
nfpacket . set_payload ( bytes ( packets [ 0 ] ) )
# If the strategy requested us to sleep before accepting on this packet, do so here
if packets [ 0 ] . sleep :
time . sleep ( packets [ 0 ] . sleep )
# Accept the modified packet
nfpacket . accept ( )
def get_args ( ) :
"""
Sets up argparse and collects arguments .
"""
parser = argparse . ArgumentParser ( description = ' The engine that runs a given strategy. ' )
2021-09-02 19:42:15 +02:00
# Store a string, not int, in case of port ranges/lists. The iptables command checks the port var
parser . add_argument ( ' --server-port ' , action = ' store ' , required = True )
2020-05-17 16:15:16 +02:00
parser . add_argument ( ' --environment-id ' , action = ' store ' , help = " ID of the current strategy under test " )
parser . add_argument ( ' --sender-ip ' , action = ' store ' , help = " IP address of sending machine, used for NAT " )
parser . add_argument ( ' --routing-ip ' , action = ' store ' , help = " Public IP of this machine, used for NAT " )
parser . add_argument ( ' --forward-ip ' , action = ' store ' , help = " IP address to forward traffic to " )
2019-11-15 03:38:40 +01:00
parser . add_argument ( ' --strategy ' , action = ' store ' , help = " Strategy to deploy " )
parser . add_argument ( ' --output-directory ' , default = " trials " , action = ' store ' , help = " Where to output logs, captures, and results. Defaults to trials/. " )
2020-05-17 16:15:16 +02:00
parser . add_argument ( ' --forward ' , action = ' store_true ' , help = ' Enable if this is forwarding traffic ' )
parser . add_argument ( ' --server-side ' , action = ' store_true ' , help = ' Enable if this is running on the server side ' )
2019-11-15 03:38:40 +01:00
parser . add_argument ( ' --log ' , action = ' store ' , default = " debug " ,
choices = ( " debug " , " info " , " warning " , " critical " , " error " ) ,
help = " Sets the log level " )
2020-05-17 16:15:16 +02:00
parser . add_argument ( ' --no-save-packets ' , action = ' store_false ' , help = ' Disables recording captured packets ' )
parser . add_argument ( " --in-queue-num " , action = " store " , help = " NfQueue number for incoming packets " , default = 1 , type = int )
parser . add_argument ( " --out-queue-num " , action = " store " , help = " NfQueue number for outgoing packets " , default = None , type = int )
2020-07-01 20:41:06 +02:00
parser . add_argument ( " --demo-mode " , action = ' store_true ' , help = " Replaces all IPs with dummy IPs in log messages so as not to reveal sensitive IP addresses " )
2019-11-15 03:38:40 +01:00
args = parser . parse_args ( )
return args
def main ( args ) :
"""
Kicks off the engine with the given arguments .
"""
2022-07-27 18:54:53 +02:00
nat_config = { }
if args . get ( " sender_ip " ) and args . get ( " routing_ip " ) and args . get ( " forward_ip " ) :
nat_config = { " sender_ip " : args [ " sender_ip " ] ,
" routing_ip " : args [ " routing_ip " ] ,
" forward_ip " : args [ " forward_ip " ] }
with Engine ( args [ " server_port " ] ,
args [ " strategy " ] ,
environment_id = args [ " environment_id " ] ,
server_side = args [ " server_side " ] ,
output_directory = args [ " output_directory " ] ,
forwarder = nat_config ,
log_level = args [ " log " ] ,
in_queue_num = args [ " in_queue_num " ] ,
out_queue_num = args [ " out_queue_num " ] ,
save_seen_packets = args [ " no_save_packets " ] ,
demo_mode = args [ " demo_mode " ] ) as eng :
2019-11-15 03:38:40 +01:00
eng . initialize_nfqueue ( )
2022-07-27 18:54:53 +02:00
threading . Event ( ) . wait ( ) # Wait forever
2019-11-15 03:38:40 +01:00
if __name__ == " __main__ " :
main ( vars ( get_args ( ) ) )