1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-11-04 16:01:15 -05:00

Merge pull request #66 from craigerl/notify_rework

Reworked the notification threads and admin ui.
This commit is contained in:
Walter A. Boring IV 2021-07-17 14:45:18 -04:00 committed by GitHub
commit b606495fbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 231 additions and 136 deletions

View File

@ -44,7 +44,6 @@ APRSD Overview Diagram
---------------------- ----------------------
.. image:: https://raw.githubusercontent.com/craigerl/aprsd/master/docs/_static/aprsd_overview.svg?sanitize=true .. image:: https://raw.githubusercontent.com/craigerl/aprsd/master/docs/_static/aprsd_overview.svg?sanitize=true
.. image:: docs/_static/aprsd_overview.svg?sanitize=true
Typical use case Typical use case
@ -55,7 +54,9 @@ the weather. an APRS message is sent, and then picked up by APRSD. The
APRS packet is decoded, and the message is sent through the list of plugins APRS packet is decoded, and the message is sent through the list of plugins
for processing. For example, the WeatherPlugin picks up the message, fetches the weather for processing. For example, the WeatherPlugin picks up the message, fetches the weather
for the area around the user who sent the request, and then responds with for the area around the user who sent the request, and then responds with
the weather conditions in that area. the weather conditions in that area. Also includes a watch list of HAM
callsigns to look out for. The watch list can notify you when a HAM callsign
in the list is seen and now available to message on the APRS network.
APRSD Capabilities APRSD Capabilities
@ -83,6 +84,18 @@ If it matches, the plugin runs. IF the regex doesn't match, the plugin is skipp
* VersionPlugin - Reports the version information for aprsd * VersionPlugin - Reports the version information for aprsd
List of core notification plugins
=================================
These plugins see all APRS messages from ham callsigns in the config's watch
list.
* NotifySeenPlugin - Send a message when a message is seen from a callsign in
the watch list. This is helpful when you want to know
when a friend is online in the ARPS network, but haven't
been seen in a while.
Current messages this will respond to: Current messages this will respond to:
====================================== ======================================
@ -202,7 +215,6 @@ Output
- aprsd.plugins.version.VersionPlugin - aprsd.plugins.version.VersionPlugin
logfile: /tmp/aprsd.log logfile: /tmp/aprsd.log
logformat: '[%(asctime)s] [%(threadName)-12s] [%(levelname)-5.5s] %(message)s - [%(pathname)s:%(lineno)d]' logformat: '[%(asctime)s] [%(threadName)-12s] [%(levelname)-5.5s] %(message)s - [%(pathname)s:%(lineno)d]'
plugin_dir: ~/.config/aprsd/plugins
trace: false trace: false
units: imperial units: imperial
web: web:

View File

@ -51,13 +51,10 @@ class APRSDFlask(flask_classful.FlaskView):
self.config["aprsd"]["watch_list"], self.config["aprsd"]["watch_list"],
), ),
) )
if "watch_list" in self.config["aprsd"] and self.config["aprsd"][ wl = packets.WatchList()
"watch_list" if wl.is_enabled():
].get("enabled", False): watch_count = len(wl.callsigns)
watch_count = len(self.config["aprsd"]["watch_list"]["callsigns"]) watch_age = wl.max_delta()
watch_age = self.config["aprsd"]["watch_list"]["alert_time_seconds"]
age_time = {"seconds": watch_age}
watch_age = datetime.timedelta(**age_time)
else: else:
watch_count = 0 watch_count = 0
watch_age = 0 watch_age = 0
@ -84,7 +81,7 @@ class APRSDFlask(flask_classful.FlaskView):
@auth.login_required @auth.login_required
def packets(self): def packets(self):
packet_list = packets.PacketList().packet_list packet_list = packets.PacketList().get()
return json.dumps(packet_list) return json.dumps(packet_list)
@auth.login_required @auth.login_required
@ -111,14 +108,17 @@ class APRSDFlask(flask_classful.FlaskView):
stats_dict = stats_obj.stats() stats_dict = stats_obj.stats()
# Convert the watch_list entries to age # Convert the watch_list entries to age
watch_list = stats_dict["aprsd"]["watch_list"] wl = packets.WatchList()
new_list = {} new_list = {}
for call in watch_list: for call in wl.callsigns:
call_date = datetime.datetime.strptime( # call_date = datetime.datetime.strptime(
watch_list[call], # str(wl.last_seen(call)),
"%Y-%m-%d %H:%M:%S.%f", # "%Y-%m-%d %H:%M:%S.%f",
) # )
new_list[call] = str(now - call_date) new_list[call] = {
"last": wl.age(call),
"packets": wl.callsigns[call]["packets"].get(),
}
stats_dict["aprsd"]["watch_list"] = new_list stats_dict["aprsd"]["watch_list"] = new_list

