From 343ec3e81c59112823e97b4b274febc87efff3ad Mon Sep 17 00:00:00 2001 From: Walter Boring Date: Tue, 24 Mar 2026 12:06:41 -0400 Subject: [PATCH] refactor(threads): migrate stats threads to Event-based timing --- aprsd/threads/stats.py | 236 ++++++++++++++++++------------------ tests/threads/test_stats.py | 69 +++++------ 2 files changed, 149 insertions(+), 156 deletions(-) diff --git a/aprsd/threads/stats.py b/aprsd/threads/stats.py index e2d3f5e..91eb00c 100644 --- a/aprsd/threads/stats.py +++ b/aprsd/threads/stats.py @@ -31,20 +31,19 @@ class StatsStore(objectstore.ObjectStoreMixin): class APRSDStatsStoreThread(APRSDThread): """Save APRSD Stats to disk periodically.""" - # how often in seconds to write the file - save_interval = 10 + daemon = False + period = 10 def __init__(self): super().__init__('StatsStore') def loop(self): - if self.loop_count % self.save_interval == 0: - stats = collector.Collector().collect() - ss = StatsStore() - ss.add(stats) - ss.save() + stats = collector.Collector().collect() + ss = StatsStore() + ss.add(stats) + ss.save() - time.sleep(1) + self.wait() return True @@ -64,143 +63,140 @@ class APRSDPushStatsThread(APRSDThread): self.send_packetlist = send_packetlist def loop(self): - if self.loop_count % self.period == 0: - stats_json = collector.Collector().collect(serializable=True) - url = f'{self.push_url}/stats' - headers = {'Content-Type': 'application/json'} - # Remove the PacketList section to reduce payload size - if not self.send_packetlist: - if 'PacketList' in stats_json: - del stats_json['PacketList']['packets'] + stats_json = collector.Collector().collect(serializable=True) + url = f'{self.push_url}/stats' + headers = {'Content-Type': 'application/json'} + # Remove the PacketList section to reduce payload size + if not self.send_packetlist: + if 'PacketList' in stats_json: + del stats_json['PacketList']['packets'] - now = datetime.datetime.now() - time_format = '%m-%d-%Y %H:%M:%S' - stats = { - 'time': now.strftime(time_format), - 'stats': stats_json, - } + now = datetime.datetime.now() + time_format = '%m-%d-%Y %H:%M:%S' + stats = { + 'time': now.strftime(time_format), + 'stats': stats_json, + } - try: - response = requests.post(url, json=stats, headers=headers, timeout=5) - response.raise_for_status() + try: + response = requests.post(url, json=stats, headers=headers, timeout=5) + response.raise_for_status() - if response.status_code == 200: - LOGU.info(f'Successfully pushed stats to {self.push_url}') - else: - LOGU.warning( - f'Failed to push stats to {self.push_url}: HTTP {response.status_code}' - ) + if response.status_code == 200: + LOGU.info(f'Successfully pushed stats to {self.push_url}') + else: + LOGU.warning( + f'Failed to push stats to {self.push_url}: HTTP {response.status_code}' + ) - except requests.exceptions.RequestException as e: - LOGU.error(f'Error pushing stats to {self.push_url}: {e}') - except Exception as e: - LOGU.error(f'Unexpected error in stats push: {e}') + except requests.exceptions.RequestException as e: + LOGU.error(f'Error pushing stats to {self.push_url}: {e}') + except Exception as e: + LOGU.error(f'Unexpected error in stats push: {e}') - time.sleep(1) + self.wait() return True class StatsLogThread(APRSDThread): """Log the stats from the PacketList.""" + period = 10 + def __init__(self): super().__init__('PacketStatsLog') self._last_total_rx = 0 - self.period = 10 self.start_time = time.time() def loop(self): - if self.loop_count % self.period == 0: - # log the stats every 10 seconds - stats_json = collector.Collector().collect(serializable=True) - stats = stats_json['PacketList'] - total_rx = stats['rx'] - rx_delta = total_rx - self._last_total_rx - rate = rx_delta / self.period + # log the stats every 10 seconds + stats_json = collector.Collector().collect(serializable=True) + stats = stats_json['PacketList'] + total_rx = stats['rx'] + rx_delta = total_rx - self._last_total_rx + rate = rx_delta / self.period - # Get unique callsigns count from SeenList stats - seen_list_instance = seen_list.SeenList() - # stats() returns data while holding lock internally, so copy it immediately - seen_list_stats = seen_list_instance.stats() - seen_list_instance.save() - # Copy the stats to avoid holding references to locked data - seen_list_stats = seen_list_stats.copy() - unique_callsigns_count = len(seen_list_stats) + # Get unique callsigns count from SeenList stats + seen_list_instance = seen_list.SeenList() + # stats() returns data while holding lock internally, so copy it immediately + seen_list_stats = seen_list_instance.stats() + seen_list_instance.save() + # Copy the stats to avoid holding references to locked data + seen_list_stats = seen_list_stats.copy() + unique_callsigns_count = len(seen_list_stats) - # Calculate uptime - elapsed = time.time() - self.start_time - elapsed_minutes = elapsed / 60 - elapsed_hours = elapsed / 3600 + # Calculate uptime + elapsed = time.time() - self.start_time + elapsed_minutes = elapsed / 60 + elapsed_hours = elapsed / 3600 - # Log summary stats - LOGU.opt(colors=True).info( - f'RX Rate: {rate:.2f} pps ' - f'Total RX: {total_rx} ' - f'RX Last {self.period} secs: {rx_delta} ' + # Log summary stats + LOGU.opt(colors=True).info( + f'RX Rate: {rate:.2f} pps ' + f'Total RX: {total_rx} ' + f'RX Last {self.period} secs: {rx_delta} ' + ) + LOGU.opt(colors=True).info( + f'Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h) ' + f'Unique Callsigns: {unique_callsigns_count}', + ) + self._last_total_rx = total_rx + + # Log individual type stats, sorted by RX count (descending) + sorted_types = sorted( + stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True + ) + for k, v in sorted_types: + # Calculate percentage of this packet type compared to total RX + percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0 + # Format values first, then apply colors + packet_type_str = f'{k:<15}' + rx_count_str = f'{v["rx"]:6d}' + tx_count_str = f'{v["tx"]:6d}' + percentage_str = f'{percentage:5.1f}%' + # Use different colors for RX count based on threshold (matching mqtt_injest.py) + rx_color_tag = ( + 'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red' ) LOGU.opt(colors=True).info( - f'Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h) ' - f'Unique Callsigns: {unique_callsigns_count}', + f' {packet_type_str}: ' + f'<{rx_color_tag}>RX: {rx_count_str} ' + f'TX: {tx_count_str} ' + f'({percentage_str})', ) - self._last_total_rx = total_rx - # Log individual type stats, sorted by RX count (descending) - sorted_types = sorted( - stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True - ) - for k, v in sorted_types: - # Calculate percentage of this packet type compared to total RX - percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0 - # Format values first, then apply colors - packet_type_str = f'{k:<15}' - rx_count_str = f'{v["rx"]:6d}' - tx_count_str = f'{v["tx"]:6d}' - percentage_str = f'{percentage:5.1f}%' - # Use different colors for RX count based on threshold (matching mqtt_injest.py) - rx_color_tag = ( - 'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red' - ) + # Extract callsign counts from seen_list stats + callsign_counts = {} + for callsign, data in seen_list_stats.items(): + if isinstance(data, dict) and 'count' in data: + callsign_counts[callsign] = data['count'] + + # Sort callsigns by packet count (descending) and get top 10 + sorted_callsigns = sorted( + callsign_counts.items(), key=lambda x: x[1], reverse=True + )[:10] + + # Log top 10 callsigns + if sorted_callsigns: + LOGU.opt(colors=True).info('Top 10 Callsigns by Packet Count:') + total_ranks = len(sorted_callsigns) + for rank, (callsign, count) in enumerate(sorted_callsigns, 1): + # Calculate percentage of this callsign compared to total RX + percentage = (count / total_rx * 100) if total_rx > 0 else 0.0 + # Use different colors based on rank: most packets (rank 1) = red, + # least packets (last rank) = green, middle = yellow + if rank == 1: + count_color_tag = 'red' + elif rank == total_ranks: + count_color_tag = 'green' + else: + count_color_tag = 'yellow' LOGU.opt(colors=True).info( - f' {packet_type_str}: ' - f'<{rx_color_tag}>RX: {rx_count_str} ' - f'TX: {tx_count_str} ' - f'({percentage_str})', + f' {rank:2d}. ' + f'{callsign:<12}: ' + f'<{count_color_tag}>{count:6d} packets ' + f'({percentage:5.1f}%)', ) - # Extract callsign counts from seen_list stats - callsign_counts = {} - for callsign, data in seen_list_stats.items(): - if isinstance(data, dict) and 'count' in data: - callsign_counts[callsign] = data['count'] - - # Sort callsigns by packet count (descending) and get top 10 - sorted_callsigns = sorted( - callsign_counts.items(), key=lambda x: x[1], reverse=True - )[:10] - - # Log top 10 callsigns - if sorted_callsigns: - LOGU.opt(colors=True).info( - 'Top 10 Callsigns by Packet Count:' - ) - total_ranks = len(sorted_callsigns) - for rank, (callsign, count) in enumerate(sorted_callsigns, 1): - # Calculate percentage of this callsign compared to total RX - percentage = (count / total_rx * 100) if total_rx > 0 else 0.0 - # Use different colors based on rank: most packets (rank 1) = red, - # least packets (last rank) = green, middle = yellow - if rank == 1: - count_color_tag = 'red' - elif rank == total_ranks: - count_color_tag = 'green' - else: - count_color_tag = 'yellow' - LOGU.opt(colors=True).info( - f' {rank:2d}. ' - f'{callsign:<12}: ' - f'<{count_color_tag}>{count:6d} packets ' - f'({percentage:5.1f}%)', - ) - - time.sleep(1) + self.wait() return True diff --git a/tests/threads/test_stats.py b/tests/threads/test_stats.py index 0bffa8a..33226aa 100644 --- a/tests/threads/test_stats.py +++ b/tests/threads/test_stats.py @@ -74,26 +74,25 @@ class TestAPRSDStatsStoreThread(unittest.TestCase): """Test APRSDStatsStoreThread initialization.""" thread = APRSDStatsStoreThread() self.assertEqual(thread.name, 'StatsStore') - self.assertEqual(thread.save_interval, 10) + self.assertEqual(thread.period, 10) + self.assertFalse(thread.daemon) self.assertTrue(hasattr(thread, 'loop_count')) def test_loop_with_save(self): - """Test loop method when save interval is reached.""" + """Test loop method saves stats every call.""" thread = APRSDStatsStoreThread() # Mock the collector and save methods with ( mock.patch('aprsd.stats.collector.Collector') as mock_collector_class, mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save') as mock_save, + mock.patch.object(thread, 'wait'), ): # Setup mock collector to return some stats mock_collector_instance = mock.Mock() mock_collector_instance.collect.return_value = {'test': 'data'} mock_collector_class.return_value = mock_collector_instance - # Set loop_count to match save interval - thread.loop_count = 10 - # Call loop result = thread.loop() @@ -104,45 +103,43 @@ class TestAPRSDStatsStoreThread(unittest.TestCase): mock_collector_instance.collect.assert_called_once() mock_save.assert_called_once() - def test_loop_without_save(self): - """Test loop method when save interval is not reached.""" + def test_loop_calls_wait(self): + """Test loop method calls wait() at the end.""" thread = APRSDStatsStoreThread() # Mock the collector and save methods with ( mock.patch('aprsd.stats.collector.Collector') as mock_collector_class, mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save') as mock_save, + mock.patch.object(thread, 'wait') as mock_wait, ): # Setup mock collector to return some stats mock_collector_instance = mock.Mock() mock_collector_instance.collect.return_value = {'test': 'data'} mock_collector_class.return_value = mock_collector_instance - # Set loop_count to not match save interval - thread.loop_count = 1 - # Call loop result = thread.loop() # Should return True (continue looping) self.assertTrue(result) - # Should not have called save - mock_save.assert_not_called() + # Should have called wait + mock_wait.assert_called_once() def test_loop_with_exception(self): """Test loop method when an exception occurs.""" thread = APRSDStatsStoreThread() # Mock the collector to raise an exception - with mock.patch('aprsd.stats.collector.Collector') as mock_collector_class: + with ( + mock.patch('aprsd.stats.collector.Collector') as mock_collector_class, + mock.patch.object(thread, 'wait'), + ): mock_collector_instance = mock.Mock() mock_collector_instance.collect.side_effect = RuntimeError('Test exception') mock_collector_class.return_value = mock_collector_instance - # Set loop_count to match save interval - thread.loop_count = 10 - # Should raise the exception with self.assertRaises(RuntimeError): thread.loop() @@ -177,24 +174,31 @@ class TestAPRSDPushStatsThread(unittest.TestCase): self.assertEqual(thread.period, 15) self.assertFalse(thread.send_packetlist) - def test_loop_skips_push_when_period_not_reached(self): - """Test loop does not POST when loop_count not divisible by period.""" + def test_loop_pushes_stats_every_call(self): + """Test loop POSTs stats on every call (timing controlled by wait).""" thread = APRSDPushStatsThread( push_url='https://example.com', frequency_seconds=10, ) - thread.loop_count = 3 # 3 % 10 != 0 with ( mock.patch('aprsd.threads.stats.collector.Collector') as mock_collector, mock.patch('aprsd.threads.stats.requests.post') as mock_post, - mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch.object(thread, 'wait'), + mock.patch('aprsd.threads.stats.datetime') as mock_dt, ): + mock_collector.return_value.collect.return_value = {} + mock_dt.datetime.now.return_value.strftime.return_value = ( + '01-01-2025 12:00:00' + ) + mock_post.return_value.status_code = 200 + mock_post.return_value.raise_for_status = mock.Mock() + result = thread.loop() self.assertTrue(result) - mock_collector.return_value.collect.assert_not_called() - mock_post.assert_not_called() + mock_collector.return_value.collect.assert_called_once_with(serializable=True) + mock_post.assert_called_once() def test_loop_pushes_stats_and_removes_packetlist_by_default(self): """Test loop collects stats, POSTs to url/stats, and strips PacketList.packets.""" @@ -203,7 +207,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase): frequency_seconds=10, send_packetlist=False, ) - thread.loop_count = 10 collected = { 'PacketList': {'packets': [1, 2, 3], 'rx': 5, 'tx': 1}, @@ -215,7 +218,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase): 'aprsd.threads.stats.collector.Collector' ) as mock_collector_class, mock.patch('aprsd.threads.stats.requests.post') as mock_post, - mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch.object(thread, 'wait'), mock.patch('aprsd.threads.stats.datetime') as mock_dt, ): mock_collector_class.return_value.collect.return_value = collected @@ -247,7 +250,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase): frequency_seconds=10, send_packetlist=True, ) - thread.loop_count = 10 collected = {'PacketList': {'packets': [1, 2, 3], 'rx': 5}} @@ -256,7 +258,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase): 'aprsd.threads.stats.collector.Collector' ) as mock_collector_class, mock.patch('aprsd.threads.stats.requests.post') as mock_post, - mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch.object(thread, 'wait'), mock.patch('aprsd.threads.stats.datetime') as mock_dt, ): mock_collector_class.return_value.collect.return_value = collected @@ -276,14 +278,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase): push_url='https://example.com', frequency_seconds=10, ) - thread.loop_count = 10 with ( mock.patch( 'aprsd.threads.stats.collector.Collector' ) as mock_collector_class, mock.patch('aprsd.threads.stats.requests.post') as mock_post, - mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch.object(thread, 'wait'), mock.patch('aprsd.threads.stats.datetime') as mock_dt, mock.patch('aprsd.threads.stats.LOGU') as mock_logu, ): @@ -306,14 +307,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase): push_url='https://example.com', frequency_seconds=10, ) - thread.loop_count = 10 with ( mock.patch( 'aprsd.threads.stats.collector.Collector' ) as mock_collector_class, mock.patch('aprsd.threads.stats.requests.post') as mock_post, - mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch.object(thread, 'wait'), mock.patch('aprsd.threads.stats.datetime') as mock_dt, mock.patch('aprsd.threads.stats.LOGU') as mock_logu, ): @@ -337,14 +337,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase): push_url='https://example.com', frequency_seconds=10, ) - thread.loop_count = 10 with ( mock.patch( 'aprsd.threads.stats.collector.Collector' ) as mock_collector_class, mock.patch('aprsd.threads.stats.requests.post') as mock_post, - mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch.object(thread, 'wait'), mock.patch('aprsd.threads.stats.datetime') as mock_dt, mock.patch('aprsd.threads.stats.LOGU') as mock_logu, ): @@ -368,14 +367,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase): push_url='https://example.com', frequency_seconds=10, ) - thread.loop_count = 10 with ( mock.patch( 'aprsd.threads.stats.collector.Collector' ) as mock_collector_class, mock.patch('aprsd.threads.stats.requests.post') as mock_post, - mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch.object(thread, 'wait'), mock.patch('aprsd.threads.stats.datetime') as mock_dt, mock.patch('aprsd.threads.stats.LOGU') as mock_logu, ): @@ -398,7 +396,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase): frequency_seconds=10, send_packetlist=False, ) - thread.loop_count = 10 collected = {'Only': 'data', 'No': 'PacketList'} @@ -407,7 +404,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase): 'aprsd.threads.stats.collector.Collector' ) as mock_collector_class, mock.patch('aprsd.threads.stats.requests.post') as mock_post, - mock.patch('aprsd.threads.stats.time.sleep'), + mock.patch.object(thread, 'wait'), mock.patch('aprsd.threads.stats.datetime') as mock_dt, ): mock_collector_class.return_value.collect.return_value = collected