From 94fb481014dea7960c0a029b3deb8828fea22ad8 Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 15 Dec 2022 17:23:54 -0500 Subject: [PATCH] Reworked all packet processing This patch reworks all the packet processing to use the new Packets objects. Nuked all of the messaging classes. backwards incompatible changes all messaging.py classes are now gone and replaced by packets.py classes --- aprsd/aprsd.py | 4 +- aprsd/client.py | 7 +- aprsd/clients/aprsis.py | 18 -- aprsd/cmds/listen.py | 8 +- aprsd/cmds/send_message.py | 46 ++- aprsd/cmds/server.py | 12 +- aprsd/cmds/webchat.py | 49 +-- aprsd/messaging.py | 585 ----------------------------------- aprsd/packets.py | 385 ----------------------- aprsd/packets/__init__.py | 8 + aprsd/packets/core.py | 320 +++++++++++++++++++ aprsd/packets/packet_list.py | 60 ++++ aprsd/packets/seen_list.py | 44 +++ aprsd/packets/tracker.py | 116 +++++++ aprsd/packets/watch_list.py | 103 ++++++ aprsd/plugin.py | 17 +- aprsd/plugins/notify.py | 2 - aprsd/threads/aprsd.py | 2 + aprsd/threads/keep_alive.py | 6 +- aprsd/threads/rx.py | 120 ++++--- aprsd/threads/tx.py | 120 +++++++ aprsd/utils/counter.py | 48 +++ tox.ini | 2 +- 23 files changed, 976 insertions(+), 1106 deletions(-) delete mode 100644 aprsd/packets.py create mode 100644 aprsd/packets/__init__.py create mode 100644 aprsd/packets/core.py create mode 100644 aprsd/packets/packet_list.py create mode 100644 aprsd/packets/seen_list.py create mode 100644 aprsd/packets/tracker.py create mode 100644 aprsd/packets/watch_list.py create mode 100644 aprsd/threads/tx.py create mode 100644 aprsd/utils/counter.py diff --git a/aprsd/aprsd.py b/aprsd/aprsd.py index 7a0afb5..7c91d2e 100644 --- a/aprsd/aprsd.py +++ b/aprsd/aprsd.py @@ -34,7 +34,7 @@ import click_completion import aprsd from aprsd import cli_helper from aprsd import config as aprsd_config -from aprsd import messaging, packets, stats, threads, utils +from aprsd import packets, stats, threads, utils # setup the global logger @@ -85,7 +85,7 @@ def signal_handler(sig, frame): ), ) time.sleep(1.5) - messaging.MsgTrack().save() + packets.PacketTrack().save() packets.WatchList().save() packets.SeenList().save() LOG.info(stats.APRSDStats()) diff --git a/aprsd/client.py b/aprsd/client.py index 7664d62..4285990 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -8,6 +8,7 @@ from aprslib.exceptions import LoginError from aprsd import config as aprsd_config from aprsd import exception from aprsd.clients import aprsis, kiss +from aprsd.packets import core from aprsd.utils import trace @@ -109,7 +110,7 @@ class APRSISClient(Client): def decode_packet(self, *args, **kwargs): """APRS lib already decodes this.""" - return args[0] + return core.Packet.factory(args[0]) @trace.trace def setup_connection(self): @@ -198,8 +199,8 @@ class KISSClient(Client): # msg = frame.tnc2 LOG.debug(f"Decoding {msg}") - packet = aprslib.parse(msg) - return packet + raw = aprslib.parse(msg) + return core.Packet.factory(raw) @trace.trace def setup_connection(self): diff --git a/aprsd/clients/aprsis.py b/aprsd/clients/aprsis.py index 635d27b..fc22008 100644 --- a/aprsd/clients/aprsis.py +++ b/aprsd/clients/aprsis.py @@ -1,6 +1,5 @@ import logging import select -import socket import threading import aprslib @@ -32,23 +31,6 @@ class Aprsdis(aprslib.IS): self.thread_stop = True LOG.info("Shutdown Aprsdis client.") - def is_socket_closed(self, sock: socket.socket) -> bool: - try: - # this will try to read bytes without blocking and also without removing them from buffer (peek only) - data = sock.recv(16, socket.MSG_DONTWAIT | socket.MSG_PEEK) - if len(data) == 0: - return True - except BlockingIOError: - return False # socket is open and reading from it would block - except ConnectionResetError: - return True # socket was closed for some other reason - except Exception: - self.logger.exception( - "unexpected exception when checking if a socket is closed", - ) - return False - return False - @wrapt.synchronized(lock) def send(self, msg): """Send an APRS Message object.""" diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index 2945548..e889445 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -14,7 +14,7 @@ from rich.console import Console # local imports here import aprsd -from aprsd import cli_helper, client, messaging, packets, stats, threads, utils +from aprsd import cli_helper, client, packets, stats, threads, utils from aprsd.aprsd import cli from aprsd.threads import rx @@ -39,9 +39,7 @@ def signal_handler(sig, frame): class APRSDListenThread(rx.APRSDRXThread): def process_packet(self, *args, **kwargs): - raw = self._client.decode_packet(*args, **kwargs) - packet = packets.Packet.factory(raw) - LOG.debug(f"Got packet {packet}") + packet = self._client.decode_packet(*args, **kwargs) packet.log(header="RX Packet") @@ -115,7 +113,7 @@ def listen( # Try and load saved MsgTrack list LOG.debug("Loading saved MsgTrack object.") - messaging.MsgTrack(config=config).load() + packets.PacketTrack(config=config).load() packets.WatchList(config=config).load() packets.SeenList(config=config).load() diff --git a/aprsd/cmds/send_message.py b/aprsd/cmds/send_message.py index ea2b234..08a236f 100644 --- a/aprsd/cmds/send_message.py +++ b/aprsd/cmds/send_message.py @@ -7,7 +7,7 @@ from aprslib.exceptions import LoginError import click import aprsd -from aprsd import cli_helper, client, messaging, packets +from aprsd import cli_helper, client, packets from aprsd.aprsd import cli @@ -98,32 +98,22 @@ def send_message( def rx_packet(packet): global got_ack, got_response + cl = client.factory.create() + packet = cl.decode_packet(packet) + packet.log("RX_PKT") # LOG.debug("Got packet back {}".format(packet)) - resp = packet.get("response", None) - if resp == "ack": - ack_num = packet.get("msgNo") - LOG.info(f"We got ack for our sent message {ack_num}") - messaging.log_packet(packet) + if isinstance(packet, packets.AckPacket): got_ack = True else: - message = packet.get("message_text", None) - fromcall = packet["from"] - msg_number = packet.get("msgNo", "0") - messaging.log_message( - "Received Message", - packet["raw"], - message, - fromcall=fromcall, - ack=msg_number, - ) got_response = True - # Send the ack back? - ack = messaging.AckMessage( - config["aprs"]["login"], - fromcall, - msg_id=msg_number, + from_call = packet.from_call + our_call = config["aprsd"]["callsign"].lower() + ack_pkt = packets.AckPacket( + from_call=our_call, + to_call=from_call, + msgNo=packet.msgNo, ) - ack.send_direct() + ack_pkt.send_direct() if got_ack: if wait_response: @@ -144,12 +134,16 @@ def send_message( # we should bail after we get the ack and send an ack back for the # message if raw: - msg = messaging.RawMessage(raw) - msg.send_direct() + pkt = packets.Packet(from_call="", to_call="", raw=raw) + pkt.send_direct() sys.exit(0) else: - msg = messaging.TextMessage(aprs_login, tocallsign, command) - msg.send_direct() + pkt = packets.MessagePacket( + from_call=aprs_login, + to_call=tocallsign, + message_text=command, + ) + pkt.send_direct() if no_ack: sys.exit(0) diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index 70c47fd..4bb50cc 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -6,8 +6,7 @@ import click import aprsd from aprsd import ( - cli_helper, client, flask, messaging, packets, plugin, stats, threads, - utils, + cli_helper, client, flask, packets, plugin, stats, threads, utils, ) from aprsd import aprsd as aprsd_main from aprsd.aprsd import cli @@ -81,13 +80,15 @@ def server(ctx, flush): packets.PacketList(config=config) if flush: LOG.debug("Deleting saved MsgTrack.") - messaging.MsgTrack(config=config).flush() + #messaging.MsgTrack(config=config).flush() + packets.PacketTrack(config=config).flush() packets.WatchList(config=config) packets.SeenList(config=config) else: # Try and load saved MsgTrack list LOG.debug("Loading saved MsgTrack object.") - messaging.MsgTrack(config=config).load() + #messaging.MsgTrack(config=config).load() + packets.PacketTrack(config=config).load() packets.WatchList(config=config).load() packets.SeenList(config=config).load() @@ -102,7 +103,8 @@ def server(ctx, flush): ) rx_thread.start() - messaging.MsgTrack().restart() + #messaging.MsgTrack().restart() + packets.PacketTrack().restart() keepalive = threads.KeepAliveThread(config=config) keepalive.start() diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index 85a9fa7..fcd5fd7 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -22,7 +22,7 @@ import wrapt import aprsd from aprsd import cli_helper, client from aprsd import config as aprsd_config -from aprsd import messaging, packets, stats, threads, utils +from aprsd import packets, stats, threads, utils from aprsd.aprsd import cli from aprsd.logging import rich as aprsd_logging from aprsd.threads import rx @@ -44,13 +44,11 @@ def signal_handler(sig, frame): threads.APRSDThreadList().stop_all() if "subprocess" not in str(frame): time.sleep(1.5) - # messaging.MsgTrack().save() # packets.WatchList().save() # packets.SeenList().save() LOG.info(stats.APRSDStats()) LOG.info("Telling flask to bail.") signal.signal(signal.SIGTERM, sys.exit(0)) - sys.exit(0) class SentMessages(objectstore.ObjectStoreMixin): @@ -67,11 +65,11 @@ class SentMessages(objectstore.ObjectStoreMixin): @wrapt.synchronized(lock) def add(self, msg): - self.data[msg.id] = self.create(msg.id) - self.data[msg.id]["from"] = msg.fromcall - self.data[msg.id]["to"] = msg.tocall - self.data[msg.id]["message"] = msg.message.rstrip("\n") - self.data[msg.id]["raw"] = str(msg).rstrip("\n") + self.data[msg.msgNo] = self.create(msg.msgNo) + self.data[msg.msgNo]["from"] = msg.from_call + self.data[msg.msgNo]["to"] = msg.to_call + self.data[msg.msgNo]["message"] = msg.message_text.rstrip("\n") + self.data[msg.msgNo]["raw"] = msg.message_text.rstrip("\n") def create(self, id): return { @@ -344,21 +342,21 @@ class SendMessageNamespace(Namespace): LOG.debug(f"WS: on_send {data}") self.request = data data["from"] = self._config["aprs"]["login"] - msg = messaging.TextMessage( - data["from"], - data["to"].upper(), - data["message"], + pkt = packets.MessagePacket( + from_call=data["from"], + to_call=data["to"].upper(), + message_text=data["message"], ) - self.msg = msg + self.msg = pkt msgs = SentMessages() - msgs.add(msg) - msgs.set_status(msg.id, "Sending") - obj = msgs.get(self.msg.id) + msgs.add(pkt) + pkt.send() + msgs.set_status(pkt.msgNo, "Sending") + obj = msgs.get(pkt.msgNo) socketio.emit( "sent", obj, namespace="/sendmsg", ) - msg.send() def on_gps(self, data): LOG.debug(f"WS on_GPS: {data}") @@ -378,10 +376,17 @@ class SendMessageNamespace(Namespace): f":@{time_zulu}z{lat}/{long}l APRSD WebChat Beacon" ) - beacon_msg = messaging.RawMessage(txt) - beacon_msg.fromcall = self._config["aprs"]["login"] - beacon_msg.tocall = "APDW16" - beacon_msg.send_direct() + LOG.debug(f"Sending {txt}") + beacon = packets.GPSPacket( + from_call=self._config["aprs"]["login"], + to_call="APDW16", + raw=txt, + ) + beacon.send_direct() + #beacon_msg = messaging.RawMessage(txt) + #beacon_msg.fromcall = self._config["aprs"]["login"] + #beacon_msg.tocall = "APDW16" + #beacon_msg.send_direct() def handle_message(self, data): LOG.debug(f"WS Data {data}") @@ -534,7 +539,7 @@ def webchat(ctx, flush, port): sys.exit(-1) packets.PacketList(config=config) - messaging.MsgTrack(config=config) + packets.PacketTrack(config=config) packets.WatchList(config=config) packets.SeenList(config=config) diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 6f27002..3e25742 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -1,588 +1,3 @@ -import abc -import datetime -import logging -from multiprocessing import RawValue -import re -import threading -import time - -from aprsd import client, packets, stats, threads -from aprsd.utils import objectstore - - -LOG = logging.getLogger("APRSD") - # What to return from a plugin if we have processed the message # and it's ok, but don't send a usage string back NULL_MESSAGE = -1 - - -class MsgTrack(objectstore.ObjectStoreMixin): - """Class to keep track of outstanding text messages. - - This is a thread safe class that keeps track of active - messages. - - When a message is asked to be sent, it is placed into this - class via it's id. The TextMessage class's send() method - automatically adds itself to this class. When the ack is - recieved from the radio, the message object is removed from - this class. - """ - - _instance = None - _start_time = None - lock = None - - data = {} - total_messages_tracked = 0 - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance.track = {} - cls._instance._start_time = datetime.datetime.now() - cls._instance.lock = threading.Lock() - cls._instance.config = kwargs["config"] - cls._instance._init_store() - return cls._instance - - def __getitem__(self, name): - with self.lock: - return self.data[name] - - def __iter__(self): - with self.lock: - return iter(self.data) - - def keys(self): - with self.lock: - return self.data.keys() - - def items(self): - with self.lock: - return self.data.items() - - def values(self): - with self.lock: - return self.data.values() - - def __len__(self): - with self.lock: - return len(self.data) - - def __str__(self): - with self.lock: - result = "{" - for key in self.data.keys(): - result += f"{key}: {str(self.data[key])}, " - result += "}" - return result - - def add(self, msg): - with self.lock: - key = int(msg.id) - self.data[key] = msg - stats.APRSDStats().msgs_tracked_inc() - self.total_messages_tracked += 1 - - def get(self, id): - with self.lock: - if id in self.data: - return self.data[id] - - def remove(self, id): - with self.lock: - key = int(id) - if key in self.data.keys(): - del self.data[key] - - def restart(self): - """Walk the list of messages and restart them if any.""" - - for key in self.data.keys(): - msg = self.data[key] - if msg.last_send_attempt < msg.retry_count: - msg.send() - - def _resend(self, msg): - msg.last_send_attempt = 0 - msg.send() - - def restart_delayed(self, count=None, most_recent=True): - """Walk the list of delayed messages and restart them if any.""" - if not count: - # Send all the delayed messages - for key in self.data.keys(): - msg = self.data[key] - if msg.last_send_attempt == msg.retry_count: - self._resend(msg) - else: - # They want to resend delayed messages - tmp = sorted( - self.data.items(), - reverse=most_recent, - key=lambda x: x[1].last_send_time, - ) - msg_list = tmp[:count] - for (_key, msg) in msg_list: - self._resend(msg) - - -class MessageCounter: - """ - Global message id counter class. - - This is a singleton based class that keeps - an incrementing counter for all messages to - be sent. All new Message objects gets a new - message id, which is the next number available - from the MessageCounter. - - """ - - _instance = None - max_count = 9999 - lock = None - - def __new__(cls, *args, **kwargs): - """Make this a singleton class.""" - if cls._instance is None: - cls._instance = super().__new__(cls, *args, **kwargs) - cls._instance.val = RawValue("i", 1) - cls._instance.lock = threading.Lock() - return cls._instance - - def increment(self): - with self.lock: - if self.val.value == self.max_count: - self.val.value = 1 - else: - self.val.value += 1 - - @property - def value(self): - with self.lock: - return self.val.value - - def __repr__(self): - with self.lock: - return str(self.val.value) - - def __str__(self): - with self.lock: - return str(self.val.value) - - -class Message(metaclass=abc.ABCMeta): - """Base Message Class.""" - - # The message id to send over the air - id = 0 - - retry_count = 3 - last_send_time = 0 - last_send_attempt = 0 - - transport = None - _raw_message = None - - def __init__( - self, - fromcall, - tocall, - msg_id=None, - allow_delay=True, - ): - self.fromcall = fromcall - self.tocall = tocall - if not msg_id: - c = MessageCounter() - c.increment() - msg_id = c.value - self.id = msg_id - - # do we try and save this message for later if we don't get - # an ack? Some messages we don't want to do this ever. - self.allow_delay = allow_delay - - @abc.abstractmethod - def send(self): - """Child class must declare.""" - - def _filter_for_send(self): - """Filter and format message string for FCC.""" - # max? ftm400 displays 64, raw msg shows 74 - # and ftm400-send is max 64. setting this to - # 67 displays 64 on the ftm400. (+3 {01 suffix) - # feature req: break long ones into two msgs - message = self._raw_message[:67] - # We all miss George Carlin - return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) - - @property - def message(self): - return self._filter_for_send().rstrip("\n") - - def __str__(self): - return self.message - - -class RawMessage(Message): - """Send a raw message. - - This class is used for custom messages that contain the entire - contents of an APRS message in the message field. - - """ - - last_send_age = last_send_time = None - - def __init__(self, message, allow_delay=True): - super().__init__( - fromcall=None, tocall=None, msg_id=None, - allow_delay=allow_delay, - ) - self._raw_message = message - - def dict(self): - now = datetime.datetime.now() - last_send_age = None - if self.last_send_time: - last_send_age = str(now - self.last_send_time) - return { - "type": "raw", - "message": self.message, - "raw": str(self), - "retry_count": self.retry_count, - "last_send_attempt": self.last_send_attempt, - "last_send_time": str(self.last_send_time), - "last_send_age": last_send_age, - } - - def send(self): - tracker = MsgTrack() - tracker.add(self) - thread = SendMessageThread(message=self) - thread.start() - - def send_direct(self, aprsis_client=None): - """Send a message without a separate thread.""" - cl = client.factory.create().client - log_message( - "Sending Message Direct", - str(self), - self.message, - tocall=self.tocall, - fromcall=self.fromcall, - ) - cl.send(self) - stats.APRSDStats().msgs_tx_inc() - - -class TextMessage(Message): - """Send regular ARPS text/command messages/replies.""" - - last_send_time = last_send_age = None - - def __init__( - self, fromcall, tocall, message, - msg_id=None, allow_delay=True, - ): - super().__init__( - fromcall=fromcall, tocall=tocall, - msg_id=msg_id, allow_delay=allow_delay, - ) - self._raw_message = message - - def dict(self): - now = datetime.datetime.now() - - last_send_age = None - if self.last_send_time: - last_send_age = str(now - self.last_send_time) - - return { - "id": self.id, - "type": "text-message", - "fromcall": self.fromcall, - "tocall": self.tocall, - "message": self.message, - "raw": str(self), - "retry_count": self.retry_count, - "last_send_attempt": self.last_send_attempt, - "last_send_time": str(self.last_send_time), - "last_send_age": last_send_age, - } - - def __str__(self): - """Build raw string to send over the air.""" - return "{}>APZ100::{}:{}{{{}\n".format( - self.fromcall, - self.tocall.ljust(9), - self.message, - str(self.id), - ) - - def send(self): - tracker = MsgTrack() - tracker.add(self) - LOG.debug(f"Length of MsgTrack is {len(tracker)}") - thread = SendMessageThread(message=self) - thread.start() - - def send_direct(self, aprsis_client=None): - """Send a message without a separate thread.""" - if aprsis_client: - cl = aprsis_client - else: - cl = client.factory.create().client - log_message( - "Sending Message Direct", - str(self), - self.message, - tocall=self.tocall, - fromcall=self.fromcall, - ) - cl.send(self) - stats.APRSDStats().msgs_tx_inc() - pkt_dict = self.dict().copy() - pkt_dict["from"] = pkt_dict["fromcall"] - pkt_dict["to"] = pkt_dict["tocall"] - packet = packets.Packet.factory(pkt_dict) - packets.PacketList().add(packet) - - -class SendMessageThread(threads.APRSDThread): - def __init__(self, message): - self.msg = message - name = self.msg._raw_message[:5] - super().__init__(f"TXPKT-{self.msg.id}-{name}") - - def loop(self): - """Loop until a message is acked or it gets delayed. - - We only sleep for 5 seconds between each loop run, so - that CTRL-C can exit the app in a short period. Each sleep - means the app quitting is blocked until sleep is done. - So we keep track of the last send attempt and only send if the - last send attempt is old enough. - - """ - tracker = MsgTrack() - # lets see if the message is still in the tracking queue - msg = tracker.get(self.msg.id) - if not msg: - # The message has been removed from the tracking queue - # So it got acked and we are done. - LOG.info("Message Send Complete via Ack.") - return False - else: - send_now = False - if msg.last_send_attempt == msg.retry_count: - # we reached the send limit, don't send again - # TODO(hemna) - Need to put this in a delayed queue? - LOG.info("Message Send Complete. Max attempts reached.") - if not msg.allow_delay: - tracker.remove(msg.id) - return False - - # Message is still outstanding and needs to be acked. - if msg.last_send_time: - # Message has a last send time tracking - now = datetime.datetime.now() - sleeptime = (msg.last_send_attempt + 1) * 31 - delta = now - msg.last_send_time - if delta > datetime.timedelta(seconds=sleeptime): - # It's time to try to send it again - send_now = True - else: - send_now = True - - if send_now: - # no attempt time, so lets send it, and start - # tracking the time. - log_message( - "Sending Message", - str(msg), - msg.message, - tocall=self.msg.tocall, - retry_number=msg.last_send_attempt, - msg_num=msg.id, - ) - cl = client.factory.create().client - cl.send(msg) - stats.APRSDStats().msgs_tx_inc() - packets.PacketList().add(msg.dict()) - msg.last_send_time = datetime.datetime.now() - msg.last_send_attempt += 1 - - time.sleep(5) - # Make sure we get called again. - return True - - -class AckMessage(Message): - """Class for building Acks and sending them.""" - - def __init__(self, fromcall, tocall, msg_id): - super().__init__(fromcall, tocall, msg_id=msg_id) - - def dict(self): - now = datetime.datetime.now() - last_send_age = None - if self.last_send_time: - last_send_age = str(now - self.last_send_time) - return { - "id": self.id, - "type": "ack", - "fromcall": self.fromcall, - "tocall": self.tocall, - "raw": str(self).rstrip("\n"), - "retry_count": self.retry_count, - "last_send_attempt": self.last_send_attempt, - "last_send_time": str(self.last_send_time), - "last_send_age": last_send_age, - } - - def __str__(self): - return "{}>APZ100::{}:ack{}\n".format( - self.fromcall, - self.tocall.ljust(9), - self.id, - ) - - def _filter_for_send(self): - return f"ack{self.id}" - - def send(self): - LOG.debug(f"Send ACK({self.tocall}:{self.id}) to radio.") - thread = SendAckThread(self) - thread.start() - - def send_direct(self, aprsis_client=None): - """Send an ack message without a separate thread.""" - if aprsis_client: - cl = aprsis_client - else: - cl = client.factory.create().client - log_message( - "Sending ack", - str(self).rstrip("\n"), - None, - ack=self.id, - tocall=self.tocall, - fromcall=self.fromcall, - ) - cl.send(self) - - -class SendAckThread(threads.APRSDThread): - def __init__(self, ack): - self.ack = ack - super().__init__(f"SendAck-{self.ack.id}") - - def loop(self): - """Separate thread to send acks with retries.""" - send_now = False - if self.ack.last_send_attempt == self.ack.retry_count: - # we reached the send limit, don't send again - # TODO(hemna) - Need to put this in a delayed queue? - LOG.info("Ack Send Complete. Max attempts reached.") - return False - - if self.ack.last_send_time: - # Message has a last send time tracking - now = datetime.datetime.now() - - # aprs duplicate detection is 30 secs? - # (21 only sends first, 28 skips middle) - sleeptime = 31 - delta = now - self.ack.last_send_time - if delta > datetime.timedelta(seconds=sleeptime): - # It's time to try to send it again - send_now = True - else: - LOG.debug(f"Still wating. {delta}") - else: - send_now = True - - if send_now: - cl = client.factory.create().client - log_message( - "Sending ack", - str(self.ack).rstrip("\n"), - None, - ack=self.ack.id, - tocall=self.ack.tocall, - retry_number=self.ack.last_send_attempt, - ) - cl.send(self.ack) - stats.APRSDStats().ack_tx_inc() - packets.PacketList().add(self.ack.dict()) - self.ack.last_send_attempt += 1 - self.ack.last_send_time = datetime.datetime.now() - time.sleep(5) - return True - - -def log_packet(packet): - fromcall = packet.get("from", None) - tocall = packet.get("to", None) - - response_type = packet.get("response", None) - msg = packet.get("message_text", None) - msg_num = packet.get("msgNo", None) - ack = packet.get("ack", None) - - log_message( - "Packet", packet["raw"], msg, fromcall=fromcall, tocall=tocall, - ack=ack, packet_type=response_type, msg_num=msg_num, ) - - -def log_message( - header, raw, message, tocall=None, fromcall=None, msg_num=None, - retry_number=None, ack=None, packet_type=None, uuid=None, - console=None, -): - """ - - Log a message entry. - - This builds a long string with newlines for the log entry, so that - it's thread safe. If we log each item as a separate log.debug() call - Then the message information could get multiplexed with other log - messages. Each python log call is automatically synchronized. - - - """ - - log_list = [""] - if retry_number: - log_list.append(f"{header} _______________(TX:{retry_number})") - else: - log_list.append(f"{header} _______________") - - log_list.append(f" Raw : {raw}") - - if packet_type: - log_list.append(f" Packet : {packet_type}") - if tocall: - log_list.append(f" To : {tocall}") - if fromcall: - log_list.append(f" From : {fromcall}") - - if ack: - log_list.append(f" Ack : {ack}") - else: - log_list.append(f" Message : {message}") - if msg_num: - log_list.append(f" Msg # : {msg_num}") - if uuid: - log_list.append(f" UUID : {uuid}") - log_list.append(f"{header} _______________ Complete") - - if console: - console.log("\n".join(log_list)) - else: - LOG.info("\n".join(log_list)) diff --git a/aprsd/packets.py b/aprsd/packets.py deleted file mode 100644 index 629beea..0000000 --- a/aprsd/packets.py +++ /dev/null @@ -1,385 +0,0 @@ -from dataclasses import asdict, dataclass, field -import datetime -import logging -import threading -import time -# Due to a failure in python 3.8 -from typing import List - -import dacite -import wrapt - -from aprsd import utils -from aprsd.utils import objectstore - - -LOG = logging.getLogger("APRSD") - -PACKET_TYPE_MESSAGE = "message" -PACKET_TYPE_ACK = "ack" -PACKET_TYPE_MICE = "mic-e" -PACKET_TYPE_WX = "weather" -PACKET_TYPE_UNKNOWN = "unknown" -PACKET_TYPE_STATUS = "status" -PACKET_TYPE_BEACON = "beacon" -PACKET_TYPE_UNCOMPRESSED = "uncompressed" - - -@dataclass -class Packet: - from_call: str - to_call: str - addresse: str = None - format: str = None - msgNo: str = None # noqa: N815 - packet_type: str = None - timestamp: float = field(default_factory=time.time) - raw: str = None - _raw_dict: dict = field(repr=True, default_factory=lambda: {}) - - def get(self, key, default=None): - """Emulate a getter on a dict.""" - if hasattr(self, key): - return getattr(self, key) - else: - return default - - @staticmethod - def factory(raw_packet): - raw = raw_packet.copy() - raw["_raw_dict"] = raw.copy() - translate_fields = { - "from": "from_call", - "to": "to_call", - } - # First translate some fields - for key in translate_fields: - if key in raw: - raw[translate_fields[key]] = raw[key] - del raw[key] - - if "addresse" in raw: - raw["to_call"] = raw["addresse"] - - packet_type = get_packet_type(raw) - raw["packet_type"] = packet_type - class_name = TYPE_LOOKUP[packet_type] - if packet_type == PACKET_TYPE_UNKNOWN: - # Try and figure it out here - if "latitude" in raw: - class_name = GPSPacket - - if packet_type == PACKET_TYPE_WX: - # the weather information is in a dict - # this brings those values out to the outer dict - for key in raw["weather"]: - raw[key] = raw["weather"][key] - - return dacite.from_dict(data_class=class_name, data=raw) - - def log(self, header=None): - """LOG a packet to the logfile.""" - asdict(self) - log_list = ["\n"] - if header: - log_list.append(f"{header} _______________") - log_list.append(f" Packet : {self.__class__.__name__}") - log_list.append(f" Raw : {self.raw}") - if self.to_call: - log_list.append(f" To : {self.to_call}") - if self.from_call: - log_list.append(f" From : {self.from_call}") - if hasattr(self, "path"): - log_list.append(f" Path : {'=>'.join(self.path)}") - if hasattr(self, "via"): - log_list.append(f" VIA : {self.via}") - - elif isinstance(self, MessagePacket): - log_list.append(f" Message : {self.message_text}") - - if self.msgNo: - log_list.append(f" Msg # : {self.msgNo}") - log_list.append(f"{header} _______________ Complete") - - LOG.info("\n".join(log_list)) - LOG.debug(self) - - -@dataclass -class PathPacket(Packet): - path: List[str] = field(default_factory=list) - via: str = None - - -@dataclass -class AckPacket(PathPacket): - response: str = None - - -@dataclass -class MessagePacket(PathPacket): - message_text: str = None - - -@dataclass -class StatusPacket(PathPacket): - status: str = None - timestamp: int = 0 - messagecapable: bool = False - comment: str = None - - -@dataclass -class GPSPacket(PathPacket): - latitude: float = 0.00 - longitude: float = 0.00 - altitude: float = 0.00 - rng: float = 0.00 - posambiguity: int = 0 - timestamp: int = 0 - comment: str = None - symbol: str = None - symbol_table: str = None - speed: float = 0.00 - course: int = 0 - - -@dataclass -class MicEPacket(GPSPacket): - messagecapable: bool = False - mbits: str = None - mtype: str = None - - -@dataclass -class WeatherPacket(GPSPacket): - symbol: str = "_" - wind_gust: float = 0.00 - temperature: float = 0.00 - rain_1h: float = 0.00 - rain_24h: float = 0.00 - rain_since_midnight: float = 0.00 - humidity: int = 0 - pressure: float = 0.00 - comment: str = None - - -class PacketList: - """Class to track all of the packets rx'd and tx'd by aprsd.""" - - _instance = None - lock = threading.Lock() - config = None - - packet_list = {} - - total_recv = 0 - total_tx = 0 - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance.packet_list = utils.RingBuffer(1000) - cls._instance.config = kwargs["config"] - return cls._instance - - def __init__(self, config=None): - if config: - self.config = config - - @wrapt.synchronized(lock) - def __iter__(self): - return iter(self.packet_list) - - @wrapt.synchronized(lock) - def add(self, packet: Packet): - packet.ts = time.time() - if (packet.from_call == self.config["aprs"]["login"]): - self.total_tx += 1 - else: - self.total_recv += 1 - self.packet_list.append(packet) - SeenList().update_seen(packet) - - @wrapt.synchronized(lock) - def get(self): - return self.packet_list.get() - - @wrapt.synchronized(lock) - def total_received(self): - return self.total_recv - - @wrapt.synchronized(lock) - def total_sent(self): - return self.total_tx - - -class WatchList(objectstore.ObjectStoreMixin): - """Global watch list and info for callsigns.""" - - _instance = None - lock = threading.Lock() - data = {} - config = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - if "config" in kwargs: - cls._instance.config = kwargs["config"] - cls._instance._init_store() - cls._instance.data = {} - return cls._instance - - def __init__(self, config=None): - if config: - self.config = config - - ring_size = config["aprsd"]["watch_list"].get("packet_keep_count", 10) - - for callsign in config["aprsd"]["watch_list"].get("callsigns", []): - call = callsign.replace("*", "") - # FIXME(waboring) - we should fetch the last time we saw - # a beacon from a callsign or some other mechanism to find - # last time a message was seen by aprs-is. For now this - # is all we can do. - self.data[call] = { - "last": datetime.datetime.now(), - "packets": utils.RingBuffer( - ring_size, - ), - } - - def is_enabled(self): - if self.config and "watch_list" in self.config["aprsd"]: - return self.config["aprsd"]["watch_list"].get("enabled", False) - else: - return False - - def callsign_in_watchlist(self, callsign): - return callsign in self.data - - @wrapt.synchronized(lock) - def update_seen(self, packet): - if packet.addresse: - callsign = packet.addresse - else: - callsign = packet.from_call - if self.callsign_in_watchlist(callsign): - self.data[callsign]["last"] = datetime.datetime.now() - self.data[callsign]["packets"].append(packet) - - def last_seen(self, callsign): - if self.callsign_in_watchlist(callsign): - return self.data[callsign]["last"] - - def age(self, callsign): - now = datetime.datetime.now() - return str(now - self.last_seen(callsign)) - - def max_delta(self, seconds=None): - watch_list_conf = self.config["aprsd"]["watch_list"] - if not seconds: - seconds = watch_list_conf["alert_time_seconds"] - max_timeout = {"seconds": seconds} - return datetime.timedelta(**max_timeout) - - def is_old(self, callsign, seconds=None): - """Watch list callsign last seen is old compared to now? - - This tests to see if the last time we saw a callsign packet, - if that is older than the allowed timeout in the config. - - We put this here so any notification plugin can use this - same test. - """ - age = self.age(callsign) - - delta = utils.parse_delta_str(age) - d = datetime.timedelta(**delta) - - max_delta = self.max_delta(seconds=seconds) - - if d > max_delta: - return True - else: - return False - - -class SeenList(objectstore.ObjectStoreMixin): - """Global callsign seen list.""" - - _instance = None - lock = threading.Lock() - data = {} - config = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - if "config" in kwargs: - cls._instance.config = kwargs["config"] - cls._instance._init_store() - cls._instance.data = {} - return cls._instance - - @wrapt.synchronized(lock) - def update_seen(self, packet: Packet): - callsign = None - if packet.from_call: - callsign = packet.from_call - else: - LOG.warning(f"Can't find FROM in packet {packet}") - return - if callsign not in self.data: - self.data[callsign] = { - "last": None, - "count": 0, - } - self.data[callsign]["last"] = str(datetime.datetime.now()) - self.data[callsign]["count"] += 1 - - -TYPE_LOOKUP = { - PACKET_TYPE_WX: WeatherPacket, - PACKET_TYPE_MESSAGE: MessagePacket, - PACKET_TYPE_ACK: AckPacket, - PACKET_TYPE_MICE: MicEPacket, - PACKET_TYPE_STATUS: StatusPacket, - PACKET_TYPE_BEACON: GPSPacket, - PACKET_TYPE_UNKNOWN: Packet, -} - - -def get_packet_type(packet: dict): - """Decode the packet type from the packet.""" - - format = packet.get("format", None) - msg_response = packet.get("response", None) - packet_type = "unknown" - if format == "message" and msg_response == "ack": - packet_type = PACKET_TYPE_ACK - elif format == "message": - packet_type = PACKET_TYPE_MESSAGE - elif format == "mic-e": - packet_type = PACKET_TYPE_MICE - elif format == "status": - packet_type = PACKET_TYPE_STATUS - elif format == PACKET_TYPE_BEACON: - packet_type = PACKET_TYPE_BEACON - elif format == PACKET_TYPE_UNCOMPRESSED: - if packet.get("symbol", None) == "_": - packet_type = PACKET_TYPE_WX - return packet_type - - -def is_message_packet(packet): - return get_packet_type(packet) == PACKET_TYPE_MESSAGE - - -def is_ack_packet(packet): - return get_packet_type(packet) == PACKET_TYPE_ACK - - -def is_mice_packet(packet): - return get_packet_type(packet) == PACKET_TYPE_MICE diff --git a/aprsd/packets/__init__.py b/aprsd/packets/__init__.py new file mode 100644 index 0000000..2edb578 --- /dev/null +++ b/aprsd/packets/__init__.py @@ -0,0 +1,8 @@ +from aprsd.packets.core import ( + AckPacket, GPSPacket, MessagePacket, MicEPacket, Packet, PathPacket, + StatusPacket, WeatherPacket, +) +from aprsd.packets.packet_list import PacketList +from aprsd.packets.seen_list import SeenList +from aprsd.packets.tracker import PacketTrack +from aprsd.packets.watch_list import WatchList diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py new file mode 100644 index 0000000..383f039 --- /dev/null +++ b/aprsd/packets/core.py @@ -0,0 +1,320 @@ +import abc +from dataclasses import asdict, dataclass, field +import logging +import re +import time +# Due to a failure in python 3.8 +from typing import List + +import dacite + +from aprsd import client, stats +from aprsd.threads import tx +from aprsd.utils import counter + + +LOG = logging.getLogger("APRSD") + +PACKET_TYPE_MESSAGE = "message" +PACKET_TYPE_ACK = "ack" +PACKET_TYPE_MICE = "mic-e" +PACKET_TYPE_WX = "weather" +PACKET_TYPE_UNKNOWN = "unknown" +PACKET_TYPE_STATUS = "status" +PACKET_TYPE_BEACON = "beacon" +PACKET_TYPE_UNCOMPRESSED = "uncompressed" + + +@dataclass() +class Packet(metaclass=abc.ABCMeta): + from_call: str + to_call: str + addresse: str = None + format: str = None + msgNo: str = None # noqa: N815 + packet_type: str = None + timestamp: float = field(default_factory=time.time) + raw: str = None + _raw_dict: dict = field(repr=False, default_factory=lambda: {}) + _retry_count = 3 + _last_send_time = 0 + _last_send_attempt = 0 + # Do we allow this packet to be saved to send later? + _allow_delay = True + + _transport = None + _raw_message = None + + def get(self, key, default=None): + """Emulate a getter on a dict.""" + if hasattr(self, key): + return getattr(self, key) + else: + return default + + def _init_for_send(self): + """Do stuff here that is needed prior to sending over the air.""" + if not self.msgNo: + c = counter.PacketCounter() + c.increment() + self.msgNo = c.value + + # now build the raw message for sending + self._build_raw() + + def _build_raw(self): + """Build the self.raw string which is what is sent over the air.""" + self.raw = self._filter_for_send().rstrip("\n") + + @staticmethod + def factory(raw_packet): + raw = raw_packet + raw["_raw_dict"] = raw.copy() + translate_fields = { + "from": "from_call", + "to": "to_call", + } + # First translate some fields + for key in translate_fields: + if key in raw: + raw[translate_fields[key]] = raw[key] + del raw[key] + + if "addresse" in raw: + raw["to_call"] = raw["addresse"] + + packet_type = get_packet_type(raw) + raw["packet_type"] = packet_type + class_name = TYPE_LOOKUP[packet_type] + if packet_type == PACKET_TYPE_UNKNOWN: + # Try and figure it out here + if "latitude" in raw: + class_name = GPSPacket + + if packet_type == PACKET_TYPE_WX: + # the weather information is in a dict + # this brings those values out to the outer dict + for key in raw["weather"]: + raw[key] = raw["weather"][key] + + return dacite.from_dict(data_class=class_name, data=raw) + + def log(self, header=None): + """LOG a packet to the logfile.""" + asdict(self) + log_list = ["\n"] + if header: + if isinstance(self, AckPacket): + log_list.append( + f"{header} ___________" + f"(TX:{self._send_count} of {self._retry_count})", + ) + else: + log_list.append(f"{header} _______________") + log_list.append(f" Packet : {self.__class__.__name__}") + log_list.append(f" Raw : {self.raw}") + if self.to_call: + log_list.append(f" To : {self.to_call}") + if self.from_call: + log_list.append(f" From : {self.from_call}") + if hasattr(self, "path") and self.path: + log_list.append(f" Path : {'=>'.join(self.path)}") + if hasattr(self, "via") and self.via: + log_list.append(f" VIA : {self.via}") + + elif isinstance(self, MessagePacket): + log_list.append(f" Message : {self.message_text}") + + if hasattr(self, "comment") and self.comment: + log_list.append(f" Comment : {self.comment}") + + if self.msgNo: + log_list.append(f" Msg # : {self.msgNo}") + log_list.append(f"{header} _______________ Complete") + + LOG.info("\n".join(log_list)) + LOG.debug(self) + + def _filter_for_send(self) -> str: + """Filter and format message string for FCC.""" + # max? ftm400 displays 64, raw msg shows 74 + # and ftm400-send is max 64. setting this to + # 67 displays 64 on the ftm400. (+3 {01 suffix) + # feature req: break long ones into two msgs + message = self.raw[:67] + # We all miss George Carlin + return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) + + def send(self): + """Method to send a packet.""" + LOG.warning("send() called!") + self._init_for_send() + thread = tx.SendPacketThread(packet=self) + LOG.warning(f"Starting thread to TX {self}") + thread.start() + LOG.warning("Thread started") + + def send_direct(self, aprsis_client=None): + """Send the message in the same thread as caller.""" + self._init_for_send() + if aprsis_client: + cl = aprsis_client + else: + cl = client.factory.create().client + self.log(header="Sending Message Direct") + cl.send(self.raw) + stats.APRSDStats().msgs_tx_inc() + + +@dataclass() +class PathPacket(Packet): + path: List[str] = field(default_factory=list) + via: str = None + + def _build_raw(self): + raise NotImplementedError + + +@dataclass() +class AckPacket(PathPacket): + response: str = None + _send_count = 1 + + def _build_raw(self): + """Build the self.raw which is what is sent over the air.""" + self.raw = "{}>APZ100::{}:ack{}".format( + self.from_call, + self.to_call.ljust(9), + self.msgNo, + ) + + def send(self): + """Method to send a packet.""" + self._init_for_send() + thread = tx.SendAckThread(packet=self) + LOG.warning(f"Starting thread to TXACK {self}") + thread.start() + + +@dataclass() +class MessagePacket(PathPacket): + message_text: str = None + + def _filter_for_send(self) -> str: + """Filter and format message string for FCC.""" + # max? ftm400 displays 64, raw msg shows 74 + # and ftm400-send is max 64. setting this to + # 67 displays 64 on the ftm400. (+3 {01 suffix) + # feature req: break long ones into two msgs + message = self.message_text[:67] + # We all miss George Carlin + return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) + + def _build_raw(self): + """Build the self.raw which is what is sent over the air.""" + self.raw = "{}>APZ100::{}:{}{{{}".format( + self.from_call, + self.to_call.ljust(9), + self._filter_for_send().rstrip("\n"), + str(self.msgNo), + ) + + +@dataclass() +class StatusPacket(PathPacket): + status: str = None + timestamp: int = 0 + messagecapable: bool = False + comment: str = None + + def _build_raw(self): + raise NotImplementedError + + +@dataclass() +class GPSPacket(PathPacket): + latitude: float = 0.00 + longitude: float = 0.00 + altitude: float = 0.00 + rng: float = 0.00 + posambiguity: int = 0 + timestamp: int = 0 + comment: str = None + symbol: str = None + symbol_table: str = None + speed: float = 0.00 + course: int = 0 + + def _build_raw(self): + raise NotImplementedError + + +@dataclass() +class MicEPacket(GPSPacket): + messagecapable: bool = False + mbits: str = None + mtype: str = None + + def _build_raw(self): + raise NotImplementedError + + +@dataclass() +class WeatherPacket(GPSPacket): + symbol: str = "_" + wind_gust: float = 0.00 + temperature: float = 0.00 + rain_1h: float = 0.00 + rain_24h: float = 0.00 + rain_since_midnight: float = 0.00 + humidity: int = 0 + pressure: float = 0.00 + comment: str = None + + def _build_raw(self): + raise NotImplementedError + + +TYPE_LOOKUP = { + PACKET_TYPE_WX: WeatherPacket, + PACKET_TYPE_MESSAGE: MessagePacket, + PACKET_TYPE_ACK: AckPacket, + PACKET_TYPE_MICE: MicEPacket, + PACKET_TYPE_STATUS: StatusPacket, + PACKET_TYPE_BEACON: GPSPacket, + PACKET_TYPE_UNKNOWN: Packet, +} + + +def get_packet_type(packet: dict): + """Decode the packet type from the packet.""" + + pkt_format = packet.get("format", None) + msg_response = packet.get("response", None) + packet_type = "unknown" + if pkt_format == "message" and msg_response == "ack": + packet_type = PACKET_TYPE_ACK + elif pkt_format == "message": + packet_type = PACKET_TYPE_MESSAGE + elif pkt_format == "mic-e": + packet_type = PACKET_TYPE_MICE + elif pkt_format == "status": + packet_type = PACKET_TYPE_STATUS + elif pkt_format == PACKET_TYPE_BEACON: + packet_type = PACKET_TYPE_BEACON + elif pkt_format == PACKET_TYPE_UNCOMPRESSED: + if packet.get("symbol", None) == "_": + packet_type = PACKET_TYPE_WX + return packet_type + + +def is_message_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_MESSAGE + + +def is_ack_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_ACK + + +def is_mice_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_MICE diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py new file mode 100644 index 0000000..89c1fd4 --- /dev/null +++ b/aprsd/packets/packet_list.py @@ -0,0 +1,60 @@ +import logging +import threading +import time + +import wrapt + +from aprsd import utils +from aprsd.packets import seen_list + + +LOG = logging.getLogger("APRSD") + + +class PacketList: + """Class to track all of the packets rx'd and tx'd by aprsd.""" + + _instance = None + lock = threading.Lock() + config = None + + packet_list = utils.RingBuffer(1000) + + total_recv = 0 + total_tx = 0 + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance.config = kwargs["config"] + return cls._instance + + def __init__(self, config=None): + if config: + self.config = config + + @wrapt.synchronized(lock) + def __iter__(self): + return iter(self.packet_list) + + @wrapt.synchronized(lock) + def add(self, packet): + packet.ts = time.time() + if packet.from_call == self.config["aprs"]["login"]: + self.total_tx += 1 + else: + self.total_recv += 1 + self.packet_list.append(packet) + seen_list.SeenList().update_seen(packet) + + @wrapt.synchronized(lock) + def get(self): + return self.packet_list.get() + + @wrapt.synchronized(lock) + def total_received(self): + return self.total_recv + + @wrapt.synchronized(lock) + def total_sent(self): + return self.total_tx diff --git a/aprsd/packets/seen_list.py b/aprsd/packets/seen_list.py new file mode 100644 index 0000000..3c9e1bf --- /dev/null +++ b/aprsd/packets/seen_list.py @@ -0,0 +1,44 @@ +import datetime +import logging +import threading + +import wrapt + +from aprsd.utils import objectstore + + +LOG = logging.getLogger("APRSD") + + +class SeenList(objectstore.ObjectStoreMixin): + """Global callsign seen list.""" + + _instance = None + lock = threading.Lock() + data = {} + config = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + if "config" in kwargs: + cls._instance.config = kwargs["config"] + cls._instance._init_store() + cls._instance.data = {} + return cls._instance + + @wrapt.synchronized(lock) + def update_seen(self, packet): + callsign = None + if packet.from_call: + callsign = packet.from_call + else: + LOG.warning(f"Can't find FROM in packet {packet}") + return + if callsign not in self.data: + self.data[callsign] = { + "last": None, + "count": 0, + } + self.data[callsign]["last"] = str(datetime.datetime.now()) + self.data[callsign]["count"] += 1 diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py new file mode 100644 index 0000000..82ff3f3 --- /dev/null +++ b/aprsd/packets/tracker.py @@ -0,0 +1,116 @@ +import datetime +import threading + +import wrapt + +from aprsd import stats +from aprsd.utils import objectstore + + +class PacketTrack(objectstore.ObjectStoreMixin): + """Class to keep track of outstanding text messages. + + This is a thread safe class that keeps track of active + messages. + + When a message is asked to be sent, it is placed into this + class via it's id. The TextMessage class's send() method + automatically adds itself to this class. When the ack is + recieved from the radio, the message object is removed from + this class. + """ + + _instance = None + _start_time = None + lock = threading.Lock() + + data = {} + total_tracked = 0 + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._start_time = datetime.datetime.now() + cls._instance.config = kwargs["config"] + cls._instance._init_store() + return cls._instance + + @wrapt.synchronized(lock) + def __getitem__(self, name): + return self.data[name] + + @wrapt.synchronized(lock) + def __iter__(self): + return iter(self.data) + + @wrapt.synchronized(lock) + def keys(self): + return self.data.keys() + + @wrapt.synchronized(lock) + def items(self): + return self.data.items() + + @wrapt.synchronized(lock) + def values(self): + return self.data.values() + + @wrapt.synchronized(lock) + def __len__(self): + return len(self.data) + + @wrapt.synchronized(lock) + def __str__(self): + result = "{" + for key in self.data.keys(): + result += f"{key}: {str(self.data[key])}, " + result += "}" + return result + + @wrapt.synchronized(lock) + def add(self, packet): + key = int(packet.msgNo) + self.data[key] = packet + stats.APRSDStats().msgs_tracked_inc() + self.total_tracked += 1 + + @wrapt.synchronized(lock) + def get(self, id): + if id in self.data: + return self.data[id] + + @wrapt.synchronized(lock) + def remove(self, id): + key = int(id) + if key in self.data.keys(): + del self.data[key] + + def restart(self): + """Walk the list of messages and restart them if any.""" + for key in self.data.keys(): + pkt = self.data[key] + if pkt.last_send_attempt < pkt.retry_count: + pkt.send() + + def _resend(self, packet): + packet._last_send_attempt = 0 + packet.send() + + def restart_delayed(self, count=None, most_recent=True): + """Walk the list of delayed messages and restart them if any.""" + if not count: + # Send all the delayed messages + for key in self.data.keys(): + pkt = self.data[key] + if pkt._last_send_attempt == pkt._retry_count: + self._resend(pkt) + else: + # They want to resend delayed messages + tmp = sorted( + self.data.items(), + reverse=most_recent, + key=lambda x: x[1].last_send_time, + ) + pkt_list = tmp[:count] + for (_key, pkt) in pkt_list: + self._resend(pkt) diff --git a/aprsd/packets/watch_list.py b/aprsd/packets/watch_list.py new file mode 100644 index 0000000..54c3995 --- /dev/null +++ b/aprsd/packets/watch_list.py @@ -0,0 +1,103 @@ +import datetime +import logging +import threading + +import wrapt + +from aprsd import utils +from aprsd.utils import objectstore + + +LOG = logging.getLogger("APRSD") + + +class WatchList(objectstore.ObjectStoreMixin): + """Global watch list and info for callsigns.""" + + _instance = None + lock = threading.Lock() + data = {} + config = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + if "config" in kwargs: + cls._instance.config = kwargs["config"] + cls._instance._init_store() + cls._instance.data = {} + return cls._instance + + def __init__(self, config=None): + if config: + self.config = config + + ring_size = config["aprsd"]["watch_list"].get("packet_keep_count", 10) + + for callsign in config["aprsd"]["watch_list"].get("callsigns", []): + call = callsign.replace("*", "") + # FIXME(waboring) - we should fetch the last time we saw + # a beacon from a callsign or some other mechanism to find + # last time a message was seen by aprs-is. For now this + # is all we can do. + self.data[call] = { + "last": datetime.datetime.now(), + "packets": utils.RingBuffer( + ring_size, + ), + } + + def is_enabled(self): + if self.config and "watch_list" in self.config["aprsd"]: + return self.config["aprsd"]["watch_list"].get("enabled", False) + else: + return False + + def callsign_in_watchlist(self, callsign): + return callsign in self.data + + @wrapt.synchronized(lock) + def update_seen(self, packet): + if packet.addresse: + callsign = packet.addresse + else: + callsign = packet.from_call + if self.callsign_in_watchlist(callsign): + self.data[callsign]["last"] = datetime.datetime.now() + self.data[callsign]["packets"].append(packet) + + def last_seen(self, callsign): + if self.callsign_in_watchlist(callsign): + return self.data[callsign]["last"] + + def age(self, callsign): + now = datetime.datetime.now() + return str(now - self.last_seen(callsign)) + + def max_delta(self, seconds=None): + watch_list_conf = self.config["aprsd"]["watch_list"] + if not seconds: + seconds = watch_list_conf["alert_time_seconds"] + max_timeout = {"seconds": seconds} + return datetime.timedelta(**max_timeout) + + def is_old(self, callsign, seconds=None): + """Watch list callsign last seen is old compared to now? + + This tests to see if the last time we saw a callsign packet, + if that is older than the allowed timeout in the config. + + We put this here so any notification plugin can use this + same test. + """ + age = self.age(callsign) + + delta = utils.parse_delta_str(age) + d = datetime.timedelta(**delta) + + max_delta = self.max_delta(seconds=seconds) + + if d > max_delta: + return True + else: + return False diff --git a/aprsd/plugin.py b/aprsd/plugin.py index a62ab9d..885740b 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -13,7 +13,8 @@ import pluggy from thesmuggler import smuggle import aprsd -from aprsd import client, messaging, packets, threads +from aprsd import client, messaging, threads +from aprsd.packets import watch_list # setup the global logger @@ -119,11 +120,11 @@ class APRSDPluginBase(metaclass=abc.ABCMeta): thread.stop() @abc.abstractmethod - def filter(self, packet: packets.Packet): + def filter(self, packet): pass @abc.abstractmethod - def process(self, packet: packets.Packet): + def process(self, packet): """This is called when the filter passes.""" @@ -160,10 +161,10 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): LOG.warning("Watch list enabled, but no callsigns set.") @hookimpl - def filter(self, packet: packets.Packet): + def filter(self, packet): result = messaging.NULL_MESSAGE if self.enabled: - wl = packets.WatchList() + wl = watch_list.WatchList() if wl.callsign_in_watchlist(packet.from_call): # packet is from a callsign in the watch list self.rx_inc() @@ -212,7 +213,7 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): self.enabled = True @hookimpl - def filter(self, packet: packets.MessagePacket): + def filter(self, packet): result = None message = packet.get("message_text", None) @@ -272,7 +273,7 @@ class HelpPlugin(APRSDRegexCommandPluginBase): def help(self): return "Help: send APRS help or help " - def process(self, packet: packets.MessagePacket): + def process(self, packet): LOG.info("HelpPlugin") # fromcall = packet.get("from") message = packet.message_text @@ -475,7 +476,7 @@ class PluginManager: self._load_plugin(p_name) LOG.info("Completed Plugin Loading.") - def run(self, packet: packets.Packet): + def run(self, packet): """Execute all the pluguns run method.""" with self.lock: return self._pluggy_pm.hook.filter(packet=packet) diff --git a/aprsd/plugins/notify.py b/aprsd/plugins/notify.py index 2a9e7ea..d75a19c 100644 --- a/aprsd/plugins/notify.py +++ b/aprsd/plugins/notify.py @@ -1,7 +1,6 @@ import logging from aprsd import messaging, packets, plugin -from aprsd.utils import trace LOG = logging.getLogger("APRSD") @@ -18,7 +17,6 @@ class NotifySeenPlugin(plugin.APRSDWatchListPluginBase): short_description = "Notify me when a CALLSIGN is recently seen on APRS-IS" - @trace.trace def process(self, packet: packets.MessagePacket): LOG.info("NotifySeenPlugin") diff --git a/aprsd/threads/aprsd.py b/aprsd/threads/aprsd.py index 819ee7f..5673fa4 100644 --- a/aprsd/threads/aprsd.py +++ b/aprsd/threads/aprsd.py @@ -39,6 +39,8 @@ class APRSDThreadList: """Iterate over all threads and call stop on them.""" for th in self.threads_list: LOG.info(f"Stopping Thread {th.name}") + if hasattr(th, "packet"): + LOG.info(F"{th.name} packet {th.packet}") th.stop() @wrapt.synchronized(lock) diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keep_alive.py index 95c7ea0..cfbdbc9 100644 --- a/aprsd/threads/keep_alive.py +++ b/aprsd/threads/keep_alive.py @@ -3,7 +3,7 @@ import logging import time import tracemalloc -from aprsd import client, messaging, packets, stats, utils +from aprsd import client, packets, stats, utils from aprsd.threads import APRSDThread, APRSDThreadList @@ -23,7 +23,7 @@ class KeepAliveThread(APRSDThread): def loop(self): if self.cntr % 60 == 0: - tracker = messaging.MsgTrack() + pkt_tracker = packets.PacketTrack() stats_obj = stats.APRSDStats() pl = packets.PacketList() thread_list = APRSDThreadList() @@ -53,7 +53,7 @@ class KeepAliveThread(APRSDThread): utils.strfdelta(stats_obj.uptime), pl.total_recv, pl.total_tx, - len(tracker), + len(pkt_tracker), stats_obj.msgs_tx, stats_obj.msgs_rx, last_msg_time, diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index dfa7c80..fad4096 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -64,12 +64,10 @@ class APRSDPluginRXThread(APRSDRXThread): processing in the PluginProcessPacketThread. """ def process_packet(self, *args, **kwargs): - raw = self._client.decode_packet(*args, **kwargs) + packet = self._client.decode_packet(*args, **kwargs) # LOG.debug(raw) - packet = packets.Packet.factory(raw.copy()) + #packet = packets.Packet.factory(raw.copy()) packet.log(header="RX Packet") - # LOG.debug(packet) - del raw thread = APRSDPluginProcessPacketThread( config=self.config, packet=packet, @@ -90,24 +88,20 @@ class APRSDProcessPacketThread(APRSDThread): self.packet = packet name = self.packet.raw[:10] super().__init__(f"RXPKT-{name}") + self._loop_cnt = 1 def process_ack_packet(self, packet): ack_num = packet.msgNo LOG.info(f"Got ack for message {ack_num}") - messaging.log_message( - "RXACK", - packet.raw, - None, - ack=ack_num, - fromcall=packet.from_call, - ) - tracker = messaging.MsgTrack() - tracker.remove(ack_num) + packet.log("RXACK") + pkt_tracker = packets.PacketTrack() + pkt_tracker.remove(ack_num) stats.APRSDStats().ack_rx_inc() return def loop(self): """Process a packet received from aprs-is server.""" + LOG.debug(f"RXPKT-LOOP {self._loop_cnt}") packet = self.packet packets.PacketList().add(packet) our_call = self.config["aprsd"]["callsign"].lower() @@ -136,12 +130,20 @@ class APRSDProcessPacketThread(APRSDThread): stats.APRSDStats().msgs_rx_inc() # let any threads do their thing, then ack # send an ack last - ack = messaging.AckMessage( - self.config["aprsd"]["callsign"], - from_call, - msg_id=msg_id, + ack_pkt = packets.AckPacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + msgNo=msg_id, ) - ack.send() + LOG.warning(f"Send AckPacket {ack_pkt}") + ack_pkt.send() + LOG.warning("Send ACK called Continue on") + #ack = messaging.AckMessage( + # self.config["aprsd"]["callsign"], + # from_call, + # msg_id=msg_id, + #) + #ack.send() self.process_our_message_packet(packet) else: @@ -151,7 +153,8 @@ class APRSDProcessPacketThread(APRSDThread): self.process_other_packet( packet, for_us=(to_call.lower() == our_call), ) - LOG.debug("Packet processing complete") + LOG.debug("Packet processing complete") + return False @abc.abstractmethod def process_our_message_packet(self, *args, **kwargs): @@ -194,16 +197,29 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): if isinstance(subreply, messaging.Message): subreply.send() else: - msg = messaging.TextMessage( - self.config["aprsd"]["callsign"], - from_call, - subreply, + msg_pkt = packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=subreply, ) - msg.send() + msg_pkt.send() + #msg = messaging.TextMessage( + # self.config["aprsd"]["callsign"], + # from_call, + # subreply, + #) + #msg.send() elif isinstance(reply, messaging.Message): # We have a message based object. LOG.debug(f"Sending '{reply}'") - reply.send() + # Convert this to the new packet + msg_pkt = packets.MessagePacket( + from_call=reply.fromcall, + to_call=reply.tocall, + message_text=reply._raw_message, + ) + #reply.send() + msg_pkt.send() replied = True else: replied = True @@ -213,33 +229,55 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): # usage string if reply is not messaging.NULL_MESSAGE: LOG.debug(f"Sending '{reply}'") - - msg = messaging.TextMessage( - self.config["aprsd"]["callsign"], - from_call, - reply, + msg_pkt = packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=reply, ) - msg.send() + LOG.warning("Calling msg_pkg.send()") + msg_pkt.send() + LOG.warning("Calling msg_pkg.send() --- DONE") + + #msg = messaging.TextMessage( + # self.config["aprsd"]["callsign"], + # from_call, + # reply, + #) + #msg.send() # If the message was for us and we didn't have a # response, then we send a usage statement. if to_call == self.config["aprsd"]["callsign"] and not replied: LOG.warning("Sending help!") - msg = messaging.TextMessage( - self.config["aprsd"]["callsign"], - from_call, - "Unknown command! Send 'help' message for help", + msg_pkt = packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text="Unknown command! Send 'help' message for help", ) - msg.send() + msg_pkt.send() + #msg = messaging.TextMessage( + # self.config["aprsd"]["callsign"], + # from_call, + # "Unknown command! Send 'help' message for help", + #) + #msg.send() except Exception as ex: LOG.error("Plugin failed!!!") LOG.exception(ex) # Do we need to send a reply? if to_call == self.config["aprsd"]["callsign"]: reply = "A Plugin failed! try again?" - msg = messaging.TextMessage( - self.config["aprsd"]["callsign"], - from_call, - reply, + msg_pkt = packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=reply, ) - msg.send() + msg_pkt.send() + #msg = messaging.TextMessage( + # self.config["aprsd"]["callsign"], + # from_call, + # reply, + #) + #msg.send() + + LOG.debug("Completed process_our_message_packet") diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py new file mode 100644 index 0000000..8df838c --- /dev/null +++ b/aprsd/threads/tx.py @@ -0,0 +1,120 @@ +import datetime +import logging +import time + +from aprsd import client, stats +from aprsd import threads as aprsd_threads +from aprsd.packets import packet_list, tracker + + +LOG = logging.getLogger("APRSD") + + +class SendPacketThread(aprsd_threads.APRSDThread): + def __init__(self, packet): + self.packet = packet + name = self.packet.raw[:5] + super().__init__(f"TXPKT-{self.packet.msgNo}-{name}") + pkt_tracker = tracker.PacketTrack() + pkt_tracker.add(packet) + + def loop(self): + LOG.debug("TX Loop") + """Loop until a message is acked or it gets delayed. + + We only sleep for 5 seconds between each loop run, so + that CTRL-C can exit the app in a short period. Each sleep + means the app quitting is blocked until sleep is done. + So we keep track of the last send attempt and only send if the + last send attempt is old enough. + + """ + pkt_tracker = tracker.PacketTrack() + # lets see if the message is still in the tracking queue + packet = pkt_tracker.get(self.packet.msgNo) + if not packet: + # The message has been removed from the tracking queue + # So it got acked and we are done. + LOG.info("Message Send Complete via Ack.") + return False + else: + send_now = False + if packet._last_send_attempt == packet._retry_count: + # we reached the send limit, don't send again + # TODO(hemna) - Need to put this in a delayed queue? + LOG.info("Message Send Complete. Max attempts reached.") + if not packet._allow_delay: + pkt_tracker.remove(packet.msgNo) + return False + + # Message is still outstanding and needs to be acked. + if packet._last_send_time: + # Message has a last send time tracking + now = datetime.datetime.now() + sleeptime = (packet._last_send_attempt + 1) * 31 + delta = now - packet._last_send_time + if delta > datetime.timedelta(seconds=sleeptime): + # It's time to try to send it again + send_now = True + else: + send_now = True + + if send_now: + # no attempt time, so lets send it, and start + # tracking the time. + packet.log("Sending Message") + cl = client.factory.create().client + cl.send(packet.raw) + stats.APRSDStats().msgs_tx_inc() + packet_list.PacketList().add(packet) + packet._last_send_time = datetime.datetime.now() + packet._last_send_attempt += 1 + + time.sleep(5) + # Make sure we get called again. + return True + + +class SendAckThread(aprsd_threads.APRSDThread): + def __init__(self, packet): + self.packet = packet + super().__init__(f"SendAck-{self.packet.msgNo}") + self._loop_cnt = 1 + + def loop(self): + """Separate thread to send acks with retries.""" + send_now = False + if self.packet._last_send_attempt == self.packet._retry_count: + # we reached the send limit, don't send again + # TODO(hemna) - Need to put this in a delayed queue? + LOG.info("Ack Send Complete. Max attempts reached.") + return False + + if self.packet._last_send_time: + # Message has a last send time tracking + now = datetime.datetime.now() + + # aprs duplicate detection is 30 secs? + # (21 only sends first, 28 skips middle) + sleeptime = 31 + delta = now - self.packet._last_send_time + if delta > datetime.timedelta(seconds=sleeptime): + # It's time to try to send it again + send_now = True + elif self._loop_cnt % 5 == 0: + LOG.debug(f"Still wating. {delta}") + else: + send_now = True + + if send_now: + cl = client.factory.create().client + self.packet.log("Sending ACK") + cl.send(self.packet.raw) + self.packet._send_count += 1 + stats.APRSDStats().ack_tx_inc() + packet_list.PacketList().add(self.packet) + self.packet._last_send_attempt += 1 + self.packet._last_send_time = datetime.datetime.now() + time.sleep(1) + self._loop_cnt += 1 + return True diff --git a/aprsd/utils/counter.py b/aprsd/utils/counter.py new file mode 100644 index 0000000..e423bfb --- /dev/null +++ b/aprsd/utils/counter.py @@ -0,0 +1,48 @@ +from multiprocessing import RawValue +import threading + +import wrapt + + +class PacketCounter: + """ + Global Packet id counter class. + + This is a singleton based class that keeps + an incrementing counter for all packets to + be sent. All new Packet objects gets a new + message id, which is the next number available + from the PacketCounter. + + """ + + _instance = None + max_count = 9999 + lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + """Make this a singleton class.""" + if cls._instance is None: + cls._instance = super().__new__(cls, *args, **kwargs) + cls._instance.val = RawValue("i", 1) + return cls._instance + + @wrapt.synchronized(lock) + def increment(self): + if self.val.value == self.max_count: + self.val.value = 1 + else: + self.val.value += 1 + + @property + @wrapt.synchronized(lock) + def value(self): + return self.val.value + + @wrapt.synchronized(lock) + def __repr__(self): + return str(self.val.value) + + @wrapt.synchronized(lock) + def __str__(self): + return str(self.val.value) diff --git a/tox.ini b/tox.ini index 84b8b24..c0fe0c1 100644 --- a/tox.ini +++ b/tox.ini @@ -2,7 +2,7 @@ minversion = 2.9.0 skipdist = True skip_missing_interpreters = true -envlist = pep8,py{38,39} +envlist = pep8,py{39,310} #requires = tox-pipenv # pip==22.0.4 # pip-tools==5.4.0