From 6dcacb5904af35f81027d77154cb81ecd2a82402 Mon Sep 17 00:00:00 2001 From: Walter Boring Date: Sun, 18 Jan 2026 20:39:42 -0500 Subject: [PATCH] Parallel processing of plugins This patch changes how plugins are processed. This patch processes each plugin in a separate thread using a threadpool. This allows us to process packets through plugins in parallel. Previous to this patch we processed plugins serially, which could mean that it takes longer to get a single packet through all of the plugins, as each plugin could be blocking for a while. --- aprsd/plugin.py | 100 ++++++++++++++++++++++++++++++++++++++------ aprsd/threads/rx.py | 18 +++++--- 2 files changed, 100 insertions(+), 18 deletions(-) diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 897cffd..6c9defe 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -7,6 +7,7 @@ import logging import re import textwrap import threading +from concurrent.futures import ThreadPoolExecutor, as_completed import pluggy from oslo_config import cfg @@ -49,6 +50,7 @@ class APRSDPluginSpec: class APRSDPluginBase(metaclass=abc.ABCMeta): """The base class for all APRSD Plugins.""" + _counter_lock = threading.Lock() config = None rx_count = 0 tx_count = 0 @@ -106,10 +108,12 @@ class APRSDPluginBase(metaclass=abc.ABCMeta): return [] def rx_inc(self): - self.rx_count += 1 + with self._counter_lock: + self.rx_count += 1 def tx_inc(self): - self.tx_count += 1 + with self._counter_lock: + self.tx_count += 1 def stop_threads(self): """Stop any threads this plugin might have created.""" @@ -513,18 +517,90 @@ class PluginManager: LOG.info('Completed Plugin Loading.') def run(self, packet: packets.MessagePacket): - """Execute all the plugins run method.""" - # No lock needed here - plugins are loaded at startup and not modified - # during runtime in listen command. Pluggy's hook execution is thread-safe - # for read operations. This prevents lock contention when plugins are slow - # (e.g., MQTT publish queue full scenarios). - return self._pluggy_pm.hook.filter(packet=packet) + """Execute all plugins in parallel. + + Plugins are executed concurrently using ThreadPoolExecutor to improve + performance, especially when plugins perform I/O operations (API calls, + subprocess calls, etc.). Each plugin's filter() method is called in + parallel, and results are collected as they complete. + + Returns: + tuple: (results, handled) where: + - results: list of non-NULL plugin results + - handled: bool indicating if any plugin processed the message + (even if it returned NULL_MESSAGE) + """ + plugins = list(self._pluggy_pm.get_plugins()) + if not plugins: + return ([], False) + + results = [] + handled = False + + # Execute all plugins in parallel + with ThreadPoolExecutor(max_workers=len(plugins)) as executor: + future_to_plugin = { + executor.submit(plugin.filter, packet=packet): plugin + for plugin in plugins + } + + for future in as_completed(future_to_plugin): + plugin = future_to_plugin[future] + try: + result = future.result() + # Track if any plugin processed the message (even if NULL_MESSAGE) + if result is not None: + handled = True + # Only include non-NULL results + if result and result is not packets.NULL_MESSAGE: + results.append(result) + except Exception as ex: + LOG.error( + 'Plugin {} failed to process packet: {}'.format( + plugin.__class__.__name__, + ex, + ), + ) + LOG.exception(ex) + + return (results, handled) def run_watchlist(self, packet: packets.Packet): - # No lock needed here - plugins are loaded at startup and not modified - # during runtime in listen command. Pluggy's hook execution is thread-safe - # for read operations. - return self._watchlist_pm.hook.filter(packet=packet) + """Execute all watchlist plugins in parallel. + + Watchlist plugins are executed concurrently using ThreadPoolExecutor + to improve performance when multiple watchlist plugins are registered. + """ + plugins = list(self._watchlist_pm.get_plugins()) + if not plugins: + return [] + + results = [] + + # Execute all plugins in parallel + with ThreadPoolExecutor(max_workers=len(plugins)) as executor: + future_to_plugin = { + executor.submit(plugin.filter, packet=packet): plugin + for plugin in plugins + } + + for future in as_completed(future_to_plugin): + plugin = future_to_plugin[future] + try: + result = future.result() + # Only include non-NULL results + if result and result is not packets.NULL_MESSAGE: + results.append(result) + except Exception as ex: + LOG.error( + 'Watchlist plugin {} failed to process packet: {}'.format( + plugin.__class__.__name__, + ex, + ), + ) + LOG.exception(ex) + + return results def stop(self): """Stop all threads created by all plugins.""" diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 49967b4..0878502 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -317,12 +317,16 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): pm = plugin.PluginManager() try: - results = pm.run(packet) - replied = False + results, handled = pm.run(packet) + # Check if any plugin replied (results may be unordered due to parallel execution) + replied = any( + result and result is not packets.NULL_MESSAGE for result in results + ) + LOG.debug(f'Replied: {replied}, Handled: {handled}') for reply in results: + LOG.debug(f'Reply: {reply}') if isinstance(reply, list): # one of the plugins wants to send multiple messages - replied = True for subreply in reply: LOG.debug(f"Sending '{subreply}'") if isinstance(subreply, packets.Packet): @@ -338,13 +342,13 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): elif isinstance(reply, packets.Packet): # We have a message based object. tx.send(reply) - replied = True else: - replied = True # A plugin can return a null message flag which signals # us that they processed the message correctly, but have # nothing to reply with, so we avoid replying with a # usage string + # Note: NULL_MESSAGE results are already filtered out + # in PluginManager.run(), so we can safely send this if reply is not packets.NULL_MESSAGE: LOG.debug(f"Sending '{reply}'") tx.send( @@ -357,7 +361,9 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): # If the message was for us and we didn't have a # response, then we send a usage statement. - if to_call == CONF.callsign and not replied: + # Only send "Unknown command!" if no plugin handled the message. + # If a plugin returned NULL_MESSAGE, it handled it and we shouldn't reply. + if to_call == CONF.callsign and not replied and not handled: # Tailor the messages accordingly if CONF.load_help_plugin: LOG.warning('Sending help!')