mirror of
https://github.com/craigerl/aprsd.git
synced 2025-04-19 09:49:01 -04:00
Added new KeepAliveCollector
This new collector allows external extensions and plugins to get callbacks during the keepalive thread runs. Every 60 seconds the keepalive thread will do some checks and log some entries in the logfile to keep the user/admin aware of the state of aprsd itself.
This commit is contained in:
parent
bbdbb9aba1
commit
30d1eb57dd
@ -3,7 +3,9 @@ import logging
|
||||
import time
|
||||
|
||||
from aprslib.exceptions import LoginError
|
||||
from loguru import logger
|
||||
from oslo_config import cfg
|
||||
import timeago
|
||||
|
||||
from aprsd import client, exception
|
||||
from aprsd.client import base
|
||||
@ -13,11 +15,13 @@ from aprsd.packets import core
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger("APRSD")
|
||||
LOGU = logger
|
||||
|
||||
|
||||
class APRSISClient(base.APRSClient):
|
||||
|
||||
_client = None
|
||||
_checks = False
|
||||
|
||||
def __init__(self):
|
||||
max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0}
|
||||
@ -45,6 +49,20 @@ class APRSISClient(base.APRSClient):
|
||||
|
||||
return stats
|
||||
|
||||
def keepalive_check(self):
|
||||
# Don't check the first time through.
|
||||
if not self.is_alive() and self._checks:
|
||||
LOG.warning("Resetting client. It's not alive.")
|
||||
self.reset()
|
||||
self._checks = True
|
||||
|
||||
def keepalive_log(self):
|
||||
if ka := self._client.aprsd_keepalive:
|
||||
keepalive = timeago.format(ka)
|
||||
else:
|
||||
keepalive = "N/A"
|
||||
LOGU.opt(colors=True).info(f"<green>Client keepalive {keepalive}</green>")
|
||||
|
||||
@staticmethod
|
||||
def is_enabled():
|
||||
# Defaults to True if the enabled flag is non existent
|
||||
@ -81,14 +99,13 @@ class APRSISClient(base.APRSClient):
|
||||
if delta > self.max_delta:
|
||||
LOG.error(f"Connection is stale, last heard {delta} ago.")
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_alive(self):
|
||||
if self._client:
|
||||
return self._client.is_alive() and not self._is_stale_connection()
|
||||
else:
|
||||
if not self._client:
|
||||
LOG.warning(f"APRS_CLIENT {self._client} alive? NO!!!")
|
||||
return False
|
||||
|
||||
return self._client.is_alive() and not self._is_stale_connection()
|
||||
def close(self):
|
||||
if self._client:
|
||||
self._client.stop()
|
||||
|
@ -6,6 +6,7 @@ from oslo_config import cfg
|
||||
import wrapt
|
||||
|
||||
from aprsd.packets import core
|
||||
from aprsd.threads import keepalive_collector
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -30,6 +31,7 @@ class APRSClient:
|
||||
"""This magic turns this into a singleton."""
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
keepalive_collector.KeepAliveCollector().register(cls)
|
||||
# Put any initialization here.
|
||||
cls._instance._create_client()
|
||||
return cls._instance
|
||||
@ -42,6 +44,16 @@ class APRSClient:
|
||||
dict: Statistics about the connection and packet handling
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def keepalive_check(self) -> None:
|
||||
"""Called during keepalive run to check status."""
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def keepalive_log(self) -> None:
|
||||
"""Log any keepalive information."""
|
||||
...
|
||||
|
||||
@property
|
||||
def is_connected(self):
|
||||
return self.connected
|
||||
|
@ -2,7 +2,9 @@ import datetime
|
||||
import logging
|
||||
|
||||
import aprslib
|
||||
from loguru import logger
|
||||
from oslo_config import cfg
|
||||
import timeago
|
||||
|
||||
from aprsd import client, exception
|
||||
from aprsd.client import base
|
||||
@ -12,6 +14,7 @@ from aprsd.packets import core
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger("APRSD")
|
||||
LOGU = logger
|
||||
|
||||
|
||||
class KISSClient(base.APRSClient):
|
||||
@ -79,6 +82,20 @@ class KISSClient(base.APRSClient):
|
||||
if self._client:
|
||||
self._client.stop()
|
||||
|
||||
def keepalive_check(self):
|
||||
# Don't check the first time through.
|
||||
if not self.is_alive() and self._checks:
|
||||
LOG.warning("Resetting client. It's not alive.")
|
||||
self.reset()
|
||||
self._checks = True
|
||||
|
||||
def keepalive_log(self):
|
||||
if ka := self._client.aprsd_keepalive:
|
||||
keepalive = timeago.format(ka)
|
||||
else:
|
||||
keepalive = "N/A"
|
||||
LOGU.opt(colors=True).info(f"<green>Client keepalive {keepalive}</green>")
|
||||
|
||||
@staticmethod
|
||||
def transport():
|
||||
if CONF.kiss_serial.enabled:
|
||||
|
@ -23,7 +23,7 @@ from aprsd.packets import collector as packet_collector
|
||||
from aprsd.packets import log as packet_log
|
||||
from aprsd.packets import seen_list
|
||||
from aprsd.stats import collector
|
||||
from aprsd.threads import keep_alive, rx
|
||||
from aprsd.threads import keepalive, rx
|
||||
from aprsd.threads import stats as stats_thread
|
||||
from aprsd.threads.aprsd import APRSDThread
|
||||
|
||||
@ -126,7 +126,7 @@ class ListenStatsThread(APRSDThread):
|
||||
thread_hex = f"fg {utils.hex_from_name(k)}"
|
||||
LOGU.opt(colors=True).info(
|
||||
f"<{thread_hex}>{k:<15}</{thread_hex}> "
|
||||
f"<blue>RX: {v['rx']}</blue> <red>TX: {v['tx']}</red>",
|
||||
f"<blue>RX: {v["rx"]}</blue> <red>TX: {v["tx"]}</red>",
|
||||
)
|
||||
|
||||
time.sleep(1)
|
||||
@ -265,7 +265,7 @@ def listen(
|
||||
LOG.debug(f"Filter by '{filter}'")
|
||||
aprs_client.set_filter(filter)
|
||||
|
||||
keepalive = keep_alive.KeepAliveThread()
|
||||
keepalive = keepalive.KeepAliveThread()
|
||||
|
||||
if not CONF.enable_seen_list:
|
||||
# just deregister the class from the packet collector
|
||||
|
@ -14,7 +14,7 @@ from aprsd.main import cli
|
||||
from aprsd.packets import collector as packet_collector
|
||||
from aprsd.packets import seen_list
|
||||
from aprsd.threads import aprsd as aprsd_threads
|
||||
from aprsd.threads import keep_alive, registry, rx
|
||||
from aprsd.threads import keepalive, registry, rx
|
||||
from aprsd.threads import stats as stats_thread
|
||||
from aprsd.threads import tx
|
||||
from aprsd.utils import singleton
|
||||
@ -146,7 +146,7 @@ def server(ctx, flush):
|
||||
|
||||
# Now start all the main processing threads.
|
||||
|
||||
server_threads.register(keep_alive.KeepAliveThread())
|
||||
server_threads.register(keepalive.KeepAliveThread())
|
||||
server_threads.register(stats_thread.APRSDStatsStoreThread())
|
||||
server_threads.register(
|
||||
rx.APRSDPluginRXThread(
|
||||
|
@ -5,13 +5,11 @@ import tracemalloc
|
||||
|
||||
from loguru import logger
|
||||
from oslo_config import cfg
|
||||
import timeago
|
||||
|
||||
from aprsd import packets, utils
|
||||
from aprsd.client import client_factory
|
||||
from aprsd.log import log as aprsd_log
|
||||
from aprsd.stats import collector
|
||||
from aprsd.threads import APRSDThread, APRSDThreadList
|
||||
from aprsd.threads import APRSDThread, APRSDThreadList, keepalive_collector
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -36,15 +34,6 @@ class KeepAliveThread(APRSDThread):
|
||||
thread_list = APRSDThreadList()
|
||||
now = datetime.datetime.now()
|
||||
|
||||
if "EmailStats" in stats_json:
|
||||
email_stats = stats_json["EmailStats"]
|
||||
if email_stats.get("last_check_time"):
|
||||
email_thread_time = utils.strfdelta(now - email_stats["last_check_time"])
|
||||
else:
|
||||
email_thread_time = "N/A"
|
||||
else:
|
||||
email_thread_time = "N/A"
|
||||
|
||||
if "APRSClientStats" in stats_json and stats_json["APRSClientStats"].get("transport") == "aprsis":
|
||||
if stats_json["APRSClientStats"].get("server_keepalive"):
|
||||
last_msg_time = utils.strfdelta(now - stats_json["APRSClientStats"]["server_keepalive"])
|
||||
@ -64,7 +53,7 @@ class KeepAliveThread(APRSDThread):
|
||||
|
||||
keepalive = (
|
||||
"{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} "
|
||||
"Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}"
|
||||
"Last:{} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}"
|
||||
).format(
|
||||
stats_json["APRSDStats"]["callsign"],
|
||||
stats_json["APRSDStats"]["uptime"],
|
||||
@ -74,7 +63,6 @@ class KeepAliveThread(APRSDThread):
|
||||
tx_msg,
|
||||
rx_msg,
|
||||
last_msg_time,
|
||||
email_thread_time,
|
||||
stats_json["APRSDStats"]["memory_current_str"],
|
||||
stats_json["APRSDStats"]["memory_peak_str"],
|
||||
len(thread_list),
|
||||
@ -97,35 +85,11 @@ class KeepAliveThread(APRSDThread):
|
||||
LOGU.opt(colors=True).info(thread_msg)
|
||||
# LOG.info(f"{key: <15} Alive? {str(alive): <5} {str(age): <20}")
|
||||
|
||||
# check the APRS connection
|
||||
cl = client_factory.create()
|
||||
cl_stats = cl.stats()
|
||||
ka = cl_stats.get("connection_keepalive", None)
|
||||
if ka:
|
||||
keepalive = timeago.format(ka)
|
||||
else:
|
||||
keepalive = "N/A"
|
||||
LOGU.opt(colors=True).info(f"<green>Client keepalive {keepalive}</green>")
|
||||
# Reset the connection if it's dead and this isn't our
|
||||
# First time through the loop.
|
||||
# The first time through the loop can happen at startup where
|
||||
# The keepalive thread starts before the client has a chance
|
||||
# to make it's connection the first time.
|
||||
if not cl.is_alive() and self.cntr > 0:
|
||||
LOG.error(f"{cl.__class__.__name__} is not alive!!! Resetting")
|
||||
client_factory.create().reset()
|
||||
# else:
|
||||
# # See if we should reset the aprs-is client
|
||||
# # Due to losing a keepalive from them
|
||||
# delta_dict = utils.parse_delta_str(last_msg_time)
|
||||
# delta = datetime.timedelta(**delta_dict)
|
||||
#
|
||||
# if delta > self.max_delta:
|
||||
# # We haven't gotten a keepalive from aprs-is in a while
|
||||
# # reset the connection.a
|
||||
# if not client.KISSClient.is_enabled():
|
||||
# LOG.warning(f"Resetting connection to APRS-IS {delta}")
|
||||
# client.factory.create().reset()
|
||||
# Go through the registered keepalive collectors
|
||||
# and check them as well as call log.
|
||||
collect = keepalive_collector.KeepAliveCollector()
|
||||
collect.check()
|
||||
collect.log()
|
||||
|
||||
# Check version every day
|
||||
delta = now - self.checker_time
|
54
aprsd/threads/keepalive_collector.py
Normal file
54
aprsd/threads/keepalive_collector.py
Normal file
@ -0,0 +1,54 @@
|
||||
import logging
|
||||
from typing import Callable, Protocol, runtime_checkable
|
||||
|
||||
from aprsd.utils import singleton
|
||||
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class KeepAliveProducer(Protocol):
|
||||
"""The KeepAliveProducer protocol is used to define the interface for running Keepalive checks."""
|
||||
def keepalive_check(self) -> dict:
|
||||
"""Check for keepalive."""
|
||||
...
|
||||
|
||||
def keepalive_log(self):
|
||||
"""Log any keepalive information."""
|
||||
...
|
||||
|
||||
|
||||
@singleton
|
||||
class KeepAliveCollector:
|
||||
"""The Collector class is used to collect stats from multiple StatsProducer instances."""
|
||||
def __init__(self):
|
||||
self.producers: list[Callable] = []
|
||||
|
||||
def check(self) -> None:
|
||||
"""Do any keepalive checks."""
|
||||
for name in self.producers:
|
||||
cls = name()
|
||||
try:
|
||||
cls.keepalive_check()
|
||||
except Exception as e:
|
||||
LOG.error(f"Error in producer {name} (check): {e}")
|
||||
|
||||
def log(self) -> None:
|
||||
"""Log any relevant information during a KeepAlive check"""
|
||||
for name in self.producers:
|
||||
cls = name()
|
||||
try:
|
||||
cls.keepalive_log()
|
||||
except Exception as e:
|
||||
LOG.error(f"Error in producer {name} (check): {e}")
|
||||
|
||||
def register(self, producer_name: Callable):
|
||||
if not isinstance(producer_name, KeepAliveProducer):
|
||||
raise TypeError(f"Producer {producer_name} is not a KeepAliveProducer")
|
||||
self.producers.append(producer_name)
|
||||
|
||||
def unregister(self, producer_name: Callable):
|
||||
if not isinstance(producer_name, KeepAliveProducer):
|
||||
raise TypeError(f"Producer {producer_name} is not a KeepAliveProducer")
|
||||
self.producers.remove(producer_name)
|
Loading…
Reference in New Issue
Block a user