diff --git a/aprsd/cmds/admin.py b/aprsd/cmds/admin.py deleted file mode 100644 index 6f3a8d9..0000000 --- a/aprsd/cmds/admin.py +++ /dev/null @@ -1,57 +0,0 @@ -import logging -import os -import signal - -import click -from oslo_config import cfg -import socketio - -import aprsd -from aprsd import cli_helper -from aprsd import main as aprsd_main -from aprsd import utils -from aprsd.main import cli - - -os.environ["APRSD_ADMIN_COMMAND"] = "1" -# this import has to happen AFTER we set the -# above environment variable, so that the code -# inside the wsgi.py has the value -from aprsd import wsgi as aprsd_wsgi # noqa - - -CONF = cfg.CONF -LOG = logging.getLogger("APRSD") - - -# main() ### -@cli.command() -@cli_helper.add_options(cli_helper.common_options) -@click.pass_context -@cli_helper.process_standard_options -def admin(ctx): - """Start the aprsd admin interface.""" - signal.signal(signal.SIGINT, aprsd_main.signal_handler) - signal.signal(signal.SIGTERM, aprsd_main.signal_handler) - - level, msg = utils._check_version() - if level: - LOG.warning(msg) - else: - LOG.info(msg) - LOG.info(f"APRSD Started version: {aprsd.__version__}") - # Dump all the config options now. - CONF.log_opt_values(LOG, logging.DEBUG) - - async_mode = "threading" - sio = socketio.Server(logger=True, async_mode=async_mode) - aprsd_wsgi.app.wsgi_app = socketio.WSGIApp(sio, aprsd_wsgi.app.wsgi_app) - aprsd_wsgi.init_app() - sio.register_namespace(aprsd_wsgi.LoggingNamespace("/logs")) - CONF.log_opt_values(LOG, logging.DEBUG) - aprsd_wsgi.app.run( - threaded=True, - debug=False, - port=CONF.admin.web_port, - host=CONF.admin.web_ip, - ) diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index bf418a6..0732a8d 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -13,15 +13,49 @@ from aprsd.client import client_factory from aprsd.main import cli from aprsd.packets import collector as packet_collector from aprsd.packets import seen_list -from aprsd.threads import keep_alive, log_monitor, registry, rx +from aprsd.threads import aprsd as aprsd_threads +from aprsd.threads import keep_alive, registry, rx from aprsd.threads import stats as stats_thread from aprsd.threads import tx +from aprsd.utils import singleton CONF = cfg.CONF LOG = logging.getLogger("APRSD") +@singleton +class ServerThreads: + """Registry for threads that the server command runs. + + This enables extensions to register a thread to run during + the server command. + + """ + def __init__(self): + self.threads: list[aprsd_threads.APRSDThread] = [] + + def register(self, thread: aprsd_threads.APRSDThread): + if not isinstance(thread, aprsd_threads.APRSDThread): + raise TypeError(f"Thread {thread} is not an APRSDThread") + self.threads.append(thread) + + def unregister(self, thread: aprsd_threads.APRSDThread): + if not isinstance(thread, aprsd_threads.APRSDThread): + raise TypeError(f"Thread {thread} is not an APRSDThread") + self.threads.remove(thread) + + def start(self): + """Start all threads in the list.""" + for thread in self.threads: + thread.start() + + def join(self): + """Join all the threads in the list""" + for thread in self.threads: + thread.join() + + # main() ### @cli.command() @cli_helper.add_options(cli_helper.common_options) @@ -41,6 +75,8 @@ def server(ctx, flush): signal.signal(signal.SIGINT, aprsd_main.signal_handler) signal.signal(signal.SIGTERM, aprsd_main.signal_handler) + server_threads = ServerThreads() + level, msg = utils._check_version() if level: LOG.warning(msg) @@ -110,36 +146,28 @@ def server(ctx, flush): # Now start all the main processing threads. - keepalive = keep_alive.KeepAliveThread() - keepalive.start() - - stats_store_thread = stats_thread.APRSDStatsStoreThread() - stats_store_thread.start() - - rx_thread = rx.APRSDPluginRXThread( - packet_queue=threads.packet_queue, + server_threads.register(keep_alive.KeepAliveThread()) + server_threads.register(stats_thread.APRSDStatsStoreThread()) + server_threads.register( + rx.APRSDPluginRXThread( + packet_queue=threads.packet_queue, + ), ) - process_thread = rx.APRSDPluginProcessPacketThread( - packet_queue=threads.packet_queue, + server_threads.register( + rx.APRSDPluginProcessPacketThread( + packet_queue=threads.packet_queue, + ), ) - rx_thread.start() - process_thread.start() if CONF.enable_beacon: LOG.info("Beacon Enabled. Starting Beacon thread.") - bcn_thread = tx.BeaconSendThread() - bcn_thread.start() + server_threads.register(tx.BeaconSendThread()) if CONF.aprs_registry.enabled: LOG.info("Registry Enabled. Starting Registry thread.") - registry_thread = registry.APRSRegistryThread() - registry_thread.start() + server_threads.register(registry.APRSRegistryThread()) - if CONF.admin.web_enabled: - log_monitor_thread = log_monitor.LogMonitorThread() - log_monitor_thread.start() - - rx_thread.join() - process_thread.join() + server_threads.start() + server_threads.join() return 0 diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index 26df7c9..143d2e2 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -14,7 +14,6 @@ from flask_socketio import Namespace, SocketIO from geopy.distance import geodesic from oslo_config import cfg import timeago -from werkzeug.security import check_password_hash, generate_password_hash import wrapt import aprsd @@ -33,7 +32,6 @@ from aprsd.utils import trace CONF = cfg.CONF LOG = logging.getLogger() auth = HTTPBasicAuth() -users = {} socketio = None # List of callsigns that we don't want to track/fetch their location @@ -122,17 +120,6 @@ class SentMessages: self.data[id]["reply"] = packet -# HTTPBasicAuth doesn't work on a class method. -# This has to be out here. Rely on the APRSDFlask -# class to initialize the users from the config -@auth.verify_password -def verify_password(username, password): - global users - - if username in users and check_password_hash(users[username], password): - return username - - def _build_location_from_repeat(message): # This is a location message Format is # ^ld^callsign:latitude,longitude,altitude,course,speed,timestamp @@ -340,10 +327,6 @@ class LocationProcessingThread(aprsd_threads.APRSDThread): pass -def set_config(): - global users - - def _get_transport(stats): if CONF.aprs_network.enabled: transport = "aprs-is" @@ -603,8 +586,6 @@ def webchat(ctx, flush, port): LOG.info(f"APRSD Started version: {aprsd.__version__}") CONF.log_opt_values(logging.getLogger(), logging.DEBUG) - user = CONF.admin.user - users[user] = generate_password_hash(CONF.admin.password) if not port: port = CONF.webchat.web_port diff --git a/aprsd/conf/common.py b/aprsd/conf/common.py index 19b02aa..c713fd8 100644 --- a/aprsd/conf/common.py +++ b/aprsd/conf/common.py @@ -7,10 +7,6 @@ home = str(Path.home()) DEFAULT_CONFIG_DIR = f"{home}/.config/aprsd/" APRSD_DEFAULT_MAGIC_WORD = "CHANGEME!!!" -admin_group = cfg.OptGroup( - name="admin", - title="Admin web interface settings", -) watch_list_group = cfg.OptGroup( name="watch_list", title="Watch List settings", @@ -178,35 +174,6 @@ watch_list_opts = [ ), ] -admin_opts = [ - cfg.BoolOpt( - "web_enabled", - default=False, - help="Enable the Admin Web Interface", - ), - cfg.StrOpt( - "web_ip", - default="0.0.0.0", - help="The ip address to listen on", - ), - cfg.PortOpt( - "web_port", - default=8001, - help="The port to listen on", - ), - cfg.StrOpt( - "user", - default="admin", - help="The admin user for the admin web interface", - ), - cfg.StrOpt( - "password", - default="password", - secret=True, - help="Admin interface password", - ), -] - enabled_plugins_opts = [ cfg.ListOpt( @@ -292,8 +259,6 @@ registry_opts = [ def register_opts(config): config.register_opts(aprsd_opts) config.register_opts(enabled_plugins_opts) - config.register_group(admin_group) - config.register_opts(admin_opts, group=admin_group) config.register_group(watch_list_group) config.register_opts(watch_list_opts, group=watch_list_group) config.register_group(webchat_group) @@ -305,7 +270,6 @@ def register_opts(config): def list_opts(): return { "DEFAULT": (aprsd_opts + enabled_plugins_opts), - admin_group.name: admin_opts, watch_list_group.name: watch_list_opts, webchat_group.name: webchat_opts, registry_group.name: registry_opts, diff --git a/aprsd/log/log.py b/aprsd/log/log.py index e5c20d5..e50a38f 100644 --- a/aprsd/log/log.py +++ b/aprsd/log/log.py @@ -1,5 +1,4 @@ import logging -from logging.handlers import QueueHandler import queue import sys @@ -122,16 +121,16 @@ def setup_logging(loglevel=None, quiet=False): for name in imap_list: logging.getLogger(name).propagate = True - if CONF.admin.web_enabled: - qh = QueueHandler(logging_queue) - handlers.append( - { - "sink": qh, "serialize": False, - "format": CONF.logging.logformat, - "level": log_level, - "colorize": False, - }, - ) + # if CONF.admin.web_enabled: + # qh = QueueHandler(logging_queue) + # handlers.append( + # { + # "sink": qh, "serialize": False, + # "format": CONF.logging.logformat, + # "level": log_level, + # "colorize": False, + # }, + # ) # configure loguru logger.configure(handlers=handlers) diff --git a/aprsd/main.py b/aprsd/main.py index 7609bc0..669f254 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -54,7 +54,7 @@ def cli(ctx): def load_commands(): from .cmds import ( # noqa - admin, completion, dev, fetch_stats, healthcheck, list_plugins, listen, + completion, dev, fetch_stats, healthcheck, list_plugins, listen, send_message, server, webchat, ) diff --git a/aprsd/threads/log_monitor.py b/aprsd/threads/log_monitor.py deleted file mode 100644 index f4a17ca..0000000 --- a/aprsd/threads/log_monitor.py +++ /dev/null @@ -1,121 +0,0 @@ -import datetime -import logging -import threading - -from oslo_config import cfg -import requests -import wrapt - -from aprsd import threads -from aprsd.log import log - - -CONF = cfg.CONF -LOG = logging.getLogger("APRSD") - - -def send_log_entries(force=False): - """Send all of the log entries to the web interface.""" - if CONF.admin.web_enabled: - if force or LogEntries().is_purge_ready(): - entries = LogEntries().get_all_and_purge() - if entries: - try: - requests.post( - f"http://{CONF.admin.web_ip}:{CONF.admin.web_port}/log_entries", - json=entries, - auth=(CONF.admin.user, CONF.admin.password), - ) - except Exception: - LOG.warning(f"Failed to send log entries. len={len(entries)}") - - -class LogEntries: - entries = [] - lock = threading.Lock() - _instance = None - last_purge = datetime.datetime.now() - max_delta = datetime.timedelta( - hours=0.0, minutes=0, seconds=2, - ) - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance - - def stats(self) -> dict: - return { - "log_entries": self.entries, - } - - @wrapt.synchronized(lock) - def add(self, entry): - self.entries.append(entry) - - @wrapt.synchronized(lock) - def get_all_and_purge(self): - entries = self.entries.copy() - self.entries = [] - self.last_purge = datetime.datetime.now() - return entries - - def is_purge_ready(self): - now = datetime.datetime.now() - if ( - now - self.last_purge > self.max_delta - and len(self.entries) > 1 - ): - return True - return False - - @wrapt.synchronized(lock) - def __len__(self): - return len(self.entries) - - -class LogMonitorThread(threads.APRSDThread): - - def __init__(self): - super().__init__("LogMonitorThread") - - def stop(self): - send_log_entries(force=True) - super().stop() - - def loop(self): - try: - record = log.logging_queue.get(block=True, timeout=2) - if isinstance(record, list): - for item in record: - entry = self.json_record(item) - LogEntries().add(entry) - else: - entry = self.json_record(record) - LogEntries().add(entry) - except Exception: - # Just ignore thi - pass - - send_log_entries() - return True - - def json_record(self, record): - entry = {} - entry["filename"] = record.filename - entry["funcName"] = record.funcName - entry["levelname"] = record.levelname - entry["lineno"] = record.lineno - entry["module"] = record.module - entry["name"] = record.name - entry["pathname"] = record.pathname - entry["process"] = record.process - entry["processName"] = record.processName - if hasattr(record, "stack_info"): - entry["stack_info"] = record.stack_info - else: - entry["stack_info"] = None - entry["thread"] = record.thread - entry["threadName"] = record.threadName - entry["message"] = record.getMessage() - return entry diff --git a/aprsd/wsgi.py b/aprsd/wsgi.py deleted file mode 100644 index 47da201..0000000 --- a/aprsd/wsgi.py +++ /dev/null @@ -1,322 +0,0 @@ -import datetime -import importlib.metadata as imp -import io -import json -import logging -import os -import queue - -import flask -from flask import Flask, request -from flask_httpauth import HTTPBasicAuth -from oslo_config import cfg, generator -import socketio -from werkzeug.security import check_password_hash - -import aprsd -from aprsd import cli_helper, client, conf, packets, plugin, threads -from aprsd.log import log -from aprsd.threads import stats as stats_threads -from aprsd.utils import json as aprsd_json - - -CONF = cfg.CONF -LOG = logging.getLogger("gunicorn.access") -logging_queue = queue.Queue() - - -# ADMIN_COMMAND True means we are running from `aprsd admin` -# the `aprsd admin` command will import this file after setting -# the APRSD_ADMIN_COMMAND environment variable. -ADMIN_COMMAND = os.environ.get("APRSD_ADMIN_COMMAND", False) - -auth = HTTPBasicAuth() -users: dict[str, str] = {} -app = Flask( - "aprsd", - static_url_path="/static", - static_folder="web/admin/static", - template_folder="web/admin/templates", -) -bg_thread = None -app.config["SECRET_KEY"] = "secret!" - - -# HTTPBasicAuth doesn't work on a class method. -# This has to be out here. Rely on the APRSDFlask -# class to initialize the users from the config -@auth.verify_password -def verify_password(username, password): - global users - - if username in users and check_password_hash(users.get(username), password): - return username - - -def _stats(): - stats_obj = stats_threads.StatsStore() - stats_obj.load() - now = datetime.datetime.now() - time_format = "%m-%d-%Y %H:%M:%S" - stats = { - "time": now.strftime(time_format), - "stats": stats_obj.data, - } - return stats - - -@app.route("/stats") -def stats(): - LOG.debug("/stats called") - return json.dumps(_stats(), cls=aprsd_json.SimpleJSONEncoder) - - -@app.route("/") -def index(): - stats = _stats() - pm = plugin.PluginManager() - plugins = pm.get_plugins() - plugin_count = len(plugins) - client_stats = stats["stats"].get("APRSClientStats", {}) - - if CONF.aprs_network.enabled: - transport = "aprs-is" - if client_stats: - aprs_connection = client_stats.get("server_string", "") - else: - aprs_connection = "APRS-IS" - aprs_connection = ( - "APRS-IS Server: " - "{}".format(aprs_connection) - ) - else: - # We might be connected to a KISS socket? - if client.KISSClient.kiss_enabled(): - transport = client.KISSClient.transport() - if transport == client.TRANSPORT_TCPKISS: - aprs_connection = ( - "TCPKISS://{}:{}".format( - CONF.kiss_tcp.host, - CONF.kiss_tcp.port, - ) - ) - elif transport == client.TRANSPORT_SERIALKISS: - aprs_connection = ( - "SerialKISS://{}@{} baud".format( - CONF.kiss_serial.device, - CONF.kiss_serial.baudrate, - ) - ) - - if client_stats: - stats["stats"]["APRSClientStats"]["transport"] = transport - stats["stats"]["APRSClientStats"]["aprs_connection"] = aprs_connection - entries = conf.conf_to_dict() - - thread_info = stats["stats"].get("APRSDThreadList", {}) - if thread_info: - thread_count = len(thread_info) - else: - thread_count = "unknown" - - return flask.render_template( - "index.html", - initial_stats=json.dumps(stats, cls=aprsd_json.SimpleJSONEncoder), - aprs_connection=aprs_connection, - callsign=CONF.callsign, - version=aprsd.__version__, - config_json=json.dumps( - entries, indent=4, - sort_keys=True, default=str, - ), - plugin_count=plugin_count, - thread_count=thread_count, - # oslo_out=generate_oslo() - ) - - -@auth.login_required -def messages(): - track = packets.PacketTrack() - msgs = [] - for id in track: - LOG.info(track[id].dict()) - msgs.append(track[id].dict()) - - return flask.render_template("messages.html", messages=json.dumps(msgs)) - - -@auth.login_required -@app.route("/packets") -def get_packets(): - stats = _stats() - stats_dict = stats["stats"] - packets = stats_dict.get("PacketList", {}) - return json.dumps(packets, cls=aprsd_json.SimpleJSONEncoder) - - -@auth.login_required -@app.route("/plugins") -def plugins(): - LOG.debug("/plugins called") - pm = plugin.PluginManager() - pm.reload_plugins() - - return "reloaded" - - -def _get_namespaces(): - args = [] - - all = imp.entry_points() - selected = [] - if "oslo.config.opts" in all: - for x in all["oslo.config.opts"]: - if x.group == "oslo.config.opts": - selected.append(x) - for entry in selected: - if "aprsd" in entry.name: - args.append("--namespace") - args.append(entry.name) - - return args - - -def generate_oslo(): - CONF.namespace = _get_namespaces() - string_out = io.StringIO() - generator.generate(CONF, string_out) - return string_out.getvalue() - - -@auth.login_required -@app.route("/oslo") -def oslo(): - return generate_oslo() - - -@auth.login_required -@app.route("/save") -def save(): - """Save the existing queue to disk.""" - track = packets.PacketTrack() - track.save() - return json.dumps({"messages": "saved"}) - - -@app.route("/log_entries", methods=["POST"]) -def log_entries(): - """The url that the server can call to update the logs.""" - entries = request.json - LOG.info(f"Log entries called {len(entries)}") - for entry in entries: - logging_queue.put(entry) - return json.dumps({"messages": "saved"}) - - -class LogUpdateThread(threads.APRSDThread): - - def __init__(self, logging_queue=None): - super().__init__("LogUpdate") - self.logging_queue = logging_queue - - def loop(self): - if sio: - try: - log_entry = self.logging_queue.get(block=True, timeout=1) - if log_entry: - sio.emit( - "log_entry", - log_entry, - namespace="/logs", - ) - except queue.Empty: - pass - return True - - -class LoggingNamespace(socketio.Namespace): - log_thread = None - - def on_connect(self, sid, environ): - global sio, logging_queue - LOG.info(f"LOG on_connect {sid}") - sio.emit( - "connected", {"data": "/logs Connected"}, - namespace="/logs", - ) - self.log_thread = LogUpdateThread(logging_queue=logging_queue) - self.log_thread.start() - - def on_disconnect(self, sid): - LOG.info(f"LOG Disconnected {sid}") - if self.log_thread: - self.log_thread.stop() - - -def init_app(config_file=None, log_level=None): - default_config_file = cli_helper.DEFAULT_CONFIG_FILE - if not config_file: - config_file = default_config_file - - CONF( - [], project="aprsd", version=aprsd.__version__, - default_config_files=[config_file], - ) - - if not log_level: - log_level = CONF.logging.log_level - - return log_level - - -if __name__ == "__main__": - async_mode = "threading" - sio = socketio.Server(logger=True, async_mode=async_mode) - app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app) - log_level = init_app() - log.setup_logging(log_level) - sio.register_namespace(LoggingNamespace("/logs")) - CONF.log_opt_values(LOG, logging.DEBUG) - app.run( - threaded=True, - debug=False, - port=CONF.admin.web_port, - host=CONF.admin.web_ip, - ) - - -if __name__ == "uwsgi_file_aprsd_wsgi": - # Start with - # uwsgi --http :8000 --gevent 1000 --http-websockets --master -w aprsd.wsgi --callable app - - async_mode = "gevent_uwsgi" - sio = socketio.Server(logger=True, async_mode=async_mode) - app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app) - log_level = init_app( - # log_level="DEBUG", - config_file="/config/aprsd.conf", - # Commented out for local development. - # config_file=cli_helper.DEFAULT_CONFIG_FILE - ) - log.setup_logging(log_level) - sio.register_namespace(LoggingNamespace("/logs")) - CONF.log_opt_values(LOG, logging.DEBUG) - - -if __name__ == "aprsd.wsgi" and not ADMIN_COMMAND: - # set async_mode to 'threading', 'eventlet', 'gevent' or 'gevent_uwsgi' to - # force a mode else, the best mode is selected automatically from what's - # installed - async_mode = "gevent_uwsgi" - sio = socketio.Server(logger=True, async_mode=async_mode) - app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app) - - log_level = init_app( - # log_level="DEBUG", - config_file="/config/aprsd.conf", - # config_file=cli_helper.DEFAULT_CONFIG_FILE, - ) - log.setup_logging(log_level) - sio.register_namespace(LoggingNamespace("/logs")) - CONF.log_opt_values(LOG, logging.DEBUG)