View File

@ -1,7 +1,10 @@
import datetime
import logging import logging
import threading import threading
import time import time
from aprsd import utils
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
PACKET_TYPE_MESSAGE = "message" PACKET_TYPE_MESSAGE = "message"
@ -19,7 +22,7 @@ class PacketList:
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
if cls._instance is None: if cls._instance is None:
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
cls._instance.packet_list = {} cls._instance.packet_list = utils.RingBuffer(100)
cls._instance.lock = threading.Lock() cls._instance.lock = threading.Lock()
return cls._instance return cls._instance
@ -29,9 +32,96 @@ class PacketList:
def add(self, packet): def add(self, packet):
with self.lock: with self.lock:
now = time.time() packet["ts"] = time.time()
ts = str(now).split(".")[0] self.packet_list.append(packet)
self.packet_list[ts] = packet
def get(self):
with self.lock:
return self.packet_list.get()
class WatchList:
"""Global watch list and info for callsigns."""
_instance = None
callsigns = {}
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.lock = threading.Lock()
cls.callsigns = {}
return cls._instance
def __init__(self, config=None):
if config:
self.config = config
ring_size = config["aprsd"]["watch_list"]["packet_keep_count"]
for callsign in config["aprsd"]["watch_list"].get("callsigns", []):
call = callsign.replace("*", "")
# FIXME(waboring) - we should fetch the last time we saw
# a beacon from a callsign or some other mechanism to find
# last time a message was seen by aprs-is. For now this
# is all we can do.
self.callsigns[call] = {
"last": datetime.datetime.now(),
"packets": utils.RingBuffer(
ring_size,
),
}
def is_enabled(self):
if "watch_list" in self.config["aprsd"]:
return self.config["aprsd"]["watch_list"].get("enabled", False)
else:
return False
def callsign_in_watchlist(self, callsign):
return callsign in self.callsigns
def update_seen(self, packet):
callsign = packet["from"]
if self.callsign_in_watchlist(callsign):
self.callsigns[callsign]["last"] = datetime.datetime.now()
self.callsigns[callsign]["packets"].append(packet)
def last_seen(self, callsign):
if self.callsign_in_watchlist(callsign):
return self.callsigns[callsign]["last"]
def age(self, callsign):
now = datetime.datetime.now()
return str(now - self.last_seen(callsign))
def max_delta(self, seconds=None):
watch_list_conf = self.config["aprsd"]["watch_list"]
if not seconds:
seconds = watch_list_conf["alert_time_seconds"]
max_timeout = {"seconds": seconds}
return datetime.timedelta(**max_timeout)
def is_old(self, callsign, seconds=None):
"""Watch list callsign last seen is old compared to now?
This tests to see if the last time we saw a callsign packet,
if that is older than the allowed timeout in the config.
We put this here so any notification plugin can use this
same test.
"""
age = self.age(callsign)
delta = utils.parse_delta_str(age)
d = datetime.timedelta(**delta)
max_delta = self.max_delta(seconds=seconds)
if d > max_delta:
return True
else:
return False
def get_packet_type(packet): def get_packet_type(packet):

View File

