aprsd/aprsd/packets/collector.py

54 lines
1.5 KiB
Python

import logging
from typing import Callable, Protocol, runtime_checkable
from aprsd.packets import core
from aprsd.utils import singleton
LOG = logging.getLogger("APRSD")
@runtime_checkable
class PacketMonitor(Protocol):
"""Protocol for Monitoring packets in some way."""
def rx(self, packet: type[core.Packet]) -> None:
"""When we get a packet from the network."""
...
def tx(self, packet: type[core.Packet]) -> None:
"""When we send a packet out the network."""
...
@singleton
class PacketCollector:
def __init__(self):
self.monitors: list[Callable] = []
def register(self, monitor: Callable) -> None:
self.monitors.append(monitor)
def rx(self, packet: type[core.Packet]) -> None:
for name in self.monitors:
cls = name()
if isinstance(cls, PacketMonitor):
try:
cls.rx(packet)
except Exception as e:
LOG.error(f"Error in monitor {name} (rx): {e}")
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor")
def tx(self, packet: type[core.Packet]) -> None:
for name in self.monitors:
cls = name()
if isinstance(cls, PacketMonitor):
try:
cls.tx(packet)
except Exception as e:
LOG.error(f"Error in monitor {name} (tx): {e}")
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor")