Removed Packet.send()

This patch decouples sending a message from the internals of
the Packet classes.  This allows the rest of the code to use
Packet objects as type hints in methods to enforce Packets
in the plugins.

The send method was moved to a single place in the threads.tx.send()
This commit is contained in:
Hemna 2022-12-21 16:26:36 -05:00
parent f464ff0785
commit 4a65f52939
11 changed files with 159 additions and 131 deletions

View File

@ -8,7 +8,7 @@ from aprslib.exceptions import LoginError
from aprsd import config as aprsd_config
from aprsd import exception
from aprsd.clients import aprsis, kiss
from aprsd.packets import core
from aprsd.packets import core, packet_list
from aprsd.utils import trace
@ -59,6 +59,10 @@ class Client:
self._client.set_filter(self.filter)
return self._client
def send(self, packet: core.Packet):
packet_list.PacketList().tx(packet)
self.client.send(packet)
def reset(self):
"""Call this to force a rebuild/reconnect."""
if self._client:

View File

@ -12,6 +12,7 @@ import wrapt
import aprsd
from aprsd import stats
from aprsd.packets import core
LOG = logging.getLogger("APRSD")
@ -32,10 +33,9 @@ class Aprsdis(aprslib.IS):
LOG.info("Shutdown Aprsdis client.")
@wrapt.synchronized(lock)
def send(self, msg):
def send(self, packet: core.Packet):
"""Send an APRS Message object."""
line = str(msg)
self.sendall(line)
self.sendall(packet.raw)
def _socket_readlines(self, blocking=False):
"""

View File

@ -9,6 +9,7 @@ import click
import aprsd
from aprsd import cli_helper, client, packets
from aprsd.aprsd import cli
from aprsd.threads import tx
LOG = logging.getLogger("APRSD")
@ -109,12 +110,14 @@ def send_message(
got_response = True
from_call = packet.from_call
our_call = config["aprsd"]["callsign"].lower()
ack_pkt = packets.AckPacket(
from_call=our_call,
to_call=from_call,
msgNo=packet.msgNo,
tx.send(
packets.AckPacket(
from_call=our_call,
to_call=from_call,
msgNo=packet.msgNo,
),
direct=True,
)
ack_pkt.send_direct()
if got_ack:
if wait_response:
@ -135,16 +138,20 @@ def send_message(
# we should bail after we get the ack and send an ack back for the
# message
if raw:
pkt = packets.Packet(from_call="", to_call="", raw=raw)
pkt.send_direct()
tx.send(
packets.Packet(from_call="", to_call="", raw=raw),
direct=True,
)
sys.exit(0)
else:
pkt = packets.MessagePacket(
from_call=aprs_login,
to_call=tocallsign,
message_text=command,
tx.send(
packets.MessagePacket(
from_call=aprs_login,
to_call=tocallsign,
message_text=command,
),
direct=True,
)
pkt.send_direct()
if no_ack:
sys.exit(0)

View File

@ -25,7 +25,7 @@ from aprsd import config as aprsd_config
from aprsd import packets, stats, threads, utils
from aprsd.aprsd import cli
from aprsd.logging import rich as aprsd_logging
from aprsd.threads import rx
from aprsd.threads import rx, tx
from aprsd.utils import objectstore, trace
@ -150,7 +150,7 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
self.got_ack = True
def process_our_message_packet(self, packet: packets.MessagePacket):
LOG.info(f"process non ack PACKET {packet}")
LOG.info(f"process MessagePacket {repr(packet)}")
packet.get("addresse", None)
fromcall = packet.from_call
@ -321,7 +321,7 @@ class SendMessageNamespace(Namespace):
self.msg = pkt
msgs = SentMessages()
msgs.add(pkt)
pkt.send()
tx.send(pkt)
msgs.set_status(pkt.msgNo, "Sending")
obj = msgs.get(pkt.msgNo)
socketio.emit(
@ -336,14 +336,16 @@ class SendMessageNamespace(Namespace):
LOG.debug(f"Lat DDM {lat}")
LOG.debug(f"Long DDM {long}")
beacon = packets.GPSPacket(
from_call=self._config["aprs"]["login"],
to_call="APDW16",
latitude=lat,
longitude=long,
comment="APRSD WebChat Beacon",
tx.send(
packets.GPSPacket(
from_call=self._config["aprs"]["login"],
to_call="APDW16",
latitude=lat,
longitude=long,
comment="APRSD WebChat Beacon",
),
direct=True,
)
beacon.send_direct()
def handle_message(self, data):
LOG.debug(f"WS Data {data}")

View File

@ -23,6 +23,7 @@ from aprsd import packets, plugin, stats, threads, utils
from aprsd.clients import aprsis
from aprsd.logging import log
from aprsd.logging import rich as aprsd_logging
from aprsd.threads import tx
LOG = logging.getLogger("APRSD")
@ -181,7 +182,11 @@ class SendMessageThread(threads.APRSDRXThread):
except LoginError as e:
f"Failed to setup Connection {e}"
self.packet.send_direct(aprsis_client=self.aprs_client)
tx.send(
self.packet,
direct=True,
aprs_client=self.aprs_client,
)
SentMessages().set_status(self.packet.msgNo, "Sent")
while not self.thread_stop:
@ -218,12 +223,15 @@ class SendMessageThread(threads.APRSDRXThread):
"reply", SentMessages().get(self.packet.msgNo),
namespace="/sendmsg",
)
ack_pkt = packets.AckPacket(
from_call=self.request["from"],
to_call=packet.from_call,
msgNo=msg_number,
tx.send(
packets.AckPacket(
from_call=self.request["from"],
to_call=packet.from_call,
msgNo=msg_number,
),
direct=True,
aprs_client=self.aprsis_client,
)
ack_pkt.send_direct(aprsis_client=self.aprsis_client)
SentMessages().set_status(self.packet.msgNo, "Ack Sent")
# Now we can exit, since we are done.

View File

@ -10,9 +10,6 @@ from typing import List
import dacite
from aprsd import client
from aprsd.packets.packet_list import PacketList # noqa: F401
from aprsd.threads import tx
from aprsd.utils import counter
from aprsd.utils import json as aprsd_json
@ -87,7 +84,7 @@ class Packet(metaclass=abc.ABCMeta):
else:
return default
def _init_for_send(self):
def prepare(self):
"""Do stuff here that is needed prior to sending over the air."""
# now build the raw message for sending
self._build_raw()
@ -164,7 +161,7 @@ class Packet(metaclass=abc.ABCMeta):
log_list.append(f"{header}________({name})")
LOG.info("\n".join(log_list))
LOG.debug(self)
LOG.debug(repr(self))
def _filter_for_send(self) -> str:
"""Filter and format message string for FCC."""
@ -176,22 +173,10 @@ class Packet(metaclass=abc.ABCMeta):
# We all miss George Carlin
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
def send(self):
"""Method to send a packet."""
self._init_for_send()
thread = tx.SendPacketThread(packet=self)
thread.start()
def send_direct(self, aprsis_client=None):
"""Send the message in the same thread as caller."""
self._init_for_send()
if aprsis_client:
cl = aprsis_client
else:
cl = client.factory.create().client
self.log(header="TX Message Direct")
cl.send(self.raw)
PacketList().tx(self)
def __str__(self):
"""Show the raw version of the packet"""
self.prepare()
return self.raw
@dataclass
@ -219,13 +204,6 @@ class AckPacket(PathPacket):
self.msgNo,
)
def send(self):
"""Method to send a packet."""
self._init_for_send()
thread = tx.SendAckThread(packet=self)
LOG.warning(f"Starting thread to TXACK {self}")
thread.start()
@dataclass
class MessagePacket(PathPacket):

View File

@ -3,6 +3,7 @@ import threading
import wrapt
from aprsd.threads import tx
from aprsd.utils import objectstore
@ -93,11 +94,11 @@ class PacketTrack(objectstore.ObjectStoreMixin):
for key in self.data.keys():
pkt = self.data[key]
if pkt.last_send_attempt < pkt.retry_count:
pkt.send()
tx.send(pkt)
def _resend(self, packet):
packet._last_send_attempt = 0
packet.send()
tx.send(packet)
def restart_delayed(self, count=None, most_recent=True):
"""Walk the list of delayed messages and restart them if any."""

View File

@ -40,7 +40,7 @@ class APRSDPluginSpec:
"""A hook specification namespace."""
@hookspec
def filter(self, packet):
def filter(self, packet: packets.core.Packet):
"""My special little hook that you can customize."""
@ -117,11 +117,11 @@ class APRSDPluginBase(metaclass=abc.ABCMeta):
thread.stop()
@abc.abstractmethod
def filter(self, packet):
def filter(self, packet: packets.core.Packet):
pass
@abc.abstractmethod
def process(self, packet):
def process(self, packet: packets.core.Packet):
"""This is called when the filter passes."""
@ -158,7 +158,7 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
LOG.warning("Watch list enabled, but no callsigns set.")
@hookimpl
def filter(self, packet):
def filter(self, packet: packets.core.Packet):
result = packets.NULL_MESSAGE
if self.enabled:
wl = watch_list.WatchList()
@ -210,7 +210,7 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
self.enabled = True
@hookimpl
def filter(self, packet):
def filter(self, packet: packets.core.MessagePacket):
result = None
message = packet.get("message_text", None)
@ -270,7 +270,7 @@ class HelpPlugin(APRSDRegexCommandPluginBase):
def help(self):
return "Help: send APRS help or help <plugin>"
def process(self, packet):
def process(self, packet: packets.core.MessagePacket):
LOG.info("HelpPlugin")
# fromcall = packet.get("from")
message = packet.message_text
@ -455,12 +455,12 @@ class PluginManager:
LOG.info("Completed Plugin Loading.")
def run(self, packet):
def run(self, packet: packets.core.MessagePacket):
"""Execute all the pluguns run method."""
with self.lock:
return self._pluggy_pm.hook.filter(packet=packet)
def run_watchlist(self, packet):
def run_watchlist(self, packet: packets.core.Packet):
with self.lock:
return self._watchlist_pm.hook.filter(packet=packet)

View File

@ -11,6 +11,7 @@ import time
import imapclient
from aprsd import packets, plugin, stats, threads
from aprsd.threads import tx
from aprsd.utils import trace
@ -464,12 +465,13 @@ def resend_email(config, count, fromcall):
from_addr = shortcuts_inverted[from_addr]
# asterisk indicates a resend
reply = "-" + from_addr + " * " + body.decode(errors="ignore")
pkt = packets.MessagePacket(
from_call=config["aprsd"]["callsign"],
to_call=fromcall,
message_text=reply,
tx.send(
packets.MessagePacket(
from_call=config["aprsd"]["callsign"],
to_call=fromcall,
message_text=reply,
),
)
pkt.send()
msgexists = True
if msgexists is not True:
@ -486,12 +488,13 @@ def resend_email(config, count, fromcall):
str(m).zfill(2),
str(s).zfill(2),
)
pkt = packets.MessagePacket(
from_call=config["aprsd"]["callsign"],
to_call=fromcall,
message_text=reply,
tx.send(
packets.MessagePacket(
from_call=config["aprsd"]["callsign"],
to_call=fromcall,
message_text=reply,
),
)
pkt.send()
# check email more often since we're resending one now
EmailInfo().delay = 60
@ -606,12 +609,13 @@ class APRSDEmailThread(threads.APRSDThread):
reply = "-" + from_addr + " " + body.decode(errors="ignore")
# Send the message to the registered user in the
# config ham.callsign
pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=self.config["ham"]["callsign"],
message_text=reply,
tx.send(
packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=self.config["ham"]["callsign"],
message_text=reply,
),
)
pkt.send()
# flag message as sent via aprs
try:
server.add_flags(msgid, ["APRS"])

View File

@ -6,7 +6,7 @@ import time
import aprslib
from aprsd import client, packets, plugin
from aprsd.threads import APRSDThread
from aprsd.threads import APRSDThread, tx
LOG = logging.getLogger("APRSD")
@ -131,12 +131,13 @@ class APRSDProcessPacketThread(APRSDThread):
# It's a MessagePacket and it's for us!
# let any threads do their thing, then ack
# send an ack last
ack_pkt = packets.AckPacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
msgNo=msg_id,
tx.send(
packets.AckPacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
msgNo=msg_id,
),
)
ack_pkt.send()
self.process_our_message_packet(packet)
else:
@ -175,18 +176,20 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
for subreply in reply:
LOG.debug(f"Sending '{subreply}'")
if isinstance(subreply, packets.Packet):
subreply.send()
tx.send(subreply)
else:
to_call = self.config["aprsd"]["watch_list"]["alert_callsign"]
msg_pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=to_call,
message_text=subreply,
wl = self.config["aprsd"]["watch_list"]
to_call = wl["alert_callsign"]
tx.send(
packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=to_call,
message_text=subreply,
),
)
msg_pkt.send()
elif isinstance(reply, packets.Packet):
# We have a message based object.
reply.send()
tx.send(reply)
except Exception as ex:
LOG.error("Plugin failed!!!")
LOG.exception(ex)
@ -212,17 +215,18 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
for subreply in reply:
LOG.debug(f"Sending '{subreply}'")
if isinstance(subreply, packets.Packet):
subreply.send()
tx.send(subreply)
else:
msg_pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text=subreply,
tx.send(
packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text=subreply,
),
)
msg_pkt.send()
elif isinstance(reply, packets.Packet):
# We have a message based object.
reply.send()
tx.send(reply)
replied = True
else:
replied = True
@ -232,34 +236,38 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
# usage string
if reply is not packets.NULL_MESSAGE:
LOG.debug(f"Sending '{reply}'")
msg_pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text=reply,
tx.send(
packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text=reply,
),
)
msg_pkt.send()
# If the message was for us and we didn't have a
# response, then we send a usage statement.
if to_call == self.config["aprsd"]["callsign"] and not replied:
LOG.warning("Sending help!")
msg_pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text="Unknown command! Send 'help' message for help",
message_text = "Unknown command! Send 'help' message for help"
tx.send(
packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text=message_text,
),
)
msg_pkt.send()
except Exception as ex:
LOG.error("Plugin failed!!!")
LOG.exception(ex)
# Do we need to send a reply?
if to_call == self.config["aprsd"]["callsign"]:
reply = "A Plugin failed! try again?"
msg_pkt = packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text=reply,
tx.send(
packets.MessagePacket(
from_call=self.config["aprsd"]["callsign"],
to_call=from_call,
message_text=reply,
),
)
msg_pkt.send()
LOG.debug("Completed process_our_message_packet")

View File

@ -4,12 +4,34 @@ import time
from aprsd import client
from aprsd import threads as aprsd_threads
from aprsd.packets import packet_list, tracker
from aprsd.packets import core, packet_list, tracker
LOG = logging.getLogger("APRSD")
def send(packet: core.Packet, direct=False, aprs_client=None):
"""Send a packet either in a thread or directly to the client."""
# prepare the packet for sending.
# This constructs the packet.raw
packet.prepare()
if not direct:
if isinstance(packet, core.AckPacket):
thread = SendAckThread(packet=packet)
else:
thread = SendPacketThread(packet=packet)
thread.start()
else:
if aprs_client:
cl = aprs_client
else:
cl = client.factory.create()
packet.log(header="TX")
cl.send(packet)
packet_list.PacketList().tx(packet)
class SendPacketThread(aprsd_threads.APRSDThread):
loop_count: int = 1
@ -37,7 +59,7 @@ class SendPacketThread(aprsd_threads.APRSDThread):
# The message has been removed from the tracking queue
# So it got acked and we are done.
LOG.info(
f"{packet.__class__.__name__}"
f"{self.packet.__class__.__name__}"
f"({self.packet.msgNo}) "
"Message Send Complete via Ack.",
)
@ -72,10 +94,7 @@ class SendPacketThread(aprsd_threads.APRSDThread):
if send_now:
# no attempt time, so lets send it, and start
# tracking the time.
packet.log("TX")
cl = client.factory.create().client
cl.send(packet.raw)
packet_list.PacketList().tx(packet)
send(packet, direct=True)
packet.last_send_time = datetime.datetime.now()
packet.send_count += 1
@ -123,10 +142,7 @@ class SendAckThread(aprsd_threads.APRSDThread):
send_now = True
if send_now:
cl = client.factory.create().client
self.packet.log("TX")
cl.send(self.packet.raw)
packet_list.PacketList().tx(self.packet)
send(self.packet, direct=True)
self.packet.send_count += 1
self.packet.last_send_time = datetime.datetime.now()