Creation of MsgTrack object and other stuff

This patch adds the new MsgTrack object replacing the
global ack_dict.  the ack_dict was not thread safe.
the new MsgTrack is a singleton object that keeps track of
all outbound TextMessage objects.  When a TextMessage.send() is called
it is added to the MsgTrack object, and when an ack is received for that
message, the message is removed from the MsgTrack object.

TODO:  Add an automatic mechanism for saving the messages in MsgTrack
so that when CTRL-C is called to exit aprsd server, then the MsgTrack
state is saved to storage.   When aprsd server is started up again, add
the option to try and reload state of MsgTrack.

This patch also reworked the email thread into an APRSDThread object
that can exit gracefully with CTRL-C.

NOTE: Don't call sleep() with a long time (greater than 5 seconds), as
it causes a delay in exiting aprsd until the last sleep() finishes.
Since aprsd has so many threads now for processing incoming messages and
outgoing messages, we need to coordinate all thread operations so that
they don't block the exiting of the app.
This commit is contained in:
Hemna 2020-12-29 10:31:16 -05:00
parent 9d3ede6e71
commit 2e90c0bdbb
6 changed files with 478 additions and 239 deletions

View File

@ -73,6 +73,7 @@ class Aprsdis(aprslib.IS):
def stop(self): def stop(self):
self.thread_stop = True self.thread_stop = True
LOG.info("Shutdown Aprsdis client.")
def _socket_readlines(self, blocking=False): def _socket_readlines(self, blocking=False):
""" """
@ -94,7 +95,6 @@ class Aprsdis(aprslib.IS):
[self.sock], [], [], self.select_timeout [self.sock], [], [], self.select_timeout
) )
if not readable: if not readable:
self.logger.info("nothing to read")
continue continue
try: try:

View File

