From 7b721623ef9c9c2310f34d6664a9f54e65d7ba94 Mon Sep 17 00:00:00 2001 From: Kkevsterrr Date: Thu, 12 Dec 2019 23:10:09 -0500 Subject: [PATCH] removed unused sniffer --- actions/sniffer.py | 85 ---------------------------------------------- 1 file changed, 85 deletions(-) delete mode 100644 actions/sniffer.py diff --git a/actions/sniffer.py b/actions/sniffer.py deleted file mode 100644 index d4fc543..0000000 --- a/actions/sniffer.py +++ /dev/null @@ -1,85 +0,0 @@ -import threading -import os - -import actions.packet -from scapy.all import sniff -from scapy.utils import PcapWriter - - -class Sniffer(): - """ - The sniffer class lets the user begin and end sniffing whenever in a given location with a port to filter on. - Call start_sniffing to begin sniffing and stop_sniffing to stop sniffing. - """ - - def __init__(self, location, port, logger): - """ - Intializes a sniffer object. - Needs a location and a port to filter on. - """ - self.stop_sniffing_flag = False - self.location = location - self.port = port - self.pcap_thread = None - self.packet_dumper = None - self.logger = logger - full_path = os.path.dirname(location) - assert port, "Need to specify a port in order to launch a sniffer" - if not os.path.exists(full_path): - os.makedirs(full_path) - - def __packet_callback(self, scapy_packet): - """ - This callback is called whenever a packet is applied. - Returns true if it should finish, otherwise, returns false. - """ - packet = actions.packet.Packet(scapy_packet) - for proto in ["TCP", "UDP"]: - if(packet.haslayer(proto) and ((packet[proto].sport == self.port) or (packet[proto].dport == self.port))): - break - else: - return self.stop_sniffing_flag - - self.logger.debug(str(packet)) - self.packet_dumper.write(scapy_packet) - return self.stop_sniffing_flag - - def __spawn_sniffer(self): - """ - Saves pcaps to a file. Should be run as a thread. - Ends when the stop_sniffing_flag is set. Should not be called by user - """ - self.packet_dumper = PcapWriter(self.location, append=True, sync=True) - while(self.stop_sniffing_flag == False): - sniff(stop_filter=self.__packet_callback, timeout=1) - - def start_sniffing(self): - """ - Starts sniffing. Should be called by user. - """ - self.stop_sniffing_flag = False - self.pcap_thread = threading.Thread(target=self.__spawn_sniffer) - self.pcap_thread.start() - self.logger.debug("Sniffer starting to port %d" % self.port) - - def __enter__(self): - """ - Defines a context manager for this sniffer; simply starts sniffing. - """ - self.start_sniffing() - return self - - def __exit__(self, exc_type, exc_value, tb): - """ - Defines exit context manager behavior for this sniffer; simply stops sniffing. - """ - self.stop_sniffing() - - def stop_sniffing(self): - """ - Stops the sniffer by setting the flag and calling join - """ - if(self.pcap_thread): - self.stop_sniffing_flag = True - self.pcap_thread.join() - self.logger.debug("Sniffer stopping")