1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-12-20 16:41:13 -05:00

Added ObjectPacket

This patch adds the ObjectPacket.  This is used by the REPEAT plugins
to send out an object in message packet to let radios tune directly
to the station.
This commit is contained in:
Hemna 2022-12-30 09:44:25 -05:00
parent 3d0bb8ae8e
commit ed284a42cc
2 changed files with 125 additions and 7 deletions

View File

@ -20,6 +20,7 @@ PACKET_TYPE_MESSAGE = "message"
PACKET_TYPE_ACK = "ack"
PACKET_TYPE_MICE = "mic-e"
PACKET_TYPE_WX = "weather"
PACKET_TYPE_OBJECT = "object"
PACKET_TYPE_UNKNOWN = "unknown"
PACKET_TYPE_STATUS = "status"
PACKET_TYPE_BEACON = "beacon"
@ -232,7 +233,7 @@ class MessagePacket(PathPacket):
)
@dataclass()
@dataclass
class StatusPacket(PathPacket):
status: str = None
messagecapable: bool = False
@ -242,7 +243,7 @@ class StatusPacket(PathPacket):
raise NotImplementedError
@dataclass()
@dataclass
class GPSPacket(PathPacket):
latitude: float = 0.00
longitude: float = 0.00
@ -257,6 +258,77 @@ class GPSPacket(PathPacket):
# 0 to 360
course: int = 0
def decdeg2dms(self, degrees_decimal):
is_positive = degrees_decimal >= 0
degrees_decimal = abs(degrees_decimal)
minutes, seconds = divmod(degrees_decimal * 3600, 60)
degrees, minutes = divmod(minutes, 60)
degrees = degrees if is_positive else -degrees
degrees = str(int(degrees)).replace("-", "0")
minutes = str(int(minutes)).replace("-", "0")
seconds = str(int(round(seconds * 0.01, 2) * 100))
return {"degrees": degrees, "minutes": minutes, "seconds": seconds}
def decdeg2dmm_m(self, degrees_decimal):
is_positive = degrees_decimal >= 0
degrees_decimal = abs(degrees_decimal)
minutes, seconds = divmod(degrees_decimal * 3600, 60)
degrees, minutes = divmod(minutes, 60)
degrees = degrees if is_positive else -degrees
degrees = abs(int(degrees))
minutes = int(round(minutes + (seconds / 60), 2))
hundredths = round(seconds / 60, 2)
return {
"degrees": degrees, "minutes": minutes, "seconds": seconds,
"hundredths": hundredths,
}
def convert_latitude(self, degrees_decimal):
det = self.decdeg2dmm_m(degrees_decimal)
if degrees_decimal > 0:
direction = "N"
else:
direction = "S"
degrees = str(det.get("degrees")).zfill(2)
minutes = str(det.get("minutes")).zfill(2)
seconds = det.get("seconds")
hun = det.get("hundredths")
hundredths = f"{hun:.2f}".split(".")[1]
LOG.debug(
f"LAT degress {degrees} minutes {str(minutes)} "
f"seconds {seconds} hundredths {hundredths} direction {direction}",
)
lat = f"{degrees}{str(minutes)}.{hundredths}{direction}"
return lat
def convert_longitude(self, degrees_decimal):
det = self.decdeg2dmm_m(degrees_decimal)
if degrees_decimal > 0:
direction = "E"
else:
direction = "W"
degrees = str(det.get("degrees")).zfill(3)
minutes = str(det.get("minutes")).zfill(2)
seconds = det.get("seconds")
hun = det.get("hundredths")
hundredths = f"{hun:.2f}".split(".")[1]
LOG.debug(
f"LON degress {degrees} minutes {str(minutes)} "
f"seconds {seconds} hundredths {hundredths} direction {direction}",
)
lon = f"{degrees}{str(minutes)}.{hundredths}{direction}"
return lon
def _build_time_zulu(self):
"""Build the timestamp in UTC/zulu."""
if self.timestamp:
@ -282,7 +354,7 @@ class GPSPacket(PathPacket):
self.raw = f"{self.raw}{self.comment}"
@dataclass()
@dataclass
class MicEPacket(GPSPacket):
messagecapable: bool = False
mbits: str = None
@ -292,6 +364,35 @@ class MicEPacket(GPSPacket):
raise NotImplementedError
@dataclass
class ObjectPacket(GPSPacket):
alive: bool = True
raw_timestamp: str = None
symbol: str = field(default="r")
def _build_raw(self):
"""
REPEAT builds packets like
reply = "{}>APZ100:;{:9s}*{}z{}r{:.3f}MHz {} {}".format(
fromcall, callsign, time_zulu, latlon, freq, uplink_tone, offset,
)
where fromcall is the callsign that is sending the packet
callsign is the station callsign for the object
The frequency, uplink_tone, offset is part of the comment
"""
time_zulu = self._build_time_zulu()
lat = self.convert_latitude(self.latitude)
long = self.convert_longitude(self.longitude)
self.raw = (
f"{self.from_call}>APZ100:;{self.to_call:9s}"
f"*{time_zulu}z{lat}{self.symbol_table}"
f"{long}{self.symbol}"
)
if self.comment:
self.raw = f"{self.raw}{self.comment}"
@dataclass()
class WeatherPacket(GPSPacket):
symbol: str = "_"
@ -369,6 +470,7 @@ TYPE_LOOKUP = {
PACKET_TYPE_MESSAGE: MessagePacket,
PACKET_TYPE_ACK: AckPacket,
PACKET_TYPE_MICE: MicEPacket,
PACKET_TYPE_OBJECT: ObjectPacket,
PACKET_TYPE_STATUS: StatusPacket,
PACKET_TYPE_BEACON: GPSPacket,
PACKET_TYPE_UNKNOWN: Packet,
@ -387,6 +489,8 @@ def get_packet_type(packet: dict):
packet_type = PACKET_TYPE_MESSAGE
elif pkt_format == "mic-e":
packet_type = PACKET_TYPE_MICE
elif pkt_format == "object":
packet_type = PACKET_TYPE_OBJECT
elif pkt_format == "status":
packet_type = PACKET_TYPE_STATUS
elif pkt_format == PACKET_TYPE_BEACON:

View File

@ -58,6 +58,10 @@ class APRSDStats:
"tx": 0,
"rx": 0,
},
"ObjectPacket": {
"tx": 0,
"rx": 0,
},
}
def __new__(cls, *args, **kwargs):
@ -110,12 +114,22 @@ class APRSDStats:
self._aprsis_keepalive = datetime.datetime.now()
def rx(self, packet):
type = packet.__class__.__name__
self._pkt_cnt[type]["rx"] += 1
pkt_type = packet.__class__.__name__
if pkt_type not in self._pkt_cnt:
self._pkt_cnt[pkt_type] = {
"tx": 0,
"rx": 0,
}
self._pkt_cnt[pkt_type]["rx"] += 1
def tx(self, packet):
type = packet.__class__.__name__
self._pkt_cnt[type]["tx"] += 1
pkt_type = packet.__class__.__name__
if pkt_type not in self._pkt_cnt:
self._pkt_cnt[pkt_type] = {
"tx": 0,
"rx": 0,
}
self._pkt_cnt[pkt_type]["tx"] += 1
@wrapt.synchronized(lock)
@property