1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-11-24 17:08:38 -05:00

Collector cleanup

This commit is contained in:
Hemna 2024-10-18 12:07:02 -04:00
parent 8cdbf18bef
commit 765e02f5b3
12 changed files with 742 additions and 749 deletions

File diff suppressed because it is too large Load Diff

View File

@ -12,6 +12,7 @@ from aprsd import cli_helper, packets
from aprsd import conf # noqa : F401 from aprsd import conf # noqa : F401
from aprsd.client import client_factory from aprsd.client import client_factory
from aprsd.main import cli from aprsd.main import cli
import aprsd.packets # noqa : F401
from aprsd.packets import collector from aprsd.packets import collector
from aprsd.threads import tx from aprsd.threads import tx
@ -94,10 +95,6 @@ def send_message(
else: else:
LOG.info(f"L'{aprs_login}' To'{tocallsign}' C'{command}'") LOG.info(f"L'{aprs_login}' To'{tocallsign}' C'{command}'")
packets.PacketList()
packets.WatchList()
packets.SeenList()
got_ack = False got_ack = False
got_response = False got_response = False

View File

@ -8,7 +8,7 @@ from oslo_config import cfg
import aprsd import aprsd
from aprsd import cli_helper from aprsd import cli_helper
from aprsd import main as aprsd_main from aprsd import main as aprsd_main
from aprsd import packets, plugin, threads, utils from aprsd import plugin, threads, utils
from aprsd.client import client_factory from aprsd.client import client_factory
from aprsd.main import cli from aprsd.main import cli
from aprsd.packets import collector as packet_collector from aprsd.packets import collector as packet_collector
@ -87,29 +87,24 @@ def server(ctx, flush):
LOG.error("APRS client is not properly configured in config file.") LOG.error("APRS client is not properly configured in config file.")
sys.exit(-1) sys.exit(-1)
# Now load the msgTrack from disk if any
packets.PacketList()
if flush:
LOG.debug("Deleting saved MsgTrack.")
packets.PacketTrack().flush()
packets.WatchList().flush()
packets.SeenList().flush()
packets.PacketList().flush()
else:
# Try and load saved MsgTrack list
LOG.debug("Loading saved MsgTrack object.")
packets.PacketTrack().load()
packets.WatchList().load()
packets.SeenList().load()
packets.PacketList().load()
keepalive = keep_alive.KeepAliveThread()
keepalive.start()
if not CONF.enable_seen_list: if not CONF.enable_seen_list:
# just deregister the class from the packet collector # just deregister the class from the packet collector
packet_collector.PacketCollector().unregister(seen_list.SeenList) packet_collector.PacketCollector().unregister(seen_list.SeenList)
# Now load the msgTrack from disk if any
if flush:
LOG.debug("Flushing All packet tracking objects.")
packet_collector.PacketCollector().flush()
else:
# Try and load saved MsgTrack list
LOG.debug("Loading saved packet tracking data.")
packet_collector.PacketCollector().load()
# Now start all the main processing threads.
keepalive = keep_alive.KeepAliveThread()
keepalive.start()
stats_store_thread = stats_thread.APRSDStatsStoreThread() stats_store_thread = stats_thread.APRSDStatsStoreThread()
stats_store_thread.start() stats_store_thread.start()

View File

@ -62,8 +62,6 @@ def signal_handler(sig, frame):
threads.APRSDThreadList().stop_all() threads.APRSDThreadList().stop_all()
if "subprocess" not in str(frame): if "subprocess" not in str(frame):
time.sleep(1.5) time.sleep(1.5)
# packets.WatchList().save()
# packets.SeenList().save()
stats.stats_collector.collect() stats.stats_collector.collect()
LOG.info("Telling flask to bail.") LOG.info("Telling flask to bail.")
signal.signal(signal.SIGTERM, sys.exit(0)) signal.signal(signal.SIGTERM, sys.exit(0))
@ -647,11 +645,6 @@ def webchat(ctx, flush, port):
LOG.error("APRS client is not properly configured in config file.") LOG.error("APRS client is not properly configured in config file.")
sys.exit(-1) sys.exit(-1)
packets.PacketList()
packets.PacketTrack()
packets.WatchList()
packets.SeenList()
keepalive = keep_alive.KeepAliveThread() keepalive = keep_alive.KeepAliveThread()
LOG.info("Start KeepAliveThread") LOG.info("Start KeepAliveThread")
keepalive.start() keepalive.start()

View File

@ -1,4 +0,0 @@
# What to return from a plugin if we have processed the message
# and it's ok, but don't send a usage string back
# REMOVE THIS FILE

View File

@ -1,3 +1,4 @@
from aprsd.packets import collector
from aprsd.packets.core import ( # noqa: F401 from aprsd.packets.core import ( # noqa: F401
AckPacket, BeaconPacket, BulletinPacket, GPSPacket, MessagePacket, AckPacket, BeaconPacket, BulletinPacket, GPSPacket, MessagePacket,
MicEPacket, ObjectPacket, Packet, RejectPacket, StatusPacket, MicEPacket, ObjectPacket, Packet, RejectPacket, StatusPacket,
@ -9,4 +10,11 @@ from aprsd.packets.tracker import PacketTrack # noqa: F401
from aprsd.packets.watch_list import WatchList # noqa: F401 from aprsd.packets.watch_list import WatchList # noqa: F401
# Register all the packet tracking objects.
collector.PacketCollector().register(PacketList)
collector.PacketCollector().register(SeenList)
collector.PacketCollector().register(PacketTrack)
collector.PacketCollector().register(WatchList)
NULL_MESSAGE = -1 NULL_MESSAGE = -1

View File

@ -20,6 +20,14 @@ class PacketMonitor(Protocol):
"""When we send a packet out the network.""" """When we send a packet out the network."""
... ...
def flush(self) -> None:
"""Flush out any data."""
...
def load(self) -> None:
"""Load any data."""
...
@singleton @singleton
class PacketCollector: class PacketCollector:
@ -27,30 +35,46 @@ class PacketCollector:
self.monitors: list[Callable] = [] self.monitors: list[Callable] = []
def register(self, monitor: Callable) -> None: def register(self, monitor: Callable) -> None:
if not isinstance(monitor, PacketMonitor):
raise TypeError(f"Monitor {monitor} is not a PacketMonitor")
self.monitors.append(monitor) self.monitors.append(monitor)
def unregister(self, monitor: Callable) -> None: def unregister(self, monitor: Callable) -> None:
if not isinstance(monitor, PacketMonitor):
raise TypeError(f"Monitor {monitor} is not a PacketMonitor")
self.monitors.remove(monitor) self.monitors.remove(monitor)
def rx(self, packet: type[core.Packet]) -> None: def rx(self, packet: type[core.Packet]) -> None:
for name in self.monitors: for name in self.monitors:
cls = name() cls = name()
if isinstance(cls, PacketMonitor): try:
try: cls.rx(packet)
cls.rx(packet) except Exception as e:
except Exception as e: LOG.error(f"Error in monitor {name} (rx): {e}")
LOG.error(f"Error in monitor {name} (rx): {e}")
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor")
def tx(self, packet: type[core.Packet]) -> None: def tx(self, packet: type[core.Packet]) -> None:
for name in self.monitors: for name in self.monitors:
cls = name() cls = name()
if isinstance(cls, PacketMonitor): try:
try: cls.tx(packet)
cls.tx(packet) except Exception as e:
except Exception as e: LOG.error(f"Error in monitor {name} (tx): {e}")
LOG.error(f"Error in monitor {name} (tx): {e}")
else:
raise TypeError(f"Monitor {name} is not a PacketMonitor") def flush(self):
"""Call flush on the objects. This is used to flush out any data."""
for name in self.monitors:
cls = name()
try:
cls.flush()
except Exception as e:
LOG.error(f"Error in monitor {name} (flush): {e}")
def load(self):
"""Call load on the objects. This is used to load any data."""
for name in self.monitors:
cls = name()
try:
cls.load()
except Exception as e:
LOG.error(f"Error in monitor {name} (load): {e}")

View File

@ -3,7 +3,7 @@ import logging
from oslo_config import cfg from oslo_config import cfg
from aprsd.packets import collector, core from aprsd.packets import core
from aprsd.utils import objectstore from aprsd.utils import objectstore
@ -108,9 +108,3 @@ class PacketList(objectstore.ObjectStoreMixin):
"packets": pkts, "packets": pkts,
} }
return stats return stats
# Now register the PacketList with the collector
# every packet we RX and TX goes through the collector
# for processing for whatever reason is needed.
collector.PacketCollector().register(PacketList)

