From c5ca4f11af7b78642fbd730baa8131a07e3c8c0d Mon Sep 17 00:00:00 2001 From: Walter Boring Date: Tue, 10 Feb 2026 18:49:23 -0500 Subject: [PATCH] Added new APRSDPushStatsThread This allows an aprsd server instance to push it's to a remote location. --- aprsd/cmds/listen.py | 51 +------ aprsd/cmds/server.py | 4 + aprsd/conf/common.py | 26 ++++ aprsd/threads/stats.py | 54 +++++++ tests/threads/test_stats.py | 279 +++++++++++++++++++++++++++++++++++- 5 files changed, 364 insertions(+), 50 deletions(-) diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index a43a5e0..aa1980b 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -12,7 +12,6 @@ import sys import time import click -import requests from loguru import logger from oslo_config import cfg from rich.console import Console @@ -29,8 +28,7 @@ from aprsd.packets.filters import dupe_filter, packet_type from aprsd.stats import collector from aprsd.threads import keepalive, rx from aprsd.threads import stats as stats_thread -from aprsd.threads.aprsd import APRSDThread -from aprsd.threads.stats import StatsLogThread +from aprsd.threads.stats import APRSDPushStatsThread, StatsLogThread # setup the global logger # log.basicConfig(level=log.DEBUG) # level=10 @@ -86,51 +84,6 @@ class APRSDListenProcessThread(rx.APRSDFilterThread): self.plugin_manager.run(packet) -class StatsExportThread(APRSDThread): - """Export stats to remote aprsd-exporter API.""" - - def __init__(self, exporter_url): - super().__init__('StatsExport') - self.exporter_url = exporter_url - self.period = 10 # Export stats every 60 seconds - - def loop(self): - if self.loop_count % self.period == 0: - try: - # Collect all stats - stats_json = collector.Collector().collect(serializable=True) - # Remove the PacketList section to reduce payload size - if 'PacketList' in stats_json: - del stats_json['PacketList']['packets'] - - now = datetime.datetime.now() - time_format = '%m-%d-%Y %H:%M:%S' - stats = { - 'time': now.strftime(time_format), - 'stats': stats_json, - } - - # Send stats to exporter API - url = f'{self.exporter_url}/stats' - headers = {'Content-Type': 'application/json'} - response = requests.post(url, json=stats, headers=headers, timeout=10) - - if response.status_code == 200: - LOGU.info(f'Successfully exported stats to {self.exporter_url}') - else: - LOGU.warning( - f'Failed to export stats to {self.exporter_url}: HTTP {response.status_code}' - ) - - except requests.exceptions.RequestException as e: - LOGU.error(f'Error exporting stats to {self.exporter_url}: {e}') - except Exception as e: - LOGU.error(f'Unexpected error in stats export: {e}') - - time.sleep(1) - return True - - @cli.command() @cli_helper.add_options(cli_helper.common_options) @click.option( @@ -356,7 +309,7 @@ def listen( stats_export = None if export_stats: LOG.debug('Start StatsExportThread') - stats_export = StatsExportThread(exporter_url) + stats_export = APRSDPushStatsThread(push_url=exporter_url) stats_export.start() keepalive_thread.start() diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index e6bf54b..6d79270 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -168,6 +168,10 @@ def server(ctx, flush, enable_packet_stats): LOG.info('Beacon Enabled. Starting static Beacon thread.') service_threads.register(tx.BeaconSendThread()) + if CONF.push_stats.enabled: + LOG.info('Push Stats Enabled. Starting Push Stats thread.') + service_threads.register(stats_thread.APRSDPushStatsThread()) + if CONF.aprs_registry.enabled: LOG.info('Registry Enabled. Starting Registry thread.') service_threads.register(registry.APRSRegistryThread()) diff --git a/aprsd/conf/common.py b/aprsd/conf/common.py index c50bc3d..f51eb22 100644 --- a/aprsd/conf/common.py +++ b/aprsd/conf/common.py @@ -16,6 +16,11 @@ registry_group = cfg.OptGroup( title='APRS Registry settings', ) +push_stats_group = cfg.OptGroup( + name='push_stats', + title='Push local stats to a remote API', +) + aprsd_opts = [ cfg.StrOpt( 'callsign', @@ -180,6 +185,24 @@ watch_list_opts = [ ), ] +push_stats_opts = [ + cfg.BoolOpt( + 'enabled', + default=False, + help='Enable pushing local stats to a remote API.', + ), + cfg.StrOpt( + 'push_url', + default=None, + help='The URL of the remote API to push the stats to. This should be the base URL of the API.' + 'APRSD Will make a POST request to this url endpoint.', + ), + cfg.IntOpt( + 'frequency_seconds', + default=15, + help='The frequency in seconds to push the stats to the remote API.', + ), +] enabled_plugins_opts = [ cfg.ListOpt( @@ -240,6 +263,8 @@ def register_opts(config): config.register_opts(watch_list_opts, group=watch_list_group) config.register_group(registry_group) config.register_opts(registry_opts, group=registry_group) + config.register_group(push_stats_group) + config.register_opts(push_stats_opts, group=push_stats_group) def list_opts(): @@ -247,4 +272,5 @@ def list_opts(): 'DEFAULT': (aprsd_opts + enabled_plugins_opts), watch_list_group.name: watch_list_opts, registry_group.name: registry_opts, + push_stats_group.name: push_stats_opts, } diff --git a/aprsd/threads/stats.py b/aprsd/threads/stats.py index 290d3e3..e2d3f5e 100644 --- a/aprsd/threads/stats.py +++ b/aprsd/threads/stats.py @@ -1,7 +1,9 @@ +import datetime import logging import threading import time +import requests from loguru import logger from oslo_config import cfg @@ -46,6 +48,58 @@ class APRSDStatsStoreThread(APRSDThread): return True +class APRSDPushStatsThread(APRSDThread): + """Push the local stats to a remote API.""" + + def __init__( + self, push_url=None, frequency_seconds=None, send_packetlist: bool = False + ): + super().__init__('PushStats') + self.push_url = push_url if push_url else CONF.push_stats.push_url + self.period = ( + frequency_seconds + if frequency_seconds + else CONF.push_stats.frequency_seconds + ) + self.send_packetlist = send_packetlist + + def loop(self): + if self.loop_count % self.period == 0: + stats_json = collector.Collector().collect(serializable=True) + url = f'{self.push_url}/stats' + headers = {'Content-Type': 'application/json'} + # Remove the PacketList section to reduce payload size + if not self.send_packetlist: + if 'PacketList' in stats_json: + del stats_json['PacketList']['packets'] + + now = datetime.datetime.now() + time_format = '%m-%d-%Y %H:%M:%S' + stats = { + 'time': now.strftime(time_format), + 'stats': stats_json, + } + + try: + response = requests.post(url, json=stats, headers=headers, timeout=5) + response.raise_for_status() + + if response.status_code == 200: + LOGU.info(f'Successfully pushed stats to {self.push_url}') + else: + LOGU.warning( + f'Failed to push stats to {self.push_url}: HTTP {response.status_code}' + ) + + except requests.exceptions.RequestException as e: + LOGU.error(f'Error pushing stats to {self.push_url}: {e}') + except Exception as e: + LOGU.error(f'Unexpected error in stats push: {e}') + + time.sleep(1) + return True + + class StatsLogThread(APRSDThread): """Log the stats from the PacketList.""" diff --git a/tests/threads/test_stats.py b/tests/threads/test_stats.py index 7b3e097..0bffa8a 100644 --- a/tests/threads/test_stats.py +++ b/tests/threads/test_stats.py @@ -1,8 +1,14 @@ import unittest from unittest import mock +import requests + from aprsd.stats import collector -from aprsd.threads.stats import APRSDStatsStoreThread, StatsStore +from aprsd.threads.stats import ( + APRSDPushStatsThread, + APRSDStatsStoreThread, + StatsStore, +) class TestStatsStore(unittest.TestCase): @@ -145,5 +151,276 @@ class TestAPRSDStatsStoreThread(unittest.TestCase): # since the increment happens in the parent run() method, not in loop() +class TestAPRSDPushStatsThread(unittest.TestCase): + """Unit tests for the APRSDPushStatsThread class.""" + + def test_init_with_explicit_args(self): + """Test initialization with explicit push_url, frequency, and send_packetlist.""" + thread = APRSDPushStatsThread( + push_url='https://example.com/api', + frequency_seconds=30, + send_packetlist=True, + ) + self.assertEqual(thread.name, 'PushStats') + self.assertEqual(thread.push_url, 'https://example.com/api') + self.assertEqual(thread.period, 30) + self.assertTrue(thread.send_packetlist) + self.assertTrue(hasattr(thread, 'loop_count')) + + def test_init_uses_conf_when_args_not_passed(self): + """Test initialization uses CONF.push_stats when args omitted.""" + with mock.patch('aprsd.threads.stats.CONF') as mock_conf: + mock_conf.push_stats.push_url = 'https://conf.example.com' + mock_conf.push_stats.frequency_seconds = 15 + thread = APRSDPushStatsThread() + self.assertEqual(thread.push_url, 'https://conf.example.com') + self.assertEqual(thread.period, 15) + self.assertFalse(thread.send_packetlist) + + def test_loop_skips_push_when_period_not_reached(self): + """Test loop does not POST when loop_count not divisible by period.""" + thread = APRSDPushStatsThread( + push_url='https://example.com', + frequency_seconds=10, + ) + thread.loop_count = 3 # 3 % 10 != 0 + + with ( + mock.patch('aprsd.threads.stats.collector.Collector') as mock_collector, + mock.patch('aprsd.threads.stats.requests.post') as mock_post, + mock.patch('aprsd.threads.stats.time.sleep'), + ): + result = thread.loop() + + self.assertTrue(result) + mock_collector.return_value.collect.assert_not_called() + mock_post.assert_not_called() + + def test_loop_pushes_stats_and_removes_packetlist_by_default(self): + """Test loop collects stats, POSTs to url/stats, and strips PacketList.packets.""" + thread = APRSDPushStatsThread( + push_url='https://example.com', + frequency_seconds=10, + send_packetlist=False, + ) + thread.loop_count = 10 + + collected = { + 'PacketList': {'packets': [1, 2, 3], 'rx': 5, 'tx': 1}, + 'Other': 'data', + } + + with ( + mock.patch( + 'aprsd.threads.stats.collector.Collector' + ) as mock_collector_class, + mock.patch('aprsd.threads.stats.requests.post') as mock_post, + mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch('aprsd.threads.stats.datetime') as mock_dt, + ): + mock_collector_class.return_value.collect.return_value = collected + mock_dt.datetime.now.return_value.strftime.return_value = ( + '01-01-2025 12:00:00' + ) + + result = thread.loop() + + self.assertTrue(result) + mock_collector_class.return_value.collect.assert_called_once_with( + serializable=True + ) + mock_post.assert_called_once() + call_args = mock_post.call_args + self.assertEqual(call_args[0][0], 'https://example.com/stats') + self.assertEqual(call_args[1]['headers'], {'Content-Type': 'application/json'}) + self.assertEqual(call_args[1]['timeout'], 5) + body = call_args[1]['json'] + self.assertEqual(body['time'], '01-01-2025 12:00:00') + self.assertNotIn('packets', body['stats']['PacketList']) + self.assertEqual(body['stats']['PacketList']['rx'], 5) + self.assertEqual(body['stats']['Other'], 'data') + + def test_loop_pushes_stats_with_packetlist_when_send_packetlist_true(self): + """Test loop includes PacketList.packets when send_packetlist is True.""" + thread = APRSDPushStatsThread( + push_url='https://example.com', + frequency_seconds=10, + send_packetlist=True, + ) + thread.loop_count = 10 + + collected = {'PacketList': {'packets': [1, 2, 3], 'rx': 5}} + + with ( + mock.patch( + 'aprsd.threads.stats.collector.Collector' + ) as mock_collector_class, + mock.patch('aprsd.threads.stats.requests.post') as mock_post, + mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch('aprsd.threads.stats.datetime') as mock_dt, + ): + mock_collector_class.return_value.collect.return_value = collected + mock_dt.datetime.now.return_value.strftime.return_value = ( + '01-01-2025 12:00:00' + ) + + result = thread.loop() + + self.assertTrue(result) + body = mock_post.call_args[1]['json'] + self.assertEqual(body['stats']['PacketList']['packets'], [1, 2, 3]) + + def test_loop_on_http_200_logs_success(self): + """Test loop logs info on successful 200 response.""" + thread = APRSDPushStatsThread( + push_url='https://example.com', + frequency_seconds=10, + ) + thread.loop_count = 10 + + with ( + mock.patch( + 'aprsd.threads.stats.collector.Collector' + ) as mock_collector_class, + mock.patch('aprsd.threads.stats.requests.post') as mock_post, + mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch('aprsd.threads.stats.datetime') as mock_dt, + mock.patch('aprsd.threads.stats.LOGU') as mock_logu, + ): + mock_collector_class.return_value.collect.return_value = {} + mock_dt.datetime.now.return_value.strftime.return_value = ( + '01-01-2025 12:00:00' + ) + mock_post.return_value.status_code = 200 + mock_post.return_value.raise_for_status = mock.Mock() + + result = thread.loop() + + self.assertTrue(result) + mock_logu.info.assert_called() + self.assertIn('Successfully pushed stats', mock_logu.info.call_args[0][0]) + + def test_loop_on_non_200_logs_warning(self): + """Test loop logs warning when response is not 200.""" + thread = APRSDPushStatsThread( + push_url='https://example.com', + frequency_seconds=10, + ) + thread.loop_count = 10 + + with ( + mock.patch( + 'aprsd.threads.stats.collector.Collector' + ) as mock_collector_class, + mock.patch('aprsd.threads.stats.requests.post') as mock_post, + mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch('aprsd.threads.stats.datetime') as mock_dt, + mock.patch('aprsd.threads.stats.LOGU') as mock_logu, + ): + mock_collector_class.return_value.collect.return_value = {} + mock_dt.datetime.now.return_value.strftime.return_value = ( + '01-01-2025 12:00:00' + ) + mock_post.return_value.status_code = 500 + mock_post.return_value.raise_for_status = mock.Mock() + + result = thread.loop() + + self.assertTrue(result) + mock_logu.warning.assert_called_once() + self.assertIn('Failed to push stats', mock_logu.warning.call_args[0][0]) + self.assertIn('500', mock_logu.warning.call_args[0][0]) + + def test_loop_on_request_exception_logs_error_and_continues(self): + """Test loop logs error on requests.RequestException and returns True.""" + thread = APRSDPushStatsThread( + push_url='https://example.com', + frequency_seconds=10, + ) + thread.loop_count = 10 + + with ( + mock.patch( + 'aprsd.threads.stats.collector.Collector' + ) as mock_collector_class, + mock.patch('aprsd.threads.stats.requests.post') as mock_post, + mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch('aprsd.threads.stats.datetime') as mock_dt, + mock.patch('aprsd.threads.stats.LOGU') as mock_logu, + ): + mock_collector_class.return_value.collect.return_value = {} + mock_dt.datetime.now.return_value.strftime.return_value = ( + '01-01-2025 12:00:00' + ) + mock_post.side_effect = requests.exceptions.ConnectionError( + 'Connection refused' + ) + + result = thread.loop() + + self.assertTrue(result) + mock_logu.error.assert_called_once() + self.assertIn('Error pushing stats', mock_logu.error.call_args[0][0]) + + def test_loop_on_other_exception_logs_error_and_continues(self): + """Test loop logs error on unexpected exception and returns True.""" + thread = APRSDPushStatsThread( + push_url='https://example.com', + frequency_seconds=10, + ) + thread.loop_count = 10 + + with ( + mock.patch( + 'aprsd.threads.stats.collector.Collector' + ) as mock_collector_class, + mock.patch('aprsd.threads.stats.requests.post') as mock_post, + mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch('aprsd.threads.stats.datetime') as mock_dt, + mock.patch('aprsd.threads.stats.LOGU') as mock_logu, + ): + mock_collector_class.return_value.collect.return_value = {} + mock_dt.datetime.now.return_value.strftime.return_value = ( + '01-01-2025 12:00:00' + ) + mock_post.side_effect = ValueError('unexpected') + + result = thread.loop() + + self.assertTrue(result) + mock_logu.error.assert_called_once() + self.assertIn('Unexpected error in stats push', mock_logu.error.call_args[0][0]) + + def test_loop_no_packetlist_key_in_stats(self): + """Test loop does not fail when stats have no PacketList key.""" + thread = APRSDPushStatsThread( + push_url='https://example.com', + frequency_seconds=10, + send_packetlist=False, + ) + thread.loop_count = 10 + + collected = {'Only': 'data', 'No': 'PacketList'} + + with ( + mock.patch( + 'aprsd.threads.stats.collector.Collector' + ) as mock_collector_class, + mock.patch('aprsd.threads.stats.requests.post') as mock_post, + mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch('aprsd.threads.stats.datetime') as mock_dt, + ): + mock_collector_class.return_value.collect.return_value = collected + mock_dt.datetime.now.return_value.strftime.return_value = ( + '01-01-2025 12:00:00' + ) + + result = thread.loop() + + self.assertTrue(result) + body = mock_post.call_args[1]['json'] + self.assertEqual(body['stats'], collected) + + if __name__ == '__main__': unittest.main()