Reworked the notification threads and admin ui.

This patch updates the notification thread to send all packets
through the notification plugins.   The plugins themselves need to
do smart filter to not reply to every packet.  This allows for
more interesting plugins.

Also fixed an issue with the messages tab in the admin ui, not
showing all of the recieved packets.   The messages tab now also
sees all the packets that aprsd recieves.
This commit is contained in:
Hemna 2021-07-17 14:30:29 -04:00
parent 3d38402be2
commit 2fceba10e1
9 changed files with 231 additions and 136 deletions

View File

@ -44,7 +44,6 @@ APRSD Overview Diagram
----------------------
.. image:: https://raw.githubusercontent.com/craigerl/aprsd/master/docs/_static/aprsd_overview.svg?sanitize=true
.. image:: docs/_static/aprsd_overview.svg?sanitize=true
Typical use case
@ -55,7 +54,9 @@ the weather. an APRS message is sent, and then picked up by APRSD. The
APRS packet is decoded, and the message is sent through the list of plugins
for processing. For example, the WeatherPlugin picks up the message, fetches the weather
for the area around the user who sent the request, and then responds with
the weather conditions in that area.
the weather conditions in that area. Also includes a watch list of HAM
callsigns to look out for. The watch list can notify you when a HAM callsign
in the list is seen and now available to message on the APRS network.
APRSD Capabilities
@ -83,6 +84,18 @@ If it matches, the plugin runs. IF the regex doesn't match, the plugin is skipp
* VersionPlugin - Reports the version information for aprsd
List of core notification plugins
=================================
These plugins see all APRS messages from ham callsigns in the config's watch
list.
* NotifySeenPlugin - Send a message when a message is seen from a callsign in
the watch list. This is helpful when you want to know
when a friend is online in the ARPS network, but haven't
been seen in a while.
Current messages this will respond to:
======================================
@ -202,7 +215,6 @@ Output
- aprsd.plugins.version.VersionPlugin
logfile: /tmp/aprsd.log
logformat: '[%(asctime)s] [%(threadName)-12s] [%(levelname)-5.5s] %(message)s - [%(pathname)s:%(lineno)d]'
plugin_dir: ~/.config/aprsd/plugins
trace: false
units: imperial
web:

View File

@ -51,13 +51,10 @@ class APRSDFlask(flask_classful.FlaskView):
self.config["aprsd"]["watch_list"],
),
)
if "watch_list" in self.config["aprsd"] and self.config["aprsd"][
"watch_list"
].get("enabled", False):
watch_count = len(self.config["aprsd"]["watch_list"]["callsigns"])
watch_age = self.config["aprsd"]["watch_list"]["alert_time_seconds"]
age_time = {"seconds": watch_age}
watch_age = datetime.timedelta(**age_time)
wl = packets.WatchList()
if wl.is_enabled():
watch_count = len(wl.callsigns)
watch_age = wl.max_delta()
else:
watch_count = 0
watch_age = 0
@ -84,7 +81,7 @@ class APRSDFlask(flask_classful.FlaskView):
@auth.login_required
def packets(self):
packet_list = packets.PacketList().packet_list
packet_list = packets.PacketList().get()
return json.dumps(packet_list)
@auth.login_required
@ -111,14 +108,17 @@ class APRSDFlask(flask_classful.FlaskView):
stats_dict = stats_obj.stats()
# Convert the watch_list entries to age
watch_list = stats_dict["aprsd"]["watch_list"]
wl = packets.WatchList()
new_list = {}
for call in watch_list:
call_date = datetime.datetime.strptime(
watch_list[call],
"%Y-%m-%d %H:%M:%S.%f",
)
new_list[call] = str(now - call_date)
for call in wl.callsigns:
# call_date = datetime.datetime.strptime(
# str(wl.last_seen(call)),
# "%Y-%m-%d %H:%M:%S.%f",
# )
new_list[call] = {
"last": wl.age(call),
"packets": wl.callsigns[call]["packets"].get(),
}
stats_dict["aprsd"]["watch_list"] = new_list

View File

