From 270be947b59d47ba577091be119f6c8de522f3fd Mon Sep 17 00:00:00 2001 From: Hemna Date: Fri, 17 Sep 2021 09:32:30 -0400 Subject: [PATCH] Refactored client classes This patch completely refactors and simplifies how the clients are created and used. There is no need now to have a separate KISSRXThread. Since all the custom work for the KISS client is encapsulated in the kiss client itself, the same RX thread and callback mechanism works for both the APRSIS client and KISS Client objects. There is also no need to determine which transport (aprsis vs kiss) is being used at runtime by any of the messages objects. The same API works for both APRSIS and KISS Client objects --- aprsd/client.py | 338 +++++++++-------------- aprsd/clients/aprsis.py | 209 ++++++++++++++ aprsd/{kissclient.py => clients/kiss.py} | 92 ++---- aprsd/flask.py | 14 +- aprsd/main.py | 38 +-- aprsd/messaging.py | 46 +-- aprsd/plugin.py | 2 +- aprsd/threads.py | 107 +------ 8 files changed, 409 insertions(+), 437 deletions(-) create mode 100644 aprsd/clients/aprsis.py rename aprsd/{kissclient.py => clients/kiss.py} (54%) diff --git a/aprsd/client.py b/aprsd/client.py index 3e8e198..3b7ee0e 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -1,26 +1,30 @@ +import abc import logging -import select import time import aprslib -from aprslib import is_py3 -from aprslib.exceptions import ( - ConnectionDrop, ConnectionError, GenericError, LoginError, ParseError, - UnknownFormat, -) +from aprslib.exceptions import LoginError -import aprsd -from aprsd import stats +from aprsd import trace +from aprsd.clients import aprsis, kiss LOG = logging.getLogger("APRSD") +TRANSPORT_APRSIS = "aprsis" +TRANSPORT_TCPKISS = "tcpkiss" +TRANSPORT_SERIALKISS = "serialkiss" + +# Main must create this from the ClientFactory +# object such that it's populated with the +# Correct config +factory = None class Client: """Singleton client class that constructs the aprslib connection.""" _instance = None - aprs_client = None + _client = None config = None connected = False @@ -40,14 +44,49 @@ class Client: @property def client(self): - if not self.aprs_client: - self.aprs_client = self.setup_connection() - return self.aprs_client + if not self._client: + self._client = self.setup_connection() + return self._client def reset(self): """Call this to force a rebuild/reconnect.""" - del self.aprs_client + del self._client + @abc.abstractmethod + def setup_connection(self): + pass + + @staticmethod + @abc.abstractmethod + def is_enabled(config): + pass + + @staticmethod + @abc.abstractmethod + def transport(config): + pass + + @abc.abstractmethod + def decode_packet(self, *args, **kwargs): + pass + + +class APRSISClient(Client): + + @staticmethod + def is_enabled(config): + # Defaults to True if the enabled flag is non existent + return config["aprs"].get("enabled", True) + + @staticmethod + def transport(config): + return TRANSPORT_APRSIS + + def decode_packet(self, *args, **kwargs): + """APRS lib already decodes this.""" + return args[0] + + @trace.trace def setup_connection(self): user = self.config["aprs"]["login"] password = self.config["aprs"]["password"] @@ -55,10 +94,11 @@ class Client: port = self.config["aprs"].get("port", 14580) connected = False backoff = 1 + aprs_client = None while not connected: try: LOG.info("Creating aprslib client") - aprs_client = Aprsdis(user, passwd=password, host=host, port=port) + aprs_client = aprsis.Aprsdis(user, passwd=password, host=host, port=port) # Force the logging to be the same aprs_client.logger = LOG aprs_client.connect() @@ -77,200 +117,96 @@ class Client: return aprs_client -class Aprsdis(aprslib.IS): - """Extend the aprslib class so we can exit properly.""" +class KISSClient(Client): - # flag to tell us to stop - thread_stop = False + @staticmethod + def is_enabled(config): + """Return if tcp or serial KISS is enabled.""" + if "kiss" not in config: + return False - # timeout in seconds - select_timeout = 1 + if "serial" in config["kiss"]: + if config["kiss"]["serial"].get("enabled", False): + return True - def stop(self): - self.thread_stop = True - LOG.info("Shutdown Aprsdis client.") + if "tcp" in config["kiss"]: + if config["kiss"]["tcp"].get("enabled", False): + return True - def send(self, msg): - """Send an APRS Message object.""" - line = str(msg) - self.sendall(line) + @staticmethod + def transport(config): + if "serial" in config["kiss"]: + if config["kiss"]["serial"].get("enabled", False): + return TRANSPORT_SERIALKISS - def _socket_readlines(self, blocking=False): - """ - Generator for complete lines, received from the server - """ - try: - self.sock.setblocking(0) - except OSError as e: - self.logger.error(f"socket error when setblocking(0): {str(e)}") - raise aprslib.ConnectionDrop("connection dropped") + if "tcp" in config["kiss"]: + if config["kiss"]["tcp"].get("enabled", False): + return TRANSPORT_TCPKISS - while not self.thread_stop: - short_buf = b"" - newline = b"\r\n" + def decode_packet(self, *args, **kwargs): + """We get a frame, which has to be decoded.""" + frame = kwargs["frame"] + LOG.debug(f"Got an APRS Frame '{frame}'") + # try and nuke the * from the fromcall sign. + frame.header._source._ch = False + payload = str(frame.payload.decode()) + msg = f"{str(frame.header)}:{payload}" + # msg = frame.tnc2 + LOG.debug(f"Decoding {msg}") - # set a select timeout, so we get a chance to exit - # when user hits CTRL-C - readable, writable, exceptional = select.select( - [self.sock], - [], - [], - self.select_timeout, - ) - if not readable: - if not blocking: - break - else: - continue + packet = aprslib.parse(msg) + return packet - try: - short_buf = self.sock.recv(4096) - - # sock.recv returns empty if the connection drops - if not short_buf: - if not blocking: - # We could just not be blocking, so empty is expected - continue - else: - self.logger.error("socket.recv(): returned empty") - raise aprslib.ConnectionDrop("connection dropped") - except OSError as e: - # self.logger.error("socket error on recv(): %s" % str(e)) - if "Resource temporarily unavailable" in str(e): - if not blocking: - if len(self.buf) == 0: - break - - self.buf += short_buf - - while newline in self.buf: - line, self.buf = self.buf.split(newline, 1) - - yield line - - def _send_login(self): - """ - Sends login string to server - """ - login_str = "user {0} pass {1} vers github.com/craigerl/aprsd {3}{2}\r\n" - login_str = login_str.format( - self.callsign, - self.passwd, - (" filter " + self.filter) if self.filter != "" else "", - aprsd.__version__, - ) - - self.logger.info("Sending login information") - - try: - self._sendall(login_str) - self.sock.settimeout(5) - test = self.sock.recv(len(login_str) + 100) - if is_py3: - test = test.decode("latin-1") - test = test.rstrip() - - self.logger.debug("Server: %s", test) - - a, b, callsign, status, e = test.split(" ", 4) - s = e.split(",") - if len(s): - server_string = s[0].replace("server ", "") - else: - server_string = e.replace("server ", "") - - self.logger.info(f"Connected to {server_string}") - self.server_string = server_string - stats.APRSDStats().set_aprsis_server(server_string) - - if callsign == "": - raise LoginError("Server responded with empty callsign???") - if callsign != self.callsign: - raise LoginError(f"Server: {test}") - if status != "verified," and self.passwd != "-1": - raise LoginError("Password is incorrect") - - if self.passwd == "-1": - self.logger.info("Login successful (receive only)") - else: - self.logger.info("Login successful") - - except LoginError as e: - self.logger.error(str(e)) - self.close() - raise - except Exception as e: - self.close() - self.logger.error(f"Failed to login '{e}'") - raise LoginError("Failed to login") - - def consumer(self, callback, blocking=True, immortal=False, raw=False): - """ - When a position sentence is received, it will be passed to the callback function - - blocking: if true (default), runs forever, otherwise will return after one sentence - You can still exit the loop, by raising StopIteration in the callback function - - immortal: When true, consumer will try to reconnect and stop propagation of Parse exceptions - if false (default), consumer will return - - raw: when true, raw packet is passed to callback, otherwise the result from aprs.parse() - """ - - if not self._connected: - raise ConnectionError("not connected to a server") - - line = b"" - - while True and not self.thread_stop: - try: - for line in self._socket_readlines(blocking): - if line[0:1] != b"#": - if raw: - callback(line) - else: - callback(self._parse(line)) - else: - self.logger.debug("Server: %s", line.decode("utf8")) - stats.APRSDStats().set_aprsis_keepalive() - except ParseError as exp: - self.logger.log( - 11, - "%s\n Packet: %s", - exp, - exp.packet, - ) - except UnknownFormat as exp: - self.logger.log( - 9, - "%s\n Packet: %s", - exp, - exp.packet, - ) - except LoginError as exp: - self.logger.error("%s: %s", exp.__class__.__name__, exp) - except (KeyboardInterrupt, SystemExit): - raise - except (ConnectionDrop, ConnectionError): - self.close() - - if not immortal: - raise - else: - self.connect(blocking=blocking) - continue - except GenericError: - pass - except StopIteration: - break - except Exception: - self.logger.error("APRS Packet: %s", line) - raise - - if not blocking: - break + @trace.trace + def setup_connection(self): + ax25client = kiss.Aioax25Client(self.config) + return ax25client -def get_client(): - cl = Client() - return cl.client +class ClientFactory: + _instance = None + + def __new__(cls, *args, **kwargs): + """This magic turns this into a singleton.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + # Put any initialization here. + return cls._instance + + def __init__(self, config): + self.config = config + self._builders = {} + + def register(self, key, builder): + self._builders[key] = builder + + def create(self, key=None): + if not key: + if APRSISClient.is_enabled(self.config): + key = TRANSPORT_APRSIS + elif KISSClient.is_enabled(self.config): + key = KISSClient.transport(self.config) + + LOG.debug(f"GET client {key}") + builder = self._builders.get(key) + if not builder: + raise ValueError(key) + return builder(self.config) + + def is_client_enabled(self): + """Make sure at least one client is enabled.""" + enabled = False + for key in self._builders.keys(): + enabled |= self._builders[key].is_enabled(self.config) + + return enabled + + @staticmethod + def setup(config): + """Create and register all possible client objects.""" + global factory + + factory = ClientFactory(config) + factory.register(TRANSPORT_APRSIS, APRSISClient) + factory.register(TRANSPORT_TCPKISS, KISSClient) + factory.register(TRANSPORT_SERIALKISS, KISSClient) diff --git a/aprsd/clients/aprsis.py b/aprsd/clients/aprsis.py new file mode 100644 index 0000000..ac7bdac --- /dev/null +++ b/aprsd/clients/aprsis.py @@ -0,0 +1,209 @@ +import logging +import select + +import aprslib +from aprslib import is_py3 +from aprslib.exceptions import ( + ConnectionDrop, ConnectionError, GenericError, LoginError, ParseError, + UnknownFormat, +) + +import aprsd +from aprsd import stats + + +LOG = logging.getLogger("APRSD") + + +class Aprsdis(aprslib.IS): + """Extend the aprslib class so we can exit properly.""" + + # flag to tell us to stop + thread_stop = False + + # timeout in seconds + select_timeout = 1 + + def stop(self): + self.thread_stop = True + LOG.info("Shutdown Aprsdis client.") + + def send(self, msg): + """Send an APRS Message object.""" + line = str(msg) + self.sendall(line) + + def _socket_readlines(self, blocking=False): + """ + Generator for complete lines, received from the server + """ + try: + self.sock.setblocking(0) + except OSError as e: + self.logger.error(f"socket error when setblocking(0): {str(e)}") + raise aprslib.ConnectionDrop("connection dropped") + + while not self.thread_stop: + short_buf = b"" + newline = b"\r\n" + + # set a select timeout, so we get a chance to exit + # when user hits CTRL-C + readable, writable, exceptional = select.select( + [self.sock], + [], + [], + self.select_timeout, + ) + if not readable: + if not blocking: + break + else: + continue + + try: + short_buf = self.sock.recv(4096) + + # sock.recv returns empty if the connection drops + if not short_buf: + if not blocking: + # We could just not be blocking, so empty is expected + continue + else: + self.logger.error("socket.recv(): returned empty") + raise aprslib.ConnectionDrop("connection dropped") + except OSError as e: + # self.logger.error("socket error on recv(): %s" % str(e)) + if "Resource temporarily unavailable" in str(e): + if not blocking: + if len(self.buf) == 0: + break + + self.buf += short_buf + + while newline in self.buf: + line, self.buf = self.buf.split(newline, 1) + + yield line + + def _send_login(self): + """ + Sends login string to server + """ + login_str = "user {0} pass {1} vers github.com/craigerl/aprsd {3}{2}\r\n" + login_str = login_str.format( + self.callsign, + self.passwd, + (" filter " + self.filter) if self.filter != "" else "", + aprsd.__version__, + ) + + self.logger.info("Sending login information") + + try: + self._sendall(login_str) + self.sock.settimeout(5) + test = self.sock.recv(len(login_str) + 100) + if is_py3: + test = test.decode("latin-1") + test = test.rstrip() + + self.logger.debug("Server: %s", test) + + a, b, callsign, status, e = test.split(" ", 4) + s = e.split(",") + if len(s): + server_string = s[0].replace("server ", "") + else: + server_string = e.replace("server ", "") + + self.logger.info(f"Connected to {server_string}") + self.server_string = server_string + stats.APRSDStats().set_aprsis_server(server_string) + + if callsign == "": + raise LoginError("Server responded with empty callsign???") + if callsign != self.callsign: + raise LoginError(f"Server: {test}") + if status != "verified," and self.passwd != "-1": + raise LoginError("Password is incorrect") + + if self.passwd == "-1": + self.logger.info("Login successful (receive only)") + else: + self.logger.info("Login successful") + + except LoginError as e: + self.logger.error(str(e)) + self.close() + raise + except Exception as e: + self.close() + self.logger.error(f"Failed to login '{e}'") + raise LoginError("Failed to login") + + def consumer(self, callback, blocking=True, immortal=False, raw=False): + """ + When a position sentence is received, it will be passed to the callback function + + blocking: if true (default), runs forever, otherwise will return after one sentence + You can still exit the loop, by raising StopIteration in the callback function + + immortal: When true, consumer will try to reconnect and stop propagation of Parse exceptions + if false (default), consumer will return + + raw: when true, raw packet is passed to callback, otherwise the result from aprs.parse() + """ + + if not self._connected: + raise ConnectionError("not connected to a server") + + line = b"" + + while True and not self.thread_stop: + try: + for line in self._socket_readlines(blocking): + if line[0:1] != b"#": + if raw: + callback(line) + else: + callback(self._parse(line)) + else: + self.logger.debug("Server: %s", line.decode("utf8")) + stats.APRSDStats().set_aprsis_keepalive() + except ParseError as exp: + self.logger.log( + 11, + "%s\n Packet: %s", + exp, + exp.packet, + ) + except UnknownFormat as exp: + self.logger.log( + 9, + "%s\n Packet: %s", + exp, + exp.packet, + ) + except LoginError as exp: + self.logger.error("%s: %s", exp.__class__.__name__, exp) + except (KeyboardInterrupt, SystemExit): + raise + except (ConnectionDrop, ConnectionError): + self.close() + + if not immortal: + raise + else: + self.connect(blocking=blocking) + continue + except GenericError: + pass + except StopIteration: + break + except Exception: + self.logger.error("APRS Packet: %s", line) + raise + + if not blocking: + break diff --git a/aprsd/kissclient.py b/aprsd/clients/kiss.py similarity index 54% rename from aprsd/kissclient.py rename to aprsd/clients/kiss.py index bed12c2..7ea1ce6 100644 --- a/aprsd/kissclient.py +++ b/aprsd/clients/kiss.py @@ -5,83 +5,20 @@ from aioax25 import interface from aioax25 import kiss as kiss from aioax25.aprs import APRSInterface -from aprsd import trace - -TRANSPORT_TCPKISS = "tcpkiss" -TRANSPORT_SERIALKISS = "serialkiss" LOG = logging.getLogger("APRSD") -class KISSClient: - - _instance = None - config = None - ax25client = None - loop = None - - def __new__(cls, *args, **kwargs): - """Singleton for this class.""" - if cls._instance is None: - cls._instance = super().__new__(cls) - # initialize shit here - return cls._instance - - def __init__(self, config=None): - if config: - self.config = config - - @staticmethod - def kiss_enabled(config): - """Return if tcp or serial KISS is enabled.""" - if "kiss" not in config: - return False - - if "serial" in config["kiss"]: - if config["kiss"]["serial"].get("enabled", False): - return True - - if "tcp" in config["kiss"]: - if config["kiss"]["tcp"].get("enabled", False): - return True - - @staticmethod - def transport(config): - if "serial" in config["kiss"]: - if config["kiss"]["serial"].get("enabled", False): - return TRANSPORT_SERIALKISS - - if "tcp" in config["kiss"]: - if config["kiss"]["tcp"].get("enabled", False): - return TRANSPORT_TCPKISS - - @property - def client(self): - if not self.ax25client: - self.ax25client = self.setup_connection() - return self.ax25client - - def reset(self): - """Call this to fore a rebuild/reconnect.""" - self.ax25client.stop() - del self.ax25client - - @trace.trace - def setup_connection(self): - ax25client = Aioax25Client(self.config) - LOG.debug("Complete") - return ax25client - - class Aioax25Client: def __init__(self, config): self.config = config + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + self.loop = asyncio.get_event_loop() self.setup() def setup(self): # we can be TCP kiss or Serial kiss - - self.loop = asyncio.get_event_loop() if "serial" in self.config["kiss"] and self.config["kiss"]["serial"].get( "enabled", False, @@ -131,10 +68,20 @@ class Aioax25Client: self.kissdev._close() self.loop.stop() - def consumer(self, callback, callsign=None): - if not callsign: - callsign = self.config["ham"]["callsign"] - self.aprsint.bind(callback=callback, callsign="WB4BOR", ssid=12, regex=False) + def set_filter(self, filter): + # This does nothing right now. + pass + + def consumer(self, callback, blocking=True, immortal=False, raw=False): + callsign = self.config["kiss"]["callsign"] + call = callsign.split("-") + if len(call) > 1: + callsign = call[0] + ssid = int(call[1]) + else: + ssid = 0 + self.aprsint.bind(callback=callback, callsign=callsign, ssid=ssid, regex=False) + self.loop.run_forever() def send(self, msg): """Send an APRS Message object.""" @@ -145,8 +92,3 @@ class Aioax25Client: path=["WIDE1-1", "WIDE2-1"], oneshot=True, ) - - -def get_client(): - cl = KISSClient() - return cl.client diff --git a/aprsd/flask.py b/aprsd/flask.py index ede31db..ce5ab36 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -19,7 +19,8 @@ from werkzeug.security import check_password_hash, generate_password_hash import aprsd from aprsd import client from aprsd import config as aprsd_config -from aprsd import kissclient, messaging, packets, plugin, stats, threads, utils +from aprsd import messaging, packets, plugin, stats, threads, utils +from aprsd.clients import aprsis LOG = logging.getLogger("APRSD") @@ -136,7 +137,8 @@ class SendMessageThread(threads.APRSDThread): while not connected: try: LOG.info("Creating aprslib client") - aprs_client = client.Aprsdis( + + aprs_client = aprsis.Aprsdis( user, passwd=password, host=host, @@ -312,16 +314,16 @@ class APRSDFlask(flask_classful.FlaskView): ) else: # We might be connected to a KISS socket? - if kissclient.KISSClient.kiss_enabled(self.config): - transport = kissclient.KISSClient.transport(self.config) - if transport == kissclient.TRANSPORT_TCPKISS: + if client.KISSClient.kiss_enabled(self.config): + transport = client.KISSClient.transport(self.config) + if transport == client.TRANSPORT_TCPKISS: aprs_connection = ( "TCPKISS://{}:{}".format( self.config["kiss"]["tcp"]["host"], self.config["kiss"]["tcp"]["port"], ) ) - elif transport == kissclient.TRANSPORT_SERIALKISS: + elif transport == client.TRANSPORT_SERIALKISS: aprs_connection = ( "SerialKISS://{}@{} baud".format( self.config["kiss"]["serial"]["device"], diff --git a/aprsd/main.py b/aprsd/main.py index 91b4816..3bebc80 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -37,7 +37,7 @@ import click_completion # local imports here import aprsd from aprsd import ( - flask, kissclient, messaging, packets, plugin, stats, threads, trace, utils, + flask, messaging, packets, plugin, stats, threads, trace, utils, ) from aprsd import client from aprsd import config as aprsd_config @@ -463,23 +463,13 @@ def server( trace.setup_tracing(["method", "api"]) stats.APRSDStats(config) - if config["aprs"].get("enabled", True): - try: - cl = client.Client(config) - cl.client - except LoginError: - sys.exit(-1) - - rx_thread = threads.APRSDRXThread( - msg_queues=threads.msg_queues, - config=config, - ) - rx_thread.start() - else: - LOG.info( - "APRS network connection Not Enabled in config. This is" - " for setups without internet connectivity.", - ) + # Initialize the client factory and create + # The correct client object ready for use + client.ClientFactory.setup(config) + # Make sure we have 1 client transport enabled + if not client.factory.is_client_enabled(): + LOG.error("No Clients are enabled in config.") + sys.exit(-1) # Create the initial PM singleton and Register plugins plugin_manager = plugin.PluginManager(config) @@ -497,13 +487,11 @@ def server( packets.PacketList(config=config) packets.WatchList(config=config) - if kissclient.KISSClient.kiss_enabled(config): - kcl = kissclient.KISSClient(config=config) - # This initializes the client object. - kcl.client - - kissrx_thread = threads.KISSRXThread(msg_queues=threads.msg_queues, config=config) - kissrx_thread.start() + rx_thread = threads.APRSDRXThread( + msg_queues=threads.msg_queues, + config=config, + ) + rx_thread.start() messaging.MsgTrack().restart() diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 1a983a1..f80574c 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -11,7 +11,7 @@ import time from aprsd import client from aprsd import config as aprsd_config -from aprsd import kissclient, packets, stats, threads, trace +from aprsd import packets, stats, threads LOG = logging.getLogger("APRSD") @@ -20,10 +20,6 @@ LOG = logging.getLogger("APRSD") # and it's ok, but don't send a usage string back NULL_MESSAGE = -1 -MESSAGE_TRANSPORT_TCPKISS = "tcpkiss" -MESSAGE_TRANSPORT_SERIALKISS = "serialkiss" -MESSAGE_TRANSPORT_APRSIS = "aprsis" - class MsgTrack: """Class to keep track of outstanding text messages. @@ -36,13 +32,6 @@ class MsgTrack: automatically adds itself to this class. When the ack is recieved from the radio, the message object is removed from this class. - - # TODO(hemna) - When aprsd is asked to quit this class should be serialized and - saved to disk/db to keep track of the state of outstanding messages. - When aprsd is started, it should try and fetch the saved state, - and reloaded to a live state. - """ _instance = None @@ -241,7 +230,6 @@ class Message(metaclass=abc.ABCMeta): fromcall, tocall, msg_id=None, - transport=MESSAGE_TRANSPORT_APRSIS, ): self.fromcall = fromcall self.tocall = tocall @@ -250,18 +238,11 @@ class Message(metaclass=abc.ABCMeta): c.increment() msg_id = c.value self.id = msg_id - self.transport = transport @abc.abstractmethod def send(self): """Child class must declare.""" - def get_transport(self): - if self.transport == MESSAGE_TRANSPORT_APRSIS: - return client.get_client() - elif self.transport == MESSAGE_TRANSPORT_TCPKISS: - return kissclient.get_client() - class RawMessage(Message): """Send a raw message. @@ -273,8 +254,8 @@ class RawMessage(Message): message = None - def __init__(self, message, transport=MESSAGE_TRANSPORT_APRSIS): - super().__init__(None, None, msg_id=None, transport=transport) + def __init__(self, message): + super().__init__(None, None, msg_id=None) self.message = message def dict(self): @@ -303,7 +284,7 @@ class RawMessage(Message): def send_direct(self, aprsis_client=None): """Send a message without a separate thread.""" - cl = self.get_transport() + cl = client.factory.create().client log_message( "Sending Message Direct", str(self).rstrip("\n"), @@ -312,7 +293,7 @@ class RawMessage(Message): fromcall=self.fromcall, ) cl.send(self) - stats.APRSDStats().msgs_sent_inc() + stats.APRSDStats().msgs_tx_inc() class TextMessage(Message): @@ -327,9 +308,8 @@ class TextMessage(Message): message, msg_id=None, allow_delay=True, - transport=MESSAGE_TRANSPORT_APRSIS, ): - super().__init__(fromcall, tocall, msg_id, transport=transport) + super().__init__(fromcall, tocall, msg_id) self.message = message # do we try and save this message for later if we don't get # an ack? Some messages we don't want to do this ever. @@ -386,7 +366,7 @@ class TextMessage(Message): if aprsis_client: cl = aprsis_client else: - cl = self.get_transport() + cl = client.factory.create().client log_message( "Sending Message Direct", str(self).rstrip("\n"), @@ -424,7 +404,6 @@ class SendMessageThread(threads.APRSDThread): LOG.info("Message Send Complete via Ack.") return False else: - cl = msg.get_transport() send_now = False if msg.last_send_attempt == msg.retry_count: # we reached the send limit, don't send again @@ -455,6 +434,7 @@ class SendMessageThread(threads.APRSDThread): retry_number=msg.last_send_attempt, msg_num=msg.id, ) + cl = client.factory.create().client cl.send(msg) stats.APRSDStats().msgs_tx_inc() packets.PacketList().add(msg.dict()) @@ -469,8 +449,8 @@ class SendMessageThread(threads.APRSDThread): class AckMessage(Message): """Class for building Acks and sending them.""" - def __init__(self, fromcall, tocall, msg_id, transport=MESSAGE_TRANSPORT_APRSIS): - super().__init__(fromcall, tocall, msg_id=msg_id, transport=transport) + def __init__(self, fromcall, tocall, msg_id): + super().__init__(fromcall, tocall, msg_id=msg_id) def dict(self): now = datetime.datetime.now() @@ -509,7 +489,7 @@ class AckMessage(Message): if aprsis_client: cl = aprsis_client else: - cl = self.get_transport() + cl = client.factory.create().client log_message( "Sending ack", str(self).rstrip("\n"), @@ -526,10 +506,8 @@ class SendAckThread(threads.APRSDThread): self.ack = ack super().__init__(f"SendAck-{self.ack.id}") - @trace.trace def loop(self): """Separate thread to send acks with retries.""" - LOG.debug("SendAckThread loop start") send_now = False if self.ack.last_send_attempt == self.ack.retry_count: # we reached the send limit, don't send again @@ -554,7 +532,7 @@ class SendAckThread(threads.APRSDThread): send_now = True if send_now: - cl = self.ack.get_transport() + cl = client.factory.create().client log_message( "Sending ack", str(self.ack).rstrip("\n"), diff --git a/aprsd/plugin.py b/aprsd/plugin.py index fe5efee..d7fb0bc 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -158,7 +158,7 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): ) # make sure the timeout is set or this doesn't work if watch_list: - aprs_client = client.get_client() + aprs_client = client.factory.create().client filter_str = "b/{}".format("/".join(watch_list)) aprs_client.set_filter(filter_str) else: diff --git a/aprsd/threads.py b/aprsd/threads.py index 1132c0a..8cf75f0 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -8,7 +8,7 @@ import tracemalloc import aprslib -from aprsd import client, kissclient, messaging, packets, plugin, stats, utils +from aprsd import client, messaging, packets, plugin, stats, utils LOG = logging.getLogger("APRSD") @@ -137,9 +137,9 @@ class KeepAliveThread(APRSDThread): if delta > self.max_delta: # We haven't gotten a keepalive from aprs-is in a while # reset the connection.a - if not kissclient.KISSClient.kiss_enabled(self.config): + if not client.KISSClient.is_enabled(self.config): LOG.warning("Resetting connection to APRS-IS.") - client.Client().reset() + client.factory.create().reset() # Check version every hour delta = now - self.checker_time @@ -158,13 +158,13 @@ class APRSDRXThread(APRSDThread): super().__init__("RX_MSG") self.msg_queues = msg_queues self.config = config + self._client = client.factory.create() def stop(self): self.thread_stop = True - client.get_client().stop() + client.factory.create().client.stop() def loop(self): - aprs_client = client.get_client() # setup the consumer of messages and block until a messages try: @@ -177,7 +177,9 @@ class APRSDRXThread(APRSDThread): # and the aprslib developer didn't want to allow a PR to add # kwargs. :( # https://github.com/rossengeorgiev/aprs-python/pull/56 - aprs_client.consumer(self.process_packet, raw=False, blocking=False) + self._client.client.consumer( + self.process_packet, raw=False, blocking=False, + ) except aprslib.exceptions.ConnectionDrop: LOG.error("Connection dropped, reconnecting") @@ -185,21 +187,21 @@ class APRSDRXThread(APRSDThread): # Force the deletion of the client object connected to aprs # This will cause a reconnect, next time client.get_client() # is called - client.Client().reset() + self._client.reset() # Continue to loop return True - def process_packet(self, packet): + def process_packet(self, *args, **kwargs): + packet = self._client.decode_packet(*args, **kwargs) thread = APRSDProcessPacketThread(packet=packet, config=self.config) thread.start() class APRSDProcessPacketThread(APRSDThread): - def __init__(self, packet, config, transport="aprsis"): + def __init__(self, packet, config): self.packet = packet self.config = config - self.transport = transport name = self.packet["raw"][:10] super().__init__(f"RX_PACKET-{name}") @@ -254,7 +256,6 @@ class APRSDProcessPacketThread(APRSDThread): self.config["aprs"]["login"], fromcall, msg_id=msg_id, - transport=self.transport, ) ack.send() @@ -275,7 +276,6 @@ class APRSDProcessPacketThread(APRSDThread): self.config["aprs"]["login"], fromcall, subreply, - transport=self.transport, ) msg.send() elif isinstance(reply, messaging.Message): @@ -296,7 +296,6 @@ class APRSDProcessPacketThread(APRSDThread): self.config["aprs"]["login"], fromcall, reply, - transport=self.transport, ) msg.send() @@ -309,7 +308,6 @@ class APRSDProcessPacketThread(APRSDThread): self.config["aprs"]["login"], fromcall, reply, - transport=self.transport, ) msg.send() except Exception as ex: @@ -321,88 +319,7 @@ class APRSDProcessPacketThread(APRSDThread): self.config["aprs"]["login"], fromcall, reply, - transport=self.transport, ) msg.send() LOG.debug("Packet processing complete") - - -class APRSDTXThread(APRSDThread): - def __init__(self, msg_queues, config): - super().__init__("TX_MSG") - self.msg_queues = msg_queues - self.config = config - - def loop(self): - try: - msg = self.msg_queues["tx"].get(timeout=1) - msg.send() - except queue.Empty: - pass - # Continue to loop - return True - - -class KISSRXThread(APRSDThread): - """Thread that connects to direwolf's TCPKISS interface. - - All Packets are processed and sent back out the direwolf - interface instead of the aprs-is server. - - """ - - def __init__(self, msg_queues, config): - super().__init__("KISSRX_MSG") - self.msg_queues = msg_queues - self.config = config - - def stop(self): - self.thread_stop = True - kissclient.get_client().stop() - - def loop(self): - kiss_client = kissclient.get_client() - - # setup the consumer of messages and block until a messages - try: - # This will register a packet consumer with aprslib - # When new packets come in the consumer will process - # the packet - - # Do a partial here because the consumer signature doesn't allow - # For kwargs to be passed in to the consumer func we declare - # and the aprslib developer didn't want to allow a PR to add - # kwargs. :( - # https://github.com/rossengeorgiev/aprs-python/pull/56 - kiss_client.consumer(self.process_packet, callsign=self.config["kiss"]["callsign"]) - kiss_client.loop.run_forever() - - except aprslib.exceptions.ConnectionDrop: - LOG.error("Connection dropped, reconnecting") - time.sleep(5) - # Force the deletion of the client object connected to aprs - # This will cause a reconnect, next time client.get_client() - # is called - client.Client().reset() - # Continue to loop - - def process_packet(self, interface, frame): - """Process a packet recieved from aprs-is server.""" - - LOG.debug(f"Got an APRS Frame '{frame}'") - # try and nuke the * from the fromcall sign. - frame.header._source._ch = False - payload = str(frame.payload.decode()) - msg = f"{str(frame.header)}:{payload}" - # msg = frame.tnc2 - LOG.debug(f"Decoding {msg}") - - packet = aprslib.parse(msg) - LOG.debug(packet) - thread = APRSDProcessPacketThread( - packet=packet, config=self.config, - transport=messaging.MESSAGE_TRANSPORT_TCPKISS, - ) - thread.start() - return