1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-04-19 09:49:01 -04:00

Fixed some more ruff checks

this updates some code to fix any more 'ruff check' failures.
This commit is contained in:
Hemna 2025-03-05 16:48:18 -05:00
parent 2b185ee1b8
commit 94ba915ed4
5 changed files with 234 additions and 232 deletions

View File

@ -3,58 +3,60 @@ import sys
import time
import aprslib
from aprslib.exceptions import LoginError
import click
from aprslib.exceptions import LoginError
from oslo_config import cfg
import aprsd
from aprsd import cli_helper, packets
from aprsd import conf # noqa : F401
import aprsd.packets # noqa : F401
from aprsd import (
cli_helper,
conf, # noqa : F401
packets,
)
from aprsd.client import client_factory
from aprsd.main import cli
import aprsd.packets # noqa : F401
from aprsd.packets import collector
from aprsd.packets import log as packet_log
from aprsd.threads import tx
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
@cli.command()
@cli_helper.add_options(cli_helper.common_options)
@click.option(
"--aprs-login",
envvar="APRS_LOGIN",
'--aprs-login',
envvar='APRS_LOGIN',
show_envvar=True,
help="What callsign to send the message from. Defaults to config entry.",
help='What callsign to send the message from. Defaults to config entry.',
)
@click.option(
"--aprs-password",
envvar="APRS_PASSWORD",
'--aprs-password',
envvar='APRS_PASSWORD',
show_envvar=True,
help="the APRS-IS password for APRS_LOGIN. Defaults to config entry.",
help='the APRS-IS password for APRS_LOGIN. Defaults to config entry.',
)
@click.option(
"--no-ack",
"-n",
'--no-ack',
'-n',
is_flag=True,
show_default=True,
default=False,
help="Don't wait for an ack, just sent it to APRS-IS and bail.",
)
@click.option(
"--wait-response",
"-w",
'--wait-response',
'-w',
is_flag=True,
show_default=True,
default=False,
help="Wait for a response to the message?",
help='Wait for a response to the message?',
)
@click.option("--raw", default=None, help="Send a raw message. Implies --no-ack")
@click.argument("tocallsign", required=True)
@click.argument("command", nargs=-1, required=True)
@click.option('--raw', default=None, help='Send a raw message. Implies --no-ack')
@click.argument('tocallsign', required=True)
@click.argument('command', nargs=-1, required=True)
@click.pass_context
@cli_helper.process_standard_options
def send_message(
@ -69,11 +71,11 @@ def send_message(
):
"""Send a message to a callsign via APRS_IS."""
global got_ack, got_response
quiet = ctx.obj["quiet"]
quiet = ctx.obj['quiet']
if not aprs_login:
if CONF.aprs_network.login == conf.client.DEFAULT_LOGIN:
click.echo("Must set --aprs_login or APRS_LOGIN")
click.echo('Must set --aprs_login or APRS_LOGIN')
ctx.exit(-1)
return
else:
@ -81,15 +83,15 @@ def send_message(
if not aprs_password:
if not CONF.aprs_network.password:
click.echo("Must set --aprs-password or APRS_PASSWORD")
click.echo('Must set --aprs-password or APRS_PASSWORD')
ctx.exit(-1)
return
else:
aprs_password = CONF.aprs_network.password
LOG.info(f"APRSD LISTEN Started version: {aprsd.__version__}")
LOG.info(f'APRSD LISTEN Started version: {aprsd.__version__}')
if type(command) is tuple:
command = " ".join(command)
command = ' '.join(command)
if not quiet:
if raw:
LOG.info(f"L'{aprs_login}' R'{raw}'")
@ -129,7 +131,7 @@ def send_message(
sys.exit(0)
try:
client_factory.create().client
client_factory.create().client # noqa: B018
except LoginError:
sys.exit(-1)
@ -140,7 +142,7 @@ def send_message(
# message
if raw:
tx.send(
packets.Packet(from_call="", to_call="", raw=raw),
packets.Packet(from_call='', to_call='', raw=raw),
direct=True,
)
sys.exit(0)
@ -164,7 +166,7 @@ def send_message(
aprs_client = client_factory.create().client
aprs_client.consumer(rx_packet, raw=False)
except aprslib.exceptions.ConnectionDrop:
LOG.error("Connection dropped, reconnecting")
LOG.error('Connection dropped, reconnecting')
time.sleep(5)
# Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client()

View File

@ -39,8 +39,8 @@ from aprsd.stats import collector
# setup the global logger
# log.basicConfig(level=log.DEBUG) # level=10
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
LOG = logging.getLogger('APRSD')
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
flask_enabled = False
@ -68,18 +68,18 @@ def main():
# First import all the possible commands for the CLI
# The commands themselves live in the cmds directory
load_commands()
utils.load_entry_points("aprsd.extension")
cli(auto_envvar_prefix="APRSD")
utils.load_entry_points('aprsd.extension')
cli(auto_envvar_prefix='APRSD')
def signal_handler(sig, frame):
global flask_enabled
click.echo("signal_handler: called")
click.echo('signal_handler: called')
threads.APRSDThreadList().stop_all()
if "subprocess" not in str(frame):
if 'subprocess' not in str(frame):
LOG.info(
"Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}".format(
'Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}'.format(
datetime.datetime.now(),
),
)
@ -91,7 +91,7 @@ def signal_handler(sig, frame):
packets.PacketList().save()
collector.Collector().collect()
except Exception as e:
LOG.error(f"Failed to save data: {e}")
LOG.error(f'Failed to save data: {e}')
sys.exit(0)
# signal.signal(signal.SIGTERM, sys.exit(0))
# sys.exit(0)
@ -108,9 +108,9 @@ def check_version(ctx):
"""Check this version against the latest in pypi.org."""
level, msg = utils._check_version()
if level:
click.secho(msg, fg="yellow")
click.secho(msg, fg='yellow')
else:
click.secho(msg, fg="green")
click.secho(msg, fg='green')
@cli.command()
@ -124,12 +124,12 @@ def sample_config(ctx):
if sys.version_info < (3, 10):
all = imp.entry_points()
selected = []
if "oslo.config.opts" in all:
for x in all["oslo.config.opts"]:
if x.group == "oslo.config.opts":
if 'oslo.config.opts' in all:
for x in all['oslo.config.opts']:
if x.group == 'oslo.config.opts':
selected.append(x)
else:
selected = imp.entry_points(group="oslo.config.opts")
selected = imp.entry_points(group='oslo.config.opts')
return selected
@ -139,23 +139,23 @@ def sample_config(ctx):
# selected = imp.entry_points(group="oslo.config.opts")
selected = _get_selected_entry_points()
for entry in selected:
if "aprsd" in entry.name:
args.append("--namespace")
if 'aprsd' in entry.name:
args.append('--namespace')
args.append(entry.name)
return args
args = get_namespaces()
config_version = metadata_version("oslo.config")
config_version = metadata_version('oslo.config')
logging.basicConfig(level=logging.WARN)
conf = cfg.ConfigOpts()
generator.register_cli_opts(conf)
try:
conf(args, version=config_version)
except cfg.RequiredOptError:
except cfg.RequiredOptError as ex:
conf.print_help()
if not sys.argv[1:]:
raise SystemExit
raise SystemExit from ex
raise
generator.generate(conf)
return
@ -165,9 +165,9 @@ def sample_config(ctx):
@click.pass_context
def version(ctx):
"""Show the APRSD version."""
click.echo(click.style("APRSD Version : ", fg="white"), nl=False)
click.secho(f"{aprsd.__version__}", fg="yellow", bold=True)
click.echo(click.style('APRSD Version : ', fg='white'), nl=False)
click.secho(f'{aprsd.__version__}', fg='yellow', bold=True)
if __name__ == "__main__":
if __name__ == '__main__':
main()

View File

@ -17,24 +17,24 @@ from aprsd.packets import watch_list
# setup the global logger
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
CORE_MESSAGE_PLUGINS = [
"aprsd.plugins.email.EmailPlugin",
"aprsd.plugins.fortune.FortunePlugin",
"aprsd.plugins.location.LocationPlugin",
"aprsd.plugins.ping.PingPlugin",
"aprsd.plugins.time.TimePlugin",
"aprsd.plugins.weather.USWeatherPlugin",
"aprsd.plugins.version.VersionPlugin",
'aprsd.plugins.email.EmailPlugin',
'aprsd.plugins.fortune.FortunePlugin',
'aprsd.plugins.location.LocationPlugin',
'aprsd.plugins.ping.PingPlugin',
'aprsd.plugins.time.TimePlugin',
'aprsd.plugins.weather.USWeatherPlugin',
'aprsd.plugins.version.VersionPlugin',
]
CORE_NOTIFY_PLUGINS = [
"aprsd.plugins.notify.NotifySeenPlugin",
'aprsd.plugins.notify.NotifySeenPlugin',
]
hookspec = pluggy.HookspecMarker("aprsd")
hookimpl = pluggy.HookimplMarker("aprsd")
hookspec = pluggy.HookspecMarker('aprsd')
hookimpl = pluggy.HookimplMarker('aprsd')
class APRSDPluginSpec:
@ -76,14 +76,14 @@ class APRSDPluginBase(metaclass=abc.ABCMeta):
else:
LOG.error(
"Can't start thread {}:{}, Must be a child "
"of aprsd.threads.APRSDThread".format(
'of aprsd.threads.APRSDThread'.format(
self,
thread,
),
)
except Exception:
LOG.error(
"Failed to start threads for plugin {}".format(
'Failed to start threads for plugin {}'.format(
self,
),
)
@ -93,7 +93,7 @@ class APRSDPluginBase(metaclass=abc.ABCMeta):
return self.message_counter
def help(self) -> str:
return "Help!"
return 'Help!'
@abc.abstractmethod
def setup(self):
@ -147,10 +147,10 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
# make sure the timeout is set or this doesn't work
if watch_list:
aprs_client = client.client_factory.create().client
filter_str = "b/{}".format("/".join(watch_list))
filter_str = 'b/{}'.format('/'.join(watch_list))
aprs_client.set_filter(filter_str)
else:
LOG.warning("Watch list enabled, but no callsigns set.")
LOG.warning('Watch list enabled, but no callsigns set.')
@hookimpl
def filter(self, packet: type[packets.Packet]) -> str | packets.MessagePacket:
@ -164,7 +164,7 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
result = self.process(packet)
except Exception as ex:
LOG.error(
"Plugin {} failed to process packet {}".format(
'Plugin {} failed to process packet {}'.format(
self.__class__,
ex,
),
@ -172,7 +172,7 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
if result:
self.tx_inc()
else:
LOG.warning(f"{self.__class__} plugin is not enabled")
LOG.warning(f'{self.__class__} plugin is not enabled')
return result
@ -196,7 +196,7 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
raise NotImplementedError
def help(self):
return "{}: {}".format(
return '{}: {}'.format(
self.command_name.lower(),
self.command_regex,
)
@ -207,7 +207,7 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
@hookimpl
def filter(self, packet: packets.MessagePacket) -> str | packets.MessagePacket:
LOG.debug(f"{self.__class__.__name__} called")
LOG.debug(f'{self.__class__.__name__} called')
if not self.enabled:
result = f"{self.__class__.__name__} isn't enabled"
LOG.warning(result)
@ -215,7 +215,7 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
if not isinstance(packet, packets.MessagePacket):
LOG.warning(
f"{self.__class__.__name__} Got a {packet.__class__.__name__} ignoring"
f'{self.__class__.__name__} Got a {packet.__class__.__name__} ignoring'
)
return packets.NULL_MESSAGE
@ -237,7 +237,7 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
result = self.process(packet)
except Exception as ex:
LOG.error(
"Plugin {} failed to process packet {}".format(
'Plugin {} failed to process packet {}'.format(
self.__class__,
ex,
),
@ -254,7 +254,7 @@ class APRSFIKEYMixin:
def ensure_aprs_fi_key(self):
if not CONF.aprs_fi.apiKey:
LOG.error("Config aprs_fi.apiKey is not set")
LOG.error('Config aprs_fi.apiKey is not set')
self.enabled = False
else:
self.enabled = True
@ -266,25 +266,25 @@ class HelpPlugin(APRSDRegexCommandPluginBase):
This plugin is in this file to prevent a circular import.
"""
command_regex = "^[hH]"
command_name = "help"
command_regex = '^[hH]'
command_name = 'help'
def help(self):
return "Help: send APRS help or help <plugin>"
return 'Help: send APRS help or help <plugin>'
def process(self, packet: packets.MessagePacket):
LOG.info("HelpPlugin")
LOG.info('HelpPlugin')
# fromcall = packet.get("from")
message = packet.message_text
# ack = packet.get("msgNo", "0")
a = re.search(r"^.*\s+(.*)", message)
a = re.search(r'^.*\s+(.*)', message)
command_name = None
if a is not None:
command_name = a.group(1).lower()
pm = PluginManager()
if command_name and "?" not in command_name:
if command_name and '?' not in command_name:
# user wants help for a specific plugin
reply = None
for p in pm.get_plugins():
@ -303,20 +303,20 @@ class HelpPlugin(APRSDRegexCommandPluginBase):
LOG.debug(p)
if p.enabled and isinstance(p, APRSDRegexCommandPluginBase):
name = p.command_name.lower()
if name not in list and "help" not in name:
if name not in list and 'help' not in name:
list.append(name)
list.sort()
reply = " ".join(list)
reply = ' '.join(list)
lines = textwrap.wrap(reply, 60)
replies = ["Send APRS MSG of 'help' or 'help <plugin>'"]
for line in lines:
replies.append(f"plugins: {line}")
replies.append(f'plugins: {line}')
for entry in replies:
LOG.debug(f"{len(entry)} {entry}")
LOG.debug(f'{len(entry)} {entry}')
LOG.debug(f"{replies}")
LOG.debug(f'{replies}')
return replies
@ -341,17 +341,17 @@ class PluginManager:
return cls._instance
def _init(self):
self._pluggy_pm = pluggy.PluginManager("aprsd")
self._pluggy_pm = pluggy.PluginManager('aprsd')
self._pluggy_pm.add_hookspecs(APRSDPluginSpec)
# For the watchlist plugins
self._watchlist_pm = pluggy.PluginManager("aprsd")
self._watchlist_pm = pluggy.PluginManager('aprsd')
self._watchlist_pm.add_hookspecs(APRSDPluginSpec)
def stats(self, serializable=False) -> dict:
"""Collect and return stats for all plugins."""
def full_name_with_qualname(obj):
return "{}.{}".format(
return '{}.{}'.format(
obj.__class__.__module__,
obj.__class__.__qualname__,
)
@ -361,10 +361,10 @@ class PluginManager:
if plugins:
for p in plugins:
plugin_stats[full_name_with_qualname(p)] = {
"enabled": p.enabled,
"rx": p.rx_count,
"tx": p.tx_count,
"version": p.version,
'enabled': p.enabled,
'rx': p.rx_count,
'tx': p.tx_count,
'version': p.version,
}
return plugin_stats
@ -392,19 +392,19 @@ class PluginManager:
module_name = None
class_name = None
try:
module_name, class_name = module_class_string.rsplit(".", 1)
module_name, class_name = module_class_string.rsplit('.', 1)
module = importlib.import_module(module_name)
# Commented out because the email thread starts in a different context
# and hence gives a different singleton for the EmailStats
# module = importlib.reload(module)
except Exception as ex:
if not module_name:
LOG.error(f"Failed to load Plugin {module_class_string}")
LOG.error(f'Failed to load Plugin {module_class_string}')
else:
LOG.error(f"Failed to load Plugin '{module_name}' : '{ex}'")
return
assert hasattr(module, class_name), "class {} is not in {}".format(
assert hasattr(module, class_name), 'class {} is not in {}'.format(
class_name,
module_name,
)
@ -412,7 +412,7 @@ class PluginManager:
# class_name, module_name))
cls = getattr(module, class_name)
if super_cls is not None:
assert issubclass(cls, super_cls), "class {} should inherit from {}".format(
assert issubclass(cls, super_cls), 'class {} should inherit from {}'.format(
class_name,
super_cls.__name__,
)
@ -444,7 +444,7 @@ class PluginManager:
self._watchlist_pm.register(plugin_obj)
else:
LOG.warning(
f"Plugin {plugin_obj.__class__.__name__} is disabled"
f'Plugin {plugin_obj.__class__.__name__} is disabled'
)
elif isinstance(plugin_obj, APRSDRegexCommandPluginBase):
if plugin_obj.enabled:
@ -458,7 +458,7 @@ class PluginManager:
self._pluggy_pm.register(plugin_obj)
else:
LOG.warning(
f"Plugin {plugin_obj.__class__.__name__} is disabled"
f'Plugin {plugin_obj.__class__.__name__} is disabled'
)
elif isinstance(plugin_obj, APRSDPluginBase):
if plugin_obj.enabled:
@ -471,7 +471,7 @@ class PluginManager:
self._pluggy_pm.register(plugin_obj)
else:
LOG.warning(
f"Plugin {plugin_obj.__class__.__name__} is disabled"
f'Plugin {plugin_obj.__class__.__name__} is disabled'
)
except Exception as ex:
LOG.error(f"Couldn't load plugin '{plugin_name}'")
@ -485,11 +485,11 @@ class PluginManager:
def setup_plugins(
self,
load_help_plugin=True,
plugin_list=[],
plugin_list=None,
):
"""Create the plugin manager and register plugins."""
LOG.info("Loading APRSD Plugins")
LOG.info('Loading APRSD Plugins')
# Help plugin is always enabled.
if load_help_plugin:
_help = HelpPlugin()
@ -509,7 +509,7 @@ class PluginManager:
for p_name in CORE_MESSAGE_PLUGINS:
self._load_plugin(p_name)
LOG.info("Completed Plugin Loading.")
LOG.info('Completed Plugin Loading.')
def run(self, packet: packets.MessagePacket):
"""Execute all the plugins run method."""
@ -524,7 +524,7 @@ class PluginManager:
"""Stop all threads created by all plugins."""
with self.lock:
for p in self.get_plugins():
if hasattr(p, "stop_threads"):
if hasattr(p, 'stop_threads'):
p.stop_threads()
def register_msg(self, obj):

View File

@ -4,21 +4,20 @@ import logging
import requests
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
def get_aprs_fi(api_key, callsign):
LOG.debug(f"Fetch aprs.fi location for '{callsign}'")
try:
url = (
"http://api.aprs.fi/api/get?"
"&what=loc&apikey={}&format=json"
"&name={}".format(api_key, callsign)
'http://api.aprs.fi/api/get?&what=loc&apikey={}&format=json&name={}'.format(
api_key, callsign
)
)
response = requests.get(url)
except Exception:
raise Exception("Failed to get aprs.fi location")
except Exception as e:
raise Exception('Failed to get aprs.fi location') from e
else:
response.raise_for_status()
return json.loads(response.text)
@ -26,22 +25,22 @@ def get_aprs_fi(api_key, callsign):
def get_weather_gov_for_gps(lat, lon):
# FIXME(hemna) This is currently BROKEN
LOG.debug(f"Fetch station at {lat}, {lon}")
LOG.debug(f'Fetch station at {lat}, {lon}')
headers = requests.utils.default_headers()
headers.update(
{"User-Agent": "(aprsd, waboring@hemna.com)"},
{'User-Agent': '(aprsd, waboring@hemna.com)'},
)
try:
url2 = (
"https://forecast.weather.gov/MapClick.php?lat=%s"
"&lon=%s&FcstType=json" % (lat, lon)
'https://forecast.weather.gov/MapClick.php?lat=%s'
'&lon=%s&FcstType=json' % (lat, lon)
# f"https://api.weather.gov/points/{lat},{lon}"
)
LOG.debug(f"Fetching weather '{url2}'")
response = requests.get(url2, headers=headers)
except Exception as e:
LOG.error(e)
raise Exception("Failed to get weather")
raise Exception('Failed to get weather') from e
else:
response.raise_for_status()
return json.loads(response.text)
@ -50,25 +49,25 @@ def get_weather_gov_for_gps(lat, lon):
def get_weather_gov_metar(station):
LOG.debug(f"Fetch metar for station '{station}'")
try:
url = "https://api.weather.gov/stations/{}/observations/latest".format(
url = 'https://api.weather.gov/stations/{}/observations/latest'.format(
station,
)
response = requests.get(url)
except Exception:
raise Exception("Failed to fetch metar")
except Exception as e:
raise Exception('Failed to fetch metar') from e
else:
response.raise_for_status()
return json.loads(response)
def fetch_openweathermap(api_key, lat, lon, units="metric", exclude=None):
LOG.debug(f"Fetch openweathermap for {lat}, {lon}")
def fetch_openweathermap(api_key, lat, lon, units='metric', exclude=None):
LOG.debug(f'Fetch openweathermap for {lat}, {lon}')
if not exclude:
exclude = "minutely,hourly,daily,alerts"
exclude = 'minutely,hourly,daily,alerts'
try:
url = (
"https://api.openweathermap.org/data/2.5/onecall?"
"lat={}&lon={}&appid={}&units={}&exclude={}".format(
'https://api.openweathermap.org/data/2.5/onecall?'
'lat={}&lon={}&appid={}&units={}&exclude={}'.format(
lat,
lon,
api_key,
@ -80,7 +79,7 @@ def fetch_openweathermap(api_key, lat, lon, units="metric", exclude=None):
response = requests.get(url)
except Exception as e:
LOG.error(e)
raise Exception("Failed to get weather")
raise Exception('Failed to get weather') from e
else:
response.raise_for_status()
return json.loads(response.text)

View File

@ -9,7 +9,7 @@ from aprsd import plugin, plugin_utils
from aprsd.utils import trace
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
class USWeatherPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin):
@ -26,22 +26,22 @@ class USWeatherPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin)
"""
# command_regex = r"^([w][x]|[w][x]\s|weather)"
command_regex = r"^[wW]"
command_regex = r'^[wW]'
command_name = "USWeather"
short_description = "Provide USA only weather of GPS Beacon location"
command_name = 'USWeather'
short_description = 'Provide USA only weather of GPS Beacon location'
def setup(self):
self.ensure_aprs_fi_key()
@trace.trace
def process(self, packet):
LOG.info("Weather Plugin")
LOG.info('Weather Plugin')
fromcall = packet.from_call
message = packet.get("message_text", None)
message = packet.get('message_text', None)
# message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
a = re.search(r"^.*\s+(.*)", message)
a = re.search(r'^.*\s+(.*)', message)
if a is not None:
searchcall = a.group(1)
searchcall = searchcall.upper()
@ -51,34 +51,34 @@ class USWeatherPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin)
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, searchcall)
except Exception as ex:
LOG.error(f"Failed to fetch aprs.fi data {ex}")
return "Failed to fetch aprs.fi location"
LOG.error(f'Failed to fetch aprs.fi data {ex}')
return 'Failed to fetch aprs.fi location'
LOG.debug(f"LocationPlugin: aprs_data = {aprs_data}")
if not len(aprs_data["entries"]):
LOG.debug(f'LocationPlugin: aprs_data = {aprs_data}')
if not len(aprs_data['entries']):
LOG.error("Didn't get any entries from aprs.fi")
return "Failed to fetch aprs.fi location"
return 'Failed to fetch aprs.fi location'
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
lat = aprs_data['entries'][0]['lat']
lon = aprs_data['entries'][0]['lng']
try:
wx_data = plugin_utils.get_weather_gov_for_gps(lat, lon)
except Exception as ex:
LOG.error(f"Couldn't fetch forecast.weather.gov '{ex}'")
return "Unable to get weather"
return 'Unable to get weather'
LOG.info(f"WX data {wx_data}")
LOG.info(f'WX data {wx_data}')
reply = (
"%sF(%sF/%sF) %s. %s, %s."
'%sF(%sF/%sF) %s. %s, %s.'
% (
wx_data["currentobservation"]["Temp"],
wx_data["data"]["temperature"][0],
wx_data["data"]["temperature"][1],
wx_data["data"]["weather"][0],
wx_data["time"]["startPeriodName"][1],
wx_data["data"]["weather"][1],
wx_data['currentobservation']['Temp'],
wx_data['data']['temperature'][0],
wx_data['data']['temperature'][1],
wx_data['data']['weather'][0],
wx_data['time']['startPeriodName'][1],
wx_data['data']['weather'][1],
)
).rstrip()
LOG.debug(f"reply: '{reply}' ")
@ -100,31 +100,31 @@ class USMetarPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin):
"""
command_regex = r"^([m]|[M]|[m]\s|metar)"
command_name = "USMetar"
short_description = "USA only METAR of GPS Beacon location"
command_regex = r'^([m]|[M]|[m]\s|metar)'
command_name = 'USMetar'
short_description = 'USA only METAR of GPS Beacon location'
def setup(self):
self.ensure_aprs_fi_key()
@trace.trace
def process(self, packet):
fromcall = packet.get("from")
message = packet.get("message_text", None)
fromcall = packet.get('from')
message = packet.get('message_text', None)
# ack = packet.get("msgNo", "0")
LOG.info(f"WX Plugin '{message}'")
a = re.search(r"^.*\s+(.*)", message)
a = re.search(r'^.*\s+(.*)', message)
if a is not None:
searchcall = a.group(1)
station = searchcall.upper()
try:
resp = plugin_utils.get_weather_gov_metar(station)
except Exception as e:
LOG.debug(f"Weather failed with: {str(e)}")
reply = "Unable to find station METAR"
LOG.debug(f'Weather failed with: {str(e)}')
reply = 'Unable to find station METAR'
else:
station_data = json.loads(resp.text)
reply = station_data["properties"]["rawMessage"]
reply = station_data['properties']['rawMessage']
return reply
else:
@ -136,36 +136,36 @@ class USMetarPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin):
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, fromcall)
except Exception as ex:
LOG.error(f"Failed to fetch aprs.fi data {ex}")
return "Failed to fetch aprs.fi location"
LOG.error(f'Failed to fetch aprs.fi data {ex}')
return 'Failed to fetch aprs.fi location'
# LOG.debug("LocationPlugin: aprs_data = {}".format(aprs_data))
if not len(aprs_data["entries"]):
LOG.error("Found no entries from aprs.fi!")
return "Failed to fetch aprs.fi location"
if not len(aprs_data['entries']):
LOG.error('Found no entries from aprs.fi!')
return 'Failed to fetch aprs.fi location'
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
lat = aprs_data['entries'][0]['lat']
lon = aprs_data['entries'][0]['lng']
try:
wx_data = plugin_utils.get_weather_gov_for_gps(lat, lon)
except Exception as ex:
LOG.error(f"Couldn't fetch forecast.weather.gov '{ex}'")
return "Unable to metar find station."
return 'Unable to metar find station.'
if wx_data["location"]["metar"]:
station = wx_data["location"]["metar"]
if wx_data['location']['metar']:
station = wx_data['location']['metar']
try:
resp = plugin_utils.get_weather_gov_metar(station)
except Exception as e:
LOG.debug(f"Weather failed with: {str(e)}")
reply = "Failed to get Metar"
LOG.debug(f'Weather failed with: {str(e)}')
reply = 'Failed to get Metar'
else:
station_data = json.loads(resp.text)
reply = station_data["properties"]["rawMessage"]
reply = station_data['properties']['rawMessage']
else:
# Couldn't find a station
reply = "No Metar station found"
reply = 'No Metar station found'
return reply
@ -190,35 +190,36 @@ class OWMWeatherPlugin(plugin.APRSDRegexCommandPluginBase):
"""
# command_regex = r"^([w][x]|[w][x]\s|weather)"
command_regex = r"^[wW]"
command_regex = r'^[wW]'
command_name = "OpenWeatherMap"
short_description = "OpenWeatherMap weather of GPS Beacon location"
command_name = 'OpenWeatherMap'
short_description = 'OpenWeatherMap weather of GPS Beacon location'
def setup(self):
if not CONF.owm_weather_plugin.apiKey:
LOG.error("Config.owm_weather_plugin.apiKey is not set. Disabling")
LOG.error('Config.owm_weather_plugin.apiKey is not set. Disabling')
self.enabled = False
else:
self.enabled = True
def help(self):
_help = [
"openweathermap: Send {} to get weather " "from your location".format(
'openweathermap: Send {} to get weather from your location'.format(
self.command_regex
),
'openweathermap: Send {} <callsign> to get weather from <callsign>'.format(
self.command_regex
),
"openweathermap: Send {} <callsign> to get "
"weather from <callsign>".format(self.command_regex),
]
return _help
@trace.trace
def process(self, packet):
fromcall = packet.get("from_call")
message = packet.get("message_text", None)
fromcall = packet.get('from_call')
message = packet.get('message_text', None)
# ack = packet.get("msgNo", "0")
LOG.info(f"OWMWeather Plugin '{message}'")
a = re.search(r"^.*\s+(.*)", message)
a = re.search(r'^.*\s+(.*)', message)
if a is not None:
searchcall = a.group(1)
searchcall = searchcall.upper()
@ -230,16 +231,16 @@ class OWMWeatherPlugin(plugin.APRSDRegexCommandPluginBase):
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, searchcall)
except Exception as ex:
LOG.error(f"Failed to fetch aprs.fi data {ex}")
return "Failed to fetch location"
LOG.error(f'Failed to fetch aprs.fi data {ex}')
return 'Failed to fetch location'
# LOG.debug("LocationPlugin: aprs_data = {}".format(aprs_data))
if not len(aprs_data["entries"]):
LOG.error("Found no entries from aprs.fi!")
return "Failed to fetch location"
if not len(aprs_data['entries']):
LOG.error('Found no entries from aprs.fi!')
return 'Failed to fetch location'
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
lat = aprs_data['entries'][0]['lat']
lon = aprs_data['entries'][0]['lng']
units = CONF.units
api_key = CONF.owm_weather_plugin.apiKey
@ -249,40 +250,40 @@ class OWMWeatherPlugin(plugin.APRSDRegexCommandPluginBase):
lat,
lon,
units=units,
exclude="minutely,hourly",
exclude='minutely,hourly',
)
except Exception as ex:
LOG.error(f"Couldn't fetch openweathermap api '{ex}'")
# default to UTC
return "Unable to get weather"
return 'Unable to get weather'
if units == "metric":
degree = "C"
if units == 'metric':
degree = 'C'
else:
degree = "F"
degree = 'F'
if "wind_gust" in wx_data["current"]:
wind = "{:.0f}@{}G{:.0f}".format(
wx_data["current"]["wind_speed"],
wx_data["current"]["wind_deg"],
wx_data["current"]["wind_gust"],
if 'wind_gust' in wx_data['current']:
wind = '{:.0f}@{}G{:.0f}'.format(
wx_data['current']['wind_speed'],
wx_data['current']['wind_deg'],
wx_data['current']['wind_gust'],
)
else:
wind = "{:.0f}@{}".format(
wx_data["current"]["wind_speed"],
wx_data["current"]["wind_deg"],
wind = '{:.0f}@{}'.format(
wx_data['current']['wind_speed'],
wx_data['current']['wind_deg'],
)
# LOG.debug(wx_data["current"])
# LOG.debug(wx_data["daily"])
reply = "{} {:.1f}{}/{:.1f}{} Wind {} {}%".format(
wx_data["current"]["weather"][0]["description"],
wx_data["current"]["temp"],
reply = '{} {:.1f}{}/{:.1f}{} Wind {} {}%'.format(
wx_data['current']['weather'][0]['description'],
wx_data['current']['temp'],
degree,
wx_data["current"]["dew_point"],
wx_data['current']['dew_point'],
degree,
wind,
wx_data["current"]["humidity"],
wx_data['current']['humidity'],
)
return reply
@ -311,26 +312,26 @@ class AVWXWeatherPlugin(plugin.APRSDRegexCommandPluginBase):
docker build -f Dockerfile -t avwx-api:master .
"""
command_regex = r"^([m]|[m]|[m]\s|metar)"
command_name = "AVWXWeather"
short_description = "AVWX weather of GPS Beacon location"
command_regex = r'^([m]|[m]|[m]\s|metar)'
command_name = 'AVWXWeather'
short_description = 'AVWX weather of GPS Beacon location'
def setup(self):
if not CONF.avwx_plugin.base_url:
LOG.error("Config avwx_plugin.base_url not specified. Disabling")
LOG.error('Config avwx_plugin.base_url not specified. Disabling')
return False
elif not CONF.avwx_plugin.apiKey:
LOG.error("Config avwx_plugin.apiKey not specified. Disabling")
LOG.error('Config avwx_plugin.apiKey not specified. Disabling')
return False
else:
return True
def help(self):
_help = [
"avwxweather: Send {} to get weather " "from your location".format(
'avwxweather: Send {} to get weather from your location'.format(
self.command_regex
),
"avwxweather: Send {} <callsign> to get " "weather from <callsign>".format(
'avwxweather: Send {} <callsign> to get weather from <callsign>'.format(
self.command_regex
),
]
@ -338,11 +339,11 @@ class AVWXWeatherPlugin(plugin.APRSDRegexCommandPluginBase):
@trace.trace
def process(self, packet):
fromcall = packet.get("from")
message = packet.get("message_text", None)
fromcall = packet.get('from')
message = packet.get('message_text', None)
# ack = packet.get("msgNo", "0")
LOG.info(f"AVWXWeather Plugin '{message}'")
a = re.search(r"^.*\s+(.*)", message)
a = re.search(r'^.*\s+(.*)', message)
if a is not None:
searchcall = a.group(1)
searchcall = searchcall.upper()
@ -353,43 +354,43 @@ class AVWXWeatherPlugin(plugin.APRSDRegexCommandPluginBase):
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, searchcall)
except Exception as ex:
LOG.error(f"Failed to fetch aprs.fi data {ex}")
return "Failed to fetch location"
LOG.error(f'Failed to fetch aprs.fi data {ex}')
return 'Failed to fetch location'
# LOG.debug("LocationPlugin: aprs_data = {}".format(aprs_data))
if not len(aprs_data["entries"]):
LOG.error("Found no entries from aprs.fi!")
return "Failed to fetch location"
if not len(aprs_data['entries']):
LOG.error('Found no entries from aprs.fi!')
return 'Failed to fetch location'
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
lat = aprs_data['entries'][0]['lat']
lon = aprs_data['entries'][0]['lng']
api_key = CONF.avwx_plugin.apiKey
base_url = CONF.avwx_plugin.base_url
token = f"TOKEN {api_key}"
headers = {"Authorization": token}
token = f'TOKEN {api_key}'
headers = {'Authorization': token}
try:
coord = f"{lat},{lon}"
coord = f'{lat},{lon}'
url = (
"{}/api/station/near/{}?"
"n=1&airport=false&reporting=true&format=json".format(base_url, coord)
'{}/api/station/near/{}?'
'n=1&airport=false&reporting=true&format=json'.format(base_url, coord)
)
LOG.debug(f"Get stations near me '{url}'")
response = requests.get(url, headers=headers)
except Exception as ex:
LOG.error(ex)
raise Exception(f"Failed to get the weather '{ex}'")
raise Exception(f"Failed to get the weather '{ex}'") from ex
else:
wx_data = json.loads(response.text)
# LOG.debug(wx_data)
station = wx_data[0]["station"]["icao"]
station = wx_data[0]['station']['icao']
try:
url = (
"{}/api/metar/{}?options=info,translate,summary"
"&airport=true&reporting=true&format=json&onfail=cache".format(
'{}/api/metar/{}?options=info,translate,summary'
'&airport=true&reporting=true&format=json&onfail=cache'.format(
base_url,
station,
)
@ -399,9 +400,9 @@ class AVWXWeatherPlugin(plugin.APRSDRegexCommandPluginBase):
response = requests.get(url, headers=headers)
except Exception as ex:
LOG.error(ex)
raise Exception(f"Failed to get metar {ex}")
raise Exception(f'Failed to get metar {ex}') from ex
else:
metar_data = json.loads(response.text)
# LOG.debug(metar_data)
return metar_data["raw"]
return metar_data['raw']