@ -1,7 +1,10 @@
import datetime
import logging
import threading
import time
from aprsd import utils
LOG = logging.getLogger("APRSD")
PACKET_TYPE_MESSAGE = "message"
@ -19,7 +22,7 @@ class PacketList:
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.packet_list = {}
cls._instance.packet_list = utils.RingBuffer(100)
cls._instance.lock = threading.Lock()
return cls._instance
@ -29,9 +32,96 @@ class PacketList:
def add(self, packet):
with self.lock:
now = time.time()
ts = str(now).split(".")[0]
self.packet_list[ts] = packet
packet["ts"] = time.time()
self.packet_list.append(packet)
def get(self):
with self.lock:
return self.packet_list.get()
class WatchList:
"""Global watch list and info for callsigns."""
_instance = None
callsigns = {}
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.lock = threading.Lock()
cls.callsigns = {}
return cls._instance
def __init__(self, config=None):
if config:
self.config = config
ring_size = config["aprsd"]["watch_list"]["packet_keep_count"]
for callsign in config["aprsd"]["watch_list"].get("callsigns", []):
call = callsign.replace("*", "")
# FIXME(waboring) - we should fetch the last time we saw
# a beacon from a callsign or some other mechanism to find
# last time a message was seen by aprs-is. For now this
# is all we can do.
self.callsigns[call] = {
"last": datetime.datetime.now(),
"packets": utils.RingBuffer(
ring_size,
),
}
def is_enabled(self):
if "watch_list" in self.config["aprsd"]:
return self.config["aprsd"]["watch_list"].get("enabled", False)
else:
return False
def callsign_in_watchlist(self, callsign):
return callsign in self.callsigns
def update_seen(self, packet):
callsign = packet["from"]
if self.callsign_in_watchlist(callsign):
self.callsigns[callsign]["last"] = datetime.datetime.now()
self.callsigns[callsign]["packets"].append(packet)
def last_seen(self, callsign):
if self.callsign_in_watchlist(callsign):
return self.callsigns[callsign]["last"]
def age(self, callsign):
now = datetime.datetime.now()
return str(now - self.last_seen(callsign))
def max_delta(self, seconds=None):
watch_list_conf = self.config["aprsd"]["watch_list"]
if not seconds:
seconds = watch_list_conf["alert_time_seconds"]
max_timeout = {"seconds": seconds}
return datetime.timedelta(**max_timeout)
def is_old(self, callsign, seconds=None):
"""Watch list callsign last seen is old compared to now?
This tests to see if the last time we saw a callsign packet,
if that is older than the allowed timeout in the config.
We put this here so any notification plugin can use this
same test.
"""
age = self.age(callsign)
delta = utils.parse_delta_str(age)
d = datetime.timedelta(**delta)
max_delta = self.max_delta(seconds=seconds)
if d > max_delta:
return True
else:
return False
def get_packet_type(packet):

View File

@ -30,7 +30,7 @@ CORE_MESSAGE_PLUGINS = [
]
CORE_NOTIFY_PLUGINS = [
"aprsd.plugins.notify.BaseNotifyPlugin",
"aprsd.plugins.notify.NotifySeenPlugin",
]
@ -298,27 +298,6 @@ class PluginManager:
for p_name in enabled_notify_plugins:
self._load_notify_plugin(p_name)
# FIXME(Walt) - no real need to support loading random python classes
# from a directory anymore. Need to remove this.
plugin_dir = self.config["aprsd"].get("plugin_dir", None)
if plugin_dir:
LOG.info("Trying to load custom plugins from '{}'".format(plugin_dir))
plugins_list = self.load_plugins_from_path(plugin_dir)
if plugins_list:
LOG.info("Discovered {} modules to load".format(len(plugins_list)))
for o in plugins_list:
plugin_obj = None
if plugin_obj:
LOG.info(
"Registering Command plugin '{}'({}) '{}'".format(
o["name"],
o["obj"].version,
o["obj"].command_regex,
),
)
self._pluggy_pm.register(o["obj"])
else:
LOG.info("Skipping Custom Plugins directory.")
LOG.info("Completed Plugin Loading.")

View File

