1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-10-31 15:07:13 -04:00

Removed RPC Server and client.

This patch removes the need for the RPC Server from aprsd.

APRSD Now saves it's stats to a pickled file on disk in the
aprsd.conf configured save_location.  The web admin UI
will depickle that file to fetch the stats.  The aprsd server
will periodically pickle and save the stats to disk.

The Logmonitor will not do a url post to the web admin ui
to send it the latest log entries.

Updated the healthcheck app to use the pickled stats file
and the fetch-stats command to make a url request to the running
admin ui to fetch the stats of the remote aprsd server.
This commit is contained in:
Hemna 2024-04-05 12:42:50 -04:00
parent a8d56a9967
commit 333feee805
23 changed files with 230 additions and 564 deletions

View File

@ -45,9 +45,9 @@ class APRSClientStats:
if client.transport() == TRANSPORT_APRSIS:
stats["server_string"] = client.client.server_string
keepalive = client.client.aprsd_keepalive
if keepalive:
if serializable:
keepalive = keepalive.isoformat()
stats["sever_keepalive"] = keepalive
stats["server_keepalive"] = keepalive
elif client.transport() == TRANSPORT_TCPKISS:
stats["host"] = CONF.kiss_tcp.host
stats["port"] = CONF.kiss_tcp.port

View File

@ -1,10 +1,9 @@
# Fetch active stats from a remote running instance of aprsd server
# This uses the RPC server to fetch the stats from the remote server.
# Fetch active stats from a remote running instance of aprsd admin web interface.
import logging
import click
from oslo_config import cfg
import requests
from rich.console import Console
from rich.table import Table
@ -12,7 +11,6 @@ from rich.table import Table
import aprsd
from aprsd import cli_helper
from aprsd.main import cli
from aprsd.rpc import client as rpc_client
# setup the global logger
@ -26,87 +24,80 @@ CONF = cfg.CONF
@click.option(
"--host", type=str,
default=None,
help="IP address of the remote aprsd server to fetch stats from.",
help="IP address of the remote aprsd admin web ui fetch stats from.",
)
@click.option(
"--port", type=int,
default=None,
help="Port of the remote aprsd server rpc port to fetch stats from.",
)
@click.option(
"--magic-word", type=str,
default=None,
help="Magic word of the remote aprsd server rpc port to fetch stats from.",
help="Port of the remote aprsd web admin interface to fetch stats from.",
)
@click.pass_context
@cli_helper.process_standard_options
def fetch_stats(ctx, host, port, magic_word):
"""Fetch stats from a remote running instance of aprsd server."""
LOG.info(f"APRSD Fetch-Stats started version: {aprsd.__version__}")
def fetch_stats(ctx, host, port):
"""Fetch stats from a APRSD admin web interface."""
console = Console()
console.print(f"APRSD Fetch-Stats started version: {aprsd.__version__}")
CONF.log_opt_values(LOG, logging.DEBUG)
if not host:
host = CONF.rpc_settings.ip
host = CONF.admin.web_ip
if not port:
port = CONF.rpc_settings.port
if not magic_word:
magic_word = CONF.rpc_settings.magic_word
port = CONF.admin.web_port
msg = f"Fetching stats from {host}:{port} with magic word '{magic_word}'"
console = Console()
msg = f"Fetching stats from {host}:{port}"
console.print(msg)
with console.status(msg):
client = rpc_client.RPCClient(host, port, magic_word)
stats = client.get_stats_dict()
if stats:
console.print_json(data=stats)
else:
LOG.error(f"Failed to fetch stats via RPC aprsd server at {host}:{port}")
response = requests.get(f"http://{host}:{port}/stats", timeout=120)
if not response:
console.print(
f"Failed to fetch stats from {host}:{port}?",
style="bold red",
)
return
stats = response.json()
if not stats:
console.print(
f"Failed to fetch stats from aprsd admin ui at {host}:{port}",
style="bold red",
)
return
aprsd_title = (
"APRSD "
f"[bold cyan]v{stats['aprsd']['version']}[/] "
f"Callsign [bold green]{stats['aprsd']['callsign']}[/] "
f"Uptime [bold yellow]{stats['aprsd']['uptime']}[/]"
f"[bold cyan]v{stats['APRSDStats']['version']}[/] "
f"Callsign [bold green]{stats['APRSDStats']['callsign']}[/] "
f"Uptime [bold yellow]{stats['APRSDStats']['uptime']}[/]"
)
console.rule(f"Stats from {host}:{port} with magic word '{magic_word}'")
console.rule(f"Stats from {host}:{port}")
console.print("\n\n")
console.rule(aprsd_title)
# Show the connection to APRS
# It can be a connection to an APRS-IS server or a local TNC via KISS or KISSTCP
if "aprs-is" in stats:
title = f"APRS-IS Connection {stats['aprs-is']['server']}"
title = f"APRS-IS Connection {stats['APRSClientStats']['server_string']}"
table = Table(title=title)
table.add_column("Key")
table.add_column("Value")
for key, value in stats["aprs-is"].items():
for key, value in stats["APRSClientStats"].items():
table.add_row(key, value)
console.print(table)
threads_table = Table(title="Threads")
threads_table.add_column("Name")
threads_table.add_column("Alive?")
for name, alive in stats["aprsd"]["threads"].items():
for name, alive in stats["APRSDThreadList"].items():
threads_table.add_row(name, str(alive))
console.print(threads_table)
msgs_table = Table(title="Messages")
msgs_table.add_column("Key")
msgs_table.add_column("Value")
for key, value in stats["messages"].items():
msgs_table.add_row(key, str(value))
console.print(msgs_table)
packet_totals = Table(title="Packet Totals")
packet_totals.add_column("Key")
packet_totals.add_column("Value")
packet_totals.add_row("Total Received", str(stats["packets"]["total_received"]))
packet_totals.add_row("Total Sent", str(stats["packets"]["total_sent"]))
packet_totals.add_row("Total Tracked", str(stats["packets"]["total_tracked"]))
packet_totals.add_row("Total Received", str(stats["PacketList"]["rx"]))
packet_totals.add_row("Total Sent", str(stats["PacketList"]["tx"]))
console.print(packet_totals)
# Show each of the packet types
@ -114,47 +105,52 @@ def fetch_stats(ctx, host, port, magic_word):
packets_table.add_column("Packet Type")
packets_table.add_column("TX")
packets_table.add_column("RX")
for key, value in stats["packets"]["by_type"].items():
for key, value in stats["PacketList"]["packets"].items():
packets_table.add_row(key, str(value["tx"]), str(value["rx"]))
console.print(packets_table)
if "plugins" in stats:
count = len(stats["plugins"])
count = len(stats["PluginManager"])
plugins_table = Table(title=f"Plugins ({count})")
plugins_table.add_column("Plugin")
plugins_table.add_column("Enabled")
plugins_table.add_column("Version")
plugins_table.add_column("TX")
plugins_table.add_column("RX")
for key, value in stats["plugins"].items():
plugins = stats["PluginManager"]
for key, value in plugins.items():
plugins_table.add_row(
key,
str(stats["plugins"][key]["enabled"]),
stats["plugins"][key]["version"],
str(stats["plugins"][key]["tx"]),
str(stats["plugins"][key]["rx"]),
str(plugins[key]["enabled"]),
plugins[key]["version"],
str(plugins[key]["tx"]),
str(plugins[key]["rx"]),
)
console.print(plugins_table)
if "seen_list" in stats["aprsd"]:
count = len(stats["aprsd"]["seen_list"])
seen_list = stats.get("SeenList")
if seen_list:
count = len(seen_list)
seen_table = Table(title=f"Seen List ({count})")
seen_table.add_column("Callsign")
seen_table.add_column("Message Count")
seen_table.add_column("Last Heard")
for key, value in stats["aprsd"]["seen_list"].items():
for key, value in seen_list.items():
seen_table.add_row(key, str(value["count"]), value["last"])
console.print(seen_table)
if "watch_list" in stats["aprsd"]:
count = len(stats["aprsd"]["watch_list"])
watch_list = stats.get("WatchList")
if watch_list:
count = len(watch_list)
watch_table = Table(title=f"Watch List ({count})")
watch_table.add_column("Callsign")
watch_table.add_column("Last Heard")
for key, value in stats["aprsd"]["watch_list"].items():
for key, value in watch_list.items():
watch_table.add_row(key, value["last"])
console.print(watch_table)

