Added new PacketCollector

this patch adds the new PacketCollector class.
It's a single point for collecting information about
packets sent and recieved from the APRS client.
Basically instead of having the packetlist call the seen list
when we get a packet, we simply call the PacketCollector.rx(),
which in turn calls each registered PacketMonitor class.

This allows us to decouple the packet stats like classses inside
of APRSD.  More importantly, it allows extensions to append their
own PacketMonitor class to the chain without modifying ARPSD.
This commit is contained in:
Hemna 2024-04-16 14:34:14 -04:00
parent cb0cfeea0b
commit a656d93263
9 changed files with 98 additions and 31 deletions

View File

@ -11,7 +11,7 @@ import wrapt
from aprsd import exception
from aprsd.clients import aprsis, fake, kiss
from aprsd.packets import core, packet_list
from aprsd.packets import collector, core
from aprsd.utils import singleton, trace
@ -101,7 +101,8 @@ class Client:
self._client.stop()
def send(self, packet: core.Packet):
packet_list.PacketList().tx(packet)
"""Send a packet to the network."""
collector.PacketCollector().tx(packet)
self.client.send(packet)
@wrapt.synchronized(lock)

View File

@ -17,6 +17,7 @@ from rich.console import Console
import aprsd
from aprsd import cli_helper, client, packets, plugin, threads
from aprsd.main import cli
from aprsd.packets import collector as packet_collector
from aprsd.packets import log as packet_log
from aprsd.stats import collector
from aprsd.threads import keep_alive, rx
@ -81,7 +82,7 @@ class APRSDListenThread(rx.APRSDRXThread):
# This is the listen only command.
self.plugin_manager.run(packet)
packets.PacketList().rx(packet)
packet_collector.PacketCollector().rx(packet)
@cli.command()

View File

@ -11,6 +11,7 @@ import aprsd
from aprsd import cli_helper, client, packets
from aprsd import conf # noqa : F401
from aprsd.main import cli
from aprsd.packets import collector
from aprsd.threads import tx
@ -103,7 +104,7 @@ def send_message(
global got_ack, got_response
cl = client.factory.create()
packet = cl.decode_packet(packet)
packets.PacketList().rx(packet)
collector.PacketCollector().rx(packet)
packet.log("RX")
# LOG.debug("Got packet back {}".format(packet))
if isinstance(packet, packets.AckPacket):

View File

@ -0,0 +1,42 @@
from typing import Callable, Protocol, runtime_checkable
from aprsd.packets import core
from aprsd.utils import singleton
@runtime_checkable
class PacketMonitor(Protocol):
"""Protocol for Monitoring packets in some way."""
def rx(self, packet: type[core.Packet]) -> None:
"""When we get a packet from the network."""
...
def tx(self, packet: type[core.Packet]) -> None:
"""When we send a packet out the network."""
...
@singleton
class PacketCollector:
def __init__(self):
self.monitors: list[Callable] = []
def register(self, monitor: Callable) -> None:
self.monitors.append(monitor)
def rx(self, packet: type[core.Packet]) -> None:
for name in self.monitors:
cls = name()
if isinstance(cls, PacketMonitor):
cls.rx(packet)
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor")
def tx(self, packet: type[core.Packet]) -> None:
for name in self.monitors:
cls = name()
if isinstance(cls, PacketMonitor):
cls.tx(packet)
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor")

View File

@ -5,7 +5,7 @@ import threading
from oslo_config import cfg
import wrapt
from aprsd.packets import seen_list
from aprsd.packets import collector, core
from aprsd.utils import objectstore
@ -14,6 +14,7 @@ LOG = logging.getLogger("APRSD")
class PacketList(objectstore.ObjectStoreMixin):
"""Class to keep track of the packets we tx/rx."""
_instance = None
lock = threading.Lock()
_total_rx: int = 0
@ -31,7 +32,7 @@ class PacketList(objectstore.ObjectStoreMixin):
return cls._instance
@wrapt.synchronized(lock)
def rx(self, packet):
def rx(self, packet: type[core.Packet]):
"""Add a packet that was received."""
self._total_rx += 1
self._add(packet)
@ -39,10 +40,9 @@ class PacketList(objectstore.ObjectStoreMixin):
if not ptype in self.data["types"]:
self.data["types"][ptype] = {"tx": 0, "rx": 0}
self.data["types"][ptype]["rx"] += 1
seen_list.SeenList().update_seen(packet)
@wrapt.synchronized(lock)
def tx(self, packet):
def tx(self, packet: type[core.Packet]):
"""Add a packet that was received."""
self._total_tx += 1
self._add(packet)
@ -50,7 +50,6 @@ class PacketList(objectstore.ObjectStoreMixin):
if not ptype in self.data["types"]:
self.data["types"][ptype] = {"tx": 0, "rx": 0}
self.data["types"][ptype]["tx"] += 1
seen_list.SeenList().update_seen(packet)
@wrapt.synchronized(lock)
def add(self, packet):
@ -105,3 +104,9 @@ class PacketList(objectstore.ObjectStoreMixin):
"packets": pkts,
}
return stats
# Now register the PacketList with the collector
# every packet we RX and TX goes through the collector
# for processing for whatever reason is needed.
collector.PacketCollector().register(PacketList)

