1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-07-21 17:55:24 -04:00

Migrate admin web out of aprsd.

This patch removes the admin command out of aprsd proper.
The admin interface will be an aprsd extension called
aprsd-admin-extension.

This is the start of the effort to trim down the base of APRSD
and the commands to server, healthcheck, list_plugins, list_extensions.

This will reduce the number of required packages that APRSD core needs.
This is useful for others using APRSD as their base to their
applications.
This commit is contained in:
Hemna 2024-12-09 13:36:01 -05:00
parent caa4bb8bd0
commit c48ff8dfd4
8 changed files with 62 additions and 590 deletions

View File

@ -1,57 +0,0 @@
import logging
import os
import signal
import click
from oslo_config import cfg
import socketio
import aprsd
from aprsd import cli_helper
from aprsd import main as aprsd_main
from aprsd import utils
from aprsd.main import cli
os.environ["APRSD_ADMIN_COMMAND"] = "1"
# this import has to happen AFTER we set the
# above environment variable, so that the code
# inside the wsgi.py has the value
from aprsd import wsgi as aprsd_wsgi # noqa
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
# main() ###
@cli.command()
@cli_helper.add_options(cli_helper.common_options)
@click.pass_context
@cli_helper.process_standard_options
def admin(ctx):
"""Start the aprsd admin interface."""
signal.signal(signal.SIGINT, aprsd_main.signal_handler)
signal.signal(signal.SIGTERM, aprsd_main.signal_handler)
level, msg = utils._check_version()
if level:
LOG.warning(msg)
else:
LOG.info(msg)
LOG.info(f"APRSD Started version: {aprsd.__version__}")
# Dump all the config options now.
CONF.log_opt_values(LOG, logging.DEBUG)
async_mode = "threading"
sio = socketio.Server(logger=True, async_mode=async_mode)
aprsd_wsgi.app.wsgi_app = socketio.WSGIApp(sio, aprsd_wsgi.app.wsgi_app)
aprsd_wsgi.init_app()
sio.register_namespace(aprsd_wsgi.LoggingNamespace("/logs"))
CONF.log_opt_values(LOG, logging.DEBUG)
aprsd_wsgi.app.run(
threaded=True,
debug=False,
port=CONF.admin.web_port,
host=CONF.admin.web_ip,
)

View File