@ -30,7 +30,7 @@ CORE_MESSAGE_PLUGINS = [
] ]
CORE_NOTIFY_PLUGINS = [ CORE_NOTIFY_PLUGINS = [
"aprsd.plugins.notify.BaseNotifyPlugin", "aprsd.plugins.notify.NotifySeenPlugin",
] ]
@ -298,27 +298,6 @@ class PluginManager:
for p_name in enabled_notify_plugins: for p_name in enabled_notify_plugins:
self._load_notify_plugin(p_name) self._load_notify_plugin(p_name)
# FIXME(Walt) - no real need to support loading random python classes
# from a directory anymore. Need to remove this.
plugin_dir = self.config["aprsd"].get("plugin_dir", None)
if plugin_dir:
LOG.info("Trying to load custom plugins from '{}'".format(plugin_dir))
plugins_list = self.load_plugins_from_path(plugin_dir)
if plugins_list:
LOG.info("Discovered {} modules to load".format(len(plugins_list)))
for o in plugins_list:
plugin_obj = None
if plugin_obj:
LOG.info(
"Registering Command plugin '{}'({}) '{}'".format(
o["name"],
o["obj"].version,
o["obj"].command_regex,
),
)
self._pluggy_pm.register(o["obj"])
else: else:
LOG.info("Skipping Custom Plugins directory.") LOG.info("Skipping Custom Plugins directory.")
LOG.info("Completed Plugin Loading.") LOG.info("Completed Plugin Loading.")

View File

@ -1,23 +1,52 @@
import logging import logging
from aprsd import packets, plugin, trace from aprsd import messaging, packets, plugin
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
class BaseNotifyPlugin(plugin.APRSDNotificationPluginBase): class NotifySeenPlugin(plugin.APRSDNotificationPluginBase):
"""Notification base plugin.""" """Notification plugin to send seen message for callsign.
This plugin will track callsigns in the watch list and report
when a callsign has been seen when the last time they were
seen was older than the configured age limit.
"""
version = "1.0" version = "1.0"
@trace.trace def __init__(self, config):
"""The aprsd config object is stored."""
super().__init__(config)
def notify(self, packet): def notify(self, packet):
LOG.info("BaseNotifyPlugin") LOG.info("BaseNotifyPlugin")
notify_callsign = self.config["aprsd"]["watch_list"]["alert_callsign"] notify_callsign = self.config["aprsd"]["watch_list"]["alert_callsign"]
fromcall = packet.get("from") fromcall = packet.get("from")
wl = packets.WatchList()
age = wl.age(fromcall)
if wl.is_old(packet["from"]):
LOG.info(
"NOTIFY {} last seen {} max age={}".format(
fromcall,
age,
wl.max_delta(),
),
)
packet_type = packets.get_packet_type(packet) packet_type = packets.get_packet_type(packet)
# we shouldn't notify the alert user that they are online. # we shouldn't notify the alert user that they are online.
if fromcall != notify_callsign: if fromcall != notify_callsign:
return "{} was just seen by type:'{}'".format(fromcall, packet_type) return "{} was just seen by type:'{}'".format(fromcall, packet_type)
else:
LOG.debug(
"Not old enough to notify callsign '{}' : {} < {}".format(
fromcall,
age,
wl.max_delta(),
),
)
return messaging.NULL_MESSAGE

View File

