Added Flask web thread and stats collection

This patch adds the stats object to collect statistics of
the running server.  This also optionally adds the ability
to run a flask web service on a port to use as a keepalive
healthcheck.
This commit is contained in:
Hemna 2021-01-21 20:58:47 -05:00
parent 9630279d14
commit 5c949343ec
10 changed files with 256 additions and 33 deletions

View File

@ -7,7 +7,7 @@ import re
import smtplib
import time
from aprsd import messaging, threads
from aprsd import messaging, stats, threads
import imapclient
from validate_email import validate_email
@ -269,6 +269,7 @@ def send_email(to_addr, content):
[to_addr],
msg.as_string(),
)
stats.APRSDStats().email_tx_inc()
except Exception as e:
msg = getattr(e, "message", repr(e))
LOG.error("Sendmail Error!!!! '{}'", msg)
@ -366,6 +367,7 @@ class APRSDEmailThread(threads.APRSDThread):
past = datetime.datetime.now()
while not self.thread_stop:
time.sleep(5)
stats.APRSDStats().email_thread_update()
# always sleep for 5 seconds and see if we need to check email
# This allows CTRL-C to stop the execution of this loop sooner
# than check_email_delay time

View File

@ -1,8 +1,7 @@
import datetime
import json
import aprsd
from aprsd import messaging
from aprsd import messaging, stats
import flask
import flask_classful
@ -18,14 +17,16 @@ class APRSDFlask(flask_classful.FlaskView):
# return flask.render_template("index.html", message=msg)
def stats(self):
stats_obj = stats.APRSDStats()
track = messaging.MsgTrack()
uptime = datetime.datetime.now() - track._start_time
stats = {
result = {
"version": aprsd.__version__,
"uptime": str(uptime),
"uptime": stats_obj.uptime,
"size_tracker": len(track),
"stats": stats_obj.stats(),
}
return json.dumps(stats)
return json.dumps(result)
def init_flask(config):

View File

@ -32,7 +32,7 @@ import time
# local imports here
import aprsd
from aprsd import client, email, messaging, plugin, threads, utils
from aprsd import client, email, flask, messaging, plugin, stats, threads, utils
import aprslib
from aprslib.exceptions import LoginError
import click
@ -157,7 +157,9 @@ def signal_handler(sig, frame):
)
threads.APRSDThreadList().stop_all()
server_event.set()
time.sleep(1)
LOG.info("EXITING STATS")
LOG.info(stats.APRSDStats())
# time.sleep(1)
signal.signal(signal.SIGTERM, sys.exit(0))
@ -384,19 +386,12 @@ def send_message(
default=False,
help="Flush out all old aged messages on disk.",
)
@click.option(
"--stats-server",
is_flag=True,
default=False,
help="Run a stats web server on port 5001?",
)
def server(
loglevel,
quiet,
disable_validation,
config_file,
flush,
stats_server,
):
"""Start the aprsd server process."""
global event
@ -416,6 +411,7 @@ def server(
setup_logging(config, loglevel, quiet)
LOG.info("APRSD Started version: {}".format(aprsd.__version__))
stats.APRSDStats(config)
email_enabled = config["aprsd"]["email"].get("enabled", False)
@ -463,18 +459,25 @@ def server(
messaging.MsgTrack().restart()
cntr = 0
while not server_event.is_set():
# to keep the log noise down
if cntr % 12 == 0:
tracker = messaging.MsgTrack()
LOG.debug("KeepAlive Tracker({}): {}".format(len(tracker), str(tracker)))
cntr += 1
time.sleep(10)
keepalive = threads.KeepAliveThread()
keepalive.start()
try:
web_enabled = utils.check_config_option(config, ["aprsd", "web", "enabled"])
except Exception:
web_enabled = False
if web_enabled:
app = flask.init_flask(config)
app.run(
host=config["aprsd"]["web"]["host"],
port=config["aprsd"]["web"]["port"],
)
# If there are items in the msgTracker, then save them
tracker = messaging.MsgTrack()
tracker.save()
LOG.info(stats.APRSDStats())
LOG.info("APRSD Exiting.")

View File

@ -9,7 +9,7 @@ import re
import threading
import time
from aprsd import client, threads, utils
from aprsd import client, stats, threads, utils
LOG = logging.getLogger("APRSD")
@ -49,7 +49,7 @@ class MsgTrack:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.track = {}
cls._start_time = datetime.datetime.now()
cls._instance._start_time = datetime.datetime.now()
cls._instance.lock = threading.Lock()
return cls._instance
@ -57,6 +57,7 @@ class MsgTrack:
with self.lock:
key = int(msg.id)
self.track[key] = msg
stats.APRSDStats().msgs_tracked_inc()
self.total_messages_tracked += 1
def get(self, id):
@ -251,6 +252,7 @@ class RawMessage(Message):
fromcall=self.fromcall,
)
cl.sendall(repr(self))
stats.APRSDStats().msgs_sent_inc()
class TextMessage(Message):
@ -267,7 +269,7 @@ class TextMessage(Message):
def __repr__(self):
"""Build raw string to send over the air."""
return "{}>APRS::{}:{}{{{}\n".format(
return "{}>APZ100::{}:{}{{{}\n".format(
self.fromcall,
self.tocall.ljust(9),
self._filter_for_send(),
@ -315,6 +317,7 @@ class TextMessage(Message):
fromcall=self.fromcall,
)
cl.sendall(repr(self))
stats.APRSDStats().msgs_tx_inc()
class SendMessageThread(threads.APRSDThread):
@ -374,6 +377,7 @@ class SendMessageThread(threads.APRSDThread):
msg_num=msg.id,
)
cl.sendall(repr(msg))
stats.APRSDStats().msgs_tx_inc()
msg.last_send_time = datetime.datetime.now()
msg.last_send_attempt += 1
@ -389,7 +393,7 @@ class AckMessage(Message):
super().__init__(fromcall, tocall, msg_id=msg_id)
def __repr__(self):
return "{}>APRS::{}:ack{}\n".format(
return "{}>APZ100::{}:ack{}\n".format(
self.fromcall,
self.tocall.ljust(9),
self.id,
@ -411,6 +415,7 @@ class AckMessage(Message):
retry_number=i,
)
cl.sendall(repr(self))
stats.APRSDStats().ack_tx_inc()
# aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle)
time.sleep(31)
@ -478,6 +483,7 @@ class SendAckThread(threads.APRSDThread):
retry_number=self.ack.last_send_attempt,
)
cl.sendall(repr(self.ack))
stats.APRSDStats().ack_tx_inc()
self.ack.last_send_attempt += 1
self.ack.last_send_time = datetime.datetime.now()
time.sleep(5)

View File

@ -48,7 +48,7 @@ class LocationPlugin(plugin.APRSDPluginBase):
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
try: # altitude not always provided
alt = aprs_data["entries"][0]["altitude"]
alt = float(aprs_data["entries"][0]["altitude"])
except Exception:
alt = 0
altfeet = int(alt * 3.28084)

161
aprsd/stats.py Normal file
View File

@ -0,0 +1,161 @@
import datetime
import logging
import threading
LOG = logging.getLogger("APRSD")
class APRSDStats:
_instance = None
lock = None
config = None
start_time = None
_msgs_tracked = 0
_msgs_tx = 0
_msgs_rx = 0
_msgs_mice_rx = 0
_ack_tx = 0
_ack_rx = 0
_email_thread_last_time = None
_email_tx = 0
_email_rx = 0
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
# any initializetion here
cls._instance.lock = threading.Lock()
cls._instance.start_time = datetime.datetime.now()
return cls._instance
def __init__(self, config=None):
if config:
self.config = config
@property
def uptime(self):
with self.lock:
return str(datetime.datetime.now() - self.start_time)
@property
def msgs_tx(self):
with self.lock:
return self._msgs_tx
def msgs_tx_inc(self):
with self.lock:
self._msgs_tx += 1
@property
def msgs_rx(self):
with self.lock:
return self._msgs_rx
def msgs_rx_inc(self):
with self.lock:
self._msgs_rx += 1
@property
def msgs_mice_rx(self):
with self.lock:
return self._msgs_mice_rx
def msgs_mice_inc(self):
with self.lock:
self._msgs_mice_rx += 1
@property
def ack_tx(self):
with self.lock:
return self._ack_tx
def ack_tx_inc(self):
with self.lock:
self._ack_tx += 1
@property
def ack_rx(self):
with self.lock:
return self._ack_rx
def ack_rx_inc(self):
with self.lock:
self._ack_rx += 1
@property
def msgs_tracked(self):
with self.lock:
return self._msgs_tracked
def msgs_tracked_inc(self):
with self.lock:
self._msgs_tracked += 1
@property
def email_tx(self):
with self.lock:
return self._email_tx
def email_tx_inc(self):
with self.lock:
self._email_tx += 1
@property
def email_rx(self):
with self.lock:
return self._email_rx
def email_rx_inc(self):
with self.lock:
self._email_rx += 1
@property
def email_thread_time(self):
with self.lock:
return self._email_thread_last_time
def email_thread_update(self):
with self.lock:
self._email_thread_last_time = datetime.datetime.now()
def stats(self):
now = datetime.datetime.now()
stats = {
"messages": {
"tracked": self.msgs_tracked,
"sent": self.msgs_tx,
"recieved": self.msgs_rx,
"ack_sent": self.ack_tx,
"ack_recieved": self.ack_rx,
"mic-e recieved": self.msgs_mice_rx,
},
"email": {
"sent": self._email_tx,
"recieved": self._email_rx,
"thread_last_update": str(now - self._email_thread_last_time),
},
}
LOG.debug("STATS {}".format(stats))
return stats
def __str__(self):
return (
"Msgs TX:{} RX:{} ACK: TX:{} RX:{} "
"Email TX:{} RX:{} LastLoop:{} "
"Uptime: {}".format(
self._msgs_tx,
self._msgs_rx,
self._ack_tx,
self._ack_rx,
self._email_tx,
self._email_rx,
self._email_thread_last_time,
self.uptime,
)
)

View File

@ -1,10 +1,11 @@
import abc
import datetime
import logging
import queue
import threading
import time
from aprsd import client, messaging, plugin
from aprsd import client, messaging, plugin, stats
import aprslib
LOG = logging.getLogger("APRSD")
@ -63,6 +64,37 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
LOG.debug("Exiting")
class KeepAliveThread(APRSDThread):
cntr = 0
def __init__(self):
super().__init__("KeepAlive")
def loop(self):
if self.cntr % 6 == 0:
tracker = messaging.MsgTrack()
stats_obj = stats.APRSDStats()
now = datetime.datetime.now()
last_email = stats.APRSDStats().email_thread_time
if last_email:
email_thread_time = str(now - last_email)
else:
email_thread_time = "N/A"
LOG.debug(
"Tracker({}) EmailThread: {} "
" Msgs: TX:{} RX:{}".format(
len(tracker),
email_thread_time,
stats_obj.msgs_tx,
stats_obj.msgs_rx,
),
)
self.cntr += 1
time.sleep(10)
return True
class APRSDRXThread(APRSDThread):
def __init__(self, msg_queues, config):
super().__init__("RX_MSG")
@ -118,11 +150,13 @@ class APRSDRXThread(APRSDThread):
)
tracker = messaging.MsgTrack()
tracker.remove(ack_num)
stats.APRSDStats().ack_rx_inc()
return
def process_mic_e_packet(self, packet):
LOG.info("Mic-E Packet detected. Currenlty unsupported.")
messaging.log_packet(packet)
stats.APRSDStats().msgs_mice_inc()
return
def process_message_packet(self, packet):
@ -196,6 +230,7 @@ class APRSDRXThread(APRSDThread):
try:
LOG.info("Got message: {}".format(packet))
stats.APRSDStats().msgs_rx_inc()
msg = packet.get("message_text", None)
msg_format = packet.get("format", None)

