Merge pull request #43 from craigerl/rework_config

Reworked the config file and options
This commit is contained in:
Walter A. Boring IV 2021-01-21 13:36:44 -05:00 committed by GitHub
commit a656508ba6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 226 additions and 126 deletions

View File

@ -18,16 +18,16 @@ CONFIG = None
def _imap_connect(): def _imap_connect():
imap_port = CONFIG["imap"].get("port", 143) imap_port = CONFIG["aprsd"]["email"]["imap"].get("port", 143)
use_ssl = CONFIG["imap"].get("use_ssl", False) use_ssl = CONFIG["aprsd"]["email"]["imap"].get("use_ssl", False)
host = CONFIG["imap"]["host"] host = CONFIG["aprsd"]["email"]["imap"]["host"]
msg = "{}{}:{}".format("TLS " if use_ssl else "", host, imap_port) msg = "{}{}:{}".format("TLS " if use_ssl else "", host, imap_port)
# LOG.debug("Connect to IMAP host {} with user '{}'". # LOG.debug("Connect to IMAP host {} with user '{}'".
# format(msg, CONFIG['imap']['login'])) # format(msg, CONFIG['imap']['login']))
try: try:
server = imapclient.IMAPClient( server = imapclient.IMAPClient(
CONFIG["imap"]["host"], CONFIG["aprsd"]["email"]["imap"]["host"],
port=imap_port, port=imap_port,
use_uid=True, use_uid=True,
ssl=use_ssl, ssl=use_ssl,
@ -37,7 +37,10 @@ def _imap_connect():
return return
try: try:
server.login(CONFIG["imap"]["login"], CONFIG["imap"]["password"]) server.login(
CONFIG["aprsd"]["email"]["imap"]["login"],
CONFIG["aprsd"]["email"]["imap"]["password"],
)
except (imaplib.IMAP4.error, Exception) as e: except (imaplib.IMAP4.error, Exception) as e:
msg = getattr(e, "message", repr(e)) msg = getattr(e, "message", repr(e))
LOG.error("Failed to login {}".format(msg)) LOG.error("Failed to login {}".format(msg))
@ -48,12 +51,15 @@ def _imap_connect():
def _smtp_connect(): def _smtp_connect():
host = CONFIG["smtp"]["host"] host = CONFIG["aprsd"]["email"]["smtp"]["host"]
smtp_port = CONFIG["smtp"]["port"] smtp_port = CONFIG["aprsd"]["email"]["smtp"]["port"]
use_ssl = CONFIG["smtp"].get("use_ssl", False) use_ssl = CONFIG["aprsd"]["email"]["smtp"].get("use_ssl", False)
msg = "{}{}:{}".format("SSL " if use_ssl else "", host, smtp_port) msg = "{}{}:{}".format("SSL " if use_ssl else "", host, smtp_port)
LOG.debug( LOG.debug(
"Connect to SMTP host {} with user '{}'".format(msg, CONFIG["imap"]["login"]), "Connect to SMTP host {} with user '{}'".format(
msg,
CONFIG["aprsd"]["email"]["imap"]["login"],
),
) )
try: try:
@ -68,7 +74,10 @@ def _smtp_connect():
LOG.debug("Connected to smtp host {}".format(msg)) LOG.debug("Connected to smtp host {}".format(msg))
try: try:
server.login(CONFIG["smtp"]["login"], CONFIG["smtp"]["password"]) server.login(
CONFIG["aprsd"]["email"]["smtp"]["login"],
CONFIG["aprsd"]["email"]["smtp"]["password"],
)
except Exception: except Exception:
LOG.error("Couldn't connect to SMTP Server") LOG.error("Couldn't connect to SMTP Server")
return return
@ -93,8 +102,8 @@ def validate_shortcuts(config):
email_address=shortcuts[key], email_address=shortcuts[key],
check_regex=True, check_regex=True,
check_mx=False, check_mx=False,
from_address=config["smtp"]["login"], from_address=config["aprsd"]["email"]["smtp"]["login"],
helo_host=config["smtp"]["host"], helo_host=config["aprsd"]["email"]["smtp"]["host"],
smtp_timeout=10, smtp_timeout=10,
dns_timeout=10, dns_timeout=10,
use_blacklist=True, use_blacklist=True,
@ -109,14 +118,14 @@ def validate_shortcuts(config):
delete_keys.append(key) delete_keys.append(key)
for key in delete_keys: for key in delete_keys:
del config["shortcuts"][key] del config["aprsd"]["email"]["shortcuts"][key]
LOG.info("Available shortcuts: {}".format(config["shortcuts"])) LOG.info("Available shortcuts: {}".format(config["shortcuts"]))
def get_email_from_shortcut(addr): def get_email_from_shortcut(addr):
if CONFIG.get("shortcuts", False): if CONFIG["aprsd"]["email"].get("shortcuts", False):
return CONFIG["shortcuts"].get(addr, addr) return CONFIG["aprsd"]["email"]["shortcuts"].get(addr, addr)
else: else:
return addr return addr
@ -232,7 +241,7 @@ def parse_email(msgid, data, server):
def send_email(to_addr, content): def send_email(to_addr, content):
global check_email_delay global check_email_delay
shortcuts = CONFIG["shortcuts"] shortcuts = CONFIG["aprsd"]["email"]["shortcuts"]
email_address = get_email_from_shortcut(to_addr) email_address = get_email_from_shortcut(to_addr)
LOG.info("Sending Email_________________") LOG.info("Sending Email_________________")
@ -250,12 +259,16 @@ def send_email(to_addr, content):
msg = MIMEText(content) msg = MIMEText(content)
msg["Subject"] = subject msg["Subject"] = subject
msg["From"] = CONFIG["smtp"]["login"] msg["From"] = CONFIG["aprsd"]["email"]["smtp"]["login"]
msg["To"] = to_addr msg["To"] = to_addr
server = _smtp_connect() server = _smtp_connect()
if server: if server:
try: try:
server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string()) server.sendmail(
CONFIG["aprsd"]["email"]["smtp"]["login"],
[to_addr],
msg.as_string(),
)
except Exception as e: except Exception as e:
msg = getattr(e, "message", repr(e)) msg = getattr(e, "message", repr(e))
LOG.error("Sendmail Error!!!! '{}'", msg) LOG.error("Sendmail Error!!!! '{}'", msg)
@ -305,7 +318,11 @@ def resend_email(count, fromcall):
# asterisk indicates a resend # asterisk indicates a resend
reply = "-" + from_addr + " * " + body.decode(errors="ignore") reply = "-" + from_addr + " * " + body.decode(errors="ignore")
# messaging.send_message(fromcall, reply) # messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply) msg = messaging.TextMessage(
CONFIG["aprsd"]["email"]["aprs"]["login"],
fromcall,
reply,
)
msg.send() msg.send()
msgexists = True msgexists = True
@ -362,7 +379,7 @@ class APRSDEmailThread(threads.APRSDThread):
check_email_delay += 1 check_email_delay += 1
LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds") LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds")
shortcuts = CONFIG["shortcuts"] shortcuts = CONFIG["aprsd"]["email"]["shortcuts"]
# swap key/value # swap key/value
shortcuts_inverted = {v: k for k, v in shortcuts.items()} shortcuts_inverted = {v: k for k, v in shortcuts.items()}

View File

@ -37,7 +37,6 @@ import aprslib
from aprslib.exceptions import LoginError from aprslib.exceptions import LoginError
import click import click
import click_completion import click_completion
import yaml
# setup the global logger # setup the global logger
# logging.basicConfig(level=logging.DEBUG) # level=10 # logging.basicConfig(level=logging.DEBUG) # level=10
@ -174,7 +173,7 @@ def setup_logging(config, loglevel, quiet):
log_format = "[%(asctime)s] [%(threadName)-12s] [%(levelname)-5.5s]" " %(message)s" log_format = "[%(asctime)s] [%(threadName)-12s] [%(levelname)-5.5s]" " %(message)s"
date_format = "%m/%d/%Y %I:%M:%S %p" date_format = "%m/%d/%Y %I:%M:%S %p"
log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format) log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
log_file = config["aprs"].get("logfile", None) log_file = config["aprsd"].get("logfile", None)
if log_file: if log_file:
fh = RotatingFileHandler(log_file, maxBytes=(10248576 * 5), backupCount=4) fh = RotatingFileHandler(log_file, maxBytes=(10248576 * 5), backupCount=4)
else: else:
@ -192,7 +191,7 @@ def setup_logging(config, loglevel, quiet):
@main.command() @main.command()
def sample_config(): def sample_config():
"""This dumps the config to stdout.""" """This dumps the config to stdout."""
click.echo(utils.add_config_comments(yaml.dump(utils.DEFAULT_CONFIG_DICT))) click.echo(utils.dump_default_cfg())
@main.command() @main.command()