View File

@ -5,6 +5,7 @@ import threading
from oslo_config import cfg
import wrapt
from aprsd.packets import collector, core
from aprsd.utils import objectstore
@ -22,7 +23,6 @@ class SeenList(objectstore.ObjectStoreMixin):
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init_store()
cls._instance.data = {}
return cls._instance
@ -37,7 +37,8 @@ class SeenList(objectstore.ObjectStoreMixin):
return self.data.copy()
@wrapt.synchronized(lock)
def update_seen(self, packet):
def rx(self, packet: type[core.Packet]):
"""When we get a packet from the network, update the seen list."""
callsign = None
if packet.from_call:
callsign = packet.from_call
@ -51,3 +52,11 @@ class SeenList(objectstore.ObjectStoreMixin):
}
self.data[callsign]["last"] = datetime.datetime.now()
self.data[callsign]["count"] += 1
def tx(self, packet: type[core.Packet]):
"""We don't care about TX packets."""
# Register with the packet collector so we can process the packet
# when we get it off the client (network)
collector.PacketCollector().register(SeenList)

View File

@ -6,6 +6,7 @@ from oslo_config import cfg
import wrapt
from aprsd import utils
from aprsd.packets import collector, core
from aprsd.utils import objectstore
@ -23,24 +24,22 @@ class WatchList(objectstore.ObjectStoreMixin):
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init_store()
cls._instance.data = {}
cls._instance._update_from_conf()
return cls._instance
def __init__(self, config=None):
CONF.watch_list.packet_keep_count
if CONF.watch_list.callsigns:
def _update_from_conf(self, config=None):
if CONF.watch_list.enabled and CONF.watch_list.callsigns:
for callsign in CONF.watch_list.callsigns:
call = callsign.replace("*", "")
# FIXME(waboring) - we should fetch the last time we saw
# a beacon from a callsign or some other mechanism to find
# last time a message was seen by aprs-is. For now this
# is all we can do.
self.data[call] = {
"last": None,
"packet": None,
}
if call not in self.data:
self.data[call] = {
"last": None,
"packet": None,
}
@wrapt.synchronized(lock)
def stats(self, serializable=False) -> dict:
@ -61,15 +60,19 @@ class WatchList(objectstore.ObjectStoreMixin):
return callsign in self.data
@wrapt.synchronized(lock)
def update_seen(self, packet):
if packet.addresse:
callsign = packet.addresse
else:
callsign = packet.from_call
def rx(self, packet: type[core.Packet]) -> None:
"""Track when we got a packet from the network."""
LOG.error(f"WatchList RX {packet}")
callsign = packet.from_call
if self.callsign_in_watchlist(callsign):
LOG.error(f"Updating WatchList RX {callsign}")
self.data[callsign]["last"] = datetime.datetime.now()
self.data[callsign]["packet"] = packet
def tx(self, packet: type[core.Packet]) -> None:
"""We don't care about TX packets."""
def last_seen(self, callsign):
if self.callsign_in_watchlist(callsign):
return self.data[callsign]["last"]
@ -113,3 +116,6 @@ class WatchList(objectstore.ObjectStoreMixin):
return False
else:
return False
collector.PacketCollector().register(WatchList)

View File

@ -7,6 +7,7 @@ import aprslib
from oslo_config import cfg
from aprsd import client, packets, plugin
from aprsd.packets import collector
from aprsd.packets import log as packet_log
from aprsd.threads import APRSDThread, tx
@ -116,7 +117,7 @@ class APRSDDupeRXThread(APRSDRXThread):
if not found:
# We haven't seen this packet before, so we process it.
pkt_list.rx(packet)
collector.PacketCollector().rx(packet)
self.packet_queue.put(packet)
elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
# If the packet came in within N seconds of the
@ -127,7 +128,7 @@ class APRSDDupeRXThread(APRSDRXThread):
f"Packet {packet.from_call}:{packet.msgNo} already tracked "
f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
)
pkt_list.rx(packet)
collector.PacketCollector().rx(packet)
self.packet_queue.put(packet)
@ -154,7 +155,7 @@ class APRSDProcessPacketThread(APRSDThread):
"""We got an ack for a message, no need to resend it."""
ack_num = packet.msgNo
LOG.debug(f"Got ack for message {ack_num}")
packets.PacketList().rx(packet)
collector.PacketCollector().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
@ -162,7 +163,7 @@ class APRSDProcessPacketThread(APRSDThread):
"""We got an ack embedded in a packet."""
ack_num = packet.ackMsgNo
LOG.debug(f"Got PiggyBackAck for message {ack_num}")
packets.PacketList().rx(packet)
collector.PacketCollector().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
@ -170,7 +171,7 @@ class APRSDProcessPacketThread(APRSDThread):
"""We got a reject message for a packet. Stop sending the message."""
ack_num = packet.msgNo
LOG.debug(f"Got REJECT for message {ack_num}")
packets.PacketList().rx(packet)
collector.PacketCollector().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)

View File

@ -70,6 +70,7 @@ class ObjectStoreMixin:
"""Save any queued to disk?"""
if not CONF.enable_save:
return
self._init_store()
save_filename = self._save_filename()
if len(self) > 0:
LOG.info(