From 0620e63e720fbf8c85dead0235e95fd3dcf2c7a3 Mon Sep 17 00:00:00 2001 From: Walter Boring Date: Mon, 12 Jan 2026 23:26:49 -0500 Subject: [PATCH] added more unit tests --- tests/packets/test_log.py | 208 +++++++++++++++++++++ tests/plugins/test_package.py | 85 +++++++++ tests/threads/test_stats.py | 149 +++++++++++++++ tests/utils/test_ring_buffer_additional.py | 172 +++++++++++++++++ 4 files changed, 614 insertions(+) create mode 100644 tests/packets/test_log.py create mode 100644 tests/plugins/test_package.py create mode 100644 tests/threads/test_stats.py create mode 100644 tests/utils/test_ring_buffer_additional.py diff --git a/tests/packets/test_log.py b/tests/packets/test_log.py new file mode 100644 index 0000000..a5bc643 --- /dev/null +++ b/tests/packets/test_log.py @@ -0,0 +1,208 @@ +import unittest +from unittest import mock + +from aprsd import packets +from aprsd.packets import log +from tests import fake + + +class TestPacketLog(unittest.TestCase): + """Unit tests for the packet logging functions.""" + + def setUp(self): + """Set up test fixtures.""" + # Mock the logging to avoid actual log output during tests + self.loguru_opt_mock = mock.patch('aprsd.packets.log.LOGU.opt').start() + self.loguru_info_mock = self.loguru_opt_mock.return_value.info + self.logging_mock = mock.patch('aprsd.packets.log.LOG').start() + self.haversine_mock = mock.patch('aprsd.packets.log.haversine').start() + self.utils_mock = mock.patch('aprsd.packets.log.utils').start() + self.conf_mock = mock.patch('aprsd.packets.log.CONF').start() + + # Set default configuration values + self.conf_mock.enable_packet_logging = True + self.conf_mock.log_packet_format = ( + 'multiline' # Changed from 'compact' to 'multiline' + ) + self.conf_mock.default_ack_send_count = 3 + self.conf_mock.default_packet_send_count = 5 + self.conf_mock.latitude = 37.7749 + self.conf_mock.longitude = -122.4194 + + # Set up the utils mock methods + self.utils_mock.calculate_initial_compass_bearing.return_value = 45.0 + self.utils_mock.degrees_to_cardinal.return_value = 'NE' + self.haversine_mock.return_value = 10.5 + + # No need to mock packet.raw since we create real packets with raw data + # The packet objects created in tests will have their raw attribute set properly + + def tearDown(self): + """Clean up after tests.""" + # Stop all mocks + mock.patch.stopall() + + def test_log_multiline_with_ack_packet(self): + """Test log_multiline with an AckPacket.""" + # Create a fake AckPacket + packet = fake.fake_ack_packet() + packet.send_count = 1 + + # Call the function + log.log_multiline(packet, tx=True, header=True) + + # Verify that logging was called + self.loguru_opt_mock.assert_called_once() + self.loguru_info_mock.assert_called_once() + # LOG.debug is no longer called in log_multiline + + def test_log_multiline_with_gps_packet(self): + """Test log_multiline with a GPSPacket.""" + # Create a fake GPSPacket + packet = packets.GPSPacket( + from_call=fake.FAKE_FROM_CALLSIGN, + to_call=fake.FAKE_TO_CALLSIGN, + latitude=37.7749, + longitude=-122.4194, + symbol='>', + comment='Test GPS comment', + ) + packet.send_count = 2 + + # Call the function + log.log_multiline(packet, tx=False, header=True) + + # Verify that logging was called + self.loguru_opt_mock.assert_called_once() + self.loguru_info_mock.assert_called_once() + # LOG.debug is no longer called in log_multiline + + def test_log_multiline_disabled_logging(self): + """Test log_multiline when packet logging is disabled.""" + # Disable packet logging + self.conf_mock.enable_packet_logging = False + + # Create a fake packet + packet = fake.fake_packet() + packet.send_count = 0 + + # Call the function + log.log_multiline(packet, tx=False, header=True) + + # Verify that logging was NOT called + self.loguru_opt_mock.assert_not_called() + self.logging_mock.debug.assert_not_called() + + def test_log_multiline_compact_format(self): + """Test log_multiline when log format is compact.""" + # Set compact format + self.conf_mock.log_packet_format = 'compact' + + # Create a fake packet + packet = fake.fake_packet() + packet.send_count = 0 + + # Call the function + log.log_multiline(packet, tx=False, header=True) + + # Verify that logging was NOT called (because of compact format) + self.loguru_opt_mock.assert_not_called() + self.logging_mock.debug.assert_not_called() + + def test_log_with_compact_format(self): + """Test log function with compact format.""" + # Set compact format + self.conf_mock.log_packet_format = 'compact' + + # Create a fake packet + packet = fake.fake_packet() + packet.send_count = 1 + + # Call the function + log.log(packet, tx=True, header=True, packet_count=1) + + # Verify that logging was called (but may be different behavior) + self.loguru_opt_mock.assert_called_once() + + def test_log_with_multiline_format(self): + """Test log function with multiline format.""" + # Set multiline format + self.conf_mock.log_packet_format = 'multiline' + + # Create a fake packet + packet = fake.fake_packet() + packet.send_count = 1 + + # Call the function + log.log(packet, tx=True, header=True, packet_count=1) + + # Verify that logging was called + self.loguru_opt_mock.assert_called_once() + + def test_log_with_gps_packet_distance(self): + """Test log function with GPS packet that includes distance info.""" + # Create a GPSPacket + packet = packets.GPSPacket( + from_call=fake.FAKE_FROM_CALLSIGN, + to_call=fake.FAKE_TO_CALLSIGN, + latitude=37.7749, + longitude=-122.4194, + symbol='>', + comment='Test GPS comment', + ) + packet.send_count = 2 + + # Call the function + log.log(packet, tx=False, header=True) + + # Verify that logging was called + self.loguru_opt_mock.assert_called_once() + + def test_log_with_disabled_logging(self): + """Test log function when packet logging is disabled.""" + # Disable packet logging + self.conf_mock.enable_packet_logging = False + + # Create a fake packet + packet = fake.fake_packet() + packet.send_count = 0 + + # Call the function + log.log(packet, tx=False, header=True, force_log=False) + + # Verify that logging was NOT called + self.loguru_opt_mock.assert_not_called() + + def test_log_with_force_log(self): + """Test log function with force_log=True even when logging is disabled.""" + # Disable packet logging + self.conf_mock.enable_packet_logging = False + + # Create a fake packet + packet = fake.fake_packet() + packet.send_count = 0 + + # Call the function with force_log=True + log.log(packet, tx=False, header=True, force_log=True) + + # Verify that logging WAS called because of force_log=True + self.loguru_opt_mock.assert_called_once() + + def test_log_with_different_packet_types(self): + """Test log function with different packet types.""" + # Test with MessagePacket + packet = fake.fake_packet() + packet.send_count = 1 + + log.log(packet, tx=False, header=True) + self.loguru_opt_mock.assert_called_once() + + # Reset mocks + self.loguru_opt_mock.reset_mock() + + # Test with AckPacket + ack_packet = fake.fake_ack_packet() + ack_packet.send_count = 2 + + log.log(ack_packet, tx=True, header=True) + self.loguru_opt_mock.assert_called_once() diff --git a/tests/plugins/test_package.py b/tests/plugins/test_package.py new file mode 100644 index 0000000..d0c715e --- /dev/null +++ b/tests/plugins/test_package.py @@ -0,0 +1,85 @@ +import os +import unittest + +from aprsd import plugin +from aprsd.utils import package + + +class TestPackage(unittest.TestCase): + def test_plugin_type(self): + self.assertEqual( + package.plugin_type(plugin.APRSDRegexCommandPluginBase), 'RegexCommand' + ) + self.assertEqual( + package.plugin_type(plugin.APRSDWatchListPluginBase), 'WatchList' + ) + self.assertEqual(package.plugin_type(plugin.APRSDPluginBase), 'APRSDPluginBase') + + def test_is_plugin(self): + class TestPlugin(plugin.APRSDPluginBase): + def setup(self): + pass + + def filter(self, packet): + pass + + def process(self, packet): + pass + + class NonPlugin: + pass + + self.assertTrue(package.is_plugin(TestPlugin)) + self.assertFalse(package.is_plugin(NonPlugin)) + + def test_walk_package(self): + import aprsd.utils + + result = package.walk_package(aprsd.utils) + # walk_package returns an iterator, so we just check it's not None + self.assertIsNotNone(result) + + def test_get_module_info(self): + # Test with a specific, limited directory to avoid hanging + # Use the aprsd/utils directory which is small and safe + import aprsd.utils + + package_name = 'aprsd.utils' + module_name = 'package' + # Get the actual path to aprsd/utils directory + module_path = os.path.dirname(aprsd.utils.__file__) + module_info = package.get_module_info(package_name, module_name, module_path) + # The result should be a list (even if empty) + self.assertIsInstance(module_info, list) + + def test_is_aprsd_package(self): + self.assertTrue(package.is_aprsd_package('aprsd_plugin')) + self.assertFalse(package.is_aprsd_package('other')) + + def test_is_aprsd_extension(self): + self.assertTrue(package.is_aprsd_extension('aprsd_extension_plugin')) + self.assertFalse(package.is_aprsd_extension('other')) + + def test_get_installed_aprsd_items(self): + plugins, extensions = package.get_installed_aprsd_items() + self.assertIsNotNone(plugins) + self.assertIsNotNone(extensions) + + def test_get_installed_plugins(self): + plugins = package.get_installed_plugins() + self.assertIsNotNone(plugins) + + def test_get_installed_extensions(self): + extensions = package.get_installed_extensions() + self.assertIsNotNone(extensions) + + def test_get_pypi_packages(self): + packages = package.get_pypi_packages() + self.assertIsNotNone(packages) + + def test_log_installed_extensions_and_plugins(self): + package.log_installed_extensions_and_plugins() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/threads/test_stats.py b/tests/threads/test_stats.py new file mode 100644 index 0000000..ef137e6 --- /dev/null +++ b/tests/threads/test_stats.py @@ -0,0 +1,149 @@ +import unittest +from unittest import mock + +from aprsd.stats import collector +from aprsd.threads.stats import APRSDStatsStoreThread, StatsStore + + +class TestStatsStore(unittest.TestCase): + """Unit tests for the StatsStore class.""" + + def test_init(self): + """Test StatsStore initialization.""" + ss = StatsStore() + self.assertIsNotNone(ss.lock) + self.assertFalse(hasattr(ss, 'data')) + + def test_add(self): + """Test add method.""" + ss = StatsStore() + test_data = {'test': 'data'} + + ss.add(test_data) + self.assertEqual(ss.data, test_data) + + def test_add_concurrent(self): + """Test add method with concurrent access.""" + import threading + + ss = StatsStore() + test_data = {'test': 'data'} + results = [] + + def add_data(): + ss.add(test_data) + results.append(ss.data) + + # Create multiple threads to test thread safety + threads = [] + for _ in range(5): + t = threading.Thread(target=add_data) + threads.append(t) + t.start() + + for t in threads: + t.join() + + # All threads should have added the data + for result in results: + self.assertEqual(result, test_data) + + +class TestAPRSDStatsStoreThread(unittest.TestCase): + """Unit tests for the APRSDStatsStoreThread class.""" + + def setUp(self): + """Set up test fixtures.""" + # Reset singleton instance + collector.Collector._instance = None + # Clear producers to start fresh + c = collector.Collector() + c.producers = [] + + def tearDown(self): + """Clean up after tests.""" + collector.Collector._instance = None + + def test_init(self): + """Test APRSDStatsStoreThread initialization.""" + thread = APRSDStatsStoreThread() + self.assertEqual(thread.name, 'StatsStore') + self.assertEqual(thread.save_interval, 10) + self.assertTrue(hasattr(thread, 'loop_count')) + + def test_loop_with_save(self): + """Test loop method when save interval is reached.""" + thread = APRSDStatsStoreThread() + + # Mock the collector and save methods + with ( + mock.patch('aprsd.stats.collector.Collector') as mock_collector_class, + mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save') as mock_save, + ): + # Setup mock collector to return some stats + mock_collector_instance = mock.Mock() + mock_collector_instance.collect.return_value = {'test': 'data'} + mock_collector_class.return_value = mock_collector_instance + + # Set loop_count to match save interval + thread.loop_count = 10 + + # Call loop + result = thread.loop() + + # Should return True (continue looping) + self.assertTrue(result) + + # Should have called collect and save + mock_collector_instance.collect.assert_called_once() + mock_save.assert_called_once() + + def test_loop_without_save(self): + """Test loop method when save interval is not reached.""" + thread = APRSDStatsStoreThread() + + # Mock the collector and save methods + with ( + mock.patch('aprsd.stats.collector.Collector') as mock_collector_class, + mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save') as mock_save, + ): + # Setup mock collector to return some stats + mock_collector_instance = mock.Mock() + mock_collector_instance.collect.return_value = {'test': 'data'} + mock_collector_class.return_value = mock_collector_instance + + # Set loop_count to not match save interval + thread.loop_count = 1 + + # Call loop + result = thread.loop() + + # Should return True (continue looping) + self.assertTrue(result) + + # Should not have called save + mock_save.assert_not_called() + + def test_loop_with_exception(self): + """Test loop method when an exception occurs.""" + thread = APRSDStatsStoreThread() + + # Mock the collector to raise an exception + with mock.patch('aprsd.stats.collector.Collector') as mock_collector_class: + mock_collector_instance = mock.Mock() + mock_collector_instance.collect.side_effect = Exception('Test exception') + mock_collector_class.return_value = mock_collector_instance + + # Set loop_count to match save interval + thread.loop_count = 10 + + # Should raise the exception + with self.assertRaises(Exception): + thread.loop() + + # Removed test_loop_count_increment as it's not meaningful to test in isolation + # since the increment happens in the parent run() method, not in loop() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/utils/test_ring_buffer_additional.py b/tests/utils/test_ring_buffer_additional.py new file mode 100644 index 0000000..a84339f --- /dev/null +++ b/tests/utils/test_ring_buffer_additional.py @@ -0,0 +1,172 @@ +import unittest + +from aprsd.utils.ring_buffer import RingBuffer + + +class TestRingBufferAdditional(unittest.TestCase): + """Additional unit tests for the RingBuffer class to cover edge cases.""" + + def test_empty_buffer(self): + """Test behavior with empty buffer.""" + rb = RingBuffer(5) + self.assertEqual(len(rb), 0) + self.assertEqual(rb.get(), []) + + def test_buffer_with_zero_size(self): + """Test buffer with zero size.""" + rb = RingBuffer(0) + # Should not crash, but behavior might be different + # In this implementation, it will behave like a normal list + rb.append(1) + self.assertEqual(len(rb), 1) + self.assertEqual(rb.get(), [1]) + + def test_buffer_with_negative_size(self): + """Test buffer with negative size.""" + # This might not be a valid use case, but let's test it + rb = RingBuffer(-1) + rb.append(1) + self.assertEqual(len(rb), 1) + self.assertEqual(rb.get(), [1]) + + def test_append_none_value(self): + """Test appending None values.""" + rb = RingBuffer(3) + rb.append(None) + rb.append(1) + rb.append(2) + + result = rb.get() + self.assertEqual(len(result), 3) + self.assertIsNone(result[0]) + self.assertEqual(result[1], 1) + self.assertEqual(result[2], 2) + + def test_append_multiple_types(self): + """Test appending multiple different types of values.""" + rb = RingBuffer(4) + rb.append('string') + rb.append(42) + rb.append([1, 2, 3]) + rb.append({'key': 'value'}) + + result = rb.get() + self.assertEqual(len(result), 4) + self.assertEqual(result[0], 'string') + self.assertEqual(result[1], 42) + self.assertEqual(result[2], [1, 2, 3]) + self.assertEqual(result[3], {'key': 'value'}) + + def test_multiple_appends_then_get(self): + """Test multiple appends followed by get operations.""" + rb = RingBuffer(5) + + # Append multiple items + for i in range(10): + rb.append(i) + + # Get should return the last 5 items + result = rb.get() + self.assertEqual(len(result), 5) + self.assertEqual(result, [5, 6, 7, 8, 9]) + + def test_get_returns_copy(self): + """Test that get() returns a copy, not a reference.""" + rb = RingBuffer(3) + rb.append(1) + rb.append(2) + rb.append(3) + + result = rb.get() + # Modify the returned list + result.append(4) + + # Original buffer should not be affected + original = rb.get() + self.assertEqual(len(original), 3) + self.assertNotIn(4, original) + + def test_buffer_size_one(self): + """Test buffer with size 1.""" + rb = RingBuffer(1) + rb.append(1) + self.assertEqual(len(rb), 1) + self.assertEqual(rb.get(), [1]) + + rb.append(2) + self.assertEqual(len(rb), 1) + result = rb.get() + self.assertEqual(len(result), 1) + self.assertEqual(result[0], 2) + + def test_buffer_size_two(self): + """Test buffer with size 2.""" + rb = RingBuffer(2) + rb.append(1) + rb.append(2) + self.assertEqual(len(rb), 2) + self.assertEqual(rb.get(), [1, 2]) + + rb.append(3) + self.assertEqual(len(rb), 2) + result = rb.get() + self.assertEqual(len(result), 2) + self.assertEqual(result[0], 2) + self.assertEqual(result[1], 3) + + def test_large_buffer_size(self): + """Test with a large buffer size.""" + rb = RingBuffer(1000) + for i in range(1000): + rb.append(i) + + result = rb.get() + self.assertEqual(len(result), 1000) + self.assertEqual(result[0], 0) + self.assertEqual(result[-1], 999) + + def test_buffer_with_many_wraparounds(self): + """Test buffer with many wraparounds.""" + rb = RingBuffer(3) + # Fill and wrap multiple times + for i in range(100): + rb.append(i) + + result = rb.get() + self.assertEqual(len(result), 3) + # Should contain the last 3 elements + self.assertEqual(result[0], 97) + self.assertEqual(result[1], 98) + self.assertEqual(result[2], 99) + + def test_multiple_get_calls(self): + """Test multiple get() calls return consistent results.""" + rb = RingBuffer(3) + rb.append(1) + rb.append(2) + rb.append(3) + + result1 = rb.get() + result2 = rb.get() + result3 = rb.get() + + self.assertEqual(result1, result2) + self.assertEqual(result2, result3) + self.assertEqual(result1, [1, 2, 3]) + + def test_get_order_consistency(self): + """Test that get() maintains order consistency.""" + rb = RingBuffer(5) + # Add elements + elements = [1, 2, 3, 4, 5, 6, 7] + for elem in elements: + rb.append(elem) + + result = rb.get() + # Should contain the last 5 elements in correct order + self.assertEqual(len(result), 5) + self.assertEqual(result, [3, 4, 5, 6, 7]) + + +if __name__ == '__main__': + unittest.main()