Updated Healthcheck to use rpc to call aprsd

After adding the rpc service for aprsd server and separating the
admin web REST interface, healthcheck no longer worked.   The stats
are available via rpc now.
This commit is contained in:
Hemna 2022-12-31 16:31:40 -05:00
parent 79e7ed1e91
commit c5acdba6de
7 changed files with 200 additions and 172 deletions

View File

@ -5,35 +5,30 @@
# #
# python included libs # python included libs
import datetime import datetime
import json
import logging import logging
import sys import sys
import click import click
import requests from oslo_config import cfg
from rich.console import Console from rich.console import Console
import aprsd import aprsd
from aprsd import cli_helper, utils from aprsd import cli_helper, utils
from aprsd import conf # noqa
# local imports here # local imports here
from aprsd.aprsd import cli from aprsd.aprsd import cli
from aprsd.rpc import client as aprsd_rpc_client
# setup the global logger # setup the global logger
# logging.basicConfig(level=logging.DEBUG) # level=10 # logging.basicConfig(level=logging.DEBUG) # level=10
CONF = cfg.CONF
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
console = Console() console = Console()
@cli.command() @cli.command()
@cli_helper.add_options(cli_helper.common_options) @cli_helper.add_options(cli_helper.common_options)
@click.option(
"--url",
"health_url",
show_default=True,
default="http://localhost:8001/stats",
help="The aprsd url to call for checking health/stats",
)
@click.option( @click.option(
"--timeout", "--timeout",
show_default=True, show_default=True,
@ -41,23 +36,32 @@ console = Console()
help="How long to wait for healtcheck url to come back", help="How long to wait for healtcheck url to come back",
) )
@click.pass_context @click.pass_context
@cli_helper.process_standard_options_no_config @cli_helper.process_standard_options
def healthcheck(ctx, health_url, timeout): def healthcheck(ctx, timeout):
"""Check the health of the running aprsd server.""" """Check the health of the running aprsd server."""
console.log(f"APRSD HealthCheck version: {aprsd.__version__}") console.log(f"APRSD HealthCheck version: {aprsd.__version__}")
if not CONF.rpc_settings.enabled:
LOG.error("Must enable rpc_settings.enabled to use healthcheck")
sys.exit(-1)
if not CONF.rpc_settings.ip:
LOG.error("Must enable rpc_settings.ip to use healthcheck")
sys.exit(-1)
if not CONF.rpc_settings.magic_word:
LOG.error("Must enable rpc_settings.magic_word to use healthcheck")
sys.exit(-1)
with console.status(f"APRSD HealthCheck version: {aprsd.__version__}") as status: with console.status(f"APRSD HealthCheck version: {aprsd.__version__}") as status:
try: try:
status.update(f"Contacting APRSD on {health_url}") status.update(f"Contacting APRSD via RPC {CONF.rpc_settings.ip}")
url = health_url stats = aprsd_rpc_client.RPCClient().get_stats_dict()
response = requests.get(url, timeout=timeout)
response.raise_for_status()
except Exception as ex: except Exception as ex:
console.log(f"Failed to fetch healthcheck url '{url}' : '{ex}'") console.log(f"Failed to fetch healthcheck : '{ex}'")
sys.exit(-1) sys.exit(-1)
else: else:
status.update("Contacted APRSD. Parsing results.") if not stats:
stats = json.loads(response.text) console.log("No stats from aprsd")
email_thread_last_update = stats["stats"]["email"]["thread_last_update"] sys.exit(-1)
email_thread_last_update = stats["email"]["thread_last_update"]
if email_thread_last_update != "never": if email_thread_last_update != "never":
delta = utils.parse_delta_str(email_thread_last_update) delta = utils.parse_delta_str(email_thread_last_update)
@ -68,7 +72,7 @@ def healthcheck(ctx, health_url, timeout):
console.log(f"Email thread is very old! {d}") console.log(f"Email thread is very old! {d}")
sys.exit(-1) sys.exit(-1)
aprsis_last_update = stats["stats"]["aprs-is"]["last_update"] aprsis_last_update = stats["aprs-is"]["last_update"]
delta = utils.parse_delta_str(aprsis_last_update) delta = utils.parse_delta_str(aprsis_last_update)
d = datetime.timedelta(**delta) d = datetime.timedelta(**delta)
max_timeout = {"hours": 0.0, "minutes": 5, "seconds": 0} max_timeout = {"hours": 0.0, "minutes": 5, "seconds": 0}

View File

@ -6,11 +6,10 @@ import click
from oslo_config import cfg from oslo_config import cfg
import aprsd import aprsd
from aprsd import (
cli_helper, client, packets, plugin, rpc_server, threads, utils,
)
from aprsd import aprsd as aprsd_main from aprsd import aprsd as aprsd_main
from aprsd import cli_helper, client, packets, plugin, threads, utils
from aprsd.aprsd import cli from aprsd.aprsd import cli
from aprsd.rpc import server as rpc_server
from aprsd.threads import rx from aprsd.threads import rx

