diff --git a/aprsd/client.py b/aprsd/client.py index 4991578..6d46954 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -11,7 +11,7 @@ import wrapt from aprsd import exception from aprsd.clients import aprsis, fake, kiss -from aprsd.packets import core, packet_list +from aprsd.packets import collector, core from aprsd.utils import singleton, trace @@ -101,7 +101,8 @@ class Client: self._client.stop() def send(self, packet: core.Packet): - packet_list.PacketList().tx(packet) + """Send a packet to the network.""" + collector.PacketCollector().tx(packet) self.client.send(packet) @wrapt.synchronized(lock) diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index f17f5e8..e17499f 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -17,6 +17,7 @@ from rich.console import Console import aprsd from aprsd import cli_helper, client, packets, plugin, threads from aprsd.main import cli +from aprsd.packets import collector as packet_collector from aprsd.packets import log as packet_log from aprsd.stats import collector from aprsd.threads import keep_alive, rx @@ -81,7 +82,7 @@ class APRSDListenThread(rx.APRSDRXThread): # This is the listen only command. self.plugin_manager.run(packet) - packets.PacketList().rx(packet) + packet_collector.PacketCollector().rx(packet) @cli.command() diff --git a/aprsd/cmds/send_message.py b/aprsd/cmds/send_message.py index 84a503e..aba0d04 100644 --- a/aprsd/cmds/send_message.py +++ b/aprsd/cmds/send_message.py @@ -11,6 +11,7 @@ import aprsd from aprsd import cli_helper, client, packets from aprsd import conf # noqa : F401 from aprsd.main import cli +from aprsd.packets import collector from aprsd.threads import tx @@ -103,7 +104,7 @@ def send_message( global got_ack, got_response cl = client.factory.create() packet = cl.decode_packet(packet) - packets.PacketList().rx(packet) + collector.PacketCollector().rx(packet) packet.log("RX") # LOG.debug("Got packet back {}".format(packet)) if isinstance(packet, packets.AckPacket): diff --git a/aprsd/packets/collector.py b/aprsd/packets/collector.py new file mode 100644 index 0000000..d6c6659 --- /dev/null +++ b/aprsd/packets/collector.py @@ -0,0 +1,42 @@ +from typing import Callable, Protocol, runtime_checkable + +from aprsd.packets import core +from aprsd.utils import singleton + + +@runtime_checkable +class PacketMonitor(Protocol): + """Protocol for Monitoring packets in some way.""" + + def rx(self, packet: type[core.Packet]) -> None: + """When we get a packet from the network.""" + ... + + def tx(self, packet: type[core.Packet]) -> None: + """When we send a packet out the network.""" + ... + + +@singleton +class PacketCollector: + def __init__(self): + self.monitors: list[Callable] = [] + + def register(self, monitor: Callable) -> None: + self.monitors.append(monitor) + + def rx(self, packet: type[core.Packet]) -> None: + for name in self.monitors: + cls = name() + if isinstance(cls, PacketMonitor): + cls.rx(packet) + else: + raise TypeError(f"Monitor {name} is not a PacketMonitor") + + def tx(self, packet: type[core.Packet]) -> None: + for name in self.monitors: + cls = name() + if isinstance(cls, PacketMonitor): + cls.tx(packet) + else: + raise TypeError(f"Monitor {name} is not a PacketMonitor") diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index 1ed2ec6..cb80452 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -5,7 +5,7 @@ import threading from oslo_config import cfg import wrapt -from aprsd.packets import seen_list +from aprsd.packets import collector, core from aprsd.utils import objectstore @@ -14,6 +14,7 @@ LOG = logging.getLogger("APRSD") class PacketList(objectstore.ObjectStoreMixin): + """Class to keep track of the packets we tx/rx.""" _instance = None lock = threading.Lock() _total_rx: int = 0 @@ -31,7 +32,7 @@ class PacketList(objectstore.ObjectStoreMixin): return cls._instance @wrapt.synchronized(lock) - def rx(self, packet): + def rx(self, packet: type[core.Packet]): """Add a packet that was received.""" self._total_rx += 1 self._add(packet) @@ -39,10 +40,9 @@ class PacketList(objectstore.ObjectStoreMixin): if not ptype in self.data["types"]: self.data["types"][ptype] = {"tx": 0, "rx": 0} self.data["types"][ptype]["rx"] += 1 - seen_list.SeenList().update_seen(packet) @wrapt.synchronized(lock) - def tx(self, packet): + def tx(self, packet: type[core.Packet]): """Add a packet that was received.""" self._total_tx += 1 self._add(packet) @@ -50,7 +50,6 @@ class PacketList(objectstore.ObjectStoreMixin): if not ptype in self.data["types"]: self.data["types"][ptype] = {"tx": 0, "rx": 0} self.data["types"][ptype]["tx"] += 1 - seen_list.SeenList().update_seen(packet) @wrapt.synchronized(lock) def add(self, packet): @@ -105,3 +104,9 @@ class PacketList(objectstore.ObjectStoreMixin): "packets": pkts, } return stats + + +# Now register the PacketList with the collector +# every packet we RX and TX goes through the collector +# for processing for whatever reason is needed. +collector.PacketCollector().register(PacketList) diff --git a/aprsd/packets/seen_list.py b/aprsd/packets/seen_list.py index 88ab0fd..e72324c 100644 --- a/aprsd/packets/seen_list.py +++ b/aprsd/packets/seen_list.py @@ -5,6 +5,7 @@ import threading from oslo_config import cfg import wrapt +from aprsd.packets import collector, core from aprsd.utils import objectstore @@ -22,7 +23,6 @@ class SeenList(objectstore.ObjectStoreMixin): def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - cls._instance._init_store() cls._instance.data = {} return cls._instance @@ -37,7 +37,8 @@ class SeenList(objectstore.ObjectStoreMixin): return self.data.copy() @wrapt.synchronized(lock) - def update_seen(self, packet): + def rx(self, packet: type[core.Packet]): + """When we get a packet from the network, update the seen list.""" callsign = None if packet.from_call: callsign = packet.from_call @@ -51,3 +52,11 @@ class SeenList(objectstore.ObjectStoreMixin): } self.data[callsign]["last"] = datetime.datetime.now() self.data[callsign]["count"] += 1 + + def tx(self, packet: type[core.Packet]): + """We don't care about TX packets.""" + + +# Register with the packet collector so we can process the packet +# when we get it off the client (network) +collector.PacketCollector().register(SeenList) diff --git a/aprsd/packets/watch_list.py b/aprsd/packets/watch_list.py index ad8f222..174255a 100644 --- a/aprsd/packets/watch_list.py +++ b/aprsd/packets/watch_list.py @@ -6,6 +6,7 @@ from oslo_config import cfg import wrapt from aprsd import utils +from aprsd.packets import collector, core from aprsd.utils import objectstore @@ -23,24 +24,22 @@ class WatchList(objectstore.ObjectStoreMixin): def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - cls._instance._init_store() - cls._instance.data = {} + cls._instance._update_from_conf() return cls._instance - def __init__(self, config=None): - CONF.watch_list.packet_keep_count - - if CONF.watch_list.callsigns: + def _update_from_conf(self, config=None): + if CONF.watch_list.enabled and CONF.watch_list.callsigns: for callsign in CONF.watch_list.callsigns: call = callsign.replace("*", "") # FIXME(waboring) - we should fetch the last time we saw # a beacon from a callsign or some other mechanism to find # last time a message was seen by aprs-is. For now this # is all we can do. - self.data[call] = { - "last": None, - "packet": None, - } + if call not in self.data: + self.data[call] = { + "last": None, + "packet": None, + } @wrapt.synchronized(lock) def stats(self, serializable=False) -> dict: @@ -61,15 +60,19 @@ class WatchList(objectstore.ObjectStoreMixin): return callsign in self.data @wrapt.synchronized(lock) - def update_seen(self, packet): - if packet.addresse: - callsign = packet.addresse - else: - callsign = packet.from_call + def rx(self, packet: type[core.Packet]) -> None: + """Track when we got a packet from the network.""" + LOG.error(f"WatchList RX {packet}") + callsign = packet.from_call + if self.callsign_in_watchlist(callsign): + LOG.error(f"Updating WatchList RX {callsign}") self.data[callsign]["last"] = datetime.datetime.now() self.data[callsign]["packet"] = packet + def tx(self, packet: type[core.Packet]) -> None: + """We don't care about TX packets.""" + def last_seen(self, callsign): if self.callsign_in_watchlist(callsign): return self.data[callsign]["last"] @@ -113,3 +116,6 @@ class WatchList(objectstore.ObjectStoreMixin): return False else: return False + + +collector.PacketCollector().register(WatchList) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index ab861d2..7a59865 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -7,6 +7,7 @@ import aprslib from oslo_config import cfg from aprsd import client, packets, plugin +from aprsd.packets import collector from aprsd.packets import log as packet_log from aprsd.threads import APRSDThread, tx @@ -116,7 +117,7 @@ class APRSDDupeRXThread(APRSDRXThread): if not found: # We haven't seen this packet before, so we process it. - pkt_list.rx(packet) + collector.PacketCollector().rx(packet) self.packet_queue.put(packet) elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout: # If the packet came in within N seconds of the @@ -127,7 +128,7 @@ class APRSDDupeRXThread(APRSDRXThread): f"Packet {packet.from_call}:{packet.msgNo} already tracked " f"but older than {CONF.packet_dupe_timeout} seconds. processing.", ) - pkt_list.rx(packet) + collector.PacketCollector().rx(packet) self.packet_queue.put(packet) @@ -154,7 +155,7 @@ class APRSDProcessPacketThread(APRSDThread): """We got an ack for a message, no need to resend it.""" ack_num = packet.msgNo LOG.debug(f"Got ack for message {ack_num}") - packets.PacketList().rx(packet) + collector.PacketCollector().rx(packet) pkt_tracker = packets.PacketTrack() pkt_tracker.remove(ack_num) @@ -162,7 +163,7 @@ class APRSDProcessPacketThread(APRSDThread): """We got an ack embedded in a packet.""" ack_num = packet.ackMsgNo LOG.debug(f"Got PiggyBackAck for message {ack_num}") - packets.PacketList().rx(packet) + collector.PacketCollector().rx(packet) pkt_tracker = packets.PacketTrack() pkt_tracker.remove(ack_num) @@ -170,7 +171,7 @@ class APRSDProcessPacketThread(APRSDThread): """We got a reject message for a packet. Stop sending the message.""" ack_num = packet.msgNo LOG.debug(f"Got REJECT for message {ack_num}") - packets.PacketList().rx(packet) + collector.PacketCollector().rx(packet) pkt_tracker = packets.PacketTrack() pkt_tracker.remove(ack_num) diff --git a/aprsd/utils/objectstore.py b/aprsd/utils/objectstore.py index a022373..7637fc2 100644 --- a/aprsd/utils/objectstore.py +++ b/aprsd/utils/objectstore.py @@ -70,6 +70,7 @@ class ObjectStoreMixin: """Save any queued to disk?""" if not CONF.enable_save: return + self._init_store() save_filename = self._save_filename() if len(self) > 0: LOG.info(