Reworked the config file and options

This patch reorganizes the config file layout and options
to make more logical sense as well as make it more readable.

This breaks backwards compatibility.
This commit is contained in:
Hemna 2021-01-21 13:32:19 -05:00
parent 2f7c1bfcc1
commit ce5b09233c
7 changed files with 226 additions and 126 deletions

View File

@ -18,16 +18,16 @@ CONFIG = None
def _imap_connect():
imap_port = CONFIG["imap"].get("port", 143)
use_ssl = CONFIG["imap"].get("use_ssl", False)
host = CONFIG["imap"]["host"]
imap_port = CONFIG["aprsd"]["email"]["imap"].get("port", 143)
use_ssl = CONFIG["aprsd"]["email"]["imap"].get("use_ssl", False)
host = CONFIG["aprsd"]["email"]["imap"]["host"]
msg = "{}{}:{}".format("TLS " if use_ssl else "", host, imap_port)
# LOG.debug("Connect to IMAP host {} with user '{}'".
# format(msg, CONFIG['imap']['login']))
try:
server = imapclient.IMAPClient(
CONFIG["imap"]["host"],
CONFIG["aprsd"]["email"]["imap"]["host"],
port=imap_port,
use_uid=True,
ssl=use_ssl,
@ -37,7 +37,10 @@ def _imap_connect():
return
try:
server.login(CONFIG["imap"]["login"], CONFIG["imap"]["password"])
server.login(
CONFIG["aprsd"]["email"]["imap"]["login"],
CONFIG["aprsd"]["email"]["imap"]["password"],
)
except (imaplib.IMAP4.error, Exception) as e:
msg = getattr(e, "message", repr(e))
LOG.error("Failed to login {}".format(msg))
@ -48,12 +51,15 @@ def _imap_connect():
def _smtp_connect():
host = CONFIG["smtp"]["host"]
smtp_port = CONFIG["smtp"]["port"]
use_ssl = CONFIG["smtp"].get("use_ssl", False)
host = CONFIG["aprsd"]["email"]["smtp"]["host"]
smtp_port = CONFIG["aprsd"]["email"]["smtp"]["port"]
use_ssl = CONFIG["aprsd"]["email"]["smtp"].get("use_ssl", False)
msg = "{}{}:{}".format("SSL " if use_ssl else "", host, smtp_port)
LOG.debug(
"Connect to SMTP host {} with user '{}'".format(msg, CONFIG["imap"]["login"]),
"Connect to SMTP host {} with user '{}'".format(
msg,
CONFIG["aprsd"]["email"]["imap"]["login"],
),
)
try:
@ -68,7 +74,10 @@ def _smtp_connect():
LOG.debug("Connected to smtp host {}".format(msg))
try:
server.login(CONFIG["smtp"]["login"], CONFIG["smtp"]["password"])
server.login(
CONFIG["aprsd"]["email"]["smtp"]["login"],
CONFIG["aprsd"]["email"]["smtp"]["password"],
)
except Exception:
LOG.error("Couldn't connect to SMTP Server")
return
@ -93,8 +102,8 @@ def validate_shortcuts(config):
email_address=shortcuts[key],
check_regex=True,
check_mx=False,
from_address=config["smtp"]["login"],
helo_host=config["smtp"]["host"],
from_address=config["aprsd"]["email"]["smtp"]["login"],
helo_host=config["aprsd"]["email"]["smtp"]["host"],
smtp_timeout=10,
dns_timeout=10,
use_blacklist=True,
@ -109,14 +118,14 @@ def validate_shortcuts(config):
delete_keys.append(key)
for key in delete_keys:
del config["shortcuts"][key]
del config["aprsd"]["email"]["shortcuts"][key]
LOG.info("Available shortcuts: {}".format(config["shortcuts"]))
def get_email_from_shortcut(addr):
if CONFIG.get("shortcuts", False):
return CONFIG["shortcuts"].get(addr, addr)
if CONFIG["aprsd"]["email"].get("shortcuts", False):
return CONFIG["aprsd"]["email"]["shortcuts"].get(addr, addr)
else:
return addr
@ -232,7 +241,7 @@ def parse_email(msgid, data, server):
def send_email(to_addr, content):
global check_email_delay
shortcuts = CONFIG["shortcuts"]
shortcuts = CONFIG["aprsd"]["email"]["shortcuts"]
email_address = get_email_from_shortcut(to_addr)
LOG.info("Sending Email_________________")
@ -250,12 +259,16 @@ def send_email(to_addr, content):
msg = MIMEText(content)
msg["Subject"] = subject
msg["From"] = CONFIG["smtp"]["login"]
msg["From"] = CONFIG["aprsd"]["email"]["smtp"]["login"]
msg["To"] = to_addr
server = _smtp_connect()
if server:
try:
server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string())
server.sendmail(
CONFIG["aprsd"]["email"]["smtp"]["login"],
[to_addr],
msg.as_string(),
)
except Exception as e:
msg = getattr(e, "message", repr(e))
LOG.error("Sendmail Error!!!! '{}'", msg)
@ -305,7 +318,11 @@ def resend_email(count, fromcall):
# asterisk indicates a resend
reply = "-" + from_addr + " * " + body.decode(errors="ignore")
# messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply)
msg = messaging.TextMessage(
CONFIG["aprsd"]["email"]["aprs"]["login"],
fromcall,
reply,
)
msg.send()
msgexists = True
@ -362,7 +379,7 @@ class APRSDEmailThread(threads.APRSDThread):
check_email_delay += 1
LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds")
shortcuts = CONFIG["shortcuts"]
shortcuts = CONFIG["aprsd"]["email"]["shortcuts"]
# swap key/value
shortcuts_inverted = {v: k for k, v in shortcuts.items()}

