Merge pull request #158 from craigerl/client-update

Update client.py to add consumer in the API.
This commit is contained in:
Walter A. Boring IV 2024-04-02 09:26:30 -04:00 committed by GitHub
commit 200944f37a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 71 additions and 14 deletions

View File

@ -40,6 +40,7 @@ class Client:
if cls._instance is None:
cls._instance = super().__new__(cls)
# Put any initialization here.
cls._instance._create_client()
return cls._instance
def set_filter(self, filter):
@ -50,13 +51,20 @@ class Client:
@property
def client(self):
if not self._client:
LOG.info("Creating APRS client")
self._client = self.setup_connection()
if self.filter:
LOG.info("Creating APRS client filter")
self._client.set_filter(self.filter)
self._create_client()
return self._client
def _create_client(self):
self._client = self.setup_connection()
if self.filter:
LOG.info("Creating APRS client filter")
self._client.set_filter(self.filter)
def stop(self):
if self._client:
LOG.info("Stopping client connection.")
self._client.stop()
def send(self, packet: core.Packet):
packet_list.PacketList().tx(packet)
self.client.send(packet)
@ -65,6 +73,7 @@ class Client:
"""Call this to force a rebuild/reconnect."""
if self._client:
del self._client
self._create_client()
else:
LOG.warning("Client not initialized, nothing to reset.")
@ -89,6 +98,10 @@ class Client:
def decode_packet(self, *args, **kwargs):
pass
@abc.abstractmethod
def consumer(self, callback, blocking=False, immortal=False, raw=False):
pass
class APRSISClient(Client):
@ -127,8 +140,10 @@ class APRSISClient(Client):
def is_alive(self):
if self._client:
LOG.warning(f"APRS_CLIENT {self._client} alive? {self._client.is_alive()}")
return self._client.is_alive()
else:
LOG.warning(f"APRS_CLIENT {self._client} alive? NO!!!")
return False
@staticmethod
@ -149,7 +164,7 @@ class APRSISClient(Client):
aprs_client = None
while not connected:
try:
LOG.info("Creating aprslib client")
LOG.info(f"Creating aprslib client({host}:{port}) and logging in {user}.")
aprs_client = aprsis.Aprsdis(user, passwd=password, host=host, port=port)
# Force the log to be the same
aprs_client.logger = LOG
@ -170,10 +185,17 @@ class APRSISClient(Client):
else:
backoff += 1
continue
LOG.debug(f"Logging in to APRS-IS with user '{user}'")
self._client = aprs_client
LOG.warning(f"APRS_CLIENT {aprs_client}")
return aprs_client
def consumer(self, callback, blocking=False, immortal=False, raw=False):
if self.is_alive():
self._client.consumer(
callback, blocking=blocking,
immortal=immortal, raw=raw,
)
class KISSClient(Client):
@ -248,6 +270,9 @@ class KISSClient(Client):
self._client = kiss.KISS3Client()
return self._client
def consumer(self, callback, blocking=False, immortal=False, raw=False):
self._client.consumer(callback)
class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass):
@ -304,7 +329,7 @@ class ClientFactory:
key = TRANSPORT_FAKE
builder = self._builders.get(key)
LOG.debug(f"Creating client {key}")
LOG.debug(f"ClientFactory Creating client of type '{key}'")
if not builder:
raise ValueError(key)
return builder()

View File

@ -81,7 +81,7 @@ class KISS3Client:
LOG.error("Failed to parse bytes received from KISS interface.")
LOG.exception(ex)
def consumer(self, callback, blocking=False, immortal=False, raw=False):
def consumer(self, callback):
LOG.debug("Start blocking KISS consumer")
self._parse_callback = callback
self.kiss.read(callback=self.parse_frame, min_frames=None)

View File

@ -11,6 +11,9 @@ LOG = logging.getLogger("APRSD")
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
"""Base class for all threads in APRSD."""
loop_interval = 1
def __init__(self, name):
super().__init__(name=name)
@ -45,6 +48,7 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
LOG.debug("Starting")
while not self._should_quit():
can_loop = self.loop()
self.loop_interval += 1
self._last_loop = datetime.datetime.now()
if not can_loop:
self.stop()
@ -84,6 +88,17 @@ class APRSDThreadList:
LOG.info(F"{th.name} packet {th.packet}")
th.stop()
@wrapt.synchronized(lock)
def info(self):
"""Go through all the threads and collect info about each."""
info = {}
for thread in self.threads_list:
alive = thread.is_alive()
age = thread.loop_age()
key = thread.__class__.__name__
info[key] = {"alive": True if alive else False, "age": age, "name": thread.name}
return info
@wrapt.synchronized(lock)
def __len__(self):
return len(self.threads_list)

View File

@ -6,7 +6,7 @@ import time
import aprslib
from oslo_config import cfg
from aprsd import client, packets, plugin
from aprsd import client, packets, plugin, stats
from aprsd.packets import log as packet_log
from aprsd.threads import APRSDThread, tx
@ -23,9 +23,15 @@ class APRSDRXThread(APRSDThread):
def stop(self):
self.thread_stop = True
client.factory.create().client.stop()
if self._client:
self._client.stop()
def loop(self):
LOG.debug(f"RX_MSG-LOOP {self.loop_interval}")
if not self._client:
self._client = client.factory.create()
time.sleep(1)
return True
# setup the consumer of messages and block until a messages
try:
# This will register a packet consumer with aprslib
@ -37,10 +43,11 @@ class APRSDRXThread(APRSDThread):
# and the aprslib developer didn't want to allow a PR to add
# kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56
self._client.client.consumer(
self.process_packet, raw=False, blocking=False,
LOG.debug(f"Calling client consumer CL {self._client}")
self._client.consumer(
self._process_packet, raw=False, blocking=False,
)
LOG.debug(f"Consumer done {self._client}")
except (
aprslib.exceptions.ConnectionDrop,
aprslib.exceptions.ConnectionError,
@ -51,9 +58,19 @@ class APRSDRXThread(APRSDThread):
# This will cause a reconnect, next time client.get_client()
# is called
self._client.reset()
except Exception as ex:
LOG.error("Something bad happened!!!")
LOG.exception(ex)
return False
# Continue to loop
return True
def _process_packet(self, *args, **kwargs):
"""Intermediate callback so we can update the keepalive time."""
stats.APRSDStats().set_aprsis_keepalive()
# Now call the 'real' packet processing for a RX'x packet
self.process_packet(*args, **kwargs)
@abc.abstractmethod
def process_packet(self, *args, **kwargs):
pass