Update client.py to add consumer in the API.

This adds a layer between the client object and the
actual client instance, so we can reset the actual
client object instance upon failure of connection.
This commit is contained in:
Hemna 2024-03-28 16:32:55 -04:00
parent 428edaced9
commit a62e490353
4 changed files with 71 additions and 14 deletions

View File

@ -40,6 +40,7 @@ class Client:
if cls._instance is None: if cls._instance is None:
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
# Put any initialization here. # Put any initialization here.
cls._instance._create_client()
return cls._instance return cls._instance
def set_filter(self, filter): def set_filter(self, filter):
@ -50,13 +51,20 @@ class Client:
@property @property
def client(self): def client(self):
if not self._client: if not self._client:
LOG.info("Creating APRS client") self._create_client()
self._client = self.setup_connection()
if self.filter:
LOG.info("Creating APRS client filter")
self._client.set_filter(self.filter)
return self._client return self._client
def _create_client(self):
self._client = self.setup_connection()
if self.filter:
LOG.info("Creating APRS client filter")
self._client.set_filter(self.filter)
def stop(self):
if self._client:
LOG.info("Stopping client connection.")
self._client.stop()
def send(self, packet: core.Packet): def send(self, packet: core.Packet):
packet_list.PacketList().tx(packet) packet_list.PacketList().tx(packet)
self.client.send(packet) self.client.send(packet)
@ -65,6 +73,7 @@ class Client:
"""Call this to force a rebuild/reconnect.""" """Call this to force a rebuild/reconnect."""
if self._client: if self._client:
del self._client del self._client
self._create_client()
else: else:
LOG.warning("Client not initialized, nothing to reset.") LOG.warning("Client not initialized, nothing to reset.")
@ -89,6 +98,10 @@ class Client:
def decode_packet(self, *args, **kwargs): def decode_packet(self, *args, **kwargs):
pass pass
@abc.abstractmethod
def consumer(self, callback, blocking=False, immortal=False, raw=False):
pass
class APRSISClient(Client): class APRSISClient(Client):
@ -127,8 +140,10 @@ class APRSISClient(Client):
def is_alive(self): def is_alive(self):
if self._client: if self._client:
LOG.warning(f"APRS_CLIENT {self._client} alive? {self._client.is_alive()}")
return self._client.is_alive() return self._client.is_alive()
else: else:
LOG.warning(f"APRS_CLIENT {self._client} alive? NO!!!")
return False return False
@staticmethod @staticmethod
@ -149,7 +164,7 @@ class APRSISClient(Client):
aprs_client = None aprs_client = None
while not connected: while not connected:
try: try:
LOG.info("Creating aprslib client") LOG.info(f"Creating aprslib client({host}:{port}) and logging in {user}.")
aprs_client = aprsis.Aprsdis(user, passwd=password, host=host, port=port) aprs_client = aprsis.Aprsdis(user, passwd=password, host=host, port=port)
# Force the log to be the same # Force the log to be the same
aprs_client.logger = LOG aprs_client.logger = LOG
@ -170,10 +185,17 @@ class APRSISClient(Client):
else: else:
backoff += 1 backoff += 1
continue continue
LOG.debug(f"Logging in to APRS-IS with user '{user}'")
self._client = aprs_client self._client = aprs_client
LOG.warning(f"APRS_CLIENT {aprs_client}")
return aprs_client return aprs_client
def consumer(self, callback, blocking=False, immortal=False, raw=False):
if self.is_alive():
self._client.consumer(
callback, blocking=blocking,
immortal=immortal, raw=raw,
)
class KISSClient(Client): class KISSClient(Client):
@ -248,6 +270,9 @@ class KISSClient(Client):
self._client = kiss.KISS3Client() self._client = kiss.KISS3Client()
return self._client return self._client
def consumer(self, callback, blocking=False, immortal=False, raw=False):
self._client.consumer(callback)
class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass): class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass):
@ -304,7 +329,7 @@ class ClientFactory:
key = TRANSPORT_FAKE key = TRANSPORT_FAKE
builder = self._builders.get(key) builder = self._builders.get(key)
LOG.debug(f"Creating client {key}") LOG.debug(f"ClientFactory Creating client of type '{key}'")
if not builder: if not builder:
raise ValueError(key) raise ValueError(key)
return builder() return builder()

View File

