From 71d72adf065f1fb96d4e198612a98ae4322a3289 Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 2 Apr 2024 14:07:37 -0400 Subject: [PATCH] Allow stats collector to serialize upon creation This does some cleanup with the stats collector and usage of the stats. The patch adds a new optional param to the collector's collect() method to tell the object to provide serializable stats. This is used for the webchat app that sends stats to the browser. --- aprsd/client.py | 34 ++++++++++++++++++++------ aprsd/cmds/webchat.py | 24 +++++++++--------- aprsd/packets/core.py | 42 ++++++++++++++++++++++++++++++-- aprsd/packets/packet_list.py | 2 +- aprsd/packets/tracker.py | 8 +++--- aprsd/packets/watch_list.py | 2 +- aprsd/plugin.py | 2 +- aprsd/plugins/email.py | 7 ++++-- aprsd/stats/app.py | 7 ++++-- aprsd/stats/collector.py | 7 +++--- aprsd/threads/aprsd.py | 8 +++--- aprsd/threads/keep_alive.py | 1 + aprsd/threads/rx.py | 16 ++++++------ aprsd/threads/tx.py | 16 ++++++++++-- aprsd/web/chat/static/js/main.js | 5 ++-- 15 files changed, 132 insertions(+), 49 deletions(-) diff --git a/aprsd/client.py b/aprsd/client.py index 6af9d1e..77782f0 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -28,7 +28,7 @@ factory = None @singleton class APRSClientStats: - def stats(self): + def stats(self, serializable=False): client = factory.create() stats = { "transport": client.transport(), @@ -38,7 +38,10 @@ class APRSClientStats: if client.transport() == TRANSPORT_APRSIS: stats["server_string"] = client.client.server_string - stats["sever_keepalive"] = client.client.aprsd_keepalive + keepalive = client.client.aprsd_keepalive + if keepalive: + keepalive = keepalive.isoformat() + stats["sever_keepalive"] = keepalive elif client.transport() == TRANSPORT_TCPKISS: stats["host"] = CONF.kiss_tcp.host stats["port"] = CONF.kiss_tcp.port @@ -96,7 +99,9 @@ class Client: def reset(self): """Call this to force a rebuild/reconnect.""" + LOG.info("Resetting client connection.") if self._client: + self._client.close() del self._client self._create_client() else: @@ -131,6 +136,10 @@ class Client: def is_alive(self): pass + @abc.abstractmethod + def close(self): + pass + class APRSISClient(Client): @@ -195,6 +204,11 @@ class APRSISClient(Client): LOG.warning(f"APRS_CLIENT {self._client} alive? NO!!!") return False + def close(self): + if self._client: + self._client.stop() + self._client.close() + @staticmethod def transport(): return TRANSPORT_APRSIS @@ -239,11 +253,10 @@ class APRSISClient(Client): return aprs_client def consumer(self, callback, blocking=False, immortal=False, raw=False): - if self.is_alive(): - self._client.consumer( - callback, blocking=blocking, - immortal=immortal, raw=raw, - ) + self._client.consumer( + callback, blocking=blocking, + immortal=immortal, raw=raw, + ) class KISSClient(Client): @@ -296,6 +309,10 @@ class KISSClient(Client): else: return False + def close(self): + if self._client: + self._client.stop() + @staticmethod def transport(): if CONF.kiss_serial.enabled: @@ -350,6 +367,9 @@ class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass): def is_alive(self): return True + def close(self): + pass + def setup_connection(self): self.connected = True return fake.APRSDFakeClient() diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index 0a3f68e..f88bb21 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -63,7 +63,7 @@ def signal_handler(sig, frame): time.sleep(1.5) # packets.WatchList().save() # packets.SeenList().save() - LOG.info(stats.APRSDStats()) + LOG.info(stats.stats_collector.collect()) LOG.info("Telling flask to bail.") signal.signal(signal.SIGTERM, sys.exit(0)) @@ -378,7 +378,7 @@ def _get_transport(stats): transport = "aprs-is" aprs_connection = ( "APRS-IS Server: " - "{}".format(stats["stats"]["aprs-is"]["server"]) + "{}".format(stats["APRSClientStats"]["server_string"]) ) elif client.KISSClient.is_enabled(): transport = client.KISSClient.transport() @@ -414,12 +414,13 @@ def location(callsign): @flask_app.route("/") def index(): stats = _stats() + LOG.error(stats) # For development html_template = "index.html" LOG.debug(f"Template {html_template}") - transport, aprs_connection = _get_transport(stats) + transport, aprs_connection = _get_transport(stats["stats"]) LOG.debug(f"transport {transport} aprs_connection {aprs_connection}") stats["transport"] = transport @@ -454,18 +455,17 @@ def send_message_status(): def _stats(): - stats_obj = stats.APRSDStats() now = datetime.datetime.now() time_format = "%m-%d-%Y %H:%M:%S" - stats_dict = stats_obj.stats() + stats_dict = stats.stats_collector.collect(serializable=True) # Webchat doesnt need these - if "watch_list" in stats_dict["aprsd"]: - del stats_dict["aprsd"]["watch_list"] - if "seen_list" in stats_dict["aprsd"]: - del stats_dict["aprsd"]["seen_list"] - if "threads" in stats_dict["aprsd"]: - del stats_dict["aprsd"]["threads"] + if "WatchList" in stats_dict: + del stats_dict["WatchList"] + if "seen_list" in stats_dict: + del stats_dict["seen_list"] + if "APRSDThreadList" in stats_dict: + del stats_dict["APRSDThreadList"] # del stats_dict["email"] # del stats_dict["plugins"] # del stats_dict["messages"] @@ -544,7 +544,7 @@ class SendMessageNamespace(Namespace): LOG.debug(f"Long {long}") tx.send( - packets.GPSPacket( + packets.BeaconPacket( from_call=CONF.callsign, to_call="APDW16", latitude=lat, diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index 1665e29..87c51c6 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -218,6 +218,11 @@ class BulletinPacket(Packet): bid: Optional[str] = field(default="1") message_text: Optional[str] = field(default=None) + @property + def key(self) -> str: + """Build a key for finding this packet in a dict.""" + return f"{self.from_call}:BLN{self.bid}" + @property def human_info(self) -> str: return f"BLN{self.bid} {self.message_text}" @@ -385,6 +390,14 @@ class BeaconPacket(GPSPacket): f"{self.payload}" ) + @property + def key(self) -> str: + """Build a key for finding this packet in a dict.""" + if self.raw_timestamp: + return f"{self.from_call}:{self.raw_timestamp}" + else: + return f"{self.from_call}:{self.human_info.replace(' ','')}" + @property def human_info(self) -> str: h_str = [] @@ -407,6 +420,11 @@ class MicEPacket(GPSPacket): # 0 to 360 course: int = 0 + @property + def key(self) -> str: + """Build a key for finding this packet in a dict.""" + return f"{self.from_call}:{self.human_info.replace(' ', '')}" + @property def human_info(self) -> str: h_info = super().human_info @@ -428,6 +446,14 @@ class TelemetryPacket(GPSPacket): # 0 to 360 course: int = 0 + @property + def key(self) -> str: + """Build a key for finding this packet in a dict.""" + if self.raw_timestamp: + return f"{self.from_call}:{self.raw_timestamp}" + else: + return f"{self.from_call}:{self.human_info.replace(' ','')}" + @property def human_info(self) -> str: h_info = super().human_info @@ -548,6 +574,14 @@ class WeatherPacket(GPSPacket, DataClassJsonMixin): raw = cls._translate(cls, kvs) # type: ignore return super().from_dict(raw) + @property + def key(self) -> str: + """Build a key for finding this packet in a dict.""" + if self.raw_timestamp: + return f"{self.from_call}:{self.raw_timestamp}" + elif self.wx_raw_timestamp: + return f"{self.from_call}:{self.wx_raw_timestamp}" + @property def human_info(self) -> str: h_str = [] @@ -643,6 +677,11 @@ class ThirdPartyPacket(Packet, DataClassJsonMixin): obj.subpacket = factory(obj.subpacket) # type: ignore return obj + @property + def key(self) -> str: + """Build a key for finding this packet in a dict.""" + return f"{self.from_call}:{self.subpacket.key}" + @property def human_info(self) -> str: sub_info = self.subpacket.human_info @@ -772,8 +811,7 @@ def factory(raw_packet: dict[Any, Any]) -> type[Packet]: if "latitude" in raw: packet_class = GPSPacket else: - LOG.warning(f"Unknown packet type {packet_type}") - LOG.warning(raw) + # LOG.warning(raw) packet_class = UnknownPacket raw.get("addresse", raw.get("to_call")) diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index 4d94efb..11ec7fe 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -95,7 +95,7 @@ class PacketList(MutableMapping): def total_tx(self): return self._total_tx - def stats(self) -> dict: + def stats(self, serializable=False) -> dict: stats = { "total_tracked": self.total_tx() + self.total_rx(), "rx": self.total_rx(), diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index 288ac6b..1714557 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -58,15 +58,17 @@ class PacketTrack(objectstore.ObjectStoreMixin): return self.data.values() @wrapt.synchronized(lock) - def stats(self): + def stats(self, serializable=False): stats = { "total_tracked": self.total_tracked, } pkts = {} for key in self.data: + last_send_time = self.data[key].last_send_time + last_send_attempt = self.data[key]._last_send_attempt pkts[key] = { - "last_send_time": self.data[key].last_send_time, - "last_send_attempt": self.data[key]._last_send_attempt, + "last_send_time": last_send_time, + "last_send_attempt": last_send_attempt, "retry_count": self.data[key].retry_count, "message": self.data[key].raw, } diff --git a/aprsd/packets/watch_list.py b/aprsd/packets/watch_list.py index 713200f..ad8f222 100644 --- a/aprsd/packets/watch_list.py +++ b/aprsd/packets/watch_list.py @@ -43,7 +43,7 @@ class WatchList(objectstore.ObjectStoreMixin): } @wrapt.synchronized(lock) - def stats(self) -> dict: + def stats(self, serializable=False) -> dict: stats = {} for callsign in self.data: stats[callsign] = { diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 23433b4..b923ee9 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -344,7 +344,7 @@ class PluginManager: self._watchlist_pm = pluggy.PluginManager("aprsd") self._watchlist_pm.add_hookspecs(APRSDPluginSpec) - def stats(self) -> dict: + def stats(self, serializable=False) -> dict: """Collect and return stats for all plugins.""" def full_name_with_qualname(obj): return "{}.{}".format( diff --git a/aprsd/plugins/email.py b/aprsd/plugins/email.py index c80adb2..12657a1 100644 --- a/aprsd/plugins/email.py +++ b/aprsd/plugins/email.py @@ -68,12 +68,15 @@ class EmailStats: rx = 0 email_thread_last_time = None - def stats(self): + def stats(self, serializable=False): if CONF.email_plugin.enabled: + last_check_time = self.email_thread_last_time + if serializable and last_check_time: + last_check_time = last_check_time.isoformat() stats = { "tx": self.tx, "rx": self.rx, - "last_check_time": self.email_thread_last_time, + "last_check_time": last_check_time, } else: stats = {} diff --git a/aprsd/stats/app.py b/aprsd/stats/app.py index 1db8a32..890bf02 100644 --- a/aprsd/stats/app.py +++ b/aprsd/stats/app.py @@ -30,11 +30,14 @@ class APRSDStats: def uptime(self): return datetime.datetime.now() - self.start_time - def stats(self) -> dict: + def stats(self, serializable=False) -> dict: current, peak = tracemalloc.get_traced_memory() + uptime = self.uptime() + if serializable: + uptime = str(uptime) stats = { "version": aprsd.__version__, - "uptime": self.uptime(), + "uptime": uptime, "callsign": CONF.callsign, "memory_current": int(current), "memory_current_str": utils.human_size(current), diff --git a/aprsd/stats/collector.py b/aprsd/stats/collector.py index c6e8c1b..c58b242 100644 --- a/aprsd/stats/collector.py +++ b/aprsd/stats/collector.py @@ -5,7 +5,8 @@ from aprsd.utils import singleton class StatsProducer(Protocol): """The StatsProducer protocol is used to define the interface for collecting stats.""" - def stats(self) -> dict: + def stats(self, serializeable=False) -> dict: + """provide stats in a dictionary format.""" ... @@ -15,11 +16,11 @@ class Collector: def __init__(self): self.producers: dict[str, StatsProducer] = {} - def collect(self): + def collect(self, serializable=False) -> dict: stats = {} for name, producer in self.producers.items(): # No need to put in empty stats - tmp_stats = producer.stats() + tmp_stats = producer.stats(serializable=serializable) if tmp_stats: stats[name] = tmp_stats return stats diff --git a/aprsd/threads/aprsd.py b/aprsd/threads/aprsd.py index c1044d1..52220fe 100644 --- a/aprsd/threads/aprsd.py +++ b/aprsd/threads/aprsd.py @@ -13,7 +13,7 @@ LOG = logging.getLogger("APRSD") class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): """Base class for all threads in APRSD.""" - loop_interval = 1 + loop_count = 1 def __init__(self, name): super().__init__(name=name) @@ -49,7 +49,6 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): while not self._should_quit(): self.loop_count += 1 can_loop = self.loop() - self.loop_interval += 1 self._last_loop = datetime.datetime.now() if not can_loop: self.stop() @@ -72,9 +71,12 @@ class APRSDThreadList: cls.threads_list = [] return cls._instance - def stats(self) -> dict: + def stats(self, serializable=False) -> dict: stats = {} for th in self.threads_list: + age = th.loop_age() + if serializable: + age = str(age) stats[th.__class__.__name__] = { "name": th.name, "alive": th.is_alive(), diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keep_alive.py index 025cf47..98e2208 100644 --- a/aprsd/threads/keep_alive.py +++ b/aprsd/threads/keep_alive.py @@ -116,5 +116,6 @@ class KeepAliveThread(APRSDThread): level, msg = utils._check_version() if level: LOG.warning(msg) + self.cntr += 1 time.sleep(1) return True diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 3da957a..59956a6 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -6,7 +6,7 @@ import time import aprslib from oslo_config import cfg -from aprsd import client, packets, plugin, stats +from aprsd import client, packets, plugin from aprsd.packets import log as packet_log from aprsd.threads import APRSDThread, tx @@ -27,7 +27,7 @@ class APRSDRXThread(APRSDThread): self._client.stop() def loop(self): - LOG.debug(f"RX_MSG-LOOP {self.loop_interval}") + LOG.debug(f"RX_MSG-LOOP {self.loop_count}") if not self._client: self._client = client.factory.create() time.sleep(1) @@ -53,21 +53,21 @@ class APRSDRXThread(APRSDThread): aprslib.exceptions.ConnectionError, ): LOG.error("Connection dropped, reconnecting") - time.sleep(5) # Force the deletion of the client object connected to aprs # This will cause a reconnect, next time client.get_client() # is called self._client.reset() - except Exception as ex: - LOG.error("Something bad happened!!!") - LOG.exception(ex) - return False + time.sleep(5) + except Exception: + # LOG.exception(ex) + LOG.error("Resetting connection and trying again.") + self._client.reset() + time.sleep(5) # Continue to loop return True def _process_packet(self, *args, **kwargs): """Intermediate callback so we can update the keepalive time.""" - stats.APRSDStats().set_aprsis_keepalive() # Now call the 'real' packet processing for a RX'x packet self.process_packet(*args, **kwargs) diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py index c84b4bf..3dbab16 100644 --- a/aprsd/threads/tx.py +++ b/aprsd/threads/tx.py @@ -77,7 +77,11 @@ def _send_direct(packet, aprs_client=None): packet.update_timestamp() packet_log.log(packet, tx=True) - cl.send(packet) + try: + cl.send(packet) + except Exception as e: + LOG.error(f"Failed to send packet: {packet}") + LOG.error(e) class SendPacketThread(aprsd_threads.APRSDThread): @@ -232,7 +236,15 @@ class BeaconSendThread(aprsd_threads.APRSDThread): comment="APRSD GPS Beacon", symbol=CONF.beacon_symbol, ) - send(pkt, direct=True) + try: + # Only send it once + pkt.retry_count = 1 + send(pkt, direct=True) + except Exception as e: + LOG.error(f"Failed to send beacon: {e}") + client.factory.create().reset() + time.sleep(5) + self._loop_cnt += 1 time.sleep(1) return True diff --git a/aprsd/web/chat/static/js/main.js b/aprsd/web/chat/static/js/main.js index a0c505c..d940209 100644 --- a/aprsd/web/chat/static/js/main.js +++ b/aprsd/web/chat/static/js/main.js @@ -19,9 +19,10 @@ function show_aprs_icon(item, symbol) { function ord(str){return str.charCodeAt(0);} function update_stats( data ) { - $("#version").text( data["stats"]["aprsd"]["version"] ); + console.log(data); + $("#version").text( data["stats"]["APRSDStats"]["version"] ); $("#aprs_connection").html( data["aprs_connection"] ); - $("#uptime").text( "uptime: " + data["stats"]["aprsd"]["uptime"] ); + $("#uptime").text( "uptime: " + data["stats"]["APRSDStats"]["uptime"] ); short_time = data["time"].split(/\s(.+)/)[1]; }