mirror of
https://github.com/craigerl/aprsd.git
synced 2024-11-17 13:51:54 -05:00
commit
40ab7a7a94
@ -1,4 +1,6 @@
|
||||
import logging
|
||||
import select
|
||||
import socket
|
||||
import time
|
||||
|
||||
import aprslib
|
||||
@ -45,7 +47,7 @@ class Client(object):
|
||||
while not connected:
|
||||
try:
|
||||
LOG.info("Creating aprslib client")
|
||||
aprs_client = aprslib.IS(user, passwd=password, host=host, port=port)
|
||||
aprs_client = Aprsdis(user, passwd=password, host=host, port=port)
|
||||
# Force the logging to be the same
|
||||
aprs_client.logger = LOG
|
||||
aprs_client.connect()
|
||||
@ -60,6 +62,63 @@ class Client(object):
|
||||
return aprs_client
|
||||
|
||||
|
||||
class Aprsdis(aprslib.IS):
|
||||
"""Extend the aprslib class so we can exit properly."""
|
||||
|
||||
# flag to tell us to stop
|
||||
thread_stop = False
|
||||
|
||||
# timeout in seconds
|
||||
select_timeout = 10
|
||||
|
||||
def stop(self):
|
||||
self.thread_stop = True
|
||||
LOG.info("Shutdown Aprsdis client.")
|
||||
|
||||
def _socket_readlines(self, blocking=False):
|
||||
"""
|
||||
Generator for complete lines, received from the server
|
||||
"""
|
||||
try:
|
||||
self.sock.setblocking(0)
|
||||
except socket.error as e:
|
||||
self.logger.error("socket error when setblocking(0): %s" % str(e))
|
||||
raise aprslib.ConnectionDrop("connection dropped")
|
||||
|
||||
while not self.thread_stop:
|
||||
short_buf = b""
|
||||
newline = b"\r\n"
|
||||
|
||||
# set a select timeout, so we get a chance to exit
|
||||
# when user hits CTRL-C
|
||||
readable, writable, exceptional = select.select(
|
||||
[self.sock], [], [], self.select_timeout
|
||||
)
|
||||
if not readable:
|
||||
continue
|
||||
|
||||
try:
|
||||
short_buf = self.sock.recv(4096)
|
||||
|
||||
# sock.recv returns empty if the connection drops
|
||||
if not short_buf:
|
||||
self.logger.error("socket.recv(): returned empty")
|
||||
raise aprslib.ConnectionDrop("connection dropped")
|
||||
except socket.error as e:
|
||||
# self.logger.error("socket error on recv(): %s" % str(e))
|
||||
if "Resource temporarily unavailable" in str(e):
|
||||
if not blocking:
|
||||
if len(self.buf) == 0:
|
||||
break
|
||||
|
||||
self.buf += short_buf
|
||||
|
||||
while newline in self.buf:
|
||||
line, self.buf = self.buf.split(newline, 1)
|
||||
|
||||
yield line
|
||||
|
||||
|
||||
def get_client():
|
||||
cl = Client()
|
||||
return cl.client
|
||||
|
253
aprsd/email.py
253
aprsd/email.py
@ -4,7 +4,6 @@ import imaplib
|
||||
import logging
|
||||
import re
|
||||
import smtplib
|
||||
import threading
|
||||
import time
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
@ -12,7 +11,7 @@ import imapclient
|
||||
import six
|
||||
from validate_email import validate_email
|
||||
|
||||
from aprsd import messaging
|
||||
from aprsd import messaging, threads
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
|
||||
@ -20,13 +19,6 @@ LOG = logging.getLogger("APRSD")
|
||||
CONFIG = None
|
||||
|
||||
|
||||
def start_thread():
|
||||
checkemailthread = threading.Thread(
|
||||
target=check_email_thread, name="check_email", args=()
|
||||
) # args must be tuple
|
||||
checkemailthread.start()
|
||||
|
||||
|
||||
def _imap_connect():
|
||||
imap_port = CONFIG["imap"].get("port", 143)
|
||||
use_ssl = CONFIG["imap"].get("use_ssl", False)
|
||||
@ -120,6 +112,11 @@ def validate_shortcuts(config):
|
||||
LOG.info("Available shortcuts: {}".format(config["shortcuts"]))
|
||||
|
||||
|
||||
def get_email_from_shortcut(shortcut):
|
||||
if shortcut in CONFIG.get("shortcuts", None):
|
||||
return CONFIG["shortcuts"].get(shortcut, None)
|
||||
|
||||
|
||||
def validate_email_config(config, disable_validation=False):
|
||||
"""function to simply ensure we can connect to email services.
|
||||
|
||||
@ -221,6 +218,45 @@ def parse_email(msgid, data, server):
|
||||
# end parse_email
|
||||
|
||||
|
||||
def send_email(to_addr, content):
|
||||
global check_email_delay
|
||||
|
||||
shortcuts = CONFIG["shortcuts"]
|
||||
email_address = get_email_from_shortcut(to_addr)
|
||||
LOG.info("Sending Email_________________")
|
||||
|
||||
if to_addr in shortcuts:
|
||||
LOG.info("To : " + to_addr)
|
||||
to_addr = email_address
|
||||
LOG.info(" (" + to_addr + ")")
|
||||
subject = CONFIG["ham"]["callsign"]
|
||||
# content = content + "\n\n(NOTE: reply with one line)"
|
||||
LOG.info("Subject : " + subject)
|
||||
LOG.info("Body : " + content)
|
||||
|
||||
# check email more often since there's activity right now
|
||||
check_email_delay = 60
|
||||
|
||||
msg = MIMEText(content)
|
||||
msg["Subject"] = subject
|
||||
msg["From"] = CONFIG["smtp"]["login"]
|
||||
msg["To"] = to_addr
|
||||
server = _smtp_connect()
|
||||
if server:
|
||||
try:
|
||||
server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string())
|
||||
except Exception as e:
|
||||
msg = getattr(e, "message", repr(e))
|
||||
LOG.error("Sendmail Error!!!! '{}'", msg)
|
||||
server.quit()
|
||||
return -1
|
||||
server.quit()
|
||||
return 0
|
||||
|
||||
|
||||
# end send_email
|
||||
|
||||
|
||||
def resend_email(count, fromcall):
|
||||
global check_email_delay
|
||||
date = datetime.datetime.now()
|
||||
@ -257,7 +293,9 @@ def resend_email(count, fromcall):
|
||||
from_addr = shortcuts_inverted[from_addr]
|
||||
# asterisk indicates a resend
|
||||
reply = "-" + from_addr + " * " + body.decode(errors="ignore")
|
||||
messaging.send_message(fromcall, reply)
|
||||
# messaging.send_message(fromcall, reply)
|
||||
msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply)
|
||||
msg.send()
|
||||
msgexists = True
|
||||
|
||||
if msgexists is not True:
|
||||
@ -274,7 +312,9 @@ def resend_email(count, fromcall):
|
||||
str(m).zfill(2),
|
||||
str(s).zfill(2),
|
||||
)
|
||||
messaging.send_message(fromcall, reply)
|
||||
# messaging.send_message(fromcall, reply)
|
||||
msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply)
|
||||
msg.send()
|
||||
|
||||
# check email more often since we're resending one now
|
||||
check_email_delay = 60
|
||||
@ -283,117 +323,108 @@ def resend_email(count, fromcall):
|
||||
# end resend_email()
|
||||
|
||||
|
||||
def check_email_thread():
|
||||
global check_email_delay
|
||||
class APRSDEmailThread(threads.APRSDThread):
|
||||
def __init__(self, msg_queues, config):
|
||||
super(APRSDEmailThread, self).__init__("EmailThread")
|
||||
self.msg_queues = msg_queues
|
||||
self.config = config
|
||||
|
||||
# LOG.debug("FIXME initial email delay is 10 seconds")
|
||||
check_email_delay = 60
|
||||
while True:
|
||||
# LOG.debug("Top of check_email_thread.")
|
||||
def run(self):
|
||||
global check_email_delay
|
||||
|
||||
time.sleep(check_email_delay)
|
||||
check_email_delay = 60
|
||||
past = datetime.datetime.now()
|
||||
while not self.thread_stop:
|
||||
time.sleep(5)
|
||||
# always sleep for 5 seconds and see if we need to check email
|
||||
# This allows CTRL-C to stop the execution of this loop sooner
|
||||
# than check_email_delay time
|
||||
now = datetime.datetime.now()
|
||||
if now - past > datetime.timedelta(seconds=check_email_delay):
|
||||
# It's time to check email
|
||||
|
||||
# slowly increase delay every iteration, max out at 300 seconds
|
||||
# any send/receive/resend activity will reset this to 60 seconds
|
||||
if check_email_delay < 300:
|
||||
check_email_delay += 1
|
||||
LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds")
|
||||
# slowly increase delay every iteration, max out at 300 seconds
|
||||
# any send/receive/resend activity will reset this to 60 seconds
|
||||
if check_email_delay < 300:
|
||||
check_email_delay += 1
|
||||
LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds")
|
||||
|
||||
shortcuts = CONFIG["shortcuts"]
|
||||
# swap key/value
|
||||
shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()])
|
||||
shortcuts = CONFIG["shortcuts"]
|
||||
# swap key/value
|
||||
shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()])
|
||||
|
||||
date = datetime.datetime.now()
|
||||
month = date.strftime("%B")[:3] # Nov, Mar, Apr
|
||||
day = date.day
|
||||
year = date.year
|
||||
today = "%s-%s-%s" % (day, month, year)
|
||||
date = datetime.datetime.now()
|
||||
month = date.strftime("%B")[:3] # Nov, Mar, Apr
|
||||
day = date.day
|
||||
year = date.year
|
||||
today = "%s-%s-%s" % (day, month, year)
|
||||
|
||||
server = None
|
||||
try:
|
||||
server = _imap_connect()
|
||||
except Exception as e:
|
||||
LOG.exception("Failed to get IMAP server Can't check email.", e)
|
||||
server = None
|
||||
try:
|
||||
server = _imap_connect()
|
||||
except Exception as e:
|
||||
LOG.exception("Failed to get IMAP server Can't check email.", e)
|
||||
|
||||
if not server:
|
||||
continue
|
||||
if not server:
|
||||
continue
|
||||
|
||||
messages = server.search(["SINCE", today])
|
||||
# LOG.debug("{} messages received today".format(len(messages)))
|
||||
messages = server.search(["SINCE", today])
|
||||
# LOG.debug("{} messages received today".format(len(messages)))
|
||||
|
||||
for msgid, data in server.fetch(messages, ["ENVELOPE"]).items():
|
||||
envelope = data[b"ENVELOPE"]
|
||||
# LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date))
|
||||
f = re.search(
|
||||
r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0])
|
||||
)
|
||||
if f is not None:
|
||||
from_addr = f.group(1)
|
||||
for msgid, data in server.fetch(messages, ["ENVELOPE"]).items():
|
||||
envelope = data[b"ENVELOPE"]
|
||||
# LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date))
|
||||
f = re.search(
|
||||
r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0])
|
||||
)
|
||||
if f is not None:
|
||||
from_addr = f.group(1)
|
||||
else:
|
||||
from_addr = "noaddr"
|
||||
|
||||
# LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid]))
|
||||
# if "APRS" not in server.get_flags(msgid)[msgid]:
|
||||
# in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both
|
||||
taglist = [
|
||||
x.decode(errors="ignore")
|
||||
for x in server.get_flags(msgid)[msgid]
|
||||
]
|
||||
if "APRS" not in taglist:
|
||||
# if msg not flagged as sent via aprs
|
||||
server.fetch([msgid], ["RFC822"])
|
||||
(body, from_addr) = parse_email(msgid, data, server)
|
||||
# unset seen flag, will stay bold in email client
|
||||
server.remove_flags(msgid, [imapclient.SEEN])
|
||||
|
||||
if from_addr in shortcuts_inverted:
|
||||
# reverse lookup of a shortcut
|
||||
from_addr = shortcuts_inverted[from_addr]
|
||||
|
||||
reply = "-" + from_addr + " " + body.decode(errors="ignore")
|
||||
# messaging.send_message(CONFIG["ham"]["callsign"], reply)
|
||||
msg = messaging.TextMessage(
|
||||
self.config["aprs"]["login"],
|
||||
self.config["ham"]["callsign"],
|
||||
reply,
|
||||
)
|
||||
self.msg_queues["tx"].put(msg)
|
||||
# flag message as sent via aprs
|
||||
server.add_flags(msgid, ["APRS"])
|
||||
# unset seen flag, will stay bold in email client
|
||||
server.remove_flags(msgid, [imapclient.SEEN])
|
||||
# check email more often since we just received an email
|
||||
check_email_delay = 60
|
||||
# reset clock
|
||||
past = datetime.datetime.now()
|
||||
server.logout()
|
||||
else:
|
||||
from_addr = "noaddr"
|
||||
# We haven't hit the email delay yet.
|
||||
# LOG.debug("Delta({}) < {}".format(now - past, check_email_delay))
|
||||
pass
|
||||
|
||||
# LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid]))
|
||||
# if "APRS" not in server.get_flags(msgid)[msgid]:
|
||||
# in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both
|
||||
taglist = [
|
||||
x.decode(errors="ignore") for x in server.get_flags(msgid)[msgid]
|
||||
]
|
||||
if "APRS" not in taglist:
|
||||
# if msg not flagged as sent via aprs
|
||||
server.fetch([msgid], ["RFC822"])
|
||||
(body, from_addr) = parse_email(msgid, data, server)
|
||||
# unset seen flag, will stay bold in email client
|
||||
server.remove_flags(msgid, [imapclient.SEEN])
|
||||
|
||||
if from_addr in shortcuts_inverted:
|
||||
# reverse lookup of a shortcut
|
||||
from_addr = shortcuts_inverted[from_addr]
|
||||
|
||||
reply = "-" + from_addr + " " + body.decode(errors="ignore")
|
||||
messaging.send_message(CONFIG["ham"]["callsign"], reply)
|
||||
# flag message as sent via aprs
|
||||
server.add_flags(msgid, ["APRS"])
|
||||
# unset seen flag, will stay bold in email client
|
||||
server.remove_flags(msgid, [imapclient.SEEN])
|
||||
# check email more often since we just received an email
|
||||
check_email_delay = 60
|
||||
|
||||
server.logout()
|
||||
# Remove ourselves from the global threads list
|
||||
threads.APRSDThreadList().remove(self)
|
||||
LOG.info("Exiting")
|
||||
|
||||
|
||||
# end check_email()
|
||||
|
||||
|
||||
def send_email(to_addr, content):
|
||||
global check_email_delay
|
||||
|
||||
LOG.info("Sending Email_________________")
|
||||
shortcuts = CONFIG["shortcuts"]
|
||||
if to_addr in shortcuts:
|
||||
LOG.info("To : " + to_addr)
|
||||
to_addr = shortcuts[to_addr]
|
||||
LOG.info(" (" + to_addr + ")")
|
||||
subject = CONFIG["ham"]["callsign"]
|
||||
# content = content + "\n\n(NOTE: reply with one line)"
|
||||
LOG.info("Subject : " + subject)
|
||||
LOG.info("Body : " + content)
|
||||
|
||||
# check email more often since there's activity right now
|
||||
check_email_delay = 60
|
||||
|
||||
msg = MIMEText(content)
|
||||
msg["Subject"] = subject
|
||||
msg["From"] = CONFIG["smtp"]["login"]
|
||||
msg["To"] = to_addr
|
||||
server = _smtp_connect()
|
||||
if server:
|
||||
try:
|
||||
server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string())
|
||||
except Exception as e:
|
||||
msg = getattr(e, "message", repr(e))
|
||||
LOG.error("Sendmail Error!!!! '{}'", msg)
|
||||
server.quit()
|
||||
return -1
|
||||
server.quit()
|
||||
return 0
|
||||
# end send_email
|
||||
|
205
aprsd/main.py
205
aprsd/main.py
@ -23,9 +23,10 @@
|
||||
# python included libs
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import queue
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from logging import NullHandler
|
||||
from logging.handlers import RotatingFileHandler
|
||||
@ -37,7 +38,7 @@ import yaml
|
||||
|
||||
# local imports here
|
||||
import aprsd
|
||||
from aprsd import client, email, messaging, plugin, utils
|
||||
from aprsd import client, email, messaging, plugin, threads, utils
|
||||
|
||||
# setup the global logger
|
||||
# logging.basicConfig(level=logging.DEBUG) # level=10
|
||||
@ -53,6 +54,8 @@ LOG_LEVELS = {
|
||||
|
||||
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
|
||||
|
||||
server_event = threading.Event()
|
||||
|
||||
# localization, please edit:
|
||||
# HOST = "noam.aprs2.net" # north america tier2 servers round robin
|
||||
# USER = "KM6XXX-9" # callsign of this aprs client with SSID
|
||||
@ -140,9 +143,13 @@ def install(append, case_insensitive, shell, path):
|
||||
|
||||
|
||||
def signal_handler(signal, frame):
|
||||
LOG.info("Ctrl+C, exiting.")
|
||||
# sys.exit(0) # thread ignores this
|
||||
os._exit(0)
|
||||
global server_vent
|
||||
|
||||
LOG.info(
|
||||
"Ctrl+C, Sending all threads exit! Can take up to 10 seconds to exit all threads"
|
||||
)
|
||||
threads.APRSDThreadList().stop_all()
|
||||
server_event.set()
|
||||
|
||||
|
||||
# end signal_handler
|
||||
@ -172,99 +179,6 @@ def setup_logging(config, loglevel, quiet):
|
||||
LOG.addHandler(sh)
|
||||
|
||||
|
||||
def process_ack_packet(packet):
|
||||
ack_num = packet.get("msgNo")
|
||||
LOG.info("Got ack for message {}".format(ack_num))
|
||||
messaging.log_message(
|
||||
"ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"]
|
||||
)
|
||||
messaging.ack_dict.update({int(ack_num): 1})
|
||||
return
|
||||
|
||||
|
||||
def process_mic_e_packet(packet):
|
||||
LOG.info("Mic-E Packet detected. Currenlty unsupported.")
|
||||
messaging.log_packet(packet)
|
||||
return
|
||||
|
||||
|
||||
def process_message_packet(packet):
|
||||
LOG.info("Got a message packet")
|
||||
fromcall = packet["from"]
|
||||
message = packet.get("message_text", None)
|
||||
|
||||
msg_number = packet.get("msgNo", None)
|
||||
if msg_number:
|
||||
ack = msg_number
|
||||
else:
|
||||
ack = "0"
|
||||
|
||||
messaging.log_message(
|
||||
"Received Message", packet["raw"], message, fromcall=fromcall, ack=ack
|
||||
)
|
||||
|
||||
found_command = False
|
||||
# Get singleton of the PM
|
||||
pm = plugin.PluginManager()
|
||||
try:
|
||||
results = pm.run(fromcall=fromcall, message=message, ack=ack)
|
||||
for reply in results:
|
||||
found_command = True
|
||||
# A plugin can return a null message flag which signals
|
||||
# us that they processed the message correctly, but have
|
||||
# nothing to reply with, so we avoid replying with a usage string
|
||||
if reply is not messaging.NULL_MESSAGE:
|
||||
LOG.debug("Sending '{}'".format(reply))
|
||||
messaging.send_message(fromcall, reply)
|
||||
else:
|
||||
LOG.debug("Got NULL MESSAGE from plugin")
|
||||
|
||||
if not found_command:
|
||||
plugins = pm.get_plugins()
|
||||
names = [x.command_name for x in plugins]
|
||||
names.sort()
|
||||
|
||||
reply = "Usage: {}".format(", ".join(names))
|
||||
messaging.send_message(fromcall, reply)
|
||||
except Exception as ex:
|
||||
LOG.exception("Plugin failed!!!", ex)
|
||||
reply = "A Plugin failed! try again?"
|
||||
messaging.send_message(fromcall, reply)
|
||||
|
||||
# let any threads do their thing, then ack
|
||||
# send an ack last
|
||||
messaging.send_ack(fromcall, ack)
|
||||
LOG.debug("Packet processing complete")
|
||||
|
||||
|
||||
def process_packet(packet):
|
||||
"""Process a packet recieved from aprs-is server."""
|
||||
|
||||
LOG.debug("Process packet!")
|
||||
try:
|
||||
LOG.debug("Got message: {}".format(packet))
|
||||
|
||||
msg = packet.get("message_text", None)
|
||||
msg_format = packet.get("format", None)
|
||||
msg_response = packet.get("response", None)
|
||||
if msg_format == "message" and msg:
|
||||
# we want to send the message through the
|
||||
# plugins
|
||||
process_message_packet(packet)
|
||||
return
|
||||
elif msg_response == "ack":
|
||||
process_ack_packet(packet)
|
||||
return
|
||||
|
||||
if msg_format == "mic-e":
|
||||
# process a mic-e packet
|
||||
process_mic_e_packet(packet)
|
||||
return
|
||||
|
||||
except (aprslib.ParseError, aprslib.UnknownFormat) as exp:
|
||||
LOG.exception("Failed to parse packet from aprs-is", exp)
|
||||
|
||||
|
||||
@main.command()
|
||||
def sample_config():
|
||||
"""This dumps the config to stdout."""
|
||||
@ -329,7 +243,6 @@ def send_message(
|
||||
|
||||
setup_logging(config, loglevel, quiet)
|
||||
LOG.info("APRSD Started version: {}".format(aprsd.__version__))
|
||||
message_number = random.randint(1, 90)
|
||||
if type(command) is tuple:
|
||||
command = " ".join(command)
|
||||
LOG.info("Sending Command '{}'".format(command))
|
||||
@ -348,19 +261,21 @@ def send_message(
|
||||
got_ack = True
|
||||
else:
|
||||
message = packet.get("message_text", None)
|
||||
LOG.info("We got a new message")
|
||||
fromcall = packet["from"]
|
||||
msg_number = packet.get("msgNo", None)
|
||||
if msg_number:
|
||||
ack = msg_number
|
||||
else:
|
||||
ack = "0"
|
||||
msg_number = packet.get("msgNo", "0")
|
||||
messaging.log_message(
|
||||
"Received Message", packet["raw"], message, fromcall=fromcall, ack=ack
|
||||
"Received Message",
|
||||
packet["raw"],
|
||||
message,
|
||||
fromcall=fromcall,
|
||||
ack=msg_number,
|
||||
)
|
||||
got_response = True
|
||||
# Send the ack back?
|
||||
messaging.send_ack_direct(fromcall, ack)
|
||||
ack = messaging.AckMessage(
|
||||
config["aprs"]["login"], fromcall, msg_id=msg_number
|
||||
)
|
||||
ack.send_direct()
|
||||
|
||||
if got_ack and got_response:
|
||||
sys.exit(0)
|
||||
@ -372,7 +287,8 @@ def send_message(
|
||||
# We should get an ack back as well as a new message
|
||||
# we should bail after we get the ack and send an ack back for the
|
||||
# message
|
||||
messaging.send_message_direct(tocallsign, command, message_number)
|
||||
msg = messaging.TextMessage(aprs_login, tocallsign, command)
|
||||
msg.send_direct()
|
||||
|
||||
try:
|
||||
# This will register a packet consumer with aprslib
|
||||
@ -416,9 +332,20 @@ def send_message(
|
||||
default=utils.DEFAULT_CONFIG_FILE,
|
||||
help="The aprsd config file to use for options.",
|
||||
)
|
||||
def server(loglevel, quiet, disable_validation, config_file):
|
||||
@click.option(
|
||||
"-f",
|
||||
"--flush",
|
||||
"flush",
|
||||
is_flag=True,
|
||||
show_default=True,
|
||||
default=False,
|
||||
help="Flush out all old aged messages on disk.",
|
||||
)
|
||||
def server(loglevel, quiet, disable_validation, config_file, flush):
|
||||
"""Start the aprsd server process."""
|
||||
global event
|
||||
|
||||
event = threading.Event()
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
click.echo("Load config")
|
||||
@ -429,7 +356,6 @@ def server(loglevel, quiet, disable_validation, config_file):
|
||||
# Accept the config as a constructor param, instead of this
|
||||
# hacky global setting
|
||||
email.CONFIG = config
|
||||
messaging.CONFIG = config
|
||||
|
||||
setup_logging(config, loglevel, quiet)
|
||||
LOG.info("APRSD Started version: {}".format(aprsd.__version__))
|
||||
@ -441,32 +367,49 @@ def server(loglevel, quiet, disable_validation, config_file):
|
||||
LOG.error("Failed to validate email config options")
|
||||
sys.exit(-1)
|
||||
|
||||
# start the email thread
|
||||
email.start_thread()
|
||||
|
||||
# Create the initial PM singleton and Register plugins
|
||||
plugin_manager = plugin.PluginManager(config)
|
||||
plugin_manager.setup_plugins()
|
||||
cl = client.Client(config)
|
||||
client.Client(config)
|
||||
|
||||
# setup and run the main blocking loop
|
||||
while True:
|
||||
# Now use the helper which uses the singleton
|
||||
aprs_client = client.get_client()
|
||||
# Now load the msgTrack from disk if any
|
||||
if flush:
|
||||
LOG.debug("Deleting saved MsgTrack.")
|
||||
messaging.MsgTrack().flush()
|
||||
else:
|
||||
# Try and load saved MsgTrack list
|
||||
LOG.debug("Loading saved MsgTrack object.")
|
||||
messaging.MsgTrack().load()
|
||||
|
||||
# setup the consumer of messages and block until a messages
|
||||
try:
|
||||
# This will register a packet consumer with aprslib
|
||||
# When new packets come in the consumer will process
|
||||
# the packet
|
||||
aprs_client.consumer(process_packet, raw=False)
|
||||
except aprslib.exceptions.ConnectionDrop:
|
||||
LOG.error("Connection dropped, reconnecting")
|
||||
time.sleep(5)
|
||||
# Force the deletion of the client object connected to aprs
|
||||
# This will cause a reconnect, next time client.get_client()
|
||||
# is called
|
||||
cl.reset()
|
||||
rx_msg_queue = queue.Queue(maxsize=20)
|
||||
tx_msg_queue = queue.Queue(maxsize=20)
|
||||
msg_queues = {
|
||||
"rx": rx_msg_queue,
|
||||
"tx": tx_msg_queue,
|
||||
}
|
||||
|
||||
rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config)
|
||||
tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config)
|
||||
email_thread = email.APRSDEmailThread(msg_queues=msg_queues, config=config)
|
||||
email_thread.start()
|
||||
rx_thread.start()
|
||||
tx_thread.start()
|
||||
|
||||
messaging.MsgTrack().restart()
|
||||
|
||||
cntr = 0
|
||||
while not server_event.is_set():
|
||||
# to keep the log noise down
|
||||
if cntr % 6 == 0:
|
||||
tracker = messaging.MsgTrack()
|
||||
LOG.debug("KeepAlive Tracker({}): {}".format(len(tracker), str(tracker)))
|
||||
cntr += 1
|
||||
time.sleep(10)
|
||||
|
||||
# If there are items in the msgTracker, then save them
|
||||
tracker = messaging.MsgTrack()
|
||||
tracker.save()
|
||||
LOG.info("APRSD Exiting.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -1,20 +1,21 @@
|
||||
import abc
|
||||
import datetime
|
||||
import logging
|
||||
import pprint
|
||||
import os
|
||||
import pathlib
|
||||
import pickle
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from multiprocessing import RawValue
|
||||
|
||||
from aprsd import client
|
||||
from aprsd import client, threads, utils
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
CONFIG = None
|
||||
|
||||
# current aprs radio message number, increments for each message we
|
||||
# send over rf {int}
|
||||
message_number = 0
|
||||
|
||||
# message_nubmer:ack combos so we stop sending a message after an
|
||||
# ack from radio {int:int}
|
||||
# FIXME
|
||||
ack_dict = {}
|
||||
|
||||
# What to return from a plugin if we have processed the message
|
||||
@ -22,140 +23,411 @@ ack_dict = {}
|
||||
NULL_MESSAGE = -1
|
||||
|
||||
|
||||
def send_ack_thread(tocall, ack, retry_count):
|
||||
cl = client.get_client()
|
||||
tocall = tocall.ljust(9) # pad to nine chars
|
||||
line = "{}>APRS::{}:ack{}\n".format(CONFIG["aprs"]["login"], tocall, ack)
|
||||
for i in range(retry_count, 0, -1):
|
||||
log_message(
|
||||
"Sending ack",
|
||||
line.rstrip("\n"),
|
||||
None,
|
||||
ack=ack,
|
||||
tocall=tocall,
|
||||
retry_number=i,
|
||||
)
|
||||
cl.sendall(line)
|
||||
# aprs duplicate detection is 30 secs?
|
||||
# (21 only sends first, 28 skips middle)
|
||||
time.sleep(31)
|
||||
# end_send_ack_thread
|
||||
class MsgTrack(object):
|
||||
"""Class to keep track of outstanding text messages.
|
||||
|
||||
This is a thread safe class that keeps track of active
|
||||
messages.
|
||||
|
||||
When a message is asked to be sent, it is placed into this
|
||||
class via it's id. The TextMessage class's send() method
|
||||
automatically adds itself to this class. When the ack is
|
||||
recieved from the radio, the message object is removed from
|
||||
this class.
|
||||
|
||||
# TODO(hemna)
|
||||
When aprsd is asked to quit this class should be serialized and
|
||||
saved to disk/db to keep track of the state of outstanding messages.
|
||||
When aprsd is started, it should try and fetch the saved state,
|
||||
and reloaded to a live state.
|
||||
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
lock = None
|
||||
|
||||
track = {}
|
||||
total_messages_tracked = 0
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(MsgTrack, cls).__new__(cls)
|
||||
cls._instance.track = {}
|
||||
cls._instance.lock = threading.Lock()
|
||||
return cls._instance
|
||||
|
||||
def add(self, msg):
|
||||
with self.lock:
|
||||
key = int(msg.id)
|
||||
self.track[key] = msg
|
||||
self.total_messages_tracked += 1
|
||||
|
||||
def get(self, id):
|
||||
with self.lock:
|
||||
if id in self.track:
|
||||
return self.track[id]
|
||||
|
||||
def remove(self, id):
|
||||
with self.lock:
|
||||
key = int(id)
|
||||
if key in self.track.keys():
|
||||
del self.track[key]
|
||||
|
||||
def __len__(self):
|
||||
with self.lock:
|
||||
return len(self.track)
|
||||
|
||||
def __str__(self):
|
||||
with self.lock:
|
||||
result = "{"
|
||||
for key in self.track.keys():
|
||||
result += "{}: {}, ".format(key, str(self.track[key]))
|
||||
result += "}"
|
||||
return result
|
||||
|
||||
def save(self):
|
||||
"""Save this shit to disk?"""
|
||||
if len(self) > 0:
|
||||
LOG.info("Saving {} tracking messages to disk".format(len(self)))
|
||||
pickle.dump(self.dump(), open(utils.DEFAULT_SAVE_FILE, "wb+"))
|
||||
else:
|
||||
self.flush()
|
||||
|
||||
def dump(self):
|
||||
dump = {}
|
||||
with self.lock:
|
||||
for key in self.track.keys():
|
||||
dump[key] = self.track[key]
|
||||
|
||||
return dump
|
||||
|
||||
def load(self):
|
||||
if os.path.exists(utils.DEFAULT_SAVE_FILE):
|
||||
raw = pickle.load(open(utils.DEFAULT_SAVE_FILE, "rb"))
|
||||
if raw:
|
||||
self.track = raw
|
||||
LOG.debug("Loaded MsgTrack dict from disk.")
|
||||
LOG.debug(self)
|
||||
|
||||
def restart(self):
|
||||
"""Walk the list of messages and restart them if any."""
|
||||
|
||||
for key in self.track.keys():
|
||||
msg = self.track[key]
|
||||
if msg.last_send_attempt < msg.retry_count:
|
||||
msg.send()
|
||||
|
||||
def restart_delayed(self):
|
||||
"""Walk the list of delayed messages and restart them if any."""
|
||||
for key in self.track.keys():
|
||||
msg = self.track[key]
|
||||
if msg.last_send_attempt == msg.retry_count:
|
||||
msg.last_send_attempt = 0
|
||||
msg.send()
|
||||
|
||||
def flush(self):
|
||||
"""Nuke the old pickle file that stored the old results from last aprsd run."""
|
||||
if os.path.exists(utils.DEFAULT_SAVE_FILE):
|
||||
pathlib.Path(utils.DEFAULT_SAVE_FILE).unlink()
|
||||
with self.lock:
|
||||
self.track = {}
|
||||
|
||||
|
||||
def send_ack(tocall, ack):
|
||||
LOG.debug("Send ACK({}:{}) to radio.".format(tocall, ack))
|
||||
class MessageCounter(object):
|
||||
"""
|
||||
Global message id counter class.
|
||||
|
||||
This is a singleton based class that keeps
|
||||
an incrementing counter for all messages to
|
||||
be sent. All new Message objects gets a new
|
||||
message id, which is the next number available
|
||||
from the MessageCounter.
|
||||
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
max_count = 9999
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
"""Make this a singleton class."""
|
||||
if cls._instance is None:
|
||||
cls._instance = super(MessageCounter, cls).__new__(cls)
|
||||
cls._instance.val = RawValue("i", 1)
|
||||
cls._instance.lock = threading.Lock()
|
||||
return cls._instance
|
||||
|
||||
def increment(self):
|
||||
with self.lock:
|
||||
if self.val.value == self.max_count:
|
||||
self.val.value = 1
|
||||
else:
|
||||
self.val.value += 1
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
with self.lock:
|
||||
return self.val.value
|
||||
|
||||
def __repr__(self):
|
||||
with self.lock:
|
||||
return str(self.val.value)
|
||||
|
||||
def __str__(self):
|
||||
with self.lock:
|
||||
return str(self.val.value)
|
||||
|
||||
|
||||
class Message(object, metaclass=abc.ABCMeta):
|
||||
"""Base Message Class."""
|
||||
|
||||
# The message id to send over the air
|
||||
id = 0
|
||||
|
||||
retry_count = 3
|
||||
thread = threading.Thread(
|
||||
target=send_ack_thread, name="send_ack", args=(tocall, ack, retry_count)
|
||||
)
|
||||
thread.start()
|
||||
# end send_ack()
|
||||
last_send_time = None
|
||||
last_send_attempt = 0
|
||||
|
||||
def __init__(self, fromcall, tocall, msg_id=None):
|
||||
self.fromcall = fromcall
|
||||
self.tocall = tocall
|
||||
if not msg_id:
|
||||
c = MessageCounter()
|
||||
c.increment()
|
||||
msg_id = c.value
|
||||
self.id = msg_id
|
||||
|
||||
@abc.abstractmethod
|
||||
def send(self):
|
||||
"""Child class must declare."""
|
||||
pass
|
||||
|
||||
|
||||
def send_ack_direct(tocall, ack):
|
||||
"""Send an ack message without a separate thread."""
|
||||
LOG.debug("Send ACK({}:{}) to radio.".format(tocall, ack))
|
||||
cl = client.get_client()
|
||||
fromcall = CONFIG["aprs"]["login"]
|
||||
line = "{}>APRS::{}:ack{}\n".format(fromcall, tocall, ack)
|
||||
log_message(
|
||||
"Sending ack",
|
||||
line.rstrip("\n"),
|
||||
None,
|
||||
ack=ack,
|
||||
tocall=tocall,
|
||||
fromcall=fromcall,
|
||||
)
|
||||
cl.sendall(line)
|
||||
class TextMessage(Message):
|
||||
"""Send regular ARPS text/command messages/replies."""
|
||||
|
||||
message = None
|
||||
|
||||
def __init__(self, fromcall, tocall, message, msg_id=None, allow_delay=True):
|
||||
super(TextMessage, self).__init__(fromcall, tocall, msg_id)
|
||||
self.message = message
|
||||
# do we try and save this message for later if we don't get
|
||||
# an ack? Some messages we don't want to do this ever.
|
||||
self.allow_delay = allow_delay
|
||||
|
||||
def __repr__(self):
|
||||
"""Build raw string to send over the air."""
|
||||
return "{}>APRS::{}:{}{{{}\n".format(
|
||||
self.fromcall,
|
||||
self.tocall.ljust(9),
|
||||
self._filter_for_send(),
|
||||
str(self.id),
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
delta = "Never"
|
||||
if self.last_send_time:
|
||||
now = datetime.datetime.now()
|
||||
delta = now - self.last_send_time
|
||||
return "{}>{} Msg({})({}): '{}'".format(
|
||||
self.fromcall, self.tocall, self.id, delta, self.message
|
||||
)
|
||||
|
||||
def _filter_for_send(self):
|
||||
"""Filter and format message string for FCC."""
|
||||
# max? ftm400 displays 64, raw msg shows 74
|
||||
# and ftm400-send is max 64. setting this to
|
||||
# 67 displays 64 on the ftm400. (+3 {01 suffix)
|
||||
# feature req: break long ones into two msgs
|
||||
message = self.message[:67]
|
||||
# We all miss George Carlin
|
||||
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
|
||||
|
||||
def send(self):
|
||||
global ack_dict
|
||||
|
||||
tracker = MsgTrack()
|
||||
tracker.add(self)
|
||||
LOG.debug("Length of MsgTrack is {}".format(len(tracker)))
|
||||
thread = SendMessageThread(message=self)
|
||||
thread.start()
|
||||
|
||||
def send_direct(self):
|
||||
"""Send a message without a separate thread."""
|
||||
cl = client.get_client()
|
||||
log_message(
|
||||
"Sending Message Direct",
|
||||
repr(self).rstrip("\n"),
|
||||
self.message,
|
||||
tocall=self.tocall,
|
||||
fromcall=self.fromcall,
|
||||
)
|
||||
cl.sendall(repr(self))
|
||||
|
||||
|
||||
def send_message_thread(tocall, message, this_message_number, retry_count):
|
||||
cl = client.get_client()
|
||||
line = "{}>APRS::{}:{}{{{}\n".format(
|
||||
CONFIG["aprs"]["login"],
|
||||
tocall,
|
||||
message,
|
||||
str(this_message_number),
|
||||
)
|
||||
for i in range(retry_count, 0, -1):
|
||||
LOG.debug("DEBUG: send_message_thread msg:ack combos are: ")
|
||||
LOG.debug(pprint.pformat(ack_dict))
|
||||
if ack_dict[this_message_number] != 1:
|
||||
class SendMessageThread(threads.APRSDThread):
|
||||
def __init__(self, message):
|
||||
self.msg = message
|
||||
name = self.msg.message[:5]
|
||||
super(SendMessageThread, self).__init__(
|
||||
"SendMessage-{}-{}".format(self.msg.id, name)
|
||||
)
|
||||
|
||||
def loop(self):
|
||||
"""Loop until a message is acked or it gets delayed.
|
||||
|
||||
We only sleep for 5 seconds between each loop run, so
|
||||
that CTRL-C can exit the app in a short period. Each sleep
|
||||
means the app quitting is blocked until sleep is done.
|
||||
So we keep track of the last send attempt and only send if the
|
||||
last send attempt is old enough.
|
||||
|
||||
"""
|
||||
cl = client.get_client()
|
||||
tracker = MsgTrack()
|
||||
# lets see if the message is still in the tracking queue
|
||||
msg = tracker.get(self.msg.id)
|
||||
if not msg:
|
||||
# The message has been removed from the tracking queue
|
||||
# So it got acked and we are done.
|
||||
LOG.info("Message Send Complete via Ack.")
|
||||
return False
|
||||
else:
|
||||
send_now = False
|
||||
if msg.last_send_attempt == msg.retry_count:
|
||||
# we reached the send limit, don't send again
|
||||
# TODO(hemna) - Need to put this in a delayed queue?
|
||||
LOG.info("Message Send Complete. Max attempts reached.")
|
||||
return False
|
||||
|
||||
# Message is still outstanding and needs to be acked.
|
||||
if msg.last_send_time:
|
||||
# Message has a last send time tracking
|
||||
now = datetime.datetime.now()
|
||||
sleeptime = (msg.last_send_attempt + 1) * 31
|
||||
delta = now - msg.last_send_time
|
||||
if delta > datetime.timedelta(seconds=sleeptime):
|
||||
# It's time to try to send it again
|
||||
send_now = True
|
||||
else:
|
||||
send_now = True
|
||||
|
||||
if send_now:
|
||||
# no attempt time, so lets send it, and start
|
||||
# tracking the time.
|
||||
log_message(
|
||||
"Sending Message",
|
||||
repr(msg).rstrip("\n"),
|
||||
msg.message,
|
||||
tocall=self.msg.tocall,
|
||||
retry_number=msg.last_send_attempt,
|
||||
msg_num=msg.id,
|
||||
)
|
||||
cl.sendall(repr(msg))
|
||||
msg.last_send_time = datetime.datetime.now()
|
||||
msg.last_send_attempt += 1
|
||||
|
||||
time.sleep(5)
|
||||
# Make sure we get called again.
|
||||
return True
|
||||
|
||||
|
||||
class AckMessage(Message):
|
||||
"""Class for building Acks and sending them."""
|
||||
|
||||
def __init__(self, fromcall, tocall, msg_id):
|
||||
super(AckMessage, self).__init__(fromcall, tocall, msg_id=msg_id)
|
||||
|
||||
def __repr__(self):
|
||||
return "{}>APRS::{}:ack{}\n".format(
|
||||
self.fromcall, self.tocall.ljust(9), self.id
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return "From({}) TO({}) Ack ({})".format(self.fromcall, self.tocall, self.id)
|
||||
|
||||
def send_thread(self):
|
||||
"""Separate thread to send acks with retries."""
|
||||
cl = client.get_client()
|
||||
for i in range(self.retry_count, 0, -1):
|
||||
log_message(
|
||||
"Sending Message",
|
||||
line.rstrip("\n"),
|
||||
message,
|
||||
tocall=tocall,
|
||||
"Sending ack",
|
||||
repr(self).rstrip("\n"),
|
||||
None,
|
||||
ack=self.id,
|
||||
tocall=self.tocall,
|
||||
retry_number=i,
|
||||
)
|
||||
# tn.write(line)
|
||||
cl.sendall(line)
|
||||
# decaying repeats, 31 to 93 second intervals
|
||||
sleeptime = (retry_count - i + 1) * 31
|
||||
time.sleep(sleeptime)
|
||||
cl.sendall(repr(self))
|
||||
# aprs duplicate detection is 30 secs?
|
||||
# (21 only sends first, 28 skips middle)
|
||||
time.sleep(31)
|
||||
# end_send_ack_thread
|
||||
|
||||
def send(self):
|
||||
LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id))
|
||||
thread = SendAckThread(self)
|
||||
thread.start()
|
||||
|
||||
# end send_ack()
|
||||
|
||||
def send_direct(self):
|
||||
"""Send an ack message without a separate thread."""
|
||||
cl = client.get_client()
|
||||
log_message(
|
||||
"Sending ack",
|
||||
repr(self).rstrip("\n"),
|
||||
None,
|
||||
ack=self.id,
|
||||
tocall=self.tocall,
|
||||
fromcall=self.fromcall,
|
||||
)
|
||||
cl.sendall(repr(self))
|
||||
|
||||
|
||||
class SendAckThread(threads.APRSDThread):
|
||||
def __init__(self, ack):
|
||||
self.ack = ack
|
||||
super(SendAckThread, self).__init__("SendAck-{}".format(self.ack.id))
|
||||
|
||||
def loop(self):
|
||||
"""Separate thread to send acks with retries."""
|
||||
send_now = False
|
||||
if self.ack.last_send_attempt == self.ack.retry_count:
|
||||
# we reached the send limit, don't send again
|
||||
# TODO(hemna) - Need to put this in a delayed queue?
|
||||
LOG.info("Ack Send Complete. Max attempts reached.")
|
||||
return False
|
||||
|
||||
if self.ack.last_send_time:
|
||||
# Message has a last send time tracking
|
||||
now = datetime.datetime.now()
|
||||
|
||||
# aprs duplicate detection is 30 secs?
|
||||
# (21 only sends first, 28 skips middle)
|
||||
sleeptime = 31
|
||||
delta = now - self.ack.last_send_time
|
||||
if delta > datetime.timedelta(seconds=sleeptime):
|
||||
# It's time to try to send it again
|
||||
send_now = True
|
||||
else:
|
||||
LOG.debug("Still wating. {}".format(delta))
|
||||
else:
|
||||
break
|
||||
return
|
||||
# end send_message_thread
|
||||
send_now = True
|
||||
|
||||
|
||||
def send_message(tocall, message):
|
||||
global message_number
|
||||
global ack_dict
|
||||
|
||||
retry_count = 3
|
||||
if message_number > 98: # global
|
||||
message_number = 0
|
||||
message_number += 1
|
||||
if len(ack_dict) > 90:
|
||||
# empty ack dict if it's really big, could result in key error later
|
||||
LOG.debug(
|
||||
"DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict)
|
||||
)
|
||||
ack_dict.clear()
|
||||
LOG.debug(pprint.pformat(ack_dict))
|
||||
LOG.debug(
|
||||
"DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict)
|
||||
)
|
||||
ack_dict[message_number] = 0 # clear ack for this message number
|
||||
tocall = tocall.ljust(9) # pad to nine chars
|
||||
|
||||
# max? ftm400 displays 64, raw msg shows 74
|
||||
# and ftm400-send is max 64. setting this to
|
||||
# 67 displays 64 on the ftm400. (+3 {01 suffix)
|
||||
# feature req: break long ones into two msgs
|
||||
message = message[:67]
|
||||
# We all miss George Carlin
|
||||
message = re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
|
||||
thread = threading.Thread(
|
||||
target=send_message_thread,
|
||||
name="send_message",
|
||||
args=(tocall, message, message_number, retry_count),
|
||||
)
|
||||
thread.start()
|
||||
return ()
|
||||
# end send_message()
|
||||
|
||||
|
||||
def send_message_direct(tocall, message, message_number=None):
|
||||
"""Send a message without a separate thread."""
|
||||
cl = client.get_client()
|
||||
if not message_number:
|
||||
this_message_number = 1
|
||||
else:
|
||||
this_message_number = message_number
|
||||
fromcall = CONFIG["aprs"]["login"]
|
||||
line = "{}>APRS::{}:{}{{{}\n".format(
|
||||
fromcall,
|
||||
tocall,
|
||||
message,
|
||||
str(this_message_number),
|
||||
)
|
||||
LOG.debug("DEBUG: send_message_thread msg:ack combos are: ")
|
||||
log_message(
|
||||
"Sending Message", line.rstrip("\n"), message, tocall=tocall, fromcall=fromcall
|
||||
)
|
||||
cl.sendall(line)
|
||||
if send_now:
|
||||
cl = client.get_client()
|
||||
log_message(
|
||||
"Sending ack",
|
||||
repr(self.ack).rstrip("\n"),
|
||||
None,
|
||||
ack=self.ack.id,
|
||||
tocall=self.ack.tocall,
|
||||
retry_number=self.ack.last_send_attempt,
|
||||
)
|
||||
cl.sendall(repr(self.ack))
|
||||
self.ack.last_send_attempt += 1
|
||||
self.ack.last_send_time = datetime.datetime.now()
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
def log_packet(packet):
|
||||
@ -189,6 +461,7 @@ def log_message(
|
||||
retry_number=None,
|
||||
ack=None,
|
||||
packet_type=None,
|
||||
uuid=None,
|
||||
):
|
||||
"""
|
||||
|
||||
@ -232,36 +505,9 @@ def log_message(
|
||||
if msg_num:
|
||||
# LOG.info(" Msg number : {}".format(msg_num))
|
||||
log_list.append(" Msg number : {}".format(msg_num))
|
||||
if uuid:
|
||||
log_list.append(" UUID : {}".format(uuid))
|
||||
# LOG.info(" {} _______________ Complete".format(header))
|
||||
log_list.append(" {} _______________ Complete".format(header))
|
||||
|
||||
LOG.info("\n".join(log_list))
|
||||
|
||||
|
||||
def process_message(line):
|
||||
f = re.search("^(.*)>", line)
|
||||
fromcall = f.group(1)
|
||||
searchstring = "::%s[ ]*:(.*)" % CONFIG["aprs"]["login"]
|
||||
# verify this, callsign is padded out with spaces to colon
|
||||
m = re.search(searchstring, line)
|
||||
fullmessage = m.group(1)
|
||||
|
||||
ack_attached = re.search("(.*){([0-9A-Z]+)", fullmessage)
|
||||
# ack formats include: {1, {AB}, {12
|
||||
if ack_attached:
|
||||
# "{##" suffix means radio wants an ack back
|
||||
# message content
|
||||
message = ack_attached.group(1)
|
||||
# suffix number to use in ack
|
||||
ack_num = ack_attached.group(2)
|
||||
else:
|
||||
message = fullmessage
|
||||
# ack not requested, but lets send one as 0
|
||||
ack_num = "0"
|
||||
|
||||
log_message(
|
||||
"Received message", line, message, fromcall=fromcall, msg_num=str(ack_num)
|
||||
)
|
||||
|
||||
return (fromcall, message, ack_num)
|
||||
# end process_message()
|
||||
|
@ -31,6 +31,7 @@ CORE_PLUGINS = [
|
||||
"aprsd.plugin.FortunePlugin",
|
||||
"aprsd.plugin.LocationPlugin",
|
||||
"aprsd.plugin.PingPlugin",
|
||||
"aprsd.plugin.QueryPlugin",
|
||||
"aprsd.plugin.TimePlugin",
|
||||
"aprsd.plugin.WeatherPlugin",
|
||||
"aprsd.plugin.VersionPlugin",
|
||||
@ -353,6 +354,43 @@ class PingPlugin(APRSDPluginBase):
|
||||
return reply.rstrip()
|
||||
|
||||
|
||||
class QueryPlugin(APRSDPluginBase):
|
||||
"""Query command."""
|
||||
|
||||
version = "1.0"
|
||||
command_regex = r"^\?.*"
|
||||
command_name = "query"
|
||||
|
||||
def command(self, fromcall, message, ack):
|
||||
LOG.info("Query COMMAND")
|
||||
|
||||
tracker = messaging.MsgTrack()
|
||||
reply = "Pending Messages ({})".format(len(tracker))
|
||||
|
||||
searchstring = "^" + self.config["ham"]["callsign"] + ".*"
|
||||
# only I can do admin commands
|
||||
if re.search(searchstring, fromcall):
|
||||
r = re.search(r"^\?-\*", message)
|
||||
if r is not None:
|
||||
if len(tracker) > 0:
|
||||
reply = "Resend ALL Delayed msgs"
|
||||
LOG.debug(reply)
|
||||
tracker.restart_delayed()
|
||||
else:
|
||||
reply = "No Delayed Msgs"
|
||||
LOG.debug(reply)
|
||||
return reply
|
||||
|
||||
r = re.search(r"^\?-[fF]!", message)
|
||||
if r is not None:
|
||||
reply = "Deleting ALL Delayed msgs."
|
||||
LOG.debug(reply)
|
||||
tracker.flush()
|
||||
return reply
|
||||
|
||||
return reply
|
||||
|
||||
|
||||
class TimePlugin(APRSDPluginBase):
|
||||
"""Time command."""
|
||||
|
||||
@ -449,8 +487,16 @@ class EmailPlugin(APRSDPluginBase):
|
||||
if a is not None:
|
||||
to_addr = a.group(1)
|
||||
content = a.group(2)
|
||||
|
||||
email_address = email.get_email_from_shortcut(to_addr)
|
||||
if not email_address:
|
||||
reply = "Bad email address"
|
||||
return reply
|
||||
|
||||
# send recipient link to aprs.fi map
|
||||
mapme = False
|
||||
if content == "mapme":
|
||||
mapme = True
|
||||
content = "Click for my location: http://aprs.fi/{}".format(
|
||||
self.config["ham"]["callsign"]
|
||||
)
|
||||
@ -458,6 +504,8 @@ class EmailPlugin(APRSDPluginBase):
|
||||
now = time.time()
|
||||
# see if we sent this msg number recently
|
||||
if ack in self.email_sent_dict:
|
||||
# BUG(hemna) - when we get a 2 different email command
|
||||
# with the same ack #, we don't send it.
|
||||
timedelta = now - self.email_sent_dict[ack]
|
||||
if timedelta < 300: # five minutes
|
||||
too_soon = 1
|
||||
@ -477,7 +525,10 @@ class EmailPlugin(APRSDPluginBase):
|
||||
)
|
||||
self.email_sent_dict.clear()
|
||||
self.email_sent_dict[ack] = now
|
||||
reply = "mapme email sent"
|
||||
if mapme:
|
||||
reply = "mapme email sent"
|
||||
else:
|
||||
reply = "Email sent."
|
||||
else:
|
||||
LOG.info(
|
||||
"Email for message number "
|
||||
|
228
aprsd/threads.py
Normal file
228
aprsd/threads.py
Normal file
@ -0,0 +1,228 @@
|
||||
import abc
|
||||
import logging
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
|
||||
import aprslib
|
||||
|
||||
from aprsd import client, messaging, plugin
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
|
||||
RX_THREAD = "RX"
|
||||
TX_THREAD = "TX"
|
||||
EMAIL_THREAD = "Email"
|
||||
|
||||
|
||||
class APRSDThreadList(object):
|
||||
"""Singleton class that keeps track of application wide threads."""
|
||||
|
||||
_instance = None
|
||||
|
||||
threads_list = []
|
||||
lock = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(APRSDThreadList, cls).__new__(cls)
|
||||
cls.lock = threading.Lock()
|
||||
cls.threads_list = []
|
||||
return cls._instance
|
||||
|
||||
def add(self, thread_obj):
|
||||
with self.lock:
|
||||
self.threads_list.append(thread_obj)
|
||||
|
||||
def remove(self, thread_obj):
|
||||
with self.lock:
|
||||
self.threads_list.remove(thread_obj)
|
||||
|
||||
def stop_all(self):
|
||||
"""Iterate over all threads and call stop on them."""
|
||||
with self.lock:
|
||||
for th in self.threads_list:
|
||||
th.stop()
|
||||
|
||||
|
||||
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
|
||||
def __init__(self, name):
|
||||
super(APRSDThread, self).__init__(name=name)
|
||||
self.thread_stop = False
|
||||
APRSDThreadList().add(self)
|
||||
|
||||
def stop(self):
|
||||
self.thread_stop = True
|
||||
|
||||
def run(self):
|
||||
LOG.info("Starting")
|
||||
while not self.thread_stop:
|
||||
can_loop = self.loop()
|
||||
if not can_loop:
|
||||
self.stop()
|
||||
APRSDThreadList().remove(self)
|
||||
LOG.info("Exiting")
|
||||
|
||||
|
||||
class APRSDRXThread(APRSDThread):
|
||||
def __init__(self, msg_queues, config):
|
||||
super(APRSDRXThread, self).__init__("RX_MSG")
|
||||
self.msg_queues = msg_queues
|
||||
self.config = config
|
||||
|
||||
def stop(self):
|
||||
self.thread_stop = True
|
||||
client.get_client().stop()
|
||||
|
||||
def callback(self, packet):
|
||||
try:
|
||||
packet = aprslib.parse(packet)
|
||||
print(packet)
|
||||
except (aprslib.ParseError, aprslib.UnknownFormat):
|
||||
pass
|
||||
|
||||
def loop(self):
|
||||
aprs_client = client.get_client()
|
||||
|
||||
# setup the consumer of messages and block until a messages
|
||||
try:
|
||||
# This will register a packet consumer with aprslib
|
||||
# When new packets come in the consumer will process
|
||||
# the packet
|
||||
|
||||
# Do a partial here because the consumer signature doesn't allow
|
||||
# For kwargs to be passed in to the consumer func we declare
|
||||
# and the aprslib developer didn't want to allow a PR to add
|
||||
# kwargs. :(
|
||||
# https://github.com/rossengeorgiev/aprs-python/pull/56
|
||||
aprs_client.consumer(self.process_packet, raw=False, blocking=False)
|
||||
|
||||
except aprslib.exceptions.ConnectionDrop:
|
||||
LOG.error("Connection dropped, reconnecting")
|
||||
time.sleep(5)
|
||||
# Force the deletion of the client object connected to aprs
|
||||
# This will cause a reconnect, next time client.get_client()
|
||||
# is called
|
||||
client.Client().reset()
|
||||
# Continue to loop
|
||||
return True
|
||||
|
||||
def process_ack_packet(self, packet):
|
||||
ack_num = packet.get("msgNo")
|
||||
LOG.info("Got ack for message {}".format(ack_num))
|
||||
messaging.log_message(
|
||||
"ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"]
|
||||
)
|
||||
tracker = messaging.MsgTrack()
|
||||
tracker.remove(ack_num)
|
||||
LOG.debug("Length of MsgTrack is {}".format(len(tracker)))
|
||||
# messaging.ack_dict.update({int(ack_num): 1})
|
||||
return
|
||||
|
||||
def process_mic_e_packet(self, packet):
|
||||
LOG.info("Mic-E Packet detected. Currenlty unsupported.")
|
||||
messaging.log_packet(packet)
|
||||
return
|
||||
|
||||
def process_message_packet(self, packet):
|
||||
LOG.info("Got a message packet")
|
||||
fromcall = packet["from"]
|
||||
message = packet.get("message_text", None)
|
||||
|
||||
msg_id = packet.get("msgNo", "0")
|
||||
|
||||
messaging.log_message(
|
||||
"Received Message",
|
||||
packet["raw"],
|
||||
message,
|
||||
fromcall=fromcall,
|
||||
msg_num=msg_id,
|
||||
)
|
||||
|
||||
found_command = False
|
||||
# Get singleton of the PM
|
||||
pm = plugin.PluginManager()
|
||||
try:
|
||||
results = pm.run(fromcall=fromcall, message=message, ack=msg_id)
|
||||
for reply in results:
|
||||
found_command = True
|
||||
# A plugin can return a null message flag which signals
|
||||
# us that they processed the message correctly, but have
|
||||
# nothing to reply with, so we avoid replying with a usage string
|
||||
if reply is not messaging.NULL_MESSAGE:
|
||||
LOG.debug("Sending '{}'".format(reply))
|
||||
|
||||
msg = messaging.TextMessage(
|
||||
self.config["aprs"]["login"], fromcall, reply
|
||||
)
|
||||
self.msg_queues["tx"].put(msg)
|
||||
else:
|
||||
LOG.debug("Got NULL MESSAGE from plugin")
|
||||
|
||||
if not found_command:
|
||||
plugins = pm.get_plugins()
|
||||
names = [x.command_name for x in plugins]
|
||||
names.sort()
|
||||
|
||||
reply = "Usage: {}".format(", ".join(names))
|
||||
msg = messaging.TextMessage(
|
||||
self.config["aprs"]["login"], fromcall, reply
|
||||
)
|
||||
self.msg_queues["tx"].put(msg)
|
||||
except Exception as ex:
|
||||
LOG.exception("Plugin failed!!!", ex)
|
||||
reply = "A Plugin failed! try again?"
|
||||
msg = messaging.TextMessage(self.config["aprs"]["login"], fromcall, reply)
|
||||
self.msg_queues["tx"].put(msg)
|
||||
|
||||
# let any threads do their thing, then ack
|
||||
# send an ack last
|
||||
ack = messaging.AckMessage(
|
||||
self.config["aprs"]["login"], fromcall, msg_id=msg_id
|
||||
)
|
||||
self.msg_queues["tx"].put(ack)
|
||||
LOG.debug("Packet processing complete")
|
||||
|
||||
def process_packet(self, packet):
|
||||
"""Process a packet recieved from aprs-is server."""
|
||||
|
||||
LOG.debug("Process packet! {}".format(self.msg_queues))
|
||||
try:
|
||||
LOG.debug("Got message: {}".format(packet))
|
||||
|
||||
msg = packet.get("message_text", None)
|
||||
msg_format = packet.get("format", None)
|
||||
msg_response = packet.get("response", None)
|
||||
if msg_format == "message" and msg:
|
||||
# we want to send the message through the
|
||||
# plugins
|
||||
self.process_message_packet(packet)
|
||||
return
|
||||
elif msg_response == "ack":
|
||||
self.process_ack_packet(packet)
|
||||
return
|
||||
|
||||
if msg_format == "mic-e":
|
||||
# process a mic-e packet
|
||||
self.process_mic_e_packet(packet)
|
||||
return
|
||||
|
||||
except (aprslib.ParseError, aprslib.UnknownFormat) as exp:
|
||||
LOG.exception("Failed to parse packet from aprs-is", exp)
|
||||
|
||||
|
||||
class APRSDTXThread(APRSDThread):
|
||||
def __init__(self, msg_queues, config):
|
||||
super(APRSDTXThread, self).__init__("TX_MSG")
|
||||
self.msg_queues = msg_queues
|
||||
self.config = config
|
||||
|
||||
def loop(self):
|
||||
try:
|
||||
msg = self.msg_queues["tx"].get(timeout=0.1)
|
||||
LOG.info("TXQ: got message '{}'".format(msg))
|
||||
msg.send()
|
||||
except queue.Empty:
|
||||
pass
|
||||
# Continue to loop
|
||||
return True
|
@ -5,6 +5,7 @@ import functools
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
import yaml
|
||||
@ -46,7 +47,10 @@ DEFAULT_CONFIG_DICT = {
|
||||
},
|
||||
}
|
||||
|
||||
DEFAULT_CONFIG_FILE = "~/.config/aprsd/aprsd.yml"
|
||||
home = str(Path.home())
|
||||
DEFAULT_CONFIG_DIR = "{}/.config/aprsd/".format(home)
|
||||
DEFAULT_SAVE_FILE = "{}/.config/aprsd/aprsd.p".format(home)
|
||||
DEFAULT_CONFIG_FILE = "{}/.config/aprsd/aprsd.yml".format(home)
|
||||
|
||||
|
||||
def synchronized(wrapped):
|
||||
|
121
tests/test_plugin.py
Normal file
121
tests/test_plugin.py
Normal file
@ -0,0 +1,121 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import sys
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
import aprsd
|
||||
from aprsd import plugin
|
||||
from aprsd.fuzzyclock import fuzzy
|
||||
|
||||
|
||||
class testPlugin(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.fromcall = "KFART"
|
||||
self.ack = 1
|
||||
self.config = mock.MagicMock()
|
||||
|
||||
@mock.patch("shutil.which")
|
||||
def test_fortune_fail(self, mock_which):
|
||||
fortune_plugin = plugin.FortunePlugin(self.config)
|
||||
mock_which.return_value = None
|
||||
message = "fortune"
|
||||
expected = "Fortune command not installed"
|
||||
actual = fortune_plugin.run(self.fromcall, message, self.ack)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
@mock.patch("subprocess.Popen")
|
||||
@mock.patch("shutil.which")
|
||||
def test_fortune_success(self, mock_which, mock_popen):
|
||||
fortune_plugin = plugin.FortunePlugin(self.config)
|
||||
mock_which.return_value = "/usr/bin/games"
|
||||
|
||||
mock_process = mock.MagicMock()
|
||||
mock_process.communicate.return_value = [b"Funny fortune"]
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
message = "fortune"
|
||||
expected = "Funny fortune"
|
||||
actual = fortune_plugin.run(self.fromcall, message, self.ack)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
@mock.patch("time.localtime")
|
||||
def test_Time(self, mock_time):
|
||||
fake_time = mock.MagicMock()
|
||||
h = fake_time.tm_hour = 16
|
||||
m = fake_time.tm_min = 12
|
||||
s = fake_time.tm_sec = 55
|
||||
mock_time.return_value = fake_time
|
||||
time_plugin = plugin.TimePlugin(self.config)
|
||||
|
||||
fromcall = "KFART"
|
||||
message = "location"
|
||||
ack = 1
|
||||
|
||||
actual = time_plugin.run(fromcall, message, ack)
|
||||
self.assertEqual(None, actual)
|
||||
|
||||
cur_time = fuzzy(h, m, 1)
|
||||
|
||||
message = "time"
|
||||
expected = "{} ({}:{} PDT) ({})".format(
|
||||
cur_time, str(h), str(m).rjust(2, "0"), message.rstrip()
|
||||
)
|
||||
actual = time_plugin.run(fromcall, message, ack)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
@mock.patch("time.localtime")
|
||||
def test_Ping(self, mock_time):
|
||||
fake_time = mock.MagicMock()
|
||||
h = fake_time.tm_hour = 16
|
||||
m = fake_time.tm_min = 12
|
||||
s = fake_time.tm_sec = 55
|
||||
mock_time.return_value = fake_time
|
||||
|
||||
ping = plugin.PingPlugin(self.config)
|
||||
|
||||
fromcall = "KFART"
|
||||
message = "location"
|
||||
ack = 1
|
||||
|
||||
result = ping.run(fromcall, message, ack)
|
||||
self.assertEqual(None, result)
|
||||
|
||||
def ping_str(h, m, s):
|
||||
return (
|
||||
"Pong! "
|
||||
+ str(h).zfill(2)
|
||||
+ ":"
|
||||
+ str(m).zfill(2)
|
||||
+ ":"
|
||||
+ str(s).zfill(2)
|
||||
)
|
||||
|
||||
message = "Ping"
|
||||
actual = ping.run(fromcall, message, ack)
|
||||
expected = ping_str(h, m, s)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
message = "ping"
|
||||
actual = ping.run(fromcall, message, ack)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_version(self):
|
||||
expected = "APRSD version '{}'".format(aprsd.__version__)
|
||||
version_plugin = plugin.VersionPlugin(self.config)
|
||||
|
||||
fromcall = "KFART"
|
||||
message = "No"
|
||||
ack = 1
|
||||
|
||||
actual = version_plugin.run(fromcall, message, ack)
|
||||
self.assertEqual(None, actual)
|
||||
|
||||
message = "version"
|
||||
actual = version_plugin.run(fromcall, message, ack)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
message = "Version"
|
||||
actual = version_plugin.run(fromcall, message, ack)
|
||||
self.assertEqual(expected, actual)
|
Loading…
Reference in New Issue
Block a user