@ -81,7 +81,7 @@ class KISS3Client:
LOG.error("Failed to parse bytes received from KISS interface.") LOG.error("Failed to parse bytes received from KISS interface.")
LOG.exception(ex) LOG.exception(ex)
def consumer(self, callback, blocking=False, immortal=False, raw=False): def consumer(self, callback):
LOG.debug("Start blocking KISS consumer") LOG.debug("Start blocking KISS consumer")
self._parse_callback = callback self._parse_callback = callback
self.kiss.read(callback=self.parse_frame, min_frames=None) self.kiss.read(callback=self.parse_frame, min_frames=None)

View File

@ -11,6 +11,9 @@ LOG = logging.getLogger("APRSD")
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
"""Base class for all threads in APRSD."""
loop_interval = 1
def __init__(self, name): def __init__(self, name):
super().__init__(name=name) super().__init__(name=name)
@ -45,6 +48,7 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
LOG.debug("Starting") LOG.debug("Starting")
while not self._should_quit(): while not self._should_quit():
can_loop = self.loop() can_loop = self.loop()
self.loop_interval += 1
self._last_loop = datetime.datetime.now() self._last_loop = datetime.datetime.now()
if not can_loop: if not can_loop:
self.stop() self.stop()
@ -84,6 +88,17 @@ class APRSDThreadList:
LOG.info(F"{th.name} packet {th.packet}") LOG.info(F"{th.name} packet {th.packet}")
th.stop() th.stop()
@wrapt.synchronized(lock)
def info(self):
"""Go through all the threads and collect info about each."""
info = {}
for thread in self.threads_list:
alive = thread.is_alive()
age = thread.loop_age()
key = thread.__class__.__name__
info[key] = {"alive": True if alive else False, "age": age, "name": thread.name}
return info
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def __len__(self): def __len__(self):
return len(self.threads_list) return len(self.threads_list)

View File

@ -6,7 +6,7 @@ import time
import aprslib import aprslib
from oslo_config import cfg from oslo_config import cfg
from aprsd import client, packets, plugin from aprsd import client, packets, plugin, stats
from aprsd.packets import log as packet_log from aprsd.packets import log as packet_log
from aprsd.threads import APRSDThread, tx from aprsd.threads import APRSDThread, tx
@ -23,9 +23,15 @@ class APRSDRXThread(APRSDThread):
def stop(self): def stop(self):
self.thread_stop = True self.thread_stop = True
client.factory.create().client.stop() if self._client:
self._client.stop()
def loop(self): def loop(self):
LOG.debug(f"RX_MSG-LOOP {self.loop_interval}")
if not self._client:
self._client = client.factory.create()
time.sleep(1)
return True
# setup the consumer of messages and block until a messages # setup the consumer of messages and block until a messages
try: try:
# This will register a packet consumer with aprslib # This will register a packet consumer with aprslib
@ -37,10 +43,11 @@ class APRSDRXThread(APRSDThread):
# and the aprslib developer didn't want to allow a PR to add # and the aprslib developer didn't want to allow a PR to add
# kwargs. :( # kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56 # https://github.com/rossengeorgiev/aprs-python/pull/56
self._client.client.consumer( LOG.debug(f"Calling client consumer CL {self._client}")
self.process_packet, raw=False, blocking=False, self._client.consumer(
self._process_packet, raw=False, blocking=False,
) )
LOG.debug(f"Consumer done {self._client}")
except ( except (
aprslib.exceptions.ConnectionDrop, aprslib.exceptions.ConnectionDrop,
aprslib.exceptions.ConnectionError, aprslib.exceptions.ConnectionError,
@ -51,9 +58,19 @@ class APRSDRXThread(APRSDThread):
# This will cause a reconnect, next time client.get_client() # This will cause a reconnect, next time client.get_client()
# is called # is called
self._client.reset() self._client.reset()
except Exception as ex:
LOG.error("Something bad happened!!!")
LOG.exception(ex)
return False
# Continue to loop # Continue to loop
return True return True
def _process_packet(self, *args, **kwargs):
"""Intermediate callback so we can update the keepalive time."""
stats.APRSDStats().set_aprsis_keepalive()
# Now call the 'real' packet processing for a RX'x packet
self.process_packet(*args, **kwargs)
@abc.abstractmethod @abc.abstractmethod
def process_packet(self, *args, **kwargs): def process_packet(self, *args, **kwargs):
pass pass