1
0
mirror of https://github.com/craigerl/aprsd.git synced 2026-01-21 04:55:38 -05:00

Parallel processing of plugins

This patch changes how plugins are processed: each plugin now runs in
a separate thread using a thread pool, so a packet can be processed
through all plugins in parallel. Previously, plugins were processed
serially, which could mean it took longer to get a single packet
through all of the plugins, since any one plugin could block for a
while.
This commit is contained in:
Walter Boring 2026-01-18 20:39:42 -05:00
parent 2a8b7002f2
commit 6dcacb5904
2 changed files with 100 additions and 18 deletions

View File

@@ -7,6 +7,7 @@ import logging
import re
import textwrap
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import pluggy
from oslo_config import cfg
@@ -49,6 +50,7 @@ class APRSDPluginSpec:
class APRSDPluginBase(metaclass=abc.ABCMeta):
"""The base class for all APRSD Plugins."""
_counter_lock = threading.Lock()
config = None
rx_count = 0
tx_count = 0
@@ -106,10 +108,12 @@ class APRSDPluginBase(metaclass=abc.ABCMeta):
return []
def rx_inc(self):
self.rx_count += 1
with self._counter_lock:
self.rx_count += 1
def tx_inc(self):
self.tx_count += 1
with self._counter_lock:
self.tx_count += 1
def stop_threads(self):
"""Stop any threads this plugin might have created."""
@@ -513,18 +517,90 @@ class PluginManager:
LOG.info('Completed Plugin Loading.')
def run(self, packet: packets.MessagePacket):
"""Execute all the plugins run method."""
# No lock needed here - plugins are loaded at startup and not modified
# during runtime in listen command. Pluggy's hook execution is thread-safe
# for read operations. This prevents lock contention when plugins are slow
# (e.g., MQTT publish queue full scenarios).
return self._pluggy_pm.hook.filter(packet=packet)
"""Execute all plugins in parallel.
Plugins are executed concurrently using ThreadPoolExecutor to improve
performance, especially when plugins perform I/O operations (API calls,
subprocess calls, etc.). Each plugin's filter() method is called in
parallel, and results are collected as they complete.
Returns:
tuple: (results, handled) where:
- results: list of non-NULL plugin results
- handled: bool indicating if any plugin processed the message
(even if it returned NULL_MESSAGE)
"""
plugins = list(self._pluggy_pm.get_plugins())
if not plugins:
return ([], False)
results = []
handled = False
# Execute all plugins in parallel
with ThreadPoolExecutor(max_workers=len(plugins)) as executor:
future_to_plugin = {
executor.submit(plugin.filter, packet=packet): plugin
for plugin in plugins
}
for future in as_completed(future_to_plugin):
plugin = future_to_plugin[future]
try:
result = future.result()
# Track if any plugin processed the message (even if NULL_MESSAGE)
if result is not None:
handled = True
# Only include non-NULL results
if result and result is not packets.NULL_MESSAGE:
results.append(result)
except Exception as ex:
LOG.error(
'Plugin {} failed to process packet: {}'.format(
plugin.__class__.__name__,
ex,
),
)
LOG.exception(ex)
return (results, handled)
def run_watchlist(self, packet: packets.Packet):
# No lock needed here - plugins are loaded at startup and not modified
# during runtime in listen command. Pluggy's hook execution is thread-safe
# for read operations.
return self._watchlist_pm.hook.filter(packet=packet)
"""Execute all watchlist plugins in parallel.
Watchlist plugins are executed concurrently using ThreadPoolExecutor
to improve performance when multiple watchlist plugins are registered.
"""
plugins = list(self._watchlist_pm.get_plugins())
if not plugins:
return []
results = []
# Execute all plugins in parallel
with ThreadPoolExecutor(max_workers=len(plugins)) as executor:
future_to_plugin = {
executor.submit(plugin.filter, packet=packet): plugin
for plugin in plugins
}
for future in as_completed(future_to_plugin):
plugin = future_to_plugin[future]
try:
result = future.result()
# Only include non-NULL results
if result and result is not packets.NULL_MESSAGE:
results.append(result)
except Exception as ex:
LOG.error(
'Watchlist plugin {} failed to process packet: {}'.format(
plugin.__class__.__name__,
ex,
),
)
LOG.exception(ex)
return results
def stop(self):
"""Stop all threads created by all plugins."""

View File

@@ -317,12 +317,16 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
pm = plugin.PluginManager()
try:
results = pm.run(packet)
replied = False
results, handled = pm.run(packet)
# Check if any plugin replied (results may be unordered due to parallel execution)
replied = any(
result and result is not packets.NULL_MESSAGE for result in results
)
LOG.debug(f'Replied: {replied}, Handled: {handled}')
for reply in results:
LOG.debug(f'Reply: {reply}')
if isinstance(reply, list):
# one of the plugins wants to send multiple messages
replied = True
for subreply in reply:
LOG.debug(f"Sending '{subreply}'")
if isinstance(subreply, packets.Packet):
@@ -338,13 +342,13 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
elif isinstance(reply, packets.Packet):
# We have a message based object.
tx.send(reply)
replied = True
else:
replied = True
# A plugin can return a null message flag which signals
# us that they processed the message correctly, but have
# nothing to reply with, so we avoid replying with a
# usage string
# Note: NULL_MESSAGE results are already filtered out
# in PluginManager.run(), so we can safely send this
if reply is not packets.NULL_MESSAGE:
LOG.debug(f"Sending '{reply}'")
tx.send(
@@ -357,7 +361,9 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
# If the message was for us and we didn't have a
# response, then we send a usage statement.
if to_call == CONF.callsign and not replied:
# Only send "Unknown command!" if no plugin handled the message.
# If a plugin returned NULL_MESSAGE, it handled it and we shouldn't reply.
if to_call == CONF.callsign and not replied and not handled:
# Tailor the messages accordingly
if CONF.load_help_plugin:
LOG.warning('Sending help!')