@ -3,7 +3,7 @@ import logging
import threading import threading
import aprsd import aprsd
from aprsd import plugin, utils from aprsd import packets, plugin, utils
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
@ -33,7 +33,6 @@ class APRSDStats:
_mem_current = 0 _mem_current = 0
_mem_peak = 0 _mem_peak = 0
_watch_list = {}
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
if cls._instance is None: if cls._instance is None:
@ -170,15 +169,6 @@ class APRSDStats:
with self.lock: with self.lock:
self._email_thread_last_time = datetime.datetime.now() self._email_thread_last_time = datetime.datetime.now()
@property
def watch_list(self):
with self.lock:
return self._watch_list
def update_watch_list(self, watch_list):
with self.lock:
self._watch_list = watch_list
def stats(self): def stats(self):
now = datetime.datetime.now() now = datetime.datetime.now()
if self._email_thread_last_time: if self._email_thread_last_time:
@ -204,6 +194,8 @@ class APRSDStats:
for p in plugins: for p in plugins:
plugin_stats[full_name_with_qualname(p)] = p.message_count plugin_stats[full_name_with_qualname(p)] = p.message_count
wl = packets.WatchList()
stats = { stats = {
"aprsd": { "aprsd": {
"version": aprsd.__version__, "version": aprsd.__version__,
@ -212,7 +204,7 @@ class APRSDStats:
"memory_current_str": utils.human_size(self.memory), "memory_current_str": utils.human_size(self.memory),
"memory_peak": self.memory_peak, "memory_peak": self.memory_peak,
"memory_peak_str": utils.human_size(self.memory_peak), "memory_peak_str": utils.human_size(self.memory_peak),
"watch_list": self.watch_list, "watch_list": wl.callsigns,
}, },
"aprs-is": { "aprs-is": {
"server": self.aprsis_server, "server": self.aprsis_server,

View File

@ -121,56 +121,20 @@ class APRSDNotifyThread(APRSDThread):
super().__init__("NOTIFY_MSG") super().__init__("NOTIFY_MSG")
self.msg_queues = msg_queues self.msg_queues = msg_queues
self.config = config self.config = config
for callsign in config["aprsd"]["watch_list"].get("callsigns", []): packets.WatchList(config=config)
call = callsign.replace("*", "")
# FIXME(waboring) - we should fetch the last time we saw
# a beacon from a callsign or some other mechanism to find
# last time a message was seen by aprs-is. For now this
# is all we can do.
self.last_seen[call] = datetime.datetime.now()
self.update_stats()
def update_stats(self):
stats_seen = {}
for callsign in self.last_seen:
stats_seen[callsign] = str(self.last_seen[callsign])
stats.APRSDStats().update_watch_list(stats_seen)
def loop(self): def loop(self):
try: try:
packet = self.msg_queues["notify"].get(timeout=5) packet = self.msg_queues["notify"].get(timeout=5)
wl = packets.WatchList()
if packet["from"] in self.last_seen: if wl.callsign_in_watchlist(packet["from"]):
# We only notify if the last time a callsign was seen
# is older than the alert_time_seconds
now = datetime.datetime.now()
age = str(now - self.last_seen[packet["from"]])
delta = utils.parse_delta_str(age)
d = datetime.timedelta(**delta)
watch_list_conf = self.config["aprsd"]["watch_list"]
max_timeout = {
"seconds": watch_list_conf["alert_time_seconds"],
}
max_delta = datetime.timedelta(**max_timeout)
if d > max_delta:
LOG.info(
"NOTIFY {} last seen {} max age={}".format(
packet["from"],
age,
max_delta,
),
)
# NOW WE RUN through the notify plugins. # NOW WE RUN through the notify plugins.
# If they return a msg, then we queue it for sending. # If they return a msg, then we queue it for sending.
pm = plugin.PluginManager() pm = plugin.PluginManager()
results = pm.notify(packet) results = pm.notify(packet)
for reply in results: for reply in results:
if reply is not messaging.NULL_MESSAGE: if reply is not messaging.NULL_MESSAGE:
LOG.debug("Sending '{}'".format(reply)) watch_list_conf = self.config["aprsd"]["watch_list"]
msg = messaging.TextMessage( msg = messaging.TextMessage(
self.config["aprs"]["login"], self.config["aprs"]["login"],
@ -178,21 +142,8 @@ class APRSDNotifyThread(APRSDThread):
reply, reply,
) )
self.msg_queues["tx"].put(msg) self.msg_queues["tx"].put(msg)
else:
LOG.debug("Got NULL MESSAGE from plugin")
else: wl.update_seen(packet)
LOG.debug(
"Not old enough to notify callsign {}: {} < {}".format(
packet["from"],
age,
max_delta,
),
)
LOG.debug("Update last seen from {}".format(packet["from"]))
self.last_seen[packet["from"]] = now
self.update_stats()
else: else:
LOG.debug( LOG.debug(
"Ignoring packet from '{}'. Not in watch list.".format( "Ignoring packet from '{}'. Not in watch list.".format(
@ -353,6 +304,7 @@ class APRSDRXThread(APRSDThread):
try: try:
LOG.debug("Adding packet to notify queue {}".format(packet["raw"])) LOG.debug("Adding packet to notify queue {}".format(packet["raw"]))
self.msg_queues["notify"].put(packet) self.msg_queues["notify"].put(packet)
packets.PacketList().add(packet)
# since we can see packets from anyone now with the # since we can see packets from anyone now with the
# watch list, we need to filter messages directly only to us. # watch list, we need to filter messages directly only to us.

View File

@ -45,7 +45,6 @@ DEFAULT_CONFIG_DICT = {
"logformat": DEFAULT_LOG_FORMAT, "logformat": DEFAULT_LOG_FORMAT,
"dateformat": DEFAULT_DATE_FORMAT, "dateformat": DEFAULT_DATE_FORMAT,
"trace": False, "trace": False,
"plugin_dir": "~/.config/aprsd/plugins",
"enabled_plugins": plugin.CORE_MESSAGE_PLUGINS, "enabled_plugins": plugin.CORE_MESSAGE_PLUGINS,
"units": "imperial", "units": "imperial",
"watch_list": { "watch_list": {
@ -54,6 +53,9 @@ DEFAULT_CONFIG_DICT = {
"alert_callsign": "NOCALL", "alert_callsign": "NOCALL",
# 43200 is 12 hours # 43200 is 12 hours
"alert_time_seconds": 43200, "alert_time_seconds": 43200,
# How many packets to save in a ring Buffer
# for a particular callsign
"packet_keep_count": 10,
"callsigns": [], "callsigns": [],
"enabled_plugins": plugin.CORE_NOTIFY_PLUGINS, "enabled_plugins": plugin.CORE_NOTIFY_PLUGINS,
}, },
@ -435,3 +437,39 @@ def parse_delta_str(s):
else: else:
m = re.match(r"(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d[\.\d+]*)", s) m = re.match(r"(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d[\.\d+]*)", s)
return {key: float(val) for key, val in m.groupdict().items()} return {key: float(val) for key, val in m.groupdict().items()}
class RingBuffer:
"""class that implements a not-yet-full buffer"""
def __init__(self, size_max):
self.max = size_max
self.data = []
class __Full:
"""class that implements a full buffer"""
def append(self, x):
"""Append an element overwriting the oldest one."""
self.data[self.cur] = x
self.cur = (self.cur + 1) % self.max
def get(self):
"""return list of elements in correct order"""
return self.data[self.cur :] + self.data[: self.cur]
def append(self, x):
"""append an element at the end of the buffer"""
self.data.append(x)
if len(self.data) == self.max:
self.cur = 0
# Permanently change self's class from non-full to full
self.__class__ = self.__Full
def get(self):
"""Return a list of elements from the oldest to the newest."""
return self.data
def __len__(self):
return len(self.data)

View File

@ -191,7 +191,7 @@ function update_stats( data ) {
var html_str = '<table class="ui celled striped table"><thead><tr><th>HAM Callsign</th><th>Age since last seen by APRSD</th></tr></thead><tbody>' var html_str = '<table class="ui celled striped table"><thead><tr><th>HAM Callsign</th><th>Age since last seen by APRSD</th></tr></thead><tbody>'
watchdiv.html('') watchdiv.html('')
jQuery.each(data["stats"]["aprsd"]["watch_list"], function(i, val) { jQuery.each(data["stats"]["aprsd"]["watch_list"], function(i, val) {
html_str += '<tr><td class="collapsing"><i class="phone volume icon"></i>' + i + '</td><td>' + val + '</td></tr>' html_str += '<tr><td class="collapsing"><i class="phone volume icon"></i>' + i + '</td><td>' + val["last"] + '</td></tr>'
}); });
html_str += "</tbody></table>"; html_str += "</tbody></table>";
watchdiv.append(html_str); watchdiv.append(html_str);
@ -205,10 +205,13 @@ function update_packets( data ) {
packetsdiv.html('') packetsdiv.html('')
} }
jQuery.each(data, function(i, val) { jQuery.each(data, function(i, val) {
if ( packet_list.hasOwnProperty(i) == false ) { if ( packet_list.hasOwnProperty(val["ts"]) == false ) {
packet_list[i] = val; // Store the packet
var d = new Date(i*1000).toLocaleDateString("en-US") packet_list[val["ts"]] = val;
var t = new Date(i*1000).toLocaleTimeString("en-US") ts_str = val["ts"].toString();
ts = ts_str.split(".")[0]*1000;
var d = new Date(ts).toLocaleDateString("en-US")
var t = new Date(ts).toLocaleTimeString("en-US")
if (val.hasOwnProperty('from') == false) { if (val.hasOwnProperty('from') == false) {
from = val['fromcall'] from = val['fromcall']
title_id = 'title_tx' title_id = 'title_tx'