geneva/evaluator.py

1166 lines
50 KiB
Python
Raw Permalink Normal View History

"""
The Evaluator is charged with evaluating a given strategy and assigning a numerical fitness metric to it.
"""
import argparse
import copy
import logging
import multiprocessing
import os
import random
import socket
import subprocess
import sys
import threading
import time
import re
import warnings
import requests
import urllib3
import actions.utils
import censors.censor_driver
# Suppress unfixed Paramiko warnings (see Paramiko issue #1386)
warnings.filterwarnings(action='ignore',module='.*paramiko.*')
# Placeholder for a docker import (see below why we cannot import docker here)
docker = None
BASEPATH = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = BASEPATH
class Evaluator():
def __init__(self, command, logger):
"""
Initialize the global evaluator for this evolution.
Args:
command (list): sys.argv or list of arguments
logger (:obj:`logging.Logger`): logger passed in from the main driver to log from
"""
self.args = get_args(command)
self.test_plugin = self.args["test_type"]
assert self.test_plugin, "Cannot import an empty plugin"
self.public_ip = self.args.get("public_ip", "")
self.external_client = self.args["external_client"]
self.censor = self.args.get("censor")
# If there is no external client defined and no internal test setup, default --external-server to True
if not self.external_client and not self.censor:
self.args["external_server"] = True
self.external_server = self.args["external_server"]
# If there is an external client connecting to us, override the server with our public ip
if not self.external_server and self.external_client:
assert self.args.get("public_ip", ""), "Cannot use an external client to this server without specifying the public IP."
self.public_ip = self.args.get("public_ip", "")
worker = actions.utils.get_worker(self.public_ip, logger)
if worker:
self.public_ip = worker["ip"]
self.args.update({'server': self.public_ip})
command += ["--server", self.public_ip]
self.run_canary_phase = True
self.client_args = copy.deepcopy(self.args)
self.server_args = copy.deepcopy(self.args)
self.client_cls = None
self.server_cls = None
self.plugin = None
self.override_evaluation = False
# Plugin may optionally override the strategy evaluation for a single individual or the entire evaluation
try:
_, plugin_cls = actions.utils.import_plugin(self.test_plugin, "plugin")
parsed_args = plugin_cls.get_args(command)
self.args.update({k:v for k,v in parsed_args.items() if v or (not v and k not in self.args)})
self.plugin = plugin_cls(self.args)
# Disable the canary phase if the plugin will override the default evaluation logic
self.run_canary_phase = not self.plugin.override_evaluation
self.override_evaluation = self.plugin.override_evaluation
except ImportError:
pass
self.client_cls = collect_plugin(self.test_plugin, "client", command, self.args, self.client_args)
self.server_cls = collect_plugin(self.test_plugin, "server", command, self.args, self.server_args)
self.workers = self.args["workers"]
self.stop = False
self.skip_empty = not self.args["no_skip_empty"]
self.output_directory = self.args["output_directory"]
self.routing_ip = self.args.get("routing_ip", None)
self.runs = self.args.get("runs", 1)
self.fitness_by = self.args.get("fitness_by", "avg")
self.forwarder = {}
# If NAT options were specified to train as a middle box, set up the engine's
# NAT configuration
self.act_as_middlebox = self.args.get("act_as_middlebox")
if self.act_as_middlebox:
assert self.args.get("forward_ip")
assert self.args.get("sender_ip")
assert self.args.get("routing_ip")
self.forwarder["forward_ip"] = self.args["forward_ip"]
self.forwarder["sender_ip"] = self.args["sender_ip"]
self.forwarder["routing_ip"] = self.args["routing_ip"]
# Legacy environments storage
self.environments = []
if not os.path.exists(self.output_directory):
os.mkdir(self.output_directory)
# Only enable docker if we're going to use an internal censor
self.use_docker = False
if self.args["censor"]:
import docker
self.use_docker = True
self.docker_client = docker.from_env()
self.apiclient = docker.APIClient()
self.logger = logger
def evaluate(self, ind_list):
"""
Perform the overall fitness evaluation driving.
Args:
ind_list (list): list of individuals to evaluate
Returns:
list: Population list after evaluation
"""
# Setup environment ids for each individual
self.assign_ids(ind_list)
# If the plugin has overridden default evaluation, call that here
if self.override_evaluation:
self.logger.debug("Beginning evaluation in plugin")
return self.plugin.evaluate(self.args, self, ind_list, self.logger)
if self.workers > 1 and self.use_docker:
# Chunk the population and test sites into smaller lists to hand to each worker
split = [ind_list[i::self.workers] for i in range(0, self.workers)]
procs = []
# Create workers
for i in range(0, len(split)):
if not split[i]:
continue
if self.use_docker:
try:
# Due to limitations in docker-py, it is not safe to build the containers in a multiprocessed
# setting. To handle this, build the environments ahead of time, and pass them to the workers to use.
environment = self.create_test_environment(i)
except (docker.errors.APIError, requests.exceptions.ConnectionError, urllib3.exceptions.ProtocolError):
self.logger.exception("Failed to create evaluator environment - is docker running?")
return
proc = multiprocessing.Process(target=self.worker, args=(split[i], str(i), environment))
proc.start()
procs.append(proc)
try:
# Join all the processes
for proc in procs:
proc.join()
except KeyboardInterrupt:
self.shutdown()
else:
environment = {}
if self.use_docker:
try:
environment = self.create_test_environment("main")
except (docker.errors.APIError, requests.exceptions.ConnectionError, urllib3.exceptions.ProtocolError):
self.logger.exception("Failed to create evaluator environment - is docker running?")
return
self.worker(ind_list, "main", environment)
for ind in ind_list:
self.read_fitness(ind)
self.terminate_docker()
return ind_list
def run_test(self, environment, ind):
"""
Conducts a test of a given individual in the environment.
Args:
environment (dict): Dictionary of environment variables
ind (:obj:`actions.strategy.Strategy`): A strategy object to test with
Returns:
tuple: (ind.environment_id, ind.fitness) environment ID of strategy and fitness
"""
# If skip_empty is enabled, this is not the canary, and the individual is empty,
# skip it
if len(ind) == 0 and ind.environment_id != "canary" and self.skip_empty:
self.logger.info("[skipped] Fitness %d: %s" % (-1000, str(ind)))
ind.fitness = -1000
return "skipped", -1000
fitnesses = []
# Run the strategy multiple times if requested
for run in range(0, self.runs):
self.logger.debug("Launching %s plugin (run %d) for %s" % (self.test_plugin, run + 1, str(ind)))
environment["id"] = ind.environment_id
self.client_args.update({"environment_id": ind.environment_id})
self.server_args.update({"environment_id": ind.environment_id})
if not self.args["server_side"]:
self.client_args.update({"strategy" : str(ind)})
self.server_args.update({"no_engine" : True})
else:
self.server_args.update({"strategy" : str(ind)})
self.client_args.update({"no_engine" : True})
# If we're using an internal censor, make sure the client is pointed at the server
if self.args["censor"]:
self.client_args.update({"server": environment["server"]["ip"]})
self.client_args.update({"wait_for_censor": True})
self.server_args.update({"wait_for_shutdown": True})
self.update_ports(environment)
try:
# If the plugin has overridden the below logic, run that plugin's version directly
if self.plugin:
self.logger.debug("Running standalone plugin.")
self.args.update({"strategy": str(ind)})
self.plugin.start(self.args, self, environment, ind, self.logger)
self.read_fitness(ind)
else:
self.logger.debug("Launching client and server directly.")
# If we're given a server to start, start it now
if self.server_cls and not self.external_server and not self.act_as_middlebox:
server = self.start_server(self.server_args, environment, self.logger)
fitness = self.run_client(self.client_args, environment, self.logger)
if self.server_cls and not self.external_server and not self.act_as_middlebox:
self.stop_server(environment, server)
self.read_fitness(ind)
# If the engine ran on the server side, ask that it punish fitness
if self.args["server_side"]:
ind.fitness = server.punish_fitness(ind.fitness, self.logger)
actions.utils.write_fitness(ind.fitness, self.output_directory, environment["id"])
except actions.utils.SkipStrategyException as exc:
self.logger.debug("Strategy evaluation ending.")
ind.fitness = exc.fitness
fitnesses.append(ind.fitness)
break
fitnesses.append(ind.fitness)
if self.runs > 1:
self.logger.debug("\t(%d/%d) Fitness %s: %s" % (run + 1, self.runs, str(ind.fitness), str(ind)))
self.logger.debug("Storing fitness of %s by: %s" % (fitnesses, self.fitness_by))
if self.fitness_by == "min":
ind.fitness = min(fitnesses)
elif self.fitness_by == "max":
ind.fitness = max(fitnesses)
elif self.fitness_by == "avg":
ind.fitness = round(sum(fitnesses)/len(fitnesses), 2)
actions.utils.write_fitness(ind.fitness, self.output_directory, environment["id"])
# Log the fitness
self.logger.info("[%s] Fitness %s: %s" % (ind.environment_id, str(ind.fitness), str(ind)))
return ind.environment_id, ind.fitness
def run_client(self, args, environment, logger):
"""
Runs the plugin client given the current configuration
Args:
args (dict): Dictionary of arguments
environment (dict): Dictionary describing environment configuration for this evaluation
logger (:obj:`logging.Logger`): A logger to log with
Returns:
float: Fitness of individual
"""
fitness = None
if environment.get("remote"):
fitness = self.run_remote_client(args, environment, logger)
elif environment.get("docker"):
self.run_docker_client(args, environment, logger)
else:
self.run_local_client(args, environment, logger)
fitpath = os.path.join(BASEPATH, self.output_directory, actions.utils.FLAGFOLDER, environment["id"]) + ".fitness"
# Do not overwrite the fitness if it already exists
if not os.path.exists(fitpath):
actions.utils.write_fitness(fitness, self.output_directory, environment["id"])
return fitness
def run_docker_client(self, args, environment, logger):
"""
Runs client within the docker container. Does not return fitness; instead
fitness is written via the flags directory and read back in later.
Args:
args (dict): Dictionary of arguments
environment (dict): Dictionary describing environment configuration for this evaluation
logger (:obj:`logging.Logger`): A logger to log with
"""
command = ["docker", "exec", "--privileged", environment["client"]["container"].name, "python", "code/plugins/plugin_client.py", "--server", environment["server"]["ip"]]
base_cmd = actions.utils.build_command(args)
command += base_cmd
self.exec_cmd(command)
def update_ports(self, environment):
"""
Checks that the chosen port is open inside the docker container - if not, it chooses a new port.
Args:
environment (dict): Dictionary describing docker environment
"""
command = ["docker", "exec", "--privileged", environment["server"]["container"].name, "netstat", "-ano"]
output = self.exec_cmd_output(command)
requested_port = self.args.get("port")
self.logger.debug("Testing if port %s is open in the docker container" % requested_port)
while (":%s" % requested_port) in output:
self.logger.warn("Port %s is in use, choosing a new port" % requested_port)
requested_port = random.randint(1000, 65000)
output = self.exec_cmd_output(command)
self.logger.debug("Using port %s" % requested_port)
self.args.update({"port": requested_port})
self.client_args.update({"port": requested_port})
self.server_args.update({"port": requested_port})
def run_remote_client(self, args, environment, logger):
"""
Runs client remotely over SSH, using the given SSH channel
Args:
args (dict): Dictionary of arguments
environment (dict): Dictionary describing environment configuration for this evaluation
logger (:obj:`logging.Logger`): A logger to log with
Returns:
float: Fitness of individual
"""
worker = environment["worker"]
remote = environment["remote"]
command = []
if worker["username"] != "root":
command = ["sudo"]
command += [worker["python"], os.path.join(worker["geneva_path"], "plugins/plugin_client.py")]
base_cmd = actions.utils.build_command(args)
command += base_cmd
command = " ".join(command)
self.remote_exec_cmd(remote, command, logger, timeout=20)
# Get the logs from the run
self.get_log(remote, worker, "%s.client.log" % environment["id"], logger)
if not args["server_side"]:
self.get_log(remote, worker, "%s.engine.log" % environment["id"], logger)
# Get the individual's fitness
command = 'cat %s/%s/%s/%s.fitness' % (worker["geneva_path"], self.output_directory, actions.utils.FLAGFOLDER, environment["id"])
remote_fitness, error_lines = self.remote_exec_cmd(remote, command, logger)
fitness = -1000
try:
fitness = int(remote_fitness[0])
except Exception:
logger.exception("Failed to parse remote fitness.")
return None
return fitness
def remote_exec_cmd(self, remote, command, logger, timeout=15, verbose=True):
"""
Given a remote SSH session, executes a string command. Blocks until
command completes, and returns the stdout and stderr. If the SSH
connection is broken, it will try again.
Args:
remote: Paramiko SSH channel to execute commands over
command (str): Command to execute
logger (:obj:`logging.Logger`): A logger to log with
timeout (int, optional): Timeout for the command
verbose (bool, optional): Whether the output should be printed
Returns:
tuple: (stdout, stderr) of command, each is a list
"""
i, max_tries = 0, 3
lines = []
error_lines = []
stdin_, stdout_, stderr_ = None, None, None
while i < max_tries:
try:
if verbose:
logger.debug(command)
stdin_, stdout_, stderr_ = remote.exec_command(command, timeout=timeout)
# Block until the client finishes
stdout_.channel.recv_exit_status()
error_lines = stderr_.readlines()
lines = stdout_.readlines()
break
# We would like to catch paramiko.SSHException here, but because of issues with importing paramiko
# at the top of the file in the main namespace, we catch Exception instead as a broader exception.
except Exception:
logger.error("Failed to execute \"%s\" on remote host. Re-creating SSH tunnel." % command)
# Note that at this point, our remote object still has a valid channel as far as paramiko is
# concerned, but the channel is no longer responding. If we tried to do remote.close() here,
# it would hang our process. Instead, we'll set up a new remote channel, and let Python's garbage
# collection handle destroying the original remote object for us.
try:
remote = self.setup_remote()
except Exception:
logger.error("Failed to re-connect remote - trying again.")
i += 1
if verbose:
for line in error_lines:
logger.debug("ERROR: %s", line.strip())
# Close the channels
if stdin_:
stdin_.close()
if stdout_:
stdout_.close()
if stderr_:
stderr_.close()
return lines, error_lines
def get_log(self, remote, worker, log_name, logger):
"""
Retrieves a log from the remote server and writes it to disk.
Args:
remote: A Paramiko SSH channel to execute over
worker (dict): Dictionary describing external client worker
log_name (str): Log name to retrieve
logger (:obj:`logging.Logger`): A logger to log with
"""
# Get the client.log
log_path = os.path.join(self.output_directory, "logs", log_name)
command = "cat %s" % os.path.join(worker["geneva_path"], log_path)
client_log, error_lines = self.remote_exec_cmd(remote, command, logger, verbose=False)
# If something goes wrong, we don't necessarily want to dump the entire client_log to the screen
# a second time, so just disable verbosity and display the stderr.
for line in error_lines:
logger.error(line.strip())
# Write the client log out to disk
with open(log_path, "w") as fd:
for line in client_log:
fd.write(line)
def run_local_client(self, args, environment, logger):
"""
Runs client locally. Does not return fitness.
Args:
args (dict): Dictionary of arguments
environment (dict): Dictionary describing environment configuration for this evaluation
logger (:obj:`logging.Logger`): A logger to log with
"""
# Launch the plugin client
command = [sys.executable, "plugins/plugin_client.py", "--plugin", self.client_cls.name]
base_cmd = actions.utils.build_command(args)
command += base_cmd
# Replace strings of empty strings "''" with empty strings "", as subprocess will handle this correctly
command = [x if x != "''" else "" for x in command]
logger.debug(" ".join(command))
self.exec_cmd(command)
def exec_cmd(self, command, timeout=60):
"""
Runs a subprocess command at the correct log level.
Args:
command (list): Command to execute.
timeout (int, optional): Timeout for execution
"""
self.logger.debug(" ".join(command))
try:
if actions.utils.get_console_log_level() == "debug":
subprocess.check_call(command, timeout=60)
else:
subprocess.check_call(command, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, timeout=60)
except subprocess.CalledProcessError as exc:
# Code 137 is for SIGKILL, which is how docker containers are shutdown by the evaluator.
# Ignore these exceptions, raise all others
if exc.returncode != 137:
raise
def exec_cmd_output(self, command, timeout=60):
"""
Runs a subprocess command at the correct log level. This is a separate method from the above exec_cmd,
since that is used to stream output to the screen (so check_output is not appropriate).
Args:
command (list): Command to execute.
timeout (int, optional): Timeout for execution
Returns:
str: Output of command
"""
self.logger.debug(" ".join(command))
output = ""
try:
output = subprocess.check_output(command, timeout=60, stderr=subprocess.PIPE).decode('utf-8', 'ignore')
if actions.utils.get_console_log_level() == "debug":
self.logger.debug(output)
except subprocess.CalledProcessError as exc:
# Code 137 is for SIGKILL, which is how docker containers are shutdown by the evaluator.
# Ignore these exceptions, raise all others
if exc.returncode != 137:
raise
return output
def start_server(self, args, environment, logger):
"""
Launches the server.
Args:
args (dict): Dictionary of arguments
environment (dict): Dictionary describing environment configuration for this evaluation
logger (:obj:`logging.Logger`): A logger to log with
Return:
float: fitness of individual (if one is provided)
"""
if environment.get("docker"):
logger.debug("Evaluator: running server inside docker")
return self.run_docker_server(args, environment, logger)
else:
logger.debug("Evaluator: running server")
return self.run_local_server(args, environment, logger)
def run_docker_server(self, args, environment, logger):
"""
Runs server and censor in their respective docker containers.
Args:
args (dict): Dictionary of arguments
environment (dict): Dictionary describing environment configuration for this evaluation
logger (:obj:`logging.Logger`): A logger to log with
"""
command = ["docker", "exec", "--privileged", environment["server"]["container"].name, "python", "code/plugins/plugin_server.py", "--test-type", self.server_cls.name]
base_cmd = actions.utils.build_command(args)
command += base_cmd
# Replace strings of empty strings "''" with empty strings "", as subprocess will handle this correctly
command = [x if x != "''" else "" for x in command]
port = args.get("port")
queue_num = random.randint(1, 1000)
environment["port"] = port
environment["queue_num"] = queue_num
server_thread = threading.Thread(target=self.exec_cmd, args=(command, ))
censor_thread = threading.Thread(target=self.start_censor, args=(environment, environment["id"]))
censor_thread.start()
server_thread.start()
max_wait = 30
count = 0
flag_file = os.path.join(args["output_directory"], "flags", "%s.server_ready" % args["environment_id"])
while count < max_wait:
if os.path.exists(flag_file):
break
if count % 15 == 0:
logger.debug("Evaluator waiting for confirmation of server startup")
count += 1
time.sleep(0.5)
else:
logger.warn("Evaluator: Server did not startup within window")
return
logger.debug("Evaluator: Server ready.")
def stop_server(self, environment, server):
"""
Stops server.
Args:
environment (dict): Environment dictionary
server (:obj:`plugins.plugin_server.ServerPlugin`): A plugin server to stop
"""
# If the server is running inside a docker container, we don't have access to it directly
# to shut it down. Instead, write a shutdown flag to instruct it to shut down.
self.logger.debug("Evaluator shutting down server.")
if environment.get("docker"):
flag_file = os.path.join(self.args["output_directory"], "flags", "%s.server_shutdown" % self.server_args["environment_id"])
# Touch shutdown file to instruct the server to shutdown
open(flag_file, 'a').close()
self.stop_censor(environment)
else:
# Shut down the server
server.stop()
# Shut down the server's logger, now that we are done with it
actions.utils.close_logger(environment["server_logger"])
def run_local_server(self, args, environment, logger):
"""
Runs local server.
Args:
args (dict): Dictionary of arguments
environment (dict): Dictionary describing environment configuration for this evaluation
logger (:obj:`logging.Logger`): A logger to log with
"""
server = self.server_cls(args)
logger.debug("Starting local server with args: %s" % str(args))
server_logger = actions.utils.get_logger(PROJECT_ROOT, args["output_directory"], "server", "server", environment["id"], log_level=actions.utils.get_console_log_level())
environment["server_logger"] = server_logger
args.update({"test_type": self.server_cls.name})
if not args.get("server_side"):
args.update({"no_engine" : True})
server.start(args, server_logger)
return server
def canary_phase(self, canary):
"""
Learning phase runs the client against the censor to collect packets.
Args:
canary (:obj:`actions.strategy.Strategy`): A (usually empty) strategy object to evaluate
Returns:
str: canary id used ("canary")
"""
if not self.run_canary_phase:
return None
self.logger.info("Starting collection phase")
environment = {}
canary.environment_id = "canary"
if self.use_docker:
try:
environment = self.create_test_environment("canary")
except (docker.errors.APIError, requests.exceptions.ConnectionError, urllib3.exceptions.ProtocolError):
self.logger.error("Failed to create evaluator environment - is docker running?")
return None
self.worker([canary], canary.environment_id, environment)
self.logger.info("Collected packets under %s" % canary)
return "canary"
def get_ip(self):
"""
Gets IP of evaluator computer.
Returns:
str: Public IP provided
"""
if self.public_ip:
return self.public_ip
return None
def create_test_environment(self, worker_id):
"""
Creates a test environment in docker.
Args:
worker_id (int): Worker ID of this worker
Returns:
dict: Environment dictionary to use
"""
self.logger.debug("Initializing docker environment.")
# We can't have an environment with an intenral test server and no censor
# with the current set up. To be addressed later to allow for no censor testing
assert not (not self.censor and not self.external_server), "Can't create internal server w/o censor"
assert not (self.censor and self.external_server), "Can't create a censor without an internal training server"
# Create a dict to hold the environment we're about to create
environment = {}
# Create the client container
environment["client"] = self.initialize_base_container("client_%s" % worker_id)
environment["client"]["ip"] = self.parse_ip(environment["client"]["container"], "eth0")
# If a training censor is requested, create a censor container
if self.censor:
environment["censor"] = self.initialize_base_container("censor_%s" % worker_id)
environment["server"] = self.initialize_base_container("server_%s" % worker_id)
# Set up the routing
environment["server"]["ip"] = self.parse_ip(environment["server"]["container"], "eth0")
environment["censor"]["ip"] = self.parse_ip(environment["censor"]["container"], "eth0")
self._add_route(environment["server"]["container"], environment["censor"]["ip"])
self._add_route(environment["client"]["container"], environment["censor"]["ip"])
# Calculate the network base ("172.17.0.0")
network_base = ".".join(environment["server"]["ip"].split(".")[:2]) + ".0.0"
# Delete all other routes for the server and client to force communication through the censor
environment["server"]["container"].exec_run(["route", "del", "-net", network_base, "gw", "0.0.0.0", "netmask", "255.255.0.0", "dev", "eth0"], privileged=True)
environment["client"]["container"].exec_run(["route", "del", "-net", network_base, "gw", "0.0.0.0", "netmask", "255.255.0.0", "dev", "eth0"], privileged=True)
# Set up NAT on the censor
environment["censor"]["container"].exec_run(["iptables", "-t", "nat", "-A", "POSTROUTING", "-j", "MASQUERADE"], privileged=True)
self.environments.append(environment)
# Flag that this environment is a docker environment
environment["docker"] = True
# Return the configured environment for use
return environment
def _add_route(self, container, via):
"""
Helper method to take down an interface on a container
Args:
container: Docker container object to execute within
via (str): IP address to route through
"""
exit_code, _output = container.exec_run(["ip", "route", "del", "default"], privileged=True)
exit_code, _output = container.exec_run(["ip", "route", "add", "default", "via", via], privileged=True)
return exit_code
def parse_ip(self, container, iface):
"""
Helper method to parse an IP address from ifconfig.
Args:
container: Docker container object to execute within
iface (str): Interface to parse from
Returns:
str: IP address
"""
_exit_code, output = container.exec_run(["ifconfig", iface], privileged=True)
ip = re.findall(r'[0-9]+(?:\.[0-9]+){3}', output.decode("utf-8"))[0]
return ip
def setup_remote(self):
"""
Opens an SSH tunnel to the remote client worker.
"""
# Import paramiko here instead of at the top of the file. This is done intentionally. When
# paramiko is imported, pynacl is loaded, which polls /dev/random for entropy to setup crypto
# keys. However, if the evaluator is run on a relatively blank VM (or AWS instance) with little
# network traffic before it starts (as will often be the case), there may be insufficient entropy
# available in the system. This will cause pynacl to block on entropy, and since the only thing
# running on the system is now blocking, it is liable to block indefinitely. Instead, the import
# is performed here so that the system interaction of running the evaluator this far collects
# enough entropy to not block paramiko. The pynacl team is aware of this issue: see issue #503
# (https://github.com/pyca/pynacl/issues/503) and #327 (https://github.com/pyca/pynacl/issues/327)
import paramiko
paramiko_logger = paramiko.util.logging.getLogger()
paramiko_logger.setLevel(logging.WARN)
worker = actions.utils.get_worker(self.external_client, self.logger)
if self.use_docker:
worker["ip"] = "0.0.0.0"
# Pull the destination to connect to this worker. Preference hostnames over IP addresses.
destination = worker["hostname"]
if not destination:
destination = worker["ip"]
self.logger.debug("Connecting to worker %s@%s" % (worker["username"], destination))
remote = paramiko.SSHClient()
remote.set_missing_host_key_policy(paramiko.AutoAddPolicy())
max_tries = 5
i = 0
while i < max_tries:
try:
if "keyfile" in worker:
k = paramiko.RSAKey.from_private_key_file(worker["keyfile"])
remote.connect(destination, username=worker["username"], pkey=k, port=worker["port"], timeout=60)
else:
remote.connect(destination, username=worker["username"], password=worker["password"], port=worker["port"], timeout=60)
break
except socket.timeout:
self.logger.error("Could not connect to worker %s" % destination)
i += 1
return remote
def worker(self, ind_list, worker_id, environment):
"""
Perform the actual fitness evaluation as a multithreaded worker. The
worker pops off an individual from the list and evaluates it.
Args:
ind_list (list): List of strategy objects to evaluate
worker_id (int): ID of this worker
environment (dict): Environment dictionary
"""
environment["remote"] = None
if self.external_client:
environment["remote"] = self.setup_remote()
environment["worker"] = actions.utils.get_worker(self.external_client, self.logger)
for ind in ind_list:
if self.stop:
break
# Run a test
eid, fitness = self.run_test(environment, ind)
if not fitness:
fitness = -1000
# Dump logs if requested
if fitness < 0 and self.args.get("log_on_fail"):
self.dump_logs(eid)
elif fitness > 0 and self.args.get("log_on_success"):
self.dump_logs(eid)
# Clean up the test environment
self.shutdown_environment(environment)
def assign_ids(self, ind_list):
"""
Assigns random environment ids to each individual to be evaluated.
Args:
ind_list (list): List of individuals to assign random IDs to
"""
for ind in ind_list:
ind.environment_id = actions.utils.get_id()
ind.fitness = None
def dump_logs(self, environment_id):
"""
Dumps client, engine, server, and censor logs, to be called on test failure
at ERROR level.
Args:
environment_id (str): Environment ID of a strategy to dump
"""
log_files = ["client.log", "engine.log", "censor.log", "server.log"]
for log_file in log_files:
log = ""
log_path = os.path.join(BASEPATH,
self.output_directory,
"logs",
"%s.%s" % (environment_id, log_file))
try:
if not os.path.exists(log_path):
continue
with open(log_path, "rb") as logfd:
log = logfd.read().decode('utf-8')
except Exception:
self.logger.exception("Failed to open log file")
continue
self.logger.error("%s: %s", log_file, log)
def terminate_docker(self):
"""
Terminates any hanging running containers.
"""
if not self.use_docker:
return
# First, stop all the docker containers that match the given names
# If a previous run was cut off in between container creation and startup,
# we must also remove the container ('docker rm <container>')
for operation in ["stop", "rm"]:
try:
output = subprocess.check_output(['docker', 'ps', '--format', "'{{.Names}}'"]).decode('utf-8')
except subprocess.CalledProcessError:
self.logger.error("Failed to list container names -- is docker running?")
return
if output.strip():
self.logger.debug("Cleaning up docker (%s)" % operation)
for name in output.splitlines():
if any(key in name for key in ["client", "censor", "server"]):
try:
subprocess.check_output(['docker', operation, name])
except subprocess.CalledProcessError:
pass
def initialize_base_container(self, name):
"""
Builds a base container with a given name and connects it to a given network.
Also retrieves lower level settings and the IP address of the container.
Args:
name (str): Name of this docker container
Returns:
dict: Dictionary containing docker container object and relevant information
"""
try:
container = {}
container["name"] = name
# Note that this is _not_ safe to do in a multiprocessed context - must be run single threaded.
container["container"] = self.docker_client.containers.run('base', detach=True, privileged=True, volumes={os.path.abspath(os.getcwd()): {"bind" : "/code", "mode" : "rw"}}, tty=True, remove=True, name=name)
container["settings"] = self.apiclient.inspect_container(name)
except docker.errors.NotFound:
self.logger.error("Could not run container \"base\". Is docker not running, or does the base container not exist?")
return None
return container
def get_pid(self, container):
"""
Returns PID of first actively running python process.
Args:
container: Docker container object to query
Returns:
int: PID of Python process
"""
pid = None
try:
output = subprocess.check_output(["docker", "exec", container.name, "ps", "aux"], stderr=subprocess.PIPE).decode('utf-8')
except subprocess.CalledProcessError:
return None
for line in output.splitlines():
if "root" not in line or "python" not in line:
continue
parts = line.split()
# Try to parse out the pid to confirm we found it
try:
pid = int(parts[1])
break
except ValueError:
raise
return pid
def stop_censor(self, environment):
"""
Send SIGKILL to all remaining python processes in the censor container.
This is done intentionally over a SIGINT or a graceful shutdown mecahnism - due to
dynamics with signal handling in nfqueue callbacks (threads), SIGINTs can be ignored
and graceful shutdown mechanisms may not be picked up (or be fast enough).
The output this method parses is below:
.. code-block:: bash
# ps aux
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 1 0.1 0.0 21944 3376 pts/0 Ss+ 13:30 0:00 /bin/bash
root 14 24.0 0.4 200376 38564 ? Ss 13:30 0:00 python code/censor_driver.py censor2 jgsko1rf trials/2018-10-30_06:30:48 60181
root 32 0.0 0.0 19188 2400 ? Rs 13:30 0:00 ps aux
Args:
environment (dict): Environment dictionary
"""
port = environment["port"]
queue_num = environment["queue_num"]
if environment:
pid = self.get_pid(environment["censor"]["container"])
while pid:
#self.logger.info("%s killing process %s in %s" % (environment["id"], str(pid), environment["censor"]["container"].name))
try:
subprocess.check_call(["docker", "exec", "--privileged", environment["censor"]["container"].name, "kill", "-9", str(pid)])
except subprocess.CalledProcessError:
pass
pid = self.get_pid(environment["censor"]["container"])
time.sleep(0.25)
try:
subprocess.check_call(["docker", "exec", "--privileged", environment["censor"]["container"].name, "iptables", "-D", "FORWARD", "-j", "NFQUEUE", "-p", "tcp", "--sport", str(port), "--queue-num", str(queue_num)])
except subprocess.CalledProcessError:
pass
try:
subprocess.check_call(["docker", "exec", "--privileged",environment["censor"]["container"].name, "iptables", "-D", "FORWARD", "-j", "NFQUEUE", "-p", "tcp", "--dport", str(port), "--queue-num", str(queue_num)])
except subprocess.CalledProcessError:
pass
def start_censor(self, environment, environment_id):
"""
Starts the censor in the server environment container.
Args:
environment (dict): Environment dictionary
environment_id (str): Environment ID of the censor to stop
"""
assert self.use_docker, "Cannot start censor without enabling docker"
port = environment["port"]
queue_num = environment["queue_num"]
try:
self.logger.debug(" Starting censor %s with driver" % self.censor)
command = ["docker", "exec", "--privileged", environment["censor"]["container"].name,
"python", "code/censors/censor_driver.py",
"--censor", self.censor,
"--environment-id", environment_id,
"--output-directory", self.output_directory,
"--port", str(port),
"--log", "debug",
"--forbidden", self.args.get("bad_word", "ultrasurf"),
"--queue", str(queue_num)]
self.exec_cmd(command)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
# Docker containers were killed out from under us - likely means
# user forced a shutdown. Bail gracefully.
return False
except Exception:
self.logger.exception("Failed out of start_censor")
finally:
self.logger.debug("Dockerized censor thread exiting")
def read_fitness(self, ind):
"""
Looks for this individual's fitness file on disk, opens it, and stores the fitness in the given individual.
Args:
ind (:obj:`actions.strategy.Strategy`): Individual to read fitness for
"""
fitness_path = os.path.join(BASEPATH, self.output_directory, actions.utils.FLAGFOLDER, ind.environment_id + ".fitness")
try:
if os.path.exists(fitness_path):
with open(fitness_path, "r") as fd:
ind.fitness = float(fd.read())
elif not ind.fitness:
self.logger.warning("Could not find fitness file for %s" % fitness_path)
ind.fitness = -1000
except:
self.logger.exception("[%s] Failed to read fitness file" % ind.environment_id)
ind.fitness = -1000
def shutdown_container(self, container):
"""
Tries to shutdown a given container and eats a NotFound exception if the container
has already exited.
Args:
container: docker container object to call stop() on
"""
try:
container.stop()
except docker.errors.NotFound:
pass
def shutdown_environment(self, environment):
"""
Shuts down the evaluation environment.
If Docker, shuts down server and client container.
If a remote SSH connection, the connection is shut down.
"""
if environment.get("docker"):
self.shutdown_container(environment["client"]["container"])
if self.censor:
self.shutdown_container(environment["censor"]["container"])
self.shutdown_container(environment["server"]["container"])
elif environment.get("remote"):
environment["remote"].close()
def shutdown(self):
"""
Shuts down all active environments
"""
self.terminate_docker()
def collect_plugin(test_plugin, plugin_type, command, full_args, plugin_args):
"""
Import a given plugin
Args:
test_plugin (str): Plugin name to import ("http")
plugin_type (str): Component of plugin to import ("client")
command (list): sys.argv or list of arguments
full_args (dict): Parsed full list of arguments already maintained by the parent plugin
plugin_args (dict): Dictionary of args specific to this plugin component
Returns:
Imported plugin class for instantiation later
"""
cls = None
try:
_, cls = actions.utils.import_plugin(test_plugin, plugin_type)
parsed_args = cls.get_args(command)
# Only override the args if the plugin successfully parsed something; this allows
# defaults from the evaluator or plugin to propagate.
parsed_args = {k:v for k,v in parsed_args.items() if v or (not v and k not in full_args) }
full_args.update(parsed_args)
plugin_args.update(parsed_args)
except ImportError as exc:
pass
return cls
def get_random_open_port():
"""
Selects a random ephemeral port that is open.
Returns:
int: Open port
"""
while True:
port = random.randint(1024, 65000)
# Bind TCP socket
try:
with socket.socket() as sock:
# If we can bind, nothing is listening
sock.bind(('', port))
break
except OSError:
continue
return port
def get_arg_parser(single_use=False):
"""
Sets up argparse. This is done separately to enable collection of help messages.
Args:
single_use (bool, optional): whether this evaluator will only be used for one strategy, used to configure sane defaults
"""
# Disable prefix matching to avoid prefix collisions for unseen plugin arguments
parser = argparse.ArgumentParser(description='Evaluate a given strategy a given number of times.', allow_abbrev=False, prog="evaluator.py")
# Type of evaluation
parser.add_argument('--test-type', action='store', choices=actions.utils.get_plugins(), default="http", help="plugin to launch")
parser.add_argument('--strategy', action='store', default="", required=single_use, help='strategy to evaluate')
logging_group = parser.add_argument_group('control aspects of evaluator logging and storage')
logging_group.add_argument('--log', action='store', choices=("debug", "info", "warning", "critical", "error"), help="Sets the log level")
logging_group.add_argument('--output-directory', action='store', help="where to output results")
logging_group.add_argument('--log-on-fail', action='store_true', help="dump the logs associated with each individual on strategy failure")
logging_group.add_argument('--log-on-success', action='store_true', help="dump the logs associated with each individual on strategy success")
external_group = parser.add_argument_group('control aspects of external resource usage')
external_group.add_argument('--external-server', action='store_true', help="use an external server for testing.")
external_group.add_argument('--external-client', action='store', help="use the given external client for testing.")
networking_group = parser.add_argument_group('control aspects of evaluator networking configuration')
networking_group.add_argument('--server-side', action="store_true", help="run the Geneva engine on the server side, not the client")
networking_group.add_argument('--public-ip', action='store', help="public facing IP for this computer for server-side evaluation.")
networking_group.add_argument('--routing-ip', action='store', help="locally facing IP for this computer, used for NAT")
networking_group.add_argument('--sender-ip', action='store', help="IP address of sending machine, used for NAT")
networking_group.add_argument('--forward-ip', action='store', help="IP address to forward traffic to")
networking_group.add_argument('--act-as-middlebox', action='store_true', help="enables NAT mode. Requires --routing-ip, --sender-ip, and --forward-ip")
networking_group.add_argument('--port', action='store', type=int, default=get_random_open_port(), help='default port to use')
docker_group = parser.add_argument_group('control aspects of docker-specific options')
docker_group.add_argument('--censor', action='store', help='censor to test against.', choices=censors.censor_driver.get_censors())
docker_group.add_argument('--workers', action='store', default=1, type=int, help='controls the number of docker containers the evaluator will use.')
docker_group.add_argument('--bad-word', action='store', help="forbidden word to test with", default="ultrasurf")
evaluation_group = parser.add_argument_group('control aspects of evaluation')
evaluation_group.add_argument('--runs', type=int, default=1, action='store', help="number of times each individual should be run per evaluation")
evaluation_group.add_argument("--fitness-by", action='store', choices=('min', 'avg', 'max'), default='avg', help="if each individual is run multiple times, control how fitness is assigned.")
evaluation_group.add_argument('--no-skip-empty', action='store_true', help="evaluate empty strategies (default: False).")
return parser
def get_args(cmd, single_use=False):
"""
Creates an argparser and collects arguments.
Args:
single_use (bool, optional): whether this evaluator will only be used for one strategy, used to configure sane defaults
Returns:
dict: parsed args
"""
parser = get_arg_parser(single_use=single_use)
args, _ = parser.parse_known_args(cmd)
return vars(args)