Compare commits

...

3 Commits

Author SHA1 Message Date
Hemna 717db6083e Added PacketTrack to packet collector
Now the PacketTrack object is a packet collector as well.
2024-04-17 16:54:08 -04:00
Hemna 4c7e27c88b Webchat Send Beacon uses Path selected in UI
This patch changes the Send Beacon button capability in
webchat to use the path selected in the UI for the
actual beacon being sent out.
2024-04-17 12:34:01 -04:00
Hemna 88d26241f5 Added try except blocks in collectors
This patch adds some try except blocks in both the stats collector
and the packets collector calls to registered objects.  This can
prevent the rest of APRSD falling down when the collector objects
have a failure of some sort.
2024-04-17 12:24:56 -04:00
8 changed files with 64 additions and 22 deletions

View File

@ -11,7 +11,7 @@ import wrapt
from aprsd import exception from aprsd import exception
from aprsd.clients import aprsis, fake, kiss from aprsd.clients import aprsis, fake, kiss
from aprsd.packets import collector, core from aprsd.packets import core
from aprsd.utils import singleton, trace from aprsd.utils import singleton, trace
@ -102,7 +102,6 @@ class Client:
def send(self, packet: core.Packet): def send(self, packet: core.Packet):
"""Send a packet to the network.""" """Send a packet to the network."""
collector.PacketCollector().tx(packet)
self.client.send(packet) self.client.send(packet)
@wrapt.synchronized(lock) @wrapt.synchronized(lock)

View File

@ -543,6 +543,14 @@ class SendMessageNamespace(Namespace):
long = data["longitude"] long = data["longitude"]
LOG.debug(f"Lat {lat}") LOG.debug(f"Lat {lat}")
LOG.debug(f"Long {long}") LOG.debug(f"Long {long}")
path = data.get("path", None)
if not path:
path = []
elif "," in path:
path_opts = path.split(",")
path = [x.strip() for x in path_opts]
else:
path = [path]
tx.send( tx.send(
packets.BeaconPacket( packets.BeaconPacket(
@ -551,6 +559,7 @@ class SendMessageNamespace(Namespace):
latitude=lat, latitude=lat,
longitude=long, longitude=long,
comment="APRSD WebChat Beacon", comment="APRSD WebChat Beacon",
path=path,
), ),
direct=True, direct=True,
) )

View File

@ -1,9 +1,13 @@
import logging
from typing import Callable, Protocol, runtime_checkable from typing import Callable, Protocol, runtime_checkable
from aprsd.packets import core from aprsd.packets import core
from aprsd.utils import singleton from aprsd.utils import singleton
LOG = logging.getLogger("APRSD")
@runtime_checkable @runtime_checkable
class PacketMonitor(Protocol): class PacketMonitor(Protocol):
"""Protocol for Monitoring packets in some way.""" """Protocol for Monitoring packets in some way."""
@ -29,7 +33,11 @@ class PacketCollector:
for name in self.monitors: for name in self.monitors:
cls = name() cls = name()
if isinstance(cls, PacketMonitor): if isinstance(cls, PacketMonitor):
cls.rx(packet) try:
cls.rx(packet)
except Exception as e:
LOG.error(f"Error in monitor {name} (rx): {e}")
else: else:
raise TypeError(f"Monitor {name} is not a PacketMonitor") raise TypeError(f"Monitor {name} is not a PacketMonitor")
@ -37,6 +45,9 @@ class PacketCollector:
for name in self.monitors: for name in self.monitors:
cls = name() cls = name()
if isinstance(cls, PacketMonitor): if isinstance(cls, PacketMonitor):
cls.tx(packet) try:
cls.tx(packet)
except Exception as e:
LOG.error(f"Error in monitor {name} (tx): {e}")
else: else:
raise TypeError(f"Monitor {name} is not a PacketMonitor") raise TypeError(f"Monitor {name} is not a PacketMonitor")

View File

