diff --git a/aprsd/aprsd.py b/aprsd/aprsd.py index 7021e1b..43d079a 100644 --- a/aprsd/aprsd.py +++ b/aprsd/aprsd.py @@ -40,9 +40,11 @@ from aprsd import cli_helper, packets, stats, threads, utils # setup the global logger # logging.basicConfig(level=logging.DEBUG) # level=10 +CONF = cfg.CONF LOG = logging.getLogger("APRSD") CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) flask_enabled = False +rpc_serv = None def custom_startswith(string, incomplete): @@ -92,6 +94,7 @@ def signal_handler(sig, frame): LOG.info(stats.APRSDStats()) # signal.signal(signal.SIGTERM, sys.exit(0)) # sys.exit(0) + if flask_enabled: signal.signal(signal.SIGTERM, sys.exit(0)) diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index d936352..dec6be1 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -6,8 +6,10 @@ import click from oslo_config import cfg import aprsd +from aprsd import ( + cli_helper, client, packets, plugin, rpc_server, threads, utils, +) from aprsd import aprsd as aprsd_main -from aprsd import cli_helper, client, flask, packets, plugin, threads, utils from aprsd.aprsd import cli from aprsd.threads import rx @@ -32,15 +34,9 @@ LOG = logging.getLogger("APRSD") @cli_helper.process_standard_options def server(ctx, flush): """Start the aprsd server gateway process.""" - loglevel = ctx.obj["loglevel"] - quiet = ctx.obj["quiet"] - signal.signal(signal.SIGINT, aprsd_main.signal_handler) signal.signal(signal.SIGTERM, aprsd_main.signal_handler) - if not quiet: - click.echo("Load config") - level, msg = utils._check_version() if level: LOG.warning(msg) @@ -99,18 +95,10 @@ def server(ctx, flush): keepalive = threads.KeepAliveThread() keepalive.start() - web_enabled = CONF.admin.web_enabled + if CONF.rpc_settings.enabled: + rpc = rpc_server.APRSDRPCThread() + rpc.start() + log_monitor = threads.log_monitor.LogMonitorThread() + log_monitor.start() - if web_enabled: - aprsd_main.flask_enabled = True - (socketio, app) = flask.init_flask(loglevel, quiet) - socketio.run( - app, - allow_unsafe_werkzeug=True, - host=CONF.admin.web_ip, - port=CONF.admin.web_port, - ) - - # If there are items in the msgTracker, then save them - LOG.info("APRSD Exiting.") return 0 diff --git a/aprsd/conf/common.py b/aprsd/conf/common.py index 0691d3c..a24bd55 100644 --- a/aprsd/conf/common.py +++ b/aprsd/conf/common.py @@ -1,6 +1,8 @@ from oslo_config import cfg +APRSD_DEFAULT_MAGIC_WORD = "CHANGEME!!!" + admin_group = cfg.OptGroup( name="admin", title="Admin web interface settings", @@ -9,6 +11,10 @@ watch_list_group = cfg.OptGroup( name="watch_list", title="Watch List settings", ) +rpc_group = cfg.OptGroup( + name="rpc_settings", + title="RPC Settings for admin <--> web", +) aprsd_opts = [ @@ -96,6 +102,29 @@ admin_opts = [ ), ] +rpc_opts = [ + cfg.BoolOpt( + "enabled", + default=True, + help="Enable RPC calls", + ), + cfg.StrOpt( + "ip", + default="localhost", + help="The ip address to listen on", + ), + cfg.PortOpt( + "port", + default=18861, + help="The port to listen on", + ), + cfg.StrOpt( + "magic_word", + default=APRSD_DEFAULT_MAGIC_WORD, + help="Magic word to authenticate requests between client/server", + ), +] + enabled_plugins_opts = [ cfg.ListOpt( "enabled_plugins", @@ -123,6 +152,8 @@ def register_opts(config): config.register_opts(admin_opts, group=admin_group) config.register_group(watch_list_group) config.register_opts(watch_list_opts, group=watch_list_group) + config.register_group(rpc_group) + config.register_opts(rpc_opts, group=rpc_group) def list_opts(): @@ -130,4 +161,5 @@ def list_opts(): "DEFAULT": (aprsd_opts + enabled_plugins_opts), admin_group.name: admin_opts, watch_list_group.name: watch_list_opts, + rpc_group.name: rpc_opts, } diff --git a/aprsd/flask.py b/aprsd/flask.py index 9b1a302..58a2575 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -2,27 +2,21 @@ import datetime import json import logging from logging.handlers import RotatingFileHandler -import threading import time -import aprslib -from aprslib.exceptions import LoginError import flask -from flask import request from flask.logging import default_handler import flask_classful from flask_httpauth import HTTPBasicAuth from flask_socketio import Namespace, SocketIO from oslo_config import cfg +import rpyc from werkzeug.security import check_password_hash, generate_password_hash -import wrapt import aprsd -from aprsd import client, conf, packets, plugin, stats, threads, utils -from aprsd.clients import aprsis -from aprsd.logging import log +from aprsd import cli_helper, client, conf, packets, plugin, threads +from aprsd.conf import common from aprsd.logging import rich as aprsd_logging -from aprsd.threads import tx CONF = cfg.CONF @@ -30,72 +24,20 @@ LOG = logging.getLogger("APRSD") auth = HTTPBasicAuth() users = None +app = None -class SentMessages: - _instance = None - lock = threading.Lock() +class AuthSocketStream(rpyc.SocketStream): + """Used to authenitcate the RPC stream to remote.""" - msgs = {} + @classmethod + def connect(cls, *args, authorizer=None, **kwargs): + stream_obj = super().connect(*args, **kwargs) - def __new__(cls, *args, **kwargs): - """This magic turns this into a singleton.""" - if cls._instance is None: - cls._instance = super().__new__(cls) - # Put any initialization here. - return cls._instance + if callable(authorizer): + authorizer(stream_obj.sock) - @wrapt.synchronized(lock) - def add(self, packet): - self.msgs[packet.msgNo] = self._create(packet.msgNo) - self.msgs[packet.msgNo]["from"] = packet.from_call - self.msgs[packet.msgNo]["to"] = packet.to_call - self.msgs[packet.msgNo]["message"] = packet.message_text.rstrip("\n") - packet._build_raw() - self.msgs[packet.msgNo]["raw"] = packet.raw.rstrip("\n") - - def _create(self, id): - return { - "id": id, - "ts": time.time(), - "ack": False, - "from": None, - "to": None, - "raw": None, - "message": None, - "status": None, - "last_update": None, - "reply": None, - } - - @wrapt.synchronized(lock) - def __len__(self): - return len(self.msgs.keys()) - - @wrapt.synchronized(lock) - def get(self, id): - if id in self.msgs: - return self.msgs[id] - - @wrapt.synchronized(lock) - def get_all(self): - return self.msgs - - @wrapt.synchronized(lock) - def set_status(self, id, status): - self.msgs[id]["last_update"] = str(datetime.datetime.now()) - self.msgs[id]["status"] = status - - @wrapt.synchronized(lock) - def ack(self, id): - """The message got an ack!""" - self.msgs[id]["last_update"] = str(datetime.datetime.now()) - self.msgs[id]["ack"] = True - - @wrapt.synchronized(lock) - def reply(self, id, packet): - """We got a packet back from the sent message.""" - self.msgs[id]["reply"] = packet + return stream_obj # HTTPBasicAuth doesn't work on a class method. @@ -109,174 +51,129 @@ def verify_password(username, password): return username -class SendMessageThread(threads.APRSDRXThread): - """Thread for sending a message from web.""" +class RPCClient: + _instance = None + _rpc_client = None - aprsis_client = None - request = None - got_ack = False - got_reply = False + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance - def __init__(self, info, packet, namespace): - self.request = info - self.packet = packet - self.namespace = namespace - self.start_time = datetime.datetime.now() - msg = "({} -> {}) : {}".format( - info["from"], - info["to"], - info["message"], - ) - super().__init__(f"WEB_SEND_MSG-{msg}") + def __init__(self): + self._check_settings() + self.get_rpc_client() - def setup_connection(self): - user = self.request["from"] - password = self.request["password"] - host = CONF.aprs_network.host - port = CONF.aprs_network.port - connected = False - backoff = 1 - while not connected: - try: - LOG.info("Creating aprslib client") + def _check_settings(self): + if not CONF.rpc_settings.enabled: + LOG.error("RPC is not enabled, no way to get stats!!") - aprs_client = aprsis.Aprsdis( - user, - passwd=password, - host=host, - port=port, - ) - # Force the logging to be the same - aprs_client.logger = LOG - aprs_client.connect() - connected = True - backoff = 1 - except LoginError as e: - LOG.error(f"Failed to login to APRS-IS Server '{e}'") - connected = False - raise e - except Exception as e: - LOG.error(f"Unable to connect to APRS-IS server. '{e}' ") - time.sleep(backoff) - backoff = backoff * 2 - continue - LOG.debug(f"Logging in to APRS-IS with user '{user}'") - return aprs_client + if CONF.rpc_settings.magic_word == common.APRSD_DEFAULT_MAGIC_WORD: + LOG.warning("You are using the default RPC magic word!!!") + LOG.warning("edit aprsd.conf and change rpc_settings.magic_word") - def run(self): - LOG.debug("Starting") - from_call = self.request["from"] - to_call = self.request["to"] - message = self.request["message"] - LOG.info( - "From: '{}' To: '{}' Send '{}'".format( - from_call, - to_call, - message, - ), - ) + def _rpyc_connect( + self, host, port, + service=rpyc.VoidService, + config={}, ipv6=False, + keepalive=False, authorizer=None, + ): + print(f"Connecting to RPC host {host}:{port}") try: - self.aprs_client = self.setup_connection() - except LoginError as e: - f"Failed to setup Connection {e}" - - tx.send( - self.packet, - direct=True, - aprs_client=self.aprs_client, - ) - SentMessages().set_status(self.packet.msgNo, "Sent") - - while not self.thread_stop: - can_loop = self.loop() - if not can_loop: - self.stop() - threads.APRSDThreadList().remove(self) - LOG.debug("Exiting") - - def process_ack_packet(self, packet): - global socketio - ack_num = packet.msgNo - LOG.info(f"We got ack for our sent message {ack_num}") - packet.log("RXACK") - SentMessages().ack(self.packet.msgNo) - stats.APRSDStats().ack_rx_inc() - socketio.emit( - "ack", SentMessages().get(self.packet.msgNo), - namespace="/sendmsg", - ) - if self.request["wait_reply"] == "0" or self.got_reply: - # We aren't waiting for a reply, so we can bail - self.stop() - self.thread_stop = self.aprs_client.thread_stop = True - - def process_our_message_packet(self, packet): - global socketio - packets.PacketList().rx(packet) - stats.APRSDStats().msgs_rx_inc() - msg_number = packet.msgNo - SentMessages().reply(self.packet.msgNo, packet) - SentMessages().set_status(self.packet.msgNo, "Got Reply") - socketio.emit( - "reply", SentMessages().get(self.packet.msgNo), - namespace="/sendmsg", - ) - tx.send( - packets.AckPacket( - from_call=self.request["from"], - to_call=packet.from_call, - msgNo=msg_number, - ), - direct=True, - aprs_client=self.aprsis_client, - ) - SentMessages().set_status(self.packet.msgNo, "Ack Sent") - - # Now we can exit, since we are done. - self.got_reply = True - if self.got_ack: - self.stop() - self.thread_stop = self.aprs_client.thread_stop = True - - def process_packet(self, *args, **kwargs): - packet = self._client.decode_packet(*args, **kwargs) - packet.log(header="RX Packet") - - if isinstance(packet, packets.AckPacket): - self.process_ack_packet(packet) - else: - self.process_our_message_packet(packet) - - def loop(self): - # we have a general time limit expecting results of - # around 120 seconds before we exit - now = datetime.datetime.now() - start_delta = str(now - self.start_time) - delta = utils.parse_delta_str(start_delta) - d = datetime.timedelta(**delta) - max_timeout = {"hours": 0.0, "minutes": 1, "seconds": 0} - max_delta = datetime.timedelta(**max_timeout) - if d > max_delta: - LOG.error("XXXXXX Haven't completed everything in 60 seconds. BAIL!") - return False - - if self.got_ack and self.got_reply: - LOG.warning("We got everything already. BAIL") - return False - - try: - # This will register a packet consumer with aprslib - # When new packets come in the consumer will process - # the packet - self.aprs_client.consumer( - self.process_packet, raw=False, blocking=False, + s = AuthSocketStream.connect( + host, port, ipv6=ipv6, keepalive=keepalive, + authorizer=authorizer, ) - except aprslib.exceptions.ConnectionDrop: - LOG.error("Connection dropped.") - return False + return rpyc.utils.factory.connect_stream(s, service, config=config) + except ConnectionRefusedError: + LOG.error(f"Failed to connect to RPC host {host}") + return None - return True + def get_rpc_client(self): + if not self._rpc_client: + magic = CONF.rpc_settings.magic_word + self._rpc_client = self._rpyc_connect( + CONF.rpc_settings.ip, + CONF.rpc_settings.port, + authorizer=lambda sock: sock.send(magic.encode()), + ) + return self._rpc_client + + def get_stats_dict(self): + cl = self.get_rpc_client() + result = {} + if not cl: + return result + + try: + rpc_stats_dict = cl.root.get_stats() + result = json.loads(rpc_stats_dict) + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_packet_track(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_packet_track() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_packet_list(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_packet_list() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_watch_list(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_watch_list() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_seen_list(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result = cl.root.get_seen_list() + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result + + def get_log_entries(self): + cl = self.get_rpc_client() + result = None + if not cl: + return result + try: + result_str = cl.root.get_log_entries() + result = json.loads(result_str) + except EOFError: + LOG.error("Lost connection to RPC Host") + self._rpc_client = None + return result class APRSDFlask(flask_classful.FlaskView): @@ -291,21 +188,25 @@ class APRSDFlask(flask_classful.FlaskView): @auth.login_required def index(self): stats = self._stats() + print(stats) LOG.debug( "watch list? {}".format( CONF.watch_list.callsigns, ), ) - wl = packets.WatchList() - if wl.is_enabled(): + wl = RPCClient().get_watch_list() + if wl and wl.is_enabled(): watch_count = len(wl) watch_age = wl.max_delta() else: watch_count = 0 watch_age = 0 - sl = packets.SeenList() - seen_count = len(sl) + sl = RPCClient().get_seen_list() + if sl: + seen_count = len(sl) + else: + seen_count = 0 pm = plugin.PluginManager() plugins = pm.get_plugins() @@ -346,7 +247,10 @@ class APRSDFlask(flask_classful.FlaskView): aprs_connection=aprs_connection, callsign=CONF.callsign, version=aprsd.__version__, - config_json=json.dumps(entries), + config_json=json.dumps( + entries, indent=4, + sort_keys=True, default=str, + ), watch_count=watch_count, watch_age=watch_age, seen_count=seen_count, @@ -363,31 +267,18 @@ class APRSDFlask(flask_classful.FlaskView): return flask.render_template("messages.html", messages=json.dumps(msgs)) - @auth.login_required - def send_message_status(self): - LOG.debug(request) - msgs = SentMessages() - info = msgs.get_all() - return json.dumps(info) - - @auth.login_required - def send_message(self): - LOG.debug(request) - if request.method == "GET": - return flask.render_template( - "send-message.html", - callsign=CONF.callsign, - version=aprsd.__version__, - ) - @auth.login_required def packets(self): - packet_list = packets.PacketList().get() - tmp_list = [] - for pkt in packet_list: - tmp_list.append(pkt.json) + packet_list = RPCClient().get_packet_list() + if packet_list: + packets = packet_list.get() + tmp_list = [] + for pkt in packets: + tmp_list.append(pkt.json) - return json.dumps(tmp_list) + return json.dumps(tmp_list) + else: + return json.dumps([]) @auth.login_required def plugins(self): @@ -404,39 +295,69 @@ class APRSDFlask(flask_classful.FlaskView): return json.dumps({"messages": "saved"}) def _stats(self): - stats_obj = stats.APRSDStats() - track = packets.PacketTrack() + track = RPCClient().get_packet_track() now = datetime.datetime.now() time_format = "%m-%d-%Y %H:%M:%S" - stats_dict = stats_obj.stats() - - # Convert the watch_list entries to age - wl = packets.WatchList() - new_list = {} - for call in wl.get_all(): - # call_date = datetime.datetime.strptime( - # str(wl.last_seen(call)), - # "%Y-%m-%d %H:%M:%S.%f", - # ) - new_list[call] = { - "last": wl.age(call), - "packets": wl.get(call)["packets"].get(), + stats_dict = RPCClient().get_stats_dict() + if not stats_dict: + stats_dict = { + "aprsd": {}, + "aprs-is": {"server": ""}, + "messages": { + "sent": 0, + "received": 0, + }, + "email": { + "sent": 0, + "received": 0, + }, + "seen_list": { + "sent": 0, + "received": 0, + }, } + # Convert the watch_list entries to age + wl = RPCClient().get_watch_list() + new_list = {} + if wl: + for call in wl.get_all(): + # call_date = datetime.datetime.strptime( + # str(wl.last_seen(call)), + # "%Y-%m-%d %H:%M:%S.%f", + # ) + + # We have to convert the RingBuffer to a real list + # so that json.dumps works. + # pkts = [] + # for pkt in wl.get(call)["packets"].get(): + # pkts.append(pkt) + + new_list[call] = { + "last": wl.age(call), + # "packets": pkts + } + stats_dict["aprsd"]["watch_list"] = new_list - packet_list = packets.PacketList() - rx = packet_list.total_rx() - tx = packet_list.total_tx() + packet_list = RPCClient().get_packet_list() + rx = tx = 0 + if packet_list: + rx = packet_list.total_rx() + tx = packet_list.total_tx() stats_dict["packets"] = { "sent": tx, "received": rx, } + if track: + size_tracker = len(track) + else: + size_tracker = 0 result = { "time": now.strftime(time_format), - "size_tracker": len(track), + "size_tracker": size_tracker, "stats": stats_dict, } @@ -446,116 +367,44 @@ class APRSDFlask(flask_classful.FlaskView): return json.dumps(self._stats()) -class SendMessageNamespace(Namespace): - got_ack = False - reply_sent = False - packet = None - request = None - - def __init__(self, namespace=None): - super().__init__(namespace) - - def on_connect(self): - global socketio - LOG.debug("Web socket connected") - socketio.emit( - "connected", {"data": "/sendmsg Connected"}, - namespace="/sendmsg", - ) - - def on_disconnect(self): - LOG.debug("WS Disconnected") - - def on_send(self, data): - global socketio - LOG.debug(f"WS: on_send {data}") - self.request = data - self.packet = packets.MessagePacket( - from_call=data["from"], - to_call=data["to"], - message_text=data["message"], - ) - msgs = SentMessages() - msgs.add(self.packet) - msgs.set_status(self.packet.msgNo, "Sending") - socketio.emit( - "sent", SentMessages().get(self.packet.msgNo), - namespace="/sendmsg", - ) - - socketio.start_background_task( - self._start, data, - self.packet, self, - ) - LOG.warning("WS: on_send: exit") - - def _start(self, data, packet, namespace): - msg_thread = SendMessageThread(data, packet, self) - msg_thread.start() - - def handle_message(self, data): - LOG.debug(f"WS Data {data}") - - def handle_json(self, data): - LOG.debug(f"WS json {data}") - - -class LogMonitorThread(threads.APRSDThread): +class LogUpdateThread(threads.APRSDThread): def __init__(self): - super().__init__("LogMonitorThread") + super().__init__("LogUpdate") def loop(self): global socketio - try: - record = log.logging_queue.get(block=True, timeout=5) - json_record = self.json_record(record) - socketio.emit( - "log_entry", json_record, - namespace="/logs", - ) - except Exception: - # Just ignore thi - pass + if socketio: + log_entries = RPCClient().get_log_entries() + + if log_entries: + for entry in log_entries: + socketio.emit( + "log_entry", entry, + namespace="/logs", + ) + + time.sleep(5) return True - def json_record(self, record): - entry = {} - entry["filename"] = record.filename - entry["funcName"] = record.funcName - entry["levelname"] = record.levelname - entry["lineno"] = record.lineno - entry["module"] = record.module - entry["name"] = record.name - entry["pathname"] = record.pathname - entry["process"] = record.process - entry["processName"] = record.processName - if hasattr(record, "stack_info"): - entry["stack_info"] = record.stack_info - else: - entry["stack_info"] = None - entry["thread"] = record.thread - entry["threadName"] = record.threadName - entry["message"] = record.getMessage() - return entry - class LoggingNamespace(Namespace): + log_thread = None def on_connect(self): global socketio - LOG.debug("Web socket connected") socketio.emit( "connected", {"data": "/logs Connected"}, namespace="/logs", ) - self.log_thread = LogMonitorThread() + self.log_thread = LogUpdateThread() self.log_thread.start() def on_disconnect(self): - LOG.debug("WS Disconnected") - self.log_thread.stop() + LOG.debug("LOG Disconnected") + if self.log_thread: + self.log_thread.stop() def setup_logging(flask_app, loglevel, quiet): @@ -608,8 +457,6 @@ def init_flask(loglevel, quiet): flask_app.route("/stats", methods=["GET"])(server.stats) flask_app.route("/messages", methods=["GET"])(server.messages) flask_app.route("/packets", methods=["GET"])(server.packets) - flask_app.route("/send-message", methods=["GET"])(server.send_message) - flask_app.route("/send-message-status", methods=["GET"])(server.send_message_status) flask_app.route("/save", methods=["GET"])(server.save) flask_app.route("/plugins", methods=["GET"])(server.plugins) @@ -619,7 +466,21 @@ def init_flask(loglevel, quiet): ) # import eventlet # eventlet.monkey_patch() + gunicorn_logger = logging.getLogger("gunicorn.error") + flask_app.logger.handlers = gunicorn_logger.handlers + flask_app.logger.setLevel(gunicorn_logger.level) - socketio.on_namespace(SendMessageNamespace("/sendmsg")) socketio.on_namespace(LoggingNamespace("/logs")) return socketio, flask_app + + +if __name__ == "aprsd.flask": + try: + default_config_file = cli_helper.DEFAULT_CONFIG_FILE + CONF( + [], project="aprsd", version=aprsd.__version__, + default_config_files=[default_config_file], + ) + except cfg.ConfigFilesNotFoundError: + pass + sio, app = init_flask("DEBUG", False) diff --git a/aprsd/rpc_server.py b/aprsd/rpc_server.py new file mode 100644 index 0000000..040a93d --- /dev/null +++ b/aprsd/rpc_server.py @@ -0,0 +1,90 @@ +import json +import logging + +from oslo_config import cfg +import rpyc +from rpyc.utils.authenticators import AuthenticationError +from rpyc.utils.server import ThreadPoolServer + +from aprsd import conf # noqa: F401 +from aprsd import packets, stats, threads +from aprsd.threads import log_monitor + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +def magic_word_authenticator(sock): + magic = sock.recv(len(CONF.rpc_settings.magic_word)).decode() + if magic != CONF.rpc_settings.magic_word: + raise AuthenticationError(f"wrong magic word {magic}") + return sock, None + + +class APRSDRPCThread(threads.APRSDThread): + def __init__(self): + super().__init__(name="RPCThread") + self.thread = ThreadPoolServer( + APRSDService, + port=CONF.rpc_settings.port, + protocol_config={"allow_public_attrs": True}, + authenticator=magic_word_authenticator, + ) + + def stop(self): + if self.thread: + self.thread.close() + self.thread_stop = True + + def loop(self): + # there is no loop as run is blocked + if self.thread and not self.thread_stop: + # This is a blocking call + self.thread.start() + + +@rpyc.service +class APRSDService(rpyc.Service): + def on_connect(self, conn): + # code that runs when a connection is created + # (to init the service, if needed) + LOG.info("Connected") + self._conn = conn + + def on_disconnect(self, conn): + # code that runs after the connection has already closed + # (to finalize the service, if needed) + LOG.info("Disconnected") + self._conn = None + + @rpyc.exposed + def get_stats(self): + stat = stats.APRSDStats() + stats_dict = stat.stats() + return json.dumps(stats_dict, indent=4, sort_keys=True, default=str) + + @rpyc.exposed + def get_stats_obj(self): + return stats.APRSDStats() + + @rpyc.exposed + def get_packet_list(self): + return packets.PacketList() + + @rpyc.exposed + def get_packet_track(self): + return packets.PacketTrack() + + @rpyc.exposed + def get_watch_list(self): + return packets.WatchList() + + @rpyc.exposed + def get_seen_list(self): + return packets.SeenList() + + @rpyc.exposed + def get_log_entries(self): + entries = log_monitor.LogEntries().get_all_and_purge() + return json.dumps(entries, default=str) diff --git a/aprsd/stats.py b/aprsd/stats.py index a5ba446..e52a224 100644 --- a/aprsd/stats.py +++ b/aprsd/stats.py @@ -63,7 +63,7 @@ class APRSDStats: def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - # any initializetion here + # any init here cls._instance.start_time = datetime.datetime.now() cls._instance._aprsis_keepalive = datetime.datetime.now() return cls._instance diff --git a/aprsd/threads/log_monitor.py b/aprsd/threads/log_monitor.py new file mode 100644 index 0000000..90bda55 --- /dev/null +++ b/aprsd/threads/log_monitor.py @@ -0,0 +1,77 @@ +import logging +import threading + +import wrapt + +from aprsd import threads +from aprsd.logging import log + + +LOG = logging.getLogger("APRSD") + + +class LogEntries: + entries = [] + lock = threading.Lock() + _instance = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + @wrapt.synchronized(lock) + def add(self, entry): + self.entries.append(entry) + + @wrapt.synchronized(lock) + def get_all_and_purge(self): + entries = self.entries.copy() + self.entries = [] + return entries + + @wrapt.synchronized(lock) + def __len__(self): + return len(self.entries) + + +class LogMonitorThread(threads.APRSDThread): + + def __init__(self): + super().__init__("LogMonitorThread") + + def loop(self): + try: + record = log.logging_queue.get(block=True, timeout=2) + if isinstance(record, list): + for item in record: + entry = self.json_record(item) + LogEntries().add(entry) + else: + entry = self.json_record(record) + LogEntries().add(entry) + except Exception: + # Just ignore thi + pass + + return True + + def json_record(self, record): + entry = {} + entry["filename"] = record.filename + entry["funcName"] = record.funcName + entry["levelname"] = record.levelname + entry["lineno"] = record.lineno + entry["module"] = record.module + entry["name"] = record.name + entry["pathname"] = record.pathname + entry["process"] = record.process + entry["processName"] = record.processName + if hasattr(record, "stack_info"): + entry["stack_info"] = record.stack_info + else: + entry["stack_info"] = None + entry["thread"] = record.thread + entry["threadName"] = record.threadName + entry["message"] = record.getMessage() + return entry diff --git a/aprsd/web/admin/static/js/main.js b/aprsd/web/admin/static/js/main.js index f02ab55..749f3b9 100644 --- a/aprsd/web/admin/static/js/main.js +++ b/aprsd/web/admin/static/js/main.js @@ -107,13 +107,8 @@ function update_packets( data ) { if (size_dict(packet_list) == 0 && size_dict(data) > 0) { packetsdiv.html('') } - console.log("PACKET_LIST") - console.log(packet_list); jQuery.each(data, function(i, val) { pkt = JSON.parse(val); - console.log("PACKET"); - console.log(pkt); - console.log(pkt.timestamp); update_watchlist_from_packet(pkt['from_call'], pkt); if ( packet_list.hasOwnProperty(pkt.timestamp) == false ) { diff --git a/aprsd/web/admin/static/js/send-message.js b/aprsd/web/admin/static/js/send-message.js index 51b71c4..9bcb470 100644 --- a/aprsd/web/admin/static/js/send-message.js +++ b/aprsd/web/admin/static/js/send-message.js @@ -28,36 +28,6 @@ function init_messages() { update_msg(msg); }); - $("#sendform").submit(function(event) { - event.preventDefault(); - - var $checkboxes = $(this).find('input[type=checkbox]'); - - //loop through the checkboxes and change to hidden fields - $checkboxes.each(function() { - if ($(this)[0].checked) { - $(this).attr('type', 'hidden'); - $(this).val(1); - } else { - $(this).attr('type', 'hidden'); - $(this).val(0); - } - }); - - msg = {'from': $('#from').val(), - 'password': $('#password').val(), - 'to': $('#to').val(), - 'message': $('#message').val(), - 'wait_reply': $('#wait_reply').val(), - } - - socket.emit("send", msg); - - //loop through the checkboxes and change to hidden fields - $checkboxes.each(function() { - $(this).attr('type', 'checkbox'); - }); - }); } function add_msg(msg) { diff --git a/aprsd/web/admin/templates/index.html b/aprsd/web/admin/templates/index.html index b7e4f21..ec452bc 100644 --- a/aprsd/web/admin/templates/index.html +++ b/aprsd/web/admin/templates/index.html @@ -82,7 +82,6 @@
Watch List
Plugins
Config
-
Send Message
LogFile
Raw JSON
@@ -160,29 +159,6 @@
{{ config_json|safe }}
-
-

