1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-11-18 06:11:49 -05:00

rewrote packet_list and drop dupe packets

This patch rewrites the packet_list internally to be a dictionary
instead of a list for very fast lookups.  This was needed to test for
duplicate packets already in the list.

This patch drops packets that have the same data and are < 60 seconds
in age from the last time we got the packet.   On RF based clients
we can get dupes!!
This commit is contained in:
Hemna 2023-09-28 12:19:18 -04:00
parent 0d7e50d2ba
commit 4f87d5da12
6 changed files with 173 additions and 33 deletions

View File

@ -7,7 +7,7 @@ from aprslib.exceptions import LoginError
from oslo_config import cfg from oslo_config import cfg
from aprsd import exception from aprsd import exception
from aprsd.clients import aprsis, kiss from aprsd.clients import aprsis, fake, kiss
from aprsd.packets import core, packet_list from aprsd.packets import core, packet_list
from aprsd.utils import trace from aprsd.utils import trace
@ -17,6 +17,7 @@ LOG = logging.getLogger("APRSD")
TRANSPORT_APRSIS = "aprsis" TRANSPORT_APRSIS = "aprsis"
TRANSPORT_TCPKISS = "tcpkiss" TRANSPORT_TCPKISS = "tcpkiss"
TRANSPORT_SERIALKISS = "serialkiss" TRANSPORT_SERIALKISS = "serialkiss"
TRANSPORT_FAKE = "fake"
# Main must create this from the ClientFactory # Main must create this from the ClientFactory
# object such that it's populated with the # object such that it's populated with the
@ -248,6 +249,35 @@ class KISSClient(Client, metaclass=trace.TraceWrapperMetaclass):
return self._client return self._client
class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass):
@staticmethod
def is_enabled():
if CONF.fake_client.enabled:
return True
return False
@staticmethod
def is_configured():
return APRSDFakeClient.is_enabled()
def is_alive(self):
return True
def setup_connection(self):
return fake.APRSDFakeClient()
@staticmethod
def transport():
return TRANSPORT_FAKE
def decode_packet(self, *args, **kwargs):
LOG.debug(f"kwargs {kwargs}")
pkt = kwargs["packet"]
LOG.debug(f"Got an APRS Fake Packet '{pkt}'")
return pkt
class ClientFactory: class ClientFactory:
_instance = None _instance = None
@ -270,8 +300,11 @@ class ClientFactory:
key = TRANSPORT_APRSIS key = TRANSPORT_APRSIS
elif KISSClient.is_enabled(): elif KISSClient.is_enabled():
key = KISSClient.transport() key = KISSClient.transport()
elif APRSDFakeClient.is_enabled():
key = TRANSPORT_FAKE
builder = self._builders.get(key) builder = self._builders.get(key)
LOG.debug(f"Creating client {key}")
if not builder: if not builder:
raise ValueError(key) raise ValueError(key)
return builder() return builder()
@ -312,3 +345,4 @@ class ClientFactory:
factory.register(TRANSPORT_APRSIS, APRSISClient) factory.register(TRANSPORT_APRSIS, APRSISClient)
factory.register(TRANSPORT_TCPKISS, KISSClient) factory.register(TRANSPORT_TCPKISS, KISSClient)
factory.register(TRANSPORT_SERIALKISS, KISSClient) factory.register(TRANSPORT_SERIALKISS, KISSClient)
factory.register(TRANSPORT_FAKE, APRSDFakeClient)

49
aprsd/clients/fake.py Normal file
View File

@ -0,0 +1,49 @@
import logging
import threading
import time
from oslo_config import cfg
import wrapt
from aprsd import conf # noqa
from aprsd.packets import core
from aprsd.utils import trace
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
class APRSDFakeClient(metaclass=trace.TraceWrapperMetaclass):
'''Fake client for testing.'''
# flag to tell us to stop
thread_stop = False
lock = threading.Lock()
def stop(self):
self.thread_stop = True
LOG.info("Shutdown APRSDFakeClient client.")
def is_alive(self):
"""If the connection is alive or not."""
return not self.thread_stop
@wrapt.synchronized(lock)
def send(self, packet: core.Packet):
"""Send an APRS Message object."""
LOG.info(f"Sending packet: {packet}")
def consumer(self, callback, blocking=False, immortal=False, raw=False):
LOG.debug("Start non blocking FAKE consumer")
# Generate packets here?
pkt = core.MessagePacket(
from_call="N0CALL",
to_call=CONF.callsign,
message_text="Hello World",
msgNo=13,
)
callback(packet=pkt)
LOG.debug(f"END blocking FAKE consumer {self}")
time.sleep(8)

View File