@ -4,6 +4,7 @@ import threading
from oslo_config import cfg from oslo_config import cfg
import wrapt import wrapt
from aprsd.packets import collector, core
from aprsd.utils import objectstore from aprsd.utils import objectstore
@ -79,7 +80,19 @@ class PacketTrack(objectstore.ObjectStoreMixin):
return len(self.data) return len(self.data)
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def add(self, packet): def rx(self, packet: type[core.Packet]) -> None:
"""When we get a packet from the network, check if we should remove it."""
if isinstance(packet, core.AckPacket):
self._remove(packet.msgNo)
elif isinstance(packet, core.RejectPacket):
self._remove(packet.msgNo)
elif packet.ackMsgNo:
# Got a piggyback ack, so remove the original message
self._remove(packet.ackMsgNo)
@wrapt.synchronized(lock)
def tx(self, packet: type[core.Packet]) -> None:
"""Add a packet that was sent."""
key = packet.msgNo key = packet.msgNo
packet.send_count = 0 packet.send_count = 0
self.data[key] = packet self.data[key] = packet
@ -91,7 +104,16 @@ class PacketTrack(objectstore.ObjectStoreMixin):
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def remove(self, key): def remove(self, key):
self._remove(key)
def _remove(self, key):
try: try:
del self.data[key] del self.data[key]
except KeyError: except KeyError:
pass pass
# Now register the PacketList with the collector
# every packet we RX and TX goes through the collector
# for processing for whatever reason is needed.
collector.PacketCollector().register(PacketTrack)

View File

@ -1,8 +1,12 @@
import logging
from typing import Callable, Protocol, runtime_checkable from typing import Callable, Protocol, runtime_checkable
from aprsd.utils import singleton from aprsd.utils import singleton
LOG = logging.getLogger("APRSD")
@runtime_checkable @runtime_checkable
class StatsProducer(Protocol): class StatsProducer(Protocol):
"""The StatsProducer protocol is used to define the interface for collecting stats.""" """The StatsProducer protocol is used to define the interface for collecting stats."""
@ -22,7 +26,10 @@ class Collector:
for name in self.producers: for name in self.producers:
cls = name() cls = name()
if isinstance(cls, StatsProducer): if isinstance(cls, StatsProducer):
stats[cls.__class__.__name__] = cls.stats(serializable=serializable) try:
stats[cls.__class__.__name__] = cls.stats(serializable=serializable)
except Exception as e:
LOG.error(f"Error in producer {name} (stats): {e}")
else: else:
raise TypeError(f"{cls} is not an instance of StatsProducer") raise TypeError(f"{cls} is not an instance of StatsProducer")
return stats return stats

View File

@ -18,7 +18,7 @@ LOG = logging.getLogger("APRSD")
class APRSDRXThread(APRSDThread): class APRSDRXThread(APRSDThread):
def __init__(self, packet_queue): def __init__(self, packet_queue):
super().__init__("RX_MSG") super().__init__("RX_PKT")
self.packet_queue = packet_queue self.packet_queue = packet_queue
self._client = client.factory.create() self._client = client.factory.create()
@ -156,24 +156,18 @@ class APRSDProcessPacketThread(APRSDThread):
ack_num = packet.msgNo ack_num = packet.msgNo
LOG.debug(f"Got ack for message {ack_num}") LOG.debug(f"Got ack for message {ack_num}")
collector.PacketCollector().rx(packet) collector.PacketCollector().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
def process_piggyback_ack(self, packet): def process_piggyback_ack(self, packet):
"""We got an ack embedded in a packet.""" """We got an ack embedded in a packet."""
ack_num = packet.ackMsgNo ack_num = packet.ackMsgNo
LOG.debug(f"Got PiggyBackAck for message {ack_num}") LOG.debug(f"Got PiggyBackAck for message {ack_num}")
collector.PacketCollector().rx(packet) collector.PacketCollector().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
def process_reject_packet(self, packet): def process_reject_packet(self, packet):
"""We got a reject message for a packet. Stop sending the message.""" """We got a reject message for a packet. Stop sending the message."""
ack_num = packet.msgNo ack_num = packet.msgNo
LOG.debug(f"Got REJECT for message {ack_num}") LOG.debug(f"Got REJECT for message {ack_num}")
collector.PacketCollector().rx(packet) collector.PacketCollector().rx(packet)
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
def loop(self): def loop(self):
try: try:

