From 5c949343ec868f341c6ad3ebcff7a350afe0d620 Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 21 Jan 2021 20:58:47 -0500 Subject: [PATCH] Added Flask web thread and stats collection This patch adds the stats object to collect statistics of the running server. This also optionally adds the ability to run a flask web service on a port to use as a keepalive healthcheck. --- aprsd/email.py | 4 +- aprsd/flask.py | 13 +-- aprsd/main.py | 37 +++++---- aprsd/messaging.py | 14 +++- aprsd/plugins/location.py | 2 +- aprsd/stats.py | 161 ++++++++++++++++++++++++++++++++++++++ aprsd/threads.py | 37 ++++++++- aprsd/utils.py | 5 ++ requirements.in | 1 + requirements.txt | 15 +++- 10 files changed, 256 insertions(+), 33 deletions(-) create mode 100644 aprsd/stats.py diff --git a/aprsd/email.py b/aprsd/email.py index 49d68a9..9012434 100644 --- a/aprsd/email.py +++ b/aprsd/email.py @@ -7,7 +7,7 @@ import re import smtplib import time -from aprsd import messaging, threads +from aprsd import messaging, stats, threads import imapclient from validate_email import validate_email @@ -269,6 +269,7 @@ def send_email(to_addr, content): [to_addr], msg.as_string(), ) + stats.APRSDStats().email_tx_inc() except Exception as e: msg = getattr(e, "message", repr(e)) LOG.error("Sendmail Error!!!! '{}'", msg) @@ -366,6 +367,7 @@ class APRSDEmailThread(threads.APRSDThread): past = datetime.datetime.now() while not self.thread_stop: time.sleep(5) + stats.APRSDStats().email_thread_update() # always sleep for 5 seconds and see if we need to check email # This allows CTRL-C to stop the execution of this loop sooner # than check_email_delay time diff --git a/aprsd/flask.py b/aprsd/flask.py index ee32071..19b4839 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -1,8 +1,7 @@ -import datetime import json import aprsd -from aprsd import messaging +from aprsd import messaging, stats import flask import flask_classful @@ -18,14 +17,16 @@ class APRSDFlask(flask_classful.FlaskView): # return flask.render_template("index.html", message=msg) def stats(self): + stats_obj = stats.APRSDStats() track = messaging.MsgTrack() - uptime = datetime.datetime.now() - track._start_time - stats = { + + result = { "version": aprsd.__version__, - "uptime": str(uptime), + "uptime": stats_obj.uptime, "size_tracker": len(track), + "stats": stats_obj.stats(), } - return json.dumps(stats) + return json.dumps(result) def init_flask(config): diff --git a/aprsd/main.py b/aprsd/main.py index b20c4a9..ad4df16 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -32,7 +32,7 @@ import time # local imports here import aprsd -from aprsd import client, email, messaging, plugin, threads, utils +from aprsd import client, email, flask, messaging, plugin, stats, threads, utils import aprslib from aprslib.exceptions import LoginError import click @@ -157,7 +157,9 @@ def signal_handler(sig, frame): ) threads.APRSDThreadList().stop_all() server_event.set() - time.sleep(1) + LOG.info("EXITING STATS") + LOG.info(stats.APRSDStats()) + # time.sleep(1) signal.signal(signal.SIGTERM, sys.exit(0)) @@ -384,19 +386,12 @@ def send_message( default=False, help="Flush out all old aged messages on disk.", ) -@click.option( - "--stats-server", - is_flag=True, - default=False, - help="Run a stats web server on port 5001?", -) def server( loglevel, quiet, disable_validation, config_file, flush, - stats_server, ): """Start the aprsd server process.""" global event @@ -416,6 +411,7 @@ def server( setup_logging(config, loglevel, quiet) LOG.info("APRSD Started version: {}".format(aprsd.__version__)) + stats.APRSDStats(config) email_enabled = config["aprsd"]["email"].get("enabled", False) @@ -463,18 +459,25 @@ def server( messaging.MsgTrack().restart() - cntr = 0 - while not server_event.is_set(): - # to keep the log noise down - if cntr % 12 == 0: - tracker = messaging.MsgTrack() - LOG.debug("KeepAlive Tracker({}): {}".format(len(tracker), str(tracker))) - cntr += 1 - time.sleep(10) + keepalive = threads.KeepAliveThread() + keepalive.start() + + try: + web_enabled = utils.check_config_option(config, ["aprsd", "web", "enabled"]) + except Exception: + web_enabled = False + + if web_enabled: + app = flask.init_flask(config) + app.run( + host=config["aprsd"]["web"]["host"], + port=config["aprsd"]["web"]["port"], + ) # If there are items in the msgTracker, then save them tracker = messaging.MsgTrack() tracker.save() + LOG.info(stats.APRSDStats()) LOG.info("APRSD Exiting.") diff --git a/aprsd/messaging.py b/aprsd/messaging.py index a14b7ba..f84c78a 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -9,7 +9,7 @@ import re import threading import time -from aprsd import client, threads, utils +from aprsd import client, stats, threads, utils LOG = logging.getLogger("APRSD") @@ -49,7 +49,7 @@ class MsgTrack: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance.track = {} - cls._start_time = datetime.datetime.now() + cls._instance._start_time = datetime.datetime.now() cls._instance.lock = threading.Lock() return cls._instance @@ -57,6 +57,7 @@ class MsgTrack: with self.lock: key = int(msg.id) self.track[key] = msg + stats.APRSDStats().msgs_tracked_inc() self.total_messages_tracked += 1 def get(self, id): @@ -251,6 +252,7 @@ class RawMessage(Message): fromcall=self.fromcall, ) cl.sendall(repr(self)) + stats.APRSDStats().msgs_sent_inc() class TextMessage(Message): @@ -267,7 +269,7 @@ class TextMessage(Message): def __repr__(self): """Build raw string to send over the air.""" - return "{}>APRS::{}:{}{{{}\n".format( + return "{}>APZ100::{}:{}{{{}\n".format( self.fromcall, self.tocall.ljust(9), self._filter_for_send(), @@ -315,6 +317,7 @@ class TextMessage(Message): fromcall=self.fromcall, ) cl.sendall(repr(self)) + stats.APRSDStats().msgs_tx_inc() class SendMessageThread(threads.APRSDThread): @@ -374,6 +377,7 @@ class SendMessageThread(threads.APRSDThread): msg_num=msg.id, ) cl.sendall(repr(msg)) + stats.APRSDStats().msgs_tx_inc() msg.last_send_time = datetime.datetime.now() msg.last_send_attempt += 1 @@ -389,7 +393,7 @@ class AckMessage(Message): super().__init__(fromcall, tocall, msg_id=msg_id) def __repr__(self): - return "{}>APRS::{}:ack{}\n".format( + return "{}>APZ100::{}:ack{}\n".format( self.fromcall, self.tocall.ljust(9), self.id, @@ -411,6 +415,7 @@ class AckMessage(Message): retry_number=i, ) cl.sendall(repr(self)) + stats.APRSDStats().ack_tx_inc() # aprs duplicate detection is 30 secs? # (21 only sends first, 28 skips middle) time.sleep(31) @@ -478,6 +483,7 @@ class SendAckThread(threads.APRSDThread): retry_number=self.ack.last_send_attempt, ) cl.sendall(repr(self.ack)) + stats.APRSDStats().ack_tx_inc() self.ack.last_send_attempt += 1 self.ack.last_send_time = datetime.datetime.now() time.sleep(5) diff --git a/aprsd/plugins/location.py b/aprsd/plugins/location.py index 63db646..fd667ef 100644 --- a/aprsd/plugins/location.py +++ b/aprsd/plugins/location.py @@ -48,7 +48,7 @@ class LocationPlugin(plugin.APRSDPluginBase): lat = aprs_data["entries"][0]["lat"] lon = aprs_data["entries"][0]["lng"] try: # altitude not always provided - alt = aprs_data["entries"][0]["altitude"] + alt = float(aprs_data["entries"][0]["altitude"]) except Exception: alt = 0 altfeet = int(alt * 3.28084) diff --git a/aprsd/stats.py b/aprsd/stats.py new file mode 100644 index 0000000..c4345e2 --- /dev/null +++ b/aprsd/stats.py @@ -0,0 +1,161 @@ +import datetime +import logging +import threading + +LOG = logging.getLogger("APRSD") + + +class APRSDStats: + + _instance = None + lock = None + config = None + + start_time = None + + _msgs_tracked = 0 + _msgs_tx = 0 + _msgs_rx = 0 + + _msgs_mice_rx = 0 + + _ack_tx = 0 + _ack_rx = 0 + + _email_thread_last_time = None + _email_tx = 0 + _email_rx = 0 + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + # any initializetion here + cls._instance.lock = threading.Lock() + cls._instance.start_time = datetime.datetime.now() + return cls._instance + + def __init__(self, config=None): + if config: + self.config = config + + @property + def uptime(self): + with self.lock: + return str(datetime.datetime.now() - self.start_time) + + @property + def msgs_tx(self): + with self.lock: + return self._msgs_tx + + def msgs_tx_inc(self): + with self.lock: + self._msgs_tx += 1 + + @property + def msgs_rx(self): + with self.lock: + return self._msgs_rx + + def msgs_rx_inc(self): + with self.lock: + self._msgs_rx += 1 + + @property + def msgs_mice_rx(self): + with self.lock: + return self._msgs_mice_rx + + def msgs_mice_inc(self): + with self.lock: + self._msgs_mice_rx += 1 + + @property + def ack_tx(self): + with self.lock: + return self._ack_tx + + def ack_tx_inc(self): + with self.lock: + self._ack_tx += 1 + + @property + def ack_rx(self): + with self.lock: + return self._ack_rx + + def ack_rx_inc(self): + with self.lock: + self._ack_rx += 1 + + @property + def msgs_tracked(self): + with self.lock: + return self._msgs_tracked + + def msgs_tracked_inc(self): + with self.lock: + self._msgs_tracked += 1 + + @property + def email_tx(self): + with self.lock: + return self._email_tx + + def email_tx_inc(self): + with self.lock: + self._email_tx += 1 + + @property + def email_rx(self): + with self.lock: + return self._email_rx + + def email_rx_inc(self): + with self.lock: + self._email_rx += 1 + + @property + def email_thread_time(self): + with self.lock: + return self._email_thread_last_time + + def email_thread_update(self): + with self.lock: + self._email_thread_last_time = datetime.datetime.now() + + def stats(self): + now = datetime.datetime.now() + stats = { + "messages": { + "tracked": self.msgs_tracked, + "sent": self.msgs_tx, + "recieved": self.msgs_rx, + "ack_sent": self.ack_tx, + "ack_recieved": self.ack_rx, + "mic-e recieved": self.msgs_mice_rx, + }, + "email": { + "sent": self._email_tx, + "recieved": self._email_rx, + "thread_last_update": str(now - self._email_thread_last_time), + }, + } + LOG.debug("STATS {}".format(stats)) + return stats + + def __str__(self): + return ( + "Msgs TX:{} RX:{} ACK: TX:{} RX:{} " + "Email TX:{} RX:{} LastLoop:{} " + "Uptime: {}".format( + self._msgs_tx, + self._msgs_rx, + self._ack_tx, + self._ack_rx, + self._email_tx, + self._email_rx, + self._email_thread_last_time, + self.uptime, + ) + ) diff --git a/aprsd/threads.py b/aprsd/threads.py index d962aa0..b4b6ca6 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -1,10 +1,11 @@ import abc +import datetime import logging import queue import threading import time -from aprsd import client, messaging, plugin +from aprsd import client, messaging, plugin, stats import aprslib LOG = logging.getLogger("APRSD") @@ -63,6 +64,37 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): LOG.debug("Exiting") +class KeepAliveThread(APRSDThread): + cntr = 0 + + def __init__(self): + super().__init__("KeepAlive") + + def loop(self): + if self.cntr % 6 == 0: + tracker = messaging.MsgTrack() + stats_obj = stats.APRSDStats() + now = datetime.datetime.now() + last_email = stats.APRSDStats().email_thread_time + if last_email: + email_thread_time = str(now - last_email) + else: + email_thread_time = "N/A" + + LOG.debug( + "Tracker({}) EmailThread: {} " + " Msgs: TX:{} RX:{}".format( + len(tracker), + email_thread_time, + stats_obj.msgs_tx, + stats_obj.msgs_rx, + ), + ) + self.cntr += 1 + time.sleep(10) + return True + + class APRSDRXThread(APRSDThread): def __init__(self, msg_queues, config): super().__init__("RX_MSG") @@ -118,11 +150,13 @@ class APRSDRXThread(APRSDThread): ) tracker = messaging.MsgTrack() tracker.remove(ack_num) + stats.APRSDStats().ack_rx_inc() return def process_mic_e_packet(self, packet): LOG.info("Mic-E Packet detected. Currenlty unsupported.") messaging.log_packet(packet) + stats.APRSDStats().msgs_mice_inc() return def process_message_packet(self, packet): @@ -196,6 +230,7 @@ class APRSDRXThread(APRSDThread): try: LOG.info("Got message: {}".format(packet)) + stats.APRSDStats().msgs_rx_inc() msg = packet.get("message_text", None) msg_format = packet.get("format", None) diff --git a/aprsd/utils.py b/aprsd/utils.py index 9824667..182ef89 100644 --- a/aprsd/utils.py +++ b/aprsd/utils.py @@ -25,6 +25,11 @@ DEFAULT_CONFIG_DICT = { "plugin_dir": "~/.config/aprsd/plugins", "enabled_plugins": plugin.CORE_PLUGINS, "units": "imperial", + "web": { + "enabled": True, + "host": "0.0.0.0", + "port": 8001, + }, "email": { "enabled": True, "shortcuts": { diff --git a/requirements.in b/requirements.in index 6164b3e..bca803b 100644 --- a/requirements.in +++ b/requirements.in @@ -12,3 +12,4 @@ py3-validate-email pre-commit pytz opencage +flask diff --git a/requirements.txt b/requirements.txt index ad84d96..9e34f8b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile # To update, run: # -# pip-compile +# pip-compile requirements.in # appdirs==1.4.4 # via virtualenv @@ -24,6 +24,7 @@ click==7.1.2 # via # -r requirements.in # click-completion + # flask cryptography==3.3.1 # via pyopenssl distlib==0.3.1 @@ -34,6 +35,8 @@ filelock==3.0.12 # via # py3-validate-email # virtualenv +flask==1.1.2 + # via -r requirements.in identify==1.5.13 # via pre-commit idna==2.10 @@ -42,8 +45,12 @@ idna==2.10 # requests imapclient==2.2.0 # via -r requirements.in +itsdangerous==1.1.0 + # via flask jinja2==2.11.2 - # via click-completion + # via + # click-completion + # flask markupsafe==1.1.1 # via jinja2 nodeenv==1.5.0 @@ -64,7 +71,7 @@ pyopenssl==20.0.1 # via opencage pytz==2020.5 # via -r requirements.in -pyyaml==5.3.1 +pyyaml==5.4.1 # via # -r requirements.in # pre-commit @@ -91,3 +98,5 @@ urllib3==1.26.2 # via requests virtualenv==20.4.0 # via pre-commit +werkzeug==1.0.1 + # via flask