@ -19,6 +19,11 @@ kiss_tcp_group = cfg.OptGroup(
name="kiss_tcp", name="kiss_tcp",
title="KISS TCP/IP Device connection", title="KISS TCP/IP Device connection",
) )
fake_client_group = cfg.OptGroup(
name="fake_client",
title="Fake Client settings",
)
aprs_opts = [ aprs_opts = [
cfg.BoolOpt( cfg.BoolOpt(
"enabled", "enabled",
@ -84,6 +89,14 @@ kiss_tcp_opts = [
), ),
] ]
fake_client_opts = [
cfg.BoolOpt(
"enabled",
default=False,
help="Enable fake client connection.",
),
]
def register_opts(config): def register_opts(config):
config.register_group(aprs_group) config.register_group(aprs_group)
@ -93,10 +106,14 @@ def register_opts(config):
config.register_opts(kiss_serial_opts, group=kiss_serial_group) config.register_opts(kiss_serial_opts, group=kiss_serial_group)
config.register_opts(kiss_tcp_opts, group=kiss_tcp_group) config.register_opts(kiss_tcp_opts, group=kiss_tcp_group)
config.register_group(fake_client_group)
config.register_opts(fake_client_opts, group=fake_client_group)
def list_opts(): def list_opts():
return { return {
aprs_group.name: aprs_opts, aprs_group.name: aprs_opts,
kiss_serial_group.name: kiss_serial_opts, kiss_serial_group.name: kiss_serial_opts,
kiss_tcp_group.name: kiss_tcp_opts, kiss_tcp_group.name: kiss_tcp_opts,
fake_client_group.name: fake_client_opts,
} }

View File

@ -29,11 +29,10 @@ PACKET_TYPE_THIRDPARTY = "thirdparty"
PACKET_TYPE_UNCOMPRESSED = "uncompressed" PACKET_TYPE_UNCOMPRESSED = "uncompressed"
def _int_timestamp(): def _init_timestamp():
"""Build a unix style timestamp integer""" """Build a unix style timestamp integer"""
return int(round(time.time())) return int(round(time.time()))
def _init_msgNo(): # noqa: N802 def _init_msgNo(): # noqa: N802
"""For some reason __post__init doesn't get called. """For some reason __post__init doesn't get called.
@ -45,7 +44,7 @@ def _init_msgNo(): # noqa: N802
return c.value return c.value
@dataclass @dataclass(unsafe_hash=True)
class Packet(metaclass=abc.ABCMeta): class Packet(metaclass=abc.ABCMeta):
from_call: str from_call: str
to_call: str to_call: str
@ -53,11 +52,11 @@ class Packet(metaclass=abc.ABCMeta):
format: str = None format: str = None
msgNo: str = field(default_factory=_init_msgNo) # noqa: N815 msgNo: str = field(default_factory=_init_msgNo) # noqa: N815
packet_type: str = None packet_type: str = None
timestamp: float = field(default_factory=_int_timestamp) timestamp: float = field(default_factory=_init_timestamp)
# Holds the raw text string to be sent over the wire # Holds the raw text string to be sent over the wire
# or holds the raw string from input packet # or holds the raw string from input packet
raw: str = None raw: str = None
raw_dict: dict = field(repr=False, default_factory=lambda: {}) raw_dict: dict = field(repr=False, default_factory=lambda: {}, compare=False)
# Built by calling prepare(). raw needs this built first. # Built by calling prepare(). raw needs this built first.
payload: str = None payload: str = None
@ -89,8 +88,12 @@ class Packet(metaclass=abc.ABCMeta):
else: else:
return default return default
def key(self):
"""Build a key for finding this packet in a dict."""
return f"{self.from_call}:{self.addresse}:{self.msgNo}"
def update_timestamp(self): def update_timestamp(self):
self.timestamp = _int_timestamp() self.timestamp = _init_timestamp()
def prepare(self): def prepare(self):
"""Do stuff here that is needed prior to sending over the air.""" """Do stuff here that is needed prior to sending over the air."""
@ -258,16 +261,16 @@ class Packet(metaclass=abc.ABCMeta):
return repr return repr
@dataclass @dataclass(unsafe_hash=True)
class PathPacket(Packet): class PathPacket(Packet):
path: List[str] = field(default_factory=list) path: List[str] = field(default_factory=list, compare=False)
via: str = None via: str = None
def _build_payload(self): def _build_payload(self):
raise NotImplementedError raise NotImplementedError
@dataclass @dataclass(unsafe_hash=True)
class AckPacket(PathPacket): class AckPacket(PathPacket):
response: str = None response: str = None
@ -279,7 +282,7 @@ class AckPacket(PathPacket):
self.payload = f":{self.to_call.ljust(9)}:ack{self.msgNo}" self.payload = f":{self.to_call.ljust(9)}:ack{self.msgNo}"
@dataclass @dataclass(unsafe_hash=True)
class RejectPacket(PathPacket): class RejectPacket(PathPacket):
response: str = None response: str = None
@ -291,7 +294,7 @@ class RejectPacket(PathPacket):
self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}" self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}"
@dataclass @dataclass(unsafe_hash=True)
class MessagePacket(PathPacket): class MessagePacket(PathPacket):
message_text: str = None message_text: str = None
@ -313,7 +316,7 @@ class MessagePacket(PathPacket):
) )
@dataclass @dataclass(unsafe_hash=True)
class StatusPacket(PathPacket): class StatusPacket(PathPacket):
status: str = None status: str = None
messagecapable: bool = False messagecapable: bool = False
@ -323,7 +326,7 @@ class StatusPacket(PathPacket):
raise NotImplementedError raise NotImplementedError
@dataclass @dataclass(unsafe_hash=True)
class GPSPacket(PathPacket): class GPSPacket(PathPacket):
latitude: float = 0.00 latitude: float = 0.00
longitude: float = 0.00 longitude: float = 0.00

