diff --git a/aprsd/aprsd.py b/aprsd/aprsd.py index 7a0afb5..7c91d2e 100644 --- a/aprsd/aprsd.py +++ b/aprsd/aprsd.py @@ -34,7 +34,7 @@ import click_completion import aprsd from aprsd import cli_helper from aprsd import config as aprsd_config -from aprsd import messaging, packets, stats, threads, utils +from aprsd import packets, stats, threads, utils # setup the global logger @@ -85,7 +85,7 @@ def signal_handler(sig, frame): ), ) time.sleep(1.5) - messaging.MsgTrack().save() + packets.PacketTrack().save() packets.WatchList().save() packets.SeenList().save() LOG.info(stats.APRSDStats()) diff --git a/aprsd/client.py b/aprsd/client.py index 7664d62..4285990 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -8,6 +8,7 @@ from aprslib.exceptions import LoginError from aprsd import config as aprsd_config from aprsd import exception from aprsd.clients import aprsis, kiss +from aprsd.packets import core from aprsd.utils import trace @@ -109,7 +110,7 @@ class APRSISClient(Client): def decode_packet(self, *args, **kwargs): """APRS lib already decodes this.""" - return args[0] + return core.Packet.factory(args[0]) @trace.trace def setup_connection(self): @@ -198,8 +199,8 @@ class KISSClient(Client): # msg = frame.tnc2 LOG.debug(f"Decoding {msg}") - packet = aprslib.parse(msg) - return packet + raw = aprslib.parse(msg) + return core.Packet.factory(raw) @trace.trace def setup_connection(self): diff --git a/aprsd/clients/aprsis.py b/aprsd/clients/aprsis.py index 635d27b..fc22008 100644 --- a/aprsd/clients/aprsis.py +++ b/aprsd/clients/aprsis.py @@ -1,6 +1,5 @@ import logging import select -import socket import threading import aprslib @@ -32,23 +31,6 @@ class Aprsdis(aprslib.IS): self.thread_stop = True LOG.info("Shutdown Aprsdis client.") - def is_socket_closed(self, sock: socket.socket) -> bool: - try: - # this will try to read bytes without blocking and also without removing them from buffer (peek only) - data = sock.recv(16, socket.MSG_DONTWAIT | socket.MSG_PEEK) - if len(data) == 0: - return True - except BlockingIOError: - return False # socket is open and reading from it would block - except ConnectionResetError: - return True # socket was closed for some other reason - except Exception: - self.logger.exception( - "unexpected exception when checking if a socket is closed", - ) - return False - return False - @wrapt.synchronized(lock) def send(self, msg): """Send an APRS Message object.""" diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index 2945548..e889445 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -14,7 +14,7 @@ from rich.console import Console # local imports here import aprsd -from aprsd import cli_helper, client, messaging, packets, stats, threads, utils +from aprsd import cli_helper, client, packets, stats, threads, utils from aprsd.aprsd import cli from aprsd.threads import rx @@ -39,9 +39,7 @@ def signal_handler(sig, frame): class APRSDListenThread(rx.APRSDRXThread): def process_packet(self, *args, **kwargs): - raw = self._client.decode_packet(*args, **kwargs) - packet = packets.Packet.factory(raw) - LOG.debug(f"Got packet {packet}") + packet = self._client.decode_packet(*args, **kwargs) packet.log(header="RX Packet") @@ -115,7 +113,7 @@ def listen( # Try and load saved MsgTrack list LOG.debug("Loading saved MsgTrack object.") - messaging.MsgTrack(config=config).load() + packets.PacketTrack(config=config).load() packets.WatchList(config=config).load() packets.SeenList(config=config).load() diff --git a/aprsd/cmds/send_message.py b/aprsd/cmds/send_message.py index ea2b234..08a236f 100644 --- a/aprsd/cmds/send_message.py +++ b/aprsd/cmds/send_message.py @@ -7,7 +7,7 @@ from aprslib.exceptions import LoginError import click import aprsd -from aprsd import cli_helper, client, messaging, packets +from aprsd import cli_helper, client, packets from aprsd.aprsd import cli @@ -98,32 +98,22 @@ def send_message( def rx_packet(packet): global got_ack, got_response + cl = client.factory.create() + packet = cl.decode_packet(packet) + packet.log("RX_PKT") # LOG.debug("Got packet back {}".format(packet)) - resp = packet.get("response", None) - if resp == "ack": - ack_num = packet.get("msgNo") - LOG.info(f"We got ack for our sent message {ack_num}") - messaging.log_packet(packet) + if isinstance(packet, packets.AckPacket): got_ack = True else: - message = packet.get("message_text", None) - fromcall = packet["from"] - msg_number = packet.get("msgNo", "0") - messaging.log_message( - "Received Message", - packet["raw"], - message, - fromcall=fromcall, - ack=msg_number, - ) got_response = True - # Send the ack back? - ack = messaging.AckMessage( - config["aprs"]["login"], - fromcall, - msg_id=msg_number, + from_call = packet.from_call + our_call = config["aprsd"]["callsign"].lower() + ack_pkt = packets.AckPacket( + from_call=our_call, + to_call=from_call, + msgNo=packet.msgNo, ) - ack.send_direct() + ack_pkt.send_direct() if got_ack: if wait_response: @@ -144,12 +134,16 @@ def send_message( # we should bail after we get the ack and send an ack back for the # message if raw: - msg = messaging.RawMessage(raw) - msg.send_direct() + pkt = packets.Packet(from_call="", to_call="", raw=raw) + pkt.send_direct() sys.exit(0) else: - msg = messaging.TextMessage(aprs_login, tocallsign, command) - msg.send_direct() + pkt = packets.MessagePacket( + from_call=aprs_login, + to_call=tocallsign, + message_text=command, + ) + pkt.send_direct() if no_ack: sys.exit(0) diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index 70c47fd..4bb50cc 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -6,8 +6,7 @@ import click import aprsd from aprsd import ( - cli_helper, client, flask, messaging, packets, plugin, stats, threads, - utils, + cli_helper, client, flask, packets, plugin, stats, threads, utils, ) from aprsd import aprsd as aprsd_main from aprsd.aprsd import cli @@ -81,13 +80,15 @@ def server(ctx, flush): packets.PacketList(config=config) if flush: LOG.debug("Deleting saved MsgTrack.") - messaging.MsgTrack(config=config).flush() + #messaging.MsgTrack(config=config).flush() + packets.PacketTrack(config=config).flush() packets.WatchList(config=config) packets.SeenList(config=config) else: # Try and load saved MsgTrack list LOG.debug("Loading saved MsgTrack object.") - messaging.MsgTrack(config=config).load() + #messaging.MsgTrack(config=config).load() + packets.PacketTrack(config=config).load() packets.WatchList(config=config).load() packets.SeenList(config=config).load() @@ -102,7 +103,8 @@ def server(ctx, flush): ) rx_thread.start() - messaging.MsgTrack().restart() + #messaging.MsgTrack().restart() + packets.PacketTrack().restart() keepalive = threads.KeepAliveThread(config=config) keepalive.start() diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index 85a9fa7..fcd5fd7 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -22,7 +22,7 @@ import wrapt import aprsd from aprsd import cli_helper, client from aprsd import config as aprsd_config -from aprsd import messaging, packets, stats, threads, utils +from aprsd import packets, stats, threads, utils from aprsd.aprsd import cli from aprsd.logging import rich as aprsd_logging from aprsd.threads import rx @@ -44,13 +44,11 @@ def signal_handler(sig, frame): threads.APRSDThreadList().stop_all() if "subprocess" not in str(frame): time.sleep(1.5) - # messaging.MsgTrack().save() # packets.WatchList().save() # packets.SeenList().save() LOG.info(stats.APRSDStats()) LOG.info("Telling flask to bail.") signal.signal(signal.SIGTERM, sys.exit(0)) - sys.exit(0) class SentMessages(objectstore.ObjectStoreMixin): @@ -67,11 +65,11 @@ class SentMessages(objectstore.ObjectStoreMixin): @wrapt.synchronized(lock) def add(self, msg): - self.data[msg.id] = self.create(msg.id) - self.data[msg.id]["from"] = msg.fromcall - self.data[msg.id]["to"] = msg.tocall - self.data[msg.id]["message"] = msg.message.rstrip("\n") - self.data[msg.id]["raw"] = str(msg).rstrip("\n") + self.data[msg.msgNo] = self.create(msg.msgNo) + self.data[msg.msgNo]["from"] = msg.from_call + self.data[msg.msgNo]["to"] = msg.to_call + self.data[msg.msgNo]["message"] = msg.message_text.rstrip("\n") + self.data[msg.msgNo]["raw"] = msg.message_text.rstrip("\n") def create(self, id): return { @@ -344,21 +342,21 @@ class SendMessageNamespace(Namespace): LOG.debug(f"WS: on_send {data}") self.request = data data["from"] = self._config["aprs"]["login"] - msg = messaging.TextMessage( - data["from"], - data["to"].upper(), - data["message"], + pkt = packets.MessagePacket( + from_call=data["from"], + to_call=data["to"].upper(), + message_text=data["message"], ) - self.msg = msg + self.msg = pkt msgs = SentMessages() - msgs.add(msg) - msgs.set_status(msg.id, "Sending") - obj = msgs.get(self.msg.id) + msgs.add(pkt) + pkt.send() + msgs.set_status(pkt.msgNo, "Sending") + obj = msgs.get(pkt.msgNo) socketio.emit( "sent", obj, namespace="/sendmsg", ) - msg.send() def on_gps(self, data): LOG.debug(f"WS on_GPS: {data}") @@ -378,10 +376,17 @@ class SendMessageNamespace(Namespace): f":@{time_zulu}z{lat}/{long}l APRSD WebChat Beacon" ) - beacon_msg = messaging.RawMessage(txt) - beacon_msg.fromcall = self._config["aprs"]["login"] - beacon_msg.tocall = "APDW16" - beacon_msg.send_direct() + LOG.debug(f"Sending {txt}") + beacon = packets.GPSPacket( + from_call=self._config["aprs"]["login"], + to_call="APDW16", + raw=txt, + ) + beacon.send_direct() + #beacon_msg = messaging.RawMessage(txt) + #beacon_msg.fromcall = self._config["aprs"]["login"] + #beacon_msg.tocall = "APDW16" + #beacon_msg.send_direct() def handle_message(self, data): LOG.debug(f"WS Data {data}") @@ -534,7 +539,7 @@ def webchat(ctx, flush, port): sys.exit(-1) packets.PacketList(config=config) - messaging.MsgTrack(config=config) + packets.PacketTrack(config=config) packets.WatchList(config=config) packets.SeenList(config=config) diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 6f27002..3e25742 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -1,588 +1,3 @@ -import abc -import datetime -import logging -from multiprocessing import RawValue -import re -import threading -import time - -from aprsd import client, packets, stats, threads -from aprsd.utils import objectstore - - -LOG = logging.getLogger("APRSD") - # What to return from a plugin if we have processed the message # and it's ok, but don't send a usage string back NULL_MESSAGE = -1 - - -class MsgTrack(objectstore.ObjectStoreMixin): - """Class to keep track of outstanding text messages. - - This is a thread safe class that keeps track of active - messages. - - When a message is asked to be sent, it is placed into this - class via it's id. The TextMessage class's send() method - automatically adds itself to this class. When the ack is - recieved from the radio, the message object is removed from - this class. - """ - - _instance = None - _start_time = None - lock = None - - data = {} - total_messages_tracked = 0 - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance.track = {} - cls._instance._start_time = datetime.datetime.now() - cls._instance.lock = threading.Lock() - cls._instance.config = kwargs["config"] - cls._instance._init_store() - return cls._instance - - def __getitem__(self, name): - with self.lock: - return self.data[name] - - def __iter__(self): - with self.lock: - return iter(self.data) - - def keys(self): - with self.lock: - return self.data.keys() - - def items(self): - with self.lock: - return self.data.items() - - def values(self): - with self.lock: - return self.data.values() - - def __len__(self): - with self.lock: - return len(self.data) - - def __str__(self): - with self.lock: - result = "{" - for key in self.data.keys(): - result += f"{key}: {str(self.data[key])}, " - result += "}" - return result - - def add(self, msg): - with self.lock: - key = int(msg.id) - self.data[key] = msg - stats.APRSDStats().msgs_tracked_inc() - self.total_messages_tracked += 1 - - def get(self, id): - with self.lock: - if id in self.data: - return self.data[id] - - def remove(self, id): - with self.lock: - key = int(id) - if key in self.data.keys(): - del self.data[key] - - def restart(self): - """Walk the list of messages and restart them if any.""" - - for key in self.data.keys(): - msg = self.data[key] - if msg.last_send_attempt < msg.retry_count: - msg.send() - - def _resend(self, msg): - msg.last_send_attempt = 0 - msg.send() - - def restart_delayed(self, count=None, most_recent=True): - """Walk the list of delayed messages and restart them if any.""" - if not count: - # Send all the delayed messages - for key in self.data.keys(): - msg = self.data[key] - if msg.last_send_attempt == msg.retry_count: - self._resend(msg) - else: - # They want to resend delayed messages - tmp = sorted( - self.data.items(), - reverse=most_recent, - key=lambda x: x[1].last_send_time, - ) - msg_list = tmp[:count] - for (_key, msg) in msg_list: - self._resend(msg) - - -class MessageCounter: - """ - Global message id counter class. - - This is a singleton based class that keeps - an incrementing counter for all messages to - be sent. All new Message objects gets a new - message id, which is the next number available - from the MessageCounter. - - """ - - _instance = None - max_count = 9999 - lock = None - - def __new__(cls, *args, **kwargs): - """Make this a singleton class.""" - if cls._instance is None: - cls._instance = super().__new__(cls, *args, **kwargs) - cls._instance.val = RawValue("i", 1) - cls._instance.lock = threading.Lock() - return cls._instance - - def increment(self): - with self.lock: - if self.val.value == self.max_count: - self.val.value = 1 - else: - self.val.value += 1 - - @property - def value(self): - with self.lock: - return self.val.value - - def __repr__(self): - with self.lock: - return str(self.val.value) - - def __str__(self): - with self.lock: - return str(self.val.value) - - -class Message(metaclass=abc.ABCMeta): - """Base Message Class.""" - - # The message id to send over the air - id = 0 - - retry_count = 3 - last_send_time = 0 - last_send_attempt = 0 - - transport = None - _raw_message = None - - def __init__( - self, - fromcall, - tocall, - msg_id=None, - allow_delay=True, - ): - self.fromcall = fromcall - self.tocall = tocall - if not msg_id: - c = MessageCounter() - c.increment() - msg_id = c.value - self.id = msg_id - - # do we try and save this message for later if we don't get - # an ack? Some messages we don't want to do this ever. - self.allow_delay = allow_delay - - @abc.abstractmethod - def send(self): - """Child class must declare.""" - - def _filter_for_send(self): - """Filter and format message string for FCC.""" - # max? ftm400 displays 64, raw msg shows 74 - # and ftm400-send is max 64. setting this to - # 67 displays 64 on the ftm400. (+3 {01 suffix) - # feature req: break long ones into two msgs - message = self._raw_message[:67] - # We all miss George Carlin - return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) - - @property - def message(self): - return self._filter_for_send().rstrip("\n") - - def __str__(self): - return self.message - - -class RawMessage(Message): - """Send a raw message. - - This class is used for custom messages that contain the entire - contents of an APRS message in the message field. - - """ - - last_send_age = last_send_time = None - - def __init__(self, message, allow_delay=True): - super().__init__( - fromcall=None, tocall=None, msg_id=None, - allow_delay=allow_delay, - ) - self._raw_message = message - - def dict(self): - now = datetime.datetime.now() - last_send_age = None - if self.last_send_time: - last_send_age = str(now - self.last_send_time) - return { - "type": "raw", - "message": self.message, - "raw": str(self), - "retry_count": self.retry_count, - "last_send_attempt": self.last_send_attempt, - "last_send_time": str(self.last_send_time), - "last_send_age": last_send_age, - } - - def send(self): - tracker = MsgTrack() - tracker.add(self) - thread = SendMessageThread(message=self) - thread.start() - - def send_direct(self, aprsis_client=None): - """Send a message without a separate thread.""" - cl = client.factory.create().client - log_message( - "Sending Message Direct", - str(self), - self.message, - tocall=self.tocall, - fromcall=self.fromcall, - ) - cl.send(self) - stats.APRSDStats().msgs_tx_inc() - - -class TextMessage(Message): - """Send regular ARPS text/command messages/replies.""" - - last_send_time = last_send_age = None - - def __init__( - self, fromcall, tocall, message, - msg_id=None, allow_delay=True, - ): - super().__init__( - fromcall=fromcall, tocall=tocall, - msg_id=msg_id, allow_delay=allow_delay, - ) - self._raw_message = message - - def dict(self): - now = datetime.datetime.now() - - last_send_age = None - if self.last_send_time: - last_send_age = str(now - self.last_send_time) - - return { - "id": self.id, - "type": "text-message", - "fromcall": self.fromcall, - "tocall": self.tocall, - "message": self.message, - "raw": str(self), - "retry_count": self.retry_count, - "last_send_attempt": self.last_send_attempt, - "last_send_time": str(self.last_send_time), - "last_send_age": last_send_age, - } - - def __str__(self): - """Build raw string to send over the air.""" - return "{}>APZ100::{}:{}{{{}\n".format( - self.fromcall, - self.tocall.ljust(9), - self.message, - str(self.id), - ) - - def send(self): - tracker = MsgTrack() - tracker.add(self) - LOG.debug(f"Length of MsgTrack is {len(tracker)}") - thread = SendMessageThread(message=self) - thread.start() - - def send_direct(self, aprsis_client=None): - """Send a message without a separate thread.""" - if aprsis_client: - cl = aprsis_client - else: - cl = client.factory.create().client - log_message( - "Sending Message Direct", - str(self), - self.message, - tocall=self.tocall, - fromcall=self.fromcall, - ) - cl.send(self) - stats.APRSDStats().msgs_tx_inc() - pkt_dict = self.dict().copy() - pkt_dict["from"] = pkt_dict["fromcall"] - pkt_dict["to"] = pkt_dict["tocall"] - packet = packets.Packet.factory(pkt_dict) - packets.PacketList().add(packet) - - -class SendMessageThread(threads.APRSDThread): - def __init__(self, message): - self.msg = message - name = self.msg._raw_message[:5] - super().__init__(f"TXPKT-{self.msg.id}-{name}") - - def loop(self): - """Loop until a message is acked or it gets delayed. - - We only sleep for 5 seconds between each loop run, so - that CTRL-C can exit the app in a short period. Each sleep - means the app quitting is blocked until sleep is done. - So we keep track of the last send attempt and only send if the - last send attempt is old enough. - - """ - tracker = MsgTrack() - # lets see if the message is still in the tracking queue - msg = tracker.get(self.msg.id) - if not msg: - # The message has been removed from the tracking queue - # So it got acked and we are done. - LOG.info("Message Send Complete via Ack.") - return False - else: - send_now = False - if msg.last_send_attempt == msg.retry_count: - # we reached the send limit, don't send again - # TODO(hemna) - Need to put this in a delayed queue? - LOG.info("Message Send Complete. Max attempts reached.") - if not msg.allow_delay: - tracker.remove(msg.id) - return False - - # Message is still outstanding and needs to be acked. - if msg.last_send_time: - # Message has a last send time tracking - now = datetime.datetime.now() - sleeptime = (msg.last_send_attempt + 1) * 31 - delta = now - msg.last_send_time - if delta > datetime.timedelta(seconds=sleeptime): - # It's time to try to send it again - send_now = True - else: - send_now = True - - if send_now: - # no attempt time, so lets send it, and start - # tracking the time. - log_message( - "Sending Message", - str(msg), - msg.message, - tocall=self.msg.tocall, - retry_number=msg.last_send_attempt, - msg_num=msg.id, - ) - cl = client.factory.create().client - cl.send(msg) - stats.APRSDStats().msgs_tx_inc() - packets.PacketList().add(msg.dict()) - msg.last_send_time = datetime.datetime.now() - msg.last_send_attempt += 1 - - time.sleep(5) - # Make sure we get called again. - return True - - -class AckMessage(Message): - """Class for building Acks and sending them.""" - - def __init__(self, fromcall, tocall, msg_id): - super().__init__(fromcall, tocall, msg_id=msg_id) - - def dict(self): - now = datetime.datetime.now() - last_send_age = None - if self.last_send_time: - last_send_age = str(now - self.last_send_time) - return { - "id": self.id, - "type": "ack", - "fromcall": self.fromcall, - "tocall": self.tocall, - "raw": str(self).rstrip("\n"), - "retry_count": self.retry_count, - "last_send_attempt": self.last_send_attempt, - "last_send_time": str(self.last_send_time), - "last_send_age": last_send_age, - } - - def __str__(self): - return "{}>APZ100::{}:ack{}\n".format( - self.fromcall, - self.tocall.ljust(9), - self.id, - ) - - def _filter_for_send(self): - return f"ack{self.id}" - - def send(self): - LOG.debug(f"Send ACK({self.tocall}:{self.id}) to radio.") - thread = SendAckThread(self) - thread.start() - - def send_direct(self, aprsis_client=None): - """Send an ack message without a separate thread.""" - if aprsis_client: - cl = aprsis_client - else: - cl = client.factory.create().client - log_message( - "Sending ack", - str(self).rstrip("\n"), - None, - ack=self.id, - tocall=self.tocall, - fromcall=self.fromcall, - ) - cl.send(self) - - -class SendAckThread(threads.APRSDThread): - def __init__(self, ack): - self.ack = ack - super().__init__(f"SendAck-{self.ack.id}") - - def loop(self): - """Separate thread to send acks with retries.""" - send_now = False - if self.ack.last_send_attempt == self.ack.retry_count: - # we reached the send limit, don't send again - # TODO(hemna) - Need to put this in a delayed queue? - LOG.info("Ack Send Complete. Max attempts reached.") - return False - - if self.ack.last_send_time: - # Message has a last send time tracking - now = datetime.datetime.now() - - # aprs duplicate detection is 30 secs? - # (21 only sends first, 28 skips middle) - sleeptime = 31 - delta = now - self.ack.last_send_time - if delta > datetime.timedelta(seconds=sleeptime): - # It's time to try to send it again - send_now = True - else: - LOG.debug(f"Still wating. {delta}") - else: - send_now = True - - if send_now: - cl = client.factory.create().client - log_message( - "Sending ack", - str(self.ack).rstrip("\n"), - None, - ack=self.ack.id, - tocall=self.ack.tocall, - retry_number=self.ack.last_send_attempt, - ) - cl.send(self.ack) - stats.APRSDStats().ack_tx_inc() - packets.PacketList().add(self.ack.dict()) - self.ack.last_send_attempt += 1 - self.ack.last_send_time = datetime.datetime.now() - time.sleep(5) - return True - - -def log_packet(packet): - fromcall = packet.get("from", None) - tocall = packet.get("to", None) - - response_type = packet.get("response", None) - msg = packet.get("message_text", None) - msg_num = packet.get("msgNo", None) - ack = packet.get("ack", None) - - log_message( - "Packet", packet["raw"], msg, fromcall=fromcall, tocall=tocall, - ack=ack, packet_type=response_type, msg_num=msg_num, ) - - -def log_message( - header, raw, message, tocall=None, fromcall=None, msg_num=None, - retry_number=None, ack=None, packet_type=None, uuid=None, - console=None, -): - """ - - Log a message entry. - - This builds a long string with newlines for the log entry, so that - it's thread safe. If we log each item as a separate log.debug() call - Then the message information could get multiplexed with other log - messages. Each python log call is automatically synchronized. - - - """ - - log_list = [""] - if retry_number: - log_list.append(f"{header} _______________(TX:{retry_number})") - else: - log_list.append(f"{header} _______________") - - log_list.append(f" Raw : {raw}") - - if packet_type: - log_list.append(f" Packet : {packet_type}") - if tocall: - log_list.append(f" To : {tocall}") - if fromcall: - log_list.append(f" From : {fromcall}") - - if ack: - log_list.append(f" Ack : {ack}") - else: - log_list.append(f" Message : {message}") - if msg_num: - log_list.append(f" Msg # : {msg_num}") - if uuid: - log_list.append(f" UUID : {uuid}") - log_list.append(f"{header} _______________ Complete") - - if console: - console.log("\n".join(log_list)) - else: - LOG.info("\n".join(log_list)) diff --git a/aprsd/packets.py b/aprsd/packets.py deleted file mode 100644 index 629beea..0000000 --- a/aprsd/packets.py +++ /dev/null @@ -1,385 +0,0 @@ -from dataclasses import asdict, dataclass, field -import datetime -import logging -import threading -import time -# Due to a failure in python 3.8 -from typing import List - -import dacite -import wrapt - -from aprsd import utils -from aprsd.utils import objectstore - - -LOG = logging.getLogger("APRSD") - -PACKET_TYPE_MESSAGE = "message" -PACKET_TYPE_ACK = "ack" -PACKET_TYPE_MICE = "mic-e" -PACKET_TYPE_WX = "weather" -PACKET_TYPE_UNKNOWN = "unknown" -PACKET_TYPE_STATUS = "status" -PACKET_TYPE_BEACON = "beacon" -PACKET_TYPE_UNCOMPRESSED = "uncompressed" - - -@dataclass -class Packet: - from_call: str - to_call: str - addresse: str = None - format: str = None - msgNo: str = None # noqa: N815 - packet_type: str = None - timestamp: float = field(default_factory=time.time) - raw: str = None - _raw_dict: dict = field(repr=True, default_factory=lambda: {}) - - def get(self, key, default=None): - """Emulate a getter on a dict.""" - if hasattr(self, key): - return getattr(self, key) - else: - return default - - @staticmethod - def factory(raw_packet): - raw = raw_packet.copy() - raw["_raw_dict"] = raw.copy() - translate_fields = { - "from": "from_call", - "to": "to_call", - } - # First translate some fields - for key in translate_fields: - if key in raw: - raw[translate_fields[key]] = raw[key] - del raw[key] - - if "addresse" in raw: - raw["to_call"] = raw["addresse"] - - packet_type = get_packet_type(raw) - raw["packet_type"] = packet_type - class_name = TYPE_LOOKUP[packet_type] - if packet_type == PACKET_TYPE_UNKNOWN: - # Try and figure it out here - if "latitude" in raw: - class_name = GPSPacket - - if packet_type == PACKET_TYPE_WX: - # the weather information is in a dict - # this brings those values out to the outer dict - for key in raw["weather"]: - raw[key] = raw["weather"][key] - - return dacite.from_dict(data_class=class_name, data=raw) - - def log(self, header=None): - """LOG a packet to the logfile.""" - asdict(self) - log_list = ["\n"] - if header: - log_list.append(f"{header} _______________") - log_list.append(f" Packet : {self.__class__.__name__}") - log_list.append(f" Raw : {self.raw}") - if self.to_call: - log_list.append(f" To : {self.to_call}") - if self.from_call: - log_list.append(f" From : {self.from_call}") - if hasattr(self, "path"): - log_list.append(f" Path : {'=>'.join(self.path)}") - if hasattr(self, "via"): - log_list.append(f" VIA : {self.via}") - - elif isinstance(self, MessagePacket): - log_list.append(f" Message : {self.message_text}") - - if self.msgNo: - log_list.append(f" Msg # : {self.msgNo}") - log_list.append(f"{header} _______________ Complete") - - LOG.info("\n".join(log_list)) - LOG.debug(self) - - -@dataclass -class PathPacket(Packet): - path: List[str] = field(default_factory=list) - via: str = None - - -@dataclass -class AckPacket(PathPacket): - response: str = None - - -@dataclass -class MessagePacket(PathPacket): - message_text: str = None - - -@dataclass -class StatusPacket(PathPacket): - status: str = None - timestamp: int = 0 - messagecapable: bool = False - comment: str = None - - -@dataclass -class GPSPacket(PathPacket): - latitude: float = 0.00 - longitude: float = 0.00 - altitude: float = 0.00 - rng: float = 0.00 - posambiguity: int = 0 - timestamp: int = 0 - comment: str = None - symbol: str = None - symbol_table: str = None - speed: float = 0.00 - course: int = 0 - - -@dataclass -class MicEPacket(GPSPacket): - messagecapable: bool = False - mbits: str = None - mtype: str = None - - -@dataclass -class WeatherPacket(GPSPacket): - symbol: str = "_" - wind_gust: float = 0.00 - temperature: float = 0.00 - rain_1h: float = 0.00 - rain_24h: float = 0.00 - rain_since_midnight: float = 0.00 - humidity: int = 0 - pressure: float = 0.00 - comment: str = None - - -class PacketList: - """Class to track all of the packets rx'd and tx'd by aprsd.""" - - _instance = None - lock = threading.Lock() - config = None - - packet_list = {} - - total_recv = 0 - total_tx = 0 - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance.packet_list = utils.RingBuffer(1000) - cls._instance.config = kwargs["config"] - return cls._instance - - def __init__(self, config=None): - if config: - self.config = config - - @wrapt.synchronized(lock) - def __iter__(self): - return iter(self.packet_list) - - @wrapt.synchronized(lock) - def add(self, packet: Packet): - packet.ts = time.time() - if (packet.from_call == self.config["aprs"]["login"]): - self.total_tx += 1 - else: - self.total_recv += 1 - self.packet_list.append(packet) - SeenList().update_seen(packet) - - @wrapt.synchronized(lock) - def get(self): - return self.packet_list.get() - - @wrapt.synchronized(lock) - def total_received(self): - return self.total_recv - - @wrapt.synchronized(lock) - def total_sent(self): - return self.total_tx - - -class WatchList(objectstore.ObjectStoreMixin): - """Global watch list and info for callsigns.""" - - _instance = None - lock = threading.Lock() - data = {} - config = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - if "config" in kwargs: - cls._instance.config = kwargs["config"] - cls._instance._init_store() - cls._instance.data = {} - return cls._instance - - def __init__(self, config=None): - if config: - self.config = config - - ring_size = config["aprsd"]["watch_list"].get("packet_keep_count", 10) - - for callsign in config["aprsd"]["watch_list"].get("callsigns", []): - call = callsign.replace("*", "") - # FIXME(waboring) - we should fetch the last time we saw - # a beacon from a callsign or some other mechanism to find - # last time a message was seen by aprs-is. For now this - # is all we can do. - self.data[call] = { - "last": datetime.datetime.now(), - "packets": utils.RingBuffer( - ring_size, - ), - } - - def is_enabled(self): - if self.config and "watch_list" in self.config["aprsd"]: - return self.config["aprsd"]["watch_list"].get("enabled", False) - else: - return False - - def callsign_in_watchlist(self, callsign): - return callsign in self.data - - @wrapt.synchronized(lock) - def update_seen(self, packet): - if packet.addresse: - callsign = packet.addresse - else: - callsign = packet.from_call - if self.callsign_in_watchlist(callsign): - self.data[callsign]["last"] = datetime.datetime.now() - self.data[callsign]["packets"].append(packet) - - def last_seen(self, callsign): - if self.callsign_in_watchlist(callsign): - return self.data[callsign]["last"] - - def age(self, callsign): - now = datetime.datetime.now() - return str(now - self.last_seen(callsign)) - - def max_delta(self, seconds=None): - watch_list_conf = self.config["aprsd"]["watch_list"] - if not seconds: - seconds = watch_list_conf["alert_time_seconds"] - max_timeout = {"seconds": seconds} - return datetime.timedelta(**max_timeout) - - def is_old(self, callsign, seconds=None): - """Watch list callsign last seen is old compared to now? - - This tests to see if the last time we saw a callsign packet, - if that is older than the allowed timeout in the config. - - We put this here so any notification plugin can use this - same test. - """ - age = self.age(callsign) - - delta = utils.parse_delta_str(age) - d = datetime.timedelta(**delta) - - max_delta = self.max_delta(seconds=seconds) - - if d > max_delta: - return True - else: - return False - - -class SeenList(objectstore.ObjectStoreMixin): - """Global callsign seen list.""" - - _instance = None - lock = threading.Lock() - data = {} - config = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - if "config" in kwargs: - cls._instance.config = kwargs["config"] - cls._instance._init_store() - cls._instance.data = {} - return cls._instance - - @wrapt.synchronized(lock) - def update_seen(self, packet: Packet): - callsign = None - if packet.from_call: - callsign = packet.from_call - else: - LOG.warning(f"Can't find FROM in packet {packet}") - return - if callsign not in self.data: - self.data[callsign] = { - "last": None, - "count": 0, - } - self.data[callsign]["last"] = str(datetime.datetime.now()) - self.data[callsign]["count"] += 1 - - -TYPE_LOOKUP = { - PACKET_TYPE_WX: WeatherPacket, - PACKET_TYPE_MESSAGE: MessagePacket, - PACKET_TYPE_ACK: AckPacket, - PACKET_TYPE_MICE: MicEPacket, - PACKET_TYPE_STATUS: StatusPacket, - PACKET_TYPE_BEACON: GPSPacket, - PACKET_TYPE_UNKNOWN: Packet, -} - - -def get_packet_type(packet: dict): - """Decode the packet type from the packet.""" - - format = packet.get("format", None) - msg_response = packet.get("response", None) - packet_type = "unknown" - if format == "message" and msg_response == "ack": - packet_type = PACKET_TYPE_ACK - elif format == "message": - packet_type = PACKET_TYPE_MESSAGE - elif format == "mic-e": - packet_type = PACKET_TYPE_MICE - elif format == "status": - packet_type = PACKET_TYPE_STATUS - elif format == PACKET_TYPE_BEACON: - packet_type = PACKET_TYPE_BEACON - elif format == PACKET_TYPE_UNCOMPRESSED: - if packet.get("symbol", None) == "_": - packet_type = PACKET_TYPE_WX - return packet_type - - -def is_message_packet(packet): - return get_packet_type(packet) == PACKET_TYPE_MESSAGE - - -def is_ack_packet(packet): - return get_packet_type(packet) == PACKET_TYPE_ACK - - -def is_mice_packet(packet): - return get_packet_type(packet) == PACKET_TYPE_MICE diff --git a/aprsd/packets/__init__.py b/aprsd/packets/__init__.py new file mode 100644 index 0000000..2edb578 --- /dev/null +++ b/aprsd/packets/__init__.py @@ -0,0 +1,8 @@ +from aprsd.packets.core import ( + AckPacket, GPSPacket, MessagePacket, MicEPacket, Packet, PathPacket, + StatusPacket, WeatherPacket, +) +from aprsd.packets.packet_list import PacketList +from aprsd.packets.seen_list import SeenList +from aprsd.packets.tracker import PacketTrack +from aprsd.packets.watch_list import WatchList diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py new file mode 100644 index 0000000..383f039 --- /dev/null +++ b/aprsd/packets/core.py @@ -0,0 +1,320 @@ +import abc +from dataclasses import asdict, dataclass, field +import logging +import re +import time +# Due to a failure in python 3.8 +from typing import List + +import dacite + +from aprsd import client, stats +from aprsd.threads import tx +from aprsd.utils import counter + + +LOG = logging.getLogger("APRSD") + +PACKET_TYPE_MESSAGE = "message" +PACKET_TYPE_ACK = "ack" +PACKET_TYPE_MICE = "mic-e" +PACKET_TYPE_WX = "weather" +PACKET_TYPE_UNKNOWN = "unknown" +PACKET_TYPE_STATUS = "status" +PACKET_TYPE_BEACON = "beacon" +PACKET_TYPE_UNCOMPRESSED = "uncompressed" + + +@dataclass() +class Packet(metaclass=abc.ABCMeta): + from_call: str + to_call: str + addresse: str = None + format: str = None + msgNo: str = None # noqa: N815 + packet_type: str = None + timestamp: float = field(default_factory=time.time) + raw: str = None + _raw_dict: dict = field(repr=False, default_factory=lambda: {}) + _retry_count = 3 + _last_send_time = 0 + _last_send_attempt = 0 + # Do we allow this packet to be saved to send later? + _allow_delay = True + + _transport = None + _raw_message = None + + def get(self, key, default=None): + """Emulate a getter on a dict.""" + if hasattr(self, key): + return getattr(self, key) + else: + return default + + def _init_for_send(self): + """Do stuff here that is needed prior to sending over the air.""" + if not self.msgNo: + c = counter.PacketCounter() + c.increment() + self.msgNo = c.value + + # now build the raw message for sending + self._build_raw() + + def _build_raw(self): + """Build the self.raw string which is what is sent over the air.""" + self.raw = self._filter_for_send().rstrip("\n") + + @staticmethod + def factory(raw_packet): + raw = raw_packet + raw["_raw_dict"] = raw.copy() + translate_fields = { + "from": "from_call", + "to": "to_call", + } + # First translate some fields + for key in translate_fields: + if key in raw: + raw[translate_fields[key]] = raw[key] + del raw[key] + + if "addresse" in raw: + raw["to_call"] = raw["addresse"] + + packet_type = get_packet_type(raw) + raw["packet_type"] = packet_type + class_name = TYPE_LOOKUP[packet_type] + if packet_type == PACKET_TYPE_UNKNOWN: + # Try and figure it out here + if "latitude" in raw: + class_name = GPSPacket + + if packet_type == PACKET_TYPE_WX: + # the weather information is in a dict + # this brings those values out to the outer dict + for key in raw["weather"]: + raw[key] = raw["weather"][key] + + return dacite.from_dict(data_class=class_name, data=raw) + + def log(self, header=None): + """LOG a packet to the logfile.""" + asdict(self) + log_list = ["\n"] + if header: + if isinstance(self, AckPacket): + log_list.append( + f"{header} ___________" + f"(TX:{self._send_count} of {self._retry_count})", + ) + else: + log_list.append(f"{header} _______________") + log_list.append(f" Packet : {self.__class__.__name__}") + log_list.append(f" Raw : {self.raw}") + if self.to_call: + log_list.append(f" To : {self.to_call}") + if self.from_call: + log_list.append(f" From : {self.from_call}") + if hasattr(self, "path") and self.path: + log_list.append(f" Path : {'=>'.join(self.path)}") + if hasattr(self, "via") and self.via: + log_list.append(f" VIA : {self.via}") + + elif isinstance(self, MessagePacket): + log_list.append(f" Message : {self.message_text}") + + if hasattr(self, "comment") and self.comment: + log_list.append(f" Comment : {self.comment}") + + if self.msgNo: + log_list.append(f" Msg # : {self.msgNo}") + log_list.append(f"{header} _______________ Complete") + + LOG.info("\n".join(log_list)) + LOG.debug(self) + + def _filter_for_send(self) -> str: + """Filter and format message string for FCC.""" + # max? ftm400 displays 64, raw msg shows 74 + # and ftm400-send is max 64. setting this to + # 67 displays 64 on the ftm400. (+3 {01 suffix) + # feature req: break long ones into two msgs + message = self.raw[:67] + # We all miss George Carlin + return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) + + def send(self): + """Method to send a packet.""" + LOG.warning("send() called!") + self._init_for_send() + thread = tx.SendPacketThread(packet=self) + LOG.warning(f"Starting thread to TX {self}") + thread.start() + LOG.warning("Thread started") + + def send_direct(self, aprsis_client=None): + """Send the message in the same thread as caller.""" + self._init_for_send() + if aprsis_client: + cl = aprsis_client + else: + cl = client.factory.create().client + self.log(header="Sending Message Direct") + cl.send(self.raw) + stats.APRSDStats().msgs_tx_inc() + + +@dataclass() +class PathPacket(Packet): + path: List[str] = field(default_factory=list) + via: str = None + + def _build_raw(self): + raise NotImplementedError + + +@dataclass() +class AckPacket(PathPacket): + response: str = None + _send_count = 1 + + def _build_raw(self): + """Build the self.raw which is what is sent over the air.""" + self.raw = "{}>APZ100::{}:ack{}".format( + self.from_call, + self.to_call.ljust(9), + self.msgNo, + ) + + def send(self): + """Method to send a packet.""" + self._init_for_send() + thread = tx.SendAckThread(packet=self) + LOG.warning(f"Starting thread to TXACK {self}") + thread.start() + + +@dataclass() +class MessagePacket(PathPacket): + message_text: str = None + + def _filter_for_send(self) -> str: + """Filter and format message string for FCC.""" + # max? ftm400 displays 64, raw msg shows 74 + # and ftm400-send is max 64. setting this to + # 67 displays 64 on the ftm400. (+3 {01 suffix) + # feature req: break long ones into two msgs + message = self.message_text[:67] + # We all miss George Carlin + return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) + + def _build_raw(self): + """Build the self.raw which is what is sent over the air.""" + self.raw = "{}>APZ100::{}:{}{{{}".format( + self.from_call, + self.to_call.ljust(9), + self._filter_for_send().rstrip("\n"), + str(self.msgNo), + ) + + +@dataclass() +class StatusPacket(PathPacket): + status: str = None + timestamp: int = 0 + messagecapable: bool = False + comment: str = None + + def _build_raw(self): + raise NotImplementedError + + +@dataclass() +class GPSPacket(PathPacket): + latitude: float = 0.00 + longitude: float = 0.00 + altitude: float = 0.00 + rng: float = 0.00 + posambiguity: int = 0 + timestamp: int = 0 + comment: str = None + symbol: str = None + symbol_table: str = None + speed: float = 0.00 + course: int = 0 + + def _build_raw(self): + raise NotImplementedError + + +@dataclass() +class MicEPacket(GPSPacket): + messagecapable: bool = False + mbits: str = None + mtype: str = None + + def _build_raw(self): + raise NotImplementedError + + +@dataclass() +class WeatherPacket(GPSPacket): + symbol: str = "_" + wind_gust: float = 0.00 + temperature: float = 0.00 + rain_1h: float = 0.00 + rain_24h: float = 0.00 + rain_since_midnight: float = 0.00 + humidity: int = 0 + pressure: float = 0.00 + comment: str = None + + def _build_raw(self): + raise NotImplementedError + + +TYPE_LOOKUP = { + PACKET_TYPE_WX: WeatherPacket, + PACKET_TYPE_MESSAGE: MessagePacket, + PACKET_TYPE_ACK: AckPacket, + PACKET_TYPE_MICE: MicEPacket, + PACKET_TYPE_STATUS: StatusPacket, + PACKET_TYPE_BEACON: GPSPacket, + PACKET_TYPE_UNKNOWN: Packet, +} + + +def get_packet_type(packet: dict): + """Decode the packet type from the packet.""" + + pkt_format = packet.get("format", None) + msg_response = packet.get("response", None) + packet_type = "unknown" + if pkt_format == "message" and msg_response == "ack": + packet_type = PACKET_TYPE_ACK + elif pkt_format == "message": + packet_type = PACKET_TYPE_MESSAGE + elif pkt_format == "mic-e": + packet_type = PACKET_TYPE_MICE + elif pkt_format == "status": + packet_type = PACKET_TYPE_STATUS + elif pkt_format == PACKET_TYPE_BEACON: + packet_type = PACKET_TYPE_BEACON + elif pkt_format == PACKET_TYPE_UNCOMPRESSED: + if packet.get("symbol", None) == "_": + packet_type = PACKET_TYPE_WX + return packet_type + + +def is_message_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_MESSAGE + + +def is_ack_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_ACK + + +def is_mice_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_MICE diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py new file mode 100644 index 0000000..89c1fd4 --- /dev/null +++ b/aprsd/packets/packet_list.py @@ -0,0 +1,60 @@ +import logging +import threading +import time + +import wrapt + +from aprsd import utils +from aprsd.packets import seen_list + + +LOG = logging.getLogger("APRSD") + + +class PacketList: + """Class to track all of the packets rx'd and tx'd by aprsd.""" + + _instance = None + lock = threading.Lock() + config = None + + packet_list = utils.RingBuffer(1000) + + total_recv = 0 + total_tx = 0 + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance.config = kwargs["config"] + return cls._instance + + def __init__(self, config=None): + if config: + self.config = config + + @wrapt.synchronized(lock) + def __iter__(self): + return iter(self.packet_list) + + @wrapt.synchronized(lock) + def add(self, packet): + packet.ts = time.time() + if packet.from_call == self.config["aprs"]["login"]: + self.total_tx += 1 + else: + self.total_recv += 1 + self.packet_list.append(packet) + seen_list.SeenList().update_seen(packet) + + @wrapt.synchronized(lock) + def get(self): + return self.packet_list.get() + + @wrapt.synchronized(lock) + def total_received(self): + return self.total_recv + + @wrapt.synchronized(lock) + def total_sent(self): + return self.total_tx diff --git a/aprsd/packets/seen_list.py b/aprsd/packets/seen_list.py new file mode 100644 index 0000000..3c9e1bf --- /dev/null +++ b/aprsd/packets/seen_list.py @@ -0,0 +1,44 @@ +import datetime +import logging +import threading + +import wrapt + +from aprsd.utils import objectstore + + +LOG = logging.getLogger("APRSD") + + +class SeenList(objectstore.ObjectStoreMixin): + """Global callsign seen list.""" + + _instance = None + lock = threading.Lock() + data = {} + config = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + if "config" in kwargs: + cls._instance.config = kwargs["config"] + cls._instance._init_store() + cls._instance.data = {} + return cls._instance + + @wrapt.synchronized(lock) + def update_seen(self, packet): + callsign = None + if packet.from_call: + callsign = packet.from_call + else: + LOG.warning(f"Can't find FROM in packet {packet}") + return + if callsign not in self.data: + self.data[callsign] = { + "last": None, + "count": 0, + } + self.data[callsign]["last"] = str(datetime.datetime.now()) + self.data[callsign]["count"] += 1 diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py new file mode 100644 index 0000000..82ff3f3 --- /dev/null +++ b/aprsd/packets/tracker.py @@ -0,0 +1,116 @@ +import datetime +import threading + +import wrapt + +from aprsd import stats +from aprsd.utils import objectstore + + +class PacketTrack(objectstore.ObjectStoreMixin): + """Class to keep track of outstanding text messages. + + This is a thread safe class that keeps track of active + messages. + + When a message is asked to be sent, it is placed into this + class via it's id. The TextMessage class's send() method + automatically adds itself to this class. When the ack is + recieved from the radio, the message object is removed from + this class. + """ + + _instance = None + _start_time = None + lock = threading.Lock() + + data = {} + total_tracked = 0 + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._start_time = datetime.datetime.now() + cls._instance.config = kwargs["config"] + cls._instance._init_store() + return cls._instance + + @wrapt.synchronized(lock) + def __getitem__(self, name): + return self.data[name] + + @wrapt.synchronized(lock) + def __iter__(self): + return iter(self.data) + + @wrapt.synchronized(lock) + def keys(self): + return self.data.keys() + + @wrapt.synchronized(lock) + def items(self): + return self.data.items() + + @wrapt.synchronized(lock) + def values(self): + return self.data.values() + + @wrapt.synchronized(lock) + def __len__(self): + return len(self.data) + + @wrapt.synchronized(lock) + def __str__(self): + result = "{" + for key in self.data.keys(): + result += f"{key}: {str(self.data[key])}, " + result += "}" + return result + + @wrapt.synchronized(lock) + def add(self, packet): + key = int(packet.msgNo) + self.data[key] = packet + stats.APRSDStats().msgs_tracked_inc() + self.total_tracked += 1 + + @wrapt.synchronized(lock) + def get(self, id): + if id in self.data: + return self.data[id] + + @wrapt.synchronized(lock) + def remove(self, id): + key = int(id) + if key in self.data.keys(): + del self.data[key] + + def restart(self): + """Walk the list of messages and restart them if any.""" + for key in self.data.keys(): + pkt = self.data[key] + if pkt.last_send_attempt < pkt.retry_count: + pkt.send() + + def _resend(self, packet): + packet._last_send_attempt = 0 + packet.send() + + def restart_delayed(self, count=None, most_recent=True): + """Walk the list of delayed messages and restart them if any.""" + if not count: + # Send all the delayed messages + for key in self.data.keys(): + pkt = self.data[key] + if pkt._last_send_attempt == pkt._retry_count: + self._resend(pkt) + else: + # They want to resend delayed messages + tmp = sorted( + self.data.items(), + reverse=most_recent, + key=lambda x: x[1].last_send_time, + ) + pkt_list = tmp[:count] + for (_key, pkt) in pkt_list: + self._resend(pkt) diff --git a/aprsd/packets/watch_list.py b/aprsd/packets/watch_list.py new file mode 100644 index 0000000..54c3995 --- /dev/null +++ b/aprsd/packets/watch_list.py @@ -0,0 +1,103 @@ +import datetime +import logging +import threading + +import wrapt + +from aprsd import utils +from aprsd.utils import objectstore + + +LOG = logging.getLogger("APRSD") + + +class WatchList(objectstore.ObjectStoreMixin): + """Global watch list and info for callsigns.""" + + _instance = None + lock = threading.Lock() + data = {} + config = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + if "config" in kwargs: + cls._instance.config = kwargs["config"] + cls._instance._init_store() + cls._instance.data = {} + return cls._instance + + def __init__(self, config=None): + if config: + self.config = config + + ring_size = config["aprsd"]["watch_list"].get("packet_keep_count", 10) + + for callsign in config["aprsd"]["watch_list"].get("callsigns", []): + call = callsign.replace("*", "") + # FIXME(waboring) - we should fetch the last time we saw + # a beacon from a callsign or some other mechanism to find + # last time a message was seen by aprs-is. For now this + # is all we can do. + self.data[call] = { + "last": datetime.datetime.now(), + "packets": utils.RingBuffer( + ring_size, + ), + } + + def is_enabled(self): + if self.config and "watch_list" in self.config["aprsd"]: + return self.config["aprsd"]["watch_list"].get("enabled", False) + else: + return False + + def callsign_in_watchlist(self, callsign): + return callsign in self.data + + @wrapt.synchronized(lock) + def update_seen(self, packet): + if packet.addresse: + callsign = packet.addresse + else: + callsign = packet.from_call + if self.callsign_in_watchlist(callsign): + self.data[callsign]["last"] = datetime.datetime.now() + self.data[callsign]["packets"].append(packet) + + def last_seen(self, callsign): + if self.callsign_in_watchlist(callsign): + return self.data[callsign]["last"] + + def age(self, callsign): + now = datetime.datetime.now() + return str(now - self.last_seen(callsign)) + + def max_delta(self, seconds=None): + watch_list_conf = self.config["aprsd"]["watch_list"] + if not seconds: + seconds = watch_list_conf["alert_time_seconds"] + max_timeout = {"seconds": seconds} + return datetime.timedelta(**max_timeout) + + def is_old(self, callsign, seconds=None): + """Watch list callsign last seen is old compared to now? + + This tests to see if the last time we saw a callsign packet, + if that is older than the allowed timeout in the config. + + We put this here so any notification plugin can use this + same test. + """ + age = self.age(callsign) + + delta = utils.parse_delta_str(age) + d = datetime.timedelta(**delta) + + max_delta = self.max_delta(seconds=seconds) + + if d > max_delta: + return True + else: + return False diff --git a/aprsd/plugin.py b/aprsd/plugin.py index a62ab9d..885740b 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -13,7 +13,8 @@ import pluggy from thesmuggler import smuggle import aprsd -from aprsd import client, messaging, packets, threads +from aprsd import client, messaging, threads +from aprsd.packets import watch_list # setup the global logger @@ -119,11 +120,11 @@ class APRSDPluginBase(metaclass=abc.ABCMeta): thread.stop() @abc.abstractmethod - def filter(self, packet: packets.Packet): + def filter(self, packet): pass @abc.abstractmethod - def process(self, packet: packets.Packet): + def process(self, packet): """This is called when the filter passes.""" @@ -160,10 +161,10 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): LOG.warning("Watch list enabled, but no callsigns set.") @hookimpl - def filter(self, packet: packets.Packet): + def filter(self, packet): result = messaging.NULL_MESSAGE if self.enabled: - wl = packets.WatchList() + wl = watch_list.WatchList() if wl.callsign_in_watchlist(packet.from_call): # packet is from a callsign in the watch list self.rx_inc() @@ -212,7 +213,7 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): self.enabled = True @hookimpl - def filter(self, packet: packets.MessagePacket): + def filter(self, packet): result = None message = packet.get("message_text", None) @@ -272,7 +273,7 @@ class HelpPlugin(APRSDRegexCommandPluginBase): def help(self): return "Help: send APRS help or help " - def process(self, packet: packets.MessagePacket): + def process(self, packet): LOG.info("HelpPlugin") # fromcall = packet.get("from") message = packet.message_text @@ -475,7 +476,7 @@ class PluginManager: self._load_plugin(p_name) LOG.info("Completed Plugin Loading.") - def run(self, packet: packets.Packet): + def run(self, packet): """Execute all the pluguns run method.""" with self.lock: return self._pluggy_pm.hook.filter(packet=packet) diff --git a/aprsd/plugins/notify.py b/aprsd/plugins/notify.py index 2a9e7ea..d75a19c 100644 --- a/aprsd/plugins/notify.py +++ b/aprsd/plugins/notify.py @@ -1,7 +1,6 @@ import logging from aprsd import messaging, packets, plugin -from aprsd.utils import trace LOG = logging.getLogger("APRSD") @@ -18,7 +17,6 @@ class NotifySeenPlugin(plugin.APRSDWatchListPluginBase): short_description = "Notify me when a CALLSIGN is recently seen on APRS-IS" - @trace.trace def process(self, packet: packets.MessagePacket): LOG.info("NotifySeenPlugin") diff --git a/aprsd/threads/aprsd.py b/aprsd/threads/aprsd.py index 819ee7f..5673fa4 100644 --- a/aprsd/threads/aprsd.py +++ b/aprsd/threads/aprsd.py @@ -39,6 +39,8 @@ class APRSDThreadList: """Iterate over all threads and call stop on them.""" for th in self.threads_list: LOG.info(f"Stopping Thread {th.name}") + if hasattr(th, "packet"): + LOG.info(F"{th.name} packet {th.packet}") th.stop() @wrapt.synchronized(lock) diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keep_alive.py index 95c7ea0..cfbdbc9 100644 --- a/aprsd/threads/keep_alive.py +++ b/aprsd/threads/keep_alive.py @@ -3,7 +3,7 @@ import logging import time import tracemalloc -from aprsd import client, messaging, packets, stats, utils +from aprsd import client, packets, stats, utils from aprsd.threads import APRSDThread, APRSDThreadList @@ -23,7 +23,7 @@ class KeepAliveThread(APRSDThread): def loop(self): if self.cntr % 60 == 0: - tracker = messaging.MsgTrack() + pkt_tracker = packets.PacketTrack() stats_obj = stats.APRSDStats() pl = packets.PacketList() thread_list = APRSDThreadList() @@ -53,7 +53,7 @@ class KeepAliveThread(APRSDThread): utils.strfdelta(stats_obj.uptime), pl.total_recv, pl.total_tx, - len(tracker), + len(pkt_tracker), stats_obj.msgs_tx, stats_obj.msgs_rx, last_msg_time, diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index dfa7c80..fad4096 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -64,12 +64,10 @@ class APRSDPluginRXThread(APRSDRXThread): processing in the PluginProcessPacketThread. """ def process_packet(self, *args, **kwargs): - raw = self._client.decode_packet(*args, **kwargs) + packet = self._client.decode_packet(*args, **kwargs) # LOG.debug(raw) - packet = packets.Packet.factory(raw.copy()) + #packet = packets.Packet.factory(raw.copy()) packet.log(header="RX Packet") - # LOG.debug(packet) - del raw thread = APRSDPluginProcessPacketThread( config=self.config, packet=packet, @@ -90,24 +88,20 @@ class APRSDProcessPacketThread(APRSDThread): self.packet = packet name = self.packet.raw[:10] super().__init__(f"RXPKT-{name}") + self._loop_cnt = 1 def process_ack_packet(self, packet): ack_num = packet.msgNo LOG.info(f"Got ack for message {ack_num}") - messaging.log_message( - "RXACK", - packet.raw, - None, - ack=ack_num, - fromcall=packet.from_call, - ) - tracker = messaging.MsgTrack() - tracker.remove(ack_num) + packet.log("RXACK") + pkt_tracker = packets.PacketTrack() + pkt_tracker.remove(ack_num) stats.APRSDStats().ack_rx_inc() return def loop(self): """Process a packet received from aprs-is server.""" + LOG.debug(f"RXPKT-LOOP {self._loop_cnt}") packet = self.packet packets.PacketList().add(packet) our_call = self.config["aprsd"]["callsign"].lower() @@ -136,12 +130,20 @@ class APRSDProcessPacketThread(APRSDThread): stats.APRSDStats().msgs_rx_inc() # let any threads do their thing, then ack # send an ack last - ack = messaging.AckMessage( - self.config["aprsd"]["callsign"], - from_call, - msg_id=msg_id, + ack_pkt = packets.AckPacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + msgNo=msg_id, ) - ack.send() + LOG.warning(f"Send AckPacket {ack_pkt}") + ack_pkt.send() + LOG.warning("Send ACK called Continue on") + #ack = messaging.AckMessage( + # self.config["aprsd"]["callsign"], + # from_call, + # msg_id=msg_id, + #) + #ack.send() self.process_our_message_packet(packet) else: @@ -151,7 +153,8 @@ class APRSDProcessPacketThread(APRSDThread): self.process_other_packet( packet, for_us=(to_call.lower() == our_call), ) - LOG.debug("Packet processing complete") + LOG.debug("Packet processing complete") + return False @abc.abstractmethod def process_our_message_packet(self, *args, **kwargs): @@ -194,16 +197,29 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): if isinstance(subreply, messaging.Message): subreply.send() else: - msg = messaging.TextMessage( - self.config["aprsd"]["callsign"], - from_call, - subreply, + msg_pkt = packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=subreply, ) - msg.send() + msg_pkt.send() + #msg = messaging.TextMessage( + # self.config["aprsd"]["callsign"], + # from_call, + # subreply, + #) + #msg.send() elif isinstance(reply, messaging.Message): # We have a message based object. LOG.debug(f"Sending '{reply}'") - reply.send() + # Convert this to the new packet + msg_pkt = packets.MessagePacket( + from_call=reply.fromcall, + to_call=reply.tocall, + message_text=reply._raw_message, + ) + #reply.send() + msg_pkt.send() replied = True else: replied = True @@ -213,33 +229,55 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): # usage string if reply is not messaging.NULL_MESSAGE: LOG.debug(f"Sending '{reply}'") - - msg = messaging.TextMessage( - self.config["aprsd"]["callsign"], - from_call, - reply, + msg_pkt = packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=reply, ) - msg.send() + LOG.warning("Calling msg_pkg.send()") + msg_pkt.send() + LOG.warning("Calling msg_pkg.send() --- DONE") + + #msg = messaging.TextMessage( + # self.config["aprsd"]["callsign"], + # from_call, + # reply, + #) + #msg.send() # If the message was for us and we didn't have a # response, then we send a usage statement. if to_call == self.config["aprsd"]["callsign"] and not replied: LOG.warning("Sending help!") - msg = messaging.TextMessage( - self.config["aprsd"]["callsign"], - from_call, - "Unknown command! Send 'help' message for help", + msg_pkt = packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text="Unknown command! Send 'help' message for help", ) - msg.send() + msg_pkt.send() + #msg = messaging.TextMessage( + # self.config["aprsd"]["callsign"], + # from_call, + # "Unknown command! Send 'help' message for help", + #) + #msg.send() except Exception as ex: LOG.error("Plugin failed!!!") LOG.exception(ex) # Do we need to send a reply? if to_call == self.config["aprsd"]["callsign"]: reply = "A Plugin failed! try again?" - msg = messaging.TextMessage( - self.config["aprsd"]["callsign"], - from_call, - reply, + msg_pkt = packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=reply, ) - msg.send() + msg_pkt.send() + #msg = messaging.TextMessage( + # self.config["aprsd"]["callsign"], + # from_call, + # reply, + #) + #msg.send() + + LOG.debug("Completed process_our_message_packet") diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py new file mode 100644 index 0000000..8df838c --- /dev/null +++ b/aprsd/threads/tx.py @@ -0,0 +1,120 @@ +import datetime +import logging +import time + +from aprsd import client, stats +from aprsd import threads as aprsd_threads +from aprsd.packets import packet_list, tracker + + +LOG = logging.getLogger("APRSD") + + +class SendPacketThread(aprsd_threads.APRSDThread): + def __init__(self, packet): + self.packet = packet + name = self.packet.raw[:5] + super().__init__(f"TXPKT-{self.packet.msgNo}-{name}") + pkt_tracker = tracker.PacketTrack() + pkt_tracker.add(packet) + + def loop(self): + LOG.debug("TX Loop") + """Loop until a message is acked or it gets delayed. + + We only sleep for 5 seconds between each loop run, so + that CTRL-C can exit the app in a short period. Each sleep + means the app quitting is blocked until sleep is done. + So we keep track of the last send attempt and only send if the + last send attempt is old enough. + + """ + pkt_tracker = tracker.PacketTrack() + # lets see if the message is still in the tracking queue + packet = pkt_tracker.get(self.packet.msgNo) + if not packet: + # The message has been removed from the tracking queue + # So it got acked and we are done. + LOG.info("Message Send Complete via Ack.") + return False + else: + send_now = False + if packet._last_send_attempt == packet._retry_count: + # we reached the send limit, don't send again + # TODO(hemna) - Need to put this in a delayed queue? + LOG.info("Message Send Complete. Max attempts reached.") + if not packet._allow_delay: + pkt_tracker.remove(packet.msgNo) + return False + + # Message is still outstanding and needs to be acked. + if packet._last_send_time: + # Message has a last send time tracking + now = datetime.datetime.now() + sleeptime = (packet._last_send_attempt + 1) * 31 + delta = now - packet._last_send_time + if delta > datetime.timedelta(seconds=sleeptime): + # It's time to try to send it again + send_now = True + else: + send_now = True + + if send_now: + # no attempt time, so lets send it, and start + # tracking the time. + packet.log("Sending Message") + cl = client.factory.create().client + cl.send(packet.raw) + stats.APRSDStats().msgs_tx_inc() + packet_list.PacketList().add(packet) + packet._last_send_time = datetime.datetime.now() + packet._last_send_attempt += 1 + + time.sleep(5) + # Make sure we get called again. + return True + + +class SendAckThread(aprsd_threads.APRSDThread): + def __init__(self, packet): + self.packet = packet + super().__init__(f"SendAck-{self.packet.msgNo}") + self._loop_cnt = 1 + + def loop(self): + """Separate thread to send acks with retries.""" + send_now = False + if self.packet._last_send_attempt == self.packet._retry_count: + # we reached the send limit, don't send again + # TODO(hemna) - Need to put this in a delayed queue? + LOG.info("Ack Send Complete. Max attempts reached.") + return False + + if self.packet._last_send_time: + # Message has a last send time tracking + now = datetime.datetime.now() + + # aprs duplicate detection is 30 secs? + # (21 only sends first, 28 skips middle) + sleeptime = 31 + delta = now - self.packet._last_send_time + if delta > datetime.timedelta(seconds=sleeptime): + # It's time to try to send it again + send_now = True + elif self._loop_cnt % 5 == 0: + LOG.debug(f"Still wating. {delta}") + else: + send_now = True + + if send_now: + cl = client.factory.create().client + self.packet.log("Sending ACK") + cl.send(self.packet.raw) + self.packet._send_count += 1 + stats.APRSDStats().ack_tx_inc() + packet_list.PacketList().add(self.packet) + self.packet._last_send_attempt += 1 + self.packet._last_send_time = datetime.datetime.now() + time.sleep(1) + self._loop_cnt += 1 + return True diff --git a/aprsd/utils/counter.py b/aprsd/utils/counter.py new file mode 100644 index 0000000..e423bfb --- /dev/null +++ b/aprsd/utils/counter.py @@ -0,0 +1,48 @@ +from multiprocessing import RawValue +import threading + +import wrapt + + +class PacketCounter: + """ + Global Packet id counter class. + + This is a singleton based class that keeps + an incrementing counter for all packets to + be sent. All new Packet objects gets a new + message id, which is the next number available + from the PacketCounter. + + """ + + _instance = None + max_count = 9999 + lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + """Make this a singleton class.""" + if cls._instance is None: + cls._instance = super().__new__(cls, *args, **kwargs) + cls._instance.val = RawValue("i", 1) + return cls._instance + + @wrapt.synchronized(lock) + def increment(self): + if self.val.value == self.max_count: + self.val.value = 1 + else: + self.val.value += 1 + + @property + @wrapt.synchronized(lock) + def value(self): + return self.val.value + + @wrapt.synchronized(lock) + def __repr__(self): + return str(self.val.value) + + @wrapt.synchronized(lock) + def __str__(self): + return str(self.val.value) diff --git a/tox.ini b/tox.ini index 84b8b24..c0fe0c1 100644 --- a/tox.ini +++ b/tox.ini @@ -2,7 +2,7 @@ minversion = 2.9.0 skipdist = True skip_missing_interpreters = true -envlist = pep8,py{38,39} +envlist = pep8,py{39,310} #requires = tox-pipenv # pip==22.0.4 # pip-tools==5.4.0