From cc15950f33122f55642e60b8123c0b99fa3b0fe5 Mon Sep 17 00:00:00 2001 From: Walter Boring Date: Mon, 29 Dec 2025 19:23:47 -0500 Subject: [PATCH] update the rx thread and packet trackers the main rx thread now doesn't do any processing of the incoming packet other than converting the raw packet string to a packet object. --- aprsd/packets/packet_list.py | 2 ++ aprsd/packets/seen_list.py | 2 ++ aprsd/packets/tracker.py | 2 ++ aprsd/packets/watch_list.py | 2 ++ aprsd/threads/rx.py | 49 ++++++------------------------------ aprsd/threads/stats.py | 5 +++- aprsd/utils/objectstore.py | 47 +++++++++++++++++----------------- 7 files changed, 44 insertions(+), 65 deletions(-) diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index de251d5..c2edac9 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -1,4 +1,5 @@ import logging +import threading from collections import OrderedDict from oslo_config import cfg @@ -21,6 +22,7 @@ class PacketList(objectstore.ObjectStoreMixin): def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) + cls._instance.lock = threading.RLock() cls._instance.maxlen = CONF.packet_list_maxlen cls._instance._init_data() return cls._instance diff --git a/aprsd/packets/seen_list.py b/aprsd/packets/seen_list.py index 6f0da5c..571954b 100644 --- a/aprsd/packets/seen_list.py +++ b/aprsd/packets/seen_list.py @@ -1,5 +1,6 @@ import datetime import logging +import threading from oslo_config import cfg @@ -20,6 +21,7 @@ class SeenList(objectstore.ObjectStoreMixin): def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) + cls._instance.lock = threading.RLock() cls._instance.data = {} return cls._instance diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index a751e15..a2d4031 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -1,5 +1,6 @@ import datetime import logging +import threading from oslo_config 
import cfg @@ -33,6 +34,7 @@ class PacketTrack(objectstore.ObjectStoreMixin): def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) + cls._instance.lock = threading.RLock() cls._instance._start_time = datetime.datetime.now() cls._instance._init_store() return cls._instance diff --git a/aprsd/packets/watch_list.py b/aprsd/packets/watch_list.py index d9a82d9..ec09128 100644 --- a/aprsd/packets/watch_list.py +++ b/aprsd/packets/watch_list.py @@ -1,5 +1,6 @@ import datetime import logging +import threading from oslo_config import cfg @@ -21,6 +22,7 @@ class WatchList(objectstore.ObjectStoreMixin): def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) + cls._instance.lock = threading.RLock() return cls._instance @trace.no_trace diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 9c46644..5715e99 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -87,6 +87,10 @@ class APRSDRXThread(APRSDThread): return True def process_packet(self, *args, **kwargs): + """Convert the raw packet into a Packet object and put it on the queue. + + The processing of the packet will happen in a separate thread. + """ packet = self._client.decode_packet(*args, **kwargs) if not packet: LOG.error( @@ -95,47 +99,7 @@ class APRSDRXThread(APRSDThread): return self.pkt_count += 1 packet_log.log(packet, packet_count=self.pkt_count) - pkt_list = packets.PacketList() - - if isinstance(packet, packets.AckPacket): - # We don't need to drop AckPackets, those should be - # processed. - self.packet_queue.put(packet) - else: - # Make sure we aren't re-processing the same packet - # For RF based APRS Clients we can get duplicate packets - # So we need to track them and not process the dupes. 
- found = False - try: - # Find the packet in the list of already seen packets - # Based on the packet.key - found = pkt_list.find(packet) - if not packet.msgNo: - # If the packet doesn't have a message id - # then there is no reliable way to detect - # if it's a dupe, so we just pass it on. - # it shouldn't get acked either. - found = False - except KeyError: - found = False - - if not found: - # We haven't seen this packet before, so we process it. - collector.PacketCollector().rx(packet) - self.packet_queue.put(packet) - elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout: - # If the packet came in within N seconds of the - # Last time seeing the packet, then we drop it as a dupe. - LOG.warning( - f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.' - ) - else: - LOG.warning( - f'Packet {packet.from_call}:{packet.msgNo} already tracked ' - f'but older than {CONF.packet_dupe_timeout} seconds. processing.', - ) - collector.PacketCollector().rx(packet) - self.packet_queue.put(packet) + self.packet_queue.put(packet) class APRSDFilterThread(APRSDThread): @@ -164,6 +128,9 @@ class APRSDFilterThread(APRSDThread): self.print_packet(packet) if packet: if self.filter_packet(packet): + # The packet has passed all filters, so we collect it. + # and process it. 
+ collector.PacketCollector().rx(packet) self.process_packet(packet) except queue.Empty: pass diff --git a/aprsd/threads/stats.py b/aprsd/threads/stats.py index 54d789a..bd7ef3a 100644 --- a/aprsd/threads/stats.py +++ b/aprsd/threads/stats.py @@ -1,8 +1,8 @@ import logging +import threading import time from oslo_config import cfg - from aprsd.stats import collector from aprsd.threads import APRSDThread from aprsd.utils import objectstore @@ -14,6 +14,9 @@ LOG = logging.getLogger('APRSD') class StatsStore(objectstore.ObjectStoreMixin): """Container to save the stats from the collector.""" + def __init__(self): + self.lock = threading.RLock() + def add(self, stats: dict): with self.lock: self.data = stats diff --git a/aprsd/utils/objectstore.py b/aprsd/utils/objectstore.py index be494f0..93e005b 100644 --- a/aprsd/utils/objectstore.py +++ b/aprsd/utils/objectstore.py @@ -24,9 +24,8 @@ class ObjectStoreMixin: When APRSD Starts, it calls load() aprsd server -f (flush) will wipe all saved objects. """ - - def __init__(self): - self.lock = threading.RLock() + # Child class must create the lock. 
+ lock = None def __len__(self): with self.lock: @@ -94,29 +93,31 @@ class ObjectStoreMixin: def load(self): if not CONF.enable_save: return - if os.path.exists(self._save_filename()): - try: - with open(self._save_filename(), 'rb') as fp: - raw = pickle.load(fp) - if raw: - self.data = raw - LOG.debug( - f'{self.__class__.__name__}::Loaded {len(self)} entries from disk.', - ) - else: - LOG.debug(f'{self.__class__.__name__}::No data to load.') - except (pickle.UnpicklingError, Exception) as ex: - LOG.error(f'Failed to UnPickle {self._save_filename()}') - LOG.error(ex) - self.data = {} - else: - LOG.debug(f'{self.__class__.__name__}::No save file found.') + with self.lock: + if os.path.exists(self._save_filename()): + try: + with open(self._save_filename(), 'rb') as fp: + raw = pickle.load(fp) + if raw: + self.data = raw + LOG.debug( + f'{self.__class__.__name__}::Loaded {len(self)} entries from disk.', + ) + else: + LOG.debug(f'{self.__class__.__name__}::No data to load.') + except (pickle.UnpicklingError, Exception) as ex: + LOG.error(f'Failed to UnPickle {self._save_filename()}') + LOG.error(ex) + self.data = {} + else: + LOG.debug(f'{self.__class__.__name__}::No save file found.') def flush(self): """Nuke the old pickle file that stored the old results from last aprsd run.""" if not CONF.enable_save: return - if os.path.exists(self._save_filename()): - pathlib.Path(self._save_filename()).unlink() with self.lock: - self.data = {} + if os.path.exists(self._save_filename()): + pathlib.Path(self._save_filename()).unlink() + with self.lock: + self.data = {}