View File

@ -13,11 +13,11 @@ from oslo_config import cfg
from rich.console import Console
import aprsd
from aprsd import cli_helper, utils
from aprsd import cli_helper
from aprsd import conf # noqa
# local imports here
from aprsd.main import cli
from aprsd.rpc import client as aprsd_rpc_client
from aprsd.threads import stats as stats_threads
# setup the global logger
@ -39,46 +39,48 @@ console = Console()
@cli_helper.process_standard_options
def healthcheck(ctx, timeout):
"""Check the health of the running aprsd server."""
console.log(f"APRSD HealthCheck version: {aprsd.__version__}")
if not CONF.rpc_settings.enabled:
LOG.error("Must enable rpc_settings.enabled to use healthcheck")
sys.exit(-1)
if not CONF.rpc_settings.ip:
LOG.error("Must enable rpc_settings.ip to use healthcheck")
sys.exit(-1)
if not CONF.rpc_settings.magic_word:
LOG.error("Must enable rpc_settings.magic_word to use healthcheck")
sys.exit(-1)
ver_str = f"APRSD HealthCheck version: {aprsd.__version__}"
console.log(ver_str)
with console.status(f"APRSD HealthCheck version: {aprsd.__version__}") as status:
with console.status(ver_str):
try:
status.update(f"Contacting APRSD via RPC {CONF.rpc_settings.ip}")
stats = aprsd_rpc_client.RPCClient().get_stats_dict()
stats_obj = stats_threads.StatsStore()
stats_obj.load()
stats = stats_obj.data
# console.print(stats)
except Exception as ex:
console.log(f"Failed to fetch healthcheck : '{ex}'")
console.log(f"Failed to load stats: '{ex}'")
sys.exit(-1)
else:
now = datetime.datetime.now()
if not stats:
console.log("No stats from aprsd")
sys.exit(-1)
email_thread_last_update = stats["email"]["thread_last_update"]
if email_thread_last_update != "never":
delta = utils.parse_delta_str(email_thread_last_update)
d = datetime.timedelta(**delta)
email_stats = stats.get("EmailStats")
if email_stats:
email_thread_last_update = email_stats["last_check_time"]
if email_thread_last_update != "never":
d = now - email_thread_last_update
max_timeout = {"hours": 0.0, "minutes": 5, "seconds": 0}
max_delta = datetime.timedelta(**max_timeout)
if d > max_delta:
console.log(f"Email thread is very old! {d}")
sys.exit(-1)
client_stats = stats.get("APRSClientStats")
if not client_stats:
console.log("No APRSClientStats")
sys.exit(-1)
else:
aprsis_last_update = client_stats["server_keepalive"]
d = now - aprsis_last_update
max_timeout = {"hours": 0.0, "minutes": 5, "seconds": 0}
max_delta = datetime.timedelta(**max_timeout)
if d > max_delta:
console.log(f"Email thread is very old! {d}")
LOG.error(f"APRS-IS last update is very old! {d}")
sys.exit(-1)
aprsis_last_update = stats["aprs-is"]["last_update"]
delta = utils.parse_delta_str(aprsis_last_update)
d = datetime.timedelta(**delta)
max_timeout = {"hours": 0.0, "minutes": 5, "seconds": 0}
max_delta = datetime.timedelta(**max_timeout)
if d > max_delta:
LOG.error(f"APRS-IS last update is very old! {d}")
sys.exit(-1)
console.log("OK")
sys.exit(0)

