From 123b3ffa81c5eb6366378cd4e28d936086af7686 Mon Sep 17 00:00:00 2001 From: Hemna Date: Sun, 18 Dec 2022 08:52:58 -0500 Subject: [PATCH] Change RX packet processing to enqueu This changes the RX thread to send the packet into a queue instead of starting a new thread for every packet. --- aprsd/cmds/server.py | 7 ++++++- aprsd/cmds/webchat.py | 3 +++ aprsd/threads/__init__.py | 5 +---- aprsd/threads/rx.py | 33 ++++++++++++++++-------------- examples/plugins/example_plugin.py | 6 +++--- tests/cmds/test_webchat.py | 2 +- 6 files changed, 32 insertions(+), 24 deletions(-) diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index 084f072..b7a16a2 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -96,10 +96,15 @@ def server(ctx, flush): plugin_manager.setup_plugins() rx_thread = rx.APRSDPluginRXThread( - msg_queues=threads.msg_queues, + packet_queue=threads.packet_queue, config=config, ) + process_thread = rx.APRSDPluginProcessPacketThread( + config=config, + packet_queue=threads.packet_queue, + ) rx_thread.start() + process_thread.start() packets.PacketTrack().restart() diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index 1f19412..17ca3b5 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -63,6 +63,9 @@ class SentMessages(objectstore.ObjectStoreMixin): cls._instance = super().__new__(cls) return cls._instance + def is_initialized(self): + return True + @wrapt.synchronized(lock) def add(self, msg): self.data[msg.msgNo] = self.create(msg.msgNo) diff --git a/aprsd/threads/__init__.py b/aprsd/threads/__init__.py index fd4da3b..bf8cb23 100644 --- a/aprsd/threads/__init__.py +++ b/aprsd/threads/__init__.py @@ -7,7 +7,4 @@ from .keep_alive import KeepAliveThread # noqa: F401 from .rx import APRSDRXThread # noqa: F401 -rx_msg_queue = queue.Queue(maxsize=20) -msg_queues = { - "rx": rx_msg_queue, -} +packet_queue = queue.Queue(maxsize=20) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 226248f..3ff8c9e 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -1,5 +1,6 @@ import abc import logging +import queue import time import aprslib @@ -12,9 +13,9 @@ LOG = logging.getLogger("APRSD") class APRSDRXThread(APRSDThread): - def __init__(self, msg_queues, config): + def __init__(self, packet_queue, config): super().__init__("RX_MSG") - self.msg_queues = msg_queues + self.packet_queue = packet_queue self.config = config self._client = client.factory.create() @@ -68,11 +69,7 @@ class APRSDPluginRXThread(APRSDRXThread): # LOG.debug(raw) packet.log(header="RX") packets.PacketList().rx(packet) - thread = APRSDPluginProcessPacketThread( - config=self.config, - packet=packet, - ) - thread.start() + self.packet_queue.put(packet) class APRSDProcessPacketThread(APRSDThread): @@ -83,11 +80,10 @@ class APRSDProcessPacketThread(APRSDThread): will ack a message before sending the packet to the subclass for processing.""" - def __init__(self, config, packet): + def __init__(self, config, packet_queue): self.config = config - self.packet = packet - name = self.packet.raw[:10] - super().__init__(f"RXPKT-{name}") + self.packet_queue = packet_queue + super().__init__("ProcessPKT") self._loop_cnt = 1 def process_ack_packet(self, packet): @@ -99,9 +95,18 @@ class APRSDProcessPacketThread(APRSDThread): return def loop(self): + try: + packet = self.packet_queue.get(block=True, timeout=1) + if packet: + self.process_packet(packet) + except queue.Empty: + pass + self._loop_cnt += 1 + return True + + def process_packet(self, packet): """Process a packet received from aprs-is server.""" LOG.debug(f"RXPKT-LOOP {self._loop_cnt}") - packet = self.packet our_call = self.config["aprsd"]["callsign"].lower() from_call = packet.from_call @@ -147,7 +152,7 @@ class APRSDProcessPacketThread(APRSDThread): return False @abc.abstractmethod - def process_our_message_packet(self, *args, **kwargs): + def process_our_message_packet(self, packet): """Process a MessagePacket destined for us!""" def process_other_packet(self, packet, for_us=False): @@ -210,9 +215,7 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): to_call=from_call, message_text=reply, ) - LOG.warning("Calling msg_pkg.send()") msg_pkt.send() - LOG.warning("Calling msg_pkg.send() --- DONE") # If the message was for us and we didn't have a # response, then we send a usage statement. diff --git a/examples/plugins/example_plugin.py b/examples/plugins/example_plugin.py index d674d0d..0a3c75e 100644 --- a/examples/plugins/example_plugin.py +++ b/examples/plugins/example_plugin.py @@ -6,7 +6,7 @@ from aprsd import plugin LOG = logging.getLogger("APRSD") -class HelloPlugin(plugin.APRSDPluginBase): +class HelloPlugin(plugin.APRSDRegexCommandPluginBase): """Hello World.""" version = "1.0" @@ -14,7 +14,7 @@ class HelloPlugin(plugin.APRSDPluginBase): command_regex = "^[hH]" command_name = "hello" - def command(self, fromcall, message, ack): + def command(self, packet): LOG.info("HelloPlugin") - reply = f"Hello '{fromcall}'" + reply = f"Hello '{packet.from_call}'" return reply diff --git a/tests/cmds/test_webchat.py b/tests/cmds/test_webchat.py index 7c23859..6bd08f8 100644 --- a/tests/cmds/test_webchat.py +++ b/tests/cmds/test_webchat.py @@ -90,7 +90,7 @@ class TestSendMessageCommand(unittest.TestCase): mock_emit.called_once() @mock.patch("aprsd.config.parse_config") - @mock.patch("aprsd.packets.PacketList.add") + @mock.patch("aprsd.packets.PacketList.rx") @mock.patch("aprsd.cmds.webchat.socketio.emit") def test_process_our_message_packet( self, mock_parse_config,