Compare commits

...

3 Commits

Author SHA1 Message Date
Hemna 717db6083e Added PacketTrack to packet collector
Now the PacketTrack object is a packet collector as well.
2024-04-17 16:54:08 -04:00
Hemna 4c7e27c88b Webchat Send Beacon uses Path selected in UI
This patch changes the Send Beacon button capability in
webchat to use the path selected in the UI for the
actual beacon being sent out.
2024-04-17 12:34:01 -04:00
Hemna 88d26241f5 Added try except blocks in collectors
This patch adds some try except blocks in both the stats collector
and the packets collector calls to registered objects.  This can
prevent the rest of APRSD falling down when the collector objects
have a failure of some sort.
2024-04-17 12:24:56 -04:00
8 changed files with 64 additions and 22 deletions

View File

@ -11,7 +11,7 @@ import wrapt
from aprsd import exception
from aprsd.clients import aprsis, fake, kiss
from aprsd.packets import collector, core
from aprsd.packets import core
from aprsd.utils import singleton, trace
@ -102,7 +102,6 @@ class Client:
def send(self, packet: core.Packet):
"""Send a packet to the network."""
collector.PacketCollector().tx(packet)
self.client.send(packet)
@wrapt.synchronized(lock)

View File

@ -543,6 +543,14 @@ class SendMessageNamespace(Namespace):
long = data["longitude"]
LOG.debug(f"Lat {lat}")
LOG.debug(f"Long {long}")
path = data.get("path", None)
if not path:
path = []
elif "," in path:
path_opts = path.split(",")
path = [x.strip() for x in path_opts]
else:
path = [path]
tx.send(
packets.BeaconPacket(
@ -551,6 +559,7 @@ class SendMessageNamespace(Namespace):
latitude=lat,
longitude=long,
comment="APRSD WebChat Beacon",
path=path,
),
direct=True,
)

View File

@ -1,9 +1,13 @@
import logging
from typing import Callable, Protocol, runtime_checkable
from aprsd.packets import core
from aprsd.utils import singleton
LOG = logging.getLogger("APRSD")
@runtime_checkable
class PacketMonitor(Protocol):
"""Protocol for Monitoring packets in some way."""
@ -29,7 +33,11 @@ class PacketCollector:
for name in self.monitors:
cls = name()
if isinstance(cls, PacketMonitor):
cls.rx(packet)
try:
cls.rx(packet)
except Exception as e:
LOG.error(f"Error in monitor {name} (rx): {e}")
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor")
@ -37,6 +45,9 @@ class PacketCollector:
for name in self.monitors:
cls = name()
if isinstance(cls, PacketMonitor):
cls.tx(packet)
try:
cls.tx(packet)
except Exception as e:
LOG.error(f"Error in monitor {name} (tx): {e}")
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor")

View File

@ -4,6 +4,7 @@ import threading
from oslo_config import cfg
import wrapt
from aprsd.packets import collector, core
from aprsd.utils import objectstore
@ -79,7 +80,19 @@ class PacketTrack(objectstore.ObjectStoreMixin):
return len(self.data)
@wrapt.synchronized(lock)
def add(self, packet):
def rx(self, packet: type[core.Packet]) -> None:
"""When we get a packet from the network, check if we should remove it."""
if isinstance(packet, core.AckPacket):
self._remove(packet.msgNo)
elif isinstance(packet, core.RejectPacket):
self._remove(packet.msgNo)
elif packet.ackMsgNo:
# Got a piggyback ack, so remove the original message
self._remove(packet.ackMsgNo)
@wrapt.synchronized(lock)
def tx(self, packet: type[core.Packet]) -> None:
"""Add a packet that was sent."""
key = packet.msgNo
packet.send_count = 0
self.data[key] = packet
@ -91,7 +104,16 @@ class PacketTrack(objectstore.ObjectStoreMixin):
@wrapt.synchronized(lock)
def remove(self, key):
self._remove(key)
def _remove(self, key):
try:
del self.data[key]
except KeyError:
pass
# Now register the PacketList with the collector
# every packet we RX and TX goes through the collector
# for processing for whatever reason is needed.
collector.PacketCollector().register(PacketTrack)

View File

