1
0
mirror of https://github.com/craigerl/aprsd.git synced 2026-01-14 01:25:32 -05:00

Added unit tests for packets.

Also did some code cleanup.
This commit is contained in:
Walter Boring 2026-01-05 16:51:54 -05:00
parent f9979fa3da
commit 1da92e52ef
48 changed files with 1791 additions and 445 deletions

View File

@ -12,8 +12,7 @@
from importlib.metadata import PackageNotFoundError, version
try:
__version__ = version("aprsd")
__version__ = version('aprsd')
except PackageNotFoundError:
pass

View File

@ -3,12 +3,12 @@ import click.shell_completion
from aprsd.main import cli
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@cli.command()
@click.argument(
"shell", type=click.Choice(list(click.shell_completion._available_shells))
'shell', type=click.Choice(list(click.shell_completion._available_shells))
)
def completion(shell):
"""Show the shell completion code"""
@ -16,10 +16,10 @@ def completion(shell):
cls = click.shell_completion.get_completion_class(shell)
prog_name = _detect_program_name()
complete_var = f"_{prog_name}_COMPLETE".replace("-", "_").upper()
complete_var = f'_{prog_name}_COMPLETE'.replace('-', '_').upper()
print(cls(cli, {}, prog_name, complete_var).source())
print(
"# Add the following line to your shell configuration file to have aprsd command line completion"
'# Add the following line to your shell configuration file to have aprsd command line completion'
)
print("# but remove the leading '#' character.")
print(f'# eval "$(aprsd completion {shell})"')

View File

@ -25,23 +25,23 @@ from aprsd.threads import stats as stats_threads
# setup the global logger
# log.basicConfig(level=log.DEBUG) # level=10
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
console = Console()
@cli.command()
@cli_helper.add_options(cli_helper.common_options)
@click.option(
"--timeout",
'--timeout',
show_default=True,
default=3,
help="How long to wait for healtcheck url to come back",
help='How long to wait for healtcheck url to come back',
)
@click.pass_context
@cli_helper.process_standard_options
def healthcheck(ctx, timeout):
"""Check the health of the running aprsd server."""
ver_str = f"APRSD HealthCheck version: {aprsd.__version__}"
ver_str = f'APRSD HealthCheck version: {aprsd.__version__}'
console.log(ver_str)
with console.status(ver_str):
@ -56,33 +56,33 @@ def healthcheck(ctx, timeout):
else:
now = datetime.datetime.now()
if not stats:
console.log("No stats from aprsd")
console.log('No stats from aprsd')
sys.exit(-1)
email_stats = stats.get("EmailStats")
email_stats = stats.get('EmailStats')
if email_stats:
email_thread_last_update = email_stats["last_check_time"]
email_thread_last_update = email_stats['last_check_time']
if email_thread_last_update != "never":
if email_thread_last_update != 'never':
d = now - email_thread_last_update
max_timeout = {"hours": 0.0, "minutes": 5, "seconds": 30}
max_timeout = {'hours': 0.0, 'minutes': 5, 'seconds': 30}
max_delta = datetime.timedelta(**max_timeout)
if d > max_delta:
console.log(f"Email thread is very old! {d}")
console.log(f'Email thread is very old! {d}')
sys.exit(-1)
client_stats = stats.get("APRSClientStats")
client_stats = stats.get('APRSClientStats')
if not client_stats:
console.log("No APRSClientStats")
console.log('No APRSClientStats')
sys.exit(-1)
else:
aprsis_last_update = client_stats["connection_keepalive"]
aprsis_last_update = client_stats['connection_keepalive']
d = now - aprsis_last_update
max_timeout = {"hours": 0.0, "minutes": 5, "seconds": 0}
max_timeout = {'hours': 0.0, 'minutes': 5, 'seconds': 0}
max_delta = datetime.timedelta(**max_timeout)
if d > max_delta:
LOG.error(f"APRS-IS last update is very old! {d}")
LOG.error(f'APRS-IS last update is very old! {d}')
sys.exit(-1)
console.log("OK")
console.log('OK')
sys.exit(0)

View File

@ -2,7 +2,6 @@ from oslo_config import cfg
from aprsd.conf import client, common, log, plugin_common
CONF = cfg.CONF
log.register_opts(CONF)
@ -37,19 +36,19 @@ def conf_to_dict():
def _sanitize(opt, value):
"""Obfuscate values of options declared secret."""
return value if not opt.secret else "*" * 4
return value if not opt.secret else '*' * 4
for opt_name in sorted(CONF._opts):
opt = CONF._get_opt_info(opt_name)["opt"]
opt = CONF._get_opt_info(opt_name)['opt']
val = str(_sanitize(opt, getattr(CONF, opt_name)))
entries[str(opt)] = val
for group_name in list(CONF._groups):
group_attr = CONF.GroupAttr(CONF, CONF._get_group(group_name))
for opt_name in sorted(CONF._groups[group_name]._opts):
opt = CONF._get_opt_info(opt_name, group_name)["opt"]
opt = CONF._get_opt_info(opt_name, group_name)['opt']
val = str(_sanitize(opt, getattr(group_attr, opt_name)))
gname_opt_name = f"{group_name}.{opt_name}"
gname_opt_name = f'{group_name}.{opt_name}'
entries[gname_opt_name] = val
return entries

View File

@ -4,107 +4,107 @@ The options for log setup
from oslo_config import cfg
DEFAULT_LOGIN = "NOCALL"
DEFAULT_LOGIN = 'NOCALL'
aprs_group = cfg.OptGroup(
name="aprs_network",
title="APRS-IS Network settings",
name='aprs_network',
title='APRS-IS Network settings',
)
kiss_serial_group = cfg.OptGroup(
name="kiss_serial",
title="KISS Serial device connection",
name='kiss_serial',
title='KISS Serial device connection',
)
kiss_tcp_group = cfg.OptGroup(
name="kiss_tcp",
title="KISS TCP/IP Device connection",
name='kiss_tcp',
title='KISS TCP/IP Device connection',
)
fake_client_group = cfg.OptGroup(
name="fake_client",
title="Fake Client settings",
name='fake_client',
title='Fake Client settings',
)
aprs_opts = [
cfg.BoolOpt(
"enabled",
'enabled',
default=True,
help="Set enabled to False if there is no internet connectivity."
"This is useful for a direwolf KISS aprs connection only.",
help='Set enabled to False if there is no internet connectivity.'
'This is useful for a direwolf KISS aprs connection only.',
),
cfg.StrOpt(
"login",
'login',
default=DEFAULT_LOGIN,
help="APRS Username",
help='APRS Username',
),
cfg.StrOpt(
"password",
'password',
secret=True,
help="APRS Password "
"Get the passcode for your callsign here: "
"https://apps.magicbug.co.uk/passcode",
help='APRS Password '
'Get the passcode for your callsign here: '
'https://apps.magicbug.co.uk/passcode',
),
cfg.HostAddressOpt(
"host",
default="noam.aprs2.net",
help="The APRS-IS hostname",
'host',
default='noam.aprs2.net',
help='The APRS-IS hostname',
),
cfg.PortOpt(
"port",
'port',
default=14580,
help="APRS-IS port",
help='APRS-IS port',
),
]
kiss_serial_opts = [
cfg.BoolOpt(
"enabled",
'enabled',
default=False,
help="Enable Serial KISS interface connection.",
help='Enable Serial KISS interface connection.',
),
cfg.StrOpt(
"device",
help="Serial Device file to use. /dev/ttyS0",
'device',
help='Serial Device file to use. /dev/ttyS0',
),
cfg.IntOpt(
"baudrate",
'baudrate',
default=9600,
help="The Serial device baud rate for communication",
help='The Serial device baud rate for communication',
),
cfg.ListOpt(
"path",
default=["WIDE1-1", "WIDE2-1"],
help="The APRS path to use for wide area coverage.",
'path',
default=['WIDE1-1', 'WIDE2-1'],
help='The APRS path to use for wide area coverage.',
),
]
kiss_tcp_opts = [
cfg.BoolOpt(
"enabled",
'enabled',
default=False,
help="Enable Serial KISS interface connection.",
help='Enable Serial KISS interface connection.',
),
cfg.HostAddressOpt(
"host",
help="The KISS TCP Host to connect to.",
'host',
help='The KISS TCP Host to connect to.',
),
cfg.PortOpt(
"port",
'port',
default=8001,
help="The KISS TCP/IP network port",
help='The KISS TCP/IP network port',
),
cfg.ListOpt(
"path",
default=["WIDE1-1", "WIDE2-1"],
help="The APRS path to use for wide area coverage.",
'path',
default=['WIDE1-1', 'WIDE2-1'],
help='The APRS path to use for wide area coverage.',
),
]
fake_client_opts = [
cfg.BoolOpt(
"enabled",
'enabled',
default=False,
help="Enable fake client connection.",
help='Enable fake client connection.',
),
]

View File

@ -31,7 +31,7 @@ import importlib
import os
import pkgutil
LIST_OPTS_FUNC_NAME = "list_opts"
LIST_OPTS_FUNC_NAME = 'list_opts'
def _tupleize(dct):
@ -51,7 +51,7 @@ def _list_module_names():
module_names = []
package_path = os.path.dirname(os.path.abspath(__file__))
for _, modname, ispkg in pkgutil.iter_modules(path=[package_path]):
if modname == "opts" or ispkg:
if modname == 'opts' or ispkg:
continue
else:
module_names.append(modname)
@ -61,11 +61,11 @@ def _list_module_names():
def _import_modules(module_names):
imported_modules = []
for modname in module_names:
mod = importlib.import_module("aprsd.conf." + modname)
mod = importlib.import_module('aprsd.conf.' + modname)
if not hasattr(mod, LIST_OPTS_FUNC_NAME):
msg = (
"The module 'aprsd.conf.%s' should have a '%s' "
"function which returns the config options."
'function which returns the config options.'
% (modname, LIST_OPTS_FUNC_NAME)
)
raise Exception(msg)

View File

@ -1,55 +1,55 @@
from oslo_config import cfg
aprsfi_group = cfg.OptGroup(
name="aprs_fi",
title="APRS.FI website settings",
name='aprs_fi',
title='APRS.FI website settings',
)
query_group = cfg.OptGroup(
name="query_plugin",
title="Options for the Query Plugin",
name='query_plugin',
title='Options for the Query Plugin',
)
avwx_group = cfg.OptGroup(
name="avwx_plugin",
title="Options for the AVWXWeatherPlugin",
name='avwx_plugin',
title='Options for the AVWXWeatherPlugin',
)
owm_wx_group = cfg.OptGroup(
name="owm_weather_plugin",
title="Options for the OWMWeatherPlugin",
name='owm_weather_plugin',
title='Options for the OWMWeatherPlugin',
)
aprsfi_opts = [
cfg.StrOpt(
"apiKey",
help="Get the apiKey from your aprs.fi account here:" "http://aprs.fi/account",
'apiKey',
help='Get the apiKey from your aprs.fi account here:http://aprs.fi/account',
),
]
owm_wx_opts = [
cfg.StrOpt(
"apiKey",
'apiKey',
help="OWMWeatherPlugin api key to OpenWeatherMap's API."
"This plugin uses the openweathermap API to fetch"
"location and weather information."
"To use this plugin you need to get an openweathermap"
"account and apikey."
"https://home.openweathermap.org/api_keys",
'This plugin uses the openweathermap API to fetch'
'location and weather information.'
'To use this plugin you need to get an openweathermap'
'account and apikey.'
'https://home.openweathermap.org/api_keys',
),
]
avwx_opts = [
cfg.StrOpt(
"apiKey",
help="avwx-api is an opensource project that has"
"a hosted service here: https://avwx.rest/"
"You can launch your own avwx-api in a container"
"by cloning the githug repo here:"
"https://github.com/avwx-rest/AVWX-API",
'apiKey',
help='avwx-api is an opensource project that has'
'a hosted service here: https://avwx.rest/'
'You can launch your own avwx-api in a container'
'by cloning the githug repo here:'
'https://github.com/avwx-rest/AVWX-API',
),
cfg.StrOpt(
"base_url",
default="https://avwx.rest",
help="The base url for the avwx API. If you are hosting your own"
"Here is where you change the url to point to yours.",
'base_url',
default='https://avwx.rest',
help='The base url for the avwx API. If you are hosting your own'
'Here is where you change the url to point to yours.',
),
]

