diff --git a/aprsd/client.py b/aprsd/client.py index 4285990..3d06e08 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -8,7 +8,7 @@ from aprslib.exceptions import LoginError from aprsd import config as aprsd_config from aprsd import exception from aprsd.clients import aprsis, kiss -from aprsd.packets import core +from aprsd.packets import core, packet_list from aprsd.utils import trace @@ -59,6 +59,10 @@ class Client: self._client.set_filter(self.filter) return self._client + def send(self, packet: core.Packet): + packet_list.PacketList().tx(packet) + self.client.send(packet) + def reset(self): """Call this to force a rebuild/reconnect.""" if self._client: diff --git a/aprsd/clients/aprsis.py b/aprsd/clients/aprsis.py index fc22008..191f43d 100644 --- a/aprsd/clients/aprsis.py +++ b/aprsd/clients/aprsis.py @@ -12,6 +12,7 @@ import wrapt import aprsd from aprsd import stats +from aprsd.packets import core LOG = logging.getLogger("APRSD") @@ -32,10 +33,9 @@ class Aprsdis(aprslib.IS): LOG.info("Shutdown Aprsdis client.") @wrapt.synchronized(lock) - def send(self, msg): + def send(self, packet: core.Packet): """Send an APRS Message object.""" - line = str(msg) - self.sendall(line) + self.sendall(packet.raw) def _socket_readlines(self, blocking=False): """ diff --git a/aprsd/cmds/send_message.py b/aprsd/cmds/send_message.py index 469901d..9f69fc5 100644 --- a/aprsd/cmds/send_message.py +++ b/aprsd/cmds/send_message.py @@ -9,6 +9,7 @@ import click import aprsd from aprsd import cli_helper, client, packets from aprsd.aprsd import cli +from aprsd.threads import tx LOG = logging.getLogger("APRSD") @@ -109,12 +110,14 @@ def send_message( got_response = True from_call = packet.from_call our_call = config["aprsd"]["callsign"].lower() - ack_pkt = packets.AckPacket( - from_call=our_call, - to_call=from_call, - msgNo=packet.msgNo, + tx.send( + packets.AckPacket( + from_call=our_call, + to_call=from_call, + msgNo=packet.msgNo, + ), + direct=True, ) - ack_pkt.send_direct() if got_ack: if wait_response: @@ -135,16 +138,20 @@ def send_message( # we should bail after we get the ack and send an ack back for the # message if raw: - pkt = packets.Packet(from_call="", to_call="", raw=raw) - pkt.send_direct() + tx.send( + packets.Packet(from_call="", to_call="", raw=raw), + direct=True, + ) sys.exit(0) else: - pkt = packets.MessagePacket( - from_call=aprs_login, - to_call=tocallsign, - message_text=command, + tx.send( + packets.MessagePacket( + from_call=aprs_login, + to_call=tocallsign, + message_text=command, + ), + direct=True, ) - pkt.send_direct() if no_ack: sys.exit(0) diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index c53b44f..261ba19 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -25,7 +25,7 @@ from aprsd import config as aprsd_config from aprsd import packets, stats, threads, utils from aprsd.aprsd import cli from aprsd.logging import rich as aprsd_logging -from aprsd.threads import rx +from aprsd.threads import rx, tx from aprsd.utils import objectstore, trace @@ -150,7 +150,7 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread): self.got_ack = True def process_our_message_packet(self, packet: packets.MessagePacket): - LOG.info(f"process non ack PACKET {packet}") + LOG.info(f"process MessagePacket {repr(packet)}") packet.get("addresse", None) fromcall = packet.from_call @@ -321,7 +321,7 @@ class SendMessageNamespace(Namespace): self.msg = pkt msgs = SentMessages() msgs.add(pkt) - pkt.send() + tx.send(pkt) msgs.set_status(pkt.msgNo, "Sending") obj = msgs.get(pkt.msgNo) socketio.emit( @@ -336,14 +336,16 @@ class SendMessageNamespace(Namespace): LOG.debug(f"Lat DDM {lat}") LOG.debug(f"Long DDM {long}") - beacon = packets.GPSPacket( - from_call=self._config["aprs"]["login"], - to_call="APDW16", - latitude=lat, - longitude=long, - comment="APRSD WebChat Beacon", + tx.send( + packets.GPSPacket( + from_call=self._config["aprs"]["login"], + to_call="APDW16", + latitude=lat, + longitude=long, + comment="APRSD WebChat Beacon", + ), + direct=True, ) - beacon.send_direct() def handle_message(self, data): LOG.debug(f"WS Data {data}") diff --git a/aprsd/flask.py b/aprsd/flask.py index 8f4ddf6..3f374a3 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -23,6 +23,7 @@ from aprsd import packets, plugin, stats, threads, utils from aprsd.clients import aprsis from aprsd.logging import log from aprsd.logging import rich as aprsd_logging +from aprsd.threads import tx LOG = logging.getLogger("APRSD") @@ -181,7 +182,11 @@ class SendMessageThread(threads.APRSDRXThread): except LoginError as e: f"Failed to setup Connection {e}" - self.packet.send_direct(aprsis_client=self.aprs_client) + tx.send( + self.packet, + direct=True, + aprs_client=self.aprs_client, + ) SentMessages().set_status(self.packet.msgNo, "Sent") while not self.thread_stop: @@ -218,12 +223,15 @@ class SendMessageThread(threads.APRSDRXThread): "reply", SentMessages().get(self.packet.msgNo), namespace="/sendmsg", ) - ack_pkt = packets.AckPacket( - from_call=self.request["from"], - to_call=packet.from_call, - msgNo=msg_number, + tx.send( + packets.AckPacket( + from_call=self.request["from"], + to_call=packet.from_call, + msgNo=msg_number, + ), + direct=True, + aprs_client=self.aprsis_client, ) - ack_pkt.send_direct(aprsis_client=self.aprsis_client) SentMessages().set_status(self.packet.msgNo, "Ack Sent") # Now we can exit, since we are done. diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index 8638edf..782db3f 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -10,9 +10,6 @@ from typing import List import dacite -from aprsd import client -from aprsd.packets.packet_list import PacketList # noqa: F401 -from aprsd.threads import tx from aprsd.utils import counter from aprsd.utils import json as aprsd_json @@ -87,7 +84,7 @@ class Packet(metaclass=abc.ABCMeta): else: return default - def _init_for_send(self): + def prepare(self): """Do stuff here that is needed prior to sending over the air.""" # now build the raw message for sending self._build_raw() @@ -164,7 +161,7 @@ class Packet(metaclass=abc.ABCMeta): log_list.append(f"{header}________({name})") LOG.info("\n".join(log_list)) - LOG.debug(self) + LOG.debug(repr(self)) def _filter_for_send(self) -> str: """Filter and format message string for FCC.""" @@ -176,22 +173,10 @@ class Packet(metaclass=abc.ABCMeta): # We all miss George Carlin return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) - def send(self): - """Method to send a packet.""" - self._init_for_send() - thread = tx.SendPacketThread(packet=self) - thread.start() - - def send_direct(self, aprsis_client=None): - """Send the message in the same thread as caller.""" - self._init_for_send() - if aprsis_client: - cl = aprsis_client - else: - cl = client.factory.create().client - self.log(header="TX Message Direct") - cl.send(self.raw) - PacketList().tx(self) + def __str__(self): + """Show the raw version of the packet""" + self.prepare() + return self.raw @dataclass @@ -219,13 +204,6 @@ class AckPacket(PathPacket): self.msgNo, ) - def send(self): - """Method to send a packet.""" - self._init_for_send() - thread = tx.SendAckThread(packet=self) - LOG.warning(f"Starting thread to TXACK {self}") - thread.start() - @dataclass class MessagePacket(PathPacket): diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index 8d3d931..e58e323 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -3,6 +3,7 @@ import threading import wrapt +from aprsd.threads import tx from aprsd.utils import objectstore @@ -93,11 +94,11 @@ class PacketTrack(objectstore.ObjectStoreMixin): for key in self.data.keys(): pkt = self.data[key] if pkt.last_send_attempt < pkt.retry_count: - pkt.send() + tx.send(pkt) def _resend(self, packet): packet._last_send_attempt = 0 - packet.send() + tx.send(packet) def restart_delayed(self, count=None, most_recent=True): """Walk the list of delayed messages and restart them if any.""" diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 75a8e4c..d25adad 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -40,7 +40,7 @@ class APRSDPluginSpec: """A hook specification namespace.""" @hookspec - def filter(self, packet): + def filter(self, packet: packets.core.Packet): """My special little hook that you can customize.""" @@ -117,11 +117,11 @@ class APRSDPluginBase(metaclass=abc.ABCMeta): thread.stop() @abc.abstractmethod - def filter(self, packet): + def filter(self, packet: packets.core.Packet): pass @abc.abstractmethod - def process(self, packet): + def process(self, packet: packets.core.Packet): """This is called when the filter passes.""" @@ -158,7 +158,7 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): LOG.warning("Watch list enabled, but no callsigns set.") @hookimpl - def filter(self, packet): + def filter(self, packet: packets.core.Packet): result = packets.NULL_MESSAGE if self.enabled: wl = watch_list.WatchList() @@ -210,7 +210,7 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): self.enabled = True @hookimpl - def filter(self, packet): + def filter(self, packet: packets.core.MessagePacket): result = None message = packet.get("message_text", None) @@ -270,7 +270,7 @@ class HelpPlugin(APRSDRegexCommandPluginBase): def help(self): return "Help: send APRS help or help <plugin>" - def process(self, packet): + def process(self, packet: packets.core.MessagePacket): LOG.info("HelpPlugin") # fromcall = packet.get("from") message = packet.message_text @@ -455,12 +455,12 @@ class PluginManager: LOG.info("Completed Plugin Loading.") - def run(self, packet): + def run(self, packet: packets.core.MessagePacket): """Execute all the pluguns run method.""" with self.lock: return self._pluggy_pm.hook.filter(packet=packet) - def run_watchlist(self, packet): + def run_watchlist(self, packet: packets.core.Packet): with self.lock: return self._watchlist_pm.hook.filter(packet=packet) diff --git a/aprsd/plugins/email.py b/aprsd/plugins/email.py index 1e49f4a..e5ea994 100644 --- a/aprsd/plugins/email.py +++ b/aprsd/plugins/email.py @@ -11,6 +11,7 @@ import time import imapclient from aprsd import packets, plugin, stats, threads +from aprsd.threads import tx from aprsd.utils import trace @@ -464,12 +465,13 @@ def resend_email(config, count, fromcall): from_addr = shortcuts_inverted[from_addr] # asterisk indicates a resend reply = "-" + from_addr + " * " + body.decode(errors="ignore") - pkt = packets.MessagePacket( - from_call=config["aprsd"]["callsign"], - to_call=fromcall, - message_text=reply, + tx.send( + packets.MessagePacket( + from_call=config["aprsd"]["callsign"], + to_call=fromcall, + message_text=reply, + ), ) - pkt.send() msgexists = True if msgexists is not True: @@ -486,12 +488,13 @@ def resend_email(config, count, fromcall): str(m).zfill(2), str(s).zfill(2), ) - pkt = packets.MessagePacket( - from_call=config["aprsd"]["callsign"], - to_call=fromcall, - message_text=reply, + tx.send( + packets.MessagePacket( + from_call=config["aprsd"]["callsign"], + to_call=fromcall, + message_text=reply, + ), ) - pkt.send() # check email more often since we're resending one now EmailInfo().delay = 60 @@ -606,12 +609,13 @@ class APRSDEmailThread(threads.APRSDThread): reply = "-" + from_addr + " " + body.decode(errors="ignore") # Send the message to the registered user in the # config ham.callsign - pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=self.config["ham"]["callsign"], - message_text=reply, + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=self.config["ham"]["callsign"], + message_text=reply, + ), ) - pkt.send() # flag message as sent via aprs try: server.add_flags(msgid, ["APRS"]) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index b860c46..2880d70 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -6,7 +6,7 @@ import time import aprslib from aprsd import client, packets, plugin -from aprsd.threads import APRSDThread +from aprsd.threads import APRSDThread, tx LOG = logging.getLogger("APRSD") @@ -131,12 +131,13 @@ class APRSDProcessPacketThread(APRSDThread): # It's a MessagePacket and it's for us! # let any threads do their thing, then ack # send an ack last - ack_pkt = packets.AckPacket( - from_call=self.config["aprsd"]["callsign"], - to_call=from_call, - msgNo=msg_id, + tx.send( + packets.AckPacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + msgNo=msg_id, + ), ) - ack_pkt.send() self.process_our_message_packet(packet) else: @@ -175,18 +176,20 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): for subreply in reply: LOG.debug(f"Sending '{subreply}'") if isinstance(subreply, packets.Packet): - subreply.send() + tx.send(subreply) else: - to_call = self.config["aprsd"]["watch_list"]["alert_callsign"] - msg_pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=to_call, - message_text=subreply, + wl = self.config["aprsd"]["watch_list"] + to_call = wl["alert_callsign"] + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=to_call, + message_text=subreply, + ), ) - msg_pkt.send() elif isinstance(reply, packets.Packet): # We have a message based object. - reply.send() + tx.send(reply) except Exception as ex: LOG.error("Plugin failed!!!") LOG.exception(ex) @@ -212,17 +215,18 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): for subreply in reply: LOG.debug(f"Sending '{subreply}'") if isinstance(subreply, packets.Packet): - subreply.send() + tx.send(subreply) else: - msg_pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=from_call, - message_text=subreply, + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=subreply, + ), ) - msg_pkt.send() elif isinstance(reply, packets.Packet): # We have a message based object. - reply.send() + tx.send(reply) replied = True else: replied = True @@ -232,34 +236,38 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): # usage string if reply is not packets.NULL_MESSAGE: LOG.debug(f"Sending '{reply}'") - msg_pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=from_call, - message_text=reply, + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=reply, + ), ) - msg_pkt.send() # If the message was for us and we didn't have a # response, then we send a usage statement. if to_call == self.config["aprsd"]["callsign"] and not replied: LOG.warning("Sending help!") - msg_pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=from_call, - message_text="Unknown command! Send 'help' message for help", + message_text = "Unknown command! Send 'help' message for help" + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=message_text, + ), ) - msg_pkt.send() except Exception as ex: LOG.error("Plugin failed!!!") LOG.exception(ex) # Do we need to send a reply? if to_call == self.config["aprsd"]["callsign"]: reply = "A Plugin failed! try again?" - msg_pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=from_call, - message_text=reply, + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=reply, + ), ) - msg_pkt.send() LOG.debug("Completed process_our_message_packet") diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py index 4511e95..1902ad0 100644 --- a/aprsd/threads/tx.py +++ b/aprsd/threads/tx.py @@ -4,12 +4,34 @@ import time from aprsd import client from aprsd import threads as aprsd_threads -from aprsd.packets import packet_list, tracker +from aprsd.packets import core, packet_list, tracker LOG = logging.getLogger("APRSD") +def send(packet: core.Packet, direct=False, aprs_client=None): + """Send a packet either in a thread or directly to the client.""" + # prepare the packet for sending. + # This constructs the packet.raw + packet.prepare() + if not direct: + if isinstance(packet, core.AckPacket): + thread = SendAckThread(packet=packet) + else: + thread = SendPacketThread(packet=packet) + thread.start() + else: + if aprs_client: + cl = aprs_client + else: + cl = client.factory.create() + + packet.log(header="TX") + cl.send(packet) + packet_list.PacketList().tx(packet) + + class SendPacketThread(aprsd_threads.APRSDThread): loop_count: int = 1 @@ -37,7 +59,7 @@ class SendPacketThread(aprsd_threads.APRSDThread): # The message has been removed from the tracking queue # So it got acked and we are done. LOG.info( - f"{packet.__class__.__name__}" + f"{self.packet.__class__.__name__}" f"({self.packet.msgNo}) " "Message Send Complete via Ack.", ) @@ -72,10 +94,7 @@ class SendPacketThread(aprsd_threads.APRSDThread): if send_now: # no attempt time, so lets send it, and start # tracking the time. - packet.log("TX") - cl = client.factory.create().client - cl.send(packet.raw) - packet_list.PacketList().tx(packet) + send(packet, direct=True) packet.last_send_time = datetime.datetime.now() packet.send_count += 1 @@ -123,10 +142,7 @@ class SendAckThread(aprsd_threads.APRSDThread): send_now = True if send_now: - cl = client.factory.create().client - self.packet.log("TX") - cl.send(self.packet.raw) - packet_list.PacketList().tx(self.packet) + send(self.packet, direct=True) self.packet.send_count += 1 self.packet.last_send_time = datetime.datetime.now()