diff --git a/aprsd/client.py b/aprsd/client.py index 6d46954..52d14ef 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -11,7 +11,7 @@ import wrapt from aprsd import exception from aprsd.clients import aprsis, fake, kiss -from aprsd.packets import collector, core +from aprsd.packets import core from aprsd.utils import singleton, trace @@ -102,7 +102,6 @@ class Client: def send(self, packet: core.Packet): """Send a packet to the network.""" - collector.PacketCollector().tx(packet) self.client.send(packet) @wrapt.synchronized(lock) diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index 74dc50c..52b6fc6 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -4,6 +4,7 @@ import threading from oslo_config import cfg import wrapt +from aprsd.packets import collector, core from aprsd.utils import objectstore @@ -79,7 +80,19 @@ class PacketTrack(objectstore.ObjectStoreMixin): return len(self.data) @wrapt.synchronized(lock) - def add(self, packet): + def rx(self, packet: type[core.Packet]) -> None: + """When we get a packet from the network, check if we should remove it.""" + if isinstance(packet, core.AckPacket): + self._remove(packet.msgNo) + elif isinstance(packet, core.RejectPacket): + self._remove(packet.msgNo) + elif packet.ackMsgNo: + # Got a piggyback ack, so remove the original message + self._remove(packet.ackMsgNo) + + @wrapt.synchronized(lock) + def tx(self, packet: type[core.Packet]) -> None: + """Add a packet that was sent.""" key = packet.msgNo packet.send_count = 0 self.data[key] = packet @@ -91,7 +104,16 @@ class PacketTrack(objectstore.ObjectStoreMixin): @wrapt.synchronized(lock) def remove(self, key): + self._remove(key) + + def _remove(self, key): try: del self.data[key] except KeyError: pass + + +# Now register the PacketList with the collector +# every packet we RX and TX goes through the collector +# for processing for whatever reason is needed. +collector.PacketCollector().register(PacketTrack) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 7a59865..dbf558b 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -18,7 +18,7 @@ LOG = logging.getLogger("APRSD") class APRSDRXThread(APRSDThread): def __init__(self, packet_queue): - super().__init__("RX_MSG") + super().__init__("RX_PKT") self.packet_queue = packet_queue self._client = client.factory.create() @@ -156,24 +156,18 @@ class APRSDProcessPacketThread(APRSDThread): ack_num = packet.msgNo LOG.debug(f"Got ack for message {ack_num}") collector.PacketCollector().rx(packet) - pkt_tracker = packets.PacketTrack() - pkt_tracker.remove(ack_num) def process_piggyback_ack(self, packet): """We got an ack embedded in a packet.""" ack_num = packet.ackMsgNo LOG.debug(f"Got PiggyBackAck for message {ack_num}") collector.PacketCollector().rx(packet) - pkt_tracker = packets.PacketTrack() - pkt_tracker.remove(ack_num) def process_reject_packet(self, packet): """We got a reject message for a packet. Stop sending the message.""" ack_num = packet.msgNo LOG.debug(f"Got REJECT for message {ack_num}") collector.PacketCollector().rx(packet) - pkt_tracker = packets.PacketTrack() - pkt_tracker.remove(ack_num) def loop(self): try: diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py index 273f2ec..634f486 100644 --- a/aprsd/threads/tx.py +++ b/aprsd/threads/tx.py @@ -10,7 +10,7 @@ from rush.stores import dictionary from aprsd import client from aprsd import conf # noqa from aprsd import threads as aprsd_threads -from aprsd.packets import core +from aprsd.packets import collector, core from aprsd.packets import log as packet_log from aprsd.packets import tracker @@ -44,6 +44,7 @@ def send(packet: core.Packet, direct=False, aprs_client=None): """Send a packet either in a thread or directly to the client.""" # prepare the packet for sending. # This constructs the packet.raw + collector.PacketCollector().tx(packet) packet.prepare() if isinstance(packet, core.AckPacket): _send_ack(packet, direct=direct, aprs_client=aprs_client) @@ -89,10 +90,7 @@ class SendPacketThread(aprsd_threads.APRSDThread): def __init__(self, packet): self.packet = packet - name = self.packet.raw[:5] - super().__init__(f"TXPKT-{self.packet.msgNo}-{name}") - pkt_tracker = tracker.PacketTrack() - pkt_tracker.add(packet) + super().__init__(f"TX-{packet.to_call}-{self.packet.msgNo}") def loop(self): """Loop until a message is acked or it gets delayed. @@ -146,7 +144,7 @@ class SendPacketThread(aprsd_threads.APRSDThread): # no attempt time, so lets send it, and start # tracking the time. packet.last_send_time = int(round(time.time())) - send(packet, direct=True) + _send_direct(packet) packet.send_count += 1 time.sleep(1) @@ -161,7 +159,7 @@ class SendAckThread(aprsd_threads.APRSDThread): def __init__(self, packet): self.packet = packet - super().__init__(f"SendAck-{self.packet.msgNo}") + super().__init__(f"TXAck-{packet.to_call}-{self.packet.msgNo}") self.max_retries = CONF.default_ack_send_count def loop(self): @@ -195,7 +193,7 @@ class SendAckThread(aprsd_threads.APRSDThread): send_now = True if send_now: - send(self.packet, direct=True) + _send_direct(self.packet) self.packet.send_count += 1 self.packet.last_send_time = int(round(time.time()))