View File

@ -10,7 +10,9 @@ from aprsd import cli_helper, client
from aprsd import main as aprsd_main
from aprsd import packets, plugin, threads, utils
from aprsd.main import cli
from aprsd.threads import log_monitor, registry, rx, tx
from aprsd.threads import keep_alive, log_monitor, registry, rx
from aprsd.threads import stats as stats_thread
from aprsd.threads import tx
CONF = cfg.CONF
@ -101,11 +103,11 @@ def server(ctx, flush):
packets.WatchList().load()
packets.SeenList().load()
keepalive = threads.KeepAliveThread()
keepalive = keep_alive.KeepAliveThread()
keepalive.start()
stats_thread = threads.APRSDStatsStoreThread()
stats_thread.start()
stats_store_thread = stats_thread.APRSDStatsStoreThread()
stats_store_thread.start()
rx_thread = rx.APRSDPluginRXThread(
packet_queue=threads.packet_queue,
@ -126,7 +128,7 @@ def server(ctx, flush):
registry_thread = registry.APRSRegistryThread()
registry_thread.start()
if CONF.rpc_settings.enabled:
if CONF.admin.web_enabled:
log_monitor_thread = log_monitor.LogMonitorThread()
log_monitor_thread.start()

View File

@ -23,7 +23,7 @@ from aprsd import (
)
from aprsd.main import cli
from aprsd.threads import aprsd as aprsd_threads
from aprsd.threads import rx, tx
from aprsd.threads import keep_alive, rx, tx
from aprsd.utils import trace
@ -642,7 +642,7 @@ def webchat(ctx, flush, port):
packets.WatchList()
packets.SeenList()
keepalive = threads.KeepAliveThread()
keepalive = keep_alive.KeepAliveThread()
LOG.info("Start KeepAliveThread")
keepalive.start()

View File

@ -15,10 +15,6 @@ watch_list_group = cfg.OptGroup(
name="watch_list",
title="Watch List settings",
)
rpc_group = cfg.OptGroup(
name="rpc_settings",
title="RPC Settings for admin <--> web",
)
webchat_group = cfg.OptGroup(
name="webchat",
title="Settings specific to the webchat command",
@ -169,28 +165,6 @@ admin_opts = [
),
]
rpc_opts = [
cfg.BoolOpt(
"enabled",
default=True,
help="Enable RPC calls",
),
cfg.StrOpt(
"ip",
default="localhost",
help="The ip address to listen on",
),
cfg.PortOpt(
"port",
default=18861,
help="The port to listen on",
),
cfg.StrOpt(
"magic_word",
default=APRSD_DEFAULT_MAGIC_WORD,
help="Magic word to authenticate requests between client/server",
),
]
enabled_plugins_opts = [
cfg.ListOpt(
@ -281,8 +255,6 @@ def register_opts(config):
config.register_opts(admin_opts, group=admin_group)
config.register_group(watch_list_group)
config.register_opts(watch_list_opts, group=watch_list_group)
config.register_group(rpc_group)
config.register_opts(rpc_opts, group=rpc_group)
config.register_group(webchat_group)
config.register_opts(webchat_opts, group=webchat_group)
config.register_group(registry_group)
@ -294,7 +266,6 @@ def list_opts():
"DEFAULT": (aprsd_opts + enabled_plugins_opts),
admin_group.name: admin_opts,
watch_list_group.name: watch_list_opts,
rpc_group.name: rpc_opts,
webchat_group.name: webchat_opts,
registry_group.name: registry_opts,
}

View File

@ -36,7 +36,6 @@ class InterceptHandler(logging.Handler):
# to disable log to stdout, but still log to file
# use the --quiet option on the cmdln
def setup_logging(loglevel=None, quiet=False):
print(f"setup_logging: loglevel={loglevel}, quiet={quiet}")
if not loglevel:
log_level = CONF.logging.log_level
else:
@ -58,6 +57,8 @@ def setup_logging(loglevel=None, quiet=False):
webserver_list = [
"werkzeug",
"werkzeug._internal",
"socketio",
"urllib3.connectionpool",
]
# We don't really want to see the aprslib parsing debug output.

View File

@ -45,7 +45,6 @@ CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
flask_enabled = False
rpc_serv = None
def custom_startswith(string, incomplete):

View File

@ -109,14 +109,6 @@ class Packet:
path: List[str] = field(default_factory=list, compare=False, hash=False)
via: Optional[str] = field(default=None, compare=False, hash=False)
@property
def json(self):
"""get the json formated string.
This is used soley by the rpc server to return json over the wire.
"""
return self.to_json()
def get(self, key: str, default: Optional[str] = None):
"""Emulate a getter on a dict."""
if hasattr(self, key):

View File

@ -26,6 +26,14 @@ class SeenList(objectstore.ObjectStoreMixin):
cls._instance.data = {}
return cls._instance
def stats(self, serializable=False):
"""Return the stats for the PacketTrack class."""
stats = self.data
# if serializable:
# for call in self.data:
# stats[call]["last"] = stats[call]["last"].isoformat()
return stats
@wrapt.synchronized(lock)
def update_seen(self, packet):
callsign = None
@ -39,5 +47,5 @@ class SeenList(objectstore.ObjectStoreMixin):
"last": None,
"count": 0,
}
self.data[callsign]["last"] = str(datetime.datetime.now())
self.data[callsign]["last"] = datetime.datetime.now()
self.data[callsign]["count"] += 1

View File

@ -1,14 +0,0 @@
import rpyc
class AuthSocketStream(rpyc.SocketStream):
"""Used to authenitcate the RPC stream to remote."""
@classmethod
def connect(cls, *args, authorizer=None, **kwargs):
stream_obj = super().connect(*args, **kwargs)
if callable(authorizer):
authorizer(stream_obj.sock)
return stream_obj

View File

@ -1,165 +0,0 @@
import json
import logging
from oslo_config import cfg
import rpyc
from aprsd import conf # noqa
from aprsd import rpc
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
class RPCClient:
_instance = None
_rpc_client = None
ip = None
port = None
magic_word = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, ip=None, port=None, magic_word=None):
if ip:
self.ip = ip
else:
self.ip = CONF.rpc_settings.ip
if port:
self.port = int(port)
else:
self.port = CONF.rpc_settings.port
if magic_word:
self.magic_word = magic_word
else:
self.magic_word = CONF.rpc_settings.magic_word
self._check_settings()
self.get_rpc_client()
def _check_settings(self):
if not CONF.rpc_settings.enabled:
LOG.warning("RPC is not enabled, no way to get stats!!")
if self.magic_word == conf.common.APRSD_DEFAULT_MAGIC_WORD:
LOG.warning("You are using the default RPC magic word!!!")
LOG.warning("edit aprsd.conf and change rpc_settings.magic_word")
LOG.debug(f"RPC Client: {self.ip}:{self.port} {self.magic_word}")
def _rpyc_connect(
self, host, port, service=rpyc.VoidService,
config={}, ipv6=False,
keepalive=False, authorizer=None, ):
LOG.info(f"Connecting to RPC host '{host}:{port}'")
try:
s = rpc.AuthSocketStream.connect(
host, port, ipv6=ipv6, keepalive=keepalive,
authorizer=authorizer,
)
return rpyc.utils.factory.connect_stream(s, service, config=config)
except ConnectionRefusedError:
LOG.error(f"Failed to connect to RPC host '{host}:{port}'")
return None
def get_rpc_client(self):
if not self._rpc_client:
self._rpc_client = self._rpyc_connect(
self.ip,
self.port,
authorizer=lambda sock: sock.send(self.magic_word.encode()),
)
return self._rpc_client
def get_stats_dict(self):
cl = self.get_rpc_client()
result = {}
if not cl:
return result
try:
rpc_stats_dict = cl.root.get_stats()
result = json.loads(rpc_stats_dict)
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_stats(self):
cl = self.get_rpc_client()
result = {}
if not cl:
return result
try:
result = cl.root.get_stats_obj()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_packet_track(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_packet_track()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_packet_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_packet_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_watch_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_watch_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_seen_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_seen_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_log_entries(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result_str = cl.root.get_log_entries()
result = json.loads(result_str)
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result

View File

@ -1,99 +0,0 @@
import json
import logging
from oslo_config import cfg
import rpyc
from rpyc.utils.authenticators import AuthenticationError
from rpyc.utils.server import ThreadPoolServer
from aprsd import conf # noqa: F401
from aprsd import packets, stats, threads
from aprsd.threads import log_monitor
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
def magic_word_authenticator(sock):
client_ip = sock.getpeername()[0]
magic = sock.recv(len(CONF.rpc_settings.magic_word)).decode()
if magic != CONF.rpc_settings.magic_word:
LOG.error(
f"wrong magic word passed from {client_ip} "
"'{magic}' != '{CONF.rpc_settings.magic_word}'",
)
raise AuthenticationError(
f"wrong magic word passed in '{magic}'"
f" != '{CONF.rpc_settings.magic_word}'",
)
return sock, None
class APRSDRPCThread(threads.APRSDThread):
def __init__(self):
super().__init__(name="RPCThread")
self.thread = ThreadPoolServer(
APRSDService,
port=CONF.rpc_settings.port,
protocol_config={"allow_public_attrs": True},
authenticator=magic_word_authenticator,
)
def stop(self):
if self.thread:
self.thread.close()
self.thread_stop = True
def loop(self):
# there is no loop as run is blocked
if self.thread and not self.thread_stop:
# This is a blocking call
self.thread.start()
@rpyc.service
class APRSDService(rpyc.Service):
def on_connect(self, conn):
# code that runs when a connection is created
# (to init the service, if needed)
LOG.info("RPC Client Connected")
self._conn = conn
def on_disconnect(self, conn):
# code that runs after the connection has already closed
# (to finalize the service, if needed)
LOG.info("RPC Client Disconnected")
self._conn = None
@rpyc.exposed
def get_stats(self):
stat = stats.APRSDStats()
stats_dict = stat.stats()
return_str = json.dumps(stats_dict, indent=4, sort_keys=True, default=str)
return return_str
@rpyc.exposed
def get_stats_obj(self):
return stats.APRSDStats()
@rpyc.exposed
def get_packet_list(self):
return packets.PacketList()
@rpyc.exposed
def get_packet_track(self):
return packets.PacketTrack()
@rpyc.exposed
def get_watch_list(self):
return packets.WatchList()
@rpyc.exposed
def get_seen_list(self):
return packets.SeenList()
@rpyc.exposed
def get_log_entries(self):
entries = log_monitor.LogEntries().get_all_and_purge()
return json.dumps(entries, default=str)

View File

@ -1,6 +1,6 @@
from aprsd import client as aprs_client
from aprsd import plugin
from aprsd.packets import packet_list, tracker, watch_list
from aprsd.packets import packet_list, seen_list, tracker, watch_list
from aprsd.plugins import email
from aprsd.stats import app, collector
from aprsd.threads import aprsd
@ -17,3 +17,4 @@ stats_collector.register_producer(plugin.PluginManager())
stats_collector.register_producer(aprsd.APRSDThreadList())
stats_collector.register_producer(email.EmailStats())
stats_collector.register_producer(aprs_client.APRSClientStats())
stats_collector.register_producer(seen_list.SeenList())

View File

@ -3,11 +3,9 @@ import queue
# Make these available to anyone importing
# aprsd.threads
from .aprsd import APRSDThread, APRSDThreadList # noqa: F401
from .keep_alive import KeepAliveThread # noqa: F401
from .rx import ( # noqa: F401
APRSDDupeRXThread, APRSDProcessPacketThread, APRSDRXThread,
)
from .stats import APRSDStatsStoreThread # noqa: F401
packet_queue = queue.Queue(maxsize=20)

View File

@ -1,19 +1,44 @@
import datetime
import logging
import threading
from oslo_config import cfg
import requests
import wrapt
from aprsd import threads
from aprsd.log import log
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
def send_log_entries(force=False):
"""Send all of the log entries to the web interface."""
if CONF.admin.web_enabled:
if force or LogEntries().is_purge_ready():
entries = LogEntries().get_all_and_purge()
print(f"Sending log entries {len(entries)}")
if entries:
try:
requests.post(
f"http://{CONF.admin.web_ip}:{CONF.admin.web_port}/log_entries",
json=entries,
auth=(CONF.admin.user, CONF.admin.password),
)
except Exception:
pass
class LogEntries:
entries = []
lock = threading.Lock()
_instance = None
last_purge = datetime.datetime.now()
max_delta = datetime.timedelta(
hours=0.0, minutes=0, seconds=2,
)
def __new__(cls, *args, **kwargs):
if cls._instance is None:
@ -33,8 +58,18 @@ class LogEntries:
def get_all_and_purge(self):
entries = self.entries.copy()
self.entries = []
self.last_purge = datetime.datetime.now()
return entries
def is_purge_ready(self):
now = datetime.datetime.now()
if (
now - self.last_purge > self.max_delta
and len(self.entries) > 1
):
return True
return False
@wrapt.synchronized(lock)
def __len__(self):
return len(self.entries)
@ -45,6 +80,10 @@ class LogMonitorThread(threads.APRSDThread):
def __init__(self):
super().__init__("LogMonitorThread")
def stop(self):
send_log_entries(force=True)
super().stop()
def loop(self):
try:
record = log.logging_queue.get(block=True, timeout=2)
@ -59,6 +98,7 @@ class LogMonitorThread(threads.APRSDThread):
# Just ignore thi
pass
send_log_entries()
return True
def json_record(self, record):

View File

@ -27,7 +27,6 @@ class APRSDRXThread(APRSDThread):
self._client.stop()
def loop(self):
LOG.debug(f"RX_MSG-LOOP {self.loop_count}")
if not self._client:
self._client = client.factory.create()
time.sleep(1)
@ -43,11 +42,9 @@ class APRSDRXThread(APRSDThread):
# and the aprslib developer didn't want to allow a PR to add
# kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56
LOG.debug(f"Calling client consumer CL {self._client}")
self._client.consumer(
self._process_packet, raw=False, blocking=False,
)
LOG.debug(f"Consumer done {self._client}")
except (
aprslib.exceptions.ConnectionDrop,
aprslib.exceptions.ConnectionError,

View File

@ -42,6 +42,22 @@ class EnhancedJSONEncoder(json.JSONEncoder):
return super().default(obj)
class SimpleJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
elif isinstance(obj, datetime.date):
return str(obj)
elif isinstance(obj, datetime.time):
return str(obj)
elif isinstance(obj, datetime.timedelta):
return str(obj)
elif isinstance(obj, decimal.Decimal):
return str(obj)
else:
return super().default(obj)
class EnhancedJSONDecoder(json.JSONDecoder):
def __init__(self, *args, **kwargs):

View File

@ -219,15 +219,17 @@ function updateQuadData(chart, label, first, second, third, fourth) {
}
function update_stats( data ) {
our_callsign = data["stats"]["aprsd"]["callsign"];
$("#version").text( data["stats"]["aprsd"]["version"] );
our_callsign = data["APRSDStats"]["callsign"];
$("#version").text( data["APRSDStats"]["version"] );
$("#aprs_connection").html( data["aprs_connection"] );
$("#uptime").text( "uptime: " + data["stats"]["aprsd"]["uptime"] );
$("#uptime").text( "uptime: " + data["APRSDStats"]["uptime"] );
const html_pretty = Prism.highlight(JSON.stringify(data, null, '\t'), Prism.languages.json, 'json');
$("#jsonstats").html(html_pretty);
short_time = data["time"].split(/\s(.+)/)[1];
updateDualData(packets_chart, short_time, data["stats"]["packets"]["sent"], data["stats"]["packets"]["received"]);
updateQuadData(message_chart, short_time, data["stats"]["messages"]["sent"], data["stats"]["messages"]["received"], data["stats"]["messages"]["ack_sent"], data["stats"]["messages"]["ack_recieved"]);
updateDualData(email_chart, short_time, data["stats"]["email"]["sent"], data["stats"]["email"]["recieved"]);
updateDualData(memory_chart, short_time, data["stats"]["aprsd"]["memory_peak"], data["stats"]["aprsd"]["memory_current"]);
packet_list = data["PacketList"]["packets"];
updateDualData(packets_chart, short_time, data["PacketList"]["sent"], data["PacketList"]["received"]);
updateQuadData(message_chart, short_time, packet_list["MessagePacket"]["tx"], packet_list["MessagePacket"]["rx"],
packet_list["AckPacket"]["tx"], packet_list["AckPacket"]["rx"]);
updateDualData(email_chart, short_time, data["EmailStats"]["sent"], data["EmailStats"]["recieved"]);
updateDualData(memory_chart, short_time, data["APRSDStats"]["memory_peak"], data["APRSDStats"]["memory_current"]);
}

View File

@ -382,21 +382,21 @@ function updateAcksChart() {
function update_stats( data ) {
console.log(data);
our_callsign = data["stats"]["aprsd"]["callsign"];
$("#version").text( data["stats"]["aprsd"]["version"] );
our_callsign = data["APRSDStats"]["callsign"];
$("#version").text( data["APRSDStats"]["version"] );
$("#aprs_connection").html( data["aprs_connection"] );
$("#uptime").text( "uptime: " + data["stats"]["aprsd"]["uptime"] );
$("#uptime").text( "uptime: " + data["APRSDStats"]["uptime"] );
const html_pretty = Prism.highlight(JSON.stringify(data, null, '\t'), Prism.languages.json, 'json');
$("#jsonstats").html(html_pretty);
t = Date.parse(data["time"]);
ts = new Date(t);
updatePacketData(packets_chart, ts, data["stats"]["packets"]["sent"], data["stats"]["packets"]["received"]);
updatePacketTypesData(ts, data["stats"]["packets"]["types"]);
updatePacketData(packets_chart, ts, data["PacketList"]["tx"], data["PacketList"]["rx"]);
updatePacketTypesData(ts, data["PacketList"]["packets"]);
updatePacketTypesChart();
updateMessagesChart();
updateAcksChart();
updateMemChart(ts, data["stats"]["aprsd"]["memory_current"], data["stats"]["aprsd"]["memory_peak"]);
updateMemChart(ts, data["APRSDStats"]["memory_current"], data["APRSDStats"]["memory_peak"]);
//updateQuadData(message_chart, short_time, data["stats"]["messages"]["sent"], data["stats"]["messages"]["received"], data["stats"]["messages"]["ack_sent"], data["stats"]["messages"]["ack_recieved"]);
//updateDualData(email_chart, short_time, data["stats"]["email"]["sent"], data["stats"]["email"]["recieved"]);
//updateDualData(memory_chart, short_time, data["stats"]["aprsd"]["memory_peak"], data["stats"]["aprsd"]["memory_current"]);

View File

@ -28,7 +28,7 @@ function update_watchlist( data ) {
var watchdiv = $("#watchDiv");
var html_str = '<table class="ui celled striped table"><thead><tr><th>HAM Callsign</th><th>Age since last seen by APRSD</th></tr></thead><tbody>'
watchdiv.html('')
jQuery.each(data["stats"]["aprsd"]["watch_list"], function(i, val) {
jQuery.each(data["WatchList"], function(i, val) {
html_str += '<tr><td class="collapsing"><img id="callsign_'+i+'" class="aprsd_1"></img>' + i + '</td><td>' + val["last"] + '</td></tr>'
});
html_str += "</tbody></table>";
@ -65,7 +65,7 @@ function update_seenlist( data ) {
html_str += '<thead><tr><th>HAM Callsign</th><th>Age since last seen by APRSD</th>'
html_str += '<th>Number of packets RX</th></tr></thead><tbody>'
seendiv.html('')
var seen_list = data["stats"]["aprsd"]["seen_list"]
var seen_list = data["SeenList"]
var len = Object.keys(seen_list).length
$('#seen_count').html(len)
jQuery.each(seen_list, function(i, val) {
@ -87,7 +87,7 @@ function update_plugins( data ) {
html_str += '</tr></thead><tbody>'
plugindiv.html('')
var plugins = data["stats"]["plugins"];
var plugins = data["PluginManager"];
var keys = Object.keys(plugins);
keys.sort();
for (var i=0; i<keys.length; i++) { // now lets iterate in sort order

View File

@ -174,7 +174,7 @@
<div class="ui bottom attached tab segment" data-tab="raw-tab">
<h3 class="ui dividing header">Raw JSON</h3>
<pre id="jsonstats" class="language-yaml" style="height:600px;overflow-y:auto;">{{ stats|safe }}</pre>
<pre id="jsonstats" class="language-yaml" style="height:600px;overflow-y:auto;">{{ initial_stats|safe }}</pre>
</div>
<div class="ui text container">

View File

@ -1,12 +1,11 @@
import datetime
import importlib.metadata as imp
import io
import json
import logging
import time
import queue
import flask
from flask import Flask
from flask import Flask, request
from flask_httpauth import HTTPBasicAuth
from oslo_config import cfg, generator
import socketio
@ -15,11 +14,13 @@ from werkzeug.security import check_password_hash
import aprsd
from aprsd import cli_helper, client, conf, packets, plugin, threads
from aprsd.log import log
from aprsd.rpc import client as aprsd_rpc_client
from aprsd.threads import stats as stats_threads
from aprsd.utils import json as aprsd_json
CONF = cfg.CONF
LOG = logging.getLogger("gunicorn.access")
logging_queue = queue.Queue()
auth = HTTPBasicAuth()
users: dict[str, str] = {}
@ -45,105 +46,23 @@ def verify_password(username, password):
def _stats():
track = aprsd_rpc_client.RPCClient().get_packet_track()
now = datetime.datetime.now()
time_format = "%m-%d-%Y %H:%M:%S"
stats_dict = aprsd_rpc_client.RPCClient().get_stats_dict()
if not stats_dict:
stats_dict = {
"aprsd": {},
"aprs-is": {"server": ""},
"messages": {
"sent": 0,
"received": 0,
},
"email": {
"sent": 0,
"received": 0,
},
"seen_list": {
"sent": 0,
"received": 0,
},
}
# Convert the watch_list entries to age
wl = aprsd_rpc_client.RPCClient().get_watch_list()
new_list = {}
if wl:
for call in wl.get_all():
# call_date = datetime.datetime.strptime(
# str(wl.last_seen(call)),
# "%Y-%m-%d %H:%M:%S.%f",
# )
# We have to convert the RingBuffer to a real list
# so that json.dumps works.
# pkts = []
# for pkt in wl.get(call)["packets"].get():
# pkts.append(pkt)
new_list[call] = {
"last": wl.age(call),
# "packets": pkts
}
stats_dict["aprsd"]["watch_list"] = new_list
packet_list = aprsd_rpc_client.RPCClient().get_packet_list()
rx = tx = 0
types = {}
if packet_list:
rx = packet_list.total_rx()
tx = packet_list.total_tx()
types_copy = packet_list.types.copy()
for key in types_copy:
types[str(key)] = dict(types_copy[key])
stats_dict["packets"] = {
"sent": tx,
"received": rx,
"types": types,
}
if track:
size_tracker = len(track)
else:
size_tracker = 0
result = {
"time": now.strftime(time_format),
"size_tracker": size_tracker,
"stats": stats_dict,
}
return result
stats_obj = stats_threads.StatsStore()
stats_obj.load()
# now = datetime.datetime.now()
# time_format = "%m-%d-%Y %H:%M:%S"
stats_dict = stats_obj.data
return stats_dict
@app.route("/stats")
def stats():
LOG.debug("/stats called")
return json.dumps(_stats())
return json.dumps(_stats(), cls=aprsd_json.SimpleJSONEncoder)
@app.route("/")
def index():
stats = _stats()
wl = aprsd_rpc_client.RPCClient().get_watch_list()
if wl and wl.is_enabled():
watch_count = len(wl)
watch_age = wl.max_delta()
else:
watch_count = 0
watch_age = 0
sl = aprsd_rpc_client.RPCClient().get_seen_list()
if sl:
seen_count = len(sl)
else:
seen_count = 0
pm = plugin.PluginManager()
plugins = pm.get_plugins()
plugin_count = len(plugins)
@ -152,7 +71,7 @@ def index():
transport = "aprs-is"
aprs_connection = (
"APRS-IS Server: <a href='http://status.aprs2.net' >"
"{}</a>".format(stats["stats"]["aprs-is"]["server"])
"{}</a>".format(stats["APRSClientStats"]["server_string"])
)
else:
# We might be connected to a KISS socket?
@ -179,7 +98,7 @@ def index():
return flask.render_template(
"index.html",
initial_stats=stats,
initial_stats=json.dumps(stats, cls=aprsd_json.SimpleJSONEncoder),
aprs_connection=aprs_connection,
callsign=CONF.callsign,
version=aprsd.__version__,
@ -187,9 +106,6 @@ def index():
entries, indent=4,
sort_keys=True, default=str,
),
watch_count=watch_count,
watch_age=watch_age,
seen_count=seen_count,
plugin_count=plugin_count,
# oslo_out=generate_oslo()
)
@ -197,6 +113,7 @@ def index():
@auth.login_required
def messages():
_stats()
track = packets.PacketTrack()
msgs = []
for id in track:
@ -210,18 +127,8 @@ def messages():
@app.route("/packets")
def get_packets():
LOG.debug("/packets called")
packet_list = aprsd_rpc_client.RPCClient().get_packet_list()
if packet_list:
tmp_list = []
pkts = packet_list.copy()
for key in pkts:
pkt = packet_list.get(key)
if pkt:
tmp_list.append(pkt.json)
return json.dumps(tmp_list)
else:
return json.dumps([])
stats_dict = _stats()
return json.dumps(stats_dict.get("PacketList", {}))
@auth.login_required
@ -273,23 +180,35 @@ def save():
return json.dumps({"messages": "saved"})
@app.route("/log_entries", methods=["POST"])
def log_entries():
"""The url that the server can call to update the logs."""
entries = request.json
LOG.debug(f"Log entries called {len(entries)}")
for entry in entries:
logging_queue.put(entry)
LOG.debug("log_entries done")
return json.dumps({"messages": "saved"})
class LogUpdateThread(threads.APRSDThread):
def __init__(self):
def __init__(self, logging_queue=None):
super().__init__("LogUpdate")
self.logging_queue = logging_queue
def loop(self):
if sio:
log_entries = aprsd_rpc_client.RPCClient().get_log_entries()
if log_entries:
LOG.info(f"Sending log entries! {len(log_entries)}")
for entry in log_entries:
try:
log_entry = self.logging_queue.get(block=True, timeout=1)
if log_entry:
sio.emit(
"log_entry", entry,
"log_entry",
log_entry,
namespace="/logs",
)
time.sleep(5)
except queue.Empty:
pass
return True
@ -297,17 +216,17 @@ class LoggingNamespace(socketio.Namespace):
log_thread = None
def on_connect(self, sid, environ):
global sio
LOG.debug(f"LOG on_connect {sid}")
global sio, logging_queue
LOG.info(f"LOG on_connect {sid}")
sio.emit(
"connected", {"data": "/logs Connected"},
namespace="/logs",
)
self.log_thread = LogUpdateThread()
self.log_thread = LogUpdateThread(logging_queue=logging_queue)
self.log_thread.start()
def on_disconnect(self, sid):
LOG.debug(f"LOG Disconnected {sid}")
LOG.info(f"LOG Disconnected {sid}")
if self.log_thread:
self.log_thread.stop()
@ -332,7 +251,7 @@ if __name__ == "__main__":
async_mode = "threading"
sio = socketio.Server(logger=True, async_mode=async_mode)
app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app)
log_level = init_app(log_level="DEBUG")
log_level = init_app()
log.setup_logging(log_level)
sio.register_namespace(LoggingNamespace("/logs"))
CONF.log_opt_values(LOG, logging.DEBUG)
@ -352,7 +271,7 @@ if __name__ == "uwsgi_file_aprsd_wsgi":
sio = socketio.Server(logger=True, async_mode=async_mode)
app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app)
log_level = init_app(
log_level="DEBUG",
# log_level="DEBUG",
config_file="/config/aprsd.conf",
# Commented out for local development.
# config_file=cli_helper.DEFAULT_CONFIG_FILE
@ -371,9 +290,9 @@ if __name__ == "aprsd.wsgi":
app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app)
log_level = init_app(
log_level="DEBUG",
config_file="/config/aprsd.conf",
# config_file=cli_helper.DEFAULT_CONFIG_FILE,
# log_level="DEBUG",
# config_file="/config/aprsd.conf",
config_file=cli_helper.DEFAULT_CONFIG_FILE,
)
log.setup_logging(log_level)
sio.register_namespace(LoggingNamespace("/logs"))