1
0
mirror of https://github.com/craigerl/aprsd.git synced 2026-02-13 03:23:44 -05:00

reverse the threaded plugin processing.

Considering how little processing each plugin has, it's
a bit overkill for now to have a threaded processing of plugins.

Also had issues where the help plugin was responding when it shouldn't.
This commit is contained in:
Walter Boring 2026-02-06 17:31:05 -05:00
parent 6968f16cec
commit 3128f24ef7

View File

@ -7,7 +7,6 @@ import logging
import re
import textwrap
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import pluggy
from oslo_config import cfg
@ -515,12 +514,10 @@ class PluginManager:
LOG.info('Completed Plugin Loading.')
def run(self, packet: packets.MessagePacket):
"""Execute all plugins in parallel.
"""Execute all plugins sequentially.
Plugins are executed concurrently using ThreadPoolExecutor to improve
performance, especially when plugins perform I/O operations (API calls,
subprocess calls, etc.). Each plugin's filter() method is called in
parallel, and results are collected as they complete.
Each plugin's filter() method is called in order. Results are
collected and returned.
Returns:
tuple: (results, handled) where:
@ -535,68 +532,48 @@ class PluginManager:
results = []
handled = False
# Execute all plugins in parallel
with ThreadPoolExecutor(max_workers=len(plugins)) as executor:
future_to_plugin = {
executor.submit(plugin.filter, packet=packet): plugin
for plugin in plugins
}
for future in as_completed(future_to_plugin):
plugin = future_to_plugin[future]
try:
result = future.result()
# Track if any plugin processed the message (even if NULL_MESSAGE)
if result is not None:
handled = True
# Only include non-NULL results
if result and result is not packets.NULL_MESSAGE:
results.append(result)
except Exception as ex:
LOG.error(
'Plugin {} failed to process packet: {}'.format(
plugin.__class__.__name__,
ex,
),
)
LOG.exception(ex)
for plugin in plugins:
try:
result = plugin.filter(packet=packet)
# Track if any plugin processed the message (even if NULL_MESSAGE)
if result is not None:
handled = True
# Only include non-NULL results
if result and result is not packets.NULL_MESSAGE:
results.append(result)
except Exception as ex:
LOG.error(
'Plugin {} failed to process packet: {}'.format(
plugin.__class__.__name__,
ex,
),
)
LOG.exception(ex)
return (results, handled)
def run_watchlist(self, packet: packets.Packet):
"""Execute all watchlist plugins in parallel.
Watchlist plugins are executed concurrently using ThreadPoolExecutor
to improve performance when multiple watchlist plugins are registered.
"""
"""Execute all watchlist plugins sequentially."""
plugins = list(self._watchlist_pm.get_plugins())
if not plugins:
return []
results = []
# Execute all plugins in parallel
with ThreadPoolExecutor(max_workers=len(plugins)) as executor:
future_to_plugin = {
executor.submit(plugin.filter, packet=packet): plugin
for plugin in plugins
}
for future in as_completed(future_to_plugin):
plugin = future_to_plugin[future]
try:
result = future.result()
# Only include non-NULL results
if result and result is not packets.NULL_MESSAGE:
results.append(result)
except Exception as ex:
LOG.error(
'Watchlist plugin {} failed to process packet: {}'.format(
plugin.__class__.__name__,
ex,
),
)
LOG.exception(ex)
for plugin in plugins:
try:
result = plugin.filter(packet=packet)
# Only include non-NULL results
if result and result is not packets.NULL_MESSAGE:
results.append(result)
except Exception as ex:
LOG.error(
'Watchlist plugin {} failed to process packet: {}'.format(
plugin.__class__.__name__,
ex,
),
)
LOG.exception(ex)
return results