diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index a059e8f..0a3f68e 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -7,7 +7,6 @@ import sys import threading import time -from aprslib import util as aprslib_util import click import flask from flask import request @@ -539,10 +538,10 @@ class SendMessageNamespace(Namespace): def on_gps(self, data): LOG.debug(f"WS on_GPS: {data}") - lat = aprslib_util.latitude_to_ddm(data["latitude"]) - long = aprslib_util.longitude_to_ddm(data["longitude"]) - LOG.debug(f"Lat DDM {lat}") - LOG.debug(f"Long DDM {long}") + lat = data["latitude"] + long = data["longitude"] + LOG.debug(f"Lat {lat}") + LOG.debug(f"Long {long}") tx.send( packets.GPSPacket( diff --git a/aprsd/packets/__init__.py b/aprsd/packets/__init__.py index 1335637..c83d002 100644 --- a/aprsd/packets/__init__.py +++ b/aprsd/packets/__init__.py @@ -1,7 +1,7 @@ from aprsd.packets.core import ( # noqa: F401 - AckPacket, BeaconPacket, GPSPacket, MessagePacket, MicEPacket, ObjectPacket, - Packet, RejectPacket, StatusPacket, ThirdPartyPacket, UnknownPacket, - WeatherPacket, factory, + AckPacket, BeaconPacket, BulletinPacket, GPSPacket, MessagePacket, + MicEPacket, ObjectPacket, Packet, RejectPacket, StatusPacket, + ThirdPartyPacket, UnknownPacket, WeatherPacket, factory, ) from aprsd.packets.packet_list import PacketList # noqa: F401 from aprsd.packets.seen_list import SeenList # noqa: F401 diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index 06187c7..9372000 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -6,6 +6,7 @@ import time # Due to a failure in python 3.8 from typing import Any, List, Optional, Type, TypeVar, Union +from aprslib import util as aprslib_util from dataclasses_json import ( CatchAll, DataClassJsonMixin, Undefined, dataclass_json, ) @@ -131,7 +132,7 @@ class Packet: the human readable payload. """ self.prepare() - msg = self._filter_for_send().rstrip("\n") + msg = self._filter_for_send(self.raw).rstrip("\n") return msg def prepare(self) -> None: @@ -146,10 +147,10 @@ class Packet: """The payload is the non headers portion of the packet.""" if not self.to_call: raise ValueError("to_call isn't set. Must set to_call before calling prepare()") - msg = self._filter_for_send().rstrip("\n") + + # The base packet class has no real payload self.payload = ( f":{self.to_call.ljust(9)}" - f":{msg}" ) def _build_raw(self) -> None: @@ -159,16 +160,16 @@ class Packet: self.payload, ) - def _filter_for_send(self) -> str: + def _filter_for_send(self, msg) -> str: """Filter and format message string for FCC.""" # max? ftm400 displays 64, raw msg shows 74 # and ftm400-send is max 64. setting this to # 67 displays 64 on the ftm400. (+3 {01 suffix) # feature req: break long ones into two msgs - if not self.raw: + if not msg: raise ValueError("No message text to send. call prepare() first.") - message = self.raw[:67] + message = msg[:67] # We all miss George Carlin return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) @@ -193,14 +194,28 @@ class Packet: @dataclass(unsafe_hash=True) class AckPacket(Packet): _type: str = field(default="AckPacket", hash=False) - response: Optional[str] = field(default=None) - - def __post__init__(self): - if self.response: - LOG.warning("Response set!") def _build_payload(self): - self.payload = f":{self.to_call.ljust(9)}:ack{self.msgNo}" + self.payload = f":{self.to_call: <9}:ack{self.msgNo}" + + +@dataclass_json +@dataclass(unsafe_hash=True) +class BulletinPacket(Packet): + _type: str = "BulletinPacket" + # Holds the encapsulated packet + bid: Optional[str] = field(default="1") + message_text: Optional[str] = field(default=None) + + @property + def human_info(self) -> str: + return f"BLN{self.bid} {self.message_text}" + + def _build_payload(self) -> None: + self.payload = ( + f":BLN{self.bid:<9}" + f":{self.message_text}" + ) @dataclass_json @@ -214,7 +229,7 @@ class RejectPacket(Packet): LOG.warning("Response set!") def _build_payload(self): - self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}" + self.payload = f":{self.to_call: <9}:rej{self.msgNo}" @dataclass_json @@ -223,23 +238,10 @@ class MessagePacket(Packet): _type: str = field(default="MessagePacket", hash=False) message_text: Optional[str] = field(default=None) - def _filter_for_send(self) -> str: - """Filter and format message string for FCC.""" - # max? ftm400 displays 64, raw msg shows 74 - # and ftm400-send is max 64. setting this to - # 67 displays 64 on the ftm400. (+3 {01 suffix) - # feature req: break long ones into two msgs - if not self.message_text: - raise ValueError("No message text to send. Populate message_text field.") - - message = self.message_text[:67] - # We all miss George Carlin - return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) - def _build_payload(self): self.payload = ":{}:{}{{{}".format( self.to_call.ljust(9), - self._filter_for_send().rstrip("\n"), + self._filter_for_send(self.message_text).rstrip("\n"), str(self.msgNo), ) @@ -253,23 +255,10 @@ class StatusPacket(Packet): comment: Optional[str] = field(default=None) raw_timestamp: Optional[str] = field(default=None) - def _filter_for_send(self) -> str: - """Filter and format message string for FCC.""" - # max? ftm400 displays 64, raw msg shows 74 - # and ftm400-send is max 64. setting this to - # 67 displays 64 on the ftm400. (+3 {01 suffix) - # feature req: break long ones into two msgs - if not self.status and not self.comment: - self.status = "None" - - message = self.status[:67] - # We all miss George Carlin - return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) - def _build_payload(self): self.payload = ":{}:{}{{{}".format( self.to_call.ljust(9), - self._filter_for_send().rstrip("\n"), + self._filter_for_send(self.status).rstrip("\n"), str(self.msgNo), ) @@ -308,102 +297,29 @@ class GPSPacket(Packet): # http://www.aprs.org/datum.txt daodatumbyte: Optional[str] = field(default=None) - def decdeg2dms(self, degrees_decimal): - is_positive = degrees_decimal >= 0 - degrees_decimal = abs(degrees_decimal) - minutes, seconds = divmod(degrees_decimal * 3600, 60) - degrees, minutes = divmod(minutes, 60) - degrees = degrees if is_positive else -degrees - - degrees = str(int(degrees)).replace("-", "0") - minutes = str(int(minutes)).replace("-", "0") - seconds = str(int(round(seconds * 0.01, 2) * 100)) - - return {"degrees": degrees, "minutes": minutes, "seconds": seconds} - - def decdeg2dmm_m(self, degrees_decimal): - is_positive = degrees_decimal >= 0 - degrees_decimal = abs(degrees_decimal) - minutes, seconds = divmod(degrees_decimal * 3600, 60) - degrees, minutes = divmod(minutes, 60) - degrees = degrees if is_positive else -degrees - - degrees = abs(int(degrees)) - minutes = int(round(minutes + (seconds / 60), 2)) - hundredths = round(seconds / 60, 2) - - return { - "degrees": degrees, "minutes": minutes, "seconds": seconds, - "hundredths": hundredths, - } - - def convert_latitude(self, degrees_decimal): - det = self.decdeg2dmm_m(degrees_decimal) - if degrees_decimal > 0: - direction = "N" - else: - direction = "S" - - degrees = str(det.get("degrees")).zfill(2) - minutes = str(det.get("minutes")).zfill(2) - seconds = det.get("seconds") - hun = det.get("hundredths") - hundredths = f"{hun:.2f}".split(".")[1] - - LOG.debug( - f"LAT degress {degrees} minutes {str(minutes)} " - f"seconds {seconds} hundredths {hundredths} direction {direction}", - ) - - lat = f"{degrees}{str(minutes)}.{hundredths}{direction}" - return lat - - def convert_longitude(self, degrees_decimal): - det = self.decdeg2dmm_m(degrees_decimal) - if degrees_decimal > 0: - direction = "E" - else: - direction = "W" - - degrees = str(det.get("degrees")).zfill(3) - minutes = str(det.get("minutes")).zfill(2) - seconds = det.get("seconds") - hun = det.get("hundredths") - hundredths = f"{hun:.2f}".split(".")[1] - - LOG.debug( - f"LON degress {degrees} minutes {str(minutes)} " - f"seconds {seconds} hundredths {hundredths} direction {direction}", - ) - - lon = f"{degrees}{str(minutes)}.{hundredths}{direction}" - return lon - def _build_time_zulu(self): """Build the timestamp in UTC/zulu.""" if self.timestamp: - local_dt = datetime.fromtimestamp(self.timestamp) - else: - local_dt = datetime.now() - self.timestamp = datetime.timestamp(local_dt) - - utc_offset_timedelta = datetime.utcnow() - local_dt - result_utc_datetime = local_dt + utc_offset_timedelta - time_zulu = result_utc_datetime.strftime("%d%H%M") - return time_zulu + return datetime.utcfromtimestamp(self.timestamp).strftime("%d%H%M") def _build_payload(self): """The payload is the non headers portion of the packet.""" time_zulu = self._build_time_zulu() - lat = self.latitude - long = self.longitude - self.payload = ( - f"@{time_zulu}z{lat}{self.symbol_table}" - f"{long}{self.symbol}" - ) + lat = aprslib_util.latitude_to_ddm(self.latitude) + long = aprslib_util.longitude_to_ddm(self.longitude) + payload = [ + "@" if self.timestamp else "!", + time_zulu, + lat, + self.symbol_table, + long, + self.symbol, + ] if self.comment: - self.payload = f"{self.payload}{self.comment}" + payload.append(self._filter_for_send(self.comment)) + + self.payload = "".join(payload) def _build_raw(self): self.raw = ( @@ -438,14 +354,20 @@ class BeaconPacket(GPSPacket): def _build_payload(self): """The payload is the non headers portion of the packet.""" time_zulu = self._build_time_zulu() - lat = self.convert_latitude(self.latitude) - long = self.convert_longitude(self.longitude) + lat = aprslib_util.latitude_to_ddm(self.latitude) + lon = aprslib_util.longitude_to_ddm(self.longitude) self.payload = ( f"@{time_zulu}z{lat}{self.symbol_table}" - f"{long}{self.symbol}APRSD Beacon" + f"{lon}" ) + if self.comment: + comment = self._filter_for_send(self.comment) + self.payload = f"{self.payload}{self.symbol}{comment}" + else: + self.payload = f"{self.payload}{self.symbol}APRSD Beacon" + def _build_raw(self): self.raw = ( f"{self.from_call}>APZ100:" @@ -515,8 +437,8 @@ class ObjectPacket(GPSPacket): def _build_payload(self): time_zulu = self._build_time_zulu() - lat = self.convert_latitude(self.latitude) - long = self.convert_longitude(self.longitude) + lat = aprslib_util.latitude_to_ddm(self.latitude) + long = aprslib_util.longitude_to_ddm(self.longitude) self.payload = ( f"*{time_zulu}z{lat}{self.symbol_table}" @@ -524,7 +446,8 @@ class ObjectPacket(GPSPacket): ) if self.comment: - self.payload = f"{self.payload}{self.comment}" + comment = self._filter_for_send(self.comment) + self.payload = f"{self.payload}{comment}" def _build_raw(self): """ @@ -674,7 +597,8 @@ class WeatherPacket(GPSPacket, DataClassJsonMixin): f"b{self.pressure:05.0f}", ] if self.comment: - contents.append(self.comment) + comment = self.filter_for_send(self.comment) + contents.append(comment) self.payload = "".join(contents) def _build_raw(self): @@ -714,19 +638,6 @@ class ThirdPartyPacket(Packet, DataClassJsonMixin): return f"{self.from_call}->{self.to_call} {sub_info}" -@dataclass_json -@dataclass(unsafe_hash=True) -class BulletinPacket(Packet): - _type: str = "BulletinPacket" - # Holds the encapsulated packet - bid: Optional[str] = field(default="1") - message_text: Optional[str] = field(default=None) - - @property - def human_info(self) -> str: - return f"BLN{self.bid} {self.message_text}" - - @dataclass_json(undefined=Undefined.INCLUDE) @dataclass(unsafe_hash=True) class UnknownPacket: diff --git a/aprsd/packets/log.py b/aprsd/packets/log.py index 74ef36c..ff4704a 100644 --- a/aprsd/packets/log.py +++ b/aprsd/packets/log.py @@ -63,7 +63,6 @@ def log_multiline(packet, tx: Optional[bool] = False, header: Optional[bool] = T if hasattr(packet, "comment") and packet.comment: logit.append(f" Comment : {packet.comment}") - raw = packet.raw.replace("<", "\\<") logit.append(f" Raw : {raw}") logit.append(f"{header_str}________(<{PACKET_COLOR}>{name})") diff --git a/tests/test_packets.py b/tests/test_packets.py index e8e3679..a9774e0 100644 --- a/tests/test_packets.py +++ b/tests/test_packets.py @@ -2,6 +2,7 @@ import unittest from unittest import mock import aprslib +from aprslib import util as aprslib_util from aprsd import packets from aprsd.packets import core @@ -9,7 +10,7 @@ from aprsd.packets import core from . import fake -class TestPluginBase(unittest.TestCase): +class TestPacketBase(unittest.TestCase): def _fake_dict( self, @@ -165,3 +166,120 @@ class TestPluginBase(unittest.TestCase): packet_dict = aprslib.parse(packet_raw) packet = packets.factory(packet_dict) self.assertIsInstance(packet, packets.MicEPacket) + + def test_ack_format(self): + """Test the ack packet format.""" + ack = packets.AckPacket( + from_call=fake.FAKE_FROM_CALLSIGN, + to_call=fake.FAKE_TO_CALLSIGN, + msgNo=123, + ) + + expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:ack123" + self.assertEqual(expected, str(ack)) + + def test_reject_format(self): + """Test the reject packet format.""" + reject = packets.RejectPacket( + from_call=fake.FAKE_FROM_CALLSIGN, + to_call=fake.FAKE_TO_CALLSIGN, + msgNo=123, + ) + + expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:rej123" + self.assertEqual(expected, str(reject)) + + def test_beacon_format(self): + """Test the beacon packet format.""" + lat = 28.123456 + lon = -80.123456 + ts = 1711219496.6426 + comment = "My Beacon Comment" + packet = packets.BeaconPacket( + from_call=fake.FAKE_FROM_CALLSIGN, + to_call=fake.FAKE_TO_CALLSIGN, + latitude=lat, + longitude=lon, + timestamp=ts, + symbol=">", + comment=comment, + ) + + expected_lat = aprslib_util.latitude_to_ddm(lat) + expected_lon = aprslib_util.longitude_to_ddm(lon) + expected = f"KFAKE>APZ100:@231844z{expected_lat}/{expected_lon}>{comment}" + self.assertEqual(expected, str(packet)) + + def test_beacon_format_no_comment(self): + """Test the beacon packet format.""" + lat = 28.123456 + lon = -80.123456 + ts = 1711219496.6426 + packet = packets.BeaconPacket( + from_call=fake.FAKE_FROM_CALLSIGN, + to_call=fake.FAKE_TO_CALLSIGN, + latitude=lat, + longitude=lon, + timestamp=ts, + symbol=">", + ) + empty_comment = "APRSD Beacon" + + expected_lat = aprslib_util.latitude_to_ddm(lat) + expected_lon = aprslib_util.longitude_to_ddm(lon) + expected = f"KFAKE>APZ100:@231844z{expected_lat}/{expected_lon}>{empty_comment}" + self.assertEqual(expected, str(packet)) + + def test_bulletin_format(self): + """Test the bulletin packet format.""" + # bulletin id = 0 + bid = 0 + packet = packets.BulletinPacket( + from_call=fake.FAKE_FROM_CALLSIGN, + message_text="My Bulletin Message", + bid=0, + ) + + expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::BLN{bid:<9}:{packet.message_text}" + self.assertEqual(expected, str(packet)) + + # bulletin id = 1 + bid = 1 + txt = "((((((( CX2SA - Salto Uruguay ))))))) http://www.cx2sa.org" + packet = packets.BulletinPacket( + from_call=fake.FAKE_FROM_CALLSIGN, + message_text=txt, + bid=1, + ) + + expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::BLN{bid:<9}:{txt}" + self.assertEqual(expected, str(packet)) + + def test_message_format(self): + """Test the message packet format.""" + + message = "My Message" + msgno = "ABX" + packet = packets.MessagePacket( + from_call=fake.FAKE_FROM_CALLSIGN, + to_call=fake.FAKE_TO_CALLSIGN, + message_text=message, + msgNo=msgno, + ) + + expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:{message}{{{msgno}" + self.assertEqual(expected, str(packet)) + + # test with bad words + # Currently fails with mixed case + message = "My cunt piss fuck text" + exp_msg = "My **** **** **** text" + msgno = "ABX" + packet = packets.MessagePacket( + from_call=fake.FAKE_FROM_CALLSIGN, + to_call=fake.FAKE_TO_CALLSIGN, + message_text=message, + msgNo=msgno, + ) + expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:{exp_msg}{{{msgno}" + self.assertEqual(expected, str(packet))