Removed references to old custom config

Also updated unittests to pass.
This commit is contained in:
Hemna 2022-12-27 14:30:03 -05:00
parent e13ca0061a
commit 7ccfc253cf
31 changed files with 436 additions and 930 deletions

View File

@ -6,6 +6,7 @@ import click
from oslo_config import cfg
import aprsd
from aprsd import conf # noqa: F401
from aprsd.logging import log
from aprsd.utils import trace

View File

@ -6,13 +6,15 @@
import logging
import click
from oslo_config import cfg
# local imports here
from aprsd import cli_helper, client, packets, plugin, stats, utils
from aprsd import cli_helper, client, conf, packets, plugin
from aprsd.aprsd import cli
from aprsd.utils import trace
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
@ -68,23 +70,16 @@ def test_plugin(
message,
):
"""Test an individual APRSD plugin given a python path."""
config = ctx.obj["config"]
flat_config = utils.flatten_dict(config)
LOG.info("Using CONFIG values:")
for x in flat_config:
if "password" in x or "aprsd.web.users.admin" in x:
LOG.info(f"{x} = XXXXXXXXXXXXXXXXXXX")
else:
LOG.info(f"{x} = {flat_config[x]}")
CONF.log_opt_values(LOG, logging.DEBUG)
if not aprs_login:
if not config.exists("aprs.login"):
if CONF.aprs_network.login == conf.client.DEFAULT_LOGIN:
click.echo("Must set --aprs_login or APRS_LOGIN")
ctx.exit(-1)
return
else:
fromcall = config.get("aprs.login")
fromcall = CONF.aprs_network.login
else:
fromcall = aprs_login
@ -97,21 +92,17 @@ def test_plugin(
if type(message) is tuple:
message = " ".join(message)
if config["aprsd"].get("trace", False):
if CONF.trace_enabled:
trace.setup_tracing(["method", "api"])
client.Client(config)
stats.APRSDStats(config)
packets.PacketTrack(config=config)
packets.WatchList(config=config)
packets.SeenList(config=config)
client.Client()
pm = plugin.PluginManager(config)
pm = plugin.PluginManager()
if load_all:
pm.setup_plugins()
else:
pm._init()
obj = pm._create_class(plugin_path, plugin.APRSDPluginBase, config=config)
obj = pm._create_class(plugin_path, plugin.APRSDPluginBase)
if not obj:
click.echo(ctx.get_help())
click.echo("")
@ -125,14 +116,13 @@ def test_plugin(
),
)
pm._pluggy_pm.register(obj)
login = config["aprs"]["login"]
packet = {
"from": fromcall, "addresse": login,
"message_text": message,
"format": "message",
"msgNo": 1,
}
packet = packets.MessagePacket(
from_call=fromcall,
to_call=CONF.callsign,
msgNo=1,
message_text=message,
)
LOG.info(f"P'{plugin_path}' F'{fromcall}' C'{message}'")
for x in range(number):

View File

