mirror of https://github.com/Kkevsterrr/geneva
95 lines
3.2 KiB
Python
95 lines
3.2 KiB
Python
import argparse
|
|
import importlib
|
|
import inspect
|
|
import os
|
|
import traceback
|
|
import sys
|
|
|
|
BASEPATH = os.path.dirname(os.path.abspath(__file__))
|
|
PROJECT_ROOT = os.path.dirname(BASEPATH)
|
|
|
|
if PROJECT_ROOT not in sys.path:
|
|
sys.path.append(PROJECT_ROOT)
|
|
|
|
import censors.censor
|
|
|
|
CENSORS = {}
|
|
|
|
def get_censors():
|
|
"""
|
|
Dynamically imports all of the Censors classes in this directory.
|
|
"""
|
|
global CENSORS
|
|
if CENSORS:
|
|
return CENSORS
|
|
|
|
collected_censors = {}
|
|
for censor_file in os.listdir(os.path.dirname(os.path.abspath(__file__))):
|
|
if not censor_file.endswith(".py"):
|
|
continue
|
|
censor_file = censor_file.replace(".py", "")
|
|
importlib.import_module("censors."+censor_file)
|
|
def check_censor(o):
|
|
return inspect.isclass(o) and issubclass(o, censors.censor.Censor) and o != censors.censor.Censor
|
|
clsmembers = inspect.getmembers(sys.modules["censors."+censor_file], predicate=check_censor)
|
|
if clsmembers:
|
|
name, censor_class = clsmembers[0]
|
|
if censor_class(0, [], None, None, None, None).enabled:
|
|
collected_censors[name.lower()] = censor_class
|
|
|
|
CENSORS = collected_censors
|
|
return collected_censors
|
|
|
|
|
|
def get_args():
|
|
"""
|
|
Sets up argparse and collects arguments.
|
|
"""
|
|
parser = argparse.ArgumentParser(description='The server, run by the evaluator.')
|
|
parser.add_argument('--port', type=int, action='store', help="Server port",
|
|
required=True)
|
|
parser.add_argument('--queue', type=int, action='store', help="NFQueue number to use",
|
|
required=True)
|
|
parser.add_argument('--environment-id', action='store', help="ID of the current environment",
|
|
required=True)
|
|
parser.add_argument('--censor', action='store', help="censor to deploy", required=True)
|
|
parser.add_argument('--forbidden', action='store', default='ultrasurf', help="word to censor")
|
|
parser.add_argument('--output-directory', action='store', help="Where to write logs",
|
|
required=True)
|
|
parser.add_argument('--log', action='store', default="debug",
|
|
choices=("debug", "info", "warning", "critical", "error"),
|
|
help="Sets the log level")
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
def main(args):
|
|
"""
|
|
Starts the given censor.
|
|
"""
|
|
try:
|
|
censors = get_censors()
|
|
censor_name = args["censor"].lower()
|
|
|
|
if censor_name not in censors:
|
|
print("ERROR: Unknown censor.")
|
|
return None
|
|
censor_cls = censors[censor_name]
|
|
censor = censor_cls(args["environment_id"],
|
|
[args["forbidden"].encode('utf-8')],
|
|
args["output_directory"],
|
|
args["log"],
|
|
args["port"],
|
|
args["queue"])
|
|
print("Censor %s starting." % censor_name)
|
|
censor.start()
|
|
except Exception as e:
|
|
print(e)
|
|
traceback.print_exc()
|
|
|
|
|
|
# Note that this code can be removed - this is how the
|
|
# evaluator runs the censor for tests in the Docker environment
|
|
if __name__ == "__main__":
|
|
main(vars(get_args()))
|