View File

@ -10,7 +10,7 @@ from rush.stores import dictionary
from aprsd import client from aprsd import client
from aprsd import conf # noqa from aprsd import conf # noqa
from aprsd import threads as aprsd_threads from aprsd import threads as aprsd_threads
from aprsd.packets import core from aprsd.packets import collector, core
from aprsd.packets import log as packet_log from aprsd.packets import log as packet_log
from aprsd.packets import tracker from aprsd.packets import tracker
@ -44,6 +44,7 @@ def send(packet: core.Packet, direct=False, aprs_client=None):
"""Send a packet either in a thread or directly to the client.""" """Send a packet either in a thread or directly to the client."""
# prepare the packet for sending. # prepare the packet for sending.
# This constructs the packet.raw # This constructs the packet.raw
collector.PacketCollector().tx(packet)
packet.prepare() packet.prepare()
if isinstance(packet, core.AckPacket): if isinstance(packet, core.AckPacket):
_send_ack(packet, direct=direct, aprs_client=aprs_client) _send_ack(packet, direct=direct, aprs_client=aprs_client)
@ -89,10 +90,7 @@ class SendPacketThread(aprsd_threads.APRSDThread):
def __init__(self, packet): def __init__(self, packet):
self.packet = packet self.packet = packet
name = self.packet.raw[:5] super().__init__(f"TX-{packet.to_call}-{self.packet.msgNo}")
super().__init__(f"TXPKT-{self.packet.msgNo}-{name}")
pkt_tracker = tracker.PacketTrack()
pkt_tracker.add(packet)
def loop(self): def loop(self):
"""Loop until a message is acked or it gets delayed. """Loop until a message is acked or it gets delayed.
@ -146,7 +144,7 @@ class SendPacketThread(aprsd_threads.APRSDThread):
# no attempt time, so lets send it, and start # no attempt time, so lets send it, and start
# tracking the time. # tracking the time.
packet.last_send_time = int(round(time.time())) packet.last_send_time = int(round(time.time()))
send(packet, direct=True) _send_direct(packet)
packet.send_count += 1 packet.send_count += 1
time.sleep(1) time.sleep(1)
@ -161,7 +159,7 @@ class SendAckThread(aprsd_threads.APRSDThread):
def __init__(self, packet): def __init__(self, packet):
self.packet = packet self.packet = packet
super().__init__(f"SendAck-{self.packet.msgNo}") super().__init__(f"TXAck-{packet.to_call}-{self.packet.msgNo}")
self.max_retries = CONF.default_ack_send_count self.max_retries = CONF.default_ack_send_count
def loop(self): def loop(self):
@ -195,7 +193,7 @@ class SendAckThread(aprsd_threads.APRSDThread):
send_now = True send_now = True
if send_now: if send_now:
send(self.packet, direct=True) _send_direct(self.packet)
self.packet.send_count += 1 self.packet.send_count += 1
self.packet.last_send_time = int(round(time.time())) self.packet.last_send_time = int(round(time.time()))

View File

@ -64,9 +64,11 @@ function showError(error) {
function showPosition(position) { function showPosition(position) {
console.log("showPosition Called"); console.log("showPosition Called");
path = $('#pkt_path option:selected').val();
msg = { msg = {
'latitude': position.coords.latitude, 'latitude': position.coords.latitude,
'longitude': position.coords.longitude 'longitude': position.coords.longitude,
'path': path,
} }
console.log(msg); console.log(msg);
$.toast({ $.toast({