Mirror of https://github.com/craigerl/aprsd.git (synced 2026-02-11 10:33:44 -05:00)
update the rx thread and packet trackers
The main RX thread now does no processing of an incoming packet other than converting the raw packet string to a packet object; all further processing happens in a separate thread.
This commit is contained in:
parent 514df8788d
commit cc15950f33
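In outline, receive handling becomes a producer/consumer pair: the RX thread only decodes and enqueues, and the filter thread consumes the queue. Below is a minimal sketch of that flow, assuming only the names that appear in the diffs (decode_packet, packet_queue, filter_packet, PacketCollector); the thread scaffolding itself is illustrative, not the project's actual class layout:

    import queue
    import threading


    class RXSketch(threading.Thread):
        """Producer: convert the raw packet string to a packet object, nothing else."""

        def __init__(self, client, packet_queue: queue.Queue):
            super().__init__(daemon=True)
            self._client = client            # assumed to expose decode_packet()
            self.packet_queue = packet_queue

        def process_packet(self, *args, **kwargs):
            packet = self._client.decode_packet(*args, **kwargs)
            if packet:
                # No dedupe, stats, or collection here any more; just hand off.
                self.packet_queue.put(packet)


    class FilterSketch(threading.Thread):
        """Consumer: filtering, collection, and processing run off the RX thread."""

        def __init__(self, packet_queue: queue.Queue):
            super().__init__(daemon=True)
            self.packet_queue = packet_queue

        def run(self):
            while True:
                try:
                    packet = self.packet_queue.get(timeout=1)
                except queue.Empty:
                    continue
                # filter_packet() / collector.PacketCollector().rx(packet)
                # would run here, as in the APRSDFilterThread diff below.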
@@ -1,4 +1,5 @@
 import logging
+import threading
 from collections import OrderedDict
 
 from oslo_config import cfg
@@ -21,6 +22,7 @@ class PacketList(objectstore.ObjectStoreMixin):
     def __new__(cls, *args, **kwargs):
         if cls._instance is None:
             cls._instance = super().__new__(cls)
+            cls.lock = threading.RLock()
             cls._instance.maxlen = CONF.packet_list_maxlen
             cls._instance._init_data()
         return cls._instance
@@ -1,5 +1,6 @@
 import datetime
 import logging
+import threading
 
 from oslo_config import cfg
 
@@ -20,6 +21,7 @@ class SeenList(objectstore.ObjectStoreMixin):
     def __new__(cls, *args, **kwargs):
         if cls._instance is None:
             cls._instance = super().__new__(cls)
+            cls._instance.lock = threading.RLock()
             cls._instance.data = {}
         return cls._instance
 
@@ -1,5 +1,6 @@
 import datetime
 import logging
+import threading
 
 from oslo_config import cfg
 
@@ -33,6 +34,7 @@ class PacketTrack(objectstore.ObjectStoreMixin):
     def __new__(cls, *args, **kwargs):
         if cls._instance is None:
             cls._instance = super().__new__(cls)
+            cls._instance.lock = threading.RLock()
             cls._instance._start_time = datetime.datetime.now()
             cls._instance._init_store()
         return cls._instance
@@ -1,5 +1,6 @@
 import datetime
 import logging
+import threading
 
 from oslo_config import cfg
 
@@ -21,6 +22,7 @@ class WatchList(objectstore.ObjectStoreMixin):
     def __new__(cls, *args, **kwargs):
         if cls._instance is None:
             cls._instance = super().__new__(cls)
+            cls._instance.lock = threading.RLock()
         return cls._instance
 
     @trace.no_trace
@@ -87,6 +87,10 @@ class APRSDRXThread(APRSDThread):
         return True
 
     def process_packet(self, *args, **kwargs):
+        """Convert the raw packet into a Packet object and put it on the queue.
+
+        The processing of the packet will happen in a separate thread.
+        """
         packet = self._client.decode_packet(*args, **kwargs)
         if not packet:
             LOG.error(
@@ -95,47 +99,7 @@ class APRSDRXThread(APRSDThread):
             return
         self.pkt_count += 1
         packet_log.log(packet, packet_count=self.pkt_count)
-        pkt_list = packets.PacketList()
-
-        if isinstance(packet, packets.AckPacket):
-            # We don't need to drop AckPackets, those should be
-            # processed.
-            self.packet_queue.put(packet)
-        else:
-            # Make sure we aren't re-processing the same packet
-            # For RF based APRS Clients we can get duplicate packets
-            # So we need to track them and not process the dupes.
-            found = False
-            try:
-                # Find the packet in the list of already seen packets
-                # Based on the packet.key
-                found = pkt_list.find(packet)
-                if not packet.msgNo:
-                    # If the packet doesn't have a message id
-                    # then there is no reliable way to detect
-                    # if it's a dupe, so we just pass it on.
-                    # it shouldn't get acked either.
-                    found = False
-            except KeyError:
-                found = False
-
-            if not found:
-                # We haven't seen this packet before, so we process it.
-                collector.PacketCollector().rx(packet)
-                self.packet_queue.put(packet)
-            elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
-                # If the packet came in within N seconds of the
-                # Last time seeing the packet, then we drop it as a dupe.
-                LOG.warning(
-                    f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.'
-                )
-            else:
-                LOG.warning(
-                    f'Packet {packet.from_call}:{packet.msgNo} already tracked '
-                    f'but older than {CONF.packet_dupe_timeout} seconds. processing.',
-                )
-                collector.PacketCollector().rx(packet)
-                self.packet_queue.put(packet)
+        self.packet_queue.put(packet)
 
 
 class APRSDFilterThread(APRSDThread):
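For reference, the block removed above is the dupe rule that now runs downstream of the queue: a packet is dropped only when it carries a msgNo, has been seen before, and arrived within CONF.packet_dupe_timeout seconds of the previous sighting; AckPackets bypass the check entirely. A sketch of that rule as a standalone predicate (is_dupe is a hypothetical helper; PacketList.find, msgNo, timestamp, and the config option are from the diff):

    def is_dupe(packet, pkt_list, dupe_timeout) -> bool:
        """Return True when packet should be dropped as a duplicate."""
        if not packet.msgNo:
            # Without a message id there is no reliable dupe key,
            # so the packet is always passed on.
            return False
        try:
            found = pkt_list.find(packet)
        except KeyError:
            return False
        if not found:
            return False
        # A recent sighting means dupe; an older one is reprocessed.
        return packet.timestamp - found.timestamp < dupe_timeout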
@@ -164,6 +128,9 @@ class APRSDFilterThread(APRSDThread):
             self.print_packet(packet)
             if packet:
                 if self.filter_packet(packet):
+                    # The packet has passed all filters, so we collect
+                    # and process it.
+                    collector.PacketCollector().rx(packet)
                     self.process_packet(packet)
         except queue.Empty:
             pass
@@ -1,8 +1,8 @@
 import logging
+import threading
 import time
 
 from oslo_config import cfg
 
 from aprsd.stats import collector
 from aprsd.threads import APRSDThread
 from aprsd.utils import objectstore
@@ -14,6 +14,9 @@ LOG = logging.getLogger('APRSD')
 class StatsStore(objectstore.ObjectStoreMixin):
     """Container to save the stats from the collector."""
 
+    def __init__(self):
+        self.lock = threading.RLock()
+
     def add(self, stats: dict):
         with self.lock:
             self.data = stats
@@ -24,9 +24,8 @@ class ObjectStoreMixin:
     When APRSD Starts, it calls load()
     aprsd server -f (flush) will wipe all saved objects.
     """
 
-    def __init__(self):
-        self.lock = threading.RLock()
+    # Child class must create the lock.
+    lock = None
 
     def __len__(self):
         with self.lock:
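Since the mixin no longer creates self.lock, every ObjectStoreMixin singleton has to create its own RLock before any locked method runs, which is what the __new__ hunks earlier in this commit do. A minimal sketch of a new store following that pattern (MyStore is hypothetical, not a class from this commit):

    import threading

    from aprsd.utils import objectstore


    class MyStore(objectstore.ObjectStoreMixin):
        _instance = None

        def __new__(cls, *args, **kwargs):
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                # The mixin's class-level lock is None; the child must
                # create one before __len__/load/flush are called.
                cls._instance.lock = threading.RLock()
                cls._instance.data = {}
            return cls._instance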
@@ -94,29 +93,31 @@ class ObjectStoreMixin:
     def load(self):
         if not CONF.enable_save:
             return
-        if os.path.exists(self._save_filename()):
-            try:
-                with open(self._save_filename(), 'rb') as fp:
-                    raw = pickle.load(fp)
-                    if raw:
-                        self.data = raw
-                        LOG.debug(
-                            f'{self.__class__.__name__}::Loaded {len(self)} entries from disk.',
-                        )
-                    else:
-                        LOG.debug(f'{self.__class__.__name__}::No data to load.')
-            except (pickle.UnpicklingError, Exception) as ex:
-                LOG.error(f'Failed to UnPickle {self._save_filename()}')
-                LOG.error(ex)
-                self.data = {}
-        else:
-            LOG.debug(f'{self.__class__.__name__}::No save file found.')
+        with self.lock:
+            if os.path.exists(self._save_filename()):
+                try:
+                    with open(self._save_filename(), 'rb') as fp:
+                        raw = pickle.load(fp)
+                        if raw:
+                            self.data = raw
+                            LOG.debug(
+                                f'{self.__class__.__name__}::Loaded {len(self)} entries from disk.',
+                            )
+                        else:
+                            LOG.debug(f'{self.__class__.__name__}::No data to load.')
+                except (pickle.UnpicklingError, Exception) as ex:
+                    LOG.error(f'Failed to UnPickle {self._save_filename()}')
+                    LOG.error(ex)
+                    self.data = {}
+            else:
+                LOG.debug(f'{self.__class__.__name__}::No save file found.')
 
     def flush(self):
         """Nuke the old pickle file that stored the old results from last aprsd run."""
         if not CONF.enable_save:
             return
-        if os.path.exists(self._save_filename()):
-            pathlib.Path(self._save_filename()).unlink()
-        with self.lock:
-            self.data = {}
+        with self.lock:
+            if os.path.exists(self._save_filename()):
+                pathlib.Path(self._save_filename()).unlink()
+            self.data = {}