1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-09-26 23:26:38 -04:00

Merge pull request #65 from craigerl/plugin_interface_change

Refactor the plugin interface and manager
This commit is contained in:
Walter A. Boring IV 2021-07-16 08:43:24 -04:00 committed by GitHub
commit f31a4c07b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 948 additions and 119 deletions

View File

@ -226,7 +226,7 @@ class Aprsdis(aprslib.IS):
except ParseError as exp:
self.logger.log(11, "%s\n Packet: %s", exp.args[0], exp.args[1])
except UnknownFormat as exp:
self.logger.log(9, "%s\n Packet: %s", exp.args[0], exp.args[1])
self.logger.log(9, "unknown format %s", exp.args)
except LoginError as exp:
self.logger.error("%s: %s", exp.__class__.__name__, exp.args[0])
except (KeyboardInterrupt, SystemExit):

View File

@ -46,12 +46,30 @@ class APRSDFlask(flask_classful.FlaskView):
@auth.login_required
def index(self):
stats = self._stats()
LOG.debug(
"watch list? {}".format(
self.config["aprsd"]["watch_list"],
),
)
if "watch_list" in self.config["aprsd"] and self.config["aprsd"][
"watch_list"
].get("enabled", False):
watch_count = len(self.config["aprsd"]["watch_list"]["callsigns"])
watch_age = self.config["aprsd"]["watch_list"]["alert_time_seconds"]
age_time = {"seconds": watch_age}
watch_age = datetime.timedelta(**age_time)
else:
watch_count = 0
watch_age = 0
return flask.render_template(
"index.html",
initial_stats=stats,
callsign=self.config["aprs"]["login"],
version=aprsd.__version__,
config_json=json.dumps(self.config),
watch_count=watch_count,
watch_age=watch_age,
)
@auth.login_required
@ -92,6 +110,18 @@ class APRSDFlask(flask_classful.FlaskView):
stats_dict = stats_obj.stats()
# Convert the watch_list entries to age
watch_list = stats_dict["aprsd"]["watch_list"]
new_list = {}
for call in watch_list:
call_date = datetime.datetime.strptime(
watch_list[call],
"%Y-%m-%d %H:%M:%S.%f",
)
new_list[call] = str(now - call_date)
stats_dict["aprsd"]["watch_list"] = new_list
result = {
"time": now.strftime(time_format),
"size_tracker": len(track),

390
aprsd/listen.py Normal file
View File

@ -0,0 +1,390 @@
#
# Listen on amateur radio aprs-is network for messages and respond to them.
# You must have an amateur radio callsign to use this software. You must
# create an ~/.aprsd/config.yml file with all of the required settings. To
# generate an example config.yml, just run aprsd, then copy the sample config
# to ~/.aprsd/config.yml and edit the settings.
#
# APRS messages:
# l(ocation) = descriptive location of calling station
# w(eather) = temp, (hi/low) forecast, later forecast
# t(ime) = respond with the current time
# f(ortune) = respond with a short fortune
# -email_addr email text = send an email
# -2 = display the last 2 emails received
# p(ing) = respond with Pong!/time
# anything else = respond with usage
#
# (C)2018 Craig Lamparter
# License GPLv2
#
# python included libs
import datetime
import logging
from logging import NullHandler
from logging.handlers import RotatingFileHandler
import os
import signal
import sys
import time
# local imports here
import aprsd
from aprsd import client, messaging, stats, threads, trace, utils
import aprslib
from aprslib.exceptions import LoginError
import click
import click_completion
# setup the global logger
# logging.basicConfig(level=logging.DEBUG) # level=10
LOG = logging.getLogger("APRSD")
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
flask_enabled = False
# server_event = threading.Event()
# localization, please edit:
# HOST = "noam.aprs2.net" # north america tier2 servers round robin
# USER = "KM6XXX-9" # callsign of this aprs client with SSID
# PASS = "99999" # google how to generate this
# BASECALLSIGN = "KM6XXX" # callsign of radio in the field to send email
# shortcuts = {
# "aa" : "5551239999@vtext.com",
# "cl" : "craiglamparter@somedomain.org",
# "wb" : "5553909472@vtext.com"
# }
def custom_startswith(string, incomplete):
"""A custom completion match that supports case insensitive matching."""
if os.environ.get("_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE"):
string = string.lower()
incomplete = incomplete.lower()
return string.startswith(incomplete)
click_completion.core.startswith = custom_startswith
click_completion.init()
cmd_help = """Shell completion for click-completion-command
Available shell types:
\b
%s
Default type: auto
""" % "\n ".join(
"{:<12} {}".format(k, click_completion.core.shells[k])
for k in sorted(click_completion.core.shells.keys())
)
@click.group(help=cmd_help, context_settings=CONTEXT_SETTINGS)
@click.version_option()
def main():
pass
@main.command()
@click.option(
"-i",
"--case-insensitive/--no-case-insensitive",
help="Case insensitive completion",
)
@click.argument(
"shell",
required=False,
type=click_completion.DocumentedChoice(click_completion.core.shells),
)
def show(shell, case_insensitive):
"""Show the click-completion-command completion code"""
extra_env = (
{"_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE": "ON"}
if case_insensitive
else {}
)
click.echo(click_completion.core.get_code(shell, extra_env=extra_env))
@main.command()
@click.option(
"--append/--overwrite",
help="Append the completion code to the file",
default=None,
)
@click.option(
"-i",
"--case-insensitive/--no-case-insensitive",
help="Case insensitive completion",
)
@click.argument(
"shell",
required=False,
type=click_completion.DocumentedChoice(click_completion.core.shells),
)
@click.argument("path", required=False)
def install(append, case_insensitive, shell, path):
"""Install the click-completion-command completion"""
extra_env = (
{"_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE": "ON"}
if case_insensitive
else {}
)
shell, path = click_completion.core.install(
shell=shell,
path=path,
append=append,
extra_env=extra_env,
)
click.echo("{} completion installed in {}".format(shell, path))
def signal_handler(sig, frame):
global flask_enabled
threads.APRSDThreadList().stop_all()
if "subprocess" not in str(frame):
LOG.info(
"Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}".format(
datetime.datetime.now(),
),
)
time.sleep(5)
tracker = messaging.MsgTrack()
tracker.save()
LOG.info(stats.APRSDStats())
# signal.signal(signal.SIGTERM, sys.exit(0))
# sys.exit(0)
if flask_enabled:
signal.signal(signal.SIGTERM, sys.exit(0))
# Setup the logging faciility
# to disable logging to stdout, but still log to file
# use the --quiet option on the cmdln
def setup_logging(config, loglevel, quiet):
log_level = utils.LOG_LEVELS[loglevel]
LOG.setLevel(log_level)
log_format = config["aprsd"].get("logformat", utils.DEFAULT_LOG_FORMAT)
date_format = config["aprsd"].get("dateformat", utils.DEFAULT_DATE_FORMAT)
log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
log_file = config["aprsd"].get("logfile", None)
if log_file:
fh = RotatingFileHandler(log_file, maxBytes=(10248576 * 5), backupCount=4)
else:
fh = NullHandler()
fh.setFormatter(log_formatter)
LOG.addHandler(fh)
imap_logger = None
if config["aprsd"]["email"].get("enabled", False) and config["aprsd"]["email"][
"imap"
].get("debug", False):
imap_logger = logging.getLogger("imapclient.imaplib")
imap_logger.setLevel(log_level)
imap_logger.addHandler(fh)
if not quiet:
sh = logging.StreamHandler(sys.stdout)
sh.setFormatter(log_formatter)
LOG.addHandler(sh)
if imap_logger:
imap_logger.addHandler(sh)
@main.command()
@click.option(
"--loglevel",
default="DEBUG",
show_default=True,
type=click.Choice(
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"],
case_sensitive=False,
),
show_choices=True,
help="The log level to use for aprsd.log",
)
@click.option("--quiet", is_flag=True, default=False, help="Don't log to stdout")
@click.option(
"-c",
"--config",
"config_file",
show_default=True,
default=utils.DEFAULT_CONFIG_FILE,
help="The aprsd config file to use for options.",
)
@click.option(
"--aprs-login",
envvar="APRS_LOGIN",
show_envvar=True,
help="What callsign to send the message from.",
)
@click.option(
"--aprs-password",
envvar="APRS_PASSWORD",
show_envvar=True,
help="the APRS-IS password for APRS_LOGIN",
)
@click.option(
"--no-ack",
"-n",
is_flag=True,
show_default=True,
default=False,
help="Don't wait for an ack, just sent it to APRS-IS and bail.",
)
@click.option("--raw", default=None, help="Send a raw message. Implies --no-ack")
@click.argument("tocallsign", required=False)
@click.argument("command", nargs=-1, required=False)
def listen(
loglevel,
quiet,
config_file,
aprs_login,
aprs_password,
no_ack,
raw,
tocallsign,
command,
):
"""Send a message to a callsign via APRS_IS."""
global got_ack, got_response
config = utils.parse_config(config_file)
if not aprs_login:
click.echo("Must set --aprs_login or APRS_LOGIN")
return
if not aprs_password:
click.echo("Must set --aprs-password or APRS_PASSWORD")
return
config["aprs"]["login"] = aprs_login
config["aprs"]["password"] = aprs_password
messaging.CONFIG = config
setup_logging(config, loglevel, quiet)
LOG.info("APRSD TEST Started version: {}".format(aprsd.__version__))
if type(command) is tuple:
command = " ".join(command)
if not quiet:
if raw:
LOG.info("L'{}' R'{}'".format(aprs_login, raw))
else:
LOG.info("L'{}' To'{}' C'{}'".format(aprs_login, tocallsign, command))
flat_config = utils.flatten_dict(config)
LOG.info("Using CONFIG values:")
for x in flat_config:
if "password" in x or "aprsd.web.users.admin" in x:
LOG.info("{} = XXXXXXXXXXXXXXXXXXX".format(x))
else:
LOG.info("{} = {}".format(x, flat_config[x]))
got_ack = False
got_response = False
# TODO(walt) - manually edit this list
# prior to running aprsd-listen listen
watch_list = []
# build last seen list
last_seen = {}
for callsign in watch_list:
call = callsign.replace("*", "")
last_seen[call] = datetime.datetime.now()
LOG.debug("Last seen list")
LOG.debug(last_seen)
@trace.trace
def rx_packet(packet):
global got_ack, got_response
LOG.debug("Got packet back {}".format(packet["raw"]))
if packet["from"] in last_seen:
now = datetime.datetime.now()
age = str(now - last_seen[packet["from"]])
delta = utils.parse_delta_str(age)
d = datetime.timedelta(**delta)
max_timeout = {
"seconds": config["aprsd"]["watch_list"]["alert_time_seconds"],
}
max_delta = datetime.timedelta(**max_timeout)
if d > max_delta:
LOG.debug(
"NOTIFY!!! {} last seen {} max age={}".format(
packet["from"],
age,
max_delta,
),
)
else:
LOG.debug("Not old enough to notify {} < {}".format(d, max_delta))
LOG.debug("Update last seen from {}".format(packet["from"]))
last_seen[packet["from"]] = now
else:
LOG.debug(
"ignoring packet because {} not in watch list".format(packet["from"]),
)
resp = packet.get("response", None)
if resp == "ack":
ack_num = packet.get("msgNo")
LOG.info("We saw an ACK {} Ignoring".format(ack_num))
# messaging.log_packet(packet)
got_ack = True
else:
message = packet.get("message_text", None)
fromcall = packet["from"]
msg_number = packet.get("msgNo", "0")
messaging.log_message(
"Received Message",
packet["raw"],
message,
fromcall=fromcall,
ack=msg_number,
)
try:
cl = client.Client(config)
cl.setup_connection()
except LoginError:
sys.exit(-1)
aprs_client = client.get_client()
# filter_str = 'b/{}'.format('/'.join(watch_list))
# LOG.debug("Filter by '{}'".format(filter_str))
# aprs_client.set_filter(filter_str)
filter_str = "p/{}".format("/".join(watch_list))
LOG.debug("Filter by '{}'".format(filter_str))
aprs_client.set_filter(filter_str)
while True:
try:
# This will register a packet consumer with aprslib
# When new packets come in the consumer will process
# the packet
aprs_client.consumer(rx_packet, raw=False)
except aprslib.exceptions.ConnectionDrop:
LOG.error("Connection dropped, reconnecting")
time.sleep(5)
# Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client()
# is called
cl.reset()
except aprslib.exceptions.UnknownFormat:
LOG.error("Got a shitty packet")
if __name__ == "__main__":
main()

View File

@ -504,12 +504,26 @@ def server(
LOG.debug("Loading saved MsgTrack object.")
messaging.MsgTrack().load()
rx_notify_queue = queue.Queue(maxsize=20)
rx_msg_queue = queue.Queue(maxsize=20)
tx_msg_queue = queue.Queue(maxsize=20)
msg_queues = {"rx": rx_msg_queue, "tx": tx_msg_queue}
msg_queues = {
"rx": rx_msg_queue,
"tx": tx_msg_queue,
"notify": rx_notify_queue,
}
rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config)
tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config)
if "watch_list" in config["aprsd"] and config["aprsd"]["watch_list"].get(
"enabled",
True,
):
notify_thread = threads.APRSDNotifyThread(
msg_queues=msg_queues,
config=config,
)
notify_thread.start()
if email_enabled:
email_thread = email.APRSDEmailThread(msg_queues=msg_queues, config=config)

