From 274d5af0e99466f6fdd5c0409fc7c527008f433c Mon Sep 17 00:00:00 2001 From: Walter Boring Date: Wed, 14 Jan 2026 15:00:14 -0500 Subject: [PATCH] Refactored RX thread to not parse packets The Main RX Thread that runs the client.consumer() call used to parse packets as soon as it got them. This lead to an iffecient strategy for listen and acquire packets as fast as possible. The APRSDRXThread now gets the raw packet from the client and shoves it on the packet_queue. The other threads that are looking for packets on the packet_queue will parse the raw packet with aprslib. This allows us to capture packets as quickly as we can, and then process those packets in the secondary threads. This prevents a bottleneck capturing packets. --- aprsd/client/drivers/aprsis.py | 9 ++++- aprsd/client/drivers/fake.py | 22 +++++++----- aprsd/threads/rx.py | 64 +++++++++++++++++++++++----------- aprsd/threads/stats.py | 5 ++- tests/threads/test_rx.py | 25 +++++++++---- 5 files changed, 87 insertions(+), 38 deletions(-) diff --git a/aprsd/client/drivers/aprsis.py b/aprsd/client/drivers/aprsis.py index aa8ee9c..7c3c20d 100644 --- a/aprsd/client/drivers/aprsis.py +++ b/aprsd/client/drivers/aprsis.py @@ -3,6 +3,7 @@ import logging import time from typing import Callable +import aprslib from aprslib.exceptions import LoginError from loguru import logger from oslo_config import cfg @@ -167,7 +168,13 @@ class APRSISDriver: def decode_packet(self, *args, **kwargs): """APRS lib already decodes this.""" - return core.factory(args[0]) + if not args: + LOG.warning('No frame received to decode?!?!') + return None + # If args[0] is already a dict (already parsed), pass it directly to factory + if isinstance(args[0], dict): + return core.factory(args[0]) + return core.factory(aprslib.parse(args[0])) def consumer(self, callback: Callable, raw: bool = False): if self._client and self.connected: diff --git a/aprsd/client/drivers/fake.py b/aprsd/client/drivers/fake.py index c203e46..e3cbc92 100644 --- a/aprsd/client/drivers/fake.py +++ b/aprsd/client/drivers/fake.py @@ -103,16 +103,20 @@ class APRSDFakeDriver(metaclass=trace.TraceWrapperMetaclass): def decode_packet(self, *args, **kwargs): """APRS lib already decodes this.""" - if not kwargs: + # If packet is provided in kwargs, return it directly + if 'packet' in kwargs: + return kwargs['packet'] + # If raw is provided in kwargs, use it + if 'raw' in kwargs: + return core.factory(aprslib.parse(kwargs['raw'])) + # Otherwise, use args[0] if available + if not args: + LOG.warning('No frame received to decode?!?!') return None - - if kwargs.get('packet'): - return kwargs.get('packet') - - if kwargs.get('raw'): - pkt_raw = aprslib.parse(kwargs.get('raw')) - pkt = core.factory(pkt_raw) - return pkt + # If args[0] is already a dict (already parsed), pass it directly to factory + if isinstance(args[0], dict): + return core.factory(args[0]) + return core.factory(aprslib.parse(args[0])) def stats(self, serializable: bool = False) -> dict: return { diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 3a22319..f91d574 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -8,7 +8,7 @@ from oslo_config import cfg from aprsd import packets, plugin from aprsd.client.client import APRSDClient -from aprsd.packets import collector, filter +from aprsd.packets import collector, core, filter from aprsd.packets import log as packet_log from aprsd.threads import APRSDThread, tx @@ -17,12 +17,11 @@ LOG = logging.getLogger('APRSD') class APRSDRXThread(APRSDThread): - """Main Class to connect to an APRS Client and recieve packets. + """ + Thread to receive packets from the APRS Client and put them on the packet queue. - A packet is received in the main loop and then sent to the - process_packet method, which sends the packet through the collector - to track the packet for stats, and then put into the packet queue - for processing in a separate thread. + Args: + packet_queue: The queue to put the packets in. """ _client = None @@ -34,7 +33,12 @@ class APRSDRXThread(APRSDThread): pkt_count = 0 - def __init__(self, packet_queue): + def __init__(self, packet_queue: queue.Queue): + """Initialize the APRSDRXThread. + + Args: + packet_queue: The queue to put the packets in. + """ super().__init__('RX_PKT') self.packet_queue = packet_queue @@ -67,7 +71,7 @@ class APRSDRXThread(APRSDThread): # https://github.com/rossengeorgiev/aprs-python/pull/56 self._client.consumer( self.process_packet, - raw=False, + raw=True, ) except ( aprslib.exceptions.ConnectionDrop, @@ -87,27 +91,38 @@ class APRSDRXThread(APRSDThread): return True def process_packet(self, *args, **kwargs): - """Convert the raw packet into a Packet object and put it on the queue. + """Put the raw packet on the queue. The processing of the packet will happen in a separate thread. """ - packet = self._client.decode_packet(*args, **kwargs) - if not packet: - LOG.error( - 'No packet received from decode_packet. Most likely a failure to parse' - ) + if not args: + LOG.warning('No frame received to process?!?!') return self.pkt_count += 1 - self.packet_queue.put(packet) + self.packet_queue.put(args[0]) class APRSDFilterThread(APRSDThread): - def __init__(self, thread_name, packet_queue): + """ + Thread to filter packets on the packet queue. + Args: + thread_name: The name of the thread. + packet_queue: The queue to get the packets from. + """ + + def __init__(self, thread_name: str, packet_queue: queue.Queue): + """Initialize the APRSDFilterThread. + + Args: + thread_name: The name of the thread. + packet_queue: The queue to get the packets from. + """ super().__init__(thread_name) self.packet_queue = packet_queue self.packet_count = 0 + self._client = APRSDClient() - def filter_packet(self, packet): + def filter_packet(self, packet: type[core.Packet]) -> type[core.Packet] | None: # Do any packet filtering prior to processing if not filter.PacketFilter().filter(packet): return None @@ -124,8 +139,14 @@ class APRSDFilterThread(APRSDThread): def loop(self): try: - packet = self.packet_queue.get(timeout=1) + pkt = self.packet_queue.get(timeout=1) self.packet_count += 1 + # We use the client here, because the specific + # driver may need to decode the packet differently. + packet = self._client.decode_packet(pkt) + if not packet: + LOG.error(f'Packet failed to parse. "{pkt}"') + return True self.print_packet(packet) if packet: if self.filter_packet(packet): @@ -150,7 +171,7 @@ class APRSDProcessPacketThread(APRSDFilterThread): will ack a message before sending the packet to the subclass for processing.""" - def __init__(self, packet_queue): + def __init__(self, packet_queue: queue.Queue): super().__init__('ProcessPKT', packet_queue=packet_queue) if not CONF.enable_sending_ack_packets: LOG.warning( @@ -251,7 +272,10 @@ class APRSDProcessPacketThread(APRSDFilterThread): class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): """Process the packet through the plugin manager. - This is the main aprsd server plugin processing thread.""" + This is the main aprsd server plugin processing thread. + Args: + packet_queue: The queue to get the packets from. + """ def process_other_packet(self, packet, for_us=False): pm = plugin.PluginManager() diff --git a/aprsd/threads/stats.py b/aprsd/threads/stats.py index 7a891db..290d3e3 100644 --- a/aprsd/threads/stats.py +++ b/aprsd/threads/stats.py @@ -131,6 +131,8 @@ class StatsLogThread(APRSDThread): ) total_ranks = len(sorted_callsigns) for rank, (callsign, count) in enumerate(sorted_callsigns, 1): + # Calculate percentage of this callsign compared to total RX + percentage = (count / total_rx * 100) if total_rx > 0 else 0.0 # Use different colors based on rank: most packets (rank 1) = red, # least packets (last rank) = green, middle = yellow if rank == 1: @@ -142,7 +144,8 @@ class StatsLogThread(APRSDThread): LOGU.opt(colors=True).info( f' {rank:2d}. ' f'{callsign:<12}: ' - f'<{count_color_tag}>{count:6d} packets', + f'<{count_color_tag}>{count:6d} packets ' + f'({percentage:5.1f}%)', ) time.sleep(1) diff --git a/tests/threads/test_rx.py b/tests/threads/test_rx.py index ffc3fcc..ed557bf 100644 --- a/tests/threads/test_rx.py +++ b/tests/threads/test_rx.py @@ -154,21 +154,26 @@ class TestAPRSDRXThread(unittest.TestCase): mock_list_instance.find.side_effect = KeyError('Not found') mock_pkt_list.return_value = mock_list_instance - self.rx_thread.process_packet() + # Pass raw packet string as args[0] + self.rx_thread.process_packet(packet.raw) self.assertEqual(self.rx_thread.pkt_count, 1) self.assertFalse(self.packet_queue.empty()) + # Verify the raw string is on the queue + queued_raw = self.packet_queue.get() + self.assertEqual(queued_raw, packet.raw) def test_process_packet_no_packet(self): - """Test process_packet() when decode returns None.""" + """Test process_packet() when no frame is received.""" mock_client = MockClientDriver() mock_client._decode_packet_return = None self.rx_thread._client = mock_client self.rx_thread.pkt_count = 0 with mock.patch('aprsd.threads.rx.LOG') as mock_log: + # Call without args to trigger warning self.rx_thread.process_packet() - mock_log.error.assert_called() + mock_log.warning.assert_called() self.assertEqual(self.rx_thread.pkt_count, 0) def test_process_packet_ack_packet(self): @@ -180,10 +185,14 @@ class TestAPRSDRXThread(unittest.TestCase): self.rx_thread.pkt_count = 0 with mock.patch('aprsd.threads.rx.packet_log'): - self.rx_thread.process_packet() + # Pass raw packet string as args[0] + self.rx_thread.process_packet(packet.raw) self.assertEqual(self.rx_thread.pkt_count, 1) self.assertFalse(self.packet_queue.empty()) + # Verify the raw string is on the queue + queued_raw = self.packet_queue.get() + self.assertEqual(queued_raw, packet.raw) def test_process_packet_duplicate(self): """Test process_packet() with duplicate packet. @@ -201,12 +210,14 @@ class TestAPRSDRXThread(unittest.TestCase): self.rx_thread.pkt_count = 0 with mock.patch('aprsd.threads.rx.packet_log'): - self.rx_thread.process_packet() + # Pass raw packet string as args[0] + self.rx_thread.process_packet(packet.raw) # The rx thread puts all packets on the queue regardless of duplicates # Duplicate filtering happens in the filter thread self.assertFalse(self.packet_queue.empty()) - queued_packet = self.packet_queue.get() - self.assertEqual(queued_packet, packet) + queued_raw = self.packet_queue.get() + # Verify the raw string is on the queue + self.assertEqual(queued_raw, packet.raw) class TestAPRSDFilterThread(unittest.TestCase):