1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-11-12 11:26:13 -05:00

Added new Config object.

The config object now has builtin dot notation getter with default

config.get("some.path.here", default="Not found")
This commit is contained in:
Hemna 2021-10-04 15:22:10 -04:00
parent a6ed7b894b
commit 491644ece6
6 changed files with 86 additions and 76 deletions

View File

@ -125,23 +125,19 @@ class KISSClient(Client):
if "kiss" not in config:
return False
if "serial" in config["kiss"]:
if config["kiss"]["serial"].get("enabled", False):
return True
if config.get("kiss.serial.enabled", default=False):
return True
if "tcp" in config["kiss"]:
if config["kiss"]["tcp"].get("enabled", False):
return True
if config.get("kiss.tcp.enabled", default=False):
return True
@staticmethod
def transport(config):
if "serial" in config["kiss"]:
if config["kiss"]["serial"].get("enabled", False):
return TRANSPORT_SERIALKISS
if config.get("kiss.serial.enabled", default=False):
return TRANSPORT_SERIALKISS
if "tcp" in config["kiss"]:
if config["kiss"]["tcp"].get("enabled", False):
return TRANSPORT_TCPKISS
if config.get("kiss.tcp.enabled", default=False):
return TRANSPORT_TCPKISS
def decode_packet(self, *args, **kwargs):
"""We get a frame, which has to be decoded."""

View File