@ -1,23 +1,52 @@
import logging
from aprsd import packets, plugin, trace
from aprsd import messaging, packets, plugin
LOG = logging.getLogger("APRSD")
class BaseNotifyPlugin(plugin.APRSDNotificationPluginBase):
"""Notification base plugin."""
class NotifySeenPlugin(plugin.APRSDNotificationPluginBase):
"""Notification plugin to send seen message for callsign.
This plugin will track callsigns in the watch list and report
when a callsign has been seen when the last time they were
seen was older than the configured age limit.
"""
version = "1.0"
@trace.trace
def __init__(self, config):
"""The aprsd config object is stored."""
super().__init__(config)
def notify(self, packet):
LOG.info("BaseNotifyPlugin")
notify_callsign = self.config["aprsd"]["watch_list"]["alert_callsign"]
fromcall = packet.get("from")
packet_type = packets.get_packet_type(packet)
# we shouldn't notify the alert user that they are online.
if fromcall != notify_callsign:
return "{} was just seen by type:'{}'".format(fromcall, packet_type)
wl = packets.WatchList()
age = wl.age(fromcall)
if wl.is_old(packet["from"]):
LOG.info(
"NOTIFY {} last seen {} max age={}".format(
fromcall,
age,
wl.max_delta(),
),
)
packet_type = packets.get_packet_type(packet)
# we shouldn't notify the alert user that they are online.
if fromcall != notify_callsign:
return "{} was just seen by type:'{}'".format(fromcall, packet_type)
else:
LOG.debug(
"Not old enough to notify callsign '{}' : {} < {}".format(
fromcall,
age,
wl.max_delta(),
),
)
return messaging.NULL_MESSAGE

View File