View File

@ -4,8 +4,7 @@ from typing import Callable, Protocol, runtime_checkable
from aprsd.packets import core
from aprsd.utils import singleton
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
@runtime_checkable
@ -36,12 +35,12 @@ class PacketCollector:
def register(self, monitor: Callable) -> None:
if not isinstance(monitor, PacketMonitor):
raise TypeError(f"Monitor {monitor} is not a PacketMonitor")
raise TypeError(f'Monitor {monitor} is not a PacketMonitor')
self.monitors.append(monitor)
def unregister(self, monitor: Callable) -> None:
if not isinstance(monitor, PacketMonitor):
raise TypeError(f"Monitor {monitor} is not a PacketMonitor")
raise TypeError(f'Monitor {monitor} is not a PacketMonitor')
self.monitors.remove(monitor)
def rx(self, packet: type[core.Packet]) -> None:
@ -50,7 +49,7 @@ class PacketCollector:
try:
cls.rx(packet)
except Exception as e:
LOG.error(f"Error in monitor {name} (rx): {e}")
LOG.error(f'Error in monitor {name} (rx): {e}')
def tx(self, packet: type[core.Packet]) -> None:
for name in self.monitors:
@ -58,7 +57,7 @@ class PacketCollector:
try:
cls.tx(packet)
except Exception as e:
LOG.error(f"Error in monitor {name} (tx): {e}")
LOG.error(f'Error in monitor {name} (tx): {e}')
def flush(self):
"""Call flush on the objects. This is used to flush out any data."""
@ -67,7 +66,7 @@ class PacketCollector:
try:
cls.flush()
except Exception as e:
LOG.error(f"Error in monitor {name} (flush): {e}")
LOG.error(f'Error in monitor {name} (flush): {e}')
def load(self):
"""Call load on the objects. This is used to load any data."""
@ -76,4 +75,4 @@ class PacketCollector:
try:
cls.load()
except Exception as e:
LOG.error(f"Error in monitor {name} (load): {e}")
LOG.error(f'Error in monitor {name} (load): {e}')

View File

@ -19,6 +19,7 @@ class DupePacketFilter:
If the packet has been processed already within the allowed
timeframe, then it's a dupe.
"""
def __init__(self):
self.pl = packets.PacketList()

View File

@ -7,9 +7,8 @@ from oslo_config import cfg
from aprsd.packets import core
from aprsd.utils import objectstore
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
class SeenList(objectstore.ObjectStoreMixin):
@ -41,11 +40,11 @@ class SeenList(objectstore.ObjectStoreMixin):
return
if callsign not in self.data:
self.data[callsign] = {
"last": None,
"count": 0,
'last': None,
'count': 0,
}
self.data[callsign]["last"] = datetime.datetime.now()
self.data[callsign]["count"] += 1
self.data[callsign]['last'] = datetime.datetime.now()
self.data[callsign]['count'] += 1
def tx(self, packet: type[core.Packet]):
"""We don't care about TX packets."""

View File

@ -7,9 +7,8 @@ from oslo_config import cfg
from aprsd.packets import core
from aprsd.utils import objectstore
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
class PacketTrack(objectstore.ObjectStoreMixin):
@ -62,18 +61,18 @@ class PacketTrack(objectstore.ObjectStoreMixin):
def stats(self, serializable=False):
with self.lock:
stats = {
"total_tracked": self.total_tracked,
'total_tracked': self.total_tracked,
}
pkts = {}
for key in self.data:
last_send_time = self.data[key].last_send_time
pkts[key] = {
"last_send_time": last_send_time,
"send_count": self.data[key].send_count,
"retry_count": self.data[key].retry_count,
"message": self.data[key].raw,
'last_send_time': last_send_time,
'send_count': self.data[key].send_count,
'retry_count': self.data[key].retry_count,
'message': self.data[key].raw,
}
stats["packets"] = pkts
stats['packets'] = pkts
return stats
def rx(self, packet: type[core.Packet]) -> None:
@ -82,7 +81,7 @@ class PacketTrack(objectstore.ObjectStoreMixin):
self._remove(packet.msgNo)
elif isinstance(packet, core.RejectPacket):
self._remove(packet.msgNo)
elif hasattr(packet, "ackMsgNo"):
elif hasattr(packet, 'ackMsgNo'):
# Got a piggyback ack, so remove the original message
self._remove(packet.ackMsgNo)

View File

