1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-11-25 17:38:44 -05:00

Compare commits

..

No commits in common. "a656d9326333c79482d0797f676a6e865d49541c" and "4542c0a643618940ea15468619cec7737c50d309" have entirely different histories.

13 changed files with 54 additions and 131 deletions

View File

@ -11,7 +11,7 @@ import wrapt
from aprsd import exception from aprsd import exception
from aprsd.clients import aprsis, fake, kiss from aprsd.clients import aprsis, fake, kiss
from aprsd.packets import collector, core from aprsd.packets import core, packet_list
from aprsd.utils import singleton, trace from aprsd.utils import singleton, trace
@ -101,8 +101,7 @@ class Client:
self._client.stop() self._client.stop()
def send(self, packet: core.Packet): def send(self, packet: core.Packet):
"""Send a packet to the network.""" packet_list.PacketList().tx(packet)
collector.PacketCollector().tx(packet)
self.client.send(packet) self.client.send(packet)
@wrapt.synchronized(lock) @wrapt.synchronized(lock)

View File

@ -17,7 +17,6 @@ from rich.console import Console
import aprsd import aprsd
from aprsd import cli_helper, client, packets, plugin, threads from aprsd import cli_helper, client, packets, plugin, threads
from aprsd.main import cli from aprsd.main import cli
from aprsd.packets import collector as packet_collector
from aprsd.packets import log as packet_log from aprsd.packets import log as packet_log
from aprsd.stats import collector from aprsd.stats import collector
from aprsd.threads import keep_alive, rx from aprsd.threads import keep_alive, rx
@ -82,7 +81,7 @@ class APRSDListenThread(rx.APRSDRXThread):
# This is the listen only command. # This is the listen only command.
self.plugin_manager.run(packet) self.plugin_manager.run(packet)
packet_collector.PacketCollector().rx(packet) packets.PacketList().rx(packet)
@cli.command() @cli.command()

View File