@ -1,8 +1,12 @@
import logging
from typing import Callable, Protocol, runtime_checkable
from aprsd.utils import singleton
LOG = logging.getLogger("APRSD")
@runtime_checkable
class StatsProducer(Protocol):
"""The StatsProducer protocol is used to define the interface for collecting stats."""
@ -22,7 +26,10 @@ class Collector:
for name in self.producers:
cls = name()
if isinstance(cls, StatsProducer):
stats[cls.__class__.__name__] = cls.stats(serializable=serializable)
try:
stats[cls.__class__.__name__] = cls.stats(serializable=serializable)
except Exception as e:
LOG.error(f"Error in producer {name} (stats): {e}")
else:
raise TypeError(f"{cls} is not an instance of StatsProducer")
return stats

View File

@ -18,7 +18,7 @@ LOG = logging.getLogger("APRSD")
class APRSDRXThread(APRSDThread):
def __init__(self, packet_queue):
super().__init__("RX_MSG")
super().__init__("RX_PKT")
self.packet_queue = packet_queue
self._client = client.factory.create()
@ -156,24 +156,18 @@ class APRSDProcessPacketThread(APRSDThread):
ack_num = packet.msgNo
LOG.debug(f"Got ack for message {ack_num}")
collector.PacketCollector().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
def process_piggyback_ack(self, packet):
"""We got an ack embedded in a packet."""
ack_num = packet.ackMsgNo
LOG.debug(f"Got PiggyBackAck for message {ack_num}")
collector.PacketCollector().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
def process_reject_packet(self, packet):
"""We got a reject message for a packet. Stop sending the message."""
ack_num = packet.msgNo
LOG.debug(f"Got REJECT for message {ack_num}")
collector.PacketCollector().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
def loop(self):
try:

View File

@ -10,7 +10,7 @@ from rush.stores import dictionary
from aprsd import client
from aprsd import conf # noqa
from aprsd import threads as aprsd_threads
from aprsd.packets import core
from aprsd.packets import collector, core
from aprsd.packets import log as packet_log
from aprsd.packets import tracker
@ -44,6 +44,7 @@ def send(packet: core.Packet, direct=False, aprs_client=None):
"""Send a packet either in a thread or directly to the client."""
# prepare the packet for sending.
# This constructs the packet.raw
collector.PacketCollector().tx(packet)
packet.prepare()
if isinstance(packet, core.AckPacket):
_send_ack(packet, direct=direct, aprs_client=aprs_client)
@ -89,10 +90,7 @@ class SendPacketThread(aprsd_threads.APRSDThread):
def __init__(self, packet):
self.packet = packet
name = self.packet.raw[:5]
super().__init__(f"TXPKT-{self.packet.msgNo}-{name}")
pkt_tracker = tracker.PacketTrack()
pkt_tracker.add(packet)
super().__init__(f"TX-{packet.to_call}-{self.packet.msgNo}")
def loop(self):
"""Loop until a message is acked or it gets delayed.
@ -146,7 +144,7 @@ class SendPacketThread(aprsd_threads.APRSDThread):
# no attempt time, so lets send it, and start
# tracking the time.
packet.last_send_time = int(round(time.time()))
send(packet, direct=True)
_send_direct(packet)
packet.send_count += 1
time.sleep(1)
@ -161,7 +159,7 @@ class SendAckThread(aprsd_threads.APRSDThread):
def __init__(self, packet):
self.packet = packet
super().__init__(f"SendAck-{self.packet.msgNo}")
super().__init__(f"TXAck-{packet.to_call}-{self.packet.msgNo}")
self.max_retries = CONF.default_ack_send_count
def loop(self):
@ -195,7 +193,7 @@ class SendAckThread(aprsd_threads.APRSDThread):
send_now = True
if send_now:
send(self.packet, direct=True)
_send_direct(self.packet)
self.packet.send_count += 1
self.packet.last_send_time = int(round(time.time()))

View File

@ -64,9 +64,11 @@ function showError(error) {
function showPosition(position) {
console.log("showPosition Called");
path = $('#pkt_path option:selected').val();
msg = {
'latitude': position.coords.latitude,
'longitude': position.coords.longitude
'longitude': position.coords.longitude,
'path': path,
}
console.log(msg);
$.toast({