@ -66,8 +66,8 @@ def fetch_openweathermap(api_key, lat, lon, units='metric', exclude=None):
exclude = 'minutely,hourly,daily,alerts'
try:
url = (
"https://api.openweathermap.org/data/3.0/onecall?"
"lat={}&lon={}&appid={}&units={}&exclude={}".format(
'https://api.openweathermap.org/data/3.0/onecall?'
'lat={}&lon={}&appid={}&units={}&exclude={}'.format(
lat,
lon,
api_key,

View File

@ -4,20 +4,19 @@ import time
from aprsd import plugin
from aprsd.utils import trace
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
class PingPlugin(plugin.APRSDRegexCommandPluginBase):
"""Ping."""
command_regex = r"^([p]|[p]\s|ping)"
command_name = "ping"
short_description = "reply with a Pong!"
command_regex = r'^([p]|[p]\s|ping)'
command_name = 'ping'
short_description = 'reply with a Pong!'
@trace.trace
def process(self, packet):
LOG.info("PingPlugin")
LOG.info('PingPlugin')
# fromcall = packet.get("from")
# message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
@ -26,6 +25,6 @@ class PingPlugin(plugin.APRSDRegexCommandPluginBase):
m = stm.tm_min
s = stm.tm_sec
reply = (
"Pong! " + str(h).zfill(2) + ":" + str(m).zfill(2) + ":" + str(s).zfill(2)
'Pong! ' + str(h).zfill(2) + ':' + str(m).zfill(2) + ':' + str(s).zfill(2)
)
return reply.rstrip()

View File

@ -1,25 +1,24 @@
import logging
import re
from oslo_config import cfg
import pytz
from oslo_config import cfg
from tzlocal import get_localzone
from aprsd import packets, plugin, plugin_utils
from aprsd.utils import fuzzy, trace
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
class TimePlugin(plugin.APRSDRegexCommandPluginBase):
"""Time command."""
# Look for t or t<space> or T<space> or time
command_regex = r"^([t]|[t]\s|time)"
command_name = "time"
short_description = "What is the current local time."
command_regex = r'^([t]|[t]\s|time)'
command_name = 'time'
short_description = 'What is the current local time.'
def _get_local_tz(self):
lz = get_localzone()
@ -33,12 +32,12 @@ class TimePlugin(plugin.APRSDRegexCommandPluginBase):
gmt_t = pytz.utc.localize(utcnow)
local_t = gmt_t.astimezone(localzone)
local_short_str = local_t.strftime("%H:%M %Z")
local_hour = local_t.strftime("%H")
local_min = local_t.strftime("%M")
local_short_str = local_t.strftime('%H:%M %Z')
local_hour = local_t.strftime('%H')
local_min = local_t.strftime('%M')
cur_time = fuzzy(int(local_hour), int(local_min), 1)
reply = "{} ({})".format(
reply = '{} ({})'.format(
cur_time,
local_short_str,
)
@ -47,7 +46,7 @@ class TimePlugin(plugin.APRSDRegexCommandPluginBase):
@trace.trace
def process(self, packet: packets.Packet):
LOG.info("TIME COMMAND")
LOG.info('TIME COMMAND')
# So we can mock this in unit tests
localzone = self._get_local_tz()
return self.build_date_str(localzone)
@ -56,8 +55,8 @@ class TimePlugin(plugin.APRSDRegexCommandPluginBase):
class TimeOWMPlugin(TimePlugin, plugin.APRSFIKEYMixin):
"""OpenWeatherMap based timezone fetching."""
command_regex = r"^([t]|[t]\s|time)"
command_name = "time"
command_regex = r'^([t]|[t]\s|time)'
command_name = 'time'
short_description = "Current time of GPS beacon's timezone. Uses OpenWeatherMap"
def setup(self):
@ -70,7 +69,7 @@ class TimeOWMPlugin(TimePlugin, plugin.APRSFIKEYMixin):
# ack = packet.get("msgNo", "0")
# optional second argument is a callsign to search
a = re.search(r"^.*\s+(.*)", message)
a = re.search(r'^.*\s+(.*)', message)
if a is not None:
searchcall = a.group(1)
searchcall = searchcall.upper()
@ -82,34 +81,34 @@ class TimeOWMPlugin(TimePlugin, plugin.APRSFIKEYMixin):
try:
aprs_data = plugin_utils.get_aprs_fi(api_key, searchcall)
except Exception as ex:
LOG.error(f"Failed to fetch aprs.fi data {ex}")
return "Failed to fetch location"
LOG.error(f'Failed to fetch aprs.fi data {ex}')
return 'Failed to fetch location'
LOG.debug(f"LocationPlugin: aprs_data = {aprs_data}")
if not len(aprs_data["entries"]):
LOG.debug(f'LocationPlugin: aprs_data = {aprs_data}')
if not len(aprs_data['entries']):
LOG.error("Didn't get any entries from aprs.fi")
return "Failed to fetch aprs.fi location"
return 'Failed to fetch aprs.fi location'
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
lat = aprs_data['entries'][0]['lat']
lon = aprs_data['entries'][0]['lng']
try:
self.config.exists(
["services", "openweathermap", "apiKey"],
['services', 'openweathermap', 'apiKey'],
)
except Exception as ex:
LOG.error(f"Failed to find config openweathermap:apiKey {ex}")
return "No openweathermap apiKey found"
LOG.error(f'Failed to find config openweathermap:apiKey {ex}')
return 'No openweathermap apiKey found'
api_key = self.config["services"]["openweathermap"]["apiKey"]
api_key = self.config['services']['openweathermap']['apiKey']
try:
results = plugin_utils.fetch_openweathermap(api_key, lat, lon)
except Exception as ex:
LOG.error(f"Couldn't fetch openweathermap api '{ex}'")
# default to UTC
localzone = pytz.timezone("UTC")
localzone = pytz.timezone('UTC')
else:
tzone = results["timezone"]
tzone = results['timezone']
localzone = pytz.timezone(tzone)
return self.build_date_str(localzone)

View File

@ -4,28 +4,27 @@ import aprsd
from aprsd import plugin
from aprsd.stats import collector
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
class VersionPlugin(plugin.APRSDRegexCommandPluginBase):
"""Version of APRSD Plugin."""
command_regex = r"^([v]|[v]\s|version)"
command_name = "version"
short_description = "What is the APRSD Version"
command_regex = r'^([v]|[v]\s|version)'
command_name = 'version'
short_description = 'What is the APRSD Version'
# message_number:time combos so we don't resend the same email in
# five mins {int:int}
email_sent_dict = {}
def process(self, packet):
LOG.info("Version COMMAND")
LOG.info('Version COMMAND')
# fromcall = packet.get("from")
# message = packet.get("message_text", None)
# ack = packet.get("msgNo", "0")
s = collector.Collector().collect()
return "APRSD ver:{} uptime:{}".format(
return 'APRSD ver:{} uptime:{}'.format(
aprsd.__version__,
s["APRSDStats"]["uptime"],
s['APRSDStats']['uptime'],
)

View File

@ -4,7 +4,6 @@ from aprsd.packets import packet_list, seen_list, tracker, watch_list
from aprsd.stats import app, collector
from aprsd.threads import aprsd
# Create the collector and register all the objects
# that APRSD has that implement the stats protocol
stats_collector = collector.Collector()

View File

@ -7,7 +7,6 @@ import aprsd
from aprsd import utils
from aprsd.log import log as aprsd_log
CONF = cfg.CONF
@ -37,13 +36,13 @@ class APRSDStats:
if serializable:
uptime = str(uptime)
stats = {
"version": aprsd.__version__,
"uptime": uptime,
"callsign": CONF.callsign,
"memory_current": int(current),
"memory_current_str": utils.human_size(current),
"memory_peak": int(peak),
"memory_peak_str": utils.human_size(peak),
"loging_queue": qsize,
'version': aprsd.__version__,
'uptime': uptime,
'callsign': CONF.callsign,
'memory_current': int(current),
'memory_current_str': utils.human_size(current),
'memory_peak': int(peak),
'memory_peak_str': utils.human_size(peak),
'loging_queue': qsize,
}
return stats

View File

@ -13,7 +13,7 @@ from aprsd.threads import APRSDThread, APRSDThreadList
from aprsd.utils import keepalive_collector
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
LOGU = logger
@ -23,8 +23,8 @@ class KeepAliveThread(APRSDThread):
def __init__(self):
tracemalloc.start()
super().__init__("KeepAlive")
max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0}
super().__init__('KeepAlive')
max_timeout = {'hours': 0.0, 'minutes': 2, 'seconds': 0}
self.max_delta = datetime.timedelta(**max_timeout)
def loop(self):
@ -35,58 +35,58 @@ class KeepAliveThread(APRSDThread):
now = datetime.datetime.now()
if (
"APRSClientStats" in stats_json
and stats_json["APRSClientStats"].get("transport") == "aprsis"
'APRSClientStats' in stats_json
and stats_json['APRSClientStats'].get('transport') == 'aprsis'
):
if stats_json["APRSClientStats"].get("server_keepalive"):
if stats_json['APRSClientStats'].get('server_keepalive'):
last_msg_time = utils.strfdelta(
now - stats_json["APRSClientStats"]["server_keepalive"]
now - stats_json['APRSClientStats']['server_keepalive']
)
else:
last_msg_time = "N/A"
last_msg_time = 'N/A'
else:
last_msg_time = "N/A"
last_msg_time = 'N/A'
tracked_packets = stats_json["PacketTrack"]["total_tracked"]
tracked_packets = stats_json['PacketTrack']['total_tracked']
tx_msg = 0
rx_msg = 0
if "PacketList" in stats_json:
msg_packets = stats_json["PacketList"].get("MessagePacket")
if 'PacketList' in stats_json:
msg_packets = stats_json['PacketList'].get('MessagePacket')
if msg_packets:
tx_msg = msg_packets.get("tx", 0)
rx_msg = msg_packets.get("rx", 0)
tx_msg = msg_packets.get('tx', 0)
rx_msg = msg_packets.get('rx', 0)
keepalive = (
"{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} "
"Last:{} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}"
'{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} '
'Last:{} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}'
).format(
stats_json["APRSDStats"]["callsign"],
stats_json["APRSDStats"]["uptime"],
stats_json['APRSDStats']['callsign'],
stats_json['APRSDStats']['uptime'],
pl.total_rx(),
pl.total_tx(),
tracked_packets,
tx_msg,
rx_msg,
last_msg_time,
stats_json["APRSDStats"]["memory_current_str"],
stats_json["APRSDStats"]["memory_peak_str"],
stats_json['APRSDStats']['memory_current_str'],
stats_json['APRSDStats']['memory_peak_str'],
len(thread_list),
aprsd_log.logging_queue.qsize(),
)
LOG.info(keepalive)
if "APRSDThreadList" in stats_json:
thread_list = stats_json["APRSDThreadList"]
if 'APRSDThreadList' in stats_json:
thread_list = stats_json['APRSDThreadList']
for thread_name in thread_list:
thread = thread_list[thread_name]
alive = thread["alive"]
age = thread["age"]
key = thread["name"]
alive = thread['alive']
age = thread['age']
key = thread['name']
if not alive:
LOG.error(f"Thread {thread}")
LOG.error(f'Thread {thread}')
thread_hex = f"fg {utils.hex_from_name(key)}"
t_name = f"<{thread_hex}>{key:<15}</{thread_hex}>"
thread_msg = f"{t_name} Alive? {str(alive): <5} {str(age): <20}"
thread_hex = f'fg {utils.hex_from_name(key)}'
t_name = f'<{thread_hex}>{key:<15}</{thread_hex}>'
thread_msg = f'{t_name} Alive? {str(alive): <5} {str(age): <20}'
LOGU.opt(colors=True).info(thread_msg)
# LOG.info(f"{key: <15} Alive? {str(alive): <5} {str(age): <20}")

View File

@ -8,7 +8,7 @@ import aprsd
from aprsd import threads as aprsd_threads
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
class APRSRegistryThread(aprsd_threads.APRSDThread):
@ -17,39 +17,39 @@ class APRSRegistryThread(aprsd_threads.APRSDThread):
_loop_cnt: int = 1
def __init__(self):
super().__init__("APRSRegistryThread")
super().__init__('APRSRegistryThread')
self._loop_cnt = 1
if not CONF.aprs_registry.enabled:
LOG.error(
"APRS Registry is not enabled. ",
'APRS Registry is not enabled. ',
)
LOG.error(
"APRS Registry thread is STOPPING.",
'APRS Registry thread is STOPPING.',
)
self.stop()
LOG.info(
"APRS Registry thread is running and will send "
f"info every {CONF.aprs_registry.frequency_seconds} seconds "
f"to {CONF.aprs_registry.registry_url}.",
'APRS Registry thread is running and will send '
f'info every {CONF.aprs_registry.frequency_seconds} seconds '
f'to {CONF.aprs_registry.registry_url}.',
)
def loop(self):
# Only call the registry every N seconds
if self._loop_cnt % CONF.aprs_registry.frequency_seconds == 0:
info = {
"callsign": CONF.callsign,
"description": CONF.aprs_registry.description,
"service_website": CONF.aprs_registry.service_website,
"software": f"APRSD version {aprsd.__version__} "
"https://github.com/craigerl/aprsd",
'callsign': CONF.callsign,
'description': CONF.aprs_registry.description,
'service_website': CONF.aprs_registry.service_website,
'software': f'APRSD version {aprsd.__version__} '
'https://github.com/craigerl/aprsd',
}
try:
requests.post(
f"{CONF.aprs_registry.registry_url}",
f'{CONF.aprs_registry.registry_url}',
json=info,
)
except Exception as e:
LOG.error(f"Failed to send registry info: {e}")
LOG.error(f'Failed to send registry info: {e}')
time.sleep(1)
self._loop_cnt += 1

View File

@ -89,7 +89,7 @@ class APRSDRXThread(APRSDThread):
def process_packet(self, *args, **kwargs):
"""Convert the raw packet into a Packet object and put it on the queue.
The processing of the packet will happen in a separate thread.
The processing of the packet will happen in a separate thread.
"""
packet = self._client.decode_packet(*args, **kwargs)
if not packet:

View File

@ -3,7 +3,6 @@ import threading
import wrapt
MAX_PACKET_ID = 9999

View File

@ -26,42 +26,42 @@ def fuzzy(hour, minute, degree=1):
When degree = 2, time is in quantum of 15 minutes."""
if degree <= 0 or degree > 2:
print("Please use a degree of 1 or 2. Using fuzziness degree=1")
print('Please use a degree of 1 or 2. Using fuzziness degree=1')
degree = 1
begin = "It's "
f0 = "almost "
f1 = "exactly "
f2 = "around "
f0 = 'almost '
f1 = 'exactly '
f2 = 'around '
b0 = " past "
b1 = " to "
b0 = ' past '
b1 = ' to '
hourlist = (
"One",
"Two",
"Three",
"Four",
"Five",
"Six",
"Seven",
"Eight",
"Nine",
"Ten",
"Eleven",
"Twelve",
'One',
'Two',
'Three',
'Four',
'Five',
'Six',
'Seven',
'Eight',
'Nine',
'Ten',
'Eleven',
'Twelve',
)
s1 = s2 = s3 = s4 = ""
s1 = s2 = s3 = s4 = ''
base = 5
if degree == 1:
base = 5
val = ("Five", "Ten", "Quarter", "Twenty", "Twenty-Five", "Half")
val = ('Five', 'Ten', 'Quarter', 'Twenty', 'Twenty-Five', 'Half')
elif degree == 2:
base = 15
val = ("Quarter", "Half")
val = ('Quarter', 'Half')
# to find whether we have to use 'almost', 'exactly' or 'around'
dmin = minute % base
@ -86,11 +86,11 @@ def fuzzy(hour, minute, degree=1):
if minute <= base / 2:
# Case like "It's around/exactly Ten"
s2 = s3 = ""
s2 = s3 = ''
s4 = hourlist[hour - 12 - 1]
elif minute >= 60 - base / 2:
# Case like "It's almost Ten"
s2 = s3 = ""
s2 = s3 = ''
s4 = hourlist[hour - 12]
else:
# Other cases with all words, like "It's around Quarter past One"
@ -114,22 +114,22 @@ def main():
try:
deg = int(sys.argv[1])
except Exception:
print("Please use a degree of 1 or 2. Using fuzziness degree=1")
print('Please use a degree of 1 or 2. Using fuzziness degree=1')
if len(sys.argv) >= 3:
tm = sys.argv[2].split(":")
tm = sys.argv[2].split(':')
try:
h = int(tm[0])
m = int(tm[1])
if h < 0 or h > 23 or m < 0 or m > 59:
raise Exception
except Exception:
print("Bad time entered. Using the system time.")
print('Bad time entered. Using the system time.')
h = stm.tm_hour
m = stm.tm_min
print(fuzzy(h, m, deg))
return
if __name__ == "__main__":
if __name__ == '__main__':
main()

View File

@ -10,40 +10,40 @@ class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
args = (
"year",
"month",
"day",
"hour",
"minute",
"second",
"microsecond",
'year',
'month',
'day',
'hour',
'minute',
'second',
'microsecond',
)
return {
"__type__": "datetime.datetime",
"args": [getattr(obj, a) for a in args],
'__type__': 'datetime.datetime',
'args': [getattr(obj, a) for a in args],
}
elif isinstance(obj, datetime.date):
args = ("year", "month", "day")
args = ('year', 'month', 'day')
return {
"__type__": "datetime.date",
"args": [getattr(obj, a) for a in args],
'__type__': 'datetime.date',
'args': [getattr(obj, a) for a in args],
}
elif isinstance(obj, datetime.time):
args = ("hour", "minute", "second", "microsecond")
args = ('hour', 'minute', 'second', 'microsecond')
return {
"__type__": "datetime.time",
"args": [getattr(obj, a) for a in args],
'__type__': 'datetime.time',
'args': [getattr(obj, a) for a in args],
}
elif isinstance(obj, datetime.timedelta):
args = ("days", "seconds", "microseconds")
args = ('days', 'seconds', 'microseconds')
return {
"__type__": "datetime.timedelta",
"args": [getattr(obj, a) for a in args],
'__type__': 'datetime.timedelta',
'args': [getattr(obj, a) for a in args],
}
elif isinstance(obj, decimal.Decimal):
return {
"__type__": "decimal.Decimal",
"args": [str(obj)],
'__type__': 'decimal.Decimal',
'args': [str(obj)],
}
else:
return super().default(obj)
@ -76,10 +76,10 @@ class EnhancedJSONDecoder(json.JSONDecoder):
)
def object_hook(self, d):
if "__type__" not in d:
if '__type__' not in d:
return d
o = sys.modules[__name__]
for e in d["__type__"].split("."):
for e in d['__type__'].split('.'):
o = getattr(o, e)
args, kwargs = d.get("args", ()), d.get("kwargs", {})
args, kwargs = d.get('args', ()), d.get('kwargs', {})
return o(*args, **kwargs)

View File

@ -23,6 +23,7 @@ class ObjectStoreMixin:
When APRSD Starts, it calls load()
aprsd server -f (flush) will wipe all saved objects.
"""
# Child class must create the lock.
lock = None

View File

@ -45,4 +45,4 @@ export COLUMNS=200
#exec uwsgi --http :8000 --gevent 1000 --http-websockets --master -w aprsd.wsgi --callable app
#exec aprsd listen -c $APRSD_CONFIG --loglevel ${LOG_LEVEL} ${APRSD_LOAD_PLUGINS} ${APRSD_LISTEN_FILTER}
#
uv run aprsd admin web -c $APRSD_CONFIG --loglevel ${LOG_LEVEL}
uv run aprsd admin web -c $APRSD_CONFIG --loglevel ${LOG_LEVEL}

View File

@ -2,18 +2,18 @@ import logging
from aprsd import packets, plugin
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
class HelloPlugin(plugin.APRSDRegexCommandPluginBase):
"""Hello World."""
version = "1.0"
version = '1.0'
# matches any string starting with h or H
command_regex = "^[hH]"
command_name = "hello"
command_regex = '^[hH]'
command_name = 'hello'
def process(self, packet: packets.MessagePacket):
LOG.info("HelloPlugin")
LOG.info('HelloPlugin')
reply = f"Hello '{packet.from_call}'"
return reply

View File

@ -14,5 +14,4 @@
# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT
import setuptools
setuptools.setup()

View File

@ -12,7 +12,7 @@ from aprsd.main import cli
from .. import fake
CONF = cfg.CONF
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
F = t.TypeVar('F', bound=t.Callable[..., t.Any])
class TestSendMessageCommand(unittest.TestCase):
@ -28,37 +28,37 @@ class TestSendMessageCommand(unittest.TestCase):
# CONF.aprsd_admin_extension.user = "admin"
# CONF.aprsd_admin_extension.password = "password"
@mock.patch("aprsd.log.log.setup_logging")
@mock.patch('aprsd.log.log.setup_logging')
def test_no_tocallsign(self, mock_logging):
"""Make sure we get an error if there is no tocallsign."""
self.config_and_init(
login="something",
password="another",
login='something',
password='another',
)
runner = CliRunner()
result = runner.invoke(
cli,
["send-message"],
['send-message'],
catch_exceptions=False,
)
assert result.exit_code == 2
assert "Error: Missing argument 'TOCALLSIGN'" in result.output
@mock.patch("aprsd.log.log.setup_logging")
@mock.patch('aprsd.log.log.setup_logging')
def test_no_command(self, mock_logging):
"""Make sure we get an error if there is no command."""
self.config_and_init(
login="something",
password="another",
login='something',
password='another',
)
runner = CliRunner()
result = runner.invoke(
cli,
["send-message", "WB4BOR"],
['send-message', 'WB4BOR'],
catch_exceptions=False,
)
assert result.exit_code == 2

View File

@ -1,9 +1,9 @@
from aprsd import plugin, threads
from aprsd.packets import core
FAKE_MESSAGE_TEXT = "fake MeSSage"
FAKE_FROM_CALLSIGN = "KFAKE"
FAKE_TO_CALLSIGN = "KMINE"
FAKE_MESSAGE_TEXT = 'fake MeSSage'
FAKE_FROM_CALLSIGN = 'KFAKE'
FAKE_TO_CALLSIGN = 'KMINE'
def fake_packet(
@ -15,20 +15,20 @@ def fake_packet(
response=None,
):
packet_dict = {
"from": fromcall,
"addresse": tocall,
"to": tocall,
"format": message_format,
"raw": "",
'from': fromcall,
'addresse': tocall,
'to': tocall,
'format': message_format,
'raw': '',
}
if message:
packet_dict["message_text"] = message
packet_dict['message_text'] = message
if msg_number:
packet_dict["msgNo"] = str(msg_number)
packet_dict['msgNo'] = str(msg_number)
if response:
packet_dict["response"] = response
packet_dict['response'] = response
return core.factory(packet_dict)
@ -41,7 +41,7 @@ def fake_ack_packet():
class FakeBaseNoThreadsPlugin(plugin.APRSDPluginBase):
version = "1.0"
version = '1.0'
def setup(self):
self.enabled = True
@ -50,19 +50,19 @@ class FakeBaseNoThreadsPlugin(plugin.APRSDPluginBase):
return None
def process(self, packet):
return "process"
return 'process'
class FakeThread(threads.APRSDThread):
def __init__(self):
super().__init__("FakeThread")
super().__init__('FakeThread')
def loop(self):
return False
class FakeBaseThreadsPlugin(plugin.APRSDPluginBase):
version = "1.0"
version = '1.0'
def setup(self):
self.enabled = True
@ -71,16 +71,16 @@ class FakeBaseThreadsPlugin(plugin.APRSDPluginBase):
return None
def process(self, packet):
return "process"
return 'process'
def create_threads(self):
return FakeThread()
class FakeRegexCommandPlugin(plugin.APRSDRegexCommandPluginBase):
version = "1.0"
command_regex = "^[fF]"
command_name = "fake"
version = '1.0'
command_regex = '^[fF]'
command_name = 'fake'
def process(self, packet):
return FAKE_MESSAGE_TEXT

View File

@ -0,0 +1,76 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestAckPacket(unittest.TestCase):
"""Test AckPacket JSON serialization."""
def test_ack_packet_to_json(self):
"""Test AckPacket.to_json() method."""
packet = packets.AckPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
msgNo='123',
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'AckPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['msgNo'], '123')
def test_ack_packet_from_dict(self):
"""Test AckPacket.from_dict() method."""
packet_dict = {
'_type': 'AckPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'msgNo': '123',
}
packet = packets.AckPacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.AckPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.msgNo, '123')
def test_ack_packet_round_trip(self):
"""Test AckPacket round-trip: to_json -> from_dict."""
original = packets.AckPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
msgNo='123',
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.AckPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.msgNo, original.msgNo)
self.assertEqual(restored._type, original._type)
def test_ack_packet_from_raw_string(self):
"""Test AckPacket creation from raw APRS string."""
packet_raw = 'KFAKE>APZ100::KMINE :ack123'
packet_dict = aprslib.parse(packet_raw)
# aprslib might not set format/response correctly, so set them manually
packet_dict['format'] = 'message'
packet_dict['response'] = 'ack'
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.AckPacket)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'AckPacket')
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertIsInstance(restored, packets.AckPacket)
self.assertEqual(restored.from_call, packet.from_call)
self.assertEqual(restored.to_call, packet.to_call)
self.assertEqual(restored.msgNo, packet.msgNo)

View File

@ -0,0 +1,98 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestBeaconPacket(unittest.TestCase):
"""Test BeaconPacket JSON serialization."""
def test_beacon_packet_to_json(self):
"""Test BeaconPacket.to_json() method."""
packet = packets.BeaconPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
symbol='>',
symbol_table='/',
comment='Test beacon comment',
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'BeaconPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['latitude'], 37.7749)
self.assertEqual(json_dict['longitude'], -122.4194)
self.assertEqual(json_dict['symbol'], '>')
self.assertEqual(json_dict['symbol_table'], '/')
self.assertEqual(json_dict['comment'], 'Test beacon comment')
def test_beacon_packet_from_dict(self):
"""Test BeaconPacket.from_dict() method."""
packet_dict = {
'_type': 'BeaconPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'latitude': 37.7749,
'longitude': -122.4194,
'symbol': '>',
'symbol_table': '/',
'comment': 'Test beacon comment',
}
packet = packets.BeaconPacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.BeaconPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.latitude, 37.7749)
self.assertEqual(packet.longitude, -122.4194)
self.assertEqual(packet.symbol, '>')
self.assertEqual(packet.symbol_table, '/')
self.assertEqual(packet.comment, 'Test beacon comment')
def test_beacon_packet_round_trip(self):
"""Test BeaconPacket round-trip: to_json -> from_dict."""
original = packets.BeaconPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
symbol='>',
symbol_table='/',
comment='Test beacon comment',
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.BeaconPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.latitude, original.latitude)
self.assertEqual(restored.longitude, original.longitude)
self.assertEqual(restored.symbol, original.symbol)
self.assertEqual(restored.symbol_table, original.symbol_table)
self.assertEqual(restored.comment, original.comment)
self.assertEqual(restored._type, original._type)
def test_beacon_packet_from_raw_string(self):
"""Test BeaconPacket creation from raw APRS string."""
# Use a format that aprslib can parse correctly
packet_raw = 'kd8mey-10>APRS,TCPIP*,qAC,T2SYDNEY:=4247.80N/08539.00WrPHG1210/Making 220 Great Again Allstar# 552191'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.BeaconPacket)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'BeaconPacket')
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertIsInstance(restored, packets.BeaconPacket)
self.assertEqual(restored.from_call, packet.from_call)
self.assertEqual(restored.latitude, packet.latitude)
self.assertEqual(restored.longitude, packet.longitude)

