Change RX packet processing to enqueu

This changes the RX thread to send the packet into a queue instead of
starting a new thread for every packet.
This commit is contained in:
Hemna 2022-12-18 08:52:58 -05:00
parent 1187f1ed73
commit 123b3ffa81
6 changed files with 32 additions and 24 deletions

View File

@ -96,10 +96,15 @@ def server(ctx, flush):
plugin_manager.setup_plugins() plugin_manager.setup_plugins()
rx_thread = rx.APRSDPluginRXThread( rx_thread = rx.APRSDPluginRXThread(
msg_queues=threads.msg_queues, packet_queue=threads.packet_queue,
config=config, config=config,
) )
process_thread = rx.APRSDPluginProcessPacketThread(
config=config,
packet_queue=threads.packet_queue,
)
rx_thread.start() rx_thread.start()
process_thread.start()
packets.PacketTrack().restart() packets.PacketTrack().restart()

View File

@ -63,6 +63,9 @@ class SentMessages(objectstore.ObjectStoreMixin):
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
return cls._instance return cls._instance
def is_initialized(self):
return True
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def add(self, msg): def add(self, msg):
self.data[msg.msgNo] = self.create(msg.msgNo) self.data[msg.msgNo] = self.create(msg.msgNo)

View File

@ -7,7 +7,4 @@ from .keep_alive import KeepAliveThread # noqa: F401
from .rx import APRSDRXThread # noqa: F401 from .rx import APRSDRXThread # noqa: F401
rx_msg_queue = queue.Queue(maxsize=20) packet_queue = queue.Queue(maxsize=20)
msg_queues = {
"rx": rx_msg_queue,
}

View File

@ -1,5 +1,6 @@
import abc import abc
import logging import logging
import queue
import time import time
import aprslib import aprslib
@ -12,9 +13,9 @@ LOG = logging.getLogger("APRSD")
class APRSDRXThread(APRSDThread): class APRSDRXThread(APRSDThread):
def __init__(self, msg_queues, config): def __init__(self, packet_queue, config):
super().__init__("RX_MSG") super().__init__("RX_MSG")
self.msg_queues = msg_queues self.packet_queue = packet_queue
self.config = config self.config = config
self._client = client.factory.create() self._client = client.factory.create()
@ -68,11 +69,7 @@ class APRSDPluginRXThread(APRSDRXThread):
# LOG.debug(raw) # LOG.debug(raw)
packet.log(header="RX") packet.log(header="RX")
packets.PacketList().rx(packet) packets.PacketList().rx(packet)
thread = APRSDPluginProcessPacketThread( self.packet_queue.put(packet)
config=self.config,
packet=packet,
)
thread.start()
class APRSDProcessPacketThread(APRSDThread): class APRSDProcessPacketThread(APRSDThread):
@ -83,11 +80,10 @@ class APRSDProcessPacketThread(APRSDThread):
will ack a message before sending the packet to the subclass will ack a message before sending the packet to the subclass
for processing.""" for processing."""
def __init__(self, config, packet): def __init__(self, config, packet_queue):
self.config = config self.config = config
self.packet = packet self.packet_queue = packet_queue
name = self.packet.raw[:10] super().__init__("ProcessPKT")
super().__init__(f"RXPKT-{name}")
self._loop_cnt = 1 self._loop_cnt = 1
def process_ack_packet(self, packet): def process_ack_packet(self, packet):
@ -99,9 +95,18 @@ class APRSDProcessPacketThread(APRSDThread):
return return
def loop(self): def loop(self):
try:
packet = self.packet_queue.get(block=True, timeout=1)
if packet:
self.process_packet(packet)
except queue.Empty:
pass
self._loop_cnt += 1
return True
def process_packet(self, packet):
"""Process a packet received from aprs-is server.""" """Process a packet received from aprs-is server."""
LOG.debug(f"RXPKT-LOOP {self._loop_cnt}") LOG.debug(f"RXPKT-LOOP {self._loop_cnt}")
packet = self.packet
our_call = self.config["aprsd"]["callsign"].lower() our_call = self.config["aprsd"]["callsign"].lower()
from_call = packet.from_call from_call = packet.from_call
@ -147,7 +152,7 @@ class APRSDProcessPacketThread(APRSDThread):
return False return False
@abc.abstractmethod @abc.abstractmethod
def process_our_message_packet(self, *args, **kwargs): def process_our_message_packet(self, packet):
"""Process a MessagePacket destined for us!""" """Process a MessagePacket destined for us!"""
def process_other_packet(self, packet, for_us=False): def process_other_packet(self, packet, for_us=False):
@ -210,9 +215,7 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
to_call=from_call, to_call=from_call,
message_text=reply, message_text=reply,
) )
LOG.warning("Calling msg_pkg.send()")
msg_pkt.send() msg_pkt.send()
LOG.warning("Calling msg_pkg.send() --- DONE")
# If the message was for us and we didn't have a # If the message was for us and we didn't have a
# response, then we send a usage statement. # response, then we send a usage statement.

View File

@ -6,7 +6,7 @@ from aprsd import plugin
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
class HelloPlugin(plugin.APRSDPluginBase): class HelloPlugin(plugin.APRSDRegexCommandPluginBase):
"""Hello World.""" """Hello World."""
version = "1.0" version = "1.0"
@ -14,7 +14,7 @@ class HelloPlugin(plugin.APRSDPluginBase):
command_regex = "^[hH]" command_regex = "^[hH]"
command_name = "hello" command_name = "hello"
def command(self, fromcall, message, ack): def command(self, packet):
LOG.info("HelloPlugin") LOG.info("HelloPlugin")
reply = f"Hello '{fromcall}'" reply = f"Hello '{packet.from_call}'"
return reply return reply

View File

@ -90,7 +90,7 @@ class TestSendMessageCommand(unittest.TestCase):
mock_emit.called_once() mock_emit.called_once()
@mock.patch("aprsd.config.parse_config") @mock.patch("aprsd.config.parse_config")
@mock.patch("aprsd.packets.PacketList.add") @mock.patch("aprsd.packets.PacketList.rx")
@mock.patch("aprsd.cmds.webchat.socketio.emit") @mock.patch("aprsd.cmds.webchat.socketio.emit")
def test_process_our_message_packet( def test_process_our_message_packet(
self, mock_parse_config, self, mock_parse_config,