View File

@ -3,7 +3,7 @@ import logging
from oslo_config import cfg from oslo_config import cfg
from aprsd.packets import collector, core from aprsd.packets import core
from aprsd.utils import objectstore from aprsd.utils import objectstore
@ -47,8 +47,3 @@ class SeenList(objectstore.ObjectStoreMixin):
def tx(self, packet: type[core.Packet]): def tx(self, packet: type[core.Packet]):
"""We don't care about TX packets.""" """We don't care about TX packets."""
# Register with the packet collector so we can process the packet
# when we get it off the client (network)
collector.PacketCollector().register(SeenList)

View File

@ -3,7 +3,7 @@ import logging
from oslo_config import cfg from oslo_config import cfg
from aprsd.packets import collector, core from aprsd.packets import core
from aprsd.utils import objectstore from aprsd.utils import objectstore
@ -101,9 +101,3 @@ class PacketTrack(objectstore.ObjectStoreMixin):
del self.data[key] del self.data[key]
except KeyError: except KeyError:
pass pass
# Now register the PacketList with the collector
# every packet we RX and TX goes through the collector
# for processing for whatever reason is needed.
collector.PacketCollector().register(PacketTrack)

View File

@ -4,7 +4,7 @@ import logging
from oslo_config import cfg from oslo_config import cfg
from aprsd import utils from aprsd import utils
from aprsd.packets import collector, core from aprsd.packets import core
from aprsd.utils import objectstore from aprsd.utils import objectstore
@ -117,6 +117,3 @@ class WatchList(objectstore.ObjectStoreMixin):
return False return False
else: else:
return False return False
collector.PacketCollector().register(WatchList)

View File

@ -25,14 +25,13 @@ class Collector:
stats = {} stats = {}
for name in self.producers: for name in self.producers:
cls = name() cls = name()
if isinstance(cls, StatsProducer): try:
try: stats[cls.__class__.__name__] = cls.stats(serializable=serializable).copy()
stats[cls.__class__.__name__] = cls.stats(serializable=serializable).copy() except Exception as e:
except Exception as e: LOG.error(f"Error in producer {name} (stats): {e}")
LOG.error(f"Error in producer {name} (stats): {e}")
else:
raise TypeError(f"{cls} is not an instance of StatsProducer")
return stats return stats
def register_producer(self, producer_name: Callable): def register_producer(self, producer_name: Callable):
if not isinstance(producer_name, StatsProducer):
raise TypeError(f"Producer {producer_name} is not a StatsProducer")
self.producers.append(producer_name) self.producers.append(producer_name)