Merge pull request #106 from craigerl/dataclasses

Dataclasses
This commit is contained in:
Walter A. Boring IV 2022-12-20 17:26:46 -05:00 committed by GitHub
commit eca5972ebd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
51 changed files with 1831 additions and 1680 deletions

View File

@ -34,7 +34,7 @@ import click_completion
import aprsd
from aprsd import cli_helper
from aprsd import config as aprsd_config
from aprsd import messaging, packets, stats, threads, utils
from aprsd import packets, stats, threads, utils
# setup the global logger
@ -85,7 +85,7 @@ def signal_handler(sig, frame):
),
)
time.sleep(1.5)
messaging.MsgTrack().save()
packets.PacketTrack().save()
packets.WatchList().save()
packets.SeenList().save()
LOG.info(stats.APRSDStats())

View File

@ -8,6 +8,7 @@ from aprslib.exceptions import LoginError
from aprsd import config as aprsd_config
from aprsd import exception
from aprsd.clients import aprsis, kiss
from aprsd.packets import core
from aprsd.utils import trace
@ -31,6 +32,7 @@ class Client:
connected = False
server_string = None
filter = None
def __new__(cls, *args, **kwargs):
"""This magic turns this into a singleton."""
@ -44,10 +46,17 @@ class Client:
if config:
self.config = config
def set_filter(self, filter):
self.filter = filter
if self._client:
self._client.set_filter(filter)
@property
def client(self):
if not self._client:
self._client = self.setup_connection()
if self.filter:
self._client.set_filter(self.filter)
return self._client
def reset(self):
@ -101,7 +110,7 @@ class APRSISClient(Client):
def decode_packet(self, *args, **kwargs):
"""APRS lib already decodes this."""
return args[0]
return core.Packet.factory(args[0])
@trace.trace
def setup_connection(self):
@ -190,8 +199,8 @@ class KISSClient(Client):
# msg = frame.tnc2
LOG.debug(f"Decoding {msg}")
packet = aprslib.parse(msg)
return packet
raw = aprslib.parse(msg)
return core.Packet.factory(raw)
@trace.trace
def setup_connection(self):

View File

@ -1,6 +1,5 @@
import logging
import select
import socket
import threading
import aprslib
@ -32,23 +31,6 @@ class Aprsdis(aprslib.IS):
self.thread_stop = True
LOG.info("Shutdown Aprsdis client.")
def is_socket_closed(self, sock: socket.socket) -> bool:
try:
# this will try to read bytes without blocking and also without removing them from buffer (peek only)
data = sock.recv(16, socket.MSG_DONTWAIT | socket.MSG_PEEK)
if len(data) == 0:
return True
except BlockingIOError:
return False # socket is open and reading from it would block
except ConnectionResetError:
return True # socket was closed for some other reason
except Exception:
self.logger.exception(
"unexpected exception when checking if a socket is closed",
)
return False
return False
@wrapt.synchronized(lock)
def send(self, msg):
"""Send an APRS Message object."""

View File

@ -4,7 +4,7 @@ import aprslib
from ax253 import Frame
import kiss
from aprsd import messaging
from aprsd.packets import core
from aprsd.utils import trace
@ -83,7 +83,7 @@ class KISS3Client:
self.kiss.read(callback=self.parse_frame, min_frames=None)
LOG.debug("END blocking KISS consumer")
def send(self, msg):
def send(self, packet):
"""Send an APRS Message object."""
# payload = (':%-9s:%s' % (
@ -93,26 +93,26 @@ class KISS3Client:
# payload = str(msg).encode('US-ASCII')
payload = None
path = ["WIDE1-1", "WIDE2-1"]
if isinstance(msg, messaging.AckMessage):
msg_payload = f"ack{msg.id}"
elif isinstance(msg, messaging.RawMessage):
payload = msg.message.encode("US-ASCII")
if isinstance(packet, core.AckPacket):
msg_payload = f"ack{packet.msgNo}"
elif isinstance(packet, core.Packet):
payload = packet.raw.encode("US-ASCII")
path = ["WIDE2-1"]
else:
msg_payload = f"{msg.message}{{{str(msg.id)}"
msg_payload = f"{packet.raw}{{{str(packet.msgNo)}"
if not payload:
payload = (
":{:<9}:{}".format(
msg.tocall,
packet.to_call,
msg_payload,
)
).encode("US-ASCII")
LOG.debug(f"Send '{payload}' TO KISS")
frame = Frame.ui(
destination=msg.tocall,
source=msg.fromcall,
destination=packet.to_call,
source=packet.from_call,
path=path,
info=payload,
)

View File