@ -13,15 +13,49 @@ from aprsd.client import client_factory
from aprsd.main import cli from aprsd.main import cli
from aprsd.packets import collector as packet_collector from aprsd.packets import collector as packet_collector
from aprsd.packets import seen_list from aprsd.packets import seen_list
from aprsd.threads import keep_alive, log_monitor, registry, rx from aprsd.threads import aprsd as aprsd_threads
from aprsd.threads import keep_alive, registry, rx
from aprsd.threads import stats as stats_thread from aprsd.threads import stats as stats_thread
from aprsd.threads import tx from aprsd.threads import tx
from aprsd.utils import singleton
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
@singleton
class ServerThreads:
"""Registry for threads that the server command runs.
This enables extensions to register a thread to run during
the server command.
"""
def __init__(self):
self.threads: list[aprsd_threads.APRSDThread] = []
def register(self, thread: aprsd_threads.APRSDThread):
if not isinstance(thread, aprsd_threads.APRSDThread):
raise TypeError(f"Thread {thread} is not an APRSDThread")
self.threads.append(thread)
def unregister(self, thread: aprsd_threads.APRSDThread):
if not isinstance(thread, aprsd_threads.APRSDThread):
raise TypeError(f"Thread {thread} is not an APRSDThread")
self.threads.remove(thread)
def start(self):
"""Start all threads in the list."""
for thread in self.threads:
thread.start()
def join(self):
"""Join all the threads in the list"""
for thread in self.threads:
thread.join()
# main() ### # main() ###
@cli.command() @cli.command()
@cli_helper.add_options(cli_helper.common_options) @cli_helper.add_options(cli_helper.common_options)
@ -41,6 +75,8 @@ def server(ctx, flush):
signal.signal(signal.SIGINT, aprsd_main.signal_handler) signal.signal(signal.SIGINT, aprsd_main.signal_handler)
signal.signal(signal.SIGTERM, aprsd_main.signal_handler) signal.signal(signal.SIGTERM, aprsd_main.signal_handler)
server_threads = ServerThreads()
level, msg = utils._check_version() level, msg = utils._check_version()
if level: if level:
LOG.warning(msg) LOG.warning(msg)
@ -110,36 +146,28 @@ def server(ctx, flush):
# Now start all the main processing threads. # Now start all the main processing threads.
keepalive = keep_alive.KeepAliveThread() server_threads.register(keep_alive.KeepAliveThread())
keepalive.start() server_threads.register(stats_thread.APRSDStatsStoreThread())
server_threads.register(
stats_store_thread = stats_thread.APRSDStatsStoreThread() rx.APRSDPluginRXThread(
stats_store_thread.start()
rx_thread = rx.APRSDPluginRXThread(
packet_queue=threads.packet_queue, packet_queue=threads.packet_queue,
),
) )
process_thread = rx.APRSDPluginProcessPacketThread( server_threads.register(
rx.APRSDPluginProcessPacketThread(
packet_queue=threads.packet_queue, packet_queue=threads.packet_queue,
),
) )
rx_thread.start()
process_thread.start()
if CONF.enable_beacon: if CONF.enable_beacon:
LOG.info("Beacon Enabled. Starting Beacon thread.") LOG.info("Beacon Enabled. Starting Beacon thread.")
bcn_thread = tx.BeaconSendThread() server_threads.register(tx.BeaconSendThread())
bcn_thread.start()
if CONF.aprs_registry.enabled: if CONF.aprs_registry.enabled:
LOG.info("Registry Enabled. Starting Registry thread.") LOG.info("Registry Enabled. Starting Registry thread.")
registry_thread = registry.APRSRegistryThread() server_threads.register(registry.APRSRegistryThread())
registry_thread.start()
if CONF.admin.web_enabled: server_threads.start()
log_monitor_thread = log_monitor.LogMonitorThread() server_threads.join()
log_monitor_thread.start()
rx_thread.join()
process_thread.join()
return 0 return 0

View File

@ -14,7 +14,6 @@ from flask_socketio import Namespace, SocketIO
from geopy.distance import geodesic from geopy.distance import geodesic
from oslo_config import cfg from oslo_config import cfg
import timeago import timeago
from werkzeug.security import check_password_hash, generate_password_hash
import wrapt import wrapt
import aprsd import aprsd
@ -33,7 +32,6 @@ from aprsd.utils import trace
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger() LOG = logging.getLogger()
auth = HTTPBasicAuth() auth = HTTPBasicAuth()
users = {}
socketio = None socketio = None
# List of callsigns that we don't want to track/fetch their location # List of callsigns that we don't want to track/fetch their location
@ -122,17 +120,6 @@ class SentMessages:
self.data[id]["reply"] = packet self.data[id]["reply"] = packet
# HTTPBasicAuth doesn't work on a class method.
# This has to be out here. Rely on the APRSDFlask
# class to initialize the users from the config
@auth.verify_password
def verify_password(username, password):
global users
if username in users and check_password_hash(users[username], password):
return username
def _build_location_from_repeat(message): def _build_location_from_repeat(message):
# This is a location message Format is # This is a location message Format is
# ^ld^callsign:latitude,longitude,altitude,course,speed,timestamp # ^ld^callsign:latitude,longitude,altitude,course,speed,timestamp
@ -340,10 +327,6 @@ class LocationProcessingThread(aprsd_threads.APRSDThread):
pass pass
def set_config():
global users
def _get_transport(stats): def _get_transport(stats):
if CONF.aprs_network.enabled: if CONF.aprs_network.enabled:
transport = "aprs-is" transport = "aprs-is"
@ -603,8 +586,6 @@ def webchat(ctx, flush, port):
LOG.info(f"APRSD Started version: {aprsd.__version__}") LOG.info(f"APRSD Started version: {aprsd.__version__}")
CONF.log_opt_values(logging.getLogger(), logging.DEBUG) CONF.log_opt_values(logging.getLogger(), logging.DEBUG)
user = CONF.admin.user
users[user] = generate_password_hash(CONF.admin.password)
if not port: if not port:
port = CONF.webchat.web_port port = CONF.webchat.web_port