@ -5,13 +5,16 @@ import time
import aprslib
from aprslib.exceptions import LoginError
import click
from oslo_config import cfg
import aprsd
from aprsd import cli_helper, client, packets
from aprsd import conf # noqa : F401
from aprsd.aprsd import cli
from aprsd.threads import tx
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
@ -62,24 +65,24 @@ def send_message(
):
"""Send a message to a callsign via APRS_IS."""
global got_ack, got_response
config = ctx.obj["config"]
quiet = ctx.obj["quiet"]
if not aprs_login:
if not config.exists("aprs.login"):
if CONF.aprs_network.login == conf.client.DEFAULT_LOGIN:
click.echo("Must set --aprs_login or APRS_LOGIN")
ctx.exit(-1)
return
else:
config["aprs"]["login"] = aprs_login
else:
aprs_login = CONF.aprs_network.login
if not aprs_password:
if not config.exists("aprs.password"):
LOG.warning(CONF.aprs_network.password)
if not CONF.aprs_network.password:
click.echo("Must set --aprs-password or APRS_PASSWORD")
ctx.exit(-1)
return
else:
config["aprs"]["password"] = aprs_password
else:
aprs_password = CONF.aprs_network.password
LOG.info(f"APRSD LISTEN Started version: {aprsd.__version__}")
if type(command) is tuple:
@ -90,9 +93,9 @@ def send_message(
else:
LOG.info(f"L'{aprs_login}' To'{tocallsign}' C'{command}'")
packets.PacketList(config=config)
packets.WatchList(config=config)
packets.SeenList(config=config)
packets.PacketList()
packets.WatchList()
packets.SeenList()
got_ack = False
got_response = False
@ -109,7 +112,7 @@ def send_message(
else:
got_response = True
from_call = packet.from_call
our_call = config["aprsd"]["callsign"].lower()
our_call = CONF.callsign.lower()
tx.send(
packets.AckPacket(
from_call=our_call,
@ -127,7 +130,7 @@ def send_message(
sys.exit(0)
try:
client.ClientFactory.setup(config)
client.ClientFactory.setup()
client.factory.create().client
except LoginError:
sys.exit(-1)

View File

@ -15,23 +15,24 @@ from flask.logging import default_handler
import flask_classful
from flask_httpauth import HTTPBasicAuth
from flask_socketio import Namespace, SocketIO
from oslo_config import cfg
from user_agents import parse as ua_parse
from werkzeug.security import check_password_hash, generate_password_hash
import wrapt
import aprsd
from aprsd import cli_helper, client
from aprsd import config as aprsd_config
from aprsd import packets, stats, threads, utils
from aprsd import cli_helper, client, conf, packets, stats, threads, utils
from aprsd.aprsd import cli
from aprsd.logging import rich as aprsd_logging
from aprsd.threads import rx, tx
from aprsd.utils import objectstore, trace
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
auth = HTTPBasicAuth()
users = None
socketio = None
def signal_handler(sig, frame):
@ -128,16 +129,16 @@ class SentMessages(objectstore.ObjectStoreMixin):
def verify_password(username, password):
global users
if username in users and check_password_hash(users.get(username), password):
if username in users and check_password_hash(users[username], password):
return username
class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
"""Class that handles packets being sent to us."""
def __init__(self, config, packet_queue, socketio):
def __init__(self, packet_queue, socketio):
self.socketio = socketio
self.connected = False
super().__init__(config, packet_queue)
super().__init__(packet_queue)
def process_ack_packet(self, packet: packets.AckPacket):
super().process_ack_packet(packet)
@ -174,21 +175,16 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
class WebChatFlask(flask_classful.FlaskView):
config = None
def set_config(self, config):
def set_config(self):
global users
self.config = config
self.users = {}
for user in self.config["aprsd"]["web"]["users"]:
self.users[user] = generate_password_hash(
self.config["aprsd"]["web"]["users"][user],
)
user = CONF.admin.user
self.users[user] = generate_password_hash(CONF.admin.password)
users = self.users
def _get_transport(self, stats):
if self.config["aprs"].get("enabled", True):
if CONF.aprs_network.enabled:
transport = "aprs-is"
aprs_connection = (
"APRS-IS Server: <a href='http://status.aprs2.net' >"
@ -196,27 +192,22 @@ class WebChatFlask(flask_classful.FlaskView):
)
else:
# We might be connected to a KISS socket?
if client.KISSClient.is_enabled(self.config):
transport = client.KISSClient.transport(self.config)
if client.KISSClient.is_enabled():
transport = client.KISSClient.transport()
if transport == client.TRANSPORT_TCPKISS:
aprs_connection = (
"TCPKISS://{}:{}".format(
self.config["kiss"]["tcp"]["host"],
self.config["kiss"]["tcp"]["port"],
CONF.kiss_tcp.host,
CONF.kiss_tcp.port,
)
)
elif transport == client.TRANSPORT_SERIALKISS:
# for pep8 violation
kiss_default = aprsd_config.DEFAULT_DATE_FORMAT["kiss"]
default_baudrate = kiss_default["serial"]["baudrate"]
aprs_connection = (
"SerialKISS://{}@{} baud".format(
self.config["kiss"]["serial"]["device"],
self.config["kiss"]["serial"].get(
"baudrate",
default_baudrate,
),
)
CONF.kiss_serial.device,
CONF.kiss_serial.baudrate,
),
)
return transport, aprs_connection
@ -250,7 +241,7 @@ class WebChatFlask(flask_classful.FlaskView):
html_template,
initial_stats=stats,
aprs_connection=aprs_connection,
callsign=self.config["aprsd"]["callsign"],
callsign=CONF.callsign,
version=aprsd.__version__,
)
@ -287,14 +278,12 @@ class WebChatFlask(flask_classful.FlaskView):
class SendMessageNamespace(Namespace):
"""Class to handle the socketio interactions."""
_config = None
got_ack = False
reply_sent = False
msg = None
request = None
def __init__(self, namespace=None, config=None):
self._config = config
super().__init__(namespace)
def on_connect(self):
@ -312,7 +301,7 @@ class SendMessageNamespace(Namespace):
global socketio
LOG.debug(f"WS: on_send {data}")
self.request = data
data["from"] = self._config["aprs"]["login"]
data["from"] = CONF.callsign
pkt = packets.MessagePacket(
from_call=data["from"],
to_call=data["to"].upper(),
@ -338,7 +327,7 @@ class SendMessageNamespace(Namespace):
tx.send(
packets.GPSPacket(
from_call=self._config["aprs"]["login"],
from_call=CONF.callsign,
to_call="APDW16",
latitude=lat,
longitude=long,
@ -354,25 +343,16 @@ class SendMessageNamespace(Namespace):
LOG.debug(f"WS json {data}")
def setup_logging(config, flask_app, loglevel, quiet):
def setup_logging(flask_app, loglevel, quiet):
flask_log = logging.getLogger("werkzeug")
flask_app.logger.removeHandler(default_handler)
flask_log.removeHandler(default_handler)
log_level = aprsd_config.LOG_LEVELS[loglevel]
log_level = conf.log.LOG_LEVELS[loglevel]
flask_log.setLevel(log_level)
date_format = config["aprsd"].get(
"dateformat",
aprsd_config.DEFAULT_DATE_FORMAT,
)
date_format = CONF.logging.date_format
if not config["aprsd"]["web"].get("logging_enabled", False):
# disable web logging
flask_log.disabled = True
flask_app.logger.disabled = True
# return
if config["aprsd"].get("rich_logging", False) and not quiet:
if CONF.logging.rich_logging and not quiet:
log_format = "%(message)s"
log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
rh = aprsd_logging.APRSDRichHandler(
@ -382,13 +362,10 @@ def setup_logging(config, flask_app, loglevel, quiet):
rh.setFormatter(log_formatter)
flask_log.addHandler(rh)
log_file = config["aprsd"].get("logfile", None)
log_file = CONF.logging.logfile
if log_file:
log_format = config["aprsd"].get(
"logformat",
aprsd_config.DEFAULT_LOG_FORMAT,
)
log_format = CONF.loging.logformat
log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
fh = RotatingFileHandler(
log_file, maxBytes=(10248576 * 5),
@ -399,7 +376,7 @@ def setup_logging(config, flask_app, loglevel, quiet):
@trace.trace
def init_flask(config, loglevel, quiet):
def init_flask(loglevel, quiet):
global socketio
flask_app = flask.Flask(
@ -408,9 +385,9 @@ def init_flask(config, loglevel, quiet):
static_folder="web/chat/static",
template_folder="web/chat/templates",
)
setup_logging(config, flask_app, loglevel, quiet)
setup_logging(flask_app, loglevel, quiet)
server = WebChatFlask()
server.set_config(config)
server.set_config()
flask_app.route("/", methods=["GET"])(server.index)
flask_app.route("/stats", methods=["GET"])(server.stats)
# flask_app.route("/send-message", methods=["GET"])(server.send_message)
@ -427,7 +404,7 @@ def init_flask(config, loglevel, quiet):
socketio.on_namespace(
SendMessageNamespace(
"/sendmsg", config=config,
"/sendmsg",
),
)
return socketio, flask_app
@ -457,17 +434,12 @@ def init_flask(config, loglevel, quiet):
@cli_helper.process_standard_options
def webchat(ctx, flush, port):
"""Web based HAM Radio chat program!"""
ctx.obj["config_file"]
loglevel = ctx.obj["loglevel"]
quiet = ctx.obj["quiet"]
config = ctx.obj["config"]
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if not quiet:
click.echo("Load config")
level, msg = utils._check_version()
if level:
LOG.warning(msg)
@ -475,19 +447,11 @@ def webchat(ctx, flush, port):
LOG.info(msg)
LOG.info(f"APRSD Started version: {aprsd.__version__}")
flat_config = utils.flatten_dict(config)
LOG.info("Using CONFIG values:")
for x in flat_config:
if "password" in x or "aprsd.web.users.admin" in x:
LOG.info(f"{x} = XXXXXXXXXXXXXXXXXXX")
else:
LOG.info(f"{x} = {flat_config[x]}")
stats.APRSDStats(config)
CONF.log_opt_values(LOG, logging.DEBUG)
# Initialize the client factory and create
# The correct client object ready for use
client.ClientFactory.setup(config)
client.ClientFactory.setup()
# Make sure we have 1 client transport enabled
if not client.factory.is_client_enabled():
LOG.error("No Clients are enabled in config.")
@ -497,32 +461,30 @@ def webchat(ctx, flush, port):
LOG.error("APRS client is not properly configured in config file.")
sys.exit(-1)
packets.PacketList(config=config)
packets.PacketTrack(config=config)
packets.WatchList(config=config)
packets.SeenList(config=config)
packets.PacketList()
packets.PacketTrack()
packets.WatchList()
packets.SeenList()
(socketio, app) = init_flask(config, loglevel, quiet)
(socketio, app) = init_flask(loglevel, quiet)
rx_thread = rx.APRSDPluginRXThread(
config=config,
packet_queue=threads.packet_queue,
)
rx_thread.start()
process_thread = WebChatProcessPacketThread(
config=config,
packet_queue=threads.packet_queue,
socketio=socketio,
)
process_thread.start()
keepalive = threads.KeepAliveThread(config=config)
keepalive = threads.KeepAliveThread()
LOG.info("Start KeepAliveThread")
keepalive.start()
LOG.info("Start socketio.run()")
socketio.run(
app,
ssl_context="adhoc",
host=config["aprsd"]["web"]["host"],
host=CONF.admin.web_ip,
port=port,
)

View File

@ -1,404 +0,0 @@
import collections
import logging
import os
from pathlib import Path
import sys
import click
import yaml
from aprsd import exception, utils
home = str(Path.home())
DEFAULT_CONFIG_DIR = f"{home}/.config/aprsd/"
DEFAULT_SAVE_FILE = f"{home}/.config/aprsd/aprsd.p"
DEFAULT_CONFIG_FILE = f"{home}/.config/aprsd/aprsd.yml"
LOG_LEVELS = {
"CRITICAL": logging.CRITICAL,
"ERROR": logging.ERROR,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
}
DEFAULT_DATE_FORMAT = "%m/%d/%Y %I:%M:%S %p"
DEFAULT_LOG_FORMAT = (
"[%(asctime)s] [%(threadName)-20.20s] [%(levelname)-5.5s]"
" %(message)s - [%(pathname)s:%(lineno)d]"
)
QUEUE_DATE_FORMAT = "[%m/%d/%Y] [%I:%M:%S %p]"
QUEUE_LOG_FORMAT = (
"%(asctime)s [%(threadName)-20.20s] [%(levelname)-5.5s]"
" %(message)s - [%(pathname)s:%(lineno)d]"
)
CORE_MESSAGE_PLUGINS = [
"aprsd.plugins.email.EmailPlugin",
"aprsd.plugins.fortune.FortunePlugin",
"aprsd.plugins.location.LocationPlugin",
"aprsd.plugins.ping.PingPlugin",
"aprsd.plugins.query.QueryPlugin",
"aprsd.plugins.time.TimePlugin",
"aprsd.plugins.weather.USWeatherPlugin",
"aprsd.plugins.version.VersionPlugin",
]
CORE_NOTIFY_PLUGINS = [
"aprsd.plugins.notify.NotifySeenPlugin",
]
ALL_PLUGINS = []
for i in CORE_MESSAGE_PLUGINS:
ALL_PLUGINS.append(i)
for i in CORE_NOTIFY_PLUGINS:
ALL_PLUGINS.append(i)
# an example of what should be in the ~/.aprsd/config.yml
DEFAULT_CONFIG_DICT = {
"ham": {"callsign": "NOCALL"},
"aprs": {
"enabled": True,
# Only used as the login for aprsis.
"login": "CALLSIGN",
"password": "00000",
"host": "rotate.aprs2.net",
"port": 14580,
},
"kiss": {
"tcp": {
"enabled": False,
"host": "direwolf.ip.address",
"port": "8001",
},
"serial": {
"enabled": False,
"device": "/dev/ttyS0",
"baudrate": 9600,
},
},
"aprsd": {
# Callsign to use for all packets to/from aprsd instance
# regardless of the client (aprsis vs kiss)
"callsign": "NOCALL",
"logfile": "/tmp/aprsd.log",
"logformat": DEFAULT_LOG_FORMAT,
"dateformat": DEFAULT_DATE_FORMAT,
"save_location": DEFAULT_CONFIG_DIR,
"rich_logging": True,
"trace": False,
"enabled_plugins": ALL_PLUGINS,
"units": "imperial",
"watch_list": {
"enabled": False,
# Who gets the alert?
"alert_callsign": "NOCALL",
# 43200 is 12 hours
"alert_time_seconds": 43200,
# How many packets to save in a ring Buffer
# for a particular callsign
"packet_keep_count": 10,
"callsigns": [],
},
"web": {
"enabled": True,
"logging_enabled": True,
"host": "0.0.0.0",
"port": 8001,
"users": {
"admin": "password-here",
},
},
"email": {
"enabled": True,
"shortcuts": {
"aa": "5551239999@vtext.com",
"cl": "craiglamparter@somedomain.org",
"wb": "555309@vtext.com",
},
"smtp": {
"login": "SMTP_USERNAME",
"password": "SMTP_PASSWORD",
"host": "smtp.gmail.com",
"port": 465,
"use_ssl": False,
"debug": False,
},
"imap": {
"login": "IMAP_USERNAME",
"password": "IMAP_PASSWORD",
"host": "imap.gmail.com",
"port": 993,
"use_ssl": True,
"debug": False,
},
},
},
"services": {
"aprs.fi": {"apiKey": "APIKEYVALUE"},
"openweathermap": {"apiKey": "APIKEYVALUE"},
"opencagedata": {"apiKey": "APIKEYVALUE"},
"avwx": {"base_url": "http://host:port", "apiKey": "APIKEYVALUE"},
},
}
class Config(collections.UserDict):
def _get(self, d, keys, default=None):
"""
Example:
d = {'meta': {'status': 'OK', 'status_code': 200}}
_get(d, ['meta', 'status_code']) # => 200
_get(d, ['garbage', 'status_code']) # => None
_get(d, ['meta', 'garbage'], default='-') # => '-'
"""
if type(keys) is str and "." in keys:
keys = keys.split(".")
assert type(keys) is list
if d is None:
return default
if not keys:
return d
if type(d) is str:
return default
return self._get(d.get(keys[0]), keys[1:], default)
def get(self, path, default=None):
return self._get(self.data, path, default=default)
def exists(self, path):
"""See if a conf value exists."""
test = "-3.14TEST41.3-"
return self.get(path, default=test) != test
def check_option(self, path, default_fail=None):
"""Make sure the config option doesn't have default value."""
if not self.exists(path):
if type(path) is list:
path = ".".join(path)
raise exception.MissingConfigOptionException(path)
val = self.get(path)
if val == default_fail:
# We have to fail and bail if the user hasn't edited
# this config option.
raise exception.ConfigOptionBogusDefaultException(
path, default_fail,
)
def add_config_comments(raw_yaml):
end_idx = utils.end_substr(raw_yaml, "ham:")
if end_idx != -1:
# lets insert a comment
raw_yaml = utils.insert_str(
raw_yaml,
"\n # Callsign that owns this instance of APRSD.",
end_idx,
)
end_idx = utils.end_substr(raw_yaml, "aprsd:")
if end_idx != -1:
# lets insert a comment
raw_yaml = utils.insert_str(
raw_yaml,
"\n # Callsign to use for all APRSD Packets as the to/from."
"\n # regardless of client type (aprsis vs tcpkiss vs serial)",
end_idx,
)
end_idx = utils.end_substr(raw_yaml, "aprs:")
if end_idx != -1:
# lets insert a comment
raw_yaml = utils.insert_str(
raw_yaml,
"\n # Set enabled to False if there is no internet connectivity."
"\n # This is useful for a direwolf KISS aprs connection only. "
"\n"
"\n # Get the passcode for your callsign here: "
"\n # https://apps.magicbug.co.uk/passcode",
end_idx,
)
end_idx = utils.end_substr(raw_yaml, "aprs.fi:")
if end_idx != -1:
# lets insert a comment
raw_yaml = utils.insert_str(
raw_yaml,
"\n # Get the apiKey from your aprs.fi account here: "
"\n # http://aprs.fi/account",
end_idx,
)
end_idx = utils.end_substr(raw_yaml, "opencagedata:")
if end_idx != -1:
# lets insert a comment
raw_yaml = utils.insert_str(
raw_yaml,
"\n # (Optional for TimeOpenCageDataPlugin) "
"\n # Get the apiKey from your opencagedata account here: "
"\n # https://opencagedata.com/dashboard#api-keys",
end_idx,
)
end_idx = utils.end_substr(raw_yaml, "openweathermap:")
if end_idx != -1:
# lets insert a comment
raw_yaml = utils.insert_str(
raw_yaml,
"\n # (Optional for OWMWeatherPlugin) "
"\n # Get the apiKey from your "
"\n # openweathermap account here: "
"\n # https://home.openweathermap.org/api_keys",
end_idx,
)
end_idx = utils.end_substr(raw_yaml, "avwx:")
if end_idx != -1:
# lets insert a comment
raw_yaml = utils.insert_str(
raw_yaml,
"\n # (Optional for AVWXWeatherPlugin) "
"\n # Use hosted avwx-api here: https://avwx.rest "
"\n # or deploy your own from here: "
"\n # https://github.com/avwx-rest/avwx-api",
end_idx,
)
return raw_yaml
def dump_default_cfg():
return add_config_comments(
yaml.dump(
DEFAULT_CONFIG_DICT,
indent=4,
),
)
def create_default_config():
"""Create a default config file."""
# make sure the directory location exists
config_file_expanded = os.path.expanduser(DEFAULT_CONFIG_FILE)
config_dir = os.path.dirname(config_file_expanded)
if not os.path.exists(config_dir):
click.echo(f"Config dir '{config_dir}' doesn't exist, creating.")
utils.mkdir_p(config_dir)
with open(config_file_expanded, "w+") as cf:
cf.write(dump_default_cfg())
def get_config(config_file):
"""This tries to read the yaml config from <config_file>."""
config_file_expanded = os.path.expanduser(config_file)
if os.path.exists(config_file_expanded):
with open(config_file_expanded) as stream:
config = yaml.load(stream, Loader=yaml.FullLoader)
return Config(config)
else:
if config_file == DEFAULT_CONFIG_FILE:
click.echo(
f"{config_file_expanded} is missing, creating config file",
)
create_default_config()
msg = (
"Default config file created at {}. Please edit with your "
"settings.".format(config_file)
)
click.echo(msg)
else:
# The user provided a config file path different from the
# Default, so we won't try and create it, just bitch and bail.
msg = f"Custom config file '{config_file}' is missing."
click.echo(msg)
sys.exit(-1)
# This method tries to parse the config yaml file
# and consume the settings.
# If the required params don't exist,
# it will look in the environment
def parse_config(config_file):
config = get_config(config_file)
def fail(msg):
click.echo(msg)
sys.exit(-1)
def check_option(config, path, default_fail=None):
try:
config.check_option(path, default_fail=default_fail)
except Exception as ex:
fail(repr(ex))
else:
return config
# special check here to make sure user has edited the config file
# and changed the ham callsign
check_option(
config,
"ham.callsign",
default_fail=DEFAULT_CONFIG_DICT["ham"]["callsign"],
)
check_option(
config,
["aprsd"],
)
check_option(
config,
"aprsd.callsign",
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["callsign"],
)
# Ensure they change the admin password
if config.get("aprsd.web.enabled") is True:
check_option(
config,
["aprsd", "web", "users", "admin"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["web"]["users"]["admin"],
)
if config.get("aprsd.watch_list.enabled") is True:
check_option(
config,
["aprsd", "watch_list", "alert_callsign"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["watch_list"]["alert_callsign"],
)
if config.get("aprsd.email.enabled") is True:
# Check IMAP server settings
check_option(config, ["aprsd", "email", "imap", "host"])
check_option(config, ["aprsd", "email", "imap", "port"])
check_option(
config,
["aprsd", "email", "imap", "login"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["imap"]["login"],
)
check_option(
config,
["aprsd", "email", "imap", "password"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["imap"]["password"],
)
# Check SMTP server settings
check_option(config, ["aprsd", "email", "smtp", "host"])
check_option(config, ["aprsd", "email", "smtp", "port"])
check_option(
config,
["aprsd", "email", "smtp", "login"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["smtp"]["login"],
)
check_option(
config,
["aprsd", "email", "smtp", "password"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["smtp"]["password"],
)
return config

View File

@ -13,19 +13,19 @@ from flask.logging import default_handler
import flask_classful
from flask_httpauth import HTTPBasicAuth
from flask_socketio import Namespace, SocketIO
from oslo_config import cfg
from werkzeug.security import check_password_hash, generate_password_hash
import wrapt
import aprsd
from aprsd import client
from aprsd import config as aprsd_config
from aprsd import packets, plugin, stats, threads, utils
from aprsd import client, conf, packets, plugin, stats, threads, utils
from aprsd.clients import aprsis
from aprsd.logging import log
from aprsd.logging import rich as aprsd_logging
from aprsd.threads import tx
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
auth = HTTPBasicAuth()
@ -117,8 +117,7 @@ class SendMessageThread(threads.APRSDRXThread):
got_ack = False
got_reply = False
def __init__(self, config, info, packet, namespace):
self.config = config
def __init__(self, info, packet, namespace):
self.request = info
self.packet = packet
self.namespace = namespace
@ -133,8 +132,8 @@ class SendMessageThread(threads.APRSDRXThread):
def setup_connection(self):
user = self.request["from"]
password = self.request["password"]
host = self.config["aprs"].get("host", "rotate.aprs.net")
port = self.config["aprs"].get("port", 14580)
host = CONF.aprs_network.host
port = CONF.aprs_network.port
connected = False
backoff = 1
while not connected:
@ -281,17 +280,12 @@ class SendMessageThread(threads.APRSDRXThread):
class APRSDFlask(flask_classful.FlaskView):
config = None
def set_config(self, config):
def set_config(self):
global users
self.config = config
self.users = {}
for user in self.config["aprsd"]["web"]["users"]:
self.users[user] = generate_password_hash(
self.config["aprsd"]["web"]["users"][user],
)
user = CONF.admin.user
self.users[user] = generate_password_hash(CONF.admin.password)
users = self.users
@auth.login_required
@ -299,7 +293,7 @@ class APRSDFlask(flask_classful.FlaskView):
stats = self._stats()
LOG.debug(
"watch list? {}".format(
self.config["aprsd"]["watch_list"],
CONF.watch_list.callsigns,
),
)
wl = packets.WatchList()
@ -317,7 +311,7 @@ class APRSDFlask(flask_classful.FlaskView):
plugins = pm.get_plugins()
plugin_count = len(plugins)
if self.config["aprs"].get("enabled", True):
if CONF.aprs_network.enabled:
transport = "aprs-is"
aprs_connection = (
"APRS-IS Server: <a href='http://status.aprs2.net' >"
@ -325,33 +319,34 @@ class APRSDFlask(flask_classful.FlaskView):
)
else:
# We might be connected to a KISS socket?
if client.KISSClient.kiss_enabled(self.config):
transport = client.KISSClient.transport(self.config)
if client.KISSClient.kiss_enabled():
transport = client.KISSClient.transport()
if transport == client.TRANSPORT_TCPKISS:
aprs_connection = (
"TCPKISS://{}:{}".format(
self.config["kiss"]["tcp"]["host"],
self.config["kiss"]["tcp"]["port"],
CONF.kiss_tcp.host,
CONF.kiss_tcp.port,
)
)
elif transport == client.TRANSPORT_SERIALKISS:
aprs_connection = (
"SerialKISS://{}@{} baud".format(
self.config["kiss"]["serial"]["device"],
self.config["kiss"]["serial"]["baudrate"],
CONF.kiss_serial.device,
CONF.kiss_serial.baudrate,
)
)
stats["transport"] = transport
stats["aprs_connection"] = aprs_connection
entries = conf.conf_to_dict()
return flask.render_template(
"index.html",
initial_stats=stats,
aprs_connection=aprs_connection,
callsign=self.config["aprs"]["login"],
callsign=CONF.callsign,
version=aprsd.__version__,
config_json=json.dumps(self.config.data),
config_json=json.dumps(entries),
watch_count=watch_count,
watch_age=watch_age,
seen_count=seen_count,
@ -381,7 +376,7 @@ class APRSDFlask(flask_classful.FlaskView):
if request.method == "GET":
return flask.render_template(
"send-message.html",
callsign=self.config["aprs"]["login"],
callsign=CONF.callsign,
version=aprsd.__version__,
)
@ -392,7 +387,6 @@ class APRSDFlask(flask_classful.FlaskView):
for pkt in packet_list:
tmp_list.append(pkt.json)
LOG.info(f"PACKETS {tmp_list}")
return json.dumps(tmp_list)
@auth.login_required
@ -453,14 +447,12 @@ class APRSDFlask(flask_classful.FlaskView):
class SendMessageNamespace(Namespace):
_config = None
got_ack = False
reply_sent = False
packet = None
request = None
def __init__(self, namespace=None, config=None):
self._config = config
def __init__(self, namespace=None):
super().__init__(namespace)
def on_connect(self):
@ -492,13 +484,13 @@ class SendMessageNamespace(Namespace):
)
socketio.start_background_task(
self._start, self._config, data,
self._start, data,
self.packet, self,
)
LOG.warning("WS: on_send: exit")
def _start(self, config, data, packet, namespace):
msg_thread = SendMessageThread(self._config, data, packet, self)
def _start(self, data, packet, namespace):
msg_thread = SendMessageThread(data, packet, self)
msg_thread.start()
def handle_message(self, data):
@ -566,25 +558,18 @@ class LoggingNamespace(Namespace):
self.log_thread.stop()
def setup_logging(config, flask_app, loglevel, quiet):
def setup_logging(flask_app, loglevel, quiet):
flask_log = logging.getLogger("werkzeug")
flask_app.logger.removeHandler(default_handler)
flask_log.removeHandler(default_handler)
log_level = aprsd_config.LOG_LEVELS[loglevel]
log_level = conf.log.LOG_LEVELS[loglevel]
flask_log.setLevel(log_level)
date_format = config["aprsd"].get(
"dateformat",
aprsd_config.DEFAULT_DATE_FORMAT,
)
date_format = CONF.logging.date_format
flask_log.disabled = True
flask_app.logger.disabled = True
if not config["aprsd"]["web"].get("logging_enabled", False):
# disable web logging
flask_log.disabled = True
flask_app.logger.disabled = True
return
if config["aprsd"].get("rich_logging", False) and not quiet:
if CONF.logging.rich_logging:
log_format = "%(message)s"
log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
rh = aprsd_logging.APRSDRichHandler(
@ -594,13 +579,10 @@ def setup_logging(config, flask_app, loglevel, quiet):
rh.setFormatter(log_formatter)
flask_log.addHandler(rh)
log_file = config["aprsd"].get("logfile", None)
log_file = CONF.logging.logfile
if log_file:
log_format = config["aprsd"].get(
"logformat",
aprsd_config.DEFAULT_LOG_FORMAT,
)
log_format = CONF.logging.logformat
log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
fh = RotatingFileHandler(
log_file, maxBytes=(10248576 * 5),
@ -610,7 +592,7 @@ def setup_logging(config, flask_app, loglevel, quiet):
flask_log.addHandler(fh)
def init_flask(config, loglevel, quiet):
def init_flask(loglevel, quiet):
global socketio
flask_app = flask.Flask(
@ -619,9 +601,9 @@ def init_flask(config, loglevel, quiet):
static_folder="web/admin/static",
template_folder="web/admin/templates",
)
setup_logging(config, flask_app, loglevel, quiet)
setup_logging(flask_app, loglevel, quiet)
server = APRSDFlask()
server.set_config(config)
server.set_config()
flask_app.route("/", methods=["GET"])(server.index)
flask_app.route("/stats", methods=["GET"])(server.stats)
flask_app.route("/messages", methods=["GET"])(server.messages)
@ -638,6 +620,6 @@ def init_flask(config, loglevel, quiet):
# import eventlet
# eventlet.monkey_patch()
socketio.on_namespace(SendMessageNamespace("/sendmsg", config=config))
socketio.on_namespace(SendMessageNamespace("/sendmsg"))
socketio.on_namespace(LoggingNamespace("/logs"))
return socketio, flask_app

View File

@ -6,7 +6,7 @@ import sys
from oslo_config import cfg
from aprsd import config as aprsd_config
from aprsd import conf
from aprsd.logging import rich as aprsd_logging
@ -19,9 +19,9 @@ logging_queue = queue.Queue()
# to disable logging to stdout, but still log to file
# use the --quiet option on the cmdln
def setup_logging(loglevel, quiet):
log_level = aprsd_config.LOG_LEVELS[loglevel]
log_level = conf.log.LOG_LEVELS[loglevel]
LOG.setLevel(log_level)
date_format = CONF.logging.get("date_format", aprsd_config.DEFAULT_DATE_FORMAT)
date_format = CONF.logging.date_format
rh = None
fh = None
@ -51,16 +51,15 @@ def setup_logging(loglevel, quiet):
imap_logger = logging.getLogger("imapclient.imaplib")
imap_logger.setLevel(log_level)
if rh:
imap_logger.addHandler(rh)
imap_logger.addHandler(rh)
if fh:
imap_logger.addHandler(fh)
if CONF.admin.get("web_enabled", default=False):
if CONF.admin.web_enabled:
qh = logging.handlers.QueueHandler(logging_queue)
q_log_formatter = logging.Formatter(
fmt=aprsd_config.QUEUE_LOG_FORMAT,
datefmt=aprsd_config.QUEUE_DATE_FORMAT,
fmt=CONF.logging.logformat,
datefmt=CONF.logging.date_format,
)
qh.setFormatter(q_log_formatter)
LOG.addHandler(qh)
@ -74,10 +73,10 @@ def setup_logging(loglevel, quiet):
def setup_logging_no_config(loglevel, quiet):
log_level = aprsd_config.LOG_LEVELS[loglevel]
log_level = conf.log.LOG_LEVELS[loglevel]
LOG.setLevel(log_level)
log_format = aprsd_config.DEFAULT_LOG_FORMAT
date_format = aprsd_config.DEFAULT_DATE_FORMAT
log_format = CONF.logging.logformat
date_format = CONF.logging.date_format
log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
fh = NullHandler()

View File

@ -84,6 +84,9 @@ class Packet(metaclass=abc.ABCMeta):
else:
return default
def update_timestamp(self):
self.timestamp = _int_timestamp()
def prepare(self):
"""Do stuff here that is needed prior to sending over the air."""
# now build the raw message for sending

View File

@ -30,9 +30,6 @@ class WatchList(objectstore.ObjectStoreMixin):
def __init__(self, config=None):
ring_size = CONF.watch_list.packet_keep_count
if not self.is_enabled():
LOG.info("Watch List is disabled.")
if CONF.watch_list.callsigns:
for callsign in CONF.watch_list.callsigns:
call = callsign.replace("*", "")

View File

@ -59,8 +59,7 @@ class APRSDPluginBase(metaclass=abc.ABCMeta):
# Set this in setup()
enabled = False
def __init__(self, config):
self.config = config
def __init__(self):
self.message_counter = 0
self.setup()
self.threads = self.create_threads() or []
@ -142,15 +141,10 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
def setup(self):
# if we have a watch list enabled, we need to add filtering
# to enable seeing packets from the watch list.
if "watch_list" in self.config["aprsd"] and self.config["aprsd"][
"watch_list"
].get("enabled", False):
if CONF.watch_list.enabled:
# watch list is enabled
self.enabled = True
watch_list = self.config["aprsd"]["watch_list"].get(
"callsigns",
[],
)
watch_list = CONF.watch_list.callsigns
# make sure the timeout is set or this doesn't work
if watch_list:
aprs_client = client.factory.create().client
@ -214,39 +208,39 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
@hookimpl
def filter(self, packet: packets.core.MessagePacket):
if not self.enabled:
LOG.info(f"{self.__class__.__name__} is not enabled.")
return None
result = f"{self.__class__.__name__} isn't enabled"
LOG.warning(result)
return result
if not isinstance(packet, packets.core.MessagePacket):
LOG.warning(f"Got a {packet.__class__.__name__} ignoring")
return packets.NULL_MESSAGE
result = None
message = packet.get("message_text", None)
msg_format = packet.get("format", None)
tocall = packet.get("addresse", None)
message = packet.message_text
tocall = packet.to_call
# Only process messages destined for us
# and is an APRS message format and has a message.
if (
tocall == CONF.callsign
and msg_format == "message"
and isinstance(packet, packets.core.MessagePacket)
and message
):
if re.search(self.command_regex, message):
self.rx_inc()
if self.enabled:
try:
result = self.process(packet)
except Exception as ex:
LOG.error(
"Plugin {} failed to process packet {}".format(
self.__class__, ex,
),
)
LOG.exception(ex)
if result:
self.tx_inc()
else:
result = f"{self.__class__.__name__} isn't enabled"
LOG.warning(result)
try:
result = self.process(packet)
except Exception as ex:
LOG.error(
"Plugin {} failed to process packet {}".format(
self.__class__, ex,
),
)
LOG.exception(ex)
if result:
self.tx_inc()
return result
@ -376,12 +370,17 @@ class PluginManager:
:param kwargs: parameters to pass
:return:
"""
module_name, class_name = module_class_string.rsplit(".", 1)
module_name = None
class_name = None
try:
module_name, class_name = module_class_string.rsplit(".", 1)
module = importlib.import_module(module_name)
module = importlib.reload(module)
except Exception as ex:
LOG.error(f"Failed to load Plugin '{module_name}' : '{ex}'")
if not module_name:
LOG.error(f"Failed to load Plugin {module_class_string}")
else:
LOG.error(f"Failed to load Plugin '{module_name}' : '{ex}'")
return
assert hasattr(module, class_name), "class {} is not in {}".format(
@ -411,7 +410,6 @@ class PluginManager:
plugin_obj = self._create_class(
plugin_name,
APRSDPluginBase,
config=self.config,
)
if plugin_obj:
if isinstance(plugin_obj, APRSDWatchListPluginBase):
@ -452,7 +450,7 @@ class PluginManager:
LOG.info("Loading APRSD Plugins")
self._init()
# Help plugin is always enabled.
_help = HelpPlugin(self.config)
_help = HelpPlugin()
self._pluggy_pm.register(_help)
enabled_plugins = CONF.enabled_plugins

View File

@ -25,6 +25,7 @@ def get_aprs_fi(api_key, callsign):
def get_weather_gov_for_gps(lat, lon):
# FIXME(hemna) This is currently BROKEN
LOG.debug(f"Fetch station at {lat}, {lon}")
headers = requests.utils.default_headers()
headers.update(
@ -32,8 +33,8 @@ def get_weather_gov_for_gps(lat, lon):
)
try:
url2 = (
#"https://forecast.weather.gov/MapClick.php?lat=%s"
#"&lon=%s&FcstType=json" % (lat, lon)
# "https://forecast.weather.gov/MapClick.php?lat=%s"
# "&lon=%s&FcstType=json" % (lat, lon)
f"https://api.weather.gov/points/{lat},{lon}"
)
LOG.debug(f"Fetching weather '{url2}'")

View File

@ -272,7 +272,6 @@ def _build_shortcuts_dict():
else:
shortcuts_dict = {}
LOG.info(f"Shortcuts Dict {shortcuts_dict}")
return shortcuts_dict

View File

@ -27,7 +27,6 @@ class LocationPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin):
LOG.info("Location Plugin")
fromcall = packet.from_call
message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
api_key = CONF.aprs_fi.apiKey

View File

@ -33,7 +33,6 @@ class QueryPlugin(plugin.APRSDRegexCommandPluginBase):
fromcall = packet.from_call
message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
pkt_tracker = tracker.PacketTrack()
now = datetime.datetime.now()

View File

@ -2,7 +2,6 @@ import logging
import aprsd
from aprsd import plugin, stats
from aprsd.utils import trace
LOG = logging.getLogger("APRSD")
@ -19,7 +18,6 @@ class VersionPlugin(plugin.APRSDRegexCommandPluginBase):
# five mins {int:int}
email_sent_dict = {}
@trace.trace
def process(self, packet):
LOG.info("Version COMMAND")
# fromcall = packet.get("from")
@ -27,6 +25,7 @@ class VersionPlugin(plugin.APRSDRegexCommandPluginBase):
# ack = packet.get("msgNo", "0")
stats_obj = stats.APRSDStats()
s = stats_obj.stats()
print(s)
return "APRSD ver:{} uptime:{}".format(
aprsd.__version__,
s["aprsd"]["uptime"],

View File

@ -62,21 +62,17 @@ class USWeatherPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin)
LOG.info(f"WX data {wx_data}")
if wx_data["success"] == False:
# Failed to fetch the weather
reply = "Failed to fetch weather for location"
else:
reply = (
"%sF(%sF/%sF) %s. %s, %s."
% (
wx_data["currentobservation"]["Temp"],
wx_data["data"]["temperature"][0],
wx_data["data"]["temperature"][1],
wx_data["data"]["weather"][0],
wx_data["time"]["startPeriodName"][1],
wx_data["data"]["weather"][1],
)
).rstrip()
reply = (
"%sF(%sF/%sF) %s. %s, %s."
% (
wx_data["currentobservation"]["Temp"],
wx_data["data"]["temperature"][0],
wx_data["data"]["temperature"][1],
wx_data["data"]["weather"][0],
wx_data["time"]["startPeriodName"][1],
wx_data["data"]["weather"][1],
)
).rstrip()
LOG.debug(f"reply: '{reply}' ")
return reply
@ -105,6 +101,7 @@ class USMetarPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin):
@trace.trace
def process(self, packet):
print("FISTY")
fromcall = packet.get("from")
message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")

View File

@ -4,7 +4,7 @@ import time
from aprsd import client
from aprsd import threads as aprsd_threads
from aprsd.packets import core, packet_list, tracker
from aprsd.packets import core, tracker
LOG = logging.getLogger("APRSD")
@ -27,9 +27,9 @@ def send(packet: core.Packet, direct=False, aprs_client=None):
else:
cl = client.factory.create()
packet.update_timestamp()
packet.log(header="TX")
cl.send(packet)
packet_list.PacketList().tx(packet)
class SendPacketThread(aprsd_threads.APRSDThread):
@ -94,8 +94,8 @@ class SendPacketThread(aprsd_threads.APRSDThread):
if send_now:
# no attempt time, so lets send it, and start
# tracking the time.
send(packet, direct=True)
packet.last_send_time = datetime.datetime.now()
send(packet, direct=True)
packet.send_count += 1
time.sleep(1)

View File

@ -107,18 +107,24 @@ function update_packets( data ) {
if (size_dict(packet_list) == 0 && size_dict(data) > 0) {
packetsdiv.html('')
}
console.log("PACKET_LIST")
console.log(packet_list);
jQuery.each(data, function(i, val) {
pkt = JSON.parse(val);
console.log("PACKET");
console.log(pkt);
console.log(pkt.timestamp);
update_watchlist_from_packet(pkt['from_call'], pkt);
if ( packet_list.hasOwnProperty(val["timestamp"]) == false ) {
if ( packet_list.hasOwnProperty(pkt.timestamp) == false ) {
// Store the packet
packet_list[pkt["timestamp"]] = pkt;
packet_list[pkt.timestamp] = pkt;
//ts_str = val["timestamp"].toString();
//ts = ts_str.split(".")[0]*1000;
ts = pkt["timestamp"]
ts = pkt.timestamp
var d = new Date(ts).toLocaleDateString("en-US");
var t = new Date(ts).toLocaleTimeString("en-US");
var from_call = pkt['from_call'];
var from_call = pkt.from_call;
if (from_call == our_callsign) {
title_id = 'title_tx';
} else {

View File

@ -3,62 +3,39 @@ import unittest
from unittest import mock
from click.testing import CliRunner
from oslo_config import cfg
from aprsd import config as aprsd_config
from aprsd import conf # noqa: F401
from aprsd.aprsd import cli
from aprsd.cmds import dev # noqa
from .. import fake
CONF = cfg.CONF
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
class TestDevTestPluginCommand(unittest.TestCase):
def _build_config(self, login=None, password=None):
config = {
"aprs": {},
"aprsd": {
"trace": False,
"watch_list": {},
},
}
def config_and_init(self, login=None, password=None):
CONF.callsign = fake.FAKE_TO_CALLSIGN
CONF.trace_enabled = False
CONF.watch_list.packet_keep_count = 1
if login:
config["aprs"]["login"] = login
CONF.aprs_network.login = login
if password:
config["aprs"]["password"] = password
CONF.aprs_network.password = password
return aprsd_config.Config(config)
CONF.admin.user = "admin"
CONF.admin.password = "password"
@mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.logging.log.setup_logging")
def test_no_login(self, mock_logging, mock_parse_config):
def test_no_plugin_arg(self, mock_logging):
"""Make sure we get an error if there is no login and config."""
runner = CliRunner()
mock_parse_config.return_value = self._build_config()
result = runner.invoke(
cli, [
"dev", "test-plugin",
"-p", "aprsd.plugins.version.VersionPlugin",
"bogus command",
],
catch_exceptions=False,
)
# rich.print(f"EXIT CODE {result.exit_code}")
# rich.print(f"Exception {result.exception}")
# rich.print(f"OUTPUT {result.output}")
assert result.exit_code == -1
assert "Must set --aprs_login or APRS_LOGIN" in result.output
@mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.logging.log.setup_logging")
def test_no_plugin_arg(self, mock_logging, mock_parse_config):
"""Make sure we get an error if there is no login and config."""
runner = CliRunner()
mock_parse_config.return_value = self._build_config(login="something")
self.config_and_init(login="something")
result = runner.invoke(
cli, ["dev", "test-plugin", "bogus command"],

View File

@ -3,78 +3,42 @@ import unittest
from unittest import mock
from click.testing import CliRunner
from oslo_config import cfg
from aprsd import config as aprsd_config
from aprsd import conf # noqa : F401
from aprsd.aprsd import cli
from aprsd.cmds import send_message # noqa
from .. import fake
CONF = cfg.CONF
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
class TestSendMessageCommand(unittest.TestCase):
def _build_config(self, login=None, password=None):
config = {
"aprs": {},
"aprsd": {
"trace": False,
"watch_list": {},
},
}
def config_and_init(self, login=None, password=None):
CONF.callsign = fake.FAKE_TO_CALLSIGN
CONF.trace_enabled = False
CONF.watch_list.packet_keep_count = 1
if login:
config["aprs"]["login"] = login
CONF.aprs_network.login = login
if password:
config["aprs"]["password"] = password
CONF.aprs_network.password = password
return aprsd_config.Config(config)
CONF.admin.user = "admin"
CONF.admin.password = "password"
@mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.logging.log.setup_logging")
def test_no_login(self, mock_logging, mock_parse_config):
"""Make sure we get an error if there is no login and config."""
return
runner = CliRunner()
mock_parse_config.return_value = self._build_config()
result = runner.invoke(
cli, ["send-message", "WB4BOR", "wx"],
catch_exceptions=False,
)
# rich.print(f"EXIT CODE {result.exit_code}")
# rich.print(f"Exception {result.exception}")
# rich.print(f"OUTPUT {result.output}")
assert result.exit_code == -1
assert "Must set --aprs_login or APRS_LOGIN" in result.output
@mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.logging.log.setup_logging")
def test_no_password(self, mock_logging, mock_parse_config):
"""Make sure we get an error if there is no password and config."""
return
runner = CliRunner()
mock_parse_config.return_value = self._build_config(login="something")
result = runner.invoke(
cli, ["send-message", "WB4BOR", "wx"],
catch_exceptions=False,
)
assert result.exit_code == -1
assert "Must set --aprs-password or APRS_PASSWORD" in result.output
@mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.logging.log.setup_logging")
def test_no_tocallsign(self, mock_logging, mock_parse_config):
def test_no_tocallsign(self, mock_logging):
"""Make sure we get an error if there is no tocallsign."""
runner = CliRunner()
mock_parse_config.return_value = self._build_config(
self.config_and_init(
login="something",
password="another",
)
runner = CliRunner()
result = runner.invoke(
cli, ["send-message"],
@ -83,16 +47,15 @@ class TestSendMessageCommand(unittest.TestCase):
assert result.exit_code == 2
assert "Error: Missing argument 'TOCALLSIGN'" in result.output
@mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.logging.log.setup_logging")
def test_no_command(self, mock_logging, mock_parse_config):
def test_no_command(self, mock_logging):
"""Make sure we get an error if there is no command."""
runner = CliRunner()
mock_parse_config.return_value = self._build_config(
self.config_and_init(
login="something",
password="another",
)
runner = CliRunner()
result = runner.invoke(
cli, ["send-message", "WB4BOR"],

View File

@ -5,112 +5,81 @@ from unittest import mock
from click.testing import CliRunner
import flask
import flask_socketio
from oslo_config import cfg
from aprsd import config as aprsd_config
from aprsd import packets
from aprsd import conf # noqa: F401
from aprsd.cmds import webchat # noqa
from aprsd.packets import core
from .. import fake
CONF = cfg.CONF
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
class TestSendMessageCommand(unittest.TestCase):
def _build_config(self, login=None, password=None):
config = {
"aprs": {},
"aprsd": {
"trace": False,
"web": {
"users": {"admin": "password"},
},
"watch_list": {"packet_keep_count": 1},
},
}
def config_and_init(self, login=None, password=None):
CONF.callsign = fake.FAKE_TO_CALLSIGN
CONF.trace_enabled = False
CONF.watch_list.packet_keep_count = 1
if login:
config["aprs"]["login"] = login
CONF.aprs_network.login = login
if password:
config["aprs"]["password"] = password
CONF.aprs_network.password = password
return aprsd_config.Config(config)
CONF.admin.user = "admin"
CONF.admin.password = "password"
@mock.patch("aprsd.config.parse_config")
def test_missing_config(self, mock_parse_config):
CliRunner()
cfg = self._build_config()
del cfg["aprsd"]["web"]["users"]
mock_parse_config.return_value = cfg
server = webchat.WebChatFlask()
self.assertRaises(
KeyError,
server.set_config, cfg,
)
@mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.logging.log.setup_logging")
def test_init_flask(self, mock_logging, mock_parse_config):
def test_init_flask(self, mock_logging):
"""Make sure we get an error if there is no login and config."""
CliRunner()
cfg = self._build_config()
mock_parse_config.return_value = cfg
self.config_and_init()
socketio, flask_app = webchat.init_flask(cfg, "DEBUG", False)
socketio, flask_app = webchat.init_flask("DEBUG", False)
self.assertIsInstance(socketio, flask_socketio.SocketIO)
self.assertIsInstance(flask_app, flask.Flask)
@mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.packets.tracker.PacketTrack.remove")
@mock.patch("aprsd.cmds.webchat.socketio.emit")
@mock.patch("aprsd.cmds.webchat.socketio")
def test_process_ack_packet(
self, mock_parse_config,
mock_remove, mock_emit,
self,
mock_remove, mock_socketio,
):
config = self._build_config()
mock_parse_config.return_value = config
self.config_and_init()
mock_socketio.emit = mock.MagicMock()
packet = fake.fake_packet(
message="blah",
msg_number=1,
message_format=core.PACKET_TYPE_ACK,
)
socketio = mock.MagicMock()
packets.PacketList(config=config)
packets.PacketTrack(config=config)
packets.WatchList(config=config)
packets.SeenList(config=config)
wcp = webchat.WebChatProcessPacketThread(config, packet, socketio)
wcp = webchat.WebChatProcessPacketThread(packet, socketio)
wcp.process_ack_packet(packet)
mock_remove.called_once()
mock_emit.called_once()
mock_socketio.called_once()
@mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.packets.PacketList.rx")
@mock.patch("aprsd.cmds.webchat.socketio.emit")
@mock.patch("aprsd.cmds.webchat.socketio")
def test_process_our_message_packet(
self, mock_parse_config,
self,
mock_packet_add,
mock_emit,
mock_socketio,
):
config = self._build_config()
mock_parse_config.return_value = config
self.config_and_init()
mock_socketio.emit = mock.MagicMock()
packet = fake.fake_packet(
message="blah",
msg_number=1,
message_format=core.PACKET_TYPE_MESSAGE,
)
socketio = mock.MagicMock()
packets.PacketList(config=config)
packets.PacketTrack(config=config)
packets.WatchList(config=config)
packets.SeenList(config=config)
wcp = webchat.WebChatProcessPacketThread(config, packet, socketio)
wcp = webchat.WebChatProcessPacketThread(packet, socketio)
wcp.process_our_message_packet(packet)
mock_packet_add.called_once()
mock_emit.called_once()
mock_socketio.called_once()

View File

@ -1,15 +1,21 @@
from unittest import mock
from oslo_config import cfg
from aprsd import conf # noqa: F401
from aprsd.plugins import fortune as fortune_plugin
from .. import fake, test_plugin
CONF = cfg.CONF
class TestFortunePlugin(test_plugin.TestPlugin):
@mock.patch("shutil.which")
def test_fortune_fail(self, mock_which):
mock_which.return_value = None
fortune = fortune_plugin.FortunePlugin(self.config)
fortune = fortune_plugin.FortunePlugin()
expected = "FortunePlugin isn't enabled"
packet = fake.fake_packet(message="fortune")
actual = fortune.filter(packet)
@ -20,7 +26,8 @@ class TestFortunePlugin(test_plugin.TestPlugin):
def test_fortune_success(self, mock_which, mock_output):
mock_which.return_value = "/usr/bin/games/fortune"
mock_output.return_value = "Funny fortune"
fortune = fortune_plugin.FortunePlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
fortune = fortune_plugin.FortunePlugin()
expected = "Funny fortune"
packet = fake.fake_packet(message="fortune")

View File

@ -1,18 +1,24 @@
from unittest import mock
from oslo_config import cfg
from aprsd import conf # noqa: F401
from aprsd.plugins import location as location_plugin
from .. import fake, test_plugin
CONF = cfg.CONF
class TestLocationPlugin(test_plugin.TestPlugin):
@mock.patch("aprsd.config.Config.check_option")
def test_location_not_enabled_missing_aprs_fi_key(self, mock_check):
def test_location_not_enabled_missing_aprs_fi_key(self):
# When the aprs.fi api key isn't set, then
# the LocationPlugin will be disabled.
mock_check.side_effect = Exception
fortune = location_plugin.LocationPlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
CONF.aprs_fi.apiKey = None
fortune = location_plugin.LocationPlugin()
expected = "LocationPlugin isn't enabled"
packet = fake.fake_packet(message="location")
actual = fortune.filter(packet)
@ -23,7 +29,8 @@ class TestLocationPlugin(test_plugin.TestPlugin):
# When the aprs.fi api key isn't set, then
# the LocationPlugin will be disabled.
mock_check.side_effect = Exception
fortune = location_plugin.LocationPlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
fortune = location_plugin.LocationPlugin()
expected = "Failed to fetch aprs.fi location"
packet = fake.fake_packet(message="location")
actual = fortune.filter(packet)
@ -34,7 +41,8 @@ class TestLocationPlugin(test_plugin.TestPlugin):
# When the aprs.fi api key isn't set, then
# the LocationPlugin will be disabled.
mock_check.return_value = {"entries": []}
fortune = location_plugin.LocationPlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
fortune = location_plugin.LocationPlugin()
expected = "Failed to fetch aprs.fi location"
packet = fake.fake_packet(message="location")
actual = fortune.filter(packet)
@ -57,7 +65,8 @@ class TestLocationPlugin(test_plugin.TestPlugin):
}
mock_weather.side_effect = Exception
mock_time.return_value = 10
fortune = location_plugin.LocationPlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
fortune = location_plugin.LocationPlugin()
expected = "KFAKE: Unknown Location 0' 10,11 0.0h ago"
packet = fake.fake_packet(message="location")
actual = fortune.filter(packet)
@ -82,7 +91,8 @@ class TestLocationPlugin(test_plugin.TestPlugin):
wx_data = {"location": {"areaDescription": expected_town}}
mock_weather.return_value = wx_data
mock_time.return_value = 10
fortune = location_plugin.LocationPlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
fortune = location_plugin.LocationPlugin()
expected = f"KFAKE: {expected_town} 0' 10,11 0.0h ago"
packet = fake.fake_packet(message="location")
actual = fortune.filter(packet)

View File

@ -1,14 +1,16 @@
from unittest import mock
from aprsd import client
from aprsd import config as aprsd_config
from aprsd import packets
from oslo_config import cfg
from aprsd import client, packets
from aprsd import conf # noqa: F401
from aprsd.plugins import notify as notify_plugin
from .. import fake, test_plugin
DEFAULT_WATCHLIST_CALLSIGNS = [fake.FAKE_FROM_CALLSIGN]
CONF = cfg.CONF
DEFAULT_WATCHLIST_CALLSIGNS = fake.FAKE_FROM_CALLSIGN
class TestWatchListPlugin(test_plugin.TestPlugin):
@ -16,7 +18,7 @@ class TestWatchListPlugin(test_plugin.TestPlugin):
self.fromcall = fake.FAKE_FROM_CALLSIGN
self.ack = 1
def _config(
def config_and_init(
self,
watchlist_enabled=True,
watchlist_alert_callsign=None,
@ -24,39 +26,33 @@ class TestWatchListPlugin(test_plugin.TestPlugin):
watchlist_packet_keep_count=None,
watchlist_callsigns=DEFAULT_WATCHLIST_CALLSIGNS,
):
_config = aprsd_config.Config(aprsd_config.DEFAULT_CONFIG_DICT)
default_wl = aprsd_config.DEFAULT_CONFIG_DICT["aprsd"]["watch_list"]
_config["ham"]["callsign"] = self.fromcall
_config["aprsd"]["callsign"] = self.fromcall
_config["aprs"]["login"] = self.fromcall
_config["services"]["aprs.fi"]["apiKey"] = "something"
CONF.callsign = self.fromcall
CONF.aprs_network.login = self.fromcall
CONF.aprs_fi.apiKey = "something"
# Set the watchlist specific config options
CONF.watch_list.enabled = watchlist_enabled
_config["aprsd"]["watch_list"]["enabled"] = watchlist_enabled
if not watchlist_alert_callsign:
watchlist_alert_callsign = fake.FAKE_TO_CALLSIGN
_config["aprsd"]["watch_list"]["alert_callsign"] = watchlist_alert_callsign
CONF.watch_list.alert_callsign = watchlist_alert_callsign
if not watchlist_alert_time_seconds:
watchlist_alert_time_seconds = default_wl["alert_time_seconds"]
_config["aprsd"]["watch_list"]["alert_time_seconds"] = watchlist_alert_time_seconds
watchlist_alert_time_seconds = CONF.watch_list.alert_time_seconds
CONF.watch_list.alert_time_seconds = watchlist_alert_time_seconds
if not watchlist_packet_keep_count:
watchlist_packet_keep_count = default_wl["packet_keep_count"]
_config["aprsd"]["watch_list"]["packet_keep_count"] = watchlist_packet_keep_count
watchlist_packet_keep_count = CONF.watch_list.packet_keep_count
CONF.watch_list.packet_keep_count = watchlist_packet_keep_count
_config["aprsd"]["watch_list"]["callsigns"] = watchlist_callsigns
return _config
CONF.watch_list.callsigns = watchlist_callsigns
class TestAPRSDWatchListPluginBase(TestWatchListPlugin):
def test_watchlist_not_enabled(self):
config = self._config(watchlist_enabled=False)
self.config_and_init(config=config)
plugin = fake.FakeWatchListPlugin(self.config)
self.config_and_init(watchlist_enabled=False)
plugin = fake.FakeWatchListPlugin()
packet = fake.fake_packet(
message="version",
@ -69,9 +65,8 @@ class TestAPRSDWatchListPluginBase(TestWatchListPlugin):
@mock.patch("aprsd.client.ClientFactory", autospec=True)
def test_watchlist_not_in_watchlist(self, mock_factory):
client.factory = mock_factory
config = self._config()
self.config_and_init(config=config)
plugin = fake.FakeWatchListPlugin(self.config)
self.config_and_init()
plugin = fake.FakeWatchListPlugin()
packet = fake.fake_packet(
fromcall="FAKE",
@ -86,9 +81,8 @@ class TestAPRSDWatchListPluginBase(TestWatchListPlugin):
class TestNotifySeenPlugin(TestWatchListPlugin):
def test_disabled(self):
config = self._config(watchlist_enabled=False)
self.config_and_init(config=config)
plugin = notify_plugin.NotifySeenPlugin(self.config)
self.config_and_init(watchlist_enabled=False)
plugin = notify_plugin.NotifySeenPlugin()
packet = fake.fake_packet(
message="version",
@ -101,9 +95,8 @@ class TestNotifySeenPlugin(TestWatchListPlugin):
@mock.patch("aprsd.client.ClientFactory", autospec=True)
def test_callsign_not_in_watchlist(self, mock_factory):
client.factory = mock_factory
config = self._config(watchlist_enabled=False)
self.config_and_init(config=config)
plugin = notify_plugin.NotifySeenPlugin(self.config)
self.config_and_init(watchlist_enabled=False)
plugin = notify_plugin.NotifySeenPlugin()
packet = fake.fake_packet(
message="version",
@ -118,12 +111,11 @@ class TestNotifySeenPlugin(TestWatchListPlugin):
def test_callsign_in_watchlist_not_old(self, mock_is_old, mock_factory):
client.factory = mock_factory
mock_is_old.return_value = False
config = self._config(
self.config_and_init(
watchlist_enabled=True,
watchlist_callsigns=["WB4BOR"],
)
self.config_and_init(config=config)
plugin = notify_plugin.NotifySeenPlugin(self.config)
plugin = notify_plugin.NotifySeenPlugin()
packet = fake.fake_packet(
fromcall="WB4BOR",
@ -139,13 +131,12 @@ class TestNotifySeenPlugin(TestWatchListPlugin):
def test_callsign_in_watchlist_old_same_alert_callsign(self, mock_is_old, mock_factory):
client.factory = mock_factory
mock_is_old.return_value = True
config = self._config(
self.config_and_init(
watchlist_enabled=True,
watchlist_alert_callsign="WB4BOR",
watchlist_callsigns=["WB4BOR"],
)
self.config_and_init(config=config)
plugin = notify_plugin.NotifySeenPlugin(self.config)
plugin = notify_plugin.NotifySeenPlugin()
packet = fake.fake_packet(
fromcall="WB4BOR",
@ -163,13 +154,12 @@ class TestNotifySeenPlugin(TestWatchListPlugin):
mock_is_old.return_value = True
notify_callsign = fake.FAKE_TO_CALLSIGN
fromcall = "WB4BOR"
config = self._config(
self.config_and_init(
watchlist_enabled=True,
watchlist_alert_callsign=notify_callsign,
watchlist_callsigns=["WB4BOR"],
)
self.config_and_init(config=config)
plugin = notify_plugin.NotifySeenPlugin(self.config)
plugin = notify_plugin.NotifySeenPlugin()
packet = fake.fake_packet(
fromcall=fromcall,

View File

@ -1,10 +1,16 @@
from unittest import mock
from oslo_config import cfg
from aprsd import conf # noqa: F401
from aprsd.plugins import ping as ping_plugin
from .. import fake, test_plugin
CONF = cfg.CONF
class TestPingPlugin(test_plugin.TestPlugin):
@mock.patch("time.localtime")
def test_ping(self, mock_time):
@ -14,7 +20,8 @@ class TestPingPlugin(test_plugin.TestPlugin):
s = fake_time.tm_sec = 55
mock_time.return_value = fake_time
ping = ping_plugin.PingPlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
ping = ping_plugin.PingPlugin()
packet = fake.fake_packet(
message="location",

View File

@ -1,5 +1,7 @@
from unittest import mock
from oslo_config import cfg
from aprsd import packets
from aprsd.packets import tracker
from aprsd.plugins import query as query_plugin
@ -7,11 +9,18 @@ from aprsd.plugins import query as query_plugin
from .. import fake, test_plugin
CONF = cfg.CONF
class TestQueryPlugin(test_plugin.TestPlugin):
@mock.patch("aprsd.packets.tracker.PacketTrack.flush")
def test_query_flush(self, mock_flush):
packet = fake.fake_packet(message="!delete")
query = query_plugin.QueryPlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
CONF.save_enabled = True
CONF.query_plugin.callsign = fake.FAKE_FROM_CALLSIGN
query = query_plugin.QueryPlugin()
query.enabled = True
expected = "Deleted ALL pending msgs."
actual = query.filter(packet)
@ -20,10 +29,13 @@ class TestQueryPlugin(test_plugin.TestPlugin):
@mock.patch("aprsd.packets.tracker.PacketTrack.restart_delayed")
def test_query_restart_delayed(self, mock_restart):
CONF.callsign = fake.FAKE_TO_CALLSIGN
CONF.save_enabled = True
CONF.query_plugin.callsign = fake.FAKE_FROM_CALLSIGN
track = tracker.PacketTrack()
track.data = {}
packet = fake.fake_packet(message="!4")
query = query_plugin.QueryPlugin(self.config)
query = query_plugin.QueryPlugin()
expected = "No pending msgs to resend"
actual = query.filter(packet)

View File

@ -1,5 +1,6 @@
from unittest import mock
from oslo_config import cfg
import pytz
from aprsd.plugins import time as time_plugin
@ -8,6 +9,9 @@ from aprsd.utils import fuzzy
from .. import fake, test_plugin
CONF = cfg.CONF
class TestTimePlugins(test_plugin.TestPlugin):
@mock.patch("aprsd.plugins.time.TimePlugin._get_local_tz")
@ -25,7 +29,8 @@ class TestTimePlugins(test_plugin.TestPlugin):
h = int(local_t.strftime("%H"))
m = int(local_t.strftime("%M"))
fake_time.tm_sec = 13
time = time_plugin.TimePlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
time = time_plugin.TimePlugin()
packet = fake.fake_packet(
message="location",

View File

@ -1,4 +1,4 @@
from unittest import mock
from oslo_config import cfg
import aprsd
from aprsd.plugins import version as version_plugin
@ -6,11 +6,16 @@ from aprsd.plugins import version as version_plugin
from .. import fake, test_plugin
CONF = cfg.CONF
class TestVersionPlugin(test_plugin.TestPlugin):
@mock.patch("aprsd.plugin.PluginManager.get_plugins")
def test_version(self, mock_get_plugins):
def test_version(self):
expected = f"APRSD ver:{aprsd.__version__} uptime:00:00:00"
version = version_plugin.VersionPlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
version = version_plugin.VersionPlugin()
version.enabled = True
packet = fake.fake_packet(
message="No",

View File

@ -1,18 +1,24 @@
from unittest import mock
from oslo_config import cfg
from aprsd import conf # noqa: F401
from aprsd.plugins import weather as weather_plugin
from .. import fake, test_plugin
CONF = cfg.CONF
class TestUSWeatherPluginPlugin(test_plugin.TestPlugin):
@mock.patch("aprsd.config.Config.check_option")
def test_not_enabled_missing_aprs_fi_key(self, mock_check):
def test_not_enabled_missing_aprs_fi_key(self):
# When the aprs.fi api key isn't set, then
# the LocationPlugin will be disabled.
mock_check.side_effect = Exception
wx = weather_plugin.USWeatherPlugin(self.config)
CONF.aprs_fi.apiKey = None
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USWeatherPlugin()
expected = "USWeatherPlugin isn't enabled"
packet = fake.fake_packet(message="weather")
actual = wx.filter(packet)
@ -23,7 +29,9 @@ class TestUSWeatherPluginPlugin(test_plugin.TestPlugin):
# When the aprs.fi api key isn't set, then
# the Plugin will be disabled.
mock_check.side_effect = Exception
wx = weather_plugin.USWeatherPlugin(self.config)
CONF.aprs_fi.apiKey = "abc123"
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USWeatherPlugin()
expected = "Failed to fetch aprs.fi location"
packet = fake.fake_packet(message="weather")
actual = wx.filter(packet)
@ -34,7 +42,10 @@ class TestUSWeatherPluginPlugin(test_plugin.TestPlugin):
# When the aprs.fi api key isn't set, then
# the Plugin will be disabled.
mock_check.return_value = {"entries": []}
wx = weather_plugin.USWeatherPlugin(self.config)
CONF.aprs_fi.apiKey = "abc123"
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USWeatherPlugin()
wx.enabled = True
expected = "Failed to fetch aprs.fi location"
packet = fake.fake_packet(message="weather")
actual = wx.filter(packet)
@ -55,7 +66,10 @@ class TestUSWeatherPluginPlugin(test_plugin.TestPlugin):
],
}
mock_weather.side_effect = Exception
wx = weather_plugin.USWeatherPlugin(self.config)
CONF.aprs_fi.apiKey = "abc123"
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USWeatherPlugin()
wx.enabled = True
expected = "Unable to get weather"
packet = fake.fake_packet(message="weather")
actual = wx.filter(packet)
@ -83,7 +97,10 @@ class TestUSWeatherPluginPlugin(test_plugin.TestPlugin):
},
"time": {"startPeriodName": ["ignored", "sometime"]},
}
wx = weather_plugin.USWeatherPlugin(self.config)
CONF.aprs_fi.apiKey = "abc123"
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USWeatherPlugin()
wx.enabled = True
expected = "400F(10F/11F) test. sometime, another."
packet = fake.fake_packet(message="weather")
actual = wx.filter(packet)
@ -92,12 +109,11 @@ class TestUSWeatherPluginPlugin(test_plugin.TestPlugin):
class TestUSMetarPlugin(test_plugin.TestPlugin):
@mock.patch("aprsd.config.Config.check_option")
def test_not_enabled_missing_aprs_fi_key(self, mock_check):
def test_not_enabled_missing_aprs_fi_key(self):
# When the aprs.fi api key isn't set, then
# the LocationPlugin will be disabled.
mock_check.side_effect = Exception
wx = weather_plugin.USMetarPlugin(self.config)
CONF.aprs_fi.apiKey = None
wx = weather_plugin.USMetarPlugin()
expected = "USMetarPlugin isn't enabled"
packet = fake.fake_packet(message="metar")
actual = wx.filter(packet)
@ -108,7 +124,10 @@ class TestUSMetarPlugin(test_plugin.TestPlugin):
# When the aprs.fi api key isn't set, then
# the Plugin will be disabled.
mock_check.side_effect = Exception
wx = weather_plugin.USMetarPlugin(self.config)
CONF.aprs_fi.apiKey = "abc123"
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USMetarPlugin()
wx.enabled = True
expected = "Failed to fetch aprs.fi location"
packet = fake.fake_packet(message="metar")
actual = wx.filter(packet)
@ -119,7 +138,10 @@ class TestUSMetarPlugin(test_plugin.TestPlugin):
# When the aprs.fi api key isn't set, then
# the Plugin will be disabled.
mock_check.return_value = {"entries": []}
wx = weather_plugin.USMetarPlugin(self.config)
CONF.aprs_fi.apiKey = "abc123"
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USMetarPlugin()
wx.enabled = True
expected = "Failed to fetch aprs.fi location"
packet = fake.fake_packet(message="metar")
actual = wx.filter(packet)
@ -128,7 +150,10 @@ class TestUSMetarPlugin(test_plugin.TestPlugin):
@mock.patch("aprsd.plugin_utils.get_weather_gov_metar")
def test_gov_metar_fetch_fails(self, mock_metar):
mock_metar.side_effect = Exception
wx = weather_plugin.USMetarPlugin(self.config)
CONF.aprs_fi.apiKey = "abc123"
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USMetarPlugin()
wx.enabled = True
expected = "Unable to find station METAR"
packet = fake.fake_packet(message="metar KPAO")
actual = wx.filter(packet)
@ -141,7 +166,10 @@ class TestUSMetarPlugin(test_plugin.TestPlugin):
text = '{"properties": {"rawMessage": "BOGUSMETAR"}}'
mock_metar.return_value = Response()
wx = weather_plugin.USMetarPlugin(self.config)
CONF.aprs_fi.apiKey = "abc123"
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USMetarPlugin()
wx.enabled = True
expected = "BOGUSMETAR"
packet = fake.fake_packet(message="metar KPAO")
actual = wx.filter(packet)
@ -169,7 +197,10 @@ class TestUSMetarPlugin(test_plugin.TestPlugin):
}
mock_metar.return_value = Response()
wx = weather_plugin.USMetarPlugin(self.config)
CONF.aprs_fi.apiKey = "abc123"
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USMetarPlugin()
wx.enabled = True
expected = "BOGUSMETAR"
packet = fake.fake_packet(message="metar")
actual = wx.filter(packet)

View File

@ -1,24 +1,32 @@
import unittest
from oslo_config import cfg
from aprsd import conf # noqa: F401
from aprsd.plugins import email
CONF = cfg.CONF
class TestEmail(unittest.TestCase):
def test_get_email_from_shortcut(self):
CONF.email_plugin.shortcuts = None
email_address = "something@something.com"
addr = f"-{email_address}"
actual = email.get_email_from_shortcut(addr)
self.assertEqual(addr, actual)
config = {"aprsd": {"email": {"nothing": "nothing"}}}
CONF.email_plugin.shortcuts = None
actual = email.get_email_from_shortcut(addr)
self.assertEqual(addr, actual)
config = {"aprsd": {"email": {"shortcuts": {"not_used": "empty"}}}}
CONF.email_plugin.shortcuts = None
actual = email.get_email_from_shortcut(addr)
self.assertEqual(addr, actual)
config = {"aprsd": {"email": {"shortcuts": {"-wb": email_address}}}}
short = "-wb"
CONF.email_plugin.email_shortcuts = ["wb=something@something.com"]
email.shortcuts_dict = None
short = "wb"
actual = email.get_email_from_shortcut(short)
self.assertEqual(email_address, actual)

View File

@ -1,7 +1,9 @@
import unittest
from unittest import mock
from aprsd import config as aprsd_config
from oslo_config import cfg
from aprsd import conf # noqa: F401
from aprsd import packets
from aprsd import plugin as aprsd_plugin
from aprsd import plugins, stats
@ -10,6 +12,9 @@ from aprsd.packets import core
from . import fake
CONF = cfg.CONF
class TestPluginManager(unittest.TestCase):
def setUp(self) -> None:
@ -21,34 +26,26 @@ class TestPluginManager(unittest.TestCase):
aprsd_plugin.PluginManager._instance = None
def config_and_init(self):
self.config = aprsd_config.Config(aprsd_config.DEFAULT_CONFIG_DICT)
self.config["ham"]["callsign"] = self.fromcall
self.config["aprs"]["login"] = fake.FAKE_TO_CALLSIGN
self.config["services"]["aprs.fi"]["apiKey"] = "something"
self.config["aprsd"]["enabled_plugins"] = [
"aprsd.plugins.ping.PingPlugin",
]
print(self.config)
def test_init_no_config(self):
pm = aprsd_plugin.PluginManager()
self.assertEqual(None, pm.config)
def test_init_with_config(self):
pm = aprsd_plugin.PluginManager(self.config)
self.assertEqual(self.config, pm.config)
CONF.callsign = self.fromcall
CONF.aprs_network.login = fake.FAKE_TO_CALLSIGN
CONF.aprs_fi.apiKey = "something"
CONF.enabled_plugins = "aprsd.plugins.ping.PingPlugin"
CONF.enable_save = False
def test_get_plugins_no_plugins(self):
pm = aprsd_plugin.PluginManager(self.config)
CONF.enabled_plugins = []
pm = aprsd_plugin.PluginManager()
plugin_list = pm.get_plugins()
self.assertEqual([], plugin_list)
def test_get_plugins_with_plugins(self):
pm = aprsd_plugin.PluginManager(self.config)
CONF.enabled_plugins = ["aprsd.plugins.ping.PingPlugin"]
pm = aprsd_plugin.PluginManager()
plugin_list = pm.get_plugins()
self.assertEqual([], plugin_list)
pm.setup_plugins()
plugin_list = pm.get_plugins()
print(plugin_list)
self.assertIsInstance(plugin_list, list)
self.assertIsInstance(
plugin_list[0],
@ -59,7 +56,7 @@ class TestPluginManager(unittest.TestCase):
)
def test_get_watchlist_plugins(self):
pm = aprsd_plugin.PluginManager(self.config)
pm = aprsd_plugin.PluginManager()
plugin_list = pm.get_plugins()
self.assertEqual([], plugin_list)
pm.setup_plugins()
@ -68,7 +65,8 @@ class TestPluginManager(unittest.TestCase):
self.assertEqual(0, len(plugin_list))
def test_get_message_plugins(self):
pm = aprsd_plugin.PluginManager(self.config)
CONF.enabled_plugins = ["aprsd.plugins.ping.PingPlugin"]
pm = aprsd_plugin.PluginManager()
plugin_list = pm.get_plugins()
self.assertEqual([], plugin_list)
pm.setup_plugins()
@ -98,27 +96,19 @@ class TestPlugin(unittest.TestCase):
packets.PacketTrack._instance = None
self.config = None
def config_and_init(self, config=None):
if not config:
self.config = aprsd_config.Config(aprsd_config.DEFAULT_CONFIG_DICT)
self.config["ham"]["callsign"] = self.fromcall
self.config["aprs"]["login"] = fake.FAKE_TO_CALLSIGN
self.config["services"]["aprs.fi"]["apiKey"] = "something"
else:
self.config = config
# Inintialize the stats object with the config
stats.APRSDStats(self.config)
packets.WatchList(config=self.config)
packets.SeenList(config=self.config)
packets.PacketTrack(config=self.config)
def config_and_init(self):
CONF.callsign = self.fromcall
CONF.aprs_network.login = fake.FAKE_TO_CALLSIGN
CONF.aprs_fi.apiKey = "something"
CONF.enabled_plugins = "aprsd.plugins.ping.PingPlugin"
CONF.enable_save = False
class TestPluginBase(TestPlugin):
@mock.patch.object(fake.FakeBaseNoThreadsPlugin, "process")
def test_base_plugin_no_threads(self, mock_process):
p = fake.FakeBaseNoThreadsPlugin(self.config)
p = fake.FakeBaseNoThreadsPlugin()
expected = []
actual = p.create_threads()
@ -139,19 +129,20 @@ class TestPluginBase(TestPlugin):
@mock.patch.object(fake.FakeBaseThreadsPlugin, "create_threads")
def test_base_plugin_threads_created(self, mock_create):
p = fake.FakeBaseThreadsPlugin(self.config)
p = fake.FakeBaseThreadsPlugin()
mock_create.assert_called_once()
p.stop_threads()
def test_base_plugin_threads(self):
p = fake.FakeBaseThreadsPlugin(self.config)
p = fake.FakeBaseThreadsPlugin()
actual = p.create_threads()
self.assertTrue(isinstance(actual, fake.FakeThread))
p.stop_threads()
@mock.patch.object(fake.FakeRegexCommandPlugin, "process")
def test_regex_base_not_called(self, mock_process):
p = fake.FakeRegexCommandPlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
p = fake.FakeRegexCommandPlugin()
packet = fake.fake_packet(message="a")
expected = None
actual = p.filter(packet)
@ -165,32 +156,32 @@ class TestPluginBase(TestPlugin):
mock_process.assert_not_called()
packet = fake.fake_packet(
message="F",
message_format=core.PACKET_TYPE_MICE,
)
expected = None
expected = packets.NULL_MESSAGE
actual = p.filter(packet)
self.assertEqual(expected, actual)
mock_process.assert_not_called()
packet = fake.fake_packet(
message="f",
message_format=core.PACKET_TYPE_ACK,
)
expected = None
expected = packets.NULL_MESSAGE
actual = p.filter(packet)
self.assertEqual(expected, actual)
mock_process.assert_not_called()
@mock.patch.object(fake.FakeRegexCommandPlugin, "process")
def test_regex_base_assert_called(self, mock_process):
p = fake.FakeRegexCommandPlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
p = fake.FakeRegexCommandPlugin()
packet = fake.fake_packet(message="f")
p.filter(packet)
mock_process.assert_called_once()
def test_regex_base_process_called(self):
p = fake.FakeRegexCommandPlugin(self.config)
CONF.callsign = fake.FAKE_TO_CALLSIGN
p = fake.FakeRegexCommandPlugin()
packet = fake.fake_packet(message="f")
expected = fake.FAKE_MESSAGE_TEXT