From ad0d89db401847a44b2d2af82d9901b40ecd9b5a Mon Sep 17 00:00:00 2001 From: Hemna Date: Mon, 19 Dec 2022 10:28:22 -0500 Subject: [PATCH] Updated webchat and listen for queue based RX This patch updates both the webchat and listen commands to be able to use the new queue based packet RX processing. APRSD used to start a thread for every packet received, now packets are pushed into a queue for processing by other threads already running. --- aprsd/cmds/listen.py | 5 ++++- aprsd/cmds/webchat.py | 46 ++++++++++--------------------------------- aprsd/threads/rx.py | 6 +++--- aprsd/threads/tx.py | 2 +- 4 files changed, 18 insertions(+), 41 deletions(-) diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index a8066d4..da6b4c8 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -135,7 +135,10 @@ def listen( keepalive.start() LOG.debug("Create APRSDListenThread") - listen_thread = APRSDListenThread(threads.packet_queue, config=config) + listen_thread = APRSDListenThread( + config=config, + packet_queue=threads.packet_queue, + ) LOG.debug("Start APRSDListenThread") listen_thread.start() LOG.debug("keepalive Join") diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index d0c0b9b..c53b44f 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -132,42 +132,12 @@ def verify_password(username, password): return username -class WebChatRXThread(rx.APRSDRXThread): - """Class that connects to APRISIS/kiss and waits for messages. - - After the packet is received from APRSIS/KISS, the packet is - sent to processing in the WebChatProcessPacketThread. - """ - def __init__(self, config, socketio): - super().__init__(None, config) - self.socketio = socketio - self.connected = False - - def connected(self, connected=True): - self.connected = connected - - def process_packet(self, *args, **kwargs): - # packet = self._client.decode_packet(*args, **kwargs) - if "packet" in kwargs: - packet = kwargs["packet"] - else: - packet = self._client.decode_packet(*args, **kwargs) - - LOG.debug(f"GOT Packet {packet}") - thread = WebChatProcessPacketThread( - config=self.config, - packet=packet, - socketio=self.socketio, - ) - thread.start() - - class WebChatProcessPacketThread(rx.APRSDProcessPacketThread): """Class that handles packets being sent to us.""" - def __init__(self, config, packet, socketio): + def __init__(self, config, packet_queue, socketio): self.socketio = socketio self.connected = False - super().__init__(config, packet) + super().__init__(config, packet_queue) def process_ack_packet(self, packet: packets.AckPacket): super().process_ack_packet(packet) @@ -184,7 +154,6 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread): packet.get("addresse", None) fromcall = packet.from_call - packets.PacketList().rx(packet) message = packet.get("message_text", None) msg = { "id": 0, @@ -532,12 +501,17 @@ def webchat(ctx, flush, port): packets.SeenList(config=config) (socketio, app) = init_flask(config, loglevel, quiet) - rx_thread = WebChatRXThread( + rx_thread = rx.APRSDPluginRXThread( config=config, + packet_queue=threads.packet_queue, + ) + rx_thread.start() + process_thread = WebChatProcessPacketThread( + config=config, + packet_queue=threads.packet_queue, socketio=socketio, ) - LOG.info("Start RX Thread") - rx_thread.start() + process_thread.start() keepalive = threads.KeepAliveThread(config=config) LOG.info("Start KeepAliveThread") diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 173b024..4db1577 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -13,10 +13,10 @@ LOG = logging.getLogger("APRSD") class APRSDRXThread(APRSDThread): - def __init__(self, packet_queue, config): + def __init__(self, config, packet_queue): super().__init__("RX_MSG") - self.packet_queue = packet_queue self.config = config + self.packet_queue = packet_queue self._client = client.factory.create() def stop(self): @@ -95,7 +95,7 @@ class APRSDProcessPacketThread(APRSDThread): def loop(self): try: - packet = self.packet_queue.get(block=True, timeout=1) + packet = self.packet_queue.get(timeout=1) if packet: self.process_packet(packet) except queue.Empty: diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py index ac3651a..4511e95 100644 --- a/aprsd/threads/tx.py +++ b/aprsd/threads/tx.py @@ -38,7 +38,7 @@ class SendPacketThread(aprsd_threads.APRSDThread): # So it got acked and we are done. LOG.info( f"{packet.__class__.__name__}" - f"({packet.msgNo}) " + f"({self.packet.msgNo}) " "Message Send Complete via Ack.", ) return False