View File

@ -0,0 +1,75 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestBulletinPacket(unittest.TestCase):
"""Test BulletinPacket JSON serialization."""
def test_bulletin_packet_to_json(self):
"""Test BulletinPacket.to_json() method."""
packet = packets.BulletinPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
message_text='Test bulletin message',
bid='1',
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'BulletinPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['message_text'], 'Test bulletin message')
self.assertEqual(json_dict['bid'], '1')
def test_bulletin_packet_from_dict(self):
"""Test BulletinPacket.from_dict() method."""
packet_dict = {
'_type': 'BulletinPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'message_text': 'Test bulletin message',
'bid': '1',
}
packet = packets.BulletinPacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.BulletinPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.message_text, 'Test bulletin message')
self.assertEqual(packet.bid, '1')
def test_bulletin_packet_round_trip(self):
"""Test BulletinPacket round-trip: to_json -> from_dict."""
original = packets.BulletinPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
message_text='Test bulletin message',
bid='1',
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.BulletinPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.message_text, original.message_text)
self.assertEqual(restored.bid, original.bid)
self.assertEqual(restored._type, original._type)
def test_bulletin_packet_from_raw_string(self):
"""Test BulletinPacket creation from raw APRS string."""
packet_raw = 'KFAKE>APZ100::BLN1 :Test bulletin message'
packet_dict = aprslib.parse(packet_raw)
# aprslib might not set format correctly, so set it manually
packet_dict['format'] = 'bulletin'
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.BulletinPacket)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'BulletinPacket')
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertIsInstance(restored, packets.BulletinPacket)
self.assertEqual(restored.from_call, packet.from_call)
self.assertEqual(restored.message_text, packet.message_text)
self.assertEqual(restored.bid, packet.bid)

View File

@ -0,0 +1,109 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestGPSPacket(unittest.TestCase):
"""Test GPSPacket JSON serialization."""
def test_gps_packet_to_json(self):
"""Test GPSPacket.to_json() method."""
packet = packets.GPSPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
altitude=100.0,
symbol='>',
symbol_table='/',
comment='Test GPS comment',
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'GPSPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['latitude'], 37.7749)
self.assertEqual(json_dict['longitude'], -122.4194)
self.assertEqual(json_dict['altitude'], 100.0)
self.assertEqual(json_dict['symbol'], '>')
self.assertEqual(json_dict['symbol_table'], '/')
self.assertEqual(json_dict['comment'], 'Test GPS comment')
def test_gps_packet_from_dict(self):
"""Test GPSPacket.from_dict() method."""
packet_dict = {
'_type': 'GPSPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'latitude': 37.7749,
'longitude': -122.4194,
'altitude': 100.0,
'symbol': '>',
'symbol_table': '/',
'comment': 'Test GPS comment',
}
packet = packets.GPSPacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.GPSPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.latitude, 37.7749)
self.assertEqual(packet.longitude, -122.4194)
self.assertEqual(packet.altitude, 100.0)
self.assertEqual(packet.symbol, '>')
self.assertEqual(packet.symbol_table, '/')
self.assertEqual(packet.comment, 'Test GPS comment')
def test_gps_packet_round_trip(self):
"""Test GPSPacket round-trip: to_json -> from_dict."""
original = packets.GPSPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
altitude=100.0,
symbol='>',
symbol_table='/',
comment='Test GPS comment',
speed=25.5,
course=180,
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.GPSPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.latitude, original.latitude)
self.assertEqual(restored.longitude, original.longitude)
self.assertEqual(restored.altitude, original.altitude)
self.assertEqual(restored.symbol, original.symbol)
self.assertEqual(restored.symbol_table, original.symbol_table)
self.assertEqual(restored.comment, original.comment)
self.assertEqual(restored.speed, original.speed)
self.assertEqual(restored.course, original.course)
self.assertEqual(restored._type, original._type)
def test_gps_packet_from_raw_string(self):
"""Test GPSPacket creation from raw APRS string."""
packet_raw = 'KFAKE>APZ100,WIDE2-1:!3742.00N/12225.00W>Test GPS comment'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
# GPS packets are typically created as BeaconPacket or other types
# but we can test if it has GPS data
self.assertIsNotNone(packet)
if hasattr(packet, 'latitude') and hasattr(packet, 'longitude'):
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertIn('latitude', json_dict)
self.assertIn('longitude', json_dict)
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertEqual(restored.latitude, packet.latitude)
self.assertEqual(restored.longitude, packet.longitude)

View File

