1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-11-24 08:58:49 -05:00

Compare commits

...

2 Commits

Author SHA1 Message Date
1187f1ed73 Make tracking objectstores work w/o initializing
This changes the objectstore to test to see if the config has been
set or not.  if not, then it doesn't try to save/load from disk.
2022-12-17 20:06:28 -05:00
c201c93b5d Cleaned up packet transmit class attributes
This patch cleans up the Packet class attributes used to
keep track of how many times packets have been sent and
the last time they were sent.  This is used by the PacketTracker
and the tx threads for transmitting packets
2022-12-17 18:06:24 -05:00
15 changed files with 185 additions and 118 deletions

View File

@ -40,7 +40,8 @@ def signal_handler(sig, frame):
class APRSDListenThread(rx.APRSDRXThread):
def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
packet.log(header="RX Packet")
packet.log(header="RX")
packets.PacketList().rx(packet)
@cli.command()
@ -113,9 +114,6 @@ def listen(
# Try and load saved MsgTrack list
LOG.debug("Loading saved MsgTrack object.")
packets.PacketTrack(config=config).load()
packets.WatchList(config=config).load()
packets.SeenList(config=config).load()
# Initialize the client factory and create
# The correct client object ready for use
@ -133,8 +131,6 @@ def listen(
LOG.debug(f"Filter by '{filter}'")
aprs_client.set_filter(filter)
packets.PacketList(config=config)
keepalive = threads.KeepAliveThread(config=config)
keepalive.start()

View File

@ -181,7 +181,7 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
packet.get("addresse", None)
fromcall = packet.from_call
packets.PacketList().add(packet)
packets.PacketList().rx(packet)
stats.APRSDStats().msgs_rx_inc()
message = packet.get("message_text", None)
msg = {

View File

@ -209,7 +209,7 @@ class SendMessageThread(threads.APRSDRXThread):
def process_our_message_packet(self, packet):
global socketio
packets.PacketList().add(packet)
packets.PacketList().rx(packet)
stats.APRSDStats().msgs_rx_inc()
msg_number = packet.msgNo
SentMessages().reply(self.packet.msgNo, packet)

View File

@ -31,31 +31,41 @@ def _int_timestamp():
return int(round(time.time()))
@dataclass()
def _init_msgNo(): # noqa: N802
"""For some reason __post__init doesn't get called.
So in order to initialize the msgNo field in the packet
we use this workaround.
"""
c = counter.PacketCounter()
c.increment()
return c.value
@dataclass
class Packet(metaclass=abc.ABCMeta):
from_call: str
to_call: str
addresse: str = None
format: str = None
msgNo: str = None # noqa: N815
msgNo: str = field(default_factory=_init_msgNo) # noqa: N815
packet_type: str = None
timestamp: float = field(default_factory=_int_timestamp)
# Holds the raw text string to be sent over the wire
# or holds the raw string from input packet
raw: str = None
_raw_dict: dict = field(repr=False, default_factory=lambda: {})
_retry_count = 3
_last_send_time = 0
_last_send_attempt = 0
raw_dict: dict = field(repr=False, default_factory=lambda: {})
# Fields related to sending packets out
send_count: int = field(repr=False, default=1)
retry_count: int = field(repr=False, default=3)
last_send_time: datetime.timedelta = field(repr=False, default=None)
last_send_attempt: int = field(repr=False, default=0)
# Do we allow this packet to be saved to send later?
_allow_delay = True
allow_delay: bool = field(repr=False, default=True)
_transport = None
_raw_message = None
def __post__init(self):
if not self.msgNo:
c = counter.PacketCounter()
c.increment()
self.msgNo = c.value
def __post__init__(self):
LOG.warning(f"POST INIT {self}")
def get(self, key, default=None):
"""Emulate a getter on a dict."""
@ -76,7 +86,7 @@ class Packet(metaclass=abc.ABCMeta):
@staticmethod
def factory(raw_packet):
raw = raw_packet
raw["_raw_dict"] = raw.copy()
raw["raw_dict"] = raw.copy()
translate_fields = {
"from": "from_call",
"to": "to_call",
@ -110,15 +120,16 @@ class Packet(metaclass=abc.ABCMeta):
"""LOG a packet to the logfile."""
asdict(self)
log_list = ["\n"]
name = self.__class__.__name__
if header:
if isinstance(self, AckPacket):
if isinstance(self, AckPacket) and "tx" in header.lower():
log_list.append(
f"{header} ___________"
f"(TX:{self._send_count} of {self._retry_count})",
f"{header}____________({name}__"
f"TX:{self.send_count} of {self.retry_count})",
)
else:
log_list.append(f"{header} _______________")
log_list.append(f" Packet : {self.__class__.__name__}")
log_list.append(f"{header}____________({name})")
# log_list.append(f" Packet : {self.__class__.__name__}")
log_list.append(f" Raw : {self.raw}")
if self.to_call:
log_list.append(f" To : {self.to_call}")
@ -137,7 +148,7 @@ class Packet(metaclass=abc.ABCMeta):
if self.msgNo:
log_list.append(f" Msg # : {self.msgNo}")
log_list.append(f"{header} _______________ Complete")
log_list.append(f"{header}____________({name})")
LOG.info("\n".join(log_list))
LOG.debug(self)
@ -165,12 +176,12 @@ class Packet(metaclass=abc.ABCMeta):
cl = aprsis_client
else:
cl = client.factory.create().client
self.log(header="Sending Message Direct")
self.log(header="TX Message Direct")
cl.send(self.raw)
stats.APRSDStats().msgs_tx_inc()
@dataclass()
@dataclass
class PathPacket(Packet):
path: List[str] = field(default_factory=list)
via: str = None
@ -179,10 +190,13 @@ class PathPacket(Packet):
raise NotImplementedError
@dataclass()
@dataclass
class AckPacket(PathPacket):
response: str = None
_send_count = 1
def __post__init__(self):
if self.response:
LOG.warning("Response set!")
def _build_raw(self):
"""Build the self.raw which is what is sent over the air."""
@ -200,7 +214,7 @@ class AckPacket(PathPacket):
thread.start()
@dataclass()
@dataclass
class MessagePacket(PathPacket):
message_text: str = None

View File

@ -1,6 +1,5 @@
import logging
import threading
import time
import wrapt
@ -18,32 +17,40 @@ class PacketList:
lock = threading.Lock()
config = None
packet_list = utils.RingBuffer(1000)
packet_list: utils.RingBuffer = utils.RingBuffer(1000)
total_recv = 0
total_tx = 0
total_recv: int = 0
total_tx: int = 0
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.config = kwargs["config"]
if "config" in kwargs:
cls._instance.config = kwargs["config"]
return cls._instance
def __init__(self, config=None):
if config:
self.config = config
def _is_initialized(self):
return self.config is not None
@wrapt.synchronized(lock)
def __iter__(self):
return iter(self.packet_list)
@wrapt.synchronized(lock)
def add(self, packet):
packet.ts = time.time()
if packet.from_call == self.config["aprs"]["login"]:
self.total_tx += 1
else:
self.total_recv += 1
def rx(self, packet):
"""Add a packet that was received."""
self.total_recv += 1
self.packet_list.append(packet)
seen_list.SeenList().update_seen(packet)
@wrapt.synchronized(lock)
def tx(self, packet):
"""Add a packet that was received."""
self.total_tx += 1
self.packet_list.append(packet)
seen_list.SeenList().update_seen(packet)

View File

@ -15,18 +15,22 @@ class SeenList(objectstore.ObjectStoreMixin):
_instance = None
lock = threading.Lock()
data = {}
data: dict = {}
config = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
if "config" in kwargs:
cls._instance.config = kwargs["config"]
if "config" in kwargs:
cls._instance.config = kwargs["config"]
cls._instance._init_store()
cls._instance.data = {}
return cls._instance
def is_initialized(self):
return self.config is not None
@wrapt.synchronized(lock)
def update_seen(self, packet):
callsign = None

View File

@ -23,18 +23,23 @@ class PacketTrack(objectstore.ObjectStoreMixin):
_instance = None
_start_time = None
lock = threading.Lock()
config = None
data = {}
total_tracked = 0
data: dict = {}
total_tracked: int = 0
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._start_time = datetime.datetime.now()
cls._instance.config = kwargs["config"]
if "config" in kwargs:
cls._instance.config = kwargs["config"]
cls._instance._init_store()
return cls._instance
def is_initialized(self):
return self.config is not None
@wrapt.synchronized(lock)
def __getitem__(self, name):
return self.data[name]

View File

@ -47,6 +47,9 @@ class WatchList(objectstore.ObjectStoreMixin):
),
}
def is_initialized(self):
return self.config is not None
def is_enabled(self):
if self.config and "watch_list" in self.config["aprsd"]:
return self.config["aprsd"]["watch_list"].get("enabled", False)

View File

@ -43,8 +43,9 @@ class NotifySeenPlugin(plugin.APRSDWatchListPluginBase):
message_text=(
f"{fromcall} was just seen by type:'{packet_type}'"
),
allow_delay=False,
)
pkt._allow_delay = False
# pkt.allow_delay = False
return pkt
else:
LOG.debug("fromcall and notify_callsign are the same, not notifying")

View File

@ -90,6 +90,14 @@ class APRSDStats:
def set_aprsis_keepalive(self):
self._aprsis_keepalive = datetime.datetime.now()
def rx_packet(self, packet):
if isinstance(packet, packets.MessagePacket):
self.msgs_rx_inc()
elif isinstance(packet, packets.MicEPacket):
self.msgs_mice_inc()
elif isinstance(packet, packets.AckPacket):
self.ack_rx_inc()
@wrapt.synchronized(lock)
@property
def msgs_tx(self):

View File

@ -45,6 +45,11 @@ class KeepAliveThread(APRSDThread):
except KeyError:
login = self.config["ham"]["callsign"]
if pkt_tracker.is_initialized():
tracked_packets = len(pkt_tracker)
else:
tracked_packets = 0
keepalive = (
"{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} "
"Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}"
@ -53,7 +58,7 @@ class KeepAliveThread(APRSDThread):
utils.strfdelta(stats_obj.uptime),
pl.total_recv,
pl.total_tx,
len(pkt_tracker),
tracked_packets,
stats_obj.msgs_tx,
stats_obj.msgs_rx,
last_msg_time,

View File

@ -66,7 +66,8 @@ class APRSDPluginRXThread(APRSDRXThread):
def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
# LOG.debug(raw)
packet.log(header="RX Packet")
packet.log(header="RX")
packets.PacketList().rx(packet)
thread = APRSDPluginProcessPacketThread(
config=self.config,
packet=packet,
@ -92,7 +93,6 @@ class APRSDProcessPacketThread(APRSDThread):
def process_ack_packet(self, packet):
ack_num = packet.msgNo
LOG.info(f"Got ack for message {ack_num}")
packet.log("RXACK")
pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num)
stats.APRSDStats().ack_rx_inc()
@ -102,7 +102,6 @@ class APRSDProcessPacketThread(APRSDThread):
"""Process a packet received from aprs-is server."""
LOG.debug(f"RXPKT-LOOP {self._loop_cnt}")
packet = self.packet
packets.PacketList().add(packet)
our_call = self.config["aprsd"]["callsign"].lower()
from_call = packet.from_call

View File

@ -11,6 +11,8 @@ LOG = logging.getLogger("APRSD")
class SendPacketThread(aprsd_threads.APRSDThread):
loop_count: int = 1
def __init__(self, packet):
self.packet = packet
name = self.packet.raw[:5]
@ -19,7 +21,6 @@ class SendPacketThread(aprsd_threads.APRSDThread):
pkt_tracker.add(packet)
def loop(self):
LOG.debug("TX Loop")
"""Loop until a message is acked or it gets delayed.
We only sleep for 5 seconds between each loop run, so
@ -39,20 +40,20 @@ class SendPacketThread(aprsd_threads.APRSDThread):
return False
else:
send_now = False
if packet._last_send_attempt == packet._retry_count:
if packet.last_send_attempt == packet.retry_count:
# we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue?
LOG.info("Message Send Complete. Max attempts reached.")
if not packet._allow_delay:
if not packet.allow_delay:
pkt_tracker.remove(packet.msgNo)
return False
# Message is still outstanding and needs to be acked.
if packet._last_send_time:
if packet.last_send_time:
# Message has a last send time tracking
now = datetime.datetime.now()
sleeptime = (packet._last_send_attempt + 1) * 31
delta = now - packet._last_send_time
sleeptime = (packet.last_send_attempt + 1) * 31
delta = now - packet.last_send_time
if delta > datetime.timedelta(seconds=sleeptime):
# It's time to try to send it again
send_now = True
@ -62,59 +63,62 @@ class SendPacketThread(aprsd_threads.APRSDThread):
if send_now:
# no attempt time, so lets send it, and start
# tracking the time.
packet.log("Sending Message")
packet.log("TX")
cl = client.factory.create().client
cl.send(packet.raw)
stats.APRSDStats().msgs_tx_inc()
packet_list.PacketList().add(packet)
packet._last_send_time = datetime.datetime.now()
packet._last_send_attempt += 1
packet_list.PacketList().tx(packet)
packet.last_send_time = datetime.datetime.now()
packet.last_send_attempt += 1
time.sleep(5)
time.sleep(1)
# Make sure we get called again.
self.loop_count += 1
return True
class SendAckThread(aprsd_threads.APRSDThread):
loop_count: int = 1
def __init__(self, packet):
self.packet = packet
super().__init__(f"SendAck-{self.packet.msgNo}")
self._loop_cnt = 1
def loop(self):
"""Separate thread to send acks with retries."""
send_now = False
if self.packet._last_send_attempt == self.packet._retry_count:
if self.packet.last_send_attempt == self.packet.retry_count:
# we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue?
LOG.info("Ack Send Complete. Max attempts reached.")
return False
if self.packet._last_send_time:
if self.packet.last_send_time:
# Message has a last send time tracking
now = datetime.datetime.now()
# aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle)
sleeptime = 31
delta = now - self.packet._last_send_time
if delta > datetime.timedelta(seconds=sleeptime):
sleep_time = 31
delta = now - self.packet.last_send_time
if delta > datetime.timedelta(seconds=sleep_time):
# It's time to try to send it again
send_now = True
elif self._loop_cnt % 5 == 0:
elif self.loop_count % 10 == 0:
LOG.debug(f"Still wating. {delta}")
else:
send_now = True
if send_now:
cl = client.factory.create().client
self.packet.log("Sending ACK")
self.packet.log("TX")
cl.send(self.packet.raw)
self.packet._send_count += 1
self.packet.send_count += 1
stats.APRSDStats().ack_tx_inc()
packet_list.PacketList().add(self.packet)
self.packet._last_send_attempt += 1
self.packet._last_send_time = datetime.datetime.now()
packet_list.PacketList().tx(self.packet)
self.packet.last_send_attempt += 1
self.packet.last_send_time = datetime.datetime.now()
time.sleep(1)
self._loop_cnt += 1
self.loop_count += 1
return True

View File

@ -1,3 +1,4 @@
import abc
import logging
import os
import pathlib
@ -9,7 +10,7 @@ from aprsd import config as aprsd_config
LOG = logging.getLogger("APRSD")
class ObjectStoreMixin:
class ObjectStoreMixin(metaclass=abc.ABCMeta):
"""Class 'MIXIN' intended to save/load object data.
The asumption of how this mixin is used:
@ -23,6 +24,13 @@ class ObjectStoreMixin:
When APRSD Starts, it calls load()
aprsd server -f (flush) will wipe all saved objects.
"""
@abc.abstractmethod
def is_initialized(self):
"""Return True if the class has been setup correctly.
If this returns False, the ObjectStore doesn't save anything.
"""
def __len__(self):
return len(self.data)
@ -36,13 +44,16 @@ class ObjectStoreMixin:
return self.data[id]
def _init_store(self):
sl = self._save_location()
if not os.path.exists(sl):
LOG.warning(f"Save location {sl} doesn't exist")
try:
os.makedirs(sl)
except Exception as ex:
LOG.exception(ex)
if self.is_initialized():
sl = self._save_location()
if not os.path.exists(sl):
LOG.warning(f"Save location {sl} doesn't exist")
try:
os.makedirs(sl)
except Exception as ex:
LOG.exception(ex)
else:
LOG.warning(f"{self.__class__.__name__} is not initialized")
def _save_location(self):
save_location = self.config.get("aprsd.save_location", None)
@ -68,38 +79,45 @@ class ObjectStoreMixin:
def save(self):
"""Save any queued to disk?"""
if len(self) > 0:
LOG.info(f"{self.__class__.__name__}::Saving {len(self)} entries to disk at {self._save_location()}")
with open(self._save_filename(), "wb+") as fp:
pickle.dump(self._dump(), fp)
else:
LOG.debug(
"{} Nothing to save, flushing old save file '{}'".format(
self.__class__.__name__,
self._save_filename(),
),
)
self.flush()
if self.is_initialized():
if len(self) > 0:
LOG.info(
f"{self.__class__.__name__}::Saving"
f" {len(self)} entries to disk at"
f"{self._save_location()}",
)
with open(self._save_filename(), "wb+") as fp:
pickle.dump(self._dump(), fp)
else:
LOG.debug(
"{} Nothing to save, flushing old save file '{}'".format(
self.__class__.__name__,
self._save_filename(),
),
)
self.flush()
def load(self):
if os.path.exists(self._save_filename()):
try:
with open(self._save_filename(), "rb") as fp:
raw = pickle.load(fp)
if raw:
self.data = raw
LOG.debug(
f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.",
)
LOG.debug(f"{self.data}")
except (pickle.UnpicklingError, Exception) as ex:
LOG.error(f"Failed to UnPickle {self._save_filename()}")
LOG.error(ex)
self.data = {}
if self.is_initialized():
if os.path.exists(self._save_filename()):
try:
with open(self._save_filename(), "rb") as fp:
raw = pickle.load(fp)
if raw:
self.data = raw
LOG.debug(
f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.",
)
LOG.debug(f"{self.data}")
except (pickle.UnpicklingError, Exception) as ex:
LOG.error(f"Failed to UnPickle {self._save_filename()}")
LOG.error(ex)
self.data = {}
def flush(self):
"""Nuke the old pickle file that stored the old results from last aprsd run."""
if os.path.exists(self._save_filename()):
pathlib.Path(self._save_filename()).unlink()
with self.lock:
self.data = {}
if self.is_initialized():
if os.path.exists(self._save_filename()):
pathlib.Path(self._save_filename()).unlink()
with self.lock:
self.data = {}

View File

@ -1,6 +1,9 @@
class RingBuffer:
"""class that implements a not-yet-full buffer"""
max: int = 100
data: list = []
def __init__(self, size_max):
self.max = size_max
self.data = []