@ -4,7 +4,6 @@ import imaplib
import logging import logging
import re import re
import smtplib import smtplib
import threading
import time import time
from email.mime.text import MIMEText from email.mime.text import MIMEText
@ -12,7 +11,7 @@ import imapclient
import six import six
from validate_email import validate_email from validate_email import validate_email
from aprsd import messaging from aprsd import messaging, threads
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
@ -20,13 +19,6 @@ LOG = logging.getLogger("APRSD")
CONFIG = None CONFIG = None
def start_thread():
checkemailthread = threading.Thread(
target=check_email_thread, name="check_email", args=()
) # args must be tuple
checkemailthread.start()
def _imap_connect(): def _imap_connect():
imap_port = CONFIG["imap"].get("port", 143) imap_port = CONFIG["imap"].get("port", 143)
use_ssl = CONFIG["imap"].get("use_ssl", False) use_ssl = CONFIG["imap"].get("use_ssl", False)
@ -120,6 +112,11 @@ def validate_shortcuts(config):
LOG.info("Available shortcuts: {}".format(config["shortcuts"])) LOG.info("Available shortcuts: {}".format(config["shortcuts"]))
def get_email_from_shortcut(shortcut):
if shortcut in CONFIG.get("shortcuts", None):
return CONFIG["shortcuts"].get(shortcut, None)
def validate_email_config(config, disable_validation=False): def validate_email_config(config, disable_validation=False):
"""function to simply ensure we can connect to email services. """function to simply ensure we can connect to email services.
@ -221,6 +218,45 @@ def parse_email(msgid, data, server):
# end parse_email # end parse_email
def send_email(to_addr, content):
global check_email_delay
shortcuts = CONFIG["shortcuts"]
email_address = get_email_from_shortcut(to_addr)
LOG.info("Sending Email_________________")
if to_addr in shortcuts:
LOG.info("To : " + to_addr)
to_addr = email_address
LOG.info(" (" + to_addr + ")")
subject = CONFIG["ham"]["callsign"]
# content = content + "\n\n(NOTE: reply with one line)"
LOG.info("Subject : " + subject)
LOG.info("Body : " + content)
# check email more often since there's activity right now
check_email_delay = 60
msg = MIMEText(content)
msg["Subject"] = subject
msg["From"] = CONFIG["smtp"]["login"]
msg["To"] = to_addr
server = _smtp_connect()
if server:
try:
server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string())
except Exception as e:
msg = getattr(e, "message", repr(e))
LOG.error("Sendmail Error!!!! '{}'", msg)
server.quit()
return -1
server.quit()
return 0
# end send_email
def resend_email(count, fromcall): def resend_email(count, fromcall):
global check_email_delay global check_email_delay
date = datetime.datetime.now() date = datetime.datetime.now()
@ -257,7 +293,9 @@ def resend_email(count, fromcall):
from_addr = shortcuts_inverted[from_addr] from_addr = shortcuts_inverted[from_addr]
# asterisk indicates a resend # asterisk indicates a resend
reply = "-" + from_addr + " * " + body.decode(errors="ignore") reply = "-" + from_addr + " * " + body.decode(errors="ignore")
messaging.send_message(fromcall, reply) # messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply)
msg.send()
msgexists = True msgexists = True
if msgexists is not True: if msgexists is not True:
@ -274,7 +312,9 @@ def resend_email(count, fromcall):
str(m).zfill(2), str(m).zfill(2),
str(s).zfill(2), str(s).zfill(2),
) )
messaging.send_message(fromcall, reply) # messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply)
msg.send()
# check email more often since we're resending one now # check email more often since we're resending one now
check_email_delay = 60 check_email_delay = 60
@ -283,117 +323,108 @@ def resend_email(count, fromcall):
# end resend_email() # end resend_email()
def check_email_thread(): class APRSDEmailThread(threads.APRSDThread):
global check_email_delay def __init__(self, msg_queues, config):
super(APRSDEmailThread, self).__init__("EmailThread")
self.msg_queues = msg_queues
self.config = config
# LOG.debug("FIXME initial email delay is 10 seconds") def run(self):
check_email_delay = 60 global check_email_delay
while True:
# LOG.debug("Top of check_email_thread.")
time.sleep(check_email_delay) check_email_delay = 60
past = datetime.datetime.now()
while not self.thread_stop:
time.sleep(5)
# always sleep for 5 seconds and see if we need to check email
# This allows CTRL-C to stop the execution of this loop sooner
# than check_email_delay time
now = datetime.datetime.now()
if now - past > datetime.timedelta(seconds=check_email_delay):
# It's time to check email
# slowly increase delay every iteration, max out at 300 seconds # slowly increase delay every iteration, max out at 300 seconds
# any send/receive/resend activity will reset this to 60 seconds # any send/receive/resend activity will reset this to 60 seconds
if check_email_delay < 300: if check_email_delay < 300:
check_email_delay += 1 check_email_delay += 1
LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds") LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds")
shortcuts = CONFIG["shortcuts"] shortcuts = CONFIG["shortcuts"]
# swap key/value # swap key/value
shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()]) shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()])
date = datetime.datetime.now() date = datetime.datetime.now()
month = date.strftime("%B")[:3] # Nov, Mar, Apr month = date.strftime("%B")[:3] # Nov, Mar, Apr
day = date.day day = date.day
year = date.year year = date.year
today = "%s-%s-%s" % (day, month, year) today = "%s-%s-%s" % (day, month, year)
server = None server = None
try: try:
server = _imap_connect() server = _imap_connect()
except Exception as e: except Exception as e:
LOG.exception("Failed to get IMAP server Can't check email.", e) LOG.exception("Failed to get IMAP server Can't check email.", e)
if not server: if not server:
continue continue
messages = server.search(["SINCE", today]) messages = server.search(["SINCE", today])
# LOG.debug("{} messages received today".format(len(messages))) # LOG.debug("{} messages received today".format(len(messages)))
for msgid, data in server.fetch(messages, ["ENVELOPE"]).items(): for msgid, data in server.fetch(messages, ["ENVELOPE"]).items():
envelope = data[b"ENVELOPE"] envelope = data[b"ENVELOPE"]
# LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date)) # LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date))
f = re.search( f = re.search(
r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0]) r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0])
) )
if f is not None: if f is not None:
from_addr = f.group(1) from_addr = f.group(1)
else:
from_addr = "noaddr"
# LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid]))
# if "APRS" not in server.get_flags(msgid)[msgid]:
# in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both
taglist = [
x.decode(errors="ignore")
for x in server.get_flags(msgid)[msgid]
]
if "APRS" not in taglist:
# if msg not flagged as sent via aprs
server.fetch([msgid], ["RFC822"])
(body, from_addr) = parse_email(msgid, data, server)
# unset seen flag, will stay bold in email client
server.remove_flags(msgid, [imapclient.SEEN])
if from_addr in shortcuts_inverted:
# reverse lookup of a shortcut
from_addr = shortcuts_inverted[from_addr]
reply = "-" + from_addr + " " + body.decode(errors="ignore")
# messaging.send_message(CONFIG["ham"]["callsign"], reply)
msg = messaging.TextMessage(
self.config["aprs"]["login"],
self.config["ham"]["callsign"],
reply,
)
self.msg_queues["tx"].put(msg)
# flag message as sent via aprs
server.add_flags(msgid, ["APRS"])
# unset seen flag, will stay bold in email client
server.remove_flags(msgid, [imapclient.SEEN])
# check email more often since we just received an email
check_email_delay = 60
# reset clock
past = datetime.datetime.now()
server.logout()
else: else:
from_addr = "noaddr" # We haven't hit the email delay yet.
# LOG.debug("Delta({}) < {}".format(now - past, check_email_delay))
pass
# LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid])) # Remove ourselves from the global threads list
# if "APRS" not in server.get_flags(msgid)[msgid]: threads.APRSDThreadList().remove(self)
# in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both LOG.info("Exiting")
taglist = [
x.decode(errors="ignore") for x in server.get_flags(msgid)[msgid]
]
if "APRS" not in taglist:
# if msg not flagged as sent via aprs
server.fetch([msgid], ["RFC822"])
(body, from_addr) = parse_email(msgid, data, server)
# unset seen flag, will stay bold in email client
server.remove_flags(msgid, [imapclient.SEEN])
if from_addr in shortcuts_inverted:
# reverse lookup of a shortcut
from_addr = shortcuts_inverted[from_addr]
reply = "-" + from_addr + " " + body.decode(errors="ignore")
messaging.send_message(CONFIG["ham"]["callsign"], reply)
# flag message as sent via aprs
server.add_flags(msgid, ["APRS"])
# unset seen flag, will stay bold in email client
server.remove_flags(msgid, [imapclient.SEEN])
# check email more often since we just received an email
check_email_delay = 60
server.logout()
# end check_email() # end check_email()
def send_email(to_addr, content):
global check_email_delay
LOG.info("Sending Email_________________")
shortcuts = CONFIG["shortcuts"]
if to_addr in shortcuts:
LOG.info("To : " + to_addr)
to_addr = shortcuts[to_addr]
LOG.info(" (" + to_addr + ")")
subject = CONFIG["ham"]["callsign"]
# content = content + "\n\n(NOTE: reply with one line)"
LOG.info("Subject : " + subject)
LOG.info("Body : " + content)
# check email more often since there's activity right now
check_email_delay = 60
msg = MIMEText(content)
msg["Subject"] = subject
msg["From"] = CONFIG["smtp"]["login"]
msg["To"] = to_addr
server = _smtp_connect()
if server:
try:
server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string())
except Exception as e:
msg = getattr(e, "message", repr(e))
LOG.error("Sendmail Error!!!! '{}'", msg)
server.quit()
return -1
server.quit()
return 0
# end send_email

