Updated webchat and listen for queue based RX

This patch updates both the webchat and listen commands
to be able to use the new queue based packet RX processing.

APRSD used to start a thread for every packet received, now
packets are pushed into a queue for processing by other threads
already running.
This commit is contained in:
Hemna 2022-12-19 10:28:22 -05:00
parent e37f99a6dd
commit ad0d89db40
4 changed files with 18 additions and 41 deletions

View File

@ -135,7 +135,10 @@ def listen(
keepalive.start()
LOG.debug("Create APRSDListenThread")
listen_thread = APRSDListenThread(threads.packet_queue, config=config)
listen_thread = APRSDListenThread(
config=config,
packet_queue=threads.packet_queue,
)
LOG.debug("Start APRSDListenThread")
listen_thread.start()
LOG.debug("keepalive Join")

View File

@ -132,42 +132,12 @@ def verify_password(username, password):
return username
class WebChatRXThread(rx.APRSDRXThread):
"""Class that connects to APRISIS/kiss and waits for messages.
After the packet is received from APRSIS/KISS, the packet is
sent to processing in the WebChatProcessPacketThread.
"""
def __init__(self, config, socketio):
super().__init__(None, config)
self.socketio = socketio
self.connected = False
def connected(self, connected=True):
self.connected = connected
def process_packet(self, *args, **kwargs):
# packet = self._client.decode_packet(*args, **kwargs)
if "packet" in kwargs:
packet = kwargs["packet"]
else:
packet = self._client.decode_packet(*args, **kwargs)
LOG.debug(f"GOT Packet {packet}")
thread = WebChatProcessPacketThread(
config=self.config,
packet=packet,
socketio=self.socketio,
)
thread.start()
class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
"""Class that handles packets being sent to us."""
def __init__(self, config, packet, socketio):
def __init__(self, config, packet_queue, socketio):
self.socketio = socketio
self.connected = False
super().__init__(config, packet)
super().__init__(config, packet_queue)
def process_ack_packet(self, packet: packets.AckPacket):
super().process_ack_packet(packet)
@ -184,7 +154,6 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
packet.get("addresse", None)
fromcall = packet.from_call
packets.PacketList().rx(packet)
message = packet.get("message_text", None)
msg = {
"id": 0,
@ -532,12 +501,17 @@ def webchat(ctx, flush, port):
packets.SeenList(config=config)
(socketio, app) = init_flask(config, loglevel, quiet)
rx_thread = WebChatRXThread(
rx_thread = rx.APRSDPluginRXThread(
config=config,
packet_queue=threads.packet_queue,
)
rx_thread.start()
process_thread = WebChatProcessPacketThread(
config=config,
packet_queue=threads.packet_queue,
socketio=socketio,
)
LOG.info("Start RX Thread")
rx_thread.start()
process_thread.start()
keepalive = threads.KeepAliveThread(config=config)
LOG.info("Start KeepAliveThread")

View File

@ -13,10 +13,10 @@ LOG = logging.getLogger("APRSD")
class APRSDRXThread(APRSDThread):
def __init__(self, packet_queue, config):
def __init__(self, config, packet_queue):
super().__init__("RX_MSG")
self.packet_queue = packet_queue
self.config = config
self.packet_queue = packet_queue
self._client = client.factory.create()
def stop(self):
@ -95,7 +95,7 @@ class APRSDProcessPacketThread(APRSDThread):
def loop(self):
try:
packet = self.packet_queue.get(block=True, timeout=1)
packet = self.packet_queue.get(timeout=1)
if packet:
self.process_packet(packet)
except queue.Empty:

View File

@ -38,7 +38,7 @@ class SendPacketThread(aprsd_threads.APRSDThread):
# So it got acked and we are done.
LOG.info(
f"{packet.__class__.__name__}"
f"({packet.msgNo}) "
f"({self.packet.msgNo}) "
"Message Send Complete via Ack.",
)
return False