diff --git a/ChangeLog b/ChangeLog index 8ad4952..827ad9c 100644 --- a/ChangeLog +++ b/ChangeLog @@ -4,6 +4,7 @@ CHANGES v3.2.0 ------ +* Update Changelog for 3.2.0 * minor cleanup prior to release * Webchat: fix input maxlength * WebChat: cleanup some console.logs diff --git a/aprsd/client.py b/aprsd/client.py index 7bc4f02..bca92c5 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -7,7 +7,7 @@ from aprslib.exceptions import LoginError from oslo_config import cfg from aprsd import exception -from aprsd.clients import aprsis, kiss +from aprsd.clients import aprsis, fake, kiss from aprsd.packets import core, packet_list from aprsd.utils import trace @@ -17,6 +17,7 @@ LOG = logging.getLogger("APRSD") TRANSPORT_APRSIS = "aprsis" TRANSPORT_TCPKISS = "tcpkiss" TRANSPORT_SERIALKISS = "serialkiss" +TRANSPORT_FAKE = "fake" # Main must create this from the ClientFactory # object such that it's populated with the @@ -248,6 +249,35 @@ class KISSClient(Client, metaclass=trace.TraceWrapperMetaclass): return self._client +class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass): + + @staticmethod + def is_enabled(): + if CONF.fake_client.enabled: + return True + return False + + @staticmethod + def is_configured(): + return APRSDFakeClient.is_enabled() + + def is_alive(self): + return True + + def setup_connection(self): + return fake.APRSDFakeClient() + + @staticmethod + def transport(): + return TRANSPORT_FAKE + + def decode_packet(self, *args, **kwargs): + LOG.debug(f"kwargs {kwargs}") + pkt = kwargs["packet"] + LOG.debug(f"Got an APRS Fake Packet '{pkt}'") + return pkt + + class ClientFactory: _instance = None @@ -270,8 +300,11 @@ class ClientFactory: key = TRANSPORT_APRSIS elif KISSClient.is_enabled(): key = KISSClient.transport() + elif APRSDFakeClient.is_enabled(): + key = TRANSPORT_FAKE builder = self._builders.get(key) + LOG.debug(f"Creating client {key}") if not builder: raise ValueError(key) return builder() @@ -312,3 +345,4 @@ class ClientFactory: factory.register(TRANSPORT_APRSIS, APRSISClient) factory.register(TRANSPORT_TCPKISS, KISSClient) factory.register(TRANSPORT_SERIALKISS, KISSClient) + factory.register(TRANSPORT_FAKE, APRSDFakeClient) diff --git a/aprsd/clients/fake.py b/aprsd/clients/fake.py new file mode 100644 index 0000000..87e8d0b --- /dev/null +++ b/aprsd/clients/fake.py @@ -0,0 +1,49 @@ +import logging +import threading +import time + +from oslo_config import cfg +import wrapt + +from aprsd import conf # noqa +from aprsd.packets import core +from aprsd.utils import trace + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +class APRSDFakeClient(metaclass=trace.TraceWrapperMetaclass): + '''Fake client for testing.''' + + # flag to tell us to stop + thread_stop = False + + lock = threading.Lock() + + def stop(self): + self.thread_stop = True + LOG.info("Shutdown APRSDFakeClient client.") + + def is_alive(self): + """If the connection is alive or not.""" + return not self.thread_stop + + @wrapt.synchronized(lock) + def send(self, packet: core.Packet): + """Send an APRS Message object.""" + LOG.info(f"Sending packet: {packet}") + + def consumer(self, callback, blocking=False, immortal=False, raw=False): + LOG.debug("Start non blocking FAKE consumer") + # Generate packets here? + pkt = core.MessagePacket( + from_call="N0CALL", + to_call=CONF.callsign, + message_text="Hello World", + msgNo=13, + ) + callback(packet=pkt) + LOG.debug(f"END blocking FAKE consumer {self}") + time.sleep(8) diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index 57b1d69..ff56a16 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -131,9 +131,9 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread): def process_ack_packet(self, packet: packets.AckPacket): super().process_ack_packet(packet) ack_num = packet.get("msgNo") - SentMessages().ack(int(ack_num)) + SentMessages().ack(ack_num) self.socketio.emit( - "ack", SentMessages().get(int(ack_num)), + "ack", SentMessages().get(ack_num), namespace="/sendmsg", ) self.got_ack = True diff --git a/aprsd/conf/client.py b/aprsd/conf/client.py index c752f16..cb77e09 100644 --- a/aprsd/conf/client.py +++ b/aprsd/conf/client.py @@ -19,6 +19,11 @@ kiss_tcp_group = cfg.OptGroup( name="kiss_tcp", title="KISS TCP/IP Device connection", ) + +fake_client_group = cfg.OptGroup( + name="fake_client", + title="Fake Client settings", +) aprs_opts = [ cfg.BoolOpt( "enabled", @@ -84,6 +89,14 @@ kiss_tcp_opts = [ ), ] +fake_client_opts = [ + cfg.BoolOpt( + "enabled", + default=False, + help="Enable fake client connection.", + ), +] + def register_opts(config): config.register_group(aprs_group) @@ -93,10 +106,14 @@ def register_opts(config): config.register_opts(kiss_serial_opts, group=kiss_serial_group) config.register_opts(kiss_tcp_opts, group=kiss_tcp_group) + config.register_group(fake_client_group) + config.register_opts(fake_client_opts, group=fake_client_group) + def list_opts(): return { aprs_group.name: aprs_opts, kiss_serial_group.name: kiss_serial_opts, kiss_tcp_group.name: kiss_tcp_opts, + fake_client_group.name: fake_client_opts, } diff --git a/aprsd/packets/__init__.py b/aprsd/packets/__init__.py index 66236d8..7fab8bb 100644 --- a/aprsd/packets/__init__.py +++ b/aprsd/packets/__init__.py @@ -1,6 +1,6 @@ from aprsd.packets.core import ( # noqa: F401 - AckPacket, GPSPacket, MessagePacket, MicEPacket, Packet, PathPacket, - RejectPacket, StatusPacket, WeatherPacket, + AckPacket, GPSPacket, MessagePacket, MicEPacket, Packet, RejectPacket, + StatusPacket, WeatherPacket, ) from aprsd.packets.packet_list import PacketList # noqa: F401 from aprsd.packets.seen_list import SeenList # noqa: F401 diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index beda33d..1b3dd8c 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -29,7 +29,7 @@ PACKET_TYPE_THIRDPARTY = "thirdparty" PACKET_TYPE_UNCOMPRESSED = "uncompressed" -def _int_timestamp(): +def _init_timestamp(): """Build a unix style timestamp integer""" return int(round(time.time())) @@ -45,28 +45,30 @@ def _init_msgNo(): # noqa: N802 return c.value -@dataclass +@dataclass(unsafe_hash=True) class Packet(metaclass=abc.ABCMeta): - from_call: str - to_call: str - addresse: str = None - format: str = None + from_call: str = field(default=None) + to_call: str = field(default=None) + addresse: str = field(default=None) + format: str = field(default=None) msgNo: str = field(default_factory=_init_msgNo) # noqa: N815 - packet_type: str = None - timestamp: float = field(default_factory=_int_timestamp) + packet_type: str = field(default=None) + timestamp: float = field(default_factory=_init_timestamp, compare=False, hash=False) # Holds the raw text string to be sent over the wire # or holds the raw string from input packet - raw: str = None - raw_dict: dict = field(repr=False, default_factory=lambda: {}) + raw: str = field(default=None, compare=False, hash=False) + raw_dict: dict = field(repr=False, default_factory=lambda: {}, compare=False, hash=False) # Built by calling prepare(). raw needs this built first. - payload: str = None + payload: str = field(default=None) # Fields related to sending packets out - send_count: int = field(repr=False, default=0) - retry_count: int = field(repr=False, default=3) - last_send_time: datetime.timedelta = field(repr=False, default=None) + send_count: int = field(repr=False, default=0, compare=False, hash=False) + retry_count: int = field(repr=False, default=3, compare=False, hash=False) + last_send_time: datetime.timedelta = field(repr=False, default=None, compare=False, hash=False) # Do we allow this packet to be saved to send later? - allow_delay: bool = field(repr=False, default=True) + allow_delay: bool = field(repr=False, default=True, compare=False, hash=False) + path: List[str] = field(default_factory=list, compare=False, hash=False) + via: str = field(default=None, compare=False, hash=False) def __post__init__(self): LOG.warning(f"POST INIT {self}") @@ -89,8 +91,13 @@ class Packet(metaclass=abc.ABCMeta): else: return default + @property + def key(self): + """Build a key for finding this packet in a dict.""" + return f"{self.from_call}:{self.addresse}:{self.msgNo}" + def update_timestamp(self): - self.timestamp = _int_timestamp() + self.timestamp = _init_timestamp() def prepare(self): """Do stuff here that is needed prior to sending over the air.""" @@ -258,18 +265,9 @@ class Packet(metaclass=abc.ABCMeta): return repr -@dataclass -class PathPacket(Packet): - path: List[str] = field(default_factory=list) - via: str = None - - def _build_payload(self): - raise NotImplementedError - - -@dataclass -class AckPacket(PathPacket): - response: str = None +@dataclass(unsafe_hash=True) +class AckPacket(Packet): + response: str = field(default=None) def __post__init__(self): if self.response: @@ -279,9 +277,9 @@ class AckPacket(PathPacket): self.payload = f":{self.to_call.ljust(9)}:ack{self.msgNo}" -@dataclass -class RejectPacket(PathPacket): - response: str = None +@dataclass(unsafe_hash=True) +class RejectPacket(Packet): + response: str = field(default=None) def __post__init__(self): if self.response: @@ -291,9 +289,9 @@ class RejectPacket(PathPacket): self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}" -@dataclass -class MessagePacket(PathPacket): - message_text: str = None +@dataclass(unsafe_hash=True) +class MessagePacket(Packet): + message_text: str = field(default=None) def _filter_for_send(self) -> str: """Filter and format message string for FCC.""" @@ -313,24 +311,24 @@ class MessagePacket(PathPacket): ) -@dataclass -class StatusPacket(PathPacket): - status: str = None - messagecapable: bool = False - comment: str = None +@dataclass(unsafe_hash=True) +class StatusPacket(Packet): + status: str = field(default=None) + messagecapable: bool = field(default=False) + comment: str = field(default=None) def _build_payload(self): raise NotImplementedError -@dataclass -class GPSPacket(PathPacket): - latitude: float = 0.00 - longitude: float = 0.00 - altitude: float = 0.00 - rng: float = 0.00 - posambiguity: int = 0 - comment: str = None +@dataclass(unsafe_hash=True) +class GPSPacket(Packet): + latitude: float = field(default=0.00) + longitude: float = field(default=0.00) + altitude: float = field(default=0.00) + rng: float = field(default=0.00) + posambiguity: int = field(default=0) + comment: str = field(default=None) symbol: str = field(default="l") symbol_table: str = field(default="/") diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index d42e9de..a6dc6f7 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -1,10 +1,12 @@ +from collections import OrderedDict +from collections.abc import MutableMapping import logging import threading from oslo_config import cfg import wrapt -from aprsd import stats, utils +from aprsd import stats from aprsd.packets import seen_list @@ -12,31 +14,24 @@ CONF = cfg.CONF LOG = logging.getLogger("APRSD") -class PacketList: - """Class to track all of the packets rx'd and tx'd by aprsd.""" - +class PacketList(MutableMapping): _instance = None lock = threading.Lock() - - packet_list: utils.RingBuffer = utils.RingBuffer(1000) - _total_rx: int = 0 _total_tx: int = 0 def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) + cls._maxlen = 1000 + cls.d = OrderedDict() return cls._instance - @wrapt.synchronized(lock) - def __iter__(self): - return iter(self.packet_list) - @wrapt.synchronized(lock) def rx(self, packet): """Add a packet that was received.""" self._total_rx += 1 - self.packet_list.append(packet) + self._add(packet) seen_list.SeenList().update_seen(packet) stats.APRSDStats().rx(packet) @@ -44,13 +39,44 @@ class PacketList: def tx(self, packet): """Add a packet that was received.""" self._total_tx += 1 - self.packet_list.append(packet) + self._add(packet) seen_list.SeenList().update_seen(packet) stats.APRSDStats().tx(packet) @wrapt.synchronized(lock) - def get(self): - return self.packet_list.get() + def add(self, packet): + self._add(packet) + + def _add(self, packet): + self[packet.key] = packet + + @property + def maxlen(self): + return self._maxlen + + @wrapt.synchronized(lock) + def find(self, packet): + return self.get(packet.key) + + def __getitem__(self, key): + # self.d.move_to_end(key) + return self.d[key] + + def __setitem__(self, key, value): + if key in self.d: + self.d.move_to_end(key) + elif len(self.d) == self.maxlen: + self.d.popitem(last=False) + self.d[key] = value + + def __delitem__(self, key): + del self.d[key] + + def __iter__(self): + return self.d.__iter__() + + def __len__(self): + return len(self.d) @wrapt.synchronized(lock) def total_rx(self): diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index e62706a..b512320 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -62,30 +62,22 @@ class PacketTrack(objectstore.ObjectStoreMixin): def __len__(self): return len(self.data) - @wrapt.synchronized(lock) - def __str__(self): - result = "{" - for key in self.data.keys(): - result += f"{key}: {str(self.data[key])}, " - result += "}" - return result - @wrapt.synchronized(lock) def add(self, packet): - key = int(packet.msgNo) + key = packet.msgNo self.data[key] = packet self.total_tracked += 1 @wrapt.synchronized(lock) - def get(self, id): - if id in self.data: - return self.data[id] + def get(self, key): + return self.data.get(key, None) @wrapt.synchronized(lock) - def remove(self, id): - key = int(id) - if key in self.data.keys(): + def remove(self, key): + try: del self.data[key] + except KeyError: + pass def restart(self): """Walk the list of messages and restart them if any.""" diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 8387d16..01ea308 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -67,11 +67,55 @@ class APRSDPluginRXThread(APRSDRXThread): """ def process_packet(self, *args, **kwargs): + """This handles the processing of an inbound packet. + + When a packet is received by the connected client object, + it sends the raw packet into this function. This function then + decodes the packet via the client, and then processes the packet. + Ack Packets are sent to the PluginProcessPacketThread for processing. + All other packets have to be checked as a dupe, and then only after + we haven't seen this packet before, do we send it to the + PluginProcessPacketThread for processing. + """ packet = self._client.decode_packet(*args, **kwargs) # LOG.debug(raw) packet.log(header="RX") - packets.PacketList().rx(packet) - self.packet_queue.put(packet) + + if isinstance(packet, packets.AckPacket): + # We don't need to drop AckPackets, those should be + # processed. + self.packet_queue.put(packet) + else: + # Make sure we aren't re-processing the same packet + # For RF based APRS Clients we can get duplicate packets + # So we need to track them and not process the dupes. + found = False + pkt_list = packets.PacketList() + try: + # Find the packet in the list of already seen packets + # Based on the packet.key + found = pkt_list.find(packet) + except KeyError: + found = False + + if not found: + # If we are in the process of already ack'ing + # a packet, we should drop the packet + # because it's a dupe within the time that + # we send the 3 acks for the packet. + pkt_list.rx(packet) + self.packet_queue.put(packet) + elif packet.timestamp - found.timestamp < 60: + # If the packet came in within 60 seconds of the + # Last time seeing the packet, then we drop it as a dupe. + LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.") + else: + LOG.warning( + f"Packet {packet.from_call}:{packet.msgNo} already tracked " + "but older than 60 seconds. processing.", + ) + pkt_list.rx(packet) + self.packet_queue.put(packet) class APRSDProcessPacketThread(APRSDThread): diff --git a/aprsd/utils/counter.py b/aprsd/utils/counter.py index e423bfb..5f569f4 100644 --- a/aprsd/utils/counter.py +++ b/aprsd/utils/counter.py @@ -37,7 +37,7 @@ class PacketCounter: @property @wrapt.synchronized(lock) def value(self): - return self.val.value + return str(self.val.value) @wrapt.synchronized(lock) def __repr__(self): diff --git a/aprsd/web/chat/static/js/send-message.js b/aprsd/web/chat/static/js/send-message.js index 2d93de2..6b2d3f2 100644 --- a/aprsd/web/chat/static/js/send-message.js +++ b/aprsd/web/chat/static/js/send-message.js @@ -34,7 +34,6 @@ function init_chat() { console.log("SENT: "); console.log(msg); if (cleared === false) { - console.log("CLEARING #msgsTabsDiv"); var msgsdiv = $("#msgsTabsDiv"); msgsdiv.html(''); cleared = true;