Started using dataclasses to describe packets

This patch adds new Packet classes to describe the
incoming packets parsed out from aprslib.
This commit is contained in:
Hemna 2022-12-14 19:21:25 -05:00
parent 2089b2575e
commit 082db7325d
8 changed files with 265 additions and 114 deletions

View File

@ -31,6 +31,7 @@ class Client:
connected = False
server_string = None
filter = None
def __new__(cls, *args, **kwargs):
"""This magic turns this into a singleton."""
@ -44,10 +45,17 @@ class Client:
if config:
self.config = config
def set_filter(self, filter):
self.filter = filter
if self._client:
self._client.set_filter(filter)
@property
def client(self):
if not self._client:
self._client = self.setup_connection()
if self.filter:
self._client.set_filter(self.filter)
return self._client
def reset(self):

View File

@ -5,10 +5,10 @@
# python included libs
import datetime
import logging
import signal
import sys
import time
import aprslib
import click
from rich.console import Console
@ -16,7 +16,7 @@ from rich.console import Console
import aprsd
from aprsd import cli_helper, client, messaging, packets, stats, threads, utils
from aprsd.aprsd import cli
from aprsd.utils import trace
from aprsd.threads import rx
# setup the global logger
@ -37,6 +37,14 @@ def signal_handler(sig, frame):
LOG.info(stats.APRSDStats())
class APRSDListenThread(rx.APRSDRXThread):
def process_packet(self, *args, **kwargs):
raw = self._client.decode_packet(*args, **kwargs)
packet = packets.Packet.factory(raw)
LOG.debug(f"Got packet {packet}")
packet.log(header="RX Packet")
@cli.command()
@cli_helper.add_options(cli_helper.common_options)
@click.option(
@ -74,6 +82,8 @@ def listen(
o/obj1/obj2... - Object Filter Pass all objects with the exact name of obj1, obj2, ... (* wild card allowed)\n
"""
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
config = ctx.obj["config"]
if not aprs_login:
@ -109,26 +119,6 @@ def listen(
packets.WatchList(config=config).load()
packets.SeenList(config=config).load()
@trace.trace
def rx_packet(packet):
resp = packet.get("response", None)
if resp == "ack":
ack_num = packet.get("msgNo")
console.log(f"We saw an ACK {ack_num} Ignoring")
messaging.log_packet(packet)
else:
message = packet.get("message_text", None)
fromcall = packet["from"]
msg_number = packet.get("msgNo", "0")
messaging.log_message(
"Received Message",
packet["raw"],
message,
fromcall=fromcall,
ack=msg_number,
console=console,
)
# Initialize the client factory and create
# The correct client object ready for use
client.ClientFactory.setup(config)
@ -140,29 +130,21 @@ def listen(
# Creates the client object
LOG.info("Creating client connection")
aprs_client = client.factory.create()
console.log(aprs_client)
LOG.info(aprs_client)
LOG.debug(f"Filter by '{filter}'")
aprs_client.client.set_filter(filter)
aprs_client.set_filter(filter)
packets.PacketList(config=config)
keepalive = threads.KeepAliveThread(config=config)
keepalive.start()
while True:
try:
# This will register a packet consumer with aprslib
# When new packets come in the consumer will process
# the packet
# with console.status("Listening for packets"):
aprs_client.client.consumer(rx_packet, raw=False)
except aprslib.exceptions.ConnectionDrop:
LOG.error("Connection dropped, reconnecting")
time.sleep(5)
# Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client()
# is called
aprs_client.reset()
except aprslib.exceptions.UnknownFormat:
LOG.error("Got a Bad packet")
LOG.debug("Create APRSDListenThread")
listen_thread = APRSDListenThread(threads.msg_queues, config=config)
LOG.debug("Start APRSDListenThread")
listen_thread.start()
LOG.debug("keepalive Join")
keepalive.join()
LOG.debug("listen_thread Join")
listen_thread.join()

View File

@ -178,7 +178,7 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
)
self.got_ack = True
def process_non_ack_packet(self, packet):
def process_our_message_packet(self, packet):
LOG.info(f"process non ack PACKET {packet}")
packet.get("addresse", None)
fromcall = packet["from"]

View File

@ -1,8 +1,10 @@
from dataclasses import asdict, dataclass, field
import datetime
import logging
import threading
import time
import dacite
import wrapt
from aprsd import utils
@ -14,6 +16,150 @@ LOG = logging.getLogger("APRSD")
PACKET_TYPE_MESSAGE = "message"
PACKET_TYPE_ACK = "ack"
PACKET_TYPE_MICE = "mic-e"
PACKET_TYPE_WX = "weather"
PACKET_TYPE_UNKNOWN = "unknown"
PACKET_TYPE_STATUS = "status"
@dataclass
class Packet:
from_call: str
to_call: str
addresse: str = None
format: str = None
msgNo: str = None
packet_type: str = None
timestamp: float = field(default_factory=time.time)
raw: str = None
_raw_dict: dict = field(repr=True, default_factory=lambda: {})
@staticmethod
def factory(raw):
raw["_raw_dict"] = raw.copy()
translate_fields = {
"from": "from_call",
"to": "to_call",
}
# First translate some fields
for key in translate_fields:
if key in raw:
raw[translate_fields[key]] = raw[key]
del raw[key]
if "addresse" in raw:
raw["to_call"] = raw["addresse"]
class_lookup = {
PACKET_TYPE_WX: WeatherPacket,
PACKET_TYPE_MESSAGE: MessagePacket,
PACKET_TYPE_ACK: AckPacket,
PACKET_TYPE_MICE: MicEPacket,
PACKET_TYPE_STATUS: StatusPacket,
PACKET_TYPE_UNKNOWN: Packet,
}
packet_type = get_packet_type(raw)
raw["packet_type"] = packet_type
class_name = class_lookup[packet_type]
if packet_type == PACKET_TYPE_UNKNOWN:
# Try and figure it out here
if "latitude" in raw:
class_name = GPSPacket
if packet_type == PACKET_TYPE_WX:
# the weather information is in a dict
# this brings those values out to the outer dict
for key in raw["weather"]:
raw[key] = raw["weather"][key]
return dacite.from_dict(data_class=class_name, data=raw)
def log(self, header=None):
"""LOG a packet to the logfile."""
asdict(self)
log_list = ["\n"]
if header:
log_list.append(f"{header} _______________")
log_list.append(f" Packet : {self.__class__.__name__}")
log_list.append(f" Raw : {self.raw}")
if self.to_call:
log_list.append(f" To : {self.to_call}")
if self.from_call:
log_list.append(f" From : {self.from_call}")
if hasattr(self, "path"):
log_list.append(f" Path : {'=>'.join(self.path)}")
if hasattr(self, "via"):
log_list.append(f" VIA : {self.via}")
elif isinstance(self, MessagePacket):
log_list.append(f" Message : {self.message_text}")
if self.msgNo:
log_list.append(f" Msg # : {self.msgNo}")
log_list.append(f"{header} _______________ Complete")
LOG.info(self)
LOG.info("\n".join(log_list))
@dataclass
class PathPacket(Packet):
path: list[str] = field(default_factory=list)
via: str = None
@dataclass
class AckPacket(PathPacket):
response: str = None
@dataclass
class MessagePacket(PathPacket):
message_text: str = None
@dataclass
class StatusPacket(PathPacket):
status: str = None
timestamp: int = 0
messagecapable: bool = False
comment: str = None
@dataclass
class GPSPacket(PathPacket):
latitude: float = 0.00
longitude: float = 0.00
altitude: float = 0.00
rng: float = 0.00
posambiguity: int = 0
timestamp: int = 0
comment: str = None
symbol: str = None
symbol_table: str = None
speed: float = 0.00
course: int = 0
@dataclass
class MicEPacket(GPSPacket):
messagecapable: bool = False
mbits: str = None
mtype: str = None
@dataclass
class WeatherPacket(GPSPacket):
symbol: str = "_"
wind_gust: float = 0.00
temperature: float = 0.00
rain_1h: float = 0.00
rain_24h: float = 0.00
rain_since_midnight: float = 0.00
humidity: int = 0
pressure: float = 0.00
messagecapable: bool = False
comment: str = None
class PacketList:
@ -45,11 +191,8 @@ class PacketList:
@wrapt.synchronized(lock)
def add(self, packet):
packet["ts"] = time.time()
if (
"fromcall" in packet
and packet["fromcall"] == self.config["aprs"]["login"]
):
packet.ts = time.time()
if (packet.from_call == self.config["aprs"]["login"]):
self.total_tx += 1
else:
self.total_recv += 1
@ -116,7 +259,10 @@ class WatchList(objectstore.ObjectStoreMixin):
@wrapt.synchronized(lock)
def update_seen(self, packet):
callsign = packet["from"]
if packet.addresse:
callsign = packet.addresse
else:
callsign = packet.from_call
if self.callsign_in_watchlist(callsign):
self.data[callsign]["last"] = datetime.datetime.now()
self.data[callsign]["packets"].append(packet)
@ -178,10 +324,8 @@ class SeenList(objectstore.ObjectStoreMixin):
@wrapt.synchronized(lock)
def update_seen(self, packet):
callsign = None
if "fromcall" in packet:
callsign = packet["fromcall"]
elif "from" in packet:
callsign = packet["from"]
if packet.from_call:
callsign = packet.from_call
else:
LOG.warning(f"Can't find FROM in packet {packet}")
return
@ -200,12 +344,16 @@ def get_packet_type(packet):
msg_format = packet.get("format", None)
msg_response = packet.get("response", None)
packet_type = "unknown"
if msg_format == "message":
packet_type = PACKET_TYPE_MESSAGE
elif msg_response == "ack":
if msg_format == "message" and msg_response == "ack":
packet_type = PACKET_TYPE_ACK
elif msg_format == "message":
packet_type = PACKET_TYPE_MESSAGE
elif msg_format == "mic-e":
packet_type = PACKET_TYPE_MICE
elif msg_format == "status":
packet_type = PACKET_TYPE_STATUS
elif packet.get("symbol", None) == "_":
packet_type = PACKET_TYPE_WX
return packet_type

View File

@ -23,7 +23,6 @@ class APRSDRXThread(APRSDThread):
client.factory.create().client.stop()
def loop(self):
# setup the consumer of messages and block until a messages
try:
# This will register a packet consumer with aprslib
@ -65,7 +64,12 @@ class APRSDPluginRXThread(APRSDRXThread):
processing in the PluginProcessPacketThread.
"""
def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
raw = self._client.decode_packet(*args, **kwargs)
#LOG.debug(raw)
packet = packets.Packet.factory(raw.copy())
packet.log(header="RX Packet")
#LOG.debug(packet)
del raw
thread = APRSDPluginProcessPacketThread(
config=self.config,
packet=packet,
@ -84,18 +88,18 @@ class APRSDProcessPacketThread(APRSDThread):
def __init__(self, config, packet):
self.config = config
self.packet = packet
name = self.packet["raw"][:10]
name = self.packet.raw[:10]
super().__init__(f"RXPKT-{name}")
def process_ack_packet(self, packet):
ack_num = packet.get("msgNo")
ack_num = packet.msgNo
LOG.info(f"Got ack for message {ack_num}")
messaging.log_message(
"RXACK",
packet["raw"],
packet.raw,
None,
ack=ack_num,
fromcall=packet["from"],
fromcall=packet.from_call,
)
tracker = messaging.MsgTrack()
tracker.remove(ack_num)
@ -106,56 +110,60 @@ class APRSDProcessPacketThread(APRSDThread):
"""Process a packet received from aprs-is server."""
packet = self.packet
packets.PacketList().add(packet)
our_call = self.config["aprsd"]["callsign"].lower()
fromcall = packet["from"]
tocall = packet.get("addresse", None)
msg = packet.get("message_text", None)
msg_id = packet.get("msgNo", "0")
msg_response = packet.get("response", None)
# LOG.debug(f"Got packet from '{fromcall}' - {packet}")
from_call = packet.from_call
if packet.addresse:
to_call = packet.addresse
else:
to_call = packet.to_call
msg_id = packet.msgNo
# We don't put ack packets destined for us through the
# plugins.
wl = packets.WatchList()
wl.update_seen(packet)
if (
tocall
and tocall.lower() == self.config["aprsd"]["callsign"].lower()
and msg_response == "ack"
isinstance(packet, packets.AckPacket)
and packet.addresse.lower() == our_call
):
self.process_ack_packet(packet)
else:
# It's not an ACK for us, so lets run it through
# the plugins.
messaging.log_message(
"Received Message",
packet["raw"],
msg,
fromcall=fromcall,
msg_num=msg_id,
)
# Only ack messages that were sent directly to us
if (
tocall
and tocall.lower() == self.config["aprsd"]["callsign"].lower()
):
stats.APRSDStats().msgs_rx_inc()
# let any threads do their thing, then ack
# send an ack last
ack = messaging.AckMessage(
self.config["aprsd"]["callsign"],
fromcall,
msg_id=msg_id,
)
ack.send()
if isinstance(packet, packets.MessagePacket):
if to_call and to_call.lower() == our_call:
# It's a MessagePacket and it's for us!
stats.APRSDStats().msgs_rx_inc()
# let any threads do their thing, then ack
# send an ack last
ack = messaging.AckMessage(
self.config["aprsd"]["callsign"],
from_call,
msg_id=msg_id,
)
ack.send()
self.process_non_ack_packet(packet)
self.process_our_message_packet(packet)
else:
# Packet wasn't meant for us!
self.process_other_packet(packet, for_us=False)
else:
LOG.info("Packet was not for us.")
self.process_other_packet(
packet, for_us=(to_call.lower() == our_call),
)
LOG.debug("Packet processing complete")
@abc.abstractmethod
def process_non_ack_packet(self, *args, **kwargs):
"""Ack packets already dealt with here."""
def process_our_message_packet(self, *args, **kwargs):
"""Process a MessagePacket destined for us!"""
def process_other_packet(self, packet, for_us=False):
"""Process an APRS Packet that isn't a message or ack"""
if not for_us:
LOG.info("Got a packet not meant for us.")
else:
LOG.info("Got a non AckPacket/MessagePacket")
LOG.info(packet)
class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
@ -163,18 +171,19 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
This is the main aprsd server plugin processing thread."""
def process_non_ack_packet(self, packet):
def process_our_message_packet(self, packet):
"""Send the packet through the plugins."""
fromcall = packet["from"]
tocall = packet.get("addresse", None)
msg = packet.get("message_text", None)
packet.get("msgNo", "0")
packet.get("response", None)
from_call = packet.from_call
if packet.addresse:
to_call = packet.addresse
else:
to_call = None
# msg = packet.get("message_text", None)
# packet.get("msgNo", "0")
# packet.get("response", None)
pm = plugin.PluginManager()
try:
results = pm.run(packet)
wl = packets.WatchList()
wl.update_seen(packet)
replied = False
for reply in results:
if isinstance(reply, list):
@ -187,7 +196,7 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
else:
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
from_call,
subreply,
)
msg.send()
@ -207,18 +216,18 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
from_call,
reply,
)
msg.send()
# If the message was for us and we didn't have a
# response, then we send a usage statement.
if tocall == self.config["aprsd"]["callsign"] and not replied:
if to_call == self.config["aprsd"]["callsign"] and not replied:
LOG.warning("Sending help!")
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
from_call,
"Unknown command! Send 'help' message for help",
)
msg.send()
@ -226,11 +235,11 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
LOG.error("Plugin failed!!!")
LOG.exception(ex)
# Do we need to send a reply?
if tocall == self.config["aprsd"]["callsign"]:
if to_call == self.config["aprsd"]["callsign"]:
reply = "A Plugin failed! try again?"
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
from_call,
reply,
)
msg.send()

View File

@ -4,7 +4,7 @@
#
# pip-compile --annotation-style=line --resolver=backtracking dev-requirements.in
#
add-trailing-comma==2.3.0 # via gray
add-trailing-comma==2.4.0 # via gray
alabaster==0.7.12 # via sphinx
attrs==22.1.0 # via jsonschema, pytest
autoflake==1.5.3 # via gray
@ -34,7 +34,7 @@ imagesize==1.4.1 # via sphinx
importlib-metadata==5.1.0 # via sphinx
importlib-resources==5.10.1 # via fixit
iniconfig==1.1.1 # via pytest
isort==5.10.1 # via -r dev-requirements.in, gray
isort==5.11.2 # via -r dev-requirements.in, gray
jinja2==3.1.2 # via sphinx
jsonschema==4.17.3 # via fixit
libcst==0.4.9 # via fixit
@ -47,7 +47,7 @@ packaging==22.0 # via build, pyproject-api, pytest, sphinx, tox
pathspec==0.10.3 # via black
pep517==0.13.0 # via build
pep8-naming==0.13.2 # via -r dev-requirements.in
pip-tools==6.11.0 # via -r dev-requirements.in
pip-tools==6.12.0 # via -r dev-requirements.in
platformdirs==2.6.0 # via black, tox, virtualenv
pluggy==1.0.0 # via pytest, tox
pre-commit==2.20.0 # via -r dev-requirements.in
@ -74,7 +74,7 @@ sphinxcontrib-serializinghtml==1.1.5 # via sphinx
tokenize-rt==5.0.0 # via add-trailing-comma, pyupgrade
toml==0.10.2 # via autoflake, pre-commit
tomli==2.0.1 # via black, build, coverage, mypy, pep517, pyproject-api, pytest, tox
tox==4.0.8 # via -r dev-requirements.in
tox==4.0.9 # via -r dev-requirements.in
typing-extensions==4.4.0 # via black, libcst, mypy, typing-inspect
typing-inspect==0.8.0 # via libcst
unify==0.5 # via gray

View File

@ -27,3 +27,5 @@ attrs==22.1.0
# for mobile checking
user-agents
pyopenssl
dataclasses
dacite2

View File

@ -17,6 +17,8 @@ click==8.1.3 # via -r requirements.in, click-completion, flask
click-completion==0.5.2 # via -r requirements.in
commonmark==0.9.1 # via rich
cryptography==38.0.4 # via pyopenssl
dacite2==2.0.0 # via -r requirements.in
dataclasses==0.6 # via -r requirements.in
dnspython==2.2.1 # via eventlet
eventlet==0.33.2 # via -r requirements.in
flask==2.1.2 # via -r requirements.in, flask-classful, flask-httpauth, flask-socketio