View File

@ -25,6 +25,11 @@ DEFAULT_CONFIG_DICT = {
"plugin_dir": "~/.config/aprsd/plugins",
"enabled_plugins": plugin.CORE_PLUGINS,
"units": "imperial",
"web": {
"enabled": True,
"host": "0.0.0.0",
"port": 8001,
},
"email": {
"enabled": True,
"shortcuts": {

View File

@ -12,3 +12,4 @@ py3-validate-email
pre-commit
pytz
opencage
flask

View File

@ -2,7 +2,7 @@
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile
# pip-compile requirements.in
#
appdirs==1.4.4
# via virtualenv
@ -24,6 +24,7 @@ click==7.1.2
# via
# -r requirements.in
# click-completion
# flask
cryptography==3.3.1
# via pyopenssl
distlib==0.3.1
@ -34,6 +35,8 @@ filelock==3.0.12
# via
# py3-validate-email
# virtualenv
flask==1.1.2
# via -r requirements.in
identify==1.5.13
# via pre-commit
idna==2.10
@ -42,8 +45,12 @@ idna==2.10
# requests
imapclient==2.2.0
# via -r requirements.in
itsdangerous==1.1.0
# via flask
jinja2==2.11.2
# via click-completion
# via
# click-completion
# flask
markupsafe==1.1.1
# via jinja2
nodeenv==1.5.0
@ -64,7 +71,7 @@ pyopenssl==20.0.1
# via opencage
pytz==2020.5
# via -r requirements.in
pyyaml==5.3.1
pyyaml==5.4.1
# via
# -r requirements.in
# pre-commit
@ -91,3 +98,5 @@ urllib3==1.26.2
# via requests
virtualenv==20.4.0
# via pre-commit
werkzeug==1.0.1
# via flask