View File

@ -186,11 +186,12 @@ class MessageCounter:
_instance = None
max_count = 9999
lock = None
def __new__(cls, *args, **kwargs):
"""Make this a singleton class."""
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance = super().__new__(cls, *args, **kwargs)
cls._instance.val = RawValue("i", 1)
cls._instance.lock = threading.Lock()
return cls._instance

View File

@ -4,6 +4,10 @@ import time
LOG = logging.getLogger("APRSD")
PACKET_TYPE_MESSAGE = "message"
PACKET_TYPE_ACK = "ack"
PACKET_TYPE_MICE = "mic-e"
class PacketList:
"""Class to track all of the packets rx'd and tx'd by aprsd."""
@ -28,3 +32,17 @@ class PacketList:
now = time.time()
ts = str(now).split(".")[0]
self.packet_list[ts] = packet
def get_packet_type(packet):
"""Decode the packet type from the packet."""
msg_format = packet.get("format", None)
msg_response = packet.get("response", None)
if msg_format == "message":
packet_type = PACKET_TYPE_MESSAGE
elif msg_response == "ack":
packet_type = PACKET_TYPE_ACK
elif msg_format == "mic-e":
packet_type = PACKET_TYPE_MICE
return packet_type

View File

@ -17,7 +17,7 @@ LOG = logging.getLogger("APRSD")
hookspec = pluggy.HookspecMarker("aprsd")
hookimpl = pluggy.HookimplMarker("aprsd")
CORE_PLUGINS = [
CORE_MESSAGE_PLUGINS = [
"aprsd.plugins.email.EmailPlugin",
"aprsd.plugins.fortune.FortunePlugin",
"aprsd.plugins.location.LocationPlugin",
@ -29,17 +29,58 @@ CORE_PLUGINS = [
"aprsd.plugins.version.VersionPlugin",
]
CORE_NOTIFY_PLUGINS = [
"aprsd.plugins.notify.BaseNotifyPlugin",
]
class APRSDCommandSpec:
"""A hook specification namespace."""
@hookspec
def run(self, fromcall, message, ack):
def run(self, packet):
"""My special little hook that you can customize."""
pass
class APRSDPluginBase(metaclass=abc.ABCMeta):
class APRSDNotificationPluginBase(metaclass=abc.ABCMeta):
"""Base plugin class for all notification ased plugins.
All these plugins will get every packet seen by APRSD's
registered list of HAM callsigns in the config file's
watch_list.
When you want to 'notify' something when a packet is seen
by a particular HAM callsign, write a plugin based off of
this class.
"""
def __init__(self, config):
"""The aprsd config object is stored."""
self.config = config
self.message_counter = 0
@hookimpl
def run(self, packet):
return self.notify(packet)
@abc.abstractmethod
def notify(self, packet):
"""This is the main method called when a packet is rx.
This will get called when a packet is seen by a callsign
registered in the watch list in the config file."""
pass
class APRSDMessagePluginBase(metaclass=abc.ABCMeta):
"""Base Message plugin class.
When you want to search for a particular command in an
APRSD message and send a direct reply, write a plugin
based off of this class.
"""
def __init__(self, config):
"""The aprsd config object is stored."""
self.config = config
@ -65,13 +106,14 @@ class APRSDPluginBase(metaclass=abc.ABCMeta):
return self.message_counter
@hookimpl
def run(self, fromcall, message, ack):
def run(self, packet):
message = packet.get("message_text", None)
if re.search(self.command_regex, message):
self.message_counter += 1
return self.command(fromcall, message, ack)
return self.command(packet)
@abc.abstractmethod
def command(self, fromcall, message, ack):
def command(self, packet):
"""This is the command that runs when the regex matches.
To reply with a message over the air, return a string
@ -84,8 +126,11 @@ class PluginManager:
# The singleton instance object for this class
_instance = None
# the pluggy PluginManager
_pluggy_pm = None
# the pluggy PluginManager for all Message plugins
_pluggy_msg_pm = None
# the pluggy PluginManager for all Notification plugins
_pluggy_notify_pm = None
# aprsd config dict
config = None
@ -130,7 +175,10 @@ class PluginManager:
def is_plugin(self, obj):
for c in inspect.getmro(obj):
if issubclass(c, APRSDPluginBase):
if issubclass(c, APRSDMessagePluginBase) or issubclass(
c,
APRSDNotificationPluginBase,
):
return True
return False
@ -167,7 +215,7 @@ class PluginManager:
obj = cls(**kwargs)
return obj
def _load_plugin(self, plugin_name):
def _load_msg_plugin(self, plugin_name):
"""
Given a python fully qualified class path.name,
Try importing the path, then creating the object,
@ -177,42 +225,81 @@ class PluginManager:
try:
plugin_obj = self._create_class(
plugin_name,
APRSDPluginBase,
APRSDMessagePluginBase,
config=self.config,
)
if plugin_obj:
LOG.info(
"Registering Command plugin '{}'({}) '{}'".format(
"Registering Message plugin '{}'({}) '{}'".format(
plugin_name,
plugin_obj.version,
plugin_obj.command_regex,
),
)
self._pluggy_pm.register(plugin_obj)
self._pluggy_msg_pm.register(plugin_obj)
except Exception as ex:
LOG.exception("Couldn't load plugin '{}'".format(plugin_name), ex)
def _load_notify_plugin(self, plugin_name):
"""
Given a python fully qualified class path.name,
Try importing the path, then creating the object,
then registering it as a aprsd Command Plugin
"""
plugin_obj = None
try:
plugin_obj = self._create_class(
plugin_name,
APRSDNotificationPluginBase,
config=self.config,
)
if plugin_obj:
LOG.info(
"Registering Notification plugin '{}'({})".format(
plugin_name,
plugin_obj.version,
),
)
self._pluggy_notify_pm.register(plugin_obj)
except Exception as ex:
LOG.exception("Couldn't load plugin '{}'".format(plugin_name), ex)
def reload_plugins(self):
with self.lock:
del self._pluggy_pm
del self._pluggy_msg_pm
del self._pluggy_notify_pm
self.setup_plugins()
def setup_plugins(self):
"""Create the plugin manager and register plugins."""
LOG.info("Loading Core APRSD Command Plugins")
enabled_plugins = self.config["aprsd"].get("enabled_plugins", None)
self._pluggy_pm = pluggy.PluginManager("aprsd")
self._pluggy_pm.add_hookspecs(APRSDCommandSpec)
if enabled_plugins:
for p_name in enabled_plugins:
self._load_plugin(p_name)
LOG.info("Loading APRSD Message Plugins")
enabled_msg_plugins = self.config["aprsd"].get("enabled_plugins", None)
self._pluggy_msg_pm = pluggy.PluginManager("aprsd")
self._pluggy_msg_pm.add_hookspecs(APRSDCommandSpec)
if enabled_msg_plugins:
for p_name in enabled_msg_plugins:
self._load_msg_plugin(p_name)
else:
# Enabled plugins isn't set, so we default to loading all of
# the core plugins.
for p_name in CORE_PLUGINS:
for p_name in CORE_MESSAGE_PLUGINS:
self._load_plugin(p_name)
if self.config["aprsd"]["watch_list"].get("enabled", False):
LOG.info("Loading APRSD Notification Plugins")
enabled_notify_plugins = self.config["aprsd"]["watch_list"].get(
"enabled_plugins",
None,
)
self._pluggy_notify_pm = pluggy.PluginManager("aprsd")
self._pluggy_notify_pm.add_hookspecs(APRSDCommandSpec)
if enabled_notify_plugins:
for p_name in enabled_notify_plugins:
self._load_notify_plugin(p_name)
# FIXME(Walt) - no real need to support loading random python classes
# from a directory anymore. Need to remove this.
plugin_dir = self.config["aprsd"].get("plugin_dir", None)
if plugin_dir:
LOG.info("Trying to load custom plugins from '{}'".format(plugin_dir))
@ -221,8 +308,6 @@ class PluginManager:
LOG.info("Discovered {} modules to load".format(len(plugins_list)))
for o in plugins_list:
plugin_obj = None
# not setting enabled plugins means load all?
plugin_obj = o["obj"]
if plugin_obj:
LOG.info(
@ -238,14 +323,19 @@ class PluginManager:
LOG.info("Skipping Custom Plugins directory.")
LOG.info("Completed Plugin Loading.")
def run(self, *args, **kwargs):
def run(self, packet):
"""Execute all the pluguns run method."""
with self.lock:
return self._pluggy_pm.hook.run(*args, **kwargs)
return self._pluggy_msg_pm.hook.run(packet=packet)
def register(self, obj):
def notify(self, packet):
"""Execute all the notify pluguns run method."""
with self.lock:
return self._pluggy_notify_pm.hook.run(packet=packet)
def register_msg(self, obj):
"""Register the plugin."""
self._pluggy_pm.register(obj)
self._pluggy_msg_pm.register(obj)
def get_plugins(self):
return self._pluggy_pm.get_plugins()
def get_msg_plugins(self):
return self._pluggy_msg_pm.get_plugins()

View File

@ -7,7 +7,7 @@ from aprsd import email, messaging, plugin, trace
LOG = logging.getLogger("APRSD")
class EmailPlugin(plugin.APRSDPluginBase):
class EmailPlugin(plugin.APRSDMessagePluginBase):
"""Email Plugin."""
version = "1.0"
@ -19,8 +19,13 @@ class EmailPlugin(plugin.APRSDPluginBase):
email_sent_dict = {}
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
LOG.info("Email COMMAND")
fromcall = packet.get("from")
message = packet.get("message_text", None)
ack = packet.get("msgNo", "0")
reply = None
if not self.config["aprsd"]["email"].get("enabled", False):
LOG.debug("Email is not enabled in config file ignoring.")

View File

@ -7,7 +7,7 @@ from aprsd import plugin, trace
LOG = logging.getLogger("APRSD")
class FortunePlugin(plugin.APRSDPluginBase):
class FortunePlugin(plugin.APRSDMessagePluginBase):
"""Fortune."""
version = "1.0"
@ -15,8 +15,13 @@ class FortunePlugin(plugin.APRSDPluginBase):
command_name = "fortune"
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
LOG.info("FortunePlugin")
# fromcall = packet.get("from")
# message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
reply = None
fortune_path = shutil.which("fortune")

View File

@ -7,7 +7,7 @@ from aprsd import plugin, plugin_utils, trace, utils
LOG = logging.getLogger("APRSD")
class LocationPlugin(plugin.APRSDPluginBase):
class LocationPlugin(plugin.APRSDMessagePluginBase):
"""Location!"""
version = "1.0"
@ -15,8 +15,12 @@ class LocationPlugin(plugin.APRSDPluginBase):
command_name = "location"
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
LOG.info("Location Plugin")
fromcall = packet.get("from")
message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
# get last location of a callsign, get descriptive name from weather service
try:
utils.check_config_option(self.config, ["services", "aprs.fi", "apiKey"])

23
aprsd/plugins/notify.py Normal file
View File

@ -0,0 +1,23 @@
import logging
from aprsd import packets, plugin, trace
LOG = logging.getLogger("APRSD")
class BaseNotifyPlugin(plugin.APRSDNotificationPluginBase):
"""Notification base plugin."""
version = "1.0"
@trace.trace
def notify(self, packet):
LOG.info("BaseNotifyPlugin")
notify_callsign = self.config["aprsd"]["watch_list"]["alert_callsign"]
fromcall = packet.get("from")
packet_type = packets.get_packet_type(packet)
# we shouldn't notify the alert user that they are online.
if fromcall != notify_callsign:
return "{} was just seen by type:'{}'".format(fromcall, packet_type)

View File

@ -6,7 +6,7 @@ from aprsd import plugin, trace
LOG = logging.getLogger("APRSD")
class PingPlugin(plugin.APRSDPluginBase):
class PingPlugin(plugin.APRSDMessagePluginBase):
"""Ping."""
version = "1.0"
@ -14,8 +14,11 @@ class PingPlugin(plugin.APRSDPluginBase):
command_name = "ping"
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
LOG.info("PINGPlugin")
# fromcall = packet.get("from")
# message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min

View File

@ -7,7 +7,7 @@ from aprsd import messaging, plugin, trace
LOG = logging.getLogger("APRSD")
class QueryPlugin(plugin.APRSDPluginBase):
class QueryPlugin(plugin.APRSDMessagePluginBase):
"""Query command."""
version = "1.0"
@ -15,9 +15,13 @@ class QueryPlugin(plugin.APRSDPluginBase):
command_name = "query"
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
LOG.info("Query COMMAND")
fromcall = packet.get("from")
message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
tracker = messaging.MsgTrack()
now = datetime.datetime.now()
reply = "Pending messages ({}) {}".format(

View File

@ -7,7 +7,7 @@ import yfinance as yf
LOG = logging.getLogger("APRSD")
class StockPlugin(plugin.APRSDPluginBase):
class StockPlugin(plugin.APRSDMessagePluginBase):
"""Stock market plugin for fetching stock quotes"""
version = "1.0"
@ -15,9 +15,13 @@ class StockPlugin(plugin.APRSDPluginBase):
command_name = "stock"
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
LOG.info("StockPlugin")
# fromcall = packet.get("from")
message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
a = re.search(r"^.*\s+(.*)", message)
if a is not None:
searchcall = a.group(1)

View File

@ -8,7 +8,7 @@ import pytz
LOG = logging.getLogger("APRSD")
class TimePlugin(plugin.APRSDPluginBase):
class TimePlugin(plugin.APRSDMessagePluginBase):
"""Time command."""
version = "1.0"
@ -39,7 +39,7 @@ class TimePlugin(plugin.APRSDPluginBase):
return reply
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
LOG.info("TIME COMMAND")
# So we can mock this in unit tests
localzone = self._get_local_tz()
@ -54,7 +54,11 @@ class TimeOpenCageDataPlugin(TimePlugin):
command_name = "Time"
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
fromcall = packet.get("from")
# message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall)
@ -95,7 +99,10 @@ class TimeOWMPlugin(TimePlugin):
command_name = "Time"
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
fromcall = packet.get("from")
# message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall)

View File

@ -6,7 +6,7 @@ from aprsd import plugin, stats, trace
LOG = logging.getLogger("APRSD")
class VersionPlugin(plugin.APRSDPluginBase):
class VersionPlugin(plugin.APRSDMessagePluginBase):
"""Version of APRSD Plugin."""
version = "1.0"
@ -18,8 +18,11 @@ class VersionPlugin(plugin.APRSDPluginBase):
email_sent_dict = {}
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
LOG.info("Version COMMAND")
# fromcall = packet.get("from")
# message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
stats_obj = stats.APRSDStats()
s = stats_obj.stats()
return "APRSD ver:{} uptime:{}".format(

View File

@ -8,7 +8,7 @@ import requests
LOG = logging.getLogger("APRSD")
class USWeatherPlugin(plugin.APRSDPluginBase):
class USWeatherPlugin(plugin.APRSDMessagePluginBase):
"""USWeather Command
Returns a weather report for the calling weather station
@ -26,8 +26,11 @@ class USWeatherPlugin(plugin.APRSDPluginBase):
command_name = "weather"
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
LOG.info("Weather Plugin")
fromcall = packet.get("from")
# message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
try:
utils.check_config_option(self.config, ["services", "aprs.fi", "apiKey"])
except Exception as ex:
@ -66,7 +69,7 @@ class USWeatherPlugin(plugin.APRSDPluginBase):
return reply
class USMetarPlugin(plugin.APRSDPluginBase):
class USMetarPlugin(plugin.APRSDMessagePluginBase):
"""METAR Command
This provides a METAR weather report from a station near the caller
@ -86,7 +89,10 @@ class USMetarPlugin(plugin.APRSDPluginBase):
command_name = "Metar"
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
fromcall = packet.get("from")
message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
LOG.info("WX Plugin '{}'".format(message))
a = re.search(r"^.*\s+(.*)", message)
if a is not None:
@ -154,7 +160,7 @@ class USMetarPlugin(plugin.APRSDPluginBase):
return reply
class OWMWeatherPlugin(plugin.APRSDPluginBase):
class OWMWeatherPlugin(plugin.APRSDMessagePluginBase):
"""OpenWeatherMap Weather Command
This provides weather near the caller or callsign.
@ -178,7 +184,10 @@ class OWMWeatherPlugin(plugin.APRSDPluginBase):
command_name = "Weather"
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
fromcall = packet.get("from")
message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
LOG.info("OWMWeather Plugin '{}'".format(message))
a = re.search(r"^.*\s+(.*)", message)
if a is not None:
@ -271,7 +280,7 @@ class OWMWeatherPlugin(plugin.APRSDPluginBase):
return reply
class AVWXWeatherPlugin(plugin.APRSDPluginBase):
class AVWXWeatherPlugin(plugin.APRSDMessagePluginBase):
"""AVWXWeatherMap Weather Command
Fetches a METAR weather report for the nearest
@ -299,7 +308,10 @@ class AVWXWeatherPlugin(plugin.APRSDPluginBase):
command_name = "Weather"
@trace.trace
def command(self, fromcall, message, ack):
def command(self, packet):
fromcall = packet.get("from")
message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
LOG.info("OWMWeather Plugin '{}'".format(message))
a = re.search(r"^.*\s+(.*)", message)
if a is not None:

View File

@ -33,6 +33,7 @@ class APRSDStats:
_mem_current = 0
_mem_peak = 0
_watch_list = {}
def __new__(cls, *args, **kwargs):
if cls._instance is None:
@ -169,6 +170,15 @@ class APRSDStats:
with self.lock:
self._email_thread_last_time = datetime.datetime.now()
@property
def watch_list(self):
with self.lock:
return self._watch_list
def update_watch_list(self, watch_list):
with self.lock:
self._watch_list = watch_list
def stats(self):
now = datetime.datetime.now()
if self._email_thread_last_time:
@ -182,7 +192,7 @@ class APRSDStats:
last_aprsis_keepalive = "never"
pm = plugin.PluginManager()
plugins = pm.get_plugins()
plugins = pm.get_msg_plugins()
plugin_stats = {}
def full_name_with_qualname(obj):
@ -202,6 +212,7 @@ class APRSDStats:
"memory_current_str": utils.human_size(self.memory),
"memory_peak": self.memory_peak,
"memory_peak_str": utils.human_size(self.memory_peak),
"watch_list": self.watch_list,
},
"aprs-is": {
"server": self.aprsis_server,

View File

@ -6,7 +6,7 @@ import threading
import time
import tracemalloc
from aprsd import client, messaging, packets, plugin, stats, trace, utils
from aprsd import client, messaging, packets, plugin, stats, utils
import aprslib
LOG = logging.getLogger("APRSD")
@ -114,6 +114,96 @@ class KeepAliveThread(APRSDThread):
return True
class APRSDNotifyThread(APRSDThread):
last_seen = {}
def __init__(self, msg_queues, config):
super().__init__("NOTIFY_MSG")
self.msg_queues = msg_queues
self.config = config
for callsign in config["aprsd"]["watch_list"].get("callsigns", []):
call = callsign.replace("*", "")
# FIXME(waboring) - we should fetch the last time we saw
# a beacon from a callsign or some other mechanism to find
# last time a message was seen by aprs-is. For now this
# is all we can do.
self.last_seen[call] = datetime.datetime.now()
self.update_stats()
def update_stats(self):
stats_seen = {}
for callsign in self.last_seen:
stats_seen[callsign] = str(self.last_seen[callsign])
stats.APRSDStats().update_watch_list(stats_seen)
def loop(self):
try:
packet = self.msg_queues["notify"].get(timeout=5)
if packet["from"] in self.last_seen:
# We only notify if the last time a callsign was seen
# is older than the alert_time_seconds
now = datetime.datetime.now()
age = str(now - self.last_seen[packet["from"]])
delta = utils.parse_delta_str(age)
d = datetime.timedelta(**delta)
watch_list_conf = self.config["aprsd"]["watch_list"]
max_timeout = {
"seconds": watch_list_conf["alert_time_seconds"],
}
max_delta = datetime.timedelta(**max_timeout)
if d > max_delta:
LOG.info(
"NOTIFY {} last seen {} max age={}".format(
packet["from"],
age,
max_delta,
),
)
# NOW WE RUN through the notify plugins.
# If they return a msg, then we queue it for sending.
pm = plugin.PluginManager()
results = pm.notify(packet)
for reply in results:
if reply is not messaging.NULL_MESSAGE:
LOG.debug("Sending '{}'".format(reply))
msg = messaging.TextMessage(
self.config["aprs"]["login"],
watch_list_conf["alert_callsign"],
reply,
)
self.msg_queues["tx"].put(msg)
else:
LOG.debug("Got NULL MESSAGE from plugin")
else:
LOG.debug(
"Not old enough to notify callsign {}: {} < {}".format(
packet["from"],
age,
max_delta,
),
)
LOG.debug("Update last seen from {}".format(packet["from"]))
self.last_seen[packet["from"]] = now
else:
LOG.debug("Ignoring packet from {}".format(packet["from"]))
# Allows stats object to have latest info from the last_seen dict
self.update_stats()
LOG.debug("Packet processing complete")
except queue.Empty:
pass
# Continue to loop
return True
class APRSDRXThread(APRSDThread):
def __init__(self, msg_queues, config):
super().__init__("RX_MSG")
@ -127,6 +217,23 @@ class APRSDRXThread(APRSDThread):
def loop(self):
aprs_client = client.get_client()
# if we have a watch list enabled, we need to add filtering
# to enable seeing packets from the watch list.
if "watch_list" in self.config["aprsd"] and self.config["aprsd"][
"watch_list"
].get("enabled", False):
# watch list is enabled
watch_list = self.config["aprsd"]["watch_list"].get(
"callsigns",
[],
)
# make sure the timeout is set or this doesn't work
if watch_list:
filter_str = "p/{}".format("/".join(watch_list))
aprs_client.set_filter(filter_str)
else:
LOG.warning("Watch list enabled, but no callsigns set.")
# setup the consumer of messages and block until a messages
try:
# This will register a packet consumer with aprslib
@ -189,7 +296,7 @@ class APRSDRXThread(APRSDThread):
# Get singleton of the PM
pm = plugin.PluginManager()
try:
results = pm.run(fromcall=fromcall, message=message, ack=msg_id)
results = pm.run(packet)
for reply in results:
found_command = True
# A plugin can return a null message flag which signals
@ -208,7 +315,7 @@ class APRSDRXThread(APRSDThread):
LOG.debug("Got NULL MESSAGE from plugin")
if not found_command:
plugins = pm.get_plugins()
plugins = pm.get_msg_plugins()
names = [x.command_name for x in plugins]
names.sort()
@ -237,30 +344,43 @@ class APRSDRXThread(APRSDThread):
self.msg_queues["tx"].put(ack)
LOG.debug("Packet processing complete")
@trace.trace
def process_packet(self, packet):
"""Process a packet recieved from aprs-is server."""
try:
stats.APRSDStats().msgs_rx_inc()
packets.PacketList().add(packet)
LOG.debug("Adding packet to notify queue {}".format(packet["raw"]))
self.msg_queues["notify"].put(packet)
msg = packet.get("message_text", None)
msg_format = packet.get("format", None)
msg_response = packet.get("response", None)
if msg_format == "message" and msg:
# we want to send the message through the
# plugins
self.process_message_packet(packet)
return
elif msg_response == "ack":
self.process_ack_packet(packet)
return
# since we can see packets from anyone now with the
# watch list, we need to filter messages directly only to us.
tocall = packet.get("addresse", None)
if tocall == self.config["aprs"]["login"]:
stats.APRSDStats().msgs_rx_inc()
packets.PacketList().add(packet)
if msg_format == "mic-e":
# process a mic-e packet
self.process_mic_e_packet(packet)
return
msg = packet.get("message_text", None)
msg_format = packet.get("format", None)
msg_response = packet.get("response", None)
if msg_format == "message" and msg:
# we want to send the message through the
# plugins
self.process_message_packet(packet)
return
elif msg_response == "ack":
self.process_ack_packet(packet)
return
if msg_format == "mic-e":
# process a mic-e packet
self.process_mic_e_packet(packet)
return
else:
LOG.debug(
"Packet wasn't meant for us '{}'. Ignoring packet to '{}'".format(
self.config["aprs"]["login"],
tocall,
),
)
except (aprslib.ParseError, aprslib.UnknownFormat) as exp:
LOG.exception("Failed to parse packet from aprs-is", exp)
@ -274,7 +394,7 @@ class APRSDTXThread(APRSDThread):
def loop(self):
try:
msg = self.msg_queues["tx"].get(timeout=0.1)
msg = self.msg_queues["tx"].get(timeout=5)
packets.PacketList().add(msg.dict())
msg.send()
except queue.Empty:

View File

@ -6,6 +6,7 @@ import functools
import logging
import os
from pathlib import Path
import re
import sys
import threading
@ -32,9 +33,9 @@ DEFAULT_DATE_FORMAT = "%m/%d/%Y %I:%M:%S %p"
# an example of what should be in the ~/.aprsd/config.yml
DEFAULT_CONFIG_DICT = {
"ham": {"callsign": "CALLSIGN"},
"ham": {"callsign": "NOCALL"},
"aprs": {
"login": "CALLSIGN",
"login": "NOCALL",
"password": "00000",
"host": "rotate.aprs2.net",
"port": 14580,
@ -45,15 +46,24 @@ DEFAULT_CONFIG_DICT = {
"dateformat": DEFAULT_DATE_FORMAT,
"trace": False,
"plugin_dir": "~/.config/aprsd/plugins",
"enabled_plugins": plugin.CORE_PLUGINS,
"enabled_plugins": plugin.CORE_MESSAGE_PLUGINS,
"units": "imperial",
"watch_list": {
"enabled": False,
# Who gets the alert?
"alert_callsign": "NOCALL",
# 43200 is 12 hours
"alert_time_seconds": 43200,
"callsigns": [],
"enabled_plugins": plugin.CORE_NOTIFY_PLUGINS,
},
"web": {
"enabled": True,
"logging_enabled": True,
"host": "0.0.0.0",
"port": 8001,
"users": {
"admin": "aprsd",
"admin": "password-here",
},
},
"email": {
@ -334,6 +344,13 @@ def parse_config(config_file):
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["web"]["users"]["admin"],
)
if config["aprsd"]["watch_list"]["enabled"] is True:
check_option(
config,
["aprsd", "watch_list", "alert_callsign"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["watch_list"]["alert_callsign"],
)
if config["aprsd"]["email"]["enabled"] is True:
# Check IMAP server settings
check_option(config, ["aprsd", "email", "imap", "host"])
@ -407,3 +424,14 @@ def flatten_dict(d, parent_key="", sep="."):
else:
items.append((new_key, v))
return dict(items)
def parse_delta_str(s):
if "day" in s:
m = re.match(
r"(?P<days>[-\d]+) day[s]*, (?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d[\.\d+]*)",
s,
)
else:
m = re.match(r"(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d[\.\d+]*)", s)
return {key: float(val) for key, val in m.groupdict().items()}

View File

@ -177,7 +177,7 @@ function updateQuadData(chart, label, first, second, third, fourth) {
function update_stats( data ) {
$("#version").text( data["stats"]["aprsd"]["version"] );
$("#aprsis").text( "APRS-IS Server: " + data["stats"]["aprs-is"]["server"] );
$("#aprsis").html( "APRS-IS Server: <a href='http://status.aprs2.net' >" + data["stats"]["aprs-is"]["server"] + "</a>" );
$("#uptime").text( "uptime: " + data["stats"]["aprsd"]["uptime"] );
const html_pretty = Prism.highlight(JSON.stringify(data, null, '\t'), Prism.languages.json, 'json');
$("#jsonstats").html(html_pretty);
@ -185,6 +185,16 @@ function update_stats( data ) {
updateQuadData(message_chart, short_time, data["stats"]["messages"]["sent"], data["stats"]["messages"]["recieved"], data["stats"]["messages"]["ack_sent"], data["stats"]["messages"]["ack_recieved"]);
updateDualData(email_chart, short_time, data["stats"]["email"]["sent"], data["stats"]["email"]["recieved"]);
updateDualData(memory_chart, short_time, data["stats"]["aprsd"]["memory_peak"], data["stats"]["aprsd"]["memory_current"]);
// Update the watch list
var watchdiv = $("#watchDiv");
var html_str = '<table class="ui celled striped table"><thead><tr><th>HAM Callsign</th><th>Age since last seen by APRSD</th></tr></thead><tbody>'
watchdiv.html('')
jQuery.each(data["stats"]["aprsd"]["watch_list"], function(i, val) {
html_str += '<tr><td class="collapsing"><i class="phone volume icon"></i>' + i + '</td><td>' + val + '</td></tr>'
});
html_str += "</tbody></table>";
watchdiv.append(html_str);
}

View File

@ -68,6 +68,7 @@
<div class="ui top attached tabular menu">
<div class="active item" data-tab="charts-tab">Charts</div>
<div class="item" data-tab="msgs-tab">Messages</div>
<div class="item" data-tab="watch-tab">Watch List</div>
<div class="item" data-tab="config-tab">Config</div>
</div>
@ -94,6 +95,15 @@
</div>
</div>
<div class="ui bottom attached tab segment" data-tab="watch-tab">
<h3 class="ui dividing header">
Callsign Watch List (<span id="watch_count">{{ watch_count }}</span>)
&nbsp;&nbsp;&nbsp;
Notification age - <span id="watch_age">{{ watch_age }}</span>
</h3>
<div id="watchDiv" class="ui mini text">Loading</div>
</div>
<div class="ui bottom attached tab segment" data-tab="config-tab">
<h3 class="ui dividing header">Config</h3>
<pre id="configjson" class="language-json">{{ config_json|safe }}</pre>

View File

@ -34,6 +34,7 @@ packages =
[entry_points]
console_scripts =
aprsd = aprsd.main:main
aprsd-listen = aprsd.listen:main
aprsd-dev = aprsd.dev:main
aprsd-healthcheck = aprsd.healthcheck:main
fake_aprs = aprsd.fake_aprs:main

View File

@ -21,13 +21,23 @@ class TestPlugin(unittest.TestCase):
# Inintialize the stats object with the config
stats.APRSDStats(self.config)
def fake_packet(self, fromcall="KFART", message=None, msg_number=None):
packet = {"from": fromcall}
if message:
packet["message_text"] = message
if msg_number:
packet["msgNo"] = msg_number
return packet
@mock.patch("shutil.which")
def test_fortune_fail(self, mock_which):
fortune = fortune_plugin.FortunePlugin(self.config)
mock_which.return_value = None
message = "fortune"
expected = "Fortune command not installed"
actual = fortune.run(self.fromcall, message, self.ack)
packet = self.fake_packet(message="fortune")
actual = fortune.run(packet)
self.assertEqual(expected, actual)
@mock.patch("subprocess.check_output")
@ -38,18 +48,18 @@ class TestPlugin(unittest.TestCase):
mock_output.return_value = "Funny fortune"
message = "fortune"
expected = "Funny fortune"
actual = fortune.run(self.fromcall, message, self.ack)
packet = self.fake_packet(message="fortune")
actual = fortune.run(packet)
self.assertEqual(expected, actual)
@mock.patch("aprsd.messaging.MsgTrack.flush")
def test_query_flush(self, mock_flush):
message = "!delete"
packet = self.fake_packet(message="!delete")
query = query_plugin.QueryPlugin(self.config)
expected = "Deleted ALL pending msgs."
actual = query.run(self.fromcall, message, self.ack)
actual = query.run(packet)
mock_flush.assert_called_once()
self.assertEqual(expected, actual)
@ -57,11 +67,11 @@ class TestPlugin(unittest.TestCase):
def test_query_restart_delayed(self, mock_restart):
track = messaging.MsgTrack()
track.track = {}
message = "!4"
packet = self.fake_packet(message="!4")
query = query_plugin.QueryPlugin(self.config)
expected = "No pending msgs to resend"
actual = query.run(self.fromcall, message, self.ack)
actual = query.run(packet)
mock_restart.assert_not_called()
self.assertEqual(expected, actual)
mock_restart.reset_mock()
@ -69,7 +79,7 @@ class TestPlugin(unittest.TestCase):
# add a message
msg = messaging.TextMessage(self.fromcall, "testing", self.ack)
track.add(msg)
actual = query.run(self.fromcall, message, self.ack)
actual = query.run(packet)
mock_restart.assert_called_once()
@mock.patch("aprsd.plugins.time.TimePlugin._get_local_tz")
@ -89,22 +99,28 @@ class TestPlugin(unittest.TestCase):
fake_time.tm_sec = 13
time = time_plugin.TimePlugin(self.config)
fromcall = "KFART"
message = "location"
ack = 1
packet = self.fake_packet(
fromcall="KFART",
message="location",
msg_number=1,
)
actual = time.run(fromcall, message, ack)
actual = time.run(packet)
self.assertEqual(None, actual)
cur_time = fuzzy(h, m, 1)
message = "time"
packet = self.fake_packet(
fromcall="KFART",
message="time",
msg_number=1,
)
local_short_str = local_t.strftime("%H:%M %Z")
expected = "{} ({})".format(
cur_time,
local_short_str,
)
actual = time.run(fromcall, message, ack)
actual = time.run(packet)
self.assertEqual(expected, actual)
@mock.patch("time.localtime")
@ -117,11 +133,13 @@ class TestPlugin(unittest.TestCase):
ping = ping_plugin.PingPlugin(self.config)
fromcall = "KFART"
message = "location"
ack = 1
packet = self.fake_packet(
fromcall="KFART",
message="location",
msg_number=1,
)
result = ping.run(fromcall, message, ack)
result = ping.run(packet)
self.assertEqual(None, result)
def ping_str(h, m, s):
@ -134,31 +152,49 @@ class TestPlugin(unittest.TestCase):
+ str(s).zfill(2)
)
message = "Ping"
actual = ping.run(fromcall, message, ack)
packet = self.fake_packet(
fromcall="KFART",
message="Ping",
msg_number=1,
)
actual = ping.run(packet)
expected = ping_str(h, m, s)
self.assertEqual(expected, actual)
message = "ping"
actual = ping.run(fromcall, message, ack)
packet = self.fake_packet(
fromcall="KFART",
message="ping",
msg_number=1,
)
actual = ping.run(packet)
self.assertEqual(expected, actual)
@mock.patch("aprsd.plugin.PluginManager.get_plugins")
@mock.patch("aprsd.plugin.PluginManager.get_msg_plugins")
def test_version(self, mock_get_plugins):
expected = "APRSD ver:{} uptime:0:0:0".format(aprsd.__version__)
version = version_plugin.VersionPlugin(self.config)
fromcall = "KFART"
message = "No"
ack = 1
packet = self.fake_packet(
fromcall="KFART",
message="No",
msg_number=1,
)
actual = version.run(fromcall, message, ack)
actual = version.run(packet)
self.assertEqual(None, actual)
message = "version"
actual = version.run(fromcall, message, ack)
packet = self.fake_packet(
fromcall="KFART",
message="version",
msg_number=1,
)
actual = version.run(packet)
self.assertEqual(expected, actual)
message = "Version"
actual = version.run(fromcall, message, ack)
packet = self.fake_packet(
fromcall="KFART",
message="Version",
msg_number=1,
)
actual = version.run(packet)
self.assertEqual(expected, actual)