1
0
mirror of https://github.com/craigerl/aprsd.git synced 2026-04-01 12:45:33 -04:00

refactor(threads): migrate RX threads to Event-based timing

- APRSDRXThread: Replace time.sleep with self.wait for interruptible waits
- APRSDRXThread.stop(): Use _shutdown_event.set() instead of thread_stop
- APRSDRXThread: Error recovery waits check for shutdown signal
- APRSDFilterThread: Use queue timeout with self.period for interruptible wait
- Remove unused time import
- Update tests to use new Event-based API
This commit is contained in:
Walter Boring 2026-03-24 12:13:20 -04:00
parent 343ec3e81c
commit bc9ce61e59
2 changed files with 26 additions and 24 deletions

View File

@ -1,7 +1,6 @@
import abc
import logging
import queue
import time
import aprslib
from oslo_config import cfg
@ -43,19 +42,19 @@ class APRSDRXThread(APRSDThread):
self.packet_queue = packet_queue
def stop(self):
self.thread_stop = True
self._shutdown_event.set()
if self._client:
self._client.close()
def loop(self):
if not self._client:
self._client = APRSDClient()
time.sleep(1)
self.wait(timeout=1)
return True
if not self._client.is_alive:
self._client = APRSDClient()
time.sleep(1)
self.wait(timeout=1)
return True
# setup the consumer of messages and block until a messages
@ -82,12 +81,14 @@ class APRSDRXThread(APRSDThread):
# This will cause a reconnect, next time client.get_client()
# is called
self._client.reset()
time.sleep(5)
if self.wait(timeout=5):
return False
except Exception as ex:
LOG.exception(ex)
LOG.error('Resetting connection and trying again.')
self._client.reset()
time.sleep(5)
if self.wait(timeout=5):
return False
return True
def process_packet(self, *args, **kwargs):
@ -153,7 +154,7 @@ class APRSDFilterThread(APRSDThread):
def loop(self):
try:
pkt = self.packet_queue.get(timeout=1)
pkt = self.packet_queue.get(timeout=self.period)
self.packet_count += 1
# We use the client here, because the specific
# driver may need to decode the packet differently.

View File

@ -15,17 +15,18 @@ class TestAPRSDRXThread(unittest.TestCase):
self.packet_queue = queue.Queue()
self.rx_thread = rx.APRSDRXThread(self.packet_queue)
self.rx_thread.pkt_count = 0 # Reset packet count
# Mock time.sleep to speed up tests
self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep')
self.mock_sleep = self.sleep_patcher.start()
# Mock self.wait to speed up tests
self.wait_patcher = mock.patch.object(
self.rx_thread, 'wait', return_value=False
)
self.mock_wait = self.wait_patcher.start()
def tearDown(self):
"""Clean up after tests."""
self.wait_patcher.stop()
self.rx_thread.stop()
if self.rx_thread.is_alive():
self.rx_thread.join(timeout=1)
# Stop the sleep patcher
self.sleep_patcher.stop()
def test_init(self):
"""Test initialization."""
@ -39,13 +40,13 @@ class TestAPRSDRXThread(unittest.TestCase):
self.rx_thread._client = mock.MagicMock()
self.rx_thread.stop()
self.assertTrue(self.rx_thread.thread_stop)
self.assertTrue(self.rx_thread._shutdown_event.is_set())
self.rx_thread._client.close.assert_called()
def test_stop_no_client(self):
"""Test stop() when client is None."""
self.rx_thread.stop()
self.assertTrue(self.rx_thread.thread_stop)
self.assertTrue(self.rx_thread._shutdown_event.is_set())
def test_loop_no_client(self):
"""Test loop() when client is None."""
@ -237,18 +238,18 @@ class TestAPRSDFilterThread(unittest.TestCase):
"""Process packet - required by base class."""
pass
# Mock APRSDClient to avoid config requirements
self.client_patcher = mock.patch('aprsd.threads.rx.APRSDClient')
self.mock_client = self.client_patcher.start()
self.filter_thread = TestFilterThread('TestFilterThread', self.packet_queue)
# Mock time.sleep to speed up tests
self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep')
self.mock_sleep = self.sleep_patcher.start()
def tearDown(self):
"""Clean up after tests."""
self.client_patcher.stop()
self.filter_thread.stop()
if self.filter_thread.is_alive():
self.filter_thread.join(timeout=1)
# Stop the sleep patcher
self.sleep_patcher.stop()
def test_init(self):
"""Test initialization."""
@ -330,18 +331,18 @@ class TestAPRSDProcessPacketThread(unittest.TestCase):
def process_our_message_packet(self, packet):
pass
# Mock APRSDClient to avoid config requirements
self.client_patcher = mock.patch('aprsd.threads.rx.APRSDClient')
self.mock_client = self.client_patcher.start()
self.process_thread = ConcreteProcessThread(self.packet_queue)
# Mock time.sleep to speed up tests
self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep')
self.mock_sleep = self.sleep_patcher.start()
def tearDown(self):
"""Clean up after tests."""
self.client_patcher.stop()
self.process_thread.stop()
if self.process_thread.is_alive():
self.process_thread.join(timeout=1)
# Stop the sleep patcher
self.sleep_patcher.stop()
def test_init(self):
"""Test initialization."""