mirror of
https://github.com/craigerl/aprsd.git
synced 2024-11-25 17:38:44 -05:00
Compare commits
No commits in common. "a656d9326333c79482d0797f676a6e865d49541c" and "4542c0a643618940ea15468619cec7737c50d309" have entirely different histories.
a656d93263
...
4542c0a643
@ -11,7 +11,7 @@ import wrapt
|
||||
|
||||
from aprsd import exception
|
||||
from aprsd.clients import aprsis, fake, kiss
|
||||
from aprsd.packets import collector, core
|
||||
from aprsd.packets import core, packet_list
|
||||
from aprsd.utils import singleton, trace
|
||||
|
||||
|
||||
@ -101,8 +101,7 @@ class Client:
|
||||
self._client.stop()
|
||||
|
||||
def send(self, packet: core.Packet):
|
||||
"""Send a packet to the network."""
|
||||
collector.PacketCollector().tx(packet)
|
||||
packet_list.PacketList().tx(packet)
|
||||
self.client.send(packet)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
|
@ -17,7 +17,6 @@ from rich.console import Console
|
||||
import aprsd
|
||||
from aprsd import cli_helper, client, packets, plugin, threads
|
||||
from aprsd.main import cli
|
||||
from aprsd.packets import collector as packet_collector
|
||||
from aprsd.packets import log as packet_log
|
||||
from aprsd.stats import collector
|
||||
from aprsd.threads import keep_alive, rx
|
||||
@ -82,7 +81,7 @@ class APRSDListenThread(rx.APRSDRXThread):
|
||||
# This is the listen only command.
|
||||
self.plugin_manager.run(packet)
|
||||
|
||||
packet_collector.PacketCollector().rx(packet)
|
||||
packets.PacketList().rx(packet)
|
||||
|
||||
|
||||
@cli.command()
|
||||
|
@ -11,7 +11,6 @@ import aprsd
|
||||
from aprsd import cli_helper, client, packets
|
||||
from aprsd import conf # noqa : F401
|
||||
from aprsd.main import cli
|
||||
from aprsd.packets import collector
|
||||
from aprsd.threads import tx
|
||||
|
||||
|
||||
@ -104,7 +103,7 @@ def send_message(
|
||||
global got_ack, got_response
|
||||
cl = client.factory.create()
|
||||
packet = cl.decode_packet(packet)
|
||||
collector.PacketCollector().rx(packet)
|
||||
packets.PacketList().rx(packet)
|
||||
packet.log("RX")
|
||||
# LOG.debug("Got packet back {}".format(packet))
|
||||
if isinstance(packet, packets.AckPacket):
|
||||
|
@ -1,42 +0,0 @@
|
||||
from typing import Callable, Protocol, runtime_checkable
|
||||
|
||||
from aprsd.packets import core
|
||||
from aprsd.utils import singleton
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class PacketMonitor(Protocol):
|
||||
"""Protocol for Monitoring packets in some way."""
|
||||
|
||||
def rx(self, packet: type[core.Packet]) -> None:
|
||||
"""When we get a packet from the network."""
|
||||
...
|
||||
|
||||
def tx(self, packet: type[core.Packet]) -> None:
|
||||
"""When we send a packet out the network."""
|
||||
...
|
||||
|
||||
|
||||
@singleton
|
||||
class PacketCollector:
|
||||
def __init__(self):
|
||||
self.monitors: list[Callable] = []
|
||||
|
||||
def register(self, monitor: Callable) -> None:
|
||||
self.monitors.append(monitor)
|
||||
|
||||
def rx(self, packet: type[core.Packet]) -> None:
|
||||
for name in self.monitors:
|
||||
cls = name()
|
||||
if isinstance(cls, PacketMonitor):
|
||||
cls.rx(packet)
|
||||
else:
|
||||
raise TypeError(f"Monitor {name} is not a PacketMonitor")
|
||||
|
||||
def tx(self, packet: type[core.Packet]) -> None:
|
||||
for name in self.monitors:
|
||||
cls = name()
|
||||
if isinstance(cls, PacketMonitor):
|
||||
cls.tx(packet)
|
||||
else:
|
||||
raise TypeError(f"Monitor {name} is not a PacketMonitor")
|
@ -89,7 +89,6 @@ class Packet:
|
||||
addresse: Optional[str] = field(default=None)
|
||||
format: Optional[str] = field(default=None)
|
||||
msgNo: Optional[str] = field(default=None) # noqa: N815
|
||||
ackMsgNo: Optional[str] = field(default=None) # noqa: N815
|
||||
packet_type: Optional[str] = field(default=None)
|
||||
timestamp: float = field(default_factory=_init_timestamp, compare=False, hash=False)
|
||||
# Holds the raw text string to be sent over the wire
|
||||
|
@ -5,7 +5,7 @@ import threading
|
||||
from oslo_config import cfg
|
||||
import wrapt
|
||||
|
||||
from aprsd.packets import collector, core
|
||||
from aprsd.packets import seen_list
|
||||
from aprsd.utils import objectstore
|
||||
|
||||
|
||||
@ -14,7 +14,6 @@ LOG = logging.getLogger("APRSD")
|
||||
|
||||
|
||||
class PacketList(objectstore.ObjectStoreMixin):
|
||||
"""Class to keep track of the packets we tx/rx."""
|
||||
_instance = None
|
||||
lock = threading.Lock()
|
||||
_total_rx: int = 0
|
||||
@ -32,7 +31,7 @@ class PacketList(objectstore.ObjectStoreMixin):
|
||||
return cls._instance
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def rx(self, packet: type[core.Packet]):
|
||||
def rx(self, packet):
|
||||
"""Add a packet that was received."""
|
||||
self._total_rx += 1
|
||||
self._add(packet)
|
||||
@ -40,9 +39,10 @@ class PacketList(objectstore.ObjectStoreMixin):
|
||||
if not ptype in self.data["types"]:
|
||||
self.data["types"][ptype] = {"tx": 0, "rx": 0}
|
||||
self.data["types"][ptype]["rx"] += 1
|
||||
seen_list.SeenList().update_seen(packet)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def tx(self, packet: type[core.Packet]):
|
||||
def tx(self, packet):
|
||||
"""Add a packet that was received."""
|
||||
self._total_tx += 1
|
||||
self._add(packet)
|
||||
@ -50,6 +50,7 @@ class PacketList(objectstore.ObjectStoreMixin):
|
||||
if not ptype in self.data["types"]:
|
||||
self.data["types"][ptype] = {"tx": 0, "rx": 0}
|
||||
self.data["types"][ptype]["tx"] += 1
|
||||
seen_list.SeenList().update_seen(packet)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def add(self, packet):
|
||||
@ -66,6 +67,10 @@ class PacketList(objectstore.ObjectStoreMixin):
|
||||
def copy(self):
|
||||
return self.data.copy()
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def set_maxlen(self, maxlen):
|
||||
self.maxlen = maxlen
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def find(self, packet):
|
||||
return self.data["packets"][packet.key]
|
||||
@ -104,9 +109,3 @@ class PacketList(objectstore.ObjectStoreMixin):
|
||||
"packets": pkts,
|
||||
}
|
||||
return stats
|
||||
|
||||
|
||||
# Now register the PacketList with the collector
|
||||
# every packet we RX and TX goes through the collector
|
||||
# for processing for whatever reason is needed.
|
||||
collector.PacketCollector().register(PacketList)
|
||||
|
@ -5,7 +5,6 @@ import threading
|
||||
from oslo_config import cfg
|
||||
import wrapt
|
||||
|
||||
from aprsd.packets import collector, core
|
||||
from aprsd.utils import objectstore
|
||||
|
||||
|
||||
@ -23,6 +22,7 @@ class SeenList(objectstore.ObjectStoreMixin):
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._init_store()
|
||||
cls._instance.data = {}
|
||||
return cls._instance
|
||||
|
||||
@ -37,8 +37,7 @@ class SeenList(objectstore.ObjectStoreMixin):
|
||||
return self.data.copy()
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def rx(self, packet: type[core.Packet]):
|
||||
"""When we get a packet from the network, update the seen list."""
|
||||
def update_seen(self, packet):
|
||||
callsign = None
|
||||
if packet.from_call:
|
||||
callsign = packet.from_call
|
||||
@ -52,11 +51,3 @@ class SeenList(objectstore.ObjectStoreMixin):
|
||||
}
|
||||
self.data[callsign]["last"] = datetime.datetime.now()
|
||||
self.data[callsign]["count"] += 1
|
||||
|
||||
def tx(self, packet: type[core.Packet]):
|
||||
"""We don't care about TX packets."""
|
||||
|
||||
|
||||
# Register with the packet collector so we can process the packet
|
||||
# when we get it off the client (network)
|
||||
collector.PacketCollector().register(SeenList)
|
||||
|
@ -6,7 +6,6 @@ from oslo_config import cfg
|
||||
import wrapt
|
||||
|
||||
from aprsd import utils
|
||||
from aprsd.packets import collector, core
|
||||
from aprsd.utils import objectstore
|
||||
|
||||
|
||||
@ -24,18 +23,20 @@ class WatchList(objectstore.ObjectStoreMixin):
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._update_from_conf()
|
||||
cls._instance._init_store()
|
||||
cls._instance.data = {}
|
||||
return cls._instance
|
||||
|
||||
def _update_from_conf(self, config=None):
|
||||
if CONF.watch_list.enabled and CONF.watch_list.callsigns:
|
||||
def __init__(self, config=None):
|
||||
CONF.watch_list.packet_keep_count
|
||||
|
||||
if CONF.watch_list.callsigns:
|
||||
for callsign in CONF.watch_list.callsigns:
|
||||
call = callsign.replace("*", "")
|
||||
# FIXME(waboring) - we should fetch the last time we saw
|
||||
# a beacon from a callsign or some other mechanism to find
|
||||
# last time a message was seen by aprs-is. For now this
|
||||
# is all we can do.
|
||||
if call not in self.data:
|
||||
self.data[call] = {
|
||||
"last": None,
|
||||
"packet": None,
|
||||
@ -60,19 +61,15 @@ class WatchList(objectstore.ObjectStoreMixin):
|
||||
return callsign in self.data
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def rx(self, packet: type[core.Packet]) -> None:
|
||||
"""Track when we got a packet from the network."""
|
||||
LOG.error(f"WatchList RX {packet}")
|
||||
def update_seen(self, packet):
|
||||
if packet.addresse:
|
||||
callsign = packet.addresse
|
||||
else:
|
||||
callsign = packet.from_call
|
||||
|
||||
if self.callsign_in_watchlist(callsign):
|
||||
LOG.error(f"Updating WatchList RX {callsign}")
|
||||
self.data[callsign]["last"] = datetime.datetime.now()
|
||||
self.data[callsign]["packet"] = packet
|
||||
|
||||
def tx(self, packet: type[core.Packet]) -> None:
|
||||
"""We don't care about TX packets."""
|
||||
|
||||
def last_seen(self, callsign):
|
||||
if self.callsign_in_watchlist(callsign):
|
||||
return self.data[callsign]["last"]
|
||||
@ -116,6 +113,3 @@ class WatchList(objectstore.ObjectStoreMixin):
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
collector.PacketCollector().register(WatchList)
|
||||
|
@ -9,12 +9,12 @@ from aprsd.threads import aprsd
|
||||
# Create the collector and register all the objects
|
||||
# that APRSD has that implement the stats protocol
|
||||
stats_collector = collector.Collector()
|
||||
stats_collector.register_producer(app.APRSDStats)
|
||||
stats_collector.register_producer(packet_list.PacketList)
|
||||
stats_collector.register_producer(watch_list.WatchList)
|
||||
stats_collector.register_producer(tracker.PacketTrack)
|
||||
stats_collector.register_producer(plugin.PluginManager)
|
||||
stats_collector.register_producer(aprsd.APRSDThreadList)
|
||||
stats_collector.register_producer(email.EmailStats)
|
||||
stats_collector.register_producer(aprs_client.APRSClientStats)
|
||||
stats_collector.register_producer(seen_list.SeenList)
|
||||
stats_collector.register_producer(app.APRSDStats())
|
||||
stats_collector.register_producer(packet_list.PacketList())
|
||||
stats_collector.register_producer(watch_list.WatchList())
|
||||
stats_collector.register_producer(tracker.PacketTrack())
|
||||
stats_collector.register_producer(plugin.PluginManager())
|
||||
stats_collector.register_producer(aprsd.APRSDThreadList())
|
||||
stats_collector.register_producer(email.EmailStats())
|
||||
stats_collector.register_producer(aprs_client.APRSClientStats())
|
||||
stats_collector.register_producer(seen_list.SeenList())
|
||||
|
@ -1,9 +1,8 @@
|
||||
from typing import Callable, Protocol, runtime_checkable
|
||||
from typing import Protocol
|
||||
|
||||
from aprsd.utils import singleton
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class StatsProducer(Protocol):
|
||||
"""The StatsProducer protocol is used to define the interface for collecting stats."""
|
||||
def stats(self, serializeable=False) -> dict:
|
||||
@ -15,17 +14,17 @@ class StatsProducer(Protocol):
|
||||
class Collector:
|
||||
"""The Collector class is used to collect stats from multiple StatsProducer instances."""
|
||||
def __init__(self):
|
||||
self.producers: list[Callable] = []
|
||||
self.producers: dict[str, StatsProducer] = {}
|
||||
|
||||
def collect(self, serializable=False) -> dict:
|
||||
stats = {}
|
||||
for name in self.producers:
|
||||
cls = name()
|
||||
if isinstance(cls, StatsProducer):
|
||||
stats[cls.__class__.__name__] = cls.stats(serializable=serializable)
|
||||
else:
|
||||
raise TypeError(f"{cls} is not an instance of StatsProducer")
|
||||
for name, producer in self.producers.items():
|
||||
# No need to put in empty stats
|
||||
tmp_stats = producer.stats(serializable=serializable)
|
||||
if tmp_stats:
|
||||
stats[name] = tmp_stats
|
||||
return stats
|
||||
|
||||
def register_producer(self, producer_name: Callable):
|
||||
self.producers.append(producer_name)
|
||||
def register_producer(self, producer: StatsProducer):
|
||||
name = producer.__class__.__name__
|
||||
self.producers[name] = producer
|
||||
|
@ -33,7 +33,7 @@ class KeepAliveThread(APRSDThread):
|
||||
|
||||
if "EmailStats" in stats_json:
|
||||
email_stats = stats_json["EmailStats"]
|
||||
if email_stats.get("last_check_time"):
|
||||
if email_stats["last_check_time"]:
|
||||
email_thread_time = utils.strfdelta(now - email_stats["last_check_time"])
|
||||
else:
|
||||
email_thread_time = "N/A"
|
||||
|
@ -7,7 +7,6 @@ import aprslib
|
||||
from oslo_config import cfg
|
||||
|
||||
from aprsd import client, packets, plugin
|
||||
from aprsd.packets import collector
|
||||
from aprsd.packets import log as packet_log
|
||||
from aprsd.threads import APRSDThread, tx
|
||||
|
||||
@ -117,7 +116,7 @@ class APRSDDupeRXThread(APRSDRXThread):
|
||||
|
||||
if not found:
|
||||
# We haven't seen this packet before, so we process it.
|
||||
collector.PacketCollector().rx(packet)
|
||||
pkt_list.rx(packet)
|
||||
self.packet_queue.put(packet)
|
||||
elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
|
||||
# If the packet came in within N seconds of the
|
||||
@ -128,7 +127,7 @@ class APRSDDupeRXThread(APRSDRXThread):
|
||||
f"Packet {packet.from_call}:{packet.msgNo} already tracked "
|
||||
f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
|
||||
)
|
||||
collector.PacketCollector().rx(packet)
|
||||
pkt_list.rx(packet)
|
||||
self.packet_queue.put(packet)
|
||||
|
||||
|
||||
@ -155,15 +154,7 @@ class APRSDProcessPacketThread(APRSDThread):
|
||||
"""We got an ack for a message, no need to resend it."""
|
||||
ack_num = packet.msgNo
|
||||
LOG.debug(f"Got ack for message {ack_num}")
|
||||
collector.PacketCollector().rx(packet)
|
||||
pkt_tracker = packets.PacketTrack()
|
||||
pkt_tracker.remove(ack_num)
|
||||
|
||||
def process_piggyback_ack(self, packet):
|
||||
"""We got an ack embedded in a packet."""
|
||||
ack_num = packet.ackMsgNo
|
||||
LOG.debug(f"Got PiggyBackAck for message {ack_num}")
|
||||
collector.PacketCollector().rx(packet)
|
||||
packets.PacketList().rx(packet)
|
||||
pkt_tracker = packets.PacketTrack()
|
||||
pkt_tracker.remove(ack_num)
|
||||
|
||||
@ -171,7 +162,7 @@ class APRSDProcessPacketThread(APRSDThread):
|
||||
"""We got a reject message for a packet. Stop sending the message."""
|
||||
ack_num = packet.msgNo
|
||||
LOG.debug(f"Got REJECT for message {ack_num}")
|
||||
collector.PacketCollector().rx(packet)
|
||||
packets.PacketList().rx(packet)
|
||||
pkt_tracker = packets.PacketTrack()
|
||||
pkt_tracker.remove(ack_num)
|
||||
|
||||
@ -209,10 +200,6 @@ class APRSDProcessPacketThread(APRSDThread):
|
||||
):
|
||||
self.process_reject_packet(packet)
|
||||
else:
|
||||
if hasattr(packet, "ackMsgNo") and packet.ackMsgNo:
|
||||
# we got an ack embedded in this packet
|
||||
# we need to handle the ack
|
||||
self.process_piggyback_ack(packet)
|
||||
# Only ack messages that were sent directly to us
|
||||
if isinstance(packet, packets.MessagePacket):
|
||||
if to_call and to_call.lower() == our_call:
|
||||
|
@ -70,7 +70,6 @@ class ObjectStoreMixin:
|
||||
"""Save any queued to disk?"""
|
||||
if not CONF.enable_save:
|
||||
return
|
||||
self._init_store()
|
||||
save_filename = self._save_filename()
|
||||
if len(self) > 0:
|
||||
LOG.info(
|
||||
|
Loading…
Reference in New Issue
Block a user