1
0
mirror of https://github.com/craigerl/aprsd.git synced 2026-02-15 12:33:41 -05:00
aprsd/aprsd/plugin.py
Walter Boring e5644cc49d Remove plugin.run() locking
The plugin doesn't really need locks when processing the packets
as there is really only 1 thread that sends packets through
the plugins processing.
2026-01-09 12:31:46 -05:00

565 lines
18 KiB
Python

from __future__ import annotations
import abc
import importlib
import inspect
import logging
import re
import textwrap
import threading
import pluggy
from oslo_config import cfg
import aprsd
from aprsd import packets, threads
from aprsd.client.client import APRSDClient
from aprsd.packets import watch_list
# setup the global logger
CONF = cfg.CONF
LOG = logging.getLogger('APRSD')
CORE_MESSAGE_PLUGINS = [
'aprsd.plugins.email.EmailPlugin',
'aprsd.plugins.fortune.FortunePlugin',
'aprsd.plugins.location.LocationPlugin',
'aprsd.plugins.ping.PingPlugin',
'aprsd.plugins.time.TimePlugin',
'aprsd.plugins.weather.USWeatherPlugin',
'aprsd.plugins.version.VersionPlugin',
]
CORE_NOTIFY_PLUGINS = [
'aprsd.plugins.notify.NotifySeenPlugin',
]
hookspec = pluggy.HookspecMarker('aprsd')
hookimpl = pluggy.HookimplMarker('aprsd')
class APRSDPluginSpec:
"""A hook specification namespace."""
@hookspec
def filter(self, packet: type[packets.Packet]):
"""My special little hook that you can customize."""
class APRSDPluginBase(metaclass=abc.ABCMeta):
"""The base class for all APRSD Plugins."""
config = None
rx_count = 0
tx_count = 0
version = aprsd.__version__
# Holds the list of APRSDThreads that the plugin creates
threads = []
# Set this in setup()
enabled = False
def __init__(self):
self.message_counter = 0
self.setup()
self.threads = self.create_threads() or []
self.start_threads()
def start_threads(self) -> None:
if self.enabled and self.threads:
if not isinstance(self.threads, list):
self.threads = [self.threads]
try:
for thread in self.threads:
if isinstance(thread, threads.APRSDThread):
thread.start()
else:
LOG.error(
"Can't start thread {}:{}, Must be a child "
'of aprsd.threads.APRSDThread'.format(
self,
thread,
),
)
except Exception:
LOG.error(
'Failed to start threads for plugin {}'.format(
self,
),
)
@property
def message_count(self) -> int:
return self.message_counter
def help(self) -> str:
return 'Help!'
@abc.abstractmethod
def setup(self):
"""Do any plugin setup here."""
self.enabled = True
def create_threads(self):
"""Gives the plugin writer the ability start a background thread."""
return []
def rx_inc(self):
self.rx_count += 1
def tx_inc(self):
self.tx_count += 1
def stop_threads(self):
"""Stop any threads this plugin might have created."""
for thread in self.threads:
if isinstance(thread, threads.APRSDThread):
thread.stop()
@abc.abstractmethod
def filter(self, packet: type[packets.Packet]) -> str | packets.MessagePacket:
pass
@abc.abstractmethod
def process(self, packet: type[packets.Packet]):
"""This is called when the filter passes."""
class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
"""Base plugin class for all notification APRSD plugins.
All these plugins will get every packet seen by APRSD's
registered list of HAM callsigns in the config file's
watch_list.
When you want to 'notify' something when a packet is seen
by a particular HAM callsign, write a plugin based off of
this class.
"""
def setup(self):
# if we have a watch list enabled, we need to add filtering
# to enable seeing packets from the watch list.
if CONF.watch_list.enabled:
# watch list is enabled
self.enabled = True
watch_list = CONF.watch_list.callsigns
# make sure the timeout is set or this doesn't work
if watch_list:
aprs_client = APRSDClient()
filter_str = 'b/{}'.format('/'.join(watch_list))
aprs_client.set_filter(filter_str)
else:
LOG.warning('Watch list enabled, but no callsigns set.')
@hookimpl
def filter(self, packet: type[packets.Packet]) -> str | packets.MessagePacket:
result = packets.NULL_MESSAGE
if self.enabled:
wl = watch_list.WatchList()
if wl.callsign_in_watchlist(packet.from_call):
# packet is from a callsign in the watch list
self.rx_inc()
try:
result = self.process(packet)
except Exception as ex:
LOG.error(
'Plugin {} failed to process packet {}'.format(
self.__class__,
ex,
),
)
if result:
self.tx_inc()
else:
LOG.warning(f'{self.__class__} plugin is not enabled')
return result
class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
"""Base Message plugin class.
When you want to search for a particular command in an
APRSD message and send a direct reply, write a plugin
based off of this class.
"""
@property
def command_name(self):
"""The usage string help."""
raise NotImplementedError
@property
def command_regex(self):
"""The regex to match from the caller"""
raise NotImplementedError
def help(self):
return '{}: {}'.format(
self.command_name.lower(),
self.command_regex,
)
def setup(self):
"""Do any plugin setup here."""
self.enabled = True
@hookimpl
def filter(self, packet: packets.MessagePacket) -> str | packets.MessagePacket:
LOG.debug(f'{self.__class__.__name__} called')
if not self.enabled:
result = f"{self.__class__.__name__} isn't enabled"
LOG.warning(result)
return result
if not isinstance(packet, packets.MessagePacket):
LOG.warning(
f'{self.__class__.__name__} Got a {packet.__class__.__name__} ignoring'
)
return packets.NULL_MESSAGE
result = None
message = packet.message_text
tocall = packet.to_call
# Only process messages destined for us
# and is an APRS message format and has a message.
if (
tocall == CONF.callsign
and isinstance(packet, packets.MessagePacket)
and message
):
if re.search(self.command_regex, message, re.IGNORECASE):
self.rx_inc()
try:
result = self.process(packet)
except Exception as ex:
LOG.error(
'Plugin {} failed to process packet {}'.format(
self.__class__,
ex,
),
)
LOG.exception(ex)
if result:
self.tx_inc()
return result
class APRSFIKEYMixin:
"""Mixin class to enable checking the existence of the aprs.fi apiKey."""
def ensure_aprs_fi_key(self):
if not CONF.aprs_fi.apiKey:
LOG.error('Config aprs_fi.apiKey is not set')
self.enabled = False
else:
self.enabled = True
class HelpPlugin(APRSDRegexCommandPluginBase):
"""Help Plugin that is always enabled.
This plugin is in this file to prevent a circular import.
"""
command_regex = '^[hH]'
command_name = 'help'
def help(self):
return 'Help: send APRS help or help <plugin>'
def process(self, packet: packets.MessagePacket):
LOG.info('HelpPlugin')
# fromcall = packet.get("from")
message = packet.message_text
# ack = packet.get("msgNo", "0")
a = re.search(r'^.*\s+(.*)', message)
command_name = None
if a is not None:
command_name = a.group(1).lower()
pm = PluginManager()
if command_name and '?' not in command_name:
# user wants help for a specific plugin
reply = None
for p in pm.get_plugins():
if (
p.enabled
and isinstance(p, APRSDRegexCommandPluginBase)
and p.command_name.lower() == command_name
):
reply = p.help()
if reply:
return reply
list = []
for p in pm.get_plugins():
LOG.debug(p)
if p.enabled and isinstance(p, APRSDRegexCommandPluginBase):
name = p.command_name.lower()
if name not in list and 'help' not in name:
list.append(name)
list.sort()
reply = ' '.join(list)
lines = textwrap.wrap(reply, 60)
replies = ["Send APRS MSG of 'help' or 'help <plugin>'"]
for line in lines:
replies.append(f'plugins: {line}')
for entry in replies:
LOG.debug(f'{len(entry)} {entry}')
LOG.debug(f'{replies}')
return replies
class PluginManager:
# The singleton instance object for this class
_instance = None
# the pluggy PluginManager for all Message plugins
_pluggy_pm = None
# the pluggy PluginManager for all WatchList plugins
_watchlist_pm = None
lock = None
def __new__(cls, *args, **kwargs):
"""This magic turns this into a singleton."""
if cls._instance is None:
cls._instance = super().__new__(cls)
# Put any initialization here.
cls._instance.lock = threading.Lock()
cls._instance._init()
return cls._instance
def _init(self):
self._pluggy_pm = pluggy.PluginManager('aprsd')
self._pluggy_pm.add_hookspecs(APRSDPluginSpec)
# For the watchlist plugins
self._watchlist_pm = pluggy.PluginManager('aprsd')
self._watchlist_pm.add_hookspecs(APRSDPluginSpec)
def stats(self, serializable=False) -> dict:
"""Collect and return stats for all plugins."""
def full_name_with_qualname(obj):
return '{}.{}'.format(
obj.__class__.__module__,
obj.__class__.__qualname__,
)
plugin_stats = {}
plugins = self.get_plugins()
if plugins:
for p in plugins:
plugin_stats[full_name_with_qualname(p)] = {
'enabled': p.enabled,
'rx': p.rx_count,
'tx': p.tx_count,
'version': p.version,
}
return plugin_stats
def is_plugin(self, obj):
for c in inspect.getmro(obj):
if issubclass(c, APRSDPluginBase):
return True
return False
def _create_class(
self,
module_class_string,
super_cls: type = None,
**kwargs,
):
"""
Method to create a class from a fqn python string.
:param module_class_string: full name of the class to create an object
:param super_cls: expected super class for validity, None if bypass
:param kwargs: parameters to pass
:return:
"""
module_name = None
class_name = None
try:
module_name, class_name = module_class_string.rsplit('.', 1)
module = importlib.import_module(module_name)
# Commented out because the email thread starts in a different context
# and hence gives a different singleton for the EmailStats
# module = importlib.reload(module)
except Exception as ex:
if not module_name:
LOG.error(f'Failed to load Plugin {module_class_string}')
else:
LOG.error(f"Failed to load Plugin '{module_name}' : '{ex}'")
return
assert hasattr(module, class_name), 'class {} is not in {}'.format(
class_name,
module_name,
)
# click.echo('reading class {} from module {}'.format(
# class_name, module_name))
cls = getattr(module, class_name)
if super_cls is not None:
assert issubclass(cls, super_cls), 'class {} should inherit from {}'.format(
class_name,
super_cls.__name__,
)
# click.echo('initialising {} with params {}'.format(class_name, kwargs))
obj = cls(**kwargs)
return obj
def _load_plugin(self, plugin_name):
"""
Given a python fully qualified class path.name,
Try importing the path, then creating the object,
then registering it as a aprsd Command Plugin
"""
plugin_obj = None
try:
plugin_obj = self._create_class(
plugin_name,
APRSDPluginBase,
)
if plugin_obj:
if isinstance(plugin_obj, APRSDWatchListPluginBase):
if plugin_obj.enabled:
LOG.info(
"Registering WatchList plugin '{}'({})".format(
plugin_name,
plugin_obj.version,
),
)
self._watchlist_pm.register(plugin_obj)
else:
LOG.warning(
f'Plugin {plugin_obj.__class__.__name__} is disabled'
)
elif isinstance(plugin_obj, APRSDRegexCommandPluginBase):
if plugin_obj.enabled:
LOG.info(
"Registering Regex plugin '{}'({}) -- {}".format(
plugin_name,
plugin_obj.version,
plugin_obj.command_regex,
),
)
self._pluggy_pm.register(plugin_obj)
else:
LOG.warning(
f'Plugin {plugin_obj.__class__.__name__} is disabled'
)
elif isinstance(plugin_obj, APRSDPluginBase):
if plugin_obj.enabled:
LOG.info(
"Registering Base plugin '{}'({})".format(
plugin_name,
plugin_obj.version,
),
)
self._pluggy_pm.register(plugin_obj)
else:
LOG.warning(
f'Plugin {plugin_obj.__class__.__name__} is disabled'
)
except Exception as ex:
LOG.error(f"Couldn't load plugin '{plugin_name}'")
LOG.exception(ex)
def reload_plugins(self):
with self.lock:
del self._pluggy_pm
self.setup_plugins(load_help_plugin=CONF.load_help_plugin)
def setup_plugins(
self,
load_help_plugin=True,
plugin_list=None,
):
"""Create the plugin manager and register plugins."""
LOG.info('Loading APRSD Plugins')
# Help plugin is always enabled.
if load_help_plugin:
_help = HelpPlugin()
self._pluggy_pm.register(_help)
# if plugins_list is passed in, only load
# those plugins.
if plugin_list:
for plugin_name in plugin_list:
self._load_plugin(plugin_name)
elif CONF.enabled_plugins:
for p_name in CONF.enabled_plugins:
self._load_plugin(p_name)
else:
# Enabled plugins isn't set, so we default to loading all of
# the core plugins.
for p_name in CORE_MESSAGE_PLUGINS:
self._load_plugin(p_name)
LOG.info('Completed Plugin Loading.')
def run(self, packet: packets.MessagePacket):
"""Execute all the plugins run method."""
# No lock needed here - plugins are loaded at startup and not modified
# during runtime in listen command. Pluggy's hook execution is thread-safe
# for read operations. This prevents lock contention when plugins are slow
# (e.g., MQTT publish queue full scenarios).
return self._pluggy_pm.hook.filter(packet=packet)
def run_watchlist(self, packet: packets.Packet):
# No lock needed here - plugins are loaded at startup and not modified
# during runtime in listen command. Pluggy's hook execution is thread-safe
# for read operations.
return self._watchlist_pm.hook.filter(packet=packet)
def stop(self):
"""Stop all threads created by all plugins."""
with self.lock:
for p in self.get_plugins():
if hasattr(p, 'stop_threads'):
p.stop_threads()
def register_msg(self, obj):
"""Register the plugin."""
with self.lock:
self._pluggy_pm.register(obj)
def get_plugins(self):
plugin_list = []
if self._pluggy_pm:
for plug in self._pluggy_pm.get_plugins():
plugin_list.append(plug)
if self._watchlist_pm:
for plug in self._watchlist_pm.get_plugins():
plugin_list.append(plug)
return plugin_list
def get_watchlist_plugins(self):
pl = []
if self._watchlist_pm:
for plug in self._watchlist_pm.get_plugins():
pl.append(plug)
return pl
def get_message_plugins(self):
pl = []
if self._pluggy_pm:
for plug in self._pluggy_pm.get_plugins():
pl.append(plug)
return pl