More packet cleanup and tests

This commit is contained in:
Hemna 2024-03-23 16:59:33 -04:00
parent f4356e4a20
commit f53df24988
5 changed files with 185 additions and 158 deletions

View File

@ -7,7 +7,6 @@ import sys
import threading
import time
from aprslib import util as aprslib_util
import click
import flask
from flask import request
@ -539,10 +538,10 @@ class SendMessageNamespace(Namespace):
def on_gps(self, data):
LOG.debug(f"WS on_GPS: {data}")
lat = aprslib_util.latitude_to_ddm(data["latitude"])
long = aprslib_util.longitude_to_ddm(data["longitude"])
LOG.debug(f"Lat DDM {lat}")
LOG.debug(f"Long DDM {long}")
lat = data["latitude"]
long = data["longitude"]
LOG.debug(f"Lat {lat}")
LOG.debug(f"Long {long}")
tx.send(
packets.GPSPacket(

View File

@ -1,7 +1,7 @@
from aprsd.packets.core import ( # noqa: F401
AckPacket, BeaconPacket, GPSPacket, MessagePacket, MicEPacket, ObjectPacket,
Packet, RejectPacket, StatusPacket, ThirdPartyPacket, UnknownPacket,
WeatherPacket, factory,
AckPacket, BeaconPacket, BulletinPacket, GPSPacket, MessagePacket,
MicEPacket, ObjectPacket, Packet, RejectPacket, StatusPacket,
ThirdPartyPacket, UnknownPacket, WeatherPacket, factory,
)
from aprsd.packets.packet_list import PacketList # noqa: F401
from aprsd.packets.seen_list import SeenList # noqa: F401

View File

@ -6,6 +6,7 @@ import time
# Due to a failure in python 3.8
from typing import Any, List, Optional, Type, TypeVar, Union
from aprslib import util as aprslib_util
from dataclasses_json import (
CatchAll, DataClassJsonMixin, Undefined, dataclass_json,
)
@ -131,7 +132,7 @@ class Packet:
the human readable payload.
"""
self.prepare()
msg = self._filter_for_send().rstrip("\n")
msg = self._filter_for_send(self.raw).rstrip("\n")
return msg
def prepare(self) -> None:
@ -146,10 +147,10 @@ class Packet:
"""The payload is the non headers portion of the packet."""
if not self.to_call:
raise ValueError("to_call isn't set. Must set to_call before calling prepare()")
msg = self._filter_for_send().rstrip("\n")
# The base packet class has no real payload
self.payload = (
f":{self.to_call.ljust(9)}"
f":{msg}"
)
def _build_raw(self) -> None:
@ -159,16 +160,16 @@ class Packet:
self.payload,
)
def _filter_for_send(self) -> str:
def _filter_for_send(self, msg) -> str:
"""Filter and format message string for FCC."""
# max? ftm400 displays 64, raw msg shows 74
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
if not self.raw:
if not msg:
raise ValueError("No message text to send. call prepare() first.")
message = self.raw[:67]
message = msg[:67]
# We all miss George Carlin
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
@ -193,14 +194,28 @@ class Packet:
@dataclass(unsafe_hash=True)
class AckPacket(Packet):
_type: str = field(default="AckPacket", hash=False)
response: Optional[str] = field(default=None)
def __post__init__(self):
if self.response:
LOG.warning("Response set!")
def _build_payload(self):
self.payload = f":{self.to_call.ljust(9)}:ack{self.msgNo}"
self.payload = f":{self.to_call: <9}:ack{self.msgNo}"
@dataclass_json
@dataclass(unsafe_hash=True)
class BulletinPacket(Packet):
_type: str = "BulletinPacket"
# Holds the encapsulated packet
bid: Optional[str] = field(default="1")
message_text: Optional[str] = field(default=None)
@property
def human_info(self) -> str:
return f"BLN{self.bid} {self.message_text}"
def _build_payload(self) -> None:
self.payload = (
f":BLN{self.bid:<9}"
f":{self.message_text}"
)
@dataclass_json
@ -214,7 +229,7 @@ class RejectPacket(Packet):
LOG.warning("Response set!")
def _build_payload(self):
self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}"
self.payload = f":{self.to_call: <9}:rej{self.msgNo}"
@dataclass_json
@ -223,23 +238,10 @@ class MessagePacket(Packet):
_type: str = field(default="MessagePacket", hash=False)
message_text: Optional[str] = field(default=None)
def _filter_for_send(self) -> str:
"""Filter and format message string for FCC."""
# max? ftm400 displays 64, raw msg shows 74
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
if not self.message_text:
raise ValueError("No message text to send. Populate message_text field.")
message = self.message_text[:67]
# We all miss George Carlin
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
def _build_payload(self):
self.payload = ":{}:{}{{{}".format(
self.to_call.ljust(9),
self._filter_for_send().rstrip("\n"),
self._filter_for_send(self.message_text).rstrip("\n"),
str(self.msgNo),
)
@ -253,23 +255,10 @@ class StatusPacket(Packet):
comment: Optional[str] = field(default=None)
raw_timestamp: Optional[str] = field(default=None)
def _filter_for_send(self) -> str:
"""Filter and format message string for FCC."""
# max? ftm400 displays 64, raw msg shows 74
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
if not self.status and not self.comment:
self.status = "None"
message = self.status[:67]
# We all miss George Carlin
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
def _build_payload(self):
self.payload = ":{}:{}{{{}".format(
self.to_call.ljust(9),
self._filter_for_send().rstrip("\n"),
self._filter_for_send(self.status).rstrip("\n"),
str(self.msgNo),
)
@ -308,102 +297,29 @@ class GPSPacket(Packet):
# http://www.aprs.org/datum.txt
daodatumbyte: Optional[str] = field(default=None)
def decdeg2dms(self, degrees_decimal):
is_positive = degrees_decimal >= 0
degrees_decimal = abs(degrees_decimal)
minutes, seconds = divmod(degrees_decimal * 3600, 60)
degrees, minutes = divmod(minutes, 60)
degrees = degrees if is_positive else -degrees
degrees = str(int(degrees)).replace("-", "0")
minutes = str(int(minutes)).replace("-", "0")
seconds = str(int(round(seconds * 0.01, 2) * 100))
return {"degrees": degrees, "minutes": minutes, "seconds": seconds}
def decdeg2dmm_m(self, degrees_decimal):
is_positive = degrees_decimal >= 0
degrees_decimal = abs(degrees_decimal)
minutes, seconds = divmod(degrees_decimal * 3600, 60)
degrees, minutes = divmod(minutes, 60)
degrees = degrees if is_positive else -degrees
degrees = abs(int(degrees))
minutes = int(round(minutes + (seconds / 60), 2))
hundredths = round(seconds / 60, 2)
return {
"degrees": degrees, "minutes": minutes, "seconds": seconds,
"hundredths": hundredths,
}
def convert_latitude(self, degrees_decimal):
det = self.decdeg2dmm_m(degrees_decimal)
if degrees_decimal > 0:
direction = "N"
else:
direction = "S"
degrees = str(det.get("degrees")).zfill(2)
minutes = str(det.get("minutes")).zfill(2)
seconds = det.get("seconds")
hun = det.get("hundredths")
hundredths = f"{hun:.2f}".split(".")[1]
LOG.debug(
f"LAT degress {degrees} minutes {str(minutes)} "
f"seconds {seconds} hundredths {hundredths} direction {direction}",
)
lat = f"{degrees}{str(minutes)}.{hundredths}{direction}"
return lat
def convert_longitude(self, degrees_decimal):
det = self.decdeg2dmm_m(degrees_decimal)
if degrees_decimal > 0:
direction = "E"
else:
direction = "W"
degrees = str(det.get("degrees")).zfill(3)
minutes = str(det.get("minutes")).zfill(2)
seconds = det.get("seconds")
hun = det.get("hundredths")
hundredths = f"{hun:.2f}".split(".")[1]
LOG.debug(
f"LON degress {degrees} minutes {str(minutes)} "
f"seconds {seconds} hundredths {hundredths} direction {direction}",
)
lon = f"{degrees}{str(minutes)}.{hundredths}{direction}"
return lon
def _build_time_zulu(self):
"""Build the timestamp in UTC/zulu."""
if self.timestamp:
local_dt = datetime.fromtimestamp(self.timestamp)
else:
local_dt = datetime.now()
self.timestamp = datetime.timestamp(local_dt)
utc_offset_timedelta = datetime.utcnow() - local_dt
result_utc_datetime = local_dt + utc_offset_timedelta
time_zulu = result_utc_datetime.strftime("%d%H%M")
return time_zulu
return datetime.utcfromtimestamp(self.timestamp).strftime("%d%H%M")
def _build_payload(self):
"""The payload is the non headers portion of the packet."""
time_zulu = self._build_time_zulu()
lat = self.latitude
long = self.longitude
self.payload = (
f"@{time_zulu}z{lat}{self.symbol_table}"
f"{long}{self.symbol}"
)
lat = aprslib_util.latitude_to_ddm(self.latitude)
long = aprslib_util.longitude_to_ddm(self.longitude)
payload = [
"@" if self.timestamp else "!",
time_zulu,
lat,
self.symbol_table,
long,
self.symbol,
]
if self.comment:
self.payload = f"{self.payload}{self.comment}"
payload.append(self._filter_for_send(self.comment))
self.payload = "".join(payload)
def _build_raw(self):
self.raw = (
@ -438,14 +354,20 @@ class BeaconPacket(GPSPacket):
def _build_payload(self):
"""The payload is the non headers portion of the packet."""
time_zulu = self._build_time_zulu()
lat = self.convert_latitude(self.latitude)
long = self.convert_longitude(self.longitude)
lat = aprslib_util.latitude_to_ddm(self.latitude)
lon = aprslib_util.longitude_to_ddm(self.longitude)
self.payload = (
f"@{time_zulu}z{lat}{self.symbol_table}"
f"{long}{self.symbol}APRSD Beacon"
f"{lon}"
)
if self.comment:
comment = self._filter_for_send(self.comment)
self.payload = f"{self.payload}{self.symbol}{comment}"
else:
self.payload = f"{self.payload}{self.symbol}APRSD Beacon"
def _build_raw(self):
self.raw = (
f"{self.from_call}>APZ100:"
@ -515,8 +437,8 @@ class ObjectPacket(GPSPacket):
def _build_payload(self):
time_zulu = self._build_time_zulu()
lat = self.convert_latitude(self.latitude)
long = self.convert_longitude(self.longitude)
lat = aprslib_util.latitude_to_ddm(self.latitude)
long = aprslib_util.longitude_to_ddm(self.longitude)
self.payload = (
f"*{time_zulu}z{lat}{self.symbol_table}"
@ -524,7 +446,8 @@ class ObjectPacket(GPSPacket):
)
if self.comment:
self.payload = f"{self.payload}{self.comment}"
comment = self._filter_for_send(self.comment)
self.payload = f"{self.payload}{comment}"
def _build_raw(self):
"""
@ -674,7 +597,8 @@ class WeatherPacket(GPSPacket, DataClassJsonMixin):
f"b{self.pressure:05.0f}",
]
if self.comment:
contents.append(self.comment)
comment = self.filter_for_send(self.comment)
contents.append(comment)
self.payload = "".join(contents)
def _build_raw(self):
@ -714,19 +638,6 @@ class ThirdPartyPacket(Packet, DataClassJsonMixin):
return f"{self.from_call}->{self.to_call} {sub_info}"
@dataclass_json
@dataclass(unsafe_hash=True)
class BulletinPacket(Packet):
_type: str = "BulletinPacket"
# Holds the encapsulated packet
bid: Optional[str] = field(default="1")
message_text: Optional[str] = field(default=None)
@property
def human_info(self) -> str:
return f"BLN{self.bid} {self.message_text}"
@dataclass_json(undefined=Undefined.INCLUDE)
@dataclass(unsafe_hash=True)
class UnknownPacket:

View File

@ -63,7 +63,6 @@ def log_multiline(packet, tx: Optional[bool] = False, header: Optional[bool] = T
if hasattr(packet, "comment") and packet.comment:
logit.append(f" Comment : {packet.comment}")
raw = packet.raw.replace("<", "\\<")
logit.append(f" Raw : <fg #828282>{raw}</fg #828282>")
logit.append(f"{header_str}________(<{PACKET_COLOR}>{name}</{PACKET_COLOR}>)")

View File

@ -2,6 +2,7 @@ import unittest
from unittest import mock
import aprslib
from aprslib import util as aprslib_util
from aprsd import packets
from aprsd.packets import core
@ -9,7 +10,7 @@ from aprsd.packets import core
from . import fake
class TestPluginBase(unittest.TestCase):
class TestPacketBase(unittest.TestCase):
def _fake_dict(
self,
@ -165,3 +166,120 @@ class TestPluginBase(unittest.TestCase):
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.MicEPacket)
def test_ack_format(self):
"""Test the ack packet format."""
ack = packets.AckPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
msgNo=123,
)
expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:ack123"
self.assertEqual(expected, str(ack))
def test_reject_format(self):
"""Test the reject packet format."""
reject = packets.RejectPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
msgNo=123,
)
expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:rej123"
self.assertEqual(expected, str(reject))
def test_beacon_format(self):
"""Test the beacon packet format."""
lat = 28.123456
lon = -80.123456
ts = 1711219496.6426
comment = "My Beacon Comment"
packet = packets.BeaconPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=lat,
longitude=lon,
timestamp=ts,
symbol=">",
comment=comment,
)
expected_lat = aprslib_util.latitude_to_ddm(lat)
expected_lon = aprslib_util.longitude_to_ddm(lon)
expected = f"KFAKE>APZ100:@231844z{expected_lat}/{expected_lon}>{comment}"
self.assertEqual(expected, str(packet))
def test_beacon_format_no_comment(self):
"""Test the beacon packet format."""
lat = 28.123456
lon = -80.123456
ts = 1711219496.6426
packet = packets.BeaconPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=lat,
longitude=lon,
timestamp=ts,
symbol=">",
)
empty_comment = "APRSD Beacon"
expected_lat = aprslib_util.latitude_to_ddm(lat)
expected_lon = aprslib_util.longitude_to_ddm(lon)
expected = f"KFAKE>APZ100:@231844z{expected_lat}/{expected_lon}>{empty_comment}"
self.assertEqual(expected, str(packet))
def test_bulletin_format(self):
"""Test the bulletin packet format."""
# bulletin id = 0
bid = 0
packet = packets.BulletinPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
message_text="My Bulletin Message",
bid=0,
)
expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::BLN{bid:<9}:{packet.message_text}"
self.assertEqual(expected, str(packet))
# bulletin id = 1
bid = 1
txt = "((((((( CX2SA - Salto Uruguay ))))))) http://www.cx2sa.org"
packet = packets.BulletinPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
message_text=txt,
bid=1,
)
expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::BLN{bid:<9}:{txt}"
self.assertEqual(expected, str(packet))
def test_message_format(self):
"""Test the message packet format."""
message = "My Message"
msgno = "ABX"
packet = packets.MessagePacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
message_text=message,
msgNo=msgno,
)
expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:{message}{{{msgno}"
self.assertEqual(expected, str(packet))
# test with bad words
# Currently fails with mixed case
message = "My cunt piss fuck text"
exp_msg = "My **** **** **** text"
msgno = "ABX"
packet = packets.MessagePacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
message_text=message,
msgNo=msgno,
)
expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:{exp_msg}{{{msgno}"
self.assertEqual(expected, str(packet))