mirror of
https://github.com/craigerl/aprsd.git
synced 2026-03-31 12:15:34 -04:00
refactor(threads): migrate TX threads to Event-based timing
- PacketSendSchedulerThread: Add daemon=False, replace time.sleep with self.wait - AckSendSchedulerThread: Add daemon=False, replace time.sleep with self.wait - SendPacketThread: Replace time.sleep with self.wait, remove manual loop_count - SendAckThread: Replace time.sleep with self.wait, remove manual loop_count - BeaconSendThread: Set self.period=CONF.beacon_interval, remove counter-based conditional, replace time.sleep with self.wait, remove _loop_cnt tracking - Update tests to use new Event-based API
This commit is contained in:
parent
bc9ce61e59
commit
85ebf8a274
@ -241,6 +241,8 @@ class PacketSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
separate thread for each packet.
|
||||
"""
|
||||
|
||||
daemon = False # Non-daemon for graceful packet handling
|
||||
|
||||
def __init__(self, max_workers=5):
|
||||
super().__init__('PacketSendSchedulerThread')
|
||||
self.executor = ThreadPoolExecutor(
|
||||
@ -272,7 +274,7 @@ class PacketSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
# The worker will check timing and send if needed
|
||||
self.executor.submit(_send_packet_worker, msg_no)
|
||||
|
||||
time.sleep(1) # Check every second
|
||||
self.wait() # Check every period (default 1 second)
|
||||
return True
|
||||
|
||||
def _cleanup(self):
|
||||
@ -289,6 +291,8 @@ class AckSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
separate thread for each ack.
|
||||
"""
|
||||
|
||||
daemon = False # Non-daemon for graceful ACK handling
|
||||
|
||||
def __init__(self, max_workers=3):
|
||||
super().__init__('AckSendSchedulerThread')
|
||||
self.executor = ThreadPoolExecutor(
|
||||
@ -320,7 +324,7 @@ class AckSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
# Submit send task to threadpool
|
||||
self.executor.submit(_send_ack_worker, msg_no, self.max_retries)
|
||||
|
||||
time.sleep(1) # Check every second
|
||||
self.wait() # Check every period (default 1 second)
|
||||
return True
|
||||
|
||||
def _cleanup(self):
|
||||
@ -330,8 +334,6 @@ class AckSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
|
||||
|
||||
class SendPacketThread(aprsd_threads.APRSDThread):
|
||||
loop_count: int = 1
|
||||
|
||||
def __init__(self, packet):
|
||||
self.packet = packet
|
||||
super().__init__(f'TX-{packet.to_call}-{self.packet.msgNo}')
|
||||
@ -401,14 +403,12 @@ class SendPacketThread(aprsd_threads.APRSDThread):
|
||||
if sent:
|
||||
packet.send_count += 1
|
||||
|
||||
time.sleep(1)
|
||||
self.wait()
|
||||
# Make sure we get called again.
|
||||
self.loop_count += 1
|
||||
return True
|
||||
|
||||
|
||||
class SendAckThread(aprsd_threads.APRSDThread):
|
||||
loop_count: int = 1
|
||||
max_retries = 3
|
||||
|
||||
def __init__(self, packet):
|
||||
@ -462,8 +462,7 @@ class SendAckThread(aprsd_threads.APRSDThread):
|
||||
|
||||
self.packet.last_send_time = int(round(time.time()))
|
||||
|
||||
time.sleep(1)
|
||||
self.loop_count += 1
|
||||
self.wait()
|
||||
return True
|
||||
|
||||
|
||||
@ -473,11 +472,9 @@ class BeaconSendThread(aprsd_threads.APRSDThread):
|
||||
Settings are in the [DEFAULT] section of the config file.
|
||||
"""
|
||||
|
||||
_loop_cnt: int = 1
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('BeaconSendThread')
|
||||
self._loop_cnt = 1
|
||||
self.period = CONF.beacon_interval
|
||||
# Make sure Latitude and Longitude are set.
|
||||
if not CONF.latitude or not CONF.longitude:
|
||||
LOG.error(
|
||||
@ -491,25 +488,23 @@ class BeaconSendThread(aprsd_threads.APRSDThread):
|
||||
)
|
||||
|
||||
def loop(self):
|
||||
# Only dump out the stats every N seconds
|
||||
if self._loop_cnt % CONF.beacon_interval == 0:
|
||||
pkt = core.BeaconPacket(
|
||||
from_call=CONF.callsign,
|
||||
to_call='APRS',
|
||||
latitude=float(CONF.latitude),
|
||||
longitude=float(CONF.longitude),
|
||||
comment='APRSD GPS Beacon',
|
||||
symbol=CONF.beacon_symbol,
|
||||
)
|
||||
try:
|
||||
# Only send it once
|
||||
pkt.retry_count = 1
|
||||
send(pkt, direct=True)
|
||||
except Exception as e:
|
||||
LOG.error(f'Failed to send beacon: {e}')
|
||||
APRSDClient().reset()
|
||||
time.sleep(5)
|
||||
pkt = core.BeaconPacket(
|
||||
from_call=CONF.callsign,
|
||||
to_call='APRS',
|
||||
latitude=float(CONF.latitude),
|
||||
longitude=float(CONF.longitude),
|
||||
comment='APRSD GPS Beacon',
|
||||
symbol=CONF.beacon_symbol,
|
||||
)
|
||||
try:
|
||||
# Only send it once
|
||||
pkt.retry_count = 1
|
||||
send(pkt, direct=True)
|
||||
except Exception as e:
|
||||
LOG.error(f'Failed to send beacon: {e}')
|
||||
APRSDClient().reset()
|
||||
if self.wait(timeout=5):
|
||||
return False
|
||||
|
||||
self._loop_cnt += 1
|
||||
time.sleep(1)
|
||||
self.wait()
|
||||
return True
|
||||
|
||||
@ -596,9 +596,13 @@ class TestSendPacketThread(unittest.TestCase):
|
||||
tracker.PacketTrack._instance = None
|
||||
self.packet = fake.fake_packet(msg_number='123')
|
||||
self.thread = tx.SendPacketThread(self.packet)
|
||||
# Mock wait to speed up tests
|
||||
self.wait_patcher = mock.patch.object(self.thread, 'wait', return_value=False)
|
||||
self.mock_wait = self.wait_patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
self.wait_patcher.stop()
|
||||
self.thread.stop()
|
||||
if self.thread.is_alive():
|
||||
self.thread.join(timeout=1)
|
||||
@ -608,7 +612,8 @@ class TestSendPacketThread(unittest.TestCase):
|
||||
"""Test initialization."""
|
||||
self.assertEqual(self.thread.packet, self.packet)
|
||||
self.assertIn('TX-', self.thread.name)
|
||||
self.assertEqual(self.thread.loop_count, 1)
|
||||
# loop_count starts at 0 from base class, incremented in run()
|
||||
self.assertEqual(self.thread.loop_count, 0)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_loop_packet_acked(self, mock_tracker_class):
|
||||
@ -761,7 +766,8 @@ class TestBeaconSendThread(unittest.TestCase):
|
||||
"""Test initialization."""
|
||||
thread = tx.BeaconSendThread()
|
||||
self.assertEqual(thread.name, 'BeaconSendThread')
|
||||
self.assertEqual(thread._loop_cnt, 1)
|
||||
self.assertEqual(thread.period, 10) # Uses CONF.beacon_interval
|
||||
thread.stop()
|
||||
|
||||
def test_init_no_coordinates(self):
|
||||
"""Test initialization without coordinates."""
|
||||
@ -772,39 +778,27 @@ class TestBeaconSendThread(unittest.TestCase):
|
||||
CONF.longitude = None
|
||||
|
||||
thread = tx.BeaconSendThread()
|
||||
self.assertTrue(thread.thread_stop)
|
||||
self.assertTrue(thread._shutdown_event.is_set())
|
||||
thread.stop()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.send')
|
||||
def test_loop_send_beacon(self, mock_send):
|
||||
"""Test loop() sends beacon at interval."""
|
||||
"""Test loop() sends beacon."""
|
||||
from oslo_config import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.beacon_interval = 1
|
||||
CONF.latitude = 40.7128
|
||||
CONF.longitude = -74.0060
|
||||
|
||||
thread = tx.BeaconSendThread()
|
||||
thread._loop_cnt = 1
|
||||
# Mock wait to return False (no shutdown)
|
||||
with mock.patch.object(thread, 'wait', return_value=False):
|
||||
result = thread.loop()
|
||||
|
||||
result = thread.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_send.assert_called()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.send')
|
||||
def test_loop_not_time(self, mock_send):
|
||||
"""Test loop() doesn't send before interval."""
|
||||
from oslo_config import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.beacon_interval = 10
|
||||
|
||||
thread = tx.BeaconSendThread()
|
||||
thread._loop_cnt = 5
|
||||
|
||||
result = thread.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_send.assert_not_called()
|
||||
self.assertTrue(result)
|
||||
mock_send.assert_called()
|
||||
thread.stop()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.send')
|
||||
@mock.patch('aprsd.threads.tx.APRSDClient')
|
||||
@ -814,13 +808,17 @@ class TestBeaconSendThread(unittest.TestCase):
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.beacon_interval = 1
|
||||
CONF.latitude = 40.7128
|
||||
CONF.longitude = -74.0060
|
||||
|
||||
thread = tx.BeaconSendThread()
|
||||
thread._loop_cnt = 1
|
||||
mock_send.side_effect = Exception('Send error')
|
||||
|
||||
with mock.patch('aprsd.threads.tx.LOG') as mock_log:
|
||||
result = thread.loop()
|
||||
self.assertTrue(result)
|
||||
mock_log.error.assert_called()
|
||||
mock_client_class.return_value.reset.assert_called()
|
||||
# Mock wait to return False (no shutdown signaled during error wait)
|
||||
with mock.patch.object(thread, 'wait', return_value=False):
|
||||
result = thread.loop()
|
||||
self.assertTrue(result)
|
||||
mock_log.error.assert_called()
|
||||
mock_client_class.return_value.reset.assert_called()
|
||||
thread.stop()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user