Refactored packets

this patch removes the need for dacite2 package for creating
packet objects from the aprslib decoded packet dictionary.

moved the factory method from the base Packet object
to the core module.
This commit is contained in:
Hemna 2024-03-20 09:34:31 -04:00
parent 6f1d6b4122
commit 1477e61b0f
11 changed files with 298 additions and 182 deletions

View File

@ -137,7 +137,7 @@ class APRSISClient(Client):
def decode_packet(self, *args, **kwargs):
"""APRS lib already decodes this."""
return core.Packet.factory(args[0])
return core.factory(args[0])
def setup_connection(self):
user = CONF.aprs_network.login
@ -238,7 +238,7 @@ class KISSClient(Client):
# LOG.debug(f"Decoding {msg}")
raw = aprslib.parse(str(frame))
packet = core.Packet.factory(raw)
packet = core.factory(raw)
if isinstance(packet, core.ThirdParty):
return packet.subpacket
else:

View File

@ -67,7 +67,7 @@ class APRSDFakeClient(metaclass=trace.TraceWrapperMetaclass):
# Generate packets here?
raw = "GTOWN>APDW16,WIDE1-1,WIDE2-1:}KM6LYW-9>APZ100,TCPIP,GTOWN*::KM6LYW :KM6LYW: 19 Miles SW"
pkt_raw = aprslib.parse(raw)
pkt = core.Packet.factory(pkt_raw)
pkt = core.factory(pkt_raw)
callback(packet=pkt)
LOG.debug(f"END blocking FAKE consumer {self}")
time.sleep(8)

View File

@ -1,6 +1,6 @@
from aprsd.packets.core import ( # noqa: F401
AckPacket, BeaconPacket, GPSPacket, MessagePacket, MicEPacket, Packet,
RejectPacket, StatusPacket, WeatherPacket,
RejectPacket, StatusPacket, ThirdPartyPacket, WeatherPacket, factory,
)
from aprsd.packets.packet_list import PacketList # noqa: F401
from aprsd.packets.seen_list import SeenList # noqa: F401

View File