@ -0,0 +1,80 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestMessagePacket(unittest.TestCase):
"""Test MessagePacket JSON serialization."""
def test_message_packet_to_json(self):
"""Test MessagePacket.to_json() method."""
packet = packets.MessagePacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
message_text='Test message',
msgNo='123',
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'MessagePacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['message_text'], 'Test message')
self.assertEqual(json_dict['msgNo'], '123')
def test_message_packet_from_dict(self):
"""Test MessagePacket.from_dict() method."""
packet_dict = {
'_type': 'MessagePacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'message_text': 'Test message',
'msgNo': '123',
}
packet = packets.MessagePacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.MessagePacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.message_text, 'Test message')
self.assertEqual(packet.msgNo, '123')
def test_message_packet_round_trip(self):
"""Test MessagePacket round-trip: to_json -> from_dict."""
original = packets.MessagePacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
message_text='Test message',
msgNo='123',
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.MessagePacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.message_text, original.message_text)
self.assertEqual(restored.msgNo, original.msgNo)
self.assertEqual(restored._type, original._type)
def test_message_packet_from_raw_string(self):
"""Test MessagePacket creation from raw APRS string."""
packet_raw = 'KM6LYW>APZ100::WB4BOR :Test message{123'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.MessagePacket)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'MessagePacket')
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertIsInstance(restored, packets.MessagePacket)
self.assertEqual(restored.from_call, packet.from_call)
self.assertEqual(restored.to_call, packet.to_call)
self.assertEqual(restored.message_text, packet.message_text)
self.assertEqual(restored.msgNo, packet.msgNo)

View File

@ -0,0 +1,107 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestMicEPacket(unittest.TestCase):
"""Test MicEPacket JSON serialization."""
def test_mice_packet_to_json(self):
"""Test MicEPacket.to_json() method."""
packet = packets.MicEPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
speed=25.5,
course=180,
mbits='test',
mtype='test_type',
telemetry={'key': 'value'},
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'MicEPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['latitude'], 37.7749)
self.assertEqual(json_dict['longitude'], -122.4194)
self.assertEqual(json_dict['speed'], 25.5)
self.assertEqual(json_dict['course'], 180)
self.assertEqual(json_dict['mbits'], 'test')
self.assertEqual(json_dict['mtype'], 'test_type')
def test_mice_packet_from_dict(self):
"""Test MicEPacket.from_dict() method."""
packet_dict = {
'_type': 'MicEPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'latitude': 37.7749,
'longitude': -122.4194,
'speed': 25.5,
'course': 180,
'mbits': 'test',
'mtype': 'test_type',
'telemetry': {'key': 'value'},
}
packet = packets.MicEPacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.MicEPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.latitude, 37.7749)
self.assertEqual(packet.longitude, -122.4194)
self.assertEqual(packet.speed, 25.5)
self.assertEqual(packet.course, 180)
self.assertEqual(packet.mbits, 'test')
self.assertEqual(packet.mtype, 'test_type')
def test_mice_packet_round_trip(self):
"""Test MicEPacket round-trip: to_json -> from_dict."""
original = packets.MicEPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
speed=25.5,
course=180,
mbits='test',
mtype='test_type',
telemetry={'key': 'value'},
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.MicEPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.latitude, original.latitude)
self.assertEqual(restored.longitude, original.longitude)
self.assertEqual(restored.speed, original.speed)
self.assertEqual(restored.course, original.course)
self.assertEqual(restored.mbits, original.mbits)
self.assertEqual(restored.mtype, original.mtype)
self.assertEqual(restored._type, original._type)
def test_mice_packet_from_raw_string(self):
"""Test MicEPacket creation from raw APRS string."""
packet_raw = 'kh2sr-15>S7TSYR,WIDE1-1,WIDE2-1,qAO,KO6KL-1:`1`7\x1c\x1c.#/`"4,}QuirkyQRP 4.6V 35.3C S06'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.MicEPacket)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'MicEPacket')
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertIsInstance(restored, packets.MicEPacket)
self.assertEqual(restored.from_call, packet.from_call)
if hasattr(packet, 'latitude') and packet.latitude:
self.assertEqual(restored.latitude, packet.latitude)
self.assertEqual(restored.longitude, packet.longitude)

View File

@ -0,0 +1,122 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestObjectPacket(unittest.TestCase):
"""Test ObjectPacket JSON serialization."""
def test_object_packet_to_json(self):
"""Test ObjectPacket.to_json() method."""
packet = packets.ObjectPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
symbol='r',
symbol_table='/',
comment='Test object comment',
alive=True,
speed=25.5,
course=180,
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'ObjectPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['latitude'], 37.7749)
self.assertEqual(json_dict['longitude'], -122.4194)
self.assertEqual(json_dict['symbol'], 'r')
self.assertEqual(json_dict['symbol_table'], '/')
self.assertEqual(json_dict['comment'], 'Test object comment')
self.assertEqual(json_dict['alive'], True)
self.assertEqual(json_dict['speed'], 25.5)
self.assertEqual(json_dict['course'], 180)
def test_object_packet_from_dict(self):
"""Test ObjectPacket.from_dict() method."""
packet_dict = {
'_type': 'ObjectPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'latitude': 37.7749,
'longitude': -122.4194,
'symbol': 'r',
'symbol_table': '/',
'comment': 'Test object comment',
'alive': True,
'speed': 25.5,
'course': 180,
}
packet = packets.ObjectPacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.ObjectPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.latitude, 37.7749)
self.assertEqual(packet.longitude, -122.4194)
self.assertEqual(packet.symbol, 'r')
self.assertEqual(packet.symbol_table, '/')
self.assertEqual(packet.comment, 'Test object comment')
self.assertEqual(packet.alive, True)
self.assertEqual(packet.speed, 25.5)
self.assertEqual(packet.course, 180)
def test_object_packet_round_trip(self):
"""Test ObjectPacket round-trip: to_json -> from_dict."""
original = packets.ObjectPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
symbol='r',
symbol_table='/',
comment='Test object comment',
alive=True,
speed=25.5,
course=180,
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.ObjectPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.latitude, original.latitude)
self.assertEqual(restored.longitude, original.longitude)
self.assertEqual(restored.symbol, original.symbol)
self.assertEqual(restored.symbol_table, original.symbol_table)
self.assertEqual(restored.comment, original.comment)
self.assertEqual(restored.alive, original.alive)
self.assertEqual(restored.speed, original.speed)
self.assertEqual(restored.course, original.course)
self.assertEqual(restored._type, original._type)
def test_object_packet_from_raw_string(self):
"""Test ObjectPacket creation from raw APRS string."""
# Use a working object packet example from the codebase
packet_raw = (
'REPEAT>APZ100:;K4CQ *301301z3735.11N/07903.08Wr145.490MHz T136 -060'
)
packet_dict = aprslib.parse(packet_raw)
# aprslib might not set format correctly, so set it manually
packet_dict['format'] = 'object'
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.ObjectPacket)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'ObjectPacket')
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertIsInstance(restored, packets.ObjectPacket)
self.assertEqual(restored.from_call, packet.from_call)
self.assertEqual(restored.to_call, packet.to_call)
if hasattr(packet, 'latitude') and packet.latitude:
self.assertEqual(restored.latitude, packet.latitude)
self.assertEqual(restored.longitude, packet.longitude)

View File

@ -0,0 +1,75 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestPacket(unittest.TestCase):
"""Test Packet base class JSON serialization."""
def test_packet_to_json(self):
"""Test Packet.to_json() method."""
packet = packets.Packet(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
msgNo='123',
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
# Verify it's valid JSON
json_dict = json.loads(json_str)
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['msgNo'], '123')
def test_packet_from_dict(self):
"""Test Packet.from_dict() method."""
packet_dict = {
'_type': 'Packet',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'msgNo': '123',
}
packet = packets.Packet.from_dict(packet_dict)
self.assertIsInstance(packet, packets.Packet)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.msgNo, '123')
def test_packet_round_trip(self):
"""Test Packet round-trip: to_json -> from_dict."""
original = packets.Packet(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
msgNo='123',
addresse=fake.FAKE_TO_CALLSIGN,
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.Packet.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.msgNo, original.msgNo)
self.assertEqual(restored.addresse, original.addresse)
def test_packet_from_raw_string(self):
"""Test Packet creation from raw APRS string."""
# Note: Base Packet is rarely used directly, but we can test with a simple message
packet_raw = 'KFAKE>APZ100::KMINE :Test message{123'
packet_dict = aprslib.parse(packet_raw)
# aprslib might not set format correctly, so set it manually
packet_dict['format'] = 'message'
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.MessagePacket)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertIn('from_call', json_dict)
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertEqual(restored.from_call, packet.from_call)
self.assertEqual(restored.to_call, packet.to_call)

View File

@ -0,0 +1,76 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestRejectPacket(unittest.TestCase):
"""Test RejectPacket JSON serialization."""
def test_reject_packet_to_json(self):
"""Test RejectPacket.to_json() method."""
packet = packets.RejectPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
msgNo='123',
response='rej',
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'RejectPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['msgNo'], '123')
def test_reject_packet_from_dict(self):
"""Test RejectPacket.from_dict() method."""
packet_dict = {
'_type': 'RejectPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'msgNo': '123',
'response': 'rej',
}
packet = packets.RejectPacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.RejectPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.msgNo, '123')
def test_reject_packet_round_trip(self):
"""Test RejectPacket round-trip: to_json -> from_dict."""
original = packets.RejectPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
msgNo='123',
response='rej',
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.RejectPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.msgNo, original.msgNo)
self.assertEqual(restored._type, original._type)
def test_reject_packet_from_raw_string(self):
"""Test RejectPacket creation from raw APRS string."""
packet_raw = 'HB9FDL-1>APK102,HB9FM-4*,WIDE2,qAR,HB9FEF-11::REPEAT :rej4139'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.RejectPacket)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'RejectPacket')
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertIsInstance(restored, packets.RejectPacket)
self.assertEqual(restored.from_call, packet.from_call)
self.assertEqual(restored.to_call, packet.to_call)
self.assertEqual(restored.msgNo, packet.msgNo)

View File

@ -0,0 +1,93 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestStatusPacket(unittest.TestCase):
"""Test StatusPacket JSON serialization."""
def test_status_packet_to_json(self):
"""Test StatusPacket.to_json() method."""
packet = packets.StatusPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
status='Test status message',
msgNo='123',
messagecapable=True,
comment='Test comment',
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'StatusPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['status'], 'Test status message')
self.assertEqual(json_dict['msgNo'], '123')
self.assertEqual(json_dict['messagecapable'], True)
self.assertEqual(json_dict['comment'], 'Test comment')
def test_status_packet_from_dict(self):
"""Test StatusPacket.from_dict() method."""
packet_dict = {
'_type': 'StatusPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'status': 'Test status message',
'msgNo': '123',
'messagecapable': True,
'comment': 'Test comment',
}
packet = packets.StatusPacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.StatusPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.status, 'Test status message')
self.assertEqual(packet.msgNo, '123')
self.assertEqual(packet.messagecapable, True)
self.assertEqual(packet.comment, 'Test comment')
def test_status_packet_round_trip(self):
"""Test StatusPacket round-trip: to_json -> from_dict."""
original = packets.StatusPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
status='Test status message',
msgNo='123',
messagecapable=True,
comment='Test comment',
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.StatusPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.status, original.status)
self.assertEqual(restored.msgNo, original.msgNo)
self.assertEqual(restored.messagecapable, original.messagecapable)
self.assertEqual(restored.comment, original.comment)
self.assertEqual(restored._type, original._type)
def test_status_packet_from_raw_string(self):
"""Test StatusPacket creation from raw APRS string."""
packet_raw = 'KFAKE>APZ100::KMINE :Test status message{123'
packet_dict = aprslib.parse(packet_raw)
# aprslib might not set format correctly, so set it manually
packet_dict['format'] = 'status'
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.StatusPacket)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'StatusPacket')
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertIsInstance(restored, packets.StatusPacket)
self.assertEqual(restored.from_call, packet.from_call)
self.assertEqual(restored.to_call, packet.to_call)
self.assertEqual(restored.status, packet.status)

View File