View File

@ -37,7 +37,6 @@ import aprslib
from aprslib.exceptions import LoginError
import click
import click_completion
import yaml
# setup the global logger
# logging.basicConfig(level=logging.DEBUG) # level=10
@ -174,7 +173,7 @@ def setup_logging(config, loglevel, quiet):
log_format = "[%(asctime)s] [%(threadName)-12s] [%(levelname)-5.5s]" " %(message)s"
date_format = "%m/%d/%Y %I:%M:%S %p"
log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
log_file = config["aprs"].get("logfile", None)
log_file = config["aprsd"].get("logfile", None)
if log_file:
fh = RotatingFileHandler(log_file, maxBytes=(10248576 * 5), backupCount=4)
else:
@ -192,7 +191,7 @@ def setup_logging(config, loglevel, quiet):
@main.command()
def sample_config():
"""This dumps the config to stdout."""
click.echo(utils.add_config_comments(yaml.dump(utils.DEFAULT_CONFIG_DICT)))
click.echo(utils.dump_default_cfg())
@main.command()

View File

@ -18,12 +18,13 @@ class LocationPlugin(plugin.APRSDPluginBase):
LOG.info("Location Plugin")
# get last location of a callsign, get descriptive name from weather service
try:
utils.check_config_option(self.config, "aprs.fi", "apiKey")
utils.check_config_option(self.config, ["services", "aprs.fi", "apiKey"])
except Exception as ex:
LOG.error("Failed to find config aprs.fi:apikey {}".format(ex))
return "No aprs.fi apikey found"
api_key = self.config["aprs.fi"]["apiKey"]
api_key = self.config["services"]["aprs.fi"]["apiKey"]
# optional second argument is a callsign to search
a = re.search(r"^.*\s+(.*)", message)
if a is not None:

View File

@ -53,7 +53,7 @@ class TimeOpenCageDataPlugin(TimePlugin):
command_name = "Time"
def command(self, fromcall, message, ack):
api_key = self.config["aprs.fi"]["apiKey"]
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall)
except Exception as ex:
@ -93,7 +93,7 @@ class TimeOWMPlugin(TimePlugin):
command_name = "Time"
def command(self, fromcall, message, ack):
api_key = self.config["aprs.fi"]["apiKey"]
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall)
except Exception as ex:
@ -105,12 +105,15 @@ class TimeOWMPlugin(TimePlugin):
lon = aprs_data["entries"][0]["lng"]
try:
utils.check_config_option(self.config, "openweathermap", "apiKey")
utils.check_config_option(
self.config,
["services", "openweathermap", "apiKey"],
)
except Exception as ex:
LOG.error("Failed to find config openweathermap:apiKey {}".format(ex))
return "No openweathermap apiKey found"
api_key = self.config["openweathermap"]["apiKey"]
api_key = self.config["services"]["openweathermap"]["apiKey"]
try:
results = plugin_utils.fetch_openweathermap(api_key, lat, lon)
except Exception as ex:

View File

