From 9768003c2a384389a0b6427a7e761efc5675a2ea Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 23 Dec 2020 13:12:04 -0500 Subject: [PATCH] Reworked messaging lib This patch updates the messaging lib to use Message Objects for each message type (text, ack) that know how to send themselves with the same interface. --- aprsd/main.py | 128 +++++++++++----- aprsd/messaging.py | 359 ++++++++++++++++++++++++++------------------- 2 files changed, 297 insertions(+), 190 deletions(-) diff --git a/aprsd/main.py b/aprsd/main.py index d2905c5..c0aa32b 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -21,11 +21,15 @@ # # python included libs +import concurrent.futures +import functools import logging import os +import queue import random import signal import sys +import threading import time from logging import NullHandler from logging.handlers import RotatingFileHandler @@ -53,6 +57,10 @@ LOG_LEVELS = { CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) +# Global threading event to trigger stopping all threads +# When user quits app via CTRL-C +event = None + # localization, please edit: # HOST = "noam.aprs2.net" # north america tier2 servers round robin # USER = "KM6XXX-9" # callsign of this aprs client with SSID @@ -140,11 +148,10 @@ def install(append, case_insensitive, shell, path): def signal_handler(signal, frame): - LOG.info("Ctrl+C, exiting.") - # sys.exit(0) # thread ignores this - os._exit(0) - + global event + LOG.info("Ctrl+C, Sending all threads exit!") + sys.exit(0) # thread ignores this # end signal_handler @@ -172,7 +179,7 @@ def setup_logging(config, loglevel, quiet): LOG.addHandler(sh) -def process_ack_packet(packet): +def process_ack_packet(packet, config): ack_num = packet.get("msgNo") LOG.info("Got ack for message {}".format(ack_num)) messaging.log_message( @@ -182,32 +189,30 @@ def process_ack_packet(packet): return -def process_mic_e_packet(packet): +def process_mic_e_packet(packet, config): LOG.info("Mic-E Packet detected. Currenlty unsupported.") messaging.log_packet(packet) return -def process_message_packet(packet): +def process_message_packet(packet, config, tx_msg_queue): LOG.info("Got a message packet") fromcall = packet["from"] message = packet.get("message_text", None) - msg_number = packet.get("msgNo", None) - if msg_number: - ack = msg_number - else: - ack = "0" + msg_id = packet.get("msgNo", None) + if not msg_id: + msg_id = "0" messaging.log_message( - "Received Message", packet["raw"], message, fromcall=fromcall, ack=ack + "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id ) found_command = False # Get singleton of the PM pm = plugin.PluginManager() try: - results = pm.run(fromcall=fromcall, message=message, ack=ack) + results = pm.run(fromcall=fromcall, message=message, ack=msg_id) for reply in results: found_command = True # A plugin can return a null message flag which signals @@ -215,7 +220,11 @@ def process_message_packet(packet): # nothing to reply with, so we avoid replying with a usage string if reply is not messaging.NULL_MESSAGE: LOG.debug("Sending '{}'".format(reply)) - messaging.send_message(fromcall, reply) + + # msg = {"fromcall": fromcall, "msg": reply} + msg = messaging.TextMessage(config["aprs"]["login"], + fromcall, reply) + tx_msg_queue.put(msg) else: LOG.debug("Got NULL MESSAGE from plugin") @@ -225,22 +234,29 @@ def process_message_packet(packet): names.sort() reply = "Usage: {}".format(", ".join(names)) - messaging.send_message(fromcall, reply) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(config["aprs"]["login"], + fromcall, reply) + tx_msg_queue.put(msg) except Exception as ex: LOG.exception("Plugin failed!!!", ex) reply = "A Plugin failed! try again?" - messaging.send_message(fromcall, reply) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(config["aprs"]["login"], + fromcall, reply) + tx_msg_queue.put(msg) # let any threads do their thing, then ack # send an ack last - messaging.send_ack(fromcall, ack) + ack = messaging.AckMessage(config["aprs"]["login"], fromcall, msg_id=msg_id) + ack.send() LOG.debug("Packet processing complete") -def process_packet(packet): +def process_packet(packet, config=None, msg_queues=None, event=None): """Process a packet recieved from aprs-is server.""" - LOG.debug("Process packet!") + LOG.debug("Process packet! {}".format(msg_queues)) try: LOG.debug("Got message: {}".format(packet)) @@ -250,15 +266,15 @@ def process_packet(packet): if msg_format == "message" and msg: # we want to send the message through the # plugins - process_message_packet(packet) + process_message_packet(packet, config, msg_queues["tx"]) return elif msg_response == "ack": - process_ack_packet(packet) + process_ack_packet(packet, config) return if msg_format == "mic-e": # process a mic-e packet - process_mic_e_packet(packet) + process_mic_e_packet(packet, config) return except (aprslib.ParseError, aprslib.UnknownFormat) as exp: @@ -350,17 +366,15 @@ def send_message( message = packet.get("message_text", None) LOG.info("We got a new message") fromcall = packet["from"] - msg_number = packet.get("msgNo", None) - if msg_number: - ack = msg_number - else: - ack = "0" + msg_number = packet.get("msgNo", "0") messaging.log_message( - "Received Message", packet["raw"], message, fromcall=fromcall, ack=ack + "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_number ) got_response = True # Send the ack back? - messaging.send_ack_direct(fromcall, ack) + ack = messaging.AckMessage(config["aprs"]["login"], + fromcall, msg_id=msg_number) + ack.send_direct() if got_ack and got_response: sys.exit(0) @@ -372,7 +386,9 @@ def send_message( # We should get an ack back as well as a new message # we should bail after we get the ack and send an ack back for the # message - messaging.send_message_direct(tocallsign, command, message_number) + msg = messaging.TextMessage(aprs_login, tocallsign, command) + msg.send_direct() + #messaging.send_message_direct(tocallsign, command, message_number) try: # This will register a packet consumer with aprslib @@ -389,6 +405,16 @@ def send_message( cl.reset() +def tx_msg_thread(msg_queues=None, event=None): + """Thread to handle sending any messages outbound.""" + LOG.info("TX_MSG_THREAD") + while not event.is_set() or not msg_queues["tx"].empty(): + msg = msg_queues["tx"].get() + LOG.info("TXQ: got message '{}'".format(msg)) + msg.send() + #messaging.send_message(msg["fromcall"], msg["msg"]) + + # main() ### @main.command() @click.option( @@ -418,7 +444,9 @@ def send_message( ) def server(loglevel, quiet, disable_validation, config_file): """Start the aprsd server process.""" + global event + event = threading.Event() signal.signal(signal.SIGINT, signal_handler) click.echo("Load config") @@ -429,7 +457,6 @@ def server(loglevel, quiet, disable_validation, config_file): # Accept the config as a constructor param, instead of this # hacky global setting email.CONFIG = config - messaging.CONFIG = config setup_logging(config, loglevel, quiet) LOG.info("APRSD Started version: {}".format(aprsd.__version__)) @@ -449,7 +476,22 @@ def server(loglevel, quiet, disable_validation, config_file): plugin_manager.setup_plugins() cl = client.Client(config) - # setup and run the main blocking loop + rx_msg_queue = queue.Queue(maxsize=20) + tx_msg_queue = queue.Queue(maxsize=20) + msg_queues = {"rx": rx_msg_queue, + "tx": tx_msg_queue,} + + rx_msg = threading.Thread( + target=rx_msg_thread, name="RX_msg", kwargs={'msg_queues':msg_queues, + 'event': event} + ) + tx_msg = threading.Thread( + target=tx_msg_thread, name="TX_msg", kwargs={'msg_queues':msg_queues, + 'event': event} + ) + rx_msg.start() + tx_msg.start() + while True: # Now use the helper which uses the singleton aprs_client = client.get_client() @@ -459,14 +501,30 @@ def server(loglevel, quiet, disable_validation, config_file): # This will register a packet consumer with aprslib # When new packets come in the consumer will process # the packet - aprs_client.consumer(process_packet, raw=False) + + # Do a partial here because the consumer signature doesn't allow + # For kwargs to be passed in to the consumer func we declare + # and the aprslib developer didn't want to allow a PR to add + # kwargs. :( + # https://github.com/rossengeorgiev/aprs-python/pull/56 + process_partial = functools.partial(process_packet, + msg_queues=msg_queues, + event=event, config=config) + aprs_client.consumer(process_partial, raw=False) except aprslib.exceptions.ConnectionDrop: LOG.error("Connection dropped, reconnecting") time.sleep(5) # Force the deletion of the client object connected to aprs # This will cause a reconnect, next time client.get_client() # is called - cl.reset() + client.Client().reset() + + + LOG.info("APRSD Exiting.") + tx_msg.join() + + sys.exit(0) + # setup and run the main blocking loop if __name__ == "__main__": diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 18e71d3..25c3c85 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -1,161 +1,239 @@ +import abc import logging +from multiprocessing import RawValue import pprint import re import threading import time +import uuid from aprsd import client LOG = logging.getLogger("APRSD") -CONFIG = None - -# current aprs radio message number, increments for each message we -# send over rf {int} -message_number = 0 # message_nubmer:ack combos so we stop sending a message after an # ack from radio {int:int} +# FIXME ack_dict = {} # What to return from a plugin if we have processed the message # and it's ok, but don't send a usage string back NULL_MESSAGE = -1 +class MessageCounter(object): + """ + Global message id counter class. -def send_ack_thread(tocall, ack, retry_count): - cl = client.get_client() - tocall = tocall.ljust(9) # pad to nine chars - line = "{}>APRS::{}:ack{}\n".format(CONFIG["aprs"]["login"], tocall, ack) - for i in range(retry_count, 0, -1): - log_message( - "Sending ack", - line.rstrip("\n"), - None, - ack=ack, - tocall=tocall, - retry_number=i, - ) - cl.sendall(line) - # aprs duplicate detection is 30 secs? - # (21 only sends first, 28 skips middle) - time.sleep(31) - # end_send_ack_thread + This is a singleton based class that keeps + an incrementing counter for all messages to + be sent. All new Message objects gets a new + message id, which is the next number available + from the MessageCounter. + + """ + _instance = None + + def __new__(cls, *args, **kwargs): + """Make this a singleton class.""" + if cls._instance is None: + cls._instance = super(MessageCounter, cls).__new__(cls) + cls._instance.val = RawValue('i', 1) + cls._instance.lock = threading.Lock() + return cls._instance + + def increment(self): + with self.lock: + self.val.value += 1 + + @property + def value(self): + with self.lock: + return self.val.value + + def __repr__(self): + with self.lock: + return str(self.val.value) + + def __str__(self): + with self.lock: + return str(self.val.value) -def send_ack(tocall, ack): - LOG.debug("Send ACK({}:{}) to radio.".format(tocall, ack)) +class Message(object, metaclass=abc.ABCMeta): + """Base Message Class.""" + # The message id to send over the air + id = 0 + + # Unique identifier for this message + uuid = None + + sent = False + sent_time = None + acked = False + acked_time = None + retry_count = 3 - thread = threading.Thread( - target=send_ack_thread, name="send_ack", args=(tocall, ack, retry_count) - ) - thread.start() - # end send_ack() + + def __init__(self, fromcall, tocall, msg_id=None): + self.fromcall = fromcall + self.tocall = tocall + self.uuid = uuid.uuid4() + if not msg_id: + c = MessageCounter() + c.increment() + msg_id = c.value + self.id = msg_id + + @abc.abstractmethod + def send(self): + """Child class must declare.""" + pass -def send_ack_direct(tocall, ack): - """Send an ack message without a separate thread.""" - LOG.debug("Send ACK({}:{}) to radio.".format(tocall, ack)) - cl = client.get_client() - fromcall = CONFIG["aprs"]["login"] - line = "{}>APRS::{}:ack{}\n".format(fromcall, tocall, ack) - log_message( - "Sending ack", - line.rstrip("\n"), - None, - ack=ack, - tocall=tocall, - fromcall=fromcall, - ) - cl.sendall(line) +class TextMessage(Message): + """Send regular ARPS text/command messages/replies.""" + + message = None + + def __init__(self, fromcall, tocall, message): + super(TextMessage, self).__init__(fromcall, tocall) + self.message = message + + def __repr__(self): + """Build raw string to send over the air.""" + return "{}>APRS::{}:{}{{{}\n".format( + self.fromcall, self.tocall.ljust(9), + self._filter_for_send(), str(self.id),) + + def __str__(self): + return "From({}) TO({}) - Message({}): '{}'".format( + self.fromcall, self.tocall, + self.id, self.message + ) + + def ack(self): + """Build an Ack Message object from this object.""" + return AckMessage(self.fromcall, self.tocall, msg_id=self.id) + + def _filter_for_send(self): + """Filter and format message string for FCC.""" + # max? ftm400 displays 64, raw msg shows 74 + # and ftm400-send is max 64. setting this to + # 67 displays 64 on the ftm400. (+3 {01 suffix) + # feature req: break long ones into two msgs + message = self.message[:67] + # We all miss George Carlin + return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) + + def send_thread(self): + cl = client.get_client() + for i in range(self.retry_count, 0, -1): + LOG.debug("DEBUG: send_message_thread msg:ack combos are: ") + LOG.debug(pprint.pformat(ack_dict)) + if ack_dict[self.id] != 1: + log_message( + "Sending Message", + repr(self).rstrip("\n"), + self.message, + tocall=self.tocall, + retry_number=i, + ) + # tn.write(line) + cl.sendall(repr(self)) + # decaying repeats, 31 to 93 second intervals + sleeptime = (self.retry_count - i + 1) * 31 + time.sleep(sleeptime) + else: + break + return + # end send_message_thread + + def send(self): + global ack_dict + + # TODO(Hemna) - Need a better metchanism for this. + # This can nuke an ack_dict while it's still being used. + # FIXME FIXME + if len(ack_dict) > 90: + # empty ack dict if it's really big, could result in key error later + LOG.debug( + "DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict) + ) + ack_dict.clear() + LOG.debug(pprint.pformat(ack_dict)) + LOG.debug( + "DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict) + ) + ack_dict[self.id] = 0 # clear ack for this message number + + thread = threading.Thread(target=self.send_thread, name="send_message") + thread.start() + + def send_direct(self): + """Send a message without a separate thread.""" + cl = client.get_client() + log_message( + "Sending Message Direct", repr(self).rstrip("\n"), self.message, tocall=self.tocall, + fromcall=self.fromcall + ) + cl.sendall(repr(self)) -def send_message_thread(tocall, message, this_message_number, retry_count): - cl = client.get_client() - line = "{}>APRS::{}:{}{{{}\n".format( - CONFIG["aprs"]["login"], - tocall, - message, - str(this_message_number), - ) - for i in range(retry_count, 0, -1): - LOG.debug("DEBUG: send_message_thread msg:ack combos are: ") - LOG.debug(pprint.pformat(ack_dict)) - if ack_dict[this_message_number] != 1: +class AckMessage(Message): + """Class for building Acks and sending them.""" + + + def __init__(self, fromcall, tocall, msg_id): + super(AckMessage, self).__init__(fromcall, tocall, msg_id=msg_id) + + def __repr__(self): + return "{}>APRS::{}:ack{}\n".format( + self.fromcall, self.tocall.ljust(9), self.id) + + def __str__(self): + return "From({}) TO({}) Ack ({})".format( + self.fromcall, self.tocall, + self.id + ) + + def send_thread(self): + """Separate thread to send acks with retries.""" + cl = client.get_client() + for i in range(self.retry_count, 0, -1): log_message( - "Sending Message", - line.rstrip("\n"), - message, - tocall=tocall, + "Sending ack", + repr(self).rstrip("\n"), + None, + ack=self.id, + tocall=self.tocall, retry_number=i, ) - # tn.write(line) - cl.sendall(line) - # decaying repeats, 31 to 93 second intervals - sleeptime = (retry_count - i + 1) * 31 - time.sleep(sleeptime) - else: - break - return - # end send_message_thread + cl.sendall(repr(self)) + # aprs duplicate detection is 30 secs? + # (21 only sends first, 28 skips middle) + time.sleep(31) + # end_send_ack_thread - -def send_message(tocall, message): - global message_number - global ack_dict - - retry_count = 3 - if message_number > 98: # global - message_number = 0 - message_number += 1 - if len(ack_dict) > 90: - # empty ack dict if it's really big, could result in key error later - LOG.debug( - "DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict) + def send(self): + LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id)) + thread = threading.Thread( + target=self.send_thread, name="send_ack" ) - ack_dict.clear() - LOG.debug(pprint.pformat(ack_dict)) - LOG.debug( - "DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict) + thread.start() + # end send_ack() + + def send_direct(self): + """Send an ack message without a separate thread.""" + cl = client.get_client() + log_message( + "Sending ack", + repr(self).rstrip("\n"), + None, + ack=self.id, + tocall=self.tocall, + fromcall=self.fromcall, ) - ack_dict[message_number] = 0 # clear ack for this message number - tocall = tocall.ljust(9) # pad to nine chars - - # max? ftm400 displays 64, raw msg shows 74 - # and ftm400-send is max 64. setting this to - # 67 displays 64 on the ftm400. (+3 {01 suffix) - # feature req: break long ones into two msgs - message = message[:67] - # We all miss George Carlin - message = re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) - thread = threading.Thread( - target=send_message_thread, - name="send_message", - args=(tocall, message, message_number, retry_count), - ) - thread.start() - return () - # end send_message() - - -def send_message_direct(tocall, message, message_number=None): - """Send a message without a separate thread.""" - cl = client.get_client() - if not message_number: - this_message_number = 1 - else: - this_message_number = message_number - fromcall = CONFIG["aprs"]["login"] - line = "{}>APRS::{}:{}{{{}\n".format( - fromcall, - tocall, - message, - str(this_message_number), - ) - LOG.debug("DEBUG: send_message_thread msg:ack combos are: ") - log_message( - "Sending Message", line.rstrip("\n"), message, tocall=tocall, fromcall=fromcall - ) - cl.sendall(line) + cl.sendall(repr(self)) def log_packet(packet): @@ -236,32 +314,3 @@ def log_message( log_list.append(" {} _______________ Complete".format(header)) LOG.info("\n".join(log_list)) - - -def process_message(line): - f = re.search("^(.*)>", line) - fromcall = f.group(1) - searchstring = "::%s[ ]*:(.*)" % CONFIG["aprs"]["login"] - # verify this, callsign is padded out with spaces to colon - m = re.search(searchstring, line) - fullmessage = m.group(1) - - ack_attached = re.search("(.*){([0-9A-Z]+)", fullmessage) - # ack formats include: {1, {AB}, {12 - if ack_attached: - # "{##" suffix means radio wants an ack back - # message content - message = ack_attached.group(1) - # suffix number to use in ack - ack_num = ack_attached.group(2) - else: - message = fullmessage - # ack not requested, but lets send one as 0 - ack_num = "0" - - log_message( - "Received message", line, message, fromcall=fromcall, msg_num=str(ack_num) - ) - - return (fromcall, message, ack_num) - # end process_message()