From 30d1eb57dd249c609f5b092d8084c40cadda7bd9 Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 18 Dec 2024 16:55:26 -0500 Subject: [PATCH] Added new KeepAliveCollector This new collector allows external extensions and plugins to get callbacks during the keepalive thread runs. Every 60 seconds the keepalive thread will do some checks and log some entries in the logfile to keep the user/admin aware of the state of aprsd itself. --- aprsd/client/aprsis.py | 25 +++++++-- aprsd/client/base.py | 12 +++++ aprsd/client/kiss.py | 17 ++++++ aprsd/cmds/listen.py | 6 +-- aprsd/cmds/server.py | 4 +- aprsd/threads/{keep_alive.py => keepalive.py} | 50 +++-------------- aprsd/threads/keepalive_collector.py | 54 +++++++++++++++++++ 7 files changed, 116 insertions(+), 52 deletions(-) rename aprsd/threads/{keep_alive.py => keepalive.py} (61%) create mode 100644 aprsd/threads/keepalive_collector.py diff --git a/aprsd/client/aprsis.py b/aprsd/client/aprsis.py index d6ad9ec..9773f7f 100644 --- a/aprsd/client/aprsis.py +++ b/aprsd/client/aprsis.py @@ -3,7 +3,9 @@ import logging import time from aprslib.exceptions import LoginError +from loguru import logger from oslo_config import cfg +import timeago from aprsd import client, exception from aprsd.client import base @@ -13,11 +15,13 @@ from aprsd.packets import core CONF = cfg.CONF LOG = logging.getLogger("APRSD") +LOGU = logger class APRSISClient(base.APRSClient): _client = None + _checks = False def __init__(self): max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0} @@ -45,6 +49,20 @@ class APRSISClient(base.APRSClient): return stats + def keepalive_check(self): + # Don't check the first time through. + if not self.is_alive() and self._checks: + LOG.warning("Resetting client. 
It's not alive.") + self.reset() + self._checks = True + + def keepalive_log(self): + if ka := self._client.aprsd_keepalive: + keepalive = timeago.format(ka) + else: + keepalive = "N/A" + LOGU.opt(colors=True).info(f"Client keepalive {keepalive}") + @staticmethod def is_enabled(): # Defaults to True if the enabled flag is non existent @@ -81,14 +99,13 @@ class APRSISClient(base.APRSClient): if delta > self.max_delta: LOG.error(f"Connection is stale, last heard {delta} ago.") return True + return False def is_alive(self): - if self._client: - return self._client.is_alive() and not self._is_stale_connection() - else: + if not self._client: LOG.warning(f"APRS_CLIENT {self._client} alive? NO!!!") return False - + return self._client.is_alive() and not self._is_stale_connection() def close(self): if self._client: self._client.stop() diff --git a/aprsd/client/base.py b/aprsd/client/base.py index 8470546..c0bfbe8 100644 --- a/aprsd/client/base.py +++ b/aprsd/client/base.py @@ -6,6 +6,7 @@ from oslo_config import cfg import wrapt from aprsd.packets import core +from aprsd.threads import keepalive_collector CONF = cfg.CONF @@ -30,6 +31,7 @@ class APRSClient: """This magic turns this into a singleton.""" if cls._instance is None: cls._instance = super().__new__(cls) + keepalive_collector.KeepAliveCollector().register(cls) # Put any initialization here. cls._instance._create_client() return cls._instance @@ -42,6 +44,16 @@ class APRSClient: dict: Statistics about the connection and packet handling """ + @abc.abstractmethod + def keepalive_check(self) -> None: + """Called during keepalive run to check status.""" + ... + + @abc.abstractmethod + def keepalive_log(self) -> None: + """Log any keepalive information.""" + ... 
+ @property def is_connected(self): return self.connected diff --git a/aprsd/client/kiss.py b/aprsd/client/kiss.py index d413aa1..eaa546e 100644 --- a/aprsd/client/kiss.py +++ b/aprsd/client/kiss.py @@ -2,7 +2,9 @@ import datetime import logging import aprslib +from loguru import logger from oslo_config import cfg +import timeago from aprsd import client, exception from aprsd.client import base @@ -12,6 +14,7 @@ from aprsd.packets import core CONF = cfg.CONF LOG = logging.getLogger("APRSD") +LOGU = logger class KISSClient(base.APRSClient): @@ -79,6 +82,20 @@ class KISSClient(base.APRSClient): if self._client: self._client.stop() + def keepalive_check(self): + # Don't check the first time through. + if not self.is_alive() and self._checks: + LOG.warning("Resetting client. It's not alive.") + self.reset() + self._checks = True + + def keepalive_log(self): + if ka := self._client.aprsd_keepalive: + keepalive = timeago.format(ka) + else: + keepalive = "N/A" + LOGU.opt(colors=True).info(f"Client keepalive {keepalive}") + @staticmethod def transport(): if CONF.kiss_serial.enabled: diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index fc18e64..17f49e4 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -23,7 +23,7 @@ from aprsd.packets import collector as packet_collector from aprsd.packets import log as packet_log from aprsd.packets import seen_list from aprsd.stats import collector -from aprsd.threads import keep_alive, rx +from aprsd.threads import keepalive, rx from aprsd.threads import stats as stats_thread from aprsd.threads.aprsd import APRSDThread @@ -126,7 +126,7 @@ class ListenStatsThread(APRSDThread): thread_hex = f"fg {utils.hex_from_name(k)}" LOGU.opt(colors=True).info( f"<{thread_hex}>{k:<15} " - f"RX: {v['rx']} TX: {v['tx']}", + f"RX: {v["rx"]} TX: {v["tx"]}", ) time.sleep(1) @@ -265,7 +265,7 @@ def listen( LOG.debug(f"Filter by '{filter}'") aprs_client.set_filter(filter) - keepalive = keep_alive.KeepAliveThread() + keepalive = 
keepalive.KeepAliveThread() if not CONF.enable_seen_list: # just deregister the class from the packet collector diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index 0732a8d..fc0bce9 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -14,7 +14,7 @@ from aprsd.main import cli from aprsd.packets import collector as packet_collector from aprsd.packets import seen_list from aprsd.threads import aprsd as aprsd_threads -from aprsd.threads import keep_alive, registry, rx +from aprsd.threads import keepalive, registry, rx from aprsd.threads import stats as stats_thread from aprsd.threads import tx from aprsd.utils import singleton @@ -146,7 +146,7 @@ def server(ctx, flush): # Now start all the main processing threads. - server_threads.register(keep_alive.KeepAliveThread()) + server_threads.register(keepalive.KeepAliveThread()) server_threads.register(stats_thread.APRSDStatsStoreThread()) server_threads.register( rx.APRSDPluginRXThread( diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keepalive.py similarity index 61% rename from aprsd/threads/keep_alive.py rename to aprsd/threads/keepalive.py index a47e4a5..d4f53ca 100644 --- a/aprsd/threads/keep_alive.py +++ b/aprsd/threads/keepalive.py @@ -5,13 +5,11 @@ import tracemalloc from loguru import logger from oslo_config import cfg -import timeago from aprsd import packets, utils -from aprsd.client import client_factory from aprsd.log import log as aprsd_log from aprsd.stats import collector -from aprsd.threads import APRSDThread, APRSDThreadList +from aprsd.threads import APRSDThread, APRSDThreadList, keepalive_collector CONF = cfg.CONF @@ -36,15 +34,6 @@ class KeepAliveThread(APRSDThread): thread_list = APRSDThreadList() now = datetime.datetime.now() - if "EmailStats" in stats_json: - email_stats = stats_json["EmailStats"] - if email_stats.get("last_check_time"): - email_thread_time = utils.strfdelta(now - email_stats["last_check_time"]) - else: - email_thread_time = "N/A" - else: - 
email_thread_time = "N/A" - if "APRSClientStats" in stats_json and stats_json["APRSClientStats"].get("transport") == "aprsis": if stats_json["APRSClientStats"].get("server_keepalive"): last_msg_time = utils.strfdelta(now - stats_json["APRSClientStats"]["server_keepalive"]) @@ -64,7 +53,7 @@ class KeepAliveThread(APRSDThread): keepalive = ( "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " - "Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}" + "Last:{} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}" ).format( stats_json["APRSDStats"]["callsign"], stats_json["APRSDStats"]["uptime"], @@ -74,7 +63,6 @@ class KeepAliveThread(APRSDThread): tx_msg, rx_msg, last_msg_time, - email_thread_time, stats_json["APRSDStats"]["memory_current_str"], stats_json["APRSDStats"]["memory_peak_str"], len(thread_list), @@ -97,35 +85,11 @@ class KeepAliveThread(APRSDThread): LOGU.opt(colors=True).info(thread_msg) # LOG.info(f"{key: <15} Alive? {str(alive): <5} {str(age): <20}") - # check the APRS connection - cl = client_factory.create() - cl_stats = cl.stats() - ka = cl_stats.get("connection_keepalive", None) - if ka: - keepalive = timeago.format(ka) - else: - keepalive = "N/A" - LOGU.opt(colors=True).info(f"Client keepalive {keepalive}") - # Reset the connection if it's dead and this isn't our - # First time through the loop. - # The first time through the loop can happen at startup where - # The keepalive thread starts before the client has a chance - # to make it's connection the first time. - if not cl.is_alive() and self.cntr > 0: - LOG.error(f"{cl.__class__.__name__} is not alive!!! 
Resetting") - client_factory.create().reset() - # else: - # # See if we should reset the aprs-is client - # # Due to losing a keepalive from them - # delta_dict = utils.parse_delta_str(last_msg_time) - # delta = datetime.timedelta(**delta_dict) - # - # if delta > self.max_delta: - # # We haven't gotten a keepalive from aprs-is in a while - # # reset the connection.a - # if not client.KISSClient.is_enabled(): - # LOG.warning(f"Resetting connection to APRS-IS {delta}") - # client.factory.create().reset() + # Go through the registered keepalive collectors + # and check them as well as call log. + collect = keepalive_collector.KeepAliveCollector() + collect.check() + collect.log() # Check version every day delta = now - self.checker_time diff --git a/aprsd/threads/keepalive_collector.py b/aprsd/threads/keepalive_collector.py new file mode 100644 index 0000000..52dd0b0 --- /dev/null +++ b/aprsd/threads/keepalive_collector.py @@ -0,0 +1,54 @@ +import logging +from typing import Callable, Protocol, runtime_checkable + +from aprsd.utils import singleton + + +LOG = logging.getLogger("APRSD") + + +@runtime_checkable +class KeepAliveProducer(Protocol): + """The KeepAliveProducer protocol is used to define the interface for running Keepalive checks.""" + def keepalive_check(self) -> dict: + """Check for keepalive.""" + ... + + def keepalive_log(self): + """Log any keepalive information.""" + ... 
+
+
+@singleton
+class KeepAliveCollector:
+    """Fan keepalive callbacks out to every registered producer.
+
+    Producers are stored as callables (for example classes). On every
+    pass each one is called with no arguments and the object it returns
+    has its ``keepalive_check()`` or ``keepalive_log()`` method invoked.
+    """
+
+    def __init__(self):
+        self.producers: list[Callable] = []
+
+    def check(self) -> None:
+        """Call ``keepalive_check()`` on every registered producer.
+
+        An exception raised by one producer is logged and swallowed so the
+        remaining producers still get their turn.
+        """
+        for name in self.producers:
+            try:
+                # Build the producer inside the try block: a producer whose
+                # construction fails must not abort the whole pass.
+                name().keepalive_check()
+            except Exception as e:
+                LOG.error(f"Error in producer {name} (check): {e}")
+
+    def log(self) -> None:
+        """Call ``keepalive_log()`` on every registered producer.
+
+        An exception raised by one producer is logged and swallowed so the
+        remaining producers still get their turn.
+        """
+        for name in self.producers:
+            try:
+                name().keepalive_log()
+            except Exception as e:
+                # Tag the phase as "log" so a failure here is not mistaken
+                # for a failure in check().
+                LOG.error(f"Error in producer {name} (log): {e}")
+
+    def register(self, producer_name: Callable):
+        """Add a producer to the list of keepalive callbacks.
+
+        Registering a producer that is already present is a no-op.
+
+        Raises:
+            TypeError: if ``producer_name`` does not satisfy the
+                ``KeepAliveProducer`` protocol.
+        """
+        # isinstance() against a runtime-checkable Protocol only verifies
+        # that the required attributes exist, so a class object passes too.
+        if not isinstance(producer_name, KeepAliveProducer):
+            raise TypeError(f"Producer {producer_name} is not a KeepAliveProducer")
+        # A duplicate entry would run the callbacks twice on every pass.
+        if producer_name not in self.producers:
+            self.producers.append(producer_name)
+
+    def unregister(self, producer_name: Callable):
+        """Remove a previously registered producer.
+
+        Raises:
+            TypeError: if ``producer_name`` does not satisfy the
+                ``KeepAliveProducer`` protocol.
+            ValueError: if ``producer_name`` was never registered.
+        """
+        if not isinstance(producer_name, KeepAliveProducer):
+            raise TypeError(f"Producer {producer_name} is not a KeepAliveProducer")
+        self.producers.remove(producer_name)