From c5acdba6dee13bba189af106925cb33c8adf078e Mon Sep 17 00:00:00 2001 From: Hemna Date: Sat, 31 Dec 2022 16:31:40 -0500 Subject: [PATCH] Updated Healthcheck to use rpc to call aprsd After adding the rpc service for aprsd server and separating the admin web REST interface, healthcheck no longer worked. The stats are available via rpc now. --- aprsd/cmds/healthcheck.py | 44 +++---- aprsd/cmds/server.py | 5 +- aprsd/flask.py | 157 ++----------------------- aprsd/rpc/__init__.py | 14 +++ aprsd/rpc/client.py | 150 +++++++++++++++++++++++ aprsd/{rpc_server.py => rpc/server.py} | 0 docker/Dockerfile | 2 +- 7 files changed, 200 insertions(+), 172 deletions(-) create mode 100644 aprsd/rpc/__init__.py create mode 100644 aprsd/rpc/client.py rename aprsd/{rpc_server.py => rpc/server.py} (100%) diff --git a/aprsd/cmds/healthcheck.py b/aprsd/cmds/healthcheck.py index 32d8f6e..1bf7c3b 100644 --- a/aprsd/cmds/healthcheck.py +++ b/aprsd/cmds/healthcheck.py @@ -5,35 +5,30 @@ # # python included libs import datetime -import json import logging import sys import click -import requests +from oslo_config import cfg from rich.console import Console import aprsd from aprsd import cli_helper, utils +from aprsd import conf # noqa # local imports here from aprsd.aprsd import cli +from aprsd.rpc import client as aprsd_rpc_client # setup the global logger # logging.basicConfig(level=logging.DEBUG) # level=10 +CONF = cfg.CONF LOG = logging.getLogger("APRSD") console = Console() @cli.command() @cli_helper.add_options(cli_helper.common_options) -@click.option( - "--url", - "health_url", - show_default=True, - default="http://localhost:8001/stats", - help="The aprsd url to call for checking health/stats", -) @click.option( "--timeout", show_default=True, @@ -41,23 +36,32 @@ console = Console() help="How long to wait for healtcheck url to come back", ) @click.pass_context -@cli_helper.process_standard_options_no_config -def healthcheck(ctx, health_url, timeout): +@cli_helper.process_standard_options +def healthcheck(ctx, timeout): """Check the health of the running aprsd server.""" console.log(f"APRSD HealthCheck version: {aprsd.__version__}") + if not CONF.rpc_settings.enabled: + LOG.error("Must enable rpc_settings.enabled to use healthcheck") + sys.exit(-1) + if not CONF.rpc_settings.ip: + LOG.error("Must enable rpc_settings.ip to use healthcheck") + sys.exit(-1) + if not CONF.rpc_settings.magic_word: + LOG.error("Must enable rpc_settings.magic_word to use healthcheck") + sys.exit(-1) + with console.status(f"APRSD HealthCheck version: {aprsd.__version__}") as status: try: - status.update(f"Contacting APRSD on {health_url}") - url = health_url - response = requests.get(url, timeout=timeout) - response.raise_for_status() + status.update(f"Contacting APRSD via RPC {CONF.rpc_settings.ip}") + stats = aprsd_rpc_client.RPCClient().get_stats_dict() except Exception as ex: - console.log(f"Failed to fetch healthcheck url '{url}' : '{ex}'") + console.log(f"Failed to fetch healthcheck : '{ex}'") sys.exit(-1) else: - status.update("Contacted APRSD. Parsing results.") - stats = json.loads(response.text) - email_thread_last_update = stats["stats"]["email"]["thread_last_update"] + if not stats: + console.log("No stats from aprsd") + sys.exit(-1) + email_thread_last_update = stats["email"]["thread_last_update"] if email_thread_last_update != "never": delta = utils.parse_delta_str(email_thread_last_update) @@ -68,7 +72,7 @@ def healthcheck(ctx, health_url, timeout): console.log(f"Email thread is very old! {d}") sys.exit(-1) - aprsis_last_update = stats["stats"]["aprs-is"]["last_update"] + aprsis_last_update = stats["aprs-is"]["last_update"] delta = utils.parse_delta_str(aprsis_last_update) d = datetime.timedelta(**delta) max_timeout = {"hours": 0.0, "minutes": 5, "seconds": 0} diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index 07c20e2..ff5f74e 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -6,11 +6,10 @@ import click from oslo_config import cfg import aprsd -from aprsd import ( - cli_helper, client, packets, plugin, rpc_server, threads, utils, -) from aprsd import aprsd as aprsd_main +from aprsd import cli_helper, client, packets, plugin, threads, utils from aprsd.aprsd import cli +from aprsd.rpc import server as rpc_server from aprsd.threads import rx diff --git a/aprsd/flask.py b/aprsd/flask.py index cbe955f..199df01 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -10,13 +10,12 @@ import flask_classful from flask_httpauth import HTTPBasicAuth from flask_socketio import Namespace, SocketIO from oslo_config import cfg -import rpyc from werkzeug.security import check_password_hash, generate_password_hash import aprsd from aprsd import cli_helper, client, conf, packets, plugin, threads -from aprsd.conf import common from aprsd.logging import rich as aprsd_logging +from aprsd.rpc import client as aprsd_rpc_client CONF = cfg.CONF @@ -27,19 +26,6 @@ users = None app = None -class AuthSocketStream(rpyc.SocketStream): - """Used to authenitcate the RPC stream to remote.""" - - @classmethod - def connect(cls, *args, authorizer=None, **kwargs): - stream_obj = super().connect(*args, **kwargs) - - if callable(authorizer): - authorizer(stream_obj.sock) - - return stream_obj - - # HTTPBasicAuth doesn't work on a class method. # This has to be out here. Rely on the APRSDFlask # class to initialize the users from the config @@ -51,131 +37,6 @@ def verify_password(username, password): return username -class RPCClient: - _instance = None - _rpc_client = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance - - def __init__(self): - self._check_settings() - self.get_rpc_client() - - def _check_settings(self): - if not CONF.rpc_settings.enabled: - LOG.error("RPC is not enabled, no way to get stats!!") - - if CONF.rpc_settings.magic_word == common.APRSD_DEFAULT_MAGIC_WORD: - LOG.warning("You are using the default RPC magic word!!!") - LOG.warning("edit aprsd.conf and change rpc_settings.magic_word") - - def _rpyc_connect( - self, host, port, - service=rpyc.VoidService, - config={}, ipv6=False, - keepalive=False, authorizer=None, - ): - - print(f"Connecting to RPC host {host}:{port}") - try: - s = AuthSocketStream.connect( - host, port, ipv6=ipv6, keepalive=keepalive, - authorizer=authorizer, - ) - return rpyc.utils.factory.connect_stream(s, service, config=config) - except ConnectionRefusedError: - LOG.error(f"Failed to connect to RPC host {host}") - return None - - def get_rpc_client(self): - if not self._rpc_client: - magic = CONF.rpc_settings.magic_word - self._rpc_client = self._rpyc_connect( - CONF.rpc_settings.ip, - CONF.rpc_settings.port, - authorizer=lambda sock: sock.send(magic.encode()), - ) - return self._rpc_client - - def get_stats_dict(self): - cl = self.get_rpc_client() - result = {} - if not cl: - return result - - try: - rpc_stats_dict = cl.root.get_stats() - result = json.loads(rpc_stats_dict) - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - def get_packet_track(self): - cl = self.get_rpc_client() - result = None - if not cl: - return result - try: - result = cl.root.get_packet_track() - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - def get_packet_list(self): - cl = self.get_rpc_client() - result = None - if not cl: - return result - try: - result = cl.root.get_packet_list() - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - def get_watch_list(self): - cl = self.get_rpc_client() - result = None - if not cl: - return result - try: - result = cl.root.get_watch_list() - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - def get_seen_list(self): - cl = self.get_rpc_client() - result = None - if not cl: - return result - try: - result = cl.root.get_seen_list() - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - def get_log_entries(self): - cl = self.get_rpc_client() - result = None - if not cl: - return result - try: - result_str = cl.root.get_log_entries() - result = json.loads(result_str) - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - class APRSDFlask(flask_classful.FlaskView): def set_config(self): @@ -194,7 +55,7 @@ class APRSDFlask(flask_classful.FlaskView): CONF.watch_list.callsigns, ), ) - wl = RPCClient().get_watch_list() + wl = aprsd_rpc_client.RPCClient().get_watch_list() if wl and wl.is_enabled(): watch_count = len(wl) watch_age = wl.max_delta() @@ -202,7 +63,7 @@ class APRSDFlask(flask_classful.FlaskView): watch_count = 0 watch_age = 0 - sl = RPCClient().get_seen_list() + sl = aprsd_rpc_client.RPCClient().get_seen_list() if sl: seen_count = len(sl) else: @@ -269,7 +130,7 @@ class APRSDFlask(flask_classful.FlaskView): @auth.login_required def packets(self): - packet_list = RPCClient().get_packet_list() + packet_list = aprsd_rpc_client.RPCClient().get_packet_list() if packet_list: packets = packet_list.get() tmp_list = [] @@ -295,12 +156,12 @@ class APRSDFlask(flask_classful.FlaskView): return json.dumps({"messages": "saved"}) def _stats(self): - track = RPCClient().get_packet_track() + track = aprsd_rpc_client.RPCClient().get_packet_track() now = datetime.datetime.now() time_format = "%m-%d-%Y %H:%M:%S" - stats_dict = RPCClient().get_stats_dict() + stats_dict = aprsd_rpc_client.RPCClient().get_stats_dict() if not stats_dict: stats_dict = { "aprsd": {}, @@ -320,7 +181,7 @@ class APRSDFlask(flask_classful.FlaskView): } # Convert the watch_list entries to age - wl = RPCClient().get_watch_list() + wl = aprsd_rpc_client.RPCClient().get_watch_list() new_list = {} if wl: for call in wl.get_all(): @@ -341,7 +202,7 @@ class APRSDFlask(flask_classful.FlaskView): } stats_dict["aprsd"]["watch_list"] = new_list - packet_list = RPCClient().get_packet_list() + packet_list = aprsd_rpc_client.RPCClient().get_packet_list() rx = tx = 0 if packet_list: rx = packet_list.total_rx() @@ -376,7 +237,7 @@ class LogUpdateThread(threads.APRSDThread): global socketio if socketio: - log_entries = RPCClient().get_log_entries() + log_entries = aprsd_rpc_client.RPCClient().get_log_entries() if log_entries: for entry in log_entries: diff --git a/aprsd/rpc/__init__.py b/aprsd/rpc/__init__.py new file mode 100644 index 0000000..87df2d6 --- /dev/null +++ b/aprsd/rpc/__init__.py @@ -0,0 +1,14 @@ +import rpyc + + +class AuthSocketStream(rpyc.SocketStream): + """Used to authenitcate the RPC stream to remote.""" + + @classmethod + def connect(cls, *args, authorizer=None, **kwargs): + stream_obj = super().connect(*args, **kwargs) + + if callable(authorizer): + authorizer(stream_obj.sock) + + return stream_obj diff --git a/aprsd/rpc/client.py b/aprsd/rpc/client.py new file mode 100644 index 0000000..8b192c7 --- /dev/null +++ b/aprsd/rpc/client.py @@ -0,0 +1,150 @@ +import json +import logging + +from oslo_config import cfg +import rpyc + +from aprsd import conf # noqa +from aprsd import rpc + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +class RPCClient: + _instance = None + _rpc_client = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + self._check_settings() + self.get_rpc_client() + + def _check_settings(self): + if not CONF.rpc_settings.enabled: + LOG.error("RPC is not enabled, no way to get stats!!") + + if CONF.rpc_settings.magic_word == conf.common.APRSD_DEFAULT_MAGIC_WORD: + LOG.warning("You are using the default RPC magic word!!!") + LOG.warning("edit aprsd.conf and change rpc_settings.magic_word") + + def _rpyc_connect( + self, host, port, + service=rpyc.VoidService, + config={}, ipv6=False, + keepalive=False, authorizer=None, + ): + + print(f"Connecting to RPC host {host}:{port}") + try: + s = rpc.AuthSocketStream.connect( + host, port, ipv6=ipv6, keepalive=keepalive, + authorizer=authorizer, + ) + return rpyc.utils.factory.connect_stream(s, service, config=config) + except ConnectionRefusedError: + LOG.error(f"Failed to connect to RPC host {host}") + return None + + def get_rpc_client(self): + if not self._rpc_client: + magic = CONF.rpc_settings.magic_word + self._rpc_client = self._rpyc_connect( + CONF.rpc_settings.ip, + CONF.rpc_settings.port, + authorizer=lambda sock: sock.send(magic.encode()), + ) + return self._rpc_client + + def get_stats_dict(self): + cl = self.get_rpc_client() + result = {} + if not cl: + return result + + try: + rpc_stats_dict = cl.root.get_stats() + result = json.loads(rpc_stats_dict) + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_stats(self): + cl = self.get_rpc_client() + result = {} + if not cl: + return result + + try: + result = cl.root.get_stats_obj() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_packet_track(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_packet_track() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_packet_list(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_packet_list() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_watch_list(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_watch_list() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_seen_list(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_seen_list() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_log_entries(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result_str = cl.root.get_log_entries() + result = json.loads(result_str) + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result diff --git a/aprsd/rpc_server.py b/aprsd/rpc/server.py similarity index 100% rename from aprsd/rpc_server.py rename to aprsd/rpc/server.py diff --git a/docker/Dockerfile b/docker/Dockerfile index ae52a68..b0cec85 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -53,4 +53,4 @@ ADD bin/run.sh /usr/local/bin ENTRYPOINT ["/usr/local/bin/run.sh"] HEALTHCHECK --interval=5m --timeout=12s --start-period=30s \ - CMD aprsd healthcheck --config /config/aprsd.conf --url http://localhost:8001/stats + CMD aprsd healthcheck --config /config/aprsd.conf