From 2e90c0bdbb2ee37923b61eb6cc5cdd695d99f53d Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 29 Dec 2020 10:31:16 -0500 Subject: [PATCH] Creation of MsgTrack object and other stuff This patch adds the new MsgTrack object replacing the global ack_dict. the ack_dict was not thread safe. the new MsgTrack is a singleton object that keeps track of all outbound TextMessage objects. When a TextMessage.send() is called it is added to the MsgTrack object, and when an ack is received for that message, the message is removed from the MsgTrack object. TODO: Add an automatic mechanism for saving the messages in MsgTrack so that when CTRL-C is called to exit aprsd server, then the MsgTrack state is saved to storage. When aprsd server is started up again, add the option to try and reload state of MsgTrack. This patch also reworked the email thread into an APRSDThread object that can exit gracefully with CTRL-C. NOTE: Don't call sleep() with a long time (greater than 5 seconds), as it causes a delay in exiting aprsd until the last sleep() finishes. Since aprsd has so many threads now for processing incoming messages and outgoing messages, we need to coordinate all thread operations so that they don't block the exiting of the app. --- aprsd/client.py | 2 +- aprsd/email.py | 253 +++++++++++++++++++++++------------------ aprsd/main.py | 31 +++--- aprsd/messaging.py | 272 ++++++++++++++++++++++++++++++++++----------- aprsd/plugin.py | 15 ++- aprsd/threads.py | 144 +++++++++++++++--------- 6 files changed, 478 insertions(+), 239 deletions(-) diff --git a/aprsd/client.py b/aprsd/client.py index 58b1be4..9f256c7 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -73,6 +73,7 @@ class Aprsdis(aprslib.IS): def stop(self): self.thread_stop = True + LOG.info("Shutdown Aprsdis client.") def _socket_readlines(self, blocking=False): """ @@ -94,7 +95,6 @@ class Aprsdis(aprslib.IS): [self.sock], [], [], self.select_timeout ) if not readable: - self.logger.info("nothing to read") continue try: diff --git a/aprsd/email.py b/aprsd/email.py index 8c270a1..0e6ffad 100644 --- a/aprsd/email.py +++ b/aprsd/email.py @@ -4,7 +4,6 @@ import imaplib import logging import re import smtplib -import threading import time from email.mime.text import MIMEText @@ -12,7 +11,7 @@ import imapclient import six from validate_email import validate_email -from aprsd import messaging +from aprsd import messaging, threads LOG = logging.getLogger("APRSD") @@ -20,13 +19,6 @@ LOG = logging.getLogger("APRSD") CONFIG = None -def start_thread(): - checkemailthread = threading.Thread( - target=check_email_thread, name="check_email", args=() - ) # args must be tuple - checkemailthread.start() - - def _imap_connect(): imap_port = CONFIG["imap"].get("port", 143) use_ssl = CONFIG["imap"].get("use_ssl", False) @@ -120,6 +112,11 @@ def validate_shortcuts(config): LOG.info("Available shortcuts: {}".format(config["shortcuts"])) +def get_email_from_shortcut(shortcut): + if shortcut in CONFIG.get("shortcuts", None): + return CONFIG["shortcuts"].get(shortcut, None) + + def validate_email_config(config, disable_validation=False): """function to simply ensure we can connect to email services. @@ -221,6 +218,45 @@ def parse_email(msgid, data, server): # end parse_email +def send_email(to_addr, content): + global check_email_delay + + shortcuts = CONFIG["shortcuts"] + email_address = get_email_from_shortcut(to_addr) + LOG.info("Sending Email_________________") + + if to_addr in shortcuts: + LOG.info("To : " + to_addr) + to_addr = email_address + LOG.info(" (" + to_addr + ")") + subject = CONFIG["ham"]["callsign"] + # content = content + "\n\n(NOTE: reply with one line)" + LOG.info("Subject : " + subject) + LOG.info("Body : " + content) + + # check email more often since there's activity right now + check_email_delay = 60 + + msg = MIMEText(content) + msg["Subject"] = subject + msg["From"] = CONFIG["smtp"]["login"] + msg["To"] = to_addr + server = _smtp_connect() + if server: + try: + server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string()) + except Exception as e: + msg = getattr(e, "message", repr(e)) + LOG.error("Sendmail Error!!!! '{}'", msg) + server.quit() + return -1 + server.quit() + return 0 + + +# end send_email + + def resend_email(count, fromcall): global check_email_delay date = datetime.datetime.now() @@ -257,7 +293,9 @@ def resend_email(count, fromcall): from_addr = shortcuts_inverted[from_addr] # asterisk indicates a resend reply = "-" + from_addr + " * " + body.decode(errors="ignore") - messaging.send_message(fromcall, reply) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply) + msg.send() msgexists = True if msgexists is not True: @@ -274,7 +312,9 @@ def resend_email(count, fromcall): str(m).zfill(2), str(s).zfill(2), ) - messaging.send_message(fromcall, reply) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply) + msg.send() # check email more often since we're resending one now check_email_delay = 60 @@ -283,117 +323,108 @@ def resend_email(count, fromcall): # end resend_email() -def check_email_thread(): - global check_email_delay +class APRSDEmailThread(threads.APRSDThread): + def __init__(self, msg_queues, config): + super(APRSDEmailThread, self).__init__("EmailThread") + self.msg_queues = msg_queues + self.config = config - # LOG.debug("FIXME initial email delay is 10 seconds") - check_email_delay = 60 - while True: - # LOG.debug("Top of check_email_thread.") + def run(self): + global check_email_delay - time.sleep(check_email_delay) + check_email_delay = 60 + past = datetime.datetime.now() + while not self.thread_stop: + time.sleep(5) + # always sleep for 5 seconds and see if we need to check email + # This allows CTRL-C to stop the execution of this loop sooner + # than check_email_delay time + now = datetime.datetime.now() + if now - past > datetime.timedelta(seconds=check_email_delay): + # It's time to check email - # slowly increase delay every iteration, max out at 300 seconds - # any send/receive/resend activity will reset this to 60 seconds - if check_email_delay < 300: - check_email_delay += 1 - LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds") + # slowly increase delay every iteration, max out at 300 seconds + # any send/receive/resend activity will reset this to 60 seconds + if check_email_delay < 300: + check_email_delay += 1 + LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds") - shortcuts = CONFIG["shortcuts"] - # swap key/value - shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()]) + shortcuts = CONFIG["shortcuts"] + # swap key/value + shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()]) - date = datetime.datetime.now() - month = date.strftime("%B")[:3] # Nov, Mar, Apr - day = date.day - year = date.year - today = "%s-%s-%s" % (day, month, year) + date = datetime.datetime.now() + month = date.strftime("%B")[:3] # Nov, Mar, Apr + day = date.day + year = date.year + today = "%s-%s-%s" % (day, month, year) - server = None - try: - server = _imap_connect() - except Exception as e: - LOG.exception("Failed to get IMAP server Can't check email.", e) + server = None + try: + server = _imap_connect() + except Exception as e: + LOG.exception("Failed to get IMAP server Can't check email.", e) - if not server: - continue + if not server: + continue - messages = server.search(["SINCE", today]) - # LOG.debug("{} messages received today".format(len(messages))) + messages = server.search(["SINCE", today]) + # LOG.debug("{} messages received today".format(len(messages))) - for msgid, data in server.fetch(messages, ["ENVELOPE"]).items(): - envelope = data[b"ENVELOPE"] - # LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date)) - f = re.search( - r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0]) - ) - if f is not None: - from_addr = f.group(1) + for msgid, data in server.fetch(messages, ["ENVELOPE"]).items(): + envelope = data[b"ENVELOPE"] + # LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date)) + f = re.search( + r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0]) + ) + if f is not None: + from_addr = f.group(1) + else: + from_addr = "noaddr" + + # LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid])) + # if "APRS" not in server.get_flags(msgid)[msgid]: + # in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both + taglist = [ + x.decode(errors="ignore") + for x in server.get_flags(msgid)[msgid] + ] + if "APRS" not in taglist: + # if msg not flagged as sent via aprs + server.fetch([msgid], ["RFC822"]) + (body, from_addr) = parse_email(msgid, data, server) + # unset seen flag, will stay bold in email client + server.remove_flags(msgid, [imapclient.SEEN]) + + if from_addr in shortcuts_inverted: + # reverse lookup of a shortcut + from_addr = shortcuts_inverted[from_addr] + + reply = "-" + from_addr + " " + body.decode(errors="ignore") + # messaging.send_message(CONFIG["ham"]["callsign"], reply) + msg = messaging.TextMessage( + self.config["aprs"]["login"], + self.config["ham"]["callsign"], + reply, + ) + self.msg_queues["tx"].put(msg) + # flag message as sent via aprs + server.add_flags(msgid, ["APRS"]) + # unset seen flag, will stay bold in email client + server.remove_flags(msgid, [imapclient.SEEN]) + # check email more often since we just received an email + check_email_delay = 60 + # reset clock + past = datetime.datetime.now() + server.logout() else: - from_addr = "noaddr" + # We haven't hit the email delay yet. + # LOG.debug("Delta({}) < {}".format(now - past, check_email_delay)) + pass - # LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid])) - # if "APRS" not in server.get_flags(msgid)[msgid]: - # in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both - taglist = [ - x.decode(errors="ignore") for x in server.get_flags(msgid)[msgid] - ] - if "APRS" not in taglist: - # if msg not flagged as sent via aprs - server.fetch([msgid], ["RFC822"]) - (body, from_addr) = parse_email(msgid, data, server) - # unset seen flag, will stay bold in email client - server.remove_flags(msgid, [imapclient.SEEN]) - - if from_addr in shortcuts_inverted: - # reverse lookup of a shortcut - from_addr = shortcuts_inverted[from_addr] - - reply = "-" + from_addr + " " + body.decode(errors="ignore") - messaging.send_message(CONFIG["ham"]["callsign"], reply) - # flag message as sent via aprs - server.add_flags(msgid, ["APRS"]) - # unset seen flag, will stay bold in email client - server.remove_flags(msgid, [imapclient.SEEN]) - # check email more often since we just received an email - check_email_delay = 60 - - server.logout() + # Remove ourselves from the global threads list + threads.APRSDThreadList().remove(self) + LOG.info("Exiting") # end check_email() - - -def send_email(to_addr, content): - global check_email_delay - - LOG.info("Sending Email_________________") - shortcuts = CONFIG["shortcuts"] - if to_addr in shortcuts: - LOG.info("To : " + to_addr) - to_addr = shortcuts[to_addr] - LOG.info(" (" + to_addr + ")") - subject = CONFIG["ham"]["callsign"] - # content = content + "\n\n(NOTE: reply with one line)" - LOG.info("Subject : " + subject) - LOG.info("Body : " + content) - - # check email more often since there's activity right now - check_email_delay = 60 - - msg = MIMEText(content) - msg["Subject"] = subject - msg["From"] = CONFIG["smtp"]["login"] - msg["To"] = to_addr - server = _smtp_connect() - if server: - try: - server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string()) - except Exception as e: - msg = getattr(e, "message", repr(e)) - LOG.error("Sendmail Error!!!! '{}'", msg) - server.quit() - return -1 - server.quit() - return 0 - # end send_email diff --git a/aprsd/main.py b/aprsd/main.py index 58dfee0..a741477 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -54,7 +54,7 @@ LOG_LEVELS = { CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) -server_threads = [] +server_event = threading.Event() # localization, please edit: # HOST = "noam.aprs2.net" # north america tier2 servers round robin @@ -143,11 +143,13 @@ def install(append, case_insensitive, shell, path): def signal_handler(signal, frame): - global event + global server_vent - LOG.info("Ctrl+C, Sending all threads exit!") - for th in server_threads: - th.stop() + LOG.info( + "Ctrl+C, Sending all threads exit! Can take up to 10 seconds to exit all threads" + ) + threads.APRSDThreadList().stop_all() + server_event.set() sys.exit(0) # thread ignores this @@ -260,7 +262,6 @@ def send_message( got_ack = True else: message = packet.get("message_text", None) - LOG.info("We got a new message") fromcall = packet["from"] msg_number = packet.get("msgNo", "0") messaging.log_message( @@ -289,7 +290,6 @@ def send_message( # message msg = messaging.TextMessage(aprs_login, tocallsign, command) msg.send_direct() - # messaging.send_message_direct(tocallsign, command, message_number) try: # This will register a packet consumer with aprslib @@ -359,9 +359,6 @@ def server(loglevel, quiet, disable_validation, config_file): LOG.error("Failed to validate email config options") sys.exit(-1) - # start the email thread - email.start_thread() - # Create the initial PM singleton and Register plugins plugin_manager = plugin.PluginManager(config) plugin_manager.setup_plugins() @@ -376,12 +373,20 @@ def server(loglevel, quiet, disable_validation, config_file): rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config) tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config) - # TODO(hemna): add EmailThread - server_threads.append(rx_thread) - server_threads.append(tx_thread) + email_thread = email.APRSDEmailThread(msg_queues=msg_queues, config=config) + email_thread.start() rx_thread.start() tx_thread.start() + cntr = 0 + while not server_event.is_set(): + # to keep the log noise down + if cntr % 6 == 0: + tracker = messaging.MsgTrack() + LOG.debug("KeepAlive Tracker({}): {}".format(len(tracker), str(tracker))) + cntr += 1 + time.sleep(10) + LOG.info("APRSD Exiting.") sys.exit(0) # setup and run the main blocking loop diff --git a/aprsd/messaging.py b/aprsd/messaging.py index ded7ac7..faed185 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -1,13 +1,12 @@ import abc +import datetime import logging -import pprint import re import threading import time -import uuid from multiprocessing import RawValue -from aprsd import client +from aprsd import client, threads LOG = logging.getLogger("APRSD") @@ -21,6 +20,75 @@ ack_dict = {} NULL_MESSAGE = -1 +class MsgTrack(object): + """Class to keep track of outstanding text messages. + + This is a thread safe class that keeps track of active + messages. + + When a message is asked to be sent, it is placed into this + class via it's id. The TextMessage class's send() method + automatically adds itself to this class. When the ack is + recieved from the radio, the message object is removed from + this class. + + # TODO(hemna) + When aprsd is asked to quit this class should be serialized and + saved to disk/db to keep track of the state of outstanding messages. + When aprsd is started, it should try and fetch the saved state, + and reloaded to a live state. + + """ + + _instance = None + lock = None + + track = {} + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super(MsgTrack, cls).__new__(cls) + cls._instance.track = {} + cls._instance.lock = threading.Lock() + return cls._instance + + def add(self, msg): + with self.lock: + key = int(msg.id) + self.track[key] = msg + + def get(self, id): + with self.lock: + if id in self.track: + return self.track[id] + + def remove(self, id): + with self.lock: + key = int(id) + if key in self.track.keys(): + del self.track[key] + + def __len__(self): + with self.lock: + return len(self.track) + + def __str__(self): + with self.lock: + result = "{" + for key in self.track.keys(): + result += "{}: {}, ".format(key, str(self.track[key])) + result += "}" + return result + + def save(self): + """Save this shit to disk?""" + pass + + def restore(self): + """Restore this shit?""" + pass + + class MessageCounter(object): """ Global message id counter class. @@ -34,6 +102,7 @@ class MessageCounter(object): """ _instance = None + max_count = 9999 def __new__(cls, *args, **kwargs): """Make this a singleton class.""" @@ -45,7 +114,10 @@ class MessageCounter(object): def increment(self): with self.lock: - self.val.value += 1 + if self.val.value == self.max_count: + self.val.value = 1 + else: + self.val.value += 1 @property def value(self): @@ -67,20 +139,13 @@ class Message(object, metaclass=abc.ABCMeta): # The message id to send over the air id = 0 - # Unique identifier for this message - uuid = None - - sent = False - sent_time = None - acked = False - acked_time = None - retry_count = 3 + last_send_time = None + last_send_attempt = 0 def __init__(self, fromcall, tocall, msg_id=None): self.fromcall = fromcall self.tocall = tocall - self.uuid = uuid.uuid4() if not msg_id: c = MessageCounter() c.increment() @@ -98,9 +163,12 @@ class TextMessage(Message): message = None - def __init__(self, fromcall, tocall, message): - super(TextMessage, self).__init__(fromcall, tocall) + def __init__(self, fromcall, tocall, message, msg_id=None, allow_delay=True): + super(TextMessage, self).__init__(fromcall, tocall, msg_id) self.message = message + # do we try and save this message for later if we don't get + # an ack? Some messages we don't want to do this ever. + self.allow_delay = allow_delay def __repr__(self): """Build raw string to send over the air.""" @@ -112,14 +180,14 @@ class TextMessage(Message): ) def __str__(self): - return "From({}) TO({}) - Message({}): '{}'".format( - self.fromcall, self.tocall, self.id, self.message + delta = "Never" + if self.last_send_time: + now = datetime.datetime.now() + delta = now - self.last_send_time + return "{}>{} Msg({})({}): '{}'".format( + self.fromcall, self.tocall, self.id, delta, self.message ) - def ack(self): - """Build an Ack Message object from this object.""" - return AckMessage(self.fromcall, self.tocall, msg_id=self.id) - def _filter_for_send(self): """Filter and format message string for FCC.""" # max? ftm400 displays 64, raw msg shows 74 @@ -130,49 +198,13 @@ class TextMessage(Message): # We all miss George Carlin return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) - def send_thread(self): - cl = client.get_client() - for i in range(self.retry_count, 0, -1): - LOG.debug("DEBUG: send_message_thread msg:ack combos are: ") - LOG.debug(pprint.pformat(ack_dict)) - if ack_dict[self.id] != 1: - log_message( - "Sending Message", - repr(self).rstrip("\n"), - self.message, - tocall=self.tocall, - retry_number=i, - ) - # tn.write(line) - cl.sendall(repr(self)) - # decaying repeats, 31 to 93 second intervals - sleeptime = (self.retry_count - i + 1) * 31 - time.sleep(sleeptime) - else: - break - return - # end send_message_thread - def send(self): global ack_dict - # TODO(Hemna) - Need a better metchanism for this. - # This can nuke an ack_dict while it's still being used. - # FIXME FIXME - if len(ack_dict) > 90: - # empty ack dict if it's really big, could result in key error later - LOG.debug( - "DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict) - ) - ack_dict.clear() - LOG.debug(pprint.pformat(ack_dict)) - LOG.debug( - "DEBUG: Cleared ack dictionary, ack_dict length is now %s." - % len(ack_dict) - ) - ack_dict[self.id] = 0 # clear ack for this message number - - thread = threading.Thread(target=self.send_thread, name="send_message") + tracker = MsgTrack() + tracker.add(self) + LOG.debug("Length of MsgTrack is {}".format(len(tracker))) + thread = SendMessageThread(message=self) thread.start() def send_direct(self): @@ -188,6 +220,73 @@ class TextMessage(Message): cl.sendall(repr(self)) +class SendMessageThread(threads.APRSDThread): + def __init__(self, message): + self.msg = message + name = self.msg.message[:5] + super(SendMessageThread, self).__init__( + "SendMessage-{}-{}".format(self.msg.id, name) + ) + + def loop(self): + """Loop until a message is acked or it gets delayed. + + We only sleep for 5 seconds between each loop run, so + that CTRL-C can exit the app in a short period. Each sleep + means the app quitting is blocked until sleep is done. + So we keep track of the last send attempt and only send if the + last send attempt is old enough. + + """ + cl = client.get_client() + tracker = MsgTrack() + # lets see if the message is still in the tracking queue + msg = tracker.get(self.msg.id) + if not msg: + # The message has been removed from the tracking queue + # So it got acked and we are done. + LOG.info("Message Send Complete via Ack.") + return False + else: + send_now = False + if msg.last_send_attempt == msg.retry_count: + # we reached the send limit, don't send again + # TODO(hemna) - Need to put this in a delayed queue? + LOG.info("Message Send Complete. Max attempts reached.") + return False + + # Message is still outstanding and needs to be acked. + if msg.last_send_time: + # Message has a last send time tracking + now = datetime.datetime.now() + sleeptime = (msg.last_send_attempt + 1) * 31 + delta = now - msg.last_send_time + if delta > datetime.timedelta(seconds=sleeptime): + # It's time to try to send it again + send_now = True + else: + send_now = True + + if send_now: + # no attempt time, so lets send it, and start + # tracking the time. + log_message( + "Sending Message", + repr(msg).rstrip("\n"), + msg.message, + tocall=self.msg.tocall, + retry_number=msg.last_send_attempt, + msg_num=msg.id, + ) + cl.sendall(repr(msg)) + msg.last_send_time = datetime.datetime.now() + msg.last_send_attempt += 1 + + time.sleep(5) + # Make sure we get called again. + return True + + class AckMessage(Message): """Class for building Acks and sending them.""" @@ -222,7 +321,7 @@ class AckMessage(Message): def send(self): LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id)) - thread = threading.Thread(target=self.send_thread, name="send_ack") + thread = SendAckThread(self) thread.start() # end send_ack() @@ -241,6 +340,52 @@ class AckMessage(Message): cl.sendall(repr(self)) +class SendAckThread(threads.APRSDThread): + def __init__(self, ack): + self.ack = ack + super(SendAckThread, self).__init__("SendAck-{}".format(self.ack.id)) + + def loop(self): + """Separate thread to send acks with retries.""" + send_now = False + if self.ack.last_send_attempt == self.ack.retry_count: + # we reached the send limit, don't send again + # TODO(hemna) - Need to put this in a delayed queue? + LOG.info("Ack Send Complete. Max attempts reached.") + return False + + if self.ack.last_send_time: + # Message has a last send time tracking + now = datetime.datetime.now() + + # aprs duplicate detection is 30 secs? + # (21 only sends first, 28 skips middle) + sleeptime = 31 + delta = now - self.ack.last_send_time + if delta > datetime.timedelta(seconds=sleeptime): + # It's time to try to send it again + send_now = True + else: + LOG.debug("Still wating. {}".format(delta)) + else: + send_now = True + + if send_now: + cl = client.get_client() + log_message( + "Sending ack", + repr(self.ack).rstrip("\n"), + None, + ack=self.ack.id, + tocall=self.ack.tocall, + retry_number=self.ack.last_send_attempt, + ) + cl.sendall(repr(self.ack)) + self.ack.last_send_attempt += 1 + self.ack.last_send_time = datetime.datetime.now() + time.sleep(5) + + def log_packet(packet): fromcall = packet.get("from", None) tocall = packet.get("to", None) @@ -272,6 +417,7 @@ def log_message( retry_number=None, ack=None, packet_type=None, + uuid=None, ): """ @@ -315,6 +461,8 @@ def log_message( if msg_num: # LOG.info(" Msg number : {}".format(msg_num)) log_list.append(" Msg number : {}".format(msg_num)) + if uuid: + log_list.append(" UUID : {}".format(uuid)) # LOG.info(" {} _______________ Complete".format(header)) log_list.append(" {} _______________ Complete".format(header)) diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 47e491b..1969af2 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -449,8 +449,16 @@ class EmailPlugin(APRSDPluginBase): if a is not None: to_addr = a.group(1) content = a.group(2) + + email_address = email.get_email_from_shortcut(to_addr) + if not email_address: + reply = "Bad email address" + return reply + # send recipient link to aprs.fi map + mapme = False if content == "mapme": + mapme = True content = "Click for my location: http://aprs.fi/{}".format( self.config["ham"]["callsign"] ) @@ -458,6 +466,8 @@ class EmailPlugin(APRSDPluginBase): now = time.time() # see if we sent this msg number recently if ack in self.email_sent_dict: + # BUG(hemna) - when we get a 2 different email command + # with the same ack #, we don't send it. timedelta = now - self.email_sent_dict[ack] if timedelta < 300: # five minutes too_soon = 1 @@ -477,7 +487,10 @@ class EmailPlugin(APRSDPluginBase): ) self.email_sent_dict.clear() self.email_sent_dict[ack] = now - reply = "mapme email sent" + if mapme: + reply = "mapme email sent" + else: + reply = "Email sent." else: LOG.info( "Email for message number " diff --git a/aprsd/threads.py b/aprsd/threads.py index 2b666cb..9de0c0c 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -1,3 +1,4 @@ +import abc import logging import queue import threading @@ -9,30 +10,69 @@ from aprsd import client, messaging, plugin LOG = logging.getLogger("APRSD") +RX_THREAD = "RX" +TX_THREAD = "TX" +EMAIL_THREAD = "Email" -class APRSDThread(threading.Thread): - def __init__(self, name, msg_queues, config): + +class APRSDThreadList(object): + """Singleton class that keeps track of application wide threads.""" + + _instance = None + + threads_list = [] + lock = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super(APRSDThreadList, cls).__new__(cls) + cls.lock = threading.Lock() + cls.threads_list = [] + return cls._instance + + def add(self, thread_obj): + with self.lock: + self.threads_list.append(thread_obj) + + def remove(self, thread_obj): + with self.lock: + self.threads_list.remove(thread_obj) + + def stop_all(self): + """Iterate over all threads and call stop on them.""" + with self.lock: + for th in self.threads_list: + th.stop() + + +class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): + def __init__(self, name): super(APRSDThread, self).__init__(name=name) - self.msg_queues = msg_queues - self.config = config self.thread_stop = False + APRSDThreadList().add(self) def stop(self): self.thread_stop = True def run(self): + LOG.info("Starting") while not self.thread_stop: - self._run() + can_loop = self.loop() + if not can_loop: + self.stop() + APRSDThreadList().remove(self) + LOG.info("Exiting") class APRSDRXThread(APRSDThread): def __init__(self, msg_queues, config): - super(APRSDRXThread, self).__init__("RX_MSG", msg_queues, config) - self.thread_stop = False + super(APRSDRXThread, self).__init__("RX_MSG") + self.msg_queues = msg_queues + self.config = config def stop(self): self.thread_stop = True - self.aprs.stop() + client.get_client().stop() def callback(self, packet): try: @@ -41,32 +81,31 @@ class APRSDRXThread(APRSDThread): except (aprslib.ParseError, aprslib.UnknownFormat): pass - def run(self): - LOG.info("Starting") - while not self.thread_stop: - aprs_client = client.get_client() + def loop(self): + aprs_client = client.get_client() - # setup the consumer of messages and block until a messages - try: - # This will register a packet consumer with aprslib - # When new packets come in the consumer will process - # the packet + # setup the consumer of messages and block until a messages + try: + # This will register a packet consumer with aprslib + # When new packets come in the consumer will process + # the packet - # Do a partial here because the consumer signature doesn't allow - # For kwargs to be passed in to the consumer func we declare - # and the aprslib developer didn't want to allow a PR to add - # kwargs. :( - # https://github.com/rossengeorgiev/aprs-python/pull/56 - aprs_client.consumer(self.process_packet, raw=False, blocking=False) + # Do a partial here because the consumer signature doesn't allow + # For kwargs to be passed in to the consumer func we declare + # and the aprslib developer didn't want to allow a PR to add + # kwargs. :( + # https://github.com/rossengeorgiev/aprs-python/pull/56 + aprs_client.consumer(self.process_packet, raw=False, blocking=False) - except aprslib.exceptions.ConnectionDrop: - LOG.error("Connection dropped, reconnecting") - time.sleep(5) - # Force the deletion of the client object connected to aprs - # This will cause a reconnect, next time client.get_client() - # is called - client.Client().reset() - LOG.info("Exiting ") + except aprslib.exceptions.ConnectionDrop: + LOG.error("Connection dropped, reconnecting") + time.sleep(5) + # Force the deletion of the client object connected to aprs + # This will cause a reconnect, next time client.get_client() + # is called + client.Client().reset() + # Continue to loop + return True def process_ack_packet(self, packet): ack_num = packet.get("msgNo") @@ -74,7 +113,10 @@ class APRSDRXThread(APRSDThread): messaging.log_message( "ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"] ) - messaging.ack_dict.update({int(ack_num): 1}) + tracker = messaging.MsgTrack() + tracker.remove(ack_num) + LOG.debug("Length of MsgTrack is {}".format(len(tracker))) + # messaging.ack_dict.update({int(ack_num): 1}) return def process_mic_e_packet(self, packet): @@ -87,12 +129,14 @@ class APRSDRXThread(APRSDThread): fromcall = packet["from"] message = packet.get("message_text", None) - msg_id = packet.get("msgNo", None) - if not msg_id: - msg_id = "0" + msg_id = packet.get("msgNo", "0") messaging.log_message( - "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id + "Received Message", + packet["raw"], + message, + fromcall=fromcall, + msg_num=msg_id, ) found_command = False @@ -108,7 +152,6 @@ class APRSDRXThread(APRSDThread): if reply is not messaging.NULL_MESSAGE: LOG.debug("Sending '{}'".format(reply)) - # msg = {"fromcall": fromcall, "msg": reply} msg = messaging.TextMessage( self.config["aprs"]["login"], fromcall, reply ) @@ -122,7 +165,6 @@ class APRSDRXThread(APRSDThread): names.sort() reply = "Usage: {}".format(", ".join(names)) - # messaging.send_message(fromcall, reply) msg = messaging.TextMessage( self.config["aprs"]["login"], fromcall, reply ) @@ -130,7 +172,6 @@ class APRSDRXThread(APRSDThread): except Exception as ex: LOG.exception("Plugin failed!!!", ex) reply = "A Plugin failed! try again?" - # messaging.send_message(fromcall, reply) msg = messaging.TextMessage(self.config["aprs"]["login"], fromcall, reply) self.msg_queues["tx"].put(msg) @@ -139,7 +180,7 @@ class APRSDRXThread(APRSDThread): ack = messaging.AckMessage( self.config["aprs"]["login"], fromcall, msg_id=msg_id ) - ack.send() + self.msg_queues["tx"].put(ack) LOG.debug("Packet processing complete") def process_packet(self, packet): @@ -172,15 +213,16 @@ class APRSDRXThread(APRSDThread): class APRSDTXThread(APRSDThread): def __init__(self, msg_queues, config): - super(APRSDTXThread, self).__init__("TX_MSG", msg_queues, config) + super(APRSDTXThread, self).__init__("TX_MSG") + self.msg_queues = msg_queues + self.config = config - def run(self): - LOG.info("Starting") - while not self.thread_stop: - try: - msg = self.msg_queues["tx"].get(timeout=0.1) - LOG.info("TXQ: got message '{}'".format(msg)) - msg.send() - except queue.Empty: - pass - LOG.info("Exiting ") + def loop(self): + try: + msg = self.msg_queues["tx"].get(timeout=0.1) + LOG.info("TXQ: got message '{}'".format(msg)) + msg.send() + except queue.Empty: + pass + # Continue to loop + return True