From a656d9326333c79482d0797f676a6e865d49541c Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 16 Apr 2024 14:34:14 -0400 Subject: [PATCH] Added new PacketCollector this patch adds the new PacketCollector class. It's a single point for collecting information about packets sent and recieved from the APRS client. Basically instead of having the packetlist call the seen list when we get a packet, we simply call the PacketCollector.rx(), which in turn calls each registered PacketMonitor class. This allows us to decouple the packet stats like classses inside of APRSD. More importantly, it allows extensions to append their own PacketMonitor class to the chain without modifying ARPSD. --- aprsd/client.py | 5 +++-- aprsd/cmds/listen.py | 3 ++- aprsd/cmds/send_message.py | 3 ++- aprsd/packets/collector.py | 42 ++++++++++++++++++++++++++++++++++++ aprsd/packets/packet_list.py | 15 ++++++++----- aprsd/packets/seen_list.py | 13 +++++++++-- aprsd/packets/watch_list.py | 36 ++++++++++++++++++------------- aprsd/threads/rx.py | 11 +++++----- aprsd/utils/objectstore.py | 1 + 9 files changed, 98 insertions(+), 31 deletions(-) create mode 100644 aprsd/packets/collector.py diff --git a/aprsd/client.py b/aprsd/client.py index 4991578..6d46954 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -11,7 +11,7 @@ import wrapt from aprsd import exception from aprsd.clients import aprsis, fake, kiss -from aprsd.packets import core, packet_list +from aprsd.packets import collector, core from aprsd.utils import singleton, trace @@ -101,7 +101,8 @@ class Client: self._client.stop() def send(self, packet: core.Packet): - packet_list.PacketList().tx(packet) + """Send a packet to the network.""" + collector.PacketCollector().tx(packet) self.client.send(packet) @wrapt.synchronized(lock) diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index f17f5e8..e17499f 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -17,6 +17,7 @@ from rich.console import Console import aprsd from aprsd import cli_helper, client, packets, plugin, threads from aprsd.main import cli +from aprsd.packets import collector as packet_collector from aprsd.packets import log as packet_log from aprsd.stats import collector from aprsd.threads import keep_alive, rx @@ -81,7 +82,7 @@ class APRSDListenThread(rx.APRSDRXThread): # This is the listen only command. self.plugin_manager.run(packet) - packets.PacketList().rx(packet) + packet_collector.PacketCollector().rx(packet) @cli.command() diff --git a/aprsd/cmds/send_message.py b/aprsd/cmds/send_message.py index 84a503e..aba0d04 100644 --- a/aprsd/cmds/send_message.py +++ b/aprsd/cmds/send_message.py @@ -11,6 +11,7 @@ import aprsd from aprsd import cli_helper, client, packets from aprsd import conf # noqa : F401 from aprsd.main import cli +from aprsd.packets import collector from aprsd.threads import tx @@ -103,7 +104,7 @@ def send_message( global got_ack, got_response cl = client.factory.create() packet = cl.decode_packet(packet) - packets.PacketList().rx(packet) + collector.PacketCollector().rx(packet) packet.log("RX") # LOG.debug("Got packet back {}".format(packet)) if isinstance(packet, packets.AckPacket): diff --git a/aprsd/packets/collector.py b/aprsd/packets/collector.py new file mode 100644 index 0000000..d6c6659 --- /dev/null +++ b/aprsd/packets/collector.py @@ -0,0 +1,42 @@ +from typing import Callable, Protocol, runtime_checkable + +from aprsd.packets import core +from aprsd.utils import singleton + + +@runtime_checkable +class PacketMonitor(Protocol): + """Protocol for Monitoring packets in some way.""" + + def rx(self, packet: type[core.Packet]) -> None: + """When we get a packet from the network.""" + ... + + def tx(self, packet: type[core.Packet]) -> None: + """When we send a packet out the network.""" + ... + + +@singleton +class PacketCollector: + def __init__(self): + self.monitors: list[Callable] = [] + + def register(self, monitor: Callable) -> None: + self.monitors.append(monitor) + + def rx(self, packet: type[core.Packet]) -> None: + for name in self.monitors: + cls = name() + if isinstance(cls, PacketMonitor): + cls.rx(packet) + else: + raise TypeError(f"Monitor {name} is not a PacketMonitor") + + def tx(self, packet: type[core.Packet]) -> None: + for name in self.monitors: + cls = name() + if isinstance(cls, PacketMonitor): + cls.tx(packet) + else: + raise TypeError(f"Monitor {name} is not a PacketMonitor") diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index 1ed2ec6..cb80452 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -5,7 +5,7 @@ import threading from oslo_config import cfg import wrapt -from aprsd.packets import seen_list +from aprsd.packets import collector, core from aprsd.utils import objectstore @@ -14,6 +14,7 @@ LOG = logging.getLogger("APRSD") class PacketList(objectstore.ObjectStoreMixin): + """Class to keep track of the packets we tx/rx.""" _instance = None lock = threading.Lock() _total_rx: int = 0 @@ -31,7 +32,7 @@ class PacketList(objectstore.ObjectStoreMixin): return cls._instance @wrapt.synchronized(lock) - def rx(self, packet): + def rx(self, packet: type[core.Packet]): """Add a packet that was received.""" self._total_rx += 1 self._add(packet) @@ -39,10 +40,9 @@ class PacketList(objectstore.ObjectStoreMixin): if not ptype in self.data["types"]: self.data["types"][ptype] = {"tx": 0, "rx": 0} self.data["types"][ptype]["rx"] += 1 - seen_list.SeenList().update_seen(packet) @wrapt.synchronized(lock) - def tx(self, packet): + def tx(self, packet: type[core.Packet]): """Add a packet that was received.""" self._total_tx += 1 self._add(packet) @@ -50,7 +50,6 @@ class PacketList(objectstore.ObjectStoreMixin): if not ptype in self.data["types"]: self.data["types"][ptype] = {"tx": 0, "rx": 0} self.data["types"][ptype]["tx"] += 1 - seen_list.SeenList().update_seen(packet) @wrapt.synchronized(lock) def add(self, packet): @@ -105,3 +104,9 @@ class PacketList(objectstore.ObjectStoreMixin): "packets": pkts, } return stats + + +# Now register the PacketList with the collector +# every packet we RX and TX goes through the collector +# for processing for whatever reason is needed. +collector.PacketCollector().register(PacketList) diff --git a/aprsd/packets/seen_list.py b/aprsd/packets/seen_list.py index 88ab0fd..e72324c 100644 --- a/aprsd/packets/seen_list.py +++ b/aprsd/packets/seen_list.py @@ -5,6 +5,7 @@ import threading from oslo_config import cfg import wrapt +from aprsd.packets import collector, core from aprsd.utils import objectstore @@ -22,7 +23,6 @@ class SeenList(objectstore.ObjectStoreMixin): def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - cls._instance._init_store() cls._instance.data = {} return cls._instance @@ -37,7 +37,8 @@ class SeenList(objectstore.ObjectStoreMixin): return self.data.copy() @wrapt.synchronized(lock) - def update_seen(self, packet): + def rx(self, packet: type[core.Packet]): + """When we get a packet from the network, update the seen list.""" callsign = None if packet.from_call: callsign = packet.from_call @@ -51,3 +52,11 @@ class SeenList(objectstore.ObjectStoreMixin): } self.data[callsign]["last"] = datetime.datetime.now() self.data[callsign]["count"] += 1 + + def tx(self, packet: type[core.Packet]): + """We don't care about TX packets.""" + + +# Register with the packet collector so we can process the packet +# when we get it off the client (network) +collector.PacketCollector().register(SeenList) diff --git a/aprsd/packets/watch_list.py b/aprsd/packets/watch_list.py index ad8f222..174255a 100644 --- a/aprsd/packets/watch_list.py +++ b/aprsd/packets/watch_list.py @@ -6,6 +6,7 @@ from oslo_config import cfg import wrapt from aprsd import utils +from aprsd.packets import collector, core from aprsd.utils import objectstore @@ -23,24 +24,22 @@ class WatchList(objectstore.ObjectStoreMixin): def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - cls._instance._init_store() - cls._instance.data = {} + cls._instance._update_from_conf() return cls._instance - def __init__(self, config=None): - CONF.watch_list.packet_keep_count - - if CONF.watch_list.callsigns: + def _update_from_conf(self, config=None): + if CONF.watch_list.enabled and CONF.watch_list.callsigns: for callsign in CONF.watch_list.callsigns: call = callsign.replace("*", "") # FIXME(waboring) - we should fetch the last time we saw # a beacon from a callsign or some other mechanism to find # last time a message was seen by aprs-is. For now this # is all we can do. - self.data[call] = { - "last": None, - "packet": None, - } + if call not in self.data: + self.data[call] = { + "last": None, + "packet": None, + } @wrapt.synchronized(lock) def stats(self, serializable=False) -> dict: @@ -61,15 +60,19 @@ class WatchList(objectstore.ObjectStoreMixin): return callsign in self.data @wrapt.synchronized(lock) - def update_seen(self, packet): - if packet.addresse: - callsign = packet.addresse - else: - callsign = packet.from_call + def rx(self, packet: type[core.Packet]) -> None: + """Track when we got a packet from the network.""" + LOG.error(f"WatchList RX {packet}") + callsign = packet.from_call + if self.callsign_in_watchlist(callsign): + LOG.error(f"Updating WatchList RX {callsign}") self.data[callsign]["last"] = datetime.datetime.now() self.data[callsign]["packet"] = packet + def tx(self, packet: type[core.Packet]) -> None: + """We don't care about TX packets.""" + def last_seen(self, callsign): if self.callsign_in_watchlist(callsign): return self.data[callsign]["last"] @@ -113,3 +116,6 @@ class WatchList(objectstore.ObjectStoreMixin): return False else: return False + + +collector.PacketCollector().register(WatchList) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index ab861d2..7a59865 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -7,6 +7,7 @@ import aprslib from oslo_config import cfg from aprsd import client, packets, plugin +from aprsd.packets import collector from aprsd.packets import log as packet_log from aprsd.threads import APRSDThread, tx @@ -116,7 +117,7 @@ class APRSDDupeRXThread(APRSDRXThread): if not found: # We haven't seen this packet before, so we process it. - pkt_list.rx(packet) + collector.PacketCollector().rx(packet) self.packet_queue.put(packet) elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout: # If the packet came in within N seconds of the @@ -127,7 +128,7 @@ class APRSDDupeRXThread(APRSDRXThread): f"Packet {packet.from_call}:{packet.msgNo} already tracked " f"but older than {CONF.packet_dupe_timeout} seconds. processing.", ) - pkt_list.rx(packet) + collector.PacketCollector().rx(packet) self.packet_queue.put(packet) @@ -154,7 +155,7 @@ class APRSDProcessPacketThread(APRSDThread): """We got an ack for a message, no need to resend it.""" ack_num = packet.msgNo LOG.debug(f"Got ack for message {ack_num}") - packets.PacketList().rx(packet) + collector.PacketCollector().rx(packet) pkt_tracker = packets.PacketTrack() pkt_tracker.remove(ack_num) @@ -162,7 +163,7 @@ class APRSDProcessPacketThread(APRSDThread): """We got an ack embedded in a packet.""" ack_num = packet.ackMsgNo LOG.debug(f"Got PiggyBackAck for message {ack_num}") - packets.PacketList().rx(packet) + collector.PacketCollector().rx(packet) pkt_tracker = packets.PacketTrack() pkt_tracker.remove(ack_num) @@ -170,7 +171,7 @@ class APRSDProcessPacketThread(APRSDThread): """We got a reject message for a packet. Stop sending the message.""" ack_num = packet.msgNo LOG.debug(f"Got REJECT for message {ack_num}") - packets.PacketList().rx(packet) + collector.PacketCollector().rx(packet) pkt_tracker = packets.PacketTrack() pkt_tracker.remove(ack_num) diff --git a/aprsd/utils/objectstore.py b/aprsd/utils/objectstore.py index a022373..7637fc2 100644 --- a/aprsd/utils/objectstore.py +++ b/aprsd/utils/objectstore.py @@ -70,6 +70,7 @@ class ObjectStoreMixin: """Save any queued to disk?""" if not CONF.enable_save: return + self._init_store() save_filename = self._save_filename() if len(self) > 0: LOG.info(