Send Message

-
-
-

-

-

-

-

-

-

-

-

- -

- -
-
-
Messages
-
-
-
-

LOGFILE

diff --git a/aprsd/web/admin/templates/messages.html b/aprsd/web/admin/templates/messages.html deleted file mode 100644 index c3f6beb..0000000 --- a/aprsd/web/admin/templates/messages.html +++ /dev/null @@ -1,15 +0,0 @@ - - - - - - - -

-
-    
-
-
diff --git a/aprsd/web/admin/templates/send-message.html b/aprsd/web/admin/templates/send-message.html
deleted file mode 100644
index 566eaba..0000000
--- a/aprsd/web/admin/templates/send-message.html
+++ /dev/null
@@ -1,74 +0,0 @@
-
-    
-        
-        
-        
-        
-        
-
-
-        
-        
-        
-
-        
-        
-
-        
-        
-        
-
-        
-
-    
-
-    
-        
-

APRSD {{ version }}

-
- -
-
- {{ callsign }} - connected to - NONE -
- -
- NONE -
-
- -

Send Message Form

-
-

-

-

-

- -

-

- -

-

- -

- -

- - -
- -

Messages (0)

-
-
Messages
-
- - - - - diff --git a/dev-requirements.txt b/dev-requirements.txt index 9d3412f..16e5a38 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -49,7 +49,7 @@ pep8-naming==0.13.3 # via -r dev-requirements.in pip-tools==6.12.1 # via -r dev-requirements.in platformdirs==2.6.0 # via black, tox, virtualenv pluggy==1.0.0 # via pytest, tox -pre-commit==2.20.0 # via -r dev-requirements.in +pre-commit==2.21.0 # via -r dev-requirements.in pycodestyle==2.10.0 # via flake8 pyflakes==3.0.1 # via autoflake, flake8 pygments==2.13.0 # via rich, sphinx @@ -71,9 +71,9 @@ sphinxcontrib-jsmath==1.0.1 # via sphinx sphinxcontrib-qthelp==1.0.3 # via sphinx sphinxcontrib-serializinghtml==1.1.5 # via sphinx tokenize-rt==5.0.0 # via add-trailing-comma, pyupgrade -toml==0.10.2 # via autoflake, pre-commit +toml==0.10.2 # via autoflake tomli==2.0.1 # via black, build, coverage, mypy, pep517, pyproject-api, pytest, tox -tox==4.0.16 # via -r dev-requirements.in +tox==4.0.18 # via -r dev-requirements.in typing-extensions==4.4.0 # via libcst, mypy, typing-inspect typing-inspect==0.8.0 # via libcst unify==0.5 # via gray diff --git a/requirements.in b/requirements.in index aec3c2b..0ce8236 100644 --- a/requirements.in +++ b/requirements.in @@ -23,10 +23,11 @@ beautifulsoup4 wrapt # kiss3 uses attrs kiss3 -attrs==22.1.0 +attrs # for mobile checking user-agents pyopenssl dataclasses dacite2 oslo.config +rpyc diff --git a/requirements.txt b/requirements.txt index 5830f74..6ed3153 100644 --- a/requirements.txt +++ b/requirements.txt @@ -39,6 +39,7 @@ oslo-config==9.0.0 # via -r requirements.in oslo-i18n==5.1.0 # via oslo-config pbr==5.11.0 # via -r requirements.in, oslo-i18n, stevedore pluggy==1.0.0 # via -r requirements.in +plumbum==1.8.0 # via rpyc pycparser==2.21 # via cffi pygments==2.13.0 # via rich pyopenssl==22.1.0 # via -r requirements.in @@ -51,6 +52,7 @@ pyyaml==6.0 # via -r requirements.in, oslo-config requests==2.28.1 # via -r requirements.in, oslo-config, update-checker rfc3986==2.0.0 # via oslo-config rich==12.6.0 # via -r requirements.in +rpyc==5.3.0 # via -r requirements.in shellingham==1.5.0 # via click-completion six==1.16.0 # via -r requirements.in, click-completion, eventlet, imapclient soupsieve==2.3.2.post1 # via beautifulsoup4