mirror of
https://github.com/craigerl/aprsd.git
synced 2024-11-26 18:08:36 -05:00
Compare commits
9 Commits
f265e8f354
...
0be87d8b4f
Author | SHA1 | Date | |
---|---|---|---|
0be87d8b4f | |||
d808e217a2 | |||
7e8d7cdf86 | |||
add18f1a6f | |||
c4bf89071a | |||
df0ca04483 | |||
3fd606946d | |||
|
fbfac97140 | ||
d863474c13 |
@ -32,7 +32,11 @@ class APRSClient:
|
|||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def stats(self) -> dict:
|
def stats(self) -> dict:
|
||||||
pass
|
"""Return statistics about the client connection.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: Statistics about the connection and packet handling
|
||||||
|
"""
|
||||||
|
|
||||||
def set_filter(self, filter):
|
def set_filter(self, filter):
|
||||||
self.filter = filter
|
self.filter = filter
|
||||||
@ -46,22 +50,31 @@ class APRSClient:
|
|||||||
return self._client
|
return self._client
|
||||||
|
|
||||||
def _create_client(self):
|
def _create_client(self):
|
||||||
self._client = self.setup_connection()
|
try:
|
||||||
if self.filter:
|
self._client = self.setup_connection()
|
||||||
LOG.info("Creating APRS client filter")
|
if self.filter:
|
||||||
self._client.set_filter(self.filter)
|
LOG.info("Creating APRS client filter")
|
||||||
|
self._client.set_filter(self.filter)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.error(f"Failed to create APRS client: {e}")
|
||||||
|
self._client = None
|
||||||
|
raise
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
if self._client:
|
if self._client:
|
||||||
LOG.info("Stopping client connection.")
|
LOG.info("Stopping client connection.")
|
||||||
self._client.stop()
|
self._client.stop()
|
||||||
|
|
||||||
def send(self, packet: core.Packet):
|
def send(self, packet: core.Packet) -> None:
|
||||||
"""Send a packet to the network."""
|
"""Send a packet to the network.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
packet: The APRS packet to send
|
||||||
|
"""
|
||||||
self.client.send(packet)
|
self.client.send(packet)
|
||||||
|
|
||||||
@wrapt.synchronized(lock)
|
@wrapt.synchronized(lock)
|
||||||
def reset(self):
|
def reset(self) -> None:
|
||||||
"""Call this to force a rebuild/reconnect."""
|
"""Call this to force a rebuild/reconnect."""
|
||||||
LOG.info("Resetting client connection.")
|
LOG.info("Resetting client connection.")
|
||||||
if self._client:
|
if self._client:
|
||||||
@ -76,7 +89,11 @@ class APRSClient:
|
|||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def setup_connection(self):
|
def setup_connection(self):
|
||||||
pass
|
"""Initialize and return the underlying APRS connection.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
object: The initialized connection object
|
||||||
|
"""
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
@ -90,7 +107,11 @@ class APRSClient:
|
|||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def decode_packet(self, *args, **kwargs):
|
def decode_packet(self, *args, **kwargs):
|
||||||
pass
|
"""Decode raw APRS packet data into a Packet object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Packet: Decoded APRS packet
|
||||||
|
"""
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def consumer(self, callback, blocking=False, immortal=False, raw=False):
|
def consumer(self, callback, blocking=False, immortal=False, raw=False):
|
||||||
|
@ -11,6 +11,7 @@ from rich.table import Table
|
|||||||
import aprsd
|
import aprsd
|
||||||
from aprsd import cli_helper
|
from aprsd import cli_helper
|
||||||
from aprsd.main import cli
|
from aprsd.main import cli
|
||||||
|
from aprsd.threads.stats import StatsStore
|
||||||
|
|
||||||
|
|
||||||
# setup the global logger
|
# setup the global logger
|
||||||
@ -154,3 +155,18 @@ def fetch_stats(ctx, host, port):
|
|||||||
watch_table.add_row(key, value["last"])
|
watch_table.add_row(key, value["last"])
|
||||||
|
|
||||||
console.print(watch_table)
|
console.print(watch_table)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@cli_helper.add_options(cli_helper.common_options)
|
||||||
|
@click.pass_context
|
||||||
|
@cli_helper.process_standard_options
|
||||||
|
def dump_stats(ctx):
|
||||||
|
"""Dump the current stats from the running APRSD instance."""
|
||||||
|
console = Console()
|
||||||
|
console.print(f"APRSD Fetch-Stats started version: {aprsd.__version__}")
|
||||||
|
|
||||||
|
ss = StatsStore()
|
||||||
|
ss.load()
|
||||||
|
stats = ss.data
|
||||||
|
console.print(stats)
|
||||||
|
@ -10,12 +10,13 @@ import sys
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
import click
|
import click
|
||||||
|
from loguru import logger
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
|
|
||||||
# local imports here
|
# local imports here
|
||||||
import aprsd
|
import aprsd
|
||||||
from aprsd import cli_helper, packets, plugin, threads
|
from aprsd import cli_helper, packets, plugin, threads, utils
|
||||||
from aprsd.client import client_factory
|
from aprsd.client import client_factory
|
||||||
from aprsd.main import cli
|
from aprsd.main import cli
|
||||||
from aprsd.packets import collector as packet_collector
|
from aprsd.packets import collector as packet_collector
|
||||||
@ -24,12 +25,14 @@ from aprsd.packets import seen_list
|
|||||||
from aprsd.stats import collector
|
from aprsd.stats import collector
|
||||||
from aprsd.threads import keep_alive, rx
|
from aprsd.threads import keep_alive, rx
|
||||||
from aprsd.threads import stats as stats_thread
|
from aprsd.threads import stats as stats_thread
|
||||||
|
from aprsd.threads.aprsd import APRSDThread
|
||||||
|
|
||||||
|
|
||||||
# setup the global logger
|
# setup the global logger
|
||||||
# log.basicConfig(level=log.DEBUG) # level=10
|
# log.basicConfig(level=log.DEBUG) # level=10
|
||||||
LOG = logging.getLogger("APRSD")
|
LOG = logging.getLogger("APRSD")
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
LOGU = logger
|
||||||
console = Console()
|
console = Console()
|
||||||
|
|
||||||
|
|
||||||
@ -88,6 +91,42 @@ class APRSDListenThread(rx.APRSDRXThread):
|
|||||||
packet_collector.PacketCollector().rx(packet)
|
packet_collector.PacketCollector().rx(packet)
|
||||||
|
|
||||||
|
|
||||||
|
class ListenStatsThread(APRSDThread):
|
||||||
|
"""Log the stats from the PacketList."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("SimpleStatsLog")
|
||||||
|
self._last_total_rx = 0
|
||||||
|
|
||||||
|
def loop(self):
|
||||||
|
if self.loop_count % 10 == 0:
|
||||||
|
# log the stats every 10 seconds
|
||||||
|
stats_json = collector.Collector().collect()
|
||||||
|
stats = stats_json["PacketList"]
|
||||||
|
total_rx = stats["rx"]
|
||||||
|
rx_delta = total_rx - self._last_total_rx
|
||||||
|
rate = rx_delta / 10
|
||||||
|
|
||||||
|
# Log summary stats
|
||||||
|
LOGU.opt(colors=True).info(
|
||||||
|
f"<green>RX Rate: {rate} pps</green> "
|
||||||
|
f"<yellow>Total RX: {total_rx}</yellow> "
|
||||||
|
f"<red>RX Last 10 secs: {rx_delta}</red>",
|
||||||
|
)
|
||||||
|
self._last_total_rx = total_rx
|
||||||
|
|
||||||
|
# Log individual type stats
|
||||||
|
for k, v in stats["types"].items():
|
||||||
|
thread_hex = f"fg {utils.hex_from_name(k)}"
|
||||||
|
LOGU.opt(colors=True).info(
|
||||||
|
f"<{thread_hex}>{k:<15}</{thread_hex}> "
|
||||||
|
f"<blue>RX: {v['rx']}</blue> <red>TX: {v['tx']}</red>",
|
||||||
|
)
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
@cli.command()
|
||||||
@cli_helper.add_options(cli_helper.common_options)
|
@cli_helper.add_options(cli_helper.common_options)
|
||||||
@click.option(
|
@click.option(
|
||||||
@ -195,7 +234,6 @@ def listen(
|
|||||||
aprs_client.set_filter(filter)
|
aprs_client.set_filter(filter)
|
||||||
|
|
||||||
keepalive = keep_alive.KeepAliveThread()
|
keepalive = keep_alive.KeepAliveThread()
|
||||||
# keepalive.start()
|
|
||||||
|
|
||||||
if not CONF.enable_seen_list:
|
if not CONF.enable_seen_list:
|
||||||
# just deregister the class from the packet collector
|
# just deregister the class from the packet collector
|
||||||
@ -222,6 +260,8 @@ def listen(
|
|||||||
)
|
)
|
||||||
LOG.debug("Start APRSDListenThread")
|
LOG.debug("Start APRSDListenThread")
|
||||||
listen_thread.start()
|
listen_thread.start()
|
||||||
|
listen_stats = ListenStatsThread()
|
||||||
|
listen_stats.start()
|
||||||
|
|
||||||
keepalive.start()
|
keepalive.start()
|
||||||
LOG.debug("keepalive Join")
|
LOG.debug("keepalive Join")
|
||||||
|
@ -37,9 +37,10 @@ class PacketList(objectstore.ObjectStoreMixin):
|
|||||||
self._total_rx += 1
|
self._total_rx += 1
|
||||||
self._add(packet)
|
self._add(packet)
|
||||||
ptype = packet.__class__.__name__
|
ptype = packet.__class__.__name__
|
||||||
if ptype not in self.data["types"]:
|
type_stats = self.data["types"].setdefault(
|
||||||
self.data["types"][ptype] = {"tx": 0, "rx": 0}
|
ptype, {"tx": 0, "rx": 0},
|
||||||
self.data["types"][ptype]["rx"] += 1
|
)
|
||||||
|
type_stats["rx"] += 1
|
||||||
|
|
||||||
def tx(self, packet: type[core.Packet]):
|
def tx(self, packet: type[core.Packet]):
|
||||||
"""Add a packet that was received."""
|
"""Add a packet that was received."""
|
||||||
@ -47,9 +48,10 @@ class PacketList(objectstore.ObjectStoreMixin):
|
|||||||
self._total_tx += 1
|
self._total_tx += 1
|
||||||
self._add(packet)
|
self._add(packet)
|
||||||
ptype = packet.__class__.__name__
|
ptype = packet.__class__.__name__
|
||||||
if ptype not in self.data["types"]:
|
type_stats = self.data["types"].setdefault(
|
||||||
self.data["types"][ptype] = {"tx": 0, "rx": 0}
|
ptype, {"tx": 0, "rx": 0},
|
||||||
self.data["types"][ptype]["tx"] += 1
|
)
|
||||||
|
type_stats["tx"] += 1
|
||||||
|
|
||||||
def add(self, packet):
|
def add(self, packet):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
@ -81,28 +83,16 @@ class PacketList(objectstore.ObjectStoreMixin):
|
|||||||
return self._total_tx
|
return self._total_tx
|
||||||
|
|
||||||
def stats(self, serializable=False) -> dict:
|
def stats(self, serializable=False) -> dict:
|
||||||
# limit the number of packets to return to 50
|
|
||||||
with self.lock:
|
with self.lock:
|
||||||
tmp = OrderedDict(
|
# Get last N packets directly using list slicing
|
||||||
reversed(
|
packets_list = list(self.data.get("packets", {}).values())
|
||||||
list(
|
pkts = packets_list[-CONF.packet_list_stats_maxlen:][::-1]
|
||||||
self.data.get("packets", OrderedDict()).items(),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
pkts = []
|
|
||||||
count = 1
|
|
||||||
for packet in tmp:
|
|
||||||
pkts.append(tmp[packet])
|
|
||||||
count += 1
|
|
||||||
if count > CONF.packet_list_stats_maxlen:
|
|
||||||
break
|
|
||||||
|
|
||||||
stats = {
|
stats = {
|
||||||
"total_tracked": self._total_rx + self._total_rx,
|
"total_tracked": self._total_rx + self._total_tx, # Fixed typo: was rx + rx
|
||||||
"rx": self._total_rx,
|
"rx": self._total_rx,
|
||||||
"tx": self._total_tx,
|
"tx": self._total_tx,
|
||||||
"types": self.data.get("types", []),
|
"types": self.data.get("types", {}), # Changed default from [] to {}
|
||||||
"packet_count": len(self.data.get("packets", [])),
|
"packet_count": len(self.data.get("packets", [])),
|
||||||
"maxlen": self.maxlen,
|
"maxlen": self.maxlen,
|
||||||
"packets": pkts,
|
"packets": pkts,
|
||||||
|
@ -35,3 +35,8 @@ class Collector:
|
|||||||
if not isinstance(producer_name, StatsProducer):
|
if not isinstance(producer_name, StatsProducer):
|
||||||
raise TypeError(f"Producer {producer_name} is not a StatsProducer")
|
raise TypeError(f"Producer {producer_name} is not a StatsProducer")
|
||||||
self.producers.append(producer_name)
|
self.producers.append(producer_name)
|
||||||
|
|
||||||
|
def unregister_producer(self, producer_name: Callable):
|
||||||
|
if not isinstance(producer_name, StatsProducer):
|
||||||
|
raise TypeError(f"Producer {producer_name} is not a StatsProducer")
|
||||||
|
self.producers.remove(producer_name)
|
||||||
|
Loading…
Reference in New Issue
Block a user