Merge pull request #132 from craigerl/RF_dupe_fix

Fix for dupe packets.
This commit is contained in:
Walter A. Boring IV 2023-10-03 16:18:34 -04:00 committed by GitHub
commit e2f89a6043
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 246 additions and 86 deletions

View File

@ -4,6 +4,7 @@ CHANGES
v3.2.0
------
* Update Changelog for 3.2.0
* minor cleanup prior to release
* Webchat: fix input maxlength
* WebChat: cleanup some console.logs

View File

@ -7,7 +7,7 @@ from aprslib.exceptions import LoginError
from oslo_config import cfg
from aprsd import exception
from aprsd.clients import aprsis, kiss
from aprsd.clients import aprsis, fake, kiss
from aprsd.packets import core, packet_list
from aprsd.utils import trace
@ -17,6 +17,7 @@ LOG = logging.getLogger("APRSD")
TRANSPORT_APRSIS = "aprsis"
TRANSPORT_TCPKISS = "tcpkiss"
TRANSPORT_SERIALKISS = "serialkiss"
TRANSPORT_FAKE = "fake"
# Main must create this from the ClientFactory
# object such that it's populated with the
@ -248,6 +249,35 @@ class KISSClient(Client, metaclass=trace.TraceWrapperMetaclass):
return self._client
class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass):
@staticmethod
def is_enabled():
if CONF.fake_client.enabled:
return True
return False
@staticmethod
def is_configured():
return APRSDFakeClient.is_enabled()
def is_alive(self):
return True
def setup_connection(self):
return fake.APRSDFakeClient()
@staticmethod
def transport():
return TRANSPORT_FAKE
def decode_packet(self, *args, **kwargs):
LOG.debug(f"kwargs {kwargs}")
pkt = kwargs["packet"]
LOG.debug(f"Got an APRS Fake Packet '{pkt}'")
return pkt
class ClientFactory:
_instance = None
@ -270,8 +300,11 @@ class ClientFactory:
key = TRANSPORT_APRSIS
elif KISSClient.is_enabled():
key = KISSClient.transport()
elif APRSDFakeClient.is_enabled():
key = TRANSPORT_FAKE
builder = self._builders.get(key)
LOG.debug(f"Creating client {key}")
if not builder:
raise ValueError(key)
return builder()
@ -312,3 +345,4 @@ class ClientFactory:
factory.register(TRANSPORT_APRSIS, APRSISClient)
factory.register(TRANSPORT_TCPKISS, KISSClient)
factory.register(TRANSPORT_SERIALKISS, KISSClient)
factory.register(TRANSPORT_FAKE, APRSDFakeClient)

49
aprsd/clients/fake.py Normal file
View File

@ -0,0 +1,49 @@
import logging
import threading
import time
from oslo_config import cfg
import wrapt
from aprsd import conf # noqa
from aprsd.packets import core
from aprsd.utils import trace
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
class APRSDFakeClient(metaclass=trace.TraceWrapperMetaclass):
'''Fake client for testing.'''
# flag to tell us to stop
thread_stop = False
lock = threading.Lock()
def stop(self):
self.thread_stop = True
LOG.info("Shutdown APRSDFakeClient client.")
def is_alive(self):
"""If the connection is alive or not."""
return not self.thread_stop
@wrapt.synchronized(lock)
def send(self, packet: core.Packet):
"""Send an APRS Message object."""
LOG.info(f"Sending packet: {packet}")
def consumer(self, callback, blocking=False, immortal=False, raw=False):
LOG.debug("Start non blocking FAKE consumer")
# Generate packets here?
pkt = core.MessagePacket(
from_call="N0CALL",
to_call=CONF.callsign,
message_text="Hello World",
msgNo=13,
)
callback(packet=pkt)
LOG.debug(f"END blocking FAKE consumer {self}")
time.sleep(8)

View File

