From bd005f628dc98bdb009431b858967afe8a470131 Mon Sep 17 00:00:00 2001 From: Hemna Date: Fri, 29 Mar 2024 11:51:15 -0400 Subject: [PATCH] Reworked the stats making the rpc server obsolete. This patch implements a new stats collector paradigm which uses the typing Protocol. Any object that wants to supply stats to the collector has to implement the aprsd.stats.collector.StatsProducer protocol, which at the current time is implementing a stats() method on the object. Then register the stats singleton producer with the collector by calling collector.Collector().register_producer() This only works if the stats producer object is a singleton. --- aprsd/client.py | 80 +++++++++-- aprsd/clients/aprsis.py | 9 +- aprsd/cmds/list_plugins.py | 2 +- aprsd/cmds/listen.py | 17 +-- aprsd/cmds/server.py | 15 +- aprsd/main.py | 5 +- aprsd/packets/packet_list.py | 13 +- aprsd/packets/tracker.py | 47 +++---- aprsd/packets/watch_list.py | 22 ++- aprsd/plugin.py | 24 +++- aprsd/plugins/email.py | 33 ++++- aprsd/plugins/query.py | 81 ----------- aprsd/stats.py | 265 ----------------------------------- aprsd/threads/__init__.py | 5 +- aprsd/threads/aprsd.py | 12 ++ aprsd/threads/keep_alive.py | 103 +++++++------- aprsd/threads/log_monitor.py | 5 + aprsd/threads/rx.py | 4 +- aprsd/utils/__init__.py | 12 ++ aprsd/utils/objectstore.py | 7 +- 20 files changed, 286 insertions(+), 475 deletions(-) delete mode 100644 aprsd/plugins/query.py delete mode 100644 aprsd/stats.py diff --git a/aprsd/client.py b/aprsd/client.py index 2561692..de5006f 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -1,4 +1,5 @@ import abc +import datetime import logging import time @@ -9,7 +10,7 @@ from oslo_config import cfg from aprsd import exception from aprsd.clients import aprsis, fake, kiss from aprsd.packets import core, packet_list -from aprsd.utils import trace +from aprsd.utils import singleton, trace CONF = cfg.CONF @@ -24,6 +25,26 @@ TRANSPORT_FAKE = "fake" # Correct config factory = None +@singleton +class APRSClientStats: + def stats(self): + client = factory.create() + stats = { + "transport": client.transport(), + "filter": client.filter, + "connected": client.connected, + } + + if client.transport() == TRANSPORT_APRSIS: + stats["server_string"] = client.client.server_string + stats["sever_keepalive"] = client.client.aprsd_keepalive + elif client.transport() == TRANSPORT_TCPKISS: + stats["host"] = CONF.kiss_tcp.host + stats["port"] = CONF.kiss_tcp.port + elif client.transport() == TRANSPORT_SERIALKISS: + stats["device"] = CONF.kiss_serial.device + return stats + class Client: """Singleton client class that constructs the aprslib connection.""" @@ -32,7 +53,6 @@ class Client: _client = None connected = False - server_string = None filter = None def __new__(cls, *args, **kwargs): @@ -43,6 +63,10 @@ class Client: cls._instance._create_client() return cls._instance + @abc.abstractmethod + def stats(self) -> dict: + pass + def set_filter(self, filter): self.filter = filter if self._client: @@ -102,11 +126,30 @@ class Client: def consumer(self, callback, blocking=False, immortal=False, raw=False): pass + @abc.abstractmethod + def is_alive(self): + pass + class APRSISClient(Client): _client = None + def __init__(self): + max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0} + self.max_delta = datetime.timedelta(**max_timeout) + + def stats(self) -> dict: + stats = {} + if self.is_configured(): + stats = { + "server_string": self._client.server_string, + "sever_keepalive": self._client.aprsd_keepalive, + "filter": self.filter, 
+ } + + return stats + @staticmethod def is_enabled(): # Defaults to True if the enabled flag is non existent @@ -138,10 +181,15 @@ class APRSISClient(Client): return True return True + def _is_stale_connection(self): + delta = datetime.datetime.now() - self._client.aprsd_keepalive + if delta > self.max_delta: + LOG.error(f"Connection is stale, last heard {delta} ago.") + return True + def is_alive(self): if self._client: - LOG.warning(f"APRS_CLIENT {self._client} alive? {self._client.is_alive()}") - return self._client.is_alive() + return self._client.is_alive() and not self._is_stale_connection() else: LOG.warning(f"APRS_CLIENT {self._client} alive? NO!!!") return False @@ -159,25 +207,25 @@ class APRSISClient(Client): password = CONF.aprs_network.password host = CONF.aprs_network.host port = CONF.aprs_network.port - connected = False + self.connected = False backoff = 1 aprs_client = None - while not connected: + while not self.connected: try: LOG.info(f"Creating aprslib client({host}:{port}) and logging in {user}.") aprs_client = aprsis.Aprsdis(user, passwd=password, host=host, port=port) # Force the log to be the same aprs_client.logger = LOG aprs_client.connect() - connected = True + self.connected = True backoff = 1 except LoginError as e: LOG.error(f"Failed to login to APRS-IS Server '{e}'") - connected = False + self.connected = False time.sleep(backoff) except Exception as e: LOG.error(f"Unable to connect to APRS-IS server. '{e}' ") - connected = False + self.connected = False time.sleep(backoff) # Don't allow the backoff to go to inifinity. if backoff > 5: @@ -201,6 +249,14 @@ class KISSClient(Client): _client = None + def stats(self) -> dict: + stats = {} + if self.is_configured(): + return { + "transport": self.transport(), + } + return stats + @staticmethod def is_enabled(): """Return if tcp or serial KISS is enabled.""" @@ -268,6 +324,7 @@ class KISSClient(Client): def setup_connection(self): self._client = kiss.KISS3Client() + self.connected = True return self._client def consumer(self, callback, blocking=False, immortal=False, raw=False): @@ -276,6 +333,9 @@ class KISSClient(Client): class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass): + def stats(self) -> dict: + return {} + @staticmethod def is_enabled(): if CONF.fake_client.enabled: @@ -290,6 +350,7 @@ class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass): return True def setup_connection(self): + self.connected = True return fake.APRSDFakeClient() @staticmethod @@ -329,7 +390,6 @@ class ClientFactory: key = TRANSPORT_FAKE builder = self._builders.get(key) - LOG.debug(f"ClientFactory Creating client of type '{key}'") if not builder: raise ValueError(key) return builder() diff --git a/aprsd/clients/aprsis.py b/aprsd/clients/aprsis.py index 07052ac..06b2d6b 100644 --- a/aprsd/clients/aprsis.py +++ b/aprsd/clients/aprsis.py @@ -1,3 +1,4 @@ +import datetime import logging import select import threading @@ -11,7 +12,6 @@ from aprslib.exceptions import ( import wrapt import aprsd -from aprsd import stats from aprsd.packets import core @@ -24,6 +24,9 @@ class Aprsdis(aprslib.IS): # flag to tell us to stop thread_stop = False + # date for last time we heard from the server + aprsd_keepalive = datetime.datetime.now() + # timeout in seconds select_timeout = 1 lock = threading.Lock() @@ -142,7 +145,6 @@ class Aprsdis(aprslib.IS): self.logger.info(f"Connected to {server_string}") self.server_string = server_string - stats.APRSDStats().set_aprsis_server(server_string) except LoginError as e: 
self.logger.error(str(e)) @@ -176,13 +178,14 @@ class Aprsdis(aprslib.IS): try: for line in self._socket_readlines(blocking): if line[0:1] != b"#": + self.aprsd_keepalive = datetime.datetime.now() if raw: callback(line) else: callback(self._parse(line)) else: self.logger.debug("Server: %s", line.decode("utf8")) - stats.APRSDStats().set_aprsis_keepalive() + self.aprsd_keepalive = datetime.datetime.now() except ParseError as exp: self.logger.log( 11, diff --git a/aprsd/cmds/list_plugins.py b/aprsd/cmds/list_plugins.py index 357d725..6c4dc08 100644 --- a/aprsd/cmds/list_plugins.py +++ b/aprsd/cmds/list_plugins.py @@ -21,7 +21,7 @@ from aprsd import cli_helper from aprsd import plugin as aprsd_plugin from aprsd.main import cli from aprsd.plugins import ( - email, fortune, location, notify, ping, query, time, version, weather, + email, fortune, location, notify, ping, time, version, weather, ) diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index 93564ac..b30eee6 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -15,10 +15,10 @@ from rich.console import Console # local imports here import aprsd -from aprsd import cli_helper, client, packets, plugin, stats, threads +from aprsd import cli_helper, client, packets, plugin, threads from aprsd.main import cli from aprsd.packets import log as packet_log -from aprsd.rpc import server as rpc_server +from aprsd.stats import collector from aprsd.threads import rx @@ -38,7 +38,7 @@ def signal_handler(sig, frame): ), ) time.sleep(5) - LOG.info(stats.APRSDStats()) + LOG.info(collector.Collector().collect()) class APRSDListenThread(rx.APRSDRXThread): @@ -169,6 +169,7 @@ def listen( LOG.info(f"APRSD Listen Started version: {aprsd.__version__}") CONF.log_opt_values(LOG, logging.DEBUG) + collector.Collector() # Try and load saved MsgTrack list LOG.debug("Loading saved MsgTrack object.") @@ -192,10 +193,6 @@ def listen( keepalive = threads.KeepAliveThread() # keepalive.start() - if CONF.rpc_settings.enabled: - rpc = rpc_server.APRSDRPCThread() - rpc.start() - pm = None pm = plugin.PluginManager() if load_plugins: @@ -206,6 +203,8 @@ def listen( "Not Loading any plugins use --load-plugins to load what's " "defined in the config file.", ) + stats_thread = threads.APRSDStatsStoreThread() + stats_thread.start() LOG.debug("Create APRSDListenThread") listen_thread = APRSDListenThread( @@ -221,6 +220,4 @@ def listen( keepalive.join() LOG.debug("listen_thread Join") listen_thread.join() - - if CONF.rpc_settings.enabled: - rpc.join() + stats_thread.join() diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index 89a6c74..c644270 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -10,7 +10,6 @@ from aprsd import cli_helper, client from aprsd import main as aprsd_main from aprsd import packets, plugin, threads, utils from aprsd.main import cli -from aprsd.rpc import server as rpc_server from aprsd.threads import registry, rx, tx @@ -47,6 +46,14 @@ def server(ctx, flush): # Initialize the client factory and create # The correct client object ready for use client.ClientFactory.setup() + if not client.factory.is_client_enabled(): + LOG.error("No Clients are enabled in config.") + sys.exit(-1) + + # Creates the client object + LOG.info("Creating client connection") + aprs_client = client.factory.create() + LOG.info(aprs_client) # Create the initial PM singleton and Register plugins # We register plugins first here so we can register each @@ -97,6 +104,9 @@ def server(ctx, flush): keepalive = threads.KeepAliveThread() keepalive.start() + 
stats_thread = threads.APRSDStatsStoreThread() + stats_thread.start() + rx_thread = rx.APRSDPluginRXThread( packet_queue=threads.packet_queue, ) @@ -106,7 +116,6 @@ def server(ctx, flush): rx_thread.start() process_thread.start() - packets.PacketTrack().restart() if CONF.enable_beacon: LOG.info("Beacon Enabled. Starting Beacon thread.") bcn_thread = tx.BeaconSendThread() @@ -118,8 +127,6 @@ def server(ctx, flush): registry_thread.start() if CONF.rpc_settings.enabled: - rpc = rpc_server.APRSDRPCThread() - rpc.start() log_monitor = threads.log_monitor.LogMonitorThread() log_monitor.start() diff --git a/aprsd/main.py b/aprsd/main.py index 26c228e..e6eba5b 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -35,7 +35,8 @@ from oslo_config import cfg, generator # local imports here import aprsd -from aprsd import cli_helper, packets, stats, threads, utils +from aprsd import cli_helper, packets, threads, utils +from aprsd.stats import collector # setup the global logger @@ -96,7 +97,7 @@ def signal_handler(sig, frame): packets.PacketTrack().save() packets.WatchList().save() packets.SeenList().save() - LOG.info(stats.APRSDStats()) + LOG.info(collector.Collector().collect()) # signal.signal(signal.SIGTERM, sys.exit(0)) # sys.exit(0) diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index 3d75a07..4d94efb 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -6,7 +6,6 @@ import threading from oslo_config import cfg import wrapt -from aprsd import stats from aprsd.packets import seen_list @@ -38,7 +37,6 @@ class PacketList(MutableMapping): self.types[ptype] = {"tx": 0, "rx": 0} self.types[ptype]["rx"] += 1 seen_list.SeenList().update_seen(packet) - stats.APRSDStats().rx(packet) @wrapt.synchronized(lock) def tx(self, packet): @@ -50,7 +48,6 @@ class PacketList(MutableMapping): self.types[ptype] = {"tx": 0, "rx": 0} self.types[ptype]["tx"] += 1 seen_list.SeenList().update_seen(packet) - stats.APRSDStats().tx(packet) @wrapt.synchronized(lock) def add(self, packet): @@ -97,3 +94,13 @@ class PacketList(MutableMapping): @wrapt.synchronized(lock) def total_tx(self): return self._total_tx + + def stats(self) -> dict: + stats = { + "total_tracked": self.total_tx() + self.total_rx(), + "rx": self.total_rx(), + "tx": self.total_tx(), + "packets": self.types, + } + + return stats diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index 1246dc1..288ac6b 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -4,7 +4,6 @@ import threading from oslo_config import cfg import wrapt -from aprsd.threads import tx from aprsd.utils import objectstore @@ -58,6 +57,22 @@ class PacketTrack(objectstore.ObjectStoreMixin): def values(self): return self.data.values() + @wrapt.synchronized(lock) + def stats(self): + stats = { + "total_tracked": self.total_tracked, + } + pkts = {} + for key in self.data: + pkts[key] = { + "last_send_time": self.data[key].last_send_time, + "last_send_attempt": self.data[key]._last_send_attempt, + "retry_count": self.data[key].retry_count, + "message": self.data[key].raw, + } + stats["packets"] = pkts + return stats + @wrapt.synchronized(lock) def __len__(self): return len(self.data) @@ -79,33 +94,3 @@ class PacketTrack(objectstore.ObjectStoreMixin): del self.data[key] except KeyError: pass - - def restart(self): - """Walk the list of messages and restart them if any.""" - for key in self.data.keys(): - pkt = self.data[key] - if pkt._last_send_attempt < pkt.retry_count: - tx.send(pkt) - - def _resend(self, packet): - 
packet._last_send_attempt = 0 - tx.send(packet) - - def restart_delayed(self, count=None, most_recent=True): - """Walk the list of delayed messages and restart them if any.""" - if not count: - # Send all the delayed messages - for key in self.data.keys(): - pkt = self.data[key] - if pkt._last_send_attempt == pkt._retry_count: - self._resend(pkt) - else: - # They want to resend delayed messages - tmp = sorted( - self.data.items(), - reverse=most_recent, - key=lambda x: x[1].last_send_time, - ) - pkt_list = tmp[:count] - for (_key, pkt) in pkt_list: - self._resend(pkt) diff --git a/aprsd/packets/watch_list.py b/aprsd/packets/watch_list.py index dee0631..10684ee 100644 --- a/aprsd/packets/watch_list.py +++ b/aprsd/packets/watch_list.py @@ -28,7 +28,7 @@ class WatchList(objectstore.ObjectStoreMixin): return cls._instance def __init__(self, config=None): - ring_size = CONF.watch_list.packet_keep_count + CONF.watch_list.packet_keep_count if CONF.watch_list.callsigns: for callsign in CONF.watch_list.callsigns: @@ -38,12 +38,22 @@ class WatchList(objectstore.ObjectStoreMixin): # last time a message was seen by aprs-is. For now this # is all we can do. self.data[call] = { - "last": datetime.datetime.now(), - "packets": utils.RingBuffer( - ring_size, - ), + "last": None, + "packet": None, } + @wrapt.synchronized(lock) + def stats(self) -> dict: + stats = {} + for callsign in self.data: + stats[callsign] = { + "last": self.data[callsign]["last"], + "packet": self.data[callsign]["packet"], + "age": self.age(callsign), + "old": self.is_old(callsign), + } + return stats + def is_enabled(self): return CONF.watch_list.enabled @@ -58,7 +68,7 @@ class WatchList(objectstore.ObjectStoreMixin): callsign = packet.from_call if self.callsign_in_watchlist(callsign): self.data[callsign]["last"] = datetime.datetime.now() - self.data[callsign]["packets"].append(packet) + self.data[callsign]["packet"] = packet def last_seen(self, callsign): if self.callsign_in_watchlist(callsign): diff --git a/aprsd/plugin.py b/aprsd/plugin.py index f73e0c6..32c7010 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -344,6 +344,28 @@ class PluginManager: self._watchlist_pm = pluggy.PluginManager("aprsd") self._watchlist_pm.add_hookspecs(APRSDPluginSpec) + def stats(self) -> dict: + """Collect and return stats for all plugins.""" + def full_name_with_qualname(obj): + return "{}.{}".format( + obj.__class__.__module__, + obj.__class__.__qualname__, + ) + + plugin_stats = {} + plugins = self.get_plugins() + if plugins: + + for p in plugins: + plugin_stats[full_name_with_qualname(p)] = { + "enabled": p.enabled, + "rx": p.rx_count, + "tx": p.tx_count, + "version": p.version, + } + + return plugin_stats + def is_plugin(self, obj): for c in inspect.getmro(obj): if issubclass(c, APRSDPluginBase): @@ -369,7 +391,7 @@ class PluginManager: try: module_name, class_name = module_class_string.rsplit(".", 1) module = importlib.import_module(module_name) - module = importlib.reload(module) + #module = importlib.reload(module) except Exception as ex: if not module_name: LOG.error(f"Failed to load Plugin {module_class_string}") diff --git a/aprsd/plugins/email.py b/aprsd/plugins/email.py index f6f273a..2ab12f7 100644 --- a/aprsd/plugins/email.py +++ b/aprsd/plugins/email.py @@ -11,7 +11,7 @@ import time import imapclient from oslo_config import cfg -from aprsd import packets, plugin, stats, threads +from aprsd import packets, plugin, threads, utils from aprsd.threads import tx from aprsd.utils import trace @@ -60,6 +60,33 @@ class EmailInfo: 
self._delay = val +@utils.singleton +class EmailStats: + """Singleton object to store stats related to email.""" + _instance = None + tx = 0 + rx = 0 + email_thread_last_time = None + + def stats(self): + if CONF.email_plugin.enabled: + stats = { + "tx": self.tx, + "rx": self.rx, + "last_check_time": self.email_thread_last_time, + } + else: + stats = {} + return stats + + def tx_inc(self): + self.tx += 1 + def rx_inc(self): + self.rx += 1 + def email_thread_update(self): + self.email_thread_last_time = datetime.datetime.now() + + class EmailPlugin(plugin.APRSDRegexCommandPluginBase): """Email Plugin.""" @@ -440,7 +467,7 @@ def send_email(to_addr, content): [to_addr], msg.as_string(), ) - stats.APRSDStats().email_tx_inc() + EmailStats().tx_inc() except Exception: LOG.exception("Sendmail Error!!!!") server.quit() @@ -545,7 +572,7 @@ class APRSDEmailThread(threads.APRSDThread): def loop(self): time.sleep(5) - stats.APRSDStats().email_thread_update() + EmailStats().email_thread_update() # always sleep for 5 seconds and see if we need to check email # This allows CTRL-C to stop the execution of this loop sooner # than check_email_delay time diff --git a/aprsd/plugins/query.py b/aprsd/plugins/query.py deleted file mode 100644 index 871b249..0000000 --- a/aprsd/plugins/query.py +++ /dev/null @@ -1,81 +0,0 @@ -import datetime -import logging -import re - -from oslo_config import cfg - -from aprsd import packets, plugin -from aprsd.packets import tracker -from aprsd.utils import trace - - -CONF = cfg.CONF -LOG = logging.getLogger("APRSD") - - -class QueryPlugin(plugin.APRSDRegexCommandPluginBase): - """Query command.""" - - command_regex = r"^\!.*" - command_name = "query" - short_description = "APRSD Owner command to query messages in the MsgTrack" - - def setup(self): - """Do any plugin setup here.""" - if not CONF.query_plugin.callsign: - LOG.error("Config query_plugin.callsign not set. Disabling plugin") - self.enabled = False - self.enabled = True - - @trace.trace - def process(self, packet: packets.MessagePacket): - LOG.info("Query COMMAND") - - fromcall = packet.from_call - message = packet.get("message_text", None) - - pkt_tracker = tracker.PacketTrack() - now = datetime.datetime.now() - reply = "Pending messages ({}) {}".format( - len(pkt_tracker), - now.strftime("%H:%M:%S"), - ) - - searchstring = "^" + CONF.query_plugin.callsign + ".*" - # only I can do admin commands - if re.search(searchstring, fromcall): - - # resend last N most recent: "!3" - r = re.search(r"^\!([0-9]).*", message) - if r is not None: - if len(pkt_tracker) > 0: - last_n = r.group(1) - reply = packets.NULL_MESSAGE - LOG.debug(reply) - pkt_tracker.restart_delayed(count=int(last_n)) - else: - reply = "No pending msgs to resend" - LOG.debug(reply) - return reply - - # resend all: "!a" - r = re.search(r"^\![aA].*", message) - if r is not None: - if len(pkt_tracker) > 0: - reply = packets.NULL_MESSAGE - LOG.debug(reply) - pkt_tracker.restart_delayed() - else: - reply = "No pending msgs" - LOG.debug(reply) - return reply - - # delete all: "!d" - r = re.search(r"^\![dD].*", message) - if r is not None: - reply = "Deleted ALL pending msgs." 
- LOG.debug(reply) - pkt_tracker.flush() - return reply - - return reply diff --git a/aprsd/stats.py b/aprsd/stats.py deleted file mode 100644 index cb20eb8..0000000 --- a/aprsd/stats.py +++ /dev/null @@ -1,265 +0,0 @@ -import datetime -import logging -import threading - -from oslo_config import cfg -import wrapt - -import aprsd -from aprsd import packets, plugin, utils - - -CONF = cfg.CONF -LOG = logging.getLogger("APRSD") - - -class APRSDStats: - - _instance = None - lock = threading.Lock() - - start_time = None - _aprsis_server = None - _aprsis_keepalive = None - - _email_thread_last_time = None - _email_tx = 0 - _email_rx = 0 - - _mem_current = 0 - _mem_peak = 0 - - _thread_info = {} - - _pkt_cnt = { - "Packet": { - "tx": 0, - "rx": 0, - }, - "AckPacket": { - "tx": 0, - "rx": 0, - }, - "GPSPacket": { - "tx": 0, - "rx": 0, - }, - "StatusPacket": { - "tx": 0, - "rx": 0, - }, - "MicEPacket": { - "tx": 0, - "rx": 0, - }, - "MessagePacket": { - "tx": 0, - "rx": 0, - }, - "WeatherPacket": { - "tx": 0, - "rx": 0, - }, - "ObjectPacket": { - "tx": 0, - "rx": 0, - }, - } - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - # any init here - cls._instance.start_time = datetime.datetime.now() - cls._instance._aprsis_keepalive = datetime.datetime.now() - return cls._instance - - @wrapt.synchronized(lock) - @property - def uptime(self): - return datetime.datetime.now() - self.start_time - - @wrapt.synchronized(lock) - @property - def memory(self): - return self._mem_current - - @wrapt.synchronized(lock) - def set_memory(self, memory): - self._mem_current = memory - - @wrapt.synchronized(lock) - @property - def memory_peak(self): - return self._mem_peak - - @wrapt.synchronized(lock) - def set_memory_peak(self, memory): - self._mem_peak = memory - - @wrapt.synchronized(lock) - def set_thread_info(self, thread_info): - self._thread_info = thread_info - - @wrapt.synchronized(lock) - @property - def thread_info(self): - return self._thread_info - - @wrapt.synchronized(lock) - @property - def aprsis_server(self): - return self._aprsis_server - - @wrapt.synchronized(lock) - def set_aprsis_server(self, server): - self._aprsis_server = server - - @wrapt.synchronized(lock) - @property - def aprsis_keepalive(self): - return self._aprsis_keepalive - - @wrapt.synchronized(lock) - def set_aprsis_keepalive(self): - self._aprsis_keepalive = datetime.datetime.now() - - def rx(self, packet): - pkt_type = packet.__class__.__name__ - if pkt_type not in self._pkt_cnt: - self._pkt_cnt[pkt_type] = { - "tx": 0, - "rx": 0, - } - self._pkt_cnt[pkt_type]["rx"] += 1 - - def tx(self, packet): - pkt_type = packet.__class__.__name__ - if pkt_type not in self._pkt_cnt: - self._pkt_cnt[pkt_type] = { - "tx": 0, - "rx": 0, - } - self._pkt_cnt[pkt_type]["tx"] += 1 - - @wrapt.synchronized(lock) - @property - def msgs_tracked(self): - return packets.PacketTrack().total_tracked - - @wrapt.synchronized(lock) - @property - def email_tx(self): - return self._email_tx - - @wrapt.synchronized(lock) - def email_tx_inc(self): - self._email_tx += 1 - - @wrapt.synchronized(lock) - @property - def email_rx(self): - return self._email_rx - - @wrapt.synchronized(lock) - def email_rx_inc(self): - self._email_rx += 1 - - @wrapt.synchronized(lock) - @property - def email_thread_time(self): - return self._email_thread_last_time - - @wrapt.synchronized(lock) - def email_thread_update(self): - self._email_thread_last_time = datetime.datetime.now() - - def stats(self): - now = datetime.datetime.now() - 
if self._email_thread_last_time: - last_update = str(now - self._email_thread_last_time) - else: - last_update = "never" - - if self._aprsis_keepalive: - last_aprsis_keepalive = str(now - self._aprsis_keepalive) - else: - last_aprsis_keepalive = "never" - - pm = plugin.PluginManager() - plugins = pm.get_plugins() - plugin_stats = {} - if plugins: - def full_name_with_qualname(obj): - return "{}.{}".format( - obj.__class__.__module__, - obj.__class__.__qualname__, - ) - - for p in plugins: - plugin_stats[full_name_with_qualname(p)] = { - "enabled": p.enabled, - "rx": p.rx_count, - "tx": p.tx_count, - "version": p.version, - } - - wl = packets.WatchList() - sl = packets.SeenList() - pl = packets.PacketList() - - stats = { - "aprsd": { - "version": aprsd.__version__, - "uptime": utils.strfdelta(self.uptime), - "callsign": CONF.callsign, - "memory_current": int(self.memory), - "memory_current_str": utils.human_size(self.memory), - "memory_peak": int(self.memory_peak), - "memory_peak_str": utils.human_size(self.memory_peak), - "threads": self._thread_info, - "watch_list": wl.get_all(), - "seen_list": sl.get_all(), - }, - "aprs-is": { - "server": str(self.aprsis_server), - "callsign": CONF.aprs_network.login, - "last_update": last_aprsis_keepalive, - }, - "packets": { - "total_tracked": int(pl.total_tx() + pl.total_rx()), - "total_sent": int(pl.total_tx()), - "total_received": int(pl.total_rx()), - "by_type": self._pkt_cnt, - }, - "messages": { - "sent": self._pkt_cnt["MessagePacket"]["tx"], - "received": self._pkt_cnt["MessagePacket"]["tx"], - "ack_sent": self._pkt_cnt["AckPacket"]["tx"], - }, - "email": { - "enabled": CONF.email_plugin.enabled, - "sent": int(self._email_tx), - "received": int(self._email_rx), - "thread_last_update": last_update, - }, - "plugins": plugin_stats, - } - return stats - - def __str__(self): - pl = packets.PacketList() - return ( - "Uptime:{} Msgs TX:{} RX:{} " - "ACK: TX:{} RX:{} " - "Email TX:{} RX:{} LastLoop:{} ".format( - self.uptime, - pl.total_tx(), - pl.total_rx(), - self._pkt_cnt["AckPacket"]["tx"], - self._pkt_cnt["AckPacket"]["rx"], - self._email_tx, - self._email_rx, - self._email_thread_last_time, - ) - ) diff --git a/aprsd/threads/__init__.py b/aprsd/threads/__init__.py index 9220a65..fd01292 100644 --- a/aprsd/threads/__init__.py +++ b/aprsd/threads/__init__.py @@ -4,7 +4,10 @@ import queue # aprsd.threads from .aprsd import APRSDThread, APRSDThreadList # noqa: F401 from .keep_alive import KeepAliveThread # noqa: F401 -from .rx import APRSDRXThread, APRSDDupeRXThread, APRSDProcessPacketThread # noqa: F401 +from .rx import ( # noqa: F401 + APRSDDupeRXThread, APRSDProcessPacketThread, APRSDRXThread, +) +from .stats import APRSDStatsStoreThread packet_queue = queue.Queue(maxsize=20) diff --git a/aprsd/threads/aprsd.py b/aprsd/threads/aprsd.py index d815af6..c1044d1 100644 --- a/aprsd/threads/aprsd.py +++ b/aprsd/threads/aprsd.py @@ -47,6 +47,7 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): def run(self): LOG.debug("Starting") while not self._should_quit(): + self.loop_count += 1 can_loop = self.loop() self.loop_interval += 1 self._last_loop = datetime.datetime.now() @@ -71,6 +72,17 @@ class APRSDThreadList: cls.threads_list = [] return cls._instance + def stats(self) -> dict: + stats = {} + for th in self.threads_list: + stats[th.__class__.__name__] = { + "name": th.name, + "alive": th.is_alive(), + "age": th.loop_age(), + "loop_count": th.loop_count, + } + return stats + @wrapt.synchronized(lock) def add(self, thread_obj): 
self.threads_list.append(thread_obj) diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keep_alive.py index 940802d..73f1729 100644 --- a/aprsd/threads/keep_alive.py +++ b/aprsd/threads/keep_alive.py @@ -5,7 +5,8 @@ import tracemalloc from oslo_config import cfg -from aprsd import client, packets, stats, utils +from aprsd import client, packets, utils +from aprsd.stats import collector from aprsd.threads import APRSDThread, APRSDThreadList @@ -24,61 +25,68 @@ class KeepAliveThread(APRSDThread): self.max_delta = datetime.timedelta(**max_timeout) def loop(self): - if self.cntr % 60 == 0: - pkt_tracker = packets.PacketTrack() - stats_obj = stats.APRSDStats() + if self.loop_count % 60 == 0: + stats_json = collector.Collector().collect() + #LOG.debug(stats_json) pl = packets.PacketList() thread_list = APRSDThreadList() now = datetime.datetime.now() - last_email = stats_obj.email_thread_time - if last_email: - email_thread_time = utils.strfdelta(now - last_email) + + if "EmailStats" in stats_json: + email_stats = stats_json["EmailStats"] + if email_stats["last_check_time"]: + email_thread_time = utils.strfdelta(now - email_stats["last_check_time"]) + else: + email_thread_time = "N/A" else: email_thread_time = "N/A" - last_msg_time = utils.strfdelta(now - stats_obj.aprsis_keepalive) + if "APRSClientStats" in stats_json and stats_json["APRSClientStats"].get("transport") == "aprsis": + if stats_json["APRSClientStats"].get("server_keepalive"): + last_msg_time = utils.strfdelta(now - stats_json["APRSClientStats"]["server_keepalive"]) + else: + last_msg_time = "N/A" + else: + last_msg_time = "N/A" - current, peak = tracemalloc.get_traced_memory() - stats_obj.set_memory(current) - stats_obj.set_memory_peak(peak) + tracked_packets = stats_json["PacketTrack"]["total_tracked"] + tx_msg = 0 + rx_msg = 0 + if "PacketList" in stats_json: + msg_packets = stats_json["PacketList"].get("MessagePacket") + if msg_packets: + tx_msg = msg_packets.get("tx", 0) + rx_msg = msg_packets.get("rx", 0) - login = CONF.callsign - - tracked_packets = len(pkt_tracker) keepalive = ( "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " "Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}" ).format( - login, - utils.strfdelta(stats_obj.uptime), + stats_json["APRSDStats"]["callsign"], + stats_json["APRSDStats"]["uptime"], pl.total_rx(), pl.total_tx(), tracked_packets, - stats_obj._pkt_cnt["MessagePacket"]["tx"], - stats_obj._pkt_cnt["MessagePacket"]["rx"], + tx_msg, + rx_msg, last_msg_time, email_thread_time, - utils.human_size(current), - utils.human_size(peak), + stats_json["APRSDStats"]["memory_current_str"], + stats_json["APRSDStats"]["memory_peak_str"], len(thread_list), ) LOG.info(keepalive) - thread_out = [] - thread_info = {} - for thread in thread_list.threads_list: - alive = thread.is_alive() - age = thread.loop_age() - key = thread.__class__.__name__ - thread_out.append(f"{key}:{alive}:{age}") - if key not in thread_info: - thread_info[key] = {} - thread_info[key]["alive"] = alive - thread_info[key]["age"] = age - if not alive: - LOG.error(f"Thread {thread}") - LOG.info(",".join(thread_out)) - stats_obj.set_thread_info(thread_info) + if "APRSDThreadList" in stats_json: + thread_list = stats_json["APRSDThreadList"] + for thread_name in thread_list: + thread = thread_list[thread_name] + alive = thread["alive"] + age = thread["age"] + key = thread["name"] + if not alive: + LOG.error(f"Thread {thread}") + LOG.info(f"{key: <15} Alive? 
{str(alive): <5} {str(age): <20}") # check the APRS connection cl = client.factory.create() @@ -90,18 +98,18 @@ class KeepAliveThread(APRSDThread): if not cl.is_alive() and self.cntr > 0: LOG.error(f"{cl.__class__.__name__} is not alive!!! Resetting") client.factory.create().reset() - else: - # See if we should reset the aprs-is client - # Due to losing a keepalive from them - delta_dict = utils.parse_delta_str(last_msg_time) - delta = datetime.timedelta(**delta_dict) - - if delta > self.max_delta: - # We haven't gotten a keepalive from aprs-is in a while - # reset the connection.a - if not client.KISSClient.is_enabled(): - LOG.warning(f"Resetting connection to APRS-IS {delta}") - client.factory.create().reset() + # else: + # # See if we should reset the aprs-is client + # # Due to losing a keepalive from them + # delta_dict = utils.parse_delta_str(last_msg_time) + # delta = datetime.timedelta(**delta_dict) + # + # if delta > self.max_delta: + # # We haven't gotten a keepalive from aprs-is in a while + # # reset the connection.a + # if not client.KISSClient.is_enabled(): + # LOG.warning(f"Resetting connection to APRS-IS {delta}") + # client.factory.create().reset() # Check version every day delta = now - self.checker_time @@ -110,6 +118,5 @@ class KeepAliveThread(APRSDThread): level, msg = utils._check_version() if level: LOG.warning(msg) - self.cntr += 1 time.sleep(1) return True diff --git a/aprsd/threads/log_monitor.py b/aprsd/threads/log_monitor.py index 8b93ab5..2d95d06 100644 --- a/aprsd/threads/log_monitor.py +++ b/aprsd/threads/log_monitor.py @@ -20,6 +20,11 @@ class LogEntries: cls._instance = super().__new__(cls) return cls._instance + def stats(self) -> dict: + return { + "log_entries": self.entries, + } + @wrapt.synchronized(lock) def add(self, entry): self.entries.append(entry) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index e78a204..3da957a 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -155,7 +155,6 @@ class APRSDProcessPacketThread(APRSDThread): def __init__(self, packet_queue): self.packet_queue = packet_queue super().__init__("ProcessPKT") - self._loop_cnt = 1 def process_ack_packet(self, packet): """We got an ack for a message, no need to resend it.""" @@ -178,12 +177,11 @@ class APRSDProcessPacketThread(APRSDThread): self.process_packet(packet) except queue.Empty: pass - self._loop_cnt += 1 return True def process_packet(self, packet): """Process a packet received from aprs-is server.""" - LOG.debug(f"ProcessPKT-LOOP {self._loop_cnt}") + LOG.debug(f"ProcessPKT-LOOP {self.loop_count}") our_call = CONF.callsign.lower() from_call = packet.from_call diff --git a/aprsd/utils/__init__.py b/aprsd/utils/__init__.py index 1924172..eb24fac 100644 --- a/aprsd/utils/__init__.py +++ b/aprsd/utils/__init__.py @@ -1,6 +1,7 @@ """Utilities and helper functions.""" import errno +import functools import os import re import sys @@ -22,6 +23,17 @@ else: from collections.abc import MutableMapping +def singleton(cls): + """Make a class a Singleton class (only one instance)""" + @functools.wraps(cls) + def wrapper_singleton(*args, **kwargs): + if wrapper_singleton.instance is None: + wrapper_singleton.instance = cls(*args, **kwargs) + return wrapper_singleton.instance + wrapper_singleton.instance = None + return wrapper_singleton + + def env(*vars, **kwargs): """This returns the first environment variable set. 
if none are non-empty, defaults to '' or keyword arg default diff --git a/aprsd/utils/objectstore.py b/aprsd/utils/objectstore.py index 7431b7e..1abcaf5 100644 --- a/aprsd/utils/objectstore.py +++ b/aprsd/utils/objectstore.py @@ -71,12 +71,13 @@ class ObjectStoreMixin: if not CONF.enable_save: return if len(self) > 0: + save_filename = self._save_filename() LOG.info( f"{self.__class__.__name__}::Saving" - f" {len(self)} entries to disk at" - f"{CONF.save_location}", + f" {len(self)} entries to disk at " + f"{save_filename}", ) - with open(self._save_filename(), "wb+") as fp: + with open(save_filename, "wb+") as fp: pickle.dump(self._dump(), fp) else: LOG.debug(
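
For reference, a minimal sketch of how the pieces described in the commit message fit together. aprsd/stats/collector.py itself is not part of this diff, so the Collector internals and the exact register_producer() signature shown below are assumptions inferred from the commit message and from how the collect() output is consumed in keep_alive.py (a dict keyed by producer class name); only the StatsProducer protocol, the singleton decorator, and the collect()/register_producer() entry points come from this patch.

    import threading
    from typing import Protocol

    from aprsd.utils import singleton


    class StatsProducer(Protocol):
        """Anything that wants to feed the collector implements stats()."""
        def stats(self) -> dict:
            ...


    class Collector:
        """Singleton that aggregates stats from every registered producer."""
        _instance = None
        lock = threading.Lock()

        def __new__(cls, *args, **kwargs):
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance.producers = []
            return cls._instance

        def register_producer(self, producer: StatsProducer):
            # The producer must itself be a singleton so the instance
            # registered here is the same one the rest of aprsd updates.
            with self.lock:
                self.producers.append(producer)

        def collect(self) -> dict:
            # Keyed by class name ("PacketList", "EmailStats", ...), which
            # is how KeepAliveThread looks the values up.
            with self.lock:
                return {p.__class__.__name__: p.stats() for p in self.producers}


    # Example producer, registered the same way PacketList, PacketTrack,
    # WatchList, EmailStats, etc. are in this patch (the names below are
    # illustrative only, not part of the patch):
    @singleton
    class ExampleStats:
        def stats(self) -> dict:
            return {"example_counter": 0}


    Collector().register_producer(ExampleStats())
    print(Collector().collect())  # -> {'ExampleStats': {'example_counter': 0}}

With this in place KeepAliveThread, the signal handlers, and the new APRSDStatsStoreThread all read from the same Collector().collect() output, which is what makes the RPC stats server obsolete.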