From a62e4903530a63a6a5dbe22457b8ba3b37584315 Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 28 Mar 2024 16:32:55 -0400 Subject: [PATCH] Update client.py to add consumer in the API. This adds a layer between the client object and the actual client instance, so we can reset the actual client object instance upon failure of connection. --- aprsd/client.py | 41 +++++++++++++++++++++++++++++++++-------- aprsd/clients/kiss.py | 2 +- aprsd/threads/aprsd.py | 15 +++++++++++++++ aprsd/threads/rx.py | 27 ++++++++++++++++++++++----- 4 files changed, 71 insertions(+), 14 deletions(-) diff --git a/aprsd/client.py b/aprsd/client.py index 6a8b26d..2561692 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -40,6 +40,7 @@ class Client: if cls._instance is None: cls._instance = super().__new__(cls) # Put any initialization here. + cls._instance._create_client() return cls._instance def set_filter(self, filter): @@ -50,13 +51,20 @@ class Client: @property def client(self): if not self._client: - LOG.info("Creating APRS client") - self._client = self.setup_connection() - if self.filter: - LOG.info("Creating APRS client filter") - self._client.set_filter(self.filter) + self._create_client() return self._client + def _create_client(self): + self._client = self.setup_connection() + if self.filter: + LOG.info("Creating APRS client filter") + self._client.set_filter(self.filter) + + def stop(self): + if self._client: + LOG.info("Stopping client connection.") + self._client.stop() + def send(self, packet: core.Packet): packet_list.PacketList().tx(packet) self.client.send(packet) @@ -65,6 +73,7 @@ class Client: """Call this to force a rebuild/reconnect.""" if self._client: del self._client + self._create_client() else: LOG.warning("Client not initialized, nothing to reset.") @@ -89,6 +98,10 @@ class Client: def decode_packet(self, *args, **kwargs): pass + @abc.abstractmethod + def consumer(self, callback, blocking=False, immortal=False, raw=False): + pass + class APRSISClient(Client): @@ -127,8 +140,10 @@ class APRSISClient(Client): def is_alive(self): if self._client: + LOG.warning(f"APRS_CLIENT {self._client} alive? {self._client.is_alive()}") return self._client.is_alive() else: + LOG.warning(f"APRS_CLIENT {self._client} alive? NO!!!") return False @staticmethod @@ -149,7 +164,7 @@ class APRSISClient(Client): aprs_client = None while not connected: try: - LOG.info("Creating aprslib client") + LOG.info(f"Creating aprslib client({host}:{port}) and logging in {user}.") aprs_client = aprsis.Aprsdis(user, passwd=password, host=host, port=port) # Force the log to be the same aprs_client.logger = LOG @@ -170,10 +185,17 @@ class APRSISClient(Client): else: backoff += 1 continue - LOG.debug(f"Logging in to APRS-IS with user '{user}'") self._client = aprs_client + LOG.warning(f"APRS_CLIENT {aprs_client}") return aprs_client + def consumer(self, callback, blocking=False, immortal=False, raw=False): + if self.is_alive(): + self._client.consumer( + callback, blocking=blocking, + immortal=immortal, raw=raw, + ) + class KISSClient(Client): @@ -248,6 +270,9 @@ class KISSClient(Client): self._client = kiss.KISS3Client() return self._client + def consumer(self, callback, blocking=False, immortal=False, raw=False): + self._client.consumer(callback) + class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass): @@ -304,7 +329,7 @@ class ClientFactory: key = TRANSPORT_FAKE builder = self._builders.get(key) - LOG.debug(f"Creating client {key}") + LOG.debug(f"ClientFactory Creating client of type '{key}'") if not builder: raise ValueError(key) return builder() diff --git a/aprsd/clients/kiss.py b/aprsd/clients/kiss.py index 21b21f9..e4c7bca 100644 --- a/aprsd/clients/kiss.py +++ b/aprsd/clients/kiss.py @@ -81,7 +81,7 @@ class KISS3Client: LOG.error("Failed to parse bytes received from KISS interface.") LOG.exception(ex) - def consumer(self, callback, blocking=False, immortal=False, raw=False): + def consumer(self, callback): LOG.debug("Start blocking KISS consumer") self._parse_callback = callback self.kiss.read(callback=self.parse_frame, min_frames=None) diff --git a/aprsd/threads/aprsd.py b/aprsd/threads/aprsd.py index 306abd5..d815af6 100644 --- a/aprsd/threads/aprsd.py +++ b/aprsd/threads/aprsd.py @@ -11,6 +11,9 @@ LOG = logging.getLogger("APRSD") class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): + """Base class for all threads in APRSD.""" + + loop_interval = 1 def __init__(self, name): super().__init__(name=name) @@ -45,6 +48,7 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): LOG.debug("Starting") while not self._should_quit(): can_loop = self.loop() + self.loop_interval += 1 self._last_loop = datetime.datetime.now() if not can_loop: self.stop() @@ -84,6 +88,17 @@ class APRSDThreadList: LOG.info(F"{th.name} packet {th.packet}") th.stop() + @wrapt.synchronized(lock) + def info(self): + """Go through all the threads and collect info about each.""" + info = {} + for thread in self.threads_list: + alive = thread.is_alive() + age = thread.loop_age() + key = thread.__class__.__name__ + info[key] = {"alive": True if alive else False, "age": age, "name": thread.name} + return info + @wrapt.synchronized(lock) def __len__(self): return len(self.threads_list) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 5613368..e78a204 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -6,7 +6,7 @@ import time import aprslib from oslo_config import cfg -from aprsd import client, packets, plugin +from aprsd import client, packets, plugin, stats from aprsd.packets import log as packet_log from aprsd.threads import APRSDThread, tx @@ -23,9 +23,15 @@ class APRSDRXThread(APRSDThread): def stop(self): self.thread_stop = True - client.factory.create().client.stop() + if self._client: + self._client.stop() def loop(self): + LOG.debug(f"RX_MSG-LOOP {self.loop_interval}") + if not self._client: + self._client = client.factory.create() + time.sleep(1) + return True # setup the consumer of messages and block until a messages try: # This will register a packet consumer with aprslib @@ -37,10 +43,11 @@ class APRSDRXThread(APRSDThread): # and the aprslib developer didn't want to allow a PR to add # kwargs. :( # https://github.com/rossengeorgiev/aprs-python/pull/56 - self._client.client.consumer( - self.process_packet, raw=False, blocking=False, + LOG.debug(f"Calling client consumer CL {self._client}") + self._client.consumer( + self._process_packet, raw=False, blocking=False, ) - + LOG.debug(f"Consumer done {self._client}") except ( aprslib.exceptions.ConnectionDrop, aprslib.exceptions.ConnectionError, @@ -51,9 +58,19 @@ class APRSDRXThread(APRSDThread): # This will cause a reconnect, next time client.get_client() # is called self._client.reset() + except Exception as ex: + LOG.error("Something bad happened!!!") + LOG.exception(ex) + return False # Continue to loop return True + def _process_packet(self, *args, **kwargs): + """Intermediate callback so we can update the keepalive time.""" + stats.APRSDStats().set_aprsis_keepalive() + # Now call the 'real' packet processing for a RX'x packet + self.process_packet(*args, **kwargs) + @abc.abstractmethod def process_packet(self, *args, **kwargs): pass