mirror of https://github.com/craigerl/aprsd.git
synced 2024-11-24 08:58:49 -05:00

Compare commits: f1de7bc681 ... 1187f1ed73
2 commits: 1187f1ed73, c201c93b5d
@@ -40,7 +40,8 @@ def signal_handler(sig, frame):
 class APRSDListenThread(rx.APRSDRXThread):
     def process_packet(self, *args, **kwargs):
         packet = self._client.decode_packet(*args, **kwargs)
-        packet.log(header="RX Packet")
+        packet.log(header="RX")
+        packets.PacketList().rx(packet)


 @cli.command()
@@ -113,9 +114,6 @@ def listen(

     # Try and load saved MsgTrack list
     LOG.debug("Loading saved MsgTrack object.")
     packets.PacketTrack(config=config).load()
     packets.WatchList(config=config).load()
     packets.SeenList(config=config).load()

     # Initialize the client factory and create
     # The correct client object ready for use
@@ -133,8 +131,6 @@ def listen(
     LOG.debug(f"Filter by '{filter}'")
     aprs_client.set_filter(filter)

     packets.PacketList(config=config)

     keepalive = threads.KeepAliveThread(config=config)
     keepalive.start()

@@ -181,7 +181,7 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
         packet.get("addresse", None)
         fromcall = packet.from_call

-        packets.PacketList().add(packet)
+        packets.PacketList().rx(packet)
         stats.APRSDStats().msgs_rx_inc()
         message = packet.get("message_text", None)
         msg = {
@@ -209,7 +209,7 @@ class SendMessageThread(threads.APRSDRXThread):

     def process_our_message_packet(self, packet):
         global socketio
-        packets.PacketList().add(packet)
+        packets.PacketList().rx(packet)
         stats.APRSDStats().msgs_rx_inc()
         msg_number = packet.msgNo
         SentMessages().reply(self.packet.msgNo, packet)
@@ -31,31 +31,41 @@ def _int_timestamp():
     return int(round(time.time()))


-@dataclass()
+def _init_msgNo():  # noqa: N802
+    """For some reason __post__init doesn't get called.
+
+    So in order to initialize the msgNo field in the packet
+    we use this workaround.
+    """
+    c = counter.PacketCounter()
+    c.increment()
+    return c.value
+
+
+@dataclass
 class Packet(metaclass=abc.ABCMeta):
     from_call: str
     to_call: str
     addresse: str = None
     format: str = None
-    msgNo: str = None  # noqa: N815
+    msgNo: str = field(default_factory=_init_msgNo)  # noqa: N815
     packet_type: str = None
     timestamp: float = field(default_factory=_int_timestamp)
     # Holds the raw text string to be sent over the wire
     # or holds the raw string from input packet
     raw: str = None
-    _raw_dict: dict = field(repr=False, default_factory=lambda: {})
-    _retry_count = 3
-    _last_send_time = 0
-    _last_send_attempt = 0
+    raw_dict: dict = field(repr=False, default_factory=lambda: {})
+
+    # Fields related to sending packets out
+    send_count: int = field(repr=False, default=1)
+    retry_count: int = field(repr=False, default=3)
+    last_send_time: datetime.timedelta = field(repr=False, default=None)
+    last_send_attempt: int = field(repr=False, default=0)
     # Do we allow this packet to be saved to send later?
-    _allow_delay = True
+    allow_delay: bool = field(repr=False, default=True)

-    _transport = None
-    _raw_message = None
-
-    def __post__init(self):
-        if not self.msgNo:
-            c = counter.PacketCounter()
-            c.increment()
-            self.msgNo = c.value
+    def __post__init__(self):
+        LOG.warning(f"POST INIT {self}")

     def get(self, key, default=None):
         """Emulate a getter on a dict."""
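For reference, a minimal standalone sketch (not aprsd code; names and callsigns are invented) of the two dataclass mechanics this hunk relies on: field(default_factory=...) runs once per instance, so each packet gets its own msgNo and timestamp where a plain class attribute would be shared, and the hook dataclasses actually invoke after field setup is spelled __post_init__, which is likely why the __post__init spellings above never fire.

import time
from dataclasses import dataclass, field

_counter = 0

def next_msg_no() -> str:
    # Stand-in for aprsd's PacketCounter: just bump a module-level integer.
    global _counter
    _counter += 1
    return str(_counter)

@dataclass
class DemoPacket:
    from_call: str
    to_call: str
    # default_factory is called for every new instance.
    msgNo: str = field(default_factory=next_msg_no)
    timestamp: float = field(default_factory=time.time)

    def __post_init__(self):
        # Dataclasses call __post_init__ (single underscores around "post"),
        # so this runs automatically after the fields are assigned.
        if not self.from_call:
            raise ValueError("from_call is required")

p1 = DemoPacket("CALL1", "APRS")
p2 = DemoPacket("CALL1", "APRS")
assert p1.msgNo != p2.msgNo  # each instance drew its own counter value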
@@ -76,7 +86,7 @@ class Packet(metaclass=abc.ABCMeta):
     @staticmethod
     def factory(raw_packet):
         raw = raw_packet
-        raw["_raw_dict"] = raw.copy()
+        raw["raw_dict"] = raw.copy()
         translate_fields = {
             "from": "from_call",
             "to": "to_call",
@@ -110,15 +120,16 @@ class Packet(metaclass=abc.ABCMeta):
         """LOG a packet to the logfile."""
         asdict(self)
         log_list = ["\n"]
+        name = self.__class__.__name__
         if header:
-            if isinstance(self, AckPacket):
+            if isinstance(self, AckPacket) and "tx" in header.lower():
                 log_list.append(
-                    f"{header} ___________"
-                    f"(TX:{self._send_count} of {self._retry_count})",
+                    f"{header}____________({name}__"
+                    f"TX:{self.send_count} of {self.retry_count})",
                 )
             else:
-                log_list.append(f"{header} _______________")
-        log_list.append(f" Packet : {self.__class__.__name__}")
+                log_list.append(f"{header}____________({name})")
+        # log_list.append(f" Packet : {self.__class__.__name__}")
         log_list.append(f" Raw : {self.raw}")
         if self.to_call:
             log_list.append(f" To : {self.to_call}")
@@ -137,7 +148,7 @@ class Packet(metaclass=abc.ABCMeta):

         if self.msgNo:
             log_list.append(f" Msg # : {self.msgNo}")
-        log_list.append(f"{header} _______________ Complete")
+        log_list.append(f"{header}____________({name})")

         LOG.info("\n".join(log_list))
         LOG.debug(self)
@@ -165,12 +176,12 @@ class Packet(metaclass=abc.ABCMeta):
             cl = aprsis_client
         else:
             cl = client.factory.create().client
-        self.log(header="Sending Message Direct")
+        self.log(header="TX Message Direct")
         cl.send(self.raw)
         stats.APRSDStats().msgs_tx_inc()


-@dataclass()
+@dataclass
 class PathPacket(Packet):
     path: List[str] = field(default_factory=list)
     via: str = None
@@ -179,10 +190,13 @@ class PathPacket(Packet):
         raise NotImplementedError


-@dataclass()
+@dataclass
 class AckPacket(PathPacket):
     response: str = None
-    _send_count = 1
+
+    def __post__init__(self):
+        if self.response:
+            LOG.warning("Response set!")

     def _build_raw(self):
         """Build the self.raw which is what is sent over the air."""
@@ -200,7 +214,7 @@ class AckPacket(PathPacket):
         thread.start()


-@dataclass()
+@dataclass
 class MessagePacket(PathPacket):
     message_text: str = None

@@ -1,6 +1,5 @@
 import logging
 import threading
-import time

 import wrapt

@@ -18,32 +17,40 @@ class PacketList:
     lock = threading.Lock()
     config = None

-    packet_list = utils.RingBuffer(1000)
+    packet_list: utils.RingBuffer = utils.RingBuffer(1000)

-    total_recv = 0
-    total_tx = 0
+    total_recv: int = 0
+    total_tx: int = 0

     def __new__(cls, *args, **kwargs):
         if cls._instance is None:
             cls._instance = super().__new__(cls)
-            cls._instance.config = kwargs["config"]
+            if "config" in kwargs:
+                cls._instance.config = kwargs["config"]
         return cls._instance

     def __init__(self, config=None):
         if config:
             self.config = config

+    def _is_initialized(self):
+        return self.config is not None
+
     @wrapt.synchronized(lock)
     def __iter__(self):
         return iter(self.packet_list)

     @wrapt.synchronized(lock)
-    def add(self, packet):
-        packet.ts = time.time()
-        if packet.from_call == self.config["aprs"]["login"]:
-            self.total_tx += 1
-        else:
-            self.total_recv += 1
+    def rx(self, packet):
+        """Add a packet that was received."""
+        self.total_recv += 1
         self.packet_list.append(packet)
         seen_list.SeenList().update_seen(packet)

+    @wrapt.synchronized(lock)
+    def tx(self, packet):
+        """Add a packet that was received."""
+        self.total_tx += 1
+        self.packet_list.append(packet)
+        seen_list.SeenList().update_seen(packet)
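As context for the rx()/tx() split, a hedged sketch (standard library only, invented names, not the aprsd classes) of the same shape: a process-wide singleton whose __new__ only records config when one is passed, with lock-guarded receive/transmit counters and a bounded history.

import threading
from collections import deque

class TrafficLog:
    """Illustrative singleton keeping rx/tx counts and a bounded history."""
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.config = kwargs.get("config")  # optional, like the diff
            cls._instance.history = deque(maxlen=1000)
            cls._instance.total_rx = 0
            cls._instance.total_tx = 0
        return cls._instance

    def rx(self, packet):
        with self._lock:
            self.total_rx += 1
            self.history.append(packet)

    def tx(self, packet):
        with self._lock:
            self.total_tx += 1
            self.history.append(packet)

TrafficLog(config={"aprs": {}}).rx("demo packet")
assert TrafficLog().total_rx == 1  # later constructor calls return the same instance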
@@ -15,18 +15,22 @@ class SeenList(objectstore.ObjectStoreMixin):

     _instance = None
     lock = threading.Lock()
-    data = {}
+    data: dict = {}
     config = None

     def __new__(cls, *args, **kwargs):
         if cls._instance is None:
             cls._instance = super().__new__(cls)
-            if "config" in kwargs:
-                cls._instance.config = kwargs["config"]
+            if "config" in kwargs:
+                cls._instance.config = kwargs["config"]
+                cls._instance._init_store()
             cls._instance.data = {}
         return cls._instance

+    def is_initialized(self):
+        return self.config is not None
+
     @wrapt.synchronized(lock)
     def update_seen(self, packet):
         callsign = None
@@ -23,18 +23,23 @@ class PacketTrack(objectstore.ObjectStoreMixin):
     _instance = None
     _start_time = None
     lock = threading.Lock()
     config = None

-    data = {}
-    total_tracked = 0
+    data: dict = {}
+    total_tracked: int = 0

     def __new__(cls, *args, **kwargs):
         if cls._instance is None:
             cls._instance = super().__new__(cls)
             cls._instance._start_time = datetime.datetime.now()
-            cls._instance.config = kwargs["config"]
+            if "config" in kwargs:
+                cls._instance.config = kwargs["config"]
+            cls._instance._init_store()
         return cls._instance

+    def is_initialized(self):
+        return self.config is not None
+
     @wrapt.synchronized(lock)
     def __getitem__(self, name):
         return self.data[name]
@@ -47,6 +47,9 @@ class WatchList(objectstore.ObjectStoreMixin):
             ),
         }

+    def is_initialized(self):
+        return self.config is not None
+
     def is_enabled(self):
         if self.config and "watch_list" in self.config["aprsd"]:
             return self.config["aprsd"]["watch_list"].get("enabled", False)
@@ -43,8 +43,9 @@ class NotifySeenPlugin(plugin.APRSDWatchListPluginBase):
                 message_text=(
                     f"{fromcall} was just seen by type:'{packet_type}'"
                 ),
+                allow_delay=False,
             )
-            pkt._allow_delay = False
+            # pkt.allow_delay = False
             return pkt
         else:
             LOG.debug("fromcall and notify_callsign are the same, not notifying")
@@ -90,6 +90,14 @@ class APRSDStats:
     def set_aprsis_keepalive(self):
         self._aprsis_keepalive = datetime.datetime.now()

+    def rx_packet(self, packet):
+        if isinstance(packet, packets.MessagePacket):
+            self.msgs_rx_inc()
+        elif isinstance(packet, packets.MicEPacket):
+            self.msgs_mice_inc()
+        elif isinstance(packet, packets.AckPacket):
+            self.ack_rx_inc()
+
     @wrapt.synchronized(lock)
     @property
     def msgs_tx(self):
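An illustrative stand-in for the new rx_packet() dispatch, with dummy packet classes (everything below is hypothetical, not the aprsd API); it expresses the same isinstance chain as a type-to-counter table.

from collections import Counter

# Hypothetical stand-ins for the aprsd packet classes; only the names matter here.
class MessagePacket: ...
class MicEPacket: ...
class AckPacket: ...

class DemoStats:
    def __init__(self):
        self.counts = Counter()

    def rx_packet(self, packet):
        # Same idea as the isinstance chain above, expressed as a type -> key table.
        for cls, key in ((MessagePacket, "msgs_rx"), (MicEPacket, "msgs_mice_rx"), (AckPacket, "ack_rx")):
            if isinstance(packet, cls):
                self.counts[key] += 1
                break

stats = DemoStats()
stats.rx_packet(MessagePacket())
stats.rx_packet(AckPacket())
assert stats.counts["msgs_rx"] == 1 and stats.counts["ack_rx"] == 1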
@@ -45,6 +45,11 @@ class KeepAliveThread(APRSDThread):
         except KeyError:
             login = self.config["ham"]["callsign"]

+        if pkt_tracker.is_initialized():
+            tracked_packets = len(pkt_tracker)
+        else:
+            tracked_packets = 0
+
         keepalive = (
             "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} "
             "Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}"
@@ -53,7 +58,7 @@ class KeepAliveThread(APRSDThread):
             utils.strfdelta(stats_obj.uptime),
             pl.total_recv,
             pl.total_tx,
-            len(pkt_tracker),
+            tracked_packets,
             stats_obj.msgs_tx,
             stats_obj.msgs_rx,
             last_msg_time,
@@ -66,7 +66,8 @@ class APRSDPluginRXThread(APRSDRXThread):
     def process_packet(self, *args, **kwargs):
         packet = self._client.decode_packet(*args, **kwargs)
         # LOG.debug(raw)
-        packet.log(header="RX Packet")
+        packet.log(header="RX")
+        packets.PacketList().rx(packet)
         thread = APRSDPluginProcessPacketThread(
             config=self.config,
             packet=packet,
@@ -92,7 +93,6 @@ class APRSDProcessPacketThread(APRSDThread):
     def process_ack_packet(self, packet):
         ack_num = packet.msgNo
         LOG.info(f"Got ack for message {ack_num}")
         packet.log("RXACK")
         pkt_tracker = packets.PacketTrack()
         pkt_tracker.remove(ack_num)
         stats.APRSDStats().ack_rx_inc()
@@ -102,7 +102,6 @@ class APRSDProcessPacketThread(APRSDThread):
         """Process a packet received from aprs-is server."""
         LOG.debug(f"RXPKT-LOOP {self._loop_cnt}")
         packet = self.packet
-        packets.PacketList().add(packet)
         our_call = self.config["aprsd"]["callsign"].lower()

         from_call = packet.from_call
@@ -11,6 +11,8 @@ LOG = logging.getLogger("APRSD")


 class SendPacketThread(aprsd_threads.APRSDThread):
+    loop_count: int = 1
+
     def __init__(self, packet):
         self.packet = packet
         name = self.packet.raw[:5]
@@ -19,7 +21,6 @@ class SendPacketThread(aprsd_threads.APRSDThread):
         pkt_tracker.add(packet)

     def loop(self):
-        LOG.debug("TX Loop")
         """Loop until a message is acked or it gets delayed.

         We only sleep for 5 seconds between each loop run, so
|
||||
return False
|
||||
else:
|
||||
send_now = False
|
||||
if packet._last_send_attempt == packet._retry_count:
|
||||
if packet.last_send_attempt == packet.retry_count:
|
||||
# we reached the send limit, don't send again
|
||||
# TODO(hemna) - Need to put this in a delayed queue?
|
||||
LOG.info("Message Send Complete. Max attempts reached.")
|
||||
if not packet._allow_delay:
|
||||
if not packet.allow_delay:
|
||||
pkt_tracker.remove(packet.msgNo)
|
||||
return False
|
||||
|
||||
# Message is still outstanding and needs to be acked.
|
||||
if packet._last_send_time:
|
||||
if packet.last_send_time:
|
||||
# Message has a last send time tracking
|
||||
now = datetime.datetime.now()
|
||||
sleeptime = (packet._last_send_attempt + 1) * 31
|
||||
delta = now - packet._last_send_time
|
||||
sleeptime = (packet.last_send_attempt + 1) * 31
|
||||
delta = now - packet.last_send_time
|
||||
if delta > datetime.timedelta(seconds=sleeptime):
|
||||
# It's time to try to send it again
|
||||
send_now = True
|
||||
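The resend check above is a linear backoff: wait (last_send_attempt + 1) * 31 seconds after the previous transmission before trying again. A standalone sketch of just that decision, with the 31-second spacing copied from the hunk and everything else invented:

import datetime

RETRY_SPACING = 31  # seconds, per the hunk above

def should_resend_now(last_send_time, last_send_attempt, now=None):
    """Return True once enough time has passed for another attempt."""
    if last_send_time is None:
        return True  # never sent: send immediately
    now = now or datetime.datetime.now()
    wait = datetime.timedelta(seconds=(last_send_attempt + 1) * RETRY_SPACING)
    return (now - last_send_time) > wait

start = datetime.datetime(2024, 1, 1, 12, 0, 0)
# First retry becomes due a bit after 31s, the next after 62s, and so on.
assert not should_resend_now(start, 0, now=start + datetime.timedelta(seconds=30))
assert should_resend_now(start, 0, now=start + datetime.timedelta(seconds=32))
assert not should_resend_now(start, 1, now=start + datetime.timedelta(seconds=32))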
@@ -62,59 +63,62 @@ class SendPacketThread(aprsd_threads.APRSDThread):
             if send_now:
                 # no attempt time, so lets send it, and start
                 # tracking the time.
-                packet.log("Sending Message")
+                packet.log("TX")
                 cl = client.factory.create().client
                 cl.send(packet.raw)
                 stats.APRSDStats().msgs_tx_inc()
-                packet_list.PacketList().add(packet)
-                packet._last_send_time = datetime.datetime.now()
-                packet._last_send_attempt += 1
+                packet_list.PacketList().tx(packet)
+                packet.last_send_time = datetime.datetime.now()
+                packet.last_send_attempt += 1

-            time.sleep(5)
+            time.sleep(1)
+            # Make sure we get called again.
+            self.loop_count += 1
             return True


 class SendAckThread(aprsd_threads.APRSDThread):
+    loop_count: int = 1
+
     def __init__(self, packet):
         self.packet = packet
         super().__init__(f"SendAck-{self.packet.msgNo}")
-        self._loop_cnt = 1

     def loop(self):
         """Separate thread to send acks with retries."""
         send_now = False
-        if self.packet._last_send_attempt == self.packet._retry_count:
+        if self.packet.last_send_attempt == self.packet.retry_count:
             # we reached the send limit, don't send again
             # TODO(hemna) - Need to put this in a delayed queue?
             LOG.info("Ack Send Complete. Max attempts reached.")
             return False

-        if self.packet._last_send_time:
+        if self.packet.last_send_time:
             # Message has a last send time tracking
             now = datetime.datetime.now()

             # aprs duplicate detection is 30 secs?
             # (21 only sends first, 28 skips middle)
-            sleeptime = 31
-            delta = now - self.packet._last_send_time
-            if delta > datetime.timedelta(seconds=sleeptime):
+            sleep_time = 31
+            delta = now - self.packet.last_send_time
+            if delta > datetime.timedelta(seconds=sleep_time):
                 # It's time to try to send it again
                 send_now = True
-            elif self._loop_cnt % 5 == 0:
+            elif self.loop_count % 10 == 0:
                 LOG.debug(f"Still wating. {delta}")
         else:
             send_now = True

         if send_now:
             cl = client.factory.create().client
-            self.packet.log("Sending ACK")
+            self.packet.log("TX")
             cl.send(self.packet.raw)
-            self.packet._send_count += 1
+            self.packet.send_count += 1
             stats.APRSDStats().ack_tx_inc()
-            packet_list.PacketList().add(self.packet)
-            self.packet._last_send_attempt += 1
-            self.packet._last_send_time = datetime.datetime.now()
+            packet_list.PacketList().tx(self.packet)
+            self.packet.last_send_attempt += 1
+            self.packet.last_send_time = datetime.datetime.now()

         time.sleep(1)
-        self._loop_cnt += 1
+        self.loop_count += 1
         return True
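Both threads appear to follow the same contract: loop() is called repeatedly and the thread stops once it returns False. That contract is inferred from the returns above rather than stated by the diff; a minimal sketch of the shape using a plain threading.Thread:

import threading
import time

class LoopingThread(threading.Thread):
    """Hypothetical stand-in for aprsd's APRSDThread: run loop() until it returns False."""

    def __init__(self, name):
        super().__init__(name=name)
        self.loop_count = 1

    def run(self):
        while self.loop():
            self.loop_count += 1

    def loop(self):
        raise NotImplementedError

class CountdownThread(LoopingThread):
    def loop(self):
        # Pretend "work"; stop after three passes, like a capped retry loop.
        time.sleep(0.01)
        return self.loop_count < 3

t = CountdownThread("demo")
t.start()
t.join()
assert t.loop_count == 3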
@@ -1,3 +1,4 @@
+import abc
 import logging
 import os
 import pathlib
@@ -9,7 +10,7 @@ from aprsd import config as aprsd_config
 LOG = logging.getLogger("APRSD")


-class ObjectStoreMixin:
+class ObjectStoreMixin(metaclass=abc.ABCMeta):
     """Class 'MIXIN' intended to save/load object data.

     The asumption of how this mixin is used:
@@ -23,6 +24,13 @@ class ObjectStoreMixin:
       When APRSD Starts, it calls load()
       aprsd server -f (flush) will wipe all saved objects.
     """
+    @abc.abstractmethod
+    def is_initialized(self):
+        """Return True if the class has been setup correctly.
+
+        If this returns False, the ObjectStore doesn't save anything.
+
+        """

     def __len__(self):
         return len(self.data)
@@ -36,13 +44,16 @@ class ObjectStoreMixin:
         return self.data[id]

     def _init_store(self):
-        sl = self._save_location()
-        if not os.path.exists(sl):
-            LOG.warning(f"Save location {sl} doesn't exist")
-            try:
-                os.makedirs(sl)
-            except Exception as ex:
-                LOG.exception(ex)
+        if self.is_initialized():
+            sl = self._save_location()
+            if not os.path.exists(sl):
+                LOG.warning(f"Save location {sl} doesn't exist")
+                try:
+                    os.makedirs(sl)
+                except Exception as ex:
+                    LOG.exception(ex)
+        else:
+            LOG.warning(f"{self.__class__.__name__} is not initialized")

     def _save_location(self):
         save_location = self.config.get("aprsd.save_location", None)
@@ -68,38 +79,45 @@ class ObjectStoreMixin:

     def save(self):
         """Save any queued to disk?"""
-        if len(self) > 0:
-            LOG.info(f"{self.__class__.__name__}::Saving {len(self)} entries to disk at {self._save_location()}")
-            with open(self._save_filename(), "wb+") as fp:
-                pickle.dump(self._dump(), fp)
-        else:
-            LOG.debug(
-                "{} Nothing to save, flushing old save file '{}'".format(
-                    self.__class__.__name__,
-                    self._save_filename(),
-                ),
-            )
-            self.flush()
+        if self.is_initialized():
+            if len(self) > 0:
+                LOG.info(
+                    f"{self.__class__.__name__}::Saving"
+                    f" {len(self)} entries to disk at"
+                    f"{self._save_location()}",
+                )
+                with open(self._save_filename(), "wb+") as fp:
+                    pickle.dump(self._dump(), fp)
+            else:
+                LOG.debug(
+                    "{} Nothing to save, flushing old save file '{}'".format(
+                        self.__class__.__name__,
+                        self._save_filename(),
+                    ),
+                )
+                self.flush()

     def load(self):
-        if os.path.exists(self._save_filename()):
-            try:
-                with open(self._save_filename(), "rb") as fp:
-                    raw = pickle.load(fp)
-                    if raw:
-                        self.data = raw
-                        LOG.debug(
-                            f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.",
-                        )
-                        LOG.debug(f"{self.data}")
-            except (pickle.UnpicklingError, Exception) as ex:
-                LOG.error(f"Failed to UnPickle {self._save_filename()}")
-                LOG.error(ex)
-                self.data = {}
+        if self.is_initialized():
+            if os.path.exists(self._save_filename()):
+                try:
+                    with open(self._save_filename(), "rb") as fp:
+                        raw = pickle.load(fp)
+                        if raw:
+                            self.data = raw
+                            LOG.debug(
+                                f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.",
+                            )
+                            LOG.debug(f"{self.data}")
+                except (pickle.UnpicklingError, Exception) as ex:
+                    LOG.error(f"Failed to UnPickle {self._save_filename()}")
+                    LOG.error(ex)
+                    self.data = {}

     def flush(self):
         """Nuke the old pickle file that stored the old results from last aprsd run."""
-        if os.path.exists(self._save_filename()):
-            pathlib.Path(self._save_filename()).unlink()
-        with self.lock:
-            self.data = {}
+        if self.is_initialized():
+            if os.path.exists(self._save_filename()):
+                pathlib.Path(self._save_filename()).unlink()
+            with self.lock:
+                self.data = {}
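A compact sketch of the guarded persistence idea behind these changes (standard library only; the file handling here is invented, not aprsd's): nothing is written or read unless the object reports it is initialized.

import os
import pickle

class TinyStore:
    """Illustrative pickle-backed store with an is_initialized() gate."""

    def __init__(self, save_file=None):
        self.save_file = save_file  # None means "not configured"
        self.data = {}

    def is_initialized(self):
        return self.save_file is not None

    def save(self):
        if self.is_initialized() and self.data:
            with open(self.save_file, "wb") as fp:
                pickle.dump(self.data, fp)

    def load(self):
        if self.is_initialized() and os.path.exists(self.save_file):
            try:
                with open(self.save_file, "rb") as fp:
                    self.data = pickle.load(fp) or {}
            except (pickle.UnpicklingError, EOFError):
                self.data = {}

store = TinyStore()        # no save_file configured
store.data["x"] = 1
store.save()               # silently does nothing, mirroring the new guard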
@@ -1,6 +1,9 @@
 class RingBuffer:
     """class that implements a not-yet-full buffer"""

+    max: int = 100
+    data: list = []
+
     def __init__(self, size_max):
         self.max = size_max
         self.data = []
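The RingBuffer being annotated here can also be expressed with collections.deque, which discards the oldest entry once maxlen is reached; a hedged sketch of equivalent behaviour (not the aprsd implementation):

from collections import deque

class SimpleRingBuffer:
    """Keep only the newest size_max items."""

    def __init__(self, size_max: int):
        self.max = size_max
        self.data = deque(maxlen=size_max)

    def append(self, item):
        self.data.append(item)  # oldest item falls off automatically when full

    def __len__(self):
        return len(self.data)

buf = SimpleRingBuffer(3)
for n in range(5):
    buf.append(n)
assert list(buf.data) == [2, 3, 4]  # only the last three survive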