View File

@ -18,12 +18,13 @@ class LocationPlugin(plugin.APRSDPluginBase):
LOG.info("Location Plugin") LOG.info("Location Plugin")
# get last location of a callsign, get descriptive name from weather service # get last location of a callsign, get descriptive name from weather service
try: try:
utils.check_config_option(self.config, "aprs.fi", "apiKey") utils.check_config_option(self.config, ["services", "aprs.fi", "apiKey"])
except Exception as ex: except Exception as ex:
LOG.error("Failed to find config aprs.fi:apikey {}".format(ex)) LOG.error("Failed to find config aprs.fi:apikey {}".format(ex))
return "No aprs.fi apikey found" return "No aprs.fi apikey found"
api_key = self.config["aprs.fi"]["apiKey"] api_key = self.config["services"]["aprs.fi"]["apiKey"]
# optional second argument is a callsign to search # optional second argument is a callsign to search
a = re.search(r"^.*\s+(.*)", message) a = re.search(r"^.*\s+(.*)", message)
if a is not None: if a is not None:

View File

@ -53,7 +53,7 @@ class TimeOpenCageDataPlugin(TimePlugin):
command_name = "Time" command_name = "Time"
def command(self, fromcall, message, ack): def command(self, fromcall, message, ack):
api_key = self.config["aprs.fi"]["apiKey"] api_key = self.config["services"]["aprs.fi"]["apiKey"]
try: try:
aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall) aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall)
except Exception as ex: except Exception as ex:
@ -93,7 +93,7 @@ class TimeOWMPlugin(TimePlugin):
command_name = "Time" command_name = "Time"
def command(self, fromcall, message, ack): def command(self, fromcall, message, ack):
api_key = self.config["aprs.fi"]["apiKey"] api_key = self.config["services"]["aprs.fi"]["apiKey"]
try: try:
aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall) aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall)
except Exception as ex: except Exception as ex:
@ -105,12 +105,15 @@ class TimeOWMPlugin(TimePlugin):
lon = aprs_data["entries"][0]["lng"] lon = aprs_data["entries"][0]["lng"]
try: try:
utils.check_config_option(self.config, "openweathermap", "apiKey") utils.check_config_option(
self.config,
["services", "openweathermap", "apiKey"],
)
except Exception as ex: except Exception as ex:
LOG.error("Failed to find config openweathermap:apiKey {}".format(ex)) LOG.error("Failed to find config openweathermap:apiKey {}".format(ex))
return "No openweathermap apiKey found" return "No openweathermap apiKey found"
api_key = self.config["openweathermap"]["apiKey"] api_key = self.config["services"]["openweathermap"]["apiKey"]
try: try:
results = plugin_utils.fetch_openweathermap(api_key, lat, lon) results = plugin_utils.fetch_openweathermap(api_key, lat, lon)
except Exception as ex: except Exception as ex:

View File