@ -1,14 +1,12 @@
import abc
from dataclasses import asdict, dataclass, field
from datetime import datetime
import logging
import re
import time
# Due to a failure in python 3.8
from typing import List
from typing import List, Optional
import dacite
from dataclasses_json import dataclass_json
from dataclasses_json import DataClassJsonMixin
from aprsd.utils import counter
@ -19,7 +17,8 @@ PACKET_TYPE_MESSAGE = "message"
PACKET_TYPE_ACK = "ack"
PACKET_TYPE_REJECT = "reject"
PACKET_TYPE_MICE = "mic-e"
PACKET_TYPE_WX = "weather"
PACKET_TYPE_WX = "wx"
PACKET_TYPE_WEATHER = "weather"
PACKET_TYPE_OBJECT = "object"
PACKET_TYPE_UNKNOWN = "unknown"
PACKET_TYPE_STATUS = "status"
@ -52,68 +51,65 @@ def _init_msgNo(): # noqa: N802
return c.value
def factory_from_dict(packet_dict):
pkt_type = get_packet_type(packet_dict)
if pkt_type:
cls = TYPE_LOOKUP[pkt_type]
return cls.from_dict(packet_dict)
def _translate_fields(raw: dict) -> dict:
translate_fields = {
"from": "from_call",
"to": "to_call",
}
# First translate some fields
for key in translate_fields:
if key in raw:
raw[translate_fields[key]] = raw[key]
del raw[key]
# addresse overrides to_call
if "addresse" in raw:
raw["to_call"] = raw["addresse"]
return raw
def factory_from_json(packet_dict):
pkt_type = get_packet_type(packet_dict)
if pkt_type:
return TYPE_LOOKUP[pkt_type].from_json(packet_dict)
@dataclass_json
@dataclass(unsafe_hash=True)
class Packet(metaclass=abc.ABCMeta):
from_call: str = field(default=None)
to_call: str = field(default=None)
addresse: str = field(default=None)
format: str = field(default=None)
class Packet(DataClassJsonMixin):
_type: str = field(default="Packet", hash=False)
from_call: Optional[str] = field(default=None)
to_call: Optional[str] = field(default=None)
addresse: Optional[str] = field(default=None)
format: Optional[str] = field(default=None)
msgNo: str = field(default_factory=_init_msgNo) # noqa: N815
packet_type: str = field(default=None)
packet_type: Optional[str] = field(default=None)
timestamp: float = field(default_factory=_init_timestamp, compare=False, hash=False)
# Holds the raw text string to be sent over the wire
# or holds the raw string from input packet
raw: str = field(default=None, compare=False, hash=False)
raw: Optional[str] = field(default=None, compare=False, hash=False)
raw_dict: dict = field(repr=False, default_factory=lambda: {}, compare=False, hash=False)
# Built by calling prepare(). raw needs this built first.
payload: str = field(default=None)
payload: Optional[str] = field(default=None)
# Fields related to sending packets out
send_count: int = field(repr=False, default=0, compare=False, hash=False)
retry_count: int = field(repr=False, default=3, compare=False, hash=False)
# last_send_time: datetime = field(
# metadata=dc_json_config(
# encoder=datetime.isoformat,
# decoder=datetime.fromisoformat,
# ),
# repr=True,
# default_factory=_init_send_time,
# compare=False,
# hash=False
# )
last_send_time: float = field(repr=False, default=0, compare=False, hash=False)
last_send_attempt: int = field(repr=False, default=0, compare=False, hash=False)
# Do we allow this packet to be saved to send later?
allow_delay: bool = field(repr=False, default=True, compare=False, hash=False)
path: List[str] = field(default_factory=list, compare=False, hash=False)
via: str = field(default=None, compare=False, hash=False)
def __post__init__(self):
LOG.warning(f"POST INIT {self}")
via: Optional[str] = field(default=None, compare=False, hash=False)
@property
def json(self):
"""
get the json formated string
"""
def json(self) -> str:
"""get the json formated string"""
# comes from the DataClassJsonMixin
return self.to_json()
def get(self, key, default=None):
@property
def dict(self) -> dict:
"""get the dict formated string"""
# comes from the DataClassJsonMixin
return self.to_dict()
def get(self, key: str, default: Optional[str] = None):
"""Emulate a getter on a dict."""
if hasattr(self, key):
return getattr(self, key)
@ -121,116 +117,37 @@ class Packet(metaclass=abc.ABCMeta):
return default
@property
def key(self):
def key(self) -> str:
"""Build a key for finding this packet in a dict."""
return f"{self.from_call}:{self.addresse}:{self.msgNo}"
def update_timestamp(self):
def update_timestamp(self) -> None:
self.timestamp = _init_timestamp()
def prepare(self):
def prepare(self) -> None:
"""Do stuff here that is needed prior to sending over the air."""
# now build the raw message for sending
self._build_payload()
self._build_raw()
def _build_payload(self):
def _build_payload(self) -> None:
"""The payload is the non headers portion of the packet."""
if not self.to_call:
raise ValueError("to_call isn't set. Must set to_call before calling prepare()")
msg = self._filter_for_send().rstrip("\n")
self.payload = (
f":{self.to_call.ljust(9)}"
f":{msg}"
)
def _build_raw(self):
def _build_raw(self) -> None:
"""Build the self.raw which is what is sent over the air."""
self.raw = "{}>APZ100:{}".format(
self.from_call,
self.payload,
)
@staticmethod
def factory(raw_packet):
"""Factory method to create a packet from a raw packet string."""
raw = raw_packet
raw["raw_dict"] = raw.copy()
translate_fields = {
"from": "from_call",
"to": "to_call",
}
# First translate some fields
for key in translate_fields:
if key in raw:
raw[translate_fields[key]] = raw[key]
del raw[key]
if "addresse" in raw:
raw["to_call"] = raw["addresse"]
packet_type = get_packet_type(raw)
raw["packet_type"] = packet_type
class_name = TYPE_LOOKUP[packet_type]
if packet_type == PACKET_TYPE_THIRDPARTY:
# We have an encapsulated packet!
# So we need to decode it and return the inner packet
# as the packet we are going to process.
# This is a recursive call to the factory
subpacket_raw = raw["subpacket"]
subpacket = Packet.factory(subpacket_raw)
del raw["subpacket"]
# raw["subpacket"] = subpacket
packet = dacite.from_dict(data_class=class_name, data=raw)
packet.subpacket = subpacket
return packet
if packet_type == PACKET_TYPE_UNKNOWN:
# Try and figure it out here
if "latitude" in raw:
class_name = GPSPacket
if packet_type == PACKET_TYPE_WX:
# the weather information is in a dict
# this brings those values out to the outer dict
for key in raw["weather"]:
raw[key] = raw["weather"][key]
# If we have the broken aprslib, then we need to
# Convert the course and speed to wind_speed and wind_direction
# aprslib issue #80
# https://github.com/rossengeorgiev/aprs-python/issues/80
# Wind speed and course is option in the SPEC.
# For some reason aprslib multiplies the speed by 1.852.
if "wind_speed" not in raw and "wind_direction" not in raw:
# Most likely this is the broken aprslib
# So we need to convert the wind_gust speed
raw["wind_gust"] = round(raw.get("wind_gust", 0) / 0.44704, 3)
if "wind_speed" not in raw:
wind_speed = raw.get("speed")
if wind_speed:
raw["wind_speed"] = round(wind_speed / 1.852, 3)
raw["weather"]["wind_speed"] = raw["wind_speed"]
if "speed" in raw:
del raw["speed"]
# Let's adjust the rain numbers as well, since it's wrong
raw["rain_1h"] = round((raw.get("rain_1h", 0) / .254) * .01, 3)
raw["weather"]["rain_1h"] = raw["rain_1h"]
raw["rain_24h"] = round((raw.get("rain_24h", 0) / .254) * .01, 3)
raw["weather"]["rain_24h"] = raw["rain_24h"]
raw["rain_since_midnight"] = round((raw.get("rain_since_midnight", 0) / .254) * .01, 3)
raw["weather"]["rain_since_midnight"] = raw["rain_since_midnight"]
if "wind_direction" not in raw:
wind_direction = raw.get("course")
if wind_direction:
raw["wind_direction"] = wind_direction
raw["weather"]["wind_direction"] = raw["wind_direction"]
if "course" in raw:
del raw["course"]
return dacite.from_dict(data_class=class_name, data=raw)
def log(self, header=None):
def log(self, header: Optional[str] = None) -> None:
"""LOG a packet to the logfile."""
asdict(self)
log_list = ["\n"]
@ -273,29 +190,39 @@ class Packet(metaclass=abc.ABCMeta):
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
if not self.raw:
raise ValueError("No message text to send. call prepare() first.")
message = self.raw[:67]
# We all miss George Carlin
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
def __str__(self):
def __str__(self) -> str:
"""Show the raw version of the packet"""
self.prepare()
if not self.raw:
raise ValueError("self.raw is unset")
return self.raw
def __repr__(self):
def __repr__(self) -> str:
"""Build the repr version of the packet."""
repr = (
f"{self.__class__.__name__}:"
f" From: {self.from_call} "
" To: "
f" To: {self.to_call}"
)
return repr
@dataclass(unsafe_hash=True)
class AckPacket(Packet):
response: str = field(default=None)
_type: str = field(default="AckPacket", hash=False)
response: Optional[str] = field(default=None)
@staticmethod
def from_aprslib_dict(raw: dict) -> "AckPacket":
raw = _translate_fields(raw)
return AckPacket(**raw)
def __post__init__(self):
if self.response:
@ -307,7 +234,13 @@ class AckPacket(Packet):
@dataclass(unsafe_hash=True)
class RejectPacket(Packet):
response: str = field(default=None)
_type: str = field(default="RejectPacket", hash=False)
response: Optional[str] = field(default=None)
@staticmethod
def from_aprslib_dict(raw: dict) -> "RejectPacket":
raw = _translate_fields(raw)
return RejectPacket(**raw)
def __post__init__(self):
if self.response:
@ -317,10 +250,10 @@ class RejectPacket(Packet):
self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}"
@dataclass_json
@dataclass(unsafe_hash=True)
class MessagePacket(Packet):
message_text: str = field(default=None)
_type: str = field(default="MessagePacket", hash=False)
message_text: Optional[str] = field(default=None)
def _filter_for_send(self) -> str:
"""Filter and format message string for FCC."""
@ -328,6 +261,9 @@ class MessagePacket(Packet):
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
if not self.message_text:
raise ValueError("No message text to send. Populate message_text field.")
message = self.message_text[:67]
# We all miss George Carlin
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
@ -339,12 +275,23 @@ class MessagePacket(Packet):
str(self.msgNo),
)
@staticmethod
def from_aprslib_dict(raw: dict) -> "MessagePacket":
raw = _translate_fields(raw)
return MessagePacket(**raw)
@dataclass(unsafe_hash=True)
class StatusPacket(Packet):
status: str = field(default=None)
_type: str = field(default="StatusPacket", hash=False)
status: Optional[str] = field(default=None)
messagecapable: bool = field(default=False)
comment: str = field(default=None)
comment: Optional[str] = field(default=None)
@staticmethod
def from_aprslib_dict(raw: dict) -> "StatusPacket":
raw = _translate_fields(raw)
return StatusPacket(**raw)
def _build_payload(self):
raise NotImplementedError
@ -352,14 +299,27 @@ class StatusPacket(Packet):
@dataclass(unsafe_hash=True)
class GPSPacket(Packet):
_type: str = field(default="GPSPacket", hash=False)
latitude: float = field(default=0.00)
longitude: float = field(default=0.00)
altitude: float = field(default=0.00)
rng: float = field(default=0.00)
posambiguity: int = field(default=0)
comment: str = field(default=None)
messagecapable: bool = field(default=False)
comment: Optional[str] = field(default=None)
symbol: str = field(default="l")
symbol_table: str = field(default="/")
raw_timestamp: Optional[str] = field(default=None)
object_name: Optional[str] = field(default=None)
object_format: Optional[str] = field(default=None)
alive: Optional[bool] = field(default=None)
course: Optional[int] = field(default=None)
speed: Optional[float] = field(default=None)
@staticmethod
def from_aprslib_dict(raw: dict) -> "GPSPacket":
raw = _translate_fields(raw)
return GPSPacket(**raw)
def decdeg2dms(self, degrees_decimal):
is_positive = degrees_decimal >= 0
@ -467,6 +427,13 @@ class GPSPacket(Packet):
@dataclass(unsafe_hash=True)
class BeaconPacket(GPSPacket):
_type: str = field(default="BeaconPacket", hash=False)
@staticmethod
def from_aprslib_dict(raw: dict) -> "BeaconPacket":
raw = _translate_fields(raw)
return BeaconPacket(**raw)
def _build_payload(self):
"""The payload is the non headers portion of the packet."""
time_zulu = self._build_time_zulu()
@ -487,9 +454,10 @@ class BeaconPacket(GPSPacket):
@dataclass
class MicEPacket(GPSPacket):
_type: str = field(default="MicEPacket", hash=False)
messagecapable: bool = False
mbits: str = None
mtype: str = None
mbits: Optional[str] = None
mtype: Optional[str] = None
# in MPH
speed: float = 0.00
# 0 to 360
@ -501,14 +469,20 @@ class MicEPacket(GPSPacket):
@dataclass
class ObjectPacket(GPSPacket):
_type: str = field(default="ObjectPacket", hash=False)
alive: bool = True
raw_timestamp: str = None
raw_timestamp: Optional[str] = None
symbol: str = field(default="r")
# in MPH
speed: float = 0.00
# 0 to 360
course: int = 0
@staticmethod
def from_aprslib_dict(raw: dict) -> "ObjectPacket":
raw = _translate_fields(raw)
return ObjectPacket(**raw)
def _build_payload(self):
time_zulu = self._build_time_zulu()
lat = self.convert_latitude(self.latitude)
@ -541,6 +515,7 @@ class ObjectPacket(GPSPacket):
@dataclass()
class WeatherPacket(GPSPacket):
_type: str = field(default="WeatherPacket", hash=False)
symbol: str = "_"
wind_speed: float = 0.00
wind_direction: int = 0
@ -552,7 +527,57 @@ class WeatherPacket(GPSPacket):
rain_since_midnight: float = 0.00
humidity: int = 0
pressure: float = 0.00
comment: str = None
comment: Optional[str] = field(default=None)
luminosity: Optional[int] = field(default=None)
wx_raw_timestamp: Optional[str] = field(default=None)
course: Optional[int] = field(default=None)
speed: Optional[float] = field(default=None)
@staticmethod
def from_aprslib_dict(raw: dict) -> "WeatherPacket":
"""Create from a dictionary that has come directly from aprslib parse"""
# Because from is a reserved word in python, we need to translate it
# from -> from_call and to -> to_call
raw = _translate_fields(raw)
for key in raw["weather"]:
raw[key] = raw["weather"][key]
# If we have the broken aprslib, then we need to
# Convert the course and speed to wind_speed and wind_direction
# aprslib issue #80
# https://github.com/rossengeorgiev/aprs-python/issues/80
# Wind speed and course is option in the SPEC.
# For some reason aprslib multiplies the speed by 1.852.
if "wind_speed" not in raw and "wind_direction" not in raw:
# Most likely this is the broken aprslib
# So we need to convert the wind_gust speed
raw["wind_gust"] = round(raw.get("wind_gust", 0) / 0.44704, 3)
if "wind_speed" not in raw:
wind_speed = raw.get("speed")
if wind_speed:
raw["wind_speed"] = round(wind_speed / 1.852, 3)
raw["weather"]["wind_speed"] = raw["wind_speed"]
if "speed" in raw:
del raw["speed"]
# Let's adjust the rain numbers as well, since it's wrong
raw["rain_1h"] = round((raw.get("rain_1h", 0) / .254) * .01, 3)
raw["weather"]["rain_1h"] = raw["rain_1h"]
raw["rain_24h"] = round((raw.get("rain_24h", 0) / .254) * .01, 3)
raw["weather"]["rain_24h"] = raw["rain_24h"]
raw["rain_since_midnight"] = round((raw.get("rain_since_midnight", 0) / .254) * .01, 3)
raw["weather"]["rain_since_midnight"] = raw["rain_since_midnight"]
if "wind_direction" not in raw:
wind_direction = raw.get("course")
if wind_direction:
raw["wind_direction"] = wind_direction
raw["weather"]["wind_direction"] = raw["wind_direction"]
if "course" in raw:
del raw["course"]
del raw["weather"]
return WeatherPacket(**raw)
def _build_payload(self):
"""Build an uncompressed weather packet
@ -614,9 +639,11 @@ class WeatherPacket(GPSPacket):
)
class ThirdParty(Packet):
@dataclass()
class ThirdPartyPacket(Packet):
_type: str = "ThirdPartyPacket"
# Holds the encapsulated packet
subpacket: Packet = field(default=None, compare=True, hash=False)
subpacket: Optional[type[Packet]] = field(default=None, compare=True, hash=False)
def __repr__(self):
"""Build the repr version of the packet."""
@ -629,26 +656,50 @@ class ThirdParty(Packet):
return repr_str
@staticmethod
def from_aprslib_dict(raw: dict) -> "ThirdPartyPacket":
"""Create from a dictionary that has come directly from aprslib parse"""
# Because from is a reserved word in python, we need to translate it
# from -> from_call and to -> to_call
raw = _translate_fields(raw)
subpacket = raw.get("subpacket")
del raw["subpacket"]
return ThirdPartyPacket(**raw, subpacket=factory(subpacket))
TYPE_LOOKUP = {
TYPE_LOOKUP: dict[str, type[Packet]] = {
PACKET_TYPE_WX: WeatherPacket,
PACKET_TYPE_WEATHER: WeatherPacket,
PACKET_TYPE_MESSAGE: MessagePacket,
PACKET_TYPE_ACK: AckPacket,
PACKET_TYPE_REJECT: RejectPacket,
PACKET_TYPE_MICE: MicEPacket,
PACKET_TYPE_OBJECT: ObjectPacket,
PACKET_TYPE_STATUS: StatusPacket,
PACKET_TYPE_BEACON: GPSPacket,
PACKET_TYPE_BEACON: BeaconPacket,
PACKET_TYPE_UNKNOWN: Packet,
PACKET_TYPE_THIRDPARTY: ThirdParty,
PACKET_TYPE_THIRDPARTY: ThirdPartyPacket,
}
OBJ_LOOKUP: dict[str, type[Packet]] = {
"MessagePacket": MessagePacket,
"AckPacket": AckPacket,
"RejectPacket": RejectPacket,
"MicEPacket": MicEPacket,
"ObjectPacket": ObjectPacket,
"StatusPacket": StatusPacket,
"GPSPacket": GPSPacket,
"BeaconPacket": BeaconPacket,
"WeatherPacket": WeatherPacket,
"ThirdPartyPacket": ThirdPartyPacket,
}
def get_packet_type(packet: dict):
def get_packet_type(packet: dict) -> str:
"""Decode the packet type from the packet."""
pkt_format = packet.get("format", None)
msg_response = packet.get("response", None)
pkt_format = packet.get("format")
msg_response = packet.get("response")
packet_type = PACKET_TYPE_UNKNOWN
if pkt_format == "message" and msg_response == "ack":
packet_type = PACKET_TYPE_ACK
@ -664,9 +715,11 @@ def get_packet_type(packet: dict):
packet_type = PACKET_TYPE_STATUS
elif pkt_format == PACKET_TYPE_BEACON:
packet_type = PACKET_TYPE_BEACON
elif pkt_format == PACKET_TYPE_WX:
packet_type = PACKET_TYPE_WEATHER
elif pkt_format == PACKET_TYPE_UNCOMPRESSED:
if packet.get("symbol", None) == "_":
packet_type = PACKET_TYPE_WX
if packet.get("symbol") == "_":
packet_type = PACKET_TYPE_WEATHER
elif pkt_format == PACKET_TYPE_THIRDPARTY:
packet_type = PACKET_TYPE_THIRDPARTY
@ -676,13 +729,43 @@ def get_packet_type(packet: dict):
return packet_type
def is_message_packet(packet):
def is_message_packet(packet: dict) -> bool:
return get_packet_type(packet) == PACKET_TYPE_MESSAGE
def is_ack_packet(packet):
def is_ack_packet(packet: dict) -> bool:
return get_packet_type(packet) == PACKET_TYPE_ACK
def is_mice_packet(packet):
def is_mice_packet(packet: dict) -> bool:
return get_packet_type(packet) == PACKET_TYPE_MICE
def factory(raw_packet: dict) -> type[Packet]:
"""Factory method to create a packet from a raw packet string."""
raw = raw_packet
if "_type" in raw:
cls = globals()[raw["_type"]]
return cls.from_dict(raw)
raw["raw_dict"] = raw.copy()
raw = _translate_fields(raw)
packet_type = get_packet_type(raw)
raw["packet_type"] = packet_type
class_name = TYPE_LOOKUP[packet_type]
if packet_type == PACKET_TYPE_WX:
# the weather information is in a dict
# this brings those values out to the outer dict
class_name = WeatherPacket
elif packet_type == PACKET_TYPE_OBJECT and "weather" in raw:
class_name = WeatherPacket
elif packet_type == PACKET_TYPE_UNKNOWN:
# Try and figure it out here
if "latitude" in raw:
class_name = GPSPacket
else:
raise Exception(f"Unknown packet type {packet_type} {raw}")
return class_name.from_aprslib_dict(raw)

View File

@ -28,7 +28,6 @@ wrapt
kiss3
attrs
dataclasses
dacite2
oslo.config
rpyc>=6.0.0
# Pin this here so it doesn't require a compile on

View File

@ -17,7 +17,6 @@ click==8.1.7 # via -r requirements.in, click-completion, click-para
click-completion==0.5.2 # via -r requirements.in
click-params==0.5.0 # via -r requirements.in
commonmark==0.9.1 # via rich
dacite2==2.0.0 # via -r requirements.in
dataclasses==0.6 # via -r requirements.in
dataclasses-json==0.6.4 # via -r requirements.in
debtcollector==3.0.0 # via oslo-config
@ -34,7 +33,7 @@ greenlet==3.0.3 # via eventlet, gevent
h11==0.14.0 # via wsproto
idna==3.6 # via requests
imapclient==3.0.1 # via -r requirements.in
importlib-metadata==7.0.1 # via ax253, kiss3
importlib-metadata==7.0.2 # via ax253, kiss3
itsdangerous==2.1.2 # via flask
jinja2==3.1.3 # via click-completion, flask
kiss3==8.0.0 # via -r requirements.in
@ -45,7 +44,7 @@ mypy-extensions==1.0.0 # via typing-inspect
netaddr==1.2.1 # via oslo-config
oslo-config==9.4.0 # via -r requirements.in
oslo-i18n==6.3.0 # via oslo-config
packaging==23.2 # via marshmallow
packaging==24.0 # via marshmallow
pbr==6.0.0 # via -r requirements.in, oslo-i18n, stevedore
pluggy==1.4.0 # via -r requirements.in
plumbum==1.8.2 # via rpyc
@ -76,7 +75,7 @@ validators==0.22.0 # via click-params
werkzeug==3.0.1 # via -r requirements.in, flask
wrapt==1.16.0 # via -r requirements.in, debtcollector, deprecated
wsproto==1.2.0 # via simple-websocket
zipp==3.17.0 # via importlib-metadata
zipp==3.18.1 # via importlib-metadata
zope-event==5.0 # via gevent
zope-interface==6.2 # via gevent

View File

@ -51,11 +51,8 @@ class TestSendMessageCommand(unittest.TestCase):
):
self.config_and_init()
mock_socketio.emit = mock.MagicMock()
packet = fake.fake_packet(
message="blah",
msg_number=1,
message_format=core.PACKET_TYPE_ACK,
)
# Create an ACK packet
packet = fake.fake_ack_packet()
mock_queue = mock.MagicMock()
socketio = mock.MagicMock()
wcp = webchat.WebChatProcessPacketThread(mock_queue, socketio)

View File

@ -1,4 +1,4 @@
from aprsd import packets, plugin, threads
from aprsd import plugin, threads
from aprsd.packets import core
@ -13,6 +13,7 @@ def fake_packet(
message=None,
msg_number=None,
message_format=core.PACKET_TYPE_MESSAGE,
response=None,
):
packet_dict = {
"from": fromcall,
@ -27,7 +28,17 @@ def fake_packet(
if msg_number:
packet_dict["msgNo"] = str(msg_number)
return packets.Packet.factory(packet_dict)
if response:
packet_dict["response"] = response
return core.factory(packet_dict)
def fake_ack_packet():
return fake_packet(
msg_number=12,
response=core.PACKET_TYPE_ACK,
)
class FakeBaseNoThreadsPlugin(plugin.APRSDPluginBase):

View File

@ -11,7 +11,7 @@ from .. import fake, test_plugin
CONF = cfg.CONF
class TestUSWeatherPluginPlugin(test_plugin.TestPlugin):
class TestUSWeatherPlugin(test_plugin.TestPlugin):
def test_not_enabled_missing_aprs_fi_key(self):
# When the aprs.fi api key isn't set, then

View File

@ -1,6 +1,8 @@
import unittest
from unittest import mock
import aprslib
from aprsd import packets
from aprsd.packets import core
@ -55,7 +57,7 @@ class TestPluginBase(unittest.TestCase):
def test_packet_factory(self):
pkt_dict = self._fake_dict()
pkt = packets.Packet.factory(pkt_dict)
pkt = packets.factory(pkt_dict)
self.assertIsInstance(pkt, packets.MessagePacket)
self.assertEqual(fake.FAKE_FROM_CALLSIGN, pkt.from_call)
@ -71,7 +73,7 @@ class TestPluginBase(unittest.TestCase):
"comment": "Home!",
}
pkt_dict["format"] = core.PACKET_TYPE_UNCOMPRESSED
pkt = packets.Packet.factory(pkt_dict)
pkt = packets.factory(pkt_dict)
self.assertIsInstance(pkt, packets.WeatherPacket)
@mock.patch("aprsd.packets.core.GPSPacket._build_time_zulu")
@ -100,3 +102,31 @@ class TestPluginBase(unittest.TestCase):
wx.prepare()
expected = "KFAKE>KMINE,WIDE1-1,WIDE2-1:@221450z0.0/0.0_000/000g000t000r001p000P000h00b00000"
self.assertEqual(expected, wx.raw)
def test_beacon_factory(self):
"""Test to ensure a beacon packet is created."""
packet_raw = "WB4BOR-12>APZ100,WIDE2-1:@161647z3724.15N107847.58W$ APRSD WebChat"
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.BeaconPacket)
def test_reject_factory(self):
"""Test to ensure a reject packet is created."""
packet_raw = "HB9FDL-1>APK102,HB9FM-4*,WIDE2,qAR,HB9FEF-11::REPEAT :rej4139"
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.RejectPacket)
def test_thirdparty_factory(self):
"""Test to ensure a third party packet is created."""
packet_raw = "GTOWN>APDW16,WIDE1-1,WIDE2-1:}KM6LYW-9>APZ100,TCPIP,GTOWN*::KM6LYW :KM6LYW: 19 Miles SW"
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.ThirdPartyPacket)
def test_weather_factory(self):
"""Test to ensure a weather packet is created."""
packet_raw = "FW9222>APRS,TCPXX*,qAX,CWOP-6:@122025z2953.94N/08423.77W_232/003g006t084r000p032P000h80b10157L745.DsWLL"
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.WeatherPacket)

View File

@ -45,7 +45,6 @@ class TestPluginManager(unittest.TestCase):
self.assertEqual([], plugin_list)
pm.setup_plugins()
plugin_list = pm.get_plugins()
print(plugin_list)
self.assertIsInstance(plugin_list, list)
self.assertIsInstance(
plugin_list[0],
@ -163,9 +162,7 @@ class TestPluginBase(TestPlugin):
self.assertEqual(expected, actual)
mock_process.assert_not_called()
packet = fake.fake_packet(
message_format=core.PACKET_TYPE_ACK,
)
packet = fake.fake_ack_packet()
expected = packets.NULL_MESSAGE
actual = p.filter(packet)
self.assertEqual(expected, actual)