1
0
mirror of https://github.com/craigerl/aprsd.git synced 2026-01-29 22:42:18 -05:00

Refactored RX thread to not parse packets

The Main RX Thread that runs the client.consumer() call used to
parse packets as soon as it got them.  This lead to an iffecient
strategy for listen and acquire packets as fast as possible.
The APRSDRXThread now gets the raw packet from the client and
shoves it on the packet_queue.  The other threads that are looking
for packets on the packet_queue will parse the raw packet with
aprslib.  This allows us to capture packets as quickly as we can,
and then process those packets in the secondary threads.
This prevents a bottleneck capturing packets.
This commit is contained in:
Walter Boring 2026-01-14 15:00:14 -05:00
parent 0620e63e72
commit 274d5af0e9
5 changed files with 87 additions and 38 deletions

View File

@ -3,6 +3,7 @@ import logging
import time
from typing import Callable
import aprslib
from aprslib.exceptions import LoginError
from loguru import logger
from oslo_config import cfg
@ -167,7 +168,13 @@ class APRSISDriver:
def decode_packet(self, *args, **kwargs):
"""APRS lib already decodes this."""
return core.factory(args[0])
if not args:
LOG.warning('No frame received to decode?!?!')
return None
# If args[0] is already a dict (already parsed), pass it directly to factory
if isinstance(args[0], dict):
return core.factory(args[0])
return core.factory(aprslib.parse(args[0]))
def consumer(self, callback: Callable, raw: bool = False):
if self._client and self.connected:

View File

@ -103,16 +103,20 @@ class APRSDFakeDriver(metaclass=trace.TraceWrapperMetaclass):
def decode_packet(self, *args, **kwargs):
"""APRS lib already decodes this."""
if not kwargs:
# If packet is provided in kwargs, return it directly
if 'packet' in kwargs:
return kwargs['packet']
# If raw is provided in kwargs, use it
if 'raw' in kwargs:
return core.factory(aprslib.parse(kwargs['raw']))
# Otherwise, use args[0] if available
if not args:
LOG.warning('No frame received to decode?!?!')
return None
if kwargs.get('packet'):
return kwargs.get('packet')
if kwargs.get('raw'):
pkt_raw = aprslib.parse(kwargs.get('raw'))
pkt = core.factory(pkt_raw)
return pkt
# If args[0] is already a dict (already parsed), pass it directly to factory
if isinstance(args[0], dict):
return core.factory(args[0])
return core.factory(aprslib.parse(args[0]))
def stats(self, serializable: bool = False) -> dict:
return {

View File

@ -8,7 +8,7 @@ from oslo_config import cfg
from aprsd import packets, plugin
from aprsd.client.client import APRSDClient
from aprsd.packets import collector, filter
from aprsd.packets import collector, core, filter
from aprsd.packets import log as packet_log
from aprsd.threads import APRSDThread, tx
@ -17,12 +17,11 @@ LOG = logging.getLogger('APRSD')
class APRSDRXThread(APRSDThread):
"""Main Class to connect to an APRS Client and recieve packets.
"""
Thread to receive packets from the APRS Client and put them on the packet queue.
A packet is received in the main loop and then sent to the
process_packet method, which sends the packet through the collector
to track the packet for stats, and then put into the packet queue
for processing in a separate thread.
Args:
packet_queue: The queue to put the packets in.
"""
_client = None
@ -34,7 +33,12 @@ class APRSDRXThread(APRSDThread):
pkt_count = 0
def __init__(self, packet_queue):
def __init__(self, packet_queue: queue.Queue):
"""Initialize the APRSDRXThread.
Args:
packet_queue: The queue to put the packets in.
"""
super().__init__('RX_PKT')
self.packet_queue = packet_queue
@ -67,7 +71,7 @@ class APRSDRXThread(APRSDThread):
# https://github.com/rossengeorgiev/aprs-python/pull/56
self._client.consumer(
self.process_packet,
raw=False,
raw=True,
)
except (
aprslib.exceptions.ConnectionDrop,
@ -87,27 +91,38 @@ class APRSDRXThread(APRSDThread):
return True
def process_packet(self, *args, **kwargs):
"""Convert the raw packet into a Packet object and put it on the queue.
"""Put the raw packet on the queue.
The processing of the packet will happen in a separate thread.
"""
packet = self._client.decode_packet(*args, **kwargs)
if not packet:
LOG.error(
'No packet received from decode_packet. Most likely a failure to parse'
)
if not args:
LOG.warning('No frame received to process?!?!')
return
self.pkt_count += 1
self.packet_queue.put(packet)
self.packet_queue.put(args[0])
class APRSDFilterThread(APRSDThread):
def __init__(self, thread_name, packet_queue):
"""
Thread to filter packets on the packet queue.
Args:
thread_name: The name of the thread.
packet_queue: The queue to get the packets from.
"""
def __init__(self, thread_name: str, packet_queue: queue.Queue):
"""Initialize the APRSDFilterThread.
Args:
thread_name: The name of the thread.
packet_queue: The queue to get the packets from.
"""
super().__init__(thread_name)
self.packet_queue = packet_queue
self.packet_count = 0
self._client = APRSDClient()
def filter_packet(self, packet):
def filter_packet(self, packet: type[core.Packet]) -> type[core.Packet] | None:
# Do any packet filtering prior to processing
if not filter.PacketFilter().filter(packet):
return None
@ -124,8 +139,14 @@ class APRSDFilterThread(APRSDThread):
def loop(self):
try:
packet = self.packet_queue.get(timeout=1)
pkt = self.packet_queue.get(timeout=1)
self.packet_count += 1
# We use the client here, because the specific
# driver may need to decode the packet differently.
packet = self._client.decode_packet(pkt)
if not packet:
LOG.error(f'Packet failed to parse. "{pkt}"')
return True
self.print_packet(packet)
if packet:
if self.filter_packet(packet):
@ -150,7 +171,7 @@ class APRSDProcessPacketThread(APRSDFilterThread):
will ack a message before sending the packet to the subclass
for processing."""
def __init__(self, packet_queue):
def __init__(self, packet_queue: queue.Queue):
super().__init__('ProcessPKT', packet_queue=packet_queue)
if not CONF.enable_sending_ack_packets:
LOG.warning(
@ -251,7 +272,10 @@ class APRSDProcessPacketThread(APRSDFilterThread):
class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
"""Process the packet through the plugin manager.
This is the main aprsd server plugin processing thread."""
This is the main aprsd server plugin processing thread.
Args:
packet_queue: The queue to get the packets from.
"""
def process_other_packet(self, packet, for_us=False):
pm = plugin.PluginManager()

View File

@ -131,6 +131,8 @@ class StatsLogThread(APRSDThread):
)
total_ranks = len(sorted_callsigns)
for rank, (callsign, count) in enumerate(sorted_callsigns, 1):
# Calculate percentage of this callsign compared to total RX
percentage = (count / total_rx * 100) if total_rx > 0 else 0.0
# Use different colors based on rank: most packets (rank 1) = red,
# least packets (last rank) = green, middle = yellow
if rank == 1:
@ -142,7 +144,8 @@ class StatsLogThread(APRSDThread):
LOGU.opt(colors=True).info(
f' <cyan>{rank:2d}.</cyan> '
f'<white>{callsign:<12}</white>: '
f'<{count_color_tag}>{count:6d} packets</{count_color_tag}>',
f'<{count_color_tag}>{count:6d} packets</{count_color_tag}> '
f'<magenta>({percentage:5.1f}%)</magenta>',
)
time.sleep(1)

