From 2fceba10e17422b075a47dbae7a17e33d6f1fb20 Mon Sep 17 00:00:00 2001 From: Hemna Date: Sat, 17 Jul 2021 14:30:29 -0400 Subject: [PATCH] Reworked the notification threads and admin ui. This patch updates the notification thread to send all packets through the notification plugins. The plugins themselves need to do smart filter to not reply to every packet. This allows for more interesting plugins. Also fixed an issue with the messages tab in the admin ui, not showing all of the recieved packets. The messages tab now also sees all the packets that aprsd recieves. --- README.rst | 18 +++++-- aprsd/flask.py | 30 +++++------ aprsd/packets.py | 98 +++++++++++++++++++++++++++++++++-- aprsd/plugin.py | 23 +------- aprsd/plugins/notify.py | 45 +++++++++++++--- aprsd/stats.py | 16 ++---- aprsd/threads.py | 84 +++++++----------------------- aprsd/utils.py | 40 +++++++++++++- aprsd/web/static/js/charts.js | 13 +++-- 9 files changed, 231 insertions(+), 136 deletions(-) diff --git a/README.rst b/README.rst index 2790c6a..d14091d 100644 --- a/README.rst +++ b/README.rst @@ -44,7 +44,6 @@ APRSD Overview Diagram ---------------------- .. image:: https://raw.githubusercontent.com/craigerl/aprsd/master/docs/_static/aprsd_overview.svg?sanitize=true -.. image:: docs/_static/aprsd_overview.svg?sanitize=true Typical use case @@ -55,7 +54,9 @@ the weather. an APRS message is sent, and then picked up by APRSD. The APRS packet is decoded, and the message is sent through the list of plugins for processing. For example, the WeatherPlugin picks up the message, fetches the weather for the area around the user who sent the request, and then responds with -the weather conditions in that area. +the weather conditions in that area. Also includes a watch list of HAM +callsigns to look out for. The watch list can notify you when a HAM callsign +in the list is seen and now available to message on the APRS network. APRSD Capabilities @@ -83,6 +84,18 @@ If it matches, the plugin runs. IF the regex doesn't match, the plugin is skipp * VersionPlugin - Reports the version information for aprsd +List of core notification plugins +================================= + +These plugins see all APRS messages from ham callsigns in the config's watch +list. + +* NotifySeenPlugin - Send a message when a message is seen from a callsign in + the watch list. This is helpful when you want to know + when a friend is online in the ARPS network, but haven't + been seen in a while. + + Current messages this will respond to: ====================================== @@ -202,7 +215,6 @@ Output - aprsd.plugins.version.VersionPlugin logfile: /tmp/aprsd.log logformat: '[%(asctime)s] [%(threadName)-12s] [%(levelname)-5.5s] %(message)s - [%(pathname)s:%(lineno)d]' - plugin_dir: ~/.config/aprsd/plugins trace: false units: imperial web: diff --git a/aprsd/flask.py b/aprsd/flask.py index 2596ac7..160d150 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -51,13 +51,10 @@ class APRSDFlask(flask_classful.FlaskView): self.config["aprsd"]["watch_list"], ), ) - if "watch_list" in self.config["aprsd"] and self.config["aprsd"][ - "watch_list" - ].get("enabled", False): - watch_count = len(self.config["aprsd"]["watch_list"]["callsigns"]) - watch_age = self.config["aprsd"]["watch_list"]["alert_time_seconds"] - age_time = {"seconds": watch_age} - watch_age = datetime.timedelta(**age_time) + wl = packets.WatchList() + if wl.is_enabled(): + watch_count = len(wl.callsigns) + watch_age = wl.max_delta() else: watch_count = 0 watch_age = 0 @@ -84,7 +81,7 @@ class APRSDFlask(flask_classful.FlaskView): @auth.login_required def packets(self): - packet_list = packets.PacketList().packet_list + packet_list = packets.PacketList().get() return json.dumps(packet_list) @auth.login_required @@ -111,14 +108,17 @@ class APRSDFlask(flask_classful.FlaskView): stats_dict = stats_obj.stats() # Convert the watch_list entries to age - watch_list = stats_dict["aprsd"]["watch_list"] + wl = packets.WatchList() new_list = {} - for call in watch_list: - call_date = datetime.datetime.strptime( - watch_list[call], - "%Y-%m-%d %H:%M:%S.%f", - ) - new_list[call] = str(now - call_date) + for call in wl.callsigns: + # call_date = datetime.datetime.strptime( + # str(wl.last_seen(call)), + # "%Y-%m-%d %H:%M:%S.%f", + # ) + new_list[call] = { + "last": wl.age(call), + "packets": wl.callsigns[call]["packets"].get(), + } stats_dict["aprsd"]["watch_list"] = new_list diff --git a/aprsd/packets.py b/aprsd/packets.py index 4320a46..a2d19c6 100644 --- a/aprsd/packets.py +++ b/aprsd/packets.py @@ -1,7 +1,10 @@ +import datetime import logging import threading import time +from aprsd import utils + LOG = logging.getLogger("APRSD") PACKET_TYPE_MESSAGE = "message" @@ -19,7 +22,7 @@ class PacketList: def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - cls._instance.packet_list = {} + cls._instance.packet_list = utils.RingBuffer(100) cls._instance.lock = threading.Lock() return cls._instance @@ -29,9 +32,96 @@ class PacketList: def add(self, packet): with self.lock: - now = time.time() - ts = str(now).split(".")[0] - self.packet_list[ts] = packet + packet["ts"] = time.time() + self.packet_list.append(packet) + + def get(self): + with self.lock: + return self.packet_list.get() + + +class WatchList: + """Global watch list and info for callsigns.""" + + _instance = None + callsigns = {} + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance.lock = threading.Lock() + cls.callsigns = {} + return cls._instance + + def __init__(self, config=None): + if config: + self.config = config + + ring_size = config["aprsd"]["watch_list"]["packet_keep_count"] + + for callsign in config["aprsd"]["watch_list"].get("callsigns", []): + call = callsign.replace("*", "") + # FIXME(waboring) - we should fetch the last time we saw + # a beacon from a callsign or some other mechanism to find + # last time a message was seen by aprs-is. For now this + # is all we can do. + self.callsigns[call] = { + "last": datetime.datetime.now(), + "packets": utils.RingBuffer( + ring_size, + ), + } + + def is_enabled(self): + if "watch_list" in self.config["aprsd"]: + return self.config["aprsd"]["watch_list"].get("enabled", False) + else: + return False + + def callsign_in_watchlist(self, callsign): + return callsign in self.callsigns + + def update_seen(self, packet): + callsign = packet["from"] + if self.callsign_in_watchlist(callsign): + self.callsigns[callsign]["last"] = datetime.datetime.now() + self.callsigns[callsign]["packets"].append(packet) + + def last_seen(self, callsign): + if self.callsign_in_watchlist(callsign): + return self.callsigns[callsign]["last"] + + def age(self, callsign): + now = datetime.datetime.now() + return str(now - self.last_seen(callsign)) + + def max_delta(self, seconds=None): + watch_list_conf = self.config["aprsd"]["watch_list"] + if not seconds: + seconds = watch_list_conf["alert_time_seconds"] + max_timeout = {"seconds": seconds} + return datetime.timedelta(**max_timeout) + + def is_old(self, callsign, seconds=None): + """Watch list callsign last seen is old compared to now? + + This tests to see if the last time we saw a callsign packet, + if that is older than the allowed timeout in the config. + + We put this here so any notification plugin can use this + same test. + """ + age = self.age(callsign) + + delta = utils.parse_delta_str(age) + d = datetime.timedelta(**delta) + + max_delta = self.max_delta(seconds=seconds) + + if d > max_delta: + return True + else: + return False def get_packet_type(packet): diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 16ced7f..9d9c62f 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -30,7 +30,7 @@ CORE_MESSAGE_PLUGINS = [ ] CORE_NOTIFY_PLUGINS = [ - "aprsd.plugins.notify.BaseNotifyPlugin", + "aprsd.plugins.notify.NotifySeenPlugin", ] @@ -298,27 +298,6 @@ class PluginManager: for p_name in enabled_notify_plugins: self._load_notify_plugin(p_name) - # FIXME(Walt) - no real need to support loading random python classes - # from a directory anymore. Need to remove this. - plugin_dir = self.config["aprsd"].get("plugin_dir", None) - if plugin_dir: - LOG.info("Trying to load custom plugins from '{}'".format(plugin_dir)) - plugins_list = self.load_plugins_from_path(plugin_dir) - if plugins_list: - LOG.info("Discovered {} modules to load".format(len(plugins_list))) - for o in plugins_list: - plugin_obj = None - - if plugin_obj: - LOG.info( - "Registering Command plugin '{}'({}) '{}'".format( - o["name"], - o["obj"].version, - o["obj"].command_regex, - ), - ) - self._pluggy_pm.register(o["obj"]) - else: LOG.info("Skipping Custom Plugins directory.") LOG.info("Completed Plugin Loading.") diff --git a/aprsd/plugins/notify.py b/aprsd/plugins/notify.py index eaca7bb..dd48500 100644 --- a/aprsd/plugins/notify.py +++ b/aprsd/plugins/notify.py @@ -1,23 +1,52 @@ import logging -from aprsd import packets, plugin, trace +from aprsd import messaging, packets, plugin LOG = logging.getLogger("APRSD") -class BaseNotifyPlugin(plugin.APRSDNotificationPluginBase): - """Notification base plugin.""" +class NotifySeenPlugin(plugin.APRSDNotificationPluginBase): + """Notification plugin to send seen message for callsign. + + + This plugin will track callsigns in the watch list and report + when a callsign has been seen when the last time they were + seen was older than the configured age limit. + """ version = "1.0" - @trace.trace + def __init__(self, config): + """The aprsd config object is stored.""" + super().__init__(config) + def notify(self, packet): LOG.info("BaseNotifyPlugin") notify_callsign = self.config["aprsd"]["watch_list"]["alert_callsign"] fromcall = packet.get("from") - packet_type = packets.get_packet_type(packet) - # we shouldn't notify the alert user that they are online. - if fromcall != notify_callsign: - return "{} was just seen by type:'{}'".format(fromcall, packet_type) + wl = packets.WatchList() + age = wl.age(fromcall) + + if wl.is_old(packet["from"]): + LOG.info( + "NOTIFY {} last seen {} max age={}".format( + fromcall, + age, + wl.max_delta(), + ), + ) + packet_type = packets.get_packet_type(packet) + # we shouldn't notify the alert user that they are online. + if fromcall != notify_callsign: + return "{} was just seen by type:'{}'".format(fromcall, packet_type) + else: + LOG.debug( + "Not old enough to notify callsign '{}' : {} < {}".format( + fromcall, + age, + wl.max_delta(), + ), + ) + return messaging.NULL_MESSAGE diff --git a/aprsd/stats.py b/aprsd/stats.py index 235ad6e..de5a226 100644 --- a/aprsd/stats.py +++ b/aprsd/stats.py @@ -3,7 +3,7 @@ import logging import threading import aprsd -from aprsd import plugin, utils +from aprsd import packets, plugin, utils LOG = logging.getLogger("APRSD") @@ -33,7 +33,6 @@ class APRSDStats: _mem_current = 0 _mem_peak = 0 - _watch_list = {} def __new__(cls, *args, **kwargs): if cls._instance is None: @@ -170,15 +169,6 @@ class APRSDStats: with self.lock: self._email_thread_last_time = datetime.datetime.now() - @property - def watch_list(self): - with self.lock: - return self._watch_list - - def update_watch_list(self, watch_list): - with self.lock: - self._watch_list = watch_list - def stats(self): now = datetime.datetime.now() if self._email_thread_last_time: @@ -204,6 +194,8 @@ class APRSDStats: for p in plugins: plugin_stats[full_name_with_qualname(p)] = p.message_count + wl = packets.WatchList() + stats = { "aprsd": { "version": aprsd.__version__, @@ -212,7 +204,7 @@ class APRSDStats: "memory_current_str": utils.human_size(self.memory), "memory_peak": self.memory_peak, "memory_peak_str": utils.human_size(self.memory_peak), - "watch_list": self.watch_list, + "watch_list": wl.callsigns, }, "aprs-is": { "server": self.aprsis_server, diff --git a/aprsd/threads.py b/aprsd/threads.py index 36e9b1f..a776d69 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -121,78 +121,29 @@ class APRSDNotifyThread(APRSDThread): super().__init__("NOTIFY_MSG") self.msg_queues = msg_queues self.config = config - for callsign in config["aprsd"]["watch_list"].get("callsigns", []): - call = callsign.replace("*", "") - # FIXME(waboring) - we should fetch the last time we saw - # a beacon from a callsign or some other mechanism to find - # last time a message was seen by aprs-is. For now this - # is all we can do. - self.last_seen[call] = datetime.datetime.now() - self.update_stats() - - def update_stats(self): - stats_seen = {} - for callsign in self.last_seen: - stats_seen[callsign] = str(self.last_seen[callsign]) - - stats.APRSDStats().update_watch_list(stats_seen) + packets.WatchList(config=config) def loop(self): try: packet = self.msg_queues["notify"].get(timeout=5) + wl = packets.WatchList() + if wl.callsign_in_watchlist(packet["from"]): + # NOW WE RUN through the notify plugins. + # If they return a msg, then we queue it for sending. + pm = plugin.PluginManager() + results = pm.notify(packet) + for reply in results: + if reply is not messaging.NULL_MESSAGE: + watch_list_conf = self.config["aprsd"]["watch_list"] - if packet["from"] in self.last_seen: - # We only notify if the last time a callsign was seen - # is older than the alert_time_seconds - now = datetime.datetime.now() - age = str(now - self.last_seen[packet["from"]]) + msg = messaging.TextMessage( + self.config["aprs"]["login"], + watch_list_conf["alert_callsign"], + reply, + ) + self.msg_queues["tx"].put(msg) - delta = utils.parse_delta_str(age) - d = datetime.timedelta(**delta) - - watch_list_conf = self.config["aprsd"]["watch_list"] - max_timeout = { - "seconds": watch_list_conf["alert_time_seconds"], - } - max_delta = datetime.timedelta(**max_timeout) - - if d > max_delta: - LOG.info( - "NOTIFY {} last seen {} max age={}".format( - packet["from"], - age, - max_delta, - ), - ) - # NOW WE RUN through the notify plugins. - # If they return a msg, then we queue it for sending. - pm = plugin.PluginManager() - results = pm.notify(packet) - for reply in results: - if reply is not messaging.NULL_MESSAGE: - LOG.debug("Sending '{}'".format(reply)) - - msg = messaging.TextMessage( - self.config["aprs"]["login"], - watch_list_conf["alert_callsign"], - reply, - ) - self.msg_queues["tx"].put(msg) - else: - LOG.debug("Got NULL MESSAGE from plugin") - - else: - LOG.debug( - "Not old enough to notify callsign {}: {} < {}".format( - packet["from"], - age, - max_delta, - ), - ) - - LOG.debug("Update last seen from {}".format(packet["from"])) - self.last_seen[packet["from"]] = now - self.update_stats() + wl.update_seen(packet) else: LOG.debug( "Ignoring packet from '{}'. Not in watch list.".format( @@ -353,6 +304,7 @@ class APRSDRXThread(APRSDThread): try: LOG.debug("Adding packet to notify queue {}".format(packet["raw"])) self.msg_queues["notify"].put(packet) + packets.PacketList().add(packet) # since we can see packets from anyone now with the # watch list, we need to filter messages directly only to us. diff --git a/aprsd/utils.py b/aprsd/utils.py index 172773f..0e3255c 100644 --- a/aprsd/utils.py +++ b/aprsd/utils.py @@ -45,7 +45,6 @@ DEFAULT_CONFIG_DICT = { "logformat": DEFAULT_LOG_FORMAT, "dateformat": DEFAULT_DATE_FORMAT, "trace": False, - "plugin_dir": "~/.config/aprsd/plugins", "enabled_plugins": plugin.CORE_MESSAGE_PLUGINS, "units": "imperial", "watch_list": { @@ -54,6 +53,9 @@ DEFAULT_CONFIG_DICT = { "alert_callsign": "NOCALL", # 43200 is 12 hours "alert_time_seconds": 43200, + # How many packets to save in a ring Buffer + # for a particular callsign + "packet_keep_count": 10, "callsigns": [], "enabled_plugins": plugin.CORE_NOTIFY_PLUGINS, }, @@ -435,3 +437,39 @@ def parse_delta_str(s): else: m = re.match(r"(?P\d+):(?P\d+):(?P\d[\.\d+]*)", s) return {key: float(val) for key, val in m.groupdict().items()} + + +class RingBuffer: + """class that implements a not-yet-full buffer""" + + def __init__(self, size_max): + self.max = size_max + self.data = [] + + class __Full: + """class that implements a full buffer""" + + def append(self, x): + """Append an element overwriting the oldest one.""" + self.data[self.cur] = x + self.cur = (self.cur + 1) % self.max + + def get(self): + """return list of elements in correct order""" + return self.data[self.cur :] + self.data[: self.cur] + + def append(self, x): + """append an element at the end of the buffer""" + + self.data.append(x) + if len(self.data) == self.max: + self.cur = 0 + # Permanently change self's class from non-full to full + self.__class__ = self.__Full + + def get(self): + """Return a list of elements from the oldest to the newest.""" + return self.data + + def __len__(self): + return len(self.data) diff --git a/aprsd/web/static/js/charts.js b/aprsd/web/static/js/charts.js index 0f79ab5..e2b3a7f 100644 --- a/aprsd/web/static/js/charts.js +++ b/aprsd/web/static/js/charts.js @@ -191,7 +191,7 @@ function update_stats( data ) { var html_str = '' watchdiv.html('') jQuery.each(data["stats"]["aprsd"]["watch_list"], function(i, val) { - html_str += '' + html_str += '' }); html_str += "
HAM CallsignAge since last seen by APRSD
' + i + '' + val + '
' + i + '' + val["last"] + '
"; watchdiv.append(html_str); @@ -205,10 +205,13 @@ function update_packets( data ) { packetsdiv.html('') } jQuery.each(data, function(i, val) { - if ( packet_list.hasOwnProperty(i) == false ) { - packet_list[i] = val; - var d = new Date(i*1000).toLocaleDateString("en-US") - var t = new Date(i*1000).toLocaleTimeString("en-US") + if ( packet_list.hasOwnProperty(val["ts"]) == false ) { + // Store the packet + packet_list[val["ts"]] = val; + ts_str = val["ts"].toString(); + ts = ts_str.split(".")[0]*1000; + var d = new Date(ts).toLocaleDateString("en-US") + var t = new Date(ts).toLocaleTimeString("en-US") if (val.hasOwnProperty('from') == false) { from = val['fromcall'] title_id = 'title_tx'