@ -27,7 +27,13 @@ class USWeatherPlugin(plugin.APRSDPluginBase):
def command(self, fromcall, message, ack):
LOG.info("Weather Plugin")
api_key = self.config["aprs.fi"]["apiKey"]
try:
utils.check_config_option(self.config, ["services", "aprs.fi", "apiKey"])
except Exception as ex:
LOG.error("Failed to find config aprs.fi:apikey {}".format(ex))
return "No aprs.fi apikey found"
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall)
except Exception as ex:
@ -98,7 +104,17 @@ class USMetarPlugin(plugin.APRSDPluginBase):
# if no second argument, search for calling station
fromcall = fromcall
api_key = self.config["aprs.fi"]["apiKey"]
try:
utils.check_config_option(
self.config,
["services", "aprs.fi", "apiKey"],
)
except Exception as ex:
LOG.error("Failed to find config aprs.fi:apikey {}".format(ex))
return "No aprs.fi apikey found"
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall)
except Exception as ex:
@ -168,7 +184,13 @@ class OWMWeatherPlugin(plugin.APRSDPluginBase):
else:
searchcall = fromcall
api_key = self.config["aprs.fi"]["apiKey"]
try:
utils.check_config_option(self.config, ["services", "aprs.fi", "apiKey"])
except Exception as ex:
LOG.error("Failed to find config aprs.fi:apikey {}".format(ex))
return "No aprs.fi apikey found"
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, searchcall)
except Exception as ex:
@ -184,20 +206,23 @@ class OWMWeatherPlugin(plugin.APRSDPluginBase):
lon = aprs_data["entries"][0]["lng"]
try:
utils.check_config_option(self.config, "openweathermap", "apiKey")
utils.check_config_option(
self.config,
["services", "openweathermap", "apiKey"],
)
except Exception as ex:
LOG.error("Failed to find config openweathermap:apiKey {}".format(ex))
return "No openweathermap apiKey found"
try:
utils.check_config_option(self.config, "aprsd", "units")
utils.check_config_option(self.config, ["aprsd", "units"])
except Exception:
LOG.debug("Couldn't find untis in aprsd:services:units")
units = "metric"
else:
units = self.config["aprsd"]["units"]
api_key = self.config["openweathermap"]["apiKey"]
api_key = self.config["services"]["openweathermap"]["apiKey"]
try:
wx_data = plugin_utils.fetch_openweathermap(
api_key,
@ -279,7 +304,13 @@ class AVWXWeatherPlugin(plugin.APRSDPluginBase):
else:
searchcall = fromcall
api_key = self.config["aprs.fi"]["apiKey"]
try:
utils.check_config_option(self.config, ["services", "aprs.fi", "apiKey"])
except Exception as ex:
LOG.error("Failed to find config aprs.fi:apikey {}".format(ex))
return "No aprs.fi apikey found"
api_key = self.config["services"]["aprs.fi"]["apiKey"]
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, searchcall)
except Exception as ex:
@ -295,20 +326,20 @@ class AVWXWeatherPlugin(plugin.APRSDPluginBase):
lon = aprs_data["entries"][0]["lng"]
try:
utils.check_config_option(self.config, "avwx", "apiKey")
utils.check_config_option(self.config, ["services", "avwx", "apiKey"])
except Exception as ex:
LOG.error("Failed to find config avwx:apiKey {}".format(ex))
return "No avwx apiKey found"
try:
utils.check_config_option(self.config, "avwx", "base_url")
utils.check_config_option(self.config, ["services", "avwx", "base_url"])
except Exception as ex:
LOG.debut("Didn't find avwx:base_url {}".format(ex))
base_url = "https://avwx.rest"
else:
base_url = self.config["avwx"]["base_url"]
base_url = self.config["services"]["avwx"]["base_url"]
api_key = self.config["avwx"]["apiKey"]
api_key = self.config["services"]["avwx"]["apiKey"]
token = "TOKEN {}".format(api_key)
headers = {"Authorization": token}
try:

View File

