From 082db7325de50c508a96db225ad196e47fbaf806 Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 14 Dec 2022 19:21:25 -0500 Subject: [PATCH] Started using dataclasses to describe packets This patch adds new Packet classes to describe the incoming packets parsed out from aprslib. --- aprsd/client.py | 8 ++ aprsd/cmds/listen.py | 62 ++++++--------- aprsd/cmds/webchat.py | 2 +- aprsd/packets.py | 174 ++++++++++++++++++++++++++++++++++++++---- aprsd/threads/rx.py | 121 +++++++++++++++-------------- dev-requirements.txt | 8 +- requirements.in | 2 + requirements.txt | 2 + 8 files changed, 265 insertions(+), 114 deletions(-) diff --git a/aprsd/client.py b/aprsd/client.py index 89f259b..7664d62 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -31,6 +31,7 @@ class Client: connected = False server_string = None + filter = None def __new__(cls, *args, **kwargs): """This magic turns this into a singleton.""" @@ -44,10 +45,17 @@ class Client: if config: self.config = config + def set_filter(self, filter): + self.filter = filter + if self._client: + self._client.set_filter(filter) + @property def client(self): if not self._client: self._client = self.setup_connection() + if self.filter: + self._client.set_filter(self.filter) return self._client def reset(self): diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index 1afbcef..2945548 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -5,10 +5,10 @@ # python included libs import datetime import logging +import signal import sys import time -import aprslib import click from rich.console import Console @@ -16,7 +16,7 @@ from rich.console import Console import aprsd from aprsd import cli_helper, client, messaging, packets, stats, threads, utils from aprsd.aprsd import cli -from aprsd.utils import trace +from aprsd.threads import rx # setup the global logger @@ -37,6 +37,14 @@ def signal_handler(sig, frame): LOG.info(stats.APRSDStats()) +class APRSDListenThread(rx.APRSDRXThread): + def process_packet(self, *args, **kwargs): + raw = self._client.decode_packet(*args, **kwargs) + packet = packets.Packet.factory(raw) + LOG.debug(f"Got packet {packet}") + packet.log(header="RX Packet") + + @cli.command() @cli_helper.add_options(cli_helper.common_options) @click.option( @@ -74,6 +82,8 @@ def listen( o/obj1/obj2... - Object Filter Pass all objects with the exact name of obj1, obj2, ... (* wild card allowed)\n """ + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) config = ctx.obj["config"] if not aprs_login: @@ -109,26 +119,6 @@ def listen( packets.WatchList(config=config).load() packets.SeenList(config=config).load() - @trace.trace - def rx_packet(packet): - resp = packet.get("response", None) - if resp == "ack": - ack_num = packet.get("msgNo") - console.log(f"We saw an ACK {ack_num} Ignoring") - messaging.log_packet(packet) - else: - message = packet.get("message_text", None) - fromcall = packet["from"] - msg_number = packet.get("msgNo", "0") - messaging.log_message( - "Received Message", - packet["raw"], - message, - fromcall=fromcall, - ack=msg_number, - console=console, - ) - # Initialize the client factory and create # The correct client object ready for use client.ClientFactory.setup(config) @@ -140,29 +130,21 @@ def listen( # Creates the client object LOG.info("Creating client connection") aprs_client = client.factory.create() - console.log(aprs_client) + LOG.info(aprs_client) LOG.debug(f"Filter by '{filter}'") - aprs_client.client.set_filter(filter) + aprs_client.set_filter(filter) packets.PacketList(config=config) keepalive = threads.KeepAliveThread(config=config) keepalive.start() - while True: - try: - # This will register a packet consumer with aprslib - # When new packets come in the consumer will process - # the packet - # with console.status("Listening for packets"): - aprs_client.client.consumer(rx_packet, raw=False) - except aprslib.exceptions.ConnectionDrop: - LOG.error("Connection dropped, reconnecting") - time.sleep(5) - # Force the deletion of the client object connected to aprs - # This will cause a reconnect, next time client.get_client() - # is called - aprs_client.reset() - except aprslib.exceptions.UnknownFormat: - LOG.error("Got a Bad packet") + LOG.debug("Create APRSDListenThread") + listen_thread = APRSDListenThread(threads.msg_queues, config=config) + LOG.debug("Start APRSDListenThread") + listen_thread.start() + LOG.debug("keepalive Join") + keepalive.join() + LOG.debug("listen_thread Join") + listen_thread.join() diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index 4bf35f8..f62b48b 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -178,7 +178,7 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread): ) self.got_ack = True - def process_non_ack_packet(self, packet): + def process_our_message_packet(self, packet): LOG.info(f"process non ack PACKET {packet}") packet.get("addresse", None) fromcall = packet["from"] diff --git a/aprsd/packets.py b/aprsd/packets.py index 83d795d..e111fc6 100644 --- a/aprsd/packets.py +++ b/aprsd/packets.py @@ -1,8 +1,10 @@ +from dataclasses import asdict, dataclass, field import datetime import logging import threading import time +import dacite import wrapt from aprsd import utils @@ -14,6 +16,150 @@ LOG = logging.getLogger("APRSD") PACKET_TYPE_MESSAGE = "message" PACKET_TYPE_ACK = "ack" PACKET_TYPE_MICE = "mic-e" +PACKET_TYPE_WX = "weather" +PACKET_TYPE_UNKNOWN = "unknown" +PACKET_TYPE_STATUS = "status" + + +@dataclass +class Packet: + from_call: str + to_call: str + addresse: str = None + format: str = None + msgNo: str = None + packet_type: str = None + timestamp: float = field(default_factory=time.time) + raw: str = None + _raw_dict: dict = field(repr=True, default_factory=lambda: {}) + + @staticmethod + def factory(raw): + raw["_raw_dict"] = raw.copy() + translate_fields = { + "from": "from_call", + "to": "to_call", + } + # First translate some fields + for key in translate_fields: + if key in raw: + raw[translate_fields[key]] = raw[key] + del raw[key] + + if "addresse" in raw: + raw["to_call"] = raw["addresse"] + + class_lookup = { + PACKET_TYPE_WX: WeatherPacket, + PACKET_TYPE_MESSAGE: MessagePacket, + PACKET_TYPE_ACK: AckPacket, + PACKET_TYPE_MICE: MicEPacket, + PACKET_TYPE_STATUS: StatusPacket, + PACKET_TYPE_UNKNOWN: Packet, + } + packet_type = get_packet_type(raw) + raw["packet_type"] = packet_type + class_name = class_lookup[packet_type] + if packet_type == PACKET_TYPE_UNKNOWN: + # Try and figure it out here + if "latitude" in raw: + class_name = GPSPacket + + if packet_type == PACKET_TYPE_WX: + # the weather information is in a dict + # this brings those values out to the outer dict + for key in raw["weather"]: + raw[key] = raw["weather"][key] + + return dacite.from_dict(data_class=class_name, data=raw) + + def log(self, header=None): + """LOG a packet to the logfile.""" + asdict(self) + log_list = ["\n"] + if header: + log_list.append(f"{header} _______________") + log_list.append(f" Packet : {self.__class__.__name__}") + log_list.append(f" Raw : {self.raw}") + if self.to_call: + log_list.append(f" To : {self.to_call}") + if self.from_call: + log_list.append(f" From : {self.from_call}") + if hasattr(self, "path"): + log_list.append(f" Path : {'=>'.join(self.path)}") + if hasattr(self, "via"): + log_list.append(f" VIA : {self.via}") + + elif isinstance(self, MessagePacket): + log_list.append(f" Message : {self.message_text}") + + if self.msgNo: + log_list.append(f" Msg # : {self.msgNo}") + log_list.append(f"{header} _______________ Complete") + + + LOG.info(self) + LOG.info("\n".join(log_list)) + +@dataclass +class PathPacket(Packet): + path: list[str] = field(default_factory=list) + via: str = None + + +@dataclass +class AckPacket(PathPacket): + response: str = None + +@dataclass +class MessagePacket(PathPacket): + message_text: str = None + + +@dataclass +class StatusPacket(PathPacket): + status: str = None + timestamp: int = 0 + messagecapable: bool = False + comment: str = None + + +@dataclass +class GPSPacket(PathPacket): + latitude: float = 0.00 + longitude: float = 0.00 + altitude: float = 0.00 + rng: float = 0.00 + posambiguity: int = 0 + timestamp: int = 0 + comment: str = None + symbol: str = None + symbol_table: str = None + speed: float = 0.00 + course: int = 0 + + +@dataclass +class MicEPacket(GPSPacket): + messagecapable: bool = False + mbits: str = None + mtype: str = None + + +@dataclass +class WeatherPacket(GPSPacket): + symbol: str = "_" + wind_gust: float = 0.00 + temperature: float = 0.00 + rain_1h: float = 0.00 + rain_24h: float = 0.00 + rain_since_midnight: float = 0.00 + humidity: int = 0 + pressure: float = 0.00 + messagecapable: bool = False + comment: str = None + + class PacketList: @@ -45,11 +191,8 @@ class PacketList: @wrapt.synchronized(lock) def add(self, packet): - packet["ts"] = time.time() - if ( - "fromcall" in packet - and packet["fromcall"] == self.config["aprs"]["login"] - ): + packet.ts = time.time() + if (packet.from_call == self.config["aprs"]["login"]): self.total_tx += 1 else: self.total_recv += 1 @@ -116,7 +259,10 @@ class WatchList(objectstore.ObjectStoreMixin): @wrapt.synchronized(lock) def update_seen(self, packet): - callsign = packet["from"] + if packet.addresse: + callsign = packet.addresse + else: + callsign = packet.from_call if self.callsign_in_watchlist(callsign): self.data[callsign]["last"] = datetime.datetime.now() self.data[callsign]["packets"].append(packet) @@ -178,10 +324,8 @@ class SeenList(objectstore.ObjectStoreMixin): @wrapt.synchronized(lock) def update_seen(self, packet): callsign = None - if "fromcall" in packet: - callsign = packet["fromcall"] - elif "from" in packet: - callsign = packet["from"] + if packet.from_call: + callsign = packet.from_call else: LOG.warning(f"Can't find FROM in packet {packet}") return @@ -200,12 +344,16 @@ def get_packet_type(packet): msg_format = packet.get("format", None) msg_response = packet.get("response", None) packet_type = "unknown" - if msg_format == "message": - packet_type = PACKET_TYPE_MESSAGE - elif msg_response == "ack": + if msg_format == "message" and msg_response == "ack": packet_type = PACKET_TYPE_ACK + elif msg_format == "message": + packet_type = PACKET_TYPE_MESSAGE elif msg_format == "mic-e": packet_type = PACKET_TYPE_MICE + elif msg_format == "status": + packet_type = PACKET_TYPE_STATUS + elif packet.get("symbol", None) == "_": + packet_type = PACKET_TYPE_WX return packet_type diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 5529bfa..bf85a72 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -23,7 +23,6 @@ class APRSDRXThread(APRSDThread): client.factory.create().client.stop() def loop(self): - # setup the consumer of messages and block until a messages try: # This will register a packet consumer with aprslib @@ -65,7 +64,12 @@ class APRSDPluginRXThread(APRSDRXThread): processing in the PluginProcessPacketThread. """ def process_packet(self, *args, **kwargs): - packet = self._client.decode_packet(*args, **kwargs) + raw = self._client.decode_packet(*args, **kwargs) + #LOG.debug(raw) + packet = packets.Packet.factory(raw.copy()) + packet.log(header="RX Packet") + #LOG.debug(packet) + del raw thread = APRSDPluginProcessPacketThread( config=self.config, packet=packet, @@ -84,18 +88,18 @@ class APRSDProcessPacketThread(APRSDThread): def __init__(self, config, packet): self.config = config self.packet = packet - name = self.packet["raw"][:10] + name = self.packet.raw[:10] super().__init__(f"RXPKT-{name}") def process_ack_packet(self, packet): - ack_num = packet.get("msgNo") + ack_num = packet.msgNo LOG.info(f"Got ack for message {ack_num}") messaging.log_message( "RXACK", - packet["raw"], + packet.raw, None, ack=ack_num, - fromcall=packet["from"], + fromcall=packet.from_call, ) tracker = messaging.MsgTrack() tracker.remove(ack_num) @@ -106,56 +110,60 @@ class APRSDProcessPacketThread(APRSDThread): """Process a packet received from aprs-is server.""" packet = self.packet packets.PacketList().add(packet) + our_call = self.config["aprsd"]["callsign"].lower() - fromcall = packet["from"] - tocall = packet.get("addresse", None) - msg = packet.get("message_text", None) - msg_id = packet.get("msgNo", "0") - msg_response = packet.get("response", None) - # LOG.debug(f"Got packet from '{fromcall}' - {packet}") + from_call = packet.from_call + if packet.addresse: + to_call = packet.addresse + else: + to_call = packet.to_call + msg_id = packet.msgNo # We don't put ack packets destined for us through the # plugins. + wl = packets.WatchList() + wl.update_seen(packet) if ( - tocall - and tocall.lower() == self.config["aprsd"]["callsign"].lower() - and msg_response == "ack" + isinstance(packet, packets.AckPacket) + and packet.addresse.lower() == our_call ): self.process_ack_packet(packet) else: - # It's not an ACK for us, so lets run it through - # the plugins. - messaging.log_message( - "Received Message", - packet["raw"], - msg, - fromcall=fromcall, - msg_num=msg_id, - ) - # Only ack messages that were sent directly to us - if ( - tocall - and tocall.lower() == self.config["aprsd"]["callsign"].lower() - ): - stats.APRSDStats().msgs_rx_inc() - # let any threads do their thing, then ack - # send an ack last - ack = messaging.AckMessage( - self.config["aprsd"]["callsign"], - fromcall, - msg_id=msg_id, - ) - ack.send() + if isinstance(packet, packets.MessagePacket): + if to_call and to_call.lower() == our_call: + # It's a MessagePacket and it's for us! + stats.APRSDStats().msgs_rx_inc() + # let any threads do their thing, then ack + # send an ack last + ack = messaging.AckMessage( + self.config["aprsd"]["callsign"], + from_call, + msg_id=msg_id, + ) + ack.send() - self.process_non_ack_packet(packet) + self.process_our_message_packet(packet) + else: + # Packet wasn't meant for us! + self.process_other_packet(packet, for_us=False) else: - LOG.info("Packet was not for us.") + self.process_other_packet( + packet, for_us=(to_call.lower() == our_call), + ) LOG.debug("Packet processing complete") @abc.abstractmethod - def process_non_ack_packet(self, *args, **kwargs): - """Ack packets already dealt with here.""" + def process_our_message_packet(self, *args, **kwargs): + """Process a MessagePacket destined for us!""" + + def process_other_packet(self, packet, for_us=False): + """Process an APRS Packet that isn't a message or ack""" + if not for_us: + LOG.info("Got a packet not meant for us.") + else: + LOG.info("Got a non AckPacket/MessagePacket") + LOG.info(packet) class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): @@ -163,18 +171,19 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): This is the main aprsd server plugin processing thread.""" - def process_non_ack_packet(self, packet): + def process_our_message_packet(self, packet): """Send the packet through the plugins.""" - fromcall = packet["from"] - tocall = packet.get("addresse", None) - msg = packet.get("message_text", None) - packet.get("msgNo", "0") - packet.get("response", None) + from_call = packet.from_call + if packet.addresse: + to_call = packet.addresse + else: + to_call = None + # msg = packet.get("message_text", None) + # packet.get("msgNo", "0") + # packet.get("response", None) pm = plugin.PluginManager() try: results = pm.run(packet) - wl = packets.WatchList() - wl.update_seen(packet) replied = False for reply in results: if isinstance(reply, list): @@ -187,7 +196,7 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): else: msg = messaging.TextMessage( self.config["aprsd"]["callsign"], - fromcall, + from_call, subreply, ) msg.send() @@ -207,18 +216,18 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): msg = messaging.TextMessage( self.config["aprsd"]["callsign"], - fromcall, + from_call, reply, ) msg.send() # If the message was for us and we didn't have a # response, then we send a usage statement. - if tocall == self.config["aprsd"]["callsign"] and not replied: + if to_call == self.config["aprsd"]["callsign"] and not replied: LOG.warning("Sending help!") msg = messaging.TextMessage( self.config["aprsd"]["callsign"], - fromcall, + from_call, "Unknown command! Send 'help' message for help", ) msg.send() @@ -226,11 +235,11 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): LOG.error("Plugin failed!!!") LOG.exception(ex) # Do we need to send a reply? - if tocall == self.config["aprsd"]["callsign"]: + if to_call == self.config["aprsd"]["callsign"]: reply = "A Plugin failed! try again?" msg = messaging.TextMessage( self.config["aprsd"]["callsign"], - fromcall, + from_call, reply, ) msg.send() diff --git a/dev-requirements.txt b/dev-requirements.txt index 2472816..93d34eb 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -4,7 +4,7 @@ # # pip-compile --annotation-style=line --resolver=backtracking dev-requirements.in # -add-trailing-comma==2.3.0 # via gray +add-trailing-comma==2.4.0 # via gray alabaster==0.7.12 # via sphinx attrs==22.1.0 # via jsonschema, pytest autoflake==1.5.3 # via gray @@ -34,7 +34,7 @@ imagesize==1.4.1 # via sphinx importlib-metadata==5.1.0 # via sphinx importlib-resources==5.10.1 # via fixit iniconfig==1.1.1 # via pytest -isort==5.10.1 # via -r dev-requirements.in, gray +isort==5.11.2 # via -r dev-requirements.in, gray jinja2==3.1.2 # via sphinx jsonschema==4.17.3 # via fixit libcst==0.4.9 # via fixit @@ -47,7 +47,7 @@ packaging==22.0 # via build, pyproject-api, pytest, sphinx, tox pathspec==0.10.3 # via black pep517==0.13.0 # via build pep8-naming==0.13.2 # via -r dev-requirements.in -pip-tools==6.11.0 # via -r dev-requirements.in +pip-tools==6.12.0 # via -r dev-requirements.in platformdirs==2.6.0 # via black, tox, virtualenv pluggy==1.0.0 # via pytest, tox pre-commit==2.20.0 # via -r dev-requirements.in @@ -74,7 +74,7 @@ sphinxcontrib-serializinghtml==1.1.5 # via sphinx tokenize-rt==5.0.0 # via add-trailing-comma, pyupgrade toml==0.10.2 # via autoflake, pre-commit tomli==2.0.1 # via black, build, coverage, mypy, pep517, pyproject-api, pytest, tox -tox==4.0.8 # via -r dev-requirements.in +tox==4.0.9 # via -r dev-requirements.in typing-extensions==4.4.0 # via black, libcst, mypy, typing-inspect typing-inspect==0.8.0 # via libcst unify==0.5 # via gray diff --git a/requirements.in b/requirements.in index 368f239..f80c069 100644 --- a/requirements.in +++ b/requirements.in @@ -27,3 +27,5 @@ attrs==22.1.0 # for mobile checking user-agents pyopenssl +dataclasses +dacite2 diff --git a/requirements.txt b/requirements.txt index 2dd4a1c..a0e3277 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,6 +17,8 @@ click==8.1.3 # via -r requirements.in, click-completion, flask click-completion==0.5.2 # via -r requirements.in commonmark==0.9.1 # via rich cryptography==38.0.4 # via pyopenssl +dacite2==2.0.0 # via -r requirements.in +dataclasses==0.6 # via -r requirements.in dnspython==2.2.1 # via eventlet eventlet==0.33.2 # via -r requirements.in flask==2.1.2 # via -r requirements.in, flask-classful, flask-httpauth, flask-socketio