From c1c89fd2c2c69c5e6c5d29a736a7b89e3d45cfe2 Mon Sep 17 00:00:00 2001 From: Hemna Date: Fri, 28 Feb 2025 17:16:53 -0500 Subject: [PATCH] Added threads.service This is just a handy wrapper to inject all the threads a service wants to start and then start them all at once, and then join them all. --- aprsd/cmds/server.py | 91 +++++++++++++--------------------------- aprsd/threads/service.py | 42 +++++++++++++++++++ 2 files changed, 70 insertions(+), 63 deletions(-) create mode 100644 aprsd/threads/service.py diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index f4ad975..058fe27 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -12,59 +12,24 @@ from aprsd.client import client_factory from aprsd.main import cli from aprsd.packets import collector as packet_collector from aprsd.packets import seen_list -from aprsd.threads import aprsd as aprsd_threads -from aprsd.threads import keepalive, registry, rx, tx +from aprsd.threads import keepalive, registry, rx, service, tx from aprsd.threads import stats as stats_thread -from aprsd.utils import singleton CONF = cfg.CONF -LOG = logging.getLogger("APRSD") - - -@singleton -class ServerThreads: - """Registry for threads that the server command runs. - - This enables extensions to register a thread to run during - the server command. - - """ - - def __init__(self): - self.threads: list[aprsd_threads.APRSDThread] = [] - - def register(self, thread: aprsd_threads.APRSDThread): - if not isinstance(thread, aprsd_threads.APRSDThread): - raise TypeError(f"Thread {thread} is not an APRSDThread") - self.threads.append(thread) - - def unregister(self, thread: aprsd_threads.APRSDThread): - if not isinstance(thread, aprsd_threads.APRSDThread): - raise TypeError(f"Thread {thread} is not an APRSDThread") - self.threads.remove(thread) - - def start(self): - """Start all threads in the list.""" - for thread in self.threads: - thread.start() - - def join(self): - """Join all the threads in the list""" - for thread in self.threads: - thread.join() +LOG = logging.getLogger('APRSD') # main() ### @cli.command() @cli_helper.add_options(cli_helper.common_options) @click.option( - "-f", - "--flush", - "flush", + '-f', + '--flush', + 'flush', is_flag=True, show_default=True, default=False, - help="Flush out all old aged messages on disk.", + help='Flush out all old aged messages on disk.', ) @click.pass_context @cli_helper.process_standard_options @@ -73,37 +38,37 @@ def server(ctx, flush): signal.signal(signal.SIGINT, aprsd_main.signal_handler) signal.signal(signal.SIGTERM, aprsd_main.signal_handler) - server_threads = ServerThreads() + service_threads = service.ServiceThreads() level, msg = utils._check_version() if level: LOG.warning(msg) else: LOG.info(msg) - LOG.info(f"APRSD Started version: {aprsd.__version__}") + LOG.info(f'APRSD Started version: {aprsd.__version__}') # Initialize the client factory and create # The correct client object ready for use if not client_factory.is_client_enabled(): - LOG.error("No Clients are enabled in config.") + LOG.error('No Clients are enabled in config.') sys.exit(-1) # Make sure we have 1 client transport enabled if not client_factory.is_client_enabled(): - LOG.error("No Clients are enabled in config.") + LOG.error('No Clients are enabled in config.') sys.exit(-1) if not client_factory.is_client_configured(): - LOG.error("APRS client is not properly configured in config file.") + LOG.error('APRS client is not properly configured in config file.') sys.exit(-1) # Creates the client object - LOG.info("Creating client connection") + LOG.info('Creating client connection') aprs_client = client_factory.create() LOG.info(aprs_client) if not aprs_client.login_success: # We failed to login, will just quit! - msg = f"Login Failure: {aprs_client.login_failure}" + msg = f'Login Failure: {aprs_client.login_failure}' LOG.error(msg) print(msg) sys.exit(-1) @@ -114,7 +79,7 @@ def server(ctx, flush): # We register plugins first here so we can register each # plugins config options, so we can dump them all in the # log file output. - LOG.info("Loading Plugin Manager and registering plugins") + LOG.info('Loading Plugin Manager and registering plugins') plugin_manager = plugin.PluginManager() plugin_manager.setup_plugins(load_help_plugin=CONF.load_help_plugin) @@ -122,10 +87,10 @@ def server(ctx, flush): CONF.log_opt_values(LOG, logging.DEBUG) message_plugins = plugin_manager.get_message_plugins() watchlist_plugins = plugin_manager.get_watchlist_plugins() - LOG.info("Message Plugins enabled and running:") + LOG.info('Message Plugins enabled and running:') for p in message_plugins: LOG.info(p) - LOG.info("Watchlist Plugins enabled and running:") + LOG.info('Watchlist Plugins enabled and running:') for p in watchlist_plugins: LOG.info(p) @@ -135,37 +100,37 @@ def server(ctx, flush): # Now load the msgTrack from disk if any if flush: - LOG.debug("Flushing All packet tracking objects.") + LOG.debug('Flushing All packet tracking objects.') packet_collector.PacketCollector().flush() else: # Try and load saved MsgTrack list - LOG.debug("Loading saved packet tracking data.") + LOG.debug('Loading saved packet tracking data.') packet_collector.PacketCollector().load() # Now start all the main processing threads. - server_threads.register(keepalive.KeepAliveThread()) - server_threads.register(stats_thread.APRSDStatsStoreThread()) - server_threads.register( + service_threads.register(keepalive.KeepAliveThread()) + service_threads.register(stats_thread.APRSDStatsStoreThread()) + service_threads.register( rx.APRSDRXThread( packet_queue=threads.packet_queue, ), ) - server_threads.register( + service_threads.register( rx.APRSDPluginProcessPacketThread( packet_queue=threads.packet_queue, ), ) if CONF.enable_beacon: - LOG.info("Beacon Enabled. Starting Beacon thread.") - server_threads.register(tx.BeaconSendThread()) + LOG.info('Beacon Enabled. Starting Beacon thread.') + service_threads.register(tx.BeaconSendThread()) if CONF.aprs_registry.enabled: - LOG.info("Registry Enabled. Starting Registry thread.") - server_threads.register(registry.APRSRegistryThread()) + LOG.info('Registry Enabled. Starting Registry thread.') + service_threads.register(registry.APRSRegistryThread()) - server_threads.start() - server_threads.join() + service_threads.start() + service_threads.join() return 0 diff --git a/aprsd/threads/service.py b/aprsd/threads/service.py new file mode 100644 index 0000000..6971c51 --- /dev/null +++ b/aprsd/threads/service.py @@ -0,0 +1,42 @@ +# aprsd/aprsd/threads/service.py +# +# This module is used to register threads that the service command runs. +# +# The service command is used to start and stop the APRS service. +# This is a mechanism to register threads that the service or command +# needs to run, and then start stop them as needed. + +from aprsd.threads import aprsd as aprsd_threads +from aprsd.utils import singleton + + +@singleton +class ServiceThreads: + """Registry for threads that the service command runs. + + This enables extensions to register a thread to run during + the service command. + """ + + def __init__(self): + self.threads: list[aprsd_threads.APRSDThread] = [] + + def register(self, thread: aprsd_threads.APRSDThread): + if not isinstance(thread, aprsd_threads.APRSDThread): + raise TypeError(f'Thread {thread} is not an APRSDThread') + self.threads.append(thread) + + def unregister(self, thread: aprsd_threads.APRSDThread): + if not isinstance(thread, aprsd_threads.APRSDThread): + raise TypeError(f'Thread {thread} is not an APRSDThread') + self.threads.remove(thread) + + def start(self): + """Start all threads in the list.""" + for thread in self.threads: + thread.start() + + def join(self): + """Join all the threads in the list""" + for thread in self.threads: + thread.join()