View File

@ -54,7 +54,7 @@ LOG_LEVELS = {
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
server_threads = [] server_event = threading.Event()
# localization, please edit: # localization, please edit:
# HOST = "noam.aprs2.net" # north america tier2 servers round robin # HOST = "noam.aprs2.net" # north america tier2 servers round robin
@ -143,11 +143,13 @@ def install(append, case_insensitive, shell, path):
def signal_handler(signal, frame): def signal_handler(signal, frame):
global event global server_vent
LOG.info("Ctrl+C, Sending all threads exit!") LOG.info(
for th in server_threads: "Ctrl+C, Sending all threads exit! Can take up to 10 seconds to exit all threads"
th.stop() )
threads.APRSDThreadList().stop_all()
server_event.set()
sys.exit(0) # thread ignores this sys.exit(0) # thread ignores this
@ -260,7 +262,6 @@ def send_message(
got_ack = True got_ack = True
else: else:
message = packet.get("message_text", None) message = packet.get("message_text", None)
LOG.info("We got a new message")
fromcall = packet["from"] fromcall = packet["from"]
msg_number = packet.get("msgNo", "0") msg_number = packet.get("msgNo", "0")
messaging.log_message( messaging.log_message(
@ -289,7 +290,6 @@ def send_message(
# message # message
msg = messaging.TextMessage(aprs_login, tocallsign, command) msg = messaging.TextMessage(aprs_login, tocallsign, command)
msg.send_direct() msg.send_direct()
# messaging.send_message_direct(tocallsign, command, message_number)
try: try:
# This will register a packet consumer with aprslib # This will register a packet consumer with aprslib
@ -359,9 +359,6 @@ def server(loglevel, quiet, disable_validation, config_file):
LOG.error("Failed to validate email config options") LOG.error("Failed to validate email config options")
sys.exit(-1) sys.exit(-1)
# start the email thread
email.start_thread()
# Create the initial PM singleton and Register plugins # Create the initial PM singleton and Register plugins
plugin_manager = plugin.PluginManager(config) plugin_manager = plugin.PluginManager(config)
plugin_manager.setup_plugins() plugin_manager.setup_plugins()
@ -376,12 +373,20 @@ def server(loglevel, quiet, disable_validation, config_file):
rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config) rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config)
tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config) tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config)
# TODO(hemna): add EmailThread email_thread = email.APRSDEmailThread(msg_queues=msg_queues, config=config)
server_threads.append(rx_thread) email_thread.start()
server_threads.append(tx_thread)
rx_thread.start() rx_thread.start()
tx_thread.start() tx_thread.start()
cntr = 0
while not server_event.is_set():
# to keep the log noise down
if cntr % 6 == 0:
tracker = messaging.MsgTrack()
LOG.debug("KeepAlive Tracker({}): {}".format(len(tracker), str(tracker)))
cntr += 1
time.sleep(10)
LOG.info("APRSD Exiting.") LOG.info("APRSD Exiting.")
sys.exit(0) sys.exit(0)
# setup and run the main blocking loop # setup and run the main blocking loop

View File

@ -1,13 +1,12 @@
import abc import abc
import datetime
import logging import logging
import pprint
import re import re
import threading import threading
import time import time
import uuid
from multiprocessing import RawValue from multiprocessing import RawValue
from aprsd import client from aprsd import client, threads
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
@ -21,6 +20,75 @@ ack_dict = {}
NULL_MESSAGE = -1 NULL_MESSAGE = -1
class MsgTrack(object):
"""Class to keep track of outstanding text messages.
This is a thread safe class that keeps track of active
messages.
When a message is asked to be sent, it is placed into this
class via it's id. The TextMessage class's send() method
automatically adds itself to this class. When the ack is
recieved from the radio, the message object is removed from
this class.
# TODO(hemna)
When aprsd is asked to quit this class should be serialized and
saved to disk/db to keep track of the state of outstanding messages.
When aprsd is started, it should try and fetch the saved state,
and reloaded to a live state.
"""
_instance = None
lock = None
track = {}
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(MsgTrack, cls).__new__(cls)
cls._instance.track = {}
cls._instance.lock = threading.Lock()
return cls._instance
def add(self, msg):
with self.lock:
key = int(msg.id)
self.track[key] = msg
def get(self, id):
with self.lock:
if id in self.track:
return self.track[id]
def remove(self, id):
with self.lock:
key = int(id)
if key in self.track.keys():
del self.track[key]
def __len__(self):
with self.lock:
return len(self.track)
def __str__(self):
with self.lock:
result = "{"
for key in self.track.keys():
result += "{}: {}, ".format(key, str(self.track[key]))
result += "}"
return result
def save(self):
"""Save this shit to disk?"""
pass
def restore(self):
"""Restore this shit?"""
pass
class MessageCounter(object): class MessageCounter(object):
""" """
Global message id counter class. Global message id counter class.
@ -34,6 +102,7 @@ class MessageCounter(object):
""" """
_instance = None _instance = None
max_count = 9999
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
"""Make this a singleton class.""" """Make this a singleton class."""
@ -45,7 +114,10 @@ class MessageCounter(object):
def increment(self): def increment(self):
with self.lock: with self.lock:
self.val.value += 1 if self.val.value == self.max_count:
self.val.value = 1
else:
self.val.value += 1
@property @property
def value(self): def value(self):
@ -67,20 +139,13 @@ class Message(object, metaclass=abc.ABCMeta):
# The message id to send over the air # The message id to send over the air
id = 0 id = 0
# Unique identifier for this message
uuid = None
sent = False
sent_time = None
acked = False
acked_time = None
retry_count = 3 retry_count = 3
last_send_time = None
last_send_attempt = 0
def __init__(self, fromcall, tocall, msg_id=None): def __init__(self, fromcall, tocall, msg_id=None):
self.fromcall = fromcall self.fromcall = fromcall
self.tocall = tocall self.tocall = tocall
self.uuid = uuid.uuid4()
if not msg_id: if not msg_id:
c = MessageCounter() c = MessageCounter()
c.increment() c.increment()
@ -98,9 +163,12 @@ class TextMessage(Message):
message = None message = None
def __init__(self, fromcall, tocall, message): def __init__(self, fromcall, tocall, message, msg_id=None, allow_delay=True):
super(TextMessage, self).__init__(fromcall, tocall) super(TextMessage, self).__init__(fromcall, tocall, msg_id)
self.message = message self.message = message
# do we try and save this message for later if we don't get
# an ack? Some messages we don't want to do this ever.
self.allow_delay = allow_delay
def __repr__(self): def __repr__(self):
"""Build raw string to send over the air.""" """Build raw string to send over the air."""
@ -112,14 +180,14 @@ class TextMessage(Message):
) )
def __str__(self): def __str__(self):
return "From({}) TO({}) - Message({}): '{}'".format( delta = "Never"
self.fromcall, self.tocall, self.id, self.message if self.last_send_time:
now = datetime.datetime.now()
delta = now - self.last_send_time
return "{}>{} Msg({})({}): '{}'".format(
self.fromcall, self.tocall, self.id, delta, self.message
) )
def ack(self):
"""Build an Ack Message object from this object."""
return AckMessage(self.fromcall, self.tocall, msg_id=self.id)
def _filter_for_send(self): def _filter_for_send(self):
"""Filter and format message string for FCC.""" """Filter and format message string for FCC."""
# max? ftm400 displays 64, raw msg shows 74 # max? ftm400 displays 64, raw msg shows 74
@ -130,49 +198,13 @@ class TextMessage(Message):
# We all miss George Carlin # We all miss George Carlin
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
def send_thread(self):
cl = client.get_client()
for i in range(self.retry_count, 0, -1):
LOG.debug("DEBUG: send_message_thread msg:ack combos are: ")
LOG.debug(pprint.pformat(ack_dict))
if ack_dict[self.id] != 1:
log_message(
"Sending Message",
repr(self).rstrip("\n"),
self.message,
tocall=self.tocall,
retry_number=i,
)
# tn.write(line)
cl.sendall(repr(self))
# decaying repeats, 31 to 93 second intervals
sleeptime = (self.retry_count - i + 1) * 31
time.sleep(sleeptime)
else:
break
return
# end send_message_thread
def send(self): def send(self):
global ack_dict global ack_dict
# TODO(Hemna) - Need a better metchanism for this. tracker = MsgTrack()
# This can nuke an ack_dict while it's still being used. tracker.add(self)
# FIXME FIXME LOG.debug("Length of MsgTrack is {}".format(len(tracker)))
if len(ack_dict) > 90: thread = SendMessageThread(message=self)
# empty ack dict if it's really big, could result in key error later
LOG.debug(
"DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict)
)
ack_dict.clear()
LOG.debug(pprint.pformat(ack_dict))
LOG.debug(
"DEBUG: Cleared ack dictionary, ack_dict length is now %s."
% len(ack_dict)
)
ack_dict[self.id] = 0 # clear ack for this message number
thread = threading.Thread(target=self.send_thread, name="send_message")
thread.start() thread.start()
def send_direct(self): def send_direct(self):
@ -188,6 +220,73 @@ class TextMessage(Message):
cl.sendall(repr(self)) cl.sendall(repr(self))
class SendMessageThread(threads.APRSDThread):
def __init__(self, message):
self.msg = message
name = self.msg.message[:5]
super(SendMessageThread, self).__init__(
"SendMessage-{}-{}".format(self.msg.id, name)
)
def loop(self):
"""Loop until a message is acked or it gets delayed.
We only sleep for 5 seconds between each loop run, so
that CTRL-C can exit the app in a short period. Each sleep
means the app quitting is blocked until sleep is done.
So we keep track of the last send attempt and only send if the
last send attempt is old enough.
"""
cl = client.get_client()
tracker = MsgTrack()
# lets see if the message is still in the tracking queue
msg = tracker.get(self.msg.id)
if not msg:
# The message has been removed from the tracking queue
# So it got acked and we are done.
LOG.info("Message Send Complete via Ack.")
return False
else:
send_now = False
if msg.last_send_attempt == msg.retry_count:
# we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue?
LOG.info("Message Send Complete. Max attempts reached.")
return False
# Message is still outstanding and needs to be acked.
if msg.last_send_time:
# Message has a last send time tracking
now = datetime.datetime.now()
sleeptime = (msg.last_send_attempt + 1) * 31
delta = now - msg.last_send_time
if delta > datetime.timedelta(seconds=sleeptime):
# It's time to try to send it again
send_now = True
else:
send_now = True
if send_now:
# no attempt time, so lets send it, and start
# tracking the time.
log_message(
"Sending Message",
repr(msg).rstrip("\n"),
msg.message,
tocall=self.msg.tocall,
retry_number=msg.last_send_attempt,
msg_num=msg.id,
)
cl.sendall(repr(msg))
msg.last_send_time = datetime.datetime.now()
msg.last_send_attempt += 1
time.sleep(5)
# Make sure we get called again.
return True
class AckMessage(Message): class AckMessage(Message):
"""Class for building Acks and sending them.""" """Class for building Acks and sending them."""
@ -222,7 +321,7 @@ class AckMessage(Message):
def send(self): def send(self):
LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id)) LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id))
thread = threading.Thread(target=self.send_thread, name="send_ack") thread = SendAckThread(self)
thread.start() thread.start()
# end send_ack() # end send_ack()
@ -241,6 +340,52 @@ class AckMessage(Message):
cl.sendall(repr(self)) cl.sendall(repr(self))
class SendAckThread(threads.APRSDThread):
def __init__(self, ack):
self.ack = ack
super(SendAckThread, self).__init__("SendAck-{}".format(self.ack.id))
def loop(self):
"""Separate thread to send acks with retries."""
send_now = False
if self.ack.last_send_attempt == self.ack.retry_count:
# we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue?
LOG.info("Ack Send Complete. Max attempts reached.")
return False
if self.ack.last_send_time:
# Message has a last send time tracking
now = datetime.datetime.now()
# aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle)
sleeptime = 31
delta = now - self.ack.last_send_time
if delta > datetime.timedelta(seconds=sleeptime):
# It's time to try to send it again
send_now = True
else:
LOG.debug("Still wating. {}".format(delta))
else:
send_now = True
if send_now:
cl = client.get_client()
log_message(
"Sending ack",
repr(self.ack).rstrip("\n"),
None,
ack=self.ack.id,
tocall=self.ack.tocall,
retry_number=self.ack.last_send_attempt,
)
cl.sendall(repr(self.ack))
self.ack.last_send_attempt += 1
self.ack.last_send_time = datetime.datetime.now()
time.sleep(5)
def log_packet(packet): def log_packet(packet):
fromcall = packet.get("from", None) fromcall = packet.get("from", None)
tocall = packet.get("to", None) tocall = packet.get("to", None)
@ -272,6 +417,7 @@ def log_message(
retry_number=None, retry_number=None,
ack=None, ack=None,
packet_type=None, packet_type=None,
uuid=None,
): ):
""" """
@ -315,6 +461,8 @@ def log_message(
if msg_num: if msg_num:
# LOG.info(" Msg number : {}".format(msg_num)) # LOG.info(" Msg number : {}".format(msg_num))
log_list.append(" Msg number : {}".format(msg_num)) log_list.append(" Msg number : {}".format(msg_num))
if uuid:
log_list.append(" UUID : {}".format(uuid))
# LOG.info(" {} _______________ Complete".format(header)) # LOG.info(" {} _______________ Complete".format(header))
log_list.append(" {} _______________ Complete".format(header)) log_list.append(" {} _______________ Complete".format(header))

View File

@ -449,8 +449,16 @@ class EmailPlugin(APRSDPluginBase):
if a is not None: if a is not None:
to_addr = a.group(1) to_addr = a.group(1)
content = a.group(2) content = a.group(2)
email_address = email.get_email_from_shortcut(to_addr)
if not email_address:
reply = "Bad email address"
return reply
# send recipient link to aprs.fi map # send recipient link to aprs.fi map
mapme = False
if content == "mapme": if content == "mapme":
mapme = True
content = "Click for my location: http://aprs.fi/{}".format( content = "Click for my location: http://aprs.fi/{}".format(
self.config["ham"]["callsign"] self.config["ham"]["callsign"]
) )
@ -458,6 +466,8 @@ class EmailPlugin(APRSDPluginBase):
now = time.time() now = time.time()
# see if we sent this msg number recently # see if we sent this msg number recently
if ack in self.email_sent_dict: if ack in self.email_sent_dict:
# BUG(hemna) - when we get a 2 different email command
# with the same ack #, we don't send it.
timedelta = now - self.email_sent_dict[ack] timedelta = now - self.email_sent_dict[ack]
if timedelta < 300: # five minutes if timedelta < 300: # five minutes
too_soon = 1 too_soon = 1
@ -477,7 +487,10 @@ class EmailPlugin(APRSDPluginBase):
) )
self.email_sent_dict.clear() self.email_sent_dict.clear()
self.email_sent_dict[ack] = now self.email_sent_dict[ack] = now
reply = "mapme email sent" if mapme:
reply = "mapme email sent"
else:
reply = "Email sent."
else: else:
LOG.info( LOG.info(
"Email for message number " "Email for message number "

View File

@ -1,3 +1,4 @@
import abc
import logging import logging
import queue import queue
import threading import threading
@ -9,30 +10,69 @@ from aprsd import client, messaging, plugin
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
RX_THREAD = "RX"
TX_THREAD = "TX"
EMAIL_THREAD = "Email"
class APRSDThread(threading.Thread):
def __init__(self, name, msg_queues, config): class APRSDThreadList(object):
"""Singleton class that keeps track of application wide threads."""
_instance = None
threads_list = []
lock = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(APRSDThreadList, cls).__new__(cls)
cls.lock = threading.Lock()
cls.threads_list = []
return cls._instance
def add(self, thread_obj):
with self.lock:
self.threads_list.append(thread_obj)
def remove(self, thread_obj):
with self.lock:
self.threads_list.remove(thread_obj)
def stop_all(self):
"""Iterate over all threads and call stop on them."""
with self.lock:
for th in self.threads_list:
th.stop()
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
def __init__(self, name):
super(APRSDThread, self).__init__(name=name) super(APRSDThread, self).__init__(name=name)
self.msg_queues = msg_queues
self.config = config
self.thread_stop = False self.thread_stop = False
APRSDThreadList().add(self)
def stop(self): def stop(self):
self.thread_stop = True self.thread_stop = True
def run(self): def run(self):
LOG.info("Starting")
while not self.thread_stop: while not self.thread_stop:
self._run() can_loop = self.loop()
if not can_loop:
self.stop()
APRSDThreadList().remove(self)
LOG.info("Exiting")
class APRSDRXThread(APRSDThread): class APRSDRXThread(APRSDThread):
def __init__(self, msg_queues, config): def __init__(self, msg_queues, config):
super(APRSDRXThread, self).__init__("RX_MSG", msg_queues, config) super(APRSDRXThread, self).__init__("RX_MSG")
self.thread_stop = False self.msg_queues = msg_queues
self.config = config
def stop(self): def stop(self):
self.thread_stop = True self.thread_stop = True
self.aprs.stop() client.get_client().stop()
def callback(self, packet): def callback(self, packet):
try: try:
@ -41,32 +81,31 @@ class APRSDRXThread(APRSDThread):
except (aprslib.ParseError, aprslib.UnknownFormat): except (aprslib.ParseError, aprslib.UnknownFormat):
pass pass
def run(self): def loop(self):
LOG.info("Starting") aprs_client = client.get_client()
while not self.thread_stop:
aprs_client = client.get_client()
# setup the consumer of messages and block until a messages # setup the consumer of messages and block until a messages
try: try:
# This will register a packet consumer with aprslib # This will register a packet consumer with aprslib
# When new packets come in the consumer will process # When new packets come in the consumer will process
# the packet # the packet
# Do a partial here because the consumer signature doesn't allow # Do a partial here because the consumer signature doesn't allow
# For kwargs to be passed in to the consumer func we declare # For kwargs to be passed in to the consumer func we declare
# and the aprslib developer didn't want to allow a PR to add # and the aprslib developer didn't want to allow a PR to add
# kwargs. :( # kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56 # https://github.com/rossengeorgiev/aprs-python/pull/56
aprs_client.consumer(self.process_packet, raw=False, blocking=False) aprs_client.consumer(self.process_packet, raw=False, blocking=False)
except aprslib.exceptions.ConnectionDrop: except aprslib.exceptions.ConnectionDrop:
LOG.error("Connection dropped, reconnecting") LOG.error("Connection dropped, reconnecting")
time.sleep(5) time.sleep(5)
# Force the deletion of the client object connected to aprs # Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client() # This will cause a reconnect, next time client.get_client()
# is called # is called
client.Client().reset() client.Client().reset()
LOG.info("Exiting ") # Continue to loop
return True
def process_ack_packet(self, packet): def process_ack_packet(self, packet):
ack_num = packet.get("msgNo") ack_num = packet.get("msgNo")
@ -74,7 +113,10 @@ class APRSDRXThread(APRSDThread):
messaging.log_message( messaging.log_message(
"ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"] "ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"]
) )
messaging.ack_dict.update({int(ack_num): 1}) tracker = messaging.MsgTrack()
tracker.remove(ack_num)
LOG.debug("Length of MsgTrack is {}".format(len(tracker)))
# messaging.ack_dict.update({int(ack_num): 1})
return return
def process_mic_e_packet(self, packet): def process_mic_e_packet(self, packet):
@ -87,12 +129,14 @@ class APRSDRXThread(APRSDThread):
fromcall = packet["from"] fromcall = packet["from"]
message = packet.get("message_text", None) message = packet.get("message_text", None)
msg_id = packet.get("msgNo", None) msg_id = packet.get("msgNo", "0")
if not msg_id:
msg_id = "0"
messaging.log_message( messaging.log_message(
"Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id "Received Message",
packet["raw"],
message,
fromcall=fromcall,
msg_num=msg_id,
) )
found_command = False found_command = False
@ -108,7 +152,6 @@ class APRSDRXThread(APRSDThread):
if reply is not messaging.NULL_MESSAGE: if reply is not messaging.NULL_MESSAGE:
LOG.debug("Sending '{}'".format(reply)) LOG.debug("Sending '{}'".format(reply))
# msg = {"fromcall": fromcall, "msg": reply}
msg = messaging.TextMessage( msg = messaging.TextMessage(
self.config["aprs"]["login"], fromcall, reply self.config["aprs"]["login"], fromcall, reply
) )
@ -122,7 +165,6 @@ class APRSDRXThread(APRSDThread):
names.sort() names.sort()
reply = "Usage: {}".format(", ".join(names)) reply = "Usage: {}".format(", ".join(names))
# messaging.send_message(fromcall, reply)
msg = messaging.TextMessage( msg = messaging.TextMessage(
self.config["aprs"]["login"], fromcall, reply self.config["aprs"]["login"], fromcall, reply
) )
@ -130,7 +172,6 @@ class APRSDRXThread(APRSDThread):
except Exception as ex: except Exception as ex:
LOG.exception("Plugin failed!!!", ex) LOG.exception("Plugin failed!!!", ex)
reply = "A Plugin failed! try again?" reply = "A Plugin failed! try again?"
# messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(self.config["aprs"]["login"], fromcall, reply) msg = messaging.TextMessage(self.config["aprs"]["login"], fromcall, reply)
self.msg_queues["tx"].put(msg) self.msg_queues["tx"].put(msg)
@ -139,7 +180,7 @@ class APRSDRXThread(APRSDThread):
ack = messaging.AckMessage( ack = messaging.AckMessage(
self.config["aprs"]["login"], fromcall, msg_id=msg_id self.config["aprs"]["login"], fromcall, msg_id=msg_id
) )
ack.send() self.msg_queues["tx"].put(ack)
LOG.debug("Packet processing complete") LOG.debug("Packet processing complete")
def process_packet(self, packet): def process_packet(self, packet):
@ -172,15 +213,16 @@ class APRSDRXThread(APRSDThread):
class APRSDTXThread(APRSDThread): class APRSDTXThread(APRSDThread):
def __init__(self, msg_queues, config): def __init__(self, msg_queues, config):
super(APRSDTXThread, self).__init__("TX_MSG", msg_queues, config) super(APRSDTXThread, self).__init__("TX_MSG")
self.msg_queues = msg_queues
self.config = config
def run(self): def loop(self):
LOG.info("Starting") try:
while not self.thread_stop: msg = self.msg_queues["tx"].get(timeout=0.1)
try: LOG.info("TXQ: got message '{}'".format(msg))
msg = self.msg_queues["tx"].get(timeout=0.1) msg.send()
LOG.info("TXQ: got message '{}'".format(msg)) except queue.Empty:
msg.send() pass
except queue.Empty: # Continue to loop
pass return True
LOG.info("Exiting ")