From 4233827dea27277737812003ebeee9c3a98d473d Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 20 Oct 2021 14:07:22 -0400 Subject: [PATCH] Added objectstore Mixin This patch adds the new objectstore Mixin class that enables classes that store their date in self.data as a serializeable dict, to be able to be stored to disk at shutdown and loaded at startup. The SeenList and WatchList are now saved/loaded to/from disk. --- aprsd/config.py | 8 ++--- aprsd/dev.py | 9 +++-- aprsd/flask.py | 8 ++--- aprsd/main.py | 4 +++ aprsd/objectstore.py | 83 ++++++++++++++++++++++++++++++++++++++++++++ aprsd/packets.py | 37 ++++++++++---------- aprsd/stats.py | 4 +-- 7 files changed, 123 insertions(+), 30 deletions(-) create mode 100644 aprsd/objectstore.py diff --git a/aprsd/config.py b/aprsd/config.py index eb43fb5..6053a21 100644 --- a/aprsd/config.py +++ b/aprsd/config.py @@ -140,9 +140,9 @@ class Config(collections.UserDict): """ Example: d = {'meta': {'status': 'OK', 'status_code': 200}} - deep_get(d, ['meta', 'status_code']) # => 200 - deep_get(d, ['garbage', 'status_code']) # => None - deep_get(d, ['meta', 'garbage'], default='-') # => '-' + _get(d, ['meta', 'status_code']) # => 200 + _get(d, ['garbage', 'status_code']) # => None + _get(d, ['meta', 'garbage'], default='-') # => '-' """ if type(keys) is str and "." in keys: @@ -166,7 +166,7 @@ class Config(collections.UserDict): def exists(self, path): """See if a conf value exists.""" test = "-3.14TEST41.3-" - return (self.get(path, default=test) != test) + return self.get(path, default=test) != test def check_option(self, path, default_fail=None): """Make sure the config option doesn't have default value.""" diff --git a/aprsd/dev.py b/aprsd/dev.py index f4477f5..36c7e2a 100644 --- a/aprsd/dev.py +++ b/aprsd/dev.py @@ -211,10 +211,9 @@ def test_plugin( config = aprsd_config.parse_config(config_file) setup_logging(config, loglevel, False) - LOG.info(f"Test APRSD PLugin version: {aprsd.__version__}") + LOG.info(f"Test APRSD Plgin version: {aprsd.__version__}") if type(message) is tuple: message = " ".join(message) - LOG.info(f"P'{plugin_path}' F'{fromcall}' C'{message}'") client.Client(config) pm = plugin.PluginManager(config) @@ -224,6 +223,11 @@ def test_plugin( pm._init() obj = pm._create_class(plugin_path, plugin.APRSDPluginBase, config=config) # Register the plugin they wanted tested. + LOG.info( + "Testing plugin {} Version {}".format( + obj.__class__, obj.version, + ), + ) pm._pluggy_pm.register(obj) login = config["aprs"]["login"] @@ -233,6 +237,7 @@ def test_plugin( "format": "message", "msgNo": 1, } + LOG.info(f"P'{plugin_path}' F'{fromcall}' C'{message}'") for x in range(number): reply = pm.run(packet) diff --git a/aprsd/flask.py b/aprsd/flask.py index 7caf5f8..1e07829 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -296,14 +296,14 @@ class APRSDFlask(flask_classful.FlaskView): ) wl = packets.WatchList() if wl.is_enabled(): - watch_count = len(wl.callsigns) + watch_count = len(wl) watch_age = wl.max_delta() else: watch_count = 0 watch_age = 0 sl = packets.SeenList() - seen_count = len(sl.callsigns) + seen_count = len(sl) pm = plugin.PluginManager() plugins = pm.get_plugins() @@ -408,14 +408,14 @@ class APRSDFlask(flask_classful.FlaskView): # Convert the watch_list entries to age wl = packets.WatchList() new_list = {} - for call in wl.callsigns: + for call in wl.get_all(): # call_date = datetime.datetime.strptime( # str(wl.last_seen(call)), # "%Y-%m-%d %H:%M:%S.%f", # ) new_list[call] = { "last": wl.age(call), - "packets": wl.callsigns[call]["packets"].get(), + "packets": wl.get(call)["packets"].get(), } stats_dict["aprsd"]["watch_list"] = new_list diff --git a/aprsd/main.py b/aprsd/main.py index fea3d84..91e5bdd 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -148,6 +148,8 @@ def signal_handler(sig, frame): time.sleep(1.5) tracker = messaging.MsgTrack() tracker.save() + packets.WatchList().save() + packets.SeenList().save() LOG.info(stats.APRSDStats()) # signal.signal(signal.SIGTERM, sys.exit(0)) # sys.exit(0) @@ -481,6 +483,8 @@ def server( # Try and load saved MsgTrack list LOG.debug("Loading saved MsgTrack object.") messaging.MsgTrack().load() + packets.WatchList().load() + packets.SeenList().load() packets.PacketList(config=config) packets.WatchList(config=config) diff --git a/aprsd/objectstore.py b/aprsd/objectstore.py new file mode 100644 index 0000000..b6d5c37 --- /dev/null +++ b/aprsd/objectstore.py @@ -0,0 +1,83 @@ +import logging +import os +import pathlib +import pickle + +from aprsd import config as aprsd_config + + +LOG = logging.getLogger("APRSD") + + +class ObjectStoreMixin: + """Class 'MIXIN' intended to save/load object data. + + The asumption of how this mixin is used: + The using class has to have a: + * data in self.data as a dictionary + * a self.lock thread lock + * Class must specify self.save_file as the location. + + + When APRSD quits, it calls save() + When APRSD Starts, it calls load() + aprsd server -f (flush) will wipe all saved objects. + """ + + def __len__(self): + return len(self.data) + + def get_all(self): + with self.lock: + return self.data + + def get(self, id): + with self.lock: + return self.data[id] + + def _save_filename(self): + return "{}/{}.p".format( + aprsd_config.DEFAULT_CONFIG_DIR, + self.__class__.__name__.lower(), + ) + + def _dump(self): + dump = {} + with self.lock: + for key in self.data.keys(): + dump[key] = self.data[key] + + LOG.debug(f"{self.__class__.__name__}:: DUMP") + LOG.debug(dump) + + return dump + + def save(self): + """Save any queued to disk?""" + if len(self) > 0: + LOG.info(f"{self.__class__.__name__}::Saving {len(self)} entries to disk") + pickle.dump(self._dump(), open(self._save_filename(), "wb+")) + else: + LOG.debug( + "{} Nothing to save, flushing old save file '{}'".format( + self.__class__.__name__, + self._save_filename(), + ), + ) + self.flush() + + + def load(self): + if os.path.exists(self._save_filename()): + raw = pickle.load(open(self._save_filename(), "rb")) + if raw: + self.data = raw + LOG.debug(f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.") + LOG.debug(f"{self.data}") + + def flush(self): + """Nuke the old pickle file that stored the old results from last aprsd run.""" + if os.path.exists(self._save_filename()): + pathlib.Path(self._save_filename()).unlink() + with self.lock: + self.data = {} diff --git a/aprsd/packets.py b/aprsd/packets.py index 63b6b6c..a9d4d9c 100644 --- a/aprsd/packets.py +++ b/aprsd/packets.py @@ -3,7 +3,7 @@ import logging import threading import time -from aprsd import utils +from aprsd import objectstore, utils LOG = logging.getLogger("APRSD") @@ -65,18 +65,18 @@ class PacketList: return self.total_tx -class WatchList: +class WatchList(objectstore.ObjectStoreMixin): """Global watch list and info for callsigns.""" _instance = None - callsigns = {} + data = {} config = None def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance.lock = threading.Lock() - cls.callsigns = {} + cls.data = {} return cls._instance def __init__(self, config=None): @@ -91,7 +91,7 @@ class WatchList: # a beacon from a callsign or some other mechanism to find # last time a message was seen by aprs-is. For now this # is all we can do. - self.callsigns[call] = { + self.data[call] = { "last": datetime.datetime.now(), "packets": utils.RingBuffer( ring_size, @@ -105,17 +105,18 @@ class WatchList: return False def callsign_in_watchlist(self, callsign): - return callsign in self.callsigns + return callsign in self.data def update_seen(self, packet): - callsign = packet["from"] - if self.callsign_in_watchlist(callsign): - self.callsigns[callsign]["last"] = datetime.datetime.now() - self.callsigns[callsign]["packets"].append(packet) + with self.lock: + callsign = packet["from"] + if self.callsign_in_watchlist(callsign): + self.data[callsign]["last"] = datetime.datetime.now() + self.data[callsign]["packets"].append(packet) def last_seen(self, callsign): if self.callsign_in_watchlist(callsign): - return self.callsigns[callsign]["last"] + return self.data[callsign]["last"] def age(self, callsign): now = datetime.datetime.now() @@ -150,18 +151,18 @@ class WatchList: return False -class SeenList: +class SeenList(objectstore.ObjectStoreMixin): """Global callsign seen list.""" _instance = None - callsigns = {} + data = {} config = None def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance.lock = threading.Lock() - cls.callsigns = {} + cls.data = {} return cls._instance def update_seen(self, packet): @@ -170,13 +171,13 @@ class SeenList: callsign = packet["fromcall"] elif "from" in packet: callsign = packet["from"] - if callsign not in self.callsigns: - self.callsigns[callsign] = { + if callsign not in self.data: + self.data[callsign] = { "last": None, "count": 0, } - self.callsigns[callsign]["last"] = str(datetime.datetime.now()) - self.callsigns[callsign]["count"] += 1 + self.data[callsign]["last"] = str(datetime.datetime.now()) + self.data[callsign]["count"] += 1 def get_packet_type(packet): diff --git a/aprsd/stats.py b/aprsd/stats.py index 0590ff7..e629157 100644 --- a/aprsd/stats.py +++ b/aprsd/stats.py @@ -211,8 +211,8 @@ class APRSDStats: "memory_current_str": utils.human_size(self.memory), "memory_peak": self.memory_peak, "memory_peak_str": utils.human_size(self.memory_peak), - "watch_list": wl.callsigns, - "seen_list": sl.callsigns, + "watch_list": wl.get_all(), + "seen_list": sl.get_all(), }, "aprs-is": { "server": self.aprsis_server,