diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index 4e27ed1..8b2e03b 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -3,8 +3,10 @@ # # python included libs +import cProfile import datetime import logging +import pstats import signal import sys import time @@ -20,7 +22,7 @@ import aprsd from aprsd import cli_helper, packets, plugin, threads, utils from aprsd.client.client import APRSDClient from aprsd.main import cli -from aprsd.packets import core, seen_list +from aprsd.packets import core from aprsd.packets import log as packet_log from aprsd.packets.filter import PacketFilter from aprsd.packets.filters import dupe_filter, packet_type @@ -28,6 +30,7 @@ from aprsd.stats import collector from aprsd.threads import keepalive, rx from aprsd.threads import stats as stats_thread from aprsd.threads.aprsd import APRSDThread +from aprsd.threads.stats import StatsLogThread # setup the global logger # log.basicConfig(level=log.DEBUG) # level=10 @@ -81,109 +84,6 @@ class APRSDListenProcessThread(rx.APRSDFilterThread): self.plugin_manager.run(packet) -class ListenStatsThread(APRSDThread): - """Log the stats from the PacketList.""" - - def __init__(self): - super().__init__('PacketStatsLog') - self._last_total_rx = 0 - self.period = 10 - self.start_time = time.time() - - def loop(self): - if self.loop_count % self.period == 0: - # log the stats every 10 seconds - stats_json = collector.Collector().collect(serializable=True) - stats = stats_json['PacketList'] - total_rx = stats['rx'] - rx_delta = total_rx - self._last_total_rx - rate = rx_delta / self.period - - # Get unique callsigns count from SeenList stats - seen_list_instance = seen_list.SeenList() - # stats() returns data while holding lock internally, so copy it immediately - seen_list_stats = seen_list_instance.stats() - seen_list_instance.save() - # Copy the stats to avoid holding references to locked data - seen_list_stats = seen_list_stats.copy() - unique_callsigns_count = len(seen_list_stats) - - # Calculate uptime - elapsed = time.time() - self.start_time - elapsed_minutes = elapsed / 60 - elapsed_hours = elapsed / 3600 - - # Log summary stats - LOGU.opt(colors=True).info( - f'RX Rate: {rate:.2f} pps ' - f'Total RX: {total_rx} ' - f'RX Last {self.period} secs: {rx_delta} ' - ) - LOGU.opt(colors=True).info( - f'Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h) ' - f'Unique Callsigns: {unique_callsigns_count}', - ) - self._last_total_rx = total_rx - - # Log individual type stats, sorted by RX count (descending) - sorted_types = sorted( - stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True - ) - for k, v in sorted_types: - # Calculate percentage of this packet type compared to total RX - percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0 - # Format values first, then apply colors - packet_type_str = f'{k:<15}' - rx_count_str = f'{v["rx"]:6d}' - tx_count_str = f'{v["tx"]:6d}' - percentage_str = f'{percentage:5.1f}%' - # Use different colors for RX count based on threshold (matching mqtt_injest.py) - rx_color_tag = ( - 'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red' - ) - LOGU.opt(colors=True).info( - f' {packet_type_str}: ' - f'<{rx_color_tag}>RX: {rx_count_str} ' - f'TX: {tx_count_str} ' - f'({percentage_str})', - ) - - # Extract callsign counts from seen_list stats - callsign_counts = {} - for callsign, data in seen_list_stats.items(): - if isinstance(data, dict) and 'count' in data: - callsign_counts[callsign] = data['count'] - - # Sort callsigns by packet count (descending) and get top 10 - sorted_callsigns = sorted( - callsign_counts.items(), key=lambda x: x[1], reverse=True - )[:10] - - # Log top 10 callsigns - if sorted_callsigns: - LOGU.opt(colors=True).info( - 'Top 10 Callsigns by Packet Count:' - ) - total_ranks = len(sorted_callsigns) - for rank, (callsign, count) in enumerate(sorted_callsigns, 1): - # Use different colors based on rank: most packets (rank 1) = red, - # least packets (last rank) = green, middle = yellow - if rank == 1: - count_color_tag = 'red' - elif rank == total_ranks: - count_color_tag = 'green' - else: - count_color_tag = 'yellow' - LOGU.opt(colors=True).info( - f' {rank:2d}. ' - f'{callsign:<12}: ' - f'<{count_color_tag}>{count:6d} packets', - ) - - time.sleep(1) - return True - - class StatsExportThread(APRSDThread): """Export stats to remote aprsd-exporter API.""" @@ -304,6 +204,12 @@ class StatsExportThread(APRSDThread): default='http://localhost:8081', help='URL of the aprsd-exporter API to send stats to.', ) +@click.option( + '--profile', + default=False, + is_flag=True, + help='Enable Python cProfile profiling to identify performance bottlenecks.', +) @click.pass_context @cli_helper.process_standard_options def listen( @@ -318,6 +224,7 @@ def listen( enable_packet_stats, export_stats, exporter_url, + profile, ): """Listen to packets on the APRS-IS Network based on FILTER. @@ -329,6 +236,13 @@ def listen( o/obj1/obj2... - Object Filter Pass all objects with the exact name of obj1, obj2, ... (* wild card allowed)\n """ + # Initialize profiler if enabled + profiler = None + if profile: + LOG.info('Starting Python cProfile profiling') + profiler = cProfile.Profile() + profiler.enable() + signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -435,8 +349,8 @@ def listen( LOG.debug(f'enable_packet_stats: {enable_packet_stats}') if enable_packet_stats: - LOG.debug('Start ListenStatsThread') - listen_stats = ListenStatsThread() + LOG.debug('Start StatsLogThread') + listen_stats = StatsLogThread() listen_stats.start() LOG.debug(f'export_stats: {export_stats}') @@ -454,3 +368,25 @@ def listen( stats.join() if stats_export: stats_export.join() + + # Save profiling results if enabled + if profiler: + profiler.disable() + profile_file = 'aprsd_listen_profile.prof' + profiler.dump_stats(profile_file) + LOG.info(f'Profile saved to {profile_file}') + + # Print profiling summary + LOG.info('Profile Summary (top 50 functions by cumulative time):') + stats = pstats.Stats(profiler) + stats.sort_stats('cumulative') + + # Log the top functions + LOG.info('-' * 80) + for item in stats.get_stats().items()[:50]: + func_info, stats_tuple = item + cumulative = stats_tuple[3] + total_calls = stats_tuple[0] + LOG.info( + f'{func_info} - Calls: {total_calls}, Cumulative: {cumulative:.4f}s' + ) diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index 0c34f6a..0da6a21 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -15,6 +15,7 @@ from aprsd.packets import collector as packet_collector from aprsd.packets import seen_list from aprsd.threads import keepalive, registry, rx, service, tx from aprsd.threads import stats as stats_thread +from aprsd.threads.stats import StatsLogThread CONF = cfg.CONF LOG = logging.getLogger('APRSD') @@ -42,9 +43,15 @@ def _is_aprsd_gps_extension_installed(): default=False, help='Flush out all old aged messages on disk.', ) +@click.option( + '--enable-packet-stats', + default=False, + is_flag=True, + help='Enable packet stats periodic logging.', +) @click.pass_context @cli_helper.process_standard_options -def server(ctx, flush): +def server(ctx, flush, enable_packet_stats): """Start the aprsd server gateway process.""" signal.signal(signal.SIGINT, aprsd_main.signal_handler) signal.signal(signal.SIGTERM, aprsd_main.signal_handler) @@ -165,6 +172,11 @@ def server(ctx, flush): LOG.info('Registry Enabled. Starting Registry thread.') service_threads.register(registry.APRSRegistryThread()) + if enable_packet_stats: + LOG.debug('Start StatsLogThread') + listen_stats = StatsLogThread() + listen_stats.start() + service_threads.start() service_threads.join() diff --git a/aprsd/threads/__init__.py b/aprsd/threads/__init__.py index 7946b6d..3801387 100644 --- a/aprsd/threads/__init__.py +++ b/aprsd/threads/__init__.py @@ -7,5 +7,6 @@ from .rx import ( # noqa: F401 APRSDProcessPacketThread, APRSDRXThread, ) +from .stats import APRSDStatsStoreThread, StatsLogThread # noqa: F401 packet_queue = queue.Queue(maxsize=500) diff --git a/aprsd/threads/stats.py b/aprsd/threads/stats.py index bd7ef3a..a69b384 100644 --- a/aprsd/threads/stats.py +++ b/aprsd/threads/stats.py @@ -2,13 +2,15 @@ import logging import threading import time +from loguru import logger from oslo_config import cfg -from aprsd.stats import collector + from aprsd.threads import APRSDThread from aprsd.utils import objectstore CONF = cfg.CONF LOG = logging.getLogger('APRSD') +LOGU = logger class StatsStore(objectstore.ObjectStoreMixin): @@ -33,6 +35,9 @@ class APRSDStatsStoreThread(APRSDThread): def loop(self): if self.loop_count % self.save_interval == 0: + # Lazy import to avoid circular dependency + from aprsd.stats import collector + stats = collector.Collector().collect() ss = StatsStore() ss.add(stats) @@ -40,3 +45,110 @@ class APRSDStatsStoreThread(APRSDThread): time.sleep(1) return True + + +class StatsLogThread(APRSDThread): + """Log the stats from the PacketList.""" + + def __init__(self): + super().__init__('PacketStatsLog') + self._last_total_rx = 0 + self.period = 10 + self.start_time = time.time() + + def loop(self): + if self.loop_count % self.period == 0: + # Lazy imports to avoid circular dependency + from aprsd.packets import seen_list + from aprsd.stats import collector + + # log the stats every 10 seconds + stats_json = collector.Collector().collect(serializable=True) + stats = stats_json['PacketList'] + total_rx = stats['rx'] + rx_delta = total_rx - self._last_total_rx + rate = rx_delta / self.period + + # Get unique callsigns count from SeenList stats + seen_list_instance = seen_list.SeenList() + # stats() returns data while holding lock internally, so copy it immediately + seen_list_stats = seen_list_instance.stats() + seen_list_instance.save() + # Copy the stats to avoid holding references to locked data + seen_list_stats = seen_list_stats.copy() + unique_callsigns_count = len(seen_list_stats) + + # Calculate uptime + elapsed = time.time() - self.start_time + elapsed_minutes = elapsed / 60 + elapsed_hours = elapsed / 3600 + + # Log summary stats + LOGU.opt(colors=True).info( + f'RX Rate: {rate:.2f} pps ' + f'Total RX: {total_rx} ' + f'RX Last {self.period} secs: {rx_delta} ' + ) + LOGU.opt(colors=True).info( + f'Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h) ' + f'Unique Callsigns: {unique_callsigns_count}', + ) + self._last_total_rx = total_rx + + # Log individual type stats, sorted by RX count (descending) + sorted_types = sorted( + stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True + ) + for k, v in sorted_types: + # Calculate percentage of this packet type compared to total RX + percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0 + # Format values first, then apply colors + packet_type_str = f'{k:<15}' + rx_count_str = f'{v["rx"]:6d}' + tx_count_str = f'{v["tx"]:6d}' + percentage_str = f'{percentage:5.1f}%' + # Use different colors for RX count based on threshold (matching mqtt_injest.py) + rx_color_tag = ( + 'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red' + ) + LOGU.opt(colors=True).info( + f' {packet_type_str}: ' + f'<{rx_color_tag}>RX: {rx_count_str} ' + f'TX: {tx_count_str} ' + f'({percentage_str})', + ) + + # Extract callsign counts from seen_list stats + callsign_counts = {} + for callsign, data in seen_list_stats.items(): + if isinstance(data, dict) and 'count' in data: + callsign_counts[callsign] = data['count'] + + # Sort callsigns by packet count (descending) and get top 10 + sorted_callsigns = sorted( + callsign_counts.items(), key=lambda x: x[1], reverse=True + )[:10] + + # Log top 10 callsigns + if sorted_callsigns: + LOGU.opt(colors=True).info( + 'Top 10 Callsigns by Packet Count:' + ) + total_ranks = len(sorted_callsigns) + for rank, (callsign, count) in enumerate(sorted_callsigns, 1): + # Use different colors based on rank: most packets (rank 1) = red, + # least packets (last rank) = green, middle = yellow + if rank == 1: + count_color_tag = 'red' + elif rank == total_ranks: + count_color_tag = 'green' + else: + count_color_tag = 'yellow' + LOGU.opt(colors=True).info( + f' {rank:2d}. ' + f'{callsign:<12}: ' + f'<{count_color_tag}>{count:6d} packets', + ) + + time.sleep(1) + return True diff --git a/aprsd/utils/package.py b/aprsd/utils/package.py index b81b2ae..b2bf079 100644 --- a/aprsd/utils/package.py +++ b/aprsd/utils/package.py @@ -60,18 +60,27 @@ def get_module_info(package_name, module_name, module_path): for path, _subdirs, files in os.walk(dir_path): for name in files: if fnmatch.fnmatch(name, pattern): - module = smuggle(f'{path}/{name}') - for mem_name, obj in inspect.getmembers(module): - if inspect.isclass(obj) and is_plugin(obj): - obj_list.append( - { - 'package': package_name, - 'name': mem_name, - 'obj': obj, - 'version': obj.version, - 'path': f'{".".join([module_name, obj.__name__])}', - }, - ) + # Skip __init__.py files as they often have relative imports + # that don't work when imported directly via smuggle + if name == '__init__.py': + continue + try: + module = smuggle(f'{path}/{name}') + for mem_name, obj in inspect.getmembers(module): + if inspect.isclass(obj) and is_plugin(obj): + obj_list.append( + { + 'package': package_name, + 'name': mem_name, + 'obj': obj, + 'version': obj.version, + 'path': f'{".".join([module_name, obj.__name__])}', + }, + ) + except (ImportError, SyntaxError, AttributeError) as e: + # Skip files that can't be imported (relative imports, syntax errors, etc.) + LOG.debug(f'Could not import {path}/{name}: {e}') + continue return obj_list diff --git a/tox.ini b/tox.ini index a5485a6..f8a5570 100644 --- a/tox.ini +++ b/tox.ini @@ -1,8 +1,8 @@ [tox] -minversion = 2.9.0 +minversion = 4.30.0 skipdist = True skip_missing_interpreters = true -envlist = pep8,py{311} +envlist = pep8,py{310,311,312,313,314} #requires = tox-pipenv # pip==22.0.4 # pip-tools==5.4.0 @@ -25,7 +25,7 @@ deps = pytest-cov pytest commands = - pytest -s -v --cov-report term-missing --cov=aprsd {posargs} + pytest -v --cov-report term-missing --cov=aprsd {posargs} coverage: coverage report -m coverage: coverage xml @@ -45,7 +45,6 @@ commands = #sphinx-build -a -W . _build sphinx-build -M html source build - [testenv:pep8] deps = flake8 @@ -80,7 +79,6 @@ exclude = .venv,.git,.tox,dist,doc,.ropeproject # This section is not needed if not using GitHub Actions for CI. [gh-actions] python = - 3.9: py39, pep8, type-check, docs 3.10: py39, pep8, type-check, docs 3.11: py311, pep8, type-check, docs diff --git a/uv.lock b/uv.lock index f8fa464..b6d3f83 100644 --- a/uv.lock +++ b/uv.lock @@ -38,6 +38,7 @@ dependencies = [ { name = "rfc3986" }, { name = "rich" }, { name = "rush" }, + { name = "setuptools" }, { name = "stevedore" }, { name = "thesmuggler" }, { name = "timeago" }, @@ -63,6 +64,7 @@ dev = [ { name = "identify" }, { name = "nodeenv" }, { name = "packaging" }, + { name = "pip" }, { name = "pip-tools" }, { name = "platformdirs" }, { name = "pluggy" }, @@ -70,6 +72,7 @@ dev = [ { name = "pyproject-api" }, { name = "pyproject-hooks" }, { name = "pyyaml" }, + { name = "setuptools" }, { name = "tomli" }, { name = "tox" }, { name = "typing-extensions" }, @@ -112,6 +115,7 @@ requires-dist = [ { name = "packaging", specifier = "==25.0" }, { name = "packaging", marker = "extra == 'dev'", specifier = "==25.0" }, { name = "pbr", specifier = "==7.0.3" }, + { name = "pip", marker = "extra == 'dev'", specifier = "==25.3" }, { name = "pip-tools", marker = "extra == 'dev'", specifier = "==7.5.2" }, { name = "platformdirs", marker = "extra == 'dev'", specifier = "==4.5.1" }, { name = "pluggy", specifier = "==1.6.0" }, @@ -129,6 +133,8 @@ requires-dist = [ { name = "rfc3986", specifier = "==2.0.0" }, { name = "rich", specifier = "==14.2.0" }, { name = "rush", specifier = "==2021.4.0" }, + { name = "setuptools", specifier = "==80.9.0" }, + { name = "setuptools", marker = "extra == 'dev'", specifier = "==80.9.0" }, { name = "stevedore", specifier = "==5.6.0" }, { name = "thesmuggler", specifier = "==1.0.1" }, { name = "timeago", specifier = "==1.0.16" },