@ -131,9 +131,9 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
def process_ack_packet(self, packet: packets.AckPacket):
super().process_ack_packet(packet)
ack_num = packet.get("msgNo")
SentMessages().ack(int(ack_num))
SentMessages().ack(ack_num)
self.socketio.emit(
"ack", SentMessages().get(int(ack_num)),
"ack", SentMessages().get(ack_num),
namespace="/sendmsg",
)
self.got_ack = True

View File

@ -19,6 +19,11 @@ kiss_tcp_group = cfg.OptGroup(
name="kiss_tcp",
title="KISS TCP/IP Device connection",
)
fake_client_group = cfg.OptGroup(
name="fake_client",
title="Fake Client settings",
)
aprs_opts = [
cfg.BoolOpt(
"enabled",
@ -84,6 +89,14 @@ kiss_tcp_opts = [
),
]
fake_client_opts = [
cfg.BoolOpt(
"enabled",
default=False,
help="Enable fake client connection.",
),
]
def register_opts(config):
config.register_group(aprs_group)
@ -93,10 +106,14 @@ def register_opts(config):
config.register_opts(kiss_serial_opts, group=kiss_serial_group)
config.register_opts(kiss_tcp_opts, group=kiss_tcp_group)
config.register_group(fake_client_group)
config.register_opts(fake_client_opts, group=fake_client_group)
def list_opts():
return {
aprs_group.name: aprs_opts,
kiss_serial_group.name: kiss_serial_opts,
kiss_tcp_group.name: kiss_tcp_opts,
fake_client_group.name: fake_client_opts,
}

View File

@ -1,6 +1,6 @@
from aprsd.packets.core import ( # noqa: F401
AckPacket, GPSPacket, MessagePacket, MicEPacket, Packet, PathPacket,
RejectPacket, StatusPacket, WeatherPacket,
AckPacket, GPSPacket, MessagePacket, MicEPacket, Packet, RejectPacket,
StatusPacket, WeatherPacket,
)
from aprsd.packets.packet_list import PacketList # noqa: F401
from aprsd.packets.seen_list import SeenList # noqa: F401

View File

@ -29,7 +29,7 @@ PACKET_TYPE_THIRDPARTY = "thirdparty"
PACKET_TYPE_UNCOMPRESSED = "uncompressed"
def _int_timestamp():
def _init_timestamp():
"""Build a unix style timestamp integer"""
return int(round(time.time()))
@ -45,28 +45,30 @@ def _init_msgNo(): # noqa: N802
return c.value
@dataclass
@dataclass(unsafe_hash=True)
class Packet(metaclass=abc.ABCMeta):
from_call: str
to_call: str
addresse: str = None
format: str = None
from_call: str = field(default=None)
to_call: str = field(default=None)
addresse: str = field(default=None)
format: str = field(default=None)
msgNo: str = field(default_factory=_init_msgNo) # noqa: N815
packet_type: str = None
timestamp: float = field(default_factory=_int_timestamp)
packet_type: str = field(default=None)
timestamp: float = field(default_factory=_init_timestamp, compare=False, hash=False)
# Holds the raw text string to be sent over the wire
# or holds the raw string from input packet
raw: str = None
raw_dict: dict = field(repr=False, default_factory=lambda: {})
raw: str = field(default=None, compare=False, hash=False)
raw_dict: dict = field(repr=False, default_factory=lambda: {}, compare=False, hash=False)
# Built by calling prepare(). raw needs this built first.
payload: str = None
payload: str = field(default=None)
# Fields related to sending packets out
send_count: int = field(repr=False, default=0)
retry_count: int = field(repr=False, default=3)
last_send_time: datetime.timedelta = field(repr=False, default=None)
send_count: int = field(repr=False, default=0, compare=False, hash=False)
retry_count: int = field(repr=False, default=3, compare=False, hash=False)
last_send_time: datetime.timedelta = field(repr=False, default=None, compare=False, hash=False)
# Do we allow this packet to be saved to send later?
allow_delay: bool = field(repr=False, default=True)
allow_delay: bool = field(repr=False, default=True, compare=False, hash=False)
path: List[str] = field(default_factory=list, compare=False, hash=False)
via: str = field(default=None, compare=False, hash=False)
def __post__init__(self):
LOG.warning(f"POST INIT {self}")
@ -89,8 +91,13 @@ class Packet(metaclass=abc.ABCMeta):
else:
return default
@property
def key(self):
"""Build a key for finding this packet in a dict."""
return f"{self.from_call}:{self.addresse}:{self.msgNo}"
def update_timestamp(self):
self.timestamp = _int_timestamp()
self.timestamp = _init_timestamp()
def prepare(self):
"""Do stuff here that is needed prior to sending over the air."""
@ -258,18 +265,9 @@ class Packet(metaclass=abc.ABCMeta):
return repr
@dataclass
class PathPacket(Packet):
path: List[str] = field(default_factory=list)
via: str = None
def _build_payload(self):
raise NotImplementedError
@dataclass
class AckPacket(PathPacket):
response: str = None
@dataclass(unsafe_hash=True)
class AckPacket(Packet):
response: str = field(default=None)
def __post__init__(self):
if self.response:
@ -279,9 +277,9 @@ class AckPacket(PathPacket):
self.payload = f":{self.to_call.ljust(9)}:ack{self.msgNo}"
@dataclass
class RejectPacket(PathPacket):
response: str = None
@dataclass(unsafe_hash=True)
class RejectPacket(Packet):
response: str = field(default=None)
def __post__init__(self):
if self.response:
@ -291,9 +289,9 @@ class RejectPacket(PathPacket):
self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}"
@dataclass
class MessagePacket(PathPacket):
message_text: str = None
@dataclass(unsafe_hash=True)
class MessagePacket(Packet):
message_text: str = field(default=None)
def _filter_for_send(self) -> str:
"""Filter and format message string for FCC."""
@ -313,24 +311,24 @@ class MessagePacket(PathPacket):
)
@dataclass
class StatusPacket(PathPacket):
status: str = None
messagecapable: bool = False
comment: str = None
@dataclass(unsafe_hash=True)
class StatusPacket(Packet):
status: str = field(default=None)
messagecapable: bool = field(default=False)
comment: str = field(default=None)
def _build_payload(self):
raise NotImplementedError
@dataclass
class GPSPacket(PathPacket):
latitude: float = 0.00
longitude: float = 0.00
altitude: float = 0.00
rng: float = 0.00
posambiguity: int = 0
comment: str = None
@dataclass(unsafe_hash=True)
class GPSPacket(Packet):
latitude: float = field(default=0.00)
longitude: float = field(default=0.00)
altitude: float = field(default=0.00)
rng: float = field(default=0.00)
posambiguity: int = field(default=0)
comment: str = field(default=None)
symbol: str = field(default="l")
symbol_table: str = field(default="/")