View File

@ -10,13 +10,12 @@ import flask_classful
from flask_httpauth import HTTPBasicAuth from flask_httpauth import HTTPBasicAuth
from flask_socketio import Namespace, SocketIO from flask_socketio import Namespace, SocketIO
from oslo_config import cfg from oslo_config import cfg
import rpyc
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash, generate_password_hash
import aprsd import aprsd
from aprsd import cli_helper, client, conf, packets, plugin, threads from aprsd import cli_helper, client, conf, packets, plugin, threads
from aprsd.conf import common
from aprsd.logging import rich as aprsd_logging from aprsd.logging import rich as aprsd_logging
from aprsd.rpc import client as aprsd_rpc_client
CONF = cfg.CONF CONF = cfg.CONF
@ -27,19 +26,6 @@ users = None
app = None app = None
class AuthSocketStream(rpyc.SocketStream):
"""Used to authenitcate the RPC stream to remote."""
@classmethod
def connect(cls, *args, authorizer=None, **kwargs):
stream_obj = super().connect(*args, **kwargs)
if callable(authorizer):
authorizer(stream_obj.sock)
return stream_obj
# HTTPBasicAuth doesn't work on a class method. # HTTPBasicAuth doesn't work on a class method.
# This has to be out here. Rely on the APRSDFlask # This has to be out here. Rely on the APRSDFlask
# class to initialize the users from the config # class to initialize the users from the config
@ -51,131 +37,6 @@ def verify_password(username, password):
return username return username
class RPCClient:
_instance = None
_rpc_client = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
self._check_settings()
self.get_rpc_client()
def _check_settings(self):
if not CONF.rpc_settings.enabled:
LOG.error("RPC is not enabled, no way to get stats!!")
if CONF.rpc_settings.magic_word == common.APRSD_DEFAULT_MAGIC_WORD:
LOG.warning("You are using the default RPC magic word!!!")
LOG.warning("edit aprsd.conf and change rpc_settings.magic_word")
def _rpyc_connect(
self, host, port,
service=rpyc.VoidService,
config={}, ipv6=False,
keepalive=False, authorizer=None,
):
print(f"Connecting to RPC host {host}:{port}")
try:
s = AuthSocketStream.connect(
host, port, ipv6=ipv6, keepalive=keepalive,
authorizer=authorizer,
)
return rpyc.utils.factory.connect_stream(s, service, config=config)
except ConnectionRefusedError:
LOG.error(f"Failed to connect to RPC host {host}")
return None
def get_rpc_client(self):
if not self._rpc_client:
magic = CONF.rpc_settings.magic_word
self._rpc_client = self._rpyc_connect(
CONF.rpc_settings.ip,
CONF.rpc_settings.port,
authorizer=lambda sock: sock.send(magic.encode()),
)
return self._rpc_client
def get_stats_dict(self):
cl = self.get_rpc_client()
result = {}
if not cl:
return result
try:
rpc_stats_dict = cl.root.get_stats()
result = json.loads(rpc_stats_dict)
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_packet_track(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_packet_track()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_packet_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_packet_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_watch_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_watch_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_seen_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_seen_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_log_entries(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result_str = cl.root.get_log_entries()
result = json.loads(result_str)
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
class APRSDFlask(flask_classful.FlaskView): class APRSDFlask(flask_classful.FlaskView):
def set_config(self): def set_config(self):
@ -194,7 +55,7 @@ class APRSDFlask(flask_classful.FlaskView):
CONF.watch_list.callsigns, CONF.watch_list.callsigns,
), ),
) )
wl = RPCClient().get_watch_list() wl = aprsd_rpc_client.RPCClient().get_watch_list()
if wl and wl.is_enabled(): if wl and wl.is_enabled():
watch_count = len(wl) watch_count = len(wl)
watch_age = wl.max_delta() watch_age = wl.max_delta()
@ -202,7 +63,7 @@ class APRSDFlask(flask_classful.FlaskView):
watch_count = 0 watch_count = 0
watch_age = 0 watch_age = 0
sl = RPCClient().get_seen_list() sl = aprsd_rpc_client.RPCClient().get_seen_list()
if sl: if sl:
seen_count = len(sl) seen_count = len(sl)
else: else:
@ -269,7 +130,7 @@ class APRSDFlask(flask_classful.FlaskView):
@auth.login_required @auth.login_required
def packets(self): def packets(self):
packet_list = RPCClient().get_packet_list() packet_list = aprsd_rpc_client.RPCClient().get_packet_list()
if packet_list: if packet_list:
packets = packet_list.get() packets = packet_list.get()
tmp_list = [] tmp_list = []
@ -295,12 +156,12 @@ class APRSDFlask(flask_classful.FlaskView):
return json.dumps({"messages": "saved"}) return json.dumps({"messages": "saved"})
def _stats(self): def _stats(self):
track = RPCClient().get_packet_track() track = aprsd_rpc_client.RPCClient().get_packet_track()
now = datetime.datetime.now() now = datetime.datetime.now()
time_format = "%m-%d-%Y %H:%M:%S" time_format = "%m-%d-%Y %H:%M:%S"
stats_dict = RPCClient().get_stats_dict() stats_dict = aprsd_rpc_client.RPCClient().get_stats_dict()
if not stats_dict: if not stats_dict:
stats_dict = { stats_dict = {
"aprsd": {}, "aprsd": {},
@ -320,7 +181,7 @@ class APRSDFlask(flask_classful.FlaskView):
} }
# Convert the watch_list entries to age # Convert the watch_list entries to age
wl = RPCClient().get_watch_list() wl = aprsd_rpc_client.RPCClient().get_watch_list()
new_list = {} new_list = {}
if wl: if wl:
for call in wl.get_all(): for call in wl.get_all():
@ -341,7 +202,7 @@ class APRSDFlask(flask_classful.FlaskView):
} }
stats_dict["aprsd"]["watch_list"] = new_list stats_dict["aprsd"]["watch_list"] = new_list
packet_list = RPCClient().get_packet_list() packet_list = aprsd_rpc_client.RPCClient().get_packet_list()
rx = tx = 0 rx = tx = 0
if packet_list: if packet_list:
rx = packet_list.total_rx() rx = packet_list.total_rx()
@ -376,7 +237,7 @@ class LogUpdateThread(threads.APRSDThread):
global socketio global socketio
if socketio: if socketio:
log_entries = RPCClient().get_log_entries() log_entries = aprsd_rpc_client.RPCClient().get_log_entries()
if log_entries: if log_entries:
for entry in log_entries: for entry in log_entries:

