1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-11-25 17:38:44 -05:00

Compare commits

..

No commits in common. "a656d9326333c79482d0797f676a6e865d49541c" and "4542c0a643618940ea15468619cec7737c50d309" have entirely different histories.

13 changed files with 54 additions and 131 deletions

View File

@ -11,7 +11,7 @@ import wrapt
from aprsd import exception
from aprsd.clients import aprsis, fake, kiss
from aprsd.packets import collector, core
from aprsd.packets import core, packet_list
from aprsd.utils import singleton, trace
@ -101,8 +101,7 @@ class Client:
self._client.stop()
def send(self, packet: core.Packet):
"""Send a packet to the network."""
collector.PacketCollector().tx(packet)
packet_list.PacketList().tx(packet)
self.client.send(packet)
@wrapt.synchronized(lock)

View File

@ -17,7 +17,6 @@ from rich.console import Console
import aprsd
from aprsd import cli_helper, client, packets, plugin, threads
from aprsd.main import cli
from aprsd.packets import collector as packet_collector
from aprsd.packets import log as packet_log
from aprsd.stats import collector
from aprsd.threads import keep_alive, rx
@ -82,7 +81,7 @@ class APRSDListenThread(rx.APRSDRXThread):
# This is the listen only command.
self.plugin_manager.run(packet)
packet_collector.PacketCollector().rx(packet)
packets.PacketList().rx(packet)
@cli.command()

View File

@ -11,7 +11,6 @@ import aprsd
from aprsd import cli_helper, client, packets
from aprsd import conf # noqa : F401
from aprsd.main import cli
from aprsd.packets import collector
from aprsd.threads import tx
@ -104,7 +103,7 @@ def send_message(
global got_ack, got_response
cl = client.factory.create()
packet = cl.decode_packet(packet)
collector.PacketCollector().rx(packet)
packets.PacketList().rx(packet)
packet.log("RX")
# LOG.debug("Got packet back {}".format(packet))
if isinstance(packet, packets.AckPacket):

View File

@ -1,42 +0,0 @@
from typing import Callable, Protocol, runtime_checkable
from aprsd.packets import core
from aprsd.utils import singleton
@runtime_checkable
class PacketMonitor(Protocol):
"""Protocol for Monitoring packets in some way."""
def rx(self, packet: type[core.Packet]) -> None:
"""When we get a packet from the network."""
...
def tx(self, packet: type[core.Packet]) -> None:
"""When we send a packet out the network."""
...
@singleton
class PacketCollector:
def __init__(self):
self.monitors: list[Callable] = []
def register(self, monitor: Callable) -> None:
self.monitors.append(monitor)
def rx(self, packet: type[core.Packet]) -> None:
for name in self.monitors:
cls = name()
if isinstance(cls, PacketMonitor):
cls.rx(packet)
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor")
def tx(self, packet: type[core.Packet]) -> None:
for name in self.monitors:
cls = name()
if isinstance(cls, PacketMonitor):
cls.tx(packet)
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor")

View File

@ -89,7 +89,6 @@ class Packet:
addresse: Optional[str] = field(default=None)
format: Optional[str] = field(default=None)
msgNo: Optional[str] = field(default=None) # noqa: N815
ackMsgNo: Optional[str] = field(default=None) # noqa: N815
packet_type: Optional[str] = field(default=None)
timestamp: float = field(default_factory=_init_timestamp, compare=False, hash=False)
# Holds the raw text string to be sent over the wire

View File

@ -5,7 +5,7 @@ import threading
from oslo_config import cfg
import wrapt
from aprsd.packets import collector, core
from aprsd.packets import seen_list
from aprsd.utils import objectstore
@ -14,7 +14,6 @@ LOG = logging.getLogger("APRSD")
class PacketList(objectstore.ObjectStoreMixin):
"""Class to keep track of the packets we tx/rx."""
_instance = None
lock = threading.Lock()
_total_rx: int = 0
@ -32,7 +31,7 @@ class PacketList(objectstore.ObjectStoreMixin):
return cls._instance
@wrapt.synchronized(lock)
def rx(self, packet: type[core.Packet]):
def rx(self, packet):
"""Add a packet that was received."""
self._total_rx += 1
self._add(packet)
@ -40,9 +39,10 @@ class PacketList(objectstore.ObjectStoreMixin):
if not ptype in self.data["types"]:
self.data["types"][ptype] = {"tx": 0, "rx": 0}
self.data["types"][ptype]["rx"] += 1
seen_list.SeenList().update_seen(packet)
@wrapt.synchronized(lock)
def tx(self, packet: type[core.Packet]):
def tx(self, packet):
"""Add a packet that was received."""
self._total_tx += 1
self._add(packet)
@ -50,6 +50,7 @@ class PacketList(objectstore.ObjectStoreMixin):
if not ptype in self.data["types"]:
self.data["types"][ptype] = {"tx": 0, "rx": 0}
self.data["types"][ptype]["tx"] += 1
seen_list.SeenList().update_seen(packet)
@wrapt.synchronized(lock)
def add(self, packet):
@ -66,6 +67,10 @@ class PacketList(objectstore.ObjectStoreMixin):
def copy(self):
return self.data.copy()
@wrapt.synchronized(lock)
def set_maxlen(self, maxlen):
self.maxlen = maxlen
@wrapt.synchronized(lock)
def find(self, packet):
return self.data["packets"][packet.key]
@ -104,9 +109,3 @@ class PacketList(objectstore.ObjectStoreMixin):
"packets": pkts,
}
return stats
# Now register the PacketList with the collector
# every packet we RX and TX goes through the collector
# for processing for whatever reason is needed.
collector.PacketCollector().register(PacketList)

