2024-04-17 12:24:56 -04:00
|
|
|
import logging
|
2024-04-16 11:06:38 -04:00
|
|
|
from typing import Callable, Protocol, runtime_checkable
|
2024-03-29 13:23:39 -04:00
|
|
|
|
|
|
|
from aprsd.utils import singleton
|
|
|
|
|
|
|
|
|
2024-04-17 12:24:56 -04:00
|
|
|
# Module-level logger; every message emitted from this module goes through
# the logger registered under the name "APRSD".
LOG = logging.getLogger("APRSD")
|
|
|
|
|
|
|
|
|
2024-04-16 11:06:38 -04:00
|
|
|
@runtime_checkable
class StatsProducer(Protocol):
    """Structural interface for objects that can report stats.

    Any object exposing a ``stats`` member satisfies this protocol. Because
    the class is ``runtime_checkable``, ``isinstance(obj, StatsProducer)``
    only verifies that the ``stats`` member exists; it does not validate the
    method's signature or return type.
    """

    # The keyword is spelled ``serializable`` so that the declared signature
    # matches how the collector invokes producers
    # (``producer.stats(serializable=...)``).
    def stats(self, serializable=False) -> dict:
        """Provide stats in a dictionary format.

        Args:
            serializable: Flag passed by keyword from the collector.
                Presumably asks for values that are safe to serialize
                (e.g. to JSON) -- confirm against the implementers.

        Returns:
            A dictionary of stats for this producer.
        """
        ...
|
|
|
|
|
|
|
|
|
|
|
|
@singleton
class Collector:
    """Gather stats dictionaries from a list of registered producer factories.

    Each registered entry is a callable; calling it must yield an object that
    satisfies ``StatsProducer``. The class is wrapped by ``singleton`` from
    ``aprsd.utils`` (presumably so one shared instance is used everywhere --
    confirm against that helper).
    """

    def __init__(self):
        # Zero-argument callables, each returning a producer when invoked.
        self.producers: list[Callable] = []

    def collect(self, serializable=False) -> dict:
        """Build a mapping of producer class name -> copy of its stats.

        Args:
            serializable: Forwarded by keyword to every producer's ``stats``.

        Returns:
            A dict keyed by each producer's class name. Producers whose
            ``stats`` call failed are absent from the result.

        Raises:
            TypeError: If a registered callable returns an object that is not
                a ``StatsProducer``; collection stops at that point.
        """
        collected = {}
        for factory in self.producers:
            producer = factory()
            # Guard clause: an object without the protocol's interface is a
            # registration error and is reported to the caller immediately.
            if not isinstance(producer, StatsProducer):
                raise TypeError(f"{producer} is not an instance of StatsProducer")
            try:
                snapshot = producer.stats(serializable=serializable)
                # Keep a copy so later changes to the producer's own dict do
                # not alter the collected result.
                collected[producer.__class__.__name__] = snapshot.copy()
            except Exception as err:
                # Best effort: a failing producer is logged and skipped so
                # the remaining producers are still collected.
                LOG.error(f"Error in producer {factory} (stats): {err}")
        return collected

    def register_producer(self, producer_name: Callable):
        """Append a producer factory to the list consulted by ``collect``.

        Args:
            producer_name: Callable that returns a ``StatsProducer`` when
                called with no arguments.
        """
        self.producers.append(producer_name)
|