14
aprsd/rpc/__init__.py Normal file
View File

@ -0,0 +1,14 @@
import rpyc
class AuthSocketStream(rpyc.SocketStream):
"""Used to authenitcate the RPC stream to remote."""
@classmethod
def connect(cls, *args, authorizer=None, **kwargs):
stream_obj = super().connect(*args, **kwargs)
if callable(authorizer):
authorizer(stream_obj.sock)
return stream_obj

150
aprsd/rpc/client.py Normal file
View File

@ -0,0 +1,150 @@
import json
import logging
from oslo_config import cfg
import rpyc
from aprsd import conf # noqa
from aprsd import rpc
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
class RPCClient:
_instance = None
_rpc_client = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
self._check_settings()
self.get_rpc_client()
def _check_settings(self):
if not CONF.rpc_settings.enabled:
LOG.error("RPC is not enabled, no way to get stats!!")
if CONF.rpc_settings.magic_word == conf.common.APRSD_DEFAULT_MAGIC_WORD:
LOG.warning("You are using the default RPC magic word!!!")
LOG.warning("edit aprsd.conf and change rpc_settings.magic_word")
def _rpyc_connect(
self, host, port,
service=rpyc.VoidService,
config={}, ipv6=False,
keepalive=False, authorizer=None,
):
print(f"Connecting to RPC host {host}:{port}")
try:
s = rpc.AuthSocketStream.connect(
host, port, ipv6=ipv6, keepalive=keepalive,
authorizer=authorizer,
)
return rpyc.utils.factory.connect_stream(s, service, config=config)
except ConnectionRefusedError:
LOG.error(f"Failed to connect to RPC host {host}")
return None
def get_rpc_client(self):
if not self._rpc_client:
magic = CONF.rpc_settings.magic_word
self._rpc_client = self._rpyc_connect(
CONF.rpc_settings.ip,
CONF.rpc_settings.port,
authorizer=lambda sock: sock.send(magic.encode()),
)
return self._rpc_client
def get_stats_dict(self):
cl = self.get_rpc_client()
result = {}
if not cl:
return result
try:
rpc_stats_dict = cl.root.get_stats()
result = json.loads(rpc_stats_dict)
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_stats(self):
cl = self.get_rpc_client()
result = {}
if not cl:
return result
try:
result = cl.root.get_stats_obj()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_packet_track(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_packet_track()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_packet_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_packet_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_watch_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_watch_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_seen_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_seen_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_log_entries(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result_str = cl.root.get_log_entries()
result = json.loads(result_str)
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result

View File

@ -53,4 +53,4 @@ ADD bin/run.sh /usr/local/bin
ENTRYPOINT ["/usr/local/bin/run.sh"] ENTRYPOINT ["/usr/local/bin/run.sh"]
HEALTHCHECK --interval=5m --timeout=12s --start-period=30s \ HEALTHCHECK --interval=5m --timeout=12s --start-period=30s \
CMD aprsd healthcheck --config /config/aprsd.conf --url http://localhost:8001/stats CMD aprsd healthcheck --config /config/aprsd.conf