Eliminated need for from_aprslib_dict

This patch eliminates the need for a custom
static method on each Packetclass to convert an aprslib
raw decoded dictionary -> correct Packet class.

This now uses the built in dataclasses_json from_dict()
mixin with an override for both the WeatherPacket and
the ThirdPartyPacket.

This patch also adds the TelemetryPacket and adds some
missing members to a few of the classes from test runs
decoding all packets from APRS-IS -> Packet classes.

Also adds some verification for packets in test_packets
This commit is contained in:
Hemna 2024-03-20 21:46:43 -04:00
parent 386d2bea62
commit e386e91f6e
2 changed files with 104 additions and 71 deletions

View File

@ -4,13 +4,17 @@ import logging
import re
import time
# Due to a failure in python 3.8
from typing import List, Optional
from typing import Any, List, Optional, Type, TypeVar, Union
from dataclasses_json import DataClassJsonMixin
from dataclasses_json import DataClassJsonMixin, dataclass_json
from aprsd.utils import counter
# For mypy to be happy
A = TypeVar("A", bound="DataClassJsonMixin")
Json = Union[dict, list, str, int, float, bool, None]
LOG = logging.getLogger("APRSD")
PACKET_TYPE_MESSAGE = "message"
@ -24,6 +28,7 @@ PACKET_TYPE_UNKNOWN = "unknown"
PACKET_TYPE_STATUS = "status"
PACKET_TYPE_BEACON = "beacon"
PACKET_TYPE_THIRDPARTY = "thirdparty"
PACKET_TYPE_TELEMETRY = "telemetry-message"
PACKET_TYPE_UNCOMPRESSED = "uncompressed"
NO_DATE = datetime(1900, 10, 24)
@ -69,8 +74,9 @@ def _translate_fields(raw: dict) -> dict:
return raw
@dataclass_json
@dataclass(unsafe_hash=True)
class Packet(DataClassJsonMixin):
class Packet:
_type: str = field(default="Packet", hash=False)
from_call: Optional[str] = field(default=None)
to_call: Optional[str] = field(default=None)
@ -101,13 +107,13 @@ class Packet(DataClassJsonMixin):
def json(self) -> str:
"""get the json formated string"""
# comes from the DataClassJsonMixin
return self.to_json()
return self.to_json() # type: ignore
@property
def dict(self) -> dict:
"""get the dict formated string"""
# comes from the DataClassJsonMixin
return self.to_dict()
return self.to_dict() # type: ignore
def get(self, key: str, default: Optional[str] = None):
"""Emulate a getter on a dict."""
@ -214,16 +220,12 @@ class Packet(DataClassJsonMixin):
return repr
@dataclass_json
@dataclass(unsafe_hash=True)
class AckPacket(Packet):
_type: str = field(default="AckPacket", hash=False)
response: Optional[str] = field(default=None)
@staticmethod
def from_aprslib_dict(raw: dict) -> "AckPacket":
raw = _translate_fields(raw)
return AckPacket(**raw)
def __post__init__(self):
if self.response:
LOG.warning("Response set!")
@ -232,16 +234,12 @@ class AckPacket(Packet):
self.payload = f":{self.to_call.ljust(9)}:ack{self.msgNo}"
@dataclass_json
@dataclass(unsafe_hash=True)
class RejectPacket(Packet):
_type: str = field(default="RejectPacket", hash=False)
response: Optional[str] = field(default=None)
@staticmethod
def from_aprslib_dict(raw: dict) -> "RejectPacket":
raw = _translate_fields(raw)
return RejectPacket(**raw)
def __post__init__(self):
if self.response:
LOG.warning("Response set!")
@ -250,6 +248,7 @@ class RejectPacket(Packet):
self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}"
@dataclass_json
@dataclass(unsafe_hash=True)
class MessagePacket(Packet):
_type: str = field(default="MessagePacket", hash=False)
@ -275,28 +274,21 @@ class MessagePacket(Packet):
str(self.msgNo),
)
@staticmethod
def from_aprslib_dict(raw: dict) -> "MessagePacket":
raw = _translate_fields(raw)
return MessagePacket(**raw)
@dataclass_json
@dataclass(unsafe_hash=True)
class StatusPacket(Packet):
_type: str = field(default="StatusPacket", hash=False)
status: Optional[str] = field(default=None)
messagecapable: bool = field(default=False)
comment: Optional[str] = field(default=None)
@staticmethod
def from_aprslib_dict(raw: dict) -> "StatusPacket":
raw = _translate_fields(raw)
return StatusPacket(**raw)
raw_timestamp: Optional[str] = field(default=None)
def _build_payload(self):
raise NotImplementedError
@dataclass_json
@dataclass(unsafe_hash=True)
class GPSPacket(Packet):
_type: str = field(default="GPSPacket", hash=False)
@ -315,11 +307,15 @@ class GPSPacket(Packet):
alive: Optional[bool] = field(default=None)
course: Optional[int] = field(default=None)
speed: Optional[float] = field(default=None)
@staticmethod
def from_aprslib_dict(raw: dict) -> "GPSPacket":
raw = _translate_fields(raw)
return GPSPacket(**raw)
phg: Optional[str] = field(default=None)
phg_power: Optional[int] = field(default=None)
phg_height: Optional[float] = field(default=None)
phg_gain: Optional[int] = field(default=None)
phg_dir: Optional[str] = field(default=None)
phg_range: Optional[float] = field(default=None)
phg_rate: Optional[int] = field(default=None)
# http://www.aprs.org/datum.txt
daodatumbyte: Optional[str] = field(default=None)
def decdeg2dms(self, degrees_decimal):
is_positive = degrees_decimal >= 0
@ -425,15 +421,11 @@ class GPSPacket(Packet):
)
@dataclass_json
@dataclass(unsafe_hash=True)
class BeaconPacket(GPSPacket):
_type: str = field(default="BeaconPacket", hash=False)
@staticmethod
def from_aprslib_dict(raw: dict) -> "BeaconPacket":
raw = _translate_fields(raw)
return BeaconPacket(**raw)
def _build_payload(self):
"""The payload is the non headers portion of the packet."""
time_zulu = self._build_time_zulu()
@ -452,23 +444,37 @@ class BeaconPacket(GPSPacket):
)
@dataclass_json
@dataclass
class MicEPacket(GPSPacket):
_type: str = field(default="MicEPacket", hash=False)
messagecapable: bool = False
mbits: Optional[str] = None
mtype: Optional[str] = None
telemetry: Optional[dict] = field(default=None)
# in MPH
speed: float = 0.00
# 0 to 360
course: int = 0
@staticmethod
def from_aprslib_dict(raw: dict) -> "MicEPacket":
raw = _translate_fields(raw)
return MicEPacket(**raw)
@dataclass_json
@dataclass
class TelemetryPacket(GPSPacket):
_type: str = field(default="TelemetryPacket", hash=False)
messagecapable: bool = False
mbits: Optional[str] = None
mtype: Optional[str] = None
telemetry: Optional[dict] = field(default=None)
tPARM: Optional[list[str]] = field(default=None) # noqa: N815
tUNIT: Optional[list[str]] = field(default=None) # noqa: N815
# in MPH
speed: float = 0.00
# 0 to 360
course: int = 0
@dataclass_json
@dataclass
class ObjectPacket(GPSPacket):
_type: str = field(default="ObjectPacket", hash=False)
@ -480,11 +486,6 @@ class ObjectPacket(GPSPacket):
# 0 to 360
course: int = 0
@staticmethod
def from_aprslib_dict(raw: dict) -> "ObjectPacket":
raw = _translate_fields(raw)
return ObjectPacket(**raw)
def _build_payload(self):
time_zulu = self._build_time_zulu()
lat = self.convert_latitude(self.latitude)
@ -516,7 +517,7 @@ class ObjectPacket(GPSPacket):
@dataclass()
class WeatherPacket(GPSPacket):
class WeatherPacket(GPSPacket, DataClassJsonMixin):
_type: str = field(default="WeatherPacket", hash=False)
symbol: str = "_"
wind_speed: float = 0.00
@ -535,13 +536,7 @@ class WeatherPacket(GPSPacket):
course: Optional[int] = field(default=None)
speed: Optional[float] = field(default=None)
@staticmethod
def from_aprslib_dict(raw: dict) -> "WeatherPacket":
"""Create from a dictionary that has come directly from aprslib parse"""
# Because from is a reserved word in python, we need to translate it
# from -> from_call and to -> to_call
raw = _translate_fields(raw)
def _translate(self, raw: dict) -> dict:
for key in raw["weather"]:
raw[key] = raw["weather"][key]
@ -579,7 +574,13 @@ class WeatherPacket(GPSPacket):
del raw["course"]
del raw["weather"]
return WeatherPacket(**raw)
return raw
@classmethod
def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A:
"""Create from a dictionary that has come directly from aprslib parse"""
raw = cls._translate(cls, kvs) # type: ignore
return super().from_dict(raw)
def _build_payload(self):
"""Build an uncompressed weather packet
@ -642,7 +643,7 @@ class WeatherPacket(GPSPacket):
@dataclass()
class ThirdPartyPacket(Packet):
class ThirdPartyPacket(Packet, DataClassJsonMixin):
_type: str = "ThirdPartyPacket"
# Holds the encapsulated packet
subpacket: Optional[type[Packet]] = field(default=None, compare=True, hash=False)
@ -658,15 +659,11 @@ class ThirdPartyPacket(Packet):
return repr_str
@staticmethod
def from_aprslib_dict(raw: dict) -> "ThirdPartyPacket":
"""Create from a dictionary that has come directly from aprslib parse"""
# Because from is a reserved word in python, we need to translate it
# from -> from_call and to -> to_call
raw = _translate_fields(raw)
subpacket = raw.get("subpacket")
del raw["subpacket"]
return ThirdPartyPacket(**raw, subpacket=factory(subpacket))
@classmethod
def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A:
obj = super().from_dict(kvs)
obj.subpacket = factory(obj.subpacket) # type: ignore
return obj
TYPE_LOOKUP: dict[str, type[Packet]] = {
@ -681,6 +678,7 @@ TYPE_LOOKUP: dict[str, type[Packet]] = {
PACKET_TYPE_BEACON: BeaconPacket,
PACKET_TYPE_UNKNOWN: Packet,
PACKET_TYPE_THIRDPARTY: ThirdPartyPacket,
PACKET_TYPE_TELEMETRY: TelemetryPacket,
}
OBJ_LOOKUP: dict[str, type[Packet]] = {
@ -694,6 +692,7 @@ OBJ_LOOKUP: dict[str, type[Packet]] = {
"BeaconPacket": BeaconPacket,
"WeatherPacket": WeatherPacket,
"ThirdPartyPacket": ThirdPartyPacket,
"TelemetryPacket": TelemetryPacket,
}
@ -717,6 +716,8 @@ def get_packet_type(packet: dict) -> str:
packet_type = PACKET_TYPE_STATUS
elif pkt_format == PACKET_TYPE_BEACON:
packet_type = PACKET_TYPE_BEACON
elif pkt_format == PACKET_TYPE_TELEMETRY:
packet_type = PACKET_TYPE_TELEMETRY
elif pkt_format == PACKET_TYPE_WX:
packet_type = PACKET_TYPE_WEATHER
elif pkt_format == PACKET_TYPE_UNCOMPRESSED:
@ -739,11 +740,11 @@ def is_ack_packet(packet: dict) -> bool:
return get_packet_type(packet) == PACKET_TYPE_ACK
def is_mice_packet(packet: dict) -> bool:
def is_mice_packet(packet: dict[Any, Any]) -> bool:
return get_packet_type(packet) == PACKET_TYPE_MICE
def factory(raw_packet: dict) -> type[Packet]:
def factory(raw_packet: dict[Any, Any]) -> type[Packet]:
"""Factory method to create a packet from a raw packet string."""
raw = raw_packet
if "_type" in raw:
@ -756,20 +757,23 @@ def factory(raw_packet: dict) -> type[Packet]:
packet_type = get_packet_type(raw)
raw["packet_type"] = packet_type
class_name = TYPE_LOOKUP[packet_type]
packet_class = TYPE_LOOKUP[packet_type]
if packet_type == PACKET_TYPE_WX:
# the weather information is in a dict
# this brings those values out to the outer dict
class_name = WeatherPacket
packet_class = WeatherPacket
elif packet_type == PACKET_TYPE_OBJECT and "weather" in raw:
class_name = WeatherPacket
packet_class = WeatherPacket
elif packet_type == PACKET_TYPE_UNKNOWN:
# Try and figure it out here
if "latitude" in raw:
class_name = GPSPacket
packet_class = GPSPacket
else:
LOG.error(f"Unknown packet type {packet_type}")
LOG.error(raw)
raise Exception(f"Unknown packet type {packet_type} {raw}")
print(f"factory({packet_type}): {class_name} {raw}")
# LOG.info(f"factory({packet_type}):({raw.get('from_call')}) {packet_class.__class__} {raw}")
# LOG.error(packet_class)
return class_name.from_aprslib_dict(raw)
return packet_class().from_dict(raw) # type: ignore

View File

@ -110,6 +110,11 @@ class TestPluginBase(unittest.TestCase):
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.BeaconPacket)
packet_raw = "kd8mey-10>APRS,TCPIP*,qAC,T2SYDNEY:=4247.80N/08539.00WrPHG1210/Making 220 Great Again Allstar# 552191"
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.BeaconPacket)
def test_reject_factory(self):
"""Test to ensure a reject packet is created."""
packet_raw = "HB9FDL-1>APK102,HB9FM-4*,WIDE2,qAR,HB9FEF-11::REPEAT :rej4139"
@ -117,6 +122,12 @@ class TestPluginBase(unittest.TestCase):
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.RejectPacket)
self.assertEqual("4139", packet.msgNo)
self.assertEqual("HB9FDL-1", packet.from_call)
self.assertEqual("REPEAT", packet.to_call)
self.assertEqual("reject", packet.packet_type)
self.assertIsNone(packet.payload)
def test_thirdparty_factory(self):
"""Test to ensure a third party packet is created."""
packet_raw = "GTOWN>APDW16,WIDE1-1,WIDE2-1:}KM6LYW-9>APZ100,TCPIP,GTOWN*::KM6LYW :KM6LYW: 19 Miles SW"
@ -131,8 +142,26 @@ class TestPluginBase(unittest.TestCase):
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.WeatherPacket)
self.assertEqual(28.88888888888889, packet.temperature)
self.assertEqual(0.0, packet.rain_1h)
self.assertEqual(1015.7, packet.pressure)
self.assertEqual(80, packet.humidity)
self.assertEqual(745, packet.luminosity)
self.assertEqual(3.0, packet.wind_speed)
self.assertEqual(232, packet.wind_direction)
self.assertEqual(6.0, packet.wind_gust)
self.assertEqual(29.899, packet.latitude)
self.assertEqual(-84.39616666666667, packet.longitude)
def test_mice_factory(self):
packet_raw = 'kh2sr-15>S7TSYR,WIDE1-1,WIDE2-1,qAO,KO6KL-1:`1`7\x1c\x1c.#/`"4,}QuirkyQRP 4.6V 35.3C S06'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.MicEPacket)
# Packet with telemetry and DAO
# http://www.aprs.org/datum.txt
packet_raw = 'KD9YIL>T0PX9W,WIDE1-1,WIDE2-1,qAO,NU9R-10:`sB,l#P>/\'"6+}|#*%U\'a|!whl!|3'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.MicEPacket)