From 1187f1ed738890b63201c136e607a88781a8b36e Mon Sep 17 00:00:00 2001 From: Hemna Date: Sat, 17 Dec 2022 20:02:49 -0500 Subject: [PATCH] Make tracking objectstores work w/o initializing This changes the objectstore to test to see if the config has been set or not. if not, then it doesn't try to save/load from disk. --- aprsd/cmds/listen.py | 8 +-- aprsd/cmds/webchat.py | 2 +- aprsd/flask.py | 2 +- aprsd/packets/packet_list.py | 29 ++++++----- aprsd/packets/seen_list.py | 8 ++- aprsd/packets/tracker.py | 11 +++-- aprsd/packets/watch_list.py | 3 ++ aprsd/stats.py | 8 +++ aprsd/threads/keep_alive.py | 7 ++- aprsd/threads/rx.py | 2 +- aprsd/threads/tx.py | 4 +- aprsd/utils/objectstore.py | 94 +++++++++++++++++++++--------------- aprsd/utils/ring_buffer.py | 3 ++ 13 files changed, 115 insertions(+), 66 deletions(-) diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index e889445..61ded19 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -40,7 +40,8 @@ def signal_handler(sig, frame): class APRSDListenThread(rx.APRSDRXThread): def process_packet(self, *args, **kwargs): packet = self._client.decode_packet(*args, **kwargs) - packet.log(header="RX Packet") + packet.log(header="RX") + packets.PacketList().rx(packet) @cli.command() @@ -113,9 +114,6 @@ def listen( # Try and load saved MsgTrack list LOG.debug("Loading saved MsgTrack object.") - packets.PacketTrack(config=config).load() - packets.WatchList(config=config).load() - packets.SeenList(config=config).load() # Initialize the client factory and create # The correct client object ready for use @@ -133,8 +131,6 @@ def listen( LOG.debug(f"Filter by '{filter}'") aprs_client.set_filter(filter) - packets.PacketList(config=config) - keepalive = threads.KeepAliveThread(config=config) keepalive.start() diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index 58a8eba..1f19412 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -181,7 +181,7 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread): packet.get("addresse", None) fromcall = packet.from_call - packets.PacketList().add(packet) + packets.PacketList().rx(packet) stats.APRSDStats().msgs_rx_inc() message = packet.get("message_text", None) msg = { diff --git a/aprsd/flask.py b/aprsd/flask.py index 76865c1..2b85857 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -209,7 +209,7 @@ class SendMessageThread(threads.APRSDRXThread): def process_our_message_packet(self, packet): global socketio - packets.PacketList().add(packet) + packets.PacketList().rx(packet) stats.APRSDStats().msgs_rx_inc() msg_number = packet.msgNo SentMessages().reply(self.packet.msgNo, packet) diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index 89c1fd4..db850c6 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -1,6 +1,5 @@ import logging import threading -import time import wrapt @@ -18,32 +17,40 @@ class PacketList: lock = threading.Lock() config = None - packet_list = utils.RingBuffer(1000) + packet_list: utils.RingBuffer = utils.RingBuffer(1000) - total_recv = 0 - total_tx = 0 + total_recv: int = 0 + total_tx: int = 0 def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - cls._instance.config = kwargs["config"] + if "config" in kwargs: + cls._instance.config = kwargs["config"] return cls._instance def __init__(self, config=None): if config: self.config = config + def _is_initialized(self): + return self.config is not None + @wrapt.synchronized(lock) def __iter__(self): return iter(self.packet_list) @wrapt.synchronized(lock) - def add(self, packet): - packet.ts = time.time() - if packet.from_call == self.config["aprs"]["login"]: - self.total_tx += 1 - else: - self.total_recv += 1 + def rx(self, packet): + """Add a packet that was received.""" + self.total_recv += 1 + self.packet_list.append(packet) + seen_list.SeenList().update_seen(packet) + + @wrapt.synchronized(lock) + def tx(self, packet): + """Add a packet that was received.""" + self.total_tx += 1 self.packet_list.append(packet) seen_list.SeenList().update_seen(packet) diff --git a/aprsd/packets/seen_list.py b/aprsd/packets/seen_list.py index 3c9e1bf..d68a933 100644 --- a/aprsd/packets/seen_list.py +++ b/aprsd/packets/seen_list.py @@ -15,18 +15,22 @@ class SeenList(objectstore.ObjectStoreMixin): _instance = None lock = threading.Lock() - data = {} + data: dict = {} config = None def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) if "config" in kwargs: - cls._instance.config = kwargs["config"] + if "config" in kwargs: + cls._instance.config = kwargs["config"] cls._instance._init_store() cls._instance.data = {} return cls._instance + def is_initialized(self): + return self.config is not None + @wrapt.synchronized(lock) def update_seen(self, packet): callsign = None diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index 82ff3f3..8ec7b67 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -23,18 +23,23 @@ class PacketTrack(objectstore.ObjectStoreMixin): _instance = None _start_time = None lock = threading.Lock() + config = None - data = {} - total_tracked = 0 + data: dict = {} + total_tracked: int = 0 def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._start_time = datetime.datetime.now() - cls._instance.config = kwargs["config"] + if "config" in kwargs: + cls._instance.config = kwargs["config"] cls._instance._init_store() return cls._instance + def is_initialized(self): + return self.config is not None + @wrapt.synchronized(lock) def __getitem__(self, name): return self.data[name] diff --git a/aprsd/packets/watch_list.py b/aprsd/packets/watch_list.py index 54c3995..c1da79f 100644 --- a/aprsd/packets/watch_list.py +++ b/aprsd/packets/watch_list.py @@ -47,6 +47,9 @@ class WatchList(objectstore.ObjectStoreMixin): ), } + def is_initialized(self): + return self.config is not None + def is_enabled(self): if self.config and "watch_list" in self.config["aprsd"]: return self.config["aprsd"]["watch_list"].get("enabled", False) diff --git a/aprsd/stats.py b/aprsd/stats.py index 8562fc3..cc8dc8f 100644 --- a/aprsd/stats.py +++ b/aprsd/stats.py @@ -90,6 +90,14 @@ class APRSDStats: def set_aprsis_keepalive(self): self._aprsis_keepalive = datetime.datetime.now() + def rx_packet(self, packet): + if isinstance(packet, packets.MessagePacket): + self.msgs_rx_inc() + elif isinstance(packet, packets.MicEPacket): + self.msgs_mice_inc() + elif isinstance(packet, packets.AckPacket): + self.ack_rx_inc() + @wrapt.synchronized(lock) @property def msgs_tx(self): diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keep_alive.py index cfbdbc9..4813663 100644 --- a/aprsd/threads/keep_alive.py +++ b/aprsd/threads/keep_alive.py @@ -45,6 +45,11 @@ class KeepAliveThread(APRSDThread): except KeyError: login = self.config["ham"]["callsign"] + if pkt_tracker.is_initialized(): + tracked_packets = len(pkt_tracker) + else: + tracked_packets = 0 + keepalive = ( "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " "Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}" @@ -53,7 +58,7 @@ class KeepAliveThread(APRSDThread): utils.strfdelta(stats_obj.uptime), pl.total_recv, pl.total_tx, - len(pkt_tracker), + tracked_packets, stats_obj.msgs_tx, stats_obj.msgs_rx, last_msg_time, diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 796aac3..226248f 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -67,6 +67,7 @@ class APRSDPluginRXThread(APRSDRXThread): packet = self._client.decode_packet(*args, **kwargs) # LOG.debug(raw) packet.log(header="RX") + packets.PacketList().rx(packet) thread = APRSDPluginProcessPacketThread( config=self.config, packet=packet, @@ -101,7 +102,6 @@ class APRSDProcessPacketThread(APRSDThread): """Process a packet received from aprs-is server.""" LOG.debug(f"RXPKT-LOOP {self._loop_cnt}") packet = self.packet - packets.PacketList().add(packet) our_call = self.config["aprsd"]["callsign"].lower() from_call = packet.from_call diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py index 9f88b55..b8fca07 100644 --- a/aprsd/threads/tx.py +++ b/aprsd/threads/tx.py @@ -67,7 +67,7 @@ class SendPacketThread(aprsd_threads.APRSDThread): cl = client.factory.create().client cl.send(packet.raw) stats.APRSDStats().msgs_tx_inc() - packet_list.PacketList().add(packet) + packet_list.PacketList().tx(packet) packet.last_send_time = datetime.datetime.now() packet.last_send_attempt += 1 @@ -115,7 +115,7 @@ class SendAckThread(aprsd_threads.APRSDThread): cl.send(self.packet.raw) self.packet.send_count += 1 stats.APRSDStats().ack_tx_inc() - packet_list.PacketList().add(self.packet) + packet_list.PacketList().tx(self.packet) self.packet.last_send_attempt += 1 self.packet.last_send_time = datetime.datetime.now() diff --git a/aprsd/utils/objectstore.py b/aprsd/utils/objectstore.py index eaeaf73..4349213 100644 --- a/aprsd/utils/objectstore.py +++ b/aprsd/utils/objectstore.py @@ -1,3 +1,4 @@ +import abc import logging import os import pathlib @@ -9,7 +10,7 @@ from aprsd import config as aprsd_config LOG = logging.getLogger("APRSD") -class ObjectStoreMixin: +class ObjectStoreMixin(metaclass=abc.ABCMeta): """Class 'MIXIN' intended to save/load object data. The asumption of how this mixin is used: @@ -23,6 +24,13 @@ class ObjectStoreMixin: When APRSD Starts, it calls load() aprsd server -f (flush) will wipe all saved objects. """ + @abc.abstractmethod + def is_initialized(self): + """Return True if the class has been setup correctly. + + If this returns False, the ObjectStore doesn't save anything. + + """ def __len__(self): return len(self.data) @@ -36,13 +44,16 @@ class ObjectStoreMixin: return self.data[id] def _init_store(self): - sl = self._save_location() - if not os.path.exists(sl): - LOG.warning(f"Save location {sl} doesn't exist") - try: - os.makedirs(sl) - except Exception as ex: - LOG.exception(ex) + if self.is_initialized(): + sl = self._save_location() + if not os.path.exists(sl): + LOG.warning(f"Save location {sl} doesn't exist") + try: + os.makedirs(sl) + except Exception as ex: + LOG.exception(ex) + else: + LOG.warning(f"{self.__class__.__name__} is not initialized") def _save_location(self): save_location = self.config.get("aprsd.save_location", None) @@ -68,38 +79,45 @@ class ObjectStoreMixin: def save(self): """Save any queued to disk?""" - if len(self) > 0: - LOG.info(f"{self.__class__.__name__}::Saving {len(self)} entries to disk at {self._save_location()}") - with open(self._save_filename(), "wb+") as fp: - pickle.dump(self._dump(), fp) - else: - LOG.debug( - "{} Nothing to save, flushing old save file '{}'".format( - self.__class__.__name__, - self._save_filename(), - ), - ) - self.flush() + if self.is_initialized(): + if len(self) > 0: + LOG.info( + f"{self.__class__.__name__}::Saving" + f" {len(self)} entries to disk at" + f"{self._save_location()}", + ) + with open(self._save_filename(), "wb+") as fp: + pickle.dump(self._dump(), fp) + else: + LOG.debug( + "{} Nothing to save, flushing old save file '{}'".format( + self.__class__.__name__, + self._save_filename(), + ), + ) + self.flush() def load(self): - if os.path.exists(self._save_filename()): - try: - with open(self._save_filename(), "rb") as fp: - raw = pickle.load(fp) - if raw: - self.data = raw - LOG.debug( - f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.", - ) - LOG.debug(f"{self.data}") - except (pickle.UnpicklingError, Exception) as ex: - LOG.error(f"Failed to UnPickle {self._save_filename()}") - LOG.error(ex) - self.data = {} + if self.is_initialized(): + if os.path.exists(self._save_filename()): + try: + with open(self._save_filename(), "rb") as fp: + raw = pickle.load(fp) + if raw: + self.data = raw + LOG.debug( + f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.", + ) + LOG.debug(f"{self.data}") + except (pickle.UnpicklingError, Exception) as ex: + LOG.error(f"Failed to UnPickle {self._save_filename()}") + LOG.error(ex) + self.data = {} def flush(self): """Nuke the old pickle file that stored the old results from last aprsd run.""" - if os.path.exists(self._save_filename()): - pathlib.Path(self._save_filename()).unlink() - with self.lock: - self.data = {} + if self.is_initialized(): + if os.path.exists(self._save_filename()): + pathlib.Path(self._save_filename()).unlink() + with self.lock: + self.data = {} diff --git a/aprsd/utils/ring_buffer.py b/aprsd/utils/ring_buffer.py index 4029ce4..6db4dfe 100644 --- a/aprsd/utils/ring_buffer.py +++ b/aprsd/utils/ring_buffer.py @@ -1,6 +1,9 @@ class RingBuffer: """class that implements a not-yet-full buffer""" + max: int = 100 + data: list = [] + def __init__(self, size_max): self.max = size_max self.data = []