@ -0,0 +1,115 @@
import json
import unittest
import aprslib
from aprsd import packets
from aprsd.packets.core import TelemetryPacket
from tests import fake
class TestTelemetryPacket(unittest.TestCase):
"""Test TelemetryPacket JSON serialization."""
def test_telemetry_packet_to_json(self):
"""Test TelemetryPacket.to_json() method."""
packet = TelemetryPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
speed=25.5,
course=180,
mbits='test',
mtype='test_type',
telemetry={'key': 'value'},
tPARM=['parm1', 'parm2'],
tUNIT=['unit1', 'unit2'],
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'TelemetryPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['latitude'], 37.7749)
self.assertEqual(json_dict['longitude'], -122.4194)
self.assertEqual(json_dict['speed'], 25.5)
self.assertEqual(json_dict['course'], 180)
self.assertEqual(json_dict['mbits'], 'test')
self.assertEqual(json_dict['mtype'], 'test_type')
def test_telemetry_packet_from_dict(self):
"""Test TelemetryPacket.from_dict() method."""
packet_dict = {
'_type': 'TelemetryPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'latitude': 37.7749,
'longitude': -122.4194,
'speed': 25.5,
'course': 180,
'mbits': 'test',
'mtype': 'test_type',
'telemetry': {'key': 'value'},
'tPARM': ['parm1', 'parm2'],
'tUNIT': ['unit1', 'unit2'],
}
packet = TelemetryPacket.from_dict(packet_dict)
self.assertIsInstance(packet, TelemetryPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.latitude, 37.7749)
self.assertEqual(packet.longitude, -122.4194)
self.assertEqual(packet.speed, 25.5)
self.assertEqual(packet.course, 180)
self.assertEqual(packet.mbits, 'test')
self.assertEqual(packet.mtype, 'test_type')
def test_telemetry_packet_round_trip(self):
"""Test TelemetryPacket round-trip: to_json -> from_dict."""
original = TelemetryPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
speed=25.5,
course=180,
mbits='test',
mtype='test_type',
telemetry={'key': 'value'},
tPARM=['parm1', 'parm2'],
tUNIT=['unit1', 'unit2'],
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = TelemetryPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.latitude, original.latitude)
self.assertEqual(restored.longitude, original.longitude)
self.assertEqual(restored.speed, original.speed)
self.assertEqual(restored.course, original.course)
self.assertEqual(restored.mbits, original.mbits)
self.assertEqual(restored.mtype, original.mtype)
self.assertEqual(restored._type, original._type)
def test_telemetry_packet_from_raw_string(self):
"""Test TelemetryPacket creation from raw APRS string."""
# Telemetry packets are less common, using a Mic-E with telemetry as example
packet_raw = (
"KD9YIL>T0PX9W,WIDE1-1,WIDE2-1,qAO,NU9R-10:`sB,l#P>/'\"6+}|#*%U'a|!whl!|3"
)
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
# This might be MicEPacket or TelemetryPacket depending on content
self.assertIsNotNone(packet)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertEqual(restored.from_call, packet.from_call)
if hasattr(packet, 'telemetry') and packet.telemetry:
self.assertIsNotNone(restored.telemetry)

View File

@ -0,0 +1,99 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestThirdPartyPacket(unittest.TestCase):
"""Test ThirdPartyPacket JSON serialization."""
def test_thirdparty_packet_to_json(self):
"""Test ThirdPartyPacket.to_json() method."""
subpacket = packets.MessagePacket(
from_call='SUB',
to_call='TARGET',
message_text='Sub message',
)
packet = packets.ThirdPartyPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
subpacket=subpacket,
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'ThirdPartyPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
# subpacket should be serialized as a dict
self.assertIn('subpacket', json_dict)
self.assertIsInstance(json_dict['subpacket'], dict)
def test_thirdparty_packet_from_dict(self):
"""Test ThirdPartyPacket.from_dict() method."""
subpacket_dict = {
'_type': 'MessagePacket',
'from_call': 'SUB',
'to_call': 'TARGET',
'message_text': 'Sub message',
}
packet_dict = {
'_type': 'ThirdPartyPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'subpacket': subpacket_dict,
}
packet = packets.ThirdPartyPacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.ThirdPartyPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertIsNotNone(packet.subpacket)
self.assertIsInstance(packet.subpacket, packets.MessagePacket)
def test_thirdparty_packet_round_trip(self):
"""Test ThirdPartyPacket round-trip: to_json -> from_dict."""
subpacket = packets.MessagePacket(
from_call='SUB',
to_call='TARGET',
message_text='Sub message',
)
original = packets.ThirdPartyPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
subpacket=subpacket,
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.ThirdPartyPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored._type, original._type)
# Verify subpacket was restored
self.assertIsNotNone(restored.subpacket)
self.assertIsInstance(restored.subpacket, packets.MessagePacket)
self.assertEqual(restored.subpacket.from_call, original.subpacket.from_call)
self.assertEqual(restored.subpacket.to_call, original.subpacket.to_call)
self.assertEqual(
restored.subpacket.message_text, original.subpacket.message_text
)
def test_thirdparty_packet_from_raw_string(self):
"""Test ThirdPartyPacket creation from raw APRS string."""
packet_raw = 'GTOWN>APDW16,WIDE1-1,WIDE2-1:}KM6LYW-9>APZ100,TCPIP,GTOWN*::KM6LYW :KM6LYW: 19 Miles SW'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.ThirdPartyPacket)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'ThirdPartyPacket')
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertIsInstance(restored, packets.ThirdPartyPacket)
self.assertEqual(restored.from_call, packet.from_call)
self.assertIsNotNone(restored.subpacket)
self.assertEqual(restored.subpacket.from_call, packet.subpacket.from_call)

View File

@ -0,0 +1,82 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestUnknownPacket(unittest.TestCase):
"""Test UnknownPacket JSON serialization."""
def test_unknown_packet_to_json(self):
"""Test UnknownPacket.to_json() method."""
packet = packets.UnknownPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
format='unknown_format',
packet_type='unknown',
unknown_fields={'extra_field': 'extra_value'},
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'UnknownPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['format'], 'unknown_format')
self.assertEqual(json_dict['packet_type'], 'unknown')
def test_unknown_packet_from_dict(self):
"""Test UnknownPacket.from_dict() method."""
packet_dict = {
'_type': 'UnknownPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'format': 'unknown_format',
'packet_type': 'unknown',
'extra_field': 'extra_value',
}
packet = packets.UnknownPacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.UnknownPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.format, 'unknown_format')
self.assertEqual(packet.packet_type, 'unknown')
def test_unknown_packet_round_trip(self):
"""Test UnknownPacket round-trip: to_json -> from_dict."""
original = packets.UnknownPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
format='unknown_format',
packet_type='unknown',
unknown_fields={'extra_field': 'extra_value'},
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.UnknownPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.format, original.format)
self.assertEqual(restored.packet_type, original.packet_type)
self.assertEqual(restored._type, original._type)
def test_unknown_packet_from_raw_string(self):
"""Test UnknownPacket creation from raw APRS string."""
# Use a packet format that might not be recognized
packet_raw = 'KFAKE>APZ100:>Unknown format data'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
# This might be UnknownPacket or another type depending on parsing
self.assertIsNotNone(packet)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertEqual(restored.from_call, packet.from_call)
if isinstance(packet, packets.UnknownPacket):
self.assertIsInstance(restored, packets.UnknownPacket)

View File

@ -0,0 +1,151 @@
import json
import unittest
import aprslib
from aprsd import packets
from tests import fake
class TestWeatherPacket(unittest.TestCase):
"""Test WeatherPacket JSON serialization."""
def test_weather_packet_to_json(self):
"""Test WeatherPacket.to_json() method."""
packet = packets.WeatherPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
symbol='_',
symbol_table='/',
wind_speed=10.5,
wind_direction=180,
wind_gust=15.0,
temperature=72.5,
rain_1h=0.1,
rain_24h=0.5,
rain_since_midnight=0.3,
humidity=65,
pressure=1013.25,
comment='Test weather comment',
)
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'WeatherPacket')
self.assertEqual(json_dict['from_call'], fake.FAKE_FROM_CALLSIGN)
self.assertEqual(json_dict['to_call'], fake.FAKE_TO_CALLSIGN)
self.assertEqual(json_dict['latitude'], 37.7749)
self.assertEqual(json_dict['longitude'], -122.4194)
self.assertEqual(json_dict['symbol'], '_')
self.assertEqual(json_dict['wind_speed'], 10.5)
self.assertEqual(json_dict['wind_direction'], 180)
self.assertEqual(json_dict['wind_gust'], 15.0)
self.assertEqual(json_dict['temperature'], 72.5)
self.assertEqual(json_dict['rain_1h'], 0.1)
self.assertEqual(json_dict['rain_24h'], 0.5)
self.assertEqual(json_dict['rain_since_midnight'], 0.3)
self.assertEqual(json_dict['humidity'], 65)
self.assertEqual(json_dict['pressure'], 1013.25)
self.assertEqual(json_dict['comment'], 'Test weather comment')
def test_weather_packet_from_dict(self):
"""Test WeatherPacket.from_dict() method."""
packet_dict = {
'_type': 'WeatherPacket',
'from_call': fake.FAKE_FROM_CALLSIGN,
'to_call': fake.FAKE_TO_CALLSIGN,
'latitude': 37.7749,
'longitude': -122.4194,
'symbol': '_',
'symbol_table': '/',
'wind_speed': 10.5,
'wind_direction': 180,
'wind_gust': 15.0,
'temperature': 72.5,
'rain_1h': 0.1,
'rain_24h': 0.5,
'rain_since_midnight': 0.3,
'humidity': 65,
'pressure': 1013.25,
'comment': 'Test weather comment',
}
packet = packets.WeatherPacket.from_dict(packet_dict)
self.assertIsInstance(packet, packets.WeatherPacket)
self.assertEqual(packet.from_call, fake.FAKE_FROM_CALLSIGN)
self.assertEqual(packet.to_call, fake.FAKE_TO_CALLSIGN)
self.assertEqual(packet.latitude, 37.7749)
self.assertEqual(packet.longitude, -122.4194)
self.assertEqual(packet.symbol, '_')
self.assertEqual(packet.wind_speed, 10.5)
self.assertEqual(packet.wind_direction, 180)
self.assertEqual(packet.wind_gust, 15.0)
self.assertEqual(packet.temperature, 72.5)
self.assertEqual(packet.rain_1h, 0.1)
self.assertEqual(packet.rain_24h, 0.5)
self.assertEqual(packet.rain_since_midnight, 0.3)
self.assertEqual(packet.humidity, 65)
self.assertEqual(packet.pressure, 1013.25)
self.assertEqual(packet.comment, 'Test weather comment')
def test_weather_packet_round_trip(self):
"""Test WeatherPacket round-trip: to_json -> from_dict."""
original = packets.WeatherPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=37.7749,
longitude=-122.4194,
symbol='_',
symbol_table='/',
wind_speed=10.5,
wind_direction=180,
wind_gust=15.0,
temperature=72.5,
rain_1h=0.1,
rain_24h=0.5,
rain_since_midnight=0.3,
humidity=65,
pressure=1013.25,
comment='Test weather comment',
)
json_str = original.to_json()
packet_dict = json.loads(json_str)
restored = packets.WeatherPacket.from_dict(packet_dict)
self.assertEqual(restored.from_call, original.from_call)
self.assertEqual(restored.to_call, original.to_call)
self.assertEqual(restored.latitude, original.latitude)
self.assertEqual(restored.longitude, original.longitude)
self.assertEqual(restored.symbol, original.symbol)
self.assertEqual(restored.wind_speed, original.wind_speed)
self.assertEqual(restored.wind_direction, original.wind_direction)
self.assertEqual(restored.wind_gust, original.wind_gust)
self.assertEqual(restored.temperature, original.temperature)
self.assertEqual(restored.rain_1h, original.rain_1h)
self.assertEqual(restored.rain_24h, original.rain_24h)
self.assertEqual(restored.rain_since_midnight, original.rain_since_midnight)
self.assertEqual(restored.humidity, original.humidity)
self.assertEqual(restored.pressure, original.pressure)
self.assertEqual(restored.comment, original.comment)
self.assertEqual(restored._type, original._type)
def test_weather_packet_from_raw_string(self):
"""Test WeatherPacket creation from raw APRS string."""
packet_raw = 'FW9222>APRS,TCPXX*,qAX,CWOP-6:@122025z2953.94N/08423.77W_232/003g006t084r000p032P000h80b10157L745.DsWLL'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.WeatherPacket)
# Test to_json
json_str = packet.to_json()
self.assertIsInstance(json_str, str)
json_dict = json.loads(json_str)
self.assertEqual(json_dict['_type'], 'WeatherPacket')
# Test from_dict round trip
restored = packets.factory(json_dict)
self.assertIsInstance(restored, packets.WeatherPacket)
self.assertEqual(restored.from_call, packet.from_call)
self.assertEqual(restored.temperature, packet.temperature)
self.assertEqual(restored.humidity, packet.humidity)
self.assertEqual(restored.pressure, packet.pressure)
self.assertEqual(restored.wind_speed, packet.wind_speed)
self.assertEqual(restored.wind_direction, packet.wind_direction)

