Decouple admin web interface from server command

This patch introduces rpyc based RPC client/server for
the flask web interface to call into the running aprsd server
command to fetch stats, logs, etc to send to the browser.

This allows running the web interface via gunicorn command
gunicorn -k gevent --reload --threads 10 -w 1 aprsd.flask:app --log-level DEBUG
This commit is contained in:
Hemna 2022-12-28 15:44:49 -05:00
parent 02e4f78d0e
commit ff392395ed
15 changed files with 447 additions and 541 deletions

View File

@ -40,9 +40,11 @@ from aprsd import cli_helper, packets, stats, threads, utils
# setup the global logger # setup the global logger
# logging.basicConfig(level=logging.DEBUG) # level=10 # logging.basicConfig(level=logging.DEBUG) # level=10
CONF = cfg.CONF
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
flask_enabled = False flask_enabled = False
rpc_serv = None
def custom_startswith(string, incomplete): def custom_startswith(string, incomplete):
@ -92,6 +94,7 @@ def signal_handler(sig, frame):
LOG.info(stats.APRSDStats()) LOG.info(stats.APRSDStats())
# signal.signal(signal.SIGTERM, sys.exit(0)) # signal.signal(signal.SIGTERM, sys.exit(0))
# sys.exit(0) # sys.exit(0)
if flask_enabled: if flask_enabled:
signal.signal(signal.SIGTERM, sys.exit(0)) signal.signal(signal.SIGTERM, sys.exit(0))

View File

@ -6,8 +6,10 @@ import click
from oslo_config import cfg from oslo_config import cfg
import aprsd import aprsd
from aprsd import (
cli_helper, client, packets, plugin, rpc_server, threads, utils,
)
from aprsd import aprsd as aprsd_main from aprsd import aprsd as aprsd_main
from aprsd import cli_helper, client, flask, packets, plugin, threads, utils
from aprsd.aprsd import cli from aprsd.aprsd import cli
from aprsd.threads import rx from aprsd.threads import rx
@ -32,15 +34,9 @@ LOG = logging.getLogger("APRSD")
@cli_helper.process_standard_options @cli_helper.process_standard_options
def server(ctx, flush): def server(ctx, flush):
"""Start the aprsd server gateway process.""" """Start the aprsd server gateway process."""
loglevel = ctx.obj["loglevel"]
quiet = ctx.obj["quiet"]
signal.signal(signal.SIGINT, aprsd_main.signal_handler) signal.signal(signal.SIGINT, aprsd_main.signal_handler)
signal.signal(signal.SIGTERM, aprsd_main.signal_handler) signal.signal(signal.SIGTERM, aprsd_main.signal_handler)
if not quiet:
click.echo("Load config")
level, msg = utils._check_version() level, msg = utils._check_version()
if level: if level:
LOG.warning(msg) LOG.warning(msg)
@ -99,18 +95,10 @@ def server(ctx, flush):
keepalive = threads.KeepAliveThread() keepalive = threads.KeepAliveThread()
keepalive.start() keepalive.start()
web_enabled = CONF.admin.web_enabled if CONF.rpc_settings.enabled:
rpc = rpc_server.APRSDRPCThread()
rpc.start()
log_monitor = threads.log_monitor.LogMonitorThread()
log_monitor.start()
if web_enabled:
aprsd_main.flask_enabled = True
(socketio, app) = flask.init_flask(loglevel, quiet)
socketio.run(
app,
allow_unsafe_werkzeug=True,
host=CONF.admin.web_ip,
port=CONF.admin.web_port,
)
# If there are items in the msgTracker, then save them
LOG.info("APRSD Exiting.")
return 0 return 0

View File

