Refactored threads a bit

This patch refactors the rx threads a bit to reuse some code
responsible for processing acks when packets are received.

This also eliminates a custom thread in the webchat command for
processing received packets now that there is common code in the base
classes.
This commit is contained in:
Hemna 2022-12-02 16:26:48 -05:00
parent 480094b0d4
commit 51b80cd4ea
2 changed files with 145 additions and 221 deletions

View File

@ -2,13 +2,11 @@ import datetime
import json
import logging
from logging.handlers import RotatingFileHandler
import queue
import signal
import sys
import threading
import time
import aprslib
from aprslib import util as aprslib_util
import click
from device_detector import DeviceDetector
@ -27,7 +25,6 @@ from aprsd import config as aprsd_config
from aprsd import messaging, packets, stats, threads, utils
from aprsd.aprsd import cli
from aprsd.logging import rich as aprsd_logging
from aprsd.threads import aprsd as aprsd_thread
from aprsd.threads import rx
from aprsd.utils import objectstore, trace
@ -35,14 +32,6 @@ from aprsd.utils import objectstore, trace
LOG = logging.getLogger("APRSD")
auth = HTTPBasicAuth()
users = None
rx_msg_queue = queue.Queue(maxsize=20)
tx_msg_queue = queue.Queue(maxsize=20)
control_queue = queue.Queue(maxsize=20)
msg_queues = {
"rx": rx_msg_queue,
"control": control_queue,
"tx": tx_msg_queue,
}
def signal_handler(sig, frame):
@ -143,62 +132,19 @@ def verify_password(username, password):
class WebChatRXThread(rx.APRSDRXThread):
"""Class that connects to aprsis/kiss and waits for messages."""
"""Class that connects to APRISIS/kiss and waits for messages.
After the packet is received from APRSIS/KISS, the packet is
sent to processing in the WebChatProcessPacketThread.
"""
def __init__(self, config, socketio):
super().__init__(None, config)
self.socketio = socketio
self.connected = False
def connected(self, connected=True):
self.connected = connected
def stop(self):
self.thread_stop = True
client.factory.create().client.stop()
def loop(self):
# setup the consumer of messages and block until a messages
msg = None
try:
msg = self.msg_queues["tx"].get_nowait()
except queue.Empty:
pass
try:
if msg:
LOG.debug("GOT msg from TX queue!!")
msg.send()
except (
aprslib.exceptions.ConnectionDrop,
aprslib.exceptions.ConnectionError,
):
LOG.error("Connection dropped, reconnecting")
# Put it back on the queue to send.
self.msg_queues["tx"].put(msg)
# Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client()
# is called
self._client.reset()
time.sleep(2)
try:
# When new packets come in the consumer will process
# the packet
# This call blocks until thread stop() is called.
self._client.client.consumer(
self.process_packet, raw=False, blocking=False,
)
except (
aprslib.exceptions.ConnectionDrop,
aprslib.exceptions.ConnectionError,
):
LOG.error("Connection dropped, reconnecting")
time.sleep(5)
# Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client()
# is called
self._client.reset()
return True
return True
def process_packet(self, *args, **kwargs):
# packet = self._client.decode_packet(*args, **kwargs)
if "packet" in kwargs:
@ -207,101 +153,55 @@ class WebChatRXThread(rx.APRSDRXThread):
packet = self._client.decode_packet(*args, **kwargs)
LOG.debug(f"GOT Packet {packet}")
self.msg_queues["rx"].put(packet)
thread = WebChatProcessPacketThread(
config=self.config,
packet=packet,
socketio=self.socketio,
)
thread.start()
class WebChatTXThread(aprsd_thread.APRSDThread):
"""Class that """
def __init__(self, msg_queues, config, socketio):
super().__init__("_TXThread_")
self.msg_queues = msg_queues
self.config = config
class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
"""Class that handles packets being sent to us."""
def __init__(self, config, packet, socketio):
self.socketio = socketio
self.connected = False
def loop(self):
try:
msg = self.msg_queues["control"].get_nowait()
self.connected = msg["connected"]
except queue.Empty:
pass
try:
packet = self.msg_queues["rx"].get_nowait()
if packet:
# we got a packet and we need to send it to the
# web socket
self.process_packet(packet)
except queue.Empty:
pass
except Exception as ex:
LOG.exception(ex)
time.sleep(1)
return True
super().__init__(config, packet)
def process_ack_packet(self, packet):
super().process_ack_packet(packet)
ack_num = packet.get("msgNo")
LOG.info(f"We got ack for our sent message {ack_num}")
messaging.log_packet(packet)
SentMessages().ack(int(ack_num))
tracker = messaging.MsgTrack()
tracker.remove(ack_num)
self.socketio.emit(
"ack", SentMessages().get(int(ack_num)),
namespace="/sendmsg",
)
stats.APRSDStats().ack_rx_inc()
self.got_ack = True
def process_packet(self, packet):
LOG.info(f"process PACKET {packet}")
tocall = packet.get("addresse", None)
def process_non_ack_packet(self, packet):
LOG.info(f"process non ack PACKET {packet}")
packet.get("addresse", None)
fromcall = packet["from"]
msg = packet.get("message_text", None)
msg_id = packet.get("msgNo", "0")
msg_response = packet.get("response", None)
if (
tocall.lower() == self.config["aprsd"]["callsign"].lower()
and msg_response == "ack"
):
self.process_ack_packet(packet)
elif tocall.lower() == self.config["aprsd"]["callsign"].lower():
messaging.log_message(
"Received Message",
packet["raw"],
msg,
fromcall=fromcall,
msg_num=msg_id,
)
# let any threads do their thing, then ack
# send an ack last
ack = messaging.AckMessage(
self.config["aprsd"]["callsign"],
fromcall,
msg_id=msg_id,
)
ack.send()
packets.PacketList().add(packet)
stats.APRSDStats().msgs_rx_inc()
message = packet.get("message_text", None)
msg = {
"id": 0,
"ts": time.time(),
"ack": False,
"from": fromcall,
"to": packet["to"],
"raw": packet["raw"],
"message": message,
"status": None,
"last_update": None,
"reply": None,
}
self.socketio.emit(
"new", msg,
namespace="/sendmsg",
)
packets.PacketList().add(packet)
stats.APRSDStats().msgs_rx_inc()
message = packet.get("message_text", None)
msg = {
"id": 0,
"ts": time.time(),
"ack": False,
"from": fromcall,
"to": packet["to"],
"raw": packet["raw"],
"message": message,
"status": None,
"last_update": None,
"reply": None,
}
self.socketio.emit(
"new", msg,
namespace="/sendmsg",
)
class WebChatFlask(flask_classful.FlaskView):
@ -418,9 +318,8 @@ class SendMessageNamespace(Namespace):
msg = None
request = None
def __init__(self, namespace=None, config=None, msg_queues=None):
def __init__(self, namespace=None, config=None):
self._config = config
self._msg_queues = msg_queues
super().__init__(namespace)
def on_connect(self):
@ -430,13 +329,9 @@ class SendMessageNamespace(Namespace):
"connected", {"data": "/sendmsg Connected"},
namespace="/sendmsg",
)
msg = {"connected": True}
self._msg_queues["control"].put(msg)
def on_disconnect(self):
LOG.debug("WS Disconnected")
msg = {"connected": False}
self._msg_queues["control"].put(msg)
def on_send(self, data):
global socketio
@ -458,7 +353,6 @@ class SendMessageNamespace(Namespace):
namespace="/sendmsg",
)
msg.send()
# self._msg_queues["tx"].put(msg)
def on_gps(self, data):
LOG.debug(f"WS on_GPS: {data}")
@ -567,7 +461,6 @@ def init_flask(config, loglevel, quiet):
socketio.on_namespace(
SendMessageNamespace(
"/sendmsg", config=config,
msg_queues=msg_queues,
),
)
return socketio, flask_app
@ -644,18 +537,11 @@ def webchat(ctx, flush, port):
(socketio, app) = init_flask(config, loglevel, quiet)
rx_thread = WebChatRXThread(
msg_queues=msg_queues,
config=config,
)
LOG.info("Start RX Thread")
rx_thread.start()
tx_thread = WebChatTXThread(
msg_queues=msg_queues,
config=config,
socketio=socketio,
)
LOG.info("Start TX Thread")
tx_thread.start()
LOG.info("Start RX Thread")
rx_thread.start()
keepalive = threads.KeepAliveThread(config=config)
LOG.info("Start KeepAliveThread")