View File

@ -7,29 +7,28 @@ from aprsd.plugins import fortune as fortune_plugin
from .. import fake, test_plugin
CONF = cfg.CONF
class TestFortunePlugin(test_plugin.TestPlugin):
@mock.patch("shutil.which")
@mock.patch('shutil.which')
def test_fortune_fail(self, mock_which):
mock_which.return_value = None
fortune = fortune_plugin.FortunePlugin()
expected = "FortunePlugin isn't enabled"
packet = fake.fake_packet(message="fortune")
packet = fake.fake_packet(message='fortune')
actual = fortune.filter(packet)
self.assertEqual(expected, actual)
@mock.patch("subprocess.check_output")
@mock.patch("shutil.which")
@mock.patch('subprocess.check_output')
@mock.patch('shutil.which')
def test_fortune_success(self, mock_which, mock_output):
mock_which.return_value = "/usr/bin/games/fortune"
mock_output.return_value = "Funny fortune"
mock_which.return_value = '/usr/bin/games/fortune'
mock_output.return_value = 'Funny fortune'
CONF.callsign = fake.FAKE_TO_CALLSIGN
fortune = fortune_plugin.FortunePlugin()
expected = "Funny fortune"
packet = fake.fake_packet(message="fortune")
expected = 'Funny fortune'
packet = fake.fake_packet(message='fortune')
actual = fortune.filter(packet)
self.assertEqual(expected, actual)

View File

@ -7,12 +7,11 @@ from aprsd.plugins import ping as ping_plugin
from .. import fake, test_plugin
CONF = cfg.CONF
class TestPingPlugin(test_plugin.TestPlugin):
@mock.patch("time.localtime")
@mock.patch('time.localtime')
def test_ping(self, mock_time):
fake_time = mock.MagicMock()
h = fake_time.tm_hour = 16
@ -24,7 +23,7 @@ class TestPingPlugin(test_plugin.TestPlugin):
ping = ping_plugin.PingPlugin()
packet = fake.fake_packet(
message="location",
message='location',
msg_number=1,
)
@ -33,16 +32,16 @@ class TestPingPlugin(test_plugin.TestPlugin):
def ping_str(h, m, s):
return (
"Pong! "
'Pong! '
+ str(h).zfill(2)
+ ":"
+ ':'
+ str(m).zfill(2)
+ ":"
+ ':'
+ str(s).zfill(2)
)
packet = fake.fake_packet(
message="Ping",
message='Ping',
msg_number=1,
)
actual = ping.filter(packet)
@ -50,7 +49,7 @@ class TestPingPlugin(test_plugin.TestPlugin):
self.assertEqual(expected, actual)
packet = fake.fake_packet(
message="ping",
message='ping',
msg_number=1,
)
actual = ping.filter(packet)

View File