@ -27,7 +27,13 @@ class USWeatherPlugin(plugin.APRSDPluginBase):
def command(self, fromcall, message, ack): def command(self, fromcall, message, ack):
LOG.info("Weather Plugin") LOG.info("Weather Plugin")
api_key = self.config["aprs.fi"]["apiKey"] try:
utils.check_config_option(self.config, ["services", "aprs.fi", "apiKey"])
except Exception as ex:
LOG.error("Failed to find config aprs.fi:apikey {}".format(ex))
return "No aprs.fi apikey found"
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try: try:
aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall) aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall)
except Exception as ex: except Exception as ex:
@ -98,7 +104,17 @@ class USMetarPlugin(plugin.APRSDPluginBase):
# if no second argument, search for calling station # if no second argument, search for calling station
fromcall = fromcall fromcall = fromcall
api_key = self.config["aprs.fi"]["apiKey"] try:
utils.check_config_option(
self.config,
["services", "aprs.fi", "apiKey"],
)
except Exception as ex:
LOG.error("Failed to find config aprs.fi:apikey {}".format(ex))
return "No aprs.fi apikey found"
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try: try:
aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall) aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall)
except Exception as ex: except Exception as ex:
@ -168,7 +184,13 @@ class OWMWeatherPlugin(plugin.APRSDPluginBase):
else: else:
searchcall = fromcall searchcall = fromcall
api_key = self.config["aprs.fi"]["apiKey"] try:
utils.check_config_option(self.config, ["services", "aprs.fi", "apiKey"])
except Exception as ex:
LOG.error("Failed to find config aprs.fi:apikey {}".format(ex))
return "No aprs.fi apikey found"
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try: try:
aprs_data = plugin_utils.get_aprs_fi(api_key, searchcall) aprs_data = plugin_utils.get_aprs_fi(api_key, searchcall)
except Exception as ex: except Exception as ex:
@ -184,20 +206,23 @@ class OWMWeatherPlugin(plugin.APRSDPluginBase):
lon = aprs_data["entries"][0]["lng"] lon = aprs_data["entries"][0]["lng"]
try: try:
utils.check_config_option(self.config, "openweathermap", "apiKey") utils.check_config_option(
self.config,
["services", "openweathermap", "apiKey"],
)
except Exception as ex: except Exception as ex:
LOG.error("Failed to find config openweathermap:apiKey {}".format(ex)) LOG.error("Failed to find config openweathermap:apiKey {}".format(ex))
return "No openweathermap apiKey found" return "No openweathermap apiKey found"
try: try:
utils.check_config_option(self.config, "aprsd", "units") utils.check_config_option(self.config, ["aprsd", "units"])
except Exception: except Exception:
LOG.debug("Couldn't find untis in aprsd:services:units") LOG.debug("Couldn't find untis in aprsd:services:units")
units = "metric" units = "metric"
else: else:
units = self.config["aprsd"]["units"] units = self.config["aprsd"]["units"]
api_key = self.config["openweathermap"]["apiKey"] api_key = self.config["services"]["openweathermap"]["apiKey"]
try: try:
wx_data = plugin_utils.fetch_openweathermap( wx_data = plugin_utils.fetch_openweathermap(
api_key, api_key,
@ -279,7 +304,13 @@ class AVWXWeatherPlugin(plugin.APRSDPluginBase):
else: else:
searchcall = fromcall searchcall = fromcall
api_key = self.config["aprs.fi"]["apiKey"] try:
utils.check_config_option(self.config, ["services", "aprs.fi", "apiKey"])
except Exception as ex:
LOG.error("Failed to find config aprs.fi:apikey {}".format(ex))
return "No aprs.fi apikey found"
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try: try:
aprs_data = plugin_utils.get_aprs_fi(api_key, searchcall) aprs_data = plugin_utils.get_aprs_fi(api_key, searchcall)
except Exception as ex: except Exception as ex:
@ -295,20 +326,20 @@ class AVWXWeatherPlugin(plugin.APRSDPluginBase):
lon = aprs_data["entries"][0]["lng"] lon = aprs_data["entries"][0]["lng"]
try: try:
utils.check_config_option(self.config, "avwx", "apiKey") utils.check_config_option(self.config, ["services", "avwx", "apiKey"])
except Exception as ex: except Exception as ex:
LOG.error("Failed to find config avwx:apiKey {}".format(ex)) LOG.error("Failed to find config avwx:apiKey {}".format(ex))
return "No avwx apiKey found" return "No avwx apiKey found"
try: try:
utils.check_config_option(self.config, "avwx", "base_url") utils.check_config_option(self.config, ["services", "avwx", "base_url"])
except Exception as ex: except Exception as ex:
LOG.debut("Didn't find avwx:base_url {}".format(ex)) LOG.debut("Didn't find avwx:base_url {}".format(ex))
base_url = "https://avwx.rest" base_url = "https://avwx.rest"
else: else:
base_url = self.config["avwx"]["base_url"] base_url = self.config["services"]["avwx"]["base_url"]
api_key = self.config["avwx"]["apiKey"] api_key = self.config["services"]["avwx"]["apiKey"]
token = "TOKEN {}".format(api_key) token = "TOKEN {}".format(api_key)
headers = {"Authorization": token} headers = {"Authorization": token}
try: try:

View File

@ -17,37 +17,42 @@ DEFAULT_CONFIG_DICT = {
"aprs": { "aprs": {
"login": "CALLSIGN", "login": "CALLSIGN",
"password": "00000", "password": "00000",
"host": "rotate.aprs.net", "host": "rotate.aprs2.net",
"port": 14580, "port": 14580,
"logfile": "/tmp/aprsd.log",
},
"aprs.fi": {"apiKey": "set me"},
"openweathermap": {"apiKey": "set me"},
"opencagedata": {"apiKey": "set me"},
"avwx": {"base_url": "http://host:port", "apiKey": "set me"},
"shortcuts": {
"aa": "5551239999@vtext.com",
"cl": "craiglamparter@somedomain.org",
"wb": "555309@vtext.com",
},
"smtp": {
"login": "SMTP_USERNAME",
"password": "SMTP_PASSWORD",
"host": "smtp.gmail.com",
"port": 465,
"use_ssl": False,
},
"imap": {
"login": "IMAP_USERNAME",
"password": "IMAP_PASSWORD",
"host": "imap.gmail.com",
"port": 993,
"use_ssl": True,
}, },
"aprsd": { "aprsd": {
"logfile": "/tmp/aprsd.log",
"plugin_dir": "~/.config/aprsd/plugins", "plugin_dir": "~/.config/aprsd/plugins",
"enabled_plugins": plugin.CORE_PLUGINS, "enabled_plugins": plugin.CORE_PLUGINS,
"units": "imperial", "units": "imperial",
"email": {
"enabled": True,
"shortcuts": {
"aa": "5551239999@vtext.com",
"cl": "craiglamparter@somedomain.org",
"wb": "555309@vtext.com",
},
"smtp": {
"login": "SMTP_USERNAME",
"password": "SMTP_PASSWORD",
"host": "smtp.gmail.com",
"port": 465,
"use_ssl": False,
},
"imap": {
"login": "IMAP_USERNAME",
"password": "IMAP_PASSWORD",
"host": "imap.gmail.com",
"port": 993,
"use_ssl": True,
},
},
},
"services": {
"aprs.fi": {"apiKey": "APIKEYVALUE"},
"openweathermap": {"apiKey": "APIKEYVALUE"},
"opencagedata": {"apiKey": "APIKEYVALUE"},
"avwx": {"base_url": "http://host:port", "apiKey": "APIKEYVALUE"},
}, },
} }
@ -105,14 +110,33 @@ def end_substr(original, substr):
return idx return idx
def dump_default_cfg():
return add_config_comments(
yaml.dump(
DEFAULT_CONFIG_DICT,
indent=4,
),
)
def add_config_comments(raw_yaml): def add_config_comments(raw_yaml):
end_idx = end_substr(raw_yaml, "aprs:")
if end_idx != -1:
# lets insert a comment
raw_yaml = insert_str(
raw_yaml,
"\n # Get the passcode for your callsign here: "
"\n # https://apps.magicbug.co.uk/passcode",
end_idx,
)
end_idx = end_substr(raw_yaml, "aprs.fi:") end_idx = end_substr(raw_yaml, "aprs.fi:")
if end_idx != -1: if end_idx != -1:
# lets insert a comment # lets insert a comment
raw_yaml = insert_str( raw_yaml = insert_str(
raw_yaml, raw_yaml,
"\n # Get the apiKey from your aprs.fi account here: " "\n # Get the apiKey from your aprs.fi account here: "
"\n # http://aprs.fi/account", "\n # http://aprs.fi/account",
end_idx, end_idx,
) )
@ -121,9 +145,9 @@ def add_config_comments(raw_yaml):
# lets insert a comment # lets insert a comment
raw_yaml = insert_str( raw_yaml = insert_str(
raw_yaml, raw_yaml,
"\n # (Optional for TimeOpenCageDataPlugin) " "\n # (Optional for TimeOpenCageDataPlugin) "
"\n # Get the apiKey from your opencagedata account here: " "\n # Get the apiKey from your opencagedata account here: "
"\n # https://opencagedata.com/dashboard#api-keys", "\n # https://opencagedata.com/dashboard#api-keys",
end_idx, end_idx,
) )
@ -132,10 +156,10 @@ def add_config_comments(raw_yaml):
# lets insert a comment # lets insert a comment
raw_yaml = insert_str( raw_yaml = insert_str(
raw_yaml, raw_yaml,
"\n # (Optional for OWMWeatherPlugin) " "\n # (Optional for OWMWeatherPlugin) "
"\n # Get the apiKey from your " "\n # Get the apiKey from your "
"\n # openweathermap account here: " "\n # openweathermap account here: "
"\n # https://home.openweathermap.org/api_keys", "\n # https://home.openweathermap.org/api_keys",
end_idx, end_idx,
) )
@ -144,10 +168,10 @@ def add_config_comments(raw_yaml):
# lets insert a comment # lets insert a comment
raw_yaml = insert_str( raw_yaml = insert_str(
raw_yaml, raw_yaml,
"\n # (Optional for AVWXWeatherPlugin) " "\n # (Optional for AVWXWeatherPlugin) "
"\n # Use hosted avwx-api here: https://avwx.rest " "\n # Use hosted avwx-api here: https://avwx.rest "
"\n # or deploy your own from here: " "\n # or deploy your own from here: "
"\n # https://github.com/avwx-rest/avwx-api", "\n # https://github.com/avwx-rest/avwx-api",
end_idx, end_idx,
) )
@ -163,8 +187,7 @@ def create_default_config():
click.echo("Config dir '{}' doesn't exist, creating.".format(config_dir)) click.echo("Config dir '{}' doesn't exist, creating.".format(config_dir))
mkdir_p(config_dir) mkdir_p(config_dir)
with open(config_file_expanded, "w+") as cf: with open(config_file_expanded, "w+") as cf:
raw_yaml = yaml.dump(DEFAULT_CONFIG_DICT) cf.write(dump_default_cfg())
cf.write(add_config_comments(raw_yaml))
def get_config(config_file): def get_config(config_file):
@ -194,33 +217,32 @@ def get_config(config_file):
sys.exit(-1) sys.exit(-1)
def check_config_option(config, section, name=None, default=None, default_fail=None): def conf_option_exists(conf, chain):
if section in config: _key = chain.pop(0)
if _key in conf:
return conf_option_exists(conf[_key], chain) if chain else conf[_key]
if name and name not in config[section]:
if not default: def check_config_option(config, chain, default_fail=None):
raise Exception( result = conf_option_exists(config, chain.copy())
"'{}' was not in '{}' section of config file".format( if not result:
name, raise Exception(
section, "'{}' was not in config file".format(
), chain,
) ),
else: )
config[section][name] = default else:
else: if default_fail:
if ( if result == default_fail:
default_fail
and name in config[section]
and config[section][name] == default_fail
):
# We have to fail and bail if the user hasn't edited # We have to fail and bail if the user hasn't edited
# this config option. # this config option.
raise Exception( raise Exception(
"Config file needs to be edited from provided defaults.", "Config file needs to be edited from provided defaults for {}.".format(
chain,
),
) )
else: else:
raise Exception("'%s' section wasn't in config file" % section) return config
return config
# This method tries to parse the config yaml file # This method tries to parse the config yaml file
@ -235,41 +257,68 @@ def parse_config(config_file):
click.echo(msg) click.echo(msg)
sys.exit(-1) sys.exit(-1)
def check_option(config, section, name=None, default=None, default_fail=None): def check_option(config, chain, default_fail=None):
try: try:
config = check_config_option(config, section, name, default, default_fail) config = check_config_option(config, chain, default_fail=default_fail)
except Exception as ex: except Exception as ex:
fail(repr(ex)) fail(repr(ex))
else: else:
return config return config
config = get_config(config_file) config = get_config(config_file)
check_option(config, "shortcuts")
# special check here to make sure user has edited the config file # special check here to make sure user has edited the config file
# and changed the ham callsign # and changed the ham callsign
check_option( check_option(
config, config,
"ham", [
"callsign", "ham",
"callsign",
],
default_fail=DEFAULT_CONFIG_DICT["ham"]["callsign"], default_fail=DEFAULT_CONFIG_DICT["ham"]["callsign"],
) )
check_option( check_option(
config, config,
"aprs.fi", ["services", "aprs.fi", "apiKey"],
"apiKey", default_fail=DEFAULT_CONFIG_DICT["services"]["aprs.fi"]["apiKey"],
default_fail=DEFAULT_CONFIG_DICT["aprs.fi"]["apiKey"],
) )
check_option(config, "aprs", "login") check_option(
check_option(config, "aprs", "password") config,
# check_option(config, "aprs", "host") ["aprs", "login"],
# check_option(config, "aprs", "port") default_fail=DEFAULT_CONFIG_DICT["aprs"]["login"],
check_option(config, "aprs", "logfile", "./aprsd.log") )
check_option(config, "imap", "host") check_option(
check_option(config, "imap", "login") config,
check_option(config, "imap", "password") ["aprs", "password"],
check_option(config, "smtp", "host") default_fail=DEFAULT_CONFIG_DICT["aprs"]["password"],
check_option(config, "smtp", "port") )
check_option(config, "smtp", "login") if config["aprsd"]["email"]["enabled"] is True:
check_option(config, "smtp", "password") # Check IMAP server settings
check_option(config, ["aprsd", "email", "imap", "host"])
check_option(config, ["aprsd", "email", "imap", "port"])
check_option(
config,
["aprsd", "email", "imap", "login"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["imap"]["login"],
)
check_option(
config,
["aprsd", "email", "imap", "password"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["imap"]["password"],
)
# Check SMTP server settings
check_option(config, ["aprsd", "email", "smtp", "host"])
check_option(config, ["aprsd", "email", "smtp", "port"])
check_option(
config,
["aprsd", "email", "smtp", "login"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["smtp"]["login"],
)
check_option(
config,
["aprsd", "email", "smtp", "password"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["smtp"]["password"],
)
return config return config

View File

@ -5,21 +5,21 @@ from aprsd import email
class TestEmail(unittest.TestCase): class TestEmail(unittest.TestCase):
def test_get_email_from_shortcut(self): def test_get_email_from_shortcut(self):
email.CONFIG = {"shortcuts": {}} email.CONFIG = {"aprsd": {"email": {"shortcuts": {}}}}
email_address = "something@something.com" email_address = "something@something.com"
addr = "-{}".format(email_address) addr = "-{}".format(email_address)
actual = email.get_email_from_shortcut(addr) actual = email.get_email_from_shortcut(addr)
self.assertEqual(addr, actual) self.assertEqual(addr, actual)
email.CONFIG = {"nothing": "nothing"} email.CONFIG = {"aprsd": {"email": {"nothing": "nothing"}}}
actual = email.get_email_from_shortcut(addr) actual = email.get_email_from_shortcut(addr)
self.assertEqual(addr, actual) self.assertEqual(addr, actual)
email.CONFIG = {"shortcuts": {"not_used": "empty"}} email.CONFIG = {"aprsd": {"email": {"shortcuts": {"not_used": "empty"}}}}
actual = email.get_email_from_shortcut(addr) actual = email.get_email_from_shortcut(addr)
self.assertEqual(addr, actual) self.assertEqual(addr, actual)
email.CONFIG = {"shortcuts": {"-wb": email_address}} email.CONFIG = {"aprsd": {"email": {"shortcuts": {"-wb": email_address}}}}
short = "-wb" short = "-wb"
actual = email.get_email_from_shortcut(short) actual = email.get_email_from_shortcut(short)
self.assertEqual(email_address, actual) self.assertEqual(email_address, actual)