Make tracking objectstores work w/o initializing

This changes the objectstore to check whether the config has been set.
If not, it doesn't try to save/load from disk.
Hemna 2022-12-17 20:02:49 -05:00
parent c201c93b5d
commit 1187f1ed73
13 changed files with 115 additions and 66 deletions
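In short, each tracking object's save()/load() becomes a no-op when no config was supplied. A minimal, self-contained sketch of that guard pattern follows (the class name and config keys are illustrative only, not aprsd's exact API):

# Minimal sketch of the guard pattern introduced by this commit
# (ObjectStoreSketch and "save_location" here are illustrative, not aprsd's API).
import logging
import os
import pickle

LOG = logging.getLogger("APRSD")


class ObjectStoreSketch:
    """Persist self.data to disk only when a config has been provided."""

    config = None
    data: dict = {}

    def is_initialized(self):
        # No config means nowhere to save, so behave as an in-memory-only store.
        return self.config is not None

    def _save_filename(self):
        return os.path.join(self.config.get("save_location", "."), "store.p")

    def save(self):
        if not self.is_initialized():
            LOG.warning(f"{self.__class__.__name__} is not initialized")
            return
        with open(self._save_filename(), "wb+") as fp:
            pickle.dump(self.data, fp)

    def load(self):
        if not self.is_initialized():
            return
        if os.path.exists(self._save_filename()):
            with open(self._save_filename(), "rb") as fp:
                self.data = pickle.load(fp) or {}


# Usage: without a config, save()/load() simply do nothing.
store = ObjectStoreSketch()
store.data["K7XYZ"] = "last heard"
store.save()   # logs a warning and returns instead of raising

store.config = {"save_location": "/tmp"}
store.save()   # now actually pickles the data to disk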

View File

@@ -40,7 +40,8 @@ def signal_handler(sig, frame):
class APRSDListenThread(rx.APRSDRXThread):
def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
packet.log(header="RX Packet")
packet.log(header="RX")
packets.PacketList().rx(packet)
@cli.command()
@@ -113,9 +114,6 @@ def listen(
# Try and load saved MsgTrack list
LOG.debug("Loading saved MsgTrack object.")
packets.PacketTrack(config=config).load()
packets.WatchList(config=config).load()
packets.SeenList(config=config).load()
# Initialize the client factory and create
# The correct client object ready for use
@@ -133,8 +131,6 @@ def listen(
LOG.debug(f"Filter by '{filter}'")
aprs_client.set_filter(filter)
packets.PacketList(config=config)
keepalive = threads.KeepAliveThread(config=config)
keepalive.start()

View File

@@ -181,7 +181,7 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
packet.get("addresse", None)
fromcall = packet.from_call
packets.PacketList().add(packet)
packets.PacketList().rx(packet)
stats.APRSDStats().msgs_rx_inc()
message = packet.get("message_text", None)
msg = {

View File

@@ -209,7 +209,7 @@ class SendMessageThread(threads.APRSDRXThread):
def process_our_message_packet(self, packet):
global socketio
packets.PacketList().add(packet)
packets.PacketList().rx(packet)
stats.APRSDStats().msgs_rx_inc()
msg_number = packet.msgNo
SentMessages().reply(self.packet.msgNo, packet)

View File

@@ -1,6 +1,5 @@
import logging
import threading
import time
import wrapt
@@ -18,32 +17,40 @@ class PacketList:
lock = threading.Lock()
config = None
packet_list = utils.RingBuffer(1000)
packet_list: utils.RingBuffer = utils.RingBuffer(1000)
total_recv = 0
total_tx = 0
total_recv: int = 0
total_tx: int = 0
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.config = kwargs["config"]
if "config" in kwargs:
cls._instance.config = kwargs["config"]
return cls._instance
def __init__(self, config=None):
if config:
self.config = config
def _is_initialized(self):
return self.config is not None
@wrapt.synchronized(lock)
def __iter__(self):
return iter(self.packet_list)
@wrapt.synchronized(lock)
def add(self, packet):
packet.ts = time.time()
if packet.from_call == self.config["aprs"]["login"]:
self.total_tx += 1
else:
self.total_recv += 1
def rx(self, packet):
"""Add a packet that was received."""
self.total_recv += 1
self.packet_list.append(packet)
seen_list.SeenList().update_seen(packet)
@wrapt.synchronized(lock)
def tx(self, packet):
"""Add a packet that was received."""
self.total_tx += 1
self.packet_list.append(packet)
seen_list.SeenList().update_seen(packet)

View File

@@ -15,18 +15,22 @@ class SeenList(objectstore.ObjectStoreMixin):
_instance = None
lock = threading.Lock()
data = {}
data: dict = {}
config = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
if "config" in kwargs:
cls._instance.config = kwargs["config"]
if "config" in kwargs:
cls._instance.config = kwargs["config"]
cls._instance._init_store()
cls._instance.data = {}
return cls._instance
def is_initialized(self):
return self.config is not None
@wrapt.synchronized(lock)
def update_seen(self, packet):
callsign = None

View File

@@ -23,18 +23,23 @@ class PacketTrack(objectstore.ObjectStoreMixin):
_instance = None
_start_time = None
lock = threading.Lock()
config = None
data = {}
total_tracked = 0
data: dict = {}
total_tracked: int = 0
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._start_time = datetime.datetime.now()
cls._instance.config = kwargs["config"]
if "config" in kwargs:
cls._instance.config = kwargs["config"]
cls._instance._init_store()
return cls._instance
def is_initialized(self):
return self.config is not None
@wrapt.synchronized(lock)
def __getitem__(self, name):
return self.data[name]

View File

@@ -47,6 +47,9 @@ class WatchList(objectstore.ObjectStoreMixin):
),
}
def is_initialized(self):
return self.config is not None
def is_enabled(self):
if self.config and "watch_list" in self.config["aprsd"]:
return self.config["aprsd"]["watch_list"].get("enabled", False)

View File

@@ -90,6 +90,14 @@ class APRSDStats:
def set_aprsis_keepalive(self):
self._aprsis_keepalive = datetime.datetime.now()
def rx_packet(self, packet):
if isinstance(packet, packets.MessagePacket):
self.msgs_rx_inc()
elif isinstance(packet, packets.MicEPacket):
self.msgs_mice_inc()
elif isinstance(packet, packets.AckPacket):
self.ack_rx_inc()
@wrapt.synchronized(lock)
@property
def msgs_tx(self):

View File

@@ -45,6 +45,11 @@ class KeepAliveThread(APRSDThread):
except KeyError:
login = self.config["ham"]["callsign"]
if pkt_tracker.is_initialized():
tracked_packets = len(pkt_tracker)
else:
tracked_packets = 0
keepalive = (
"{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} "
"Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}"
@@ -53,7 +58,7 @@ class KeepAliveThread(APRSDThread):
utils.strfdelta(stats_obj.uptime),
pl.total_recv,
pl.total_tx,
len(pkt_tracker),
tracked_packets,
stats_obj.msgs_tx,
stats_obj.msgs_rx,
last_msg_time,

View File

@@ -67,6 +67,7 @@ class APRSDPluginRXThread(APRSDRXThread):
packet = self._client.decode_packet(*args, **kwargs)
# LOG.debug(raw)
packet.log(header="RX")
packets.PacketList().rx(packet)
thread = APRSDPluginProcessPacketThread(
config=self.config,
packet=packet,
@@ -101,7 +102,6 @@ class APRSDProcessPacketThread(APRSDThread):
"""Process a packet received from aprs-is server."""
LOG.debug(f"RXPKT-LOOP {self._loop_cnt}")
packet = self.packet
packets.PacketList().add(packet)
our_call = self.config["aprsd"]["callsign"].lower()
from_call = packet.from_call

View File

@@ -67,7 +67,7 @@ class SendPacketThread(aprsd_threads.APRSDThread):
cl = client.factory.create().client
cl.send(packet.raw)
stats.APRSDStats().msgs_tx_inc()
packet_list.PacketList().add(packet)
packet_list.PacketList().tx(packet)
packet.last_send_time = datetime.datetime.now()
packet.last_send_attempt += 1
@@ -115,7 +115,7 @@ class SendAckThread(aprsd_threads.APRSDThread):
cl.send(self.packet.raw)
self.packet.send_count += 1
stats.APRSDStats().ack_tx_inc()
packet_list.PacketList().add(self.packet)
packet_list.PacketList().tx(self.packet)
self.packet.last_send_attempt += 1
self.packet.last_send_time = datetime.datetime.now()

View File

@@ -1,3 +1,4 @@
import abc
import logging
import os
import pathlib
@@ -9,7 +10,7 @@ from aprsd import config as aprsd_config
LOG = logging.getLogger("APRSD")
class ObjectStoreMixin:
class ObjectStoreMixin(metaclass=abc.ABCMeta):
"""Class 'MIXIN' intended to save/load object data.
The assumption of how this mixin is used:
@@ -23,6 +24,13 @@ class ObjectStoreMixin:
When APRSD Starts, it calls load()
aprsd server -f (flush) will wipe all saved objects.
"""
@abc.abstractmethod
def is_initialized(self):
"""Return True if the class has been setup correctly.
If this returns False, the ObjectStore doesn't save anything.
"""
def __len__(self):
return len(self.data)
@@ -36,13 +44,16 @@ class ObjectStoreMixin:
return self.data[id]
def _init_store(self):
sl = self._save_location()
if not os.path.exists(sl):
LOG.warning(f"Save location {sl} doesn't exist")
try:
os.makedirs(sl)
except Exception as ex:
LOG.exception(ex)
if self.is_initialized():
sl = self._save_location()
if not os.path.exists(sl):
LOG.warning(f"Save location {sl} doesn't exist")
try:
os.makedirs(sl)
except Exception as ex:
LOG.exception(ex)
else:
LOG.warning(f"{self.__class__.__name__} is not initialized")
def _save_location(self):
save_location = self.config.get("aprsd.save_location", None)
@@ -68,38 +79,45 @@ class ObjectStoreMixin:
def save(self):
"""Save any queued to disk?"""
if len(self) > 0:
LOG.info(f"{self.__class__.__name__}::Saving {len(self)} entries to disk at {self._save_location()}")
with open(self._save_filename(), "wb+") as fp:
pickle.dump(self._dump(), fp)
else:
LOG.debug(
"{} Nothing to save, flushing old save file '{}'".format(
self.__class__.__name__,
self._save_filename(),
),
)
self.flush()
if self.is_initialized():
if len(self) > 0:
LOG.info(
f"{self.__class__.__name__}::Saving"
f" {len(self)} entries to disk at"
f"{self._save_location()}",
)
with open(self._save_filename(), "wb+") as fp:
pickle.dump(self._dump(), fp)
else:
LOG.debug(
"{} Nothing to save, flushing old save file '{}'".format(
self.__class__.__name__,
self._save_filename(),
),
)
self.flush()
def load(self):
if os.path.exists(self._save_filename()):
try:
with open(self._save_filename(), "rb") as fp:
raw = pickle.load(fp)
if raw:
self.data = raw
LOG.debug(
f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.",
)
LOG.debug(f"{self.data}")
except (pickle.UnpicklingError, Exception) as ex:
LOG.error(f"Failed to UnPickle {self._save_filename()}")
LOG.error(ex)
self.data = {}
if self.is_initialized():
if os.path.exists(self._save_filename()):
try:
with open(self._save_filename(), "rb") as fp:
raw = pickle.load(fp)
if raw:
self.data = raw
LOG.debug(
f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.",
)
LOG.debug(f"{self.data}")
except (pickle.UnpicklingError, Exception) as ex:
LOG.error(f"Failed to UnPickle {self._save_filename()}")
LOG.error(ex)
self.data = {}
def flush(self):
"""Nuke the old pickle file that stored the old results from last aprsd run."""
if os.path.exists(self._save_filename()):
pathlib.Path(self._save_filename()).unlink()
with self.lock:
self.data = {}
if self.is_initialized():
if os.path.exists(self._save_filename()):
pathlib.Path(self._save_filename()).unlink()
with self.lock:
self.data = {}

View File

@@ -1,6 +1,9 @@
class RingBuffer:
"""class that implements a not-yet-full buffer"""
max: int = 100
data: list = []
def __init__(self, size_max):
self.max = size_max
self.data = []