From 2a8b7002f24a03fd572c4d1d4890b97095aa2989 Mon Sep 17 00:00:00 2001 From: Walter Boring Date: Fri, 16 Jan 2026 23:38:46 -0500 Subject: [PATCH] Added new TX Scheduler and pool. This patch adds a new Send Packet scheduler and Ack Packet send scheduler. This prevents us from creating a new thread for each packet that we send. --- aprsd/threads/tx.py | 243 ++++++++++++++++++++- tests/threads/test_tx.py | 453 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 686 insertions(+), 10 deletions(-) diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py index e3da259..e6d9ce6 100644 --- a/aprsd/threads/tx.py +++ b/aprsd/threads/tx.py @@ -1,6 +1,7 @@ import logging import threading import time +from concurrent.futures import ThreadPoolExecutor import wrapt from oslo_config import cfg @@ -39,6 +40,11 @@ msg_throttle_decorator = decorator.ThrottleDecorator(throttle=msg_t) ack_throttle_decorator = decorator.ThrottleDecorator(throttle=ack_t) s_lock = threading.Lock() +# Global scheduler instances (singletons) +_packet_scheduler = None +_ack_scheduler = None +_scheduler_lock = threading.Lock() + @wrapt.synchronized(s_lock) @msg_throttle_decorator.sleep_and_retry @@ -62,8 +68,15 @@ def send(packet: core.Packet, direct=False, aprs_client=None): @msg_throttle_decorator.sleep_and_retry def _send_packet(packet: core.Packet, direct=False, aprs_client=None): if not direct: - thread = SendPacketThread(packet=packet) - thread.start() + # Use threadpool scheduler instead of creating individual threads + scheduler = _get_packet_scheduler() + if scheduler and scheduler.is_alive(): + # Scheduler will handle the packet + pass + else: + # Fallback to old method if scheduler not available + thread = SendPacketThread(packet=packet) + thread.start() else: _send_direct(packet, aprs_client=aprs_client) @@ -71,12 +84,20 @@ def _send_packet(packet: core.Packet, direct=False, aprs_client=None): @ack_throttle_decorator.sleep_and_retry def _send_ack(packet: core.AckPacket, direct=False, aprs_client=None): if not direct: - thread = SendAckThread(packet=packet) - thread.start() + # Use threadpool scheduler instead of creating individual threads + scheduler = _get_ack_scheduler() + if scheduler and scheduler.is_alive(): + # Scheduler will handle the packet + pass + else: + # Fallback to old method if scheduler not available + thread = SendAckThread(packet=packet) + thread.start() else: _send_direct(packet, aprs_client=aprs_client) +@msg_throttle_decorator.sleep_and_retry def _send_direct(packet, aprs_client=None): if aprs_client: cl = aprs_client @@ -94,6 +115,220 @@ def _send_direct(packet, aprs_client=None): return True +def _get_packet_scheduler(): + """Get or create the packet send scheduler thread (singleton).""" + global _packet_scheduler + with _scheduler_lock: + if _packet_scheduler is None or not _packet_scheduler.is_alive(): + _packet_scheduler = PacketSendSchedulerThread() + _packet_scheduler.start() + return _packet_scheduler + + +def _get_ack_scheduler(): + """Get or create the ack send scheduler thread (singleton).""" + global _ack_scheduler + with _scheduler_lock: + if _ack_scheduler is None or not _ack_scheduler.is_alive(): + _ack_scheduler = AckSendSchedulerThread() + _ack_scheduler.start() + return _ack_scheduler + + +def _send_packet_worker(msg_no: str): + """Worker function for threadpool to send a packet. + + This function checks if the packet needs to be sent and sends it if conditions are met. + Returns True if packet should continue to be tracked, False if done. + """ + pkt_tracker = tracker.PacketTrack() + packet = pkt_tracker.get(msg_no) + + if not packet: + # Packet was acked and removed from tracker + return False + + if packet.send_count >= packet.retry_count: + # Reached max retry count + LOG.info( + f'{packet.__class__.__name__} ' + f'({packet.msgNo}) ' + 'Message Send Complete. Max attempts reached' + f' {packet.retry_count}', + ) + pkt_tracker.remove(packet.msgNo) + return False + + # Check if it's time to send + send_now = False + if packet.last_send_time: + now = int(round(time.time())) + sleeptime = (packet.send_count + 1) * 31 + delta = now - packet.last_send_time + if delta > sleeptime: + send_now = True + else: + send_now = True + + if send_now: + packet.last_send_time = int(round(time.time())) + sent = False + try: + sent = _send_direct(packet) + except Exception as ex: + LOG.error(f'Failed to send packet: {packet}') + LOG.error(ex) + else: + if sent: + packet.send_count += 1 + + return True + + +def _send_ack_worker(msg_no: str, max_retries: int): + """Worker function for threadpool to send an ack packet. + + This function checks if the ack needs to be sent and sends it if conditions are met. + Returns True if ack should continue to be tracked, False if done. + """ + pkt_tracker = tracker.PacketTrack() + packet = pkt_tracker.get(msg_no) + + if not packet: + # Packet was removed from tracker + return False + + if packet.send_count >= max_retries: + LOG.debug( + f'{packet.__class__.__name__}' + f'({packet.msgNo}) ' + 'Send Complete. Max attempts reached' + f' {max_retries}', + ) + return False + + # Check if it's time to send + send_now = False + if packet.last_send_time: + now = int(round(time.time())) + sleep_time = 31 + delta = now - packet.last_send_time + if delta > sleep_time: + send_now = True + else: + # No previous send time, send immediately + send_now = True + + if send_now: + sent = False + try: + sent = _send_direct(packet) + except Exception: + LOG.error(f'Failed to send packet: {packet}') + else: + if sent: + packet.send_count += 1 + packet.last_send_time = int(round(time.time())) + + return True + + +class PacketSendSchedulerThread(aprsd_threads.APRSDThread): + """Scheduler thread that uses a threadpool to send packets. + + This thread periodically checks all packets in PacketTrack and submits + send tasks to a threadpool executor, avoiding the need to create a + separate thread for each packet. + """ + + def __init__(self, max_workers=5): + super().__init__('PacketSendSchedulerThread') + self.executor = ThreadPoolExecutor( + max_workers=max_workers, thread_name_prefix='PacketSendWorker' + ) + self.max_workers = max_workers + + def loop(self): + """Check all tracked packets and submit send tasks to threadpool.""" + pkt_tracker = tracker.PacketTrack() + + # Check all packets in the tracker + for msg_no in list(pkt_tracker.keys()): + packet = pkt_tracker.get(msg_no) + if not packet: + # Packet was acked, skip it + continue + + # Skip AckPackets - they're handled by AckSendSchedulerThread + if isinstance(packet, core.AckPacket): + continue + + # Check if packet is still being tracked (not acked) + if packet.send_count >= packet.retry_count: + # Max retries reached, will be cleaned up by worker + continue + + # Submit send task to threadpool + # The worker will check timing and send if needed + self.executor.submit(_send_packet_worker, msg_no) + + time.sleep(1) # Check every second + return True + + def _cleanup(self): + """Cleanup threadpool executor on thread shutdown.""" + LOG.debug('Shutting down PacketSendSchedulerThread executor') + self.executor.shutdown(wait=True) + + +class AckSendSchedulerThread(aprsd_threads.APRSDThread): + """Scheduler thread that uses a threadpool to send ack packets. + + This thread periodically checks all ack packets in PacketTrack and submits + send tasks to a threadpool executor, avoiding the need to create a + separate thread for each ack. + """ + + def __init__(self, max_workers=3): + super().__init__('AckSendSchedulerThread') + self.executor = ThreadPoolExecutor( + max_workers=max_workers, thread_name_prefix='AckSendWorker' + ) + self.max_workers = max_workers + self.max_retries = CONF.default_ack_send_count + + def loop(self): + """Check all tracked ack packets and submit send tasks to threadpool.""" + pkt_tracker = tracker.PacketTrack() + + # Check all packets in the tracker that are acks + for msg_no in list(pkt_tracker.keys()): + packet = pkt_tracker.get(msg_no) + if not packet: + # Packet was removed, skip it + continue + + # Only process AckPackets + if not isinstance(packet, core.AckPacket): + continue + + # Check if ack is still being tracked + if packet.send_count >= self.max_retries: + # Max retries reached, will be cleaned up by worker + continue + + # Submit send task to threadpool + self.executor.submit(_send_ack_worker, msg_no, self.max_retries) + + time.sleep(1) # Check every second + return True + + def _cleanup(self): + """Cleanup threadpool executor on thread shutdown.""" + LOG.debug('Shutting down AckSendSchedulerThread executor') + self.executor.shutdown(wait=True) + + class SendPacketThread(aprsd_threads.APRSDThread): loop_count: int = 1 diff --git a/tests/threads/test_tx.py b/tests/threads/test_tx.py index 157b89a..8ff03a2 100644 --- a/tests/threads/test_tx.py +++ b/tests/threads/test_tx.py @@ -15,10 +15,24 @@ class TestSendFunctions(unittest.TestCase): """Set up test fixtures.""" # Reset singleton instances tracker.PacketTrack._instance = None + # Reset scheduler instances + tx._packet_scheduler = None + tx._ack_scheduler = None def tearDown(self): """Clean up after tests.""" tracker.PacketTrack._instance = None + # Clean up schedulers + if tx._packet_scheduler: + tx._packet_scheduler.stop() + if tx._packet_scheduler.is_alive(): + tx._packet_scheduler.join(timeout=1) + if tx._ack_scheduler: + tx._ack_scheduler.stop() + if tx._ack_scheduler.is_alive(): + tx._ack_scheduler.join(timeout=1) + tx._packet_scheduler = None + tx._ack_scheduler = None @mock.patch('aprsd.threads.tx.collector.PacketCollector') @mock.patch('aprsd.threads.tx._send_packet') @@ -66,10 +80,28 @@ class TestSendFunctions(unittest.TestCase): mock_log.info.assert_called() mock_send_ack.assert_not_called() - @mock.patch('aprsd.threads.tx.SendPacketThread') - def test_send_packet_threaded(self, mock_thread_class): - """Test _send_packet() with threading.""" + @mock.patch('aprsd.threads.tx._get_packet_scheduler') + def test_send_packet_threaded(self, mock_get_scheduler): + """Test _send_packet() uses scheduler.""" packet = fake.fake_packet() + mock_scheduler = mock.MagicMock() + mock_scheduler.is_alive.return_value = True + mock_get_scheduler.return_value = mock_scheduler + + tx._send_packet(packet, direct=False) + + mock_get_scheduler.assert_called() + # Scheduler should be alive and will handle the packet + self.assertTrue(mock_scheduler.is_alive()) + + @mock.patch('aprsd.threads.tx.SendPacketThread') + @mock.patch('aprsd.threads.tx._get_packet_scheduler') + def test_send_packet_fallback(self, mock_get_scheduler, mock_thread_class): + """Test _send_packet() falls back to old method if scheduler not available.""" + packet = fake.fake_packet() + mock_scheduler = mock.MagicMock() + mock_scheduler.is_alive.return_value = False + mock_get_scheduler.return_value = mock_scheduler mock_thread = mock.MagicMock() mock_thread_class.return_value = mock_thread @@ -85,10 +117,28 @@ class TestSendFunctions(unittest.TestCase): tx._send_packet(packet, direct=True) mock_send_direct.assert_called_with(packet, aprs_client=None) - @mock.patch('aprsd.threads.tx.SendAckThread') - def test_send_ack_threaded(self, mock_thread_class): - """Test _send_ack() with threading.""" + @mock.patch('aprsd.threads.tx._get_ack_scheduler') + def test_send_ack_threaded(self, mock_get_scheduler): + """Test _send_ack() uses scheduler.""" packet = fake.fake_ack_packet() + mock_scheduler = mock.MagicMock() + mock_scheduler.is_alive.return_value = True + mock_get_scheduler.return_value = mock_scheduler + + tx._send_ack(packet, direct=False) + + mock_get_scheduler.assert_called() + # Scheduler should be alive and will handle the packet + self.assertTrue(mock_scheduler.is_alive()) + + @mock.patch('aprsd.threads.tx.SendAckThread') + @mock.patch('aprsd.threads.tx._get_ack_scheduler') + def test_send_ack_fallback(self, mock_get_scheduler, mock_thread_class): + """Test _send_ack() falls back to old method if scheduler not available.""" + packet = fake.fake_ack_packet() + mock_scheduler = mock.MagicMock() + mock_scheduler.is_alive.return_value = False + mock_get_scheduler.return_value = mock_scheduler mock_thread = mock.MagicMock() mock_thread_class.return_value = mock_thread @@ -146,6 +196,397 @@ class TestSendFunctions(unittest.TestCase): self.assertFalse(result) mock_log_error.error.assert_called() + @mock.patch('aprsd.threads.tx.PacketSendSchedulerThread') + def test_get_packet_scheduler_creates_new(self, mock_scheduler_class): + """Test _get_packet_scheduler() creates new scheduler if none exists.""" + tx._packet_scheduler = None + mock_scheduler = mock.MagicMock() + mock_scheduler_class.return_value = mock_scheduler + + result = tx._get_packet_scheduler() + + mock_scheduler_class.assert_called_once() + mock_scheduler.start.assert_called_once() + self.assertEqual(result, mock_scheduler) + + @mock.patch('aprsd.threads.tx.PacketSendSchedulerThread') + def test_get_packet_scheduler_reuses_existing(self, mock_scheduler_class): + """Test _get_packet_scheduler() reuses existing scheduler if alive.""" + existing_scheduler = mock.MagicMock() + existing_scheduler.is_alive.return_value = True + tx._packet_scheduler = existing_scheduler + + result = tx._get_packet_scheduler() + + mock_scheduler_class.assert_not_called() + self.assertEqual(result, existing_scheduler) + + @mock.patch('aprsd.threads.tx.PacketSendSchedulerThread') + def test_get_packet_scheduler_recreates_if_dead(self, mock_scheduler_class): + """Test _get_packet_scheduler() recreates scheduler if dead.""" + dead_scheduler = mock.MagicMock() + dead_scheduler.is_alive.return_value = False + tx._packet_scheduler = dead_scheduler + new_scheduler = mock.MagicMock() + mock_scheduler_class.return_value = new_scheduler + + result = tx._get_packet_scheduler() + + mock_scheduler_class.assert_called_once() + new_scheduler.start.assert_called_once() + self.assertEqual(result, new_scheduler) + + @mock.patch('aprsd.threads.tx.AckSendSchedulerThread') + def test_get_ack_scheduler_creates_new(self, mock_scheduler_class): + """Test _get_ack_scheduler() creates new scheduler if none exists.""" + tx._ack_scheduler = None + mock_scheduler = mock.MagicMock() + mock_scheduler_class.return_value = mock_scheduler + + result = tx._get_ack_scheduler() + + mock_scheduler_class.assert_called_once() + mock_scheduler.start.assert_called_once() + self.assertEqual(result, mock_scheduler) + + +class TestPacketWorkers(unittest.TestCase): + """Unit tests for worker functions used by threadpool.""" + + def setUp(self): + """Set up test fixtures.""" + tracker.PacketTrack._instance = None + + def tearDown(self): + """Clean up after tests.""" + tracker.PacketTrack._instance = None + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + def test_send_packet_worker_packet_acked(self, mock_tracker_class): + """Test _send_packet_worker() when packet is acked.""" + mock_tracker = mock.MagicMock() + mock_tracker.get.return_value = None # Packet removed = acked + mock_tracker_class.return_value = mock_tracker + + result = tx._send_packet_worker('123') + self.assertFalse(result) + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + def test_send_packet_worker_max_retries(self, mock_tracker_class): + """Test _send_packet_worker() when max retries reached.""" + mock_tracker = mock.MagicMock() + tracked_packet = fake.fake_packet(msg_number='123') + tracked_packet.send_count = 3 + tracked_packet.retry_count = 3 + mock_tracker.get.return_value = tracked_packet + mock_tracker_class.return_value = mock_tracker + + with mock.patch('aprsd.threads.tx.LOG') as mock_log: + result = tx._send_packet_worker('123') + self.assertFalse(result) + mock_log.info.assert_called() + mock_tracker.remove.assert_called() + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + @mock.patch('aprsd.threads.tx._send_direct') + def test_send_packet_worker_send_now(self, mock_send_direct, mock_tracker_class): + """Test _send_packet_worker() when it's time to send.""" + mock_tracker = mock.MagicMock() + tracked_packet = fake.fake_packet(msg_number='123') + tracked_packet.send_count = 0 + tracked_packet.retry_count = 3 + tracked_packet.last_send_time = None + mock_tracker.get.return_value = tracked_packet + mock_tracker_class.return_value = mock_tracker + + mock_send_direct.return_value = True + + result = tx._send_packet_worker('123') + + self.assertTrue(result) + mock_send_direct.assert_called() + self.assertEqual(tracked_packet.send_count, 1) + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + @mock.patch('aprsd.threads.tx._send_direct') + def test_send_packet_worker_send_failed(self, mock_send_direct, mock_tracker_class): + """Test _send_packet_worker() when send fails.""" + mock_tracker = mock.MagicMock() + tracked_packet = fake.fake_packet(msg_number='123') + tracked_packet.send_count = 0 + tracked_packet.retry_count = 3 + tracked_packet.last_send_time = None + mock_tracker.get.return_value = tracked_packet + mock_tracker_class.return_value = mock_tracker + + mock_send_direct.return_value = False + + result = tx._send_packet_worker('123') + + self.assertTrue(result) + self.assertEqual( + tracked_packet.send_count, 0 + ) # Should not increment on failure + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + def test_send_ack_worker_packet_removed(self, mock_tracker_class): + """Test _send_ack_worker() when packet is removed.""" + mock_tracker = mock.MagicMock() + mock_tracker.get.return_value = None + mock_tracker_class.return_value = mock_tracker + + result = tx._send_ack_worker('123', 3) + self.assertFalse(result) + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + def test_send_ack_worker_max_retries(self, mock_tracker_class): + """Test _send_ack_worker() when max retries reached.""" + mock_tracker = mock.MagicMock() + tracked_packet = fake.fake_ack_packet() + tracked_packet.send_count = 3 + mock_tracker.get.return_value = tracked_packet + mock_tracker_class.return_value = mock_tracker + + with mock.patch('aprsd.threads.tx.LOG') as mock_log: + result = tx._send_ack_worker('123', 3) + self.assertFalse(result) + mock_log.debug.assert_called() + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + @mock.patch('aprsd.threads.tx._send_direct') + def test_send_ack_worker_send_now(self, mock_send_direct, mock_tracker_class): + """Test _send_ack_worker() when it's time to send.""" + mock_tracker = mock.MagicMock() + tracked_packet = fake.fake_ack_packet() + tracked_packet.send_count = 0 + tracked_packet.last_send_time = None + mock_tracker.get.return_value = tracked_packet + mock_tracker_class.return_value = mock_tracker + + mock_send_direct.return_value = True + + result = tx._send_ack_worker('123', 3) + + self.assertTrue(result) + mock_send_direct.assert_called() + self.assertEqual(tracked_packet.send_count, 1) + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + @mock.patch('aprsd.threads.tx._send_direct') + def test_send_ack_worker_waiting(self, mock_send_direct, mock_tracker_class): + """Test _send_ack_worker() when waiting for next send.""" + mock_tracker = mock.MagicMock() + tracked_packet = fake.fake_ack_packet() + tracked_packet.send_count = 0 + tracked_packet.last_send_time = int(time.time()) - 10 # Too soon + mock_tracker.get.return_value = tracked_packet + mock_tracker_class.return_value = mock_tracker + + mock_send_direct.return_value = True + + result = tx._send_ack_worker('123', 3) + + self.assertTrue(result) + mock_send_direct.assert_not_called() + + +class TestPacketSendSchedulerThread(unittest.TestCase): + """Unit tests for PacketSendSchedulerThread class.""" + + def setUp(self): + """Set up test fixtures.""" + tracker.PacketTrack._instance = None + self.scheduler = tx.PacketSendSchedulerThread(max_workers=2) + + def tearDown(self): + """Clean up after tests.""" + self.scheduler.stop() + if self.scheduler.is_alive(): + self.scheduler.join(timeout=1) + self.scheduler.executor.shutdown(wait=False) + tracker.PacketTrack._instance = None + + def test_init(self): + """Test initialization.""" + self.assertEqual(self.scheduler.name, 'PacketSendSchedulerThread') + self.assertEqual(self.scheduler.max_workers, 2) + self.assertIsNotNone(self.scheduler.executor) + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + def test_loop_submits_tasks(self, mock_tracker_class): + """Test loop() submits tasks to threadpool.""" + mock_tracker = mock.MagicMock() + packet1 = fake.fake_packet(msg_number='123') + packet1.send_count = 0 + packet1.retry_count = 3 + packet2 = fake.fake_packet(msg_number='456') + packet2.send_count = 0 + packet2.retry_count = 3 + mock_tracker.keys.return_value = ['123', '456'] + mock_tracker.get.side_effect = lambda x: packet1 if x == '123' else packet2 + mock_tracker_class.return_value = mock_tracker + + # Mock the executor's submit method + with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit: + result = self.scheduler.loop() + + self.assertTrue(result) + # Should submit tasks for both packets + self.assertEqual(mock_submit.call_count, 2) + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + def test_loop_skips_acked_packets(self, mock_tracker_class): + """Test loop() skips packets that are acked.""" + mock_tracker = mock.MagicMock() + mock_tracker.keys.return_value = ['123'] + mock_tracker.get.return_value = None # Packet acked + mock_tracker_class.return_value = mock_tracker + + # Mock the executor's submit method + with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit: + result = self.scheduler.loop() + + self.assertTrue(result) + # Should not submit task for acked packet + mock_submit.assert_not_called() + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + def test_loop_skips_ack_packets(self, mock_tracker_class): + """Test loop() skips AckPackets.""" + mock_tracker = mock.MagicMock() + ack_packet = fake.fake_ack_packet() + mock_tracker.keys.return_value = ['123'] + mock_tracker.get.return_value = ack_packet + mock_tracker_class.return_value = mock_tracker + + # Mock the executor's submit method + with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit: + result = self.scheduler.loop() + + self.assertTrue(result) + # Should not submit task for ack packet + mock_submit.assert_not_called() + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + def test_loop_skips_max_retries(self, mock_tracker_class): + """Test loop() skips packets at max retries.""" + mock_tracker = mock.MagicMock() + packet = fake.fake_packet(msg_number='123') + packet.send_count = 3 + packet.retry_count = 3 + mock_tracker.keys.return_value = ['123'] + mock_tracker.get.return_value = packet + mock_tracker_class.return_value = mock_tracker + + # Mock the executor's submit method + with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit: + result = self.scheduler.loop() + + self.assertTrue(result) + # Should not submit task for packet at max retries + mock_submit.assert_not_called() + + def test_cleanup(self): + """Test _cleanup() shuts down executor.""" + with mock.patch.object(self.scheduler.executor, 'shutdown') as mock_shutdown: + with mock.patch('aprsd.threads.tx.LOG') as mock_log: + self.scheduler._cleanup() + mock_shutdown.assert_called_once_with(wait=True) + mock_log.debug.assert_called() + + +class TestAckSendSchedulerThread(unittest.TestCase): + """Unit tests for AckSendSchedulerThread class.""" + + def setUp(self): + """Set up test fixtures.""" + from oslo_config import cfg + + CONF = cfg.CONF + CONF.default_ack_send_count = 3 + tracker.PacketTrack._instance = None + self.scheduler = tx.AckSendSchedulerThread(max_workers=2) + + def tearDown(self): + """Clean up after tests.""" + self.scheduler.stop() + if self.scheduler.is_alive(): + self.scheduler.join(timeout=1) + self.scheduler.executor.shutdown(wait=False) + tracker.PacketTrack._instance = None + + def test_init(self): + """Test initialization.""" + self.assertEqual(self.scheduler.name, 'AckSendSchedulerThread') + self.assertEqual(self.scheduler.max_workers, 2) + self.assertEqual(self.scheduler.max_retries, 3) + self.assertIsNotNone(self.scheduler.executor) + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + def test_loop_submits_tasks(self, mock_tracker_class): + """Test loop() submits tasks to threadpool.""" + mock_tracker = mock.MagicMock() + ack_packet1 = fake.fake_ack_packet() + ack_packet1.send_count = 0 + ack_packet2 = fake.fake_ack_packet() + ack_packet2.send_count = 0 + mock_tracker.keys.return_value = ['123', '456'] + mock_tracker.get.side_effect = ( + lambda x: ack_packet1 if x == '123' else ack_packet2 + ) + mock_tracker_class.return_value = mock_tracker + + # Mock the executor's submit method + with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit: + result = self.scheduler.loop() + + self.assertTrue(result) + # Should submit tasks for both ack packets + self.assertEqual(mock_submit.call_count, 2) + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + def test_loop_skips_non_ack_packets(self, mock_tracker_class): + """Test loop() skips non-AckPackets.""" + mock_tracker = mock.MagicMock() + regular_packet = fake.fake_packet() + mock_tracker.keys.return_value = ['123'] + mock_tracker.get.return_value = regular_packet + mock_tracker_class.return_value = mock_tracker + + # Mock the executor's submit method + with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit: + result = self.scheduler.loop() + + self.assertTrue(result) + # Should not submit task for non-ack packet + mock_submit.assert_not_called() + + @mock.patch('aprsd.threads.tx.tracker.PacketTrack') + def test_loop_skips_max_retries(self, mock_tracker_class): + """Test loop() skips acks at max retries.""" + mock_tracker = mock.MagicMock() + ack_packet = fake.fake_ack_packet() + ack_packet.send_count = 3 + mock_tracker.keys.return_value = ['123'] + mock_tracker.get.return_value = ack_packet + mock_tracker_class.return_value = mock_tracker + + # Mock the executor's submit method + with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit: + result = self.scheduler.loop() + + self.assertTrue(result) + # Should not submit task for ack at max retries + mock_submit.assert_not_called() + + def test_cleanup(self): + """Test _cleanup() shuts down executor.""" + with mock.patch.object(self.scheduler.executor, 'shutdown') as mock_shutdown: + with mock.patch('aprsd.threads.tx.LOG') as mock_log: + self.scheduler._cleanup() + mock_shutdown.assert_called_once_with(wait=True) + mock_log.debug.assert_called() + class TestSendPacketThread(unittest.TestCase): """Unit tests for the SendPacketThread class."""