From e386e91f6e58e63a405eb2365809501c52a05abd Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 20 Mar 2024 21:46:43 -0400 Subject: [PATCH] Eliminated need for from_aprslib_dict This patch eliminates the need for a custom static method on each Packetclass to convert an aprslib raw decoded dictionary -> correct Packet class. This now uses the built in dataclasses_json from_dict() mixin with an override for both the WeatherPacket and the ThirdPartyPacket. This patch also adds the TelemetryPacket and adds some missing members to a few of the classes from test runs decoding all packets from APRS-IS -> Packet classes. Also adds some verification for packets in test_packets --- aprsd/packets/core.py | 146 ++++++++++++++++++++++-------------------- tests/test_packets.py | 29 +++++++++ 2 files changed, 104 insertions(+), 71 deletions(-) diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index 9337844..93eaa50 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -4,13 +4,17 @@ import logging import re import time # Due to a failure in python 3.8 -from typing import List, Optional +from typing import Any, List, Optional, Type, TypeVar, Union -from dataclasses_json import DataClassJsonMixin +from dataclasses_json import DataClassJsonMixin, dataclass_json from aprsd.utils import counter +# For mypy to be happy +A = TypeVar("A", bound="DataClassJsonMixin") +Json = Union[dict, list, str, int, float, bool, None] + LOG = logging.getLogger("APRSD") PACKET_TYPE_MESSAGE = "message" @@ -24,6 +28,7 @@ PACKET_TYPE_UNKNOWN = "unknown" PACKET_TYPE_STATUS = "status" PACKET_TYPE_BEACON = "beacon" PACKET_TYPE_THIRDPARTY = "thirdparty" +PACKET_TYPE_TELEMETRY = "telemetry-message" PACKET_TYPE_UNCOMPRESSED = "uncompressed" NO_DATE = datetime(1900, 10, 24) @@ -69,8 +74,9 @@ def _translate_fields(raw: dict) -> dict: return raw +@dataclass_json @dataclass(unsafe_hash=True) -class Packet(DataClassJsonMixin): +class Packet: _type: str = field(default="Packet", hash=False) from_call: Optional[str] = field(default=None) to_call: Optional[str] = field(default=None) @@ -101,13 +107,13 @@ class Packet(DataClassJsonMixin): def json(self) -> str: """get the json formated string""" # comes from the DataClassJsonMixin - return self.to_json() + return self.to_json() # type: ignore @property def dict(self) -> dict: """get the dict formated string""" # comes from the DataClassJsonMixin - return self.to_dict() + return self.to_dict() # type: ignore def get(self, key: str, default: Optional[str] = None): """Emulate a getter on a dict.""" @@ -214,16 +220,12 @@ class Packet(DataClassJsonMixin): return repr +@dataclass_json @dataclass(unsafe_hash=True) class AckPacket(Packet): _type: str = field(default="AckPacket", hash=False) response: Optional[str] = field(default=None) - @staticmethod - def from_aprslib_dict(raw: dict) -> "AckPacket": - raw = _translate_fields(raw) - return AckPacket(**raw) - def __post__init__(self): if self.response: LOG.warning("Response set!") @@ -232,16 +234,12 @@ class AckPacket(Packet): self.payload = f":{self.to_call.ljust(9)}:ack{self.msgNo}" +@dataclass_json @dataclass(unsafe_hash=True) class RejectPacket(Packet): _type: str = field(default="RejectPacket", hash=False) response: Optional[str] = field(default=None) - @staticmethod - def from_aprslib_dict(raw: dict) -> "RejectPacket": - raw = _translate_fields(raw) - return RejectPacket(**raw) - def __post__init__(self): if self.response: LOG.warning("Response set!") @@ -250,6 +248,7 @@ class RejectPacket(Packet): self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}" +@dataclass_json @dataclass(unsafe_hash=True) class MessagePacket(Packet): _type: str = field(default="MessagePacket", hash=False) @@ -275,28 +274,21 @@ class MessagePacket(Packet): str(self.msgNo), ) - @staticmethod - def from_aprslib_dict(raw: dict) -> "MessagePacket": - raw = _translate_fields(raw) - return MessagePacket(**raw) - +@dataclass_json @dataclass(unsafe_hash=True) class StatusPacket(Packet): _type: str = field(default="StatusPacket", hash=False) status: Optional[str] = field(default=None) messagecapable: bool = field(default=False) comment: Optional[str] = field(default=None) - - @staticmethod - def from_aprslib_dict(raw: dict) -> "StatusPacket": - raw = _translate_fields(raw) - return StatusPacket(**raw) + raw_timestamp: Optional[str] = field(default=None) def _build_payload(self): raise NotImplementedError +@dataclass_json @dataclass(unsafe_hash=True) class GPSPacket(Packet): _type: str = field(default="GPSPacket", hash=False) @@ -315,11 +307,15 @@ class GPSPacket(Packet): alive: Optional[bool] = field(default=None) course: Optional[int] = field(default=None) speed: Optional[float] = field(default=None) - - @staticmethod - def from_aprslib_dict(raw: dict) -> "GPSPacket": - raw = _translate_fields(raw) - return GPSPacket(**raw) + phg: Optional[str] = field(default=None) + phg_power: Optional[int] = field(default=None) + phg_height: Optional[float] = field(default=None) + phg_gain: Optional[int] = field(default=None) + phg_dir: Optional[str] = field(default=None) + phg_range: Optional[float] = field(default=None) + phg_rate: Optional[int] = field(default=None) + # http://www.aprs.org/datum.txt + daodatumbyte: Optional[str] = field(default=None) def decdeg2dms(self, degrees_decimal): is_positive = degrees_decimal >= 0 @@ -425,15 +421,11 @@ class GPSPacket(Packet): ) +@dataclass_json @dataclass(unsafe_hash=True) class BeaconPacket(GPSPacket): _type: str = field(default="BeaconPacket", hash=False) - @staticmethod - def from_aprslib_dict(raw: dict) -> "BeaconPacket": - raw = _translate_fields(raw) - return BeaconPacket(**raw) - def _build_payload(self): """The payload is the non headers portion of the packet.""" time_zulu = self._build_time_zulu() @@ -452,23 +444,37 @@ class BeaconPacket(GPSPacket): ) +@dataclass_json @dataclass class MicEPacket(GPSPacket): _type: str = field(default="MicEPacket", hash=False) messagecapable: bool = False mbits: Optional[str] = None mtype: Optional[str] = None + telemetry: Optional[dict] = field(default=None) # in MPH speed: float = 0.00 # 0 to 360 course: int = 0 - @staticmethod - def from_aprslib_dict(raw: dict) -> "MicEPacket": - raw = _translate_fields(raw) - return MicEPacket(**raw) + +@dataclass_json +@dataclass +class TelemetryPacket(GPSPacket): + _type: str = field(default="TelemetryPacket", hash=False) + messagecapable: bool = False + mbits: Optional[str] = None + mtype: Optional[str] = None + telemetry: Optional[dict] = field(default=None) + tPARM: Optional[list[str]] = field(default=None) # noqa: N815 + tUNIT: Optional[list[str]] = field(default=None) # noqa: N815 + # in MPH + speed: float = 0.00 + # 0 to 360 + course: int = 0 +@dataclass_json @dataclass class ObjectPacket(GPSPacket): _type: str = field(default="ObjectPacket", hash=False) @@ -480,11 +486,6 @@ class ObjectPacket(GPSPacket): # 0 to 360 course: int = 0 - @staticmethod - def from_aprslib_dict(raw: dict) -> "ObjectPacket": - raw = _translate_fields(raw) - return ObjectPacket(**raw) - def _build_payload(self): time_zulu = self._build_time_zulu() lat = self.convert_latitude(self.latitude) @@ -516,7 +517,7 @@ class ObjectPacket(GPSPacket): @dataclass() -class WeatherPacket(GPSPacket): +class WeatherPacket(GPSPacket, DataClassJsonMixin): _type: str = field(default="WeatherPacket", hash=False) symbol: str = "_" wind_speed: float = 0.00 @@ -535,13 +536,7 @@ class WeatherPacket(GPSPacket): course: Optional[int] = field(default=None) speed: Optional[float] = field(default=None) - @staticmethod - def from_aprslib_dict(raw: dict) -> "WeatherPacket": - """Create from a dictionary that has come directly from aprslib parse""" - # Because from is a reserved word in python, we need to translate it - # from -> from_call and to -> to_call - raw = _translate_fields(raw) - + def _translate(self, raw: dict) -> dict: for key in raw["weather"]: raw[key] = raw["weather"][key] @@ -579,7 +574,13 @@ class WeatherPacket(GPSPacket): del raw["course"] del raw["weather"] - return WeatherPacket(**raw) + return raw + + @classmethod + def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A: + """Create from a dictionary that has come directly from aprslib parse""" + raw = cls._translate(cls, kvs) # type: ignore + return super().from_dict(raw) def _build_payload(self): """Build an uncompressed weather packet @@ -642,7 +643,7 @@ class WeatherPacket(GPSPacket): @dataclass() -class ThirdPartyPacket(Packet): +class ThirdPartyPacket(Packet, DataClassJsonMixin): _type: str = "ThirdPartyPacket" # Holds the encapsulated packet subpacket: Optional[type[Packet]] = field(default=None, compare=True, hash=False) @@ -658,15 +659,11 @@ class ThirdPartyPacket(Packet): return repr_str - @staticmethod - def from_aprslib_dict(raw: dict) -> "ThirdPartyPacket": - """Create from a dictionary that has come directly from aprslib parse""" - # Because from is a reserved word in python, we need to translate it - # from -> from_call and to -> to_call - raw = _translate_fields(raw) - subpacket = raw.get("subpacket") - del raw["subpacket"] - return ThirdPartyPacket(**raw, subpacket=factory(subpacket)) + @classmethod + def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A: + obj = super().from_dict(kvs) + obj.subpacket = factory(obj.subpacket) # type: ignore + return obj TYPE_LOOKUP: dict[str, type[Packet]] = { @@ -681,6 +678,7 @@ TYPE_LOOKUP: dict[str, type[Packet]] = { PACKET_TYPE_BEACON: BeaconPacket, PACKET_TYPE_UNKNOWN: Packet, PACKET_TYPE_THIRDPARTY: ThirdPartyPacket, + PACKET_TYPE_TELEMETRY: TelemetryPacket, } OBJ_LOOKUP: dict[str, type[Packet]] = { @@ -694,6 +692,7 @@ OBJ_LOOKUP: dict[str, type[Packet]] = { "BeaconPacket": BeaconPacket, "WeatherPacket": WeatherPacket, "ThirdPartyPacket": ThirdPartyPacket, + "TelemetryPacket": TelemetryPacket, } @@ -717,6 +716,8 @@ def get_packet_type(packet: dict) -> str: packet_type = PACKET_TYPE_STATUS elif pkt_format == PACKET_TYPE_BEACON: packet_type = PACKET_TYPE_BEACON + elif pkt_format == PACKET_TYPE_TELEMETRY: + packet_type = PACKET_TYPE_TELEMETRY elif pkt_format == PACKET_TYPE_WX: packet_type = PACKET_TYPE_WEATHER elif pkt_format == PACKET_TYPE_UNCOMPRESSED: @@ -739,11 +740,11 @@ def is_ack_packet(packet: dict) -> bool: return get_packet_type(packet) == PACKET_TYPE_ACK -def is_mice_packet(packet: dict) -> bool: +def is_mice_packet(packet: dict[Any, Any]) -> bool: return get_packet_type(packet) == PACKET_TYPE_MICE -def factory(raw_packet: dict) -> type[Packet]: +def factory(raw_packet: dict[Any, Any]) -> type[Packet]: """Factory method to create a packet from a raw packet string.""" raw = raw_packet if "_type" in raw: @@ -756,20 +757,23 @@ def factory(raw_packet: dict) -> type[Packet]: packet_type = get_packet_type(raw) raw["packet_type"] = packet_type - class_name = TYPE_LOOKUP[packet_type] + packet_class = TYPE_LOOKUP[packet_type] if packet_type == PACKET_TYPE_WX: # the weather information is in a dict # this brings those values out to the outer dict - class_name = WeatherPacket + packet_class = WeatherPacket elif packet_type == PACKET_TYPE_OBJECT and "weather" in raw: - class_name = WeatherPacket + packet_class = WeatherPacket elif packet_type == PACKET_TYPE_UNKNOWN: # Try and figure it out here if "latitude" in raw: - class_name = GPSPacket + packet_class = GPSPacket else: + LOG.error(f"Unknown packet type {packet_type}") + LOG.error(raw) raise Exception(f"Unknown packet type {packet_type} {raw}") - print(f"factory({packet_type}): {class_name} {raw}") + # LOG.info(f"factory({packet_type}):({raw.get('from_call')}) {packet_class.__class__} {raw}") + # LOG.error(packet_class) - return class_name.from_aprslib_dict(raw) + return packet_class().from_dict(raw) # type: ignore diff --git a/tests/test_packets.py b/tests/test_packets.py index 9080591..e8e3679 100644 --- a/tests/test_packets.py +++ b/tests/test_packets.py @@ -110,6 +110,11 @@ class TestPluginBase(unittest.TestCase): packet = packets.factory(packet_dict) self.assertIsInstance(packet, packets.BeaconPacket) + packet_raw = "kd8mey-10>APRS,TCPIP*,qAC,T2SYDNEY:=4247.80N/08539.00WrPHG1210/Making 220 Great Again Allstar# 552191" + packet_dict = aprslib.parse(packet_raw) + packet = packets.factory(packet_dict) + self.assertIsInstance(packet, packets.BeaconPacket) + def test_reject_factory(self): """Test to ensure a reject packet is created.""" packet_raw = "HB9FDL-1>APK102,HB9FM-4*,WIDE2,qAR,HB9FEF-11::REPEAT :rej4139" @@ -117,6 +122,12 @@ class TestPluginBase(unittest.TestCase): packet = packets.factory(packet_dict) self.assertIsInstance(packet, packets.RejectPacket) + self.assertEqual("4139", packet.msgNo) + self.assertEqual("HB9FDL-1", packet.from_call) + self.assertEqual("REPEAT", packet.to_call) + self.assertEqual("reject", packet.packet_type) + self.assertIsNone(packet.payload) + def test_thirdparty_factory(self): """Test to ensure a third party packet is created.""" packet_raw = "GTOWN>APDW16,WIDE1-1,WIDE2-1:}KM6LYW-9>APZ100,TCPIP,GTOWN*::KM6LYW :KM6LYW: 19 Miles SW" @@ -131,8 +142,26 @@ class TestPluginBase(unittest.TestCase): packet = packets.factory(packet_dict) self.assertIsInstance(packet, packets.WeatherPacket) + self.assertEqual(28.88888888888889, packet.temperature) + self.assertEqual(0.0, packet.rain_1h) + self.assertEqual(1015.7, packet.pressure) + self.assertEqual(80, packet.humidity) + self.assertEqual(745, packet.luminosity) + self.assertEqual(3.0, packet.wind_speed) + self.assertEqual(232, packet.wind_direction) + self.assertEqual(6.0, packet.wind_gust) + self.assertEqual(29.899, packet.latitude) + self.assertEqual(-84.39616666666667, packet.longitude) + def test_mice_factory(self): packet_raw = 'kh2sr-15>S7TSYR,WIDE1-1,WIDE2-1,qAO,KO6KL-1:`1`7\x1c\x1c.#/`"4,}QuirkyQRP 4.6V 35.3C S06' packet_dict = aprslib.parse(packet_raw) packet = packets.factory(packet_dict) self.assertIsInstance(packet, packets.MicEPacket) + + # Packet with telemetry and DAO + # http://www.aprs.org/datum.txt + packet_raw = 'KD9YIL>T0PX9W,WIDE1-1,WIDE2-1,qAO,NU9R-10:`sB,l#P>/\'"6+}|#*%U\'a|!whl!|3' + packet_dict = aprslib.parse(packet_raw) + packet = packets.factory(packet_dict) + self.assertIsInstance(packet, packets.MicEPacket)