@ -12,26 +12,26 @@ CONF = cfg.CONF
class TestTimePlugins(test_plugin.TestPlugin):
@mock.patch("aprsd.plugins.time.TimePlugin._get_local_tz")
@mock.patch("aprsd.plugins.time.TimePlugin._get_utcnow")
@mock.patch('aprsd.plugins.time.TimePlugin._get_local_tz')
@mock.patch('aprsd.plugins.time.TimePlugin._get_utcnow')
def test_time(self, mock_utcnow, mock_localtz):
utcnow = pytz.datetime.datetime.utcnow()
mock_utcnow.return_value = utcnow
tz = pytz.timezone("US/Pacific")
tz = pytz.timezone('US/Pacific')
mock_localtz.return_value = tz
gmt_t = pytz.utc.localize(utcnow)
local_t = gmt_t.astimezone(tz)
fake_time = mock.MagicMock()
h = int(local_t.strftime("%H"))
m = int(local_t.strftime("%M"))
h = int(local_t.strftime('%H'))
m = int(local_t.strftime('%M'))
fake_time.tm_sec = 13
CONF.callsign = fake.FAKE_TO_CALLSIGN
time = time_plugin.TimePlugin()
packet = fake.fake_packet(
message="location",
message='location',
msg_number=1,
)
@ -41,11 +41,11 @@ class TestTimePlugins(test_plugin.TestPlugin):
cur_time = fuzzy(h, m, 1)
packet = fake.fake_packet(
message="time",
message='time',
msg_number=1,
)
local_short_str = local_t.strftime("%H:%M %Z")
expected = "{} ({})".format(
local_short_str = local_t.strftime('%H:%M %Z')
expected = '{} ({})'.format(
cur_time,
local_short_str,
)

View File

@ -18,89 +18,89 @@ class TestUSWeatherPlugin(test_plugin.TestPlugin):
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USWeatherPlugin()
expected = "USWeatherPlugin isn't enabled"
packet = fake.fake_packet(message="weather")
packet = fake.fake_packet(message='weather')
actual = wx.filter(packet)
self.assertEqual(expected, actual)
@mock.patch("aprsd.plugin_utils.get_aprs_fi")
@mock.patch('aprsd.plugin_utils.get_aprs_fi')
def test_failed_aprs_fi_location(self, mock_check):
# When the aprs.fi api key isn't set, then
# the Plugin will be disabled.
mock_check.side_effect = Exception
CONF.aprs_fi.apiKey = "abc123"
CONF.aprs_fi.apiKey = 'abc123'
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USWeatherPlugin()
expected = "Failed to fetch aprs.fi location"
packet = fake.fake_packet(message="weather")
expected = 'Failed to fetch aprs.fi location'
packet = fake.fake_packet(message='weather')
actual = wx.filter(packet)
self.assertEqual(expected, actual)
@mock.patch("aprsd.plugin_utils.get_aprs_fi")
@mock.patch('aprsd.plugin_utils.get_aprs_fi')
def test_failed_aprs_fi_location_no_entries(self, mock_check):
# When the aprs.fi api key isn't set, then
# the Plugin will be disabled.
mock_check.return_value = {"entries": []}
CONF.aprs_fi.apiKey = "abc123"
mock_check.return_value = {'entries': []}
CONF.aprs_fi.apiKey = 'abc123'
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USWeatherPlugin()
wx.enabled = True
expected = "Failed to fetch aprs.fi location"
packet = fake.fake_packet(message="weather")
expected = 'Failed to fetch aprs.fi location'
packet = fake.fake_packet(message='weather')
actual = wx.filter(packet)
self.assertEqual(expected, actual)
@mock.patch("aprsd.plugin_utils.get_aprs_fi")
@mock.patch("aprsd.plugin_utils.get_weather_gov_for_gps")
@mock.patch('aprsd.plugin_utils.get_aprs_fi')
@mock.patch('aprsd.plugin_utils.get_weather_gov_for_gps')
def test_unknown_gps(self, mock_weather, mock_check_aprs):
# When the aprs.fi api key isn't set, then
# the LocationPlugin will be disabled.
mock_check_aprs.return_value = {
"entries": [
'entries': [
{
"lat": 10,
"lng": 11,
"lasttime": 10,
'lat': 10,
'lng': 11,
'lasttime': 10,
},
],
}
mock_weather.side_effect = Exception
CONF.aprs_fi.apiKey = "abc123"
CONF.aprs_fi.apiKey = 'abc123'
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USWeatherPlugin()
wx.enabled = True
expected = "Unable to get weather"
packet = fake.fake_packet(message="weather")
expected = 'Unable to get weather'
packet = fake.fake_packet(message='weather')
actual = wx.filter(packet)
self.assertEqual(expected, actual)
@mock.patch("aprsd.plugin_utils.get_aprs_fi")
@mock.patch("aprsd.plugin_utils.get_weather_gov_for_gps")
@mock.patch('aprsd.plugin_utils.get_aprs_fi')
@mock.patch('aprsd.plugin_utils.get_weather_gov_for_gps')
def test_working(self, mock_weather, mock_check_aprs):
# When the aprs.fi api key isn't set, then
# the LocationPlugin will be disabled.
mock_check_aprs.return_value = {
"entries": [
'entries': [
{
"lat": 10,
"lng": 11,
"lasttime": 10,
'lat': 10,
'lng': 11,
'lasttime': 10,
},
],
}
mock_weather.return_value = {
"currentobservation": {"Temp": "400"},
"data": {
"temperature": ["10", "11"],
"weather": ["test", "another"],
'currentobservation': {'Temp': '400'},
'data': {
'temperature': ['10', '11'],
'weather': ['test', 'another'],
},
"time": {"startPeriodName": ["ignored", "sometime"]},
'time': {'startPeriodName': ['ignored', 'sometime']},
}
CONF.aprs_fi.apiKey = "abc123"
CONF.aprs_fi.apiKey = 'abc123'
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USWeatherPlugin()
wx.enabled = True
expected = "400F(10F/11F) test. sometime, another."
packet = fake.fake_packet(message="weather")
expected = '400F(10F/11F) test. sometime, another.'
packet = fake.fake_packet(message='weather')
actual = wx.filter(packet)
self.assertEqual(expected, actual)
@ -112,93 +112,93 @@ class TestUSMetarPlugin(test_plugin.TestPlugin):
CONF.aprs_fi.apiKey = None
wx = weather_plugin.USMetarPlugin()
expected = "USMetarPlugin isn't enabled"
packet = fake.fake_packet(message="metar")
packet = fake.fake_packet(message='metar')
actual = wx.filter(packet)
self.assertEqual(expected, actual)
@mock.patch("aprsd.plugin_utils.get_aprs_fi")
@mock.patch('aprsd.plugin_utils.get_aprs_fi')
def test_failed_aprs_fi_location(self, mock_check):
# When the aprs.fi api key isn't set, then
# the Plugin will be disabled.
mock_check.side_effect = Exception
CONF.aprs_fi.apiKey = "abc123"
CONF.aprs_fi.apiKey = 'abc123'
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USMetarPlugin()
wx.enabled = True
expected = "Failed to fetch aprs.fi location"
packet = fake.fake_packet(message="metar")
expected = 'Failed to fetch aprs.fi location'
packet = fake.fake_packet(message='metar')
actual = wx.filter(packet)
self.assertEqual(expected, actual)
@mock.patch("aprsd.plugin_utils.get_aprs_fi")
@mock.patch('aprsd.plugin_utils.get_aprs_fi')
def test_failed_aprs_fi_location_no_entries(self, mock_check):
# When the aprs.fi api key isn't set, then
# the Plugin will be disabled.
mock_check.return_value = {"entries": []}
CONF.aprs_fi.apiKey = "abc123"
mock_check.return_value = {'entries': []}
CONF.aprs_fi.apiKey = 'abc123'
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USMetarPlugin()
wx.enabled = True
expected = "Failed to fetch aprs.fi location"
packet = fake.fake_packet(message="metar")
expected = 'Failed to fetch aprs.fi location'
packet = fake.fake_packet(message='metar')
actual = wx.filter(packet)
self.assertEqual(expected, actual)
@mock.patch("aprsd.plugin_utils.get_weather_gov_metar")
@mock.patch('aprsd.plugin_utils.get_weather_gov_metar')
def test_gov_metar_fetch_fails(self, mock_metar):
mock_metar.side_effect = Exception
CONF.aprs_fi.apiKey = "abc123"
CONF.aprs_fi.apiKey = 'abc123'
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USMetarPlugin()
wx.enabled = True
expected = "Unable to find station METAR"
packet = fake.fake_packet(message="metar KPAO")
expected = 'Unable to find station METAR'
packet = fake.fake_packet(message='metar KPAO')
actual = wx.filter(packet)
self.assertEqual(expected, actual)
@mock.patch("aprsd.plugin_utils.get_weather_gov_metar")
@mock.patch('aprsd.plugin_utils.get_weather_gov_metar')
def test_airport_works(self, mock_metar):
class Response:
text = '{"properties": {"rawMessage": "BOGUSMETAR"}}'
mock_metar.return_value = Response()
CONF.aprs_fi.apiKey = "abc123"
CONF.aprs_fi.apiKey = 'abc123'
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USMetarPlugin()
wx.enabled = True
expected = "BOGUSMETAR"
packet = fake.fake_packet(message="metar KPAO")
expected = 'BOGUSMETAR'
packet = fake.fake_packet(message='metar KPAO')
actual = wx.filter(packet)
self.assertEqual(expected, actual)
@mock.patch("aprsd.plugin_utils.get_weather_gov_metar")
@mock.patch("aprsd.plugin_utils.get_aprs_fi")
@mock.patch("aprsd.plugin_utils.get_weather_gov_for_gps")
@mock.patch('aprsd.plugin_utils.get_weather_gov_metar')
@mock.patch('aprsd.plugin_utils.get_aprs_fi')
@mock.patch('aprsd.plugin_utils.get_weather_gov_for_gps')
def test_metar_works(self, mock_wx_for_gps, mock_check_aprs, mock_metar):
mock_wx_for_gps.return_value = {
"location": {"metar": "BOGUSMETAR"},
'location': {'metar': 'BOGUSMETAR'},
}
class Response:
text = '{"properties": {"rawMessage": "BOGUSMETAR"}}'
mock_check_aprs.return_value = {
"entries": [
'entries': [
{
"lat": 10,
"lng": 11,
"lasttime": 10,
'lat': 10,
'lng': 11,
'lasttime': 10,
},
],
}
mock_metar.return_value = Response()
CONF.aprs_fi.apiKey = "abc123"
CONF.aprs_fi.apiKey = 'abc123'
CONF.callsign = fake.FAKE_TO_CALLSIGN
wx = weather_plugin.USMetarPlugin()
wx.enabled = True
expected = "BOGUSMETAR"
packet = fake.fake_packet(message="metar")
expected = 'BOGUSMETAR'
packet = fake.fake_packet(message='metar')
actual = wx.filter(packet)
self.assertEqual(expected, actual)

View File

@ -20,18 +20,18 @@ class TestPacketBase(unittest.TestCase):
message_format=core.PACKET_TYPE_MESSAGE,
):
packet_dict = {
"from": from_call,
"addresse": to_call,
"to": to_call,
"format": message_format,
"raw": "",
'from': from_call,
'addresse': to_call,
'to': to_call,
'format': message_format,
'raw': '',
}
if message:
packet_dict["message_text"] = message
packet_dict['message_text'] = message
if msg_number:
packet_dict["msgNo"] = str(msg_number)
packet_dict['msgNo'] = str(msg_number)
return packet_dict
@ -52,7 +52,7 @@ class TestPacketBase(unittest.TestCase):
self.assertEqual(
fake.FAKE_FROM_CALLSIGN,
pkt.get("from_call"),
pkt.get('from_call'),
)
def test_packet_factory(self):
@ -64,21 +64,21 @@ class TestPacketBase(unittest.TestCase):
self.assertEqual(fake.FAKE_TO_CALLSIGN, pkt.to_call)
self.assertEqual(fake.FAKE_TO_CALLSIGN, pkt.addresse)
pkt_dict["symbol"] = "_"
pkt_dict["weather"] = {
"wind_gust": 1.11,
"temperature": 32.01,
"humidity": 85,
"pressure": 1095.12,
"comment": "Home!",
pkt_dict['symbol'] = '_'
pkt_dict['weather'] = {
'wind_gust': 1.11,
'temperature': 32.01,
'humidity': 85,
'pressure': 1095.12,
'comment': 'Home!',
}
pkt_dict["format"] = core.PACKET_TYPE_UNCOMPRESSED
pkt_dict['format'] = core.PACKET_TYPE_UNCOMPRESSED
pkt = packets.factory(pkt_dict)
self.assertIsInstance(pkt, packets.WeatherPacket)
@mock.patch("aprsd.packets.core.GPSPacket._build_time_zulu")
@mock.patch('aprsd.packets.core.GPSPacket._build_time_zulu')
def test_packet_format_rain_1h(self, mock_time_zulu):
mock_time_zulu.return_value = "221450"
mock_time_zulu.return_value = '221450'
wx = packets.WeatherPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
@ -87,58 +87,58 @@ class TestPacketBase(unittest.TestCase):
)
wx.prepare()
expected = "KFAKE>KMINE,WIDE1-1,WIDE2-1:@221450z0.0/0.0_000/000g000t000r000p000P000h00b00000"
expected = 'KFAKE>KMINE,WIDE1-1,WIDE2-1:@221450z0.0/0.0_000/000g000t000r000p000P000h00b00000'
self.assertEqual(expected, wx.raw)
rain_location = 59
self.assertEqual(rain_location, wx.raw.find("r000"))
self.assertEqual(rain_location, wx.raw.find('r000'))
wx.rain_1h = 1.11
wx.prepare()
expected = "KFAKE>KMINE,WIDE1-1,WIDE2-1:@221450z0.0/0.0_000/000g000t000r111p000P000h00b00000"
expected = 'KFAKE>KMINE,WIDE1-1,WIDE2-1:@221450z0.0/0.0_000/000g000t000r111p000P000h00b00000'
self.assertEqual(expected, wx.raw)
wx.rain_1h = 0.01
wx.prepare()
expected = "KFAKE>KMINE,WIDE1-1,WIDE2-1:@221450z0.0/0.0_000/000g000t000r001p000P000h00b00000"
expected = 'KFAKE>KMINE,WIDE1-1,WIDE2-1:@221450z0.0/0.0_000/000g000t000r001p000P000h00b00000'
self.assertEqual(expected, wx.raw)
def test_beacon_factory(self):
"""Test to ensure a beacon packet is created."""
packet_raw = (
"WB4BOR-12>APZ100,WIDE2-1:@161647z3724.15N107847.58W$ APRSD WebChat"
'WB4BOR-12>APZ100,WIDE2-1:@161647z3724.15N107847.58W$ APRSD WebChat'
)
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.BeaconPacket)
packet_raw = "kd8mey-10>APRS,TCPIP*,qAC,T2SYDNEY:=4247.80N/08539.00WrPHG1210/Making 220 Great Again Allstar# 552191"
packet_raw = 'kd8mey-10>APRS,TCPIP*,qAC,T2SYDNEY:=4247.80N/08539.00WrPHG1210/Making 220 Great Again Allstar# 552191'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.BeaconPacket)
def test_reject_factory(self):
"""Test to ensure a reject packet is created."""
packet_raw = "HB9FDL-1>APK102,HB9FM-4*,WIDE2,qAR,HB9FEF-11::REPEAT :rej4139"
packet_raw = 'HB9FDL-1>APK102,HB9FM-4*,WIDE2,qAR,HB9FEF-11::REPEAT :rej4139'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.RejectPacket)
self.assertEqual("4139", packet.msgNo)
self.assertEqual("HB9FDL-1", packet.from_call)
self.assertEqual("REPEAT", packet.to_call)
self.assertEqual("reject", packet.packet_type)
self.assertEqual('4139', packet.msgNo)
self.assertEqual('HB9FDL-1', packet.from_call)
self.assertEqual('REPEAT', packet.to_call)
self.assertEqual('reject', packet.packet_type)
self.assertIsNone(packet.payload)
def test_thirdparty_factory(self):
"""Test to ensure a third party packet is created."""
packet_raw = "GTOWN>APDW16,WIDE1-1,WIDE2-1:}KM6LYW-9>APZ100,TCPIP,GTOWN*::KM6LYW :KM6LYW: 19 Miles SW"
packet_raw = 'GTOWN>APDW16,WIDE1-1,WIDE2-1:}KM6LYW-9>APZ100,TCPIP,GTOWN*::KM6LYW :KM6LYW: 19 Miles SW'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.ThirdPartyPacket)
def test_weather_factory(self):
"""Test to ensure a weather packet is created."""
packet_raw = "FW9222>APRS,TCPXX*,qAX,CWOP-6:@122025z2953.94N/08423.77W_232/003g006t084r000p032P000h80b10157L745.DsWLL"
packet_raw = 'FW9222>APRS,TCPXX*,qAX,CWOP-6:@122025z2953.94N/08423.77W_232/003g006t084r000p032P000h80b10157L745.DsWLL'
packet_dict = aprslib.parse(packet_raw)
packet = packets.factory(packet_dict)
self.assertIsInstance(packet, packets.WeatherPacket)
@ -178,7 +178,7 @@ class TestPacketBase(unittest.TestCase):
)
expected = (
f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:ack123"
f'{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:ack123'
)
self.assertEqual(expected, str(ack))
@ -191,7 +191,7 @@ class TestPacketBase(unittest.TestCase):
)
expected = (
f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:rej123"
f'{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:rej123'
)
self.assertEqual(expected, str(reject))
@ -200,20 +200,20 @@ class TestPacketBase(unittest.TestCase):
lat = 28.123456
lon = -80.123456
ts = 1711219496.6426
comment = "My Beacon Comment"
comment = 'My Beacon Comment'
packet = packets.BeaconPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
latitude=lat,
longitude=lon,
timestamp=ts,
symbol=">",
symbol='>',
comment=comment,
)
expected_lat = aprslib_util.latitude_to_ddm(lat)
expected_lon = aprslib_util.longitude_to_ddm(lon)
expected = f"KFAKE>APZ100:@231844z{expected_lat}/{expected_lon}>{comment}"
expected = f'KFAKE>APZ100:@231844z{expected_lat}/{expected_lon}>{comment}'
self.assertEqual(expected, str(packet))
def test_beacon_format_no_comment(self):
@ -227,13 +227,13 @@ class TestPacketBase(unittest.TestCase):
latitude=lat,
longitude=lon,
timestamp=ts,
symbol=">",
symbol='>',
)
empty_comment = "APRSD Beacon"
empty_comment = 'APRSD Beacon'
expected_lat = aprslib_util.latitude_to_ddm(lat)
expected_lon = aprslib_util.longitude_to_ddm(lon)
expected = f"KFAKE>APZ100:@231844z{expected_lat}/{expected_lon}>{empty_comment}"
expected = f'KFAKE>APZ100:@231844z{expected_lat}/{expected_lon}>{empty_comment}'
self.assertEqual(expected, str(packet))
def test_bulletin_format(self):
@ -242,32 +242,32 @@ class TestPacketBase(unittest.TestCase):
bid = 0
packet = packets.BulletinPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
message_text="My Bulletin Message",
message_text='My Bulletin Message',
bid=0,
)
expected = (
f"{fake.FAKE_FROM_CALLSIGN}>APZ100::BLN{bid:<9}:{packet.message_text}"
f'{fake.FAKE_FROM_CALLSIGN}>APZ100::BLN{bid:<9}:{packet.message_text}'
)
self.assertEqual(expected, str(packet))
# bulletin id = 1
bid = 1
txt = "((((((( CX2SA - Salto Uruguay ))))))) http://www.cx2sa.org"
txt = '((((((( CX2SA - Salto Uruguay ))))))) http://www.cx2sa.org'
packet = packets.BulletinPacket(
from_call=fake.FAKE_FROM_CALLSIGN,
message_text=txt,
bid=1,
)
expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::BLN{bid:<9}:{txt}"
expected = f'{fake.FAKE_FROM_CALLSIGN}>APZ100::BLN{bid:<9}:{txt}'
self.assertEqual(expected, str(packet))
def test_message_format(self):
"""Test the message packet format."""
message = "My Message"
msgno = "ABX"
message = 'My Message'
msgno = 'ABX'
packet = packets.MessagePacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
@ -275,19 +275,19 @@ class TestPacketBase(unittest.TestCase):
msgNo=msgno,
)
expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:{message}{{{msgno}"
expected = f'{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:{message}{{{msgno}'
self.assertEqual(expected, str(packet))
# test with bad words
# Currently fails with mixed case
message = "My cunt piss fuck shIt text"
exp_msg = "My **** **** **** **** text"
msgno = "ABX"
message = 'My cunt piss fuck shIt text'
exp_msg = 'My **** **** **** **** text'
msgno = 'ABX'
packet = packets.MessagePacket(
from_call=fake.FAKE_FROM_CALLSIGN,
to_call=fake.FAKE_TO_CALLSIGN,
message_text=message,
msgNo=msgno,
)
expected = f"{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:{exp_msg}{{{msgno}"
expected = f'{fake.FAKE_FROM_CALLSIGN}>APZ100::{fake.FAKE_TO_CALLSIGN:<9}:{exp_msg}{{{msgno}'
self.assertEqual(expected, str(packet))