diff --git a/aprsd/client.py b/aprsd/client.py index de5006f..6af9d1e 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -25,6 +25,7 @@ TRANSPORT_FAKE = "fake" # Correct config factory = None + @singleton class APRSClientStats: def stats(self): diff --git a/aprsd/packets/watch_list.py b/aprsd/packets/watch_list.py index 10684ee..05ba8db 100644 --- a/aprsd/packets/watch_list.py +++ b/aprsd/packets/watch_list.py @@ -39,7 +39,7 @@ class WatchList(objectstore.ObjectStoreMixin): # is all we can do. self.data[call] = { "last": None, - "packet": None, + "packet": None, } @wrapt.synchronized(lock) diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 32c7010..23433b4 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -391,7 +391,9 @@ class PluginManager: try: module_name, class_name = module_class_string.rsplit(".", 1) module = importlib.import_module(module_name) - #module = importlib.reload(module) + # Commented out because the email thread starts in a different context + # and hence gives a different singleton for the EmailStats + # module = importlib.reload(module) except Exception as ex: if not module_name: LOG.error(f"Failed to load Plugin {module_class_string}") diff --git a/aprsd/plugins/email.py b/aprsd/plugins/email.py index 2ab12f7..c80adb2 100644 --- a/aprsd/plugins/email.py +++ b/aprsd/plugins/email.py @@ -81,8 +81,10 @@ class EmailStats: def tx_inc(self): self.tx += 1 + def rx_inc(self): self.rx += 1 + def email_thread_update(self): self.email_thread_last_time = datetime.datetime.now() @@ -217,10 +219,6 @@ class EmailPlugin(plugin.APRSDRegexCommandPluginBase): def _imap_connect(): imap_port = CONF.email_plugin.imap_port use_ssl = CONF.email_plugin.imap_use_ssl - # host = CONFIG["aprsd"]["email"]["imap"]["host"] - # msg = "{}{}:{}".format("TLS " if use_ssl else "", host, imap_port) - # LOG.debug("Connect to IMAP host {} with user '{}'". - # format(msg, CONFIG['imap']['login'])) try: server = imapclient.IMAPClient( diff --git a/aprsd/stats/__init__.py b/aprsd/stats/__init__.py new file mode 100644 index 0000000..e1861c7 --- /dev/null +++ b/aprsd/stats/__init__.py @@ -0,0 +1,19 @@ +from aprsd import client as aprs_client +from aprsd import plugin +from aprsd.packets import packet_list, tracker, watch_list +from aprsd.plugins import email +from aprsd.stats import app, collector +from aprsd.threads import aprsd + + +# Create the collector and register all the objects +# that APRSD has that implement the stats protocol +stats_collector = collector.Collector() +stats_collector.register_producer(app.APRSDStats()) +stats_collector.register_producer(packet_list.PacketList()) +stats_collector.register_producer(watch_list.WatchList()) +stats_collector.register_producer(tracker.PacketTrack()) +stats_collector.register_producer(plugin.PluginManager()) +stats_collector.register_producer(aprsd.APRSDThreadList()) +stats_collector.register_producer(email.EmailStats()) +stats_collector.register_producer(aprs_client.APRSClientStats()) diff --git a/aprsd/stats/app.py b/aprsd/stats/app.py new file mode 100644 index 0000000..5382818 --- /dev/null +++ b/aprsd/stats/app.py @@ -0,0 +1,34 @@ +import datetime +import tracemalloc + +from oslo_config import cfg + +import aprsd +from aprsd import utils + + +CONF = cfg.CONF + + +@utils.singleton +class APRSDStats: + """The AppStats class is used to collect stats from the application.""" + def __init__(self): + self.start_time = datetime.datetime.now() + + @property + def uptime(self): + return datetime.datetime.now() - self.start_time + + def stats(self) -> dict: + current, peak = tracemalloc.get_traced_memory() + stats = { + "version": aprsd.__version__, + "uptime": self.uptime, + "callsign": CONF.callsign, + "memory_current": int(current), + "memory_current_str": utils.human_size(current), + "memory_peak": int(peak), + "memory_peak_str": utils.human_size(peak), + } + return stats diff --git a/aprsd/stats/collector.py b/aprsd/stats/collector.py new file mode 100644 index 0000000..c6e8c1b --- /dev/null +++ b/aprsd/stats/collector.py @@ -0,0 +1,29 @@ +from typing import Protocol + +from aprsd.utils import singleton + + +class StatsProducer(Protocol): + """The StatsProducer protocol is used to define the interface for collecting stats.""" + def stats(self) -> dict: + ... + + +@singleton +class Collector: + """The Collector class is used to collect stats from multiple StatsProducer instances.""" + def __init__(self): + self.producers: dict[str, StatsProducer] = {} + + def collect(self): + stats = {} + for name, producer in self.producers.items(): + # No need to put in empty stats + tmp_stats = producer.stats() + if tmp_stats: + stats[name] = tmp_stats + return stats + + def register_producer(self, producer: StatsProducer): + name = producer.__class__.__name__ + self.producers[name] = producer diff --git a/aprsd/threads/__init__.py b/aprsd/threads/__init__.py index fd01292..db36e17 100644 --- a/aprsd/threads/__init__.py +++ b/aprsd/threads/__init__.py @@ -7,7 +7,7 @@ from .keep_alive import KeepAliveThread # noqa: F401 from .rx import ( # noqa: F401 APRSDDupeRXThread, APRSDProcessPacketThread, APRSDRXThread, ) -from .stats import APRSDStatsStoreThread +from .stats import APRSDStatsStoreThread # noqa: F401 packet_queue = queue.Queue(maxsize=20) diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keep_alive.py index 73f1729..025cf47 100644 --- a/aprsd/threads/keep_alive.py +++ b/aprsd/threads/keep_alive.py @@ -27,7 +27,6 @@ class KeepAliveThread(APRSDThread): def loop(self): if self.loop_count % 60 == 0: stats_json = collector.Collector().collect() - #LOG.debug(stats_json) pl = packets.PacketList() thread_list = APRSDThreadList() now = datetime.datetime.now() @@ -53,11 +52,10 @@ class KeepAliveThread(APRSDThread): tx_msg = 0 rx_msg = 0 if "PacketList" in stats_json: - msg_packets = stats_json["PacketList"].get("MessagePacket") - if msg_packets: - tx_msg = msg_packets.get("tx", 0) - rx_msg = msg_packets.get("rx", 0) - + msg_packets = stats_json["PacketList"].get("MessagePacket") + if msg_packets: + tx_msg = msg_packets.get("tx", 0) + rx_msg = msg_packets.get("rx", 0) keepalive = ( "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " diff --git a/aprsd/threads/stats.py b/aprsd/threads/stats.py new file mode 100644 index 0000000..a6517be --- /dev/null +++ b/aprsd/threads/stats.py @@ -0,0 +1,38 @@ +import logging +import threading +import time + +from oslo_config import cfg + +from aprsd.stats import collector +from aprsd.threads import APRSDThread +from aprsd.utils import objectstore + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +class StatsStore(objectstore.ObjectStoreMixin): + """Container to save the stats from the collector.""" + lock = threading.Lock() + + +class APRSDStatsStoreThread(APRSDThread): + """Save APRSD Stats to disk periodically.""" + + # how often in seconds to write the file + save_interval = 10 + + def __init__(self): + super().__init__("StatsStore") + + def loop(self): + if self.loop_count % self.save_interval == 0: + stats = collector.Collector().collect() + ss = StatsStore() + ss.data = stats + ss.save() + + time.sleep(1) + return True diff --git a/tests/plugins/test_query.py b/tests/plugins/test_query.py deleted file mode 100644 index 945d650..0000000 --- a/tests/plugins/test_query.py +++ /dev/null @@ -1,54 +0,0 @@ -from unittest import mock - -from oslo_config import cfg - -from aprsd import packets -from aprsd.packets import tracker -from aprsd.plugins import query as query_plugin - -from .. import fake, test_plugin - - -CONF = cfg.CONF - - -class TestQueryPlugin(test_plugin.TestPlugin): - @mock.patch("aprsd.packets.tracker.PacketTrack.flush") - def test_query_flush(self, mock_flush): - packet = fake.fake_packet(message="!delete") - CONF.callsign = fake.FAKE_TO_CALLSIGN - CONF.save_enabled = True - CONF.query_plugin.callsign = fake.FAKE_FROM_CALLSIGN - query = query_plugin.QueryPlugin() - query.enabled = True - - expected = "Deleted ALL pending msgs." - actual = query.filter(packet) - mock_flush.assert_called_once() - self.assertEqual(expected, actual) - - @mock.patch("aprsd.packets.tracker.PacketTrack.restart_delayed") - def test_query_restart_delayed(self, mock_restart): - CONF.callsign = fake.FAKE_TO_CALLSIGN - CONF.save_enabled = True - CONF.query_plugin.callsign = fake.FAKE_FROM_CALLSIGN - track = tracker.PacketTrack() - track.data = {} - packet = fake.fake_packet(message="!4") - query = query_plugin.QueryPlugin() - - expected = "No pending msgs to resend" - actual = query.filter(packet) - mock_restart.assert_not_called() - self.assertEqual(expected, actual) - mock_restart.reset_mock() - - # add a message - pkt = packets.MessagePacket( - from_call=self.fromcall, - to_call="testing", - msgNo=self.ack, - ) - track.add(pkt) - actual = query.filter(packet) - mock_restart.assert_called_once()