Added support for ThirdParty packet types

The kiss clients now detect if the incomming packet is a third party
packet and then sends up the subpacket instead of the encapsulated
packet up to the consumer.
This commit is contained in:
Hemna 2023-08-15 13:43:53 -04:00
parent 65a5a90458
commit ae9e4d31ad
2 changed files with 39 additions and 1 deletions

View File

@ -235,7 +235,11 @@ class KISSClient(Client):
# LOG.debug(f"Decoding {msg}")
raw = aprslib.parse(str(frame))
return core.Packet.factory(raw)
packet = core.Packet.factory(raw)
if isinstance(packet, core.ThirdParty):
return packet.subpacket
else:
return packet
@trace.trace
def setup_connection(self):

View File

@ -25,6 +25,7 @@ PACKET_TYPE_OBJECT = "object"
PACKET_TYPE_UNKNOWN = "unknown"
PACKET_TYPE_STATUS = "status"
PACKET_TYPE_BEACON = "beacon"
PACKET_TYPE_THIRDPARTY = "thirdparty"
PACKET_TYPE_UNCOMPRESSED = "uncompressed"
@ -115,6 +116,7 @@ class Packet(metaclass=abc.ABCMeta):
@staticmethod
def factory(raw_packet):
"""Factory method to create a packet from a raw packet string."""
raw = raw_packet
raw["raw_dict"] = raw.copy()
translate_fields = {
@ -133,6 +135,19 @@ class Packet(metaclass=abc.ABCMeta):
packet_type = get_packet_type(raw)
raw["packet_type"] = packet_type
class_name = TYPE_LOOKUP[packet_type]
if packet_type == PACKET_TYPE_THIRDPARTY:
# We have an encapsulated packet!
# So we need to decode it and return the inner packet
# as the packet we are going to process.
# This is a recursive call to the factory
subpacket_raw = raw["subpacket"]
subpacket = Packet.factory(subpacket_raw)
del raw["subpacket"]
# raw["subpacket"] = subpacket
packet = dacite.from_dict(data_class=class_name, data=raw)
packet.subpacket = subpacket
return packet
if packet_type == PACKET_TYPE_UNKNOWN:
# Try and figure it out here
if "latitude" in raw:
@ -552,6 +567,22 @@ class WeatherPacket(GPSPacket):
)
class ThirdParty(Packet):
# Holds the encapsulated packet
subpacket: Packet = None
def __repr__(self):
"""Build the repr version of the packet."""
repr_str = (
f"{self.__class__.__name__}:"
f" From: {self.from_call} "
f" To: {self.to_call} "
f" Subpacket: {repr(self.subpacket)}"
)
return repr_str
TYPE_LOOKUP = {
PACKET_TYPE_WX: WeatherPacket,
PACKET_TYPE_MESSAGE: MessagePacket,
@ -562,6 +593,7 @@ TYPE_LOOKUP = {
PACKET_TYPE_STATUS: StatusPacket,
PACKET_TYPE_BEACON: GPSPacket,
PACKET_TYPE_UNKNOWN: Packet,
PACKET_TYPE_THIRDPARTY: ThirdParty,
}
@ -588,6 +620,8 @@ def get_packet_type(packet: dict):
elif pkt_format == PACKET_TYPE_UNCOMPRESSED:
if packet.get("symbol", None) == "_":
packet_type = PACKET_TYPE_WX
elif pkt_format == PACKET_TYPE_THIRDPARTY:
packet_type = PACKET_TYPE_THIRDPARTY
return packet_type