mirror of
https://github.com/craigerl/aprsd.git
synced 2026-01-18 11:35:33 -05:00
Added new StatsLogThread
This thread collects stats and outputs to the log every 60 seconds.
This commit is contained in:
parent
e5644cc49d
commit
40f8d23db2
@ -3,8 +3,10 @@
|
||||
#
|
||||
|
||||
# python included libs
|
||||
import cProfile
|
||||
import datetime
|
||||
import logging
|
||||
import pstats
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
@ -20,7 +22,7 @@ import aprsd
|
||||
from aprsd import cli_helper, packets, plugin, threads, utils
|
||||
from aprsd.client.client import APRSDClient
|
||||
from aprsd.main import cli
|
||||
from aprsd.packets import core, seen_list
|
||||
from aprsd.packets import core
|
||||
from aprsd.packets import log as packet_log
|
||||
from aprsd.packets.filter import PacketFilter
|
||||
from aprsd.packets.filters import dupe_filter, packet_type
|
||||
@ -28,6 +30,7 @@ from aprsd.stats import collector
|
||||
from aprsd.threads import keepalive, rx
|
||||
from aprsd.threads import stats as stats_thread
|
||||
from aprsd.threads.aprsd import APRSDThread
|
||||
from aprsd.threads.stats import StatsLogThread
|
||||
|
||||
# setup the global logger
|
||||
# log.basicConfig(level=log.DEBUG) # level=10
|
||||
@ -81,109 +84,6 @@ class APRSDListenProcessThread(rx.APRSDFilterThread):
|
||||
self.plugin_manager.run(packet)
|
||||
|
||||
|
||||
class ListenStatsThread(APRSDThread):
|
||||
"""Log the stats from the PacketList."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('PacketStatsLog')
|
||||
self._last_total_rx = 0
|
||||
self.period = 10
|
||||
self.start_time = time.time()
|
||||
|
||||
def loop(self):
|
||||
if self.loop_count % self.period == 0:
|
||||
# log the stats every 10 seconds
|
||||
stats_json = collector.Collector().collect(serializable=True)
|
||||
stats = stats_json['PacketList']
|
||||
total_rx = stats['rx']
|
||||
rx_delta = total_rx - self._last_total_rx
|
||||
rate = rx_delta / self.period
|
||||
|
||||
# Get unique callsigns count from SeenList stats
|
||||
seen_list_instance = seen_list.SeenList()
|
||||
# stats() returns data while holding lock internally, so copy it immediately
|
||||
seen_list_stats = seen_list_instance.stats()
|
||||
seen_list_instance.save()
|
||||
# Copy the stats to avoid holding references to locked data
|
||||
seen_list_stats = seen_list_stats.copy()
|
||||
unique_callsigns_count = len(seen_list_stats)
|
||||
|
||||
# Calculate uptime
|
||||
elapsed = time.time() - self.start_time
|
||||
elapsed_minutes = elapsed / 60
|
||||
elapsed_hours = elapsed / 3600
|
||||
|
||||
# Log summary stats
|
||||
LOGU.opt(colors=True).info(
|
||||
f'<green>RX Rate: {rate:.2f} pps</green> '
|
||||
f'<yellow>Total RX: {total_rx}</yellow> '
|
||||
f'<red>RX Last {self.period} secs: {rx_delta}</red> '
|
||||
)
|
||||
LOGU.opt(colors=True).info(
|
||||
f'<cyan>Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h)</cyan> '
|
||||
f'<magenta>Unique Callsigns: {unique_callsigns_count}</magenta>',
|
||||
)
|
||||
self._last_total_rx = total_rx
|
||||
|
||||
# Log individual type stats, sorted by RX count (descending)
|
||||
sorted_types = sorted(
|
||||
stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True
|
||||
)
|
||||
for k, v in sorted_types:
|
||||
# Calculate percentage of this packet type compared to total RX
|
||||
percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0
|
||||
# Format values first, then apply colors
|
||||
packet_type_str = f'{k:<15}'
|
||||
rx_count_str = f'{v["rx"]:6d}'
|
||||
tx_count_str = f'{v["tx"]:6d}'
|
||||
percentage_str = f'{percentage:5.1f}%'
|
||||
# Use different colors for RX count based on threshold (matching mqtt_injest.py)
|
||||
rx_color_tag = (
|
||||
'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red'
|
||||
)
|
||||
LOGU.opt(colors=True).info(
|
||||
f' <cyan>{packet_type_str}</cyan>: '
|
||||
f'<{rx_color_tag}>RX: {rx_count_str}</{rx_color_tag}> '
|
||||
f'<red>TX: {tx_count_str}</red> '
|
||||
f'<magenta>({percentage_str})</magenta>',
|
||||
)
|
||||
|
||||
# Extract callsign counts from seen_list stats
|
||||
callsign_counts = {}
|
||||
for callsign, data in seen_list_stats.items():
|
||||
if isinstance(data, dict) and 'count' in data:
|
||||
callsign_counts[callsign] = data['count']
|
||||
|
||||
# Sort callsigns by packet count (descending) and get top 10
|
||||
sorted_callsigns = sorted(
|
||||
callsign_counts.items(), key=lambda x: x[1], reverse=True
|
||||
)[:10]
|
||||
|
||||
# Log top 10 callsigns
|
||||
if sorted_callsigns:
|
||||
LOGU.opt(colors=True).info(
|
||||
'<cyan>Top 10 Callsigns by Packet Count:</cyan>'
|
||||
)
|
||||
total_ranks = len(sorted_callsigns)
|
||||
for rank, (callsign, count) in enumerate(sorted_callsigns, 1):
|
||||
# Use different colors based on rank: most packets (rank 1) = red,
|
||||
# least packets (last rank) = green, middle = yellow
|
||||
if rank == 1:
|
||||
count_color_tag = 'red'
|
||||
elif rank == total_ranks:
|
||||
count_color_tag = 'green'
|
||||
else:
|
||||
count_color_tag = 'yellow'
|
||||
LOGU.opt(colors=True).info(
|
||||
f' <cyan>{rank:2d}.</cyan> '
|
||||
f'<white>{callsign:<12}</white>: '
|
||||
f'<{count_color_tag}>{count:6d} packets</{count_color_tag}>',
|
||||
)
|
||||
|
||||
time.sleep(1)
|
||||
return True
|
||||
|
||||
|
||||
class StatsExportThread(APRSDThread):
|
||||
"""Export stats to remote aprsd-exporter API."""
|
||||
|
||||
@ -304,6 +204,12 @@ class StatsExportThread(APRSDThread):
|
||||
default='http://localhost:8081',
|
||||
help='URL of the aprsd-exporter API to send stats to.',
|
||||
)
|
||||
@click.option(
|
||||
'--profile',
|
||||
default=False,
|
||||
is_flag=True,
|
||||
help='Enable Python cProfile profiling to identify performance bottlenecks.',
|
||||
)
|
||||
@click.pass_context
|
||||
@cli_helper.process_standard_options
|
||||
def listen(
|
||||
@ -318,6 +224,7 @@ def listen(
|
||||
enable_packet_stats,
|
||||
export_stats,
|
||||
exporter_url,
|
||||
profile,
|
||||
):
|
||||
"""Listen to packets on the APRS-IS Network based on FILTER.
|
||||
|
||||
@ -329,6 +236,13 @@ def listen(
|
||||
o/obj1/obj2... - Object Filter Pass all objects with the exact name of obj1, obj2, ... (* wild card allowed)\n
|
||||
|
||||
"""
|
||||
# Initialize profiler if enabled
|
||||
profiler = None
|
||||
if profile:
|
||||
LOG.info('Starting Python cProfile profiling')
|
||||
profiler = cProfile.Profile()
|
||||
profiler.enable()
|
||||
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
|
||||
@ -435,8 +349,8 @@ def listen(
|
||||
|
||||
LOG.debug(f'enable_packet_stats: {enable_packet_stats}')
|
||||
if enable_packet_stats:
|
||||
LOG.debug('Start ListenStatsThread')
|
||||
listen_stats = ListenStatsThread()
|
||||
LOG.debug('Start StatsLogThread')
|
||||
listen_stats = StatsLogThread()
|
||||
listen_stats.start()
|
||||
|
||||
LOG.debug(f'export_stats: {export_stats}')
|
||||
@ -454,3 +368,25 @@ def listen(
|
||||
stats.join()
|
||||
if stats_export:
|
||||
stats_export.join()
|
||||
|
||||
# Save profiling results if enabled
|
||||
if profiler:
|
||||
profiler.disable()
|
||||
profile_file = 'aprsd_listen_profile.prof'
|
||||
profiler.dump_stats(profile_file)
|
||||
LOG.info(f'Profile saved to {profile_file}')
|
||||
|
||||
# Print profiling summary
|
||||
LOG.info('Profile Summary (top 50 functions by cumulative time):')
|
||||
stats = pstats.Stats(profiler)
|
||||
stats.sort_stats('cumulative')
|
||||
|
||||
# Log the top functions
|
||||
LOG.info('-' * 80)
|
||||
for item in stats.get_stats().items()[:50]:
|
||||
func_info, stats_tuple = item
|
||||
cumulative = stats_tuple[3]
|
||||
total_calls = stats_tuple[0]
|
||||
LOG.info(
|
||||
f'{func_info} - Calls: {total_calls}, Cumulative: {cumulative:.4f}s'
|
||||
)
|
||||
|
||||
@ -15,6 +15,7 @@ from aprsd.packets import collector as packet_collector
|
||||
from aprsd.packets import seen_list
|
||||
from aprsd.threads import keepalive, registry, rx, service, tx
|
||||
from aprsd.threads import stats as stats_thread
|
||||
from aprsd.threads.stats import StatsLogThread
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger('APRSD')
|
||||
@ -42,9 +43,15 @@ def _is_aprsd_gps_extension_installed():
|
||||
default=False,
|
||||
help='Flush out all old aged messages on disk.',
|
||||
)
|
||||
@click.option(
|
||||
'--enable-packet-stats',
|
||||
default=False,
|
||||
is_flag=True,
|
||||
help='Enable packet stats periodic logging.',
|
||||
)
|
||||
@click.pass_context
|
||||
@cli_helper.process_standard_options
|
||||
def server(ctx, flush):
|
||||
def server(ctx, flush, enable_packet_stats):
|
||||
"""Start the aprsd server gateway process."""
|
||||
signal.signal(signal.SIGINT, aprsd_main.signal_handler)
|
||||
signal.signal(signal.SIGTERM, aprsd_main.signal_handler)
|
||||
@ -165,6 +172,11 @@ def server(ctx, flush):
|
||||
LOG.info('Registry Enabled. Starting Registry thread.')
|
||||
service_threads.register(registry.APRSRegistryThread())
|
||||
|
||||
if enable_packet_stats:
|
||||
LOG.debug('Start StatsLogThread')
|
||||
listen_stats = StatsLogThread()
|
||||
listen_stats.start()
|
||||
|
||||
service_threads.start()
|
||||
service_threads.join()
|
||||
|
||||
|
||||
@ -7,5 +7,6 @@ from .rx import ( # noqa: F401
|
||||
APRSDProcessPacketThread,
|
||||
APRSDRXThread,
|
||||
)
|
||||
from .stats import APRSDStatsStoreThread, StatsLogThread # noqa: F401
|
||||
|
||||
packet_queue = queue.Queue(maxsize=500)
|
||||
|
||||
@ -2,13 +2,15 @@ import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
from loguru import logger
|
||||
from oslo_config import cfg
|
||||
from aprsd.stats import collector
|
||||
|
||||
from aprsd.threads import APRSDThread
|
||||
from aprsd.utils import objectstore
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger('APRSD')
|
||||
LOGU = logger
|
||||
|
||||
|
||||
class StatsStore(objectstore.ObjectStoreMixin):
|
||||
@ -33,6 +35,9 @@ class APRSDStatsStoreThread(APRSDThread):
|
||||
|
||||
def loop(self):
|
||||
if self.loop_count % self.save_interval == 0:
|
||||
# Lazy import to avoid circular dependency
|
||||
from aprsd.stats import collector
|
||||
|
||||
stats = collector.Collector().collect()
|
||||
ss = StatsStore()
|
||||
ss.add(stats)
|
||||
@ -40,3 +45,110 @@ class APRSDStatsStoreThread(APRSDThread):
|
||||
|
||||
time.sleep(1)
|
||||
return True
|
||||
|
||||
|
||||
class StatsLogThread(APRSDThread):
|
||||
"""Log the stats from the PacketList."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('PacketStatsLog')
|
||||
self._last_total_rx = 0
|
||||
self.period = 10
|
||||
self.start_time = time.time()
|
||||
|
||||
def loop(self):
|
||||
if self.loop_count % self.period == 0:
|
||||
# Lazy imports to avoid circular dependency
|
||||
from aprsd.packets import seen_list
|
||||
from aprsd.stats import collector
|
||||
|
||||
# log the stats every 10 seconds
|
||||
stats_json = collector.Collector().collect(serializable=True)
|
||||
stats = stats_json['PacketList']
|
||||
total_rx = stats['rx']
|
||||
rx_delta = total_rx - self._last_total_rx
|
||||
rate = rx_delta / self.period
|
||||
|
||||
# Get unique callsigns count from SeenList stats
|
||||
seen_list_instance = seen_list.SeenList()
|
||||
# stats() returns data while holding lock internally, so copy it immediately
|
||||
seen_list_stats = seen_list_instance.stats()
|
||||
seen_list_instance.save()
|
||||
# Copy the stats to avoid holding references to locked data
|
||||
seen_list_stats = seen_list_stats.copy()
|
||||
unique_callsigns_count = len(seen_list_stats)
|
||||
|
||||
# Calculate uptime
|
||||
elapsed = time.time() - self.start_time
|
||||
elapsed_minutes = elapsed / 60
|
||||
elapsed_hours = elapsed / 3600
|
||||
|
||||
# Log summary stats
|
||||
LOGU.opt(colors=True).info(
|
||||
f'<green>RX Rate: {rate:.2f} pps</green> '
|
||||
f'<yellow>Total RX: {total_rx}</yellow> '
|
||||
f'<red>RX Last {self.period} secs: {rx_delta}</red> '
|
||||
)
|
||||
LOGU.opt(colors=True).info(
|
||||
f'<cyan>Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h)</cyan> '
|
||||
f'<magenta>Unique Callsigns: {unique_callsigns_count}</magenta>',
|
||||
)
|
||||
self._last_total_rx = total_rx
|
||||
|
||||
# Log individual type stats, sorted by RX count (descending)
|
||||
sorted_types = sorted(
|
||||
stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True
|
||||
)
|
||||
for k, v in sorted_types:
|
||||
# Calculate percentage of this packet type compared to total RX
|
||||
percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0
|
||||
# Format values first, then apply colors
|
||||
packet_type_str = f'{k:<15}'
|
||||
rx_count_str = f'{v["rx"]:6d}'
|
||||
tx_count_str = f'{v["tx"]:6d}'
|
||||
percentage_str = f'{percentage:5.1f}%'
|
||||
# Use different colors for RX count based on threshold (matching mqtt_injest.py)
|
||||
rx_color_tag = (
|
||||
'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red'
|
||||
)
|
||||
LOGU.opt(colors=True).info(
|
||||
f' <cyan>{packet_type_str}</cyan>: '
|
||||
f'<{rx_color_tag}>RX: {rx_count_str}</{rx_color_tag}> '
|
||||
f'<red>TX: {tx_count_str}</red> '
|
||||
f'<magenta>({percentage_str})</magenta>',
|
||||
)
|
||||
|
||||
# Extract callsign counts from seen_list stats
|
||||
callsign_counts = {}
|
||||
for callsign, data in seen_list_stats.items():
|
||||
if isinstance(data, dict) and 'count' in data:
|
||||
callsign_counts[callsign] = data['count']
|
||||
|
||||
# Sort callsigns by packet count (descending) and get top 10
|
||||
sorted_callsigns = sorted(
|
||||
callsign_counts.items(), key=lambda x: x[1], reverse=True
|
||||
)[:10]
|
||||
|
||||
# Log top 10 callsigns
|
||||
if sorted_callsigns:
|
||||
LOGU.opt(colors=True).info(
|
||||
'<cyan>Top 10 Callsigns by Packet Count:</cyan>'
|
||||
)
|
||||
total_ranks = len(sorted_callsigns)
|
||||
for rank, (callsign, count) in enumerate(sorted_callsigns, 1):
|
||||
# Use different colors based on rank: most packets (rank 1) = red,
|
||||
# least packets (last rank) = green, middle = yellow
|
||||
if rank == 1:
|
||||
count_color_tag = 'red'
|
||||
elif rank == total_ranks:
|
||||
count_color_tag = 'green'
|
||||
else:
|
||||
count_color_tag = 'yellow'
|
||||
LOGU.opt(colors=True).info(
|
||||
f' <cyan>{rank:2d}.</cyan> '
|
||||
f'<white>{callsign:<12}</white>: '
|
||||
f'<{count_color_tag}>{count:6d} packets</{count_color_tag}>',
|
||||
)
|
||||
|
||||
time.sleep(1)
|
||||
return True
|
||||
|
||||
@ -60,18 +60,27 @@ def get_module_info(package_name, module_name, module_path):
|
||||
for path, _subdirs, files in os.walk(dir_path):
|
||||
for name in files:
|
||||
if fnmatch.fnmatch(name, pattern):
|
||||
module = smuggle(f'{path}/{name}')
|
||||
for mem_name, obj in inspect.getmembers(module):
|
||||
if inspect.isclass(obj) and is_plugin(obj):
|
||||
obj_list.append(
|
||||
{
|
||||
'package': package_name,
|
||||
'name': mem_name,
|
||||
'obj': obj,
|
||||
'version': obj.version,
|
||||
'path': f'{".".join([module_name, obj.__name__])}',
|
||||
},
|
||||
)
|
||||
# Skip __init__.py files as they often have relative imports
|
||||
# that don't work when imported directly via smuggle
|
||||
if name == '__init__.py':
|
||||
continue
|
||||
try:
|
||||
module = smuggle(f'{path}/{name}')
|
||||
for mem_name, obj in inspect.getmembers(module):
|
||||
if inspect.isclass(obj) and is_plugin(obj):
|
||||
obj_list.append(
|
||||
{
|
||||
'package': package_name,
|
||||
'name': mem_name,
|
||||
'obj': obj,
|
||||
'version': obj.version,
|
||||
'path': f'{".".join([module_name, obj.__name__])}',
|
||||
},
|
||||
)
|
||||
except (ImportError, SyntaxError, AttributeError) as e:
|
||||
# Skip files that can't be imported (relative imports, syntax errors, etc.)
|
||||
LOG.debug(f'Could not import {path}/{name}: {e}')
|
||||
continue
|
||||
|
||||
return obj_list
|
||||
|
||||
|
||||
8
tox.ini
8
tox.ini
@ -1,8 +1,8 @@
|
||||
[tox]
|
||||
minversion = 2.9.0
|
||||
minversion = 4.30.0
|
||||
skipdist = True
|
||||
skip_missing_interpreters = true
|
||||
envlist = pep8,py{311}
|
||||
envlist = pep8,py{310,311,312,313,314}
|
||||
#requires = tox-pipenv
|
||||
# pip==22.0.4
|
||||
# pip-tools==5.4.0
|
||||
@ -25,7 +25,7 @@ deps =
|
||||
pytest-cov
|
||||
pytest
|
||||
commands =
|
||||
pytest -s -v --cov-report term-missing --cov=aprsd {posargs}
|
||||
pytest -v --cov-report term-missing --cov=aprsd {posargs}
|
||||
coverage: coverage report -m
|
||||
coverage: coverage xml
|
||||
|
||||
@ -45,7 +45,6 @@ commands =
|
||||
#sphinx-build -a -W . _build
|
||||
sphinx-build -M html source build
|
||||
|
||||
|
||||
[testenv:pep8]
|
||||
deps =
|
||||
flake8
|
||||
@ -80,7 +79,6 @@ exclude = .venv,.git,.tox,dist,doc,.ropeproject
|
||||
# This section is not needed if not using GitHub Actions for CI.
|
||||
[gh-actions]
|
||||
python =
|
||||
3.9: py39, pep8, type-check, docs
|
||||
3.10: py39, pep8, type-check, docs
|
||||
3.11: py311, pep8, type-check, docs
|
||||
|
||||
|
||||
6
uv.lock
generated
6
uv.lock
generated
@ -38,6 +38,7 @@ dependencies = [
|
||||
{ name = "rfc3986" },
|
||||
{ name = "rich" },
|
||||
{ name = "rush" },
|
||||
{ name = "setuptools" },
|
||||
{ name = "stevedore" },
|
||||
{ name = "thesmuggler" },
|
||||
{ name = "timeago" },
|
||||
@ -63,6 +64,7 @@ dev = [
|
||||
{ name = "identify" },
|
||||
{ name = "nodeenv" },
|
||||
{ name = "packaging" },
|
||||
{ name = "pip" },
|
||||
{ name = "pip-tools" },
|
||||
{ name = "platformdirs" },
|
||||
{ name = "pluggy" },
|
||||
@ -70,6 +72,7 @@ dev = [
|
||||
{ name = "pyproject-api" },
|
||||
{ name = "pyproject-hooks" },
|
||||
{ name = "pyyaml" },
|
||||
{ name = "setuptools" },
|
||||
{ name = "tomli" },
|
||||
{ name = "tox" },
|
||||
{ name = "typing-extensions" },
|
||||
@ -112,6 +115,7 @@ requires-dist = [
|
||||
{ name = "packaging", specifier = "==25.0" },
|
||||
{ name = "packaging", marker = "extra == 'dev'", specifier = "==25.0" },
|
||||
{ name = "pbr", specifier = "==7.0.3" },
|
||||
{ name = "pip", marker = "extra == 'dev'", specifier = "==25.3" },
|
||||
{ name = "pip-tools", marker = "extra == 'dev'", specifier = "==7.5.2" },
|
||||
{ name = "platformdirs", marker = "extra == 'dev'", specifier = "==4.5.1" },
|
||||
{ name = "pluggy", specifier = "==1.6.0" },
|
||||
@ -129,6 +133,8 @@ requires-dist = [
|
||||
{ name = "rfc3986", specifier = "==2.0.0" },
|
||||
{ name = "rich", specifier = "==14.2.0" },
|
||||
{ name = "rush", specifier = "==2021.4.0" },
|
||||
{ name = "setuptools", specifier = "==80.9.0" },
|
||||
{ name = "setuptools", marker = "extra == 'dev'", specifier = "==80.9.0" },
|
||||
{ name = "stevedore", specifier = "==5.6.0" },
|
||||
{ name = "thesmuggler", specifier = "==1.0.1" },
|
||||
{ name = "timeago", specifier = "==1.0.16" },
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user