1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-12-20 16:41:13 -05:00

reworked collecting and reporting stats

This is the start of the cleanup of reporting of
packet stats
This commit is contained in:
Hemna 2022-12-18 21:44:23 -05:00
parent 9fc5356456
commit e37f99a6dd
13 changed files with 208 additions and 140 deletions

View File

@ -100,7 +100,8 @@ def send_message(
global got_ack, got_response
cl = client.factory.create()
packet = cl.decode_packet(packet)
packet.log("RX_PKT")
packets.PacketList().rx(packet)
packet.log("RX")
# LOG.debug("Got packet back {}".format(packet))
if isinstance(packet, packets.AckPacket):
got_ack = True

View File

@ -185,7 +185,6 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
fromcall = packet.from_call
packets.PacketList().rx(packet)
stats.APRSDStats().msgs_rx_inc()
message = packet.get("message_text", None)
msg = {
"id": 0,

View File

@ -380,7 +380,12 @@ class APRSDFlask(flask_classful.FlaskView):
@auth.login_required
def packets(self):
packet_list = packets.PacketList().get()
return json.dumps(packet_list)
tmp_list = []
for pkt in packet_list:
tmp_list.append(pkt.json)
LOG.info(f"PACKETS {tmp_list}")
return json.dumps(tmp_list)
@auth.login_required
def plugins(self):
@ -420,8 +425,8 @@ class APRSDFlask(flask_classful.FlaskView):
stats_dict["aprsd"]["watch_list"] = new_list
packet_list = packets.PacketList()
rx = packet_list.total_received()
tx = packet_list.total_sent()
rx = packet_list.total_rx()
tx = packet_list.total_tx()
stats_dict["packets"] = {
"sent": tx,
"received": rx,

View File

@ -1,6 +1,7 @@
import abc
from dataclasses import asdict, dataclass, field
import datetime
import json
import logging
import re
import time
@ -9,9 +10,11 @@ from typing import List
import dacite
from aprsd import client, stats
from aprsd import client
from aprsd.packets.packet_list import PacketList # noqa: F401
from aprsd.threads import tx
from aprsd.utils import counter
from aprsd.utils import json as aprsd_json
LOG = logging.getLogger("APRSD")
@ -57,16 +60,26 @@ class Packet(metaclass=abc.ABCMeta):
raw_dict: dict = field(repr=False, default_factory=lambda: {})
# Fields related to sending packets out
send_count: int = field(repr=False, default=1)
send_count: int = field(repr=False, default=0)
retry_count: int = field(repr=False, default=3)
last_send_time: datetime.timedelta = field(repr=False, default=None)
last_send_attempt: int = field(repr=False, default=0)
# Do we allow this packet to be saved to send later?
allow_delay: bool = field(repr=False, default=True)
def __post__init__(self):
LOG.warning(f"POST INIT {self}")
@property
def __dict__(self):
return asdict(self)
@property
def json(self):
"""
get the json formated string
"""
return json.dumps(self.__dict__, cls=aprsd_json.EnhancedJSONEncoder)
def get(self, key, default=None):
"""Emulate a getter on a dict."""
if hasattr(self, key):
@ -122,13 +135,13 @@ class Packet(metaclass=abc.ABCMeta):
log_list = ["\n"]
name = self.__class__.__name__
if header:
if isinstance(self, AckPacket) and "tx" in header.lower():
if "tx" in header.lower():
log_list.append(
f"{header}____________({name}__"
f"TX:{self.send_count} of {self.retry_count})",
f"{header}________({name} "
f"TX:{self.send_count+1} of {self.retry_count})",
)
else:
log_list.append(f"{header}____________({name})")
log_list.append(f"{header}________({name})")
# log_list.append(f" Packet : {self.__class__.__name__}")
log_list.append(f" Raw : {self.raw}")
if self.to_call:
@ -148,7 +161,7 @@ class Packet(metaclass=abc.ABCMeta):
if self.msgNo:
log_list.append(f" Msg # : {self.msgNo}")
log_list.append(f"{header}____________({name})")
log_list.append(f"{header}________({name})")
LOG.info("\n".join(log_list))
LOG.debug(self)
@ -178,7 +191,7 @@ class Packet(metaclass=abc.ABCMeta):
cl = client.factory.create().client
self.log(header="TX Message Direct")
cl.send(self.raw)
stats.APRSDStats().msgs_tx_inc()
PacketList().tx(self)
@dataclass

View File

@ -3,7 +3,7 @@ import threading
import wrapt
from aprsd import utils
from aprsd import stats, utils
from aprsd.packets import seen_list
@ -19,8 +19,8 @@ class PacketList:
packet_list: utils.RingBuffer = utils.RingBuffer(1000)
total_recv: int = 0
total_tx: int = 0
_total_rx: int = 0
_total_tx: int = 0
def __new__(cls, *args, **kwargs):
if cls._instance is None:
@ -43,25 +43,27 @@ class PacketList:
@wrapt.synchronized(lock)
def rx(self, packet):
"""Add a packet that was received."""
self.total_recv += 1
self._total_rx += 1
self.packet_list.append(packet)
seen_list.SeenList().update_seen(packet)
stats.APRSDStats().rx(packet)
@wrapt.synchronized(lock)
def tx(self, packet):
"""Add a packet that was received."""
self.total_tx += 1
self._total_tx += 1
self.packet_list.append(packet)
seen_list.SeenList().update_seen(packet)
stats.APRSDStats().tx(packet)
@wrapt.synchronized(lock)
def get(self):
return self.packet_list.get()
@wrapt.synchronized(lock)
def total_received(self):
return self.total_recv
def total_rx(self):
return self._total_rx
@wrapt.synchronized(lock)
def total_sent(self):
return self.total_tx
def total_tx(self):
return self._total_tx

View File

@ -3,7 +3,6 @@ import threading
import wrapt
from aprsd import stats
from aprsd.utils import objectstore
@ -76,7 +75,6 @@ class PacketTrack(objectstore.ObjectStoreMixin):
def add(self, packet):
key = int(packet.msgNo)
self.data[key] = packet
stats.APRSDStats().msgs_tracked_inc()
self.total_tracked += 1
@wrapt.synchronized(lock)

View File

@ -21,15 +21,6 @@ class APRSDStats:
_aprsis_server = None
_aprsis_keepalive = None
_msgs_tracked = 0
_msgs_tx = 0
_msgs_rx = 0
_msgs_mice_rx = 0
_ack_tx = 0
_ack_rx = 0
_email_thread_last_time = None
_email_tx = 0
_email_rx = 0
@ -37,6 +28,37 @@ class APRSDStats:
_mem_current = 0
_mem_peak = 0
_pkt_cnt = {
"Packet": {
"tx": 0,
"rx": 0,
},
"AckPacket": {
"tx": 0,
"rx": 0,
},
"GPSPacket": {
"tx": 0,
"rx": 0,
},
"StatusPacket": {
"tx": 0,
"rx": 0,
},
"MicEPacket": {
"tx": 0,
"rx": 0,
},
"MessagePacket": {
"tx": 0,
"rx": 0,
},
"WeatherPacket": {
"tx": 0,
"rx": 0,
},
}
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
@ -90,67 +112,18 @@ class APRSDStats:
def set_aprsis_keepalive(self):
self._aprsis_keepalive = datetime.datetime.now()
def rx_packet(self, packet):
if isinstance(packet, packets.MessagePacket):
self.msgs_rx_inc()
elif isinstance(packet, packets.MicEPacket):
self.msgs_mice_inc()
elif isinstance(packet, packets.AckPacket):
self.ack_rx_inc()
def rx(self, packet):
type = packet.__class__.__name__
self._pkt_cnt[type]["rx"] += 1
@wrapt.synchronized(lock)
@property
def msgs_tx(self):
return self._msgs_tx
@wrapt.synchronized(lock)
def msgs_tx_inc(self):
self._msgs_tx += 1
@wrapt.synchronized(lock)
@property
def msgs_rx(self):
return self._msgs_rx
@wrapt.synchronized(lock)
def msgs_rx_inc(self):
self._msgs_rx += 1
@wrapt.synchronized(lock)
@property
def msgs_mice_rx(self):
return self._msgs_mice_rx
@wrapt.synchronized(lock)
def msgs_mice_inc(self):
self._msgs_mice_rx += 1
@wrapt.synchronized(lock)
@property
def ack_tx(self):
return self._ack_tx
@wrapt.synchronized(lock)
def ack_tx_inc(self):
self._ack_tx += 1
@wrapt.synchronized(lock)
@property
def ack_rx(self):
return self._ack_rx
@wrapt.synchronized(lock)
def ack_rx_inc(self):
self._ack_rx += 1
def tx(self, packet):
type = packet.__class__.__name__
self._pkt_cnt[type]["tx"] += 1
@wrapt.synchronized(lock)
@property
def msgs_tracked(self):
return self._msgs_tracked
@wrapt.synchronized(lock)
def msgs_tracked_inc(self):
self._msgs_tracked += 1
return packets.PacketTrack().total_tracked
@wrapt.synchronized(lock)
@property
@ -212,11 +185,13 @@ class APRSDStats:
wl = packets.WatchList()
sl = packets.SeenList()
pl = packets.PacketList()
stats = {
"aprsd": {
"version": aprsd.__version__,
"uptime": utils.strfdelta(self.uptime),
"callsign": self.config["aprsd"]["callsign"],
"memory_current": int(self.memory),
"memory_current_str": utils.human_size(self.memory),
"memory_peak": int(self.memory_peak),
@ -229,18 +204,20 @@ class APRSDStats:
"callsign": self.config["aprs"]["login"],
"last_update": last_aprsis_keepalive,
},
"packets": {
"tracked": int(pl.total_tx() + pl.total_rx()),
"sent": int(pl.total_tx()),
"received": int(pl.total_rx()),
},
"messages": {
"tracked": int(self.msgs_tracked),
"sent": int(self.msgs_tx),
"recieved": int(self.msgs_rx),
"ack_sent": int(self.ack_tx),
"ack_recieved": int(self.ack_rx),
"mic-e recieved": int(self.msgs_mice_rx),
"sent": self._pkt_cnt["MessagePacket"]["tx"],
"received": self._pkt_cnt["MessagePacket"]["tx"],
"ack_sent": self._pkt_cnt["AckPacket"]["tx"],
},
"email": {
"enabled": self.config["aprsd"]["email"]["enabled"],
"sent": int(self._email_tx),
"recieved": int(self._email_rx),
"received": int(self._email_rx),
"thread_last_update": last_update,
},
"plugins": plugin_stats,
@ -248,15 +225,16 @@ class APRSDStats:
return stats
def __str__(self):
pl = packets.PacketList()
return (
"Uptime:{} Msgs TX:{} RX:{} "
"ACK: TX:{} RX:{} "
"Email TX:{} RX:{} LastLoop:{} ".format(
self.uptime,
self._msgs_tx,
self._msgs_rx,
self._ack_tx,
self._ack_rx,
pl.total_tx(),
pl.total_rx(),
self._pkt_cnt["AckPacket"]["tx"],
self._pkt_cnt["AckPacket"]["rx"],
self._email_tx,
self._email_rx,
self._email_thread_last_time,

View File

@ -56,11 +56,11 @@ class KeepAliveThread(APRSDThread):
).format(
login,
utils.strfdelta(stats_obj.uptime),
pl.total_recv,
pl.total_tx,
pl.total_rx(),
pl.total_tx(),
tracked_packets,
stats_obj.msgs_tx,
stats_obj.msgs_rx,
stats_obj._pkt_cnt["MessagePacket"]["tx"],
stats_obj._pkt_cnt["MessagePacket"]["rx"],
last_msg_time,
email_thread_time,
utils.human_size(current),

View File

@ -5,7 +5,7 @@ import time
import aprslib
from aprsd import client, packets, plugin, stats
from aprsd import client, packets, plugin
from aprsd.threads import APRSDThread
@ -91,7 +91,6 @@ class APRSDProcessPacketThread(APRSDThread):
LOG.info(f"Got ack for message {ack_num}")
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
stats.APRSDStats().ack_rx_inc()
return
def loop(self):
@ -130,7 +129,6 @@ class APRSDProcessPacketThread(APRSDThread):
if isinstance(packet, packets.MessagePacket):
if to_call and to_call.lower() == our_call:
# It's a MessagePacket and it's for us!
stats.APRSDStats().msgs_rx_inc()
# let any threads do their thing, then ack
# send an ack last
ack_pkt = packets.AckPacket(

View File

@ -2,7 +2,7 @@ import datetime
import logging
import time
from aprsd import client, stats
from aprsd import client
from aprsd import threads as aprsd_threads
from aprsd.packets import packet_list, tracker
@ -36,14 +36,23 @@ class SendPacketThread(aprsd_threads.APRSDThread):
if not packet:
# The message has been removed from the tracking queue
# So it got acked and we are done.
LOG.info("Message Send Complete via Ack.")
LOG.info(
f"{packet.__class__.__name__}"
f"({packet.msgNo}) "
"Message Send Complete via Ack.",
)
return False
else:
send_now = False
if packet.last_send_attempt == packet.retry_count:
if packet.send_count == packet.retry_count:
# we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue?
LOG.info("Message Send Complete. Max attempts reached.")
LOG.info(
f"{packet.__class__.__name__} "
f"({packet.msgNo}) "
"Message Send Complete. Max attempts reached"
f" {packet.retry_count}",
)
if not packet.allow_delay:
pkt_tracker.remove(packet.msgNo)
return False
@ -52,7 +61,7 @@ class SendPacketThread(aprsd_threads.APRSDThread):
if packet.last_send_time:
# Message has a last send time tracking
now = datetime.datetime.now()
sleeptime = (packet.last_send_attempt + 1) * 31
sleeptime = (packet.send_count + 1) * 31
delta = now - packet.last_send_time
if delta > datetime.timedelta(seconds=sleeptime):
# It's time to try to send it again
@ -66,10 +75,9 @@ class SendPacketThread(aprsd_threads.APRSDThread):
packet.log("TX")
cl = client.factory.create().client
cl.send(packet.raw)
stats.APRSDStats().msgs_tx_inc()
packet_list.PacketList().tx(packet)
packet.last_send_time = datetime.datetime.now()
packet.last_send_attempt += 1
packet.send_count += 1
time.sleep(1)
# Make sure we get called again.
@ -87,10 +95,15 @@ class SendAckThread(aprsd_threads.APRSDThread):
def loop(self):
"""Separate thread to send acks with retries."""
send_now = False
if self.packet.last_send_attempt == self.packet.retry_count:
if self.packet.send_count == self.packet.retry_count:
# we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue?
LOG.info("Ack Send Complete. Max attempts reached.")
LOG.info(
f"{self.packet.__class__.__name__}"
f"({self.packet.msgNo}) "
"Send Complete. Max attempts reached"
f" {self.packet.retry_count}",
)
return False
if self.packet.last_send_time:
@ -113,10 +126,8 @@ class SendAckThread(aprsd_threads.APRSDThread):
cl = client.factory.create().client
self.packet.log("TX")
cl.send(self.packet.raw)
self.packet.send_count += 1
stats.APRSDStats().ack_tx_inc()
packet_list.PacketList().tx(self.packet)
self.packet.last_send_attempt += 1
self.packet.send_count += 1
self.packet.last_send_time = datetime.datetime.now()
time.sleep(1)

60
aprsd/utils/json.py Normal file
View File

@ -0,0 +1,60 @@
import datetime
import decimal
import json
import sys
class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
args = (
"year", "month", "day", "hour", "minute",
"second", "microsecond",
)
return {
"__type__": "datetime.datetime",
"args": [getattr(obj, a) for a in args],
}
elif isinstance(obj, datetime.date):
args = ("year", "month", "day")
return {
"__type__": "datetime.date",
"args": [getattr(obj, a) for a in args],
}
elif isinstance(obj, datetime.time):
args = ("hour", "minute", "second", "microsecond")
return {
"__type__": "datetime.time",
"args": [getattr(obj, a) for a in args],
}
elif isinstance(obj, datetime.timedelta):
args = ("days", "seconds", "microseconds")
return {
"__type__": "datetime.timedelta",
"args": [getattr(obj, a) for a in args],
}
elif isinstance(obj, decimal.Decimal):
return {
"__type__": "decimal.Decimal",
"args": [str(obj)],
}
else:
return super().default(obj)
class EnhancedJSONDecoder(json.JSONDecoder):
def __init__(self, *args, **kwargs):
super().__init__(
*args, object_hook=self.object_hook,
**kwargs,
)
def object_hook(self, d):
if "__type__" not in d:
return d
o = sys.modules[__name__]
for e in d["__type__"].split("."):
o = getattr(o, e)
args, kwargs = d.get("args", ()), d.get("kwargs", {})
return o(*args, **kwargs)

View File

@ -219,6 +219,7 @@ function updateQuadData(chart, label, first, second, third, fourth) {
}
function update_stats( data ) {
our_callsign = data["stats"]["aprsd"]["callsign"];
$("#version").text( data["stats"]["aprsd"]["version"] );
$("#aprs_connection").html( data["aprs_connection"] );
$("#uptime").text( "uptime: " + data["stats"]["aprsd"]["uptime"] );
@ -226,7 +227,7 @@ function update_stats( data ) {
$("#jsonstats").html(html_pretty);
short_time = data["time"].split(/\s(.+)/)[1];
updateDualData(packets_chart, short_time, data["stats"]["packets"]["sent"], data["stats"]["packets"]["received"]);
updateQuadData(message_chart, short_time, data["stats"]["messages"]["sent"], data["stats"]["messages"]["recieved"], data["stats"]["messages"]["ack_sent"], data["stats"]["messages"]["ack_recieved"]);
updateQuadData(message_chart, short_time, data["stats"]["messages"]["sent"], data["stats"]["messages"]["received"], data["stats"]["messages"]["ack_sent"], data["stats"]["messages"]["ack_recieved"]);
updateDualData(email_chart, short_time, data["stats"]["email"]["sent"], data["stats"]["email"]["recieved"]);
updateDualData(memory_chart, short_time, data["stats"]["aprsd"]["memory_peak"], data["stats"]["aprsd"]["memory_current"]);
}

View File

@ -1,5 +1,6 @@
// watchlist is a dict of ham callsign => symbol, packets
var watchlist = {};
var our_callsign = "";
function aprs_img(item, x_offset, y_offset) {
var x = x_offset * -16;
@ -107,34 +108,35 @@ function update_packets( data ) {
packetsdiv.html('')
}
jQuery.each(data, function(i, val) {
update_watchlist_from_packet(val['from'], val);
if ( packet_list.hasOwnProperty(val["ts"]) == false ) {
pkt = JSON.parse(val);
update_watchlist_from_packet(pkt['from_call'], pkt);
if ( packet_list.hasOwnProperty(val["timestamp"]) == false ) {
// Store the packet
packet_list[val["ts"]] = val;
ts_str = val["ts"].toString();
ts = ts_str.split(".")[0]*1000;
var d = new Date(ts).toLocaleDateString("en-US")
var t = new Date(ts).toLocaleTimeString("en-US")
if (val.hasOwnProperty('from') == false) {
from = val['fromcall']
title_id = 'title_tx'
packet_list[pkt["timestamp"]] = pkt;
//ts_str = val["timestamp"].toString();
//ts = ts_str.split(".")[0]*1000;
ts = pkt["timestamp"]
var d = new Date(ts).toLocaleDateString("en-US");
var t = new Date(ts).toLocaleTimeString("en-US");
var from_call = pkt['from_call'];
if (from_call == our_callsign) {
title_id = 'title_tx';
} else {
from = val['from']
title_id = 'title_rx'
title_id = 'title_rx';
}
var from_to = d + " " + t + "    " + from + " > "
var from_to = d + " " + t + "    " + from_call + " > "
if (val.hasOwnProperty('addresse')) {
from_to = from_to + val['addresse']
} else if (val.hasOwnProperty('tocall')) {
from_to = from_to + val['tocall']
} else if (val.hasOwnProperty('format') && val['format'] == 'mic-e') {
from_to = from_to + pkt['addresse']
} else if (pkt.hasOwnProperty('to_call')) {
from_to = from_to + pkt['to_call']
} else if (pkt.hasOwnProperty('format') && pkt['format'] == 'mic-e') {
from_to = from_to + "Mic-E"
}
from_to = from_to + "  -  " + val['raw']
from_to = from_to + "  -  " + pkt['raw']
json_pretty = Prism.highlight(JSON.stringify(val, null, '\t'), Prism.languages.json, 'json');
json_pretty = Prism.highlight(JSON.stringify(pkt, null, '\t'), Prism.languages.json, 'json');
pkt_html = '<div class="title" id="' + title_id + '"><i class="dropdown icon"></i>' + from_to + '</div><div class="content"><p class="transition hidden"><pre class="language-json">' + json_pretty + '</p></p></div>'
packetsdiv.prepend(pkt_html);
}