1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-04-19 09:49:01 -04:00

Added new PacketFilter mechanism

This patch adds the new PacketFilter class as a generic mechanism
for doing packet filtering during the packet processing phase of
receiving packets.

The packet phases are:
1. reception and stats collection
2. packet processing.

Each phase has a single thread for handling that phase.

Phase 1:
The APRSDRXThread connects to the APRS client, and gets packets
from the client.  Then it puts the packet through the Collector
for stats and tracking.  Then the packet is put into the packet_queue.

Phase 2:
Packets are pulled from the packet_queue.  Then packets are run
through the PacketFilter mechanism, then processed depending
on the command being run.
By default there is 1 loaded packet filter, which is the
DupePacketFilter which removes "duplicate" packets that aprsd has
already seen and processed within the configured time frame.

This PacketFilter mechanism allows an external extension or plugin
to add/remove packet filters at will depending on the function
of the extension or plugin.   For example, this allows an extension
to get a packet and push the packet into an MQTT queue.
This commit is contained in:
Hemna 2025-01-30 09:41:01 -08:00
parent 3b57e7597d
commit 1b4eaf35e1
11 changed files with 349 additions and 140 deletions

View File

@ -13,6 +13,7 @@ import click
from loguru import logger
from oslo_config import cfg
from rich.console import Console
from typing import Union
# local imports here
import aprsd
@ -22,10 +23,15 @@ from aprsd.main import cli
from aprsd.packets import collector as packet_collector
from aprsd.packets import log as packet_log
from aprsd.packets import seen_list
from aprsd.packets import core
from aprsd.packets.filters import dupe_filter
from aprsd.packets.filters import packet_type
from aprsd.packets.filter import PacketFilter
from aprsd.stats import collector
from aprsd.threads import keepalive, rx
from aprsd.threads import stats as stats_thread
from aprsd.threads.aprsd import APRSDThread
from aprsd.utils import singleton
# setup the global logger
# log.basicConfig(level=log.DEBUG) # level=10
@ -48,7 +54,7 @@ def signal_handler(sig, frame):
collector.Collector().collect()
class APRSDListenThread(rx.APRSDRXThread):
class APRSDListenProcessThread(rx.APRSDFilterThread):
def __init__(
self,
packet_queue,
@ -57,47 +63,22 @@ class APRSDListenThread(rx.APRSDRXThread):
enabled_plugins=[],
log_packets=False,
):
super().__init__(packet_queue)
super().__init__("ListenProcThread", packet_queue)
self.packet_filter = packet_filter
self.plugin_manager = plugin_manager
if self.plugin_manager:
LOG.info(f"Plugins {self.plugin_manager.get_message_plugins()}")
self.log_packets = log_packets
def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
filters = {
packets.Packet.__name__: packets.Packet,
packets.AckPacket.__name__: packets.AckPacket,
packets.BeaconPacket.__name__: packets.BeaconPacket,
packets.GPSPacket.__name__: packets.GPSPacket,
packets.MessagePacket.__name__: packets.MessagePacket,
packets.MicEPacket.__name__: packets.MicEPacket,
packets.ObjectPacket.__name__: packets.ObjectPacket,
packets.StatusPacket.__name__: packets.StatusPacket,
packets.ThirdPartyPacket.__name__: packets.ThirdPartyPacket,
packets.WeatherPacket.__name__: packets.WeatherPacket,
packets.UnknownPacket.__name__: packets.UnknownPacket,
}
def print_packet(self, packet):
if self.log_packets:
packet_log.log(packet)
if self.packet_filter:
filter_class = filters[self.packet_filter]
if isinstance(packet, filter_class):
if self.log_packets:
packet_log.log(packet)
if self.plugin_manager:
# Don't do anything with the reply
# This is the listen only command.
self.plugin_manager.run(packet)
else:
if self.log_packets:
packet_log.log(packet)
if self.plugin_manager:
# Don't do anything with the reply.
# This is the listen only command.
self.plugin_manager.run(packet)
packet_collector.PacketCollector().rx(packet)
def process_packet(self, packet: type[core.Packet]):
if self.plugin_manager:
# Don't do anything with the reply.
# This is the listen only command.
self.plugin_manager.run(packet)
class ListenStatsThread(APRSDThread):
@ -106,23 +87,24 @@ class ListenStatsThread(APRSDThread):
def __init__(self):
super().__init__("PacketStatsLog")
self._last_total_rx = 0
self.period = 31
def loop(self):
if self.loop_count % 10 == 0:
if self.loop_count % self.period == 0:
# log the stats every 10 seconds
stats_json = collector.Collector().collect()
stats = stats_json["PacketList"]
total_rx = stats["rx"]
packet_count = len(stats["packets"])
rx_delta = total_rx - self._last_total_rx
rate = rx_delta / 10
rate = rx_delta / self.period
# Log summary stats
LOGU.opt(colors=True).info(
f"<green>RX Rate: {rate} pps</green> "
f"<green>RX Rate: {rate:.2f} pps</green> "
f"<yellow>Total RX: {total_rx}</yellow> "
f"<red>RX Last 10 secs: {rx_delta}</red> "
f"<white>Packets in PacketList: {packet_count}</white>",
f"<red>RX Last {self.period} secs: {rx_delta}</red> "
f"<white>Packets in PacketListStats: {packet_count}</white>",
)
self._last_total_rx = total_rx
@ -170,6 +152,8 @@ class ListenStatsThread(APRSDThread):
],
case_sensitive=False,
),
multiple=True,
default=[],
help="Filter by packet type",
)
@click.option(
@ -267,7 +251,7 @@ def listen(
print(msg)
sys.exit(-1)
LOG.debug(f"Filter by '{filter}'")
LOG.debug(f"Filter messages on aprsis server by '{filter}'")
aprs_client.set_filter(filter)
keepalive_thread = keepalive.KeepAliveThread()
@ -276,6 +260,15 @@ def listen(
# just deregister the class from the packet collector
packet_collector.PacketCollector().unregister(seen_list.SeenList)
# we don't want the dupe filter to run here.
PacketFilter().unregister(dupe_filter.DupePacketFilter)
if packet_filter:
LOG.info("Enabling packet filtering for {packet_filter}")
packet_type.PacketTypeFilter().set_allow_list(packet_filter)
PacketFilter().register(packet_type.PacketTypeFilter)
else:
LOG.info("No packet filtering enabled.")
pm = None
if load_plugins:
pm = plugin.PluginManager()
@ -300,15 +293,20 @@ def listen(
stats = stats_thread.APRSDStatsStoreThread()
stats.start()
LOG.debug("Create APRSDListenThread")
listen_thread = APRSDListenThread(
LOG.debug("Start APRSDRxThread")
rx_thread = rx.APRSDRXThread(packet_queue=threads.packet_queue)
rx_thread.start()
LOG.debug("Create APRSDListenProcessThread")
listen_thread = APRSDListenProcessThread(
packet_queue=threads.packet_queue,
packet_filter=packet_filter,
plugin_manager=pm,
enabled_plugins=enable_plugin,
log_packets=log_packets,
)
LOG.debug("Start APRSDListenThread")
LOG.debug("Start APRSDListenProcessThread")
listen_thread.start()
if enable_packet_stats:
listen_stats = ListenStatsThread()
@ -317,6 +315,6 @@ def listen(
keepalive_thread.start()
LOG.debug("keepalive Join")
keepalive_thread.join()
LOG.debug("listen_thread Join")
rx_thread.join()
listen_thread.join()
stats.join()

View File

@ -147,7 +147,7 @@ def server(ctx, flush):
server_threads.register(keepalive.KeepAliveThread())
server_threads.register(stats_thread.APRSDStatsStoreThread())
server_threads.register(
rx.APRSDPluginRXThread(
rx.APRSDRXThread(
packet_queue=threads.packet_queue,
),
)

View File

@ -15,6 +15,8 @@ from aprsd.packets.core import ( # noqa: F401
WeatherPacket,
factory,
)
from aprsd.packets.filter import PacketFilter
from aprsd.packets.filters.dupe_filter import DupePacketFilter
from aprsd.packets.packet_list import PacketList # noqa: F401
from aprsd.packets.seen_list import SeenList # noqa: F401
from aprsd.packets.tracker import PacketTrack # noqa: F401
@ -26,5 +28,9 @@ collector.PacketCollector().register(SeenList)
collector.PacketCollector().register(PacketTrack)
collector.PacketCollector().register(WatchList)
# Register all the packet filters for normal processing
# For specific commands you can deregister these if you don't want them.
PacketFilter().register(DupePacketFilter)
NULL_MESSAGE = -1

View File

@ -106,6 +106,8 @@ class Packet:
last_send_time: float = field(repr=False, default=0, compare=False, hash=False)
# Was the packet acked?
acked: bool = field(repr=False, default=False, compare=False, hash=False)
# Was the packet previously processed (for dupe checking)
processed: bool = field(repr=False, default=False, compare=False, hash=False)
# Do we allow this packet to be saved to send later?
allow_delay: bool = field(repr=False, default=True, compare=False, hash=False)
@ -186,12 +188,11 @@ class Packet:
def __repr__(self) -> str:
"""Build the repr version of the packet."""
repr = (
return (
f"{self.__class__.__name__}:"
f" From: {self.from_call} "
f" To: {self.to_call}"
)
return repr
@dataclass_json
@ -694,6 +695,8 @@ class UnknownPacket:
path: List[str] = field(default_factory=list, compare=False, hash=False)
packet_type: Optional[str] = field(default=None)
via: Optional[str] = field(default=None, compare=False, hash=False)
# Was the packet previously processed (for dupe checking)
processed: bool = field(repr=False, default=False, compare=False, hash=False)
@property
def key(self) -> str:

View File

@ -0,0 +1,66 @@
import logging
from typing import Union
from oslo_config import cfg
from aprsd.packets import core
from aprsd.utils import singleton
from aprsd import packets
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
@singleton
class DupePacketFilter:
    """Packet filter that detects and drops duplicate packets.

    Uses the PacketList object to see whether the packet has already
    been seen.  If it has, and the previous copy was seen within the
    configured dupe timeout, the packet is dropped as a dupe.
    """

    def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
        """Filter a packet out if it's already been seen and processed."""
        # Ack packets are never dropped; they must always be sent on
        # to the queue for processing elsewhere.
        if isinstance(packet, core.AckPacket):
            return packet

        # RF based APRS clients can deliver the same packet more than
        # once, so track packets and don't process the dupes.
        pkt_list = packets.PacketList()
        try:
            # Look the packet up (by packet.key) among already-seen packets.
            found = pkt_list.find(packet)
            if not packet.msgNo:
                # Without a message id there is no reliable way to
                # detect a dupe, so just pass it on.  It shouldn't
                # get acked either.
                found = False
        except KeyError:
            found = False

        if not found:
            # First time we've seen this packet, so we process it.
            return packet

        if packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
            # The packet came in within N seconds of the last sighting,
            # so drop it as a dupe (return None implicitly -> filtered out).
            LOG.warning(
                f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping."
            )
            return None

        LOG.warning(
            f"Packet {packet.from_call}:{packet.msgNo} already tracked "
            f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
        )
        return packet

56
aprsd/packets/filter.py Normal file
View File

@ -0,0 +1,56 @@
import logging
from typing import Callable, Protocol, Union, runtime_checkable

from aprsd.packets import core
from aprsd.utils import singleton
LOG = logging.getLogger("APRSD")
@runtime_checkable
class PacketFilterProtocol(Protocol):
    """Protocol API for a packet filter class.

    Any class registered with PacketFilter must provide this
    interface.
    """

    def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
        """When we get a packet from the network.

        Return a Packet object if the filter passes. Return None if the
        Packet is filtered out.
        """
        ...
@singleton
class PacketFilter:
    """Registry of packet filter classes run against every packet.

    Filter classes must implement PacketFilterProtocol.  They are
    instantiated and called in registration order during packet
    processing; the first filter that returns None stops the chain
    and drops the packet.
    """

    def __init__(self):
        # Registered filter *classes* (not instances); instances are
        # created on each filter() call.
        # Fix: original had `self.filters = list[Callable] = []`,
        # which is a SyntaxError (chained assignment to a subscript
        # expression); an annotated assignment was intended.
        self.filters: list[Callable] = []

    def register(self, packet_filter: Callable) -> None:
        """Register a filter class to run against incoming packets.

        :raises TypeError: if the class does not implement
            PacketFilterProtocol.
        """
        if not isinstance(packet_filter, PacketFilterProtocol):
            raise TypeError(f"class {packet_filter} is not a PacketFilterProtocol object")
        self.filters.append(packet_filter)

    def unregister(self, packet_filter: Callable) -> None:
        """Remove a previously registered filter class.

        :raises TypeError: if the class does not implement
            PacketFilterProtocol.
        """
        if not isinstance(packet_filter, PacketFilterProtocol):
            raise TypeError(f"class {packet_filter} is not a PacketFilterProtocol object")
        # Guard against double-unregister (e.g. a command and an
        # extension both removing the same filter).
        if packet_filter in self.filters:
            self.filters.remove(packet_filter)

    def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
        """Run through each of the filters.

        This will step through each registered filter class
        and call filter on it.

        If the filter object returns None, we are done filtering.
        If the filter object returns the packet, we continue filtering.
        """
        for name in self.filters:
            cls = name()
            try:
                packet = cls.filter(packet)
            except Exception:
                # Log the traceback; a broken filter must not kill the
                # processing thread.
                LOG.exception(f"Error in filtering packet {packet} with filter class {name}")
            if not packet:
                return None
        return packet

View File

View File

@ -0,0 +1,69 @@
import logging
from typing import Union
from oslo_config import cfg
from aprsd.packets import core
from aprsd import packets
from aprsd.utils import trace
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
class DupePacketFilter:
    """Packet filter that detects duplicate packets.

    Uses the PacketList object to see whether a packet has already
    been seen.  If it has, the packet's processed flag tells us
    whether it was already run through processing; a processed packet
    seen again within the allowed timeframe is a dupe.
    """

    def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
        """Filter a packet out if it's already been seen and processed."""
        # Ack packets always pass through; those should be processed
        # by the queue elsewhere.
        if isinstance(packet, core.AckPacket):
            return packet

        # RF based APRS clients can deliver duplicate packets, so we
        # need to track them and not process the dupes.
        pkt_list = packets.PacketList()
        try:
            # Look the packet up (by packet.key) among already-seen packets.
            found = pkt_list.find(packet)
            if not packet.msgNo:
                # No message id -> no reliable way to detect a dupe,
                # so just pass it on.  It shouldn't get acked either.
                found = False
        except KeyError:
            found = False

        if not found:
            # Never seen before, so we process it.
            return packet

        if not packet.processed:
            # Seen before, but not yet processed through the plugins.
            return packet

        if packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
            # A processed copy arrived within N seconds of the last
            # sighting: drop it as a dupe.
            LOG.warning(
                f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping."
            )
            return None

        LOG.warning(
            f"Packet {packet.from_call}:{packet.msgNo} already tracked "
            f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
        )
        return packet

View File

@ -0,0 +1,40 @@
@singleton
class PacketTypeFilter:
    """This filter is used to filter out packets that don't match a specific type.

    To use this, register it with the PacketFilter class,
    then instantiate it and call set_allow_list() with a list of packet
    type names you want to allow to pass the filtering. All other
    packets will be filtered out.
    """

    # Map of packet type name -> packet class, used to resolve the
    # names passed to set_allow_list().
    filters = {
        packets.Packet.__name__: packets.Packet,
        packets.AckPacket.__name__: packets.AckPacket,
        packets.BeaconPacket.__name__: packets.BeaconPacket,
        packets.GPSPacket.__name__: packets.GPSPacket,
        packets.MessagePacket.__name__: packets.MessagePacket,
        packets.MicEPacket.__name__: packets.MicEPacket,
        packets.ObjectPacket.__name__: packets.ObjectPacket,
        packets.StatusPacket.__name__: packets.StatusPacket,
        packets.ThirdPartyPacket.__name__: packets.ThirdPartyPacket,
        packets.WeatherPacket.__name__: packets.WeatherPacket,
        packets.UnknownPacket.__name__: packets.UnknownPacket,
    }

    # Tuple of allowed packet classes.  NOTE: while this is empty,
    # filter() drops every packet — call set_allow_list() first.
    allow_list = tuple()

    def set_allow_list(self, filter_list):
        """Resolve packet type names into the allow tuple.

        :param filter_list: iterable of packet type names (keys of
            self.filters).
        :raises KeyError: if a name is not a known packet type.
        """
        tmp_list = []
        for filter in filter_list:
            LOG.warning(f"Setting filter {filter} : {self.filters[filter]} to tmp {tmp_list}")
            tmp_list.append(self.filters[filter])
        self.allow_list = tuple(tmp_list)

    def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
        """Only allow packets of certain types to filter through."""
        if self.allow_list:
            if isinstance(packet, self.allow_list):
                # Fix: original returned the undefined name `packet1`,
                # which raised NameError for every allowed packet.
                return packet

View File

@ -4,9 +4,8 @@ import queue
# aprsd.threads
from .aprsd import APRSDThread, APRSDThreadList # noqa: F401
from .rx import ( # noqa: F401
APRSDDupeRXThread,
APRSDProcessPacketThread,
APRSDRXThread,
)
packet_queue = queue.Queue(maxsize=20)
packet_queue = queue.Queue(maxsize=500)

View File

@ -12,14 +12,27 @@ from aprsd.packets import collector
from aprsd.packets import log as packet_log
from aprsd.threads import APRSDThread, tx
from aprsd.utils import trace
from aprsd.packets import filter, dupe_filter
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
class APRSDRXThread(APRSDThread):
"""Main Class to connect to an APRS Client and recieve packets.
A packet is received in the main loop and then sent to the
process_packet method, which sends the packet through the collector
to track the packet for stats, and then put into the packet queue
for processing in a separate thread.
"""
_client = None
# This is the queue that packets are sent to for processing.
# We process packets in a separate thread to help prevent
# getting blocked by the APRS server trying to send us packets.
packet_queue = None
def __init__(self, packet_queue):
super().__init__("RX_PKT")
self.packet_queue = packet_queue
@ -52,7 +65,7 @@ class APRSDRXThread(APRSDThread):
# kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56
self._client.consumer(
self._process_packet,
self.process_packet,
raw=False,
blocking=False,
)
@ -75,91 +88,55 @@ class APRSDRXThread(APRSDThread):
time.sleep(1)
return True
def _process_packet(self, *args, **kwargs):
"""Intermediate callback so we can update the keepalive time."""
# Now call the 'real' packet processing for a RX'x packet
self.process_packet(*args, **kwargs)
@abc.abstractmethod
def process_packet(self, *args, **kwargs):
pass
class APRSDDupeRXThread(APRSDRXThread):
"""Process received packets.
This is the main APRSD Server command thread that
receives packets and makes sure the packet
hasn't been seen previously before sending it on
to be processed.
"""
@trace.trace
def process_packet(self, *args, **kwargs):
"""This handles the processing of an inbound packet.
When a packet is received by the connected client object,
it sends the raw packet into this function. This function then
decodes the packet via the client, and then processes the packet.
Ack Packets are sent to the PluginProcessPacketThread for processing.
All other packets have to be checked as a dupe, and then only after
we haven't seen this packet before, do we send it to the
PluginProcessPacketThread for processing.
"""
packet = self._client.decode_packet(*args, **kwargs)
# Don't log the packet here. We want the processing thread
# To make the choice if they want it logged or not.
# For now we want all the stats updated, so send the packet
# into the packet collector.
collector.PacketCollector().rx(packet)
self.packet_queue.put(packet)
class APRSDFilterThread(APRSDThread):
def __init__(self, thread_name, packet_queue):
super().__init__(thread_name)
self.packet_queue = packet_queue
def filter_packet(self, packet):
# Do any packet filtering prior to processing
if not filter.PacketFilter().filter(packet):
return None
return packet
def print_packet(self, packet):
"""Allow a child of this class to override this.
This is helpful if for whatever reason the child class
doesn't want to log packets.
"""
packet_log.log(packet)
pkt_list = packets.PacketList()
if isinstance(packet, packets.AckPacket):
# We don't need to drop AckPackets, those should be
# processed.
self.packet_queue.put(packet)
else:
# Make sure we aren't re-processing the same packet
# For RF based APRS Clients we can get duplicate packets
# So we need to track them and not process the dupes.
found = False
try:
# Find the packet in the list of already seen packets
# Based on the packet.key
found = pkt_list.find(packet)
if not packet.msgNo:
# If the packet doesn't have a message id
# then there is no reliable way to detect
# if it's a dupe, so we just pass it on.
# it shouldn't get acked either.
found = False
except KeyError:
found = False
if not found:
# We haven't seen this packet before, so we process it.
collector.PacketCollector().rx(packet)
self.packet_queue.put(packet)
elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
# If the packet came in within N seconds of the
# Last time seeing the packet, then we drop it as a dupe.
LOG.warning(
f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping."
)
else:
LOG.warning(
f"Packet {packet.from_call}:{packet.msgNo} already tracked "
f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
)
collector.PacketCollector().rx(packet)
self.packet_queue.put(packet)
def loop(self):
try:
packet = self.packet_queue.get(timeout=1)
self.print_packet(packet)
if packet:
if self.filter_packet(packet):
self.process_packet(packet)
except queue.Empty:
pass
return True
class APRSDPluginRXThread(APRSDDupeRXThread):
""" "Process received packets.
class APRSDProcessPacketThread(APRSDFilterThread):
"""Base class for processing received packets after they have been filtered.
For backwards compatibility, we keep the APRSDPluginRXThread.
"""
class APRSDProcessPacketThread(APRSDThread):
"""Base class for processing received packets.
Packets are received from the client, then filtered for dupes,
then sent to the packet queue. This thread pulls packets from
the packet queue for processing.
This is the base class for processing packets coming from
the consumer. This base class handles sending ack packets and
@ -167,8 +144,7 @@ class APRSDProcessPacketThread(APRSDThread):
for processing."""
def __init__(self, packet_queue):
self.packet_queue = packet_queue
super().__init__("ProcessPKT")
super().__init__("ProcessPKT", packet_queue=packet_queue)
if not CONF.enable_sending_ack_packets:
LOG.warning(
"Sending ack packets is disabled, messages "
@ -193,18 +169,14 @@ class APRSDProcessPacketThread(APRSDThread):
LOG.debug(f"Got REJECT for message {ack_num}")
collector.PacketCollector().rx(packet)
def loop(self):
try:
packet = self.packet_queue.get(timeout=1)
if packet:
self.process_packet(packet)
except queue.Empty:
pass
return True
def process_packet(self, packet):
"""Process a packet received from aprs-is server."""
LOG.debug(f"ProcessPKT-LOOP {self.loop_count}")
# set this now as we are going to process it.
# This is used during dupe checking, so set it early
packet.processed = True
our_call = CONF.callsign.lower()
from_call = packet.from_call