View File

@ -7,10 +7,6 @@ home = str(Path.home())
DEFAULT_CONFIG_DIR = f"{home}/.config/aprsd/" DEFAULT_CONFIG_DIR = f"{home}/.config/aprsd/"
APRSD_DEFAULT_MAGIC_WORD = "CHANGEME!!!" APRSD_DEFAULT_MAGIC_WORD = "CHANGEME!!!"
admin_group = cfg.OptGroup(
name="admin",
title="Admin web interface settings",
)
watch_list_group = cfg.OptGroup( watch_list_group = cfg.OptGroup(
name="watch_list", name="watch_list",
title="Watch List settings", title="Watch List settings",
@ -178,35 +174,6 @@ watch_list_opts = [
), ),
] ]
admin_opts = [
cfg.BoolOpt(
"web_enabled",
default=False,
help="Enable the Admin Web Interface",
),
cfg.StrOpt(
"web_ip",
default="0.0.0.0",
help="The ip address to listen on",
),
cfg.PortOpt(
"web_port",
default=8001,
help="The port to listen on",
),
cfg.StrOpt(
"user",
default="admin",
help="The admin user for the admin web interface",
),
cfg.StrOpt(
"password",
default="password",
secret=True,
help="Admin interface password",
),
]
enabled_plugins_opts = [ enabled_plugins_opts = [
cfg.ListOpt( cfg.ListOpt(
@ -292,8 +259,6 @@ registry_opts = [
def register_opts(config): def register_opts(config):
config.register_opts(aprsd_opts) config.register_opts(aprsd_opts)
config.register_opts(enabled_plugins_opts) config.register_opts(enabled_plugins_opts)
config.register_group(admin_group)
config.register_opts(admin_opts, group=admin_group)
config.register_group(watch_list_group) config.register_group(watch_list_group)
config.register_opts(watch_list_opts, group=watch_list_group) config.register_opts(watch_list_opts, group=watch_list_group)
config.register_group(webchat_group) config.register_group(webchat_group)
@ -305,7 +270,6 @@ def register_opts(config):
def list_opts(): def list_opts():
return { return {
"DEFAULT": (aprsd_opts + enabled_plugins_opts), "DEFAULT": (aprsd_opts + enabled_plugins_opts),
admin_group.name: admin_opts,
watch_list_group.name: watch_list_opts, watch_list_group.name: watch_list_opts,
webchat_group.name: webchat_opts, webchat_group.name: webchat_opts,
registry_group.name: registry_opts, registry_group.name: registry_opts,

View File

@ -1,5 +1,4 @@
import logging import logging
from logging.handlers import QueueHandler
import queue import queue
import sys import sys
@ -122,16 +121,16 @@ def setup_logging(loglevel=None, quiet=False):
for name in imap_list: for name in imap_list:
logging.getLogger(name).propagate = True logging.getLogger(name).propagate = True
if CONF.admin.web_enabled: # if CONF.admin.web_enabled:
qh = QueueHandler(logging_queue) # qh = QueueHandler(logging_queue)
handlers.append( # handlers.append(
{ # {
"sink": qh, "serialize": False, # "sink": qh, "serialize": False,
"format": CONF.logging.logformat, # "format": CONF.logging.logformat,
"level": log_level, # "level": log_level,
"colorize": False, # "colorize": False,
}, # },
) # )
# configure loguru # configure loguru
logger.configure(handlers=handlers) logger.configure(handlers=handlers)

View File

@ -54,7 +54,7 @@ def cli(ctx):
def load_commands(): def load_commands():
from .cmds import ( # noqa from .cmds import ( # noqa
admin, completion, dev, fetch_stats, healthcheck, list_plugins, listen, completion, dev, fetch_stats, healthcheck, list_plugins, listen,
send_message, server, webchat, send_message, server, webchat,
) )

View File

@ -1,121 +0,0 @@
import datetime
import logging
import threading
from oslo_config import cfg
import requests
import wrapt
from aprsd import threads
from aprsd.log import log
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
def send_log_entries(force=False):
"""Send all of the log entries to the web interface."""
if CONF.admin.web_enabled:
if force or LogEntries().is_purge_ready():
entries = LogEntries().get_all_and_purge()
if entries:
try:
requests.post(
f"http://{CONF.admin.web_ip}:{CONF.admin.web_port}/log_entries",
json=entries,
auth=(CONF.admin.user, CONF.admin.password),
)
except Exception:
LOG.warning(f"Failed to send log entries. len={len(entries)}")
class LogEntries:
entries = []
lock = threading.Lock()
_instance = None
last_purge = datetime.datetime.now()
max_delta = datetime.timedelta(
hours=0.0, minutes=0, seconds=2,
)
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def stats(self) -> dict:
return {
"log_entries": self.entries,
}
@wrapt.synchronized(lock)
def add(self, entry):
self.entries.append(entry)
@wrapt.synchronized(lock)
def get_all_and_purge(self):
entries = self.entries.copy()
self.entries = []
self.last_purge = datetime.datetime.now()
return entries
def is_purge_ready(self):
now = datetime.datetime.now()
if (
now - self.last_purge > self.max_delta
and len(self.entries) > 1
):
return True
return False
@wrapt.synchronized(lock)
def __len__(self):
return len(self.entries)
class LogMonitorThread(threads.APRSDThread):
def __init__(self):
super().__init__("LogMonitorThread")
def stop(self):
send_log_entries(force=True)
super().stop()
def loop(self):
try:
record = log.logging_queue.get(block=True, timeout=2)
if isinstance(record, list):
for item in record:
entry = self.json_record(item)
LogEntries().add(entry)
else:
entry = self.json_record(record)
LogEntries().add(entry)
except Exception:
# Just ignore thi
pass
send_log_entries()
return True
def json_record(self, record):
entry = {}
entry["filename"] = record.filename
entry["funcName"] = record.funcName
entry["levelname"] = record.levelname
entry["lineno"] = record.lineno
entry["module"] = record.module
entry["name"] = record.name
entry["pathname"] = record.pathname
entry["process"] = record.process
entry["processName"] = record.processName
if hasattr(record, "stack_info"):
entry["stack_info"] = record.stack_info
else:
entry["stack_info"] = None
entry["thread"] = record.thread
entry["threadName"] = record.threadName
entry["message"] = record.getMessage()
return entry

View File

@ -1,322 +0,0 @@
import datetime
import importlib.metadata as imp
import io
import json
import logging
import os
import queue
import flask
from flask import Flask, request
from flask_httpauth import HTTPBasicAuth
from oslo_config import cfg, generator
import socketio
from werkzeug.security import check_password_hash
import aprsd
from aprsd import cli_helper, client, conf, packets, plugin, threads
from aprsd.log import log
from aprsd.threads import stats as stats_threads
from aprsd.utils import json as aprsd_json
CONF = cfg.CONF
LOG = logging.getLogger("gunicorn.access")
logging_queue = queue.Queue()
# ADMIN_COMMAND True means we are running from `aprsd admin`
# the `aprsd admin` command will import this file after setting
# the APRSD_ADMIN_COMMAND environment variable.
ADMIN_COMMAND = os.environ.get("APRSD_ADMIN_COMMAND", False)
auth = HTTPBasicAuth()
users: dict[str, str] = {}
app = Flask(
"aprsd",
static_url_path="/static",
static_folder="web/admin/static",
template_folder="web/admin/templates",
)
bg_thread = None
app.config["SECRET_KEY"] = "secret!"
# HTTPBasicAuth doesn't work on a class method.
# This has to be out here. Rely on the APRSDFlask
# class to initialize the users from the config
@auth.verify_password
def verify_password(username, password):
global users
if username in users and check_password_hash(users.get(username), password):
return username
def _stats():
stats_obj = stats_threads.StatsStore()
stats_obj.load()
now = datetime.datetime.now()
time_format = "%m-%d-%Y %H:%M:%S"
stats = {
"time": now.strftime(time_format),
"stats": stats_obj.data,
}
return stats
@app.route("/stats")
def stats():
LOG.debug("/stats called")
return json.dumps(_stats(), cls=aprsd_json.SimpleJSONEncoder)
@app.route("/")
def index():
stats = _stats()
pm = plugin.PluginManager()
plugins = pm.get_plugins()
plugin_count = len(plugins)
client_stats = stats["stats"].get("APRSClientStats", {})
if CONF.aprs_network.enabled:
transport = "aprs-is"
if client_stats:
aprs_connection = client_stats.get("server_string", "")
else:
aprs_connection = "APRS-IS"
aprs_connection = (
"APRS-IS Server: <a href='http://status.aprs2.net' >"
"{}</a>".format(aprs_connection)
)
else:
# We might be connected to a KISS socket?
if client.KISSClient.kiss_enabled():
transport = client.KISSClient.transport()
if transport == client.TRANSPORT_TCPKISS:
aprs_connection = (
"TCPKISS://{}:{}".format(
CONF.kiss_tcp.host,
CONF.kiss_tcp.port,
)
)
elif transport == client.TRANSPORT_SERIALKISS:
aprs_connection = (
"SerialKISS://{}@{} baud".format(
CONF.kiss_serial.device,
CONF.kiss_serial.baudrate,
)
)
if client_stats:
stats["stats"]["APRSClientStats"]["transport"] = transport
stats["stats"]["APRSClientStats"]["aprs_connection"] = aprs_connection
entries = conf.conf_to_dict()
thread_info = stats["stats"].get("APRSDThreadList", {})
if thread_info:
thread_count = len(thread_info)
else:
thread_count = "unknown"
return flask.render_template(
"index.html",
initial_stats=json.dumps(stats, cls=aprsd_json.SimpleJSONEncoder),
aprs_connection=aprs_connection,
callsign=CONF.callsign,
version=aprsd.__version__,
config_json=json.dumps(
entries, indent=4,
sort_keys=True, default=str,
),
plugin_count=plugin_count,
thread_count=thread_count,
# oslo_out=generate_oslo()
)
@auth.login_required
def messages():
track = packets.PacketTrack()
msgs = []
for id in track:
LOG.info(track[id].dict())
msgs.append(track[id].dict())
return flask.render_template("messages.html", messages=json.dumps(msgs))
@auth.login_required
@app.route("/packets")
def get_packets():
stats = _stats()
stats_dict = stats["stats"]
packets = stats_dict.get("PacketList", {})
return json.dumps(packets, cls=aprsd_json.SimpleJSONEncoder)
@auth.login_required
@app.route("/plugins")
def plugins():
LOG.debug("/plugins called")
pm = plugin.PluginManager()
pm.reload_plugins()
return "reloaded"
def _get_namespaces():
args = []
all = imp.entry_points()
selected = []
if "oslo.config.opts" in all:
for x in all["oslo.config.opts"]:
if x.group == "oslo.config.opts":
selected.append(x)
for entry in selected:
if "aprsd" in entry.name:
args.append("--namespace")
args.append(entry.name)
return args
def generate_oslo():
CONF.namespace = _get_namespaces()
string_out = io.StringIO()
generator.generate(CONF, string_out)
return string_out.getvalue()
@auth.login_required
@app.route("/oslo")
def oslo():
return generate_oslo()
@auth.login_required
@app.route("/save")
def save():
"""Save the existing queue to disk."""
track = packets.PacketTrack()
track.save()
return json.dumps({"messages": "saved"})
@app.route("/log_entries", methods=["POST"])
def log_entries():
"""The url that the server can call to update the logs."""
entries = request.json
LOG.info(f"Log entries called {len(entries)}")
for entry in entries:
logging_queue.put(entry)
return json.dumps({"messages": "saved"})
class LogUpdateThread(threads.APRSDThread):
def __init__(self, logging_queue=None):
super().__init__("LogUpdate")
self.logging_queue = logging_queue
def loop(self):
if sio:
try:
log_entry = self.logging_queue.get(block=True, timeout=1)
if log_entry:
sio.emit(
"log_entry",
log_entry,
namespace="/logs",
)
except queue.Empty:
pass
return True
class LoggingNamespace(socketio.Namespace):
log_thread = None
def on_connect(self, sid, environ):
global sio, logging_queue
LOG.info(f"LOG on_connect {sid}")
sio.emit(
"connected", {"data": "/logs Connected"},
namespace="/logs",
)
self.log_thread = LogUpdateThread(logging_queue=logging_queue)
self.log_thread.start()
def on_disconnect(self, sid):
LOG.info(f"LOG Disconnected {sid}")
if self.log_thread:
self.log_thread.stop()
def init_app(config_file=None, log_level=None):
default_config_file = cli_helper.DEFAULT_CONFIG_FILE
if not config_file:
config_file = default_config_file
CONF(
[], project="aprsd", version=aprsd.__version__,
default_config_files=[config_file],
)
if not log_level:
log_level = CONF.logging.log_level
return log_level
if __name__ == "__main__":
async_mode = "threading"
sio = socketio.Server(logger=True, async_mode=async_mode)
app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app)
log_level = init_app()
log.setup_logging(log_level)
sio.register_namespace(LoggingNamespace("/logs"))
CONF.log_opt_values(LOG, logging.DEBUG)
app.run(
threaded=True,
debug=False,
port=CONF.admin.web_port,
host=CONF.admin.web_ip,
)
if __name__ == "uwsgi_file_aprsd_wsgi":
# Start with
# uwsgi --http :8000 --gevent 1000 --http-websockets --master -w aprsd.wsgi --callable app
async_mode = "gevent_uwsgi"
sio = socketio.Server(logger=True, async_mode=async_mode)
app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app)
log_level = init_app(
# log_level="DEBUG",
config_file="/config/aprsd.conf",
# Commented out for local development.
# config_file=cli_helper.DEFAULT_CONFIG_FILE
)
log.setup_logging(log_level)
sio.register_namespace(LoggingNamespace("/logs"))
CONF.log_opt_values(LOG, logging.DEBUG)
if __name__ == "aprsd.wsgi" and not ADMIN_COMMAND:
# set async_mode to 'threading', 'eventlet', 'gevent' or 'gevent_uwsgi' to
# force a mode else, the best mode is selected automatically from what's
# installed
async_mode = "gevent_uwsgi"
sio = socketio.Server(logger=True, async_mode=async_mode)
app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app)
log_level = init_app(
# log_level="DEBUG",
config_file="/config/aprsd.conf",
# config_file=cli_helper.DEFAULT_CONFIG_FILE,
)
log.setup_logging(log_level)
sio.register_namespace(LoggingNamespace("/logs"))
CONF.log_opt_values(LOG, logging.DEBUG)