1
0
mirror of https://github.com/craigerl/aprsd.git synced 2026-03-11 02:19:38 -04:00

Added SerialKISSDriver

This refactors the tcp kiss driver into the base class KISSDdriver
and implements the serial kiss driver.
This commit is contained in:
Walter Boring 2025-11-26 13:49:21 -05:00
parent 8f6f8cd406
commit a8822672d8
5 changed files with 420 additions and 139 deletions

View File

@ -113,6 +113,7 @@ You can see the [available plugins/extensions on pypi here:](https://pypi.org/se
- [aprsd-admin-extension](https://github.com/hemna/aprsd-admin-extension) - Web Administration page for APRSD
- [aprsd-webchat-extension](https://github.com/hemna/aprsd-webchat-extension) - Web page for APRS Messaging
- [aprsd-rich-cli-extension](https://github.com/hemna/aprsd-rich-cli-extension) - Textual rich CLI versions of aprsd commands
- [aprsd-irc-extension](https://github.com/hemna/aprsd-irc-extension) - an IRC like server command for APRS
### APRSD Overview Diagram
@ -299,6 +300,8 @@ file
│ 📂 aprsd-admin-extension │ Administration extension for the Ham radio APRSD Server │ 1.0.1 │ 2025-01-06T21:57:24 │ Yes │
│ 📂 aprsd-irc-extension │ An Extension to Ham radio APRSD Daemon to act like an irc server │ 0.0.5 │ 2024-04-09T11:28:47 │ No │
│ │ for APRS │ │ │ │
│ 📂 aprsd-rich-cli-extens │ APRSD Extension to create textual rich CLI versions of aprsd │ 0.1.1 │ 2024-12-01T00:00:00 │ No │
│ ion │ commands │ │ │ │
│ 📂 aprsd-webchat-extens │ Web page for APRS Messaging │ 1.2.3 │ 2024-10-01T00:00:00 │ No │
│ ion │ │ │ │ │
└──────────────────────────┴─────────────────────────────────────────────────────────────────────┴─────────┴─────────────────────┴────────────┘

View File

@ -0,0 +1,181 @@
"""
APRSD KISS Client Driver Base classusing native KISS implementation.
This module provides a KISS client driver for APRSD using the new
non-asyncio KISSInterface implementation.
"""
import datetime
import logging
from typing import Any, Callable, Dict
import aprslib
from kiss import util as kissutil
from aprsd.packets import core
from aprsd.utils import trace
LOG = logging.getLogger('APRSD')
class KISSDriver(metaclass=trace.TraceWrapperMetaclass):
"""APRSD KISS Client Driver Base class."""
packets_received = 0
packets_sent = 0
last_packet_sent = None
last_packet_received = None
keepalive = None
def __init__(self):
"""Initialize the KISS client.
Args:
client_name: Name of the client instance
"""
super().__init__()
self._connected = False
self.keepalive = datetime.datetime.now()
def login_success(self) -> bool:
"""There is no login for KISS."""
if not self._connected:
return False
return True
def login_failure(self) -> str:
"""There is no login for KISS."""
return 'Login successful'
def set_filter(self, filter_text: str):
"""Set packet filter (not implemented for KISS).
Args:
filter_text: Filter specification (ignored for KISS)
"""
# KISS doesn't support filtering at the TNC level
pass
@property
def filter(self) -> str:
"""Get packet filter (not implemented for KISS).
Returns:
str: Empty string (not implemented for KISS)
"""
return ''
@property
def is_alive(self) -> bool:
"""Check if the client is connected.
Returns:
bool: True if connected to KISS TNC, False otherwise
"""
return self._connected
def _handle_fend(self, buffer: bytes, strip_df_start: bool = True) -> bytes:
"""
Handle FEND (end of frame) encountered in a KISS data stream.
:param buffer: the buffer containing the frame
:param strip_df_start: remove leading null byte (DATA_FRAME opcode)
:return: the bytes of the frame without escape characters or frame
end markers (FEND)
"""
frame = kissutil.recover_special_codes(kissutil.strip_nmea(bytes(buffer)))
if strip_df_start:
frame = kissutil.strip_df_start(frame)
LOG.warning(f'handle_fend {" ".join(f"{b:02X}" for b in bytes(frame))}')
return bytes(frame)
def fix_raw_frame(self, raw_frame: bytes) -> bytes:
"""Fix the raw frame by recalculating the FCS."""
ax25_data = raw_frame[2:-1] # Remove KISS markers
return self._handle_fend(ax25_data)
def decode_packet(self, *args, **kwargs) -> core.Packet:
"""Decode a packet from an AX.25 frame.
Args:
frame: Received AX.25 frame
"""
frame = kwargs.get('frame')
if not frame:
LOG.warning('No frame received to decode?!?!')
return None
try:
aprslib_frame = aprslib.parse(str(frame))
packet = core.factory(aprslib_frame)
if isinstance(packet, core.ThirdPartyPacket):
return packet.subpacket
else:
return packet
except Exception as e:
LOG.error(f'Error decoding packet: {e}')
return None
def consumer(self, callback: Callable, raw: bool = False):
"""Start consuming frames with the given callback.
Args:
callback: Function to call with received packets
Raises:
Exception: If not connected to KISS TNC
"""
# Ensure connection
if not self._connected:
return
# Read frame
frame = self.read_frame()
if frame:
LOG.info(f'GOT FRAME: {frame} calling {callback}')
kwargs = {
'frame': frame,
}
callback(**kwargs)
def read_frame(self):
"""Read a frame from the KISS interface.
This is implemented in the subclass.
"""
raise NotImplementedError('read_frame is not implemented for KISS')
def stats(self, serializable: bool = False) -> Dict[str, Any]:
"""Get client statistics.
Returns:
Dict containing client statistics
"""
if serializable:
keepalive = self.keepalive.isoformat()
if self.last_packet_sent:
last_packet_sent = self.last_packet_sent.isoformat()
else:
last_packet_sent = 'None'
if self.last_packet_received:
last_packet_received = self.last_packet_received.isoformat()
else:
last_packet_received = 'None'
else:
keepalive = self.keepalive
last_packet_sent = self.last_packet_sent
last_packet_received = self.last_packet_received
stats = {
'client': self.__class__.__name__,
'transport': self.transport,
'connected': self._connected,
'path': self.path,
'packets_sent': self.packets_sent,
'packets_received': self.packets_received,
'last_packet_sent': last_packet_sent,
'last_packet_received': last_packet_received,
'connection_keepalive': keepalive,
}
return stats

View File

@ -0,0 +1,226 @@
"""
APRSD KISS Client Driver using native KISS implementation.
This module provides a KISS client driver for APRSD using the new
non-asyncio KISSInterface implementation.
"""
import datetime
import logging
import select
from typing import Any, Dict
import serial
from ax253 import frame as ax25frame
from kiss import constants as kiss_constants
from kiss import util as kissutil
from kiss.kiss import Command
from oslo_config import cfg
from aprsd import ( # noqa
client,
conf, # noqa
exception,
)
from aprsd.client.drivers.kiss_common import KISSDriver
from aprsd.packets import core
CONF = cfg.CONF
LOG = logging.getLogger('APRSD')
class SerialKISSDriver(KISSDriver):
"""APRSD client driver for Serial KISS connections."""
_instance = None
def __new__(cls, *args, **kwargs):
"""This magic turns this into a singleton."""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""Initialize the KISS client.
Args:
client_name: Name of the client instance
"""
super().__init__()
self._connected = False
self.keepalive = datetime.datetime.now()
# This is initialized in setup_connection()
self.socket = None
@property
def transport(self) -> str:
return client.TRANSPORT_SERIALKISS
@staticmethod
def is_enabled() -> bool:
"""Check if KISS is enabled in configuration.
Returns:
bool: True if either Serial is enabled
"""
return CONF.kiss_serial.enabled
@staticmethod
def is_configured():
# Ensure that the config vars are correctly set
if SerialKISSDriver.is_enabled():
if not CONF.kiss_serial.device:
LOG.error('KISS Serial enabled, but no device is set.')
raise exception.MissingConfigOptionException(
'kiss_serial.device is not set.',
)
return True
return False
def close(self):
"""Close the connection."""
self._connected = False
def setup_connection(self):
"""Set up the KISS interface."""
if not self.is_enabled():
LOG.error('KISS is not enabled in configuration')
return
if self._connected:
LOG.warning('KISS interface already connected')
return
try:
# Configure for TCP KISS
if self.is_enabled():
LOG.info(
f'Serial KISS Connection to {CONF.kiss_serial.device}:{CONF.kiss_serial.baudrate}'
)
self.path = CONF.kiss_serial.path
self.connect()
if self._connected:
LOG.info('KISS interface initialized')
else:
LOG.error('Failed to connect to KISS interface')
except Exception as ex:
LOG.error('Failed to initialize KISS interface')
LOG.exception(ex)
self._connected = False
def connect(self):
"""Connect to the KISS interface."""
if not self.is_enabled():
LOG.error('KISS is not enabled in configuration')
return
if self._connected:
LOG.warning('KISS interface already connected')
return
try:
self.socket = serial.Serial(
CONF.kiss_serial.device, CONF.kiss_serial.baudrate
)
self.socket.open()
self._connected = True
except serial.SerialException as e:
LOG.error(f'Failed to connect to KISS interface: {e}')
self._connected = False
return False
except Exception as ex:
LOG.error('Failed to connect to KISS interface')
LOG.exception(ex)
self._connected = False
def read_frame(self):
"""Read a frame from the KISS interface."""
if not self.socket:
return None
if not self._connected:
return None
while self._connected:
try:
readable, _, _ = select.select(
[self.socket],
[],
[],
self.select_timeout,
)
if not readable:
continue
except Exception as e:
# No need to log if we are not running.
# this happens when the client is stopped/closed.
LOG.error(f'Error in read loop: {e}')
self._connected = False
break
try:
short_buf = self.socket.read(1024)
if not short_buf:
continue
raw_frame = self.fix_raw_frame(short_buf)
return ax25frame.Frame.from_bytes(raw_frame)
except Exception as e:
LOG.error(f'Error in read loop: {e}')
self._connected = False
break
def send(self, packet: core.Packet):
"""Send an APRS packet.
Args:
packet: APRS packet to send (Packet or Message object)
Raises:
Exception: If not connected or send fails
"""
if not self.socket:
raise Exception('KISS interface not initialized')
payload = None
path = self.path
packet.prepare()
payload = packet.payload.encode('US-ASCII')
if packet.path:
path = packet.path
LOG.debug(
f"KISS Send '{payload}' TO '{packet.to_call}' From "
f"'{packet.from_call}' with PATH '{path}'",
)
frame = ax25frame.Frame.ui(
destination='APZ100',
# destination=packet.to_call,
source=packet.from_call,
path=path,
info=payload,
)
# now escape the frame special characters
frame_escaped = kissutil.escape_special_codes(bytes(frame))
# and finally wrap the frame in KISS protocol
command = Command.DATA_FRAME
frame_kiss = b''.join(
[kiss_constants.FEND, command.value, frame_escaped, kiss_constants.FEND]
)
self.socket.send(frame_kiss)
# Update last packet sent time
self.last_packet_sent = datetime.datetime.now()
# Increment packets sent counter
self.packets_sent += 1
def stats(self, serializable: bool = False) -> Dict[str, Any]:
"""Get client statistics.
Returns:
Dict containing client statistics
"""
stats = super().stats(serializable=serializable)
stats['device'] = CONF.kiss_serial.device
stats['baudrate'] = CONF.kiss_serial.baudrate
return stats

View File

@ -9,7 +9,7 @@ import datetime
import logging
import select
import socket
from typing import Any, Callable, Dict
from typing import Any, Dict
import aprslib
from ax253 import frame as ax25frame
@ -23,41 +23,20 @@ from aprsd import ( # noqa
conf, # noqa
exception,
)
from aprsd.client.drivers.kiss_common import KISSDriver
from aprsd.packets import core
from aprsd.utils import trace
CONF = cfg.CONF
LOG = logging.getLogger('APRSD')
def handle_fend(buffer: bytes, strip_df_start: bool = True) -> bytes:
"""
Handle FEND (end of frame) encountered in a KISS data stream.
:param buffer: the buffer containing the frame
:param strip_df_start: remove leading null byte (DATA_FRAME opcode)
:return: the bytes of the frame without escape characters or frame
end markers (FEND)
"""
frame = kissutil.recover_special_codes(kissutil.strip_nmea(bytes(buffer)))
if strip_df_start:
frame = kissutil.strip_df_start(frame)
LOG.warning(f'handle_fend {" ".join(f"{b:02X}" for b in bytes(frame))}')
return bytes(frame)
class TCPKISSDriver(metaclass=trace.TraceWrapperMetaclass):
class TCPKISSDriver(KISSDriver):
# class TCPKISSDriver:
"""APRSD client driver for TCP KISS connections."""
_instance = None
# Class level attributes required by Client protocol
packets_received = 0
packets_sent = 0
last_packet_sent = None
last_packet_received = None
keepalive = None
client_name = None
socket = None
# timeout in seconds
@ -107,15 +86,6 @@ class TCPKISSDriver(metaclass=trace.TraceWrapperMetaclass):
return True
return False
@property
def is_alive(self) -> bool:
"""Check if the client is connected.
Returns:
bool: True if connected to KISS TNC, False otherwise
"""
return self._connected
def close(self):
"""Close the connection."""
self._connected = False
@ -200,112 +170,15 @@ class TCPKISSDriver(metaclass=trace.TraceWrapperMetaclass):
LOG.exception(ex)
self._connected = False
def set_filter(self, filter_text: str):
"""Set packet filter (not implemented for KISS).
Args:
filter_text: Filter specification (ignored for KISS)
"""
# KISS doesn't support filtering at the TNC level
pass
@property
def filter(self) -> str:
"""Get packet filter (not implemented for KISS).
Returns:
str: Empty string (not implemented for KISS)
"""
return ''
def login_success(self) -> bool:
"""There is no login for KISS."""
if not self._connected:
return False
return True
def login_failure(self) -> str:
"""There is no login for KISS."""
return 'Login successful'
def consumer(self, callback: Callable, raw: bool = False):
"""Start consuming frames with the given callback.
Args:
callback: Function to call with received packets
Raises:
Exception: If not connected to KISS TNC
"""
# Ensure connection
if not self._connected:
return
# Read frame
frame = self.read_frame()
if frame:
LOG.info(f'GOT FRAME: {frame} calling {callback}')
kwargs = {
'frame': frame,
}
callback(**kwargs)
def decode_packet(self, *args, **kwargs) -> core.Packet:
"""Decode a packet from an AX.25 frame.
Args:
frame: Received AX.25 frame
"""
frame = kwargs.get('frame')
if not frame:
LOG.warning('No frame received to decode?!?!')
return None
try:
aprslib_frame = aprslib.parse(str(frame))
packet = core.factory(aprslib_frame)
if isinstance(packet, core.ThirdPartyPacket):
return packet.subpacket
else:
return packet
except Exception as e:
LOG.error(f'Error decoding packet: {e}')
return None
def stats(self, serializable: bool = False) -> Dict[str, Any]:
"""Get client statistics.
Returns:
Dict containing client statistics
"""
if serializable:
keepalive = self.keepalive.isoformat()
if self.last_packet_sent:
last_packet_sent = self.last_packet_sent.isoformat()
else:
last_packet_sent = 'None'
if self.last_packet_received:
last_packet_received = self.last_packet_received.isoformat()
else:
last_packet_received = 'None'
else:
keepalive = self.keepalive
last_packet_sent = self.last_packet_sent
last_packet_received = self.last_packet_received
stats = {
'client': self.__class__.__name__,
'transport': self.transport,
'connected': self._connected,
'path': self.path,
'packets_sent': self.packets_sent,
'packets_received': self.packets_received,
'last_packet_sent': last_packet_sent,
'last_packet_received': last_packet_received,
'connection_keepalive': keepalive,
'host': CONF.kiss_tcp.host,
'port': CONF.kiss_tcp.port,
}
stats = super().stats(serializable=serializable)
stats['host'] = CONF.kiss_tcp.host
stats['port'] = CONF.kiss_tcp.port
return stats
def connect(self) -> bool:
@ -348,11 +221,6 @@ class TCPKISSDriver(metaclass=trace.TraceWrapperMetaclass):
self._connected = False
return False
def fix_raw_frame(self, raw_frame: bytes) -> bytes:
"""Fix the raw frame by recalculating the FCS."""
ax25_data = raw_frame[2:-1] # Remove KISS markers
return handle_fend(ax25_data)
def read_frame(self, blocking=False):
"""
Generator for complete lines, received from the server

View File

@ -143,7 +143,10 @@ class TraceWrapperMetaclass(type):
def __new__(cls, classname, bases, class_dict):
new_class_dict = {}
for attribute_name, attribute in class_dict.items():
if isinstance(attribute, types.FunctionType):
if attribute_name == '__new__' or attribute_name == '__init__':
# Don't trace the __new__ or __init__ methods
pass
elif isinstance(attribute, types.FunctionType):
# replace it with a wrapped version
attribute = functools.update_wrapper(
trace_method(attribute),