@ -1,3 +1,4 @@
import collections
import logging
import os
from pathlib import Path
@ -134,6 +135,60 @@ DEFAULT_SAVE_FILE = f"{home}/.config/aprsd/aprsd.p"
DEFAULT_CONFIG_FILE = f"{home}/.config/aprsd/aprsd.yml"
class Config(collections.UserDict):
def _get(self, d, keys, default=None):
"""
Example:
d = {'meta': {'status': 'OK', 'status_code': 200}}
deep_get(d, ['meta', 'status_code']) # => 200
deep_get(d, ['garbage', 'status_code']) # => None
deep_get(d, ['meta', 'garbage'], default='-') # => '-'
"""
if type(keys) is str and "." in keys:
keys = keys.split(".")
assert type(keys) is list
if d is None:
return default
if not keys:
return d
if type(d) is str:
return default
return self._get(d.get(keys[0]), keys[1:], default)
def get(self, path, default=None):
return self._get(self.data, path, default=default)
def exists(self, path):
"""See if a conf value exists."""
test = "-3.14TEST41.3-"
return (self.get(path, default=test) != test)
def check_option(self, path, default_fail=None):
"""Make sure the config option doesn't have default value."""
if not self.exists(path):
raise Exception(
"Option '{}' was not in config file".format(
path,
),
)
val = self.get(path)
if val == default_fail:
# We have to fail and bail if the user hasn't edited
# this config option.
raise Exception(
"Config file needs to be changed from provided"
" defaults for '{}'".format(
path,
),
)
def add_config_comments(raw_yaml):
end_idx = utils.end_substr(raw_yaml, "aprs:")
if end_idx != -1:
@ -223,7 +278,7 @@ def get_config(config_file):
if os.path.exists(config_file_expanded):
with open(config_file_expanded) as stream:
config = yaml.load(stream, Loader=yaml.FullLoader)
return config
return Config(config)
else:
if config_file == DEFAULT_CONFIG_FILE:
click.echo(
@ -249,33 +304,28 @@ def get_config(config_file):
# If the required params don't exist,
# it will look in the environment
def parse_config(config_file):
# for now we still use globals....ugh
global CONFIG
config = get_config(config_file)
def fail(msg):
click.echo(msg)
sys.exit(-1)
def check_option(config, chain, default_fail=None):
def check_option(config, path, default_fail=None):
try:
config = check_config_option(config, chain, default_fail=default_fail)
config.check_option(path, default_fail=default_fail)
except Exception as ex:
fail(repr(ex))
else:
return config
config = get_config(config_file)
# special check here to make sure user has edited the config file
# and changed the ham callsign
check_option(
config,
[
"ham",
"callsign",
],
"ham.callsign",
default_fail=DEFAULT_CONFIG_DICT["ham"]["callsign"],
)
check_option(
config,
["services", "aprs.fi", "apiKey"],
@ -283,7 +333,7 @@ def parse_config(config_file):
)
check_option(
config,
["aprs", "login"],
"aprs.login",
default_fail=DEFAULT_CONFIG_DICT["aprs"]["login"],
)
check_option(
@ -293,21 +343,21 @@ def parse_config(config_file):
)
# Ensure they change the admin password
if config["aprsd"]["web"]["enabled"] is True:
if config.get("aprsd.web.enabled") is True:
check_option(
config,
["aprsd", "web", "users", "admin"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["web"]["users"]["admin"],
)
if config["aprsd"]["watch_list"]["enabled"] is True:
if config.get("aprsd.watch_list.enabled") is True:
check_option(
config,
["aprsd", "watch_list", "alert_callsign"],
default_fail=DEFAULT_CONFIG_DICT["aprsd"]["watch_list"]["alert_callsign"],
)
if config["aprsd"]["email"]["enabled"] is True:
if config.get("aprsd.email.enabled") is True:
# Check IMAP server settings
check_option(config, ["aprsd", "email", "imap", "host"])
check_option(config, ["aprsd", "email", "imap", "port"])
@ -337,31 +387,3 @@ def parse_config(config_file):
)
return config
def conf_option_exists(conf, chain):
_key = chain.pop(0)
if _key in conf:
return conf_option_exists(conf[_key], chain) if chain else conf[_key]
def check_config_option(config, chain, default_fail=None):
result = conf_option_exists(config, chain.copy())
if result is None:
raise Exception(
"'{}' was not in config file".format(
chain,
),
)
else:
if default_fail:
if result == default_fail:
# We have to fail and bail if the user hasn't edited
# this config option.
raise Exception(
"Config file needs to be edited from provided defaults for {}.".format(
chain,
),
)
else:
return config

View File

@ -174,20 +174,13 @@ def setup_logging(config, loglevel, quiet):
LOG.addHandler(fh)
imap_logger = None
if config["aprsd"]["email"].get("enabled", False) and config["aprsd"]["email"][
"imap"
].get("debug", False):
if config.get("aprsd.email.enabled", default=False) and config.get("aprsd.email.imap.debug", default=False):
imap_logger = logging.getLogger("imapclient.imaplib")
imap_logger.setLevel(log_level)
imap_logger.addHandler(fh)
if (
aprsd_config.check_config_option(
config, ["aprsd", "web", "enabled"],
default_fail=False,
)
):
if config.get("aprsd.web.enabled", default=False):
qh = logging.handlers.QueueHandler(threads.logging_queue)
q_log_formatter = logging.Formatter(
fmt=aprsd_config.QUEUE_LOG_FORMAT,
@ -498,7 +491,7 @@ def server(
keepalive = threads.KeepAliveThread(config=config)
keepalive.start()
web_enabled = aprsd_config.check_config_option(config, ["aprsd", "web", "enabled"], default_fail=False)
web_enabled = config.get("aprsd.web.enabled", default=False)
if web_enabled:
flask_enabled = True

View File

@ -56,7 +56,6 @@ class EmailInfo:
self._delay = val
class EmailPlugin(plugin.APRSDRegexCommandPluginBase):
"""Email Plugin."""

View File

@ -2,7 +2,7 @@ import logging
import re
import time
from aprsd import config, plugin, plugin_utils, trace
from aprsd import plugin, plugin_utils, trace
LOG = logging.getLogger("APRSD")
@ -24,7 +24,7 @@ class LocationPlugin(plugin.APRSDRegexCommandPluginBase):
# get last location of a callsign, get descriptive name from weather service
try:
config.check_config_option(self.config, ["services", "aprs.fi", "apiKey"])
self.config.check_option(["services", "aprs.fi", "apiKey"])
except Exception as ex:
LOG.error(f"Failed to find config aprs.fi:apikey {ex}")
return "No aprs.fi apikey found"

View File

@ -5,21 +5,21 @@ from aprsd.plugins import email
class TestEmail(unittest.TestCase):
def test_get_email_from_shortcut(self):
email.CONFIG = {"aprsd": {"email": {"shortcuts": {}}}}
config = {"aprsd": {"email": {"shortcuts": {}}}}
email_address = "something@something.com"
addr = f"-{email_address}"
actual = email.get_email_from_shortcut(addr)
actual = email.get_email_from_shortcut(config, addr)
self.assertEqual(addr, actual)
email.CONFIG = {"aprsd": {"email": {"nothing": "nothing"}}}
actual = email.get_email_from_shortcut(addr)
config = {"aprsd": {"email": {"nothing": "nothing"}}}
actual = email.get_email_from_shortcut(config, addr)
self.assertEqual(addr, actual)
email.CONFIG = {"aprsd": {"email": {"shortcuts": {"not_used": "empty"}}}}
actual = email.get_email_from_shortcut(addr)
config = {"aprsd": {"email": {"shortcuts": {"not_used": "empty"}}}}
actual = email.get_email_from_shortcut(config, addr)
self.assertEqual(addr, actual)
email.CONFIG = {"aprsd": {"email": {"shortcuts": {"-wb": email_address}}}}
config = {"aprsd": {"email": {"shortcuts": {"-wb": email_address}}}}
short = "-wb"
actual = email.get_email_from_shortcut(short)
actual = email.get_email_from_shortcut(config, short)
self.assertEqual(email_address, actual)