View File

@ -58,17 +58,32 @@ class APRSDRXThread(APRSDThread):
class APRSDPluginRXThread(APRSDRXThread):
"""Process received packets.
This is the main APRSD Server command thread that
receives packets from APRIS and then sends them for
processing in the PluginProcessPacketThread.
"""
def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
thread = APRSDProcessPacketThread(packet=packet, config=self.config)
thread = APRSDPluginProcessPacketThread(
config=self.config,
packet=packet,
)
thread.start()
class APRSDProcessPacketThread(APRSDThread):
"""Base class for processing received packets.
def __init__(self, packet, config):
self.packet = packet
This is the base class for processing packets coming from
the consumer. This base class handles sending ack packets and
will ack a message before sending the packet to the subclass
for processing."""
def __init__(self, config, packet):
self.config = config
self.packet = packet
name = self.packet["raw"][:10]
super().__init__(f"RXPKT-{name}")
@ -119,7 +134,10 @@ class APRSDProcessPacketThread(APRSDThread):
)
# Only ack messages that were sent directly to us
if (tocall.lower() == self.config["aprsd"]["callsign"].lower()):
if (
tocall
and tocall.lower() == self.config["aprsd"]["callsign"].lower()
):
stats.APRSDStats().msgs_rx_inc()
# let any threads do their thing, then ack
# send an ack last
@ -130,69 +148,89 @@ class APRSDProcessPacketThread(APRSDThread):
)
ack.send()
pm = plugin.PluginManager()
try:
results = pm.run(packet)
wl = packets.WatchList()
wl.update_seen(packet)
replied = False
for reply in results:
if isinstance(reply, list):
# one of the plugins wants to send multiple messages
replied = True
for subreply in reply:
LOG.debug(f"Sending '{subreply}'")
if isinstance(subreply, messaging.Message):
subreply.send()
else:
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
subreply,
)
msg.send()
elif isinstance(reply, messaging.Message):
# We have a message based object.
LOG.debug(f"Sending '{reply}'")
reply.send()
replied = True
else:
replied = True
# A plugin can return a null message flag which signals
# us that they processed the message correctly, but have
# nothing to reply with, so we avoid replying with a
# usage string
if reply is not messaging.NULL_MESSAGE:
LOG.debug(f"Sending '{reply}'")
self.process_non_ack_packet(packet)
else:
LOG.info("Packet was not for us.")
LOG.debug("Packet processing complete")
@abc.abstractmethod
def process_non_ack_packet(self, *args, **kwargs):
"""Ack packets already dealt with here."""
class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
"""Process the packet through the plugin manager.
This is the main aprsd server plugin processing thread."""
def process_non_ack_packet(self, packet):
"""Send the packet through the plugins."""
fromcall = packet["from"]
tocall = packet.get("addresse", None)
msg = packet.get("message_text", None)
packet.get("msgNo", "0")
packet.get("response", None)
pm = plugin.PluginManager()
try:
results = pm.run(packet)
wl = packets.WatchList()
wl.update_seen(packet)
replied = False
for reply in results:
if isinstance(reply, list):
# one of the plugins wants to send multiple messages
replied = True
for subreply in reply:
LOG.debug(f"Sending '{subreply}'")
if isinstance(subreply, messaging.Message):
subreply.send()
else:
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
reply,
subreply,
)
msg.send()
elif isinstance(reply, messaging.Message):
# We have a message based object.
LOG.debug(f"Sending '{reply}'")
reply.send()
replied = True
else:
replied = True
# A plugin can return a null message flag which signals
# us that they processed the message correctly, but have
# nothing to reply with, so we avoid replying with a
# usage string
if reply is not messaging.NULL_MESSAGE:
LOG.debug(f"Sending '{reply}'")
# If the message was for us and we didn't have a
# response, then we send a usage statement.
if tocall == self.config["aprsd"]["callsign"] and not replied:
LOG.warning("Sending help!")
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
"Unknown command! Send 'help' message for help",
)
msg.send()
except Exception as ex:
LOG.error("Plugin failed!!!")
LOG.exception(ex)
# Do we need to send a reply?
if tocall == self.config["aprsd"]["callsign"]:
reply = "A Plugin failed! try again?"
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
reply,
)
msg.send()
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
reply,
)
msg.send()
LOG.debug("Packet processing complete")
# If the message was for us and we didn't have a
# response, then we send a usage statement.
if tocall == self.config["aprsd"]["callsign"] and not replied:
LOG.warning("Sending help!")
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
"Unknown command! Send 'help' message for help",
)
msg.send()
except Exception as ex:
LOG.error("Plugin failed!!!")
LOG.exception(ex)
# Do we need to send a reply?
if tocall == self.config["aprsd"]["callsign"]:
reply = "A Plugin failed! try again?"
msg = messaging.TextMessage(
self.config["aprsd"]["callsign"],
fromcall,
reply,
)
msg.send()