mirror of
				https://github.com/craigerl/aprsd.git
				synced 2025-10-31 04:40:22 -04:00 
			
		
		
		
	Added some changes to listen
to collect stats and only show those stats during listen
This commit is contained in:
		
							parent
							
								
									0271ccd145
								
							
						
					
					
						commit
						d863474c13
					
				| @ -10,12 +10,13 @@ import sys | |||||||
| import time | import time | ||||||
| 
 | 
 | ||||||
| import click | import click | ||||||
|  | from loguru import logger | ||||||
| from oslo_config import cfg | from oslo_config import cfg | ||||||
| from rich.console import Console | from rich.console import Console | ||||||
| 
 | 
 | ||||||
| # local imports here | # local imports here | ||||||
| import aprsd | import aprsd | ||||||
| from aprsd import cli_helper, packets, plugin, threads | from aprsd import cli_helper, packets, plugin, threads, utils | ||||||
| from aprsd.client import client_factory | from aprsd.client import client_factory | ||||||
| from aprsd.main import cli | from aprsd.main import cli | ||||||
| from aprsd.packets import collector as packet_collector | from aprsd.packets import collector as packet_collector | ||||||
| @ -24,12 +25,14 @@ from aprsd.packets import seen_list | |||||||
| from aprsd.stats import collector | from aprsd.stats import collector | ||||||
| from aprsd.threads import keep_alive, rx | from aprsd.threads import keep_alive, rx | ||||||
| from aprsd.threads import stats as stats_thread | from aprsd.threads import stats as stats_thread | ||||||
|  | from aprsd.threads.aprsd import APRSDThread | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # setup the global logger | # setup the global logger | ||||||
| # log.basicConfig(level=log.DEBUG) # level=10 | # log.basicConfig(level=log.DEBUG) # level=10 | ||||||
| LOG = logging.getLogger("APRSD") | LOG = logging.getLogger("APRSD") | ||||||
| CONF = cfg.CONF | CONF = cfg.CONF | ||||||
|  | LOGU = logger | ||||||
| console = Console() | console = Console() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @ -46,6 +49,41 @@ def signal_handler(sig, frame): | |||||||
|         collector.Collector().collect() |         collector.Collector().collect() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @utils.singleton | ||||||
|  | class SimplePacketStats: | ||||||
|  |     def __init__(self): | ||||||
|  |         self.total_rx = 0 | ||||||
|  |         self.total_tx = 0 | ||||||
|  |         self.types = {} | ||||||
|  | 
 | ||||||
|  |     def rx(self, packet): | ||||||
|  |         self.total_rx += 1 | ||||||
|  |         ptype = packet.__class__.__name__ | ||||||
|  |         if ptype not in self.types: | ||||||
|  |             self.types[ptype] = {"tx": 0, "rx": 0} | ||||||
|  |         self.types[ptype]["rx"] += 1 | ||||||
|  | 
 | ||||||
|  |     def tx(self, packet): | ||||||
|  |         self.total_tx += 1 | ||||||
|  |         ptype = packet.__class__.__name__ | ||||||
|  |         if ptype not in self.types: | ||||||
|  |             self.types[ptype] = {"tx": 0, "rx": 0} | ||||||
|  |         self.types[ptype]["tx"] += 1 | ||||||
|  | 
 | ||||||
|  |     def flush(self): | ||||||
|  |         pass | ||||||
|  | 
 | ||||||
|  |     def load(self): | ||||||
|  |         pass | ||||||
|  | 
 | ||||||