@ -11,7 +11,6 @@ import aprsd
from aprsd import cli_helper, client, packets from aprsd import cli_helper, client, packets
from aprsd import conf # noqa : F401 from aprsd import conf # noqa : F401
from aprsd.main import cli from aprsd.main import cli
from aprsd.packets import collector
from aprsd.threads import tx from aprsd.threads import tx
@ -104,7 +103,7 @@ def send_message(
global got_ack, got_response global got_ack, got_response
cl = client.factory.create() cl = client.factory.create()
packet = cl.decode_packet(packet) packet = cl.decode_packet(packet)
collector.PacketCollector().rx(packet) packets.PacketList().rx(packet)
packet.log("RX") packet.log("RX")
# LOG.debug("Got packet back {}".format(packet)) # LOG.debug("Got packet back {}".format(packet))
if isinstance(packet, packets.AckPacket): if isinstance(packet, packets.AckPacket):

View File

@ -1,42 +0,0 @@
from typing import Callable, Protocol, runtime_checkable
from aprsd.packets import core
from aprsd.utils import singleton
@runtime_checkable
class PacketMonitor(Protocol):
"""Protocol for Monitoring packets in some way."""
def rx(self, packet: type[core.Packet]) -> None:
"""When we get a packet from the network."""
...
def tx(self, packet: type[core.Packet]) -> None:
"""When we send a packet out the network."""
...
@singleton
class PacketCollector:
def __init__(self):
self.monitors: list[Callable] = []
def register(self, monitor: Callable) -> None:
self.monitors.append(monitor)
def rx(self, packet: type[core.Packet]) -> None:
for name in self.monitors:
cls = name()
if isinstance(cls, PacketMonitor):
cls.rx(packet)
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor")
def tx(self, packet: type[core.Packet]) -> None:
for name in self.monitors:
cls = name()
if isinstance(cls, PacketMonitor):
cls.tx(packet)
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor")

View File

@ -89,7 +89,6 @@ class Packet:
addresse: Optional[str] = field(default=None) addresse: Optional[str] = field(default=None)
format: Optional[str] = field(default=None) format: Optional[str] = field(default=None)
msgNo: Optional[str] = field(default=None) # noqa: N815 msgNo: Optional[str] = field(default=None) # noqa: N815
ackMsgNo: Optional[str] = field(default=None) # noqa: N815
packet_type: Optional[str] = field(default=None) packet_type: Optional[str] = field(default=None)
timestamp: float = field(default_factory=_init_timestamp, compare=False, hash=False) timestamp: float = field(default_factory=_init_timestamp, compare=False, hash=False)
# Holds the raw text string to be sent over the wire # Holds the raw text string to be sent over the wire

View File

@ -5,7 +5,7 @@ import threading
from oslo_config import cfg from oslo_config import cfg
import wrapt import wrapt
from aprsd.packets import collector, core from aprsd.packets import seen_list
from aprsd.utils import objectstore from aprsd.utils import objectstore
@ -14,7 +14,6 @@ LOG = logging.getLogger("APRSD")
class PacketList(objectstore.ObjectStoreMixin): class PacketList(objectstore.ObjectStoreMixin):
"""Class to keep track of the packets we tx/rx."""
_instance = None _instance = None
lock = threading.Lock() lock = threading.Lock()
_total_rx: int = 0 _total_rx: int = 0
@ -32,7 +31,7 @@ class PacketList(objectstore.ObjectStoreMixin):
return cls._instance return cls._instance
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def rx(self, packet: type[core.Packet]): def rx(self, packet):
"""Add a packet that was received.""" """Add a packet that was received."""
self._total_rx += 1 self._total_rx += 1
self._add(packet) self._add(packet)
@ -40,9 +39,10 @@ class PacketList(objectstore.ObjectStoreMixin):
if not ptype in self.data["types"]: if not ptype in self.data["types"]:
self.data["types"][ptype] = {"tx": 0, "rx": 0} self.data["types"][ptype] = {"tx": 0, "rx": 0}
self.data["types"][ptype]["rx"] += 1 self.data["types"][ptype]["rx"] += 1
seen_list.SeenList().update_seen(packet)
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def tx(self, packet: type[core.Packet]): def tx(self, packet):
"""Add a packet that was received.""" """Add a packet that was received."""
self._total_tx += 1 self._total_tx += 1
self._add(packet) self._add(packet)
@ -50,6 +50,7 @@ class PacketList(objectstore.ObjectStoreMixin):
if not ptype in self.data["types"]: if not ptype in self.data["types"]:
self.data["types"][ptype] = {"tx": 0, "rx": 0} self.data["types"][ptype] = {"tx": 0, "rx": 0}
self.data["types"][ptype]["tx"] += 1 self.data["types"][ptype]["tx"] += 1
seen_list.SeenList().update_seen(packet)
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def add(self, packet): def add(self, packet):
@ -66,6 +67,10 @@ class PacketList(objectstore.ObjectStoreMixin):
def copy(self): def copy(self):
return self.data.copy() return self.data.copy()
@wrapt.synchronized(lock)
def set_maxlen(self, maxlen):
self.maxlen = maxlen
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def find(self, packet): def find(self, packet):
return self.data["packets"][packet.key] return self.data["packets"][packet.key]
@ -104,9 +109,3 @@ class PacketList(objectstore.ObjectStoreMixin):
"packets": pkts, "packets": pkts,
} }
return stats return stats
# Now register the PacketList with the collector
# every packet we RX and TX goes through the collector
# for processing for whatever reason is needed.
collector.PacketCollector().register(PacketList)

View File

@ -5,7 +5,6 @@ import threading
from oslo_config import cfg from oslo_config import cfg
import wrapt import wrapt
from aprsd.packets import collector, core
from aprsd.utils import objectstore from aprsd.utils import objectstore
@ -23,6 +22,7 @@ class SeenList(objectstore.ObjectStoreMixin):
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
if cls._instance is None: if cls._instance is None:
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
cls._instance._init_store()
cls._instance.data = {} cls._instance.data = {}
return cls._instance return cls._instance
@ -37,8 +37,7 @@ class SeenList(objectstore.ObjectStoreMixin):
return self.data.copy() return self.data.copy()
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def rx(self, packet: type[core.Packet]): def update_seen(self, packet):
"""When we get a packet from the network, update the seen list."""
callsign = None callsign = None
if packet.from_call: if packet.from_call:
callsign = packet.from_call callsign = packet.from_call
@ -52,11 +51,3 @@ class SeenList(objectstore.ObjectStoreMixin):
} }
self.data[callsign]["last"] = datetime.datetime.now() self.data[callsign]["last"] = datetime.datetime.now()
self.data[callsign]["count"] += 1 self.data[callsign]["count"] += 1
def tx(self, packet: type[core.Packet]):
"""We don't care about TX packets."""
# Register with the packet collector so we can process the packet
# when we get it off the client (network)
collector.PacketCollector().register(SeenList)

View File

