diff --git a/aprsd/client.py b/aprsd/client.py index 58b1be4..9f256c7 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -73,6 +73,7 @@ class Aprsdis(aprslib.IS): def stop(self): self.thread_stop = True + LOG.info("Shutdown Aprsdis client.") def _socket_readlines(self, blocking=False): """ @@ -94,7 +95,6 @@ class Aprsdis(aprslib.IS): [self.sock], [], [], self.select_timeout ) if not readable: - self.logger.info("nothing to read") continue try: diff --git a/aprsd/email.py b/aprsd/email.py index 8c270a1..0e6ffad 100644 --- a/aprsd/email.py +++ b/aprsd/email.py @@ -4,7 +4,6 @@ import imaplib import logging import re import smtplib -import threading import time from email.mime.text import MIMEText @@ -12,7 +11,7 @@ import imapclient import six from validate_email import validate_email -from aprsd import messaging +from aprsd import messaging, threads LOG = logging.getLogger("APRSD") @@ -20,13 +19,6 @@ LOG = logging.getLogger("APRSD") CONFIG = None -def start_thread(): - checkemailthread = threading.Thread( - target=check_email_thread, name="check_email", args=() - ) # args must be tuple - checkemailthread.start() - - def _imap_connect(): imap_port = CONFIG["imap"].get("port", 143) use_ssl = CONFIG["imap"].get("use_ssl", False) @@ -120,6 +112,11 @@ def validate_shortcuts(config): LOG.info("Available shortcuts: {}".format(config["shortcuts"])) +def get_email_from_shortcut(shortcut): + if shortcut in CONFIG.get("shortcuts", None): + return CONFIG["shortcuts"].get(shortcut, None) + + def validate_email_config(config, disable_validation=False): """function to simply ensure we can connect to email services. @@ -221,6 +218,45 @@ def parse_email(msgid, data, server): # end parse_email +def send_email(to_addr, content): + global check_email_delay + + shortcuts = CONFIG["shortcuts"] + email_address = get_email_from_shortcut(to_addr) + LOG.info("Sending Email_________________") + + if to_addr in shortcuts: + LOG.info("To : " + to_addr) + to_addr = email_address + LOG.info(" (" + to_addr + ")") + subject = CONFIG["ham"]["callsign"] + # content = content + "\n\n(NOTE: reply with one line)" + LOG.info("Subject : " + subject) + LOG.info("Body : " + content) + + # check email more often since there's activity right now + check_email_delay = 60 + + msg = MIMEText(content) + msg["Subject"] = subject + msg["From"] = CONFIG["smtp"]["login"] + msg["To"] = to_addr + server = _smtp_connect() + if server: + try: + server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string()) + except Exception as e: + msg = getattr(e, "message", repr(e)) + LOG.error("Sendmail Error!!!! '{}'", msg) + server.quit() + return -1 + server.quit() + return 0 + + +# end send_email + + def resend_email(count, fromcall): global check_email_delay date = datetime.datetime.now() @@ -257,7 +293,9 @@ def resend_email(count, fromcall): from_addr = shortcuts_inverted[from_addr] # asterisk indicates a resend reply = "-" + from_addr + " * " + body.decode(errors="ignore") - messaging.send_message(fromcall, reply) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply) + msg.send() msgexists = True if msgexists is not True: @@ -274,7 +312,9 @@ def resend_email(count, fromcall): str(m).zfill(2), str(s).zfill(2), ) - messaging.send_message(fromcall, reply) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply) + msg.send() # check email more often since we're resending one now check_email_delay = 60 @@ -283,117 +323,108 @@ def resend_email(count, fromcall): # end resend_email() -def check_email_thread(): - global check_email_delay +class APRSDEmailThread(threads.APRSDThread): + def __init__(self, msg_queues, config): + super(APRSDEmailThread, self).__init__("EmailThread") + self.msg_queues = msg_queues + self.config = config - # LOG.debug("FIXME initial email delay is 10 seconds") - check_email_delay = 60 - while True: - # LOG.debug("Top of check_email_thread.") + def run(self): + global check_email_delay - time.sleep(check_email_delay) + check_email_delay = 60 + past = datetime.datetime.now() + while not self.thread_stop: + time.sleep(5) + # always sleep for 5 seconds and see if we need to check email + # This allows CTRL-C to stop the execution of this loop sooner + # than check_email_delay time + now = datetime.datetime.now() + if now - past > datetime.timedelta(seconds=check_email_delay): + # It's time to check email - # slowly increase delay every iteration, max out at 300 seconds - # any send/receive/resend activity will reset this to 60 seconds - if check_email_delay < 300: - check_email_delay += 1 - LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds") + # slowly increase delay every iteration, max out at 300 seconds + # any send/receive/resend activity will reset this to 60 seconds + if check_email_delay < 300: + check_email_delay += 1 + LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds") - shortcuts = CONFIG["shortcuts"] - # swap key/value - shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()]) + shortcuts = CONFIG["shortcuts"] + # swap key/value + shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()]) - date = datetime.datetime.now() - month = date.strftime("%B")[:3] # Nov, Mar, Apr - day = date.day - year = date.year - today = "%s-%s-%s" % (day, month, year) + date = datetime.datetime.now() + month = date.strftime("%B")[:3] # Nov, Mar, Apr + day = date.day + year = date.year + today = "%s-%s-%s" % (day, month, year) - server = None - try: - server = _imap_connect() - except Exception as e: - LOG.exception("Failed to get IMAP server Can't check email.", e) + server = None + try: + server = _imap_connect() + except Exception as e: + LOG.exception("Failed to get IMAP server Can't check email.", e) - if not server: - continue + if not server: + continue - messages = server.search(["SINCE", today]) - # LOG.debug("{} messages received today".format(len(messages))) + messages = server.search(["SINCE", today]) + # LOG.debug("{} messages received today".format(len(messages))) - for msgid, data in server.fetch(messages, ["ENVELOPE"]).items(): - envelope = data[b"ENVELOPE"] - # LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date)) - f = re.search( - r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0]) - ) - if f is not None: - from_addr = f.group(1) + for msgid, data in server.fetch(messages, ["ENVELOPE"]).items(): + envelope = data[b"ENVELOPE"] + # LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date)) + f = re.search( + r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0]) + ) + if f is not None: + from_addr = f.group(1) + else: + from_addr = "noaddr" + + # LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid])) + # if "APRS" not in server.get_flags(msgid)[msgid]: + # in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both + taglist = [ + x.decode(errors="ignore") + for x in server.get_flags(msgid)[msgid] + ] + if "APRS" not in taglist: + # if msg not flagged as sent via aprs + server.fetch([msgid], ["RFC822"]) + (body, from_addr) = parse_email(msgid, data, server) + # unset seen flag, will stay bold in email client + server.remove_flags(msgid, [imapclient.SEEN]) + + if from_addr in shortcuts_inverted: + # reverse lookup of a shortcut + from_addr = shortcuts_inverted[from_addr] + + reply = "-" + from_addr + " " + body.decode(errors="ignore") + # messaging.send_message(CONFIG["ham"]["callsign"], reply) + msg = messaging.TextMessage( + self.config["aprs"]["login"], + self.config["ham"]["callsign"], + reply, + ) + self.msg_queues["tx"].put(msg) + # flag message as sent via aprs + server.add_flags(msgid, ["APRS"]) + # unset seen flag, will stay bold in email client + server.remove_flags(msgid, [imapclient.SEEN]) + # check email more often since we just received an email + check_email_delay = 60 + # reset clock + past = datetime.datetime.now() + server.logout() else: - from_addr = "noaddr" + # We haven't hit the email delay yet. + # LOG.debug("Delta({}) < {}".format(now - past, check_email_delay)) + pass - # LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid])) - # if "APRS" not in server.get_flags(msgid)[msgid]: - # in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both - taglist = [ - x.decode(errors="ignore") for x in server.get_flags(msgid)[msgid] - ] - if "APRS" not in taglist: - # if msg not flagged as sent via aprs - server.fetch([msgid], ["RFC822"]) - (body, from_addr) = parse_email(msgid, data, server) - # unset seen flag, will stay bold in email client - server.remove_flags(msgid, [imapclient.SEEN]) - - if from_addr in shortcuts_inverted: - # reverse lookup of a shortcut - from_addr = shortcuts_inverted[from_addr] - - reply = "-" + from_addr + " " + body.decode(errors="ignore") - messaging.send_message(CONFIG["ham"]["callsign"], reply) - # flag message as sent via aprs - server.add_flags(msgid, ["APRS"]) - # unset seen flag, will stay bold in email client - server.remove_flags(msgid, [imapclient.SEEN]) - # check email more often since we just received an email - check_email_delay = 60 - - server.logout() + # Remove ourselves from the global threads list + threads.APRSDThreadList().remove(self) + LOG.info("Exiting") # end check_email() - - -def send_email(to_addr, content): - global check_email_delay - - LOG.info("Sending Email_________________") - shortcuts = CONFIG["shortcuts"] - if to_addr in shortcuts: - LOG.info("To : " + to_addr) - to_addr = shortcuts[to_addr] - LOG.info(" (" + to_addr + ")") - subject = CONFIG["ham"]["callsign"] - # content = content + "\n\n(NOTE: reply with one line)" - LOG.info("Subject : " + subject) - LOG.info("Body : " + content) - - # check email more often since there's activity right now - check_email_delay = 60 - - msg = MIMEText(content) - msg["Subject"] = subject - msg["From"] = CONFIG["smtp"]["login"] - msg["To"] = to_addr - server = _smtp_connect() - if server: - try: - server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string()) - except Exception as e: - msg = getattr(e, "message", repr(e)) - LOG.error("Sendmail Error!!!! '{}'", msg) - server.quit() - return -1 - server.quit() - return 0 - # end send_email diff --git a/aprsd/main.py b/aprsd/main.py index 58dfee0..a741477 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -54,7 +54,7 @@ LOG_LEVELS = { CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) -server_threads = [] +server_event = threading.Event() # localization, please edit: # HOST = "noam.aprs2.net" # north america tier2 servers round robin @@ -143,11 +143,13 @@ def install(append, case_insensitive, shell, path): def signal_handler(signal, frame): - global event + global server_vent - LOG.info("Ctrl+C, Sending all threads exit!") - for th in server_threads: - th.stop() + LOG.info( + "Ctrl+C, Sending all threads exit! Can take up to 10 seconds to exit all threads" + ) + threads.APRSDThreadList().stop_all() + server_event.set() sys.exit(0) # thread ignores this @@ -260,7 +262,6 @@ def send_message( got_ack = True else: message = packet.get("message_text", None) - LOG.info("We got a new message") fromcall = packet["from"] msg_number = packet.get("msgNo", "0") messaging.log_message( @@ -289,7 +290,6 @@ def send_message( # message msg = messaging.TextMessage(aprs_login, tocallsign, command) msg.send_direct() - # messaging.send_message_direct(tocallsign, command, message_number) try: # This will register a packet consumer with aprslib @@ -359,9 +359,6 @@ def server(loglevel, quiet, disable_validation, config_file): LOG.error("Failed to validate email config options") sys.exit(-1) - # start the email thread - email.start_thread() - # Create the initial PM singleton and Register plugins plugin_manager = plugin.PluginManager(config) plugin_manager.setup_plugins() @@ -376,12 +373,20 @@ def server(loglevel, quiet, disable_validation, config_file): rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config) tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config) - # TODO(hemna): add EmailThread - server_threads.append(rx_thread) - server_threads.append(tx_thread) + email_thread = email.APRSDEmailThread(msg_queues=msg_queues, config=config) + email_thread.start() rx_thread.start() tx_thread.start() + cntr = 0 + while not server_event.is_set(): + # to keep the log noise down + if cntr % 6 == 0: + tracker = messaging.MsgTrack() + LOG.debug("KeepAlive Tracker({}): {}".format(len(tracker), str(tracker))) + cntr += 1 + time.sleep(10) + LOG.info("APRSD Exiting.") sys.exit(0) # setup and run the main blocking loop diff --git a/aprsd/messaging.py b/aprsd/messaging.py index ded7ac7..faed185 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -1,13 +1,12 @@ import abc +import datetime import logging -import pprint import re import threading import time -import uuid from multiprocessing import RawValue -from aprsd import client +from aprsd import client, threads LOG = logging.getLogger("APRSD") @@ -21,6 +20,75 @@ ack_dict = {} NULL_MESSAGE = -1 +class MsgTrack(object): + """Class to keep track of outstanding text messages. + + This is a thread safe class that keeps track of active + messages. + + When a message is asked to be sent, it is placed into this + class via it's id. The TextMessage class's send() method + automatically adds itself to this class. When the ack is + recieved from the radio, the message object is removed from + this class. + + # TODO(hemna) + When aprsd is asked to quit this class should be serialized and + saved to disk/db to keep track of the state of outstanding messages. + When aprsd is started, it should try and fetch the saved state, + and reloaded to a live state. + + """ + + _instance = None + lock = None + + track = {} + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super(MsgTrack, cls).__new__(cls) + cls._instance.track = {} + cls._instance.lock = threading.Lock() + return cls._instance + + def add(self, msg): + with self.lock: + key = int(msg.id) + self.track[key] = msg + + def get(self, id): + with self.lock: + if id in self.track: + return self.track[id] + + def remove(self, id): + with self.lock: + key = int(id) + if key in self.track.keys(): + del self.track[key] + + def __len__(self): + with self.lock: + return len(self.track) + + def __str__(self): + with self.lock: + result = "{" + for key in self.track.keys(): + result += "{}: {}, ".format(key, str(self.track[key])) + result += "}" + return result + + def save(self): + """Save this shit to disk?""" + pass + + def restore(self): + """Restore this shit?""" + pass + + class MessageCounter(object): """ Global message id counter class. @@ -34,6 +102,7 @@ class MessageCounter(object): """ _instance = None + max_count = 9999 def __new__(cls, *args, **kwargs): """Make this a singleton class.""" @@ -45,7 +114,10 @@ class MessageCounter(object): def increment(self): with self.lock: - self.val.value += 1 + if self.val.value == self.max_count: + self.val.value = 1 + else: + self.val.value += 1 @property def value(self): @@ -67,20 +139,13 @@ class Message(object, metaclass=abc.ABCMeta): # The message id to send over the air id = 0 - # Unique identifier for this message - uuid = None - - sent = False - sent_time = None - acked = False - acked_time = None - retry_count = 3 + last_send_time = None + last_send_attempt = 0 def __init__(self, fromcall, tocall, msg_id=None): self.fromcall = fromcall self.tocall = tocall - self.uuid = uuid.uuid4() if not msg_id: c = MessageCounter() c.increment() @@ -98,9 +163,12 @@ class TextMessage(Message): message = None - def __init__(self, fromcall, tocall, message): - super(TextMessage, self).__init__(fromcall, tocall) + def __init__(self, fromcall, tocall, message, msg_id=None, allow_delay=True): + super(TextMessage, self).__init__(fromcall, tocall, msg_id) self.message = message + # do we try and save this message for later if we don't get + # an ack? Some messages we don't want to do this ever. + self.allow_delay = allow_delay def __repr__(self): """Build raw string to send over the air.""" @@ -112,14 +180,14 @@ class TextMessage(Message): ) def __str__(self): - return "From({}) TO({}) - Message({}): '{}'".format( - self.fromcall, self.tocall, self.id, self.message + delta = "Never" + if self.last_send_time: + now = datetime.datetime.now() + delta = now - self.last_send_time + return "{}>{} Msg({})({}): '{}'".format( + self.fromcall, self.tocall, self.id, delta, self.message ) - def ack(self): - """Build an Ack Message object from this object.""" - return AckMessage(self.fromcall, self.tocall, msg_id=self.id) - def _filter_for_send(self): """Filter and format message string for FCC.""" # max? ftm400 displays 64, raw msg shows 74 @@ -130,49 +198,13 @@ class TextMessage(Message): # We all miss George Carlin return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) - def send_thread(self): - cl = client.get_client() - for i in range(self.retry_count, 0, -1): - LOG.debug("DEBUG: send_message_thread msg:ack combos are: ") - LOG.debug(pprint.pformat(ack_dict)) - if ack_dict[self.id] != 1: - log_message( - "Sending Message", - repr(self).rstrip("\n"), - self.message, - tocall=self.tocall, - retry_number=i, - ) - # tn.write(line) - cl.sendall(repr(self)) - # decaying repeats, 31 to 93 second intervals - sleeptime = (self.retry_count - i + 1) * 31 - time.sleep(sleeptime) - else: - break - return - # end send_message_thread - def send(self): global ack_dict - # TODO(Hemna) - Need a better metchanism for this. - # This can nuke an ack_dict while it's still being used. - # FIXME FIXME - if len(ack_dict) > 90: - # empty ack dict if it's really big, could result in key error later - LOG.debug( - "DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict) - ) - ack_dict.clear() - LOG.debug(pprint.pformat(ack_dict)) - LOG.debug( - "DEBUG: Cleared ack dictionary, ack_dict length is now %s." - % len(ack_dict) - ) - ack_dict[self.id] = 0 # clear ack for this message number - - thread = threading.Thread(target=self.send_thread, name="send_message") + tracker = MsgTrack() + tracker.add(self) + LOG.debug("Length of MsgTrack is {}".format(len(tracker))) + thread = SendMessageThread(message=self) thread.start() def send_direct(self): @@ -188,6 +220,73 @@ class TextMessage(Message): cl.sendall(repr(self)) +class SendMessageThread(threads.APRSDThread): + def __init__(self, message): + self.msg = message + name = self.msg.message[:5] + super(SendMessageThread, self).__init__( + "SendMessage-{}-{}".format(self.msg.id, name) + ) + + def loop(self): + """Loop until a message is acked or it gets delayed. + + We only sleep for 5 seconds between each loop run, so + that CTRL-C can exit the app in a short period. Each sleep + means the app quitting is blocked until sleep is done. + So we keep track of the last send attempt and only send if the + last send attempt is old enough. + + """ + cl = client.get_client() + tracker = MsgTrack() + # lets see if the message is still in the tracking queue + msg = tracker.get(self.msg.id) + if not msg: + # The message has been removed from the tracking queue + # So it got acked and we are done. + LOG.info("Message Send Complete via Ack.") + return False + else: + send_now = False + if msg.last_send_attempt == msg.retry_count: + # we reached the send limit, don't send again + # TODO(hemna) - Need to put this in a delayed queue? + LOG.info("Message Send Complete. Max attempts reached.") + return False + + # Message is still outstanding and needs to be acked. + if msg.last_send_time: + # Message has a last send time tracking + now = datetime.datetime.now() + sleeptime = (msg.last_send_attempt + 1) * 31 + delta = now - msg.last_send_time + if delta > datetime.timedelta(seconds=sleeptime): + # It's time to try to send it again + send_now = True + else: + send_now = True + + if send_now: + # no attempt time, so lets send it, and start + # tracking the time. + log_message( + "Sending Message", + repr(msg).rstrip("\n"), + msg.message, + tocall=self.msg.tocall, + retry_number=msg.last_send_attempt, + msg_num=msg.id, + ) + cl.sendall(repr(msg)) + msg.last_send_time = datetime.datetime.now() + msg.last_send_attempt += 1 + + time.sleep(5) + # Make sure we get called again. + return True + + class AckMessage(Message): """Class for building Acks and sending them.""" @@ -222,7 +321,7 @@ class AckMessage(Message): def send(self): LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id)) - thread = threading.Thread(target=self.send_thread, name="send_ack") + thread = SendAckThread(self) thread.start() # end send_ack() @@ -241,6 +340,52 @@ class AckMessage(Message): cl.sendall(repr(self)) +class SendAckThread(threads.APRSDThread): + def __init__(self, ack): + self.ack = ack + super(SendAckThread, self).__init__("SendAck-{}".format(self.ack.id)) + + def loop(self): + """Separate thread to send acks with retries.""" + send_now = False + if self.ack.last_send_attempt == self.ack.retry_count: + # we reached the send limit, don't send again + # TODO(hemna) - Need to put this in a delayed queue? + LOG.info("Ack Send Complete. Max attempts reached.") + return False + + if self.ack.last_send_time: + # Message has a last send time tracking + now = datetime.datetime.now() + + # aprs duplicate detection is 30 secs? + # (21 only sends first, 28 skips middle) + sleeptime = 31 + delta = now - self.ack.last_send_time + if delta > datetime.timedelta(seconds=sleeptime): + # It's time to try to send it again + send_now = True + else: + LOG.debug("Still wating. {}".format(delta)) + else: + send_now = True + + if send_now: + cl = client.get_client() + log_message( + "Sending ack", + repr(self.ack).rstrip("\n"), + None, + ack=self.ack.id, + tocall=self.ack.tocall, + retry_number=self.ack.last_send_attempt, + ) + cl.sendall(repr(self.ack)) + self.ack.last_send_attempt += 1 + self.ack.last_send_time = datetime.datetime.now() + time.sleep(5) + + def log_packet(packet): fromcall = packet.get("from", None) tocall = packet.get("to", None) @@ -272,6 +417,7 @@ def log_message( retry_number=None, ack=None, packet_type=None, + uuid=None, ): """ @@ -315,6 +461,8 @@ def log_message( if msg_num: # LOG.info(" Msg number : {}".format(msg_num)) log_list.append(" Msg number : {}".format(msg_num)) + if uuid: + log_list.append(" UUID : {}".format(uuid)) # LOG.info(" {} _______________ Complete".format(header)) log_list.append(" {} _______________ Complete".format(header)) diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 47e491b..1969af2 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -449,8 +449,16 @@ class EmailPlugin(APRSDPluginBase): if a is not None: to_addr = a.group(1) content = a.group(2) + + email_address = email.get_email_from_shortcut(to_addr) + if not email_address: + reply = "Bad email address" + return reply + # send recipient link to aprs.fi map + mapme = False if content == "mapme": + mapme = True content = "Click for my location: http://aprs.fi/{}".format( self.config["ham"]["callsign"] ) @@ -458,6 +466,8 @@ class EmailPlugin(APRSDPluginBase): now = time.time() # see if we sent this msg number recently if ack in self.email_sent_dict: + # BUG(hemna) - when we get a 2 different email command + # with the same ack #, we don't send it. timedelta = now - self.email_sent_dict[ack] if timedelta < 300: # five minutes too_soon = 1 @@ -477,7 +487,10 @@ class EmailPlugin(APRSDPluginBase): ) self.email_sent_dict.clear() self.email_sent_dict[ack] = now - reply = "mapme email sent" + if mapme: + reply = "mapme email sent" + else: + reply = "Email sent." else: LOG.info( "Email for message number " diff --git a/aprsd/threads.py b/aprsd/threads.py index 2b666cb..9de0c0c 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -1,3 +1,4 @@ +import abc import logging import queue import threading @@ -9,30 +10,69 @@ from aprsd import client, messaging, plugin LOG = logging.getLogger("APRSD") +RX_THREAD = "RX" +TX_THREAD = "TX" +EMAIL_THREAD = "Email" -class APRSDThread(threading.Thread): - def __init__(self, name, msg_queues, config): + +class APRSDThreadList(object): + """Singleton class that keeps track of application wide threads.""" + + _instance = None + + threads_list = [] + lock = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super(APRSDThreadList, cls).__new__(cls) + cls.lock = threading.Lock() + cls.threads_list = [] + return cls._instance + + def add(self, thread_obj): + with self.lock: + self.threads_list.append(thread_obj) + + def remove(self, thread_obj): + with self.lock: + self.threads_list.remove(thread_obj) + + def stop_all(self): + """Iterate over all threads and call stop on them.""" + with self.lock: + for th in self.threads_list: + th.stop() + + +class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): + def __init__(self, name): super(APRSDThread, self).__init__(name=name) - self.msg_queues = msg_queues - self.config = config self.thread_stop = False + APRSDThreadList().add(self) def stop(self): self.thread_stop = True def run(self): + LOG.info("Starting") while not self.thread_stop: - self._run() + can_loop = self.loop() + if not can_loop: + self.stop() + APRSDThreadList().remove(self) + LOG.info("Exiting") class APRSDRXThread(APRSDThread): def __init__(self, msg_queues, config): - super(APRSDRXThread, self).__init__("RX_MSG", msg_queues, config) - self.thread_stop = False + super(APRSDRXThread, self).__init__("RX_MSG") + self.msg_queues = msg_queues + self.config = config def stop(self): self.thread_stop = True - self.aprs.stop() + client.get_client().stop() def callback(self, packet): try: @@ -41,32 +81,31 @@ class APRSDRXThread(APRSDThread): except (aprslib.ParseError, aprslib.UnknownFormat): pass - def run(self): - LOG.info("Starting") - while not self.thread_stop: - aprs_client = client.get_client() + def loop(self): + aprs_client = client.get_client() - # setup the consumer of messages and block until a messages - try: - # This will register a packet consumer with aprslib - # When new packets come in the consumer will process - # the packet + # setup the consumer of messages and block until a messages + try: + # This will register a packet consumer with aprslib + # When new packets come in the consumer will process + # the packet - # Do a partial here because the consumer signature doesn't allow - # For kwargs to be passed in to the consumer func we declare - # and the aprslib developer didn't want to allow a PR to add - # kwargs. :( - # https://github.com/rossengeorgiev/aprs-python/pull/56 - aprs_client.consumer(self.process_packet, raw=False, blocking=False) + # Do a partial here because the consumer signature doesn't allow + # For kwargs to be passed in to the consumer func we declare + # and the aprslib developer didn't want to allow a PR to add + # kwargs. :( + # https://github.com/rossengeorgiev/aprs-python/pull/56 + aprs_client.consumer(self.process_packet, raw=False, blocking=False) - except aprslib.exceptions.ConnectionDrop: - LOG.error("Connection dropped, reconnecting") - time.sleep(5) - # Force the deletion of the client object connected to aprs - # This will cause a reconnect, next time client.get_client() - # is called - client.Client().reset() - LOG.info("Exiting ") + except aprslib.exceptions.ConnectionDrop: + LOG.error("Connection dropped, reconnecting") + time.sleep(5) + # Force the deletion of the client object connected to aprs + # This will cause a reconnect, next time client.get_client() + # is called + client.Client().reset() + # Continue to loop + return True def process_ack_packet(self, packet): ack_num = packet.get("msgNo") @@ -74,7 +113,10 @@ class APRSDRXThread(APRSDThread): messaging.log_message( "ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"] ) - messaging.ack_dict.update({int(ack_num): 1}) + tracker = messaging.MsgTrack() + tracker.remove(ack_num) + LOG.debug("Length of MsgTrack is {}".format(len(tracker))) + # messaging.ack_dict.update({int(ack_num): 1}) return def process_mic_e_packet(self, packet): @@ -87,12 +129,14 @@ class APRSDRXThread(APRSDThread): fromcall = packet["from"] message = packet.get("message_text", None) - msg_id = packet.get("msgNo", None) - if not msg_id: - msg_id = "0" + msg_id = packet.get("msgNo", "0") messaging.log_message( - "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id + "Received Message", + packet["raw"], + message, + fromcall=fromcall, + msg_num=msg_id, ) found_command = False @@ -108,7 +152,6 @@ class APRSDRXThread(APRSDThread): if reply is not messaging.NULL_MESSAGE: LOG.debug("Sending '{}'".format(reply)) - # msg = {"fromcall": fromcall, "msg": reply} msg = messaging.TextMessage( self.config["aprs"]["login"], fromcall, reply ) @@ -122,7 +165,6 @@ class APRSDRXThread(APRSDThread): names.sort() reply = "Usage: {}".format(", ".join(names)) - # messaging.send_message(fromcall, reply) msg = messaging.TextMessage( self.config["aprs"]["login"], fromcall, reply ) @@ -130,7 +172,6 @@ class APRSDRXThread(APRSDThread): except Exception as ex: LOG.exception("Plugin failed!!!", ex) reply = "A Plugin failed! try again?" - # messaging.send_message(fromcall, reply) msg = messaging.TextMessage(self.config["aprs"]["login"], fromcall, reply) self.msg_queues["tx"].put(msg) @@ -139,7 +180,7 @@ class APRSDRXThread(APRSDThread): ack = messaging.AckMessage( self.config["aprs"]["login"], fromcall, msg_id=msg_id ) - ack.send() + self.msg_queues["tx"].put(ack) LOG.debug("Packet processing complete") def process_packet(self, packet): @@ -172,15 +213,16 @@ class APRSDRXThread(APRSDThread): class APRSDTXThread(APRSDThread): def __init__(self, msg_queues, config): - super(APRSDTXThread, self).__init__("TX_MSG", msg_queues, config) + super(APRSDTXThread, self).__init__("TX_MSG") + self.msg_queues = msg_queues + self.config = config - def run(self): - LOG.info("Starting") - while not self.thread_stop: - try: - msg = self.msg_queues["tx"].get(timeout=0.1) - LOG.info("TXQ: got message '{}'".format(msg)) - msg.send() - except queue.Empty: - pass - LOG.info("Exiting ") + def loop(self): + try: + msg = self.msg_queues["tx"].get(timeout=0.1) + LOG.info("TXQ: got message '{}'".format(msg)) + msg.send() + except queue.Empty: + pass + # Continue to loop + return True