From 52dac7e0a0500a1f650e14d21183f9e4fbc1bd90 Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 30 Jan 2025 09:41:01 -0800 Subject: [PATCH 1/4] Added new PacketFilter mechanism This patch adds the new PacketFilter class as a generic mechanism for doing packet filtering during the packet processing phase of recieving packets. The packet phases are: 1. reception and stats collection 2. packet processing. Each phase has a single thread for handling that phase. Phase 1: The ARPSDRXThread connects to the APRS client, and gets packets from the client. Then it puts the packet through the Collector for stats and tracking. Then the packet is put into the packet_queue. Phase 2: Packets are pulled from the packet_queue. Then packets are run through the PacketFilter mechanism, then processed depending on the command being run. By default there is 1 loaded packet filter, which is the DupePacketFilter which removes "duplicate" packets that aprsd has already seen and processed within the configured time frame. This PacketFilter mechanism allows an external extension or plugin to add/remove packet filters at will depending on the function of the extension or plugin. For example, this allows an extension to get a packet and push the packet into an MQTT queue. --- aprsd/cmds/listen.py | 88 +++++++++++----------- aprsd/cmds/server.py | 2 +- aprsd/packets/__init__.py | 6 ++ aprsd/packets/core.py | 7 +- aprsd/packets/filter.py | 58 +++++++++++++++ aprsd/packets/filters/__init__.py | 0 aprsd/packets/filters/dupe_filter.py | 69 +++++++++++++++++ aprsd/packets/filters/packet_type.py | 52 +++++++++++++ aprsd/threads/__init__.py | 3 +- aprsd/threads/rx.py | 107 +++++++++++++++------------ 10 files changed, 293 insertions(+), 99 deletions(-) create mode 100644 aprsd/packets/filter.py create mode 100644 aprsd/packets/filters/__init__.py create mode 100644 aprsd/packets/filters/dupe_filter.py create mode 100644 aprsd/packets/filters/packet_type.py diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index d949e3e..8ab4ff1 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -13,6 +13,7 @@ import click from loguru import logger from oslo_config import cfg from rich.console import Console +from typing import Union # local imports here import aprsd @@ -22,10 +23,15 @@ from aprsd.main import cli from aprsd.packets import collector as packet_collector from aprsd.packets import log as packet_log from aprsd.packets import seen_list +from aprsd.packets import core +from aprsd.packets.filters import dupe_filter +from aprsd.packets.filters import packet_type +from aprsd.packets.filter import PacketFilter from aprsd.stats import collector from aprsd.threads import keepalive, rx from aprsd.threads import stats as stats_thread from aprsd.threads.aprsd import APRSDThread +from aprsd.utils import singleton # setup the global logger # log.basicConfig(level=log.DEBUG) # level=10 @@ -48,7 +54,7 @@ def signal_handler(sig, frame): collector.Collector().collect() -class APRSDListenThread(rx.APRSDRXThread): +class APRSDListenProcessThread(rx.APRSDFilterThread): def __init__( self, packet_queue, @@ -57,47 +63,22 @@ class APRSDListenThread(rx.APRSDRXThread): enabled_plugins=[], log_packets=False, ): - super().__init__(packet_queue) + super().__init__("ListenProcThread", packet_queue) self.packet_filter = packet_filter self.plugin_manager = plugin_manager if self.plugin_manager: LOG.info(f"Plugins {self.plugin_manager.get_message_plugins()}") self.log_packets = log_packets - def process_packet(self, *args, **kwargs): - packet = self._client.decode_packet(*args, **kwargs) - filters = { - packets.Packet.__name__: packets.Packet, - packets.AckPacket.__name__: packets.AckPacket, - packets.BeaconPacket.__name__: packets.BeaconPacket, - packets.GPSPacket.__name__: packets.GPSPacket, - packets.MessagePacket.__name__: packets.MessagePacket, - packets.MicEPacket.__name__: packets.MicEPacket, - packets.ObjectPacket.__name__: packets.ObjectPacket, - packets.StatusPacket.__name__: packets.StatusPacket, - packets.ThirdPartyPacket.__name__: packets.ThirdPartyPacket, - packets.WeatherPacket.__name__: packets.WeatherPacket, - packets.UnknownPacket.__name__: packets.UnknownPacket, - } + def print_packet(self, packet): + if self.log_packets: + packet_log.log(packet) - if self.packet_filter: - filter_class = filters[self.packet_filter] - if isinstance(packet, filter_class): - if self.log_packets: - packet_log.log(packet) - if self.plugin_manager: - # Don't do anything with the reply - # This is the listen only command. - self.plugin_manager.run(packet) - else: - if self.log_packets: - packet_log.log(packet) - if self.plugin_manager: - # Don't do anything with the reply. - # This is the listen only command. - self.plugin_manager.run(packet) - - packet_collector.PacketCollector().rx(packet) + def process_packet(self, packet: type[core.Packet]): + if self.plugin_manager: + # Don't do anything with the reply. + # This is the listen only command. + self.plugin_manager.run(packet) class ListenStatsThread(APRSDThread): @@ -106,23 +87,24 @@ class ListenStatsThread(APRSDThread): def __init__(self): super().__init__("PacketStatsLog") self._last_total_rx = 0 + self.period = 31 def loop(self): - if self.loop_count % 10 == 0: + if self.loop_count % self.period == 0: # log the stats every 10 seconds stats_json = collector.Collector().collect() stats = stats_json["PacketList"] total_rx = stats["rx"] packet_count = len(stats["packets"]) rx_delta = total_rx - self._last_total_rx - rate = rx_delta / 10 + rate = rx_delta / self.period # Log summary stats LOGU.opt(colors=True).info( - f"RX Rate: {rate} pps " + f"RX Rate: {rate:.2f} pps " f"Total RX: {total_rx} " - f"RX Last 10 secs: {rx_delta} " - f"Packets in PacketList: {packet_count}", + f"RX Last {self.period} secs: {rx_delta} " + f"Packets in PacketListStats: {packet_count}", ) self._last_total_rx = total_rx @@ -170,6 +152,8 @@ class ListenStatsThread(APRSDThread): ], case_sensitive=False, ), + multiple=True, + default=[], help="Filter by packet type", ) @click.option( @@ -267,7 +251,7 @@ def listen( print(msg) sys.exit(-1) - LOG.debug(f"Filter by '{filter}'") + LOG.debug(f"Filter messages on aprsis server by '{filter}'") aprs_client.set_filter(filter) keepalive_thread = keepalive.KeepAliveThread() @@ -276,6 +260,15 @@ def listen( # just deregister the class from the packet collector packet_collector.PacketCollector().unregister(seen_list.SeenList) + # we don't want the dupe filter to run here. + PacketFilter().unregister(dupe_filter.DupePacketFilter) + if packet_filter: + LOG.info("Enabling packet filtering for {packet_filter}") + packet_type.PacketTypeFilter().set_allow_list(packet_filter) + PacketFilter().register(packet_type.PacketTypeFilter) + else: + LOG.info("No packet filtering enabled.") + pm = None if load_plugins: pm = plugin.PluginManager() @@ -300,15 +293,20 @@ def listen( stats = stats_thread.APRSDStatsStoreThread() stats.start() - LOG.debug("Create APRSDListenThread") - listen_thread = APRSDListenThread( + LOG.debug("Start APRSDRxThread") + rx_thread = rx.APRSDRXThread(packet_queue=threads.packet_queue) + rx_thread.start() + + + LOG.debug("Create APRSDListenProcessThread") + listen_thread = APRSDListenProcessThread( packet_queue=threads.packet_queue, packet_filter=packet_filter, plugin_manager=pm, enabled_plugins=enable_plugin, log_packets=log_packets, ) - LOG.debug("Start APRSDListenThread") + LOG.debug("Start APRSDListenProcessThread") listen_thread.start() if enable_packet_stats: listen_stats = ListenStatsThread() @@ -317,6 +315,6 @@ def listen( keepalive_thread.start() LOG.debug("keepalive Join") keepalive_thread.join() - LOG.debug("listen_thread Join") + rx_thread.join() listen_thread.join() stats.join() diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index 3c812d2..f4ad975 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -147,7 +147,7 @@ def server(ctx, flush): server_threads.register(keepalive.KeepAliveThread()) server_threads.register(stats_thread.APRSDStatsStoreThread()) server_threads.register( - rx.APRSDPluginRXThread( + rx.APRSDRXThread( packet_queue=threads.packet_queue, ), ) diff --git a/aprsd/packets/__init__.py b/aprsd/packets/__init__.py index 62760fb..b5d854c 100644 --- a/aprsd/packets/__init__.py +++ b/aprsd/packets/__init__.py @@ -15,6 +15,8 @@ from aprsd.packets.core import ( # noqa: F401 WeatherPacket, factory, ) +from aprsd.packets.filter import PacketFilter +from aprsd.packets.filters.dupe_filter import DupePacketFilter from aprsd.packets.packet_list import PacketList # noqa: F401 from aprsd.packets.seen_list import SeenList # noqa: F401 from aprsd.packets.tracker import PacketTrack # noqa: F401 @@ -26,5 +28,9 @@ collector.PacketCollector().register(SeenList) collector.PacketCollector().register(PacketTrack) collector.PacketCollector().register(WatchList) +# Register all the packet filters for normal processing +# For specific commands you can deregister these if you don't want them. +PacketFilter().register(DupePacketFilter) + NULL_MESSAGE = -1 diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index 18a3c91..804db00 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -106,6 +106,8 @@ class Packet: last_send_time: float = field(repr=False, default=0, compare=False, hash=False) # Was the packet acked? acked: bool = field(repr=False, default=False, compare=False, hash=False) + # Was the packet previously processed (for dupe checking) + processed: bool = field(repr=False, default=False, compare=False, hash=False) # Do we allow this packet to be saved to send later? allow_delay: bool = field(repr=False, default=True, compare=False, hash=False) @@ -186,12 +188,11 @@ class Packet: def __repr__(self) -> str: """Build the repr version of the packet.""" - repr = ( + return ( f"{self.__class__.__name__}:" f" From: {self.from_call} " f" To: {self.to_call}" ) - return repr @dataclass_json @@ -694,6 +695,8 @@ class UnknownPacket: path: List[str] = field(default_factory=list, compare=False, hash=False) packet_type: Optional[str] = field(default=None) via: Optional[str] = field(default=None, compare=False, hash=False) + # Was the packet previously processed (for dupe checking) + processed: bool = field(repr=False, default=False, compare=False, hash=False) @property def key(self) -> str: diff --git a/aprsd/packets/filter.py b/aprsd/packets/filter.py new file mode 100644 index 0000000..152366b --- /dev/null +++ b/aprsd/packets/filter.py @@ -0,0 +1,58 @@ +import logging +from typing import Callable, Protocol, runtime_checkable, Union, Dict + +from aprsd.packets import core +from aprsd.utils import singleton + +LOG = logging.getLogger("APRSD") + + +@runtime_checkable +class PacketFilterProtocol(Protocol): + """Protocol API for a packet filter class. + """ + def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]: + """When we get a packet from the network. + + Return a Packet object if the filter passes. Return None if the + Packet is filtered out. + """ + ... + + +@singleton +class PacketFilter: + + def __init__(self): + self.filters: Dict[str, Callable] = {} + + def register(self, packet_filter: Callable) -> None: + if not isinstance(packet_filter, PacketFilterProtocol): + raise TypeError(f"class {packet_filter} is not a PacketFilterProtocol object") + + if packet_filter not in self.filters: + self.filters[packet_filter] = packet_filter() + + def unregister(self, packet_filter: Callable) -> None: + if not isinstance(packet_filter, PacketFilterProtocol): + raise TypeError(f"class {packet_filter} is not a PacketFilterProtocol object") + if packet_filter in self.filters: + del self.filters[packet_filter] + + def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]: + """Run through each of the filters. + + This will step through each registered filter class + and call filter on it. + + If the filter object returns None, we are done filtering. + If the filter object returns the packet, we continue filtering. + """ + for packet_filter in self.filters: + try: + if not self.filters[packet_filter].filter(packet): + LOG.debug(f"{self.filters[packet_filter].__class__.__name__} dropped {packet.__class__.__name__}:{packet.human_info}") + return None + except Exception as ex: + LOG.error(f"{packet_filter.__clas__.__name__} failed filtering packet {packet.__class__.__name__} : {ex}") + return packet diff --git a/aprsd/packets/filters/__init__.py b/aprsd/packets/filters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aprsd/packets/filters/dupe_filter.py b/aprsd/packets/filters/dupe_filter.py new file mode 100644 index 0000000..e0fa363 --- /dev/null +++ b/aprsd/packets/filters/dupe_filter.py @@ -0,0 +1,69 @@ +import logging +from typing import Union + +from oslo_config import cfg + +from aprsd.packets import core +from aprsd import packets +from aprsd.utils import trace + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +class DupePacketFilter: + """This is a packet filter to detect duplicate packets. + + This Uses the PacketList object to see if a packet exists + already. If it does exist in the PacketList, then we need to + check the flag on the packet to see if it's been processed before. + If the packet has been processed already within the allowed + timeframe, then it's a dupe. + """ + def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]: + #LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}") + """Filter a packet out if it's already been seen and processed.""" + if isinstance(packet, core.AckPacket): + # We don't need to drop AckPackets, those should be + # processed. + # Send the AckPacket to the queue for processing elsewhere. + return packet + else: + # Make sure we aren't re-processing the same packet + # For RF based APRS Clients we can get duplicate packets + # So we need to track them and not process the dupes. + pkt_list = packets.PacketList() + found = False + try: + # Find the packet in the list of already seen packets + # Based on the packet.key + found = pkt_list.find(packet) + if not packet.msgNo: + # If the packet doesn't have a message id + # then there is no reliable way to detect + # if it's a dupe, so we just pass it on. + # it shouldn't get acked either. + found = False + except KeyError: + found = False + + if not found: + # We haven't seen this packet before, so we process it. + return packet + + if not packet.processed: + # We haven't processed this packet through the plugins. + return packet + elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout: + # If the packet came in within N seconds of the + # Last time seeing the packet, then we drop it as a dupe. + LOG.warning( + f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping." + ) + else: + LOG.warning( + f"Packet {packet.from_call}:{packet.msgNo} already tracked " + f"but older than {CONF.packet_dupe_timeout} seconds. processing.", + ) + return packet diff --git a/aprsd/packets/filters/packet_type.py b/aprsd/packets/filters/packet_type.py new file mode 100644 index 0000000..82eaebd --- /dev/null +++ b/aprsd/packets/filters/packet_type.py @@ -0,0 +1,52 @@ +import logging +from typing import Union + +from oslo_config import cfg + +from aprsd.packets import core +from aprsd import packets +from aprsd.utils import singleton + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +@singleton +class PacketTypeFilter: + """This filter is used to filter out packets that don't match a specific type. + + To use this, register it with the PacketFilter class, + then instante it and call set_allow_list() with a list of packet types + you want to allow to pass the filtering. All other packets will be + filtered out. + """ + + filters = { + packets.Packet.__name__: packets.Packet, + packets.AckPacket.__name__: packets.AckPacket, + packets.BeaconPacket.__name__: packets.BeaconPacket, + packets.GPSPacket.__name__: packets.GPSPacket, + packets.MessagePacket.__name__: packets.MessagePacket, + packets.MicEPacket.__name__: packets.MicEPacket, + packets.ObjectPacket.__name__: packets.ObjectPacket, + packets.StatusPacket.__name__: packets.StatusPacket, + packets.ThirdPartyPacket.__name__: packets.ThirdPartyPacket, + packets.WeatherPacket.__name__: packets.WeatherPacket, + packets.UnknownPacket.__name__: packets.UnknownPacket, + } + + allow_list = tuple() + + def set_allow_list(self, filter_list): + tmp_list = [] + for filter in filter_list: + LOG.warning(f"Setting filter {filter} : {self.filters[filter]} to tmp {tmp_list}") + tmp_list.append(self.filters[filter]) + self.allow_list = tuple(tmp_list) + + def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]: + """Only allow packets of certain types to filter through.""" + if self.allow_list: + if isinstance(packet, self.allow_list): + return packet diff --git a/aprsd/threads/__init__.py b/aprsd/threads/__init__.py index eea5d2e..7946b6d 100644 --- a/aprsd/threads/__init__.py +++ b/aprsd/threads/__init__.py @@ -4,9 +4,8 @@ import queue # aprsd.threads from .aprsd import APRSDThread, APRSDThreadList # noqa: F401 from .rx import ( # noqa: F401 - APRSDDupeRXThread, APRSDProcessPacketThread, APRSDRXThread, ) -packet_queue = queue.Queue(maxsize=20) +packet_queue = queue.Queue(maxsize=500) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index d0244f9..1088daf 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -12,14 +12,28 @@ from aprsd.packets import collector from aprsd.packets import log as packet_log from aprsd.threads import APRSDThread, tx from aprsd.utils import trace +from aprsd.packets import filter +from aprsd.packets.filters import dupe_filter CONF = cfg.CONF LOG = logging.getLogger('APRSD') class APRSDRXThread(APRSDThread): + """Main Class to connect to an APRS Client and recieve packets. + + A packet is received in the main loop and then sent to the + process_packet method, which sends the packet through the collector + to track the packet for stats, and then put into the packet queue + for processing in a separate thread. + """ _client = None + # This is the queue that packets are sent to for processing. + # We process packets in a separate thread to help prevent + # getting blocked by the APRS server trying to send us packets. + packet_queue = None + def __init__(self, packet_queue): super().__init__('RX_PKT') self.packet_queue = packet_queue @@ -52,7 +66,7 @@ class APRSDRXThread(APRSDThread): # kwargs. :( # https://github.com/rossengeorgiev/aprs-python/pull/56 self._client.consumer( - self._process_packet, + self.process_packet, raw=False, blocking=False, ) @@ -73,37 +87,7 @@ class APRSDRXThread(APRSDThread): time.sleep(5) return True - def _process_packet(self, *args, **kwargs): - """Intermediate callback so we can update the keepalive time.""" - # Now call the 'real' packet processing for a RX'x packet - self.process_packet(*args, **kwargs) - - @abc.abstractmethod def process_packet(self, *args, **kwargs): - pass - - -class APRSDDupeRXThread(APRSDRXThread): - """Process received packets. - - This is the main APRSD Server command thread that - receives packets and makes sure the packet - hasn't been seen previously before sending it on - to be processed. - """ - - @trace.trace - def process_packet(self, *args, **kwargs): - """This handles the processing of an inbound packet. - - When a packet is received by the connected client object, - it sends the raw packet into this function. This function then - decodes the packet via the client, and then processes the packet. - Ack Packets are sent to the PluginProcessPacketThread for processing. - All other packets have to be checked as a dupe, and then only after - we haven't seen this packet before, do we send it to the - PluginProcessPacketThread for processing. - """ packet = self._client.decode_packet(*args, **kwargs) if not packet: LOG.error( @@ -154,15 +138,45 @@ class APRSDDupeRXThread(APRSDRXThread): self.packet_queue.put(packet) -class APRSDPluginRXThread(APRSDDupeRXThread): - """ "Process received packets. +class APRSDFilterThread(APRSDThread): - For backwards compatibility, we keep the APRSDPluginRXThread. - """ + def __init__(self, thread_name, packet_queue): + super().__init__(thread_name) + self.packet_queue = packet_queue + + def filter_packet(self, packet): + # Do any packet filtering prior to processing + if not filter.PacketFilter().filter(packet): + return None + return packet + + def print_packet(self, packet): + """Allow a child of this class to override this. + + This is helpful if for whatever reason the child class + doesn't want to log packets. + + """ + packet_log.log(packet) + + def loop(self): + try: + packet = self.packet_queue.get(timeout=1) + self.print_packet(packet) + if packet: + if self.filter_packet(packet): + self.process_packet(packet) + except queue.Empty: + pass + return True -class APRSDProcessPacketThread(APRSDThread): - """Base class for processing received packets. +class APRSDProcessPacketThread(APRSDFilterThread): + """Base class for processing received packets after they have been filtered. + + Packets are received from the client, then filtered for dupes, + then sent to the packet queue. This thread pulls packets from + the packet queue for processing. This is the base class for processing packets coming from the consumer. This base class handles sending ack packets and @@ -170,8 +184,7 @@ class APRSDProcessPacketThread(APRSDThread): for processing.""" def __init__(self, packet_queue): - self.packet_queue = packet_queue - super().__init__('ProcessPKT') + super().__init__("ProcessPKT", packet_queue=packet_queue) if not CONF.enable_sending_ack_packets: LOG.warning( 'Sending ack packets is disabled, messages will not be acknowledged.', @@ -195,18 +208,14 @@ class APRSDProcessPacketThread(APRSDThread): LOG.debug(f'Got REJECT for message {ack_num}') collector.PacketCollector().rx(packet) - def loop(self): - try: - packet = self.packet_queue.get(timeout=1) - if packet: - self.process_packet(packet) - except queue.Empty: - pass - return True - def process_packet(self, packet): """Process a packet received from aprs-is server.""" - LOG.debug(f'ProcessPKT-LOOP {self.loop_count}') + LOG.debug(f"ProcessPKT-LOOP {self.loop_count}") + + # set this now as we are going to process it. + # This is used during dupe checking, so set it early + packet.processed = True + our_call = CONF.callsign.lower() from_call = packet.from_call From e9e7e6b59f9f93f3f09142e56407bc87603a44cb Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 30 Jan 2025 10:44:17 -0800 Subject: [PATCH 2/4] Fixed some pep8 failures. --- aprsd/cmds/listen.py | 117 +++++++++++++-------------- aprsd/packets/filters/dupe_filter.py | 23 +++--- aprsd/threads/rx.py | 23 +++--- 3 files changed, 77 insertions(+), 86 deletions(-) diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index 8ab4ff1..4784e3a 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -13,7 +13,6 @@ import click from loguru import logger from oslo_config import cfg from rich.console import Console -from typing import Union # local imports here import aprsd @@ -21,21 +20,18 @@ from aprsd import cli_helper, packets, plugin, threads, utils from aprsd.client import client_factory from aprsd.main import cli from aprsd.packets import collector as packet_collector +from aprsd.packets import core, seen_list from aprsd.packets import log as packet_log -from aprsd.packets import seen_list -from aprsd.packets import core -from aprsd.packets.filters import dupe_filter -from aprsd.packets.filters import packet_type from aprsd.packets.filter import PacketFilter +from aprsd.packets.filters import dupe_filter, packet_type from aprsd.stats import collector from aprsd.threads import keepalive, rx from aprsd.threads import stats as stats_thread from aprsd.threads.aprsd import APRSDThread -from aprsd.utils import singleton # setup the global logger # log.basicConfig(level=log.DEBUG) # level=10 -LOG = logging.getLogger("APRSD") +LOG = logging.getLogger('APRSD') CONF = cfg.CONF LOGU = logger console = Console() @@ -43,9 +39,9 @@ console = Console() def signal_handler(sig, frame): threads.APRSDThreadList().stop_all() - if "subprocess" not in str(frame): + if 'subprocess' not in str(frame): LOG.info( - "Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}".format( + 'Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}'.format( datetime.datetime.now(), ), ) @@ -60,14 +56,14 @@ class APRSDListenProcessThread(rx.APRSDFilterThread): packet_queue, packet_filter=None, plugin_manager=None, - enabled_plugins=[], + enabled_plugins=None, log_packets=False, ): - super().__init__("ListenProcThread", packet_queue) + super().__init__('ListenProcThread', packet_queue) self.packet_filter = packet_filter self.plugin_manager = plugin_manager if self.plugin_manager: - LOG.info(f"Plugins {self.plugin_manager.get_message_plugins()}") + LOG.info(f'Plugins {self.plugin_manager.get_message_plugins()}') self.log_packets = log_packets def print_packet(self, packet): @@ -85,7 +81,7 @@ class ListenStatsThread(APRSDThread): """Log the stats from the PacketList.""" def __init__(self): - super().__init__("PacketStatsLog") + super().__init__('PacketStatsLog') self._last_total_rx = 0 self.period = 31 @@ -93,27 +89,27 @@ class ListenStatsThread(APRSDThread): if self.loop_count % self.period == 0: # log the stats every 10 seconds stats_json = collector.Collector().collect() - stats = stats_json["PacketList"] - total_rx = stats["rx"] - packet_count = len(stats["packets"]) + stats = stats_json['PacketList'] + total_rx = stats['rx'] + packet_count = len(stats['packets']) rx_delta = total_rx - self._last_total_rx - rate = rx_delta / self.period + rate = rx_delta / self.period # Log summary stats LOGU.opt(colors=True).info( - f"RX Rate: {rate:.2f} pps " - f"Total RX: {total_rx} " - f"RX Last {self.period} secs: {rx_delta} " - f"Packets in PacketListStats: {packet_count}", + f'RX Rate: {rate:.2f} pps ' + f'Total RX: {total_rx} ' + f'RX Last {self.period} secs: {rx_delta} ' + f'Packets in PacketListStats: {packet_count}', ) self._last_total_rx = total_rx # Log individual type stats - for k, v in stats["types"].items(): - thread_hex = f"fg {utils.hex_from_name(k)}" + for k, v in stats['types'].items(): + thread_hex = f'fg {utils.hex_from_name(k)}' LOGU.opt(colors=True).info( - f"<{thread_hex}>{k:<15} " - f"RX: {v['rx']} TX: {v['tx']}", + f'<{thread_hex}>{k:<15} ' + f'RX: {v["rx"]} TX: {v["tx"]}', ) time.sleep(1) @@ -123,19 +119,19 @@ class ListenStatsThread(APRSDThread): @cli.command() @cli_helper.add_options(cli_helper.common_options) @click.option( - "--aprs-login", - envvar="APRS_LOGIN", + '--aprs-login', + envvar='APRS_LOGIN', show_envvar=True, - help="What callsign to send the message from.", + help='What callsign to send the message from.', ) @click.option( - "--aprs-password", - envvar="APRS_PASSWORD", + '--aprs-password', + envvar='APRS_PASSWORD', show_envvar=True, - help="the APRS-IS password for APRS_LOGIN", + help='the APRS-IS password for APRS_LOGIN', ) @click.option( - "--packet-filter", + '--packet-filter', type=click.Choice( [ packets.AckPacket.__name__, @@ -154,35 +150,35 @@ class ListenStatsThread(APRSDThread): ), multiple=True, default=[], - help="Filter by packet type", + help='Filter by packet type', ) @click.option( - "--enable-plugin", + '--enable-plugin', multiple=True, - help="Enable a plugin. This is the name of the file in the plugins directory.", + help='Enable a plugin. This is the name of the file in the plugins directory.', ) @click.option( - "--load-plugins", + '--load-plugins', default=False, is_flag=True, - help="Load plugins as enabled in aprsd.conf ?", + help='Load plugins as enabled in aprsd.conf ?', ) @click.argument( - "filter", + 'filter', nargs=-1, required=True, ) @click.option( - "--log-packets", + '--log-packets', default=False, is_flag=True, - help="Log incoming packets.", + help='Log incoming packets.', ) @click.option( - "--enable-packet-stats", + '--enable-packet-stats', default=False, is_flag=True, - help="Enable packet stats periodic logging.", + help='Enable packet stats periodic logging.', ) @click.pass_context @cli_helper.process_standard_options @@ -212,41 +208,41 @@ def listen( if not aprs_login: click.echo(ctx.get_help()) - click.echo("") - ctx.fail("Must set --aprs-login or APRS_LOGIN") + click.echo('') + ctx.fail('Must set --aprs-login or APRS_LOGIN') ctx.exit() if not aprs_password: click.echo(ctx.get_help()) - click.echo("") - ctx.fail("Must set --aprs-password or APRS_PASSWORD") + click.echo('') + ctx.fail('Must set --aprs-password or APRS_PASSWORD') ctx.exit() # CONF.aprs_network.login = aprs_login # config["aprs"]["password"] = aprs_password - LOG.info(f"APRSD Listen Started version: {aprsd.__version__}") + LOG.info(f'APRSD Listen Started version: {aprsd.__version__}') CONF.log_opt_values(LOG, logging.DEBUG) collector.Collector() # Try and load saved MsgTrack list - LOG.debug("Loading saved MsgTrack object.") + LOG.debug('Loading saved MsgTrack object.') # Initialize the client factory and create # The correct client object ready for use # Make sure we have 1 client transport enabled if not client_factory.is_client_enabled(): - LOG.error("No Clients are enabled in config.") + LOG.error('No Clients are enabled in config.') sys.exit(-1) # Creates the client object - LOG.info("Creating client connection") + LOG.info('Creating client connection') aprs_client = client_factory.create() LOG.info(aprs_client) if not aprs_client.login_success: # We failed to login, will just quit! - msg = f"Login Failure: {aprs_client.login_failure}" + msg = f'Login Failure: {aprs_client.login_failure}' LOG.error(msg) print(msg) sys.exit(-1) @@ -263,16 +259,16 @@ def listen( # we don't want the dupe filter to run here. PacketFilter().unregister(dupe_filter.DupePacketFilter) if packet_filter: - LOG.info("Enabling packet filtering for {packet_filter}") + LOG.info('Enabling packet filtering for {packet_filter}') packet_type.PacketTypeFilter().set_allow_list(packet_filter) PacketFilter().register(packet_type.PacketTypeFilter) else: - LOG.info("No packet filtering enabled.") + LOG.info('No packet filtering enabled.') pm = None if load_plugins: pm = plugin.PluginManager() - LOG.info("Loading plugins") + LOG.info('Loading plugins') pm.setup_plugins(load_help_plugin=False) elif enable_plugin: pm = plugin.PluginManager() @@ -283,22 +279,21 @@ def listen( else: LOG.warning( "Not Loading any plugins use --load-plugins to load what's " - "defined in the config file.", + 'defined in the config file.', ) if pm: for p in pm.get_plugins(): - LOG.info("Loaded plugin %s", p.__class__.__name__) + LOG.info('Loaded plugin %s', p.__class__.__name__) stats = stats_thread.APRSDStatsStoreThread() stats.start() - LOG.debug("Start APRSDRxThread") + LOG.debug('Start APRSDRxThread') rx_thread = rx.APRSDRXThread(packet_queue=threads.packet_queue) rx_thread.start() - - LOG.debug("Create APRSDListenProcessThread") + LOG.debug('Create APRSDListenProcessThread') listen_thread = APRSDListenProcessThread( packet_queue=threads.packet_queue, packet_filter=packet_filter, @@ -306,14 +301,14 @@ def listen( enabled_plugins=enable_plugin, log_packets=log_packets, ) - LOG.debug("Start APRSDListenProcessThread") + LOG.debug('Start APRSDListenProcessThread') listen_thread.start() if enable_packet_stats: listen_stats = ListenStatsThread() listen_stats.start() keepalive_thread.start() - LOG.debug("keepalive Join") + LOG.debug('keepalive Join') keepalive_thread.join() rx_thread.join() listen_thread.join() diff --git a/aprsd/packets/filters/dupe_filter.py b/aprsd/packets/filters/dupe_filter.py index e0fa363..0839fcf 100644 --- a/aprsd/packets/filters/dupe_filter.py +++ b/aprsd/packets/filters/dupe_filter.py @@ -1,28 +1,27 @@ import logging -from typing import Union +from typing import Union from oslo_config import cfg -from aprsd.packets import core from aprsd import packets -from aprsd.utils import trace - +from aprsd.packets import core CONF = cfg.CONF -LOG = logging.getLogger("APRSD") +LOG = logging.getLogger('APRSD') class DupePacketFilter: """This is a packet filter to detect duplicate packets. This Uses the PacketList object to see if a packet exists - already. If it does exist in the PacketList, then we need to + already. If it does exist in the PacketList, then we need to check the flag on the packet to see if it's been processed before. - If the packet has been processed already within the allowed + If the packet has been processed already within the allowed timeframe, then it's a dupe. """ + def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]: - #LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}") + # LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}") """Filter a packet out if it's already been seen and processed.""" if isinstance(packet, core.AckPacket): # We don't need to drop AckPackets, those should be @@ -51,7 +50,7 @@ class DupePacketFilter: if not found: # We haven't seen this packet before, so we process it. return packet - + if not packet.processed: # We haven't processed this packet through the plugins. return packet @@ -59,11 +58,11 @@ class DupePacketFilter: # If the packet came in within N seconds of the # Last time seeing the packet, then we drop it as a dupe. LOG.warning( - f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping." + f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.' ) else: LOG.warning( - f"Packet {packet.from_call}:{packet.msgNo} already tracked " - f"but older than {CONF.packet_dupe_timeout} seconds. processing.", + f'Packet {packet.from_call}:{packet.msgNo} already tracked ' + f'but older than {CONF.packet_dupe_timeout} seconds. processing.', ) return packet diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 1088daf..b995f8c 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -8,12 +8,9 @@ from oslo_config import cfg from aprsd import packets, plugin from aprsd.client import client_factory -from aprsd.packets import collector +from aprsd.packets import collector, filter from aprsd.packets import log as packet_log from aprsd.threads import APRSDThread, tx -from aprsd.utils import trace -from aprsd.packets import filter -from aprsd.packets.filters import dupe_filter CONF = cfg.CONF LOG = logging.getLogger('APRSD') @@ -24,13 +21,14 @@ class APRSDRXThread(APRSDThread): A packet is received in the main loop and then sent to the process_packet method, which sends the packet through the collector - to track the packet for stats, and then put into the packet queue - for processing in a separate thread. + to track the packet for stats, and then put into the packet queue + for processing in a separate thread. """ + _client = None # This is the queue that packets are sent to for processing. - # We process packets in a separate thread to help prevent + # We process packets in a separate thread to help prevent # getting blocked by the APRS server trying to send us packets. packet_queue = None @@ -139,7 +137,6 @@ class APRSDRXThread(APRSDThread): class APRSDFilterThread(APRSDThread): - def __init__(self, thread_name, packet_queue): super().__init__(thread_name) self.packet_queue = packet_queue @@ -149,13 +146,13 @@ class APRSDFilterThread(APRSDThread): if not filter.PacketFilter().filter(packet): return None return packet - + def print_packet(self, packet): """Allow a child of this class to override this. This is helpful if for whatever reason the child class doesn't want to log packets. - + """ packet_log.log(packet) @@ -174,7 +171,7 @@ class APRSDFilterThread(APRSDThread): class APRSDProcessPacketThread(APRSDFilterThread): """Base class for processing received packets after they have been filtered. - Packets are received from the client, then filtered for dupes, + Packets are received from the client, then filtered for dupes, then sent to the packet queue. This thread pulls packets from the packet queue for processing. @@ -184,7 +181,7 @@ class APRSDProcessPacketThread(APRSDFilterThread): for processing.""" def __init__(self, packet_queue): - super().__init__("ProcessPKT", packet_queue=packet_queue) + super().__init__('ProcessPKT', packet_queue=packet_queue) if not CONF.enable_sending_ack_packets: LOG.warning( 'Sending ack packets is disabled, messages will not be acknowledged.', @@ -210,7 +207,7 @@ class APRSDProcessPacketThread(APRSDFilterThread): def process_packet(self, packet): """Process a packet received from aprs-is server.""" - LOG.debug(f"ProcessPKT-LOOP {self.loop_count}") + LOG.debug(f'ProcessPKT-LOOP {self.loop_count}') # set this now as we are going to process it. # This is used during dupe checking, so set it early From fd517b32188fdf15835a74fbd515ce417e7ef1f5 Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 30 Jan 2025 10:57:04 -0800 Subject: [PATCH 3/4] updated gitignore --- .gitignore | 6 ++++++ aprsd/packets/filters/packet_type.py | 15 ++++++++------- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 831722e..5944341 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,9 @@ AUTHORS Makefile.venv # Copilot .DS_Store + +.python-version +.fleet +.vscode +.envrc +.doit.db diff --git a/aprsd/packets/filters/packet_type.py b/aprsd/packets/filters/packet_type.py index 82eaebd..8cdeb31 100644 --- a/aprsd/packets/filters/packet_type.py +++ b/aprsd/packets/filters/packet_type.py @@ -1,22 +1,21 @@ import logging -from typing import Union +from typing import Union from oslo_config import cfg -from aprsd.packets import core from aprsd import packets +from aprsd.packets import core from aprsd.utils import singleton - CONF = cfg.CONF -LOG = logging.getLogger("APRSD") +LOG = logging.getLogger('APRSD') @singleton class PacketTypeFilter: """This filter is used to filter out packets that don't match a specific type. - To use this, register it with the PacketFilter class, + To use this, register it with the PacketFilter class, then instante it and call set_allow_list() with a list of packet types you want to allow to pass the filtering. All other packets will be filtered out. @@ -36,12 +35,14 @@ class PacketTypeFilter: packets.UnknownPacket.__name__: packets.UnknownPacket, } - allow_list = tuple() + allow_list = () def set_allow_list(self, filter_list): tmp_list = [] for filter in filter_list: - LOG.warning(f"Setting filter {filter} : {self.filters[filter]} to tmp {tmp_list}") + LOG.warning( + f'Setting filter {filter} : {self.filters[filter]} to tmp {tmp_list}' + ) tmp_list.append(self.filters[filter]) self.allow_list = tuple(tmp_list) From 361663e7d2cf43bd2fd53da0d8c5205bb848dbc2 Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 4 Feb 2025 10:34:05 -0800 Subject: [PATCH 4/4] Changed Objectstore log to debug This should help silence the log a bit. --- aprsd/utils/objectstore.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/aprsd/utils/objectstore.py b/aprsd/utils/objectstore.py index b04f6e6..be494f0 100644 --- a/aprsd/utils/objectstore.py +++ b/aprsd/utils/objectstore.py @@ -6,9 +6,8 @@ import threading from oslo_config import cfg - CONF = cfg.CONF -LOG = logging.getLogger("APRSD") +LOG = logging.getLogger('APRSD') class ObjectStoreMixin: @@ -63,7 +62,7 @@ class ObjectStoreMixin: def _save_filename(self): save_location = CONF.save_location - return "{}/{}.p".format( + return '{}/{}.p'.format( save_location, self.__class__.__name__.lower(), ) @@ -75,13 +74,13 @@ class ObjectStoreMixin: self._init_store() save_filename = self._save_filename() if len(self) > 0: - LOG.info( - f"{self.__class__.__name__}::Saving" - f" {len(self)} entries to disk at " - f"{save_filename}", + LOG.debug( + f'{self.__class__.__name__}::Saving' + f' {len(self)} entries to disk at ' + f'{save_filename}', ) with self.lock: - with open(save_filename, "wb+") as fp: + with open(save_filename, 'wb+') as fp: pickle.dump(self.data, fp) else: LOG.debug( @@ -97,21 +96,21 @@ class ObjectStoreMixin: return if os.path.exists(self._save_filename()): try: - with open(self._save_filename(), "rb") as fp: + with open(self._save_filename(), 'rb') as fp: raw = pickle.load(fp) if raw: self.data = raw LOG.debug( - f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.", + f'{self.__class__.__name__}::Loaded {len(self)} entries from disk.', ) else: - LOG.debug(f"{self.__class__.__name__}::No data to load.") + LOG.debug(f'{self.__class__.__name__}::No data to load.') except (pickle.UnpicklingError, Exception) as ex: - LOG.error(f"Failed to UnPickle {self._save_filename()}") + LOG.error(f'Failed to UnPickle {self._save_filename()}') LOG.error(ex) self.data = {} else: - LOG.debug(f"{self.__class__.__name__}::No save file found.") + LOG.debug(f'{self.__class__.__name__}::No save file found.') def flush(self): """Nuke the old pickle file that stored the old results from last aprsd run."""