""" Geneva superclass object for defining a packet-level action. """ import inspect import importlib import os import sys import actions.utils ACTION_CACHE = {} ACTION_CACHE["in"] = {} ACTION_CACHE["out"] = {} BASEPATH = os.path.sep.join(os.path.dirname(os.path.abspath(__file__)).split(os.path.sep)[:-1]) class Action(): """ Defines the superclass for a Geneva Action. """ # Give each Action a unique ID - this is needed for graphing/visualization ident = 0 # Each Action has a 'frequency' field - this defines how likely it is to be chosen # when a new action is chosen frequency = 0 def __init__(self, action_name, direction): """ Initializes this action object. Args: action_name (str): Name of this action ("duplicate") direction (str): Direction of this action ("out", "both", "in") """ self.enabled = True self.action_name = action_name self.direction = direction self.requires_undo = False self.num_seen = 0 self.left = None self.right = None self.branching = False self.terminal = False self.ident = Action.ident Action.ident += 1 def applies(self, direction): """ Returns whether this action applies to the given direction, as branching actions are not supported on inbound trees. Args: direction (str): Direction to check if this action applies ("out", "in", "both") Returns: bool: whether or not this action can be used to a given direction """ if direction == self.direction or self.direction == "both": return True return False def mutate(self, environment_id=None): """ Mutates packet. """ def __str__(self): """ Defines string representation of this action. """ return "%s" % (self.action_name) @staticmethod def get_actions(direction, disabled=None, allow_terminal=True): """ Dynamically imports all of the Action classes in this directory. Will only return terminal actions if terminal is set to True. Args: direction (str): Limit imported actions to just those that can run to this direction ("out", "in", "both") disabled (list, optional): list of actions that are disabled allow_terminal (bool): whether or not terminal actions ("drop") should be imported Returns: dict: Dictionary of imported actions """ if disabled is None: disabled = [] # Recursively call this function again to enumerate in and out actions if direction.lower() == "both": return list(set(Action.get_actions("in", disabled=disabled, allow_terminal=allow_terminal) + \ Action.get_actions("out", disabled=disabled, allow_terminal=allow_terminal))) terminal = "terminal" if not allow_terminal: terminal = "non-terminal" if terminal not in ACTION_CACHE[direction]: ACTION_CACHE[direction][terminal] = {} else: return ACTION_CACHE[direction][terminal] collected_actions = [] # Get the base path for the project relative to this file path = os.path.join(BASEPATH, "actions") for action_file in os.listdir(path): if not action_file.endswith(".py"): continue action = action_file.replace(".py", "") if BASEPATH not in sys.path: sys.path.append(BASEPATH) importlib.import_module("actions." + action) def check_action(obj): return inspect.isclass(obj) and \ issubclass(obj, actions.action.Action) and \ obj != actions.action.Action and \ obj().applies(direction) and \ obj().enabled and \ not any([x in str(obj) for x in disabled]) and \ (allow_terminal or not obj().terminal) clsmembers = inspect.getmembers(sys.modules["actions."+action], predicate=check_action) collected_actions += clsmembers collected_actions = list(set(collected_actions)) ACTION_CACHE[direction][terminal] = collected_actions return collected_actions @staticmethod def parse_action(str_action, direction, logger): """ Parses a string action into the action object. Args: str_action (str): String representation of an action to parse direction (str): Limit actions searched through to just those that can run to this direction ("out", "in", "both") logger (:obj:`logging.Logger`): a logger to log with Returns: :obj:`action.Action`: A parsed action object """ # Collect all viable actions that can run for each respective direction outs = Action.get_actions("out") ins = Action.get_actions("in") # If we're currently parsing the OUT forest, only search the out-compatible actions if direction == "out": search = outs # Otherwise only search in-compatible actions (no branching) else: search = ins action_obj = None data = None # If this action has parameters (defined within {} attached to the action), # split off the data parameters from the raw action name if "{" in str_action: str_action, data = str_action.split("{") data = data.replace("}", "") # Search through all of the actions available for this direction to find the right class for action_name, action_cls in search: if str_action.strip() and str_action.lower() in action_name.lower(): # Define the action, and give it a reference to its parent strategy action_obj = action_cls() # If this action has data, ask the new module to parse & initialize itself to it if data: # Pass our logger to the action to alert us if it can't parse something action_obj.parse(data, logger) return action_obj