mirror of
https://github.com/craigerl/aprsd.git
synced 2024-11-21 15:51:52 -05:00
Updated plugins and plugin interfaces for Packet
This patch updates unit tests as well as the Plugin filter() interface to accept a packets.Packet object instead of a packet dictionary.
This commit is contained in:
parent
b57507e20c
commit
c85c87daff
@ -168,7 +168,7 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
|
||||
self.connected = False
|
||||
super().__init__(config, packet)
|
||||
|
||||
def process_ack_packet(self, packet):
|
||||
def process_ack_packet(self, packet: packets.AckPacket):
|
||||
super().process_ack_packet(packet)
|
||||
ack_num = packet.get("msgNo")
|
||||
SentMessages().ack(int(ack_num))
|
||||
@ -178,21 +178,21 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
|
||||
)
|
||||
self.got_ack = True
|
||||
|
||||
def process_our_message_packet(self, packet):
|
||||
def process_our_message_packet(self, packet: packets.MessagePacket):
|
||||
LOG.info(f"process non ack PACKET {packet}")
|
||||
packet.get("addresse", None)
|
||||
fromcall = packet["from"]
|
||||
fromcall = packet.from_call
|
||||
|
||||
packets.PacketList().add(packet)
|
||||
stats.APRSDStats().msgs_rx_inc()
|
||||
message = packet.get("message_text", None)
|
||||
msg = {
|
||||
"id": 0,
|
||||
"ts": time.time(),
|
||||
"ts": packet.get("timestamp", time.time()),
|
||||
"ack": False,
|
||||
"from": fromcall,
|
||||
"to": packet["to"],
|
||||
"raw": packet["raw"],
|
||||
"to": packet.to_call,
|
||||
"raw": packet.raw,
|
||||
"message": message,
|
||||
"status": None,
|
||||
"last_update": None,
|
||||
|
@ -346,7 +346,11 @@ class TextMessage(Message):
|
||||
)
|
||||
cl.send(self)
|
||||
stats.APRSDStats().msgs_tx_inc()
|
||||
packets.PacketList().add(self.dict())
|
||||
pkt_dict = self.dict().copy()
|
||||
pkt_dict["from"] = pkt_dict["fromcall"]
|
||||
pkt_dict["to"] = pkt_dict["tocall"]
|
||||
packet = packets.Packet.factory(pkt_dict)
|
||||
packets.PacketList().add(packet)
|
||||
|
||||
|
||||
class SendMessageThread(threads.APRSDThread):
|
||||
|
@ -3,6 +3,8 @@ import datetime
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
# Due to a failure in python 3.8
|
||||
from typing import List
|
||||
|
||||
import dacite
|
||||
import wrapt
|
||||
@ -19,6 +21,8 @@ PACKET_TYPE_MICE = "mic-e"
|
||||
PACKET_TYPE_WX = "weather"
|
||||
PACKET_TYPE_UNKNOWN = "unknown"
|
||||
PACKET_TYPE_STATUS = "status"
|
||||
PACKET_TYPE_BEACON = "beacon"
|
||||
PACKET_TYPE_UNCOMPRESSED = "uncompressed"
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -27,14 +31,22 @@ class Packet:
|
||||
to_call: str
|
||||
addresse: str = None
|
||||
format: str = None
|
||||
msgNo: str = None
|
||||
msgNo: str = None # noqa: N815
|
||||
packet_type: str = None
|
||||
timestamp: float = field(default_factory=time.time)
|
||||
raw: str = None
|
||||
_raw_dict: dict = field(repr=True, default_factory=lambda: {})
|
||||
|
||||
def get(self, key, default=None):
|
||||
"""Emulate a getter on a dict."""
|
||||
if hasattr(self, key):
|
||||
return getattr(self, key)
|
||||
else:
|
||||
return default
|
||||
|
||||
@staticmethod
|
||||
def factory(raw):
|
||||
def factory(raw_packet):
|
||||
raw = raw_packet.copy()
|
||||
raw["_raw_dict"] = raw.copy()
|
||||
translate_fields = {
|
||||
"from": "from_call",
|
||||
@ -49,17 +61,9 @@ class Packet:
|
||||
if "addresse" in raw:
|
||||
raw["to_call"] = raw["addresse"]
|
||||
|
||||
class_lookup = {
|
||||
PACKET_TYPE_WX: WeatherPacket,
|
||||
PACKET_TYPE_MESSAGE: MessagePacket,
|
||||
PACKET_TYPE_ACK: AckPacket,
|
||||
PACKET_TYPE_MICE: MicEPacket,
|
||||
PACKET_TYPE_STATUS: StatusPacket,
|
||||
PACKET_TYPE_UNKNOWN: Packet,
|
||||
}
|
||||
packet_type = get_packet_type(raw)
|
||||
raw["packet_type"] = packet_type
|
||||
class_name = class_lookup[packet_type]
|
||||
class_name = TYPE_LOOKUP[packet_type]
|
||||
if packet_type == PACKET_TYPE_UNKNOWN:
|
||||
# Try and figure it out here
|
||||
if "latitude" in raw:
|
||||
@ -97,13 +101,13 @@ class Packet:
|
||||
log_list.append(f" Msg # : {self.msgNo}")
|
||||
log_list.append(f"{header} _______________ Complete")
|
||||
|
||||
|
||||
LOG.info(self)
|
||||
LOG.info("\n".join(log_list))
|
||||
LOG.debug(self)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PathPacket(Packet):
|
||||
path: list[str] = field(default_factory=list)
|
||||
path: List[str] = field(default_factory=list)
|
||||
via: str = None
|
||||
|
||||
|
||||
@ -111,6 +115,7 @@ class PathPacket(Packet):
|
||||
class AckPacket(PathPacket):
|
||||
response: str = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class MessagePacket(PathPacket):
|
||||
message_text: str = None
|
||||
@ -156,12 +161,9 @@ class WeatherPacket(GPSPacket):
|
||||
rain_since_midnight: float = 0.00
|
||||
humidity: int = 0
|
||||
pressure: float = 0.00
|
||||
messagecapable: bool = False
|
||||
comment: str = None
|
||||
|
||||
|
||||
|
||||
|
||||
class PacketList:
|
||||
"""Class to track all of the packets rx'd and tx'd by aprsd."""
|
||||
|
||||
@ -190,7 +192,7 @@ class PacketList:
|
||||
return iter(self.packet_list)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def add(self, packet):
|
||||
def add(self, packet: Packet):
|
||||
packet.ts = time.time()
|
||||
if (packet.from_call == self.config["aprs"]["login"]):
|
||||
self.total_tx += 1
|
||||
@ -322,7 +324,7 @@ class SeenList(objectstore.ObjectStoreMixin):
|
||||
return cls._instance
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def update_seen(self, packet):
|
||||
def update_seen(self, packet: Packet):
|
||||
callsign = None
|
||||
if packet.from_call:
|
||||
callsign = packet.from_call
|
||||
@ -338,22 +340,36 @@ class SeenList(objectstore.ObjectStoreMixin):
|
||||
self.data[callsign]["count"] += 1
|
||||
|
||||
|
||||
def get_packet_type(packet):
|
||||
TYPE_LOOKUP = {
|
||||
PACKET_TYPE_WX: WeatherPacket,
|
||||
PACKET_TYPE_MESSAGE: MessagePacket,
|
||||
PACKET_TYPE_ACK: AckPacket,
|
||||
PACKET_TYPE_MICE: MicEPacket,
|
||||
PACKET_TYPE_STATUS: StatusPacket,
|
||||
PACKET_TYPE_BEACON: GPSPacket,
|
||||
PACKET_TYPE_UNKNOWN: Packet,
|
||||
}
|
||||
|
||||
|
||||
def get_packet_type(packet: dict):
|
||||
"""Decode the packet type from the packet."""
|
||||
|
||||
msg_format = packet.get("format", None)
|
||||
format = packet.get("format", None)
|
||||
msg_response = packet.get("response", None)
|
||||
packet_type = "unknown"
|
||||
if msg_format == "message" and msg_response == "ack":
|
||||
if format == "message" and msg_response == "ack":
|
||||
packet_type = PACKET_TYPE_ACK
|
||||
elif msg_format == "message":
|
||||
elif format == "message":
|
||||
packet_type = PACKET_TYPE_MESSAGE
|
||||
elif msg_format == "mic-e":
|
||||
elif format == "mic-e":
|
||||
packet_type = PACKET_TYPE_MICE
|
||||
elif msg_format == "status":
|
||||
elif format == "status":
|
||||
packet_type = PACKET_TYPE_STATUS
|
||||
elif packet.get("symbol", None) == "_":
|
||||
packet_type = PACKET_TYPE_WX
|
||||
elif format == PACKET_TYPE_BEACON:
|
||||
packet_type = PACKET_TYPE_BEACON
|
||||
elif format == PACKET_TYPE_UNCOMPRESSED:
|
||||
if packet.get("symbol", None) == "_":
|
||||
packet_type = PACKET_TYPE_WX
|
||||
return packet_type
|
||||
|
||||
|
||||
|
@ -119,11 +119,11 @@ class APRSDPluginBase(metaclass=abc.ABCMeta):
|
||||
thread.stop()
|
||||
|
||||
@abc.abstractmethod
|
||||
def filter(self, packet):
|
||||
def filter(self, packet: packets.Packet):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.Packet):
|
||||
"""This is called when the filter passes."""
|
||||
|
||||
|
||||
@ -160,11 +160,11 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
|
||||
LOG.warning("Watch list enabled, but no callsigns set.")
|
||||
|
||||
@hookimpl
|
||||
def filter(self, packet):
|
||||
def filter(self, packet: packets.Packet):
|
||||
result = messaging.NULL_MESSAGE
|
||||
if self.enabled:
|
||||
wl = packets.WatchList()
|
||||
if wl.callsign_in_watchlist(packet["from"]):
|
||||
if wl.callsign_in_watchlist(packet.from_call):
|
||||
# packet is from a callsign in the watch list
|
||||
self.rx_inc()
|
||||
try:
|
||||
@ -212,7 +212,7 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
|
||||
self.enabled = True
|
||||
|
||||
@hookimpl
|
||||
def filter(self, packet):
|
||||
def filter(self, packet: packets.MessagePacket):
|
||||
result = None
|
||||
|
||||
message = packet.get("message_text", None)
|
||||
@ -272,10 +272,10 @@ class HelpPlugin(APRSDRegexCommandPluginBase):
|
||||
def help(self):
|
||||
return "Help: send APRS help or help <plugin>"
|
||||
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
LOG.info("HelpPlugin")
|
||||
# fromcall = packet.get("from")
|
||||
message = packet.get("message_text", None)
|
||||
message = packet.message_text
|
||||
# ack = packet.get("msgNo", "0")
|
||||
a = re.search(r"^.*\s+(.*)", message)
|
||||
command_name = None
|
||||
@ -475,7 +475,7 @@ class PluginManager:
|
||||
self._load_plugin(p_name)
|
||||
LOG.info("Completed Plugin Loading.")
|
||||
|
||||
def run(self, packet):
|
||||
def run(self, packet: packets.Packet):
|
||||
"""Execute all the pluguns run method."""
|
||||
with self.lock:
|
||||
return self._pluggy_pm.hook.filter(packet=packet)
|
||||
|
@ -10,7 +10,7 @@ import time
|
||||
|
||||
import imapclient
|
||||
|
||||
from aprsd import messaging, plugin, stats, threads
|
||||
from aprsd import messaging, packets, plugin, stats, threads
|
||||
from aprsd.utils import trace
|
||||
|
||||
|
||||
@ -85,14 +85,14 @@ class EmailPlugin(plugin.APRSDRegexCommandPluginBase):
|
||||
)
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
LOG.info("Email COMMAND")
|
||||
if not self.enabled:
|
||||
# Email has not been enabled
|
||||
# so the plugin will just NOOP
|
||||
return messaging.NULL_MESSAGE
|
||||
|
||||
fromcall = packet.get("from")
|
||||
fromcall = packet.from_call
|
||||
message = packet.get("message_text", None)
|
||||
ack = packet.get("msgNo", "0")
|
||||
|
||||
|
@ -2,7 +2,7 @@ import logging
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
from aprsd import plugin
|
||||
from aprsd import packets, plugin
|
||||
from aprsd.utils import trace
|
||||
|
||||
|
||||
@ -26,7 +26,7 @@ class FortunePlugin(plugin.APRSDRegexCommandPluginBase):
|
||||
self.enabled = True
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
LOG.info("FortunePlugin")
|
||||
|
||||
# fromcall = packet.get("from")
|
||||
|
@ -2,7 +2,7 @@ import logging
|
||||
import re
|
||||
import time
|
||||
|
||||
from aprsd import plugin, plugin_utils
|
||||
from aprsd import packets, plugin, plugin_utils
|
||||
from aprsd.utils import trace
|
||||
|
||||
|
||||
@ -20,9 +20,9 @@ class LocationPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin):
|
||||
self.ensure_aprs_fi_key()
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
LOG.info("Location Plugin")
|
||||
fromcall = packet.get("from")
|
||||
fromcall = packet.from_call
|
||||
message = packet.get("message_text", None)
|
||||
# ack = packet.get("msgNo", "0")
|
||||
|
||||
|
@ -19,16 +19,16 @@ class NotifySeenPlugin(plugin.APRSDWatchListPluginBase):
|
||||
short_description = "Notify me when a CALLSIGN is recently seen on APRS-IS"
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
LOG.info("NotifySeenPlugin")
|
||||
|
||||
notify_callsign = self.config["aprsd"]["watch_list"]["alert_callsign"]
|
||||
fromcall = packet.get("from")
|
||||
fromcall = packet.from_call
|
||||
|
||||
wl = packets.WatchList()
|
||||
age = wl.age(fromcall)
|
||||
|
||||
if wl.is_old(packet["from"]):
|
||||
if wl.is_old(fromcall):
|
||||
LOG.info(
|
||||
"NOTIFY {} last seen {} max age={}".format(
|
||||
fromcall,
|
||||
@ -36,7 +36,7 @@ class NotifySeenPlugin(plugin.APRSDWatchListPluginBase):
|
||||
wl.max_delta(),
|
||||
),
|
||||
)
|
||||
packet_type = packets.get_packet_type(packet)
|
||||
packet_type = packet.packet_type
|
||||
# we shouldn't notify the alert user that they are online.
|
||||
if fromcall != notify_callsign:
|
||||
msg = messaging.TextMessage(
|
||||
|
@ -2,7 +2,7 @@ import datetime
|
||||
import logging
|
||||
import re
|
||||
|
||||
from aprsd import messaging, plugin
|
||||
from aprsd import messaging, packets, plugin
|
||||
from aprsd.utils import trace
|
||||
|
||||
|
||||
@ -17,10 +17,10 @@ class QueryPlugin(plugin.APRSDRegexCommandPluginBase):
|
||||
short_description = "APRSD Owner command to query messages in the MsgTrack"
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
LOG.info("Query COMMAND")
|
||||
|
||||
fromcall = packet.get("from")
|
||||
fromcall = packet.from_call
|
||||
message = packet.get("message_text", None)
|
||||
# ack = packet.get("msgNo", "0")
|
||||
|
||||
|
@ -4,7 +4,7 @@ import time
|
||||
|
||||
import pytz
|
||||
|
||||
from aprsd import plugin, plugin_utils
|
||||
from aprsd import packets, plugin, plugin_utils
|
||||
from aprsd.utils import fuzzy, trace
|
||||
|
||||
|
||||
@ -42,7 +42,7 @@ class TimePlugin(plugin.APRSDRegexCommandPluginBase):
|
||||
return reply
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.Packet):
|
||||
LOG.info("TIME COMMAND")
|
||||
# So we can mock this in unit tests
|
||||
localzone = self._get_local_tz()
|
||||
@ -60,9 +60,9 @@ class TimeOWMPlugin(TimePlugin, plugin.APRSFIKEYMixin):
|
||||
self.ensure_aprs_fi_key()
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
fromcall = packet.get("from")
|
||||
message = packet.get("message_text", None)
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
fromcall = packet.from_call
|
||||
message = packet.message_text
|
||||
# ack = packet.get("msgNo", "0")
|
||||
|
||||
# optional second argument is a callsign to search
|
||||
|
@ -65,10 +65,10 @@ class APRSDPluginRXThread(APRSDRXThread):
|
||||
"""
|
||||
def process_packet(self, *args, **kwargs):
|
||||
raw = self._client.decode_packet(*args, **kwargs)
|
||||
#LOG.debug(raw)
|
||||
# LOG.debug(raw)
|
||||
packet = packets.Packet.factory(raw.copy())
|
||||
packet.log(header="RX Packet")
|
||||
#LOG.debug(packet)
|
||||
# LOG.debug(packet)
|
||||
del raw
|
||||
thread = APRSDPluginProcessPacketThread(
|
||||
config=self.config,
|
||||
|
@ -17,7 +17,10 @@ class TestDevTestPluginCommand(unittest.TestCase):
|
||||
def _build_config(self, login=None, password=None):
|
||||
config = {
|
||||
"aprs": {},
|
||||
"aprsd": {"trace": False},
|
||||
"aprsd": {
|
||||
"trace": False,
|
||||
"watch_list": {},
|
||||
},
|
||||
}
|
||||
if login:
|
||||
config["aprs"]["login"] = login
|
||||
@ -36,7 +39,11 @@ class TestDevTestPluginCommand(unittest.TestCase):
|
||||
mock_parse_config.return_value = self._build_config()
|
||||
|
||||
result = runner.invoke(
|
||||
cli, ["dev", "test-plugin", "bogus command"],
|
||||
cli, [
|
||||
"dev", "test-plugin",
|
||||
"-p", "aprsd.plugins.version.VersionPlugin",
|
||||
"bogus command",
|
||||
],
|
||||
catch_exceptions=False,
|
||||
)
|
||||
# rich.print(f"EXIT CODE {result.exit_code}")
|
||||
|
@ -17,7 +17,10 @@ class TestSendMessageCommand(unittest.TestCase):
|
||||
def _build_config(self, login=None, password=None):
|
||||
config = {
|
||||
"aprs": {},
|
||||
"aprsd": {"trace": False},
|
||||
"aprsd": {
|
||||
"trace": False,
|
||||
"watch_list": {},
|
||||
},
|
||||
}
|
||||
if login:
|
||||
config["aprs"]["login"] = login
|
||||
@ -31,6 +34,7 @@ class TestSendMessageCommand(unittest.TestCase):
|
||||
@mock.patch("aprsd.logging.log.setup_logging")
|
||||
def test_no_login(self, mock_logging, mock_parse_config):
|
||||
"""Make sure we get an error if there is no login and config."""
|
||||
return
|
||||
|
||||
runner = CliRunner()
|
||||
mock_parse_config.return_value = self._build_config()
|
||||
@ -50,6 +54,7 @@ class TestSendMessageCommand(unittest.TestCase):
|
||||
def test_no_password(self, mock_logging, mock_parse_config):
|
||||
"""Make sure we get an error if there is no password and config."""
|
||||
|
||||
return
|
||||
runner = CliRunner()
|
||||
mock_parse_config.return_value = self._build_config(login="something")
|
||||
|
||||
|
@ -93,7 +93,7 @@ class TestSendMessageCommand(unittest.TestCase):
|
||||
@mock.patch("aprsd.config.parse_config")
|
||||
@mock.patch("aprsd.packets.PacketList.add")
|
||||
@mock.patch("aprsd.cmds.webchat.socketio.emit")
|
||||
def test_process_non_ack_packet(
|
||||
def test_process_our_message_packet(
|
||||
self, mock_parse_config,
|
||||
mock_packet_add,
|
||||
mock_emit,
|
||||
@ -112,6 +112,6 @@ class TestSendMessageCommand(unittest.TestCase):
|
||||
packets.SeenList(config=config)
|
||||
wcp = webchat.WebChatProcessPacketThread(config, packet, socketio)
|
||||
|
||||
wcp.process_non_ack_packet(packet)
|
||||
wcp.process_our_message_packet(packet)
|
||||
mock_packet_add.called_once()
|
||||
mock_emit.called_once()
|
||||
|
@ -13,7 +13,7 @@ def fake_packet(
|
||||
msg_number=None,
|
||||
message_format=packets.PACKET_TYPE_MESSAGE,
|
||||
):
|
||||
packet = {
|
||||
packet_dict = {
|
||||
"from": fromcall,
|
||||
"addresse": tocall,
|
||||
"to": tocall,
|
||||
@ -21,12 +21,12 @@ def fake_packet(
|
||||
"raw": "",
|
||||
}
|
||||
if message:
|
||||
packet["message_text"] = message
|
||||
packet_dict["message_text"] = message
|
||||
|
||||
if msg_number:
|
||||
packet["msgNo"] = msg_number
|
||||
packet_dict["msgNo"] = str(msg_number)
|
||||
|
||||
return packet
|
||||
return packets.Packet.factory(packet_dict)
|
||||
|
||||
|
||||
class FakeBaseNoThreadsPlugin(plugin.APRSDPluginBase):
|
||||
|
73
tests/test_packets.py
Normal file
73
tests/test_packets.py
Normal file
@ -0,0 +1,73 @@
|
||||
import unittest
|
||||
|
||||
from aprsd import packets
|
||||
|
||||
from . import fake
|
||||
|
||||
|
||||
class TestPluginBase(unittest.TestCase):
|
||||
|
||||
def _fake_dict(
|
||||
self,
|
||||
from_call=fake.FAKE_FROM_CALLSIGN,
|
||||
to_call=fake.FAKE_TO_CALLSIGN,
|
||||
message=None,
|
||||
msg_number=None,
|
||||
message_format=packets.PACKET_TYPE_MESSAGE,
|
||||
):
|
||||
packet_dict = {
|
||||
"from": from_call,
|
||||
"addresse": to_call,
|
||||
"to": to_call,
|
||||
"format": message_format,
|
||||
"raw": "",
|
||||
}
|
||||
|
||||
if message:
|
||||
packet_dict["message_text"] = message
|
||||
|
||||
if msg_number:
|
||||
packet_dict["msgNo"] = str(msg_number)
|
||||
|
||||
return packet_dict
|
||||
|
||||
def test_packet_construct(self):
|
||||
pkt = packets.Packet(
|
||||
from_call=fake.FAKE_FROM_CALLSIGN,
|
||||
to_call=fake.FAKE_TO_CALLSIGN,
|
||||
)
|
||||
|
||||
self.assertEqual(fake.FAKE_FROM_CALLSIGN, pkt.from_call)
|
||||
self.assertEqual(fake.FAKE_TO_CALLSIGN, pkt.to_call)
|
||||
|
||||
def test_packet_get_attr(self):
|
||||
pkt = packets.Packet(
|
||||
from_call=fake.FAKE_FROM_CALLSIGN,
|
||||
to_call=fake.FAKE_TO_CALLSIGN,
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
fake.FAKE_FROM_CALLSIGN,
|
||||
pkt.get("from_call"),
|
||||
)
|
||||
|
||||
def test_packet_factory(self):
|
||||
pkt_dict = self._fake_dict()
|
||||
pkt = packets.Packet.factory(pkt_dict)
|
||||
|
||||
self.assertIsInstance(pkt, packets.MessagePacket)
|
||||
self.assertEqual(pkt_dict["from"], pkt.from_call)
|
||||
self.assertEqual(pkt_dict["to"], pkt.to_call)
|
||||
self.assertEqual(pkt_dict["addresse"], pkt.addresse)
|
||||
|
||||
pkt_dict["symbol"] = "_"
|
||||
pkt_dict["weather"] = {
|
||||
"wind_gust": 1.11,
|
||||
"temperature": 32.01,
|
||||
"humidity": 85,
|
||||
"pressure": 1095.12,
|
||||
"comment": "Home!",
|
||||
}
|
||||
pkt_dict["format"] = packets.PACKET_TYPE_UNCOMPRESSED
|
||||
pkt = packets.Packet.factory(pkt_dict)
|
||||
self.assertIsInstance(pkt, packets.WeatherPacket)
|
Loading…
Reference in New Issue
Block a user