View File

@ -1,10 +1,12 @@
from collections import OrderedDict
from collections.abc import MutableMapping
import logging
import threading
from oslo_config import cfg
import wrapt
from aprsd import stats, utils
from aprsd import stats
from aprsd.packets import seen_list
@ -12,31 +14,24 @@ CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
class PacketList:
"""Class to track all of the packets rx'd and tx'd by aprsd."""
class PacketList(MutableMapping):
_instance = None
lock = threading.Lock()
packet_list: utils.RingBuffer = utils.RingBuffer(1000)
_total_rx: int = 0
_total_tx: int = 0
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._maxlen = 1000
cls.d = OrderedDict()
return cls._instance
@wrapt.synchronized(lock)
def __iter__(self):
return iter(self.packet_list)
@wrapt.synchronized(lock)
def rx(self, packet):
"""Add a packet that was received."""
self._total_rx += 1
self.packet_list.append(packet)
self._add(packet)
seen_list.SeenList().update_seen(packet)
stats.APRSDStats().rx(packet)
@ -44,13 +39,44 @@ class PacketList:
def tx(self, packet):
"""Add a packet that was received."""
self._total_tx += 1
self.packet_list.append(packet)
self._add(packet)
seen_list.SeenList().update_seen(packet)
stats.APRSDStats().tx(packet)
@wrapt.synchronized(lock)
def get(self):
return self.packet_list.get()
def add(self, packet):
self._add(packet)
def _add(self, packet):
self[packet.key] = packet
@property
def maxlen(self):
return self._maxlen
@wrapt.synchronized(lock)
def find(self, packet):
return self.get(packet.key)
def __getitem__(self, key):
# self.d.move_to_end(key)
return self.d[key]
def __setitem__(self, key, value):
if key in self.d:
self.d.move_to_end(key)
elif len(self.d) == self.maxlen:
self.d.popitem(last=False)
self.d[key] = value
def __delitem__(self, key):
del self.d[key]
def __iter__(self):
return self.d.__iter__()
def __len__(self):
return len(self.d)
@wrapt.synchronized(lock)
def total_rx(self):

