From ff392395ed9e4eb8fcc46e8ec475ff82f033c45b Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 28 Dec 2022 15:44:49 -0500 Subject: [PATCH] Decouple admin web interface from server command This patch introduces rpyc based RPC client/server for the flask web interface to call into the running aprsd server command to fetch stats, logs, etc to send to the browser. This allows running the web interface via gunicorn command gunicorn -k gevent --reload --threads 10 -w 1 aprsd.flask:app --log-level DEBUG --- aprsd/aprsd.py | 3 + aprsd/cmds/server.py | 28 +- aprsd/conf/common.py | 32 ++ aprsd/flask.py | 597 ++++++++------------ aprsd/rpc_server.py | 90 +++ aprsd/stats.py | 2 +- aprsd/threads/log_monitor.py | 77 +++ aprsd/web/admin/static/js/main.js | 5 - aprsd/web/admin/static/js/send-message.js | 30 - aprsd/web/admin/templates/index.html | 24 - aprsd/web/admin/templates/messages.html | 15 - aprsd/web/admin/templates/send-message.html | 74 --- dev-requirements.txt | 6 +- requirements.in | 3 +- requirements.txt | 2 + 15 files changed, 447 insertions(+), 541 deletions(-) create mode 100644 aprsd/rpc_server.py create mode 100644 aprsd/threads/log_monitor.py delete mode 100644 aprsd/web/admin/templates/messages.html delete mode 100644 aprsd/web/admin/templates/send-message.html diff --git a/aprsd/aprsd.py b/aprsd/aprsd.py index 7021e1b..43d079a 100644 --- a/aprsd/aprsd.py +++ b/aprsd/aprsd.py @@ -40,9 +40,11 @@ from aprsd import cli_helper, packets, stats, threads, utils # setup the global logger # logging.basicConfig(level=logging.DEBUG) # level=10 +CONF = cfg.CONF LOG = logging.getLogger("APRSD") CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) flask_enabled = False +rpc_serv = None def custom_startswith(string, incomplete): @@ -92,6 +94,7 @@ def signal_handler(sig, frame): LOG.info(stats.APRSDStats()) # signal.signal(signal.SIGTERM, sys.exit(0)) # sys.exit(0) + if flask_enabled: signal.signal(signal.SIGTERM, sys.exit(0)) diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index d936352..dec6be1 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -6,8 +6,10 @@ import click from oslo_config import cfg import aprsd +from aprsd import ( + cli_helper, client, packets, plugin, rpc_server, threads, utils, +) from aprsd import aprsd as aprsd_main -from aprsd import cli_helper, client, flask, packets, plugin, threads, utils from aprsd.aprsd import cli from aprsd.threads import rx @@ -32,15 +34,9 @@ LOG = logging.getLogger("APRSD") @cli_helper.process_standard_options def server(ctx, flush): """Start the aprsd server gateway process.""" - loglevel = ctx.obj["loglevel"] - quiet = ctx.obj["quiet"] - signal.signal(signal.SIGINT, aprsd_main.signal_handler) signal.signal(signal.SIGTERM, aprsd_main.signal_handler) - if not quiet: - click.echo("Load config") - level, msg = utils._check_version() if level: LOG.warning(msg) @@ -99,18 +95,10 @@ def server(ctx, flush): keepalive = threads.KeepAliveThread() keepalive.start() - web_enabled = CONF.admin.web_enabled + if CONF.rpc_settings.enabled: + rpc = rpc_server.APRSDRPCThread() + rpc.start() + log_monitor = threads.log_monitor.LogMonitorThread() + log_monitor.start() - if web_enabled: - aprsd_main.flask_enabled = True - (socketio, app) = flask.init_flask(loglevel, quiet) - socketio.run( - app, - allow_unsafe_werkzeug=True, - host=CONF.admin.web_ip, - port=CONF.admin.web_port, - ) - - # If there are items in the msgTracker, then save them - LOG.info("APRSD Exiting.") return 0 diff --git a/aprsd/conf/common.py b/aprsd/conf/common.py index 0691d3c..a24bd55 100644 --- a/aprsd/conf/common.py +++ b/aprsd/conf/common.py @@ -1,6 +1,8 @@ from oslo_config import cfg +APRSD_DEFAULT_MAGIC_WORD = "CHANGEME!!!" + admin_group = cfg.OptGroup( name="admin", title="Admin web interface settings", @@ -9,6 +11,10 @@ watch_list_group = cfg.OptGroup( name="watch_list", title="Watch List settings", ) +rpc_group = cfg.OptGroup( + name="rpc_settings", + title="RPC Settings for admin <--> web", +) aprsd_opts = [ @@ -96,6 +102,29 @@ admin_opts = [ ), ] +rpc_opts = [ + cfg.BoolOpt( + "enabled", + default=True, + help="Enable RPC calls", + ), + cfg.StrOpt( + "ip", + default="localhost", + help="The ip address to listen on", + ), + cfg.PortOpt( + "port", + default=18861, + help="The port to listen on", + ), + cfg.StrOpt( + "magic_word", + default=APRSD_DEFAULT_MAGIC_WORD, + help="Magic word to authenticate requests between client/server", + ), +] + enabled_plugins_opts = [ cfg.ListOpt( "enabled_plugins", @@ -123,6 +152,8 @@ def register_opts(config): config.register_opts(admin_opts, group=admin_group) config.register_group(watch_list_group) config.register_opts(watch_list_opts, group=watch_list_group) + config.register_group(rpc_group) + config.register_opts(rpc_opts, group=rpc_group) def list_opts(): @@ -130,4 +161,5 @@ def list_opts(): "DEFAULT": (aprsd_opts + enabled_plugins_opts), admin_group.name: admin_opts, watch_list_group.name: watch_list_opts, + rpc_group.name: rpc_opts, } diff --git a/aprsd/flask.py b/aprsd/flask.py index 9b1a302..58a2575 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -2,27 +2,21 @@ import datetime import json import logging from logging.handlers import RotatingFileHandler -import threading import time -import aprslib -from aprslib.exceptions import LoginError import flask -from flask import request from flask.logging import default_handler import flask_classful from flask_httpauth import HTTPBasicAuth from flask_socketio import Namespace, SocketIO from oslo_config import cfg +import rpyc from werkzeug.security import check_password_hash, generate_password_hash -import wrapt import aprsd -from aprsd import client, conf, packets, plugin, stats, threads, utils -from aprsd.clients import aprsis -from aprsd.logging import log +from aprsd import cli_helper, client, conf, packets, plugin, threads +from aprsd.conf import common from aprsd.logging import rich as aprsd_logging -from aprsd.threads import tx CONF = cfg.CONF @@ -30,72 +24,20 @@ LOG = logging.getLogger("APRSD") auth = HTTPBasicAuth() users = None +app = None -class SentMessages: - _instance = None - lock = threading.Lock() +class AuthSocketStream(rpyc.SocketStream): + """Used to authenitcate the RPC stream to remote.""" - msgs = {} + @classmethod + def connect(cls, *args, authorizer=None, **kwargs): + stream_obj = super().connect(*args, **kwargs) - def __new__(cls, *args, **kwargs): - """This magic turns this into a singleton.""" - if cls._instance is None: - cls._instance = super().__new__(cls) - # Put any initialization here. - return cls._instance + if callable(authorizer): + authorizer(stream_obj.sock) - @wrapt.synchronized(lock) - def add(self, packet): - self.msgs[packet.msgNo] = self._create(packet.msgNo) - self.msgs[packet.msgNo]["from"] = packet.from_call - self.msgs[packet.msgNo]["to"] = packet.to_call - self.msgs[packet.msgNo]["message"] = packet.message_text.rstrip("\n") - packet._build_raw() - self.msgs[packet.msgNo]["raw"] = packet.raw.rstrip("\n") - - def _create(self, id): - return { - "id": id, - "ts": time.time(), - "ack": False, - "from": None, - "to": None, - "raw": None, - "message": None, - "status": None, - "last_update": None, - "reply": None, - } - - @wrapt.synchronized(lock) - def __len__(self): - return len(self.msgs.keys()) - - @wrapt.synchronized(lock) - def get(self, id): - if id in self.msgs: - return self.msgs[id] - - @wrapt.synchronized(lock) - def get_all(self): - return self.msgs - - @wrapt.synchronized(lock) - def set_status(self, id, status): - self.msgs[id]["last_update"] = str(datetime.datetime.now()) - self.msgs[id]["status"] = status - - @wrapt.synchronized(lock) - def ack(self, id): - """The message got an ack!""" - self.msgs[id]["last_update"] = str(datetime.datetime.now()) - self.msgs[id]["ack"] = True - - @wrapt.synchronized(lock) - def reply(self, id, packet): - """We got a packet back from the sent message.""" - self.msgs[id]["reply"] = packet + return stream_obj # HTTPBasicAuth doesn't work on a class method. @@ -109,174 +51,129 @@ def verify_password(username, password): return username -class SendMessageThread(threads.APRSDRXThread): - """Thread for sending a message from web.""" +class RPCClient: + _instance = None + _rpc_client = None - aprsis_client = None - request = None - got_ack = False - got_reply = False + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance - def __init__(self, info, packet, namespace): - self.request = info - self.packet = packet - self.namespace = namespace - self.start_time = datetime.datetime.now() - msg = "({} -> {}) : {}".format( - info["from"], - info["to"], - info["message"], - ) - super().__init__(f"WEB_SEND_MSG-{msg}") + def __init__(self): + self._check_settings() + self.get_rpc_client() - def setup_connection(self): - user = self.request["from"] - password = self.request["password"] - host = CONF.aprs_network.host - port = CONF.aprs_network.port - connected = False - backoff = 1 - while not connected: - try: - LOG.info("Creating aprslib client") + def _check_settings(self): + if not CONF.rpc_settings.enabled: + LOG.error("RPC is not enabled, no way to get stats!!") - aprs_client = aprsis.Aprsdis( - user, - passwd=password, - host=host, - port=port, - ) - # Force the logging to be the same - aprs_client.logger = LOG - aprs_client.connect() - connected = True - backoff = 1 - except LoginError as e: - LOG.error(f"Failed to login to APRS-IS Server '{e}'") - connected = False - raise e - except Exception as e: - LOG.error(f"Unable to connect to APRS-IS server. '{e}' ") - time.sleep(backoff) - backoff = backoff * 2 - continue - LOG.debug(f"Logging in to APRS-IS with user '{user}'") - return aprs_client + if CONF.rpc_settings.magic_word == common.APRSD_DEFAULT_MAGIC_WORD: + LOG.warning("You are using the default RPC magic word!!!") + LOG.warning("edit aprsd.conf and change rpc_settings.magic_word") - def run(self): - LOG.debug("Starting") - from_call = self.request["from"] - to_call = self.request["to"] - message = self.request["message"] - LOG.info( - "From: '{}' To: '{}' Send '{}'".format( - from_call, - to_call, - message, - ), - ) + def _rpyc_connect( + self, host, port, + service=rpyc.VoidService, + config={}, ipv6=False, + keepalive=False, authorizer=None, + ): + print(f"Connecting to RPC host {host}:{port}") try: - self.aprs_client = self.setup_connection() - except LoginError as e: - f"Failed to setup Connection {e}" - - tx.send( - self.packet, - direct=True, - aprs_client=self.aprs_client, - ) - SentMessages().set_status(self.packet.msgNo, "Sent") - - while not self.thread_stop: - can_loop = self.loop() - if not can_loop: - self.stop() - threads.APRSDThreadList().remove(self) - LOG.debug("Exiting") - - def process_ack_packet(self, packet): - global socketio - ack_num = packet.msgNo - LOG.info(f"We got ack for our sent message {ack_num}") - packet.log("RXACK") - SentMessages().ack(self.packet.msgNo) - stats.APRSDStats().ack_rx_inc() - socketio.emit( - "ack", SentMessages().get(self.packet.msgNo), - namespace="/sendmsg", - ) - if self.request["wait_reply"] == "0" or self.got_reply: - # We aren't waiting for a reply, so we can bail - self.stop() - self.thread_stop = self.aprs_client.thread_stop = True - - def process_our_message_packet(self, packet): - global socketio - packets.PacketList().rx(packet) - stats.APRSDStats().msgs_rx_inc() - msg_number = packet.msgNo - SentMessages().reply(self.packet.msgNo, packet) - SentMessages().set_status(self.packet.msgNo, "Got Reply") - socketio.emit( - "reply", SentMessages().get(self.packet.msgNo), - namespace="/sendmsg", - ) - tx.send( - packets.AckPacket( - from_call=self.request["from"], - to_call=packet.from_call, - msgNo=msg_number, - ), - direct=True, - aprs_client=self.aprsis_client, - ) - SentMessages().set_status(self.packet.msgNo, "Ack Sent") - - # Now we can exit, since we are done. - self.got_reply = True - if self.got_ack: - self.stop() - self.thread_stop = self.aprs_client.thread_stop = True - - def process_packet(self, *args, **kwargs): - packet = self._client.decode_packet(*args, **kwargs) - packet.log(header="RX Packet") - - if isinstance(packet, packets.AckPacket): - self.process_ack_packet(packet) - else: - self.process_our_message_packet(packet) - - def loop(self): - # we have a general time limit expecting results of - # around 120 seconds before we exit - now = datetime.datetime.now() - start_delta = str(now - self.start_time) - delta = utils.parse_delta_str(start_delta) - d = datetime.timedelta(**delta) - max_timeout = {"hours": 0.0, "minutes": 1, "seconds": 0} - max_delta = datetime.timedelta(**max_timeout) - if d > max_delta: - LOG.error("XXXXXX Haven't completed everything in 60 seconds. BAIL!") - return False - - if self.got_ack and self.got_reply: - LOG.warning("We got everything already. BAIL") - return False - - try: - # This will register a packet consumer with aprslib - # When new packets come in the consumer will process - # the packet - self.aprs_client.consumer( - self.process_packet, raw=False, blocking=False, + s = AuthSocketStream.connect( + host, port, ipv6=ipv6, keepalive=keepalive, + authorizer=authorizer, ) - except aprslib.exceptions.ConnectionDrop: - LOG.error("Connection dropped.") - return False + return rpyc.utils.factory.connect_stream(s, service, config=config) + except ConnectionRefusedError: + LOG.error(f"Failed to connect to RPC host {host}") + return None - return True + def get_rpc_client(self): + if not self._rpc_client: + magic = CONF.rpc_settings.magic_word + self._rpc_client = self._rpyc_connect( + CONF.rpc_settings.ip, + CONF.rpc_settings.port, + authorizer=lambda sock: sock.send(magic.encode()), + ) + return self._rpc_client + + def get_stats_dict(self): + cl = self.get_rpc_client() + result = {} + if not cl: + return result + + try: + rpc_stats_dict = cl.root.get_stats() + result = json.loads(rpc_stats_dict) + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_packet_track(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_packet_track() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_packet_list(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_packet_list() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_watch_list(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_watch_list() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_seen_list(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_seen_list() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_log_entries(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result_str = cl.root.get_log_entries() + result = json.loads(result_str) + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result class APRSDFlask(flask_classful.FlaskView): @@ -291,21 +188,25 @@ class APRSDFlask(flask_classful.FlaskView): @auth.login_required def index(self): stats = self._stats() + print(stats) LOG.debug( "watch list? {}".format( CONF.watch_list.callsigns, ), ) - wl = packets.WatchList() - if wl.is_enabled(): + wl = RPCClient().get_watch_list() + if wl and wl.is_enabled(): watch_count = len(wl) watch_age = wl.max_delta() else: watch_count = 0 watch_age = 0 - sl = packets.SeenList() - seen_count = len(sl) + sl = RPCClient().get_seen_list() + if sl: + seen_count = len(sl) + else: + seen_count = 0 pm = plugin.PluginManager() plugins = pm.get_plugins() @@ -346,7 +247,10 @@ class APRSDFlask(flask_classful.FlaskView): aprs_connection=aprs_connection, callsign=CONF.callsign, version=aprsd.__version__, - config_json=json.dumps(entries), + config_json=json.dumps( + entries, indent=4, + sort_keys=True, default=str, + ), watch_count=watch_count, watch_age=watch_age, seen_count=seen_count, @@ -363,31 +267,18 @@ class APRSDFlask(flask_classful.FlaskView): return flask.render_template("messages.html", messages=json.dumps(msgs)) - @auth.login_required - def send_message_status(self): - LOG.debug(request) - msgs = SentMessages() - info = msgs.get_all() - return json.dumps(info) - - @auth.login_required - def send_message(self): - LOG.debug(request) - if request.method == "GET": - return flask.render_template( - "send-message.html", - callsign=CONF.callsign, - version=aprsd.__version__, - ) - @auth.login_required def packets(self): - packet_list = packets.PacketList().get() - tmp_list = [] - for pkt in packet_list: - tmp_list.append(pkt.json) + packet_list = RPCClient().get_packet_list() + if packet_list: + packets = packet_list.get() + tmp_list = [] + for pkt in packets: + tmp_list.append(pkt.json) - return json.dumps(tmp_list) + return json.dumps(tmp_list) + else: + return json.dumps([]) @auth.login_required def plugins(self): @@ -404,39 +295,69 @@ class APRSDFlask(flask_classful.FlaskView): return json.dumps({"messages": "saved"}) def _stats(self): - stats_obj = stats.APRSDStats() - track = packets.PacketTrack() + track = RPCClient().get_packet_track() now = datetime.datetime.now() time_format = "%m-%d-%Y %H:%M:%S" - stats_dict = stats_obj.stats() - - # Convert the watch_list entries to age - wl = packets.WatchList() - new_list = {} - for call in wl.get_all(): - # call_date = datetime.datetime.strptime( - # str(wl.last_seen(call)), - # "%Y-%m-%d %H:%M:%S.%f", - # ) - new_list[call] = { - "last": wl.age(call), - "packets": wl.get(call)["packets"].get(), + stats_dict = RPCClient().get_stats_dict() + if not stats_dict: + stats_dict = { + "aprsd": {}, + "aprs-is": {"server": ""}, + "messages": { + "sent": 0, + "received": 0, + }, + "email": { + "sent": 0, + "received": 0, + }, + "seen_list": { + "sent": 0, + "received": 0, + }, } + # Convert the watch_list entries to age + wl = RPCClient().get_watch_list() + new_list = {} + if wl: + for call in wl.get_all(): + # call_date = datetime.datetime.strptime( + # str(wl.last_seen(call)), + # "%Y-%m-%d %H:%M:%S.%f", + # ) + + # We have to convert the RingBuffer to a real list + # so that json.dumps works. + # pkts = [] + # for pkt in wl.get(call)["packets"].get(): + # pkts.append(pkt) + + new_list[call] = { + "last": wl.age(call), + # "packets": pkts + } + stats_dict["aprsd"]["watch_list"] = new_list - packet_list = packets.PacketList() - rx = packet_list.total_rx() - tx = packet_list.total_tx() + packet_list = RPCClient().get_packet_list() + rx = tx = 0 + if packet_list: + rx = packet_list.total_rx() + tx = packet_list.total_tx() stats_dict["packets"] = { "sent": tx, "received": rx, } + if track: + size_tracker = len(track) + else: + size_tracker = 0 result = { "time": now.strftime(time_format), - "size_tracker": len(track), + "size_tracker": size_tracker, "stats": stats_dict, } @@ -446,116 +367,44 @@ class APRSDFlask(flask_classful.FlaskView): return json.dumps(self._stats()) -class SendMessageNamespace(Namespace): - got_ack = False - reply_sent = False - packet = None - request = None - - def __init__(self, namespace=None): - super().__init__(namespace) - - def on_connect(self): - global socketio - LOG.debug("Web socket connected") - socketio.emit( - "connected", {"data": "/sendmsg Connected"}, - namespace="/sendmsg", - ) - - def on_disconnect(self): - LOG.debug("WS Disconnected") - - def on_send(self, data): - global socketio - LOG.debug(f"WS: on_send {data}") - self.request = data - self.packet = packets.MessagePacket( - from_call=data["from"], - to_call=data["to"], - message_text=data["message"], - ) - msgs = SentMessages() - msgs.add(self.packet) - msgs.set_status(self.packet.msgNo, "Sending") - socketio.emit( - "sent", SentMessages().get(self.packet.msgNo), - namespace="/sendmsg", - ) - - socketio.start_background_task( - self._start, data, - self.packet, self, - ) - LOG.warning("WS: on_send: exit") - - def _start(self, data, packet, namespace): - msg_thread = SendMessageThread(data, packet, self) - msg_thread.start() - - def handle_message(self, data): - LOG.debug(f"WS Data {data}") - - def handle_json(self, data): - LOG.debug(f"WS json {data}") - - -class LogMonitorThread(threads.APRSDThread): +class LogUpdateThread(threads.APRSDThread): def __init__(self): - super().__init__("LogMonitorThread") + super().__init__("LogUpdate") def loop(self): global socketio - try: - record = log.logging_queue.get(block=True, timeout=5) - json_record = self.json_record(record) - socketio.emit( - "log_entry", json_record, - namespace="/logs", - ) - except Exception: - # Just ignore thi - pass + if socketio: + log_entries = RPCClient().get_log_entries() + + if log_entries: + for entry in log_entries: + socketio.emit( + "log_entry", entry, + namespace="/logs", + ) + + time.sleep(5) return True - def json_record(self, record): - entry = {} - entry["filename"] = record.filename - entry["funcName"] = record.funcName - entry["levelname"] = record.levelname - entry["lineno"] = record.lineno - entry["module"] = record.module - entry["name"] = record.name - entry["pathname"] = record.pathname - entry["process"] = record.process - entry["processName"] = record.processName - if hasattr(record, "stack_info"): - entry["stack_info"] = record.stack_info - else: - entry["stack_info"] = None - entry["thread"] = record.thread - entry["threadName"] = record.threadName - entry["message"] = record.getMessage() - return entry - class LoggingNamespace(Namespace): + log_thread = None def on_connect(self): global socketio - LOG.debug("Web socket connected") socketio.emit( "connected", {"data": "/logs Connected"}, namespace="/logs", ) - self.log_thread = LogMonitorThread() + self.log_thread = LogUpdateThread() self.log_thread.start() def on_disconnect(self): - LOG.debug("WS Disconnected") - self.log_thread.stop() + LOG.debug("LOG Disconnected") + if self.log_thread: + self.log_thread.stop() def setup_logging(flask_app, loglevel, quiet): @@ -608,8 +457,6 @@ def init_flask(loglevel, quiet): flask_app.route("/stats", methods=["GET"])(server.stats) flask_app.route("/messages", methods=["GET"])(server.messages) flask_app.route("/packets", methods=["GET"])(server.packets) - flask_app.route("/send-message", methods=["GET"])(server.send_message) - flask_app.route("/send-message-status", methods=["GET"])(server.send_message_status) flask_app.route("/save", methods=["GET"])(server.save) flask_app.route("/plugins", methods=["GET"])(server.plugins) @@ -619,7 +466,21 @@ def init_flask(loglevel, quiet): ) # import eventlet # eventlet.monkey_patch() + gunicorn_logger = logging.getLogger("gunicorn.error") + flask_app.logger.handlers = gunicorn_logger.handlers + flask_app.logger.setLevel(gunicorn_logger.level) - socketio.on_namespace(SendMessageNamespace("/sendmsg")) socketio.on_namespace(LoggingNamespace("/logs")) return socketio, flask_app + + +if __name__ == "aprsd.flask": + try: + default_config_file = cli_helper.DEFAULT_CONFIG_FILE + CONF( + [], project="aprsd", version=aprsd.__version__, + default_config_files=[default_config_file], + ) + except cfg.ConfigFilesNotFoundError: + pass + sio, app = init_flask("DEBUG", False) diff --git a/aprsd/rpc_server.py b/aprsd/rpc_server.py new file mode 100644 index 0000000..040a93d --- /dev/null +++ b/aprsd/rpc_server.py @@ -0,0 +1,90 @@ +import json +import logging + +from oslo_config import cfg +import rpyc +from rpyc.utils.authenticators import AuthenticationError +from rpyc.utils.server import ThreadPoolServer + +from aprsd import conf # noqa: F401 +from aprsd import packets, stats, threads +from aprsd.threads import log_monitor + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +def magic_word_authenticator(sock): + magic = sock.recv(len(CONF.rpc_settings.magic_word)).decode() + if magic != CONF.rpc_settings.magic_word: + raise AuthenticationError(f"wrong magic word {magic}") + return sock, None + + +class APRSDRPCThread(threads.APRSDThread): + def __init__(self): + super().__init__(name="RPCThread") + self.thread = ThreadPoolServer( + APRSDService, + port=CONF.rpc_settings.port, + protocol_config={"allow_public_attrs": True}, + authenticator=magic_word_authenticator, + ) + + def stop(self): + if self.thread: + self.thread.close() + self.thread_stop = True + + def loop(self): + # there is no loop as run is blocked + if self.thread and not self.thread_stop: + # This is a blocking call + self.thread.start() + + +@rpyc.service +class APRSDService(rpyc.Service): + def on_connect(self, conn): + # code that runs when a connection is created + # (to init the service, if needed) + LOG.info("Connected") + self._conn = conn + + def on_disconnect(self, conn): + # code that runs after the connection has already closed + # (to finalize the service, if needed) + LOG.info("Disconnected") + self._conn = None + + @rpyc.exposed + def get_stats(self): + stat = stats.APRSDStats() + stats_dict = stat.stats() + return json.dumps(stats_dict, indent=4, sort_keys=True, default=str) + + @rpyc.exposed + def get_stats_obj(self): + return stats.APRSDStats() + + @rpyc.exposed + def get_packet_list(self): + return packets.PacketList() + + @rpyc.exposed + def get_packet_track(self): + return packets.PacketTrack() + + @rpyc.exposed + def get_watch_list(self): + return packets.WatchList() + + @rpyc.exposed + def get_seen_list(self): + return packets.SeenList() + + @rpyc.exposed + def get_log_entries(self): + entries = log_monitor.LogEntries().get_all_and_purge() + return json.dumps(entries, default=str) diff --git a/aprsd/stats.py b/aprsd/stats.py index a5ba446..e52a224 100644 --- a/aprsd/stats.py +++ b/aprsd/stats.py @@ -63,7 +63,7 @@ class APRSDStats: def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - # any initializetion here + # any init here cls._instance.start_time = datetime.datetime.now() cls._instance._aprsis_keepalive = datetime.datetime.now() return cls._instance diff --git a/aprsd/threads/log_monitor.py b/aprsd/threads/log_monitor.py new file mode 100644 index 0000000..90bda55 --- /dev/null +++ b/aprsd/threads/log_monitor.py @@ -0,0 +1,77 @@ +import logging +import threading + +import wrapt + +from aprsd import threads +from aprsd.logging import log + + +LOG = logging.getLogger("APRSD") + + +class LogEntries: + entries = [] + lock = threading.Lock() + _instance = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + @wrapt.synchronized(lock) + def add(self, entry): + self.entries.append(entry) + + @wrapt.synchronized(lock) + def get_all_and_purge(self): + entries = self.entries.copy() + self.entries = [] + return entries + + @wrapt.synchronized(lock) + def __len__(self): + return len(self.entries) + + +class LogMonitorThread(threads.APRSDThread): + + def __init__(self): + super().__init__("LogMonitorThread") + + def loop(self): + try: + record = log.logging_queue.get(block=True, timeout=2) + if isinstance(record, list): + for item in record: + entry = self.json_record(item) + LogEntries().add(entry) + else: + entry = self.json_record(record) + LogEntries().add(entry) + except Exception: + # Just ignore thi + pass + + return True + + def json_record(self, record): + entry = {} + entry["filename"] = record.filename + entry["funcName"] = record.funcName + entry["levelname"] = record.levelname + entry["lineno"] = record.lineno + entry["module"] = record.module + entry["name"] = record.name + entry["pathname"] = record.pathname + entry["process"] = record.process + entry["processName"] = record.processName + if hasattr(record, "stack_info"): + entry["stack_info"] = record.stack_info + else: + entry["stack_info"] = None + entry["thread"] = record.thread + entry["threadName"] = record.threadName + entry["message"] = record.getMessage() + return entry diff --git a/aprsd/web/admin/static/js/main.js b/aprsd/web/admin/static/js/main.js index f02ab55..749f3b9 100644 --- a/aprsd/web/admin/static/js/main.js +++ b/aprsd/web/admin/static/js/main.js @@ -107,13 +107,8 @@ function update_packets( data ) { if (size_dict(packet_list) == 0 && size_dict(data) > 0) { packetsdiv.html('') } - console.log("PACKET_LIST") - console.log(packet_list); jQuery.each(data, function(i, val) { pkt = JSON.parse(val); - console.log("PACKET"); - console.log(pkt); - console.log(pkt.timestamp); update_watchlist_from_packet(pkt['from_call'], pkt); if ( packet_list.hasOwnProperty(pkt.timestamp) == false ) { diff --git a/aprsd/web/admin/static/js/send-message.js b/aprsd/web/admin/static/js/send-message.js index 51b71c4..9bcb470 100644 --- a/aprsd/web/admin/static/js/send-message.js +++ b/aprsd/web/admin/static/js/send-message.js @@ -28,36 +28,6 @@ function init_messages() { update_msg(msg); }); - $("#sendform").submit(function(event) { - event.preventDefault(); - - var $checkboxes = $(this).find('input[type=checkbox]'); - - //loop through the checkboxes and change to hidden fields - $checkboxes.each(function() { - if ($(this)[0].checked) { - $(this).attr('type', 'hidden'); - $(this).val(1); - } else { - $(this).attr('type', 'hidden'); - $(this).val(0); - } - }); - - msg = {'from': $('#from').val(), - 'password': $('#password').val(), - 'to': $('#to').val(), - 'message': $('#message').val(), - 'wait_reply': $('#wait_reply').val(), - } - - socket.emit("send", msg); - - //loop through the checkboxes and change to hidden fields - $checkboxes.each(function() { - $(this).attr('type', 'checkbox'); - }); - }); } function add_msg(msg) { diff --git a/aprsd/web/admin/templates/index.html b/aprsd/web/admin/templates/index.html index b7e4f21..ec452bc 100644 --- a/aprsd/web/admin/templates/index.html +++ b/aprsd/web/admin/templates/index.html @@ -82,7 +82,6 @@
Watch List
Plugins
Config
-
Send Message
LogFile
Raw JSON
@@ -160,29 +159,6 @@
{{ config_json|safe }}
-
-

