From b317d0eb6362747ebc5cb46e274f45fad5cc7a56 Mon Sep 17 00:00:00 2001 From: Hemna Date: Sat, 18 May 2024 14:09:53 -0400 Subject: [PATCH] Refactor client and drivers this patch refactors the client, drivers and client factory to use the same Protocol mechanism used by the stats collector to construct the proper client to be used according to the configuration --- aprsd/client.py | 461 ------------------ aprsd/client/__init__.py | 13 + aprsd/client/aprsis.py | 132 +++++ aprsd/client/base.py | 105 ++++ aprsd/{clients => client/drivers}/__init__.py | 0 aprsd/{clients => client/drivers}/aprsis.py | 0 aprsd/{clients => client/drivers}/fake.py | 0 aprsd/{clients => client/drivers}/kiss.py | 0 aprsd/client/factory.py | 88 ++++ aprsd/client/fake.py | 48 ++ aprsd/client/kiss.py | 103 ++++ aprsd/client/stats.py | 38 ++ aprsd/cmds/dev.py | 5 +- aprsd/cmds/listen.py | 8 +- aprsd/cmds/send_message.py | 10 +- aprsd/cmds/server.py | 16 +- aprsd/cmds/webchat.py | 10 +- aprsd/plugin.py | 2 +- aprsd/stats/__init__.py | 4 +- aprsd/threads/keep_alive.py | 7 +- aprsd/threads/log_monitor.py | 6 +- aprsd/threads/rx.py | 7 +- aprsd/threads/tx.py | 6 +- tests/plugins/test_notify.py | 20 +- 24 files changed, 576 insertions(+), 513 deletions(-) delete mode 100644 aprsd/client.py create mode 100644 aprsd/client/__init__.py create mode 100644 aprsd/client/aprsis.py create mode 100644 aprsd/client/base.py rename aprsd/{clients => client/drivers}/__init__.py (100%) rename aprsd/{clients => client/drivers}/aprsis.py (100%) rename aprsd/{clients => client/drivers}/fake.py (100%) rename aprsd/{clients => client/drivers}/kiss.py (100%) create mode 100644 aprsd/client/factory.py create mode 100644 aprsd/client/fake.py create mode 100644 aprsd/client/kiss.py create mode 100644 aprsd/client/stats.py diff --git a/aprsd/client.py b/aprsd/client.py deleted file mode 100644 index 52d14ef..0000000 --- a/aprsd/client.py +++ /dev/null @@ -1,461 +0,0 @@ -import abc -import datetime -import logging -import threading -import time - -import aprslib -from aprslib.exceptions import LoginError -from oslo_config import cfg -import wrapt - -from aprsd import exception -from aprsd.clients import aprsis, fake, kiss -from aprsd.packets import core -from aprsd.utils import singleton, trace - - -CONF = cfg.CONF -LOG = logging.getLogger("APRSD") -TRANSPORT_APRSIS = "aprsis" -TRANSPORT_TCPKISS = "tcpkiss" -TRANSPORT_SERIALKISS = "serialkiss" -TRANSPORT_FAKE = "fake" - -# Main must create this from the ClientFactory -# object such that it's populated with the -# Correct config -factory = None - - -@singleton -class APRSClientStats: - - lock = threading.Lock() - - @wrapt.synchronized(lock) - def stats(self, serializable=False): - client = factory.create() - stats = { - "transport": client.transport(), - "filter": client.filter, - "connected": client.connected, - } - - if client.transport() == TRANSPORT_APRSIS: - stats["server_string"] = client.client.server_string - keepalive = client.client.aprsd_keepalive - if serializable: - keepalive = keepalive.isoformat() - stats["server_keepalive"] = keepalive - elif client.transport() == TRANSPORT_TCPKISS: - stats["host"] = CONF.kiss_tcp.host - stats["port"] = CONF.kiss_tcp.port - elif client.transport() == TRANSPORT_SERIALKISS: - stats["device"] = CONF.kiss_serial.device - return stats - - -class Client: - """Singleton client class that constructs the aprslib connection.""" - - _instance = None - _client = None - - connected = False - filter = None - lock = threading.Lock() - - def __new__(cls, *args, **kwargs): - """This magic turns this into a singleton.""" - if cls._instance is None: - cls._instance = super().__new__(cls) - # Put any initialization here. - cls._instance._create_client() - return cls._instance - - @abc.abstractmethod - def stats(self) -> dict: - pass - - def set_filter(self, filter): - self.filter = filter - if self._client: - self._client.set_filter(filter) - - @property - def client(self): - if not self._client: - self._create_client() - return self._client - - def _create_client(self): - self._client = self.setup_connection() - if self.filter: - LOG.info("Creating APRS client filter") - self._client.set_filter(self.filter) - - def stop(self): - if self._client: - LOG.info("Stopping client connection.") - self._client.stop() - - def send(self, packet: core.Packet): - """Send a packet to the network.""" - self.client.send(packet) - - @wrapt.synchronized(lock) - def reset(self): - """Call this to force a rebuild/reconnect.""" - LOG.info("Resetting client connection.") - if self._client: - self._client.close() - del self._client - self._create_client() - else: - LOG.warning("Client not initialized, nothing to reset.") - - # Recreate the client - LOG.info(f"Creating new client {self.client}") - - @abc.abstractmethod - def setup_connection(self): - pass - - @staticmethod - @abc.abstractmethod - def is_enabled(): - pass - - @staticmethod - @abc.abstractmethod - def transport(): - pass - - @abc.abstractmethod - def decode_packet(self, *args, **kwargs): - pass - - @abc.abstractmethod - def consumer(self, callback, blocking=False, immortal=False, raw=False): - pass - - @abc.abstractmethod - def is_alive(self): - pass - - @abc.abstractmethod - def close(self): - pass - - -class APRSISClient(Client): - - _client = None - - def __init__(self): - max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0} - self.max_delta = datetime.timedelta(**max_timeout) - - def stats(self) -> dict: - stats = {} - if self.is_configured(): - stats = { - "server_string": self._client.server_string, - "sever_keepalive": self._client.aprsd_keepalive, - "filter": self.filter, - } - - return stats - - @staticmethod - def is_enabled(): - # Defaults to True if the enabled flag is non existent - try: - return CONF.aprs_network.enabled - except KeyError: - return False - - @staticmethod - def is_configured(): - if APRSISClient.is_enabled(): - # Ensure that the config vars are correctly set - if not CONF.aprs_network.login: - LOG.error("Config aprs_network.login not set.") - raise exception.MissingConfigOptionException( - "aprs_network.login is not set.", - ) - if not CONF.aprs_network.password: - LOG.error("Config aprs_network.password not set.") - raise exception.MissingConfigOptionException( - "aprs_network.password is not set.", - ) - if not CONF.aprs_network.host: - LOG.error("Config aprs_network.host not set.") - raise exception.MissingConfigOptionException( - "aprs_network.host is not set.", - ) - - return True - return True - - def _is_stale_connection(self): - delta = datetime.datetime.now() - self._client.aprsd_keepalive - if delta > self.max_delta: - LOG.error(f"Connection is stale, last heard {delta} ago.") - return True - - def is_alive(self): - if self._client: - return self._client.is_alive() and not self._is_stale_connection() - else: - LOG.warning(f"APRS_CLIENT {self._client} alive? NO!!!") - return False - - def close(self): - if self._client: - self._client.stop() - self._client.close() - - @staticmethod - def transport(): - return TRANSPORT_APRSIS - - def decode_packet(self, *args, **kwargs): - """APRS lib already decodes this.""" - return core.factory(args[0]) - - def setup_connection(self): - user = CONF.aprs_network.login - password = CONF.aprs_network.password - host = CONF.aprs_network.host - port = CONF.aprs_network.port - self.connected = False - backoff = 1 - aprs_client = None - while not self.connected: - try: - LOG.info(f"Creating aprslib client({host}:{port}) and logging in {user}.") - aprs_client = aprsis.Aprsdis(user, passwd=password, host=host, port=port) - # Force the log to be the same - aprs_client.logger = LOG - aprs_client.connect() - self.connected = True - backoff = 1 - except LoginError as e: - LOG.error(f"Failed to login to APRS-IS Server '{e}'") - self.connected = False - time.sleep(backoff) - except Exception as e: - LOG.error(f"Unable to connect to APRS-IS server. '{e}' ") - self.connected = False - time.sleep(backoff) - # Don't allow the backoff to go to inifinity. - if backoff > 5: - backoff = 5 - else: - backoff += 1 - continue - self._client = aprs_client - return aprs_client - - def consumer(self, callback, blocking=False, immortal=False, raw=False): - self._client.consumer( - callback, blocking=blocking, - immortal=immortal, raw=raw, - ) - - -class KISSClient(Client): - - _client = None - - def stats(self) -> dict: - stats = {} - if self.is_configured(): - return { - "transport": self.transport(), - } - return stats - - @staticmethod - def is_enabled(): - """Return if tcp or serial KISS is enabled.""" - if CONF.kiss_serial.enabled: - return True - - if CONF.kiss_tcp.enabled: - return True - - return False - - @staticmethod - def is_configured(): - # Ensure that the config vars are correctly set - if KISSClient.is_enabled(): - transport = KISSClient.transport() - if transport == TRANSPORT_SERIALKISS: - if not CONF.kiss_serial.device: - LOG.error("KISS serial enabled, but no device is set.") - raise exception.MissingConfigOptionException( - "kiss_serial.device is not set.", - ) - elif transport == TRANSPORT_TCPKISS: - if not CONF.kiss_tcp.host: - LOG.error("KISS TCP enabled, but no host is set.") - raise exception.MissingConfigOptionException( - "kiss_tcp.host is not set.", - ) - - return True - return False - - def is_alive(self): - if self._client: - return self._client.is_alive() - else: - return False - - def close(self): - if self._client: - self._client.stop() - - @staticmethod - def transport(): - if CONF.kiss_serial.enabled: - return TRANSPORT_SERIALKISS - - if CONF.kiss_tcp.enabled: - return TRANSPORT_TCPKISS - - def decode_packet(self, *args, **kwargs): - """We get a frame, which has to be decoded.""" - LOG.debug(f"kwargs {kwargs}") - frame = kwargs["frame"] - LOG.debug(f"Got an APRS Frame '{frame}'") - # try and nuke the * from the fromcall sign. - # frame.header._source._ch = False - # payload = str(frame.payload.decode()) - # msg = f"{str(frame.header)}:{payload}" - # msg = frame.tnc2 - # LOG.debug(f"Decoding {msg}") - - raw = aprslib.parse(str(frame)) - packet = core.factory(raw) - if isinstance(packet, core.ThirdParty): - return packet.subpacket - else: - return packet - - def setup_connection(self): - self._client = kiss.KISS3Client() - self.connected = True - return self._client - - def consumer(self, callback, blocking=False, immortal=False, raw=False): - self._client.consumer(callback) - - -class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass): - - def stats(self) -> dict: - return {} - - @staticmethod - def is_enabled(): - if CONF.fake_client.enabled: - return True - return False - - @staticmethod - def is_configured(): - return APRSDFakeClient.is_enabled() - - def is_alive(self): - return True - - def close(self): - pass - - def setup_connection(self): - self.connected = True - return fake.APRSDFakeClient() - - @staticmethod - def transport(): - return TRANSPORT_FAKE - - def decode_packet(self, *args, **kwargs): - LOG.debug(f"kwargs {kwargs}") - pkt = kwargs["packet"] - LOG.debug(f"Got an APRS Fake Packet '{pkt}'") - return pkt - - -class ClientFactory: - _instance = None - - def __new__(cls, *args, **kwargs): - """This magic turns this into a singleton.""" - if cls._instance is None: - cls._instance = super().__new__(cls) - # Put any initialization here. - return cls._instance - - def __init__(self): - self._builders = {} - - def register(self, key, builder): - self._builders[key] = builder - - def create(self, key=None): - if not key: - if APRSISClient.is_enabled(): - key = TRANSPORT_APRSIS - elif KISSClient.is_enabled(): - key = KISSClient.transport() - elif APRSDFakeClient.is_enabled(): - key = TRANSPORT_FAKE - - builder = self._builders.get(key) - if not builder: - raise ValueError(key) - return builder() - - def is_client_enabled(self): - """Make sure at least one client is enabled.""" - enabled = False - for key in self._builders.keys(): - try: - enabled |= self._builders[key].is_enabled() - except KeyError: - pass - - return enabled - - def is_client_configured(self): - enabled = False - for key in self._builders.keys(): - try: - enabled |= self._builders[key].is_configured() - except KeyError: - pass - except exception.MissingConfigOptionException as ex: - LOG.error(ex.message) - return False - except exception.ConfigOptionBogusDefaultException as ex: - LOG.error(ex.message) - return False - - return enabled - - @staticmethod - def setup(): - """Create and register all possible client objects.""" - global factory - - factory = ClientFactory() - factory.register(TRANSPORT_APRSIS, APRSISClient) - factory.register(TRANSPORT_TCPKISS, KISSClient) - factory.register(TRANSPORT_SERIALKISS, KISSClient) - factory.register(TRANSPORT_FAKE, APRSDFakeClient) diff --git a/aprsd/client/__init__.py b/aprsd/client/__init__.py new file mode 100644 index 0000000..28dd803 --- /dev/null +++ b/aprsd/client/__init__.py @@ -0,0 +1,13 @@ +from aprsd.client import aprsis, factory, fake, kiss + + +TRANSPORT_APRSIS = "aprsis" +TRANSPORT_TCPKISS = "tcpkiss" +TRANSPORT_SERIALKISS = "serialkiss" +TRANSPORT_FAKE = "fake" + + +client_factory = factory.ClientFactory() +client_factory.register(aprsis.APRSISClient) +client_factory.register(kiss.KISSClient) +client_factory.register(fake.APRSDFakeClient) diff --git a/aprsd/client/aprsis.py b/aprsd/client/aprsis.py new file mode 100644 index 0000000..2079be0 --- /dev/null +++ b/aprsd/client/aprsis.py @@ -0,0 +1,132 @@ +import datetime +import logging +import time + +from aprslib.exceptions import LoginError +from oslo_config import cfg + +from aprsd import client, exception +from aprsd.client import base +from aprsd.client.drivers import aprsis +from aprsd.packets import core + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +class APRSISClient(base.APRSClient): + + _client = None + + def __init__(self): + max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0} + self.max_delta = datetime.timedelta(**max_timeout) + + def stats(self) -> dict: + stats = {} + if self.is_configured(): + stats = { + "server_string": self._client.server_string, + "sever_keepalive": self._client.aprsd_keepalive, + "filter": self.filter, + } + + return stats + + @staticmethod + def is_enabled(): + # Defaults to True if the enabled flag is non existent + try: + return CONF.aprs_network.enabled + except KeyError: + return False + + @staticmethod + def is_configured(): + if APRSISClient.is_enabled(): + # Ensure that the config vars are correctly set + if not CONF.aprs_network.login: + LOG.error("Config aprs_network.login not set.") + raise exception.MissingConfigOptionException( + "aprs_network.login is not set.", + ) + if not CONF.aprs_network.password: + LOG.error("Config aprs_network.password not set.") + raise exception.MissingConfigOptionException( + "aprs_network.password is not set.", + ) + if not CONF.aprs_network.host: + LOG.error("Config aprs_network.host not set.") + raise exception.MissingConfigOptionException( + "aprs_network.host is not set.", + ) + + return True + return True + + def _is_stale_connection(self): + delta = datetime.datetime.now() - self._client.aprsd_keepalive + if delta > self.max_delta: + LOG.error(f"Connection is stale, last heard {delta} ago.") + return True + + def is_alive(self): + if self._client: + return self._client.is_alive() and not self._is_stale_connection() + else: + LOG.warning(f"APRS_CLIENT {self._client} alive? NO!!!") + return False + + def close(self): + if self._client: + self._client.stop() + self._client.close() + + @staticmethod + def transport(): + return client.TRANSPORT_APRSIS + + def decode_packet(self, *args, **kwargs): + """APRS lib already decodes this.""" + return core.factory(args[0]) + + def setup_connection(self): + user = CONF.aprs_network.login + password = CONF.aprs_network.password + host = CONF.aprs_network.host + port = CONF.aprs_network.port + self.connected = False + backoff = 1 + aprs_client = None + while not self.connected: + try: + LOG.info(f"Creating aprslib client({host}:{port}) and logging in {user}.") + aprs_client = aprsis.Aprsdis(user, passwd=password, host=host, port=port) + # Force the log to be the same + aprs_client.logger = LOG + aprs_client.connect() + self.connected = True + backoff = 1 + except LoginError as e: + LOG.error(f"Failed to login to APRS-IS Server '{e}'") + self.connected = False + time.sleep(backoff) + except Exception as e: + LOG.error(f"Unable to connect to APRS-IS server. '{e}' ") + self.connected = False + time.sleep(backoff) + # Don't allow the backoff to go to inifinity. + if backoff > 5: + backoff = 5 + else: + backoff += 1 + continue + self._client = aprs_client + return aprs_client + + def consumer(self, callback, blocking=False, immortal=False, raw=False): + self._client.consumer( + callback, blocking=blocking, + immortal=immortal, raw=raw, + ) diff --git a/aprsd/client/base.py b/aprsd/client/base.py new file mode 100644 index 0000000..252a5f9 --- /dev/null +++ b/aprsd/client/base.py @@ -0,0 +1,105 @@ +import abc +import logging +import threading + +from oslo_config import cfg +import wrapt + +from aprsd.packets import core + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +class APRSClient: + """Singleton client class that constructs the aprslib connection.""" + + _instance = None + _client = None + + connected = False + filter = None + lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + """This magic turns this into a singleton.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + # Put any initialization here. + cls._instance._create_client() + return cls._instance + + @abc.abstractmethod + def stats(self) -> dict: + pass + + def set_filter(self, filter): + self.filter = filter + if self._client: + self._client.set_filter(filter) + + @property + def client(self): + if not self._client: + self._create_client() + return self._client + + def _create_client(self): + self._client = self.setup_connection() + if self.filter: + LOG.info("Creating APRS client filter") + self._client.set_filter(self.filter) + + def stop(self): + if self._client: + LOG.info("Stopping client connection.") + self._client.stop() + + def send(self, packet: core.Packet): + """Send a packet to the network.""" + self.client.send(packet) + + @wrapt.synchronized(lock) + def reset(self): + """Call this to force a rebuild/reconnect.""" + LOG.info("Resetting client connection.") + if self._client: + self._client.close() + del self._client + self._create_client() + else: + LOG.warning("Client not initialized, nothing to reset.") + + # Recreate the client + LOG.info(f"Creating new client {self.client}") + + @abc.abstractmethod + def setup_connection(self): + pass + + @staticmethod + @abc.abstractmethod + def is_enabled(): + pass + + @staticmethod + @abc.abstractmethod + def transport(): + pass + + @abc.abstractmethod + def decode_packet(self, *args, **kwargs): + pass + + @abc.abstractmethod + def consumer(self, callback, blocking=False, immortal=False, raw=False): + pass + + @abc.abstractmethod + def is_alive(self): + pass + + @abc.abstractmethod + def close(self): + pass diff --git a/aprsd/clients/__init__.py b/aprsd/client/drivers/__init__.py similarity index 100% rename from aprsd/clients/__init__.py rename to aprsd/client/drivers/__init__.py diff --git a/aprsd/clients/aprsis.py b/aprsd/client/drivers/aprsis.py similarity index 100% rename from aprsd/clients/aprsis.py rename to aprsd/client/drivers/aprsis.py diff --git a/aprsd/clients/fake.py b/aprsd/client/drivers/fake.py similarity index 100% rename from aprsd/clients/fake.py rename to aprsd/client/drivers/fake.py diff --git a/aprsd/clients/kiss.py b/aprsd/client/drivers/kiss.py similarity index 100% rename from aprsd/clients/kiss.py rename to aprsd/client/drivers/kiss.py diff --git a/aprsd/client/factory.py b/aprsd/client/factory.py new file mode 100644 index 0000000..cce1480 --- /dev/null +++ b/aprsd/client/factory.py @@ -0,0 +1,88 @@ +import logging +from typing import Callable, Protocol, runtime_checkable + +from aprsd import exception +from aprsd.packets import core + + +LOG = logging.getLogger("APRSD") + + +@runtime_checkable +class Client(Protocol): + + def __init__(self): + pass + + def connect(self) -> bool: + pass + + def disconnect(self) -> bool: + pass + + def decode_packet(self, *args, **kwargs) -> type[core.Packet]: + pass + + def is_enabled(self) -> bool: + pass + + def is_configured(self) -> bool: + pass + + def transport(self) -> str: + pass + + def send(self, message: str) -> bool: + pass + + def setup_connection(self) -> None: + pass + + +class ClientFactory: + _instance = None + clients = [] + + def __new__(cls, *args, **kwargs): + """This magic turns this into a singleton.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + # Put any initialization here. + return cls._instance + + def __init__(self): + self.clients: list[Callable] = [] + + def register(self, aprsd_client: Callable): + if isinstance(aprsd_client, Client): + raise ValueError("Client must be a subclass of Client protocol") + + self.clients.append(aprsd_client) + + def create(self, key=None): + for client in self.clients: + if client.is_enabled(): + return client() + raise Exception("No client is configured!!") + + def is_client_enabled(self): + """Make sure at least one client is enabled.""" + enabled = False + for client in self.clients: + if client.is_enabled(): + enabled = True + return enabled + + def is_client_configured(self): + enabled = False + for client in self.clients: + try: + if client.is_configured(): + enabled = True + except exception.MissingConfigOptionException as ex: + LOG.error(ex.message) + return False + except exception.ConfigOptionBogusDefaultException as ex: + LOG.error(ex.message) + return False + return enabled diff --git a/aprsd/client/fake.py b/aprsd/client/fake.py new file mode 100644 index 0000000..c8e255b --- /dev/null +++ b/aprsd/client/fake.py @@ -0,0 +1,48 @@ +import logging + +from oslo_config import cfg + +from aprsd import client +from aprsd.client import base +from aprsd.client.drivers import fake as fake_driver +from aprsd.utils import trace + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +class APRSDFakeClient(base.APRSClient, metaclass=trace.TraceWrapperMetaclass): + + def stats(self) -> dict: + return {} + + @staticmethod + def is_enabled(): + if CONF.fake_client.enabled: + return True + return False + + @staticmethod + def is_configured(): + return APRSDFakeClient.is_enabled() + + def is_alive(self): + return True + + def close(self): + pass + + def setup_connection(self): + self.connected = True + return fake_driver.APRSDFakeClient() + + @staticmethod + def transport(): + return client.TRANSPORT_FAKE + + def decode_packet(self, *args, **kwargs): + LOG.debug(f"kwargs {kwargs}") + pkt = kwargs["packet"] + LOG.debug(f"Got an APRS Fake Packet '{pkt}'") + return pkt diff --git a/aprsd/client/kiss.py b/aprsd/client/kiss.py new file mode 100644 index 0000000..14e3044 --- /dev/null +++ b/aprsd/client/kiss.py @@ -0,0 +1,103 @@ +import logging + +import aprslib +from oslo_config import cfg + +from aprsd import client, exception +from aprsd.client import base +from aprsd.client.drivers import kiss +from aprsd.packets import core + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +class KISSClient(base.APRSClient): + + _client = None + + def stats(self) -> dict: + stats = {} + if self.is_configured(): + return { + "transport": self.transport(), + } + return stats + + @staticmethod + def is_enabled(): + """Return if tcp or serial KISS is enabled.""" + if CONF.kiss_serial.enabled: + return True + + if CONF.kiss_tcp.enabled: + return True + + return False + + @staticmethod + def is_configured(): + # Ensure that the config vars are correctly set + if KISSClient.is_enabled(): + transport = KISSClient.transport() + if transport == client.TRANSPORT_SERIALKISS: + if not CONF.kiss_serial.device: + LOG.error("KISS serial enabled, but no device is set.") + raise exception.MissingConfigOptionException( + "kiss_serial.device is not set.", + ) + elif transport == client.TRANSPORT_TCPKISS: + if not CONF.kiss_tcp.host: + LOG.error("KISS TCP enabled, but no host is set.") + raise exception.MissingConfigOptionException( + "kiss_tcp.host is not set.", + ) + + return True + return False + + def is_alive(self): + if self._client: + return self._client.is_alive() + else: + return False + + def close(self): + if self._client: + self._client.stop() + + @staticmethod + def transport(): + if CONF.kiss_serial.enabled: + return client.TRANSPORT_SERIALKISS + + if CONF.kiss_tcp.enabled: + return client.TRANSPORT_TCPKISS + + def decode_packet(self, *args, **kwargs): + """We get a frame, which has to be decoded.""" + LOG.debug(f"kwargs {kwargs}") + frame = kwargs["frame"] + LOG.debug(f"Got an APRS Frame '{frame}'") + # try and nuke the * from the fromcall sign. + # frame.header._source._ch = False + # payload = str(frame.payload.decode()) + # msg = f"{str(frame.header)}:{payload}" + # msg = frame.tnc2 + # LOG.debug(f"Decoding {msg}") + + raw = aprslib.parse(str(frame)) + packet = core.factory(raw) + if isinstance(packet, core.ThirdParty): + return packet.subpacket + else: + return packet + + def setup_connection(self): + self._client = kiss.KISS3Client() + self.connected = True + return self._client + + def consumer(self, callback, blocking=False, immortal=False, raw=False): + self._client.consumer(callback) diff --git a/aprsd/client/stats.py b/aprsd/client/stats.py new file mode 100644 index 0000000..5b0d088 --- /dev/null +++ b/aprsd/client/stats.py @@ -0,0 +1,38 @@ +import threading + +from oslo_config import cfg +import wrapt + +from aprsd import client +from aprsd.utils import singleton + + +CONF = cfg.CONF + + +@singleton +class APRSClientStats: + + lock = threading.Lock() + + @wrapt.synchronized(lock) + def stats(self, serializable=False): + cl = client.client_factory.create() + stats = { + "transport": cl.transport(), + "filter": cl.filter, + "connected": cl.connected, + } + + if cl.transport() == client.TRANSPORT_APRSIS: + stats["server_string"] = cl.client.server_string + keepalive = cl.client.aprsd_keepalive + if serializable: + keepalive = keepalive.isoformat() + stats["server_keepalive"] = keepalive + elif cl.transport() == client.TRANSPORT_TCPKISS: + stats["host"] = CONF.kiss_tcp.host + stats["port"] = CONF.kiss_tcp.port + elif cl.transport() == client.TRANSPORT_SERIALKISS: + stats["device"] = CONF.kiss_serial.device + return stats diff --git a/aprsd/cmds/dev.py b/aprsd/cmds/dev.py index 0cb2a87..0754b6d 100644 --- a/aprsd/cmds/dev.py +++ b/aprsd/cmds/dev.py @@ -8,8 +8,9 @@ import logging import click from oslo_config import cfg +from aprsd import cli_helper, conf, packets, plugin # local imports here -from aprsd import cli_helper, client, conf, packets, plugin +from aprsd.client import base from aprsd.main import cli from aprsd.utils import trace @@ -96,7 +97,7 @@ def test_plugin( if CONF.trace_enabled: trace.setup_tracing(["method", "api"]) - client.Client() + base.APRSClient() pm = plugin.PluginManager() if load_all: diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index 71d5e61..0c03c82 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -15,7 +15,8 @@ from rich.console import Console # local imports here import aprsd -from aprsd import cli_helper, client, packets, plugin, threads +from aprsd import cli_helper, packets, plugin, threads +from aprsd.client import client_factory from aprsd.main import cli from aprsd.packets import collector as packet_collector from aprsd.packets import log as packet_log @@ -179,15 +180,14 @@ def listen( # Initialize the client factory and create # The correct client object ready for use - client.ClientFactory.setup() # Make sure we have 1 client transport enabled - if not client.factory.is_client_enabled(): + if not client_factory.is_client_enabled(): LOG.error("No Clients are enabled in config.") sys.exit(-1) # Creates the client object LOG.info("Creating client connection") - aprs_client = client.factory.create() + aprs_client = client_factory.create() LOG.info(aprs_client) LOG.debug(f"Filter by '{filter}'") diff --git a/aprsd/cmds/send_message.py b/aprsd/cmds/send_message.py index aba0d04..458b2ec 100644 --- a/aprsd/cmds/send_message.py +++ b/aprsd/cmds/send_message.py @@ -8,8 +8,9 @@ import click from oslo_config import cfg import aprsd -from aprsd import cli_helper, client, packets +from aprsd import cli_helper, packets from aprsd import conf # noqa : F401 +from aprsd.client import client_factory from aprsd.main import cli from aprsd.packets import collector from aprsd.threads import tx @@ -102,7 +103,7 @@ def send_message( def rx_packet(packet): global got_ack, got_response - cl = client.factory.create() + cl = client_factory.create() packet = cl.decode_packet(packet) collector.PacketCollector().rx(packet) packet.log("RX") @@ -130,8 +131,7 @@ def send_message( sys.exit(0) try: - client.ClientFactory.setup() - client.factory.create().client + client_factory.create().client except LoginError: sys.exit(-1) @@ -163,7 +163,7 @@ def send_message( # This will register a packet consumer with aprslib # When new packets come in the consumer will process # the packet - aprs_client = client.factory.create().client + aprs_client = client_factory.create().client aprs_client.consumer(rx_packet, raw=False) except aprslib.exceptions.ConnectionDrop: LOG.error("Connection dropped, reconnecting") diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index 16c7b59..f734be7 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -6,9 +6,10 @@ import click from oslo_config import cfg import aprsd -from aprsd import cli_helper, client +from aprsd import cli_helper from aprsd import main as aprsd_main from aprsd import packets, plugin, threads, utils +from aprsd.client import client_factory from aprsd.main import cli from aprsd.packets import collector as packet_collector from aprsd.packets import seen_list @@ -49,14 +50,13 @@ def server(ctx, flush): # Initialize the client factory and create # The correct client object ready for use - client.ClientFactory.setup() - if not client.factory.is_client_enabled(): + if not client_factory.is_client_enabled(): LOG.error("No Clients are enabled in config.") sys.exit(-1) # Creates the client object LOG.info("Creating client connection") - aprs_client = client.factory.create() + aprs_client = client_factory.create() LOG.info(aprs_client) # Create the initial PM singleton and Register plugins @@ -79,18 +79,14 @@ def server(ctx, flush): LOG.info(p) # Make sure we have 1 client transport enabled - if not client.factory.is_client_enabled(): + if not client_factory.is_client_enabled(): LOG.error("No Clients are enabled in config.") sys.exit(-1) - if not client.factory.is_client_configured(): + if not client_factory.is_client_configured(): LOG.error("APRS client is not properly configured in config file.") sys.exit(-1) - # Creates the client object - # LOG.info("Creating client connection") - # client.factory.create().client - # Now load the msgTrack from disk if any packets.PacketList() if flush: diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index 813bbc4..de8fd4e 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -21,6 +21,7 @@ import aprsd from aprsd import ( cli_helper, client, packets, plugin_utils, stats, threads, utils, ) +from aprsd.client import client_factory, kiss from aprsd.main import cli from aprsd.threads import aprsd as aprsd_threads from aprsd.threads import keep_alive, rx, tx @@ -380,8 +381,8 @@ def _get_transport(stats): "APRS-IS Server: " "{}".format(stats["APRSClientStats"]["server_string"]) ) - elif client.KISSClient.is_enabled(): - transport = client.KISSClient.transport() + elif kiss.KISSClient.is_enabled(): + transport = kiss.KISSClient.transport() if transport == client.TRANSPORT_TCPKISS: aprs_connection = ( "TCPKISS://{}:{}".format( @@ -637,13 +638,12 @@ def webchat(ctx, flush, port): # Initialize the client factory and create # The correct client object ready for use - client.ClientFactory.setup() # Make sure we have 1 client transport enabled - if not client.factory.is_client_enabled(): + if not client_factory.is_client_enabled(): LOG.error("No Clients are enabled in config.") sys.exit(-1) - if not client.factory.is_client_configured(): + if not client_factory.is_client_configured(): LOG.error("APRS client is not properly configured in config file.") sys.exit(-1) diff --git a/aprsd/plugin.py b/aprsd/plugin.py index b923ee9..6c5f973 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -148,7 +148,7 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): watch_list = CONF.watch_list.callsigns # make sure the timeout is set or this doesn't work if watch_list: - aprs_client = client.factory.create().client + aprs_client = client.client_factory.create().client filter_str = "b/{}".format("/".join(watch_list)) aprs_client.set_filter(filter_str) else: diff --git a/aprsd/stats/__init__.py b/aprsd/stats/__init__.py index 759698a..2059a65 100644 --- a/aprsd/stats/__init__.py +++ b/aprsd/stats/__init__.py @@ -1,5 +1,5 @@ -from aprsd import client as aprs_client from aprsd import plugin +from aprsd.client import stats as client_stats from aprsd.packets import packet_list, seen_list, tracker, watch_list from aprsd.plugins import email from aprsd.stats import app, collector @@ -16,5 +16,5 @@ stats_collector.register_producer(tracker.PacketTrack) stats_collector.register_producer(plugin.PluginManager) stats_collector.register_producer(aprsd.APRSDThreadList) stats_collector.register_producer(email.EmailStats) -stats_collector.register_producer(aprs_client.APRSClientStats) +stats_collector.register_producer(client_stats.APRSClientStats) stats_collector.register_producer(seen_list.SeenList) diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keep_alive.py index c40cb0d..9671c1a 100644 --- a/aprsd/threads/keep_alive.py +++ b/aprsd/threads/keep_alive.py @@ -5,7 +5,8 @@ import tracemalloc from oslo_config import cfg -from aprsd import client, packets, utils +from aprsd import packets, utils +from aprsd.client import client_factory from aprsd.log import log as aprsd_log from aprsd.stats import collector from aprsd.threads import APRSDThread, APRSDThreadList @@ -89,7 +90,7 @@ class KeepAliveThread(APRSDThread): LOG.info(f"{key: <15} Alive? {str(alive): <5} {str(age): <20}") # check the APRS connection - cl = client.factory.create() + cl = client_factory.create() # Reset the connection if it's dead and this isn't our # First time through the loop. # The first time through the loop can happen at startup where @@ -97,7 +98,7 @@ class KeepAliveThread(APRSDThread): # to make it's connection the first time. if not cl.is_alive() and self.cntr > 0: LOG.error(f"{cl.__class__.__name__} is not alive!!! Resetting") - client.factory.create().reset() + client_factory.create().reset() # else: # # See if we should reset the aprs-is client # # Due to losing a keepalive from them diff --git a/aprsd/threads/log_monitor.py b/aprsd/threads/log_monitor.py index 098e6ce..f4a17ca 100644 --- a/aprsd/threads/log_monitor.py +++ b/aprsd/threads/log_monitor.py @@ -19,7 +19,6 @@ def send_log_entries(force=False): if CONF.admin.web_enabled: if force or LogEntries().is_purge_ready(): entries = LogEntries().get_all_and_purge() - print(f"Sending log entries {len(entries)}") if entries: try: requests.post( @@ -27,9 +26,8 @@ def send_log_entries(force=False): json=entries, auth=(CONF.admin.user, CONF.admin.password), ) - except Exception as ex: - LOG.warning(f"Failed to send log entries {len(entries)}") - LOG.warning(ex) + except Exception: + LOG.warning(f"Failed to send log entries. len={len(entries)}") class LogEntries: diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index dbf558b..3c88958 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -6,7 +6,8 @@ import time import aprslib from oslo_config import cfg -from aprsd import client, packets, plugin +from aprsd import packets, plugin +from aprsd.client import client_factory from aprsd.packets import collector from aprsd.packets import log as packet_log from aprsd.threads import APRSDThread, tx @@ -20,7 +21,7 @@ class APRSDRXThread(APRSDThread): def __init__(self, packet_queue): super().__init__("RX_PKT") self.packet_queue = packet_queue - self._client = client.factory.create() + self._client = client_factory.create() def stop(self): self.thread_stop = True @@ -29,7 +30,7 @@ class APRSDRXThread(APRSDThread): def loop(self): if not self._client: - self._client = client.factory.create() + self._client = client_factory.create() time.sleep(1) return True # setup the consumer of messages and block until a messages diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py index 9b6c75a..25e61b1 100644 --- a/aprsd/threads/tx.py +++ b/aprsd/threads/tx.py @@ -9,9 +9,9 @@ from rush.limiters import periodic from rush.stores import dictionary import wrapt -from aprsd import client from aprsd import conf # noqa from aprsd import threads as aprsd_threads +from aprsd.client import client_factory from aprsd.packets import collector, core from aprsd.packets import log as packet_log from aprsd.packets import tracker @@ -80,7 +80,7 @@ def _send_direct(packet, aprs_client=None): if aprs_client: cl = aprs_client else: - cl = client.factory.create() + cl = client_factory.create() packet.update_timestamp() packet_log.log(packet, tx=True) @@ -247,7 +247,7 @@ class BeaconSendThread(aprsd_threads.APRSDThread): send(pkt, direct=True) except Exception as e: LOG.error(f"Failed to send beacon: {e}") - client.factory.create().reset() + client_factory.create().reset() time.sleep(5) self._loop_cnt += 1 diff --git a/tests/plugins/test_notify.py b/tests/plugins/test_notify.py index 1880f5d..b1a6954 100644 --- a/tests/plugins/test_notify.py +++ b/tests/plugins/test_notify.py @@ -62,9 +62,9 @@ class TestAPRSDWatchListPluginBase(TestWatchListPlugin): expected = packets.NULL_MESSAGE self.assertEqual(expected, actual) - @mock.patch("aprsd.client.ClientFactory", autospec=True) + @mock.patch("aprsd.client.factory.ClientFactory", autospec=True) def test_watchlist_not_in_watchlist(self, mock_factory): - client.factory = mock_factory + client.client_factory = mock_factory self.config_and_init() plugin = fake.FakeWatchListPlugin() @@ -92,9 +92,9 @@ class TestNotifySeenPlugin(TestWatchListPlugin): expected = packets.NULL_MESSAGE self.assertEqual(expected, actual) - @mock.patch("aprsd.client.ClientFactory", autospec=True) + @mock.patch("aprsd.client.factory.ClientFactory", autospec=True) def test_callsign_not_in_watchlist(self, mock_factory): - client.factory = mock_factory + client.client_factory = mock_factory self.config_and_init(watchlist_enabled=False) plugin = notify_plugin.NotifySeenPlugin() @@ -106,10 +106,10 @@ class TestNotifySeenPlugin(TestWatchListPlugin): expected = packets.NULL_MESSAGE self.assertEqual(expected, actual) - @mock.patch("aprsd.client.ClientFactory", autospec=True) + @mock.patch("aprsd.client.factory.ClientFactory", autospec=True) @mock.patch("aprsd.packets.WatchList.is_old") def test_callsign_in_watchlist_not_old(self, mock_is_old, mock_factory): - client.factory = mock_factory + client.client_factory = mock_factory mock_is_old.return_value = False self.config_and_init( watchlist_enabled=True, @@ -126,10 +126,10 @@ class TestNotifySeenPlugin(TestWatchListPlugin): expected = packets.NULL_MESSAGE self.assertEqual(expected, actual) - @mock.patch("aprsd.client.ClientFactory", autospec=True) + @mock.patch("aprsd.client.factory.ClientFactory", autospec=True) @mock.patch("aprsd.packets.WatchList.is_old") def test_callsign_in_watchlist_old_same_alert_callsign(self, mock_is_old, mock_factory): - client.factory = mock_factory + client.client_factory = mock_factory mock_is_old.return_value = True self.config_and_init( watchlist_enabled=True, @@ -147,10 +147,10 @@ class TestNotifySeenPlugin(TestWatchListPlugin): expected = packets.NULL_MESSAGE self.assertEqual(expected, actual) - @mock.patch("aprsd.client.ClientFactory", autospec=True) + @mock.patch("aprsd.client.factory.ClientFactory", autospec=True) @mock.patch("aprsd.packets.WatchList.is_old") def test_callsign_in_watchlist_old_send_alert(self, mock_is_old, mock_factory): - client.factory = mock_factory + client.client_factory = mock_factory mock_is_old.return_value = True notify_callsign = fake.FAKE_TO_CALLSIGN fromcall = "WB4BOR"