@ -3,7 +3,7 @@ import logging
import threading
import aprsd
from aprsd import plugin, utils
from aprsd import packets, plugin, utils
LOG = logging.getLogger("APRSD")
@ -33,7 +33,6 @@ class APRSDStats:
_mem_current = 0
_mem_peak = 0
_watch_list = {}
def __new__(cls, *args, **kwargs):
if cls._instance is None:
@ -170,15 +169,6 @@ class APRSDStats:
with self.lock:
self._email_thread_last_time = datetime.datetime.now()
@property
def watch_list(self):
with self.lock:
return self._watch_list
def update_watch_list(self, watch_list):
with self.lock:
self._watch_list = watch_list
def stats(self):
now = datetime.datetime.now()
if self._email_thread_last_time:
@ -204,6 +194,8 @@ class APRSDStats:
for p in plugins:
plugin_stats[full_name_with_qualname(p)] = p.message_count
wl = packets.WatchList()
stats = {
"aprsd": {
"version": aprsd.__version__,
@ -212,7 +204,7 @@ class APRSDStats:
"memory_current_str": utils.human_size(self.memory),
"memory_peak": self.memory_peak,
"memory_peak_str": utils.human_size(self.memory_peak),
"watch_list": self.watch_list,
"watch_list": wl.callsigns,
},
"aprs-is": {
"server": self.aprsis_server,

View File

@ -121,78 +121,29 @@ class APRSDNotifyThread(APRSDThread):
super().__init__("NOTIFY_MSG")
self.msg_queues = msg_queues
self.config = config
for callsign in config["aprsd"]["watch_list"].get("callsigns", []):
call = callsign.replace("*", "")
# FIXME(waboring) - we should fetch the last time we saw
# a beacon from a callsign or some other mechanism to find
# last time a message was seen by aprs-is. For now this
# is all we can do.
self.last_seen[call] = datetime.datetime.now()
self.update_stats()
def update_stats(self):
stats_seen = {}
for callsign in self.last_seen:
stats_seen[callsign] = str(self.last_seen[callsign])
stats.APRSDStats().update_watch_list(stats_seen)
packets.WatchList(config=config)
def loop(self):
try:
packet = self.msg_queues["notify"].get(timeout=5)
wl = packets.WatchList()
if wl.callsign_in_watchlist(packet["from"]):
# NOW WE RUN through the notify plugins.
# If they return a msg, then we queue it for sending.
pm = plugin.PluginManager()
results = pm.notify(packet)
for reply in results:
if reply is not messaging.NULL_MESSAGE:
watch_list_conf = self.config["aprsd"]["watch_list"]
if packet["from"] in self.last_seen:
# We only notify if the last time a callsign was seen
# is older than the alert_time_seconds
now = datetime.datetime.now()
age = str(now - self.last_seen[packet["from"]])
msg = messaging.TextMessage(
self.config["aprs"]["login"],
watch_list_conf["alert_callsign"],
reply,
)
self.msg_queues["tx"].put(msg)
delta = utils.parse_delta_str(age)
d = datetime.timedelta(**delta)
watch_list_conf = self.config["aprsd"]["watch_list"]
max_timeout = {
"seconds": watch_list_conf["alert_time_seconds"],
}
max_delta = datetime.timedelta(**max_timeout)
if d > max_delta:
LOG.info(
"NOTIFY {} last seen {} max age={}".format(
packet["from"],
age,
max_delta,
),
)
# NOW WE RUN through the notify plugins.
# If they return a msg, then we queue it for sending.
pm = plugin.PluginManager()
results = pm.notify(packet)
for reply in results:
if reply is not messaging.NULL_MESSAGE:
LOG.debug("Sending '{}'".format(reply))
msg = messaging.TextMessage(
self.config["aprs"]["login"],
watch_list_conf["alert_callsign"],
reply,
)
self.msg_queues["tx"].put(msg)
else:
LOG.debug("Got NULL MESSAGE from plugin")
else:
LOG.debug(
"Not old enough to notify callsign {}: {} < {}".format(
packet["from"],
age,
max_delta,
),
)
LOG.debug("Update last seen from {}".format(packet["from"]))
self.last_seen[packet["from"]] = now
self.update_stats()
wl.update_seen(packet)
else:
LOG.debug(
"Ignoring packet from '{}'. Not in watch list.".format(
@ -353,6 +304,7 @@ class APRSDRXThread(APRSDThread):
try:
LOG.debug("Adding packet to notify queue {}".format(packet["raw"]))
self.msg_queues["notify"].put(packet)
packets.PacketList().add(packet)
# since we can see packets from anyone now with the
# watch list, we need to filter messages directly only to us.

View File

@ -45,7 +45,6 @@ DEFAULT_CONFIG_DICT = {
"logformat": DEFAULT_LOG_FORMAT,
"dateformat": DEFAULT_DATE_FORMAT,
"trace": False,
"plugin_dir": "~/.config/aprsd/plugins",
"enabled_plugins": plugin.CORE_MESSAGE_PLUGINS,
"units": "imperial",
"watch_list": {
@ -54,6 +53,9 @@ DEFAULT_CONFIG_DICT = {
"alert_callsign": "NOCALL",
# 43200 is 12 hours
"alert_time_seconds": 43200,
# How many packets to save in a ring Buffer
# for a particular callsign
"packet_keep_count": 10,
"callsigns": [],
"enabled_plugins": plugin.CORE_NOTIFY_PLUGINS,
},
@ -435,3 +437,39 @@ def parse_delta_str(s):
else:
m = re.match(r"(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d[\.\d+]*)", s)
return {key: float(val) for key, val in m.groupdict().items()}
class RingBuffer:
"""class that implements a not-yet-full buffer"""
def __init__(self, size_max):
self.max = size_max
self.data = []
class __Full:
"""class that implements a full buffer"""
def append(self, x):
"""Append an element overwriting the oldest one."""
self.data[self.cur] = x
self.cur = (self.cur + 1) % self.max
def get(self):
"""return list of elements in correct order"""
return self.data[self.cur :] + self.data[: self.cur]
def append(self, x):
"""append an element at the end of the buffer"""
self.data.append(x)
if len(self.data) == self.max:
self.cur = 0
# Permanently change self's class from non-full to full
self.__class__ = self.__Full
def get(self):
"""Return a list of elements from the oldest to the newest."""
return self.data
def __len__(self):
return len(self.data)

View File

@ -191,7 +191,7 @@ function update_stats( data ) {
var html_str = '<table class="ui celled striped table"><thead><tr><th>HAM Callsign</th><th>Age since last seen by APRSD</th></tr></thead><tbody>'
watchdiv.html('')
jQuery.each(data["stats"]["aprsd"]["watch_list"], function(i, val) {
html_str += '<tr><td class="collapsing"><i class="phone volume icon"></i>' + i + '</td><td>' + val + '</td></tr>'
html_str += '<tr><td class="collapsing"><i class="phone volume icon"></i>' + i + '</td><td>' + val["last"] + '</td></tr>'
});
html_str += "</tbody></table>";
watchdiv.append(html_str);
@ -205,10 +205,13 @@ function update_packets( data ) {
packetsdiv.html('')
}
jQuery.each(data, function(i, val) {
if ( packet_list.hasOwnProperty(i) == false ) {
packet_list[i] = val;
var d = new Date(i*1000).toLocaleDateString("en-US")
var t = new Date(i*1000).toLocaleTimeString("en-US")
if ( packet_list.hasOwnProperty(val["ts"]) == false ) {
// Store the packet
packet_list[val["ts"]] = val;
ts_str = val["ts"].toString();
ts = ts_str.split(".")[0]*1000;
var d = new Date(ts).toLocaleDateString("en-US")
var t = new Date(ts).toLocaleTimeString("en-US")
if (val.hasOwnProperty('from') == false) {
from = val['fromcall']
title_id = 'title_tx'