View File

@ -1,10 +1,11 @@
from collections import MutableMapping, OrderedDict
import logging import logging
import threading import threading
from oslo_config import cfg from oslo_config import cfg
import wrapt import wrapt
from aprsd import stats, utils from aprsd import stats
from aprsd.packets import seen_list from aprsd.packets import seen_list
@ -12,31 +13,24 @@ CONF = cfg.CONF
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
class PacketList: class PacketList(MutableMapping):
"""Class to track all of the packets rx'd and tx'd by aprsd."""
_instance = None _instance = None
lock = threading.Lock() lock = threading.Lock()
packet_list: utils.RingBuffer = utils.RingBuffer(1000)
_total_rx: int = 0 _total_rx: int = 0
_total_tx: int = 0 _total_tx: int = 0
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
if cls._instance is None: if cls._instance is None:
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
cls._maxlen = 1000
cls.d = OrderedDict()
return cls._instance return cls._instance
@wrapt.synchronized(lock)
def __iter__(self):
return iter(self.packet_list)
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def rx(self, packet): def rx(self, packet):
"""Add a packet that was received.""" """Add a packet that was received."""
self._total_rx += 1 self._total_rx += 1
self.packet_list.append(packet) self._add(packet)
seen_list.SeenList().update_seen(packet) seen_list.SeenList().update_seen(packet)
stats.APRSDStats().rx(packet) stats.APRSDStats().rx(packet)
@ -44,13 +38,46 @@ class PacketList:
def tx(self, packet): def tx(self, packet):
"""Add a packet that was received.""" """Add a packet that was received."""
self._total_tx += 1 self._total_tx += 1
self.packet_list.append(packet) self._add(packet)
seen_list.SeenList().update_seen(packet) seen_list.SeenList().update_seen(packet)
stats.APRSDStats().tx(packet) stats.APRSDStats().tx(packet)
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def get(self): def add(self, packet):
return self.packet_list.get() self._add(packet)
def _add(self, packet):
key = packet.key()
self[key] = packet
@property
def maxlen(self):
return self._maxlen
@wrapt.synchronized(lock)
def find(self, packet):
key = packet.key()
return self.get(key)
def __getitem__(self, key):
#self.d.move_to_end(key)
return self.d[key]
def __setitem__(self, key, value):
if key in self.d:
self.d.move_to_end(key)
elif len(self.d) == self.maxlen:
self.d.popitem(last=False)
self.d[key] = value
def __delitem__(self, key):
del self.d[key]
def __iter__(self):
return self.d.__iter__()
def __len__(self):
return len(self.d)
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def total_rx(self): def total_rx(self):

View File

@ -70,16 +70,26 @@ class APRSDPluginRXThread(APRSDRXThread):
packet = self._client.decode_packet(*args, **kwargs) packet = self._client.decode_packet(*args, **kwargs)
# LOG.debug(raw) # LOG.debug(raw)
packet.log(header="RX") packet.log(header="RX")
tracked = packets.PacketTrack().get(packet.msgNo) found = False
if not tracked: pkt_list = packets.PacketList()
try:
found = pkt_list.find(packet)
except KeyError:
found = False
if not found:
# If we are in the process of already ack'ing # If we are in the process of already ack'ing
# a packet, we should drop the packet # a packet, we should drop the packet
# because it's a dupe within the time that # because it's a dupe within the time that
# we send the 3 acks for the packet. # we send the 3 acks for the packet.
packets.PacketList().rx(packet) pkt_list.rx(packet)
self.packet_queue.put(packet) self.packet_queue.put(packet)
else: elif packet.timestamp - found.timestamp < 60:
LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.") LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.")
else:
LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked but older than 60 seconds. processing.")
pkt_list.rx(packet)
self.packet_queue.put(packet)
class APRSDProcessPacketThread(APRSDThread): class APRSDProcessPacketThread(APRSDThread):