View File

@ -154,21 +154,26 @@ class TestAPRSDRXThread(unittest.TestCase):
mock_list_instance.find.side_effect = KeyError('Not found')
mock_pkt_list.return_value = mock_list_instance
self.rx_thread.process_packet()
# Pass raw packet string as args[0]
self.rx_thread.process_packet(packet.raw)
self.assertEqual(self.rx_thread.pkt_count, 1)
self.assertFalse(self.packet_queue.empty())
# Verify the raw string is on the queue
queued_raw = self.packet_queue.get()
self.assertEqual(queued_raw, packet.raw)
def test_process_packet_no_packet(self):
"""Test process_packet() when decode returns None."""
"""Test process_packet() when no frame is received."""
mock_client = MockClientDriver()
mock_client._decode_packet_return = None
self.rx_thread._client = mock_client
self.rx_thread.pkt_count = 0
with mock.patch('aprsd.threads.rx.LOG') as mock_log:
# Call without args to trigger warning
self.rx_thread.process_packet()
mock_log.error.assert_called()
mock_log.warning.assert_called()
self.assertEqual(self.rx_thread.pkt_count, 0)
def test_process_packet_ack_packet(self):
@ -180,10 +185,14 @@ class TestAPRSDRXThread(unittest.TestCase):
self.rx_thread.pkt_count = 0
with mock.patch('aprsd.threads.rx.packet_log'):
self.rx_thread.process_packet()
# Pass raw packet string as args[0]
self.rx_thread.process_packet(packet.raw)
self.assertEqual(self.rx_thread.pkt_count, 1)
self.assertFalse(self.packet_queue.empty())
# Verify the raw string is on the queue
queued_raw = self.packet_queue.get()
self.assertEqual(queued_raw, packet.raw)
def test_process_packet_duplicate(self):
"""Test process_packet() with duplicate packet.
@ -201,12 +210,14 @@ class TestAPRSDRXThread(unittest.TestCase):
self.rx_thread.pkt_count = 0
with mock.patch('aprsd.threads.rx.packet_log'):
self.rx_thread.process_packet()
# Pass raw packet string as args[0]
self.rx_thread.process_packet(packet.raw)
# The rx thread puts all packets on the queue regardless of duplicates
# Duplicate filtering happens in the filter thread
self.assertFalse(self.packet_queue.empty())
queued_packet = self.packet_queue.get()
self.assertEqual(queued_packet, packet)
queued_raw = self.packet_queue.get()
# Verify the raw string is on the queue
self.assertEqual(queued_raw, packet.raw)
class TestAPRSDFilterThread(unittest.TestCase):