1
0
mirror of https://github.com/craigerl/aprsd.git synced 2026-03-31 12:15:34 -04:00

refactor(threads): migrate stats threads to Event-based timing

This commit is contained in:
Walter Boring 2026-03-24 12:06:41 -04:00
parent 44b8bc572d
commit 343ec3e81c
2 changed files with 149 additions and 156 deletions

View File

@ -31,20 +31,19 @@ class StatsStore(objectstore.ObjectStoreMixin):
class APRSDStatsStoreThread(APRSDThread):
"""Save APRSD Stats to disk periodically."""
# how often in seconds to write the file
save_interval = 10
daemon = False
period = 10
def __init__(self):
super().__init__('StatsStore')
def loop(self):
if self.loop_count % self.save_interval == 0:
stats = collector.Collector().collect()
ss = StatsStore()
ss.add(stats)
ss.save()
stats = collector.Collector().collect()
ss = StatsStore()
ss.add(stats)
ss.save()
time.sleep(1)
self.wait()
return True
@ -64,143 +63,140 @@ class APRSDPushStatsThread(APRSDThread):
self.send_packetlist = send_packetlist
def loop(self):
if self.loop_count % self.period == 0:
stats_json = collector.Collector().collect(serializable=True)
url = f'{self.push_url}/stats'
headers = {'Content-Type': 'application/json'}
# Remove the PacketList section to reduce payload size
if not self.send_packetlist:
if 'PacketList' in stats_json:
del stats_json['PacketList']['packets']
stats_json = collector.Collector().collect(serializable=True)
url = f'{self.push_url}/stats'
headers = {'Content-Type': 'application/json'}
# Remove the PacketList section to reduce payload size
if not self.send_packetlist:
if 'PacketList' in stats_json:
del stats_json['PacketList']['packets']
now = datetime.datetime.now()
time_format = '%m-%d-%Y %H:%M:%S'
stats = {
'time': now.strftime(time_format),
'stats': stats_json,
}
now = datetime.datetime.now()
time_format = '%m-%d-%Y %H:%M:%S'
stats = {
'time': now.strftime(time_format),
'stats': stats_json,
}
try:
response = requests.post(url, json=stats, headers=headers, timeout=5)
response.raise_for_status()
try:
response = requests.post(url, json=stats, headers=headers, timeout=5)
response.raise_for_status()
if response.status_code == 200:
LOGU.info(f'Successfully pushed stats to {self.push_url}')
else:
LOGU.warning(
f'Failed to push stats to {self.push_url}: HTTP {response.status_code}'
)
if response.status_code == 200:
LOGU.info(f'Successfully pushed stats to {self.push_url}')
else:
LOGU.warning(
f'Failed to push stats to {self.push_url}: HTTP {response.status_code}'
)
except requests.exceptions.RequestException as e:
LOGU.error(f'Error pushing stats to {self.push_url}: {e}')
except Exception as e:
LOGU.error(f'Unexpected error in stats push: {e}')
except requests.exceptions.RequestException as e:
LOGU.error(f'Error pushing stats to {self.push_url}: {e}')
except Exception as e:
LOGU.error(f'Unexpected error in stats push: {e}')
time.sleep(1)
self.wait()
return True
class StatsLogThread(APRSDThread):
"""Log the stats from the PacketList."""
period = 10
def __init__(self):
super().__init__('PacketStatsLog')
self._last_total_rx = 0
self.period = 10
self.start_time = time.time()
def loop(self):
if self.loop_count % self.period == 0:
# log the stats every 10 seconds
stats_json = collector.Collector().collect(serializable=True)
stats = stats_json['PacketList']
total_rx = stats['rx']
rx_delta = total_rx - self._last_total_rx
rate = rx_delta / self.period
# log the stats every 10 seconds
stats_json = collector.Collector().collect(serializable=True)
stats = stats_json['PacketList']
total_rx = stats['rx']
rx_delta = total_rx - self._last_total_rx
rate = rx_delta / self.period
# Get unique callsigns count from SeenList stats
seen_list_instance = seen_list.SeenList()
# stats() returns data while holding lock internally, so copy it immediately
seen_list_stats = seen_list_instance.stats()
seen_list_instance.save()
# Copy the stats to avoid holding references to locked data
seen_list_stats = seen_list_stats.copy()
unique_callsigns_count = len(seen_list_stats)
# Get unique callsigns count from SeenList stats
seen_list_instance = seen_list.SeenList()
# stats() returns data while holding lock internally, so copy it immediately
seen_list_stats = seen_list_instance.stats()
seen_list_instance.save()
# Copy the stats to avoid holding references to locked data
seen_list_stats = seen_list_stats.copy()
unique_callsigns_count = len(seen_list_stats)
# Calculate uptime
elapsed = time.time() - self.start_time
elapsed_minutes = elapsed / 60
elapsed_hours = elapsed / 3600
# Calculate uptime
elapsed = time.time() - self.start_time
elapsed_minutes = elapsed / 60
elapsed_hours = elapsed / 3600
# Log summary stats
LOGU.opt(colors=True).info(
f'<green>RX Rate: {rate:.2f} pps</green> '
f'<yellow>Total RX: {total_rx}</yellow> '
f'<red>RX Last {self.period} secs: {rx_delta}</red> '
# Log summary stats
LOGU.opt(colors=True).info(
f'<green>RX Rate: {rate:.2f} pps</green> '
f'<yellow>Total RX: {total_rx}</yellow> '
f'<red>RX Last {self.period} secs: {rx_delta}</red> '
)
LOGU.opt(colors=True).info(
f'<cyan>Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h)</cyan> '
f'<magenta>Unique Callsigns: {unique_callsigns_count}</magenta>',
)
self._last_total_rx = total_rx
# Log individual type stats, sorted by RX count (descending)
sorted_types = sorted(
stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True
)
for k, v in sorted_types:
# Calculate percentage of this packet type compared to total RX
percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0
# Format values first, then apply colors
packet_type_str = f'{k:<15}'
rx_count_str = f'{v["rx"]:6d}'
tx_count_str = f'{v["tx"]:6d}'
percentage_str = f'{percentage:5.1f}%'
# Use different colors for RX count based on threshold (matching mqtt_injest.py)
rx_color_tag = (
'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red'
)
LOGU.opt(colors=True).info(
f'<cyan>Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h)</cyan> '
f'<magenta>Unique Callsigns: {unique_callsigns_count}</magenta>',
f' <cyan>{packet_type_str}</cyan>: '
f'<{rx_color_tag}>RX: {rx_count_str}</{rx_color_tag}> '
f'<red>TX: {tx_count_str}</red> '
f'<magenta>({percentage_str})</magenta>',
)
self._last_total_rx = total_rx
# Log individual type stats, sorted by RX count (descending)
sorted_types = sorted(
stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True
)
for k, v in sorted_types:
# Calculate percentage of this packet type compared to total RX
percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0
# Format values first, then apply colors
packet_type_str = f'{k:<15}'
rx_count_str = f'{v["rx"]:6d}'
tx_count_str = f'{v["tx"]:6d}'
percentage_str = f'{percentage:5.1f}%'
# Use different colors for RX count based on threshold (matching mqtt_injest.py)
rx_color_tag = (
'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red'
)
# Extract callsign counts from seen_list stats
callsign_counts = {}
for callsign, data in seen_list_stats.items():
if isinstance(data, dict) and 'count' in data:
callsign_counts[callsign] = data['count']
# Sort callsigns by packet count (descending) and get top 10
sorted_callsigns = sorted(
callsign_counts.items(), key=lambda x: x[1], reverse=True
)[:10]
# Log top 10 callsigns
if sorted_callsigns:
LOGU.opt(colors=True).info('<cyan>Top 10 Callsigns by Packet Count:</cyan>')
total_ranks = len(sorted_callsigns)
for rank, (callsign, count) in enumerate(sorted_callsigns, 1):
# Calculate percentage of this callsign compared to total RX
percentage = (count / total_rx * 100) if total_rx > 0 else 0.0
# Use different colors based on rank: most packets (rank 1) = red,
# least packets (last rank) = green, middle = yellow
if rank == 1:
count_color_tag = 'red'
elif rank == total_ranks:
count_color_tag = 'green'
else:
count_color_tag = 'yellow'
LOGU.opt(colors=True).info(
f' <cyan>{packet_type_str}</cyan>: '
f'<{rx_color_tag}>RX: {rx_count_str}</{rx_color_tag}> '
f'<red>TX: {tx_count_str}</red> '
f'<magenta>({percentage_str})</magenta>',
f' <cyan>{rank:2d}.</cyan> '
f'<white>{callsign:<12}</white>: '
f'<{count_color_tag}>{count:6d} packets</{count_color_tag}> '
f'<magenta>({percentage:5.1f}%)</magenta>',
)
# Extract callsign counts from seen_list stats
callsign_counts = {}
for callsign, data in seen_list_stats.items():
if isinstance(data, dict) and 'count' in data:
callsign_counts[callsign] = data['count']
# Sort callsigns by packet count (descending) and get top 10
sorted_callsigns = sorted(
callsign_counts.items(), key=lambda x: x[1], reverse=True
)[:10]
# Log top 10 callsigns
if sorted_callsigns:
LOGU.opt(colors=True).info(
'<cyan>Top 10 Callsigns by Packet Count:</cyan>'
)
total_ranks = len(sorted_callsigns)
for rank, (callsign, count) in enumerate(sorted_callsigns, 1):
# Calculate percentage of this callsign compared to total RX
percentage = (count / total_rx * 100) if total_rx > 0 else 0.0
# Use different colors based on rank: most packets (rank 1) = red,
# least packets (last rank) = green, middle = yellow
if rank == 1:
count_color_tag = 'red'
elif rank == total_ranks:
count_color_tag = 'green'
else:
count_color_tag = 'yellow'
LOGU.opt(colors=True).info(
f' <cyan>{rank:2d}.</cyan> '
f'<white>{callsign:<12}</white>: '
f'<{count_color_tag}>{count:6d} packets</{count_color_tag}> '
f'<magenta>({percentage:5.1f}%)</magenta>',
)
time.sleep(1)
self.wait()
return True

View File

@ -74,26 +74,25 @@ class TestAPRSDStatsStoreThread(unittest.TestCase):
"""Test APRSDStatsStoreThread initialization."""
thread = APRSDStatsStoreThread()
self.assertEqual(thread.name, 'StatsStore')
self.assertEqual(thread.save_interval, 10)
self.assertEqual(thread.period, 10)
self.assertFalse(thread.daemon)
self.assertTrue(hasattr(thread, 'loop_count'))
def test_loop_with_save(self):
"""Test loop method when save interval is reached."""
"""Test loop method saves stats every call."""
thread = APRSDStatsStoreThread()
# Mock the collector and save methods
with (
mock.patch('aprsd.stats.collector.Collector') as mock_collector_class,
mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save') as mock_save,
mock.patch.object(thread, 'wait'),
):
# Setup mock collector to return some stats
mock_collector_instance = mock.Mock()
mock_collector_instance.collect.return_value = {'test': 'data'}
mock_collector_class.return_value = mock_collector_instance
# Set loop_count to match save interval
thread.loop_count = 10
# Call loop
result = thread.loop()
@ -104,45 +103,43 @@ class TestAPRSDStatsStoreThread(unittest.TestCase):
mock_collector_instance.collect.assert_called_once()
mock_save.assert_called_once()
def test_loop_without_save(self):
"""Test loop method when save interval is not reached."""
def test_loop_calls_wait(self):
"""Test loop method calls wait() at the end."""
thread = APRSDStatsStoreThread()
# Mock the collector and save methods
with (
mock.patch('aprsd.stats.collector.Collector') as mock_collector_class,
mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save') as mock_save,
mock.patch.object(thread, 'wait') as mock_wait,
):
# Setup mock collector to return some stats
mock_collector_instance = mock.Mock()
mock_collector_instance.collect.return_value = {'test': 'data'}
mock_collector_class.return_value = mock_collector_instance
# Set loop_count to not match save interval
thread.loop_count = 1
# Call loop
result = thread.loop()
# Should return True (continue looping)
self.assertTrue(result)
# Should not have called save
mock_save.assert_not_called()
# Should have called wait
mock_wait.assert_called_once()
def test_loop_with_exception(self):
"""Test loop method when an exception occurs."""
thread = APRSDStatsStoreThread()
# Mock the collector to raise an exception
with mock.patch('aprsd.stats.collector.Collector') as mock_collector_class:
with (
mock.patch('aprsd.stats.collector.Collector') as mock_collector_class,
mock.patch.object(thread, 'wait'),
):
mock_collector_instance = mock.Mock()
mock_collector_instance.collect.side_effect = RuntimeError('Test exception')
mock_collector_class.return_value = mock_collector_instance
# Set loop_count to match save interval
thread.loop_count = 10
# Should raise the exception
with self.assertRaises(RuntimeError):
thread.loop()
@ -177,24 +174,31 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
self.assertEqual(thread.period, 15)
self.assertFalse(thread.send_packetlist)
def test_loop_skips_push_when_period_not_reached(self):
"""Test loop does not POST when loop_count not divisible by period."""
def test_loop_pushes_stats_every_call(self):
"""Test loop POSTs stats on every call (timing controlled by wait)."""
thread = APRSDPushStatsThread(
push_url='https://example.com',
frequency_seconds=10,
)
thread.loop_count = 3 # 3 % 10 != 0
with (
mock.patch('aprsd.threads.stats.collector.Collector') as mock_collector,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
mock.patch('aprsd.threads.stats.time.sleep'),
mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
):
mock_collector.return_value.collect.return_value = {}
mock_dt.datetime.now.return_value.strftime.return_value = (
'01-01-2025 12:00:00'
)
mock_post.return_value.status_code = 200
mock_post.return_value.raise_for_status = mock.Mock()
result = thread.loop()
self.assertTrue(result)
mock_collector.return_value.collect.assert_not_called()
mock_post.assert_not_called()
mock_collector.return_value.collect.assert_called_once_with(serializable=True)
mock_post.assert_called_once()
def test_loop_pushes_stats_and_removes_packetlist_by_default(self):
"""Test loop collects stats, POSTs to url/stats, and strips PacketList.packets."""
@ -203,7 +207,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
frequency_seconds=10,
send_packetlist=False,
)
thread.loop_count = 10
collected = {
'PacketList': {'packets': [1, 2, 3], 'rx': 5, 'tx': 1},
@ -215,7 +218,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
mock.patch('aprsd.threads.stats.time.sleep'),
mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
):
mock_collector_class.return_value.collect.return_value = collected
@ -247,7 +250,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
frequency_seconds=10,
send_packetlist=True,
)
thread.loop_count = 10
collected = {'PacketList': {'packets': [1, 2, 3], 'rx': 5}}
@ -256,7 +258,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
mock.patch('aprsd.threads.stats.time.sleep'),
mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
):
mock_collector_class.return_value.collect.return_value = collected
@ -276,14 +278,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
push_url='https://example.com',
frequency_seconds=10,
)
thread.loop_count = 10
with (
mock.patch(
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
mock.patch('aprsd.threads.stats.time.sleep'),
mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
):
@ -306,14 +307,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
push_url='https://example.com',
frequency_seconds=10,
)
thread.loop_count = 10
with (
mock.patch(
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
mock.patch('aprsd.threads.stats.time.sleep'),
mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
):
@ -337,14 +337,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
push_url='https://example.com',
frequency_seconds=10,
)
thread.loop_count = 10
with (
mock.patch(
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
mock.patch('aprsd.threads.stats.time.sleep'),
mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
):
@ -368,14 +367,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
push_url='https://example.com',
frequency_seconds=10,
)
thread.loop_count = 10
with (
mock.patch(
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
mock.patch('aprsd.threads.stats.time.sleep'),
mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
):
@ -398,7 +396,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
frequency_seconds=10,
send_packetlist=False,
)
thread.loop_count = 10
collected = {'Only': 'data', 'No': 'PacketList'}
@ -407,7 +404,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
mock.patch('aprsd.threads.stats.time.sleep'),
mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
):
mock_collector_class.return_value.collect.return_value = collected