Send Message

-
-
-

-

-

-

-

-

-

-

-

- -

- -
-
-
Messages
-
-
-
-

LOGFILE

diff --git a/aprsd/web/admin/templates/messages.html b/aprsd/web/admin/templates/messages.html deleted file mode 100644 index c3f6beb..0000000 --- a/aprsd/web/admin/templates/messages.html +++ /dev/null @@ -1,15 +0,0 @@ - - - - - - - -

-
-    
-
-
diff --git a/aprsd/web/admin/templates/send-message.html b/aprsd/web/admin/templates/send-message.html
deleted file mode 100644
index 566eaba..0000000
--- a/aprsd/web/admin/templates/send-message.html
+++ /dev/null
@@ -1,74 +0,0 @@
-
-    
-        
-        
-        
-        
-        
-
-
-        
-        
-        
-
-        
-        
-
-        
-        
-        
-
-        
-
-    
-
-    
-        
-

APRSD {{ version }}

-
- -
-
- {{ callsign }} - connected to - NONE -
- -
- NONE -
-
- -

Send Message Form

-
-

-

-

-

- -

-

- -

-

- -

- -

- - -
- -

Messages (0)

-
-
Messages
-
- - - - - diff --git a/dev-requirements.txt b/dev-requirements.txt index 9d3412f..16e5a38 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -49,7 +49,7 @@ pep8-naming==0.13.3 # via -r dev-requirements.in pip-tools==6.12.1 # via -r dev-requirements.in platformdirs==2.6.0 # via black, tox, virtualenv pluggy==1.0.0 # via pytest, tox -pre-commit==2.20.0 # via -r dev-requirements.in +pre-commit==2.21.0 # via -r dev-requirements.in pycodestyle==2.10.0 # via flake8 pyflakes==3.0.1 # via autoflake, flake8 pygments==2.13.0 # via rich, sphinx @@ -71,9 +71,9 @@ sphinxcontrib-jsmath==1.0.1 # via sphinx sphinxcontrib-qthelp==1.0.3 # via sphinx sphinxcontrib-serializinghtml==1.1.5 # via sphinx tokenize-rt==5.0.0 # via add-trailing-comma, pyupgrade -toml==0.10.2 # via autoflake, pre-commit +toml==0.10.2 # via autoflake tomli==2.0.1 # via black, build, coverage, mypy, pep517, pyproject-api, pytest, tox -tox==4.0.16 # via -r dev-requirements.in +tox==4.0.18 # via -r dev-requirements.in typing-extensions==4.4.0 # via libcst, mypy, typing-inspect typing-inspect==0.8.0 # via libcst unify==0.5 # via gray diff --git a/requirements.in b/requirements.in index aec3c2b..0ce8236 100644 --- a/requirements.in +++ b/requirements.in @@ -23,10 +23,11 @@ beautifulsoup4 wrapt # kiss3 uses attrs kiss3 -attrs==22.1.0 +attrs # for mobile checking user-agents pyopenssl dataclasses dacite2 oslo.config +rpyc diff --git a/requirements.txt b/requirements.txt index 5830f74..6ed3153 100644 --- a/requirements.txt +++ b/requirements.txt @@ -39,6 +39,7 @@ oslo-config==9.0.0 # via -r requirements.in oslo-i18n==5.1.0 # via oslo-config pbr==5.11.0 # via -r requirements.in, oslo-i18n, stevedore pluggy==1.0.0 # via -r requirements.in +plumbum==1.8.0 # via rpyc pycparser==2.21 # via cffi pygments==2.13.0 # via rich pyopenssl==22.1.0 # via -r requirements.in @@ -51,6 +52,7 @@ pyyaml==6.0 # via -r requirements.in, oslo-config requests==2.28.1 # via -r requirements.in, oslo-config, update-checker rfc3986==2.0.0 # via oslo-config rich==12.6.0 # via -r requirements.in +rpyc==5.3.0 # via -r requirements.in shellingham==1.5.0 # via click-completion six==1.16.0 # via -r requirements.in, click-completion, eventlet, imapclient soupsieve==2.3.2.post1 # via beautifulsoup4