diff --git a/aprsd/client.py b/aprsd/client.py index e3d2079..1722a83 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -45,9 +45,9 @@ class APRSClientStats: if client.transport() == TRANSPORT_APRSIS: stats["server_string"] = client.client.server_string keepalive = client.client.aprsd_keepalive - if keepalive: + if serializable: keepalive = keepalive.isoformat() - stats["sever_keepalive"] = keepalive + stats["server_keepalive"] = keepalive elif client.transport() == TRANSPORT_TCPKISS: stats["host"] = CONF.kiss_tcp.host stats["port"] = CONF.kiss_tcp.port diff --git a/aprsd/cmds/fetch_stats.py b/aprsd/cmds/fetch_stats.py index c0a4d42..386ff41 100644 --- a/aprsd/cmds/fetch_stats.py +++ b/aprsd/cmds/fetch_stats.py @@ -1,10 +1,9 @@ -# Fetch active stats from a remote running instance of aprsd server -# This uses the RPC server to fetch the stats from the remote server. - +# Fetch active stats from a remote running instance of aprsd admin web interface. import logging import click from oslo_config import cfg +import requests from rich.console import Console from rich.table import Table @@ -12,7 +11,6 @@ from rich.table import Table import aprsd from aprsd import cli_helper from aprsd.main import cli -from aprsd.rpc import client as rpc_client # setup the global logger @@ -26,87 +24,80 @@ CONF = cfg.CONF @click.option( "--host", type=str, default=None, - help="IP address of the remote aprsd server to fetch stats from.", + help="IP address of the remote aprsd admin web ui fetch stats from.", ) @click.option( "--port", type=int, default=None, - help="Port of the remote aprsd server rpc port to fetch stats from.", -) -@click.option( - "--magic-word", type=str, - default=None, - help="Magic word of the remote aprsd server rpc port to fetch stats from.", + help="Port of the remote aprsd web admin interface to fetch stats from.", ) @click.pass_context @cli_helper.process_standard_options -def fetch_stats(ctx, host, port, magic_word): - """Fetch stats from a remote running instance of aprsd server.""" - LOG.info(f"APRSD Fetch-Stats started version: {aprsd.__version__}") +def fetch_stats(ctx, host, port): + """Fetch stats from a APRSD admin web interface.""" + console = Console() + console.print(f"APRSD Fetch-Stats started version: {aprsd.__version__}") CONF.log_opt_values(LOG, logging.DEBUG) if not host: - host = CONF.rpc_settings.ip + host = CONF.admin.web_ip if not port: - port = CONF.rpc_settings.port - if not magic_word: - magic_word = CONF.rpc_settings.magic_word + port = CONF.admin.web_port - msg = f"Fetching stats from {host}:{port} with magic word '{magic_word}'" - console = Console() + msg = f"Fetching stats from {host}:{port}" console.print(msg) with console.status(msg): - client = rpc_client.RPCClient(host, port, magic_word) - stats = client.get_stats_dict() - if stats: - console.print_json(data=stats) - else: - LOG.error(f"Failed to fetch stats via RPC aprsd server at {host}:{port}") + response = requests.get(f"http://{host}:{port}/stats", timeout=120) + if not response: + console.print( + f"Failed to fetch stats from {host}:{port}?", + style="bold red", + ) return + + stats = response.json() + if not stats: + console.print( + f"Failed to fetch stats from aprsd admin ui at {host}:{port}", + style="bold red", + ) + return + aprsd_title = ( "APRSD " - f"[bold cyan]v{stats['aprsd']['version']}[/] " - f"Callsign [bold green]{stats['aprsd']['callsign']}[/] " - f"Uptime [bold yellow]{stats['aprsd']['uptime']}[/]" + f"[bold cyan]v{stats['APRSDStats']['version']}[/] " + f"Callsign [bold green]{stats['APRSDStats']['callsign']}[/] " + f"Uptime [bold yellow]{stats['APRSDStats']['uptime']}[/]" ) - console.rule(f"Stats from {host}:{port} with magic word '{magic_word}'") + console.rule(f"Stats from {host}:{port}") console.print("\n\n") console.rule(aprsd_title) # Show the connection to APRS # It can be a connection to an APRS-IS server or a local TNC via KISS or KISSTCP if "aprs-is" in stats: - title = f"APRS-IS Connection {stats['aprs-is']['server']}" + title = f"APRS-IS Connection {stats['APRSClientStats']['server_string']}" table = Table(title=title) table.add_column("Key") table.add_column("Value") - for key, value in stats["aprs-is"].items(): + for key, value in stats["APRSClientStats"].items(): table.add_row(key, value) console.print(table) threads_table = Table(title="Threads") threads_table.add_column("Name") threads_table.add_column("Alive?") - for name, alive in stats["aprsd"]["threads"].items(): + for name, alive in stats["APRSDThreadList"].items(): threads_table.add_row(name, str(alive)) console.print(threads_table) - msgs_table = Table(title="Messages") - msgs_table.add_column("Key") - msgs_table.add_column("Value") - for key, value in stats["messages"].items(): - msgs_table.add_row(key, str(value)) - - console.print(msgs_table) - packet_totals = Table(title="Packet Totals") packet_totals.add_column("Key") packet_totals.add_column("Value") - packet_totals.add_row("Total Received", str(stats["packets"]["total_received"])) - packet_totals.add_row("Total Sent", str(stats["packets"]["total_sent"])) - packet_totals.add_row("Total Tracked", str(stats["packets"]["total_tracked"])) + packet_totals.add_row("Total Received", str(stats["PacketList"]["rx"])) + packet_totals.add_row("Total Sent", str(stats["PacketList"]["tx"])) console.print(packet_totals) # Show each of the packet types @@ -114,47 +105,52 @@ def fetch_stats(ctx, host, port, magic_word): packets_table.add_column("Packet Type") packets_table.add_column("TX") packets_table.add_column("RX") - for key, value in stats["packets"]["by_type"].items(): + for key, value in stats["PacketList"]["packets"].items(): packets_table.add_row(key, str(value["tx"]), str(value["rx"])) console.print(packets_table) if "plugins" in stats: - count = len(stats["plugins"]) + count = len(stats["PluginManager"]) plugins_table = Table(title=f"Plugins ({count})") plugins_table.add_column("Plugin") plugins_table.add_column("Enabled") plugins_table.add_column("Version") plugins_table.add_column("TX") plugins_table.add_column("RX") - for key, value in stats["plugins"].items(): + plugins = stats["PluginManager"] + for key, value in plugins.items(): plugins_table.add_row( key, - str(stats["plugins"][key]["enabled"]), - stats["plugins"][key]["version"], - str(stats["plugins"][key]["tx"]), - str(stats["plugins"][key]["rx"]), + str(plugins[key]["enabled"]), + plugins[key]["version"], + str(plugins[key]["tx"]), + str(plugins[key]["rx"]), ) console.print(plugins_table) - if "seen_list" in stats["aprsd"]: - count = len(stats["aprsd"]["seen_list"]) + seen_list = stats.get("SeenList") + + if seen_list: + count = len(seen_list) seen_table = Table(title=f"Seen List ({count})") seen_table.add_column("Callsign") seen_table.add_column("Message Count") seen_table.add_column("Last Heard") - for key, value in stats["aprsd"]["seen_list"].items(): + for key, value in seen_list.items(): seen_table.add_row(key, str(value["count"]), value["last"]) console.print(seen_table) - if "watch_list" in stats["aprsd"]: - count = len(stats["aprsd"]["watch_list"]) + watch_list = stats.get("WatchList") + + if watch_list: + count = len(watch_list) watch_table = Table(title=f"Watch List ({count})") watch_table.add_column("Callsign") watch_table.add_column("Last Heard") - for key, value in stats["aprsd"]["watch_list"].items(): + for key, value in watch_list.items(): watch_table.add_row(key, value["last"]) console.print(watch_table) diff --git a/aprsd/cmds/healthcheck.py b/aprsd/cmds/healthcheck.py index c8c9a38..6914e8f 100644 --- a/aprsd/cmds/healthcheck.py +++ b/aprsd/cmds/healthcheck.py @@ -13,11 +13,11 @@ from oslo_config import cfg from rich.console import Console import aprsd -from aprsd import cli_helper, utils +from aprsd import cli_helper from aprsd import conf # noqa # local imports here from aprsd.main import cli -from aprsd.rpc import client as aprsd_rpc_client +from aprsd.threads import stats as stats_threads # setup the global logger @@ -39,46 +39,48 @@ console = Console() @cli_helper.process_standard_options def healthcheck(ctx, timeout): """Check the health of the running aprsd server.""" - console.log(f"APRSD HealthCheck version: {aprsd.__version__}") - if not CONF.rpc_settings.enabled: - LOG.error("Must enable rpc_settings.enabled to use healthcheck") - sys.exit(-1) - if not CONF.rpc_settings.ip: - LOG.error("Must enable rpc_settings.ip to use healthcheck") - sys.exit(-1) - if not CONF.rpc_settings.magic_word: - LOG.error("Must enable rpc_settings.magic_word to use healthcheck") - sys.exit(-1) + ver_str = f"APRSD HealthCheck version: {aprsd.__version__}" + console.log(ver_str) - with console.status(f"APRSD HealthCheck version: {aprsd.__version__}") as status: + with console.status(ver_str): try: - status.update(f"Contacting APRSD via RPC {CONF.rpc_settings.ip}") - stats = aprsd_rpc_client.RPCClient().get_stats_dict() + stats_obj = stats_threads.StatsStore() + stats_obj.load() + stats = stats_obj.data + # console.print(stats) except Exception as ex: - console.log(f"Failed to fetch healthcheck : '{ex}'") + console.log(f"Failed to load stats: '{ex}'") sys.exit(-1) else: + now = datetime.datetime.now() if not stats: console.log("No stats from aprsd") sys.exit(-1) - email_thread_last_update = stats["email"]["thread_last_update"] - if email_thread_last_update != "never": - delta = utils.parse_delta_str(email_thread_last_update) - d = datetime.timedelta(**delta) + email_stats = stats.get("EmailStats") + if email_stats: + email_thread_last_update = email_stats["last_check_time"] + + if email_thread_last_update != "never": + d = now - email_thread_last_update + max_timeout = {"hours": 0.0, "minutes": 5, "seconds": 0} + max_delta = datetime.timedelta(**max_timeout) + if d > max_delta: + console.log(f"Email thread is very old! {d}") + sys.exit(-1) + + client_stats = stats.get("APRSClientStats") + if not client_stats: + console.log("No APRSClientStats") + sys.exit(-1) + else: + aprsis_last_update = client_stats["server_keepalive"] + d = now - aprsis_last_update max_timeout = {"hours": 0.0, "minutes": 5, "seconds": 0} max_delta = datetime.timedelta(**max_timeout) if d > max_delta: - console.log(f"Email thread is very old! {d}") + LOG.error(f"APRS-IS last update is very old! {d}") sys.exit(-1) - aprsis_last_update = stats["aprs-is"]["last_update"] - delta = utils.parse_delta_str(aprsis_last_update) - d = datetime.timedelta(**delta) - max_timeout = {"hours": 0.0, "minutes": 5, "seconds": 0} - max_delta = datetime.timedelta(**max_timeout) - if d > max_delta: - LOG.error(f"APRS-IS last update is very old! {d}") - sys.exit(-1) - + console.log("OK") sys.exit(0) diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index b4a877c..174e024 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -10,7 +10,9 @@ from aprsd import cli_helper, client from aprsd import main as aprsd_main from aprsd import packets, plugin, threads, utils from aprsd.main import cli -from aprsd.threads import log_monitor, registry, rx, tx +from aprsd.threads import keep_alive, log_monitor, registry, rx +from aprsd.threads import stats as stats_thread +from aprsd.threads import tx CONF = cfg.CONF @@ -101,11 +103,11 @@ def server(ctx, flush): packets.WatchList().load() packets.SeenList().load() - keepalive = threads.KeepAliveThread() + keepalive = keep_alive.KeepAliveThread() keepalive.start() - stats_thread = threads.APRSDStatsStoreThread() - stats_thread.start() + stats_store_thread = stats_thread.APRSDStatsStoreThread() + stats_store_thread.start() rx_thread = rx.APRSDPluginRXThread( packet_queue=threads.packet_queue, @@ -126,7 +128,7 @@ def server(ctx, flush): registry_thread = registry.APRSRegistryThread() registry_thread.start() - if CONF.rpc_settings.enabled: + if CONF.admin.web_enabled: log_monitor_thread = log_monitor.LogMonitorThread() log_monitor_thread.start() diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index f88bb21..d543ae8 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -23,7 +23,7 @@ from aprsd import ( ) from aprsd.main import cli from aprsd.threads import aprsd as aprsd_threads -from aprsd.threads import rx, tx +from aprsd.threads import keep_alive, rx, tx from aprsd.utils import trace @@ -642,7 +642,7 @@ def webchat(ctx, flush, port): packets.WatchList() packets.SeenList() - keepalive = threads.KeepAliveThread() + keepalive = keep_alive.KeepAliveThread() LOG.info("Start KeepAliveThread") keepalive.start() diff --git a/aprsd/conf/common.py b/aprsd/conf/common.py index 6221d4f..9309d24 100644 --- a/aprsd/conf/common.py +++ b/aprsd/conf/common.py @@ -15,10 +15,6 @@ watch_list_group = cfg.OptGroup( name="watch_list", title="Watch List settings", ) -rpc_group = cfg.OptGroup( - name="rpc_settings", - title="RPC Settings for admin <--> web", -) webchat_group = cfg.OptGroup( name="webchat", title="Settings specific to the webchat command", @@ -169,28 +165,6 @@ admin_opts = [ ), ] -rpc_opts = [ - cfg.BoolOpt( - "enabled", - default=True, - help="Enable RPC calls", - ), - cfg.StrOpt( - "ip", - default="localhost", - help="The ip address to listen on", - ), - cfg.PortOpt( - "port", - default=18861, - help="The port to listen on", - ), - cfg.StrOpt( - "magic_word", - default=APRSD_DEFAULT_MAGIC_WORD, - help="Magic word to authenticate requests between client/server", - ), -] enabled_plugins_opts = [ cfg.ListOpt( @@ -281,8 +255,6 @@ def register_opts(config): config.register_opts(admin_opts, group=admin_group) config.register_group(watch_list_group) config.register_opts(watch_list_opts, group=watch_list_group) - config.register_group(rpc_group) - config.register_opts(rpc_opts, group=rpc_group) config.register_group(webchat_group) config.register_opts(webchat_opts, group=webchat_group) config.register_group(registry_group) @@ -294,7 +266,6 @@ def list_opts(): "DEFAULT": (aprsd_opts + enabled_plugins_opts), admin_group.name: admin_opts, watch_list_group.name: watch_list_opts, - rpc_group.name: rpc_opts, webchat_group.name: webchat_opts, registry_group.name: registry_opts, } diff --git a/aprsd/log/log.py b/aprsd/log/log.py index fcdb620..2c34e12 100644 --- a/aprsd/log/log.py +++ b/aprsd/log/log.py @@ -36,7 +36,6 @@ class InterceptHandler(logging.Handler): # to disable log to stdout, but still log to file # use the --quiet option on the cmdln def setup_logging(loglevel=None, quiet=False): - print(f"setup_logging: loglevel={loglevel}, quiet={quiet}") if not loglevel: log_level = CONF.logging.log_level else: @@ -58,6 +57,8 @@ def setup_logging(loglevel=None, quiet=False): webserver_list = [ "werkzeug", "werkzeug._internal", + "socketio", + "urllib3.connectionpool", ] # We don't really want to see the aprslib parsing debug output. diff --git a/aprsd/main.py b/aprsd/main.py index e6eba5b..cdb88dd 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -45,7 +45,6 @@ CONF = cfg.CONF LOG = logging.getLogger("APRSD") CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) flask_enabled = False -rpc_serv = None def custom_startswith(string, incomplete): diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index 87c51c6..344c70a 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -109,14 +109,6 @@ class Packet: path: List[str] = field(default_factory=list, compare=False, hash=False) via: Optional[str] = field(default=None, compare=False, hash=False) - @property - def json(self): - """get the json formated string. - - This is used soley by the rpc server to return json over the wire. - """ - return self.to_json() - def get(self, key: str, default: Optional[str] = None): """Emulate a getter on a dict.""" if hasattr(self, key): diff --git a/aprsd/packets/seen_list.py b/aprsd/packets/seen_list.py index f917eb9..5c7774f 100644 --- a/aprsd/packets/seen_list.py +++ b/aprsd/packets/seen_list.py @@ -26,6 +26,14 @@ class SeenList(objectstore.ObjectStoreMixin): cls._instance.data = {} return cls._instance + def stats(self, serializable=False): + """Return the stats for the PacketTrack class.""" + stats = self.data + # if serializable: + # for call in self.data: + # stats[call]["last"] = stats[call]["last"].isoformat() + return stats + @wrapt.synchronized(lock) def update_seen(self, packet): callsign = None @@ -39,5 +47,5 @@ class SeenList(objectstore.ObjectStoreMixin): "last": None, "count": 0, } - self.data[callsign]["last"] = str(datetime.datetime.now()) + self.data[callsign]["last"] = datetime.datetime.now() self.data[callsign]["count"] += 1 diff --git a/aprsd/rpc/__init__.py b/aprsd/rpc/__init__.py deleted file mode 100644 index 87df2d6..0000000 --- a/aprsd/rpc/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -import rpyc - - -class AuthSocketStream(rpyc.SocketStream): - """Used to authenitcate the RPC stream to remote.""" - - @classmethod - def connect(cls, *args, authorizer=None, **kwargs): - stream_obj = super().connect(*args, **kwargs) - - if callable(authorizer): - authorizer(stream_obj.sock) - - return stream_obj diff --git a/aprsd/rpc/client.py b/aprsd/rpc/client.py deleted file mode 100644 index 985dc2e..0000000 --- a/aprsd/rpc/client.py +++ /dev/null @@ -1,165 +0,0 @@ -import json -import logging - -from oslo_config import cfg -import rpyc - -from aprsd import conf # noqa -from aprsd import rpc - - -CONF = cfg.CONF -LOG = logging.getLogger("APRSD") - - -class RPCClient: - _instance = None - _rpc_client = None - - ip = None - port = None - magic_word = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance - - def __init__(self, ip=None, port=None, magic_word=None): - if ip: - self.ip = ip - else: - self.ip = CONF.rpc_settings.ip - if port: - self.port = int(port) - else: - self.port = CONF.rpc_settings.port - if magic_word: - self.magic_word = magic_word - else: - self.magic_word = CONF.rpc_settings.magic_word - self._check_settings() - self.get_rpc_client() - - def _check_settings(self): - if not CONF.rpc_settings.enabled: - LOG.warning("RPC is not enabled, no way to get stats!!") - - if self.magic_word == conf.common.APRSD_DEFAULT_MAGIC_WORD: - LOG.warning("You are using the default RPC magic word!!!") - LOG.warning("edit aprsd.conf and change rpc_settings.magic_word") - - LOG.debug(f"RPC Client: {self.ip}:{self.port} {self.magic_word}") - - def _rpyc_connect( - self, host, port, service=rpyc.VoidService, - config={}, ipv6=False, - keepalive=False, authorizer=None, ): - - LOG.info(f"Connecting to RPC host '{host}:{port}'") - try: - s = rpc.AuthSocketStream.connect( - host, port, ipv6=ipv6, keepalive=keepalive, - authorizer=authorizer, - ) - return rpyc.utils.factory.connect_stream(s, service, config=config) - except ConnectionRefusedError: - LOG.error(f"Failed to connect to RPC host '{host}:{port}'") - return None - - def get_rpc_client(self): - if not self._rpc_client: - self._rpc_client = self._rpyc_connect( - self.ip, - self.port, - authorizer=lambda sock: sock.send(self.magic_word.encode()), - ) - return self._rpc_client - - def get_stats_dict(self): - cl = self.get_rpc_client() - result = {} - if not cl: - return result - - try: - rpc_stats_dict = cl.root.get_stats() - result = json.loads(rpc_stats_dict) - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - def get_stats(self): - cl = self.get_rpc_client() - result = {} - if not cl: - return result - - try: - result = cl.root.get_stats_obj() - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - def get_packet_track(self): - cl = self.get_rpc_client() - result = None - if not cl: - return result - try: - result = cl.root.get_packet_track() - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - def get_packet_list(self): - cl = self.get_rpc_client() - result = None - if not cl: - return result - try: - result = cl.root.get_packet_list() - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - def get_watch_list(self): - cl = self.get_rpc_client() - result = None - if not cl: - return result - try: - result = cl.root.get_watch_list() - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - def get_seen_list(self): - cl = self.get_rpc_client() - result = None - if not cl: - return result - try: - result = cl.root.get_seen_list() - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result - - def get_log_entries(self): - cl = self.get_rpc_client() - result = None - if not cl: - return result - try: - result_str = cl.root.get_log_entries() - result = json.loads(result_str) - except EOFError: - LOG.error("Lost connection to RPC Host") - self._rpc_client = None - return result diff --git a/aprsd/rpc/server.py b/aprsd/rpc/server.py deleted file mode 100644 index 749300f..0000000 --- a/aprsd/rpc/server.py +++ /dev/null @@ -1,99 +0,0 @@ -import json -import logging - -from oslo_config import cfg -import rpyc -from rpyc.utils.authenticators import AuthenticationError -from rpyc.utils.server import ThreadPoolServer - -from aprsd import conf # noqa: F401 -from aprsd import packets, stats, threads -from aprsd.threads import log_monitor - - -CONF = cfg.CONF -LOG = logging.getLogger("APRSD") - - -def magic_word_authenticator(sock): - client_ip = sock.getpeername()[0] - magic = sock.recv(len(CONF.rpc_settings.magic_word)).decode() - if magic != CONF.rpc_settings.magic_word: - LOG.error( - f"wrong magic word passed from {client_ip} " - "'{magic}' != '{CONF.rpc_settings.magic_word}'", - ) - raise AuthenticationError( - f"wrong magic word passed in '{magic}'" - f" != '{CONF.rpc_settings.magic_word}'", - ) - return sock, None - - -class APRSDRPCThread(threads.APRSDThread): - def __init__(self): - super().__init__(name="RPCThread") - self.thread = ThreadPoolServer( - APRSDService, - port=CONF.rpc_settings.port, - protocol_config={"allow_public_attrs": True}, - authenticator=magic_word_authenticator, - ) - - def stop(self): - if self.thread: - self.thread.close() - self.thread_stop = True - - def loop(self): - # there is no loop as run is blocked - if self.thread and not self.thread_stop: - # This is a blocking call - self.thread.start() - - -@rpyc.service -class APRSDService(rpyc.Service): - def on_connect(self, conn): - # code that runs when a connection is created - # (to init the service, if needed) - LOG.info("RPC Client Connected") - self._conn = conn - - def on_disconnect(self, conn): - # code that runs after the connection has already closed - # (to finalize the service, if needed) - LOG.info("RPC Client Disconnected") - self._conn = None - - @rpyc.exposed - def get_stats(self): - stat = stats.APRSDStats() - stats_dict = stat.stats() - return_str = json.dumps(stats_dict, indent=4, sort_keys=True, default=str) - return return_str - - @rpyc.exposed - def get_stats_obj(self): - return stats.APRSDStats() - - @rpyc.exposed - def get_packet_list(self): - return packets.PacketList() - - @rpyc.exposed - def get_packet_track(self): - return packets.PacketTrack() - - @rpyc.exposed - def get_watch_list(self): - return packets.WatchList() - - @rpyc.exposed - def get_seen_list(self): - return packets.SeenList() - - @rpyc.exposed - def get_log_entries(self): - entries = log_monitor.LogEntries().get_all_and_purge() - return json.dumps(entries, default=str) diff --git a/aprsd/stats/__init__.py b/aprsd/stats/__init__.py index e1861c7..44602fd 100644 --- a/aprsd/stats/__init__.py +++ b/aprsd/stats/__init__.py @@ -1,6 +1,6 @@ from aprsd import client as aprs_client from aprsd import plugin -from aprsd.packets import packet_list, tracker, watch_list +from aprsd.packets import packet_list, seen_list, tracker, watch_list from aprsd.plugins import email from aprsd.stats import app, collector from aprsd.threads import aprsd @@ -17,3 +17,4 @@ stats_collector.register_producer(plugin.PluginManager()) stats_collector.register_producer(aprsd.APRSDThreadList()) stats_collector.register_producer(email.EmailStats()) stats_collector.register_producer(aprs_client.APRSClientStats()) +stats_collector.register_producer(seen_list.SeenList()) diff --git a/aprsd/threads/__init__.py b/aprsd/threads/__init__.py index db36e17..bd26e01 100644 --- a/aprsd/threads/__init__.py +++ b/aprsd/threads/__init__.py @@ -3,11 +3,9 @@ import queue # Make these available to anyone importing # aprsd.threads from .aprsd import APRSDThread, APRSDThreadList # noqa: F401 -from .keep_alive import KeepAliveThread # noqa: F401 from .rx import ( # noqa: F401 APRSDDupeRXThread, APRSDProcessPacketThread, APRSDRXThread, ) -from .stats import APRSDStatsStoreThread # noqa: F401 packet_queue = queue.Queue(maxsize=20) diff --git a/aprsd/threads/log_monitor.py b/aprsd/threads/log_monitor.py index 2d95d06..1d3e6e9 100644 --- a/aprsd/threads/log_monitor.py +++ b/aprsd/threads/log_monitor.py @@ -1,19 +1,44 @@ +import datetime import logging import threading +from oslo_config import cfg +import requests import wrapt from aprsd import threads from aprsd.log import log +CONF = cfg.CONF LOG = logging.getLogger("APRSD") +def send_log_entries(force=False): + """Send all of the log entries to the web interface.""" + if CONF.admin.web_enabled: + if force or LogEntries().is_purge_ready(): + entries = LogEntries().get_all_and_purge() + print(f"Sending log entries {len(entries)}") + if entries: + try: + requests.post( + f"http://{CONF.admin.web_ip}:{CONF.admin.web_port}/log_entries", + json=entries, + auth=(CONF.admin.user, CONF.admin.password), + ) + except Exception: + pass + + class LogEntries: entries = [] lock = threading.Lock() _instance = None + last_purge = datetime.datetime.now() + max_delta = datetime.timedelta( + hours=0.0, minutes=0, seconds=2, + ) def __new__(cls, *args, **kwargs): if cls._instance is None: @@ -33,8 +58,18 @@ class LogEntries: def get_all_and_purge(self): entries = self.entries.copy() self.entries = [] + self.last_purge = datetime.datetime.now() return entries + def is_purge_ready(self): + now = datetime.datetime.now() + if ( + now - self.last_purge > self.max_delta + and len(self.entries) > 1 + ): + return True + return False + @wrapt.synchronized(lock) def __len__(self): return len(self.entries) @@ -45,6 +80,10 @@ class LogMonitorThread(threads.APRSDThread): def __init__(self): super().__init__("LogMonitorThread") + def stop(self): + send_log_entries(force=True) + super().stop() + def loop(self): try: record = log.logging_queue.get(block=True, timeout=2) @@ -59,6 +98,7 @@ class LogMonitorThread(threads.APRSDThread): # Just ignore thi pass + send_log_entries() return True def json_record(self, record): diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 59956a6..b643b65 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -27,7 +27,6 @@ class APRSDRXThread(APRSDThread): self._client.stop() def loop(self): - LOG.debug(f"RX_MSG-LOOP {self.loop_count}") if not self._client: self._client = client.factory.create() time.sleep(1) @@ -43,11 +42,9 @@ class APRSDRXThread(APRSDThread): # and the aprslib developer didn't want to allow a PR to add # kwargs. :( # https://github.com/rossengeorgiev/aprs-python/pull/56 - LOG.debug(f"Calling client consumer CL {self._client}") self._client.consumer( self._process_packet, raw=False, blocking=False, ) - LOG.debug(f"Consumer done {self._client}") except ( aprslib.exceptions.ConnectionDrop, aprslib.exceptions.ConnectionError, diff --git a/aprsd/utils/json.py b/aprsd/utils/json.py index 8876738..29f6565 100644 --- a/aprsd/utils/json.py +++ b/aprsd/utils/json.py @@ -42,6 +42,22 @@ class EnhancedJSONEncoder(json.JSONEncoder): return super().default(obj) +class SimpleJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, datetime.datetime): + return obj.isoformat() + elif isinstance(obj, datetime.date): + return str(obj) + elif isinstance(obj, datetime.time): + return str(obj) + elif isinstance(obj, datetime.timedelta): + return str(obj) + elif isinstance(obj, decimal.Decimal): + return str(obj) + else: + return super().default(obj) + + class EnhancedJSONDecoder(json.JSONDecoder): def __init__(self, *args, **kwargs): diff --git a/aprsd/web/admin/static/js/charts.js b/aprsd/web/admin/static/js/charts.js index 4ceb6d7..237641a 100644 --- a/aprsd/web/admin/static/js/charts.js +++ b/aprsd/web/admin/static/js/charts.js @@ -219,15 +219,17 @@ function updateQuadData(chart, label, first, second, third, fourth) { } function update_stats( data ) { - our_callsign = data["stats"]["aprsd"]["callsign"]; - $("#version").text( data["stats"]["aprsd"]["version"] ); + our_callsign = data["APRSDStats"]["callsign"]; + $("#version").text( data["APRSDStats"]["version"] ); $("#aprs_connection").html( data["aprs_connection"] ); - $("#uptime").text( "uptime: " + data["stats"]["aprsd"]["uptime"] ); + $("#uptime").text( "uptime: " + data["APRSDStats"]["uptime"] ); const html_pretty = Prism.highlight(JSON.stringify(data, null, '\t'), Prism.languages.json, 'json'); $("#jsonstats").html(html_pretty); short_time = data["time"].split(/\s(.+)/)[1]; - updateDualData(packets_chart, short_time, data["stats"]["packets"]["sent"], data["stats"]["packets"]["received"]); - updateQuadData(message_chart, short_time, data["stats"]["messages"]["sent"], data["stats"]["messages"]["received"], data["stats"]["messages"]["ack_sent"], data["stats"]["messages"]["ack_recieved"]); - updateDualData(email_chart, short_time, data["stats"]["email"]["sent"], data["stats"]["email"]["recieved"]); - updateDualData(memory_chart, short_time, data["stats"]["aprsd"]["memory_peak"], data["stats"]["aprsd"]["memory_current"]); + packet_list = data["PacketList"]["packets"]; + updateDualData(packets_chart, short_time, data["PacketList"]["sent"], data["PacketList"]["received"]); + updateQuadData(message_chart, short_time, packet_list["MessagePacket"]["tx"], packet_list["MessagePacket"]["rx"], + packet_list["AckPacket"]["tx"], packet_list["AckPacket"]["rx"]); + updateDualData(email_chart, short_time, data["EmailStats"]["sent"], data["EmailStats"]["recieved"]); + updateDualData(memory_chart, short_time, data["APRSDStats"]["memory_peak"], data["APRSDStats"]["memory_current"]); } diff --git a/aprsd/web/admin/static/js/echarts.js b/aprsd/web/admin/static/js/echarts.js index adeb5f6..607d88b 100644 --- a/aprsd/web/admin/static/js/echarts.js +++ b/aprsd/web/admin/static/js/echarts.js @@ -382,21 +382,21 @@ function updateAcksChart() { function update_stats( data ) { console.log(data); - our_callsign = data["stats"]["aprsd"]["callsign"]; - $("#version").text( data["stats"]["aprsd"]["version"] ); + our_callsign = data["APRSDStats"]["callsign"]; + $("#version").text( data["APRSDStats"]["version"] ); $("#aprs_connection").html( data["aprs_connection"] ); - $("#uptime").text( "uptime: " + data["stats"]["aprsd"]["uptime"] ); + $("#uptime").text( "uptime: " + data["APRSDStats"]["uptime"] ); const html_pretty = Prism.highlight(JSON.stringify(data, null, '\t'), Prism.languages.json, 'json'); $("#jsonstats").html(html_pretty); t = Date.parse(data["time"]); ts = new Date(t); - updatePacketData(packets_chart, ts, data["stats"]["packets"]["sent"], data["stats"]["packets"]["received"]); - updatePacketTypesData(ts, data["stats"]["packets"]["types"]); + updatePacketData(packets_chart, ts, data["PacketList"]["tx"], data["PacketList"]["rx"]); + updatePacketTypesData(ts, data["PacketList"]["packets"]); updatePacketTypesChart(); updateMessagesChart(); updateAcksChart(); - updateMemChart(ts, data["stats"]["aprsd"]["memory_current"], data["stats"]["aprsd"]["memory_peak"]); + updateMemChart(ts, data["APRSDStats"]["memory_current"], data["APRSDStats"]["memory_peak"]); //updateQuadData(message_chart, short_time, data["stats"]["messages"]["sent"], data["stats"]["messages"]["received"], data["stats"]["messages"]["ack_sent"], data["stats"]["messages"]["ack_recieved"]); //updateDualData(email_chart, short_time, data["stats"]["email"]["sent"], data["stats"]["email"]["recieved"]); //updateDualData(memory_chart, short_time, data["stats"]["aprsd"]["memory_peak"], data["stats"]["aprsd"]["memory_current"]); diff --git a/aprsd/web/admin/static/js/main.js b/aprsd/web/admin/static/js/main.js index 4598f59..0b6389f 100644 --- a/aprsd/web/admin/static/js/main.js +++ b/aprsd/web/admin/static/js/main.js @@ -28,7 +28,7 @@ function update_watchlist( data ) { var watchdiv = $("#watchDiv"); var html_str = '' watchdiv.html('') - jQuery.each(data["stats"]["aprsd"]["watch_list"], function(i, val) { + jQuery.each(data["WatchList"], function(i, val) { html_str += '' }); html_str += "
HAM CallsignAge since last seen by APRSD
' + i + '' + val["last"] + '
"; @@ -65,7 +65,7 @@ function update_seenlist( data ) { html_str += 'HAM CallsignAge since last seen by APRSD' html_str += 'Number of packets RX' seendiv.html('') - var seen_list = data["stats"]["aprsd"]["seen_list"] + var seen_list = data["SeenList"] var len = Object.keys(seen_list).length $('#seen_count').html(len) jQuery.each(seen_list, function(i, val) { @@ -87,7 +87,7 @@ function update_plugins( data ) { html_str += '' plugindiv.html('') - var plugins = data["stats"]["plugins"]; + var plugins = data["PluginManager"]; var keys = Object.keys(plugins); keys.sort(); for (var i=0; i

