mirror of
https://github.com/craigerl/aprsd.git
synced 2026-04-29 19:34:00 -04:00
Lots of APRS messages don't have message ids in the packet. This prevents packet.msgNo from having a value. so when we look the packet up in the packet_list, the packet key has a None for a value for the msgNo portion of the key. This results in every packet after the first from that callsign that doesn't have a msg id, will be marked as a dupe and will never be processed. This patch checks to see if msgNo is None. If it is, then just mark the packet as never being seen and pass it on the chain to be processed. This also means that there should never be an Ack sent for those packets that don't container a msgNo value.
384 lines
14 KiB
Python
384 lines
14 KiB
Python
import abc
|
|
import logging
|
|
import queue
|
|
import time
|
|
|
|
import aprslib
|
|
from oslo_config import cfg
|
|
|
|
from aprsd import packets, plugin
|
|
from aprsd.client import client_factory
|
|
from aprsd.packets import collector
|
|
from aprsd.packets import log as packet_log
|
|
from aprsd.threads import APRSDThread, tx
|
|
|
|
|
|
CONF = cfg.CONF
|
|
LOG = logging.getLogger("APRSD")
|
|
|
|
|
|
class APRSDRXThread(APRSDThread):
|
|
_client = None
|
|
|
|
def __init__(self, packet_queue):
|
|
super().__init__("RX_PKT")
|
|
self.packet_queue = packet_queue
|
|
|
|
def stop(self):
|
|
self.thread_stop = True
|
|
if self._client:
|
|
self._client.stop()
|
|
|
|
def loop(self):
|
|
if not self._client:
|
|
self._client = client_factory.create()
|
|
time.sleep(1)
|
|
return True
|
|
|
|
if not self._client.is_connected:
|
|
self._client = client_factory.create()
|
|
time.sleep(1)
|
|
return True
|
|
|
|
# setup the consumer of messages and block until a messages
|
|
try:
|
|
# This will register a packet consumer with aprslib
|
|
# When new packets come in the consumer will process
|
|
# the packet
|
|
|
|
# Do a partial here because the consumer signature doesn't allow
|
|
# For kwargs to be passed in to the consumer func we declare
|
|
# and the aprslib developer didn't want to allow a PR to add
|
|
# kwargs. :(
|
|
# https://github.com/rossengeorgiev/aprs-python/pull/56
|
|
self._client.consumer(
|
|
self._process_packet, raw=False, blocking=False,
|
|
)
|
|
except (
|
|
aprslib.exceptions.ConnectionDrop,
|
|
aprslib.exceptions.ConnectionError,
|
|
):
|
|
LOG.error("Connection dropped, reconnecting")
|
|
# Force the deletion of the client object connected to aprs
|
|
# This will cause a reconnect, next time client.get_client()
|
|
# is called
|
|
self._client.reset()
|
|
time.sleep(5)
|
|
except Exception:
|
|
# LOG.exception(ex)
|
|
LOG.error("Resetting connection and trying again.")
|
|
self._client.reset()
|
|
time.sleep(5)
|
|
# Continue to loop
|
|
time.sleep(1)
|
|
return True
|
|
|
|
def _process_packet(self, *args, **kwargs):
|
|
"""Intermediate callback so we can update the keepalive time."""
|
|
# Now call the 'real' packet processing for a RX'x packet
|
|
self.process_packet(*args, **kwargs)
|
|
|
|
@abc.abstractmethod
|
|
def process_packet(self, *args, **kwargs):
|
|
pass
|
|
|
|
|
|
class APRSDDupeRXThread(APRSDRXThread):
|
|
"""Process received packets.
|
|
|
|
This is the main APRSD Server command thread that
|
|
receives packets and makes sure the packet
|
|
hasn't been seen previously before sending it on
|
|
to be processed.
|
|
"""
|
|
|
|
def process_packet(self, *args, **kwargs):
|
|
"""This handles the processing of an inbound packet.
|
|
|
|
When a packet is received by the connected client object,
|
|
it sends the raw packet into this function. This function then
|
|
decodes the packet via the client, and then processes the packet.
|
|
Ack Packets are sent to the PluginProcessPacketThread for processing.
|
|
All other packets have to be checked as a dupe, and then only after
|
|
we haven't seen this packet before, do we send it to the
|
|
PluginProcessPacketThread for processing.
|
|
"""
|
|
packet = self._client.decode_packet(*args, **kwargs)
|
|
# LOG.debug(raw)
|
|
packet_log.log(packet)
|
|
pkt_list = packets.PacketList()
|
|
|
|
if isinstance(packet, packets.AckPacket):
|
|
# We don't need to drop AckPackets, those should be
|
|
# processed.
|
|
self.packet_queue.put(packet)
|
|
else:
|
|
# Make sure we aren't re-processing the same packet
|
|
# For RF based APRS Clients we can get duplicate packets
|
|
# So we need to track them and not process the dupes.
|
|
found = False
|
|
try:
|
|
# Find the packet in the list of already seen packets
|
|
# Based on the packet.key
|
|
found = pkt_list.find(packet)
|
|
if not packet.msgNo:
|
|
# If the packet doesn't have a message id
|
|
# then there is no reliable way to detect
|
|
# if it's a dupe, so we just pass it on.
|
|
# it shouldn't get acked either.
|
|
found = False
|
|
except KeyError:
|
|
found = False
|
|
|
|
if not found:
|
|
# We haven't seen this packet before, so we process it.
|
|
collector.PacketCollector().rx(packet)
|
|
self.packet_queue.put(packet)
|
|
elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
|
|
# If the packet came in within N seconds of the
|
|
# Last time seeing the packet, then we drop it as a dupe.
|
|
LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.")
|
|
else:
|
|
LOG.warning(
|
|
f"Packet {packet.from_call}:{packet.msgNo} already tracked "
|
|
f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
|
|
)
|
|
collector.PacketCollector().rx(packet)
|
|
self.packet_queue.put(packet)
|
|
|
|
|
|
class APRSDPluginRXThread(APRSDDupeRXThread):
|
|
""""Process received packets.
|
|
|
|
For backwards compatibility, we keep the APRSDPluginRXThread.
|
|
"""
|
|
|
|
|
|
class APRSDProcessPacketThread(APRSDThread):
|
|
"""Base class for processing received packets.
|
|
|
|
This is the base class for processing packets coming from
|
|
the consumer. This base class handles sending ack packets and
|
|
will ack a message before sending the packet to the subclass
|
|
for processing."""
|
|
|
|
def __init__(self, packet_queue):
|
|
self.packet_queue = packet_queue
|
|
super().__init__("ProcessPKT")
|
|
if not CONF.enable_sending_ack_packets:
|
|
LOG.warning(
|
|
"Sending ack packets is disabled, messages "
|
|
"will not be acknowledged.",
|
|
)
|
|
|
|
def process_ack_packet(self, packet):
|
|
"""We got an ack for a message, no need to resend it."""
|
|
ack_num = packet.msgNo
|
|
LOG.debug(f"Got ack for message {ack_num}")
|
|
collector.PacketCollector().rx(packet)
|
|
|
|
def process_piggyback_ack(self, packet):
|
|
"""We got an ack embedded in a packet."""
|
|
ack_num = packet.ackMsgNo
|
|
LOG.debug(f"Got PiggyBackAck for message {ack_num}")
|
|
collector.PacketCollector().rx(packet)
|
|
|
|
def process_reject_packet(self, packet):
|
|
"""We got a reject message for a packet. Stop sending the message."""
|
|
ack_num = packet.msgNo
|
|
LOG.debug(f"Got REJECT for message {ack_num}")
|
|
collector.PacketCollector().rx(packet)
|
|
|
|
def loop(self):
|
|
try:
|
|
packet = self.packet_queue.get(timeout=1)
|
|
if packet:
|
|
self.process_packet(packet)
|
|
except queue.Empty:
|
|
pass
|
|
return True
|
|
|
|
def process_packet(self, packet):
|
|
"""Process a packet received from aprs-is server."""
|
|
LOG.debug(f"ProcessPKT-LOOP {self.loop_count}")
|
|
our_call = CONF.callsign.lower()
|
|
|
|
from_call = packet.from_call
|
|
if packet.addresse:
|
|
to_call = packet.addresse
|
|
else:
|
|
to_call = packet.to_call
|
|
msg_id = packet.msgNo
|
|
|
|
# We don't put ack packets destined for us through the
|
|
# plugins.
|
|
if (
|
|
isinstance(packet, packets.AckPacket)
|
|
and packet.addresse.lower() == our_call
|
|
):
|
|
self.process_ack_packet(packet)
|
|
elif (
|
|
isinstance(packet, packets.RejectPacket)
|
|
and packet.addresse.lower() == our_call
|
|
):
|
|
self.process_reject_packet(packet)
|
|
else:
|
|
if hasattr(packet, "ackMsgNo") and packet.ackMsgNo:
|
|
# we got an ack embedded in this packet
|
|
# we need to handle the ack
|
|
self.process_piggyback_ack(packet)
|
|
# Only ack messages that were sent directly to us
|
|
if isinstance(packet, packets.MessagePacket):
|
|
if (
|
|
to_call and to_call.lower() == our_call
|
|
and msg_id
|
|
):
|
|
# It's a MessagePacket and it's for us!
|
|
# let any threads do their thing, then ack
|
|
# send an ack last
|
|
tx.send(
|
|
packets.AckPacket(
|
|
from_call=CONF.callsign,
|
|
to_call=from_call,
|
|
msgNo=msg_id,
|
|
),
|
|
)
|
|
|
|
self.process_our_message_packet(packet)
|
|
else:
|
|
# Packet wasn't meant for us!
|
|
self.process_other_packet(packet, for_us=False)
|
|
else:
|
|
self.process_other_packet(
|
|
packet, for_us=(to_call.lower() == our_call),
|
|
)
|
|
LOG.debug(f"Packet processing complete for pkt '{packet.key}'")
|
|
return False
|
|
|
|
@abc.abstractmethod
|
|
def process_our_message_packet(self, packet):
|
|
"""Process a MessagePacket destined for us!"""
|
|
|
|
def process_other_packet(self, packet, for_us=False):
|
|
"""Process an APRS Packet that isn't a message or ack"""
|
|
if not for_us:
|
|
LOG.info("Got a packet not meant for us.")
|
|
else:
|
|
LOG.info("Got a non AckPacket/MessagePacket")
|
|
|
|
|
|
class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
|
"""Process the packet through the plugin manager.
|
|
|
|
This is the main aprsd server plugin processing thread."""
|
|
|
|
def process_other_packet(self, packet, for_us=False):
|
|
pm = plugin.PluginManager()
|
|
try:
|
|
results = pm.run_watchlist(packet)
|
|
for reply in results:
|
|
if isinstance(reply, list):
|
|
for subreply in reply:
|
|
LOG.debug(f"Sending '{subreply}'")
|
|
if isinstance(subreply, packets.Packet):
|
|
tx.send(subreply)
|
|
else:
|
|
wl = CONF.watch_list
|
|
to_call = wl["alert_callsign"]
|
|
tx.send(
|
|
packets.MessagePacket(
|
|
from_call=CONF.callsign,
|
|
to_call=to_call,
|
|
message_text=subreply,
|
|
),
|
|
)
|
|
elif isinstance(reply, packets.Packet):
|
|
# We have a message based object.
|
|
tx.send(reply)
|
|
except Exception as ex:
|
|
LOG.error("Plugin failed!!!")
|
|
LOG.exception(ex)
|
|
|
|
def process_our_message_packet(self, packet):
|
|
"""Send the packet through the plugins."""
|
|
from_call = packet.from_call
|
|
if packet.addresse:
|
|
to_call = packet.addresse
|
|
else:
|
|
to_call = None
|
|
|
|
pm = plugin.PluginManager()
|
|
try:
|
|
results = pm.run(packet)
|
|
replied = False
|
|
for reply in results:
|
|
if isinstance(reply, list):
|
|
# one of the plugins wants to send multiple messages
|
|
replied = True
|
|
for subreply in reply:
|
|
LOG.debug(f"Sending '{subreply}'")
|
|
if isinstance(subreply, packets.Packet):
|
|
tx.send(subreply)
|
|
else:
|
|
tx.send(
|
|
packets.MessagePacket(
|
|
from_call=CONF.callsign,
|
|
to_call=from_call,
|
|
message_text=subreply,
|
|
),
|
|
)
|
|
elif isinstance(reply, packets.Packet):
|
|
# We have a message based object.
|
|
tx.send(reply)
|
|
replied = True
|
|
else:
|
|
replied = True
|
|
# A plugin can return a null message flag which signals
|
|
# us that they processed the message correctly, but have
|
|
# nothing to reply with, so we avoid replying with a
|
|
# usage string
|
|
if reply is not packets.NULL_MESSAGE:
|
|
LOG.debug(f"Sending '{reply}'")
|
|
tx.send(
|
|
packets.MessagePacket(
|
|
from_call=CONF.callsign,
|
|
to_call=from_call,
|
|
message_text=reply,
|
|
),
|
|
)
|
|
|
|
# If the message was for us and we didn't have a
|
|
# response, then we send a usage statement.
|
|
if to_call == CONF.callsign and not replied:
|
|
|
|
# Tailor the messages accordingly
|
|
if CONF.load_help_plugin:
|
|
LOG.warning("Sending help!")
|
|
message_text = "Unknown command! Send 'help' message for help"
|
|
else:
|
|
LOG.warning("Unknown command!")
|
|
message_text = "Unknown command!"
|
|
|
|
tx.send(
|
|
packets.MessagePacket(
|
|
from_call=CONF.callsign,
|
|
to_call=from_call,
|
|
message_text=message_text,
|
|
),
|
|
)
|
|
except Exception as ex:
|
|
LOG.error("Plugin failed!!!")
|
|
LOG.exception(ex)
|
|
# Do we need to send a reply?
|
|
if to_call == CONF.callsign:
|
|
reply = "A Plugin failed! try again?"
|
|
tx.send(
|
|
packets.MessagePacket(
|
|
from_call=CONF.callsign,
|
|
to_call=from_call,
|
|
message_text=reply,
|
|
),
|
|
)
|
|
|
|
LOG.debug("Completed process_our_message_packet")
|