View File

@ -5,7 +5,6 @@ import threading
from oslo_config import cfg
import wrapt
from aprsd.packets import collector, core
from aprsd.utils import objectstore
@ -23,6 +22,7 @@ class SeenList(objectstore.ObjectStoreMixin):
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init_store()
cls._instance.data = {}
return cls._instance
@ -37,8 +37,7 @@ class SeenList(objectstore.ObjectStoreMixin):
return self.data.copy()
@wrapt.synchronized(lock)
def rx(self, packet: type[core.Packet]):
"""When we get a packet from the network, update the seen list."""
def update_seen(self, packet):
callsign = None
if packet.from_call:
callsign = packet.from_call
@ -52,11 +51,3 @@ class SeenList(objectstore.ObjectStoreMixin):
}
self.data[callsign]["last"] = datetime.datetime.now()
self.data[callsign]["count"] += 1
def tx(self, packet: type[core.Packet]):
"""We don't care about TX packets."""
# Register with the packet collector so we can process the packet
# when we get it off the client (network)
collector.PacketCollector().register(SeenList)

View File

@ -6,7 +6,6 @@ from oslo_config import cfg
import wrapt
from aprsd import utils
from aprsd.packets import collector, core
from aprsd.utils import objectstore
@ -24,18 +23,20 @@ class WatchList(objectstore.ObjectStoreMixin):
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._update_from_conf()
cls._instance._init_store()
cls._instance.data = {}
return cls._instance
def _update_from_conf(self, config=None):
if CONF.watch_list.enabled and CONF.watch_list.callsigns:
def __init__(self, config=None):
CONF.watch_list.packet_keep_count
if CONF.watch_list.callsigns:
for callsign in CONF.watch_list.callsigns:
call = callsign.replace("*", "")
# FIXME(waboring) - we should fetch the last time we saw
# a beacon from a callsign or some other mechanism to find
# last time a message was seen by aprs-is. For now this
# is all we can do.
if call not in self.data:
self.data[call] = {
"last": None,
"packet": None,
@ -60,19 +61,15 @@ class WatchList(objectstore.ObjectStoreMixin):
return callsign in self.data
@wrapt.synchronized(lock)
def rx(self, packet: type[core.Packet]) -> None:
"""Track when we got a packet from the network."""
LOG.error(f"WatchList RX {packet}")
def update_seen(self, packet):
if packet.addresse:
callsign = packet.addresse
else:
callsign = packet.from_call
if self.callsign_in_watchlist(callsign):
LOG.error(f"Updating WatchList RX {callsign}")
self.data[callsign]["last"] = datetime.datetime.now()
self.data[callsign]["packet"] = packet
def tx(self, packet: type[core.Packet]) -> None:
"""We don't care about TX packets."""
def last_seen(self, callsign):
if self.callsign_in_watchlist(callsign):
return self.data[callsign]["last"]
@ -116,6 +113,3 @@ class WatchList(objectstore.ObjectStoreMixin):
return False
else:
return False
collector.PacketCollector().register(WatchList)

View File

@ -9,12 +9,12 @@ from aprsd.threads import aprsd
# Create the collector and register all the objects
# that APRSD has that implement the stats protocol
stats_collector = collector.Collector()
stats_collector.register_producer(app.APRSDStats)
stats_collector.register_producer(packet_list.PacketList)
stats_collector.register_producer(watch_list.WatchList)
stats_collector.register_producer(tracker.PacketTrack)
stats_collector.register_producer(plugin.PluginManager)
stats_collector.register_producer(aprsd.APRSDThreadList)
stats_collector.register_producer(email.EmailStats)
stats_collector.register_producer(aprs_client.APRSClientStats)
stats_collector.register_producer(seen_list.SeenList)
stats_collector.register_producer(app.APRSDStats())
stats_collector.register_producer(packet_list.PacketList())
stats_collector.register_producer(watch_list.WatchList())
stats_collector.register_producer(tracker.PacketTrack())
stats_collector.register_producer(plugin.PluginManager())
stats_collector.register_producer(aprsd.APRSDThreadList())
stats_collector.register_producer(email.EmailStats())
stats_collector.register_producer(aprs_client.APRSClientStats())
stats_collector.register_producer(seen_list.SeenList())

View File

@ -1,9 +1,8 @@
from typing import Callable, Protocol, runtime_checkable
from typing import Protocol
from aprsd.utils import singleton
@runtime_checkable
class StatsProducer(Protocol):
"""The StatsProducer protocol is used to define the interface for collecting stats."""
def stats(self, serializeable=False) -> dict:
@ -15,17 +14,17 @@ class StatsProducer(Protocol):
class Collector:
"""The Collector class is used to collect stats from multiple StatsProducer instances."""
def __init__(self):
self.producers: list[Callable] = []
self.producers: dict[str, StatsProducer] = {}
def collect(self, serializable=False) -> dict:
stats = {}
for name in self.producers:
cls = name()
if isinstance(cls, StatsProducer):
stats[cls.__class__.__name__] = cls.stats(serializable=serializable)
else:
raise TypeError(f"{cls} is not an instance of StatsProducer")
for name, producer in self.producers.items():
# No need to put in empty stats
tmp_stats = producer.stats(serializable=serializable)
if tmp_stats:
stats[name] = tmp_stats
return stats
def register_producer(self, producer_name: Callable):
self.producers.append(producer_name)
def register_producer(self, producer: StatsProducer):
name = producer.__class__.__name__
self.producers[name] = producer

View File

@ -33,7 +33,7 @@ class KeepAliveThread(APRSDThread):
if "EmailStats" in stats_json:
email_stats = stats_json["EmailStats"]
if email_stats.get("last_check_time"):
if email_stats["last_check_time"]:
email_thread_time = utils.strfdelta(now - email_stats["last_check_time"])
else:
email_thread_time = "N/A"

View File

@ -7,7 +7,6 @@ import aprslib
from oslo_config import cfg
from aprsd import client, packets, plugin
from aprsd.packets import collector
from aprsd.packets import log as packet_log
from aprsd.threads import APRSDThread, tx
@ -117,7 +116,7 @@ class APRSDDupeRXThread(APRSDRXThread):
if not found:
# We haven't seen this packet before, so we process it.
collector.PacketCollector().rx(packet)
pkt_list.rx(packet)
self.packet_queue.put(packet)
elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
# If the packet came in within N seconds of the
@ -128,7 +127,7 @@ class APRSDDupeRXThread(APRSDRXThread):
f"Packet {packet.from_call}:{packet.msgNo} already tracked "
f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
)
collector.PacketCollector().rx(packet)
pkt_list.rx(packet)
self.packet_queue.put(packet)
@ -155,15 +154,7 @@ class APRSDProcessPacketThread(APRSDThread):
"""We got an ack for a message, no need to resend it."""
ack_num = packet.msgNo
LOG.debug(f"Got ack for message {ack_num}")
collector.PacketCollector().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
def process_piggyback_ack(self, packet):
"""We got an ack embedded in a packet."""
ack_num = packet.ackMsgNo
LOG.debug(f"Got PiggyBackAck for message {ack_num}")
collector.PacketCollector().rx(packet)
packets.PacketList().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
@ -171,7 +162,7 @@ class APRSDProcessPacketThread(APRSDThread):
"""We got a reject message for a packet. Stop sending the message."""
ack_num = packet.msgNo
LOG.debug(f"Got REJECT for message {ack_num}")
collector.PacketCollector().rx(packet)
packets.PacketList().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
@ -209,10 +200,6 @@ class APRSDProcessPacketThread(APRSDThread):
):
self.process_reject_packet(packet)
else:
if hasattr(packet, "ackMsgNo") and packet.ackMsgNo:
# we got an ack embedded in this packet
# we need to handle the ack
self.process_piggyback_ack(packet)
# Only ack messages that were sent directly to us
if isinstance(packet, packets.MessagePacket):
if to_call and to_call.lower() == our_call:

View File

@ -70,7 +70,6 @@ class ObjectStoreMixin:
"""Save any queued to disk?"""
if not CONF.enable_save:
return
self._init_store()
save_filename = self._save_filename()
if len(self) > 0:
LOG.info(