@ -8,7 +8,7 @@ import logging
import click
# local imports here
from aprsd import cli_helper, client, messaging, packets, plugin, stats, utils
from aprsd import cli_helper, client, packets, plugin, stats, utils
from aprsd.aprsd import cli
from aprsd.utils import trace
@ -102,7 +102,7 @@ def test_plugin(
client.Client(config)
stats.APRSDStats(config)
messaging.MsgTrack(config=config)
packets.PacketTrack(config=config)
packets.WatchList(config=config)
packets.SeenList(config=config)

View File

@ -5,18 +5,18 @@
# python included libs
import datetime
import logging
import signal
import sys
import time
import aprslib
import click
from rich.console import Console
# local imports here
import aprsd
from aprsd import cli_helper, client, messaging, packets, stats, threads, utils
from aprsd import cli_helper, client, packets, stats, threads, utils
from aprsd.aprsd import cli
from aprsd.utils import trace
from aprsd.threads import rx
# setup the global logger
@ -37,6 +37,32 @@ def signal_handler(sig, frame):
LOG.info(stats.APRSDStats())
class APRSDListenThread(rx.APRSDRXThread):
def __init__(self, config, packet_queue, packet_filter=None):
super().__init__(config, packet_queue)
self.packet_filter = packet_filter
def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
filters = {
packets.Packet.__name__: packets.Packet,
packets.AckPacket.__name__: packets.AckPacket,
packets.GPSPacket.__name__: packets.GPSPacket,
packets.MessagePacket.__name__: packets.MessagePacket,
packets.MicEPacket.__name__: packets.MicEPacket,
packets.WeatherPacket.__name__: packets.WeatherPacket,
}
if self.packet_filter:
filter_class = filters[self.packet_filter]
if isinstance(packet, filter_class):
packet.log(header="RX")
else:
packet.log(header="RX")
packets.PacketList().rx(packet)
@cli.command()
@cli_helper.add_options(cli_helper.common_options)
@click.option(
@ -51,6 +77,21 @@ def signal_handler(sig, frame):
show_envvar=True,
help="the APRS-IS password for APRS_LOGIN",
)
@click.option(
"--packet-filter",
type=click.Choice(
[
packets.Packet.__name__,
packets.AckPacket.__name__,
packets.GPSPacket.__name__,
packets.MicEPacket.__name__,
packets.MessagePacket.__name__,
packets.WeatherPacket.__name__,
],
case_sensitive=False,
),
help="Filter by packet type",
)
@click.argument(
"filter",
nargs=-1,
@ -62,6 +103,7 @@ def listen(
ctx,
aprs_login,
aprs_password,
packet_filter,
filter,
):
"""Listen to packets on the APRS-IS Network based on FILTER.
@ -74,6 +116,8 @@ def listen(
o/obj1/obj2... - Object Filter Pass all objects with the exact name of obj1, obj2, ... (* wild card allowed)\n
"""
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
config = ctx.obj["config"]
if not aprs_login:
@ -105,29 +149,6 @@ def listen(
# Try and load saved MsgTrack list
LOG.debug("Loading saved MsgTrack object.")
messaging.MsgTrack(config=config).load()
packets.WatchList(config=config).load()
packets.SeenList(config=config).load()
@trace.trace
def rx_packet(packet):
resp = packet.get("response", None)
if resp == "ack":
ack_num = packet.get("msgNo")
console.log(f"We saw an ACK {ack_num} Ignoring")
messaging.log_packet(packet)
else:
message = packet.get("message_text", None)
fromcall = packet["from"]
msg_number = packet.get("msgNo", "0")
messaging.log_message(
"Received Message",
packet["raw"],
message,
fromcall=fromcall,
ack=msg_number,
console=console,
)
# Initialize the client factory and create
# The correct client object ready for use
@ -140,29 +161,23 @@ def listen(
# Creates the client object
LOG.info("Creating client connection")
aprs_client = client.factory.create()
console.log(aprs_client)
LOG.info(aprs_client)
LOG.debug(f"Filter by '{filter}'")
aprs_client.client.set_filter(filter)
packets.PacketList(config=config)
aprs_client.set_filter(filter)
keepalive = threads.KeepAliveThread(config=config)
keepalive.start()
while True:
try:
# This will register a packet consumer with aprslib
# When new packets come in the consumer will process
# the packet
# with console.status("Listening for packets"):
aprs_client.client.consumer(rx_packet, raw=False)
except aprslib.exceptions.ConnectionDrop:
LOG.error("Connection dropped, reconnecting")
time.sleep(5)
# Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client()
# is called
aprs_client.reset()
except aprslib.exceptions.UnknownFormat:
LOG.error("Got a Bad packet")
LOG.debug("Create APRSDListenThread")
listen_thread = APRSDListenThread(
config=config,
packet_queue=threads.packet_queue,
packet_filter=packet_filter,
)
LOG.debug("Start APRSDListenThread")
listen_thread.start()
LOG.debug("keepalive Join")
keepalive.join()
LOG.debug("listen_thread Join")
listen_thread.join()

View File

@ -7,7 +7,7 @@ from aprslib.exceptions import LoginError
import click
import aprsd
from aprsd import cli_helper, client, messaging, packets
from aprsd import cli_helper, client, packets
from aprsd.aprsd import cli
@ -98,32 +98,23 @@ def send_message(
def rx_packet(packet):
global got_ack, got_response
cl = client.factory.create()
packet = cl.decode_packet(packet)
packets.PacketList().rx(packet)
packet.log("RX")
# LOG.debug("Got packet back {}".format(packet))
resp = packet.get("response", None)
if resp == "ack":
ack_num = packet.get("msgNo")
LOG.info(f"We got ack for our sent message {ack_num}")
messaging.log_packet(packet)
if isinstance(packet, packets.AckPacket):
got_ack = True
else:
message = packet.get("message_text", None)
fromcall = packet["from"]
msg_number = packet.get("msgNo", "0")
messaging.log_message(
"Received Message",
packet["raw"],
message,
fromcall=fromcall,
ack=msg_number,
)
got_response = True
# Send the ack back?
ack = messaging.AckMessage(
config["aprs"]["login"],
fromcall,
msg_id=msg_number,
from_call = packet.from_call
our_call = config["aprsd"]["callsign"].lower()
ack_pkt = packets.AckPacket(
from_call=our_call,
to_call=from_call,
msgNo=packet.msgNo,
)
ack.send_direct()
ack_pkt.send_direct()
if got_ack:
if wait_response:
@ -144,12 +135,16 @@ def send_message(
# we should bail after we get the ack and send an ack back for the
# message
if raw:
msg = messaging.RawMessage(raw)
msg.send_direct()
pkt = packets.Packet(from_call="", to_call="", raw=raw)
pkt.send_direct()
sys.exit(0)
else:
msg = messaging.TextMessage(aprs_login, tocallsign, command)
msg.send_direct()
pkt = packets.MessagePacket(
from_call=aprs_login,
to_call=tocallsign,
message_text=command,
)
pkt.send_direct()
if no_ack:
sys.exit(0)

View File

@ -6,8 +6,7 @@ import click
import aprsd
from aprsd import (
cli_helper, client, flask, messaging, packets, plugin, stats, threads,
utils,
cli_helper, client, flask, packets, plugin, stats, threads, utils,
)
from aprsd import aprsd as aprsd_main
from aprsd.aprsd import cli
@ -81,13 +80,13 @@ def server(ctx, flush):
packets.PacketList(config=config)
if flush:
LOG.debug("Deleting saved MsgTrack.")
messaging.MsgTrack(config=config).flush()
packets.PacketTrack(config=config).flush()
packets.WatchList(config=config)
packets.SeenList(config=config)
else:
# Try and load saved MsgTrack list
LOG.debug("Loading saved MsgTrack object.")
messaging.MsgTrack(config=config).load()
packets.PacketTrack(config=config).load()
packets.WatchList(config=config).load()
packets.SeenList(config=config).load()
@ -97,12 +96,17 @@ def server(ctx, flush):
plugin_manager.setup_plugins()
rx_thread = rx.APRSDPluginRXThread(
msg_queues=threads.msg_queues,
packet_queue=threads.packet_queue,
config=config,
)
process_thread = rx.APRSDPluginProcessPacketThread(
config=config,
packet_queue=threads.packet_queue,
)
rx_thread.start()
process_thread.start()
messaging.MsgTrack().restart()
packets.PacketTrack().restart()
keepalive = threads.KeepAliveThread(config=config)
keepalive.start()

View File

@ -22,7 +22,7 @@ import wrapt
import aprsd
from aprsd import cli_helper, client
from aprsd import config as aprsd_config
from aprsd import messaging, packets, stats, threads, utils
from aprsd import packets, stats, threads, utils
from aprsd.aprsd import cli
from aprsd.logging import rich as aprsd_logging
from aprsd.threads import rx
@ -44,13 +44,11 @@ def signal_handler(sig, frame):
threads.APRSDThreadList().stop_all()
if "subprocess" not in str(frame):
time.sleep(1.5)
# messaging.MsgTrack().save()
# packets.WatchList().save()
# packets.SeenList().save()
LOG.info(stats.APRSDStats())
LOG.info("Telling flask to bail.")
signal.signal(signal.SIGTERM, sys.exit(0))
sys.exit(0)
class SentMessages(objectstore.ObjectStoreMixin):
@ -65,13 +63,16 @@ class SentMessages(objectstore.ObjectStoreMixin):
cls._instance = super().__new__(cls)
return cls._instance
def is_initialized(self):
return True
@wrapt.synchronized(lock)
def add(self, msg):
self.data[msg.id] = self.create(msg.id)
self.data[msg.id]["from"] = msg.fromcall
self.data[msg.id]["to"] = msg.tocall
self.data[msg.id]["message"] = msg.message.rstrip("\n")
self.data[msg.id]["raw"] = str(msg).rstrip("\n")
self.data[msg.msgNo] = self.create(msg.msgNo)
self.data[msg.msgNo]["from"] = msg.from_call
self.data[msg.msgNo]["to"] = msg.to_call
self.data[msg.msgNo]["message"] = msg.message_text.rstrip("\n")
self.data[msg.msgNo]["raw"] = msg.message_text.rstrip("\n")
def create(self, id):
return {
@ -131,44 +132,14 @@ def verify_password(username, password):
return username
class WebChatRXThread(rx.APRSDRXThread):
"""Class that connects to APRISIS/kiss and waits for messages.
After the packet is received from APRSIS/KISS, the packet is
sent to processing in the WebChatProcessPacketThread.
"""
def __init__(self, config, socketio):
super().__init__(None, config)
self.socketio = socketio
self.connected = False
def connected(self, connected=True):
self.connected = connected
def process_packet(self, *args, **kwargs):
# packet = self._client.decode_packet(*args, **kwargs)
if "packet" in kwargs:
packet = kwargs["packet"]
else:
packet = self._client.decode_packet(*args, **kwargs)
LOG.debug(f"GOT Packet {packet}")
thread = WebChatProcessPacketThread(
config=self.config,
packet=packet,
socketio=self.socketio,
)
thread.start()
class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
"""Class that handles packets being sent to us."""
def __init__(self, config, packet, socketio):
def __init__(self, config, packet_queue, socketio):
self.socketio = socketio
self.connected = False
super().__init__(config, packet)
super().__init__(config, packet_queue)
def process_ack_packet(self, packet):
def process_ack_packet(self, packet: packets.AckPacket):
super().process_ack_packet(packet)
ack_num = packet.get("msgNo")
SentMessages().ack(int(ack_num))
@ -178,21 +149,19 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
)
self.got_ack = True
def process_non_ack_packet(self, packet):
def process_our_message_packet(self, packet: packets.MessagePacket):
LOG.info(f"process non ack PACKET {packet}")
packet.get("addresse", None)
fromcall = packet["from"]
fromcall = packet.from_call
packets.PacketList().add(packet)
stats.APRSDStats().msgs_rx_inc()
message = packet.get("message_text", None)
msg = {
"id": 0,
"ts": time.time(),
"ts": packet.get("timestamp", time.time()),
"ack": False,
"from": fromcall,
"to": packet["to"],
"raw": packet["raw"],
"to": packet.to_call,
"raw": packet.raw,
"message": message,
"status": None,
"last_update": None,
@ -344,21 +313,21 @@ class SendMessageNamespace(Namespace):
LOG.debug(f"WS: on_send {data}")
self.request = data
data["from"] = self._config["aprs"]["login"]
msg = messaging.TextMessage(
data["from"],
data["to"].upper(),
data["message"],
pkt = packets.MessagePacket(
from_call=data["from"],
to_call=data["to"].upper(),
message_text=data["message"],
)
self.msg = msg
self.msg = pkt
msgs = SentMessages()
msgs.add(msg)
msgs.set_status(msg.id, "Sending")
obj = msgs.get(self.msg.id)
msgs.add(pkt)
pkt.send()
msgs.set_status(pkt.msgNo, "Sending")
obj = msgs.get(pkt.msgNo)
socketio.emit(
"sent", obj,
namespace="/sendmsg",
)
msg.send()
def on_gps(self, data):
LOG.debug(f"WS on_GPS: {data}")
@ -367,21 +336,14 @@ class SendMessageNamespace(Namespace):
LOG.debug(f"Lat DDM {lat}")
LOG.debug(f"Long DDM {long}")
local_datetime = datetime.datetime.now()
utc_offset_timedelta = datetime.datetime.utcnow() - local_datetime
result_utc_datetime = local_datetime + utc_offset_timedelta
time_zulu = result_utc_datetime.strftime("%d%H%M")
# now construct a beacon to send over the client connection
txt = (
f"{self._config['aprs']['login']}>APZ100,WIDE2-1"
f":@{time_zulu}z{lat}/{long}l APRSD WebChat Beacon"
beacon = packets.GPSPacket(
from_call=self._config["aprs"]["login"],
to_call="APDW16",
latitude=lat,
longitude=long,
comment="APRSD WebChat Beacon",
)
beacon_msg = messaging.RawMessage(txt)
beacon_msg.fromcall = self._config["aprs"]["login"]
beacon_msg.tocall = "APDW16"
beacon_msg.send_direct()
beacon.send_direct()
def handle_message(self, data):
LOG.debug(f"WS Data {data}")
@ -534,17 +496,22 @@ def webchat(ctx, flush, port):
sys.exit(-1)
packets.PacketList(config=config)
messaging.MsgTrack(config=config)
packets.PacketTrack(config=config)
packets.WatchList(config=config)
packets.SeenList(config=config)
(socketio, app) = init_flask(config, loglevel, quiet)
rx_thread = WebChatRXThread(
rx_thread = rx.APRSDPluginRXThread(
config=config,
packet_queue=threads.packet_queue,
)
rx_thread.start()
process_thread = WebChatProcessPacketThread(
config=config,
packet_queue=threads.packet_queue,
socketio=socketio,
)
LOG.info("Start RX Thread")
rx_thread.start()
process_thread.start()
keepalive = threads.KeepAliveThread(config=config)
LOG.info("Start KeepAliveThread")

View File

@ -14,11 +14,12 @@ import flask_classful
from flask_httpauth import HTTPBasicAuth
from flask_socketio import Namespace, SocketIO
from werkzeug.security import check_password_hash, generate_password_hash
import wrapt
import aprsd
from aprsd import client
from aprsd import config as aprsd_config
from aprsd import messaging, packets, plugin, stats, threads, utils
from aprsd import packets, plugin, stats, threads, utils
from aprsd.clients import aprsis
from aprsd.logging import log
from aprsd.logging import rich as aprsd_logging
@ -32,7 +33,7 @@ users = None
class SentMessages:
_instance = None
lock = None
lock = threading.Lock()
msgs = {}
@ -41,16 +42,16 @@ class SentMessages:
if cls._instance is None:
cls._instance = super().__new__(cls)
# Put any initialization here.
cls.lock = threading.Lock()
return cls._instance
def add(self, msg):
with self.lock:
self.msgs[msg.id] = self._create(msg.id)
self.msgs[msg.id]["from"] = msg.fromcall
self.msgs[msg.id]["to"] = msg.tocall
self.msgs[msg.id]["message"] = msg.message.rstrip("\n")
self.msgs[msg.id]["raw"] = str(msg).rstrip("\n")
@wrapt.synchronized(lock)
def add(self, packet):
self.msgs[packet.msgNo] = self._create(packet.msgNo)
self.msgs[packet.msgNo]["from"] = packet.from_call
self.msgs[packet.msgNo]["to"] = packet.to_call
self.msgs[packet.msgNo]["message"] = packet.message_text.rstrip("\n")
packet._build_raw()
self.msgs[packet.msgNo]["raw"] = packet.raw.rstrip("\n")
def _create(self, id):
return {
@ -66,34 +67,34 @@ class SentMessages:
"reply": None,
}
@wrapt.synchronized(lock)
def __len__(self):
with self.lock:
return len(self.msgs.keys())
return len(self.msgs.keys())
@wrapt.synchronized(lock)
def get(self, id):
with self.lock:
if id in self.msgs:
return self.msgs[id]
if id in self.msgs:
return self.msgs[id]
@wrapt.synchronized(lock)
def get_all(self):
with self.lock:
return self.msgs
return self.msgs
@wrapt.synchronized(lock)
def set_status(self, id, status):
with self.lock:
self.msgs[id]["last_update"] = str(datetime.datetime.now())
self.msgs[id]["status"] = status
self.msgs[id]["last_update"] = str(datetime.datetime.now())
self.msgs[id]["status"] = status
@wrapt.synchronized(lock)
def ack(self, id):
"""The message got an ack!"""
with self.lock:
self.msgs[id]["last_update"] = str(datetime.datetime.now())
self.msgs[id]["ack"] = True
self.msgs[id]["last_update"] = str(datetime.datetime.now())
self.msgs[id]["ack"] = True
@wrapt.synchronized(lock)
def reply(self, id, packet):
"""We got a packet back from the sent message."""
with self.lock:
self.msgs[id]["reply"] = packet
self.msgs[id]["reply"] = packet
# HTTPBasicAuth doesn't work on a class method.
@ -107,7 +108,7 @@ def verify_password(username, password):
return username
class SendMessageThread(threads.APRSDThread):
class SendMessageThread(threads.APRSDRXThread):
"""Thread for sending a message from web."""
aprsis_client = None
@ -115,10 +116,10 @@ class SendMessageThread(threads.APRSDThread):
got_ack = False
got_reply = False
def __init__(self, config, info, msg, namespace):
def __init__(self, config, info, packet, namespace):
self.config = config
self.request = info
self.msg = msg
self.packet = packet
self.namespace = namespace
self.start_time = datetime.datetime.now()
msg = "({} -> {}) : {}".format(
@ -180,8 +181,8 @@ class SendMessageThread(threads.APRSDThread):
except LoginError as e:
f"Failed to setup Connection {e}"
self.msg.send_direct(aprsis_client=self.aprs_client)
SentMessages().set_status(self.msg.id, "Sent")
self.packet.send_direct(aprsis_client=self.aprs_client)
SentMessages().set_status(self.packet.msgNo, "Sent")
while not self.thread_stop:
can_loop = self.loop()
@ -190,59 +191,55 @@ class SendMessageThread(threads.APRSDThread):
threads.APRSDThreadList().remove(self)
LOG.debug("Exiting")
def rx_packet(self, packet):
def process_ack_packet(self, packet):
global socketio
# LOG.debug("Got packet back {}".format(packet))
resp = packet.get("response", None)
if resp == "ack":
ack_num = packet.get("msgNo")
LOG.info(f"We got ack for our sent message {ack_num}")
messaging.log_packet(packet)
SentMessages().ack(self.msg.id)
socketio.emit(
"ack", SentMessages().get(self.msg.id),
namespace="/sendmsg",
)
stats.APRSDStats().ack_rx_inc()
self.got_ack = True
if self.request["wait_reply"] == "0" or self.got_reply:
# We aren't waiting for a reply, so we can bail
self.stop()
self.thread_stop = self.aprs_client.thread_stop = True
ack_num = packet.msgNo
LOG.info(f"We got ack for our sent message {ack_num}")
packet.log("RXACK")
SentMessages().ack(self.packet.msgNo)
stats.APRSDStats().ack_rx_inc()
socketio.emit(
"ack", SentMessages().get(self.packet.msgNo),
namespace="/sendmsg",
)
if self.request["wait_reply"] == "0" or self.got_reply:
# We aren't waiting for a reply, so we can bail
self.stop()
self.thread_stop = self.aprs_client.thread_stop = True
def process_our_message_packet(self, packet):
global socketio
packets.PacketList().rx(packet)
stats.APRSDStats().msgs_rx_inc()
msg_number = packet.msgNo
SentMessages().reply(self.packet.msgNo, packet)
SentMessages().set_status(self.packet.msgNo, "Got Reply")
socketio.emit(
"reply", SentMessages().get(self.packet.msgNo),
namespace="/sendmsg",
)
ack_pkt = packets.AckPacket(
from_call=self.request["from"],
to_call=packet.from_call,
msgNo=msg_number,
)
ack_pkt.send_direct(aprsis_client=self.aprsis_client)
SentMessages().set_status(self.packet.msgNo, "Ack Sent")
# Now we can exit, since we are done.
self.got_reply = True
if self.got_ack:
self.stop()
self.thread_stop = self.aprs_client.thread_stop = True
def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
packet.log(header="RX Packet")
if isinstance(packet, packets.AckPacket):
self.process_ack_packet(packet)
else:
packets.PacketList().add(packet)
stats.APRSDStats().msgs_rx_inc()
message = packet.get("message_text", None)
fromcall = packet["from"]
msg_number = packet.get("msgNo", "0")
messaging.log_message(
"Received Message",
packet["raw"],
message,
fromcall=fromcall,
ack=msg_number,
)
SentMessages().reply(self.msg.id, packet)
SentMessages().set_status(self.msg.id, "Got Reply")
socketio.emit(
"reply", SentMessages().get(self.msg.id),
namespace="/sendmsg",
)
# Send the ack back?
ack = messaging.AckMessage(
self.request["from"],
fromcall,
msg_id=msg_number,
)
ack.send_direct()
SentMessages().set_status(self.msg.id, "Ack Sent")
# Now we can exit, since we are done.
self.got_reply = True
if self.got_ack:
self.stop()
self.thread_stop = self.aprs_client.thread_stop = True
self.process_our_message_packet(packet)
def loop(self):
# we have a general time limit expecting results of
@ -265,7 +262,9 @@ class SendMessageThread(threads.APRSDThread):
# This will register a packet consumer with aprslib
# When new packets come in the consumer will process
# the packet
self.aprs_client.consumer(self.rx_packet, raw=False, blocking=False)
self.aprs_client.consumer(
self.process_packet, raw=False, blocking=False,
)
except aprslib.exceptions.ConnectionDrop:
LOG.error("Connection dropped.")
return False
@ -353,7 +352,7 @@ class APRSDFlask(flask_classful.FlaskView):
@auth.login_required
def messages(self):
track = messaging.MsgTrack()
track = packets.PacketTrack()
msgs = []
for id in track:
LOG.info(track[id].dict())
@ -381,7 +380,12 @@ class APRSDFlask(flask_classful.FlaskView):
@auth.login_required
def packets(self):
packet_list = packets.PacketList().get()
return json.dumps(packet_list)
tmp_list = []
for pkt in packet_list:
tmp_list.append(pkt.json)
LOG.info(f"PACKETS {tmp_list}")
return json.dumps(tmp_list)
@auth.login_required
def plugins(self):
@ -393,13 +397,13 @@ class APRSDFlask(flask_classful.FlaskView):
@auth.login_required
def save(self):
"""Save the existing queue to disk."""
track = messaging.MsgTrack()
track = packets.PacketTrack()
track.save()
return json.dumps({"messages": "saved"})
def _stats(self):
stats_obj = stats.APRSDStats()
track = messaging.MsgTrack()
track = packets.PacketTrack()
now = datetime.datetime.now()
time_format = "%m-%d-%Y %H:%M:%S"
@ -421,8 +425,8 @@ class APRSDFlask(flask_classful.FlaskView):
stats_dict["aprsd"]["watch_list"] = new_list
packet_list = packets.PacketList()
rx = packet_list.total_received()
tx = packet_list.total_sent()
rx = packet_list.total_rx()
tx = packet_list.total_tx()
stats_dict["packets"] = {
"sent": tx,
"received": rx,
@ -444,7 +448,7 @@ class SendMessageNamespace(Namespace):
_config = None
got_ack = False
reply_sent = False
msg = None
packet = None
request = None
def __init__(self, namespace=None, config=None):
@ -466,24 +470,27 @@ class SendMessageNamespace(Namespace):
global socketio
LOG.debug(f"WS: on_send {data}")
self.request = data
msg = messaging.TextMessage(
data["from"], data["to"],
data["message"],
self.packet = packets.MessagePacket(
from_call=data["from"],
to_call=data["to"],
message_text=data["message"],
)
self.msg = msg
msgs = SentMessages()
msgs.add(msg)
msgs.set_status(msg.id, "Sending")
msgs.add(self.packet)
msgs.set_status(self.packet.msgNo, "Sending")
socketio.emit(
"sent", SentMessages().get(self.msg.id),
"sent", SentMessages().get(self.packet.msgNo),
namespace="/sendmsg",
)
socketio.start_background_task(self._start, self._config, data, msg, self)
socketio.start_background_task(
self._start, self._config, data,
self.packet, self,
)
LOG.warning("WS: on_send: exit")
def _start(self, config, data, msg, namespace):
msg_thread = SendMessageThread(self._config, data, msg, self)
def _start(self, config, data, packet, namespace):
msg_thread = SendMessageThread(self._config, data, packet, self)
msg_thread.start()
def handle_message(self, data):

View File

@ -1,584 +1,4 @@
import abc
import datetime
import logging
from multiprocessing import RawValue
import re
import threading
import time
from aprsd import client, packets, stats, threads
from aprsd.utils import objectstore
LOG = logging.getLogger("APRSD")
# What to return from a plugin if we have processed the message
# and it's ok, but don't send a usage string back
NULL_MESSAGE = -1
class MsgTrack(objectstore.ObjectStoreMixin):
"""Class to keep track of outstanding text messages.
This is a thread safe class that keeps track of active
messages.
When a message is asked to be sent, it is placed into this
class via it's id. The TextMessage class's send() method
automatically adds itself to this class. When the ack is
recieved from the radio, the message object is removed from
this class.
"""
_instance = None
_start_time = None
lock = None
data = {}
total_messages_tracked = 0
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.track = {}
cls._instance._start_time = datetime.datetime.now()
cls._instance.lock = threading.Lock()
cls._instance.config = kwargs["config"]
cls._instance._init_store()
return cls._instance
def __getitem__(self, name):
with self.lock:
return self.data[name]
def __iter__(self):
with self.lock:
return iter(self.data)
def keys(self):
with self.lock:
return self.data.keys()
def items(self):
with self.lock:
return self.data.items()
def values(self):
with self.lock:
return self.data.values()
def __len__(self):
with self.lock:
return len(self.data)
def __str__(self):
with self.lock:
result = "{"
for key in self.data.keys():
result += f"{key}: {str(self.data[key])}, "
result += "}"
return result
def add(self, msg):
with self.lock:
key = int(msg.id)
self.data[key] = msg
stats.APRSDStats().msgs_tracked_inc()
self.total_messages_tracked += 1
def get(self, id):
with self.lock:
if id in self.data:
return self.data[id]
def remove(self, id):
with self.lock:
key = int(id)
if key in self.data.keys():
del self.data[key]
def restart(self):
"""Walk the list of messages and restart them if any."""
for key in self.data.keys():
msg = self.data[key]
if msg.last_send_attempt < msg.retry_count:
msg.send()
def _resend(self, msg):
msg.last_send_attempt = 0
msg.send()
def restart_delayed(self, count=None, most_recent=True):
"""Walk the list of delayed messages and restart them if any."""
if not count:
# Send all the delayed messages
for key in self.data.keys():
msg = self.data[key]
if msg.last_send_attempt == msg.retry_count:
self._resend(msg)
else:
# They want to resend <count> delayed messages
tmp = sorted(
self.data.items(),
reverse=most_recent,
key=lambda x: x[1].last_send_time,
)
msg_list = tmp[:count]
for (_key, msg) in msg_list:
self._resend(msg)
class MessageCounter:
"""
Global message id counter class.
This is a singleton based class that keeps
an incrementing counter for all messages to
be sent. All new Message objects gets a new
message id, which is the next number available
from the MessageCounter.
"""
_instance = None
max_count = 9999
lock = None
def __new__(cls, *args, **kwargs):
"""Make this a singleton class."""
if cls._instance is None:
cls._instance = super().__new__(cls, *args, **kwargs)
cls._instance.val = RawValue("i", 1)
cls._instance.lock = threading.Lock()
return cls._instance
def increment(self):
with self.lock:
if self.val.value == self.max_count:
self.val.value = 1
else:
self.val.value += 1
@property
def value(self):
with self.lock:
return self.val.value
def __repr__(self):
with self.lock:
return str(self.val.value)
def __str__(self):
with self.lock:
return str(self.val.value)
class Message(metaclass=abc.ABCMeta):
"""Base Message Class."""
# The message id to send over the air
id = 0
retry_count = 3
last_send_time = 0
last_send_attempt = 0
transport = None
_raw_message = None
def __init__(
self,
fromcall,
tocall,
msg_id=None,
allow_delay=True,
):
self.fromcall = fromcall
self.tocall = tocall
if not msg_id:
c = MessageCounter()
c.increment()
msg_id = c.value
self.id = msg_id
# do we try and save this message for later if we don't get
# an ack? Some messages we don't want to do this ever.
self.allow_delay = allow_delay
@abc.abstractmethod
def send(self):
"""Child class must declare."""
def _filter_for_send(self):
"""Filter and format message string for FCC."""
# max? ftm400 displays 64, raw msg shows 74
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
message = self._raw_message[:67]
# We all miss George Carlin
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
@property
def message(self):
return self._filter_for_send().rstrip("\n")
def __str__(self):
return self.message
class RawMessage(Message):
"""Send a raw message.
This class is used for custom messages that contain the entire
contents of an APRS message in the message field.
"""
last_send_age = last_send_time = None
def __init__(self, message, allow_delay=True):
super().__init__(
fromcall=None, tocall=None, msg_id=None,
allow_delay=allow_delay,
)
self._raw_message = message
def dict(self):
now = datetime.datetime.now()
last_send_age = None
if self.last_send_time:
last_send_age = str(now - self.last_send_time)
return {
"type": "raw",
"message": self.message,
"raw": str(self),
"retry_count": self.retry_count,
"last_send_attempt": self.last_send_attempt,
"last_send_time": str(self.last_send_time),
"last_send_age": last_send_age,
}
def send(self):
tracker = MsgTrack()
tracker.add(self)
thread = SendMessageThread(message=self)
thread.start()
def send_direct(self, aprsis_client=None):
"""Send a message without a separate thread."""
cl = client.factory.create().client
log_message(
"Sending Message Direct",
str(self),
self.message,
tocall=self.tocall,
fromcall=self.fromcall,
)
cl.send(self)
stats.APRSDStats().msgs_tx_inc()
class TextMessage(Message):
"""Send regular ARPS text/command messages/replies."""
last_send_time = last_send_age = None
def __init__(
self, fromcall, tocall, message,
msg_id=None, allow_delay=True,
):
super().__init__(
fromcall=fromcall, tocall=tocall,
msg_id=msg_id, allow_delay=allow_delay,
)
self._raw_message = message
def dict(self):
now = datetime.datetime.now()
last_send_age = None
if self.last_send_time:
last_send_age = str(now - self.last_send_time)
return {
"id": self.id,
"type": "text-message",
"fromcall": self.fromcall,
"tocall": self.tocall,
"message": self.message,
"raw": str(self),
"retry_count": self.retry_count,
"last_send_attempt": self.last_send_attempt,
"last_send_time": str(self.last_send_time),
"last_send_age": last_send_age,
}
def __str__(self):
"""Build raw string to send over the air."""
return "{}>APZ100::{}:{}{{{}\n".format(
self.fromcall,
self.tocall.ljust(9),
self.message,
str(self.id),
)
def send(self):
tracker = MsgTrack()
tracker.add(self)
LOG.debug(f"Length of MsgTrack is {len(tracker)}")
thread = SendMessageThread(message=self)
thread.start()
def send_direct(self, aprsis_client=None):
"""Send a message without a separate thread."""
if aprsis_client:
cl = aprsis_client
else:
cl = client.factory.create().client
log_message(
"Sending Message Direct",
str(self),
self.message,
tocall=self.tocall,
fromcall=self.fromcall,
)
cl.send(self)
stats.APRSDStats().msgs_tx_inc()
packets.PacketList().add(self.dict())
class SendMessageThread(threads.APRSDThread):
def __init__(self, message):
self.msg = message
name = self.msg._raw_message[:5]
super().__init__(f"TXPKT-{self.msg.id}-{name}")
def loop(self):
"""Loop until a message is acked or it gets delayed.
We only sleep for 5 seconds between each loop run, so
that CTRL-C can exit the app in a short period. Each sleep
means the app quitting is blocked until sleep is done.
So we keep track of the last send attempt and only send if the
last send attempt is old enough.
"""
tracker = MsgTrack()
# lets see if the message is still in the tracking queue
msg = tracker.get(self.msg.id)
if not msg:
# The message has been removed from the tracking queue
# So it got acked and we are done.
LOG.info("Message Send Complete via Ack.")
return False
else:
send_now = False
if msg.last_send_attempt == msg.retry_count:
# we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue?
LOG.info("Message Send Complete. Max attempts reached.")
if not msg.allow_delay:
tracker.remove(msg.id)
return False
# Message is still outstanding and needs to be acked.
if msg.last_send_time:
# Message has a last send time tracking
now = datetime.datetime.now()
sleeptime = (msg.last_send_attempt + 1) * 31
delta = now - msg.last_send_time
if delta > datetime.timedelta(seconds=sleeptime):
# It's time to try to send it again
send_now = True
else:
send_now = True
if send_now:
# no attempt time, so lets send it, and start
# tracking the time.
log_message(
"Sending Message",
str(msg),
msg.message,
tocall=self.msg.tocall,
retry_number=msg.last_send_attempt,
msg_num=msg.id,
)
cl = client.factory.create().client
cl.send(msg)
stats.APRSDStats().msgs_tx_inc()
packets.PacketList().add(msg.dict())
msg.last_send_time = datetime.datetime.now()
msg.last_send_attempt += 1
time.sleep(5)
# Make sure we get called again.
return True
class AckMessage(Message):
"""Class for building Acks and sending them."""
def __init__(self, fromcall, tocall, msg_id):
super().__init__(fromcall, tocall, msg_id=msg_id)
def dict(self):
now = datetime.datetime.now()
last_send_age = None
if self.last_send_time:
last_send_age = str(now - self.last_send_time)
return {
"id": self.id,
"type": "ack",
"fromcall": self.fromcall,
"tocall": self.tocall,
"raw": str(self).rstrip("\n"),
"retry_count": self.retry_count,
"last_send_attempt": self.last_send_attempt,
"last_send_time": str(self.last_send_time),
"last_send_age": last_send_age,
}
def __str__(self):
return "{}>APZ100::{}:ack{}\n".format(
self.fromcall,
self.tocall.ljust(9),
self.id,
)
def _filter_for_send(self):
return f"ack{self.id}"
def send(self):
LOG.debug(f"Send ACK({self.tocall}:{self.id}) to radio.")
thread = SendAckThread(self)
thread.start()
def send_direct(self, aprsis_client=None):
"""Send an ack message without a separate thread."""
if aprsis_client:
cl = aprsis_client
else:
cl = client.factory.create().client
log_message(
"Sending ack",
str(self).rstrip("\n"),
None,
ack=self.id,
tocall=self.tocall,
fromcall=self.fromcall,
)
cl.send(self)
class SendAckThread(threads.APRSDThread):
def __init__(self, ack):
self.ack = ack
super().__init__(f"SendAck-{self.ack.id}")
def loop(self):
"""Separate thread to send acks with retries."""
send_now = False
if self.ack.last_send_attempt == self.ack.retry_count:
# we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue?
LOG.info("Ack Send Complete. Max attempts reached.")
return False
if self.ack.last_send_time:
# Message has a last send time tracking
now = datetime.datetime.now()
# aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle)
sleeptime = 31
delta = now - self.ack.last_send_time
if delta > datetime.timedelta(seconds=sleeptime):
# It's time to try to send it again
send_now = True
else:
LOG.debug(f"Still wating. {delta}")
else:
send_now = True
if send_now:
cl = client.factory.create().client
log_message(
"Sending ack",
str(self.ack).rstrip("\n"),
None,
ack=self.ack.id,
tocall=self.ack.tocall,
retry_number=self.ack.last_send_attempt,
)
cl.send(self.ack)
stats.APRSDStats().ack_tx_inc()
packets.PacketList().add(self.ack.dict())
self.ack.last_send_attempt += 1
self.ack.last_send_time = datetime.datetime.now()
time.sleep(5)
return True
def log_packet(packet):
fromcall = packet.get("from", None)
tocall = packet.get("to", None)
response_type = packet.get("response", None)
msg = packet.get("message_text", None)
msg_num = packet.get("msgNo", None)
ack = packet.get("ack", None)
log_message(
"Packet", packet["raw"], msg, fromcall=fromcall, tocall=tocall,
ack=ack, packet_type=response_type, msg_num=msg_num, )
def log_message(
header, raw, message, tocall=None, fromcall=None, msg_num=None,
retry_number=None, ack=None, packet_type=None, uuid=None,
console=None,
):
"""
Log a message entry.
This builds a long string with newlines for the log entry, so that
it's thread safe. If we log each item as a separate log.debug() call
Then the message information could get multiplexed with other log
messages. Each python log call is automatically synchronized.
"""
log_list = [""]
if retry_number:
log_list.append(f"{header} _______________(TX:{retry_number})")
else:
log_list.append(f"{header} _______________")
log_list.append(f" Raw : {raw}")
if packet_type:
log_list.append(f" Packet : {packet_type}")
if tocall:
log_list.append(f" To : {tocall}")
if fromcall:
log_list.append(f" From : {fromcall}")
if ack:
log_list.append(f" Ack : {ack}")
else:
log_list.append(f" Message : {message}")
if msg_num:
log_list.append(f" Msg # : {msg_num}")
if uuid:
log_list.append(f" UUID : {uuid}")
log_list.append(f"{header} _______________ Complete")
if console:
console.log("\n".join(log_list))
else:
LOG.info("\n".join(log_list))
# REMOVE THIS FILE

View File

@ -1,221 +0,0 @@
import datetime
import logging
import threading
import time
import wrapt
from aprsd import utils
from aprsd.utils import objectstore
LOG = logging.getLogger("APRSD")
PACKET_TYPE_MESSAGE = "message"
PACKET_TYPE_ACK = "ack"
PACKET_TYPE_MICE = "mic-e"
class PacketList:
"""Class to track all of the packets rx'd and tx'd by aprsd."""
_instance = None
lock = threading.Lock()
config = None
packet_list = {}
total_recv = 0
total_tx = 0
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.packet_list = utils.RingBuffer(1000)
cls._instance.config = kwargs["config"]
return cls._instance
def __init__(self, config=None):
if config:
self.config = config
@wrapt.synchronized(lock)
def __iter__(self):
return iter(self.packet_list)
@wrapt.synchronized(lock)
def add(self, packet):
packet["ts"] = time.time()
if (
"fromcall" in packet
and packet["fromcall"] == self.config["aprs"]["login"]
):
self.total_tx += 1
else:
self.total_recv += 1
self.packet_list.append(packet)
SeenList().update_seen(packet)
@wrapt.synchronized(lock)
def get(self):
return self.packet_list.get()
@wrapt.synchronized(lock)
def total_received(self):
return self.total_recv
@wrapt.synchronized(lock)
def total_sent(self):
return self.total_tx
class WatchList(objectstore.ObjectStoreMixin):
"""Global watch list and info for callsigns."""
_instance = None
lock = threading.Lock()
data = {}
config = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
if "config" in kwargs:
cls._instance.config = kwargs["config"]
cls._instance._init_store()
cls._instance.data = {}
return cls._instance
def __init__(self, config=None):
if config:
self.config = config
ring_size = config["aprsd"]["watch_list"].get("packet_keep_count", 10)
for callsign in config["aprsd"]["watch_list"].get("callsigns", []):
call = callsign.replace("*", "")
# FIXME(waboring) - we should fetch the last time we saw
# a beacon from a callsign or some other mechanism to find
# last time a message was seen by aprs-is. For now this
# is all we can do.
self.data[call] = {
"last": datetime.datetime.now(),
"packets": utils.RingBuffer(
ring_size,
),
}
def is_enabled(self):
if self.config and "watch_list" in self.config["aprsd"]:
return self.config["aprsd"]["watch_list"].get("enabled", False)
else:
return False
def callsign_in_watchlist(self, callsign):
return callsign in self.data
@wrapt.synchronized(lock)
def update_seen(self, packet):
callsign = packet["from"]
if self.callsign_in_watchlist(callsign):
self.data[callsign]["last"] = datetime.datetime.now()
self.data[callsign]["packets"].append(packet)
def last_seen(self, callsign):
if self.callsign_in_watchlist(callsign):
return self.data[callsign]["last"]
def age(self, callsign):
now = datetime.datetime.now()
return str(now - self.last_seen(callsign))
def max_delta(self, seconds=None):
watch_list_conf = self.config["aprsd"]["watch_list"]
if not seconds:
seconds = watch_list_conf["alert_time_seconds"]
max_timeout = {"seconds": seconds}
return datetime.timedelta(**max_timeout)
def is_old(self, callsign, seconds=None):
"""Watch list callsign last seen is old compared to now?
This tests to see if the last time we saw a callsign packet,
if that is older than the allowed timeout in the config.
We put this here so any notification plugin can use this
same test.
"""
age = self.age(callsign)
delta = utils.parse_delta_str(age)
d = datetime.timedelta(**delta)
max_delta = self.max_delta(seconds=seconds)
if d > max_delta:
return True
else:
return False
class SeenList(objectstore.ObjectStoreMixin):
"""Global callsign seen list."""
_instance = None
lock = threading.Lock()
data = {}
config = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
if "config" in kwargs:
cls._instance.config = kwargs["config"]
cls._instance._init_store()
cls._instance.data = {}
return cls._instance
@wrapt.synchronized(lock)
def update_seen(self, packet):
callsign = None
if "fromcall" in packet:
callsign = packet["fromcall"]
elif "from" in packet:
callsign = packet["from"]
else:
LOG.warning(f"Can't find FROM in packet {packet}")
return
if callsign not in self.data:
self.data[callsign] = {
"last": None,
"count": 0,
}
self.data[callsign]["last"] = str(datetime.datetime.now())
self.data[callsign]["count"] += 1
def get_packet_type(packet):
"""Decode the packet type from the packet."""
msg_format = packet.get("format", None)
msg_response = packet.get("response", None)
packet_type = "unknown"
if msg_format == "message":
packet_type = PACKET_TYPE_MESSAGE
elif msg_response == "ack":
packet_type = PACKET_TYPE_ACK
elif msg_format == "mic-e":
packet_type = PACKET_TYPE_MICE
return packet_type
def is_message_packet(packet):
return get_packet_type(packet) == PACKET_TYPE_MESSAGE
def is_ack_packet(packet):
return get_packet_type(packet) == PACKET_TYPE_ACK
def is_mice_packet(packet):
return get_packet_type(packet) == PACKET_TYPE_MICE

11
aprsd/packets/__init__.py Normal file
View File

@ -0,0 +1,11 @@
from aprsd.packets.core import ( # noqa: F401
AckPacket, GPSPacket, MessagePacket, MicEPacket, Packet, PathPacket,
StatusPacket, WeatherPacket,
)
from aprsd.packets.packet_list import PacketList # noqa: F401
from aprsd.packets.seen_list import SeenList # noqa: F401
from aprsd.packets.tracker import PacketTrack # noqa: F401
from aprsd.packets.watch_list import WatchList # noqa: F401
NULL_MESSAGE = -1

425
aprsd/packets/core.py Normal file
View File

@ -0,0 +1,425 @@
import abc
from dataclasses import asdict, dataclass, field
import datetime
import json
import logging
import re
import time
# Due to a failure in python 3.8
from typing import List
import dacite
from aprsd import client
from aprsd.packets.packet_list import PacketList # noqa: F401
from aprsd.threads import tx
from aprsd.utils import counter
from aprsd.utils import json as aprsd_json
LOG = logging.getLogger("APRSD")
PACKET_TYPE_MESSAGE = "message"
PACKET_TYPE_ACK = "ack"
PACKET_TYPE_MICE = "mic-e"
PACKET_TYPE_WX = "weather"
PACKET_TYPE_UNKNOWN = "unknown"
PACKET_TYPE_STATUS = "status"
PACKET_TYPE_BEACON = "beacon"
PACKET_TYPE_UNCOMPRESSED = "uncompressed"
def _int_timestamp():
"""Build a unix style timestamp integer"""
return int(round(time.time()))
def _init_msgNo(): # noqa: N802
"""For some reason __post__init doesn't get called.
So in order to initialize the msgNo field in the packet
we use this workaround.
"""
c = counter.PacketCounter()
c.increment()
return c.value
@dataclass
class Packet(metaclass=abc.ABCMeta):
from_call: str
to_call: str
addresse: str = None
format: str = None
msgNo: str = field(default_factory=_init_msgNo) # noqa: N815
packet_type: str = None
timestamp: float = field(default_factory=_int_timestamp)
# Holds the raw text string to be sent over the wire
# or holds the raw string from input packet
raw: str = None
raw_dict: dict = field(repr=False, default_factory=lambda: {})
# Fields related to sending packets out
send_count: int = field(repr=False, default=0)
retry_count: int = field(repr=False, default=3)
last_send_time: datetime.timedelta = field(repr=False, default=None)
# Do we allow this packet to be saved to send later?
allow_delay: bool = field(repr=False, default=True)
def __post__init__(self):
LOG.warning(f"POST INIT {self}")
@property
def __dict__(self):
return asdict(self)
@property
def json(self):
"""
get the json formated string
"""
return json.dumps(self.__dict__, cls=aprsd_json.EnhancedJSONEncoder)
def get(self, key, default=None):
"""Emulate a getter on a dict."""
if hasattr(self, key):
return getattr(self, key)
else:
return default
def _init_for_send(self):
"""Do stuff here that is needed prior to sending over the air."""
# now build the raw message for sending
self._build_raw()
def _build_raw(self):
"""Build the self.raw string which is what is sent over the air."""
self.raw = self._filter_for_send().rstrip("\n")
@staticmethod
def factory(raw_packet):
raw = raw_packet
raw["raw_dict"] = raw.copy()
translate_fields = {
"from": "from_call",
"to": "to_call",
}
# First translate some fields
for key in translate_fields:
if key in raw:
raw[translate_fields[key]] = raw[key]
del raw[key]
if "addresse" in raw:
raw["to_call"] = raw["addresse"]
packet_type = get_packet_type(raw)
raw["packet_type"] = packet_type
class_name = TYPE_LOOKUP[packet_type]
if packet_type == PACKET_TYPE_UNKNOWN:
# Try and figure it out here
if "latitude" in raw:
class_name = GPSPacket
if packet_type == PACKET_TYPE_WX:
# the weather information is in a dict
# this brings those values out to the outer dict
for key in raw["weather"]:
raw[key] = raw["weather"][key]
return dacite.from_dict(data_class=class_name, data=raw)
def log(self, header=None):
"""LOG a packet to the logfile."""
asdict(self)
log_list = ["\n"]
name = self.__class__.__name__
if header:
if "tx" in header.lower():
log_list.append(
f"{header}________({name} "
f"TX:{self.send_count+1} of {self.retry_count})",
)
else:
log_list.append(f"{header}________({name})")
# log_list.append(f" Packet : {self.__class__.__name__}")
log_list.append(f" Raw : {self.raw}")
if self.to_call:
log_list.append(f" To : {self.to_call}")
if self.from_call:
log_list.append(f" From : {self.from_call}")
if hasattr(self, "path") and self.path:
log_list.append(f" Path : {'=>'.join(self.path)}")
if hasattr(self, "via") and self.via:
log_list.append(f" VIA : {self.via}")
elif isinstance(self, MessagePacket):
log_list.append(f" Message : {self.message_text}")
if hasattr(self, "comment") and self.comment:
log_list.append(f" Comment : {self.comment}")
if self.msgNo:
log_list.append(f" Msg # : {self.msgNo}")
log_list.append(f"{header}________({name})")
LOG.info("\n".join(log_list))
LOG.debug(self)
def _filter_for_send(self) -> str:
"""Filter and format message string for FCC."""
# max? ftm400 displays 64, raw msg shows 74
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
message = self.raw[:67]
# We all miss George Carlin
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
def send(self):
"""Method to send a packet."""
self._init_for_send()
thread = tx.SendPacketThread(packet=self)
thread.start()
def send_direct(self, aprsis_client=None):
"""Send the message in the same thread as caller."""
self._init_for_send()
if aprsis_client:
cl = aprsis_client
else:
cl = client.factory.create().client
self.log(header="TX Message Direct")
cl.send(self.raw)
PacketList().tx(self)
@dataclass
class PathPacket(Packet):
path: List[str] = field(default_factory=list)
via: str = None
def _build_raw(self):
raise NotImplementedError
@dataclass
class AckPacket(PathPacket):
response: str = None
def __post__init__(self):
if self.response:
LOG.warning("Response set!")
def _build_raw(self):
"""Build the self.raw which is what is sent over the air."""
self.raw = "{}>APZ100::{}:ack{}".format(
self.from_call,
self.to_call.ljust(9),
self.msgNo,
)
def send(self):
"""Method to send a packet."""
self._init_for_send()
thread = tx.SendAckThread(packet=self)
LOG.warning(f"Starting thread to TXACK {self}")
thread.start()
@dataclass
class MessagePacket(PathPacket):
message_text: str = None
def _filter_for_send(self) -> str:
"""Filter and format message string for FCC."""
# max? ftm400 displays 64, raw msg shows 74
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
message = self.message_text[:67]
# We all miss George Carlin
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
def _build_raw(self):
"""Build the self.raw which is what is sent over the air."""
self.raw = "{}>APZ100::{}:{}{{{}".format(
self.from_call,
self.to_call.ljust(9),
self._filter_for_send().rstrip("\n"),
str(self.msgNo),
)
@dataclass()
class StatusPacket(PathPacket):
status: str = None
messagecapable: bool = False
comment: str = None
def _build_raw(self):
raise NotImplementedError
@dataclass()
class GPSPacket(PathPacket):
latitude: float = 0.00
longitude: float = 0.00
altitude: float = 0.00
rng: float = 0.00
posambiguity: int = 0
comment: str = None
symbol: str = field(default="l")
symbol_table: str = field(default="/")
# in MPH
speed: float = 0.00
# 0 to 360
course: int = 0
def _build_time_zulu(self):
"""Build the timestamp in UTC/zulu."""
if self.timestamp:
local_dt = datetime.datetime.fromtimestamp(self.timestamp)
else:
local_dt = datetime.datetime.now()
self.timestamp = datetime.datetime.timestamp(local_dt)
utc_offset_timedelta = datetime.datetime.utcnow() - local_dt
result_utc_datetime = local_dt + utc_offset_timedelta
time_zulu = result_utc_datetime.strftime("%d%H%M")
return time_zulu
def _build_raw(self):
time_zulu = self._build_time_zulu()
self.raw = (
f"{self.from_call}>{self.to_call},WIDE2-1:"
f"@{time_zulu}z{self.latitude}{self.symbol_table}"
f"{self.longitude}{self.symbol}"
)
if self.comment:
self.raw = f"{self.raw}{self.comment}"
@dataclass()
class MicEPacket(GPSPacket):
messagecapable: bool = False
mbits: str = None
mtype: str = None
def _build_raw(self):
raise NotImplementedError
@dataclass()
class WeatherPacket(GPSPacket):
symbol: str = "_"
wind_gust: float = 0.00
temperature: float = 0.00
rain_1h: float = 0.00
rain_24h: float = 0.00
rain_since_midnight: float = 0.00
humidity: int = 0
pressure: float = 0.00
comment: str = None
def _build_raw(self):
"""Build an uncompressed weather packet
Format =
_CSE/SPDgXXXtXXXrXXXpXXXPXXXhXXbXXXXX%type NEW FORMAT APRS793 June 97
NOT BACKWARD COMPATIBLE
Where: CSE/SPD is wind direction and sustained 1 minute speed
t is in degrees F
r is Rain per last 60 minutes
p is precipitation per last 24 hours (sliding 24 hour window)
P is precip per last 24 hours since midnight
b is Baro in tenths of a mb
h is humidity in percent. 00=100
g is Gust (peak winds in last 5 minutes)
# is the raw rain counter for remote WX stations
See notes on remotes below
% shows software type d=Dos, m=Mac, w=Win, etc
type shows type of WX instrument
"""
time_zulu = self._build_time_zulu()
course = "%03u" % self.course
contents = [
f"{self.from_call}>{self.to_call},WIDE1-1,WIDE2-1:",
f"@{time_zulu}z{self.latitude}{self.symbol_table}",
f"{self.longitude}{self.symbol}",
# Add CSE = Course
f"{course}",
# Speed = sustained 1 minute wind speed in mph
f"{self.symbol_table}", f"{self.speed:03.0f}",
# wind gust (peak wind speed in mph in the last 5 minutes)
f"g{self.wind_gust:03.0f}",
# Temperature in degrees F
f"t{self.temperature:03.0f}",
# Rainfall (in hundredths of an inch) in the last hour
f"r{self.rain_1h:03.0f}",
# Rainfall (in hundredths of an inch) in last 24 hours
f"p{self.rain_24h:03.0f}",
# Rainfall (in hundredths of an inch) since midnigt
f"P{self.rain_since_midnight:03.0f}",
# Humidity
f"h{self.humidity:02d}",
# Barometric pressure (in tenths of millibars/tenths of hPascal)
f"b{self.pressure:05.0f}",
]
if self.comment:
contents.append(self.comment)
self.raw = "".join(contents)
TYPE_LOOKUP = {
PACKET_TYPE_WX: WeatherPacket,
PACKET_TYPE_MESSAGE: MessagePacket,
PACKET_TYPE_ACK: AckPacket,
PACKET_TYPE_MICE: MicEPacket,
PACKET_TYPE_STATUS: StatusPacket,
PACKET_TYPE_BEACON: GPSPacket,
PACKET_TYPE_UNKNOWN: Packet,
}
def get_packet_type(packet: dict):
"""Decode the packet type from the packet."""
pkt_format = packet.get("format", None)
msg_response = packet.get("response", None)
packet_type = "unknown"
if pkt_format == "message" and msg_response == "ack":
packet_type = PACKET_TYPE_ACK
elif pkt_format == "message":
packet_type = PACKET_TYPE_MESSAGE
elif pkt_format == "mic-e":
packet_type = PACKET_TYPE_MICE
elif pkt_format == "status":
packet_type = PACKET_TYPE_STATUS
elif pkt_format == PACKET_TYPE_BEACON:
packet_type = PACKET_TYPE_BEACON
elif pkt_format == PACKET_TYPE_UNCOMPRESSED:
if packet.get("symbol", None) == "_":
packet_type = PACKET_TYPE_WX
return packet_type
def is_message_packet(packet):
return get_packet_type(packet) == PACKET_TYPE_MESSAGE
def is_ack_packet(packet):
return get_packet_type(packet) == PACKET_TYPE_ACK
def is_mice_packet(packet):
return get_packet_type(packet) == PACKET_TYPE_MICE

View File

@ -0,0 +1,69 @@
import logging
import threading
import wrapt
from aprsd import stats, utils
from aprsd.packets import seen_list
LOG = logging.getLogger("APRSD")
class PacketList:
"""Class to track all of the packets rx'd and tx'd by aprsd."""
_instance = None
lock = threading.Lock()
config = None
packet_list: utils.RingBuffer = utils.RingBuffer(1000)
_total_rx: int = 0
_total_tx: int = 0
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
if "config" in kwargs:
cls._instance.config = kwargs["config"]
return cls._instance
def __init__(self, config=None):
if config:
self.config = config
def _is_initialized(self):
return self.config is not None
@wrapt.synchronized(lock)
def __iter__(self):
return iter(self.packet_list)
@wrapt.synchronized(lock)
def rx(self, packet):
"""Add a packet that was received."""
self._total_rx += 1
self.packet_list.append(packet)
seen_list.SeenList().update_seen(packet)
stats.APRSDStats().rx(packet)
@wrapt.synchronized(lock)
def tx(self, packet):
"""Add a packet that was received."""
self._total_tx += 1
self.packet_list.append(packet)
seen_list.SeenList().update_seen(packet)
stats.APRSDStats().tx(packet)
@wrapt.synchronized(lock)
def get(self):
return self.packet_list.get()
@wrapt.synchronized(lock)
def total_rx(self):
return self._total_rx
@wrapt.synchronized(lock)
def total_tx(self):
return self._total_tx

View File

@ -0,0 +1,48 @@
import datetime
import logging
import threading
import wrapt
from aprsd.utils import objectstore
LOG = logging.getLogger("APRSD")
class SeenList(objectstore.ObjectStoreMixin):
"""Global callsign seen list."""
_instance = None
lock = threading.Lock()
data: dict = {}
config = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
if "config" in kwargs:
if "config" in kwargs:
cls._instance.config = kwargs["config"]
cls._instance._init_store()
cls._instance.data = {}
return cls._instance
def is_initialized(self):
return self.config is not None
@wrapt.synchronized(lock)
def update_seen(self, packet):
callsign = None
if packet.from_call:
callsign = packet.from_call
else:
LOG.warning(f"Can't find FROM in packet {packet}")
return
if callsign not in self.data:
self.data[callsign] = {
"last": None,
"count": 0,
}
self.data[callsign]["last"] = str(datetime.datetime.now())
self.data[callsign]["count"] += 1

119
aprsd/packets/tracker.py Normal file
View File

@ -0,0 +1,119 @@
import datetime
import threading
import wrapt
from aprsd.utils import objectstore
class PacketTrack(objectstore.ObjectStoreMixin):
"""Class to keep track of outstanding text messages.
This is a thread safe class that keeps track of active
messages.
When a message is asked to be sent, it is placed into this
class via it's id. The TextMessage class's send() method
automatically adds itself to this class. When the ack is
recieved from the radio, the message object is removed from
this class.
"""
_instance = None
_start_time = None
lock = threading.Lock()
config = None
data: dict = {}
total_tracked: int = 0
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._start_time = datetime.datetime.now()
if "config" in kwargs:
cls._instance.config = kwargs["config"]
cls._instance._init_store()
return cls._instance
def is_initialized(self):
return self.config is not None
@wrapt.synchronized(lock)
def __getitem__(self, name):
return self.data[name]
@wrapt.synchronized(lock)
def __iter__(self):
return iter(self.data)
@wrapt.synchronized(lock)
def keys(self):
return self.data.keys()
@wrapt.synchronized(lock)
def items(self):
return self.data.items()
@wrapt.synchronized(lock)
def values(self):
return self.data.values()
@wrapt.synchronized(lock)
def __len__(self):
return len(self.data)
@wrapt.synchronized(lock)
def __str__(self):
result = "{"
for key in self.data.keys():
result += f"{key}: {str(self.data[key])}, "
result += "}"
return result
@wrapt.synchronized(lock)
def add(self, packet):
key = int(packet.msgNo)
self.data[key] = packet
self.total_tracked += 1
@wrapt.synchronized(lock)
def get(self, id):
if id in self.data:
return self.data[id]
@wrapt.synchronized(lock)
def remove(self, id):
key = int(id)
if key in self.data.keys():
del self.data[key]
def restart(self):
"""Walk the list of messages and restart them if any."""
for key in self.data.keys():
pkt = self.data[key]
if pkt.last_send_attempt < pkt.retry_count:
pkt.send()
def _resend(self, packet):
packet._last_send_attempt = 0
packet.send()
def restart_delayed(self, count=None, most_recent=True):
"""Walk the list of delayed messages and restart them if any."""
if not count:
# Send all the delayed messages
for key in self.data.keys():
pkt = self.data[key]
if pkt._last_send_attempt == pkt._retry_count:
self._resend(pkt)
else:
# They want to resend <count> delayed messages
tmp = sorted(
self.data.items(),
reverse=most_recent,
key=lambda x: x[1].last_send_time,
)
pkt_list = tmp[:count]
for (_key, pkt) in pkt_list:
self._resend(pkt)

106
aprsd/packets/watch_list.py Normal file
View File

@ -0,0 +1,106 @@
import datetime
import logging
import threading
import wrapt
from aprsd import utils
from aprsd.utils import objectstore
LOG = logging.getLogger("APRSD")
class WatchList(objectstore.ObjectStoreMixin):
"""Global watch list and info for callsigns."""
_instance = None
lock = threading.Lock()
data = {}
config = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
if "config" in kwargs:
cls._instance.config = kwargs["config"]
cls._instance._init_store()
cls._instance.data = {}
return cls._instance
def __init__(self, config=None):
if config:
self.config = config
ring_size = config["aprsd"]["watch_list"].get("packet_keep_count", 10)
for callsign in config["aprsd"]["watch_list"].get("callsigns", []):
call = callsign.replace("*", "")
# FIXME(waboring) - we should fetch the last time we saw
# a beacon from a callsign or some other mechanism to find
# last time a message was seen by aprs-is. For now this
# is all we can do.
self.data[call] = {
"last": datetime.datetime.now(),
"packets": utils.RingBuffer(
ring_size,
),
}
def is_initialized(self):
return self.config is not None
def is_enabled(self):
if self.config and "watch_list" in self.config["aprsd"]:
return self.config["aprsd"]["watch_list"].get("enabled", False)
else:
return False
def callsign_in_watchlist(self, callsign):
return callsign in self.data
@wrapt.synchronized(lock)
def update_seen(self, packet):
if packet.addresse:
callsign = packet.addresse
else:
callsign = packet.from_call
if self.callsign_in_watchlist(callsign):
self.data[callsign]["last"] = datetime.datetime.now()
self.data[callsign]["packets"].append(packet)
def last_seen(self, callsign):
if self.callsign_in_watchlist(callsign):
return self.data[callsign]["last"]
def age(self, callsign):
now = datetime.datetime.now()
return str(now - self.last_seen(callsign))
def max_delta(self, seconds=None):
watch_list_conf = self.config["aprsd"]["watch_list"]
if not seconds:
seconds = watch_list_conf["alert_time_seconds"]
max_timeout = {"seconds": seconds}
return datetime.timedelta(**max_timeout)
def is_old(self, callsign, seconds=None):
"""Watch list callsign last seen is old compared to now?
This tests to see if the last time we saw a callsign packet,
if that is older than the allowed timeout in the config.
We put this here so any notification plugin can use this
same test.
"""
age = self.age(callsign)
delta = utils.parse_delta_str(age)
d = datetime.timedelta(**delta)
max_delta = self.max_delta(seconds=seconds)
if d > max_delta:
return True
else:
return False

View File

@ -1,19 +1,17 @@
# The base plugin class
import abc
import fnmatch
import importlib
import inspect
import logging
import os
import re
import textwrap
import threading
import pluggy
from thesmuggler import smuggle
import aprsd
from aprsd import client, messaging, packets, threads
from aprsd import client, packets, threads
from aprsd.packets import watch_list
# setup the global logger
@ -161,10 +159,10 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
@hookimpl
def filter(self, packet):
result = messaging.NULL_MESSAGE
result = packets.NULL_MESSAGE
if self.enabled:
wl = packets.WatchList()
if wl.callsign_in_watchlist(packet["from"]):
wl = watch_list.WatchList()
if wl.callsign_in_watchlist(packet.from_call):
# packet is from a callsign in the watch list
self.rx_inc()
try:
@ -275,7 +273,7 @@ class HelpPlugin(APRSDRegexCommandPluginBase):
def process(self, packet):
LOG.info("HelpPlugin")
# fromcall = packet.get("from")
message = packet.get("message_text", None)
message = packet.message_text
# ack = packet.get("msgNo", "0")
a = re.search(r"^.*\s+(.*)", message)
command_name = None
@ -347,29 +345,9 @@ class PluginManager:
def _init(self):
self._pluggy_pm = pluggy.PluginManager("aprsd")
self._pluggy_pm.add_hookspecs(APRSDPluginSpec)
def load_plugins_from_path(self, module_path):
if not os.path.exists(module_path):
LOG.error(f"plugin path '{module_path}' doesn't exist.")
return None
dir_path = os.path.realpath(module_path)
pattern = "*.py"
self.obj_list = []
for path, _subdirs, files in os.walk(dir_path):
for name in files:
if fnmatch.fnmatch(name, pattern):
LOG.debug(f"MODULE? '{name}' '{path}'")
module = smuggle(f"{path}/{name}")
for mem_name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and self.is_plugin(obj):
self.obj_list.append(
{"name": mem_name, "obj": obj(self.config)},
)
return self.obj_list
# For the watchlist plugins
self._watchlist_pm = pluggy.PluginManager("aprsd")
self._watchlist_pm.add_hookspecs(APRSDPluginSpec)
def is_plugin(self, obj):
for c in inspect.getmro(obj):
@ -429,13 +407,22 @@ class PluginManager:
config=self.config,
)
if plugin_obj:
LOG.info(
"Registering plugin '{}'({})".format(
plugin_name,
plugin_obj.version,
),
)
self._pluggy_pm.register(plugin_obj)
if isinstance(plugin_obj, APRSDWatchListPluginBase):
LOG.info(
"Registering WatchList plugin '{}'({})".format(
plugin_name,
plugin_obj.version,
),
)
self._watchlist_pm.register(plugin_obj)
else:
LOG.info(
"Registering plugin '{}'({})".format(
plugin_name,
plugin_obj.version,
),
)
self._pluggy_pm.register(plugin_obj)
except Exception as ex:
LOG.error(f"Couldn't load plugin '{plugin_name}'")
LOG.exception(ex)
@ -464,15 +451,6 @@ class PluginManager:
for p_name in CORE_MESSAGE_PLUGINS:
self._load_plugin(p_name)
if self.config["aprsd"]["watch_list"].get("enabled", False):
LOG.info("Loading APRSD WatchList Plugins")
enabled_notify_plugins = self.config["aprsd"]["watch_list"].get(
"enabled_plugins",
None,
)
if enabled_notify_plugins:
for p_name in enabled_notify_plugins:
self._load_plugin(p_name)
LOG.info("Completed Plugin Loading.")
def run(self, packet):
@ -480,6 +458,10 @@ class PluginManager:
with self.lock:
return self._pluggy_pm.hook.filter(packet=packet)
def run_watchlist(self, packet):
with self.lock:
return self._watchlist_pm.hook.filter(packet=packet)
def stop(self):
"""Stop all threads created by all plugins."""
with self.lock:
@ -492,5 +474,10 @@ class PluginManager:
self._pluggy_pm.register(obj)
def get_plugins(self):
plugin_list = []
if self._pluggy_pm:
return self._pluggy_pm.get_plugins()
plugin_list.append(self._pluggy_pm.get_plugins())
if self._watchlist_pm:
plugin_list.append(self._watchlist_pm.get_plugins())
return plugin_list

View File

@ -10,7 +10,7 @@ import time
import imapclient
from aprsd import messaging, plugin, stats, threads
from aprsd import packets, plugin, stats, threads
from aprsd.utils import trace
@ -80,20 +80,19 @@ class EmailPlugin(plugin.APRSDRegexCommandPluginBase):
def create_threads(self):
if self.enabled:
return APRSDEmailThread(
msg_queues=threads.msg_queues,
config=self.config,
)
@trace.trace
def process(self, packet):
def process(self, packet: packets.MessagePacket):
LOG.info("Email COMMAND")
if not self.enabled:
# Email has not been enabled
# so the plugin will just NOOP
return messaging.NULL_MESSAGE
return packets.NULL_MESSAGE
fromcall = packet.get("from")
message = packet.get("message_text", None)
fromcall = packet.from_call
message = packet.message_text
ack = packet.get("msgNo", "0")
reply = None
@ -109,7 +108,7 @@ class EmailPlugin(plugin.APRSDRegexCommandPluginBase):
if r is not None:
LOG.debug("RESEND EMAIL")
resend_email(self.config, r.group(1), fromcall)
reply = messaging.NULL_MESSAGE
reply = packets.NULL_MESSAGE
# -user@address.com body of email
elif re.search(r"^-([A-Za-z0-9_\-\.@]+) (.*)", message):
# (same search again)
@ -142,7 +141,7 @@ class EmailPlugin(plugin.APRSDRegexCommandPluginBase):
if not too_soon or ack == 0:
LOG.info(f"Send email '{content}'")
send_result = send_email(self.config, to_addr, content)
reply = messaging.NULL_MESSAGE
reply = packets.NULL_MESSAGE
if send_result != 0:
reply = f"-{to_addr} failed"
else:
@ -157,7 +156,7 @@ class EmailPlugin(plugin.APRSDRegexCommandPluginBase):
self.email_sent_dict.clear()
self.email_sent_dict[ack] = now
else:
reply = messaging.NULL_MESSAGE
reply = packets.NULL_MESSAGE
LOG.info(
"Email for message number "
+ ack
@ -165,7 +164,6 @@ class EmailPlugin(plugin.APRSDRegexCommandPluginBase):
)
else:
reply = "Bad email address"
# messaging.send_message(fromcall, "Bad email address")
return reply
@ -466,13 +464,12 @@ def resend_email(config, count, fromcall):
from_addr = shortcuts_inverted[from_addr]
# asterisk indicates a resend
reply = "-" + from_addr + " * " + body.decode(errors="ignore")
# messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(
config["aprs"]["login"],
fromcall,
reply,
pkt = packets.MessagePacket(
from_call=config["aprsd"]["callsign"],
to_call=fromcall,
message_text=reply,
)
msg.send()
pkt.send()
msgexists = True
if msgexists is not True:
@ -489,9 +486,12 @@ def resend_email(config, count, fromcall):
str(m).zfill(2),
str(s).zfill(2),
)
# messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(config["aprs"]["login"], fromcall, reply)
msg.send()
pkt = packets.MessagePacket(
from_call=config["aprsd"]["callsign"],
to_call=fromcall,
message_text=reply,
)
pkt.send()
# check email more often since we're resending one now
EmailInfo().delay = 60
@ -501,9 +501,8 @@ def resend_email(config, count, fromcall):
class APRSDEmailThread(threads.APRSDThread):
def __init__(self, msg_queues, config):
def __init__(self, config):
super().__init__("EmailThread")
self.msg_queues = msg_queues
self.config = config
self.past = datetime.datetime.now()
@ -605,12 +604,14 @@ class APRSDEmailThread(threads.APRSDThread):
from_addr = shortcuts_inverted[from_addr]
reply = "-" + from_addr + " " + body.decode(errors="ignore")
msg = messaging.TextMessage(
self.config["aprs"]["login"],
self.config["ham"]["callsign"],
reply,
# Send the message to the registered user in the
# config ham.callsign
pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=self.config["ham"]["callsign"],
message_text=reply,
)
msg.send()
pkt.send()
# flag message as sent via aprs
try:
server.add_flags(msgid, ["APRS"])

View File

@ -2,7 +2,7 @@ import logging
import shutil
import subprocess
from aprsd import plugin
from aprsd import packets, plugin
from aprsd.utils import trace
@ -26,7 +26,7 @@ class FortunePlugin(plugin.APRSDRegexCommandPluginBase):
self.enabled = True
@trace.trace
def process(self, packet):
def process(self, packet: packets.MessagePacket):
LOG.info("FortunePlugin")
# fromcall = packet.get("from")

View File

@ -2,7 +2,7 @@ import logging
import re
import time
from aprsd import plugin, plugin_utils
from aprsd import packets, plugin, plugin_utils
from aprsd.utils import trace
@ -20,9 +20,9 @@ class LocationPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin):
self.ensure_aprs_fi_key()
@trace.trace
def process(self, packet):
def process(self, packet: packets.MessagePacket):
LOG.info("Location Plugin")
fromcall = packet.get("from")
fromcall = packet.from_call
message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")

View File

@ -1,7 +1,6 @@
import logging
from aprsd import messaging, packets, plugin
from aprsd.utils import trace
from aprsd import packets, plugin
LOG = logging.getLogger("APRSD")
@ -18,44 +17,42 @@ class NotifySeenPlugin(plugin.APRSDWatchListPluginBase):
short_description = "Notify me when a CALLSIGN is recently seen on APRS-IS"
@trace.trace
def process(self, packet):
def process(self, packet: packets.MessagePacket):
LOG.info("NotifySeenPlugin")
notify_callsign = self.config["aprsd"]["watch_list"]["alert_callsign"]
fromcall = packet.get("from")
fromcall = packet.from_call
wl = packets.WatchList()
age = wl.age(fromcall)
if wl.is_old(packet["from"]):
LOG.info(
"NOTIFY {} last seen {} max age={}".format(
fromcall,
age,
wl.max_delta(),
),
)
packet_type = packets.get_packet_type(packet)
# we shouldn't notify the alert user that they are online.
if fromcall != notify_callsign:
msg = messaging.TextMessage(
self.config["aprs"]["login"],
notify_callsign,
f"{fromcall} was just seen by type:'{packet_type}'",
# We don't need to keep this around if it doesn't go thru
if fromcall != notify_callsign:
if wl.is_old(fromcall):
LOG.info(
"NOTIFY {} last seen {} max age={}".format(
fromcall,
age,
wl.max_delta(),
),
)
packet_type = packet.__class__.__name__
# we shouldn't notify the alert user that they are online.
pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=notify_callsign,
message_text=(
f"{fromcall} was just seen by type:'{packet_type}'"
),
allow_delay=False,
)
return msg
pkt.allow_delay = False
return pkt
else:
LOG.debug("fromcall and notify_callsign are the same, not notifying")
return messaging.NULL_MESSAGE
LOG.debug(
"Not old enough to notify on callsign "
f"'{fromcall}' : {age} < {wl.max_delta()}",
)
return packets.NULL_MESSAGE
else:
LOG.debug(
"Not old enough to notify on callsign '{}' : {} < {}".format(
fromcall,
age,
wl.max_delta(),
),
)
return messaging.NULL_MESSAGE
LOG.debug("fromcall and notify_callsign are the same, ignoring")
return packets.NULL_MESSAGE

View File

@ -2,7 +2,8 @@ import datetime
import logging
import re
from aprsd import messaging, plugin
from aprsd import packets, plugin
from aprsd.packets import tracker
from aprsd.utils import trace
@ -17,17 +18,17 @@ class QueryPlugin(plugin.APRSDRegexCommandPluginBase):
short_description = "APRSD Owner command to query messages in the MsgTrack"
@trace.trace
def process(self, packet):
def process(self, packet: packets.MessagePacket):
LOG.info("Query COMMAND")
fromcall = packet.get("from")
fromcall = packet.from_call
message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
tracker = messaging.MsgTrack()
pkt_tracker = tracker.PacketTrack()
now = datetime.datetime.now()
reply = "Pending messages ({}) {}".format(
len(tracker),
len(pkt_tracker),
now.strftime("%H:%M:%S"),
)
@ -38,11 +39,11 @@ class QueryPlugin(plugin.APRSDRegexCommandPluginBase):
# resend last N most recent: "!3"
r = re.search(r"^\!([0-9]).*", message)
if r is not None:
if len(tracker) > 0:
if len(pkt_tracker) > 0:
last_n = r.group(1)
reply = messaging.NULL_MESSAGE
reply = packets.NULL_MESSAGE
LOG.debug(reply)
tracker.restart_delayed(count=int(last_n))
pkt_tracker.restart_delayed(count=int(last_n))
else:
reply = "No pending msgs to resend"
LOG.debug(reply)
@ -51,10 +52,10 @@ class QueryPlugin(plugin.APRSDRegexCommandPluginBase):
# resend all: "!a"
r = re.search(r"^\![aA].*", message)
if r is not None:
if len(tracker) > 0:
reply = messaging.NULL_MESSAGE
if len(pkt_tracker) > 0:
reply = packets.NULL_MESSAGE
LOG.debug(reply)
tracker.restart_delayed()
pkt_tracker.restart_delayed()
else:
reply = "No pending msgs"
LOG.debug(reply)
@ -65,7 +66,7 @@ class QueryPlugin(plugin.APRSDRegexCommandPluginBase):
if r is not None:
reply = "Deleted ALL pending msgs."
LOG.debug(reply)
tracker.flush()
pkt_tracker.flush()
return reply
return reply

View File

@ -4,7 +4,7 @@ import time
import pytz
from aprsd import plugin, plugin_utils
from aprsd import packets, plugin, plugin_utils
from aprsd.utils import fuzzy, trace
@ -42,7 +42,7 @@ class TimePlugin(plugin.APRSDRegexCommandPluginBase):
return reply
@trace.trace
def process(self, packet):
def process(self, packet: packets.Packet):
LOG.info("TIME COMMAND")
# So we can mock this in unit tests
localzone = self._get_local_tz()
@ -60,9 +60,9 @@ class TimeOWMPlugin(TimePlugin, plugin.APRSFIKEYMixin):
self.ensure_aprs_fi_key()
@trace.trace
def process(self, packet):
fromcall = packet.get("from")
message = packet.get("message_text", None)
def process(self, packet: packets.MessagePacket):
fromcall = packet.from_call
message = packet.message_text
# ack = packet.get("msgNo", "0")
# optional second argument is a callsign to search

View File

@ -21,15 +21,6 @@ class APRSDStats:
_aprsis_server = None
_aprsis_keepalive = None
_msgs_tracked = 0
_msgs_tx = 0
_msgs_rx = 0
_msgs_mice_rx = 0
_ack_tx = 0
_ack_rx = 0
_email_thread_last_time = None
_email_tx = 0
_email_rx = 0
@ -37,6 +28,37 @@ class APRSDStats:
_mem_current = 0
_mem_peak = 0
_pkt_cnt = {
"Packet": {
"tx": 0,
"rx": 0,
},
"AckPacket": {
"tx": 0,
"rx": 0,
},
"GPSPacket": {
"tx": 0,
"rx": 0,
},
"StatusPacket": {
"tx": 0,
"rx": 0,
},
"MicEPacket": {
"tx": 0,
"rx": 0,
},
"MessagePacket": {
"tx": 0,
"rx": 0,
},
"WeatherPacket": {
"tx": 0,
"rx": 0,
},
}
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
@ -90,59 +112,18 @@ class APRSDStats:
def set_aprsis_keepalive(self):
self._aprsis_keepalive = datetime.datetime.now()
@wrapt.synchronized(lock)
@property
def msgs_tx(self):
return self._msgs_tx
def rx(self, packet):
type = packet.__class__.__name__
self._pkt_cnt[type]["rx"] += 1
@wrapt.synchronized(lock)
def msgs_tx_inc(self):
self._msgs_tx += 1
@wrapt.synchronized(lock)
@property
def msgs_rx(self):
return self._msgs_rx
@wrapt.synchronized(lock)
def msgs_rx_inc(self):
self._msgs_rx += 1
@wrapt.synchronized(lock)
@property
def msgs_mice_rx(self):
return self._msgs_mice_rx
@wrapt.synchronized(lock)
def msgs_mice_inc(self):
self._msgs_mice_rx += 1
@wrapt.synchronized(lock)
@property
def ack_tx(self):
return self._ack_tx
@wrapt.synchronized(lock)
def ack_tx_inc(self):
self._ack_tx += 1
@wrapt.synchronized(lock)
@property
def ack_rx(self):
return self._ack_rx
@wrapt.synchronized(lock)
def ack_rx_inc(self):
self._ack_rx += 1
def tx(self, packet):
type = packet.__class__.__name__
self._pkt_cnt[type]["tx"] += 1
@wrapt.synchronized(lock)
@property
def msgs_tracked(self):
return self._msgs_tracked
@wrapt.synchronized(lock)
def msgs_tracked_inc(self):
self._msgs_tracked += 1
return packets.PacketTrack().total_tracked
@wrapt.synchronized(lock)
@property
@ -204,11 +185,13 @@ class APRSDStats:
wl = packets.WatchList()
sl = packets.SeenList()
pl = packets.PacketList()
stats = {
"aprsd": {
"version": aprsd.__version__,
"uptime": utils.strfdelta(self.uptime),
"callsign": self.config["aprsd"]["callsign"],
"memory_current": int(self.memory),
"memory_current_str": utils.human_size(self.memory),
"memory_peak": int(self.memory_peak),
@ -221,18 +204,20 @@ class APRSDStats:
"callsign": self.config["aprs"]["login"],
"last_update": last_aprsis_keepalive,
},
"packets": {
"tracked": int(pl.total_tx() + pl.total_rx()),
"sent": int(pl.total_tx()),
"received": int(pl.total_rx()),
},
"messages": {
"tracked": int(self.msgs_tracked),
"sent": int(self.msgs_tx),
"recieved": int(self.msgs_rx),
"ack_sent": int(self.ack_tx),
"ack_recieved": int(self.ack_rx),
"mic-e recieved": int(self.msgs_mice_rx),
"sent": self._pkt_cnt["MessagePacket"]["tx"],
"received": self._pkt_cnt["MessagePacket"]["tx"],
"ack_sent": self._pkt_cnt["AckPacket"]["tx"],
},
"email": {
"enabled": self.config["aprsd"]["email"]["enabled"],
"sent": int(self._email_tx),
"recieved": int(self._email_rx),
"received": int(self._email_rx),
"thread_last_update": last_update,
},
"plugins": plugin_stats,
@ -240,15 +225,16 @@ class APRSDStats:
return stats
def __str__(self):
pl = packets.PacketList()
return (
"Uptime:{} Msgs TX:{} RX:{} "
"ACK: TX:{} RX:{} "
"Email TX:{} RX:{} LastLoop:{} ".format(
self.uptime,
self._msgs_tx,
self._msgs_rx,
self._ack_tx,
self._ack_rx,
pl.total_tx(),
pl.total_rx(),
self._pkt_cnt["AckPacket"]["tx"],
self._pkt_cnt["AckPacket"]["rx"],
self._email_tx,
self._email_rx,
self._email_thread_last_time,

View File

@ -7,7 +7,4 @@ from .keep_alive import KeepAliveThread # noqa: F401
from .rx import APRSDRXThread # noqa: F401
rx_msg_queue = queue.Queue(maxsize=20)
msg_queues = {
"rx": rx_msg_queue,
}
packet_queue = queue.Queue(maxsize=20)

View File

@ -1,6 +1,5 @@
import abc
import logging
from queue import Queue
import threading
import wrapt
@ -16,7 +15,6 @@ class APRSDThreadList:
threads_list = []
lock = threading.Lock()
global_queue = Queue()
def __new__(cls, *args, **kwargs):
if cls._instance is None:
@ -26,7 +24,6 @@ class APRSDThreadList:
@wrapt.synchronized(lock)
def add(self, thread_obj):
thread_obj.set_global_queue(self.global_queue)
self.threads_list.append(thread_obj)
@wrapt.synchronized(lock)
@ -35,10 +32,11 @@ class APRSDThreadList:
@wrapt.synchronized(lock)
def stop_all(self):
self.global_queue.put_nowait({"quit": True})
"""Iterate over all threads and call stop on them."""
for th in self.threads_list:
LOG.info(f"Stopping Thread {th.name}")
if hasattr(th, "packet"):
LOG.info(F"{th.name} packet {th.packet}")
th.stop()
@wrapt.synchronized(lock)
@ -48,30 +46,15 @@ class APRSDThreadList:
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
global_queue = None
def __init__(self, name):
super().__init__(name=name)
self.thread_stop = False
APRSDThreadList().add(self)
def set_global_queue(self, global_queue):
self.global_queue = global_queue
def _should_quit(self):
""" see if we have a quit message from the global queue."""
if self.thread_stop:
return True
if self.global_queue.empty():
return False
msg = self.global_queue.get(timeout=1)
if not msg:
return False
if "quit" in msg and msg["quit"] is True:
# put the message back on the queue for others
self.global_queue.put_nowait(msg)
self.thread_stop = True
return True
def stop(self):
self.thread_stop = True

View File

@ -3,7 +3,7 @@ import logging
import time
import tracemalloc
from aprsd import client, messaging, packets, stats, utils
from aprsd import client, packets, stats, utils
from aprsd.threads import APRSDThread, APRSDThreadList
@ -23,7 +23,7 @@ class KeepAliveThread(APRSDThread):
def loop(self):
if self.cntr % 60 == 0:
tracker = messaging.MsgTrack()
pkt_tracker = packets.PacketTrack()
stats_obj = stats.APRSDStats()
pl = packets.PacketList()
thread_list = APRSDThreadList()
@ -45,17 +45,22 @@ class KeepAliveThread(APRSDThread):
except KeyError:
login = self.config["ham"]["callsign"]
if pkt_tracker.is_initialized():
tracked_packets = len(pkt_tracker)
else:
tracked_packets = 0
keepalive = (
"{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} "
"Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}"
).format(
login,
utils.strfdelta(stats_obj.uptime),
pl.total_recv,
pl.total_tx,
len(tracker),
stats_obj.msgs_tx,
stats_obj.msgs_rx,
pl.total_rx(),
pl.total_tx(),
tracked_packets,
stats_obj._pkt_cnt["MessagePacket"]["tx"],
stats_obj._pkt_cnt["MessagePacket"]["rx"],
last_msg_time,
email_thread_time,
utils.human_size(current),

View File

@ -1,10 +1,11 @@
import abc
import logging
import queue
import time
import aprslib
from aprsd import client, messaging, packets, plugin, stats
from aprsd import client, packets, plugin
from aprsd.threads import APRSDThread
@ -12,10 +13,10 @@ LOG = logging.getLogger("APRSD")
class APRSDRXThread(APRSDThread):
def __init__(self, msg_queues, config):
def __init__(self, config, packet_queue):
super().__init__("RX_MSG")
self.msg_queues = msg_queues
self.config = config
self.packet_queue = packet_queue
self._client = client.factory.create()
def stop(self):
@ -23,7 +24,6 @@ class APRSDRXThread(APRSDThread):
client.factory.create().client.stop()
def loop(self):
# setup the consumer of messages and block until a messages
try:
# This will register a packet consumer with aprslib
@ -66,11 +66,10 @@ class APRSDPluginRXThread(APRSDRXThread):
"""
def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
thread = APRSDPluginProcessPacketThread(
config=self.config,
packet=packet,
)
thread.start()
# LOG.debug(raw)
packet.log(header="RX")
packets.PacketList().rx(packet)
self.packet_queue.put(packet)
class APRSDProcessPacketThread(APRSDThread):
@ -81,81 +80,85 @@ class APRSDProcessPacketThread(APRSDThread):
will ack a message before sending the packet to the subclass
for processing."""
def __init__(self, config, packet):
def __init__(self, config, packet_queue):
self.config = config
self.packet = packet
name = self.packet["raw"][:10]
super().__init__(f"RXPKT-{name}")
self.packet_queue = packet_queue
super().__init__("ProcessPKT")
self._loop_cnt = 1
def process_ack_packet(self, packet):
ack_num = packet.get("msgNo")
ack_num = packet.msgNo
LOG.info(f"Got ack for message {ack_num}")
messaging.log_message(
"RXACK",
packet["raw"],
None,
ack=ack_num,
fromcall=packet["from"],
)
tracker = messaging.MsgTrack()
tracker.remove(ack_num)
stats.APRSDStats().ack_rx_inc()
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
return
def loop(self):
"""Process a packet received from aprs-is server."""
packet = self.packet
packets.PacketList().add(packet)
try:
packet = self.packet_queue.get(timeout=1)
if packet:
self.process_packet(packet)
except queue.Empty:
pass
self._loop_cnt += 1
return True
fromcall = packet["from"]
tocall = packet.get("addresse", None)
msg = packet.get("message_text", None)
msg_id = packet.get("msgNo", "0")
msg_response = packet.get("response", None)
# LOG.debug(f"Got packet from '{fromcall}' - {packet}")
def process_packet(self, packet):
"""Process a packet received from aprs-is server."""
LOG.debug(f"RXPKT-LOOP {self._loop_cnt}")
our_call = self.config["aprsd"]["callsign"].lower()
from_call = packet.from_call
if packet.addresse:
to_call = packet.addresse
else:
to_call = packet.to_call
msg_id = packet.msgNo
# We don't put ack packets destined for us through the
# plugins.
wl = packets.WatchList()
wl.update_seen(packet)
if (
tocall
and tocall.lower() == self.config["aprsd"]["callsign"].lower()
and msg_response == "ack"
isinstance(packet, packets.AckPacket)
and packet.addresse.lower() == our_call
):
self.process_ack_packet(packet)
else:
# It's not an ACK for us, so lets run it through
# the plugins.
messaging.log_message(
"Received Message",
packet["raw"],
msg,
fromcall=fromcall,
msg_num=msg_id,
)
# Only ack messages that were sent directly to us
if (
tocall
and tocall.lower() == self.config["aprsd"]["callsign"].lower()
):
stats.APRSDStats().msgs_rx_inc()
# let any threads do their thing, then ack
# send an ack last
ack = messaging.AckMessage(
self.config["aprsd"]["callsign"],
fromcall,
msg_id=msg_id,
)
ack.send()
if isinstance(packet, packets.MessagePacket):
if to_call and to_call.lower() == our_call:
# It's a MessagePacket and it's for us!
# let any threads do their thing, then ack
# send an ack last
ack_pkt = packets.AckPacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
msgNo=msg_id,
)
ack_pkt.send()
self.process_non_ack_packet(packet)
self.process_our_message_packet(packet)
else:
# Packet wasn't meant for us!
self.process_other_packet(packet, for_us=False)
else:
LOG.info("Packet was not for us.")
LOG.debug("Packet processing complete")
self.process_other_packet(
packet, for_us=(to_call.lower() == our_call),
)
LOG.debug("Packet processing complete")
return False
@abc.abstractmethod
def process_non_ack_packet(self, *args, **kwargs):
"""Ack packets already dealt with here."""
def process_our_message_packet(self, packet):
"""Process a MessagePacket destined for us!"""
def process_other_packet(self, packet, for_us=False):
"""Process an APRS Packet that isn't a message or ack"""
if not for_us:
LOG.info("Got a packet not meant for us.")
else:
LOG.info("Got a non AckPacket/MessagePacket")
class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
@ -163,18 +166,44 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
This is the main aprsd server plugin processing thread."""
def process_non_ack_packet(self, packet):
def process_other_packet(self, packet, for_us=False):
pm = plugin.PluginManager()
try:
results = pm.run_watchlist(packet)
for reply in results:
if isinstance(reply, list):
for subreply in reply:
LOG.debug(f"Sending '{subreply}'")
if isinstance(subreply, packets.Packet):
subreply.send()
else:
to_call = self.config["aprsd"]["watch_list"]["alert_callsign"]
msg_pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=to_call,
message_text=subreply,
)
msg_pkt.send()
elif isinstance(reply, packets.Packet):
# We have a message based object.
reply.send()
except Exception as ex:
LOG.error("Plugin failed!!!")
LOG.exception(ex)
def process_our_message_packet(self, packet):
"""Send the packet through the plugins."""
fromcall = packet["from"]
tocall = packet.get("addresse", None)
msg = packet.get("message_text", None)
packet.get("msgNo", "0")
packet.get("response", None)
from_call = packet.from_call
if packet.addresse:
to_call = packet.addresse
else:
to_call = None
# msg = packet.get("message_text", None)
# packet.get("msgNo", "0")
# packet.get("response", None)
pm = plugin.PluginManager()
try:
results = pm.run(packet)
wl = packets.WatchList()
wl.update_seen(packet)
replied = False
for reply in results:
if isinstance(reply, list):
@ -182,18 +211,17 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
replied = True
for subreply in reply:
LOG.debug(f"Sending '{subreply}'")
if isinstance(subreply, messaging.Message):
if isinstance(subreply, packets.Packet):
subreply.send()
else:
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
subreply,
msg_pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text=subreply,
)
msg.send()
elif isinstance(reply, messaging.Message):
msg_pkt.send()
elif isinstance(reply, packets.Packet):
# We have a message based object.
LOG.debug(f"Sending '{reply}'")
reply.send()
replied = True
else:
@ -202,35 +230,36 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
# us that they processed the message correctly, but have
# nothing to reply with, so we avoid replying with a
# usage string
if reply is not messaging.NULL_MESSAGE:
if reply is not packets.NULL_MESSAGE:
LOG.debug(f"Sending '{reply}'")
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
reply,
msg_pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text=reply,
)
msg.send()
msg_pkt.send()
# If the message was for us and we didn't have a
# response, then we send a usage statement.
if tocall == self.config["aprsd"]["callsign"] and not replied:
if to_call == self.config["aprsd"]["callsign"] and not replied:
LOG.warning("Sending help!")
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
"Unknown command! Send 'help' message for help",
msg_pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text="Unknown command! Send 'help' message for help",
)
msg.send()
msg_pkt.send()
except Exception as ex:
LOG.error("Plugin failed!!!")
LOG.exception(ex)
# Do we need to send a reply?
if tocall == self.config["aprsd"]["callsign"]:
if to_call == self.config["aprsd"]["callsign"]:
reply = "A Plugin failed! try again?"
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
reply,
msg_pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text=reply,
)
msg.send()
msg_pkt.send()
LOG.debug("Completed process_our_message_packet")

135
aprsd/threads/tx.py Normal file
View File

@ -0,0 +1,135 @@
import datetime
import logging
import time
from aprsd import client
from aprsd import threads as aprsd_threads
from aprsd.packets import packet_list, tracker
LOG = logging.getLogger("APRSD")
class SendPacketThread(aprsd_threads.APRSDThread):
loop_count: int = 1
def __init__(self, packet):
self.packet = packet
name = self.packet.raw[:5]
super().__init__(f"TXPKT-{self.packet.msgNo}-{name}")
pkt_tracker = tracker.PacketTrack()
pkt_tracker.add(packet)
def loop(self):
"""Loop until a message is acked or it gets delayed.
We only sleep for 5 seconds between each loop run, so
that CTRL-C can exit the app in a short period. Each sleep
means the app quitting is blocked until sleep is done.
So we keep track of the last send attempt and only send if the
last send attempt is old enough.
"""
pkt_tracker = tracker.PacketTrack()
# lets see if the message is still in the tracking queue
packet = pkt_tracker.get(self.packet.msgNo)
if not packet:
# The message has been removed from the tracking queue
# So it got acked and we are done.
LOG.info(
f"{packet.__class__.__name__}"
f"({self.packet.msgNo}) "
"Message Send Complete via Ack.",
)
return False
else:
send_now = False
if packet.send_count == packet.retry_count:
# we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue?
LOG.info(
f"{packet.__class__.__name__} "
f"({packet.msgNo}) "
"Message Send Complete. Max attempts reached"
f" {packet.retry_count}",
)
if not packet.allow_delay:
pkt_tracker.remove(packet.msgNo)
return False
# Message is still outstanding and needs to be acked.
if packet.last_send_time:
# Message has a last send time tracking
now = datetime.datetime.now()
sleeptime = (packet.send_count + 1) * 31
delta = now - packet.last_send_time
if delta > datetime.timedelta(seconds=sleeptime):
# It's time to try to send it again
send_now = True
else:
send_now = True
if send_now:
# no attempt time, so lets send it, and start
# tracking the time.
packet.log("TX")
cl = client.factory.create().client
cl.send(packet.raw)
packet_list.PacketList().tx(packet)
packet.last_send_time = datetime.datetime.now()
packet.send_count += 1
time.sleep(1)
# Make sure we get called again.
self.loop_count += 1
return True
class SendAckThread(aprsd_threads.APRSDThread):
loop_count: int = 1
def __init__(self, packet):
self.packet = packet
super().__init__(f"SendAck-{self.packet.msgNo}")
def loop(self):
"""Separate thread to send acks with retries."""
send_now = False
if self.packet.send_count == self.packet.retry_count:
# we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue?
LOG.info(
f"{self.packet.__class__.__name__}"
f"({self.packet.msgNo}) "
"Send Complete. Max attempts reached"
f" {self.packet.retry_count}",
)
return False
if self.packet.last_send_time:
# Message has a last send time tracking
now = datetime.datetime.now()
# aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle)
sleep_time = 31
delta = now - self.packet.last_send_time
if delta > datetime.timedelta(seconds=sleep_time):
# It's time to try to send it again
send_now = True
elif self.loop_count % 10 == 0:
LOG.debug(f"Still wating. {delta}")
else:
send_now = True
if send_now:
cl = client.factory.create().client
self.packet.log("TX")
cl.send(self.packet.raw)
packet_list.PacketList().tx(self.packet)
self.packet.send_count += 1
self.packet.last_send_time = datetime.datetime.now()
time.sleep(1)
self.loop_count += 1
return True

48
aprsd/utils/counter.py Normal file
View File

@ -0,0 +1,48 @@
from multiprocessing import RawValue
import threading
import wrapt
class PacketCounter:
"""
Global Packet id counter class.
This is a singleton based class that keeps
an incrementing counter for all packets to
be sent. All new Packet objects gets a new
message id, which is the next number available
from the PacketCounter.
"""
_instance = None
max_count = 9999
lock = threading.Lock()
def __new__(cls, *args, **kwargs):
"""Make this a singleton class."""
if cls._instance is None:
cls._instance = super().__new__(cls, *args, **kwargs)
cls._instance.val = RawValue("i", 1)
return cls._instance
@wrapt.synchronized(lock)
def increment(self):
if self.val.value == self.max_count:
self.val.value = 1
else:
self.val.value += 1
@property
@wrapt.synchronized(lock)
def value(self):
return self.val.value
@wrapt.synchronized(lock)
def __repr__(self):
return str(self.val.value)
@wrapt.synchronized(lock)
def __str__(self):
return str(self.val.value)

60
aprsd/utils/json.py Normal file
View File

@ -0,0 +1,60 @@
import datetime
import decimal
import json
import sys
class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
args = (
"year", "month", "day", "hour", "minute",
"second", "microsecond",
)
return {
"__type__": "datetime.datetime",
"args": [getattr(obj, a) for a in args],
}
elif isinstance(obj, datetime.date):
args = ("year", "month", "day")
return {
"__type__": "datetime.date",
"args": [getattr(obj, a) for a in args],
}
elif isinstance(obj, datetime.time):
args = ("hour", "minute", "second", "microsecond")
return {
"__type__": "datetime.time",
"args": [getattr(obj, a) for a in args],
}
elif isinstance(obj, datetime.timedelta):
args = ("days", "seconds", "microseconds")
return {
"__type__": "datetime.timedelta",
"args": [getattr(obj, a) for a in args],
}
elif isinstance(obj, decimal.Decimal):
return {
"__type__": "decimal.Decimal",
"args": [str(obj)],
}
else:
return super().default(obj)
class EnhancedJSONDecoder(json.JSONDecoder):
def __init__(self, *args, **kwargs):
super().__init__(
*args, object_hook=self.object_hook,
**kwargs,
)
def object_hook(self, d):
if "__type__" not in d:
return d
o = sys.modules[__name__]
for e in d["__type__"].split("."):
o = getattr(o, e)
args, kwargs = d.get("args", ()), d.get("kwargs", {})
return o(*args, **kwargs)

View File

@ -1,3 +1,4 @@
import abc
import logging
import os
import pathlib
@ -9,7 +10,7 @@ from aprsd import config as aprsd_config
LOG = logging.getLogger("APRSD")
class ObjectStoreMixin:
class ObjectStoreMixin(metaclass=abc.ABCMeta):
"""Class 'MIXIN' intended to save/load object data.
The asumption of how this mixin is used:
@ -23,6 +24,13 @@ class ObjectStoreMixin:
When APRSD Starts, it calls load()
aprsd server -f (flush) will wipe all saved objects.
"""
@abc.abstractmethod
def is_initialized(self):
"""Return True if the class has been setup correctly.
If this returns False, the ObjectStore doesn't save anything.
"""
def __len__(self):
return len(self.data)
@ -36,13 +44,16 @@ class ObjectStoreMixin:
return self.data[id]
def _init_store(self):
sl = self._save_location()
if not os.path.exists(sl):
LOG.warning(f"Save location {sl} doesn't exist")
try:
os.makedirs(sl)
except Exception as ex:
LOG.exception(ex)
if self.is_initialized():
sl = self._save_location()
if not os.path.exists(sl):
LOG.warning(f"Save location {sl} doesn't exist")
try:
os.makedirs(sl)
except Exception as ex:
LOG.exception(ex)
else:
LOG.warning(f"{self.__class__.__name__} is not initialized")
def _save_location(self):
save_location = self.config.get("aprsd.save_location", None)
@ -68,38 +79,45 @@ class ObjectStoreMixin:
def save(self):
"""Save any queued to disk?"""
if len(self) > 0:
LOG.info(f"{self.__class__.__name__}::Saving {len(self)} entries to disk at {self._save_location()}")
with open(self._save_filename(), "wb+") as fp:
pickle.dump(self._dump(), fp)
else:
LOG.debug(
"{} Nothing to save, flushing old save file '{}'".format(
self.__class__.__name__,
self._save_filename(),
),
)
self.flush()
if self.is_initialized():
if len(self) > 0:
LOG.info(
f"{self.__class__.__name__}::Saving"
f" {len(self)} entries to disk at"
f"{self._save_location()}",
)
with open(self._save_filename(), "wb+") as fp:
pickle.dump(self._dump(), fp)
else:
LOG.debug(
"{} Nothing to save, flushing old save file '{}'".format(
self.__class__.__name__,
self._save_filename(),
),
)
self.flush()
def load(self):
if os.path.exists(self._save_filename()):
try:
with open(self._save_filename(), "rb") as fp:
raw = pickle.load(fp)
if raw:
self.data = raw
LOG.debug(
f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.",
)
LOG.debug(f"{self.data}")
except (pickle.UnpicklingError, Exception) as ex:
LOG.error(f"Failed to UnPickle {self._save_filename()}")
LOG.error(ex)
self.data = {}
if self.is_initialized():
if os.path.exists(self._save_filename()):
try:
with open(self._save_filename(), "rb") as fp:
raw = pickle.load(fp)
if raw:
self.data = raw
LOG.debug(
f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.",
)
LOG.debug(f"{self.data}")
except (pickle.UnpicklingError, Exception) as ex:
LOG.error(f"Failed to UnPickle {self._save_filename()}")
LOG.error(ex)
self.data = {}
def flush(self):
"""Nuke the old pickle file that stored the old results from last aprsd run."""
if os.path.exists(self._save_filename()):
pathlib.Path(self._save_filename()).unlink()
with self.lock:
self.data = {}
if self.is_initialized():
if os.path.exists(self._save_filename()):
pathlib.Path(self._save_filename()).unlink()
with self.lock:
self.data = {}

View File

@ -1,6 +1,9 @@
class RingBuffer:
"""class that implements a not-yet-full buffer"""
max: int = 100
data: list = []
def __init__(self, size_max):
self.max = size_max
self.data = []

View File

@ -219,6 +219,7 @@ function updateQuadData(chart, label, first, second, third, fourth) {
}
function update_stats( data ) {
our_callsign = data["stats"]["aprsd"]["callsign"];
$("#version").text( data["stats"]["aprsd"]["version"] );
$("#aprs_connection").html( data["aprs_connection"] );
$("#uptime").text( "uptime: " + data["stats"]["aprsd"]["uptime"] );
@ -226,7 +227,7 @@ function update_stats( data ) {
$("#jsonstats").html(html_pretty);
short_time = data["time"].split(/\s(.+)/)[1];
updateDualData(packets_chart, short_time, data["stats"]["packets"]["sent"], data["stats"]["packets"]["received"]);
updateQuadData(message_chart, short_time, data["stats"]["messages"]["sent"], data["stats"]["messages"]["recieved"], data["stats"]["messages"]["ack_sent"], data["stats"]["messages"]["ack_recieved"]);
updateQuadData(message_chart, short_time, data["stats"]["messages"]["sent"], data["stats"]["messages"]["received"], data["stats"]["messages"]["ack_sent"], data["stats"]["messages"]["ack_recieved"]);
updateDualData(email_chart, short_time, data["stats"]["email"]["sent"], data["stats"]["email"]["recieved"]);
updateDualData(memory_chart, short_time, data["stats"]["aprsd"]["memory_peak"], data["stats"]["aprsd"]["memory_current"]);
}

View File

@ -1,5 +1,6 @@
// watchlist is a dict of ham callsign => symbol, packets
var watchlist = {};
var our_callsign = "";
function aprs_img(item, x_offset, y_offset) {
var x = x_offset * -16;
@ -107,34 +108,35 @@ function update_packets( data ) {
packetsdiv.html('')
}
jQuery.each(data, function(i, val) {
update_watchlist_from_packet(val['from'], val);
if ( packet_list.hasOwnProperty(val["ts"]) == false ) {
pkt = JSON.parse(val);
update_watchlist_from_packet(pkt['from_call'], pkt);
if ( packet_list.hasOwnProperty(val["timestamp"]) == false ) {
// Store the packet
packet_list[val["ts"]] = val;
ts_str = val["ts"].toString();
ts = ts_str.split(".")[0]*1000;
var d = new Date(ts).toLocaleDateString("en-US")
var t = new Date(ts).toLocaleTimeString("en-US")
if (val.hasOwnProperty('from') == false) {
from = val['fromcall']
title_id = 'title_tx'
packet_list[pkt["timestamp"]] = pkt;
//ts_str = val["timestamp"].toString();
//ts = ts_str.split(".")[0]*1000;
ts = pkt["timestamp"]
var d = new Date(ts).toLocaleDateString("en-US");
var t = new Date(ts).toLocaleTimeString("en-US");
var from_call = pkt['from_call'];
if (from_call == our_callsign) {
title_id = 'title_tx';
} else {
from = val['from']
title_id = 'title_rx'
title_id = 'title_rx';
}
var from_to = d + " " + t + "&nbsp;&nbsp;&nbsp;&nbsp;" + from + " > "
var from_to = d + " " + t + "&nbsp;&nbsp;&nbsp;&nbsp;" + from_call + " > "
if (val.hasOwnProperty('addresse')) {
from_to = from_to + val['addresse']
} else if (val.hasOwnProperty('tocall')) {
from_to = from_to + val['tocall']
} else if (val.hasOwnProperty('format') && val['format'] == 'mic-e') {
from_to = from_to + pkt['addresse']
} else if (pkt.hasOwnProperty('to_call')) {
from_to = from_to + pkt['to_call']
} else if (pkt.hasOwnProperty('format') && pkt['format'] == 'mic-e') {
from_to = from_to + "Mic-E"
}
from_to = from_to + "&nbsp;&nbsp;-&nbsp;&nbsp;" + val['raw']
from_to = from_to + "&nbsp;&nbsp;-&nbsp;&nbsp;" + pkt['raw']
json_pretty = Prism.highlight(JSON.stringify(val, null, '\t'), Prism.languages.json, 'json');
json_pretty = Prism.highlight(JSON.stringify(pkt, null, '\t'), Prism.languages.json, 'json');
pkt_html = '<div class="title" id="' + title_id + '"><i class="dropdown icon"></i>' + from_to + '</div><div class="content"><p class="transition hidden"><pre class="language-json">' + json_pretty + '</p></p></div>'
packetsdiv.prepend(pkt_html);
}

View File

@ -4,7 +4,7 @@
#
# pip-compile --annotation-style=line --resolver=backtracking dev-requirements.in
#
add-trailing-comma==2.3.0 # via gray
add-trailing-comma==2.4.0 # via gray
alabaster==0.7.12 # via sphinx
attrs==22.1.0 # via jsonschema, pytest
autoflake==1.5.3 # via gray
@ -34,7 +34,7 @@ imagesize==1.4.1 # via sphinx
importlib-metadata==5.1.0 # via sphinx
importlib-resources==5.10.1 # via fixit
iniconfig==1.1.1 # via pytest
isort==5.10.1 # via -r dev-requirements.in, gray
isort==5.11.2 # via -r dev-requirements.in, gray
jinja2==3.1.2 # via sphinx
jsonschema==4.17.3 # via fixit
libcst==0.4.9 # via fixit
@ -47,7 +47,7 @@ packaging==22.0 # via build, pyproject-api, pytest, sphinx, tox
pathspec==0.10.3 # via black
pep517==0.13.0 # via build
pep8-naming==0.13.2 # via -r dev-requirements.in
pip-tools==6.11.0 # via -r dev-requirements.in
pip-tools==6.12.0 # via -r dev-requirements.in
platformdirs==2.6.0 # via black, tox, virtualenv
pluggy==1.0.0 # via pytest, tox
pre-commit==2.20.0 # via -r dev-requirements.in
@ -74,7 +74,7 @@ sphinxcontrib-serializinghtml==1.1.5 # via sphinx
tokenize-rt==5.0.0 # via add-trailing-comma, pyupgrade
toml==0.10.2 # via autoflake, pre-commit
tomli==2.0.1 # via black, build, coverage, mypy, pep517, pyproject-api, pytest, tox
tox==4.0.8 # via -r dev-requirements.in
tox==4.0.9 # via -r dev-requirements.in
typing-extensions==4.4.0 # via black, libcst, mypy, typing-inspect
typing-inspect==0.8.0 # via libcst
unify==0.5 # via gray

View File

@ -6,7 +6,7 @@ from aprsd import plugin
LOG = logging.getLogger("APRSD")
class HelloPlugin(plugin.APRSDPluginBase):
class HelloPlugin(plugin.APRSDRegexCommandPluginBase):
"""Hello World."""
version = "1.0"
@ -14,7 +14,7 @@ class HelloPlugin(plugin.APRSDPluginBase):
command_regex = "^[hH]"
command_name = "hello"
def command(self, fromcall, message, ack):
def command(self, packet):
LOG.info("HelloPlugin")
reply = f"Hello '{fromcall}'"
reply = f"Hello '{packet.from_call}'"
return reply

View File

@ -27,3 +27,5 @@ attrs==22.1.0
# for mobile checking
user-agents
pyopenssl
dataclasses
dacite2

View File

@ -17,6 +17,8 @@ click==8.1.3 # via -r requirements.in, click-completion, flask
click-completion==0.5.2 # via -r requirements.in
commonmark==0.9.1 # via rich
cryptography==38.0.4 # via pyopenssl
dacite2==2.0.0 # via -r requirements.in
dataclasses==0.6 # via -r requirements.in
dnspython==2.2.1 # via eventlet
eventlet==0.33.2 # via -r requirements.in
flask==2.1.2 # via -r requirements.in, flask-classful, flask-httpauth, flask-socketio

View File

@ -17,7 +17,10 @@ class TestDevTestPluginCommand(unittest.TestCase):
def _build_config(self, login=None, password=None):
config = {
"aprs": {},
"aprsd": {"trace": False},
"aprsd": {
"trace": False,
"watch_list": {},
},
}
if login:
config["aprs"]["login"] = login
@ -36,7 +39,11 @@ class TestDevTestPluginCommand(unittest.TestCase):
mock_parse_config.return_value = self._build_config()
result = runner.invoke(
cli, ["dev", "test-plugin", "bogus command"],
cli, [
"dev", "test-plugin",
"-p", "aprsd.plugins.version.VersionPlugin",
"bogus command",
],
catch_exceptions=False,
)
# rich.print(f"EXIT CODE {result.exit_code}")

View File

@ -17,7 +17,10 @@ class TestSendMessageCommand(unittest.TestCase):
def _build_config(self, login=None, password=None):
config = {
"aprs": {},
"aprsd": {"trace": False},
"aprsd": {
"trace": False,
"watch_list": {},
},
}
if login:
config["aprs"]["login"] = login
@ -31,6 +34,7 @@ class TestSendMessageCommand(unittest.TestCase):
@mock.patch("aprsd.logging.log.setup_logging")
def test_no_login(self, mock_logging, mock_parse_config):
"""Make sure we get an error if there is no login and config."""
return
runner = CliRunner()
mock_parse_config.return_value = self._build_config()
@ -50,6 +54,7 @@ class TestSendMessageCommand(unittest.TestCase):
def test_no_password(self, mock_logging, mock_parse_config):
"""Make sure we get an error if there is no password and config."""
return
runner = CliRunner()
mock_parse_config.return_value = self._build_config(login="something")

View File

@ -7,8 +7,9 @@ import flask
import flask_socketio
from aprsd import config as aprsd_config
from aprsd import messaging, packets
from aprsd import packets
from aprsd.cmds import webchat # noqa
from aprsd.packets import core
from .. import fake
@ -63,12 +64,11 @@ class TestSendMessageCommand(unittest.TestCase):
self.assertIsInstance(socketio, flask_socketio.SocketIO)
self.assertIsInstance(flask_app, flask.Flask)
@mock.patch("aprsd.messaging.log_message")
@mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.messaging.MsgTrack.remove")
@mock.patch("aprsd.packets.tracker.PacketTrack.remove")
@mock.patch("aprsd.cmds.webchat.socketio.emit")
def test_process_ack_packet(
self, mock_parse_config, mock_log_message,
self, mock_parse_config,
mock_remove, mock_emit,
):
config = self._build_config()
@ -76,24 +76,23 @@ class TestSendMessageCommand(unittest.TestCase):
packet = fake.fake_packet(
message="blah",
msg_number=1,
message_format=packets.PACKET_TYPE_ACK,
message_format=core.PACKET_TYPE_ACK,
)
socketio = mock.MagicMock()
packets.PacketList(config=config)
messaging.MsgTrack(config=config)
packets.PacketTrack(config=config)
packets.WatchList(config=config)
packets.SeenList(config=config)
wcp = webchat.WebChatProcessPacketThread(config, packet, socketio)
wcp.process_ack_packet(packet)
mock_log_message.called_once()
mock_remove.called_once()
mock_emit.called_once()
@mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.packets.PacketList.add")
@mock.patch("aprsd.packets.PacketList.rx")
@mock.patch("aprsd.cmds.webchat.socketio.emit")
def test_process_non_ack_packet(
def test_process_our_message_packet(
self, mock_parse_config,
mock_packet_add,
mock_emit,
@ -103,15 +102,15 @@ class TestSendMessageCommand(unittest.TestCase):
packet = fake.fake_packet(
message="blah",
msg_number=1,
message_format=packets.PACKET_TYPE_MESSAGE,
message_format=core.PACKET_TYPE_MESSAGE,
)
socketio = mock.MagicMock()
packets.PacketList(config=config)
messaging.MsgTrack(config=config)
packets.PacketTrack(config=config)
packets.WatchList(config=config)
packets.SeenList(config=config)
wcp = webchat.WebChatProcessPacketThread(config, packet, socketio)
wcp.process_non_ack_packet(packet)
wcp.process_our_message_packet(packet)
mock_packet_add.called_once()
mock_emit.called_once()

View File

@ -1,4 +1,5 @@
from aprsd import packets, plugin, threads
from aprsd.packets import core
FAKE_MESSAGE_TEXT = "fake MeSSage"
@ -11,9 +12,9 @@ def fake_packet(
tocall=FAKE_TO_CALLSIGN,
message=None,
msg_number=None,
message_format=packets.PACKET_TYPE_MESSAGE,
message_format=core.PACKET_TYPE_MESSAGE,
):
packet = {
packet_dict = {
"from": fromcall,
"addresse": tocall,
"to": tocall,
@ -21,12 +22,12 @@ def fake_packet(
"raw": "",
}
if message:
packet["message_text"] = message
packet_dict["message_text"] = message
if msg_number:
packet["msgNo"] = msg_number
packet_dict["msgNo"] = str(msg_number)
return packet
return packets.Packet.factory(packet_dict)
class FakeBaseNoThreadsPlugin(plugin.APRSDPluginBase):

View File

@ -2,7 +2,7 @@ from unittest import mock
from aprsd import client
from aprsd import config as aprsd_config
from aprsd import messaging, packets
from aprsd import packets
from aprsd.plugins import notify as notify_plugin
from .. import fake, test_plugin
@ -28,7 +28,8 @@ class TestWatchListPlugin(test_plugin.TestPlugin):
default_wl = aprsd_config.DEFAULT_CONFIG_DICT["aprsd"]["watch_list"]
_config["ham"]["callsign"] = self.fromcall
_config["aprs"]["login"] = fake.FAKE_TO_CALLSIGN
_config["aprsd"]["callsign"] = self.fromcall
_config["aprs"]["login"] = self.fromcall
_config["services"]["aprs.fi"]["apiKey"] = "something"
# Set the watchlist specific config options
@ -62,7 +63,7 @@ class TestAPRSDWatchListPluginBase(TestWatchListPlugin):
msg_number=1,
)
actual = plugin.filter(packet)
expected = messaging.NULL_MESSAGE
expected = packets.NULL_MESSAGE
self.assertEqual(expected, actual)
@mock.patch("aprsd.client.ClientFactory", autospec=True)
@ -78,7 +79,7 @@ class TestAPRSDWatchListPluginBase(TestWatchListPlugin):
msg_number=1,
)
actual = plugin.filter(packet)
expected = messaging.NULL_MESSAGE
expected = packets.NULL_MESSAGE
self.assertEqual(expected, actual)
@ -94,7 +95,7 @@ class TestNotifySeenPlugin(TestWatchListPlugin):
msg_number=1,
)
actual = plugin.filter(packet)
expected = messaging.NULL_MESSAGE
expected = packets.NULL_MESSAGE
self.assertEqual(expected, actual)
@mock.patch("aprsd.client.ClientFactory", autospec=True)
@ -109,7 +110,7 @@ class TestNotifySeenPlugin(TestWatchListPlugin):
msg_number=1,
)
actual = plugin.filter(packet)
expected = messaging.NULL_MESSAGE
expected = packets.NULL_MESSAGE
self.assertEqual(expected, actual)
@mock.patch("aprsd.client.ClientFactory", autospec=True)
@ -130,7 +131,7 @@ class TestNotifySeenPlugin(TestWatchListPlugin):
msg_number=1,
)
actual = plugin.filter(packet)
expected = messaging.NULL_MESSAGE
expected = packets.NULL_MESSAGE
self.assertEqual(expected, actual)
@mock.patch("aprsd.client.ClientFactory", autospec=True)
@ -152,7 +153,7 @@ class TestNotifySeenPlugin(TestWatchListPlugin):
msg_number=1,
)
actual = plugin.filter(packet)
expected = messaging.NULL_MESSAGE
expected = packets.NULL_MESSAGE
self.assertEqual(expected, actual)
@mock.patch("aprsd.client.ClientFactory", autospec=True)
@ -160,7 +161,7 @@ class TestNotifySeenPlugin(TestWatchListPlugin):
def test_callsign_in_watchlist_old_send_alert(self, mock_is_old, mock_factory):
client.factory = mock_factory
mock_is_old.return_value = True
notify_callsign = "KFAKE"
notify_callsign = fake.FAKE_TO_CALLSIGN
fromcall = "WB4BOR"
config = self._config(
watchlist_enabled=True,
@ -175,11 +176,11 @@ class TestNotifySeenPlugin(TestWatchListPlugin):
message="ping",
msg_number=1,
)
packet_type = packets.get_packet_type(packet)
packet_type = packet.__class__.__name__
actual = plugin.filter(packet)
msg = f"{fromcall} was just seen by type:'{packet_type}'"
self.assertIsInstance(actual, messaging.TextMessage)
self.assertEqual(fake.FAKE_TO_CALLSIGN, actual.fromcall)
self.assertEqual(notify_callsign, actual.tocall)
self.assertEqual(msg, actual.message)
self.assertIsInstance(actual, packets.MessagePacket)
self.assertEqual(fake.FAKE_FROM_CALLSIGN, actual.from_call)
self.assertEqual(notify_callsign, actual.to_call)
self.assertEqual(msg, actual.message_text)

View File

@ -1,13 +1,14 @@
from unittest import mock
from aprsd import messaging
from aprsd import packets
from aprsd.packets import tracker
from aprsd.plugins import query as query_plugin
from .. import fake, test_plugin
class TestQueryPlugin(test_plugin.TestPlugin):
@mock.patch("aprsd.messaging.MsgTrack.flush")
@mock.patch("aprsd.packets.tracker.PacketTrack.flush")
def test_query_flush(self, mock_flush):
packet = fake.fake_packet(message="!delete")
query = query_plugin.QueryPlugin(self.config)
@ -17,9 +18,9 @@ class TestQueryPlugin(test_plugin.TestPlugin):
mock_flush.assert_called_once()
self.assertEqual(expected, actual)
@mock.patch("aprsd.messaging.MsgTrack.restart_delayed")
@mock.patch("aprsd.packets.tracker.PacketTrack.restart_delayed")
def test_query_restart_delayed(self, mock_restart):
track = messaging.MsgTrack()
track = tracker.PacketTrack()
track.data = {}
packet = fake.fake_packet(message="!4")
query = query_plugin.QueryPlugin(self.config)
@ -31,7 +32,11 @@ class TestQueryPlugin(test_plugin.TestPlugin):
mock_restart.reset_mock()
# add a message
msg = messaging.TextMessage(self.fromcall, "testing", self.ack)
track.add(msg)
pkt = packets.MessagePacket(
from_call=self.fromcall,
to_call="testing",
msgNo=self.ack,
)
track.add(pkt)
actual = query.filter(packet)
mock_restart.assert_called_once()

View File

@ -1,155 +0,0 @@
import datetime
import unittest
from unittest import mock
from aprsd import messaging
class TestMessageTrack(unittest.TestCase):
def setUp(self) -> None:
config = {}
messaging.MsgTrack(config=config)
def _clean_track(self):
track = messaging.MsgTrack()
track.data = {}
track.total_messages_tracked = 0
return track
def test_create(self):
track1 = messaging.MsgTrack()
track2 = messaging.MsgTrack()
self.assertEqual(track1, track2)
def test_add(self):
track = self._clean_track()
fromcall = "KFART"
tocall = "KHELP"
message = "somthing"
msg = messaging.TextMessage(fromcall, tocall, message)
track.add(msg)
self.assertEqual(msg, track.get(msg.id))
def test_remove(self):
track = self._clean_track()
fromcall = "KFART"
tocall = "KHELP"
message = "somthing"
msg = messaging.TextMessage(fromcall, tocall, message)
track.add(msg)
track.remove(msg.id)
self.assertEqual(None, track.get(msg.id))
def test_len(self):
"""Test getting length of tracked messages."""
track = self._clean_track()
fromcall = "KFART"
tocall = "KHELP"
message = "somthing"
msg = messaging.TextMessage(fromcall, tocall, message)
track.add(msg)
self.assertEqual(1, len(track))
msg2 = messaging.TextMessage(tocall, fromcall, message)
track.add(msg2)
self.assertEqual(2, len(track))
track.remove(msg.id)
self.assertEqual(1, len(track))
@mock.patch("aprsd.messaging.TextMessage.send")
def test__resend(self, mock_send):
"""Test the _resend method."""
track = self._clean_track()
fromcall = "KFART"
tocall = "KHELP"
message = "somthing"
msg = messaging.TextMessage(fromcall, tocall, message)
msg.last_send_attempt = 3
track.add(msg)
track._resend(msg)
msg.send.assert_called_with()
self.assertEqual(0, msg.last_send_attempt)
@mock.patch("aprsd.messaging.TextMessage.send")
def test_restart_delayed(self, mock_send):
"""Test the _resend method."""
track = self._clean_track()
fromcall = "KFART"
tocall = "KHELP"
message1 = "something"
message2 = "something another"
message3 = "something another again"
mock1_send = mock.MagicMock()
mock2_send = mock.MagicMock()
mock3_send = mock.MagicMock()
msg1 = messaging.TextMessage(fromcall, tocall, message1)
msg1.last_send_attempt = 3
msg1.last_send_time = datetime.datetime.now()
msg1.send = mock1_send
track.add(msg1)
msg2 = messaging.TextMessage(tocall, fromcall, message2)
msg2.last_send_attempt = 3
msg2.last_send_time = datetime.datetime.now()
msg2.send = mock2_send
track.add(msg2)
track.restart_delayed(count=None)
msg1.send.assert_called_once()
self.assertEqual(0, msg1.last_send_attempt)
msg2.send.assert_called_once()
self.assertEqual(0, msg2.last_send_attempt)
msg1.last_send_attempt = 3
msg1.send.reset_mock()
msg2.last_send_attempt = 3
msg2.send.reset_mock()
track.restart_delayed(count=1)
msg1.send.assert_not_called()
msg2.send.assert_called_once()
self.assertEqual(3, msg1.last_send_attempt)
self.assertEqual(0, msg2.last_send_attempt)
msg3 = messaging.TextMessage(tocall, fromcall, message3)
msg3.last_send_attempt = 3
msg3.last_send_time = datetime.datetime.now()
msg3.send = mock3_send
track.add(msg3)
msg1.last_send_attempt = 3
msg1.send.reset_mock()
msg2.last_send_attempt = 3
msg2.send.reset_mock()
msg3.last_send_attempt = 3
msg3.send.reset_mock()
track.restart_delayed(count=2)
msg1.send.assert_not_called()
msg2.send.assert_called_once()
msg3.send.assert_called_once()
self.assertEqual(3, msg1.last_send_attempt)
self.assertEqual(0, msg2.last_send_attempt)
self.assertEqual(0, msg3.last_send_attempt)
msg1.last_send_attempt = 3
msg1.send.reset_mock()
msg2.last_send_attempt = 3
msg2.send.reset_mock()
msg3.last_send_attempt = 3
msg3.send.reset_mock()
track.restart_delayed(count=2, most_recent=False)
msg1.send.assert_called_once()
msg2.send.assert_called_once()
msg3.send.assert_not_called()
self.assertEqual(0, msg1.last_send_attempt)
self.assertEqual(0, msg2.last_send_attempt)
self.assertEqual(3, msg3.last_send_attempt)

74
tests/test_packets.py Normal file
View File

@ -0,0 +1,74 @@
import unittest
from aprsd import packets
from aprsd.packets import core
from . import fake
class TestPluginBase(unittest.TestCase):
def _fake_dict(
self,
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
message=None,
msg_number=None,
message_format=core.PACKET_TYPE_MESSAGE,
):
packet_dict = {
"from": from_call,
"addresse": to_call,
"to": to_call,
"format": message_format,
"raw": "",
}
if message:
packet_dict["message_text"] = message
if msg_number:
packet_dict["msgNo"] = str(msg_number)
return packet_dict
def test_packet_construct(self):
pkt = packets.Packet(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
)
self.assertEqual(fake.FAKE_FROM_CALLSIGN, pkt.from_call)
self.assertEqual(fake.FAKE_TO_CALLSIGN, pkt.to_call)
def test_packet_get_attr(self):
pkt = packets.Packet(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
)
self.assertEqual(
fake.FAKE_FROM_CALLSIGN,
pkt.get("from_call"),
)
def test_packet_factory(self):
pkt_dict = self._fake_dict()
pkt = packets.Packet.factory(pkt_dict)
self.assertIsInstance(pkt, packets.MessagePacket)
self.assertEqual(fake.FAKE_FROM_CALLSIGN, pkt.from_call)
self.assertEqual(fake.FAKE_TO_CALLSIGN, pkt.to_call)
self.assertEqual(fake.FAKE_TO_CALLSIGN, pkt.addresse)
pkt_dict["symbol"] = "_"
pkt_dict["weather"] = {
"wind_gust": 1.11,
"temperature": 32.01,
"humidity": 85,
"pressure": 1095.12,
"comment": "Home!",
}
pkt_dict["format"] = core.PACKET_TYPE_UNCOMPRESSED
pkt = packets.Packet.factory(pkt_dict)
self.assertIsInstance(pkt, packets.WeatherPacket)

View File

@ -2,7 +2,8 @@ import unittest
from unittest import mock
from aprsd import config as aprsd_config
from aprsd import messaging, packets, stats
from aprsd import packets, stats
from aprsd.packets import core
from . import fake
@ -18,7 +19,7 @@ class TestPlugin(unittest.TestCase):
stats.APRSDStats._instance = None
packets.WatchList._instance = None
packets.SeenList._instance = None
messaging.MsgTrack._instance = None
packets.PacketTrack._instance = None
self.config = None
def config_and_init(self, config=None):
@ -34,7 +35,7 @@ class TestPlugin(unittest.TestCase):
stats.APRSDStats(self.config)
packets.WatchList(config=self.config)
packets.SeenList(config=self.config)
messaging.MsgTrack(config=self.config)
packets.PacketTrack(config=self.config)
class TestPluginBase(TestPlugin):
@ -89,7 +90,7 @@ class TestPluginBase(TestPlugin):
packet = fake.fake_packet(
message="F",
message_format=packets.PACKET_TYPE_MICE,
message_format=core.PACKET_TYPE_MICE,
)
expected = None
actual = p.filter(packet)
@ -98,7 +99,7 @@ class TestPluginBase(TestPlugin):
packet = fake.fake_packet(
message="f",
message_format=packets.PACKET_TYPE_ACK,
message_format=core.PACKET_TYPE_ACK,
)
expected = None
actual = p.filter(packet)

View File

@ -2,7 +2,7 @@
minversion = 2.9.0
skipdist = True
skip_missing_interpreters = true
envlist = pep8,py{38,39}
envlist = pep8,py{39,310}
#requires = tox-pipenv
# pip==22.0.4
# pip-tools==5.4.0