Raw JSON

-
{{ stats|safe }}
+
{{ initial_stats|safe }}
diff --git a/aprsd/wsgi.py b/aprsd/wsgi.py index 04d7464..8386e33 100644 --- a/aprsd/wsgi.py +++ b/aprsd/wsgi.py @@ -1,12 +1,11 @@ -import datetime import importlib.metadata as imp import io import json import logging -import time +import queue import flask -from flask import Flask +from flask import Flask, request from flask_httpauth import HTTPBasicAuth from oslo_config import cfg, generator import socketio @@ -15,11 +14,13 @@ from werkzeug.security import check_password_hash import aprsd from aprsd import cli_helper, client, conf, packets, plugin, threads from aprsd.log import log -from aprsd.rpc import client as aprsd_rpc_client +from aprsd.threads import stats as stats_threads +from aprsd.utils import json as aprsd_json CONF = cfg.CONF LOG = logging.getLogger("gunicorn.access") +logging_queue = queue.Queue() auth = HTTPBasicAuth() users: dict[str, str] = {} @@ -45,105 +46,23 @@ def verify_password(username, password): def _stats(): - track = aprsd_rpc_client.RPCClient().get_packet_track() - now = datetime.datetime.now() - - time_format = "%m-%d-%Y %H:%M:%S" - - stats_dict = aprsd_rpc_client.RPCClient().get_stats_dict() - if not stats_dict: - stats_dict = { - "aprsd": {}, - "aprs-is": {"server": ""}, - "messages": { - "sent": 0, - "received": 0, - }, - "email": { - "sent": 0, - "received": 0, - }, - "seen_list": { - "sent": 0, - "received": 0, - }, - } - - # Convert the watch_list entries to age - wl = aprsd_rpc_client.RPCClient().get_watch_list() - new_list = {} - if wl: - for call in wl.get_all(): - # call_date = datetime.datetime.strptime( - # str(wl.last_seen(call)), - # "%Y-%m-%d %H:%M:%S.%f", - # ) - - # We have to convert the RingBuffer to a real list - # so that json.dumps works. - # pkts = [] - # for pkt in wl.get(call)["packets"].get(): - # pkts.append(pkt) - - new_list[call] = { - "last": wl.age(call), - # "packets": pkts - } - - stats_dict["aprsd"]["watch_list"] = new_list - packet_list = aprsd_rpc_client.RPCClient().get_packet_list() - rx = tx = 0 - types = {} - if packet_list: - rx = packet_list.total_rx() - tx = packet_list.total_tx() - types_copy = packet_list.types.copy() - - for key in types_copy: - types[str(key)] = dict(types_copy[key]) - - stats_dict["packets"] = { - "sent": tx, - "received": rx, - "types": types, - } - if track: - size_tracker = len(track) - else: - size_tracker = 0 - - result = { - "time": now.strftime(time_format), - "size_tracker": size_tracker, - "stats": stats_dict, - } - - return result + stats_obj = stats_threads.StatsStore() + stats_obj.load() + # now = datetime.datetime.now() + # time_format = "%m-%d-%Y %H:%M:%S" + stats_dict = stats_obj.data + return stats_dict @app.route("/stats") def stats(): LOG.debug("/stats called") - return json.dumps(_stats()) + return json.dumps(_stats(), cls=aprsd_json.SimpleJSONEncoder) @app.route("/") def index(): stats = _stats() - wl = aprsd_rpc_client.RPCClient().get_watch_list() - if wl and wl.is_enabled(): - watch_count = len(wl) - watch_age = wl.max_delta() - else: - watch_count = 0 - watch_age = 0 - - sl = aprsd_rpc_client.RPCClient().get_seen_list() - if sl: - seen_count = len(sl) - else: - seen_count = 0 - pm = plugin.PluginManager() plugins = pm.get_plugins() plugin_count = len(plugins) @@ -152,7 +71,7 @@ def index(): transport = "aprs-is" aprs_connection = ( "APRS-IS Server: " - "{}".format(stats["stats"]["aprs-is"]["server"]) + "{}".format(stats["APRSClientStats"]["server_string"]) ) else: # We might be connected to a KISS socket? @@ -179,7 +98,7 @@ def index(): return flask.render_template( "index.html", - initial_stats=stats, + initial_stats=json.dumps(stats, cls=aprsd_json.SimpleJSONEncoder), aprs_connection=aprs_connection, callsign=CONF.callsign, version=aprsd.__version__, @@ -187,9 +106,6 @@ def index(): entries, indent=4, sort_keys=True, default=str, ), - watch_count=watch_count, - watch_age=watch_age, - seen_count=seen_count, plugin_count=plugin_count, # oslo_out=generate_oslo() ) @@ -197,6 +113,7 @@ def index(): @auth.login_required def messages(): + _stats() track = packets.PacketTrack() msgs = [] for id in track: @@ -210,18 +127,8 @@ def messages(): @app.route("/packets") def get_packets(): LOG.debug("/packets called") - packet_list = aprsd_rpc_client.RPCClient().get_packet_list() - if packet_list: - tmp_list = [] - pkts = packet_list.copy() - for key in pkts: - pkt = packet_list.get(key) - if pkt: - tmp_list.append(pkt.json) - - return json.dumps(tmp_list) - else: - return json.dumps([]) + stats_dict = _stats() + return json.dumps(stats_dict.get("PacketList", {})) @auth.login_required @@ -273,23 +180,35 @@ def save(): return json.dumps({"messages": "saved"}) +@app.route("/log_entries", methods=["POST"]) +def log_entries(): + """The url that the server can call to update the logs.""" + entries = request.json + LOG.debug(f"Log entries called {len(entries)}") + for entry in entries: + logging_queue.put(entry) + LOG.debug("log_entries done") + return json.dumps({"messages": "saved"}) + + class LogUpdateThread(threads.APRSDThread): - def __init__(self): + def __init__(self, logging_queue=None): super().__init__("LogUpdate") + self.logging_queue = logging_queue def loop(self): if sio: - log_entries = aprsd_rpc_client.RPCClient().get_log_entries() - - if log_entries: - LOG.info(f"Sending log entries! {len(log_entries)}") - for entry in log_entries: + try: + log_entry = self.logging_queue.get(block=True, timeout=1) + if log_entry: sio.emit( - "log_entry", entry, + "log_entry", + log_entry, namespace="/logs", ) - time.sleep(5) + except queue.Empty: + pass return True @@ -297,17 +216,17 @@ class LoggingNamespace(socketio.Namespace): log_thread = None def on_connect(self, sid, environ): - global sio - LOG.debug(f"LOG on_connect {sid}") + global sio, logging_queue + LOG.info(f"LOG on_connect {sid}") sio.emit( "connected", {"data": "/logs Connected"}, namespace="/logs", ) - self.log_thread = LogUpdateThread() + self.log_thread = LogUpdateThread(logging_queue=logging_queue) self.log_thread.start() def on_disconnect(self, sid): - LOG.debug(f"LOG Disconnected {sid}") + LOG.info(f"LOG Disconnected {sid}") if self.log_thread: self.log_thread.stop() @@ -332,7 +251,7 @@ if __name__ == "__main__": async_mode = "threading" sio = socketio.Server(logger=True, async_mode=async_mode) app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app) - log_level = init_app(log_level="DEBUG") + log_level = init_app() log.setup_logging(log_level) sio.register_namespace(LoggingNamespace("/logs")) CONF.log_opt_values(LOG, logging.DEBUG) @@ -352,7 +271,7 @@ if __name__ == "uwsgi_file_aprsd_wsgi": sio = socketio.Server(logger=True, async_mode=async_mode) app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app) log_level = init_app( - log_level="DEBUG", + # log_level="DEBUG", config_file="/config/aprsd.conf", # Commented out for local development. # config_file=cli_helper.DEFAULT_CONFIG_FILE @@ -371,9 +290,9 @@ if __name__ == "aprsd.wsgi": app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app) log_level = init_app( - log_level="DEBUG", - config_file="/config/aprsd.conf", - # config_file=cli_helper.DEFAULT_CONFIG_FILE, + # log_level="DEBUG", + # config_file="/config/aprsd.conf", + config_file=cli_helper.DEFAULT_CONFIG_FILE, ) log.setup_logging(log_level) sio.register_namespace(LoggingNamespace("/logs"))