From 3128f24ef7c60866b1b696be58d947fec52a7390 Mon Sep 17 00:00:00 2001 From: Walter Boring Date: Fri, 6 Feb 2026 17:31:05 -0500 Subject: [PATCH] reverse the threaded plugin processing. Considering how little processing each plugin has, it's a bit overkill for now to have a threaded processing of plugins. Also had issues where the help plugin was responding when it shouldn't. --- aprsd/plugin.py | 93 +++++++++++++++++++------------------------------ 1 file changed, 35 insertions(+), 58 deletions(-) diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 499bbe5..1e75e28 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -7,7 +7,6 @@ import logging import re import textwrap import threading -from concurrent.futures import ThreadPoolExecutor, as_completed import pluggy from oslo_config import cfg @@ -515,12 +514,10 @@ class PluginManager: LOG.info('Completed Plugin Loading.') def run(self, packet: packets.MessagePacket): - """Execute all plugins in parallel. + """Execute all plugins sequentially. - Plugins are executed concurrently using ThreadPoolExecutor to improve - performance, especially when plugins perform I/O operations (API calls, - subprocess calls, etc.). Each plugin's filter() method is called in - parallel, and results are collected as they complete. + Each plugin's filter() method is called in order. Results are + collected and returned. Returns: tuple: (results, handled) where: @@ -535,68 +532,48 @@ class PluginManager: results = [] handled = False - # Execute all plugins in parallel - with ThreadPoolExecutor(max_workers=len(plugins)) as executor: - future_to_plugin = { - executor.submit(plugin.filter, packet=packet): plugin - for plugin in plugins - } - - for future in as_completed(future_to_plugin): - plugin = future_to_plugin[future] - try: - result = future.result() - # Track if any plugin processed the message (even if NULL_MESSAGE) - if result is not None: - handled = True - # Only include non-NULL results - if result and result is not packets.NULL_MESSAGE: - results.append(result) - except Exception as ex: - LOG.error( - 'Plugin {} failed to process packet: {}'.format( - plugin.__class__.__name__, - ex, - ), - ) - LOG.exception(ex) + for plugin in plugins: + try: + result = plugin.filter(packet=packet) + # Track if any plugin processed the message (even if NULL_MESSAGE) + if result is not None: + handled = True + # Only include non-NULL results + if result and result is not packets.NULL_MESSAGE: + results.append(result) + except Exception as ex: + LOG.error( + 'Plugin {} failed to process packet: {}'.format( + plugin.__class__.__name__, + ex, + ), + ) + LOG.exception(ex) return (results, handled) def run_watchlist(self, packet: packets.Packet): - """Execute all watchlist plugins in parallel. - - Watchlist plugins are executed concurrently using ThreadPoolExecutor - to improve performance when multiple watchlist plugins are registered. - """ + """Execute all watchlist plugins sequentially.""" plugins = list(self._watchlist_pm.get_plugins()) if not plugins: return [] results = [] - # Execute all plugins in parallel - with ThreadPoolExecutor(max_workers=len(plugins)) as executor: - future_to_plugin = { - executor.submit(plugin.filter, packet=packet): plugin - for plugin in plugins - } - - for future in as_completed(future_to_plugin): - plugin = future_to_plugin[future] - try: - result = future.result() - # Only include non-NULL results - if result and result is not packets.NULL_MESSAGE: - results.append(result) - except Exception as ex: - LOG.error( - 'Watchlist plugin {} failed to process packet: {}'.format( - plugin.__class__.__name__, - ex, - ), - ) - LOG.exception(ex) + for plugin in plugins: + try: + result = plugin.filter(packet=packet) + # Only include non-NULL results + if result and result is not packets.NULL_MESSAGE: + results.append(result) + except Exception as ex: + LOG.error( + 'Watchlist plugin {} failed to process packet: {}'.format( + plugin.__class__.__name__, + ex, + ), + ) + LOG.exception(ex) return results