mirror of https://github.com/craigerl/aprsd.git
Added new APRSDPushStatsThread
This allows an aprsd server instance to push its stats to a remote location.
parent 008fe3c83e
commit c5ca4f11af
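Concretely, the new thread issues a plain HTTP POST to <push_url>/stats every frequency_seconds. A minimal sketch of the equivalent one-off request, outside of aprsd (the URL and timestamp are made-up example values, and something must be listening at that URL for the call to succeed):

import requests

# Hypothetical one-off version of the push that APRSDPushStatsThread performs
# each period; the URL and timestamp are example values, and the 'stats' key
# carries the serialized collector output (empty here for brevity).
requests.post(
    'http://stats.example.org:8080/stats',
    json={'time': '01-01-2025 12:00:00', 'stats': {}},
    headers={'Content-Type': 'application/json'},
    timeout=5,
)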
@@ -12,7 +12,6 @@ import sys
import time

import click
import requests
from loguru import logger
from oslo_config import cfg
from rich.console import Console
@@ -29,8 +28,7 @@ from aprsd.packets.filters import dupe_filter, packet_type
from aprsd.stats import collector
from aprsd.threads import keepalive, rx
from aprsd.threads import stats as stats_thread
from aprsd.threads.aprsd import APRSDThread
from aprsd.threads.stats import StatsLogThread
from aprsd.threads.stats import APRSDPushStatsThread, StatsLogThread

# setup the global logger
# log.basicConfig(level=log.DEBUG) # level=10
@@ -86,51 +84,6 @@ class APRSDListenProcessThread(rx.APRSDFilterThread):
        self.plugin_manager.run(packet)


class StatsExportThread(APRSDThread):
    """Export stats to remote aprsd-exporter API."""

    def __init__(self, exporter_url):
        super().__init__('StatsExport')
        self.exporter_url = exporter_url
        self.period = 10  # Export stats every 60 seconds

    def loop(self):
        if self.loop_count % self.period == 0:
            try:
                # Collect all stats
                stats_json = collector.Collector().collect(serializable=True)
                # Remove the PacketList section to reduce payload size
                if 'PacketList' in stats_json:
                    del stats_json['PacketList']['packets']

                now = datetime.datetime.now()
                time_format = '%m-%d-%Y %H:%M:%S'
                stats = {
                    'time': now.strftime(time_format),
                    'stats': stats_json,
                }

                # Send stats to exporter API
                url = f'{self.exporter_url}/stats'
                headers = {'Content-Type': 'application/json'}
                response = requests.post(url, json=stats, headers=headers, timeout=10)

                if response.status_code == 200:
                    LOGU.info(f'Successfully exported stats to {self.exporter_url}')
                else:
                    LOGU.warning(
                        f'Failed to export stats to {self.exporter_url}: HTTP {response.status_code}'
                    )

            except requests.exceptions.RequestException as e:
                LOGU.error(f'Error exporting stats to {self.exporter_url}: {e}')
            except Exception as e:
                LOGU.error(f'Unexpected error in stats export: {e}')

        time.sleep(1)
        return True


@cli.command()
@cli_helper.add_options(cli_helper.common_options)
@click.option(
@@ -356,7 +309,7 @@ def listen(
    stats_export = None
    if export_stats:
        LOG.debug('Start StatsExportThread')
        stats_export = StatsExportThread(exporter_url)
        stats_export = APRSDPushStatsThread(push_url=exporter_url)
        stats_export.start()

    keepalive_thread.start()
@@ -168,6 +168,10 @@ def server(ctx, flush, enable_packet_stats):
        LOG.info('Beacon Enabled. Starting static Beacon thread.')
        service_threads.register(tx.BeaconSendThread())

    if CONF.push_stats.enabled:
        LOG.info('Push Stats Enabled. Starting Push Stats thread.')
        service_threads.register(stats_thread.APRSDPushStatsThread())

    if CONF.aprs_registry.enabled:
        LOG.info('Registry Enabled. Starting Registry thread.')
        service_threads.register(registry.APRSRegistryThread())
@@ -16,6 +16,11 @@ registry_group = cfg.OptGroup(
    title='APRS Registry settings',
)

push_stats_group = cfg.OptGroup(
    name='push_stats',
    title='Push local stats to a remote API',
)

aprsd_opts = [
    cfg.StrOpt(
        'callsign',
@@ -180,6 +185,24 @@ watch_list_opts = [
    ),
]

push_stats_opts = [
    cfg.BoolOpt(
        'enabled',
        default=False,
        help='Enable pushing local stats to a remote API.',
    ),
    cfg.StrOpt(
        'push_url',
        default=None,
        help='The URL of the remote API to push the stats to. This should be the base URL of the API.'
        ' APRSD will make a POST request to this URL endpoint.',
    ),
    cfg.IntOpt(
        'frequency_seconds',
        default=15,
        help='The frequency in seconds to push the stats to the remote API.',
    ),
]

enabled_plugins_opts = [
    cfg.ListOpt(
@@ -240,6 +263,8 @@ def register_opts(config):
    config.register_opts(watch_list_opts, group=watch_list_group)
    config.register_group(registry_group)
    config.register_opts(registry_opts, group=registry_group)
    config.register_group(push_stats_group)
    config.register_opts(push_stats_opts, group=push_stats_group)


def list_opts():
@@ -247,4 +272,5 @@ def list_opts():
        'DEFAULT': (aprsd_opts + enabled_plugins_opts),
        watch_list_group.name: watch_list_opts,
        registry_group.name: registry_opts,
        push_stats_group.name: push_stats_opts,
    }
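Taken together, these options mean an operator enables the push with a [push_stats] section in the aprsd config file. A minimal example entry, with the section and option names taken from the diff above and an illustrative URL:

[push_stats]
enabled = True
push_url = http://stats.example.org:8080
frequency_seconds = 15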
@@ -1,7 +1,9 @@
import datetime
import logging
import threading
import time

import requests
from loguru import logger
from oslo_config import cfg
@@ -46,6 +48,58 @@ class APRSDStatsStoreThread(APRSDThread):
        return True


class APRSDPushStatsThread(APRSDThread):
    """Push the local stats to a remote API."""

    def __init__(
        self, push_url=None, frequency_seconds=None, send_packetlist: bool = False
    ):
        super().__init__('PushStats')
        self.push_url = push_url if push_url else CONF.push_stats.push_url
        self.period = (
            frequency_seconds
            if frequency_seconds
            else CONF.push_stats.frequency_seconds
        )
        self.send_packetlist = send_packetlist

    def loop(self):
        if self.loop_count % self.period == 0:
            stats_json = collector.Collector().collect(serializable=True)
            url = f'{self.push_url}/stats'
            headers = {'Content-Type': 'application/json'}
            # Remove the PacketList section to reduce payload size
            if not self.send_packetlist:
                if 'PacketList' in stats_json:
                    del stats_json['PacketList']['packets']

            now = datetime.datetime.now()
            time_format = '%m-%d-%Y %H:%M:%S'
            stats = {
                'time': now.strftime(time_format),
                'stats': stats_json,
            }

            try:
                response = requests.post(url, json=stats, headers=headers, timeout=5)
                response.raise_for_status()

                if response.status_code == 200:
                    LOGU.info(f'Successfully pushed stats to {self.push_url}')
                else:
                    LOGU.warning(
                        f'Failed to push stats to {self.push_url}: HTTP {response.status_code}'
                    )

            except requests.exceptions.RequestException as e:
                LOGU.error(f'Error pushing stats to {self.push_url}: {e}')
            except Exception as e:
                LOGU.error(f'Unexpected error in stats push: {e}')

        time.sleep(1)
        return True


class StatsLogThread(APRSDThread):
    """Log the stats from the PacketList."""
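Since the remote side only needs to accept a JSON POST at <push_url>/stats, a throwaway receiver for local testing could be as small as the following stdlib sketch. It is not part of aprsd; the host and port are arbitrary example values.

import json
from http.server import BaseHTTPRequestHandler, HTTPServer


class StatsHandler(BaseHTTPRequestHandler):
    """Hypothetical receiver for the payload APRSDPushStatsThread POSTs."""

    def do_POST(self):
        if self.path != '/stats':
            self.send_error(404)
            return
        length = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(length))
        # payload['time'] is a '%m-%d-%Y %H:%M:%S' string and
        # payload['stats'] is the serialized collector output.
        print(payload.get('time'), list(payload.get('stats', {}).keys()))
        self.send_response(200)
        self.end_headers()


if __name__ == '__main__':
    # Point [push_stats] push_url at http://<this-host>:8080 to see the data arrive.
    HTTPServer(('0.0.0.0', 8080), StatsHandler).serve_forever()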
@@ -1,8 +1,14 @@
import unittest
from unittest import mock

import requests

from aprsd.stats import collector
from aprsd.threads.stats import APRSDStatsStoreThread, StatsStore
from aprsd.threads.stats import (
    APRSDPushStatsThread,
    APRSDStatsStoreThread,
    StatsStore,
)


class TestStatsStore(unittest.TestCase):
@@ -145,5 +151,276 @@ class TestAPRSDStatsStoreThread(unittest.TestCase):
        # since the increment happens in the parent run() method, not in loop()


class TestAPRSDPushStatsThread(unittest.TestCase):
    """Unit tests for the APRSDPushStatsThread class."""

    def test_init_with_explicit_args(self):
        """Test initialization with explicit push_url, frequency, and send_packetlist."""
        thread = APRSDPushStatsThread(
            push_url='https://example.com/api',
            frequency_seconds=30,
            send_packetlist=True,
        )
        self.assertEqual(thread.name, 'PushStats')
        self.assertEqual(thread.push_url, 'https://example.com/api')
        self.assertEqual(thread.period, 30)
        self.assertTrue(thread.send_packetlist)
        self.assertTrue(hasattr(thread, 'loop_count'))

    def test_init_uses_conf_when_args_not_passed(self):
        """Test initialization uses CONF.push_stats when args omitted."""
        with mock.patch('aprsd.threads.stats.CONF') as mock_conf:
            mock_conf.push_stats.push_url = 'https://conf.example.com'
            mock_conf.push_stats.frequency_seconds = 15
            thread = APRSDPushStatsThread()
            self.assertEqual(thread.push_url, 'https://conf.example.com')
            self.assertEqual(thread.period, 15)
            self.assertFalse(thread.send_packetlist)

    def test_loop_skips_push_when_period_not_reached(self):
        """Test loop does not POST when loop_count not divisible by period."""
        thread = APRSDPushStatsThread(
            push_url='https://example.com',
            frequency_seconds=10,
        )
        thread.loop_count = 3  # 3 % 10 != 0

        with (
            mock.patch('aprsd.threads.stats.collector.Collector') as mock_collector,
            mock.patch('aprsd.threads.stats.requests.post') as mock_post,
            mock.patch('aprsd.threads.stats.time.sleep'),
        ):
            result = thread.loop()

        self.assertTrue(result)
        mock_collector.return_value.collect.assert_not_called()
        mock_post.assert_not_called()

    def test_loop_pushes_stats_and_removes_packetlist_by_default(self):
        """Test loop collects stats, POSTs to url/stats, and strips PacketList.packets."""
        thread = APRSDPushStatsThread(
            push_url='https://example.com',
            frequency_seconds=10,
            send_packetlist=False,
        )
        thread.loop_count = 10

        collected = {
            'PacketList': {'packets': [1, 2, 3], 'rx': 5, 'tx': 1},
            'Other': 'data',
        }

        with (
            mock.patch(
                'aprsd.threads.stats.collector.Collector'
            ) as mock_collector_class,
            mock.patch('aprsd.threads.stats.requests.post') as mock_post,
            mock.patch('aprsd.threads.stats.time.sleep'),
            mock.patch('aprsd.threads.stats.datetime') as mock_dt,
        ):
            mock_collector_class.return_value.collect.return_value = collected
            mock_dt.datetime.now.return_value.strftime.return_value = (
                '01-01-2025 12:00:00'
            )

            result = thread.loop()

        self.assertTrue(result)
        mock_collector_class.return_value.collect.assert_called_once_with(
            serializable=True
        )
        mock_post.assert_called_once()
        call_args = mock_post.call_args
        self.assertEqual(call_args[0][0], 'https://example.com/stats')
        self.assertEqual(call_args[1]['headers'], {'Content-Type': 'application/json'})
        self.assertEqual(call_args[1]['timeout'], 5)
        body = call_args[1]['json']
        self.assertEqual(body['time'], '01-01-2025 12:00:00')
        self.assertNotIn('packets', body['stats']['PacketList'])
        self.assertEqual(body['stats']['PacketList']['rx'], 5)
        self.assertEqual(body['stats']['Other'], 'data')

    def test_loop_pushes_stats_with_packetlist_when_send_packetlist_true(self):
        """Test loop includes PacketList.packets when send_packetlist is True."""
        thread = APRSDPushStatsThread(
            push_url='https://example.com',
            frequency_seconds=10,
            send_packetlist=True,
        )
        thread.loop_count = 10

        collected = {'PacketList': {'packets': [1, 2, 3], 'rx': 5}}

        with (
            mock.patch(
                'aprsd.threads.stats.collector.Collector'
            ) as mock_collector_class,
            mock.patch('aprsd.threads.stats.requests.post') as mock_post,
            mock.patch('aprsd.threads.stats.time.sleep'),
            mock.patch('aprsd.threads.stats.datetime') as mock_dt,
        ):
            mock_collector_class.return_value.collect.return_value = collected
            mock_dt.datetime.now.return_value.strftime.return_value = (
                '01-01-2025 12:00:00'
            )

            result = thread.loop()

        self.assertTrue(result)
        body = mock_post.call_args[1]['json']
        self.assertEqual(body['stats']['PacketList']['packets'], [1, 2, 3])

    def test_loop_on_http_200_logs_success(self):
        """Test loop logs info on successful 200 response."""
        thread = APRSDPushStatsThread(
            push_url='https://example.com',
            frequency_seconds=10,
        )
        thread.loop_count = 10

        with (
            mock.patch(
                'aprsd.threads.stats.collector.Collector'
            ) as mock_collector_class,
            mock.patch('aprsd.threads.stats.requests.post') as mock_post,
            mock.patch('aprsd.threads.stats.time.sleep'),
            mock.patch('aprsd.threads.stats.datetime') as mock_dt,
            mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
        ):
            mock_collector_class.return_value.collect.return_value = {}
            mock_dt.datetime.now.return_value.strftime.return_value = (
                '01-01-2025 12:00:00'
            )
            mock_post.return_value.status_code = 200
            mock_post.return_value.raise_for_status = mock.Mock()

            result = thread.loop()

        self.assertTrue(result)
        mock_logu.info.assert_called()
        self.assertIn('Successfully pushed stats', mock_logu.info.call_args[0][0])

    def test_loop_on_non_200_logs_warning(self):
        """Test loop logs warning when response is not 200."""
        thread = APRSDPushStatsThread(
            push_url='https://example.com',
            frequency_seconds=10,
        )
        thread.loop_count = 10

        with (
            mock.patch(
                'aprsd.threads.stats.collector.Collector'
            ) as mock_collector_class,
            mock.patch('aprsd.threads.stats.requests.post') as mock_post,
            mock.patch('aprsd.threads.stats.time.sleep'),
            mock.patch('aprsd.threads.stats.datetime') as mock_dt,
            mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
        ):
            mock_collector_class.return_value.collect.return_value = {}
            mock_dt.datetime.now.return_value.strftime.return_value = (
                '01-01-2025 12:00:00'
            )
            mock_post.return_value.status_code = 500
            mock_post.return_value.raise_for_status = mock.Mock()

            result = thread.loop()

        self.assertTrue(result)
        mock_logu.warning.assert_called_once()
        self.assertIn('Failed to push stats', mock_logu.warning.call_args[0][0])
        self.assertIn('500', mock_logu.warning.call_args[0][0])

    def test_loop_on_request_exception_logs_error_and_continues(self):
        """Test loop logs error on requests.RequestException and returns True."""
        thread = APRSDPushStatsThread(
            push_url='https://example.com',
            frequency_seconds=10,
        )
        thread.loop_count = 10

        with (
            mock.patch(
                'aprsd.threads.stats.collector.Collector'
            ) as mock_collector_class,
            mock.patch('aprsd.threads.stats.requests.post') as mock_post,
            mock.patch('aprsd.threads.stats.time.sleep'),
            mock.patch('aprsd.threads.stats.datetime') as mock_dt,
            mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
        ):
            mock_collector_class.return_value.collect.return_value = {}
            mock_dt.datetime.now.return_value.strftime.return_value = (
                '01-01-2025 12:00:00'
            )
            mock_post.side_effect = requests.exceptions.ConnectionError(
                'Connection refused'
            )

            result = thread.loop()

        self.assertTrue(result)
        mock_logu.error.assert_called_once()
        self.assertIn('Error pushing stats', mock_logu.error.call_args[0][0])

    def test_loop_on_other_exception_logs_error_and_continues(self):
        """Test loop logs error on unexpected exception and returns True."""
        thread = APRSDPushStatsThread(
            push_url='https://example.com',
            frequency_seconds=10,
        )
        thread.loop_count = 10

        with (
            mock.patch(
                'aprsd.threads.stats.collector.Collector'
            ) as mock_collector_class,
            mock.patch('aprsd.threads.stats.requests.post') as mock_post,
            mock.patch('aprsd.threads.stats.time.sleep'),
            mock.patch('aprsd.threads.stats.datetime') as mock_dt,
            mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
        ):
            mock_collector_class.return_value.collect.return_value = {}
            mock_dt.datetime.now.return_value.strftime.return_value = (
                '01-01-2025 12:00:00'
            )
            mock_post.side_effect = ValueError('unexpected')

            result = thread.loop()

        self.assertTrue(result)
        mock_logu.error.assert_called_once()
        self.assertIn('Unexpected error in stats push', mock_logu.error.call_args[0][0])

    def test_loop_no_packetlist_key_in_stats(self):
        """Test loop does not fail when stats have no PacketList key."""
        thread = APRSDPushStatsThread(
            push_url='https://example.com',
            frequency_seconds=10,
            send_packetlist=False,
        )
        thread.loop_count = 10

        collected = {'Only': 'data', 'No': 'PacketList'}

        with (
            mock.patch(
                'aprsd.threads.stats.collector.Collector'
            ) as mock_collector_class,
            mock.patch('aprsd.threads.stats.requests.post') as mock_post,
            mock.patch('aprsd.threads.stats.time.sleep'),
            mock.patch('aprsd.threads.stats.datetime') as mock_dt,
        ):
            mock_collector_class.return_value.collect.return_value = collected
            mock_dt.datetime.now.return_value.strftime.return_value = (
                '01-01-2025 12:00:00'
            )

            result = thread.loop()

        self.assertTrue(result)
        body = mock_post.call_args[1]['json']
        self.assertEqual(body['stats'], collected)


if __name__ == '__main__':
    unittest.main()
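As with the existing stats tests, the new TestAPRSDPushStatsThread cases rely only on unittest and unittest.mock, so they should run under the project's usual test runner (for example, python -m unittest discover) without extra dependencies.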