diff --git a/aprsd/client.py b/aprsd/client.py index 2561692..de5006f 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -1,4 +1,5 @@ import abc +import datetime import logging import time @@ -9,7 +10,7 @@ from oslo_config import cfg from aprsd import exception from aprsd.clients import aprsis, fake, kiss from aprsd.packets import core, packet_list -from aprsd.utils import trace +from aprsd.utils import singleton, trace CONF = cfg.CONF @@ -24,6 +25,26 @@ TRANSPORT_FAKE = "fake" # Correct config factory = None +@singleton +class APRSClientStats: + def stats(self): + client = factory.create() + stats = { + "transport": client.transport(), + "filter": client.filter, + "connected": client.connected, + } + + if client.transport() == TRANSPORT_APRSIS: + stats["server_string"] = client.client.server_string + stats["sever_keepalive"] = client.client.aprsd_keepalive + elif client.transport() == TRANSPORT_TCPKISS: + stats["host"] = CONF.kiss_tcp.host + stats["port"] = CONF.kiss_tcp.port + elif client.transport() == TRANSPORT_SERIALKISS: + stats["device"] = CONF.kiss_serial.device + return stats + class Client: """Singleton client class that constructs the aprslib connection.""" @@ -32,7 +53,6 @@ class Client: _client = None connected = False - server_string = None filter = None def __new__(cls, *args, **kwargs): @@ -43,6 +63,10 @@ class Client: cls._instance._create_client() return cls._instance + @abc.abstractmethod + def stats(self) -> dict: + pass + def set_filter(self, filter): self.filter = filter if self._client: @@ -102,11 +126,30 @@ class Client: def consumer(self, callback, blocking=False, immortal=False, raw=False): pass + @abc.abstractmethod + def is_alive(self): + pass + class APRSISClient(Client): _client = None + def __init__(self): + max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0} + self.max_delta = datetime.timedelta(**max_timeout) + + def stats(self) -> dict: + stats = {} + if self.is_configured(): + stats = { + "server_string": self._client.server_string, + "sever_keepalive": self._client.aprsd_keepalive, + "filter": self.filter, + } + + return stats + @staticmethod def is_enabled(): # Defaults to True if the enabled flag is non existent @@ -138,10 +181,15 @@ class APRSISClient(Client): return True return True + def _is_stale_connection(self): + delta = datetime.datetime.now() - self._client.aprsd_keepalive + if delta > self.max_delta: + LOG.error(f"Connection is stale, last heard {delta} ago.") + return True + def is_alive(self): if self._client: - LOG.warning(f"APRS_CLIENT {self._client} alive? {self._client.is_alive()}") - return self._client.is_alive() + return self._client.is_alive() and not self._is_stale_connection() else: LOG.warning(f"APRS_CLIENT {self._client} alive? NO!!!") return False @@ -159,25 +207,25 @@ class APRSISClient(Client): password = CONF.aprs_network.password host = CONF.aprs_network.host port = CONF.aprs_network.port - connected = False + self.connected = False backoff = 1 aprs_client = None - while not connected: + while not self.connected: try: LOG.info(f"Creating aprslib client({host}:{port}) and logging in {user}.") aprs_client = aprsis.Aprsdis(user, passwd=password, host=host, port=port) # Force the log to be the same aprs_client.logger = LOG aprs_client.connect() - connected = True + self.connected = True backoff = 1 except LoginError as e: LOG.error(f"Failed to login to APRS-IS Server '{e}'") - connected = False + self.connected = False time.sleep(backoff) except Exception as e: LOG.error(f"Unable to connect to APRS-IS server. '{e}' ") - connected = False + self.connected = False time.sleep(backoff) # Don't allow the backoff to go to inifinity. if backoff > 5: @@ -201,6 +249,14 @@ class KISSClient(Client): _client = None + def stats(self) -> dict: + stats = {} + if self.is_configured(): + return { + "transport": self.transport(), + } + return stats + @staticmethod def is_enabled(): """Return if tcp or serial KISS is enabled.""" @@ -268,6 +324,7 @@ class KISSClient(Client): def setup_connection(self): self._client = kiss.KISS3Client() + self.connected = True return self._client def consumer(self, callback, blocking=False, immortal=False, raw=False): @@ -276,6 +333,9 @@ class KISSClient(Client): class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass): + def stats(self) -> dict: + return {} + @staticmethod def is_enabled(): if CONF.fake_client.enabled: @@ -290,6 +350,7 @@ class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass): return True def setup_connection(self): + self.connected = True return fake.APRSDFakeClient() @staticmethod @@ -329,7 +390,6 @@ class ClientFactory: key = TRANSPORT_FAKE builder = self._builders.get(key) - LOG.debug(f"ClientFactory Creating client of type '{key}'") if not builder: raise ValueError(key) return builder() diff --git a/aprsd/clients/aprsis.py b/aprsd/clients/aprsis.py index 07052ac..06b2d6b 100644 --- a/aprsd/clients/aprsis.py +++ b/aprsd/clients/aprsis.py @@ -1,3 +1,4 @@ +import datetime import logging import select import threading @@ -11,7 +12,6 @@ from aprslib.exceptions import ( import wrapt import aprsd -from aprsd import stats from aprsd.packets import core @@ -24,6 +24,9 @@ class Aprsdis(aprslib.IS): # flag to tell us to stop thread_stop = False + # date for last time we heard from the server + aprsd_keepalive = datetime.datetime.now() + # timeout in seconds select_timeout = 1 lock = threading.Lock() @@ -142,7 +145,6 @@ class Aprsdis(aprslib.IS): self.logger.info(f"Connected to {server_string}") self.server_string = server_string - stats.APRSDStats().set_aprsis_server(server_string) except LoginError as e: self.logger.error(str(e)) @@ -176,13 +178,14 @@ class Aprsdis(aprslib.IS): try: for line in self._socket_readlines(blocking): if line[0:1] != b"#": + self.aprsd_keepalive = datetime.datetime.now() if raw: callback(line) else: callback(self._parse(line)) else: self.logger.debug("Server: %s", line.decode("utf8")) - stats.APRSDStats().set_aprsis_keepalive() + self.aprsd_keepalive = datetime.datetime.now() except ParseError as exp: self.logger.log( 11, diff --git a/aprsd/cmds/list_plugins.py b/aprsd/cmds/list_plugins.py index 357d725..6c4dc08 100644 --- a/aprsd/cmds/list_plugins.py +++ b/aprsd/cmds/list_plugins.py @@ -21,7 +21,7 @@ from aprsd import cli_helper from aprsd import plugin as aprsd_plugin from aprsd.main import cli from aprsd.plugins import ( - email, fortune, location, notify, ping, query, time, version, weather, + email, fortune, location, notify, ping, time, version, weather, ) diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index 93564ac..b30eee6 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -15,10 +15,10 @@ from rich.console import Console # local imports here import aprsd -from aprsd import cli_helper, client, packets, plugin, stats, threads +from aprsd import cli_helper, client, packets, plugin, threads from aprsd.main import cli from aprsd.packets import log as packet_log -from aprsd.rpc import server as rpc_server +from aprsd.stats import collector from aprsd.threads import rx @@ -38,7 +38,7 @@ def signal_handler(sig, frame): ), ) time.sleep(5) - LOG.info(stats.APRSDStats()) + LOG.info(collector.Collector().collect()) class APRSDListenThread(rx.APRSDRXThread): @@ -169,6 +169,7 @@ def listen( LOG.info(f"APRSD Listen Started version: {aprsd.__version__}") CONF.log_opt_values(LOG, logging.DEBUG) + collector.Collector() # Try and load saved MsgTrack list LOG.debug("Loading saved MsgTrack object.") @@ -192,10 +193,6 @@ def listen( keepalive = threads.KeepAliveThread() # keepalive.start() - if CONF.rpc_settings.enabled: - rpc = rpc_server.APRSDRPCThread() - rpc.start() - pm = None pm = plugin.PluginManager() if load_plugins: @@ -206,6 +203,8 @@ def listen( "Not Loading any plugins use --load-plugins to load what's " "defined in the config file.", ) + stats_thread = threads.APRSDStatsStoreThread() + stats_thread.start() LOG.debug("Create APRSDListenThread") listen_thread = APRSDListenThread( @@ -221,6 +220,4 @@ def listen( keepalive.join() LOG.debug("listen_thread Join") listen_thread.join() - - if CONF.rpc_settings.enabled: - rpc.join() + stats_thread.join() diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index 89a6c74..c644270 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -10,7 +10,6 @@ from aprsd import cli_helper, client from aprsd import main as aprsd_main from aprsd import packets, plugin, threads, utils from aprsd.main import cli -from aprsd.rpc import server as rpc_server from aprsd.threads import registry, rx, tx @@ -47,6 +46,14 @@ def server(ctx, flush): # Initialize the client factory and create # The correct client object ready for use client.ClientFactory.setup() + if not client.factory.is_client_enabled(): + LOG.error("No Clients are enabled in config.") + sys.exit(-1) + + # Creates the client object + LOG.info("Creating client connection") + aprs_client = client.factory.create() + LOG.info(aprs_client) # Create the initial PM singleton and Register plugins # We register plugins first here so we can register each @@ -97,6 +104,9 @@ def server(ctx, flush): keepalive = threads.KeepAliveThread() keepalive.start() + stats_thread = threads.APRSDStatsStoreThread() + stats_thread.start() + rx_thread = rx.APRSDPluginRXThread( packet_queue=threads.packet_queue, ) @@ -106,7 +116,6 @@ def server(ctx, flush): rx_thread.start() process_thread.start() - packets.PacketTrack().restart() if CONF.enable_beacon: LOG.info("Beacon Enabled. Starting Beacon thread.") bcn_thread = tx.BeaconSendThread() @@ -118,8 +127,6 @@ def server(ctx, flush): registry_thread.start() if CONF.rpc_settings.enabled: - rpc = rpc_server.APRSDRPCThread() - rpc.start() log_monitor = threads.log_monitor.LogMonitorThread() log_monitor.start() diff --git a/aprsd/main.py b/aprsd/main.py index 26c228e..e6eba5b 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -35,7 +35,8 @@ from oslo_config import cfg, generator # local imports here import aprsd -from aprsd import cli_helper, packets, stats, threads, utils +from aprsd import cli_helper, packets, threads, utils +from aprsd.stats import collector # setup the global logger @@ -96,7 +97,7 @@ def signal_handler(sig, frame): packets.PacketTrack().save() packets.WatchList().save() packets.SeenList().save() - LOG.info(stats.APRSDStats()) + LOG.info(collector.Collector().collect()) # signal.signal(signal.SIGTERM, sys.exit(0)) # sys.exit(0) diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index 3d75a07..4d94efb 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -6,7 +6,6 @@ import threading from oslo_config import cfg import wrapt -from aprsd import stats from aprsd.packets import seen_list @@ -38,7 +37,6 @@ class PacketList(MutableMapping): self.types[ptype] = {"tx": 0, "rx": 0} self.types[ptype]["rx"] += 1 seen_list.SeenList().update_seen(packet) - stats.APRSDStats().rx(packet) @wrapt.synchronized(lock) def tx(self, packet): @@ -50,7 +48,6 @@ class PacketList(MutableMapping): self.types[ptype] = {"tx": 0, "rx": 0} self.types[ptype]["tx"] += 1 seen_list.SeenList().update_seen(packet) - stats.APRSDStats().tx(packet) @wrapt.synchronized(lock) def add(self, packet): @@ -97,3 +94,13 @@ class PacketList(MutableMapping): @wrapt.synchronized(lock) def total_tx(self): return self._total_tx + + def stats(self) -> dict: + stats = { + "total_tracked": self.total_tx() + self.total_rx(), + "rx": self.total_rx(), + "tx": self.total_tx(), + "packets": self.types, + } + + return stats diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index 1246dc1..288ac6b 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -4,7 +4,6 @@ import threading from oslo_config import cfg import wrapt -from aprsd.threads import tx from aprsd.utils import objectstore @@ -58,6 +57,22 @@ class PacketTrack(objectstore.ObjectStoreMixin): def values(self): return self.data.values() + @wrapt.synchronized(lock) + def stats(self): + stats = { + "total_tracked": self.total_tracked, + } + pkts = {} + for key in self.data: + pkts[key] = { + "last_send_time": self.data[key].last_send_time, + "last_send_attempt": self.data[key]._last_send_attempt, + "retry_count": self.data[key].retry_count, + "message": self.data[key].raw, + } + stats["packets"] = pkts + return stats + @wrapt.synchronized(lock) def __len__(self): return len(self.data) @@ -79,33 +94,3 @@ class PacketTrack(objectstore.ObjectStoreMixin): del self.data[key] except KeyError: pass - - def restart(self): - """Walk the list of messages and restart them if any.""" - for key in self.data.keys(): - pkt = self.data[key] - if pkt._last_send_attempt < pkt.retry_count: - tx.send(pkt) - - def _resend(self, packet): - packet._last_send_attempt = 0 - tx.send(packet) - - def restart_delayed(self, count=None, most_recent=True): - """Walk the list of delayed messages and restart them if any.""" - if not count: - # Send all the delayed messages - for key in self.data.keys(): - pkt = self.data[key] - if pkt._last_send_attempt == pkt._retry_count: - self._resend(pkt) - else: - # They want to resend delayed messages - tmp = sorted( - self.data.items(), - reverse=most_recent, - key=lambda x: x[1].last_send_time, - ) - pkt_list = tmp[:count] - for (_key, pkt) in pkt_list: - self._resend(pkt) diff --git a/aprsd/packets/watch_list.py b/aprsd/packets/watch_list.py index dee0631..10684ee 100644 --- a/aprsd/packets/watch_list.py +++ b/aprsd/packets/watch_list.py @@ -28,7 +28,7 @@ class WatchList(objectstore.ObjectStoreMixin): return cls._instance def __init__(self, config=None): - ring_size = CONF.watch_list.packet_keep_count + CONF.watch_list.packet_keep_count if CONF.watch_list.callsigns: for callsign in CONF.watch_list.callsigns: @@ -38,12 +38,22 @@ class WatchList(objectstore.ObjectStoreMixin): # last time a message was seen by aprs-is. For now this # is all we can do. self.data[call] = { - "last": datetime.datetime.now(), - "packets": utils.RingBuffer( - ring_size, - ), + "last": None, + "packet": None, } + @wrapt.synchronized(lock) + def stats(self) -> dict: + stats = {} + for callsign in self.data: + stats[callsign] = { + "last": self.data[callsign]["last"], + "packet": self.data[callsign]["packet"], + "age": self.age(callsign), + "old": self.is_old(callsign), + } + return stats + def is_enabled(self): return CONF.watch_list.enabled @@ -58,7 +68,7 @@ class WatchList(objectstore.ObjectStoreMixin): callsign = packet.from_call if self.callsign_in_watchlist(callsign): self.data[callsign]["last"] = datetime.datetime.now() - self.data[callsign]["packets"].append(packet) + self.data[callsign]["packet"] = packet def last_seen(self, callsign): if self.callsign_in_watchlist(callsign): diff --git a/aprsd/plugin.py b/aprsd/plugin.py index f73e0c6..32c7010 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -344,6 +344,28 @@ class PluginManager: self._watchlist_pm = pluggy.PluginManager("aprsd") self._watchlist_pm.add_hookspecs(APRSDPluginSpec) + def stats(self) -> dict: + """Collect and return stats for all plugins.""" + def full_name_with_qualname(obj): + return "{}.{}".format( + obj.__class__.__module__, + obj.__class__.__qualname__, + ) + + plugin_stats = {} + plugins = self.get_plugins() + if plugins: + + for p in plugins: + plugin_stats[full_name_with_qualname(p)] = { + "enabled": p.enabled, + "rx": p.rx_count, + "tx": p.tx_count, + "version": p.version, + } + + return plugin_stats + def is_plugin(self, obj): for c in inspect.getmro(obj): if issubclass(c, APRSDPluginBase): @@ -369,7 +391,7 @@ class PluginManager: try: module_name, class_name = module_class_string.rsplit(".", 1) module = importlib.import_module(module_name) - module = importlib.reload(module) + #module = importlib.reload(module) except Exception as ex: if not module_name: LOG.error(f"Failed to load Plugin {module_class_string}") diff --git a/aprsd/plugins/email.py b/aprsd/plugins/email.py index f6f273a..2ab12f7 100644 --- a/aprsd/plugins/email.py +++ b/aprsd/plugins/email.py @@ -11,7 +11,7 @@ import time import imapclient from oslo_config import cfg -from aprsd import packets, plugin, stats, threads +from aprsd import packets, plugin, threads, utils from aprsd.threads import tx from aprsd.utils import trace @@ -60,6 +60,33 @@ class EmailInfo: self._delay = val +@utils.singleton +class EmailStats: + """Singleton object to store stats related to email.""" + _instance = None + tx = 0 + rx = 0 + email_thread_last_time = None + + def stats(self): + if CONF.email_plugin.enabled: + stats = { + "tx": self.tx, + "rx": self.rx, + "last_check_time": self.email_thread_last_time, + } + else: + stats = {} + return stats + + def tx_inc(self): + self.tx += 1 + def rx_inc(self): + self.rx += 1 + def email_thread_update(self): + self.email_thread_last_time = datetime.datetime.now() + + class EmailPlugin(plugin.APRSDRegexCommandPluginBase): """Email Plugin.""" @@ -440,7 +467,7 @@ def send_email(to_addr, content): [to_addr], msg.as_string(), ) - stats.APRSDStats().email_tx_inc() + EmailStats().tx_inc() except Exception: LOG.exception("Sendmail Error!!!!") server.quit() @@ -545,7 +572,7 @@ class APRSDEmailThread(threads.APRSDThread): def loop(self): time.sleep(5) - stats.APRSDStats().email_thread_update() + EmailStats().email_thread_update() # always sleep for 5 seconds and see if we need to check email # This allows CTRL-C to stop the execution of this loop sooner # than check_email_delay time diff --git a/aprsd/plugins/query.py b/aprsd/plugins/query.py deleted file mode 100644 index 871b249..0000000 --- a/aprsd/plugins/query.py +++ /dev/null @@ -1,81 +0,0 @@ -import datetime -import logging -import re - -from oslo_config import cfg - -from aprsd import packets, plugin -from aprsd.packets import tracker -from aprsd.utils import trace - - -CONF = cfg.CONF -LOG = logging.getLogger("APRSD") - - -class QueryPlugin(plugin.APRSDRegexCommandPluginBase): - """Query command.""" - - command_regex = r"^\!.*" - command_name = "query" - short_description = "APRSD Owner command to query messages in the MsgTrack" - - def setup(self): - """Do any plugin setup here.""" - if not CONF.query_plugin.callsign: - LOG.error("Config query_plugin.callsign not set. Disabling plugin") - self.enabled = False - self.enabled = True - - @trace.trace - def process(self, packet: packets.MessagePacket): - LOG.info("Query COMMAND") - - fromcall = packet.from_call - message = packet.get("message_text", None) - - pkt_tracker = tracker.PacketTrack() - now = datetime.datetime.now() - reply = "Pending messages ({}) {}".format( - len(pkt_tracker), - now.strftime("%H:%M:%S"), - ) - - searchstring = "^" + CONF.query_plugin.callsign + ".*" - # only I can do admin commands - if re.search(searchstring, fromcall): - - # resend last N most recent: "!3" - r = re.search(r"^\!([0-9]).*", message) - if r is not None: - if len(pkt_tracker) > 0: - last_n = r.group(1) - reply = packets.NULL_MESSAGE - LOG.debug(reply) - pkt_tracker.restart_delayed(count=int(last_n)) - else: - reply = "No pending msgs to resend" - LOG.debug(reply) - return reply - - # resend all: "!a" - r = re.search(r"^\![aA].*", message) - if r is not None: - if len(pkt_tracker) > 0: - reply = packets.NULL_MESSAGE - LOG.debug(reply) - pkt_tracker.restart_delayed() - else: - reply = "No pending msgs" - LOG.debug(reply) - return reply - - # delete all: "!d" - r = re.search(r"^\![dD].*", message) - if r is not None: - reply = "Deleted ALL pending msgs." - LOG.debug(reply) - pkt_tracker.flush() - return reply - - return reply diff --git a/aprsd/stats.py b/aprsd/stats.py deleted file mode 100644 index cb20eb8..0000000 --- a/aprsd/stats.py +++ /dev/null @@ -1,265 +0,0 @@ -import datetime -import logging -import threading - -from oslo_config import cfg -import wrapt - -import aprsd -from aprsd import packets, plugin, utils - - -CONF = cfg.CONF -LOG = logging.getLogger("APRSD") - - -class APRSDStats: - - _instance = None - lock = threading.Lock() - - start_time = None - _aprsis_server = None - _aprsis_keepalive = None - - _email_thread_last_time = None - _email_tx = 0 - _email_rx = 0 - - _mem_current = 0 - _mem_peak = 0 - - _thread_info = {} - - _pkt_cnt = { - "Packet": { - "tx": 0, - "rx": 0, - }, - "AckPacket": { - "tx": 0, - "rx": 0, - }, - "GPSPacket": { - "tx": 0, - "rx": 0, - }, - "StatusPacket": { - "tx": 0, - "rx": 0, - }, - "MicEPacket": { - "tx": 0, - "rx": 0, - }, - "MessagePacket": { - "tx": 0, - "rx": 0, - }, - "WeatherPacket": { - "tx": 0, - "rx": 0, - }, - "ObjectPacket": { - "tx": 0, - "rx": 0, - }, - } - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - # any init here - cls._instance.start_time = datetime.datetime.now() - cls._instance._aprsis_keepalive = datetime.datetime.now() - return cls._instance - - @wrapt.synchronized(lock) - @property - def uptime(self): - return datetime.datetime.now() - self.start_time - - @wrapt.synchronized(lock) - @property - def memory(self): - return self._mem_current - - @wrapt.synchronized(lock) - def set_memory(self, memory): - self._mem_current = memory - - @wrapt.synchronized(lock) - @property - def memory_peak(self): - return self._mem_peak - - @wrapt.synchronized(lock) - def set_memory_peak(self, memory): - self._mem_peak = memory - - @wrapt.synchronized(lock) - def set_thread_info(self, thread_info): - self._thread_info = thread_info - - @wrapt.synchronized(lock) - @property - def thread_info(self): - return self._thread_info - - @wrapt.synchronized(lock) - @property - def aprsis_server(self): - return self._aprsis_server - - @wrapt.synchronized(lock) - def set_aprsis_server(self, server): - self._aprsis_server = server - - @wrapt.synchronized(lock) - @property - def aprsis_keepalive(self): - return self._aprsis_keepalive - - @wrapt.synchronized(lock) - def set_aprsis_keepalive(self): - self._aprsis_keepalive = datetime.datetime.now() - - def rx(self, packet): - pkt_type = packet.__class__.__name__ - if pkt_type not in self._pkt_cnt: - self._pkt_cnt[pkt_type] = { - "tx": 0, - "rx": 0, - } - self._pkt_cnt[pkt_type]["rx"] += 1 - - def tx(self, packet): - pkt_type = packet.__class__.__name__ - if pkt_type not in self._pkt_cnt: - self._pkt_cnt[pkt_type] = { - "tx": 0, - "rx": 0, - } - self._pkt_cnt[pkt_type]["tx"] += 1 - - @wrapt.synchronized(lock) - @property - def msgs_tracked(self): - return packets.PacketTrack().total_tracked - - @wrapt.synchronized(lock) - @property - def email_tx(self): - return self._email_tx - - @wrapt.synchronized(lock) - def email_tx_inc(self): - self._email_tx += 1 - - @wrapt.synchronized(lock) - @property - def email_rx(self): - return self._email_rx - - @wrapt.synchronized(lock) - def email_rx_inc(self): - self._email_rx += 1 - - @wrapt.synchronized(lock) - @property - def email_thread_time(self): - return self._email_thread_last_time - - @wrapt.synchronized(lock) - def email_thread_update(self): - self._email_thread_last_time = datetime.datetime.now() - - def stats(self): - now = datetime.datetime.now() - if self._email_thread_last_time: - last_update = str(now - self._email_thread_last_time) - else: - last_update = "never" - - if self._aprsis_keepalive: - last_aprsis_keepalive = str(now - self._aprsis_keepalive) - else: - last_aprsis_keepalive = "never" - - pm = plugin.PluginManager() - plugins = pm.get_plugins() - plugin_stats = {} - if plugins: - def full_name_with_qualname(obj): - return "{}.{}".format( - obj.__class__.__module__, - obj.__class__.__qualname__, - ) - - for p in plugins: - plugin_stats[full_name_with_qualname(p)] = { - "enabled": p.enabled, - "rx": p.rx_count, - "tx": p.tx_count, - "version": p.version, - } - - wl = packets.WatchList() - sl = packets.SeenList() - pl = packets.PacketList() - - stats = { - "aprsd": { - "version": aprsd.__version__, - "uptime": utils.strfdelta(self.uptime), - "callsign": CONF.callsign, - "memory_current": int(self.memory), - "memory_current_str": utils.human_size(self.memory), - "memory_peak": int(self.memory_peak), - "memory_peak_str": utils.human_size(self.memory_peak), - "threads": self._thread_info, - "watch_list": wl.get_all(), - "seen_list": sl.get_all(), - }, - "aprs-is": { - "server": str(self.aprsis_server), - "callsign": CONF.aprs_network.login, - "last_update": last_aprsis_keepalive, - }, - "packets": { - "total_tracked": int(pl.total_tx() + pl.total_rx()), - "total_sent": int(pl.total_tx()), - "total_received": int(pl.total_rx()), - "by_type": self._pkt_cnt, - }, - "messages": { - "sent": self._pkt_cnt["MessagePacket"]["tx"], - "received": self._pkt_cnt["MessagePacket"]["tx"], - "ack_sent": self._pkt_cnt["AckPacket"]["tx"], - }, - "email": { - "enabled": CONF.email_plugin.enabled, - "sent": int(self._email_tx), - "received": int(self._email_rx), - "thread_last_update": last_update, - }, - "plugins": plugin_stats, - } - return stats - - def __str__(self): - pl = packets.PacketList() - return ( - "Uptime:{} Msgs TX:{} RX:{} " - "ACK: TX:{} RX:{} " - "Email TX:{} RX:{} LastLoop:{} ".format( - self.uptime, - pl.total_tx(), - pl.total_rx(), - self._pkt_cnt["AckPacket"]["tx"], - self._pkt_cnt["AckPacket"]["rx"], - self._email_tx, - self._email_rx, - self._email_thread_last_time, - ) - ) diff --git a/aprsd/threads/__init__.py b/aprsd/threads/__init__.py index 9220a65..fd01292 100644 --- a/aprsd/threads/__init__.py +++ b/aprsd/threads/__init__.py @@ -4,7 +4,10 @@ import queue # aprsd.threads from .aprsd import APRSDThread, APRSDThreadList # noqa: F401 from .keep_alive import KeepAliveThread # noqa: F401 -from .rx import APRSDRXThread, APRSDDupeRXThread, APRSDProcessPacketThread # noqa: F401 +from .rx import ( # noqa: F401 + APRSDDupeRXThread, APRSDProcessPacketThread, APRSDRXThread, +) +from .stats import APRSDStatsStoreThread packet_queue = queue.Queue(maxsize=20) diff --git a/aprsd/threads/aprsd.py b/aprsd/threads/aprsd.py index d815af6..c1044d1 100644 --- a/aprsd/threads/aprsd.py +++ b/aprsd/threads/aprsd.py @@ -47,6 +47,7 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): def run(self): LOG.debug("Starting") while not self._should_quit(): + self.loop_count += 1 can_loop = self.loop() self.loop_interval += 1 self._last_loop = datetime.datetime.now() @@ -71,6 +72,17 @@ class APRSDThreadList: cls.threads_list = [] return cls._instance + def stats(self) -> dict: + stats = {} + for th in self.threads_list: + stats[th.__class__.__name__] = { + "name": th.name, + "alive": th.is_alive(), + "age": th.loop_age(), + "loop_count": th.loop_count, + } + return stats + @wrapt.synchronized(lock) def add(self, thread_obj): self.threads_list.append(thread_obj) diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keep_alive.py index 940802d..73f1729 100644 --- a/aprsd/threads/keep_alive.py +++ b/aprsd/threads/keep_alive.py @@ -5,7 +5,8 @@ import tracemalloc from oslo_config import cfg -from aprsd import client, packets, stats, utils +from aprsd import client, packets, utils +from aprsd.stats import collector from aprsd.threads import APRSDThread, APRSDThreadList @@ -24,61 +25,68 @@ class KeepAliveThread(APRSDThread): self.max_delta = datetime.timedelta(**max_timeout) def loop(self): - if self.cntr % 60 == 0: - pkt_tracker = packets.PacketTrack() - stats_obj = stats.APRSDStats() + if self.loop_count % 60 == 0: + stats_json = collector.Collector().collect() + #LOG.debug(stats_json) pl = packets.PacketList() thread_list = APRSDThreadList() now = datetime.datetime.now() - last_email = stats_obj.email_thread_time - if last_email: - email_thread_time = utils.strfdelta(now - last_email) + + if "EmailStats" in stats_json: + email_stats = stats_json["EmailStats"] + if email_stats["last_check_time"]: + email_thread_time = utils.strfdelta(now - email_stats["last_check_time"]) + else: + email_thread_time = "N/A" else: email_thread_time = "N/A" - last_msg_time = utils.strfdelta(now - stats_obj.aprsis_keepalive) + if "APRSClientStats" in stats_json and stats_json["APRSClientStats"].get("transport") == "aprsis": + if stats_json["APRSClientStats"].get("server_keepalive"): + last_msg_time = utils.strfdelta(now - stats_json["APRSClientStats"]["server_keepalive"]) + else: + last_msg_time = "N/A" + else: + last_msg_time = "N/A" - current, peak = tracemalloc.get_traced_memory() - stats_obj.set_memory(current) - stats_obj.set_memory_peak(peak) + tracked_packets = stats_json["PacketTrack"]["total_tracked"] + tx_msg = 0 + rx_msg = 0 + if "PacketList" in stats_json: + msg_packets = stats_json["PacketList"].get("MessagePacket") + if msg_packets: + tx_msg = msg_packets.get("tx", 0) + rx_msg = msg_packets.get("rx", 0) - login = CONF.callsign - - tracked_packets = len(pkt_tracker) keepalive = ( "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " "Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}" ).format( - login, - utils.strfdelta(stats_obj.uptime), + stats_json["APRSDStats"]["callsign"], + stats_json["APRSDStats"]["uptime"], pl.total_rx(), pl.total_tx(), tracked_packets, - stats_obj._pkt_cnt["MessagePacket"]["tx"], - stats_obj._pkt_cnt["MessagePacket"]["rx"], + tx_msg, + rx_msg, last_msg_time, email_thread_time, - utils.human_size(current), - utils.human_size(peak), + stats_json["APRSDStats"]["memory_current_str"], + stats_json["APRSDStats"]["memory_peak_str"], len(thread_list), ) LOG.info(keepalive) - thread_out = [] - thread_info = {} - for thread in thread_list.threads_list: - alive = thread.is_alive() - age = thread.loop_age() - key = thread.__class__.__name__ - thread_out.append(f"{key}:{alive}:{age}") - if key not in thread_info: - thread_info[key] = {} - thread_info[key]["alive"] = alive - thread_info[key]["age"] = age - if not alive: - LOG.error(f"Thread {thread}") - LOG.info(",".join(thread_out)) - stats_obj.set_thread_info(thread_info) + if "APRSDThreadList" in stats_json: + thread_list = stats_json["APRSDThreadList"] + for thread_name in thread_list: + thread = thread_list[thread_name] + alive = thread["alive"] + age = thread["age"] + key = thread["name"] + if not alive: + LOG.error(f"Thread {thread}") + LOG.info(f"{key: <15} Alive? {str(alive): <5} {str(age): <20}") # check the APRS connection cl = client.factory.create() @@ -90,18 +98,18 @@ class KeepAliveThread(APRSDThread): if not cl.is_alive() and self.cntr > 0: LOG.error(f"{cl.__class__.__name__} is not alive!!! Resetting") client.factory.create().reset() - else: - # See if we should reset the aprs-is client - # Due to losing a keepalive from them - delta_dict = utils.parse_delta_str(last_msg_time) - delta = datetime.timedelta(**delta_dict) - - if delta > self.max_delta: - # We haven't gotten a keepalive from aprs-is in a while - # reset the connection.a - if not client.KISSClient.is_enabled(): - LOG.warning(f"Resetting connection to APRS-IS {delta}") - client.factory.create().reset() + # else: + # # See if we should reset the aprs-is client + # # Due to losing a keepalive from them + # delta_dict = utils.parse_delta_str(last_msg_time) + # delta = datetime.timedelta(**delta_dict) + # + # if delta > self.max_delta: + # # We haven't gotten a keepalive from aprs-is in a while + # # reset the connection.a + # if not client.KISSClient.is_enabled(): + # LOG.warning(f"Resetting connection to APRS-IS {delta}") + # client.factory.create().reset() # Check version every day delta = now - self.checker_time @@ -110,6 +118,5 @@ class KeepAliveThread(APRSDThread): level, msg = utils._check_version() if level: LOG.warning(msg) - self.cntr += 1 time.sleep(1) return True diff --git a/aprsd/threads/log_monitor.py b/aprsd/threads/log_monitor.py index 8b93ab5..2d95d06 100644 --- a/aprsd/threads/log_monitor.py +++ b/aprsd/threads/log_monitor.py @@ -20,6 +20,11 @@ class LogEntries: cls._instance = super().__new__(cls) return cls._instance + def stats(self) -> dict: + return { + "log_entries": self.entries, + } + @wrapt.synchronized(lock) def add(self, entry): self.entries.append(entry) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index e78a204..3da957a 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -155,7 +155,6 @@ class APRSDProcessPacketThread(APRSDThread): def __init__(self, packet_queue): self.packet_queue = packet_queue super().__init__("ProcessPKT") - self._loop_cnt = 1 def process_ack_packet(self, packet): """We got an ack for a message, no need to resend it.""" @@ -178,12 +177,11 @@ class APRSDProcessPacketThread(APRSDThread): self.process_packet(packet) except queue.Empty: pass - self._loop_cnt += 1 return True def process_packet(self, packet): """Process a packet received from aprs-is server.""" - LOG.debug(f"ProcessPKT-LOOP {self._loop_cnt}") + LOG.debug(f"ProcessPKT-LOOP {self.loop_count}") our_call = CONF.callsign.lower() from_call = packet.from_call diff --git a/aprsd/utils/__init__.py b/aprsd/utils/__init__.py index 1924172..eb24fac 100644 --- a/aprsd/utils/__init__.py +++ b/aprsd/utils/__init__.py @@ -1,6 +1,7 @@ """Utilities and helper functions.""" import errno +import functools import os import re import sys @@ -22,6 +23,17 @@ else: from collections.abc import MutableMapping +def singleton(cls): + """Make a class a Singleton class (only one instance)""" + @functools.wraps(cls) + def wrapper_singleton(*args, **kwargs): + if wrapper_singleton.instance is None: + wrapper_singleton.instance = cls(*args, **kwargs) + return wrapper_singleton.instance + wrapper_singleton.instance = None + return wrapper_singleton + + def env(*vars, **kwargs): """This returns the first environment variable set. if none are non-empty, defaults to '' or keyword arg default diff --git a/aprsd/utils/objectstore.py b/aprsd/utils/objectstore.py index 7431b7e..1abcaf5 100644 --- a/aprsd/utils/objectstore.py +++ b/aprsd/utils/objectstore.py @@ -71,12 +71,13 @@ class ObjectStoreMixin: if not CONF.enable_save: return if len(self) > 0: + save_filename = self._save_filename() LOG.info( f"{self.__class__.__name__}::Saving" - f" {len(self)} entries to disk at" - f"{CONF.save_location}", + f" {len(self)} entries to disk at " + f"{save_filename}", ) - with open(self._save_filename(), "wb+") as fp: + with open(save_filename, "wb+") as fp: pickle.dump(self._dump(), fp) else: LOG.debug(