@ -6,7 +6,6 @@ from oslo_config import cfg
import wrapt import wrapt
from aprsd import utils from aprsd import utils
from aprsd.packets import collector, core
from aprsd.utils import objectstore from aprsd.utils import objectstore
@ -24,18 +23,20 @@ class WatchList(objectstore.ObjectStoreMixin):
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
if cls._instance is None: if cls._instance is None:
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
cls._instance._update_from_conf() cls._instance._init_store()
cls._instance.data = {}
return cls._instance return cls._instance
def _update_from_conf(self, config=None): def __init__(self, config=None):
if CONF.watch_list.enabled and CONF.watch_list.callsigns: CONF.watch_list.packet_keep_count
if CONF.watch_list.callsigns:
for callsign in CONF.watch_list.callsigns: for callsign in CONF.watch_list.callsigns:
call = callsign.replace("*", "") call = callsign.replace("*", "")
# FIXME(waboring) - we should fetch the last time we saw # FIXME(waboring) - we should fetch the last time we saw
# a beacon from a callsign or some other mechanism to find # a beacon from a callsign or some other mechanism to find
# last time a message was seen by aprs-is. For now this # last time a message was seen by aprs-is. For now this
# is all we can do. # is all we can do.
if call not in self.data:
self.data[call] = { self.data[call] = {
"last": None, "last": None,
"packet": None, "packet": None,
@ -60,19 +61,15 @@ class WatchList(objectstore.ObjectStoreMixin):
return callsign in self.data return callsign in self.data
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def rx(self, packet: type[core.Packet]) -> None: def update_seen(self, packet):
"""Track when we got a packet from the network.""" if packet.addresse:
LOG.error(f"WatchList RX {packet}") callsign = packet.addresse
else:
callsign = packet.from_call callsign = packet.from_call
if self.callsign_in_watchlist(callsign): if self.callsign_in_watchlist(callsign):
LOG.error(f"Updating WatchList RX {callsign}")
self.data[callsign]["last"] = datetime.datetime.now() self.data[callsign]["last"] = datetime.datetime.now()
self.data[callsign]["packet"] = packet self.data[callsign]["packet"] = packet
def tx(self, packet: type[core.Packet]) -> None:
"""We don't care about TX packets."""
def last_seen(self, callsign): def last_seen(self, callsign):
if self.callsign_in_watchlist(callsign): if self.callsign_in_watchlist(callsign):
return self.data[callsign]["last"] return self.data[callsign]["last"]
@ -116,6 +113,3 @@ class WatchList(objectstore.ObjectStoreMixin):
return False return False
else: else:
return False return False
collector.PacketCollector().register(WatchList)

View File

@ -9,12 +9,12 @@ from aprsd.threads import aprsd
# Create the collector and register all the objects # Create the collector and register all the objects
# that APRSD has that implement the stats protocol # that APRSD has that implement the stats protocol
stats_collector = collector.Collector() stats_collector = collector.Collector()
stats_collector.register_producer(app.APRSDStats) stats_collector.register_producer(app.APRSDStats())
stats_collector.register_producer(packet_list.PacketList) stats_collector.register_producer(packet_list.PacketList())
stats_collector.register_producer(watch_list.WatchList) stats_collector.register_producer(watch_list.WatchList())
stats_collector.register_producer(tracker.PacketTrack) stats_collector.register_producer(tracker.PacketTrack())
stats_collector.register_producer(plugin.PluginManager) stats_collector.register_producer(plugin.PluginManager())
stats_collector.register_producer(aprsd.APRSDThreadList) stats_collector.register_producer(aprsd.APRSDThreadList())
stats_collector.register_producer(email.EmailStats) stats_collector.register_producer(email.EmailStats())
stats_collector.register_producer(aprs_client.APRSClientStats) stats_collector.register_producer(aprs_client.APRSClientStats())
stats_collector.register_producer(seen_list.SeenList) stats_collector.register_producer(seen_list.SeenList())

View File

@ -1,9 +1,8 @@
from typing import Callable, Protocol, runtime_checkable from typing import Protocol
from aprsd.utils import singleton from aprsd.utils import singleton
@runtime_checkable
class StatsProducer(Protocol): class StatsProducer(Protocol):
"""The StatsProducer protocol is used to define the interface for collecting stats.""" """The StatsProducer protocol is used to define the interface for collecting stats."""
def stats(self, serializeable=False) -> dict: def stats(self, serializeable=False) -> dict:
@ -15,17 +14,17 @@ class StatsProducer(Protocol):
class Collector: class Collector:
"""The Collector class is used to collect stats from multiple StatsProducer instances.""" """The Collector class is used to collect stats from multiple StatsProducer instances."""
def __init__(self): def __init__(self):
self.producers: list[Callable] = [] self.producers: dict[str, StatsProducer] = {}
def collect(self, serializable=False) -> dict: def collect(self, serializable=False) -> dict:
stats = {} stats = {}
for name in self.producers: for name, producer in self.producers.items():
cls = name() # No need to put in empty stats
if isinstance(cls, StatsProducer): tmp_stats = producer.stats(serializable=serializable)
stats[cls.__class__.__name__] = cls.stats(serializable=serializable) if tmp_stats:
else: stats[name] = tmp_stats
raise TypeError(f"{cls} is not an instance of StatsProducer")
return stats return stats
def register_producer(self, producer_name: Callable): def register_producer(self, producer: StatsProducer):
self.producers.append(producer_name) name = producer.__class__.__name__
self.producers[name] = producer

View File

@ -33,7 +33,7 @@ class KeepAliveThread(APRSDThread):
if "EmailStats" in stats_json: if "EmailStats" in stats_json:
email_stats = stats_json["EmailStats"] email_stats = stats_json["EmailStats"]
if email_stats.get("last_check_time"): if email_stats["last_check_time"]:
email_thread_time = utils.strfdelta(now - email_stats["last_check_time"]) email_thread_time = utils.strfdelta(now - email_stats["last_check_time"])
else: else:
email_thread_time = "N/A" email_thread_time = "N/A"

View File

@ -7,7 +7,6 @@ import aprslib
from oslo_config import cfg from oslo_config import cfg
from aprsd import client, packets, plugin from aprsd import client, packets, plugin
from aprsd.packets import collector
from aprsd.packets import log as packet_log from aprsd.packets import log as packet_log
from aprsd.threads import APRSDThread, tx from aprsd.threads import APRSDThread, tx
@ -117,7 +116,7 @@ class APRSDDupeRXThread(APRSDRXThread):
if not found: if not found:
# We haven't seen this packet before, so we process it. # We haven't seen this packet before, so we process it.
collector.PacketCollector().rx(packet) pkt_list.rx(packet)
self.packet_queue.put(packet) self.packet_queue.put(packet)
elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout: elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
# If the packet came in within N seconds of the # If the packet came in within N seconds of the
@ -128,7 +127,7 @@ class APRSDDupeRXThread(APRSDRXThread):
f"Packet {packet.from_call}:{packet.msgNo} already tracked " f"Packet {packet.from_call}:{packet.msgNo} already tracked "
f"but older than {CONF.packet_dupe_timeout} seconds. processing.", f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
) )
collector.PacketCollector().rx(packet) pkt_list.rx(packet)
self.packet_queue.put(packet) self.packet_queue.put(packet)
@ -155,15 +154,7 @@ class APRSDProcessPacketThread(APRSDThread):
"""We got an ack for a message, no need to resend it.""" """We got an ack for a message, no need to resend it."""
ack_num = packet.msgNo ack_num = packet.msgNo
LOG.debug(f"Got ack for message {ack_num}") LOG.debug(f"Got ack for message {ack_num}")
collector.PacketCollector().rx(packet) packets.PacketList().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
def process_piggyback_ack(self, packet):
"""We got an ack embedded in a packet."""
ack_num = packet.ackMsgNo
LOG.debug(f"Got PiggyBackAck for message {ack_num}")
collector.PacketCollector().rx(packet)
pkt_tracker = packets.PacketTrack() pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num) pkt_tracker.remove(ack_num)
@ -171,7 +162,7 @@ class APRSDProcessPacketThread(APRSDThread):
"""We got a reject message for a packet. Stop sending the message.""" """We got a reject message for a packet. Stop sending the message."""
ack_num = packet.msgNo ack_num = packet.msgNo
LOG.debug(f"Got REJECT for message {ack_num}") LOG.debug(f"Got REJECT for message {ack_num}")
collector.PacketCollector().rx(packet) packets.PacketList().rx(packet)
pkt_tracker = packets.PacketTrack() pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num) pkt_tracker.remove(ack_num)
@ -209,10 +200,6 @@ class APRSDProcessPacketThread(APRSDThread):
): ):
self.process_reject_packet(packet) self.process_reject_packet(packet)
else: else:
if hasattr(packet, "ackMsgNo") and packet.ackMsgNo:
# we got an ack embedded in this packet
# we need to handle the ack
self.process_piggyback_ack(packet)
# Only ack messages that were sent directly to us # Only ack messages that were sent directly to us
if isinstance(packet, packets.MessagePacket): if isinstance(packet, packets.MessagePacket):
if to_call and to_call.lower() == our_call: if to_call and to_call.lower() == our_call:

View File

@ -70,7 +70,6 @@ class ObjectStoreMixin:
"""Save any queued to disk?""" """Save any queued to disk?"""
if not CONF.enable_save: if not CONF.enable_save:
return return
self._init_store()
save_filename = self._save_filename() save_filename = self._save_filename()
if len(self) > 0: if len(self) > 0:
LOG.info( LOG.info(