Got webchat working with KISS tcp

This patch reworks the KISS client to get rid of
aioax25 as it was too difficult to work with due to
heavy use of asyncio.

Switched to the kiss3 pypi library.
This commit is contained in:
Hemna 2022-11-22 13:32:19 -05:00
parent d717a22717
commit 7d970cbe70
6 changed files with 108 additions and 104 deletions

View File

@ -195,8 +195,8 @@ class KISSClient(Client):
@trace.trace
def setup_connection(self):
ax25client = kiss.Aioax25Client(self.config)
return ax25client
client = kiss.KISS3Client(self.config)
return client
class ClientFactory:
@ -223,7 +223,7 @@ class ClientFactory:
elif KISSClient.is_enabled(self.config):
key = KISSClient.transport(self.config)
LOG.debug(f"GET client {key}")
LOG.debug(f"GET client '{key}'")
builder = self._builders.get(key)
if not builder:
raise ValueError(key)

View File

@ -1,21 +1,19 @@
import asyncio
import logging
from aioax25 import interface
from aioax25 import kiss as kiss
from aioax25.aprs import APRSInterface
from aioax25.aprs.frame import APRSFrame
import aprslib
from ax253 import Frame
import kiss
from aprsd import messaging
from aprsd.utils import trace
LOG = logging.getLogger("APRSD")
class Aioax25Client:
class KISS3Client:
def __init__(self, config):
self.config = config
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.loop = asyncio.get_event_loop()
self.setup()
def setup(self):
@ -25,84 +23,65 @@ class Aioax25Client:
False,
):
LOG.debug(
"Setting up Serial KISS connection to {}".format(
"KISS({}) Serial connection to {}".format(
kiss.__version__,
self.config["kiss"]["serial"]["device"],
),
)
self.kissdev = kiss.SerialKISSDevice(
device=self.config["kiss"]["serial"]["device"],
baudrate=self.config["kiss"]["serial"].get("baudrate", 9600),
loop=self.loop,
log=LOG,
self.kiss = kiss.SerialKISS(
port=self.config["kiss"]["serial"]["device"],
speed=self.config["kiss"]["serial"].get("baudrate", 9600),
strip_df_start=True,
)
elif "tcp" in self.config["kiss"] and self.config["kiss"]["tcp"].get(
"enabled",
False,
):
LOG.debug(
"Setting up KISSTCP Connection to {}:{}".format(
"KISS({}) TCP Connection to {}:{}".format(
kiss.__version__,
self.config["kiss"]["tcp"]["host"],
self.config["kiss"]["tcp"]["port"],
),
)
self.kissdev = kiss.TCPKISSDevice(
self.config["kiss"]["tcp"]["host"],
self.config["kiss"]["tcp"]["port"],
loop=self.loop,
log=LOG,
self.kiss = kiss.TCPKISS(
host=self.config["kiss"]["tcp"]["host"],
port=int(self.config["kiss"]["tcp"]["port"]),
strip_df_start=True,
)
LOG.debug("Creating AX25Interface")
self.ax25int = interface.AX25Interface(
kissport=self.kissdev[0],
loop=self.loop,
log=LOG,
)
LOG.debug("Creating APRSInterface")
self.aprsint = APRSInterface(
ax25int=self.ax25int,
mycall=self.config["aprsd"]["callsign"],
log=LOG,
)
self.kissdev.open()
LOG.debug("Starting KISS interface connection")
self.kiss.start()
@trace.trace
def stop(self):
LOG.debug(self.kissdev)
self.loop.stop()
self.kissdev.close()
try:
self.kiss.stop()
self.kiss.loop.call_soon_threadsafe(
self.kiss.protocol.transport.close,
)
except Exception as ex:
LOG.exception(ex)
def set_filter(self, filter):
# This does nothing right now.
pass
def consumer(self, callback, blocking=False, immortal=False, raw=False):
callsign = self.config["aprsd"]["callsign"]
call = callsign.split("-")
if len(call) > 1:
callsign = call[0]
ssid = int(call[1])
else:
ssid = 0
self.aprsint.bind(callback=callback, callsign=callsign, ssid=ssid, regex=False)
def parse_frame(self, frame_bytes):
frame = Frame.from_bytes(frame_bytes)
# Now parse it with aprslib
packet = aprslib.parse(str(frame))
kwargs = {
"frame": str(frame),
"packet": packet,
}
self._parse_callback(**kwargs)
# async def set_after(fut, delay, value):
# # Sleep for *delay* seconds.
# await asyncio.sleep(delay)
#
# # Set *value* as a result of *fut* Future.
# fut.set_result(value)
#
# async def my_wait(fut):
# await fut
#
# fut = self.loop.create_future()
# self.loop.create_task(
# set_after(fut, 5, "nothing")
# )
LOG.debug("RUN FOREVER")
self.loop.run_forever()
# my_wait(fut)
def consumer(self, callback, blocking=False, immortal=False, raw=False):
LOG.debug("Start blocking KISS consumer")
self._parse_callback = callback
self.kiss.read(callback=self.parse_frame, min_frames=None)
LOG.debug("END blocking KISS consumer")
def send(self, msg):
"""Send an APRS Message object."""
@ -112,7 +91,10 @@ class Aioax25Client:
# payload
# )).encode('US-ASCII'),
# payload = str(msg).encode('US-ASCII')
msg_payload = f"{msg.message}{{{str(msg.id)}"
if isinstance(msg, messaging.AckMessage):
msg_payload = f"ack{msg.id}"
else:
msg_payload = f"{msg.message}{{{str(msg.id)}"
payload = (
":{:<9}:{}".format(
msg.tocall,
@ -120,19 +102,10 @@ class Aioax25Client:
)
).encode("US-ASCII")
LOG.debug(f"Send '{payload}' TO KISS")
self.aprsint.transmit(
APRSFrame(
destination=msg.tocall,
source=msg.fromcall,
payload=payload,
repeaters=["WIDE1-1", "WIDE2-1"],
),
frame = Frame.ui(
destination=msg.tocall,
source=msg.fromcall,
path=["WIDE1-1", "WIDE2-1"],
info=payload,
)
# self.aprsint.send_message(
# addressee=msg.tocall,
# message=payload,
# path=["WIDE1-1", "WIDE2-1"],
# oneshot=True,
# )
self.kiss.write(frame)

View File

@ -20,7 +20,6 @@ from werkzeug.security import check_password_hash, generate_password_hash
import wrapt
import aprsd
from aprsd import aprsd as aprsd_main
from aprsd import cli_helper, client
from aprsd import config as aprsd_config
from aprsd import messaging, packets, stats, threads, utils
@ -44,6 +43,25 @@ msg_queues = {
}
def signal_handler(sig, frame):
click.echo("signal_handler: called")
LOG.info(
f"Ctrl+C, Sending all threads({len(threads.APRSDThreadList())}) exit! "
f"Can take up to 10 seconds {datetime.datetime.now()}",
)
threads.APRSDThreadList().stop_all()
if "subprocess" not in str(frame):
time.sleep(1.5)
# messaging.MsgTrack().save()
# packets.WatchList().save()
# packets.SeenList().save()
LOG.info(stats.APRSDStats())
LOG.info("Telling flask to bail.")
signal.signal(signal.SIGTERM, sys.exit(0))
sys.exit(0)
class SentMessages(objectstore.ObjectStoreMixin):
_instance = None
lock = threading.Lock()
@ -123,11 +141,15 @@ def verify_password(username, password):
class WebChatRXThread(rx.APRSDRXThread):
"""Class that connects to aprsis and waits for messages."""
"""Class that connects to aprsis/kiss and waits for messages."""
def connected(self, connected=True):
self.connected = connected
def stop(self):
self.thread_stop = True
client.factory.create().client.stop()
def loop(self):
# setup the consumer of messages and block until a messages
msg = None
@ -154,15 +176,10 @@ class WebChatRXThread(rx.APRSDRXThread):
time.sleep(2)
try:
# This will register a packet consumer with aprslib
# When new packets come in the consumer will process
# the packet
# Do a partial here because the consumer signature doesn't allow
# For kwargs to be passed in to the consumer func we declare
# and the aprslib developer didn't want to allow a PR to add
# kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56
# This call blocks until thread stop() is called.
self._client.client.consumer(
self.process_packet, raw=False, blocking=False,
)
@ -177,12 +194,16 @@ class WebChatRXThread(rx.APRSDRXThread):
# This will cause a reconnect, next time client.get_client()
# is called
self._client.reset()
# Continue to loop
time.sleep(1)
return True
return True
def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
# packet = self._client.decode_packet(*args, **kwargs)
if "packet" in kwargs:
packet = kwargs["packet"]
else:
packet = self._client.decode_packet(*args, **kwargs)
LOG.debug(f"GOT Packet {packet}")
self.msg_queues["rx"].put(packet)
@ -404,8 +425,9 @@ class SendMessageNamespace(Namespace):
msgs = SentMessages()
msgs.add(msg)
msgs.set_status(msg.id, "Sending")
obj = msgs.get(self.msg.id)
socketio.emit(
"sent", SentMessages().get(self.msg.id),
"sent", obj,
namespace="/sendmsg",
)
msg.send()
@ -527,8 +549,8 @@ def webchat(ctx, flush, port):
quiet = ctx.obj["quiet"]
config = ctx.obj["config"]
signal.signal(signal.SIGINT, aprsd_main.signal_handler)
signal.signal(signal.SIGTERM, aprsd_main.signal_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if not quiet:
click.echo("Load config")
@ -567,28 +589,29 @@ def webchat(ctx, flush, port):
packets.WatchList(config=config)
packets.SeenList(config=config)
aprsd_main.flask_enabled = True
(socketio, app) = init_flask(config, loglevel, quiet)
rx_thread = WebChatRXThread(
msg_queues=msg_queues,
config=config,
)
LOG.warning("Start RX Thread")
LOG.info("Start RX Thread")
rx_thread.start()
tx_thread = WebChatTXThread(
msg_queues=msg_queues,
config=config,
socketio=socketio,
)
LOG.warning("Start TX Thread")
LOG.info("Start TX Thread")
tx_thread.start()
keepalive = threads.KeepAliveThread(config=config)
LOG.warning("Start KeepAliveThread")
LOG.info("Start KeepAliveThread")
keepalive.start()
LOG.warning("Start socketio.run()")
LOG.info("Start socketio.run()")
socketio.run(
app,
host=config["aprsd"]["web"]["host"],
port=port,
)
LOG.info("WebChat exiting!!!! Bye.")

View File

@ -10,3 +10,5 @@ pip-tools
pytest
pytest-cov
gray
pip==22.0.4
pip-tools==5.4.0

View File

@ -1,8 +1,8 @@
aioax25>=0.0.10
aprslib>=0.7.0
click
click-completion
flask
flask==2.1.2
werkzeug==2.1.2
flask-classful
flask-httpauth
imapclient
@ -25,3 +25,6 @@ rich
# For the list-plugins pypi.org search scraping
beautifulsoup4
wrapt
# kiss3 uses attrs
kiss3
attrs==22.1.0

View File

@ -3,6 +3,9 @@ minversion = 2.9.0
skipdist = True
skip_missing_interpreters = true
envlist = pre-commit,pep8,py{36,37,38,39}
#requires = tox-pipenv
# pip==22.0.4
# pip-tools==5.4.0
# Activate isolated build environment. tox will use a virtual environment
# to build a source distribution from the source tree. For build tools and