From bc9ce61e59111470c7a7537e3c751654acb39cf9 Mon Sep 17 00:00:00 2001 From: Walter Boring Date: Tue, 24 Mar 2026 12:13:20 -0400 Subject: [PATCH] refactor(threads): migrate RX threads to Event-based timing - APRSDRXThread: Replace time.sleep with self.wait for interruptible waits - APRSDRXThread.stop(): Use _shutdown_event.set() instead of thread_stop - APRSDRXThread: Error recovery waits check for shutdown signal - APRSDFilterThread: Use queue timeout with self.period for interruptible wait - Remove unused time import - Update tests to use new Event-based API --- aprsd/threads/rx.py | 15 ++++++++------- tests/threads/test_rx.py | 35 ++++++++++++++++++----------------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 21a7614..2485b16 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -1,7 +1,6 @@ import abc import logging import queue -import time import aprslib from oslo_config import cfg @@ -43,19 +42,19 @@ class APRSDRXThread(APRSDThread): self.packet_queue = packet_queue def stop(self): - self.thread_stop = True + self._shutdown_event.set() if self._client: self._client.close() def loop(self): if not self._client: self._client = APRSDClient() - time.sleep(1) + self.wait(timeout=1) return True if not self._client.is_alive: self._client = APRSDClient() - time.sleep(1) + self.wait(timeout=1) return True # setup the consumer of messages and block until a messages @@ -82,12 +81,14 @@ class APRSDRXThread(APRSDThread): # This will cause a reconnect, next time client.get_client() # is called self._client.reset() - time.sleep(5) + if self.wait(timeout=5): + return False except Exception as ex: LOG.exception(ex) LOG.error('Resetting connection and trying again.') self._client.reset() - time.sleep(5) + if self.wait(timeout=5): + return False return True def process_packet(self, *args, **kwargs): @@ -153,7 +154,7 @@ class APRSDFilterThread(APRSDThread): def loop(self): try: - pkt = self.packet_queue.get(timeout=1) + pkt = self.packet_queue.get(timeout=self.period) self.packet_count += 1 # We use the client here, because the specific # driver may need to decode the packet differently. diff --git a/tests/threads/test_rx.py b/tests/threads/test_rx.py index f2b49d5..4c18059 100644 --- a/tests/threads/test_rx.py +++ b/tests/threads/test_rx.py @@ -15,17 +15,18 @@ class TestAPRSDRXThread(unittest.TestCase): self.packet_queue = queue.Queue() self.rx_thread = rx.APRSDRXThread(self.packet_queue) self.rx_thread.pkt_count = 0 # Reset packet count - # Mock time.sleep to speed up tests - self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep') - self.mock_sleep = self.sleep_patcher.start() + # Mock self.wait to speed up tests + self.wait_patcher = mock.patch.object( + self.rx_thread, 'wait', return_value=False + ) + self.mock_wait = self.wait_patcher.start() def tearDown(self): """Clean up after tests.""" + self.wait_patcher.stop() self.rx_thread.stop() if self.rx_thread.is_alive(): self.rx_thread.join(timeout=1) - # Stop the sleep patcher - self.sleep_patcher.stop() def test_init(self): """Test initialization.""" @@ -39,13 +40,13 @@ class TestAPRSDRXThread(unittest.TestCase): self.rx_thread._client = mock.MagicMock() self.rx_thread.stop() - self.assertTrue(self.rx_thread.thread_stop) + self.assertTrue(self.rx_thread._shutdown_event.is_set()) self.rx_thread._client.close.assert_called() def test_stop_no_client(self): """Test stop() when client is None.""" self.rx_thread.stop() - self.assertTrue(self.rx_thread.thread_stop) + self.assertTrue(self.rx_thread._shutdown_event.is_set()) def test_loop_no_client(self): """Test loop() when client is None.""" @@ -237,18 +238,18 @@ class TestAPRSDFilterThread(unittest.TestCase): """Process packet - required by base class.""" pass + # Mock APRSDClient to avoid config requirements + self.client_patcher = mock.patch('aprsd.threads.rx.APRSDClient') + self.mock_client = self.client_patcher.start() + self.filter_thread = TestFilterThread('TestFilterThread', self.packet_queue) - # Mock time.sleep to speed up tests - self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep') - self.mock_sleep = self.sleep_patcher.start() def tearDown(self): """Clean up after tests.""" + self.client_patcher.stop() self.filter_thread.stop() if self.filter_thread.is_alive(): self.filter_thread.join(timeout=1) - # Stop the sleep patcher - self.sleep_patcher.stop() def test_init(self): """Test initialization.""" @@ -330,18 +331,18 @@ class TestAPRSDProcessPacketThread(unittest.TestCase): def process_our_message_packet(self, packet): pass + # Mock APRSDClient to avoid config requirements + self.client_patcher = mock.patch('aprsd.threads.rx.APRSDClient') + self.mock_client = self.client_patcher.start() + self.process_thread = ConcreteProcessThread(self.packet_queue) - # Mock time.sleep to speed up tests - self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep') - self.mock_sleep = self.sleep_patcher.start() def tearDown(self): """Clean up after tests.""" + self.client_patcher.stop() self.process_thread.stop() if self.process_thread.is_alive(): self.process_thread.join(timeout=1) - # Stop the sleep patcher - self.sleep_patcher.stop() def test_init(self): """Test initialization."""