From 9768003c2a384389a0b6427a7e761efc5675a2ea Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 23 Dec 2020 13:12:04 -0500 Subject: [PATCH 1/8] Reworked messaging lib This patch updates the messaging lib to use Message Objects for each message type (text, ack) that know how to send themselves with the same interface. --- aprsd/main.py | 128 +++++++++++----- aprsd/messaging.py | 359 ++++++++++++++++++++++++++------------------- 2 files changed, 297 insertions(+), 190 deletions(-) diff --git a/aprsd/main.py b/aprsd/main.py index d2905c5..c0aa32b 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -21,11 +21,15 @@ # # python included libs +import concurrent.futures +import functools import logging import os +import queue import random import signal import sys +import threading import time from logging import NullHandler from logging.handlers import RotatingFileHandler @@ -53,6 +57,10 @@ LOG_LEVELS = { CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) +# Global threading event to trigger stopping all threads +# When user quits app via CTRL-C +event = None + # localization, please edit: # HOST = "noam.aprs2.net" # north america tier2 servers round robin # USER = "KM6XXX-9" # callsign of this aprs client with SSID @@ -140,11 +148,10 @@ def install(append, case_insensitive, shell, path): def signal_handler(signal, frame): - LOG.info("Ctrl+C, exiting.") - # sys.exit(0) # thread ignores this - os._exit(0) - + global event + LOG.info("Ctrl+C, Sending all threads exit!") + sys.exit(0) # thread ignores this # end signal_handler @@ -172,7 +179,7 @@ def setup_logging(config, loglevel, quiet): LOG.addHandler(sh) -def process_ack_packet(packet): +def process_ack_packet(packet, config): ack_num = packet.get("msgNo") LOG.info("Got ack for message {}".format(ack_num)) messaging.log_message( @@ -182,32 +189,30 @@ def process_ack_packet(packet): return -def process_mic_e_packet(packet): +def process_mic_e_packet(packet, config): LOG.info("Mic-E Packet detected. Currenlty unsupported.") messaging.log_packet(packet) return -def process_message_packet(packet): +def process_message_packet(packet, config, tx_msg_queue): LOG.info("Got a message packet") fromcall = packet["from"] message = packet.get("message_text", None) - msg_number = packet.get("msgNo", None) - if msg_number: - ack = msg_number - else: - ack = "0" + msg_id = packet.get("msgNo", None) + if not msg_id: + msg_id = "0" messaging.log_message( - "Received Message", packet["raw"], message, fromcall=fromcall, ack=ack + "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id ) found_command = False # Get singleton of the PM pm = plugin.PluginManager() try: - results = pm.run(fromcall=fromcall, message=message, ack=ack) + results = pm.run(fromcall=fromcall, message=message, ack=msg_id) for reply in results: found_command = True # A plugin can return a null message flag which signals @@ -215,7 +220,11 @@ def process_message_packet(packet): # nothing to reply with, so we avoid replying with a usage string if reply is not messaging.NULL_MESSAGE: LOG.debug("Sending '{}'".format(reply)) - messaging.send_message(fromcall, reply) + + # msg = {"fromcall": fromcall, "msg": reply} + msg = messaging.TextMessage(config["aprs"]["login"], + fromcall, reply) + tx_msg_queue.put(msg) else: LOG.debug("Got NULL MESSAGE from plugin") @@ -225,22 +234,29 @@ def process_message_packet(packet): names.sort() reply = "Usage: {}".format(", ".join(names)) - messaging.send_message(fromcall, reply) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(config["aprs"]["login"], + fromcall, reply) + tx_msg_queue.put(msg) except Exception as ex: LOG.exception("Plugin failed!!!", ex) reply = "A Plugin failed! try again?" - messaging.send_message(fromcall, reply) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(config["aprs"]["login"], + fromcall, reply) + tx_msg_queue.put(msg) # let any threads do their thing, then ack # send an ack last - messaging.send_ack(fromcall, ack) + ack = messaging.AckMessage(config["aprs"]["login"], fromcall, msg_id=msg_id) + ack.send() LOG.debug("Packet processing complete") -def process_packet(packet): +def process_packet(packet, config=None, msg_queues=None, event=None): """Process a packet recieved from aprs-is server.""" - LOG.debug("Process packet!") + LOG.debug("Process packet! {}".format(msg_queues)) try: LOG.debug("Got message: {}".format(packet)) @@ -250,15 +266,15 @@ def process_packet(packet): if msg_format == "message" and msg: # we want to send the message through the # plugins - process_message_packet(packet) + process_message_packet(packet, config, msg_queues["tx"]) return elif msg_response == "ack": - process_ack_packet(packet) + process_ack_packet(packet, config) return if msg_format == "mic-e": # process a mic-e packet - process_mic_e_packet(packet) + process_mic_e_packet(packet, config) return except (aprslib.ParseError, aprslib.UnknownFormat) as exp: @@ -350,17 +366,15 @@ def send_message( message = packet.get("message_text", None) LOG.info("We got a new message") fromcall = packet["from"] - msg_number = packet.get("msgNo", None) - if msg_number: - ack = msg_number - else: - ack = "0" + msg_number = packet.get("msgNo", "0") messaging.log_message( - "Received Message", packet["raw"], message, fromcall=fromcall, ack=ack + "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_number ) got_response = True # Send the ack back? - messaging.send_ack_direct(fromcall, ack) + ack = messaging.AckMessage(config["aprs"]["login"], + fromcall, msg_id=msg_number) + ack.send_direct() if got_ack and got_response: sys.exit(0) @@ -372,7 +386,9 @@ def send_message( # We should get an ack back as well as a new message # we should bail after we get the ack and send an ack back for the # message - messaging.send_message_direct(tocallsign, command, message_number) + msg = messaging.TextMessage(aprs_login, tocallsign, command) + msg.send_direct() + #messaging.send_message_direct(tocallsign, command, message_number) try: # This will register a packet consumer with aprslib @@ -389,6 +405,16 @@ def send_message( cl.reset() +def tx_msg_thread(msg_queues=None, event=None): + """Thread to handle sending any messages outbound.""" + LOG.info("TX_MSG_THREAD") + while not event.is_set() or not msg_queues["tx"].empty(): + msg = msg_queues["tx"].get() + LOG.info("TXQ: got message '{}'".format(msg)) + msg.send() + #messaging.send_message(msg["fromcall"], msg["msg"]) + + # main() ### @main.command() @click.option( @@ -418,7 +444,9 @@ def send_message( ) def server(loglevel, quiet, disable_validation, config_file): """Start the aprsd server process.""" + global event + event = threading.Event() signal.signal(signal.SIGINT, signal_handler) click.echo("Load config") @@ -429,7 +457,6 @@ def server(loglevel, quiet, disable_validation, config_file): # Accept the config as a constructor param, instead of this # hacky global setting email.CONFIG = config - messaging.CONFIG = config setup_logging(config, loglevel, quiet) LOG.info("APRSD Started version: {}".format(aprsd.__version__)) @@ -449,7 +476,22 @@ def server(loglevel, quiet, disable_validation, config_file): plugin_manager.setup_plugins() cl = client.Client(config) - # setup and run the main blocking loop + rx_msg_queue = queue.Queue(maxsize=20) + tx_msg_queue = queue.Queue(maxsize=20) + msg_queues = {"rx": rx_msg_queue, + "tx": tx_msg_queue,} + + rx_msg = threading.Thread( + target=rx_msg_thread, name="RX_msg", kwargs={'msg_queues':msg_queues, + 'event': event} + ) + tx_msg = threading.Thread( + target=tx_msg_thread, name="TX_msg", kwargs={'msg_queues':msg_queues, + 'event': event} + ) + rx_msg.start() + tx_msg.start() + while True: # Now use the helper which uses the singleton aprs_client = client.get_client() @@ -459,14 +501,30 @@ def server(loglevel, quiet, disable_validation, config_file): # This will register a packet consumer with aprslib # When new packets come in the consumer will process # the packet - aprs_client.consumer(process_packet, raw=False) + + # Do a partial here because the consumer signature doesn't allow + # For kwargs to be passed in to the consumer func we declare + # and the aprslib developer didn't want to allow a PR to add + # kwargs. :( + # https://github.com/rossengeorgiev/aprs-python/pull/56 + process_partial = functools.partial(process_packet, + msg_queues=msg_queues, + event=event, config=config) + aprs_client.consumer(process_partial, raw=False) except aprslib.exceptions.ConnectionDrop: LOG.error("Connection dropped, reconnecting") time.sleep(5) # Force the deletion of the client object connected to aprs # This will cause a reconnect, next time client.get_client() # is called - cl.reset() + client.Client().reset() + + + LOG.info("APRSD Exiting.") + tx_msg.join() + + sys.exit(0) + # setup and run the main blocking loop if __name__ == "__main__": diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 18e71d3..25c3c85 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -1,161 +1,239 @@ +import abc import logging +from multiprocessing import RawValue import pprint import re import threading import time +import uuid from aprsd import client LOG = logging.getLogger("APRSD") -CONFIG = None - -# current aprs radio message number, increments for each message we -# send over rf {int} -message_number = 0 # message_nubmer:ack combos so we stop sending a message after an # ack from radio {int:int} +# FIXME ack_dict = {} # What to return from a plugin if we have processed the message # and it's ok, but don't send a usage string back NULL_MESSAGE = -1 +class MessageCounter(object): + """ + Global message id counter class. -def send_ack_thread(tocall, ack, retry_count): - cl = client.get_client() - tocall = tocall.ljust(9) # pad to nine chars - line = "{}>APRS::{}:ack{}\n".format(CONFIG["aprs"]["login"], tocall, ack) - for i in range(retry_count, 0, -1): - log_message( - "Sending ack", - line.rstrip("\n"), - None, - ack=ack, - tocall=tocall, - retry_number=i, - ) - cl.sendall(line) - # aprs duplicate detection is 30 secs? - # (21 only sends first, 28 skips middle) - time.sleep(31) - # end_send_ack_thread + This is a singleton based class that keeps + an incrementing counter for all messages to + be sent. All new Message objects gets a new + message id, which is the next number available + from the MessageCounter. + + """ + _instance = None + + def __new__(cls, *args, **kwargs): + """Make this a singleton class.""" + if cls._instance is None: + cls._instance = super(MessageCounter, cls).__new__(cls) + cls._instance.val = RawValue('i', 1) + cls._instance.lock = threading.Lock() + return cls._instance + + def increment(self): + with self.lock: + self.val.value += 1 + + @property + def value(self): + with self.lock: + return self.val.value + + def __repr__(self): + with self.lock: + return str(self.val.value) + + def __str__(self): + with self.lock: + return str(self.val.value) -def send_ack(tocall, ack): - LOG.debug("Send ACK({}:{}) to radio.".format(tocall, ack)) +class Message(object, metaclass=abc.ABCMeta): + """Base Message Class.""" + # The message id to send over the air + id = 0 + + # Unique identifier for this message + uuid = None + + sent = False + sent_time = None + acked = False + acked_time = None + retry_count = 3 - thread = threading.Thread( - target=send_ack_thread, name="send_ack", args=(tocall, ack, retry_count) - ) - thread.start() - # end send_ack() + + def __init__(self, fromcall, tocall, msg_id=None): + self.fromcall = fromcall + self.tocall = tocall + self.uuid = uuid.uuid4() + if not msg_id: + c = MessageCounter() + c.increment() + msg_id = c.value + self.id = msg_id + + @abc.abstractmethod + def send(self): + """Child class must declare.""" + pass -def send_ack_direct(tocall, ack): - """Send an ack message without a separate thread.""" - LOG.debug("Send ACK({}:{}) to radio.".format(tocall, ack)) - cl = client.get_client() - fromcall = CONFIG["aprs"]["login"] - line = "{}>APRS::{}:ack{}\n".format(fromcall, tocall, ack) - log_message( - "Sending ack", - line.rstrip("\n"), - None, - ack=ack, - tocall=tocall, - fromcall=fromcall, - ) - cl.sendall(line) +class TextMessage(Message): + """Send regular ARPS text/command messages/replies.""" + + message = None + + def __init__(self, fromcall, tocall, message): + super(TextMessage, self).__init__(fromcall, tocall) + self.message = message + + def __repr__(self): + """Build raw string to send over the air.""" + return "{}>APRS::{}:{}{{{}\n".format( + self.fromcall, self.tocall.ljust(9), + self._filter_for_send(), str(self.id),) + + def __str__(self): + return "From({}) TO({}) - Message({}): '{}'".format( + self.fromcall, self.tocall, + self.id, self.message + ) + + def ack(self): + """Build an Ack Message object from this object.""" + return AckMessage(self.fromcall, self.tocall, msg_id=self.id) + + def _filter_for_send(self): + """Filter and format message string for FCC.""" + # max? ftm400 displays 64, raw msg shows 74 + # and ftm400-send is max 64. setting this to + # 67 displays 64 on the ftm400. (+3 {01 suffix) + # feature req: break long ones into two msgs + message = self.message[:67] + # We all miss George Carlin + return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) + + def send_thread(self): + cl = client.get_client() + for i in range(self.retry_count, 0, -1): + LOG.debug("DEBUG: send_message_thread msg:ack combos are: ") + LOG.debug(pprint.pformat(ack_dict)) + if ack_dict[self.id] != 1: + log_message( + "Sending Message", + repr(self).rstrip("\n"), + self.message, + tocall=self.tocall, + retry_number=i, + ) + # tn.write(line) + cl.sendall(repr(self)) + # decaying repeats, 31 to 93 second intervals + sleeptime = (self.retry_count - i + 1) * 31 + time.sleep(sleeptime) + else: + break + return + # end send_message_thread + + def send(self): + global ack_dict + + # TODO(Hemna) - Need a better metchanism for this. + # This can nuke an ack_dict while it's still being used. + # FIXME FIXME + if len(ack_dict) > 90: + # empty ack dict if it's really big, could result in key error later + LOG.debug( + "DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict) + ) + ack_dict.clear() + LOG.debug(pprint.pformat(ack_dict)) + LOG.debug( + "DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict) + ) + ack_dict[self.id] = 0 # clear ack for this message number + + thread = threading.Thread(target=self.send_thread, name="send_message") + thread.start() + + def send_direct(self): + """Send a message without a separate thread.""" + cl = client.get_client() + log_message( + "Sending Message Direct", repr(self).rstrip("\n"), self.message, tocall=self.tocall, + fromcall=self.fromcall + ) + cl.sendall(repr(self)) -def send_message_thread(tocall, message, this_message_number, retry_count): - cl = client.get_client() - line = "{}>APRS::{}:{}{{{}\n".format( - CONFIG["aprs"]["login"], - tocall, - message, - str(this_message_number), - ) - for i in range(retry_count, 0, -1): - LOG.debug("DEBUG: send_message_thread msg:ack combos are: ") - LOG.debug(pprint.pformat(ack_dict)) - if ack_dict[this_message_number] != 1: +class AckMessage(Message): + """Class for building Acks and sending them.""" + + + def __init__(self, fromcall, tocall, msg_id): + super(AckMessage, self).__init__(fromcall, tocall, msg_id=msg_id) + + def __repr__(self): + return "{}>APRS::{}:ack{}\n".format( + self.fromcall, self.tocall.ljust(9), self.id) + + def __str__(self): + return "From({}) TO({}) Ack ({})".format( + self.fromcall, self.tocall, + self.id + ) + + def send_thread(self): + """Separate thread to send acks with retries.""" + cl = client.get_client() + for i in range(self.retry_count, 0, -1): log_message( - "Sending Message", - line.rstrip("\n"), - message, - tocall=tocall, + "Sending ack", + repr(self).rstrip("\n"), + None, + ack=self.id, + tocall=self.tocall, retry_number=i, ) - # tn.write(line) - cl.sendall(line) - # decaying repeats, 31 to 93 second intervals - sleeptime = (retry_count - i + 1) * 31 - time.sleep(sleeptime) - else: - break - return - # end send_message_thread + cl.sendall(repr(self)) + # aprs duplicate detection is 30 secs? + # (21 only sends first, 28 skips middle) + time.sleep(31) + # end_send_ack_thread - -def send_message(tocall, message): - global message_number - global ack_dict - - retry_count = 3 - if message_number > 98: # global - message_number = 0 - message_number += 1 - if len(ack_dict) > 90: - # empty ack dict if it's really big, could result in key error later - LOG.debug( - "DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict) + def send(self): + LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id)) + thread = threading.Thread( + target=self.send_thread, name="send_ack" ) - ack_dict.clear() - LOG.debug(pprint.pformat(ack_dict)) - LOG.debug( - "DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict) + thread.start() + # end send_ack() + + def send_direct(self): + """Send an ack message without a separate thread.""" + cl = client.get_client() + log_message( + "Sending ack", + repr(self).rstrip("\n"), + None, + ack=self.id, + tocall=self.tocall, + fromcall=self.fromcall, ) - ack_dict[message_number] = 0 # clear ack for this message number - tocall = tocall.ljust(9) # pad to nine chars - - # max? ftm400 displays 64, raw msg shows 74 - # and ftm400-send is max 64. setting this to - # 67 displays 64 on the ftm400. (+3 {01 suffix) - # feature req: break long ones into two msgs - message = message[:67] - # We all miss George Carlin - message = re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) - thread = threading.Thread( - target=send_message_thread, - name="send_message", - args=(tocall, message, message_number, retry_count), - ) - thread.start() - return () - # end send_message() - - -def send_message_direct(tocall, message, message_number=None): - """Send a message without a separate thread.""" - cl = client.get_client() - if not message_number: - this_message_number = 1 - else: - this_message_number = message_number - fromcall = CONFIG["aprs"]["login"] - line = "{}>APRS::{}:{}{{{}\n".format( - fromcall, - tocall, - message, - str(this_message_number), - ) - LOG.debug("DEBUG: send_message_thread msg:ack combos are: ") - log_message( - "Sending Message", line.rstrip("\n"), message, tocall=tocall, fromcall=fromcall - ) - cl.sendall(line) + cl.sendall(repr(self)) def log_packet(packet): @@ -236,32 +314,3 @@ def log_message( log_list.append(" {} _______________ Complete".format(header)) LOG.info("\n".join(log_list)) - - -def process_message(line): - f = re.search("^(.*)>", line) - fromcall = f.group(1) - searchstring = "::%s[ ]*:(.*)" % CONFIG["aprs"]["login"] - # verify this, callsign is padded out with spaces to colon - m = re.search(searchstring, line) - fullmessage = m.group(1) - - ack_attached = re.search("(.*){([0-9A-Z]+)", fullmessage) - # ack formats include: {1, {AB}, {12 - if ack_attached: - # "{##" suffix means radio wants an ack back - # message content - message = ack_attached.group(1) - # suffix number to use in ack - ack_num = ack_attached.group(2) - else: - message = fullmessage - # ack not requested, but lets send one as 0 - ack_num = "0" - - log_message( - "Received message", line, message, fromcall=fromcall, msg_num=str(ack_num) - ) - - return (fromcall, message, ack_num) - # end process_message() From f65707cb8cc10c84a3cc0faafdc871de93d20ee9 Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 24 Dec 2020 12:39:48 -0500 Subject: [PATCH 2/8] reworked threading This patch reworks the threading code for processing messages. This patch also extends the aprslib IS class to allow us to stop processing the consumer packets when someone hits CTRL-C correctly. Alloing the app to exit. --- aprsd/client.py | 61 +++++++++++++- aprsd/main.py | 194 +++++++-------------------------------------- aprsd/messaging.py | 43 +++++----- aprsd/threads.py | 186 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 297 insertions(+), 187 deletions(-) create mode 100644 aprsd/threads.py diff --git a/aprsd/client.py b/aprsd/client.py index b3613dd..58b1be4 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -1,4 +1,6 @@ import logging +import select +import socket import time import aprslib @@ -45,7 +47,7 @@ class Client(object): while not connected: try: LOG.info("Creating aprslib client") - aprs_client = aprslib.IS(user, passwd=password, host=host, port=port) + aprs_client = Aprsdis(user, passwd=password, host=host, port=port) # Force the logging to be the same aprs_client.logger = LOG aprs_client.connect() @@ -60,6 +62,63 @@ class Client(object): return aprs_client +class Aprsdis(aprslib.IS): + """Extend the aprslib class so we can exit properly.""" + + # flag to tell us to stop + thread_stop = False + + # timeout in seconds + select_timeout = 10 + + def stop(self): + self.thread_stop = True + + def _socket_readlines(self, blocking=False): + """ + Generator for complete lines, received from the server + """ + try: + self.sock.setblocking(0) + except socket.error as e: + self.logger.error("socket error when setblocking(0): %s" % str(e)) + raise aprslib.ConnectionDrop("connection dropped") + + while not self.thread_stop: + short_buf = b"" + newline = b"\r\n" + + # set a select timeout, so we get a chance to exit + # when user hits CTRL-C + readable, writable, exceptional = select.select( + [self.sock], [], [], self.select_timeout + ) + if not readable: + self.logger.info("nothing to read") + continue + + try: + short_buf = self.sock.recv(4096) + + # sock.recv returns empty if the connection drops + if not short_buf: + self.logger.error("socket.recv(): returned empty") + raise aprslib.ConnectionDrop("connection dropped") + except socket.error as e: + # self.logger.error("socket error on recv(): %s" % str(e)) + if "Resource temporarily unavailable" in str(e): + if not blocking: + if len(self.buf) == 0: + break + + self.buf += short_buf + + while newline in self.buf: + line, self.buf = self.buf.split(newline, 1) + + yield line + + def get_client(): cl = Client() return cl.client diff --git a/aprsd/main.py b/aprsd/main.py index c0aa32b..58dfee0 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -21,12 +21,9 @@ # # python included libs -import concurrent.futures -import functools import logging import os import queue -import random import signal import sys import threading @@ -41,7 +38,7 @@ import yaml # local imports here import aprsd -from aprsd import client, email, messaging, plugin, utils +from aprsd import client, email, messaging, plugin, threads, utils # setup the global logger # logging.basicConfig(level=logging.DEBUG) # level=10 @@ -57,9 +54,7 @@ LOG_LEVELS = { CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) -# Global threading event to trigger stopping all threads -# When user quits app via CTRL-C -event = None +server_threads = [] # localization, please edit: # HOST = "noam.aprs2.net" # north america tier2 servers round robin @@ -151,7 +146,11 @@ def signal_handler(signal, frame): global event LOG.info("Ctrl+C, Sending all threads exit!") + for th in server_threads: + th.stop() sys.exit(0) # thread ignores this + + # end signal_handler @@ -179,108 +178,6 @@ def setup_logging(config, loglevel, quiet): LOG.addHandler(sh) -def process_ack_packet(packet, config): - ack_num = packet.get("msgNo") - LOG.info("Got ack for message {}".format(ack_num)) - messaging.log_message( - "ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"] - ) - messaging.ack_dict.update({int(ack_num): 1}) - return - - -def process_mic_e_packet(packet, config): - LOG.info("Mic-E Packet detected. Currenlty unsupported.") - messaging.log_packet(packet) - return - - -def process_message_packet(packet, config, tx_msg_queue): - LOG.info("Got a message packet") - fromcall = packet["from"] - message = packet.get("message_text", None) - - msg_id = packet.get("msgNo", None) - if not msg_id: - msg_id = "0" - - messaging.log_message( - "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id - ) - - found_command = False - # Get singleton of the PM - pm = plugin.PluginManager() - try: - results = pm.run(fromcall=fromcall, message=message, ack=msg_id) - for reply in results: - found_command = True - # A plugin can return a null message flag which signals - # us that they processed the message correctly, but have - # nothing to reply with, so we avoid replying with a usage string - if reply is not messaging.NULL_MESSAGE: - LOG.debug("Sending '{}'".format(reply)) - - # msg = {"fromcall": fromcall, "msg": reply} - msg = messaging.TextMessage(config["aprs"]["login"], - fromcall, reply) - tx_msg_queue.put(msg) - else: - LOG.debug("Got NULL MESSAGE from plugin") - - if not found_command: - plugins = pm.get_plugins() - names = [x.command_name for x in plugins] - names.sort() - - reply = "Usage: {}".format(", ".join(names)) - # messaging.send_message(fromcall, reply) - msg = messaging.TextMessage(config["aprs"]["login"], - fromcall, reply) - tx_msg_queue.put(msg) - except Exception as ex: - LOG.exception("Plugin failed!!!", ex) - reply = "A Plugin failed! try again?" - # messaging.send_message(fromcall, reply) - msg = messaging.TextMessage(config["aprs"]["login"], - fromcall, reply) - tx_msg_queue.put(msg) - - # let any threads do their thing, then ack - # send an ack last - ack = messaging.AckMessage(config["aprs"]["login"], fromcall, msg_id=msg_id) - ack.send() - LOG.debug("Packet processing complete") - - -def process_packet(packet, config=None, msg_queues=None, event=None): - """Process a packet recieved from aprs-is server.""" - - LOG.debug("Process packet! {}".format(msg_queues)) - try: - LOG.debug("Got message: {}".format(packet)) - - msg = packet.get("message_text", None) - msg_format = packet.get("format", None) - msg_response = packet.get("response", None) - if msg_format == "message" and msg: - # we want to send the message through the - # plugins - process_message_packet(packet, config, msg_queues["tx"]) - return - elif msg_response == "ack": - process_ack_packet(packet, config) - return - - if msg_format == "mic-e": - # process a mic-e packet - process_mic_e_packet(packet, config) - return - - except (aprslib.ParseError, aprslib.UnknownFormat) as exp: - LOG.exception("Failed to parse packet from aprs-is", exp) - - @main.command() def sample_config(): """This dumps the config to stdout.""" @@ -345,7 +242,6 @@ def send_message( setup_logging(config, loglevel, quiet) LOG.info("APRSD Started version: {}".format(aprsd.__version__)) - message_number = random.randint(1, 90) if type(command) is tuple: command = " ".join(command) LOG.info("Sending Command '{}'".format(command)) @@ -368,12 +264,17 @@ def send_message( fromcall = packet["from"] msg_number = packet.get("msgNo", "0") messaging.log_message( - "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_number + "Received Message", + packet["raw"], + message, + fromcall=fromcall, + ack=msg_number, ) got_response = True # Send the ack back? - ack = messaging.AckMessage(config["aprs"]["login"], - fromcall, msg_id=msg_number) + ack = messaging.AckMessage( + config["aprs"]["login"], fromcall, msg_id=msg_number + ) ack.send_direct() if got_ack and got_response: @@ -388,7 +289,7 @@ def send_message( # message msg = messaging.TextMessage(aprs_login, tocallsign, command) msg.send_direct() - #messaging.send_message_direct(tocallsign, command, message_number) + # messaging.send_message_direct(tocallsign, command, message_number) try: # This will register a packet consumer with aprslib @@ -405,16 +306,6 @@ def send_message( cl.reset() -def tx_msg_thread(msg_queues=None, event=None): - """Thread to handle sending any messages outbound.""" - LOG.info("TX_MSG_THREAD") - while not event.is_set() or not msg_queues["tx"].empty(): - msg = msg_queues["tx"].get() - LOG.info("TXQ: got message '{}'".format(msg)) - msg.send() - #messaging.send_message(msg["fromcall"], msg["msg"]) - - # main() ### @main.command() @click.option( @@ -474,55 +365,24 @@ def server(loglevel, quiet, disable_validation, config_file): # Create the initial PM singleton and Register plugins plugin_manager = plugin.PluginManager(config) plugin_manager.setup_plugins() - cl = client.Client(config) + client.Client(config) rx_msg_queue = queue.Queue(maxsize=20) tx_msg_queue = queue.Queue(maxsize=20) - msg_queues = {"rx": rx_msg_queue, - "tx": tx_msg_queue,} - - rx_msg = threading.Thread( - target=rx_msg_thread, name="RX_msg", kwargs={'msg_queues':msg_queues, - 'event': event} - ) - tx_msg = threading.Thread( - target=tx_msg_thread, name="TX_msg", kwargs={'msg_queues':msg_queues, - 'event': event} - ) - rx_msg.start() - tx_msg.start() - - while True: - # Now use the helper which uses the singleton - aprs_client = client.get_client() - - # setup the consumer of messages and block until a messages - try: - # This will register a packet consumer with aprslib - # When new packets come in the consumer will process - # the packet - - # Do a partial here because the consumer signature doesn't allow - # For kwargs to be passed in to the consumer func we declare - # and the aprslib developer didn't want to allow a PR to add - # kwargs. :( - # https://github.com/rossengeorgiev/aprs-python/pull/56 - process_partial = functools.partial(process_packet, - msg_queues=msg_queues, - event=event, config=config) - aprs_client.consumer(process_partial, raw=False) - except aprslib.exceptions.ConnectionDrop: - LOG.error("Connection dropped, reconnecting") - time.sleep(5) - # Force the deletion of the client object connected to aprs - # This will cause a reconnect, next time client.get_client() - # is called - client.Client().reset() + msg_queues = { + "rx": rx_msg_queue, + "tx": tx_msg_queue, + } + rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config) + tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config) + # TODO(hemna): add EmailThread + server_threads.append(rx_thread) + server_threads.append(tx_thread) + rx_thread.start() + tx_thread.start() LOG.info("APRSD Exiting.") - tx_msg.join() - sys.exit(0) # setup and run the main blocking loop diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 25c3c85..ded7ac7 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -1,11 +1,11 @@ import abc import logging -from multiprocessing import RawValue import pprint import re import threading import time import uuid +from multiprocessing import RawValue from aprsd import client @@ -20,6 +20,7 @@ ack_dict = {} # and it's ok, but don't send a usage string back NULL_MESSAGE = -1 + class MessageCounter(object): """ Global message id counter class. @@ -31,13 +32,14 @@ class MessageCounter(object): from the MessageCounter. """ + _instance = None def __new__(cls, *args, **kwargs): """Make this a singleton class.""" if cls._instance is None: cls._instance = super(MessageCounter, cls).__new__(cls) - cls._instance.val = RawValue('i', 1) + cls._instance.val = RawValue("i", 1) cls._instance.lock = threading.Lock() return cls._instance @@ -61,6 +63,7 @@ class MessageCounter(object): class Message(object, metaclass=abc.ABCMeta): """Base Message Class.""" + # The message id to send over the air id = 0 @@ -102,14 +105,16 @@ class TextMessage(Message): def __repr__(self): """Build raw string to send over the air.""" return "{}>APRS::{}:{}{{{}\n".format( - self.fromcall, self.tocall.ljust(9), - self._filter_for_send(), str(self.id),) + self.fromcall, + self.tocall.ljust(9), + self._filter_for_send(), + str(self.id), + ) def __str__(self): return "From({}) TO({}) - Message({}): '{}'".format( - self.fromcall, self.tocall, - self.id, self.message - ) + self.fromcall, self.tocall, self.id, self.message + ) def ack(self): """Build an Ack Message object from this object.""" @@ -162,7 +167,8 @@ class TextMessage(Message): ack_dict.clear() LOG.debug(pprint.pformat(ack_dict)) LOG.debug( - "DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict) + "DEBUG: Cleared ack dictionary, ack_dict length is now %s." + % len(ack_dict) ) ack_dict[self.id] = 0 # clear ack for this message number @@ -173,8 +179,11 @@ class TextMessage(Message): """Send a message without a separate thread.""" cl = client.get_client() log_message( - "Sending Message Direct", repr(self).rstrip("\n"), self.message, tocall=self.tocall, - fromcall=self.fromcall + "Sending Message Direct", + repr(self).rstrip("\n"), + self.message, + tocall=self.tocall, + fromcall=self.fromcall, ) cl.sendall(repr(self)) @@ -182,19 +191,16 @@ class TextMessage(Message): class AckMessage(Message): """Class for building Acks and sending them.""" - def __init__(self, fromcall, tocall, msg_id): super(AckMessage, self).__init__(fromcall, tocall, msg_id=msg_id) def __repr__(self): return "{}>APRS::{}:ack{}\n".format( - self.fromcall, self.tocall.ljust(9), self.id) + self.fromcall, self.tocall.ljust(9), self.id + ) def __str__(self): - return "From({}) TO({}) Ack ({})".format( - self.fromcall, self.tocall, - self.id - ) + return "From({}) TO({}) Ack ({})".format(self.fromcall, self.tocall, self.id) def send_thread(self): """Separate thread to send acks with retries.""" @@ -216,10 +222,9 @@ class AckMessage(Message): def send(self): LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id)) - thread = threading.Thread( - target=self.send_thread, name="send_ack" - ) + thread = threading.Thread(target=self.send_thread, name="send_ack") thread.start() + # end send_ack() def send_direct(self): diff --git a/aprsd/threads.py b/aprsd/threads.py new file mode 100644 index 0000000..2b666cb --- /dev/null +++ b/aprsd/threads.py @@ -0,0 +1,186 @@ +import logging +import queue +import threading +import time + +import aprslib + +from aprsd import client, messaging, plugin + +LOG = logging.getLogger("APRSD") + + +class APRSDThread(threading.Thread): + def __init__(self, name, msg_queues, config): + super(APRSDThread, self).__init__(name=name) + self.msg_queues = msg_queues + self.config = config + self.thread_stop = False + + def stop(self): + self.thread_stop = True + + def run(self): + while not self.thread_stop: + self._run() + + +class APRSDRXThread(APRSDThread): + def __init__(self, msg_queues, config): + super(APRSDRXThread, self).__init__("RX_MSG", msg_queues, config) + self.thread_stop = False + + def stop(self): + self.thread_stop = True + self.aprs.stop() + + def callback(self, packet): + try: + packet = aprslib.parse(packet) + print(packet) + except (aprslib.ParseError, aprslib.UnknownFormat): + pass + + def run(self): + LOG.info("Starting") + while not self.thread_stop: + aprs_client = client.get_client() + + # setup the consumer of messages and block until a messages + try: + # This will register a packet consumer with aprslib + # When new packets come in the consumer will process + # the packet + + # Do a partial here because the consumer signature doesn't allow + # For kwargs to be passed in to the consumer func we declare + # and the aprslib developer didn't want to allow a PR to add + # kwargs. :( + # https://github.com/rossengeorgiev/aprs-python/pull/56 + aprs_client.consumer(self.process_packet, raw=False, blocking=False) + + except aprslib.exceptions.ConnectionDrop: + LOG.error("Connection dropped, reconnecting") + time.sleep(5) + # Force the deletion of the client object connected to aprs + # This will cause a reconnect, next time client.get_client() + # is called + client.Client().reset() + LOG.info("Exiting ") + + def process_ack_packet(self, packet): + ack_num = packet.get("msgNo") + LOG.info("Got ack for message {}".format(ack_num)) + messaging.log_message( + "ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"] + ) + messaging.ack_dict.update({int(ack_num): 1}) + return + + def process_mic_e_packet(self, packet): + LOG.info("Mic-E Packet detected. Currenlty unsupported.") + messaging.log_packet(packet) + return + + def process_message_packet(self, packet): + LOG.info("Got a message packet") + fromcall = packet["from"] + message = packet.get("message_text", None) + + msg_id = packet.get("msgNo", None) + if not msg_id: + msg_id = "0" + + messaging.log_message( + "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id + ) + + found_command = False + # Get singleton of the PM + pm = plugin.PluginManager() + try: + results = pm.run(fromcall=fromcall, message=message, ack=msg_id) + for reply in results: + found_command = True + # A plugin can return a null message flag which signals + # us that they processed the message correctly, but have + # nothing to reply with, so we avoid replying with a usage string + if reply is not messaging.NULL_MESSAGE: + LOG.debug("Sending '{}'".format(reply)) + + # msg = {"fromcall": fromcall, "msg": reply} + msg = messaging.TextMessage( + self.config["aprs"]["login"], fromcall, reply + ) + self.msg_queues["tx"].put(msg) + else: + LOG.debug("Got NULL MESSAGE from plugin") + + if not found_command: + plugins = pm.get_plugins() + names = [x.command_name for x in plugins] + names.sort() + + reply = "Usage: {}".format(", ".join(names)) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage( + self.config["aprs"]["login"], fromcall, reply + ) + self.msg_queues["tx"].put(msg) + except Exception as ex: + LOG.exception("Plugin failed!!!", ex) + reply = "A Plugin failed! try again?" + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(self.config["aprs"]["login"], fromcall, reply) + self.msg_queues["tx"].put(msg) + + # let any threads do their thing, then ack + # send an ack last + ack = messaging.AckMessage( + self.config["aprs"]["login"], fromcall, msg_id=msg_id + ) + ack.send() + LOG.debug("Packet processing complete") + + def process_packet(self, packet): + """Process a packet recieved from aprs-is server.""" + + LOG.debug("Process packet! {}".format(self.msg_queues)) + try: + LOG.debug("Got message: {}".format(packet)) + + msg = packet.get("message_text", None) + msg_format = packet.get("format", None) + msg_response = packet.get("response", None) + if msg_format == "message" and msg: + # we want to send the message through the + # plugins + self.process_message_packet(packet) + return + elif msg_response == "ack": + self.process_ack_packet(packet) + return + + if msg_format == "mic-e": + # process a mic-e packet + self.process_mic_e_packet(packet) + return + + except (aprslib.ParseError, aprslib.UnknownFormat) as exp: + LOG.exception("Failed to parse packet from aprs-is", exp) + + +class APRSDTXThread(APRSDThread): + def __init__(self, msg_queues, config): + super(APRSDTXThread, self).__init__("TX_MSG", msg_queues, config) + + def run(self): + LOG.info("Starting") + while not self.thread_stop: + try: + msg = self.msg_queues["tx"].get(timeout=0.1) + LOG.info("TXQ: got message '{}'".format(msg)) + msg.send() + except queue.Empty: + pass + LOG.info("Exiting ") From 171703ac1a603661debda12278133ca14cc10c6f Mon Sep 17 00:00:00 2001 From: Hemna Date: Fri, 25 Dec 2020 16:45:28 -0500 Subject: [PATCH 3/8] Added some plugin unit tests This patch adds some real unit tests for the following plugins PingPlugin TimePlugin VersionPlugin --- tests/test_plugin.py | 95 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 tests/test_plugin.py diff --git a/tests/test_plugin.py b/tests/test_plugin.py new file mode 100644 index 0000000..b1a4334 --- /dev/null +++ b/tests/test_plugin.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +import sys +import unittest +from unittest import mock + +import pytest + +import aprsd +from aprsd import plugin +from aprsd.fuzzyclock import fuzzy + + +class testPlugin(unittest.TestCase): + def setUp(self): + self.config = mock.MagicMock() + + @mock.patch("time.localtime") + def test_Time(self, mock_time): + fake_time = mock.MagicMock() + h = fake_time.tm_hour = 16 + m = fake_time.tm_min = 12 + s = fake_time.tm_sec = 55 + mock_time.return_value = fake_time + time_plugin = plugin.TimePlugin(self.config) + + fromcall = "KFART" + message = "location" + ack = 1 + + actual = time_plugin.run(fromcall, message, ack) + self.assertEqual(None, actual) + + cur_time = fuzzy(h, m, 1) + + message = "time" + expected = "{} ({}:{} PDT) ({})".format( + cur_time, str(h), str(m).rjust(2, "0"), message.rstrip() + ) + actual = time_plugin.run(fromcall, message, ack) + self.assertEqual(expected, actual) + + @mock.patch("time.localtime") + def test_Ping(self, mock_time): + fake_time = mock.MagicMock() + h = fake_time.tm_hour = 16 + m = fake_time.tm_min = 12 + s = fake_time.tm_sec = 55 + mock_time.return_value = fake_time + + ping = plugin.PingPlugin(self.config) + + fromcall = "KFART" + message = "location" + ack = 1 + + result = ping.run(fromcall, message, ack) + self.assertEqual(None, result) + + def ping_str(h, m, s): + return ( + "Pong! " + + str(h).zfill(2) + + ":" + + str(m).zfill(2) + + ":" + + str(s).zfill(2) + ) + + message = "Ping" + actual = ping.run(fromcall, message, ack) + expected = ping_str(h, m, s) + self.assertEqual(expected, actual) + + message = "ping" + actual = ping.run(fromcall, message, ack) + self.assertEqual(expected, actual) + + def test_version(self): + expected = "APRSD version '{}'".format(aprsd.__version__) + version_plugin = plugin.VersionPlugin(self.config) + + fromcall = "KFART" + message = "No" + ack = 1 + + actual = version_plugin.run(fromcall, message, ack) + self.assertEqual(None, actual) + + message = "version" + actual = version_plugin.run(fromcall, message, ack) + self.assertEqual(expected, actual) + + message = "Version" + actual = version_plugin.run(fromcall, message, ack) + self.assertEqual(expected, actual) From 9d3ede6e71313dd5800c091c67b11b9682057ae4 Mon Sep 17 00:00:00 2001 From: Hemna Date: Fri, 25 Dec 2020 18:13:52 -0500 Subject: [PATCH 4/8] Added FortunePlugin unit test this patch adds te FortunePlugin unit tests. --- tests/test_plugin.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/test_plugin.py b/tests/test_plugin.py index b1a4334..886064f 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -12,8 +12,34 @@ from aprsd.fuzzyclock import fuzzy class testPlugin(unittest.TestCase): def setUp(self): + self.fromcall = "KFART" + self.ack = 1 self.config = mock.MagicMock() + @mock.patch("shutil.which") + def test_fortune_fail(self, mock_which): + fortune_plugin = plugin.FortunePlugin(self.config) + mock_which.return_value = None + message = "fortune" + expected = "Fortune command not installed" + actual = fortune_plugin.run(self.fromcall, message, self.ack) + self.assertEqual(expected, actual) + + @mock.patch("subprocess.Popen") + @mock.patch("shutil.which") + def test_fortune_success(self, mock_which, mock_popen): + fortune_plugin = plugin.FortunePlugin(self.config) + mock_which.return_value = "/usr/bin/games" + + mock_process = mock.MagicMock() + mock_process.communicate.return_value = [b"Funny fortune"] + mock_popen.return_value = mock_process + + message = "fortune" + expected = "Funny fortune" + actual = fortune_plugin.run(self.fromcall, message, self.ack) + self.assertEqual(expected, actual) + @mock.patch("time.localtime") def test_Time(self, mock_time): fake_time = mock.MagicMock() From 2e90c0bdbb2ee37923b61eb6cc5cdd695d99f53d Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 29 Dec 2020 10:31:16 -0500 Subject: [PATCH 5/8] Creation of MsgTrack object and other stuff This patch adds the new MsgTrack object replacing the global ack_dict. the ack_dict was not thread safe. the new MsgTrack is a singleton object that keeps track of all outbound TextMessage objects. When a TextMessage.send() is called it is added to the MsgTrack object, and when an ack is received for that message, the message is removed from the MsgTrack object. TODO: Add an automatic mechanism for saving the messages in MsgTrack so that when CTRL-C is called to exit aprsd server, then the MsgTrack state is saved to storage. When aprsd server is started up again, add the option to try and reload state of MsgTrack. This patch also reworked the email thread into an APRSDThread object that can exit gracefully with CTRL-C. NOTE: Don't call sleep() with a long time (greater than 5 seconds), as it causes a delay in exiting aprsd until the last sleep() finishes. Since aprsd has so many threads now for processing incoming messages and outgoing messages, we need to coordinate all thread operations so that they don't block the exiting of the app. --- aprsd/client.py | 2 +- aprsd/email.py | 253 +++++++++++++++++++++++------------------ aprsd/main.py | 31 +++--- aprsd/messaging.py | 272 ++++++++++++++++++++++++++++++++++----------- aprsd/plugin.py | 15 ++- aprsd/threads.py | 144 +++++++++++++++--------- 6 files changed, 478 insertions(+), 239 deletions(-) diff --git a/aprsd/client.py b/aprsd/client.py index 58b1be4..9f256c7 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -73,6 +73,7 @@ class Aprsdis(aprslib.IS): def stop(self): self.thread_stop = True + LOG.info("Shutdown Aprsdis client.") def _socket_readlines(self, blocking=False): """ @@ -94,7 +95,6 @@ class Aprsdis(aprslib.IS): [self.sock], [], [], self.select_timeout ) if not readable: - self.logger.info("nothing to read") continue try: diff --git a/aprsd/email.py b/aprsd/email.py index 8c270a1..0e6ffad 100644 --- a/aprsd/email.py +++ b/aprsd/email.py @@ -4,7 +4,6 @@ import imaplib import logging import re import smtplib -import threading import time from email.mime.text import MIMEText @@ -12,7 +11,7 @@ import imapclient import six from validate_email import validate_email -from aprsd import messaging +from aprsd import messaging, threads LOG = logging.getLogger("APRSD") @@ -20,13 +19,6 @@ LOG = logging.getLogger("APRSD") CONFIG = None -def start_thread(): - checkemailthread = threading.Thread( - target=check_email_thread, name="check_email", args=() - ) # args must be tuple - checkemailthread.start() - - def _imap_connect(): imap_port = CONFIG["imap"].get("port", 143) use_ssl = CONFIG["imap"].get("use_ssl", False) @@ -120,6 +112,11 @@ def validate_shortcuts(config): LOG.info("Available shortcuts: {}".format(config["shortcuts"])) +def get_email_from_shortcut(shortcut): + if shortcut in CONFIG.get("shortcuts", None): + return CONFIG["shortcuts"].get(shortcut, None) + + def validate_email_config(config, disable_validation=False): """function to simply ensure we can connect to email services. @@ -221,6 +218,45 @@ def parse_email(msgid, data, server): # end parse_email +def send_email(to_addr, content): + global check_email_delay + + shortcuts = CONFIG["shortcuts"] + email_address = get_email_from_shortcut(to_addr) + LOG.info("Sending Email_________________") + + if to_addr in shortcuts: + LOG.info("To : " + to_addr) + to_addr = email_address + LOG.info(" (" + to_addr + ")") + subject = CONFIG["ham"]["callsign"] + # content = content + "\n\n(NOTE: reply with one line)" + LOG.info("Subject : " + subject) + LOG.info("Body : " + content) + + # check email more often since there's activity right now + check_email_delay = 60 + + msg = MIMEText(content) + msg["Subject"] = subject + msg["From"] = CONFIG["smtp"]["login"] + msg["To"] = to_addr + server = _smtp_connect() + if server: + try: + server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string()) + except Exception as e: + msg = getattr(e, "message", repr(e)) + LOG.error("Sendmail Error!!!! '{}'", msg) + server.quit() + return -1 + server.quit() + return 0 + + +# end send_email + + def resend_email(count, fromcall): global check_email_delay date = datetime.datetime.now() @@ -257,7 +293,9 @@ def resend_email(count, fromcall): from_addr = shortcuts_inverted[from_addr] # asterisk indicates a resend reply = "-" + from_addr + " * " + body.decode(errors="ignore") - messaging.send_message(fromcall, reply) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply) + msg.send() msgexists = True if msgexists is not True: @@ -274,7 +312,9 @@ def resend_email(count, fromcall): str(m).zfill(2), str(s).zfill(2), ) - messaging.send_message(fromcall, reply) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply) + msg.send() # check email more often since we're resending one now check_email_delay = 60 @@ -283,117 +323,108 @@ def resend_email(count, fromcall): # end resend_email() -def check_email_thread(): - global check_email_delay +class APRSDEmailThread(threads.APRSDThread): + def __init__(self, msg_queues, config): + super(APRSDEmailThread, self).__init__("EmailThread") + self.msg_queues = msg_queues + self.config = config - # LOG.debug("FIXME initial email delay is 10 seconds") - check_email_delay = 60 - while True: - # LOG.debug("Top of check_email_thread.") + def run(self): + global check_email_delay - time.sleep(check_email_delay) + check_email_delay = 60 + past = datetime.datetime.now() + while not self.thread_stop: + time.sleep(5) + # always sleep for 5 seconds and see if we need to check email + # This allows CTRL-C to stop the execution of this loop sooner + # than check_email_delay time + now = datetime.datetime.now() + if now - past > datetime.timedelta(seconds=check_email_delay): + # It's time to check email - # slowly increase delay every iteration, max out at 300 seconds - # any send/receive/resend activity will reset this to 60 seconds - if check_email_delay < 300: - check_email_delay += 1 - LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds") + # slowly increase delay every iteration, max out at 300 seconds + # any send/receive/resend activity will reset this to 60 seconds + if check_email_delay < 300: + check_email_delay += 1 + LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds") - shortcuts = CONFIG["shortcuts"] - # swap key/value - shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()]) + shortcuts = CONFIG["shortcuts"] + # swap key/value + shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()]) - date = datetime.datetime.now() - month = date.strftime("%B")[:3] # Nov, Mar, Apr - day = date.day - year = date.year - today = "%s-%s-%s" % (day, month, year) + date = datetime.datetime.now() + month = date.strftime("%B")[:3] # Nov, Mar, Apr + day = date.day + year = date.year + today = "%s-%s-%s" % (day, month, year) - server = None - try: - server = _imap_connect() - except Exception as e: - LOG.exception("Failed to get IMAP server Can't check email.", e) + server = None + try: + server = _imap_connect() + except Exception as e: + LOG.exception("Failed to get IMAP server Can't check email.", e) - if not server: - continue + if not server: + continue - messages = server.search(["SINCE", today]) - # LOG.debug("{} messages received today".format(len(messages))) + messages = server.search(["SINCE", today]) + # LOG.debug("{} messages received today".format(len(messages))) - for msgid, data in server.fetch(messages, ["ENVELOPE"]).items(): - envelope = data[b"ENVELOPE"] - # LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date)) - f = re.search( - r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0]) - ) - if f is not None: - from_addr = f.group(1) + for msgid, data in server.fetch(messages, ["ENVELOPE"]).items(): + envelope = data[b"ENVELOPE"] + # LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date)) + f = re.search( + r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0]) + ) + if f is not None: + from_addr = f.group(1) + else: + from_addr = "noaddr" + + # LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid])) + # if "APRS" not in server.get_flags(msgid)[msgid]: + # in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both + taglist = [ + x.decode(errors="ignore") + for x in server.get_flags(msgid)[msgid] + ] + if "APRS" not in taglist: + # if msg not flagged as sent via aprs + server.fetch([msgid], ["RFC822"]) + (body, from_addr) = parse_email(msgid, data, server) + # unset seen flag, will stay bold in email client + server.remove_flags(msgid, [imapclient.SEEN]) + + if from_addr in shortcuts_inverted: + # reverse lookup of a shortcut + from_addr = shortcuts_inverted[from_addr] + + reply = "-" + from_addr + " " + body.decode(errors="ignore") + # messaging.send_message(CONFIG["ham"]["callsign"], reply) + msg = messaging.TextMessage( + self.config["aprs"]["login"], + self.config["ham"]["callsign"], + reply, + ) + self.msg_queues["tx"].put(msg) + # flag message as sent via aprs + server.add_flags(msgid, ["APRS"]) + # unset seen flag, will stay bold in email client + server.remove_flags(msgid, [imapclient.SEEN]) + # check email more often since we just received an email + check_email_delay = 60 + # reset clock + past = datetime.datetime.now() + server.logout() else: - from_addr = "noaddr" + # We haven't hit the email delay yet. + # LOG.debug("Delta({}) < {}".format(now - past, check_email_delay)) + pass - # LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid])) - # if "APRS" not in server.get_flags(msgid)[msgid]: - # in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both - taglist = [ - x.decode(errors="ignore") for x in server.get_flags(msgid)[msgid] - ] - if "APRS" not in taglist: - # if msg not flagged as sent via aprs - server.fetch([msgid], ["RFC822"]) - (body, from_addr) = parse_email(msgid, data, server) - # unset seen flag, will stay bold in email client - server.remove_flags(msgid, [imapclient.SEEN]) - - if from_addr in shortcuts_inverted: - # reverse lookup of a shortcut - from_addr = shortcuts_inverted[from_addr] - - reply = "-" + from_addr + " " + body.decode(errors="ignore") - messaging.send_message(CONFIG["ham"]["callsign"], reply) - # flag message as sent via aprs - server.add_flags(msgid, ["APRS"]) - # unset seen flag, will stay bold in email client - server.remove_flags(msgid, [imapclient.SEEN]) - # check email more often since we just received an email - check_email_delay = 60 - - server.logout() + # Remove ourselves from the global threads list + threads.APRSDThreadList().remove(self) + LOG.info("Exiting") # end check_email() - - -def send_email(to_addr, content): - global check_email_delay - - LOG.info("Sending Email_________________") - shortcuts = CONFIG["shortcuts"] - if to_addr in shortcuts: - LOG.info("To : " + to_addr) - to_addr = shortcuts[to_addr] - LOG.info(" (" + to_addr + ")") - subject = CONFIG["ham"]["callsign"] - # content = content + "\n\n(NOTE: reply with one line)" - LOG.info("Subject : " + subject) - LOG.info("Body : " + content) - - # check email more often since there's activity right now - check_email_delay = 60 - - msg = MIMEText(content) - msg["Subject"] = subject - msg["From"] = CONFIG["smtp"]["login"] - msg["To"] = to_addr - server = _smtp_connect() - if server: - try: - server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string()) - except Exception as e: - msg = getattr(e, "message", repr(e)) - LOG.error("Sendmail Error!!!! '{}'", msg) - server.quit() - return -1 - server.quit() - return 0 - # end send_email diff --git a/aprsd/main.py b/aprsd/main.py index 58dfee0..a741477 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -54,7 +54,7 @@ LOG_LEVELS = { CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) -server_threads = [] +server_event = threading.Event() # localization, please edit: # HOST = "noam.aprs2.net" # north america tier2 servers round robin @@ -143,11 +143,13 @@ def install(append, case_insensitive, shell, path): def signal_handler(signal, frame): - global event + global server_vent - LOG.info("Ctrl+C, Sending all threads exit!") - for th in server_threads: - th.stop() + LOG.info( + "Ctrl+C, Sending all threads exit! Can take up to 10 seconds to exit all threads" + ) + threads.APRSDThreadList().stop_all() + server_event.set() sys.exit(0) # thread ignores this @@ -260,7 +262,6 @@ def send_message( got_ack = True else: message = packet.get("message_text", None) - LOG.info("We got a new message") fromcall = packet["from"] msg_number = packet.get("msgNo", "0") messaging.log_message( @@ -289,7 +290,6 @@ def send_message( # message msg = messaging.TextMessage(aprs_login, tocallsign, command) msg.send_direct() - # messaging.send_message_direct(tocallsign, command, message_number) try: # This will register a packet consumer with aprslib @@ -359,9 +359,6 @@ def server(loglevel, quiet, disable_validation, config_file): LOG.error("Failed to validate email config options") sys.exit(-1) - # start the email thread - email.start_thread() - # Create the initial PM singleton and Register plugins plugin_manager = plugin.PluginManager(config) plugin_manager.setup_plugins() @@ -376,12 +373,20 @@ def server(loglevel, quiet, disable_validation, config_file): rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config) tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config) - # TODO(hemna): add EmailThread - server_threads.append(rx_thread) - server_threads.append(tx_thread) + email_thread = email.APRSDEmailThread(msg_queues=msg_queues, config=config) + email_thread.start() rx_thread.start() tx_thread.start() + cntr = 0 + while not server_event.is_set(): + # to keep the log noise down + if cntr % 6 == 0: + tracker = messaging.MsgTrack() + LOG.debug("KeepAlive Tracker({}): {}".format(len(tracker), str(tracker))) + cntr += 1 + time.sleep(10) + LOG.info("APRSD Exiting.") sys.exit(0) # setup and run the main blocking loop diff --git a/aprsd/messaging.py b/aprsd/messaging.py index ded7ac7..faed185 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -1,13 +1,12 @@ import abc +import datetime import logging -import pprint import re import threading import time -import uuid from multiprocessing import RawValue -from aprsd import client +from aprsd import client, threads LOG = logging.getLogger("APRSD") @@ -21,6 +20,75 @@ ack_dict = {} NULL_MESSAGE = -1 +class MsgTrack(object): + """Class to keep track of outstanding text messages. + + This is a thread safe class that keeps track of active + messages. + + When a message is asked to be sent, it is placed into this + class via it's id. The TextMessage class's send() method + automatically adds itself to this class. When the ack is + recieved from the radio, the message object is removed from + this class. + + # TODO(hemna) + When aprsd is asked to quit this class should be serialized and + saved to disk/db to keep track of the state of outstanding messages. + When aprsd is started, it should try and fetch the saved state, + and reloaded to a live state. + + """ + + _instance = None + lock = None + + track = {} + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super(MsgTrack, cls).__new__(cls) + cls._instance.track = {} + cls._instance.lock = threading.Lock() + return cls._instance + + def add(self, msg): + with self.lock: + key = int(msg.id) + self.track[key] = msg + + def get(self, id): + with self.lock: + if id in self.track: + return self.track[id] + + def remove(self, id): + with self.lock: + key = int(id) + if key in self.track.keys(): + del self.track[key] + + def __len__(self): + with self.lock: + return len(self.track) + + def __str__(self): + with self.lock: + result = "{" + for key in self.track.keys(): + result += "{}: {}, ".format(key, str(self.track[key])) + result += "}" + return result + + def save(self): + """Save this shit to disk?""" + pass + + def restore(self): + """Restore this shit?""" + pass + + class MessageCounter(object): """ Global message id counter class. @@ -34,6 +102,7 @@ class MessageCounter(object): """ _instance = None + max_count = 9999 def __new__(cls, *args, **kwargs): """Make this a singleton class.""" @@ -45,7 +114,10 @@ class MessageCounter(object): def increment(self): with self.lock: - self.val.value += 1 + if self.val.value == self.max_count: + self.val.value = 1 + else: + self.val.value += 1 @property def value(self): @@ -67,20 +139,13 @@ class Message(object, metaclass=abc.ABCMeta): # The message id to send over the air id = 0 - # Unique identifier for this message - uuid = None - - sent = False - sent_time = None - acked = False - acked_time = None - retry_count = 3 + last_send_time = None + last_send_attempt = 0 def __init__(self, fromcall, tocall, msg_id=None): self.fromcall = fromcall self.tocall = tocall - self.uuid = uuid.uuid4() if not msg_id: c = MessageCounter() c.increment() @@ -98,9 +163,12 @@ class TextMessage(Message): message = None - def __init__(self, fromcall, tocall, message): - super(TextMessage, self).__init__(fromcall, tocall) + def __init__(self, fromcall, tocall, message, msg_id=None, allow_delay=True): + super(TextMessage, self).__init__(fromcall, tocall, msg_id) self.message = message + # do we try and save this message for later if we don't get + # an ack? Some messages we don't want to do this ever. + self.allow_delay = allow_delay def __repr__(self): """Build raw string to send over the air.""" @@ -112,14 +180,14 @@ class TextMessage(Message): ) def __str__(self): - return "From({}) TO({}) - Message({}): '{}'".format( - self.fromcall, self.tocall, self.id, self.message + delta = "Never" + if self.last_send_time: + now = datetime.datetime.now() + delta = now - self.last_send_time + return "{}>{} Msg({})({}): '{}'".format( + self.fromcall, self.tocall, self.id, delta, self.message ) - def ack(self): - """Build an Ack Message object from this object.""" - return AckMessage(self.fromcall, self.tocall, msg_id=self.id) - def _filter_for_send(self): """Filter and format message string for FCC.""" # max? ftm400 displays 64, raw msg shows 74 @@ -130,49 +198,13 @@ class TextMessage(Message): # We all miss George Carlin return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) - def send_thread(self): - cl = client.get_client() - for i in range(self.retry_count, 0, -1): - LOG.debug("DEBUG: send_message_thread msg:ack combos are: ") - LOG.debug(pprint.pformat(ack_dict)) - if ack_dict[self.id] != 1: - log_message( - "Sending Message", - repr(self).rstrip("\n"), - self.message, - tocall=self.tocall, - retry_number=i, - ) - # tn.write(line) - cl.sendall(repr(self)) - # decaying repeats, 31 to 93 second intervals - sleeptime = (self.retry_count - i + 1) * 31 - time.sleep(sleeptime) - else: - break - return - # end send_message_thread - def send(self): global ack_dict - # TODO(Hemna) - Need a better metchanism for this. - # This can nuke an ack_dict while it's still being used. - # FIXME FIXME - if len(ack_dict) > 90: - # empty ack dict if it's really big, could result in key error later - LOG.debug( - "DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict) - ) - ack_dict.clear() - LOG.debug(pprint.pformat(ack_dict)) - LOG.debug( - "DEBUG: Cleared ack dictionary, ack_dict length is now %s." - % len(ack_dict) - ) - ack_dict[self.id] = 0 # clear ack for this message number - - thread = threading.Thread(target=self.send_thread, name="send_message") + tracker = MsgTrack() + tracker.add(self) + LOG.debug("Length of MsgTrack is {}".format(len(tracker))) + thread = SendMessageThread(message=self) thread.start() def send_direct(self): @@ -188,6 +220,73 @@ class TextMessage(Message): cl.sendall(repr(self)) +class SendMessageThread(threads.APRSDThread): + def __init__(self, message): + self.msg = message + name = self.msg.message[:5] + super(SendMessageThread, self).__init__( + "SendMessage-{}-{}".format(self.msg.id, name) + ) + + def loop(self): + """Loop until a message is acked or it gets delayed. + + We only sleep for 5 seconds between each loop run, so + that CTRL-C can exit the app in a short period. Each sleep + means the app quitting is blocked until sleep is done. + So we keep track of the last send attempt and only send if the + last send attempt is old enough. + + """ + cl = client.get_client() + tracker = MsgTrack() + # lets see if the message is still in the tracking queue + msg = tracker.get(self.msg.id) + if not msg: + # The message has been removed from the tracking queue + # So it got acked and we are done. + LOG.info("Message Send Complete via Ack.") + return False + else: + send_now = False + if msg.last_send_attempt == msg.retry_count: + # we reached the send limit, don't send again + # TODO(hemna) - Need to put this in a delayed queue? + LOG.info("Message Send Complete. Max attempts reached.") + return False + + # Message is still outstanding and needs to be acked. + if msg.last_send_time: + # Message has a last send time tracking + now = datetime.datetime.now() + sleeptime = (msg.last_send_attempt + 1) * 31 + delta = now - msg.last_send_time + if delta > datetime.timedelta(seconds=sleeptime): + # It's time to try to send it again + send_now = True + else: + send_now = True + + if send_now: + # no attempt time, so lets send it, and start + # tracking the time. + log_message( + "Sending Message", + repr(msg).rstrip("\n"), + msg.message, + tocall=self.msg.tocall, + retry_number=msg.last_send_attempt, + msg_num=msg.id, + ) + cl.sendall(repr(msg)) + msg.last_send_time = datetime.datetime.now() + msg.last_send_attempt += 1 + + time.sleep(5) + # Make sure we get called again. + return True + + class AckMessage(Message): """Class for building Acks and sending them.""" @@ -222,7 +321,7 @@ class AckMessage(Message): def send(self): LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id)) - thread = threading.Thread(target=self.send_thread, name="send_ack") + thread = SendAckThread(self) thread.start() # end send_ack() @@ -241,6 +340,52 @@ class AckMessage(Message): cl.sendall(repr(self)) +class SendAckThread(threads.APRSDThread): + def __init__(self, ack): + self.ack = ack + super(SendAckThread, self).__init__("SendAck-{}".format(self.ack.id)) + + def loop(self): + """Separate thread to send acks with retries.""" + send_now = False + if self.ack.last_send_attempt == self.ack.retry_count: + # we reached the send limit, don't send again + # TODO(hemna) - Need to put this in a delayed queue? + LOG.info("Ack Send Complete. Max attempts reached.") + return False + + if self.ack.last_send_time: + # Message has a last send time tracking + now = datetime.datetime.now() + + # aprs duplicate detection is 30 secs? + # (21 only sends first, 28 skips middle) + sleeptime = 31 + delta = now - self.ack.last_send_time + if delta > datetime.timedelta(seconds=sleeptime): + # It's time to try to send it again + send_now = True + else: + LOG.debug("Still wating. {}".format(delta)) + else: + send_now = True + + if send_now: + cl = client.get_client() + log_message( + "Sending ack", + repr(self.ack).rstrip("\n"), + None, + ack=self.ack.id, + tocall=self.ack.tocall, + retry_number=self.ack.last_send_attempt, + ) + cl.sendall(repr(self.ack)) + self.ack.last_send_attempt += 1 + self.ack.last_send_time = datetime.datetime.now() + time.sleep(5) + + def log_packet(packet): fromcall = packet.get("from", None) tocall = packet.get("to", None) @@ -272,6 +417,7 @@ def log_message( retry_number=None, ack=None, packet_type=None, + uuid=None, ): """ @@ -315,6 +461,8 @@ def log_message( if msg_num: # LOG.info(" Msg number : {}".format(msg_num)) log_list.append(" Msg number : {}".format(msg_num)) + if uuid: + log_list.append(" UUID : {}".format(uuid)) # LOG.info(" {} _______________ Complete".format(header)) log_list.append(" {} _______________ Complete".format(header)) diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 47e491b..1969af2 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -449,8 +449,16 @@ class EmailPlugin(APRSDPluginBase): if a is not None: to_addr = a.group(1) content = a.group(2) + + email_address = email.get_email_from_shortcut(to_addr) + if not email_address: + reply = "Bad email address" + return reply + # send recipient link to aprs.fi map + mapme = False if content == "mapme": + mapme = True content = "Click for my location: http://aprs.fi/{}".format( self.config["ham"]["callsign"] ) @@ -458,6 +466,8 @@ class EmailPlugin(APRSDPluginBase): now = time.time() # see if we sent this msg number recently if ack in self.email_sent_dict: + # BUG(hemna) - when we get a 2 different email command + # with the same ack #, we don't send it. timedelta = now - self.email_sent_dict[ack] if timedelta < 300: # five minutes too_soon = 1 @@ -477,7 +487,10 @@ class EmailPlugin(APRSDPluginBase): ) self.email_sent_dict.clear() self.email_sent_dict[ack] = now - reply = "mapme email sent" + if mapme: + reply = "mapme email sent" + else: + reply = "Email sent." else: LOG.info( "Email for message number " diff --git a/aprsd/threads.py b/aprsd/threads.py index 2b666cb..9de0c0c 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -1,3 +1,4 @@ +import abc import logging import queue import threading @@ -9,30 +10,69 @@ from aprsd import client, messaging, plugin LOG = logging.getLogger("APRSD") +RX_THREAD = "RX" +TX_THREAD = "TX" +EMAIL_THREAD = "Email" -class APRSDThread(threading.Thread): - def __init__(self, name, msg_queues, config): + +class APRSDThreadList(object): + """Singleton class that keeps track of application wide threads.""" + + _instance = None + + threads_list = [] + lock = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super(APRSDThreadList, cls).__new__(cls) + cls.lock = threading.Lock() + cls.threads_list = [] + return cls._instance + + def add(self, thread_obj): + with self.lock: + self.threads_list.append(thread_obj) + + def remove(self, thread_obj): + with self.lock: + self.threads_list.remove(thread_obj) + + def stop_all(self): + """Iterate over all threads and call stop on them.""" + with self.lock: + for th in self.threads_list: + th.stop() + + +class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): + def __init__(self, name): super(APRSDThread, self).__init__(name=name) - self.msg_queues = msg_queues - self.config = config self.thread_stop = False + APRSDThreadList().add(self) def stop(self): self.thread_stop = True def run(self): + LOG.info("Starting") while not self.thread_stop: - self._run() + can_loop = self.loop() + if not can_loop: + self.stop() + APRSDThreadList().remove(self) + LOG.info("Exiting") class APRSDRXThread(APRSDThread): def __init__(self, msg_queues, config): - super(APRSDRXThread, self).__init__("RX_MSG", msg_queues, config) - self.thread_stop = False + super(APRSDRXThread, self).__init__("RX_MSG") + self.msg_queues = msg_queues + self.config = config def stop(self): self.thread_stop = True - self.aprs.stop() + client.get_client().stop() def callback(self, packet): try: @@ -41,32 +81,31 @@ class APRSDRXThread(APRSDThread): except (aprslib.ParseError, aprslib.UnknownFormat): pass - def run(self): - LOG.info("Starting") - while not self.thread_stop: - aprs_client = client.get_client() + def loop(self): + aprs_client = client.get_client() - # setup the consumer of messages and block until a messages - try: - # This will register a packet consumer with aprslib - # When new packets come in the consumer will process - # the packet + # setup the consumer of messages and block until a messages + try: + # This will register a packet consumer with aprslib + # When new packets come in the consumer will process + # the packet - # Do a partial here because the consumer signature doesn't allow - # For kwargs to be passed in to the consumer func we declare - # and the aprslib developer didn't want to allow a PR to add - # kwargs. :( - # https://github.com/rossengeorgiev/aprs-python/pull/56 - aprs_client.consumer(self.process_packet, raw=False, blocking=False) + # Do a partial here because the consumer signature doesn't allow + # For kwargs to be passed in to the consumer func we declare + # and the aprslib developer didn't want to allow a PR to add + # kwargs. :( + # https://github.com/rossengeorgiev/aprs-python/pull/56 + aprs_client.consumer(self.process_packet, raw=False, blocking=False) - except aprslib.exceptions.ConnectionDrop: - LOG.error("Connection dropped, reconnecting") - time.sleep(5) - # Force the deletion of the client object connected to aprs - # This will cause a reconnect, next time client.get_client() - # is called - client.Client().reset() - LOG.info("Exiting ") + except aprslib.exceptions.ConnectionDrop: + LOG.error("Connection dropped, reconnecting") + time.sleep(5) + # Force the deletion of the client object connected to aprs + # This will cause a reconnect, next time client.get_client() + # is called + client.Client().reset() + # Continue to loop + return True def process_ack_packet(self, packet): ack_num = packet.get("msgNo") @@ -74,7 +113,10 @@ class APRSDRXThread(APRSDThread): messaging.log_message( "ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"] ) - messaging.ack_dict.update({int(ack_num): 1}) + tracker = messaging.MsgTrack() + tracker.remove(ack_num) + LOG.debug("Length of MsgTrack is {}".format(len(tracker))) + # messaging.ack_dict.update({int(ack_num): 1}) return def process_mic_e_packet(self, packet): @@ -87,12 +129,14 @@ class APRSDRXThread(APRSDThread): fromcall = packet["from"] message = packet.get("message_text", None) - msg_id = packet.get("msgNo", None) - if not msg_id: - msg_id = "0" + msg_id = packet.get("msgNo", "0") messaging.log_message( - "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id + "Received Message", + packet["raw"], + message, + fromcall=fromcall, + msg_num=msg_id, ) found_command = False @@ -108,7 +152,6 @@ class APRSDRXThread(APRSDThread): if reply is not messaging.NULL_MESSAGE: LOG.debug("Sending '{}'".format(reply)) - # msg = {"fromcall": fromcall, "msg": reply} msg = messaging.TextMessage( self.config["aprs"]["login"], fromcall, reply ) @@ -122,7 +165,6 @@ class APRSDRXThread(APRSDThread): names.sort() reply = "Usage: {}".format(", ".join(names)) - # messaging.send_message(fromcall, reply) msg = messaging.TextMessage( self.config["aprs"]["login"], fromcall, reply ) @@ -130,7 +172,6 @@ class APRSDRXThread(APRSDThread): except Exception as ex: LOG.exception("Plugin failed!!!", ex) reply = "A Plugin failed! try again?" - # messaging.send_message(fromcall, reply) msg = messaging.TextMessage(self.config["aprs"]["login"], fromcall, reply) self.msg_queues["tx"].put(msg) @@ -139,7 +180,7 @@ class APRSDRXThread(APRSDThread): ack = messaging.AckMessage( self.config["aprs"]["login"], fromcall, msg_id=msg_id ) - ack.send() + self.msg_queues["tx"].put(ack) LOG.debug("Packet processing complete") def process_packet(self, packet): @@ -172,15 +213,16 @@ class APRSDRXThread(APRSDThread): class APRSDTXThread(APRSDThread): def __init__(self, msg_queues, config): - super(APRSDTXThread, self).__init__("TX_MSG", msg_queues, config) + super(APRSDTXThread, self).__init__("TX_MSG") + self.msg_queues = msg_queues + self.config = config - def run(self): - LOG.info("Starting") - while not self.thread_stop: - try: - msg = self.msg_queues["tx"].get(timeout=0.1) - LOG.info("TXQ: got message '{}'".format(msg)) - msg.send() - except queue.Empty: - pass - LOG.info("Exiting ") + def loop(self): + try: + msg = self.msg_queues["tx"].get(timeout=0.1) + LOG.info("TXQ: got message '{}'".format(msg)) + msg.send() + except queue.Empty: + pass + # Continue to loop + return True From 2659a0b3b945bcbf82605a3ac0384919ee73676b Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 30 Dec 2020 07:32:20 -0500 Subject: [PATCH 6/8] Added support to save/load MsgTrack on exit/start This patch added saving of the MsgTrack list of messages at aprsd exit. The will be loaded at startup unless you pass in the --flush option. --- aprsd/main.py | 28 ++++++++++++++++++++++++---- aprsd/messaging.py | 39 ++++++++++++++++++++++++++++++++++----- aprsd/utils.py | 6 +++++- 3 files changed, 63 insertions(+), 10 deletions(-) diff --git a/aprsd/main.py b/aprsd/main.py index a741477..416c996 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -150,7 +150,6 @@ def signal_handler(signal, frame): ) threads.APRSDThreadList().stop_all() server_event.set() - sys.exit(0) # thread ignores this # end signal_handler @@ -333,7 +332,16 @@ def send_message( default=utils.DEFAULT_CONFIG_FILE, help="The aprsd config file to use for options.", ) -def server(loglevel, quiet, disable_validation, config_file): +@click.option( + "-f", + "--flush", + "flush", + is_flag=True, + show_default=True, + default=False, + help="Flush out all old aged messages on disk.", +) +def server(loglevel, quiet, disable_validation, config_file, flush): """Start the aprsd server process.""" global event @@ -364,6 +372,15 @@ def server(loglevel, quiet, disable_validation, config_file): plugin_manager.setup_plugins() client.Client(config) + # Now load the msgTrack from disk if any + if flush: + LOG.debug("Deleting saved MsgTrack.") + messaging.MsgTrack().flush() + else: + # Try and load saved MsgTrack list + LOG.debug("Loading saved MsgTrack object.") + messaging.MsgTrack().load() + rx_msg_queue = queue.Queue(maxsize=20) tx_msg_queue = queue.Queue(maxsize=20) msg_queues = { @@ -378,6 +395,8 @@ def server(loglevel, quiet, disable_validation, config_file): rx_thread.start() tx_thread.start() + messaging.MsgTrack().restart() + cntr = 0 while not server_event.is_set(): # to keep the log noise down @@ -387,9 +406,10 @@ def server(loglevel, quiet, disable_validation, config_file): cntr += 1 time.sleep(10) + # If there are items in the msgTracker, then save them + tracker = messaging.MsgTrack() + tracker.save() LOG.info("APRSD Exiting.") - sys.exit(0) - # setup and run the main blocking loop if __name__ == "__main__": diff --git a/aprsd/messaging.py b/aprsd/messaging.py index faed185..043e599 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -1,12 +1,15 @@ import abc import datetime import logging +import os +import pathlib +import pickle import re import threading import time from multiprocessing import RawValue -from aprsd import client, threads +from aprsd import client, threads, utils LOG = logging.getLogger("APRSD") @@ -44,6 +47,7 @@ class MsgTrack(object): lock = None track = {} + total_messages_tracked = 0 def __new__(cls, *args, **kwargs): if cls._instance is None: @@ -56,6 +60,7 @@ class MsgTrack(object): with self.lock: key = int(msg.id) self.track[key] = msg + self.total_messages_tracked += 1 def get(self, id): with self.lock: @@ -82,11 +87,35 @@ class MsgTrack(object): def save(self): """Save this shit to disk?""" - pass + if len(self) > 0: + LOG.info("Need to save tracking to disk") + pickle.dump(self.dump(), open(utils.DEFAULT_SAVE_FILE, "wb+")) - def restore(self): - """Restore this shit?""" - pass + def dump(self): + dump = {} + with self.lock: + for key in self.track.keys(): + dump[key] = self.track[key] + + return dump + + def load(self): + if os.path.exists(utils.DEFAULT_SAVE_FILE): + raw = pickle.load(open(utils.DEFAULT_SAVE_FILE, "rb")) + if raw: + self.track = raw + LOG.debug("Loaded MsgTrack dict from disk.") + LOG.debug(self) + + def restart(self): + """Walk the list of messages and restart them if any.""" + for key in self.track.keys(): + msg = self.track[key] + msg.send() + + def flush(self): + """Nuke the old pickle file that stored the old results from last aprsd run.""" + pathlib.Path(utils.DEFAULT_SAVE_FILE).unlink() class MessageCounter(object): diff --git a/aprsd/utils.py b/aprsd/utils.py index 6d7970b..f92c7d9 100644 --- a/aprsd/utils.py +++ b/aprsd/utils.py @@ -5,6 +5,7 @@ import functools import os import sys import threading +from pathlib import Path import click import yaml @@ -46,7 +47,10 @@ DEFAULT_CONFIG_DICT = { }, } -DEFAULT_CONFIG_FILE = "~/.config/aprsd/aprsd.yml" +home = str(Path.home()) +DEFAULT_CONFIG_DIR = "{}/.config/aprsd/".format(home) +DEFAULT_SAVE_FILE = "{}/.config/aprsd/aprsd.p".format(home) +DEFAULT_CONFIG_FILE = "{}/.config/aprsd/aprsd.yml".format(home) def synchronized(wrapped): From 28f3daf6d07e82ccb77ebd985726790ebc4cd854 Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 30 Dec 2020 08:13:26 -0500 Subject: [PATCH 7/8] Added QueryPlugin Query Plugin looks for ^/?.* and currently only responds with the list of pending messages in the MsgTrack queue. --- aprsd/plugin.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 1969af2..9e69103 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -31,6 +31,7 @@ CORE_PLUGINS = [ "aprsd.plugin.FortunePlugin", "aprsd.plugin.LocationPlugin", "aprsd.plugin.PingPlugin", + "aprsd.plugin.QueryPlugin", "aprsd.plugin.TimePlugin", "aprsd.plugin.WeatherPlugin", "aprsd.plugin.VersionPlugin", @@ -353,6 +354,22 @@ class PingPlugin(APRSDPluginBase): return reply.rstrip() +class QueryPlugin(APRSDPluginBase): + """Query command.""" + + version = "1.0" + command_regex = r"^\?.*" + command_name = "query" + + def command(self, fromcall, message, ack): + LOG.info("Query COMMAND") + + tracker = messaging.MsgTrack() + reply = "Pending Messages ({})".format(len(tracker)) + + return reply + + class TimePlugin(APRSDPluginBase): """Time command.""" From af0d4491c39bec3a8837c90b14db80b4e4c8c45b Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 30 Dec 2020 09:10:28 -0500 Subject: [PATCH 8/8] Added QueryPlugin resend all delayed msgs or Flush This patch also updates the QueryPlugin to allow the configured user to immediately resend all Delayed messages! This patch updates the QueryPlugin to allow the configured user to immediately Flush/delete all messages! --- aprsd/messaging.py | 21 ++++++++++++++++++--- aprsd/plugin.py | 21 +++++++++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 043e599..32229e0 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -88,8 +88,10 @@ class MsgTrack(object): def save(self): """Save this shit to disk?""" if len(self) > 0: - LOG.info("Need to save tracking to disk") + LOG.info("Saving {} tracking messages to disk".format(len(self))) pickle.dump(self.dump(), open(utils.DEFAULT_SAVE_FILE, "wb+")) + else: + self.flush() def dump(self): dump = {} @@ -109,13 +111,26 @@ class MsgTrack(object): def restart(self): """Walk the list of messages and restart them if any.""" + for key in self.track.keys(): msg = self.track[key] - msg.send() + if msg.last_send_attempt < msg.retry_count: + msg.send() + + def restart_delayed(self): + """Walk the list of delayed messages and restart them if any.""" + for key in self.track.keys(): + msg = self.track[key] + if msg.last_send_attempt == msg.retry_count: + msg.last_send_attempt = 0 + msg.send() def flush(self): """Nuke the old pickle file that stored the old results from last aprsd run.""" - pathlib.Path(utils.DEFAULT_SAVE_FILE).unlink() + if os.path.exists(utils.DEFAULT_SAVE_FILE): + pathlib.Path(utils.DEFAULT_SAVE_FILE).unlink() + with self.lock: + self.track = {} class MessageCounter(object): diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 9e69103..8a1b4f8 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -367,6 +367,27 @@ class QueryPlugin(APRSDPluginBase): tracker = messaging.MsgTrack() reply = "Pending Messages ({})".format(len(tracker)) + searchstring = "^" + self.config["ham"]["callsign"] + ".*" + # only I can do admin commands + if re.search(searchstring, fromcall): + r = re.search(r"^\?-\*", message) + if r is not None: + if len(tracker) > 0: + reply = "Resend ALL Delayed msgs" + LOG.debug(reply) + tracker.restart_delayed() + else: + reply = "No Delayed Msgs" + LOG.debug(reply) + return reply + + r = re.search(r"^\?-[fF]!", message) + if r is not None: + reply = "Deleting ALL Delayed msgs." + LOG.debug(reply) + tracker.flush() + return reply + return reply