|  |     def stats(self, serializable=False): | ||||||
|  |         return { | ||||||
|  |             "total_rx": self.total_rx, | ||||||
|  |             "total_tx": self.total_tx, | ||||||
|  |             "types": self.types, | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| class APRSDListenThread(rx.APRSDRXThread): | class APRSDListenThread(rx.APRSDRXThread): | ||||||
|     def __init__(self, packet_queue, packet_filter=None, plugin_manager=None): |     def __init__(self, packet_queue, packet_filter=None, plugin_manager=None): | ||||||
|         super().__init__(packet_queue) |         super().__init__(packet_queue) | ||||||
| @ -88,6 +126,28 @@ class APRSDListenThread(rx.APRSDRXThread): | |||||||
|         packet_collector.PacketCollector().rx(packet) |         packet_collector.PacketCollector().rx(packet) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | class ListenStatsThread(APRSDThread): | ||||||
|  |     def __init__(self): | ||||||
|  |         super().__init__("SimpleStats") | ||||||
|  |         self._last_total_rx = 0 | ||||||
|  | 
 | ||||||
|  |     def loop(self): | ||||||
|  |         if self.loop_count % 10 == 0: | ||||||
|  |             # log the stats every 10 seconds | ||||||
|  |             stats_json = collector.Collector().collect() | ||||||
|  |             stats = stats_json["SimplePacketStats"] | ||||||
|  |             total_rx = stats["total_rx"] | ||||||
|  |             rate = (total_rx - self._last_total_rx) / 10 | ||||||
|  |             LOG.warning(f"RX Rate: {rate} pps  Total RX: {total_rx} - {self._last_total_rx}") | ||||||
|  |             #LOG.error(stats) | ||||||
|  |             self._last_total_rx = total_rx | ||||||
|  |             for k, v in stats["types"].items(): | ||||||
|  |                 LOGU.opt(colors=True).warning(f"Type: {k} <blue>RX: {v['rx']}</blue> <red>TX: {v['tx']}</red>") | ||||||
|  | 
 | ||||||
|  |         time.sleep(1) | ||||||
|  |         return True | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| @cli.command() | @cli.command() | ||||||
| @cli_helper.add_options(cli_helper.common_options) | @cli_helper.add_options(cli_helper.common_options) | ||||||
| @click.option( | @click.option( | ||||||
| @ -201,6 +261,27 @@ def listen( | |||||||
|         # just deregister the class from the packet collector |         # just deregister the class from the packet collector | ||||||
|         packet_collector.PacketCollector().unregister(seen_list.SeenList) |         packet_collector.PacketCollector().unregister(seen_list.SeenList) | ||||||
| 
 | 
 | ||||||
|  |     packet_collector.PacketCollector().register(SimplePacketStats) | ||||||
|  | 
 | ||||||
|  |     from aprsd.client import stats as client_stats | ||||||
|  |     from aprsd.packets.packet_list import PacketList  # noqa: F401 | ||||||
|  |     from aprsd.packets.seen_list import SeenList  # noqa: F401 | ||||||
|  |     from aprsd.packets.tracker import PacketTrack  # noqa: F401 | ||||||
|  |     from aprsd.packets.watch_list import WatchList  # noqa: F401 | ||||||
|  |     from aprsd.plugins import email | ||||||
|  |     from aprsd.threads import aprsd as aprsd_thread | ||||||
|  |     c = collector.Collector() | ||||||
|  |     # c.unregister_producer(app.APRSDStats) | ||||||
|  |     c.unregister_producer(PacketList) | ||||||
|  |     c.unregister_producer(WatchList) | ||||||
|  |     #c.unregister_producer(PacketTrack) | ||||||
|  |     c.unregister_producer(plugin.PluginManager) | ||||||
|  |     c.unregister_producer(aprsd_thread.APRSDThreadList) | ||||||
|  |     c.unregister_producer(email.EmailStats) | ||||||
|  |     c.unregister_producer(client_stats.APRSClientStats) | ||||||
|  |     c.unregister_producer(seen_list.SeenList) | ||||||
|  |     c.register_producer(SimplePacketStats) | ||||||
|  | 
 | ||||||
|     pm = None |     pm = None | ||||||
|     pm = plugin.PluginManager() |     pm = plugin.PluginManager() | ||||||
|     if load_plugins: |     if load_plugins: | ||||||
| @ -222,6 +303,8 @@ def listen( | |||||||
|     ) |     ) | ||||||
|     LOG.debug("Start APRSDListenThread") |     LOG.debug("Start APRSDListenThread") | ||||||
|     listen_thread.start() |     listen_thread.start() | ||||||
|  |     listen_stats = ListenStatsThread() | ||||||
|  |     listen_stats.start() | ||||||
| 
 | 
 | ||||||
|     keepalive.start() |     keepalive.start() | ||||||
|     LOG.debug("keepalive Join") |     LOG.debug("keepalive Join") | ||||||
|  | |||||||
| @ -35,3 +35,8 @@ class Collector: | |||||||
|         if not isinstance(producer_name, StatsProducer): |         if not isinstance(producer_name, StatsProducer): | ||||||
|             raise TypeError(f"Producer {producer_name} is not a StatsProducer") |             raise TypeError(f"Producer {producer_name} is not a StatsProducer") | ||||||
|         self.producers.append(producer_name) |         self.producers.append(producer_name) | ||||||
|  | 
 | ||||||
|  |     def unregister_producer(self, producer_name: Callable): | ||||||
|  |         if not isinstance(producer_name, StatsProducer): | ||||||
|  |             raise TypeError(f"Producer {producer_name} is not a StatsProducer") | ||||||
|  |         self.producers.remove(producer_name) | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user