mirror of https://github.com/craigerl/aprsd.git
867 lines
29 KiB
Python
867 lines
29 KiB
Python
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
import logging
|
|
import re
|
|
import time
|
|
# Due to a failure in python 3.8
|
|
from typing import Any, List, Optional, Type, TypeVar, Union
|
|
|
|
from dataclasses_json import (
|
|
CatchAll, DataClassJsonMixin, Undefined, dataclass_json,
|
|
)
|
|
from loguru import logger
|
|
|
|
from aprsd.utils import counter
|
|
|
|
|
|
# For mypy to be happy
|
|
A = TypeVar("A", bound="DataClassJsonMixin")
|
|
Json = Union[dict, list, str, int, float, bool, None]
|
|
|
|
LOG = logging.getLogger()
|
|
LOGU = logger
|
|
|
|
PACKET_TYPE_BULLETIN = "bulletin"
|
|
PACKET_TYPE_MESSAGE = "message"
|
|
PACKET_TYPE_ACK = "ack"
|
|
PACKET_TYPE_REJECT = "reject"
|
|
PACKET_TYPE_MICE = "mic-e"
|
|
PACKET_TYPE_WX = "wx"
|
|
PACKET_TYPE_WEATHER = "weather"
|
|
PACKET_TYPE_OBJECT = "object"
|
|
PACKET_TYPE_UNKNOWN = "unknown"
|
|
PACKET_TYPE_STATUS = "status"
|
|
PACKET_TYPE_BEACON = "beacon"
|
|
PACKET_TYPE_THIRDPARTY = "thirdparty"
|
|
PACKET_TYPE_TELEMETRY = "telemetry-message"
|
|
PACKET_TYPE_UNCOMPRESSED = "uncompressed"
|
|
|
|
NO_DATE = datetime(1900, 10, 24)
|
|
|
|
|
|
def _init_timestamp():
|
|
"""Build a unix style timestamp integer"""
|
|
return int(round(time.time()))
|
|
|
|
|
|
def _init_send_time():
|
|
# We have to use a datetime here, or the json encoder
|
|
# Fails on a NoneType.
|
|
return NO_DATE
|
|
|
|
|
|
def _init_msgNo(): # noqa: N802
|
|
"""For some reason __post__init doesn't get called.
|
|
|
|
So in order to initialize the msgNo field in the packet
|
|
we use this workaround.
|
|
"""
|
|
c = counter.PacketCounter()
|
|
c.increment()
|
|
return c.value
|
|
|
|
|
|
def _translate_fields(raw: dict) -> dict:
|
|
translate_fields = {
|
|
"from": "from_call",
|
|
"to": "to_call",
|
|
}
|
|
# First translate some fields
|
|
for key in translate_fields:
|
|
if key in raw:
|
|
raw[translate_fields[key]] = raw[key]
|
|
del raw[key]
|
|
|
|
# addresse overrides to_call
|
|
if "addresse" in raw:
|
|
raw["to_call"] = raw["addresse"]
|
|
|
|
return raw
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass(unsafe_hash=True)
|
|
class Packet:
|
|
_type: str = field(default="Packet", hash=False)
|
|
from_call: Optional[str] = field(default=None)
|
|
to_call: Optional[str] = field(default=None)
|
|
addresse: Optional[str] = field(default=None)
|
|
format: Optional[str] = field(default=None)
|
|
msgNo: Optional[str] = field(default=None) # noqa: N815
|
|
packet_type: Optional[str] = field(default=None)
|
|
timestamp: float = field(default_factory=_init_timestamp, compare=False, hash=False)
|
|
# Holds the raw text string to be sent over the wire
|
|
# or holds the raw string from input packet
|
|
raw: Optional[str] = field(default=None, compare=False, hash=False)
|
|
raw_dict: dict = field(repr=False, default_factory=lambda: {}, compare=False, hash=False)
|
|
# Built by calling prepare(). raw needs this built first.
|
|
payload: Optional[str] = field(default=None)
|
|
|
|
# Fields related to sending packets out
|
|
send_count: int = field(repr=False, default=0, compare=False, hash=False)
|
|
retry_count: int = field(repr=False, default=3, compare=False, hash=False)
|
|
last_send_time: float = field(repr=False, default=0, compare=False, hash=False)
|
|
last_send_attempt: int = field(repr=False, default=0, compare=False, hash=False)
|
|
|
|
# Do we allow this packet to be saved to send later?
|
|
allow_delay: bool = field(repr=False, default=True, compare=False, hash=False)
|
|
path: List[str] = field(default_factory=list, compare=False, hash=False)
|
|
via: Optional[str] = field(default=None, compare=False, hash=False)
|
|
|
|
def get(self, key: str, default: Optional[str] = None):
|
|
"""Emulate a getter on a dict."""
|
|
if hasattr(self, key):
|
|
return getattr(self, key)
|
|
else:
|
|
return default
|
|
|
|
@property
|
|
def key(self) -> str:
|
|
"""Build a key for finding this packet in a dict."""
|
|
return f"{self.from_call}:{self.addresse}:{self.msgNo}"
|
|
|
|
def update_timestamp(self) -> None:
|
|
self.timestamp = _init_timestamp()
|
|
|
|
@property
|
|
def human_info(self) -> str:
|
|
"""Build a human readable string for this packet.
|
|
|
|
This doesn't include the from to and type, but just
|
|
the human readable payload.
|
|
"""
|
|
self.prepare()
|
|
msg = self._filter_for_send().rstrip("\n")
|
|
return msg
|
|
|
|
def prepare(self) -> None:
|
|
"""Do stuff here that is needed prior to sending over the air."""
|
|
# now build the raw message for sending
|
|
if not self.msgNo:
|
|
self.msgNo = _init_msgNo()
|
|
self._build_payload()
|
|
self._build_raw()
|
|
|
|
def _build_payload(self) -> None:
|
|
"""The payload is the non headers portion of the packet."""
|
|
if not self.to_call:
|
|
raise ValueError("to_call isn't set. Must set to_call before calling prepare()")
|
|
msg = self._filter_for_send().rstrip("\n")
|
|
self.payload = (
|
|
f":{self.to_call.ljust(9)}"
|
|
f":{msg}"
|
|
)
|
|
|
|
def _build_raw(self) -> None:
|
|
"""Build the self.raw which is what is sent over the air."""
|
|
self.raw = "{}>APZ100:{}".format(
|
|
self.from_call,
|
|
self.payload,
|
|
)
|
|
|
|
def _filter_for_send(self) -> str:
|
|
"""Filter and format message string for FCC."""
|
|
# max? ftm400 displays 64, raw msg shows 74
|
|
# and ftm400-send is max 64. setting this to
|
|
# 67 displays 64 on the ftm400. (+3 {01 suffix)
|
|
# feature req: break long ones into two msgs
|
|
if not self.raw:
|
|
raise ValueError("No message text to send. call prepare() first.")
|
|
|
|
message = self.raw[:67]
|
|
# We all miss George Carlin
|
|
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
|
|
|
|
def __str__(self) -> str:
|
|
"""Show the raw version of the packet"""
|
|
self.prepare()
|
|
if not self.raw:
|
|
raise ValueError("self.raw is unset")
|
|
return self.raw
|
|
|
|
def __repr__(self) -> str:
|
|
"""Build the repr version of the packet."""
|
|
repr = (
|
|
f"{self.__class__.__name__}:"
|
|
f" From: {self.from_call} "
|
|
f" To: {self.to_call}"
|
|
)
|
|
return repr
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass(unsafe_hash=True)
|
|
class AckPacket(Packet):
|
|
_type: str = field(default="AckPacket", hash=False)
|
|
response: Optional[str] = field(default=None)
|
|
|
|
def __post__init__(self):
|
|
if self.response:
|
|
LOG.warning("Response set!")
|
|
|
|
def _build_payload(self):
|
|
self.payload = f":{self.to_call.ljust(9)}:ack{self.msgNo}"
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass(unsafe_hash=True)
|
|
class RejectPacket(Packet):
|
|
_type: str = field(default="RejectPacket", hash=False)
|
|
response: Optional[str] = field(default=None)
|
|
|
|
def __post__init__(self):
|
|
if self.response:
|
|
LOG.warning("Response set!")
|
|
|
|
def _build_payload(self):
|
|
self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}"
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass(unsafe_hash=True)
|
|
class MessagePacket(Packet):
|
|
_type: str = field(default="MessagePacket", hash=False)
|
|
message_text: Optional[str] = field(default=None)
|
|
|
|
def _filter_for_send(self) -> str:
|
|
"""Filter and format message string for FCC."""
|
|
# max? ftm400 displays 64, raw msg shows 74
|
|
# and ftm400-send is max 64. setting this to
|
|
# 67 displays 64 on the ftm400. (+3 {01 suffix)
|
|
# feature req: break long ones into two msgs
|
|
if not self.message_text:
|
|
raise ValueError("No message text to send. Populate message_text field.")
|
|
|
|
message = self.message_text[:67]
|
|
# We all miss George Carlin
|
|
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
|
|
|
|
def _build_payload(self):
|
|
self.payload = ":{}:{}{{{}".format(
|
|
self.to_call.ljust(9),
|
|
self._filter_for_send().rstrip("\n"),
|
|
str(self.msgNo),
|
|
)
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass(unsafe_hash=True)
|
|
class StatusPacket(Packet):
|
|
_type: str = field(default="StatusPacket", hash=False)
|
|
status: Optional[str] = field(default=None)
|
|
messagecapable: bool = field(default=False)
|
|
comment: Optional[str] = field(default=None)
|
|
raw_timestamp: Optional[str] = field(default=None)
|
|
|
|
def _filter_for_send(self) -> str:
|
|
"""Filter and format message string for FCC."""
|
|
# max? ftm400 displays 64, raw msg shows 74
|
|
# and ftm400-send is max 64. setting this to
|
|
# 67 displays 64 on the ftm400. (+3 {01 suffix)
|
|
# feature req: break long ones into two msgs
|
|
if not self.status and not self.comment:
|
|
self.status = "None"
|
|
|
|
message = self.status[:67]
|
|
# We all miss George Carlin
|
|
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
|
|
|
|
def _build_payload(self):
|
|
self.payload = ":{}:{}{{{}".format(
|
|
self.to_call.ljust(9),
|
|
self._filter_for_send().rstrip("\n"),
|
|
str(self.msgNo),
|
|
)
|
|
|
|
@property
|
|
def human_info(self) -> str:
|
|
self.prepare()
|
|
return self.status
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass(unsafe_hash=True)
|
|
class GPSPacket(Packet):
|
|
_type: str = field(default="GPSPacket", hash=False)
|
|
latitude: float = field(default=0.00)
|
|
longitude: float = field(default=0.00)
|
|
altitude: float = field(default=0.00)
|
|
rng: float = field(default=0.00)
|
|
posambiguity: int = field(default=0)
|
|
messagecapable: bool = field(default=False)
|
|
comment: Optional[str] = field(default=None)
|
|
symbol: str = field(default="l")
|
|
symbol_table: str = field(default="/")
|
|
raw_timestamp: Optional[str] = field(default=None)
|
|
object_name: Optional[str] = field(default=None)
|
|
object_format: Optional[str] = field(default=None)
|
|
alive: Optional[bool] = field(default=None)
|
|
course: Optional[int] = field(default=None)
|
|
speed: Optional[float] = field(default=None)
|
|
phg: Optional[str] = field(default=None)
|
|
phg_power: Optional[int] = field(default=None)
|
|
phg_height: Optional[float] = field(default=None)
|
|
phg_gain: Optional[int] = field(default=None)
|
|
phg_dir: Optional[str] = field(default=None)
|
|
phg_range: Optional[float] = field(default=None)
|
|
phg_rate: Optional[int] = field(default=None)
|
|
# http://www.aprs.org/datum.txt
|
|
daodatumbyte: Optional[str] = field(default=None)
|
|
|
|
def decdeg2dms(self, degrees_decimal):
|
|
is_positive = degrees_decimal >= 0
|
|
degrees_decimal = abs(degrees_decimal)
|
|
minutes, seconds = divmod(degrees_decimal * 3600, 60)
|
|
degrees, minutes = divmod(minutes, 60)
|
|
degrees = degrees if is_positive else -degrees
|
|
|
|
degrees = str(int(degrees)).replace("-", "0")
|
|
minutes = str(int(minutes)).replace("-", "0")
|
|
seconds = str(int(round(seconds * 0.01, 2) * 100))
|
|
|
|
return {"degrees": degrees, "minutes": minutes, "seconds": seconds}
|
|
|
|
def decdeg2dmm_m(self, degrees_decimal):
|
|
is_positive = degrees_decimal >= 0
|
|
degrees_decimal = abs(degrees_decimal)
|
|
minutes, seconds = divmod(degrees_decimal * 3600, 60)
|
|
degrees, minutes = divmod(minutes, 60)
|
|
degrees = degrees if is_positive else -degrees
|
|
|
|
degrees = abs(int(degrees))
|
|
minutes = int(round(minutes + (seconds / 60), 2))
|
|
hundredths = round(seconds / 60, 2)
|
|
|
|
return {
|
|
"degrees": degrees, "minutes": minutes, "seconds": seconds,
|
|
"hundredths": hundredths,
|
|
}
|
|
|
|
def convert_latitude(self, degrees_decimal):
|
|
det = self.decdeg2dmm_m(degrees_decimal)
|
|
if degrees_decimal > 0:
|
|
direction = "N"
|
|
else:
|
|
direction = "S"
|
|
|
|
degrees = str(det.get("degrees")).zfill(2)
|
|
minutes = str(det.get("minutes")).zfill(2)
|
|
seconds = det.get("seconds")
|
|
hun = det.get("hundredths")
|
|
hundredths = f"{hun:.2f}".split(".")[1]
|
|
|
|
LOG.debug(
|
|
f"LAT degress {degrees} minutes {str(minutes)} "
|
|
f"seconds {seconds} hundredths {hundredths} direction {direction}",
|
|
)
|
|
|
|
lat = f"{degrees}{str(minutes)}.{hundredths}{direction}"
|
|
return lat
|
|
|
|
def convert_longitude(self, degrees_decimal):
|
|
det = self.decdeg2dmm_m(degrees_decimal)
|
|
if degrees_decimal > 0:
|
|
direction = "E"
|
|
else:
|
|
direction = "W"
|
|
|
|
degrees = str(det.get("degrees")).zfill(3)
|
|
minutes = str(det.get("minutes")).zfill(2)
|
|
seconds = det.get("seconds")
|
|
hun = det.get("hundredths")
|
|
hundredths = f"{hun:.2f}".split(".")[1]
|
|
|
|
LOG.debug(
|
|
f"LON degress {degrees} minutes {str(minutes)} "
|
|
f"seconds {seconds} hundredths {hundredths} direction {direction}",
|
|
)
|
|
|
|
lon = f"{degrees}{str(minutes)}.{hundredths}{direction}"
|
|
return lon
|
|
|
|
def _build_time_zulu(self):
|
|
"""Build the timestamp in UTC/zulu."""
|
|
if self.timestamp:
|
|
local_dt = datetime.fromtimestamp(self.timestamp)
|
|
else:
|
|
local_dt = datetime.now()
|
|
self.timestamp = datetime.timestamp(local_dt)
|
|
|
|
utc_offset_timedelta = datetime.utcnow() - local_dt
|
|
result_utc_datetime = local_dt + utc_offset_timedelta
|
|
time_zulu = result_utc_datetime.strftime("%d%H%M")
|
|
return time_zulu
|
|
|
|
def _build_payload(self):
|
|
"""The payload is the non headers portion of the packet."""
|
|
time_zulu = self._build_time_zulu()
|
|
lat = self.latitude
|
|
long = self.longitude
|
|
self.payload = (
|
|
f"@{time_zulu}z{lat}{self.symbol_table}"
|
|
f"{long}{self.symbol}"
|
|
)
|
|
|
|
if self.comment:
|
|
self.payload = f"{self.payload}{self.comment}"
|
|
|
|
def _build_raw(self):
|
|
self.raw = (
|
|
f"{self.from_call}>{self.to_call},WIDE2-1:"
|
|
f"{self.payload}"
|
|
)
|
|
|
|
@property
|
|
def human_info(self) -> str:
|
|
h_str = []
|
|
h_str.append(f"Lat:{self.latitude:03.3f}")
|
|
h_str.append(f"Lon:{self.longitude:03.3f}")
|
|
if self.altitude:
|
|
h_str.append(f"Altitude {self.altitude:03.0f}")
|
|
if self.speed:
|
|
h_str.append(f"Speed {self.speed:03.0f}MPH")
|
|
if self.course:
|
|
h_str.append(f"Course {self.course:03.0f}")
|
|
if self.rng:
|
|
h_str.append(f"RNG {self.rng:03.0f}")
|
|
if self.phg:
|
|
h_str.append(f"PHG {self.phg}")
|
|
|
|
return " ".join(h_str)
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass(unsafe_hash=True)
|
|
class BeaconPacket(GPSPacket):
|
|
_type: str = field(default="BeaconPacket", hash=False)
|
|
|
|
def _build_payload(self):
|
|
"""The payload is the non headers portion of the packet."""
|
|
time_zulu = self._build_time_zulu()
|
|
lat = self.convert_latitude(self.latitude)
|
|
long = self.convert_longitude(self.longitude)
|
|
|
|
self.payload = (
|
|
f"@{time_zulu}z{lat}{self.symbol_table}"
|
|
f"{long}{self.symbol}APRSD Beacon"
|
|
)
|
|
|
|
def _build_raw(self):
|
|
self.raw = (
|
|
f"{self.from_call}>APZ100:"
|
|
f"{self.payload}"
|
|
)
|
|
|
|
@property
|
|
def human_info(self) -> str:
|
|
h_str = []
|
|
h_str.append(f"Lat:{self.latitude:03.3f}")
|
|
h_str.append(f"Lon:{self.longitude:03.3f}")
|
|
h_str.append(f"{self.comment}")
|
|
return " ".join(h_str)
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass(unsafe_hash=True)
|
|
class MicEPacket(GPSPacket):
|
|
_type: str = field(default="MicEPacket", hash=False)
|
|
messagecapable: bool = False
|
|
mbits: Optional[str] = None
|
|
mtype: Optional[str] = None
|
|
telemetry: Optional[dict] = field(default=None)
|
|
# in MPH
|
|
speed: float = 0.00
|
|
# 0 to 360
|
|
course: int = 0
|
|
|
|
@property
|
|
def human_info(self) -> str:
|
|
h_info = super().human_info
|
|
return f"{h_info} {self.mbits} mbits"
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass(unsafe_hash=True)
|
|
class TelemetryPacket(GPSPacket):
|
|
_type: str = field(default="TelemetryPacket", hash=False)
|
|
messagecapable: bool = False
|
|
mbits: Optional[str] = None
|
|
mtype: Optional[str] = None
|
|
telemetry: Optional[dict] = field(default=None)
|
|
tPARM: Optional[list[str]] = field(default=None) # noqa: N815
|
|
tUNIT: Optional[list[str]] = field(default=None) # noqa: N815
|
|
# in MPH
|
|
speed: float = 0.00
|
|
# 0 to 360
|
|
course: int = 0
|
|
|
|
@property
|
|
def human_info(self) -> str:
|
|
h_info = super().human_info
|
|
return f"{h_info} {self.telemetry}"
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass(unsafe_hash=True)
|
|
class ObjectPacket(GPSPacket):
|
|
_type: str = field(default="ObjectPacket", hash=False)
|
|
alive: bool = True
|
|
raw_timestamp: Optional[str] = None
|
|
symbol: str = field(default="r")
|
|
# in MPH
|
|
speed: float = 0.00
|
|
# 0 to 360
|
|
course: int = 0
|
|
|
|
def _build_payload(self):
|
|
time_zulu = self._build_time_zulu()
|
|
lat = self.convert_latitude(self.latitude)
|
|
long = self.convert_longitude(self.longitude)
|
|
|
|
self.payload = (
|
|
f"*{time_zulu}z{lat}{self.symbol_table}"
|
|
f"{long}{self.symbol}"
|
|
)
|
|
|
|
if self.comment:
|
|
self.payload = f"{self.payload}{self.comment}"
|
|
|
|
def _build_raw(self):
|
|
"""
|
|
REPEAT builds packets like
|
|
reply = "{}>APZ100:;{:9s}*{}z{}r{:.3f}MHz {} {}".format(
|
|
fromcall, callsign, time_zulu, latlon, freq, uplink_tone, offset,
|
|
)
|
|
where fromcall is the callsign that is sending the packet
|
|
callsign is the station callsign for the object
|
|
The frequency, uplink_tone, offset is part of the comment
|
|
"""
|
|
|
|
self.raw = (
|
|
f"{self.from_call}>APZ100:;{self.to_call:9s}"
|
|
f"{self.payload}"
|
|
)
|
|
|
|
@property
|
|
def human_info(self) -> str:
|
|
h_info = super().human_info
|
|
return f"{h_info} {self.comment}"
|
|
|
|
|
|
@dataclass(unsafe_hash=True)
|
|
class WeatherPacket(GPSPacket, DataClassJsonMixin):
|
|
_type: str = field(default="WeatherPacket", hash=False)
|
|
symbol: str = "_"
|
|
wind_speed: float = 0.00
|
|
wind_direction: int = 0
|
|
wind_gust: float = 0.00
|
|
temperature: float = 0.00
|
|
# in inches. 1.04 means 1.04 inches
|
|
rain_1h: float = 0.00
|
|
rain_24h: float = 0.00
|
|
rain_since_midnight: float = 0.00
|
|
humidity: int = 0
|
|
pressure: float = 0.00
|
|
comment: Optional[str] = field(default=None)
|
|
luminosity: Optional[int] = field(default=None)
|
|
wx_raw_timestamp: Optional[str] = field(default=None)
|
|
course: Optional[int] = field(default=None)
|
|
speed: Optional[float] = field(default=None)
|
|
|
|
def _translate(self, raw: dict) -> dict:
|
|
for key in raw["weather"]:
|
|
raw[key] = raw["weather"][key]
|
|
|
|
# If we have the broken aprslib, then we need to
|
|
# Convert the course and speed to wind_speed and wind_direction
|
|
# aprslib issue #80
|
|
# https://github.com/rossengeorgiev/aprs-python/issues/80
|
|
# Wind speed and course is option in the SPEC.
|
|
# For some reason aprslib multiplies the speed by 1.852.
|
|
if "wind_speed" not in raw and "wind_direction" not in raw:
|
|
# Most likely this is the broken aprslib
|
|
# So we need to convert the wind_gust speed
|
|
raw["wind_gust"] = round(raw.get("wind_gust", 0) / 0.44704, 3)
|
|
if "wind_speed" not in raw:
|
|
wind_speed = raw.get("speed")
|
|
if wind_speed:
|
|
raw["wind_speed"] = round(wind_speed / 1.852, 3)
|
|
raw["weather"]["wind_speed"] = raw["wind_speed"]
|
|
if "speed" in raw:
|
|
del raw["speed"]
|
|
# Let's adjust the rain numbers as well, since it's wrong
|
|
raw["rain_1h"] = round((raw.get("rain_1h", 0) / .254) * .01, 3)
|
|
raw["weather"]["rain_1h"] = raw["rain_1h"]
|
|
raw["rain_24h"] = round((raw.get("rain_24h", 0) / .254) * .01, 3)
|
|
raw["weather"]["rain_24h"] = raw["rain_24h"]
|
|
raw["rain_since_midnight"] = round((raw.get("rain_since_midnight", 0) / .254) * .01, 3)
|
|
raw["weather"]["rain_since_midnight"] = raw["rain_since_midnight"]
|
|
|
|
if "wind_direction" not in raw:
|
|
wind_direction = raw.get("course")
|
|
if wind_direction:
|
|
raw["wind_direction"] = wind_direction
|
|
raw["weather"]["wind_direction"] = raw["wind_direction"]
|
|
if "course" in raw:
|
|
del raw["course"]
|
|
|
|
del raw["weather"]
|
|
return raw
|
|
|
|
@classmethod
|
|
def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A:
|
|
"""Create from a dictionary that has come directly from aprslib parse"""
|
|
raw = cls._translate(cls, kvs) # type: ignore
|
|
return super().from_dict(raw)
|
|
|
|
@property
|
|
def human_info(self) -> str:
|
|
h_str = []
|
|
h_str.append(f"Temp {self.temperature:03.0f}F")
|
|
h_str.append(f"Humidity {self.humidity}%")
|
|
h_str.append(f"Wind {self.wind_speed:03.0f}MPH@{self.wind_direction}")
|
|
h_str.append(f"Pressure {self.pressure}mb")
|
|
h_str.append(f"Rain {self.rain_24h}in/24hr")
|
|
|
|
return " ".join(h_str)
|
|
|
|
def _build_payload(self):
|
|
"""Build an uncompressed weather packet
|
|
|
|
Format =
|
|
|
|
_CSE/SPDgXXXtXXXrXXXpXXXPXXXhXXbXXXXX%type NEW FORMAT APRS793 June 97
|
|
NOT BACKWARD COMPATIBLE
|
|
|
|
|
|
Where: CSE/SPD is wind direction and sustained 1 minute speed
|
|
t is in degrees F
|
|
|
|
r is Rain per last 60 minutes
|
|
1.04 inches of rain will show as r104
|
|
p is precipitation per last 24 hours (sliding 24 hour window)
|
|
P is precip per last 24 hours since midnight
|
|
b is Baro in tenths of a mb
|
|
h is humidity in percent. 00=100
|
|
g is Gust (peak winds in last 5 minutes)
|
|
# is the raw rain counter for remote WX stations
|
|
See notes on remotes below
|
|
% shows software type d=Dos, m=Mac, w=Win, etc
|
|
type shows type of WX instrument
|
|
|
|
"""
|
|
time_zulu = self._build_time_zulu()
|
|
|
|
contents = [
|
|
f"@{time_zulu}z{self.latitude}{self.symbol_table}",
|
|
f"{self.longitude}{self.symbol}",
|
|
f"{self.wind_direction:03d}",
|
|
# Speed = sustained 1 minute wind speed in mph
|
|
f"{self.symbol_table}", f"{self.wind_speed:03.0f}",
|
|
# wind gust (peak wind speed in mph in the last 5 minutes)
|
|
f"g{self.wind_gust:03.0f}",
|
|
# Temperature in degrees F
|
|
f"t{self.temperature:03.0f}",
|
|
# Rainfall (in hundredths of an inch) in the last hour
|
|
f"r{self.rain_1h*100:03.0f}",
|
|
# Rainfall (in hundredths of an inch) in last 24 hours
|
|
f"p{self.rain_24h*100:03.0f}",
|
|
# Rainfall (in hundredths of an inch) since midnigt
|
|
f"P{self.rain_since_midnight*100:03.0f}",
|
|
# Humidity
|
|
f"h{self.humidity:02d}",
|
|
# Barometric pressure (in tenths of millibars/tenths of hPascal)
|
|
f"b{self.pressure:05.0f}",
|
|
]
|
|
if self.comment:
|
|
contents.append(self.comment)
|
|
self.payload = "".join(contents)
|
|
|
|
def _build_raw(self):
|
|
|
|
self.raw = (
|
|
f"{self.from_call}>{self.to_call},WIDE1-1,WIDE2-1:"
|
|
f"{self.payload}"
|
|
)
|
|
|
|
|
|
@dataclass(unsafe_hash=True)
|
|
class ThirdPartyPacket(Packet, DataClassJsonMixin):
|
|
_type: str = "ThirdPartyPacket"
|
|
# Holds the encapsulated packet
|
|
subpacket: Optional[type[Packet]] = field(default=None, compare=True, hash=False)
|
|
|
|
def __repr__(self):
|
|
"""Build the repr version of the packet."""
|
|
repr_str = (
|
|
f"{self.__class__.__name__}:"
|
|
f" From: {self.from_call} "
|
|
f" To: {self.to_call} "
|
|
f" Subpacket: {repr(self.subpacket)}"
|
|
)
|
|
|
|
return repr_str
|
|
|
|
@classmethod
|
|
def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A:
|
|
obj = super().from_dict(kvs)
|
|
obj.subpacket = factory(obj.subpacket) # type: ignore
|
|
return obj
|
|
|
|
@property
|
|
def human_info(self) -> str:
|
|
sub_info = self.subpacket.human_info
|
|
return f"{self.from_call}->{self.to_call} {sub_info}"
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass(unsafe_hash=True)
|
|
class BulletinPacket(Packet):
|
|
_type: str = "BulletinPacket"
|
|
# Holds the encapsulated packet
|
|
bid: Optional[str] = field(default="1")
|
|
message_text: Optional[str] = field(default=None)
|
|
|
|
@property
|
|
def human_info(self) -> str:
|
|
return f"BLN{self.bid} {self.message_text}"
|
|
|
|
|
|
@dataclass_json(undefined=Undefined.INCLUDE)
|
|
@dataclass(unsafe_hash=True)
|
|
class UnknownPacket:
|
|
"""Catchall Packet for things we don't know about.
|
|
|
|
All of the unknown attributes are stored in the unknown_fields
|
|
"""
|
|
unknown_fields: CatchAll
|
|
_type: str = "UnknownPacket"
|
|
from_call: Optional[str] = field(default=None)
|
|
to_call: Optional[str] = field(default=None)
|
|
msgNo: str = field(default_factory=_init_msgNo) # noqa: N815
|
|
format: Optional[str] = field(default=None)
|
|
raw: Optional[str] = field(default=None)
|
|
raw_dict: dict = field(repr=False, default_factory=lambda: {}, compare=False, hash=False)
|
|
path: List[str] = field(default_factory=list, compare=False, hash=False)
|
|
packet_type: Optional[str] = field(default=None)
|
|
via: Optional[str] = field(default=None, compare=False, hash=False)
|
|
|
|
@property
|
|
def key(self) -> str:
|
|
"""Build a key for finding this packet in a dict."""
|
|
return f"{self.from_call}:{self.packet_type}:{self.to_call}"
|
|
|
|
@property
|
|
def human_info(self) -> str:
|
|
return str(self.unknown_fields)
|
|
|
|
|
|
TYPE_LOOKUP: dict[str, type[Packet]] = {
|
|
PACKET_TYPE_BULLETIN: BulletinPacket,
|
|
PACKET_TYPE_WX: WeatherPacket,
|
|
PACKET_TYPE_WEATHER: WeatherPacket,
|
|
PACKET_TYPE_MESSAGE: MessagePacket,
|
|
PACKET_TYPE_ACK: AckPacket,
|
|
PACKET_TYPE_REJECT: RejectPacket,
|
|
PACKET_TYPE_MICE: MicEPacket,
|
|
PACKET_TYPE_OBJECT: ObjectPacket,
|
|
PACKET_TYPE_STATUS: StatusPacket,
|
|
PACKET_TYPE_BEACON: BeaconPacket,
|
|
PACKET_TYPE_UNKNOWN: UnknownPacket,
|
|
PACKET_TYPE_THIRDPARTY: ThirdPartyPacket,
|
|
PACKET_TYPE_TELEMETRY: TelemetryPacket,
|
|
}
|
|
|
|
|
|
def get_packet_type(packet: dict) -> str:
|
|
"""Decode the packet type from the packet."""
|
|
|
|
pkt_format = packet.get("format")
|
|
msg_response = packet.get("response")
|
|
packet_type = PACKET_TYPE_UNKNOWN
|
|
if pkt_format == "message" and msg_response == "ack":
|
|
packet_type = PACKET_TYPE_ACK
|
|
elif pkt_format == "message" and msg_response == "rej":
|
|
packet_type = PACKET_TYPE_REJECT
|
|
elif pkt_format == "message":
|
|
packet_type = PACKET_TYPE_MESSAGE
|
|
elif pkt_format == "mic-e":
|
|
packet_type = PACKET_TYPE_MICE
|
|
elif pkt_format == "object":
|
|
packet_type = PACKET_TYPE_OBJECT
|
|
elif pkt_format == "status":
|
|
packet_type = PACKET_TYPE_STATUS
|
|
elif pkt_format == PACKET_TYPE_BULLETIN:
|
|
packet_type = PACKET_TYPE_BULLETIN
|
|
elif pkt_format == PACKET_TYPE_BEACON:
|
|
packet_type = PACKET_TYPE_BEACON
|
|
elif pkt_format == PACKET_TYPE_TELEMETRY:
|
|
packet_type = PACKET_TYPE_TELEMETRY
|
|
elif pkt_format == PACKET_TYPE_WX:
|
|
packet_type = PACKET_TYPE_WEATHER
|
|
elif pkt_format == PACKET_TYPE_UNCOMPRESSED:
|
|
if packet.get("symbol") == "_":
|
|
packet_type = PACKET_TYPE_WEATHER
|
|
elif pkt_format == PACKET_TYPE_THIRDPARTY:
|
|
packet_type = PACKET_TYPE_THIRDPARTY
|
|
|
|
if packet_type == PACKET_TYPE_UNKNOWN:
|
|
if "latitude" in packet:
|
|
packet_type = PACKET_TYPE_BEACON
|
|
else:
|
|
packet_type = PACKET_TYPE_UNKNOWN
|
|
return packet_type
|
|
|
|
|
|
def is_message_packet(packet: dict) -> bool:
|
|
return get_packet_type(packet) == PACKET_TYPE_MESSAGE
|
|
|
|
|
|
def is_ack_packet(packet: dict) -> bool:
|
|
return get_packet_type(packet) == PACKET_TYPE_ACK
|
|
|
|
|
|
def is_mice_packet(packet: dict[Any, Any]) -> bool:
|
|
return get_packet_type(packet) == PACKET_TYPE_MICE
|
|
|
|
|
|
def factory(raw_packet: dict[Any, Any]) -> type[Packet]:
|
|
"""Factory method to create a packet from a raw packet string."""
|
|
raw = raw_packet
|
|
if "_type" in raw:
|
|
cls = globals()[raw["_type"]]
|
|
return cls.from_dict(raw)
|
|
|
|
raw["raw_dict"] = raw.copy()
|
|
raw = _translate_fields(raw)
|
|
|
|
packet_type = get_packet_type(raw)
|
|
|
|
raw["packet_type"] = packet_type
|
|
packet_class = TYPE_LOOKUP[packet_type]
|
|
if packet_type == PACKET_TYPE_WX:
|
|
# the weather information is in a dict
|
|
# this brings those values out to the outer dict
|
|
packet_class = WeatherPacket
|
|
elif packet_type == PACKET_TYPE_OBJECT and "weather" in raw:
|
|
packet_class = WeatherPacket
|
|
elif packet_type == PACKET_TYPE_UNKNOWN:
|
|
# Try and figure it out here
|
|
if "latitude" in raw:
|
|
packet_class = GPSPacket
|
|
else:
|
|
LOG.warning(f"Unknown packet type {packet_type}")
|
|
LOG.warning(raw)
|
|
packet_class = UnknownPacket
|
|
|
|
raw.get("addresse", raw.get("to_call"))
|
|
|
|
# TODO: Find a global way to enable/disable this
|
|
# LOGU.opt(colors=True).info(
|
|
# f"factory(<green>{packet_type: <8}</green>):"
|
|
# f"(<red>{packet_class.__name__: <13}</red>): "
|
|
# f"<light-blue>{raw.get('from_call'): <9}</light-blue> -> <cyan>{to: <9}</cyan>")
|
|
# LOG.info(raw.get('msgNo'))
|
|
|
|
return packet_class().from_dict(raw) # type: ignore
|