@ -1,6 +1,8 @@
from oslo_config import cfg from oslo_config import cfg
APRSD_DEFAULT_MAGIC_WORD = "CHANGEME!!!"
admin_group = cfg.OptGroup( admin_group = cfg.OptGroup(
name="admin", name="admin",
title="Admin web interface settings", title="Admin web interface settings",
@ -9,6 +11,10 @@ watch_list_group = cfg.OptGroup(
name="watch_list", name="watch_list",
title="Watch List settings", title="Watch List settings",
) )
rpc_group = cfg.OptGroup(
name="rpc_settings",
title="RPC Settings for admin <--> web",
)
aprsd_opts = [ aprsd_opts = [
@ -96,6 +102,29 @@ admin_opts = [
), ),
] ]
rpc_opts = [
cfg.BoolOpt(
"enabled",
default=True,
help="Enable RPC calls",
),
cfg.StrOpt(
"ip",
default="localhost",
help="The ip address to listen on",
),
cfg.PortOpt(
"port",
default=18861,
help="The port to listen on",
),
cfg.StrOpt(
"magic_word",
default=APRSD_DEFAULT_MAGIC_WORD,
help="Magic word to authenticate requests between client/server",
),
]
enabled_plugins_opts = [ enabled_plugins_opts = [
cfg.ListOpt( cfg.ListOpt(
"enabled_plugins", "enabled_plugins",
@ -123,6 +152,8 @@ def register_opts(config):
config.register_opts(admin_opts, group=admin_group) config.register_opts(admin_opts, group=admin_group)
config.register_group(watch_list_group) config.register_group(watch_list_group)
config.register_opts(watch_list_opts, group=watch_list_group) config.register_opts(watch_list_opts, group=watch_list_group)
config.register_group(rpc_group)
config.register_opts(rpc_opts, group=rpc_group)
def list_opts(): def list_opts():
@ -130,4 +161,5 @@ def list_opts():
"DEFAULT": (aprsd_opts + enabled_plugins_opts), "DEFAULT": (aprsd_opts + enabled_plugins_opts),
admin_group.name: admin_opts, admin_group.name: admin_opts,
watch_list_group.name: watch_list_opts, watch_list_group.name: watch_list_opts,
rpc_group.name: rpc_opts,
} }

View File

@ -2,27 +2,21 @@ import datetime
import json import json
import logging import logging
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
import threading
import time import time
import aprslib
from aprslib.exceptions import LoginError
import flask import flask
from flask import request
from flask.logging import default_handler from flask.logging import default_handler
import flask_classful import flask_classful
from flask_httpauth import HTTPBasicAuth from flask_httpauth import HTTPBasicAuth
from flask_socketio import Namespace, SocketIO from flask_socketio import Namespace, SocketIO
from oslo_config import cfg from oslo_config import cfg
import rpyc
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash, generate_password_hash
import wrapt
import aprsd import aprsd
from aprsd import client, conf, packets, plugin, stats, threads, utils from aprsd import cli_helper, client, conf, packets, plugin, threads
from aprsd.clients import aprsis from aprsd.conf import common
from aprsd.logging import log
from aprsd.logging import rich as aprsd_logging from aprsd.logging import rich as aprsd_logging
from aprsd.threads import tx
CONF = cfg.CONF CONF = cfg.CONF
@ -30,72 +24,20 @@ LOG = logging.getLogger("APRSD")
auth = HTTPBasicAuth() auth = HTTPBasicAuth()
users = None users = None
app = None
class SentMessages: class AuthSocketStream(rpyc.SocketStream):
_instance = None """Used to authenitcate the RPC stream to remote."""
lock = threading.Lock()
msgs = {} @classmethod
def connect(cls, *args, authorizer=None, **kwargs):
stream_obj = super().connect(*args, **kwargs)
def __new__(cls, *args, **kwargs): if callable(authorizer):
"""This magic turns this into a singleton.""" authorizer(stream_obj.sock)
if cls._instance is None:
cls._instance = super().__new__(cls)
# Put any initialization here.
return cls._instance
@wrapt.synchronized(lock) return stream_obj
def add(self, packet):
self.msgs[packet.msgNo] = self._create(packet.msgNo)
self.msgs[packet.msgNo]["from"] = packet.from_call
self.msgs[packet.msgNo]["to"] = packet.to_call
self.msgs[packet.msgNo]["message"] = packet.message_text.rstrip("\n")
packet._build_raw()
self.msgs[packet.msgNo]["raw"] = packet.raw.rstrip("\n")
def _create(self, id):
return {
"id": id,
"ts": time.time(),
"ack": False,
"from": None,
"to": None,
"raw": None,
"message": None,
"status": None,
"last_update": None,
"reply": None,
}
@wrapt.synchronized(lock)
def __len__(self):
return len(self.msgs.keys())
@wrapt.synchronized(lock)
def get(self, id):
if id in self.msgs:
return self.msgs[id]
@wrapt.synchronized(lock)
def get_all(self):
return self.msgs
@wrapt.synchronized(lock)
def set_status(self, id, status):
self.msgs[id]["last_update"] = str(datetime.datetime.now())
self.msgs[id]["status"] = status
@wrapt.synchronized(lock)
def ack(self, id):
"""The message got an ack!"""
self.msgs[id]["last_update"] = str(datetime.datetime.now())
self.msgs[id]["ack"] = True
@wrapt.synchronized(lock)
def reply(self, id, packet):
"""We got a packet back from the sent message."""
self.msgs[id]["reply"] = packet
# HTTPBasicAuth doesn't work on a class method. # HTTPBasicAuth doesn't work on a class method.
@ -109,174 +51,129 @@ def verify_password(username, password):
return username return username
class SendMessageThread(threads.APRSDRXThread): class RPCClient:
"""Thread for sending a message from web.""" _instance = None
_rpc_client = None
aprsis_client = None def __new__(cls, *args, **kwargs):
request = None if cls._instance is None:
got_ack = False cls._instance = super().__new__(cls)
got_reply = False return cls._instance
def __init__(self, info, packet, namespace): def __init__(self):
self.request = info self._check_settings()
self.packet = packet self.get_rpc_client()
self.namespace = namespace
self.start_time = datetime.datetime.now()
msg = "({} -> {}) : {}".format(
info["from"],
info["to"],
info["message"],
)
super().__init__(f"WEB_SEND_MSG-{msg}")
def setup_connection(self): def _check_settings(self):
user = self.request["from"] if not CONF.rpc_settings.enabled:
password = self.request["password"] LOG.error("RPC is not enabled, no way to get stats!!")
host = CONF.aprs_network.host
port = CONF.aprs_network.port
connected = False
backoff = 1
while not connected:
try:
LOG.info("Creating aprslib client")
aprs_client = aprsis.Aprsdis( if CONF.rpc_settings.magic_word == common.APRSD_DEFAULT_MAGIC_WORD:
user, LOG.warning("You are using the default RPC magic word!!!")
passwd=password, LOG.warning("edit aprsd.conf and change rpc_settings.magic_word")
host=host,
port=port,
)
# Force the logging to be the same
aprs_client.logger = LOG
aprs_client.connect()
connected = True
backoff = 1
except LoginError as e:
LOG.error(f"Failed to login to APRS-IS Server '{e}'")
connected = False
raise e
except Exception as e:
LOG.error(f"Unable to connect to APRS-IS server. '{e}' ")
time.sleep(backoff)
backoff = backoff * 2
continue
LOG.debug(f"Logging in to APRS-IS with user '{user}'")
return aprs_client
def run(self): def _rpyc_connect(
LOG.debug("Starting") self, host, port,
from_call = self.request["from"] service=rpyc.VoidService,
to_call = self.request["to"] config={}, ipv6=False,
message = self.request["message"] keepalive=False, authorizer=None,
LOG.info( ):
"From: '{}' To: '{}' Send '{}'".format(
from_call,
to_call,
message,
),
)
print(f"Connecting to RPC host {host}:{port}")
try: try:
self.aprs_client = self.setup_connection() s = AuthSocketStream.connect(
except LoginError as e: host, port, ipv6=ipv6, keepalive=keepalive,
f"Failed to setup Connection {e}" authorizer=authorizer,
tx.send(
self.packet,
direct=True,
aprs_client=self.aprs_client,
)
SentMessages().set_status(self.packet.msgNo, "Sent")
while not self.thread_stop:
can_loop = self.loop()
if not can_loop:
self.stop()
threads.APRSDThreadList().remove(self)
LOG.debug("Exiting")
def process_ack_packet(self, packet):
global socketio
ack_num = packet.msgNo
LOG.info(f"We got ack for our sent message {ack_num}")
packet.log("RXACK")
SentMessages().ack(self.packet.msgNo)
stats.APRSDStats().ack_rx_inc()
socketio.emit(
"ack", SentMessages().get(self.packet.msgNo),
namespace="/sendmsg",
)
if self.request["wait_reply"] == "0" or self.got_reply:
# We aren't waiting for a reply, so we can bail
self.stop()
self.thread_stop = self.aprs_client.thread_stop = True
def process_our_message_packet(self, packet):
global socketio
packets.PacketList().rx(packet)
stats.APRSDStats().msgs_rx_inc()
msg_number = packet.msgNo
SentMessages().reply(self.packet.msgNo, packet)
SentMessages().set_status(self.packet.msgNo, "Got Reply")
socketio.emit(
"reply", SentMessages().get(self.packet.msgNo),
namespace="/sendmsg",
)
tx.send(
packets.AckPacket(
from_call=self.request["from"],
to_call=packet.from_call,
msgNo=msg_number,
),
direct=True,
aprs_client=self.aprsis_client,
)
SentMessages().set_status(self.packet.msgNo, "Ack Sent")
# Now we can exit, since we are done.
self.got_reply = True
if self.got_ack:
self.stop()
self.thread_stop = self.aprs_client.thread_stop = True
def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
packet.log(header="RX Packet")
if isinstance(packet, packets.AckPacket):
self.process_ack_packet(packet)
else:
self.process_our_message_packet(packet)
def loop(self):
# we have a general time limit expecting results of
# around 120 seconds before we exit
now = datetime.datetime.now()
start_delta = str(now - self.start_time)
delta = utils.parse_delta_str(start_delta)
d = datetime.timedelta(**delta)
max_timeout = {"hours": 0.0, "minutes": 1, "seconds": 0}
max_delta = datetime.timedelta(**max_timeout)
if d > max_delta:
LOG.error("XXXXXX Haven't completed everything in 60 seconds. BAIL!")
return False
if self.got_ack and self.got_reply:
LOG.warning("We got everything already. BAIL")
return False
try:
# This will register a packet consumer with aprslib
# When new packets come in the consumer will process
# the packet
self.aprs_client.consumer(
self.process_packet, raw=False, blocking=False,
) )
except aprslib.exceptions.ConnectionDrop: return rpyc.utils.factory.connect_stream(s, service, config=config)
LOG.error("Connection dropped.") except ConnectionRefusedError:
return False LOG.error(f"Failed to connect to RPC host {host}")
return None
return True def get_rpc_client(self):
if not self._rpc_client:
magic = CONF.rpc_settings.magic_word
self._rpc_client = self._rpyc_connect(
CONF.rpc_settings.ip,
CONF.rpc_settings.port,
authorizer=lambda sock: sock.send(magic.encode()),
)
return self._rpc_client
def get_stats_dict(self):
cl = self.get_rpc_client()
result = {}
if not cl:
return result
try:
rpc_stats_dict = cl.root.get_stats()
result = json.loads(rpc_stats_dict)
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_packet_track(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_packet_track()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_packet_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_packet_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_watch_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_watch_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_seen_list(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result = cl.root.get_seen_list()
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
def get_log_entries(self):
cl = self.get_rpc_client()
result = None
if not cl:
return result
try:
result_str = cl.root.get_log_entries()
result = json.loads(result_str)
except EOFError:
LOG.error("Lost connection to RPC Host")
self._rpc_client = None
return result
class APRSDFlask(flask_classful.FlaskView): class APRSDFlask(flask_classful.FlaskView):
@ -291,21 +188,25 @@ class APRSDFlask(flask_classful.FlaskView):
@auth.login_required @auth.login_required
def index(self): def index(self):
stats = self._stats() stats = self._stats()
print(stats)
LOG.debug( LOG.debug(
"watch list? {}".format( "watch list? {}".format(
CONF.watch_list.callsigns, CONF.watch_list.callsigns,
), ),
) )
wl = packets.WatchList() wl = RPCClient().get_watch_list()
if wl.is_enabled(): if wl and wl.is_enabled():
watch_count = len(wl) watch_count = len(wl)
watch_age = wl.max_delta() watch_age = wl.max_delta()
else: else:
watch_count = 0 watch_count = 0
watch_age = 0 watch_age = 0
sl = packets.SeenList() sl = RPCClient().get_seen_list()
seen_count = len(sl) if sl:
seen_count = len(sl)
else:
seen_count = 0
pm = plugin.PluginManager() pm = plugin.PluginManager()
plugins = pm.get_plugins() plugins = pm.get_plugins()
@ -346,7 +247,10 @@ class APRSDFlask(flask_classful.FlaskView):
aprs_connection=aprs_connection, aprs_connection=aprs_connection,
callsign=CONF.callsign, callsign=CONF.callsign,
version=aprsd.__version__, version=aprsd.__version__,
config_json=json.dumps(entries), config_json=json.dumps(
entries, indent=4,
sort_keys=True, default=str,
),
watch_count=watch_count, watch_count=watch_count,
watch_age=watch_age, watch_age=watch_age,
seen_count=seen_count, seen_count=seen_count,
@ -363,31 +267,18 @@ class APRSDFlask(flask_classful.FlaskView):
return flask.render_template("messages.html", messages=json.dumps(msgs)) return flask.render_template("messages.html", messages=json.dumps(msgs))
@auth.login_required
def send_message_status(self):
LOG.debug(request)
msgs = SentMessages()
info = msgs.get_all()
return json.dumps(info)
@auth.login_required
def send_message(self):
LOG.debug(request)
if request.method == "GET":
return flask.render_template(
"send-message.html",
callsign=CONF.callsign,
version=aprsd.__version__,
)
@auth.login_required @auth.login_required
def packets(self): def packets(self):
packet_list = packets.PacketList().get() packet_list = RPCClient().get_packet_list()
tmp_list = [] if packet_list:
for pkt in packet_list: packets = packet_list.get()
tmp_list.append(pkt.json) tmp_list = []
for pkt in packets:
tmp_list.append(pkt.json)
return json.dumps(tmp_list) return json.dumps(tmp_list)
else:
return json.dumps([])
@auth.login_required @auth.login_required
def plugins(self): def plugins(self):
@ -404,39 +295,69 @@ class APRSDFlask(flask_classful.FlaskView):
return json.dumps({"messages": "saved"}) return json.dumps({"messages": "saved"})
def _stats(self): def _stats(self):
stats_obj = stats.APRSDStats() track = RPCClient().get_packet_track()
track = packets.PacketTrack()
now = datetime.datetime.now() now = datetime.datetime.now()
time_format = "%m-%d-%Y %H:%M:%S" time_format = "%m-%d-%Y %H:%M:%S"
stats_dict = stats_obj.stats() stats_dict = RPCClient().get_stats_dict()
if not stats_dict:
# Convert the watch_list entries to age stats_dict = {
wl = packets.WatchList() "aprsd": {},
new_list = {} "aprs-is": {"server": ""},
for call in wl.get_all(): "messages": {
# call_date = datetime.datetime.strptime( "sent": 0,
# str(wl.last_seen(call)), "received": 0,
# "%Y-%m-%d %H:%M:%S.%f", },
# ) "email": {
new_list[call] = { "sent": 0,
"last": wl.age(call), "received": 0,
"packets": wl.get(call)["packets"].get(), },
"seen_list": {
"sent": 0,
"received": 0,
},
} }
# Convert the watch_list entries to age
wl = RPCClient().get_watch_list()
new_list = {}
if wl:
for call in wl.get_all():
# call_date = datetime.datetime.strptime(
# str(wl.last_seen(call)),
# "%Y-%m-%d %H:%M:%S.%f",
# )
# We have to convert the RingBuffer to a real list
# so that json.dumps works.
# pkts = []
# for pkt in wl.get(call)["packets"].get():
# pkts.append(pkt)
new_list[call] = {
"last": wl.age(call),
# "packets": pkts
}
stats_dict["aprsd"]["watch_list"] = new_list stats_dict["aprsd"]["watch_list"] = new_list
packet_list = packets.PacketList() packet_list = RPCClient().get_packet_list()
rx = packet_list.total_rx() rx = tx = 0
tx = packet_list.total_tx() if packet_list:
rx = packet_list.total_rx()
tx = packet_list.total_tx()
stats_dict["packets"] = { stats_dict["packets"] = {
"sent": tx, "sent": tx,
"received": rx, "received": rx,
} }
if track:
size_tracker = len(track)
else:
size_tracker = 0
result = { result = {
"time": now.strftime(time_format), "time": now.strftime(time_format),
"size_tracker": len(track), "size_tracker": size_tracker,
"stats": stats_dict, "stats": stats_dict,
} }
@ -446,116 +367,44 @@ class APRSDFlask(flask_classful.FlaskView):
return json.dumps(self._stats()) return json.dumps(self._stats())
class SendMessageNamespace(Namespace): class LogUpdateThread(threads.APRSDThread):
got_ack = False
reply_sent = False
packet = None
request = None
def __init__(self, namespace=None):
super().__init__(namespace)
def on_connect(self):
global socketio
LOG.debug("Web socket connected")
socketio.emit(
"connected", {"data": "/sendmsg Connected"},
namespace="/sendmsg",
)
def on_disconnect(self):
LOG.debug("WS Disconnected")
def on_send(self, data):
global socketio
LOG.debug(f"WS: on_send {data}")
self.request = data
self.packet = packets.MessagePacket(
from_call=data["from"],
to_call=data["to"],
message_text=data["message"],
)
msgs = SentMessages()
msgs.add(self.packet)
msgs.set_status(self.packet.msgNo, "Sending")
socketio.emit(
"sent", SentMessages().get(self.packet.msgNo),
namespace="/sendmsg",
)
socketio.start_background_task(
self._start, data,
self.packet, self,
)
LOG.warning("WS: on_send: exit")
def _start(self, data, packet, namespace):
msg_thread = SendMessageThread(data, packet, self)
msg_thread.start()
def handle_message(self, data):
LOG.debug(f"WS Data {data}")
def handle_json(self, data):
LOG.debug(f"WS json {data}")
class LogMonitorThread(threads.APRSDThread):
def __init__(self): def __init__(self):
super().__init__("LogMonitorThread") super().__init__("LogUpdate")
def loop(self): def loop(self):
global socketio global socketio
try:
record = log.logging_queue.get(block=True, timeout=5)
json_record = self.json_record(record)
socketio.emit(
"log_entry", json_record,
namespace="/logs",
)
except Exception:
# Just ignore thi
pass
if socketio:
log_entries = RPCClient().get_log_entries()
if log_entries:
for entry in log_entries:
socketio.emit(
"log_entry", entry,
namespace="/logs",
)
time.sleep(5)
return True return True
def json_record(self, record):
entry = {}
entry["filename"] = record.filename
entry["funcName"] = record.funcName
entry["levelname"] = record.levelname
entry["lineno"] = record.lineno
entry["module"] = record.module
entry["name"] = record.name
entry["pathname"] = record.pathname
entry["process"] = record.process
entry["processName"] = record.processName
if hasattr(record, "stack_info"):
entry["stack_info"] = record.stack_info
else:
entry["stack_info"] = None
entry["thread"] = record.thread
entry["threadName"] = record.threadName
entry["message"] = record.getMessage()
return entry
class LoggingNamespace(Namespace): class LoggingNamespace(Namespace):
log_thread = None
def on_connect(self): def on_connect(self):
global socketio global socketio
LOG.debug("Web socket connected")
socketio.emit( socketio.emit(
"connected", {"data": "/logs Connected"}, "connected", {"data": "/logs Connected"},
namespace="/logs", namespace="/logs",
) )
self.log_thread = LogMonitorThread() self.log_thread = LogUpdateThread()
self.log_thread.start() self.log_thread.start()
def on_disconnect(self): def on_disconnect(self):
LOG.debug("WS Disconnected") LOG.debug("LOG Disconnected")
self.log_thread.stop() if self.log_thread:
self.log_thread.stop()
def setup_logging(flask_app, loglevel, quiet): def setup_logging(flask_app, loglevel, quiet):
@ -608,8 +457,6 @@ def init_flask(loglevel, quiet):
flask_app.route("/stats", methods=["GET"])(server.stats) flask_app.route("/stats", methods=["GET"])(server.stats)
flask_app.route("/messages", methods=["GET"])(server.messages) flask_app.route("/messages", methods=["GET"])(server.messages)
flask_app.route("/packets", methods=["GET"])(server.packets) flask_app.route("/packets", methods=["GET"])(server.packets)
flask_app.route("/send-message", methods=["GET"])(server.send_message)
flask_app.route("/send-message-status", methods=["GET"])(server.send_message_status)
flask_app.route("/save", methods=["GET"])(server.save) flask_app.route("/save", methods=["GET"])(server.save)
flask_app.route("/plugins", methods=["GET"])(server.plugins) flask_app.route("/plugins", methods=["GET"])(server.plugins)
@ -619,7 +466,21 @@ def init_flask(loglevel, quiet):
) )
# import eventlet # import eventlet
# eventlet.monkey_patch() # eventlet.monkey_patch()
gunicorn_logger = logging.getLogger("gunicorn.error")
flask_app.logger.handlers = gunicorn_logger.handlers
flask_app.logger.setLevel(gunicorn_logger.level)
socketio.on_namespace(SendMessageNamespace("/sendmsg"))
socketio.on_namespace(LoggingNamespace("/logs")) socketio.on_namespace(LoggingNamespace("/logs"))
return socketio, flask_app return socketio, flask_app
if __name__ == "aprsd.flask":
try:
default_config_file = cli_helper.DEFAULT_CONFIG_FILE
CONF(
[], project="aprsd", version=aprsd.__version__,
default_config_files=[default_config_file],
)
except cfg.ConfigFilesNotFoundError:
pass
sio, app = init_flask("DEBUG", False)

90
aprsd/rpc_server.py Normal file
View File

@ -0,0 +1,90 @@
import json
import logging
from oslo_config import cfg
import rpyc
from rpyc.utils.authenticators import AuthenticationError
from rpyc.utils.server import ThreadPoolServer
from aprsd import conf # noqa: F401
from aprsd import packets, stats, threads
from aprsd.threads import log_monitor
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
def magic_word_authenticator(sock):
magic = sock.recv(len(CONF.rpc_settings.magic_word)).decode()
if magic != CONF.rpc_settings.magic_word:
raise AuthenticationError(f"wrong magic word {magic}")
return sock, None
class APRSDRPCThread(threads.APRSDThread):
def __init__(self):
super().__init__(name="RPCThread")
self.thread = ThreadPoolServer(
APRSDService,
port=CONF.rpc_settings.port,
protocol_config={"allow_public_attrs": True},
authenticator=magic_word_authenticator,
)
def stop(self):
if self.thread:
self.thread.close()
self.thread_stop = True
def loop(self):
# there is no loop as run is blocked
if self.thread and not self.thread_stop:
# This is a blocking call
self.thread.start()
@rpyc.service
class APRSDService(rpyc.Service):
def on_connect(self, conn):
# code that runs when a connection is created
# (to init the service, if needed)
LOG.info("Connected")
self._conn = conn
def on_disconnect(self, conn):
# code that runs after the connection has already closed
# (to finalize the service, if needed)
LOG.info("Disconnected")
self._conn = None
@rpyc.exposed
def get_stats(self):
stat = stats.APRSDStats()
stats_dict = stat.stats()
return json.dumps(stats_dict, indent=4, sort_keys=True, default=str)
@rpyc.exposed
def get_stats_obj(self):
return stats.APRSDStats()
@rpyc.exposed
def get_packet_list(self):
return packets.PacketList()
@rpyc.exposed
def get_packet_track(self):
return packets.PacketTrack()
@rpyc.exposed
def get_watch_list(self):
return packets.WatchList()
@rpyc.exposed
def get_seen_list(self):
return packets.SeenList()
@rpyc.exposed
def get_log_entries(self):
entries = log_monitor.LogEntries().get_all_and_purge()
return json.dumps(entries, default=str)

View File

@ -63,7 +63,7 @@ class APRSDStats:
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
if cls._instance is None: if cls._instance is None:
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
# any initializetion here # any init here
cls._instance.start_time = datetime.datetime.now() cls._instance.start_time = datetime.datetime.now()
cls._instance._aprsis_keepalive = datetime.datetime.now() cls._instance._aprsis_keepalive = datetime.datetime.now()
return cls._instance return cls._instance

View File

@ -0,0 +1,77 @@
import logging
import threading
import wrapt
from aprsd import threads
from aprsd.logging import log
LOG = logging.getLogger("APRSD")
class LogEntries:
entries = []
lock = threading.Lock()
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@wrapt.synchronized(lock)
def add(self, entry):
self.entries.append(entry)
@wrapt.synchronized(lock)
def get_all_and_purge(self):
entries = self.entries.copy()
self.entries = []
return entries
@wrapt.synchronized(lock)
def __len__(self):
return len(self.entries)
class LogMonitorThread(threads.APRSDThread):
def __init__(self):
super().__init__("LogMonitorThread")
def loop(self):
try:
record = log.logging_queue.get(block=True, timeout=2)
if isinstance(record, list):
for item in record:
entry = self.json_record(item)
LogEntries().add(entry)
else:
entry = self.json_record(record)
LogEntries().add(entry)
except Exception:
# Just ignore thi
pass
return True
def json_record(self, record):
entry = {}
entry["filename"] = record.filename
entry["funcName"] = record.funcName
entry["levelname"] = record.levelname
entry["lineno"] = record.lineno
entry["module"] = record.module
entry["name"] = record.name
entry["pathname"] = record.pathname
entry["process"] = record.process
entry["processName"] = record.processName
if hasattr(record, "stack_info"):
entry["stack_info"] = record.stack_info
else:
entry["stack_info"] = None
entry["thread"] = record.thread
entry["threadName"] = record.threadName
entry["message"] = record.getMessage()
return entry

View File

@ -107,13 +107,8 @@ function update_packets( data ) {
if (size_dict(packet_list) == 0 && size_dict(data) > 0) { if (size_dict(packet_list) == 0 && size_dict(data) > 0) {
packetsdiv.html('') packetsdiv.html('')
} }
console.log("PACKET_LIST")
console.log(packet_list);
jQuery.each(data, function(i, val) { jQuery.each(data, function(i, val) {
pkt = JSON.parse(val); pkt = JSON.parse(val);
console.log("PACKET");
console.log(pkt);
console.log(pkt.timestamp);
update_watchlist_from_packet(pkt['from_call'], pkt); update_watchlist_from_packet(pkt['from_call'], pkt);
if ( packet_list.hasOwnProperty(pkt.timestamp) == false ) { if ( packet_list.hasOwnProperty(pkt.timestamp) == false ) {

View File

@ -28,36 +28,6 @@ function init_messages() {
update_msg(msg); update_msg(msg);
}); });
$("#sendform").submit(function(event) {
event.preventDefault();
var $checkboxes = $(this).find('input[type=checkbox]');
//loop through the checkboxes and change to hidden fields
$checkboxes.each(function() {
if ($(this)[0].checked) {
$(this).attr('type', 'hidden');
$(this).val(1);
} else {
$(this).attr('type', 'hidden');
$(this).val(0);
}
});
msg = {'from': $('#from').val(),
'password': $('#password').val(),
'to': $('#to').val(),
'message': $('#message').val(),
'wait_reply': $('#wait_reply').val(),
}
socket.emit("send", msg);
//loop through the checkboxes and change to hidden fields
$checkboxes.each(function() {
$(this).attr('type', 'checkbox');
});
});
} }
function add_msg(msg) { function add_msg(msg) {

View File

@ -82,7 +82,6 @@
<div class="item" data-tab="watch-tab">Watch List</div> <div class="item" data-tab="watch-tab">Watch List</div>
<div class="item" data-tab="plugin-tab">Plugins</div> <div class="item" data-tab="plugin-tab">Plugins</div>
<div class="item" data-tab="config-tab">Config</div> <div class="item" data-tab="config-tab">Config</div>
<div class="item" data-tab="send-tab">Send Message</div>
<div class="item" data-tab="log-tab">LogFile</div> <div class="item" data-tab="log-tab">LogFile</div>
<div class="item" data-tab="raw-tab">Raw JSON</div> <div class="item" data-tab="raw-tab">Raw JSON</div>
</div> </div>
@ -160,29 +159,6 @@
<pre id="configjson" class="language-json">{{ config_json|safe }}</pre> <pre id="configjson" class="language-json">{{ config_json|safe }}</pre>
</div> </div>
<div class="ui bottom attached tab segment" data-tab="send-tab">
<h3 class="ui dividing header">Send Message</h3>
<div id="sendMsgDiv" class="ui mini text">
<form id="sendform" name="sendmsg" action="">
<p><label for="from_call">From Callsign:</label>
<input type="text" name="from_call" id="from"></p>
<p><label for="from_call_password">Password:</label>
<input type="password" name="from_call_password" id='password'></p>
<p><label for="to_call">To Callsign:</label>
<input type="text" name="to_call" id="to" ></p>
<p><label for="message">Message:</label>
<input type="text" name="message" id="message" ></p>
<p><label for="wait">Wait for Reply?</label>
<input type="checkbox" name="wait_reply" id="wait_reply" value="off" checked>
</p>
<input type="submit" name="submit" class="button" id="send_msg" value="Send" />
</form>
<div class="ui styled fluid accordion" id="accordion">
<div id="sendMsgsDiv" class="ui mini text">Messages</div>
</div>
</div>
</div>
<div class="ui bottom attached tab segment" data-tab="log-tab"> <div class="ui bottom attached tab segment" data-tab="log-tab">
<h3 class="ui dividing header">LOGFILE</h3> <h3 class="ui dividing header">LOGFILE</h3>
<pre id="logContainer" style="height: 600px;overflow-y:auto;overflow-x:auto;"><code id="logtext" class="language-log" ></code></pre> <pre id="logContainer" style="height: 600px;overflow-y:auto;overflow-x:auto;"><code id="logtext" class="language-log" ></code></pre>

View File

@ -1,15 +0,0 @@
<html>
<head>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
<script src="json-viewer/jquery.json-viewer.js"></script>
<link href="json-viewer/jquery.json-viewer.css" type="text/css" rel="stylesheet" />
</head>
<pre id="json-viewer"></pre>
<script>
var data = {{ messages | safe }}
$('#json-viewer').jsonViewer(data)
</script>
</html>

View File

@ -1,74 +0,0 @@
<html>
<head>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.6.0/jquery.min.js"></script>
<link rel="stylesheet" href="https://ajax.googleapis.com/ajax/libs/jqueryui/1.12.1/themes/smoothness/jquery-ui.css">
<script src="https://ajax.googleapis.com/ajax/libs/jqueryui/1.12.1/jquery-ui.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/jquery-simple-websocket@1.1.4/src/jquery.simple.websocket.min.js"></script>
<script src="https://cdn.socket.io/4.1.2/socket.io.min.js" integrity="sha384-toS6mmwu70G0fw54EGlWWeA4z3dyJ+dlXBtSURSKN4vyRFOcxd3Bzjj/AoOwY+Rg" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/prismjs@1.23.0/prism.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/prismjs@1.23.0/components/prism-json.js"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/prismjs@1.23.0/themes/prism-tomorrow.css">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/semantic-ui@2.4.2/dist/semantic.min.css">
<script src="https://cdn.jsdelivr.net/npm/semantic-ui@2.4.2/dist/semantic.min.js"></script>
<link rel="stylesheet" href="/static/css/index.css">
<link rel="stylesheet" href="/static/css/tabs.css">
<script src="/static/js/send-message.js"></script>
<script language="JavaScript">
$(document).ready(function() {
init_messages();
});
</script>
</head>
<body>
<div class='ui text container'>
<h1 class='ui dividing header'>APRSD {{ version }}</h1>
</div>
<div class='ui grid text container'>
<div class='left floated ten wide column'>
<span style='color: green'>{{ callsign }}</span>
connected to
<span style='color: blue' id='aprsis'>NONE</span>
</div>
<div class='right floated four wide column'>
<span id='uptime'>NONE</span>
</div>
</div>
<h3 class="ui dividing header">Send Message Form</h3>
<form id="sendform" name="sendmsg" action="">
<p><label for="from_call">From Callsign:</label>
<input type="text" name="from_call" id="from" value="WB4BOR"></p>
<p><label for="from_call_password">Password:</label>
<input type="password" name="from_call_password" id='password' value="24496"></p>
<p><label for="to_call">To Callsign:</label>
<input type="text" name="to_call" id="to" value="WB4BOR-11"></p>
<p><label for="message">Message:</label>
<input type="text" name="message" id="message" value="ping"></p>
<p><label for="wait">Wait for Reply?</label>
<input type="checkbox" name="wait_reply" id="wait_reply" value="off" checked>
</p>
<input type="submit" name="submit" class="button" id="send_msg" value="Send" />
</form>
<h3 class="ui dividing header">Messages (<span id="msgs_count">0</span>)</h3>
<div class="ui styled fluid accordion" id="accordion">
<div id="msgsDiv" class="ui mini text">Messages</div>
</div>
</body>
</html>

View File

@ -49,7 +49,7 @@ pep8-naming==0.13.3 # via -r dev-requirements.in
pip-tools==6.12.1 # via -r dev-requirements.in pip-tools==6.12.1 # via -r dev-requirements.in
platformdirs==2.6.0 # via black, tox, virtualenv platformdirs==2.6.0 # via black, tox, virtualenv
pluggy==1.0.0 # via pytest, tox pluggy==1.0.0 # via pytest, tox
pre-commit==2.20.0 # via -r dev-requirements.in pre-commit==2.21.0 # via -r dev-requirements.in
pycodestyle==2.10.0 # via flake8 pycodestyle==2.10.0 # via flake8
pyflakes==3.0.1 # via autoflake, flake8 pyflakes==3.0.1 # via autoflake, flake8
pygments==2.13.0 # via rich, sphinx pygments==2.13.0 # via rich, sphinx
@ -71,9 +71,9 @@ sphinxcontrib-jsmath==1.0.1 # via sphinx
sphinxcontrib-qthelp==1.0.3 # via sphinx sphinxcontrib-qthelp==1.0.3 # via sphinx
sphinxcontrib-serializinghtml==1.1.5 # via sphinx sphinxcontrib-serializinghtml==1.1.5 # via sphinx
tokenize-rt==5.0.0 # via add-trailing-comma, pyupgrade tokenize-rt==5.0.0 # via add-trailing-comma, pyupgrade
toml==0.10.2 # via autoflake, pre-commit toml==0.10.2 # via autoflake
tomli==2.0.1 # via black, build, coverage, mypy, pep517, pyproject-api, pytest, tox tomli==2.0.1 # via black, build, coverage, mypy, pep517, pyproject-api, pytest, tox
tox==4.0.16 # via -r dev-requirements.in tox==4.0.18 # via -r dev-requirements.in
typing-extensions==4.4.0 # via libcst, mypy, typing-inspect typing-extensions==4.4.0 # via libcst, mypy, typing-inspect
typing-inspect==0.8.0 # via libcst typing-inspect==0.8.0 # via libcst
unify==0.5 # via gray unify==0.5 # via gray

View File

@ -23,10 +23,11 @@ beautifulsoup4
wrapt wrapt
# kiss3 uses attrs # kiss3 uses attrs
kiss3 kiss3
attrs==22.1.0 attrs
# for mobile checking # for mobile checking
user-agents user-agents
pyopenssl pyopenssl
dataclasses dataclasses
dacite2 dacite2
oslo.config oslo.config
rpyc

View File

@ -39,6 +39,7 @@ oslo-config==9.0.0 # via -r requirements.in
oslo-i18n==5.1.0 # via oslo-config oslo-i18n==5.1.0 # via oslo-config
pbr==5.11.0 # via -r requirements.in, oslo-i18n, stevedore pbr==5.11.0 # via -r requirements.in, oslo-i18n, stevedore
pluggy==1.0.0 # via -r requirements.in pluggy==1.0.0 # via -r requirements.in
plumbum==1.8.0 # via rpyc
pycparser==2.21 # via cffi pycparser==2.21 # via cffi
pygments==2.13.0 # via rich pygments==2.13.0 # via rich
pyopenssl==22.1.0 # via -r requirements.in pyopenssl==22.1.0 # via -r requirements.in
@ -51,6 +52,7 @@ pyyaml==6.0 # via -r requirements.in, oslo-config
requests==2.28.1 # via -r requirements.in, oslo-config, update-checker requests==2.28.1 # via -r requirements.in, oslo-config, update-checker
rfc3986==2.0.0 # via oslo-config rfc3986==2.0.0 # via oslo-config
rich==12.6.0 # via -r requirements.in rich==12.6.0 # via -r requirements.in
rpyc==5.3.0 # via -r requirements.in
shellingham==1.5.0 # via click-completion shellingham==1.5.0 # via click-completion
six==1.16.0 # via -r requirements.in, click-completion, eventlet, imapclient six==1.16.0 # via -r requirements.in, click-completion, eventlet, imapclient
soupsieve==2.3.2.post1 # via beautifulsoup4 soupsieve==2.3.2.post1 # via beautifulsoup4