View File

@ -62,30 +62,22 @@ class PacketTrack(objectstore.ObjectStoreMixin):
def __len__(self):
return len(self.data)
@wrapt.synchronized(lock)
def __str__(self):
result = "{"
for key in self.data.keys():
result += f"{key}: {str(self.data[key])}, "
result += "}"
return result
@wrapt.synchronized(lock)
def add(self, packet):
key = int(packet.msgNo)
key = packet.msgNo
self.data[key] = packet
self.total_tracked += 1
@wrapt.synchronized(lock)
def get(self, id):
if id in self.data:
return self.data[id]
def get(self, key):
return self.data.get(key, None)
@wrapt.synchronized(lock)
def remove(self, id):
key = int(id)
if key in self.data.keys():
def remove(self, key):
try:
del self.data[key]
except KeyError:
pass
def restart(self):
"""Walk the list of messages and restart them if any."""

View File

@ -67,11 +67,55 @@ class APRSDPluginRXThread(APRSDRXThread):
"""
def process_packet(self, *args, **kwargs):
"""This handles the processing of an inbound packet.
When a packet is received by the connected client object,
it sends the raw packet into this function. This function then
decodes the packet via the client, and then processes the packet.
Ack Packets are sent to the PluginProcessPacketThread for processing.
All other packets have to be checked as a dupe, and then only after
we haven't seen this packet before, do we send it to the
PluginProcessPacketThread for processing.
"""
packet = self._client.decode_packet(*args, **kwargs)
# LOG.debug(raw)
packet.log(header="RX")
packets.PacketList().rx(packet)
self.packet_queue.put(packet)
if isinstance(packet, packets.AckPacket):
# We don't need to drop AckPackets, those should be
# processed.
self.packet_queue.put(packet)
else:
# Make sure we aren't re-processing the same packet
# For RF based APRS Clients we can get duplicate packets
# So we need to track them and not process the dupes.
found = False
pkt_list = packets.PacketList()
try:
# Find the packet in the list of already seen packets
# Based on the packet.key
found = pkt_list.find(packet)
except KeyError:
found = False
if not found:
# If we are in the process of already ack'ing
# a packet, we should drop the packet
# because it's a dupe within the time that
# we send the 3 acks for the packet.
pkt_list.rx(packet)
self.packet_queue.put(packet)
elif packet.timestamp - found.timestamp < 60:
# If the packet came in within 60 seconds of the
# Last time seeing the packet, then we drop it as a dupe.
LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.")
else:
LOG.warning(
f"Packet {packet.from_call}:{packet.msgNo} already tracked "
"but older than 60 seconds. processing.",
)
pkt_list.rx(packet)
self.packet_queue.put(packet)
class APRSDProcessPacketThread(APRSDThread):

View File

@ -37,7 +37,7 @@ class PacketCounter:
@property
@wrapt.synchronized(lock)
def value(self):
return self.val.value
return str(self.val.value)
@wrapt.synchronized(lock)
def __repr__(self):

View File

@ -34,7 +34,6 @@ function init_chat() {
console.log("SENT: ");
console.log(msg);
if (cleared === false) {
console.log("CLEARING #msgsTabsDiv");
var msgsdiv = $("#msgsTabsDiv");
msgsdiv.html('');
cleared = true;