@ -17,37 +17,42 @@ DEFAULT_CONFIG_DICT = {
"aprs": {
"login": "CALLSIGN",
"password": "00000",
"host": "rotate.aprs.net",
"host": "rotate.aprs2.net",
"port": 14580,
"logfile": "/tmp/aprsd.log",
},
"aprs.fi": {"apiKey": "set me"},
"openweathermap": {"apiKey": "set me"},
"opencagedata": {"apiKey": "set me"},
"avwx": {"base_url": "http://host:port", "apiKey": "set me"},
"shortcuts": {
"aa": "5551239999@vtext.com",
"cl": "craiglamparter@somedomain.org",
"wb": "555309@vtext.com",
},
"smtp": {
"login": "SMTP_USERNAME",
"password": "SMTP_PASSWORD",
"host": "smtp.gmail.com",
"port": 465,
"use_ssl": False,
},
"imap": {
"login": "IMAP_USERNAME",
"password": "IMAP_PASSWORD",
"host": "imap.gmail.com",
"port": 993,
"use_ssl": True,
},
"aprsd": {
"logfile": "/tmp/aprsd.log",
"plugin_dir": "~/.config/aprsd/plugins",
"enabled_plugins": plugin.CORE_PLUGINS,
"units": "imperial",
"email": {
"enabled": True,
"shortcuts": {
"aa": "5551239999@vtext.com",
"cl": "craiglamparter@somedomain.org",
"wb": "555309@vtext.com",
},
"smtp": {
"login": "SMTP_USERNAME",
"password": "SMTP_PASSWORD",
"host": "smtp.gmail.com",
"port": 465,
"use_ssl": False,
},
"imap": {
"login": "IMAP_USERNAME",
"password": "IMAP_PASSWORD",
"host": "imap.gmail.com",
"port": 993,
"use_ssl": True,
},
},
},
"services": {
"aprs.fi": {"apiKey": "APIKEYVALUE"},
"openweathermap": {"apiKey": "APIKEYVALUE"},
"opencagedata": {"apiKey": "APIKEYVALUE"},
"avwx": {"base_url": "http://host:port", "apiKey": "APIKEYVALUE"},
},
}
@ -105,14 +110,33 @@ def end_substr(original, substr):
return idx
def dump_default_cfg():
return add_config_comments(
yaml.dump(
DEFAULT_CONFIG_DICT,
indent=4,
),
)
def add_config_comments(raw_yaml):
end_idx = end_substr(raw_yaml, "aprs:")
if end_idx != -1:
# lets insert a comment
raw_yaml = insert_str(
raw_yaml,
"\n # Get the passcode for your callsign here: "
"\n # https://apps.magicbug.co.uk/passcode",
end_idx,
)
end_idx = end_substr(raw_yaml, "aprs.fi:")
if end_idx != -1:
# lets insert a comment
raw_yaml = insert_str(
raw_yaml,
"\n # Get the apiKey from your aprs.fi account here: "
"\n # http://aprs.fi/account",
"\n # Get the apiKey from your aprs.fi account here: "
"\n # http://aprs.fi/account",
end_idx,
)
@ -121,9 +145,9 @@ def add_config_comments(raw_yaml):
# lets insert a comment
raw_yaml = insert_str(
raw_yaml,
"\n # (Optional for TimeOpenCageDataPlugin) "
"\n # Get the apiKey from your opencagedata account here: "
"\n # https://opencagedata.com/dashboard#api-keys",
"\n # (Optional for TimeOpenCageDataPlugin) "
"\n # Get the apiKey from your opencagedata account here: "
"\n # https://opencagedata.com/dashboard#api-keys",
end_idx,
)
@ -132,10 +156,10 @@ def add_config_comments(raw_yaml):
# lets insert a comment
raw_yaml = insert_str(
raw_yaml,
"\n # (Optional for OWMWeatherPlugin) "
"\n # Get the apiKey from your "
"\n # openweathermap account here: "
"\n # https://home.openweathermap.org/api_keys",
"\n # (Optional for OWMWeatherPlugin) "
"\n # Get the apiKey from your "
"\n # openweathermap account here: "
"\n # https://home.openweathermap.org/api_keys",
end_idx,
)
@ -144,10 +168,10 @@ def add_config_comments(raw_yaml):
# lets insert a comment
raw_yaml = insert_str(
raw_yaml,
"\n # (Optional for AVWXWeatherPlugin) "
"\n # Use hosted avwx-api here: https://avwx.rest "
"\n # or deploy your own from here: "
"\n # https://github.com/avwx-rest/avwx-api",
"\n # (Optional for AVWXWeatherPlugin) "
"\n # Use hosted avwx-api here: https://avwx.rest "
"\n # or deploy your own from here: "
"\n # https://github.com/avwx-rest/avwx-api",
end_idx,
)
@ -163,8 +187,7 @@ def create_default_config():
click.echo("Config dir '{}' doesn't exist, creating.".format(config_dir))
mkdir_p(config_dir)
with open(config_file_expanded, "w+") as cf:
raw_yaml = yaml.dump(DEFAULT_CONFIG_DICT)
cf.write(add_config_comments(raw_yaml))
cf.write(dump_default_cfg())
def get_config(config_file):
@ -194,33 +217,32 @@ def get_config(config_file):
sys.exit(-1)
def check_config_option(config, section, name=None, default=None, default_fail=None):
if section in config:
def conf_option_exists(conf, chain):
_key = chain.pop(0)
if _key in conf:
return conf_option_exists(conf[_key], chain) if chain else conf[_key]
if name and name not in config[section]:
if not default:
raise Exception(
"'{}' was not in '{}' section of config file".format(
name,
section,
),
)
else:
config[section][name] = default
else:
if (
default_fail
and name in config[section]
and config[section][name] == default_fail
):
def check_config_option(config, chain, default_fail=None):
result = conf_option_exists(config, chain.copy())
if not result:
raise Exception(
"'{}' was not in config file".format(
chain,
),
)
else:
if default_fail:
if result == default_fail:
# We have to fail and bail if the user hasn't edited
# this config option.
raise Exception(
"Config file needs to be edited from provided defaults.",
"Config file needs to be edited from provided defaults for {}.".format(
chain,
),
)
else:
raise Exception("'%s' section wasn't in config file" % section)
return config
else:
return config
# This method tries to parse the config yaml file
@ -235,41 +257,68 @@ def parse_config(config_file):
click.echo(msg)
sys.exit(-1)
def check_option(config, section, name=None, default=None, default_fail=None):
def check_option(config, chain, default_fail=None):
try:
config = check_config_option(config, section, name, default, default_fail)
config = check_config_option(config, chain, default_fail=default_fail)
except Exception as ex:
fail(repr(ex))
else:
return config
config = get_config(config_file)
check_option(config, "shortcuts")
# special check here to make sure user has edited the config file
# and changed the ham callsign
check_option(
config,
"ham",
"callsign",
[
"ham",
"callsign",
],
default_fail=DEFAULT_CONFIG_DICT["ham"]["callsign"],
)
check_option(
config,
"aprs.fi",
"apiKey",
default_fail=DEFAULT_CONFIG_DICT["aprs.fi"]["apiKey"],
["services", "aprs.fi", "apiKey"],
default_fail=DEFAULT_CONFIG_DICT["services"]["aprs.fi"]["apiKey"],
)
check_option(config, "aprs", "login")
check_option(config, "aprs", "password")
# check_option(config, "aprs", "host")
# check_option(config, "aprs", "port")
check_option(config, "aprs", "logfile", "./aprsd.log")
check_option(config, "imap", "host")
check_option(config, "imap", "login")
check_option(config, "imap", "password")
check_option(config, "smtp", "host")
check_option(config, "smtp", "port")
check_option(config, "smtp", "login")
check_option(config, "smtp", "password")
check_option(
config,
["aprs", "login"],
default_fail=DEFAULT_CONFIG_DICT["aprs"]["login"],
)
check_option(
config,
["aprs", "password"],
default_fail=DEFAULT_CONFIG_DICT["aprs"]["password"],
)
if config["aprsd"]["email"]["enabled"] is True:
# Check IMAP server settings
check_option(config, ["aprsd", "email", "imap", "host"])
check_option(config, ["aprsd", "email", "imap", "port"])
check_option(
config,
["aprsd", "email", "imap", "login"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["imap"]["login"],
)
check_option(
config,
["aprsd", "email", "imap", "password"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["imap"]["password"],
)
# Check SMTP server settings
check_option(config, ["aprsd", "email", "smtp", "host"])
check_option(config, ["aprsd", "email", "smtp", "port"])
check_option(
config,
["aprsd", "email", "smtp", "login"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["smtp"]["login"],
)
check_option(
config,
["aprsd", "email", "smtp", "password"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["email"]["smtp"]["password"],
)
return config

View File

@ -5,21 +5,21 @@ from aprsd import email
class TestEmail(unittest.TestCase):
def test_get_email_from_shortcut(self):
email.CONFIG = {"shortcuts": {}}
email.CONFIG = {"aprsd": {"email": {"shortcuts": {}}}}
email_address = "something@something.com"
addr = "-{}".format(email_address)
actual = email.get_email_from_shortcut(addr)
self.assertEqual(addr, actual)
email.CONFIG = {"nothing": "nothing"}
email.CONFIG = {"aprsd": {"email": {"nothing": "nothing"}}}
actual = email.get_email_from_shortcut(addr)
self.assertEqual(addr, actual)
email.CONFIG = {"shortcuts": {"not_used": "empty"}}
email.CONFIG = {"aprsd": {"email": {"shortcuts": {"not_used": "empty"}}}}
actual = email.get_email_from_shortcut(addr)
self.assertEqual(addr, actual)
email.CONFIG = {"shortcuts": {"-wb": email_address}}
email.CONFIG = {"aprsd": {"email": {"shortcuts": {"-wb": email_address}}}}
short = "-wb"
actual = email.get_email_from_shortcut(short)
self.assertEqual(email_address, actual)