mirror of
https://github.com/craigerl/aprsd.git
synced 2026-01-26 15:35:53 -05:00
Added new TX Scheduler and pool.
This patch adds a new Send Packet scheduler and Ack Packet send scheduler. This prevents us from creating a new thread for each packet that we send.
This commit is contained in:
parent
0c1a074697
commit
2a8b7002f2
@ -1,6 +1,7 @@
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import wrapt
|
||||
from oslo_config import cfg
|
||||
@ -39,6 +40,11 @@ msg_throttle_decorator = decorator.ThrottleDecorator(throttle=msg_t)
|
||||
ack_throttle_decorator = decorator.ThrottleDecorator(throttle=ack_t)
|
||||
s_lock = threading.Lock()
|
||||
|
||||
# Global scheduler instances (singletons)
|
||||
_packet_scheduler = None
|
||||
_ack_scheduler = None
|
||||
_scheduler_lock = threading.Lock()
|
||||
|
||||
|
||||
@wrapt.synchronized(s_lock)
|
||||
@msg_throttle_decorator.sleep_and_retry
|
||||
@ -62,8 +68,15 @@ def send(packet: core.Packet, direct=False, aprs_client=None):
|
||||
@msg_throttle_decorator.sleep_and_retry
|
||||
def _send_packet(packet: core.Packet, direct=False, aprs_client=None):
|
||||
if not direct:
|
||||
thread = SendPacketThread(packet=packet)
|
||||
thread.start()
|
||||
# Use threadpool scheduler instead of creating individual threads
|
||||
scheduler = _get_packet_scheduler()
|
||||
if scheduler and scheduler.is_alive():
|
||||
# Scheduler will handle the packet
|
||||
pass
|
||||
else:
|
||||
# Fallback to old method if scheduler not available
|
||||
thread = SendPacketThread(packet=packet)
|
||||
thread.start()
|
||||
else:
|
||||
_send_direct(packet, aprs_client=aprs_client)
|
||||
|
||||
@ -71,12 +84,20 @@ def _send_packet(packet: core.Packet, direct=False, aprs_client=None):
|
||||
@ack_throttle_decorator.sleep_and_retry
|
||||
def _send_ack(packet: core.AckPacket, direct=False, aprs_client=None):
|
||||
if not direct:
|
||||
thread = SendAckThread(packet=packet)
|
||||
thread.start()
|
||||
# Use threadpool scheduler instead of creating individual threads
|
||||
scheduler = _get_ack_scheduler()
|
||||
if scheduler and scheduler.is_alive():
|
||||
# Scheduler will handle the packet
|
||||
pass
|
||||
else:
|
||||
# Fallback to old method if scheduler not available
|
||||
thread = SendAckThread(packet=packet)
|
||||
thread.start()
|
||||
else:
|
||||
_send_direct(packet, aprs_client=aprs_client)
|
||||
|
||||
|
||||
@msg_throttle_decorator.sleep_and_retry
|
||||
def _send_direct(packet, aprs_client=None):
|
||||
if aprs_client:
|
||||
cl = aprs_client
|
||||
@ -94,6 +115,220 @@ def _send_direct(packet, aprs_client=None):
|
||||
return True
|
||||
|
||||
|
||||
def _get_packet_scheduler():
|
||||
"""Get or create the packet send scheduler thread (singleton)."""
|
||||
global _packet_scheduler
|
||||
with _scheduler_lock:
|
||||
if _packet_scheduler is None or not _packet_scheduler.is_alive():
|
||||
_packet_scheduler = PacketSendSchedulerThread()
|
||||
_packet_scheduler.start()
|
||||
return _packet_scheduler
|
||||
|
||||
|
||||
def _get_ack_scheduler():
|
||||
"""Get or create the ack send scheduler thread (singleton)."""
|
||||
global _ack_scheduler
|
||||
with _scheduler_lock:
|
||||
if _ack_scheduler is None or not _ack_scheduler.is_alive():
|
||||
_ack_scheduler = AckSendSchedulerThread()
|
||||
_ack_scheduler.start()
|
||||
return _ack_scheduler
|
||||
|
||||
|
||||
def _send_packet_worker(msg_no: str):
|
||||
"""Worker function for threadpool to send a packet.
|
||||
|
||||
This function checks if the packet needs to be sent and sends it if conditions are met.
|
||||
Returns True if packet should continue to be tracked, False if done.
|
||||
"""
|
||||
pkt_tracker = tracker.PacketTrack()
|
||||
packet = pkt_tracker.get(msg_no)
|
||||
|
||||
if not packet:
|
||||
# Packet was acked and removed from tracker
|
||||
return False
|
||||
|
||||
if packet.send_count >= packet.retry_count:
|
||||
# Reached max retry count
|
||||
LOG.info(
|
||||
f'{packet.__class__.__name__} '
|
||||
f'({packet.msgNo}) '
|
||||
'Message Send Complete. Max attempts reached'
|
||||
f' {packet.retry_count}',
|
||||
)
|
||||
pkt_tracker.remove(packet.msgNo)
|
||||
return False
|
||||
|
||||
# Check if it's time to send
|
||||
send_now = False
|
||||
if packet.last_send_time:
|
||||
now = int(round(time.time()))
|
||||
sleeptime = (packet.send_count + 1) * 31
|
||||
delta = now - packet.last_send_time
|
||||
if delta > sleeptime:
|
||||
send_now = True
|
||||
else:
|
||||
send_now = True
|
||||
|
||||
if send_now:
|
||||
packet.last_send_time = int(round(time.time()))
|
||||
sent = False
|
||||
try:
|
||||
sent = _send_direct(packet)
|
||||
except Exception as ex:
|
||||
LOG.error(f'Failed to send packet: {packet}')
|
||||
LOG.error(ex)
|
||||
else:
|
||||
if sent:
|
||||
packet.send_count += 1
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def _send_ack_worker(msg_no: str, max_retries: int):
|
||||
"""Worker function for threadpool to send an ack packet.
|
||||
|
||||
This function checks if the ack needs to be sent and sends it if conditions are met.
|
||||
Returns True if ack should continue to be tracked, False if done.
|
||||
"""
|
||||
pkt_tracker = tracker.PacketTrack()
|
||||
packet = pkt_tracker.get(msg_no)
|
||||
|
||||
if not packet:
|
||||
# Packet was removed from tracker
|
||||
return False
|
||||
|
||||
if packet.send_count >= max_retries:
|
||||
LOG.debug(
|
||||
f'{packet.__class__.__name__}'
|
||||
f'({packet.msgNo}) '
|
||||
'Send Complete. Max attempts reached'
|
||||
f' {max_retries}',
|
||||
)
|
||||
return False
|
||||
|
||||
# Check if it's time to send
|
||||
send_now = False
|
||||
if packet.last_send_time:
|
||||
now = int(round(time.time()))
|
||||
sleep_time = 31
|
||||
delta = now - packet.last_send_time
|
||||
if delta > sleep_time:
|
||||
send_now = True
|
||||
else:
|
||||
# No previous send time, send immediately
|
||||
send_now = True
|
||||
|
||||
if send_now:
|
||||
sent = False
|
||||
try:
|
||||
sent = _send_direct(packet)
|
||||
except Exception:
|
||||
LOG.error(f'Failed to send packet: {packet}')
|
||||
else:
|
||||
if sent:
|
||||
packet.send_count += 1
|
||||
packet.last_send_time = int(round(time.time()))
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class PacketSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
"""Scheduler thread that uses a threadpool to send packets.
|
||||
|
||||
This thread periodically checks all packets in PacketTrack and submits
|
||||
send tasks to a threadpool executor, avoiding the need to create a
|
||||
separate thread for each packet.
|
||||
"""
|
||||
|
||||
def __init__(self, max_workers=5):
|
||||
super().__init__('PacketSendSchedulerThread')
|
||||
self.executor = ThreadPoolExecutor(
|
||||
max_workers=max_workers, thread_name_prefix='PacketSendWorker'
|
||||
)
|
||||
self.max_workers = max_workers
|
||||
|
||||
def loop(self):
|
||||
"""Check all tracked packets and submit send tasks to threadpool."""
|
||||
pkt_tracker = tracker.PacketTrack()
|
||||
|
||||
# Check all packets in the tracker
|
||||
for msg_no in list(pkt_tracker.keys()):
|
||||
packet = pkt_tracker.get(msg_no)
|
||||
if not packet:
|
||||
# Packet was acked, skip it
|
||||
continue
|
||||
|
||||
# Skip AckPackets - they're handled by AckSendSchedulerThread
|
||||
if isinstance(packet, core.AckPacket):
|
||||
continue
|
||||
|
||||
# Check if packet is still being tracked (not acked)
|
||||
if packet.send_count >= packet.retry_count:
|
||||
# Max retries reached, will be cleaned up by worker
|
||||
continue
|
||||
|
||||
# Submit send task to threadpool
|
||||
# The worker will check timing and send if needed
|
||||
self.executor.submit(_send_packet_worker, msg_no)
|
||||
|
||||
time.sleep(1) # Check every second
|
||||
return True
|
||||
|
||||
def _cleanup(self):
|
||||
"""Cleanup threadpool executor on thread shutdown."""
|
||||
LOG.debug('Shutting down PacketSendSchedulerThread executor')
|
||||
self.executor.shutdown(wait=True)
|
||||
|
||||
|
||||
class AckSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
"""Scheduler thread that uses a threadpool to send ack packets.
|
||||
|
||||
This thread periodically checks all ack packets in PacketTrack and submits
|
||||
send tasks to a threadpool executor, avoiding the need to create a
|
||||
separate thread for each ack.
|
||||
"""
|
||||
|
||||
def __init__(self, max_workers=3):
|
||||
super().__init__('AckSendSchedulerThread')
|
||||
self.executor = ThreadPoolExecutor(
|
||||
max_workers=max_workers, thread_name_prefix='AckSendWorker'
|
||||
)
|
||||
self.max_workers = max_workers
|
||||
self.max_retries = CONF.default_ack_send_count
|
||||
|
||||
def loop(self):
|
||||
"""Check all tracked ack packets and submit send tasks to threadpool."""
|
||||
pkt_tracker = tracker.PacketTrack()
|
||||
|
||||
# Check all packets in the tracker that are acks
|
||||
for msg_no in list(pkt_tracker.keys()):
|
||||
packet = pkt_tracker.get(msg_no)
|
||||
if not packet:
|
||||
# Packet was removed, skip it
|
||||
continue
|
||||
|
||||
# Only process AckPackets
|
||||
if not isinstance(packet, core.AckPacket):
|
||||
continue
|
||||
|
||||
# Check if ack is still being tracked
|
||||
if packet.send_count >= self.max_retries:
|
||||
# Max retries reached, will be cleaned up by worker
|
||||
continue
|
||||
|
||||
# Submit send task to threadpool
|
||||
self.executor.submit(_send_ack_worker, msg_no, self.max_retries)
|
||||
|
||||
time.sleep(1) # Check every second
|
||||
return True
|
||||
|
||||
def _cleanup(self):
|
||||
"""Cleanup threadpool executor on thread shutdown."""
|
||||
LOG.debug('Shutting down AckSendSchedulerThread executor')
|
||||
self.executor.shutdown(wait=True)
|
||||
|
||||
|
||||
class SendPacketThread(aprsd_threads.APRSDThread):
|
||||
loop_count: int = 1
|
||||
|
||||
|
||||
@ -15,10 +15,24 @@ class TestSendFunctions(unittest.TestCase):
|
||||
"""Set up test fixtures."""
|
||||
# Reset singleton instances
|
||||
tracker.PacketTrack._instance = None
|
||||
# Reset scheduler instances
|
||||
tx._packet_scheduler = None
|
||||
tx._ack_scheduler = None
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
tracker.PacketTrack._instance = None
|
||||
# Clean up schedulers
|
||||
if tx._packet_scheduler:
|
||||
tx._packet_scheduler.stop()
|
||||
if tx._packet_scheduler.is_alive():
|
||||
tx._packet_scheduler.join(timeout=1)
|
||||
if tx._ack_scheduler:
|
||||
tx._ack_scheduler.stop()
|
||||
if tx._ack_scheduler.is_alive():
|
||||
tx._ack_scheduler.join(timeout=1)
|
||||
tx._packet_scheduler = None
|
||||
tx._ack_scheduler = None
|
||||
|
||||
@mock.patch('aprsd.threads.tx.collector.PacketCollector')
|
||||
@mock.patch('aprsd.threads.tx._send_packet')
|
||||
@ -66,10 +80,28 @@ class TestSendFunctions(unittest.TestCase):
|
||||
mock_log.info.assert_called()
|
||||
mock_send_ack.assert_not_called()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.SendPacketThread')
|
||||
def test_send_packet_threaded(self, mock_thread_class):
|
||||
"""Test _send_packet() with threading."""
|
||||
@mock.patch('aprsd.threads.tx._get_packet_scheduler')
|
||||
def test_send_packet_threaded(self, mock_get_scheduler):
|
||||
"""Test _send_packet() uses scheduler."""
|
||||
packet = fake.fake_packet()
|
||||
mock_scheduler = mock.MagicMock()
|
||||
mock_scheduler.is_alive.return_value = True
|
||||
mock_get_scheduler.return_value = mock_scheduler
|
||||
|
||||
tx._send_packet(packet, direct=False)
|
||||
|
||||
mock_get_scheduler.assert_called()
|
||||
# Scheduler should be alive and will handle the packet
|
||||
self.assertTrue(mock_scheduler.is_alive())
|
||||
|
||||
@mock.patch('aprsd.threads.tx.SendPacketThread')
|
||||
@mock.patch('aprsd.threads.tx._get_packet_scheduler')
|
||||
def test_send_packet_fallback(self, mock_get_scheduler, mock_thread_class):
|
||||
"""Test _send_packet() falls back to old method if scheduler not available."""
|
||||
packet = fake.fake_packet()
|
||||
mock_scheduler = mock.MagicMock()
|
||||
mock_scheduler.is_alive.return_value = False
|
||||
mock_get_scheduler.return_value = mock_scheduler
|
||||
mock_thread = mock.MagicMock()
|
||||
mock_thread_class.return_value = mock_thread
|
||||
|
||||
@ -85,10 +117,28 @@ class TestSendFunctions(unittest.TestCase):
|
||||
tx._send_packet(packet, direct=True)
|
||||
mock_send_direct.assert_called_with(packet, aprs_client=None)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.SendAckThread')
|
||||
def test_send_ack_threaded(self, mock_thread_class):
|
||||
"""Test _send_ack() with threading."""
|
||||
@mock.patch('aprsd.threads.tx._get_ack_scheduler')
|
||||
def test_send_ack_threaded(self, mock_get_scheduler):
|
||||
"""Test _send_ack() uses scheduler."""
|
||||
packet = fake.fake_ack_packet()
|
||||
mock_scheduler = mock.MagicMock()
|
||||
mock_scheduler.is_alive.return_value = True
|
||||
mock_get_scheduler.return_value = mock_scheduler
|
||||
|
||||
tx._send_ack(packet, direct=False)
|
||||
|
||||
mock_get_scheduler.assert_called()
|
||||
# Scheduler should be alive and will handle the packet
|
||||
self.assertTrue(mock_scheduler.is_alive())
|
||||
|
||||
@mock.patch('aprsd.threads.tx.SendAckThread')
|
||||
@mock.patch('aprsd.threads.tx._get_ack_scheduler')
|
||||
def test_send_ack_fallback(self, mock_get_scheduler, mock_thread_class):
|
||||
"""Test _send_ack() falls back to old method if scheduler not available."""
|
||||
packet = fake.fake_ack_packet()
|
||||
mock_scheduler = mock.MagicMock()
|
||||
mock_scheduler.is_alive.return_value = False
|
||||
mock_get_scheduler.return_value = mock_scheduler
|
||||
mock_thread = mock.MagicMock()
|
||||
mock_thread_class.return_value = mock_thread
|
||||
|
||||
@ -146,6 +196,397 @@ class TestSendFunctions(unittest.TestCase):
|
||||
self.assertFalse(result)
|
||||
mock_log_error.error.assert_called()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.PacketSendSchedulerThread')
|
||||
def test_get_packet_scheduler_creates_new(self, mock_scheduler_class):
|
||||
"""Test _get_packet_scheduler() creates new scheduler if none exists."""
|
||||
tx._packet_scheduler = None
|
||||
mock_scheduler = mock.MagicMock()
|
||||
mock_scheduler_class.return_value = mock_scheduler
|
||||
|
||||
result = tx._get_packet_scheduler()
|
||||
|
||||
mock_scheduler_class.assert_called_once()
|
||||
mock_scheduler.start.assert_called_once()
|
||||
self.assertEqual(result, mock_scheduler)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.PacketSendSchedulerThread')
|
||||
def test_get_packet_scheduler_reuses_existing(self, mock_scheduler_class):
|
||||
"""Test _get_packet_scheduler() reuses existing scheduler if alive."""
|
||||
existing_scheduler = mock.MagicMock()
|
||||
existing_scheduler.is_alive.return_value = True
|
||||
tx._packet_scheduler = existing_scheduler
|
||||
|
||||
result = tx._get_packet_scheduler()
|
||||
|
||||
mock_scheduler_class.assert_not_called()
|
||||
self.assertEqual(result, existing_scheduler)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.PacketSendSchedulerThread')
|
||||
def test_get_packet_scheduler_recreates_if_dead(self, mock_scheduler_class):
|
||||
"""Test _get_packet_scheduler() recreates scheduler if dead."""
|
||||
dead_scheduler = mock.MagicMock()
|
||||
dead_scheduler.is_alive.return_value = False
|
||||
tx._packet_scheduler = dead_scheduler
|
||||
new_scheduler = mock.MagicMock()
|
||||
mock_scheduler_class.return_value = new_scheduler
|
||||
|
||||
result = tx._get_packet_scheduler()
|
||||
|
||||
mock_scheduler_class.assert_called_once()
|
||||
new_scheduler.start.assert_called_once()
|
||||
self.assertEqual(result, new_scheduler)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.AckSendSchedulerThread')
|
||||
def test_get_ack_scheduler_creates_new(self, mock_scheduler_class):
|
||||
"""Test _get_ack_scheduler() creates new scheduler if none exists."""
|
||||
tx._ack_scheduler = None
|
||||
mock_scheduler = mock.MagicMock()
|
||||
mock_scheduler_class.return_value = mock_scheduler
|
||||
|
||||
result = tx._get_ack_scheduler()
|
||||
|
||||
mock_scheduler_class.assert_called_once()
|
||||
mock_scheduler.start.assert_called_once()
|
||||
self.assertEqual(result, mock_scheduler)
|
||||
|
||||
|
||||
class TestPacketWorkers(unittest.TestCase):
|
||||
"""Unit tests for worker functions used by threadpool."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
tracker.PacketTrack._instance = None
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
tracker.PacketTrack._instance = None
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_send_packet_worker_packet_acked(self, mock_tracker_class):
|
||||
"""Test _send_packet_worker() when packet is acked."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
mock_tracker.get.return_value = None # Packet removed = acked
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
result = tx._send_packet_worker('123')
|
||||
self.assertFalse(result)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_send_packet_worker_max_retries(self, mock_tracker_class):
|
||||
"""Test _send_packet_worker() when max retries reached."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
tracked_packet = fake.fake_packet(msg_number='123')
|
||||
tracked_packet.send_count = 3
|
||||
tracked_packet.retry_count = 3
|
||||
mock_tracker.get.return_value = tracked_packet
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
with mock.patch('aprsd.threads.tx.LOG') as mock_log:
|
||||
result = tx._send_packet_worker('123')
|
||||
self.assertFalse(result)
|
||||
mock_log.info.assert_called()
|
||||
mock_tracker.remove.assert_called()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
@mock.patch('aprsd.threads.tx._send_direct')
|
||||
def test_send_packet_worker_send_now(self, mock_send_direct, mock_tracker_class):
|
||||
"""Test _send_packet_worker() when it's time to send."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
tracked_packet = fake.fake_packet(msg_number='123')
|
||||
tracked_packet.send_count = 0
|
||||
tracked_packet.retry_count = 3
|
||||
tracked_packet.last_send_time = None
|
||||
mock_tracker.get.return_value = tracked_packet
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
mock_send_direct.return_value = True
|
||||
|
||||
result = tx._send_packet_worker('123')
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_send_direct.assert_called()
|
||||
self.assertEqual(tracked_packet.send_count, 1)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
@mock.patch('aprsd.threads.tx._send_direct')
|
||||
def test_send_packet_worker_send_failed(self, mock_send_direct, mock_tracker_class):
|
||||
"""Test _send_packet_worker() when send fails."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
tracked_packet = fake.fake_packet(msg_number='123')
|
||||
tracked_packet.send_count = 0
|
||||
tracked_packet.retry_count = 3
|
||||
tracked_packet.last_send_time = None
|
||||
mock_tracker.get.return_value = tracked_packet
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
mock_send_direct.return_value = False
|
||||
|
||||
result = tx._send_packet_worker('123')
|
||||
|
||||
self.assertTrue(result)
|
||||
self.assertEqual(
|
||||
tracked_packet.send_count, 0
|
||||
) # Should not increment on failure
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_send_ack_worker_packet_removed(self, mock_tracker_class):
|
||||
"""Test _send_ack_worker() when packet is removed."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
mock_tracker.get.return_value = None
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
result = tx._send_ack_worker('123', 3)
|
||||
self.assertFalse(result)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_send_ack_worker_max_retries(self, mock_tracker_class):
|
||||
"""Test _send_ack_worker() when max retries reached."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
tracked_packet = fake.fake_ack_packet()
|
||||
tracked_packet.send_count = 3
|
||||
mock_tracker.get.return_value = tracked_packet
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
with mock.patch('aprsd.threads.tx.LOG') as mock_log:
|
||||
result = tx._send_ack_worker('123', 3)
|
||||
self.assertFalse(result)
|
||||
mock_log.debug.assert_called()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
@mock.patch('aprsd.threads.tx._send_direct')
|
||||
def test_send_ack_worker_send_now(self, mock_send_direct, mock_tracker_class):
|
||||
"""Test _send_ack_worker() when it's time to send."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
tracked_packet = fake.fake_ack_packet()
|
||||
tracked_packet.send_count = 0
|
||||
tracked_packet.last_send_time = None
|
||||
mock_tracker.get.return_value = tracked_packet
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
mock_send_direct.return_value = True
|
||||
|
||||
result = tx._send_ack_worker('123', 3)
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_send_direct.assert_called()
|
||||
self.assertEqual(tracked_packet.send_count, 1)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
@mock.patch('aprsd.threads.tx._send_direct')
|
||||
def test_send_ack_worker_waiting(self, mock_send_direct, mock_tracker_class):
|
||||
"""Test _send_ack_worker() when waiting for next send."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
tracked_packet = fake.fake_ack_packet()
|
||||
tracked_packet.send_count = 0
|
||||
tracked_packet.last_send_time = int(time.time()) - 10 # Too soon
|
||||
mock_tracker.get.return_value = tracked_packet
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
mock_send_direct.return_value = True
|
||||
|
||||
result = tx._send_ack_worker('123', 3)
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_send_direct.assert_not_called()
|
||||
|
||||
|
||||
class TestPacketSendSchedulerThread(unittest.TestCase):
|
||||
"""Unit tests for PacketSendSchedulerThread class."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
tracker.PacketTrack._instance = None
|
||||
self.scheduler = tx.PacketSendSchedulerThread(max_workers=2)
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
self.scheduler.stop()
|
||||
if self.scheduler.is_alive():
|
||||
self.scheduler.join(timeout=1)
|
||||
self.scheduler.executor.shutdown(wait=False)
|
||||
tracker.PacketTrack._instance = None
|
||||
|
||||
def test_init(self):
|
||||
"""Test initialization."""
|
||||
self.assertEqual(self.scheduler.name, 'PacketSendSchedulerThread')
|
||||
self.assertEqual(self.scheduler.max_workers, 2)
|
||||
self.assertIsNotNone(self.scheduler.executor)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_loop_submits_tasks(self, mock_tracker_class):
|
||||
"""Test loop() submits tasks to threadpool."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
packet1 = fake.fake_packet(msg_number='123')
|
||||
packet1.send_count = 0
|
||||
packet1.retry_count = 3
|
||||
packet2 = fake.fake_packet(msg_number='456')
|
||||
packet2.send_count = 0
|
||||
packet2.retry_count = 3
|
||||
mock_tracker.keys.return_value = ['123', '456']
|
||||
mock_tracker.get.side_effect = lambda x: packet1 if x == '123' else packet2
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
# Mock the executor's submit method
|
||||
with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit:
|
||||
result = self.scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should submit tasks for both packets
|
||||
self.assertEqual(mock_submit.call_count, 2)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_loop_skips_acked_packets(self, mock_tracker_class):
|
||||
"""Test loop() skips packets that are acked."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
mock_tracker.keys.return_value = ['123']
|
||||
mock_tracker.get.return_value = None # Packet acked
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
# Mock the executor's submit method
|
||||
with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit:
|
||||
result = self.scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should not submit task for acked packet
|
||||
mock_submit.assert_not_called()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_loop_skips_ack_packets(self, mock_tracker_class):
|
||||
"""Test loop() skips AckPackets."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
ack_packet = fake.fake_ack_packet()
|
||||
mock_tracker.keys.return_value = ['123']
|
||||
mock_tracker.get.return_value = ack_packet
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
# Mock the executor's submit method
|
||||
with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit:
|
||||
result = self.scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should not submit task for ack packet
|
||||
mock_submit.assert_not_called()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_loop_skips_max_retries(self, mock_tracker_class):
|
||||
"""Test loop() skips packets at max retries."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
packet = fake.fake_packet(msg_number='123')
|
||||
packet.send_count = 3
|
||||
packet.retry_count = 3
|
||||
mock_tracker.keys.return_value = ['123']
|
||||
mock_tracker.get.return_value = packet
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
# Mock the executor's submit method
|
||||
with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit:
|
||||
result = self.scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should not submit task for packet at max retries
|
||||
mock_submit.assert_not_called()
|
||||
|
||||
def test_cleanup(self):
|
||||
"""Test _cleanup() shuts down executor."""
|
||||
with mock.patch.object(self.scheduler.executor, 'shutdown') as mock_shutdown:
|
||||
with mock.patch('aprsd.threads.tx.LOG') as mock_log:
|
||||
self.scheduler._cleanup()
|
||||
mock_shutdown.assert_called_once_with(wait=True)
|
||||
mock_log.debug.assert_called()
|
||||
|
||||
|
||||
class TestAckSendSchedulerThread(unittest.TestCase):
|
||||
"""Unit tests for AckSendSchedulerThread class."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
from oslo_config import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.default_ack_send_count = 3
|
||||
tracker.PacketTrack._instance = None
|
||||
self.scheduler = tx.AckSendSchedulerThread(max_workers=2)
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
self.scheduler.stop()
|
||||
if self.scheduler.is_alive():
|
||||
self.scheduler.join(timeout=1)
|
||||
self.scheduler.executor.shutdown(wait=False)
|
||||
tracker.PacketTrack._instance = None
|
||||
|
||||
def test_init(self):
|
||||
"""Test initialization."""
|
||||
self.assertEqual(self.scheduler.name, 'AckSendSchedulerThread')
|
||||
self.assertEqual(self.scheduler.max_workers, 2)
|
||||
self.assertEqual(self.scheduler.max_retries, 3)
|
||||
self.assertIsNotNone(self.scheduler.executor)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_loop_submits_tasks(self, mock_tracker_class):
|
||||
"""Test loop() submits tasks to threadpool."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
ack_packet1 = fake.fake_ack_packet()
|
||||
ack_packet1.send_count = 0
|
||||
ack_packet2 = fake.fake_ack_packet()
|
||||
ack_packet2.send_count = 0
|
||||
mock_tracker.keys.return_value = ['123', '456']
|
||||
mock_tracker.get.side_effect = (
|
||||
lambda x: ack_packet1 if x == '123' else ack_packet2
|
||||
)
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
# Mock the executor's submit method
|
||||
with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit:
|
||||
result = self.scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should submit tasks for both ack packets
|
||||
self.assertEqual(mock_submit.call_count, 2)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_loop_skips_non_ack_packets(self, mock_tracker_class):
|
||||
"""Test loop() skips non-AckPackets."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
regular_packet = fake.fake_packet()
|
||||
mock_tracker.keys.return_value = ['123']
|
||||
mock_tracker.get.return_value = regular_packet
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
# Mock the executor's submit method
|
||||
with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit:
|
||||
result = self.scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should not submit task for non-ack packet
|
||||
mock_submit.assert_not_called()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_loop_skips_max_retries(self, mock_tracker_class):
|
||||
"""Test loop() skips acks at max retries."""
|
||||
mock_tracker = mock.MagicMock()
|
||||
ack_packet = fake.fake_ack_packet()
|
||||
ack_packet.send_count = 3
|
||||
mock_tracker.keys.return_value = ['123']
|
||||
mock_tracker.get.return_value = ack_packet
|
||||
mock_tracker_class.return_value = mock_tracker
|
||||
|
||||
# Mock the executor's submit method
|
||||
with mock.patch.object(self.scheduler.executor, 'submit') as mock_submit:
|
||||
result = self.scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should not submit task for ack at max retries
|
||||
mock_submit.assert_not_called()
|
||||
|
||||
def test_cleanup(self):
|
||||
"""Test _cleanup() shuts down executor."""
|
||||
with mock.patch.object(self.scheduler.executor, 'shutdown') as mock_shutdown:
|
||||
with mock.patch('aprsd.threads.tx.LOG') as mock_log:
|
||||
self.scheduler._cleanup()
|
||||
mock_shutdown.assert_called_once_with(wait=True)
|
||||
mock_log.debug.assert_called()
|
||||
|
||||
|
||||
class TestSendPacketThread(unittest.TestCase):
|
||||
"""Unit tests for the SendPacketThread class."""
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user