1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-06-25 13:35:20 -04:00

Cleaned up packet transmit class attributes

This patch cleans up the Packet class attributes used to
keep track of how many times packets have been sent and
the last time they were sent.  This is used by the PacketTracker
and the tx threads for transmitting packets
This commit is contained in:
Hemna 2022-12-17 18:00:26 -05:00
parent f1de7bc681
commit c201c93b5d
4 changed files with 70 additions and 52 deletions

View File

@ -31,31 +31,41 @@ def _int_timestamp():
return int(round(time.time())) return int(round(time.time()))
@dataclass() def _init_msgNo(): # noqa: N802
"""For some reason __post__init doesn't get called.
So in order to initialize the msgNo field in the packet
we use this workaround.
"""
c = counter.PacketCounter()
c.increment()
return c.value
@dataclass
class Packet(metaclass=abc.ABCMeta): class Packet(metaclass=abc.ABCMeta):
from_call: str from_call: str
to_call: str to_call: str
addresse: str = None addresse: str = None
format: str = None format: str = None
msgNo: str = None # noqa: N815 msgNo: str = field(default_factory=_init_msgNo) # noqa: N815
packet_type: str = None packet_type: str = None
timestamp: float = field(default_factory=_int_timestamp) timestamp: float = field(default_factory=_int_timestamp)
# Holds the raw text string to be sent over the wire
# or holds the raw string from input packet
raw: str = None raw: str = None
_raw_dict: dict = field(repr=False, default_factory=lambda: {}) raw_dict: dict = field(repr=False, default_factory=lambda: {})
_retry_count = 3
_last_send_time = 0 # Fields related to sending packets out
_last_send_attempt = 0 send_count: int = field(repr=False, default=1)
retry_count: int = field(repr=False, default=3)
last_send_time: datetime.timedelta = field(repr=False, default=None)
last_send_attempt: int = field(repr=False, default=0)
# Do we allow this packet to be saved to send later? # Do we allow this packet to be saved to send later?
_allow_delay = True allow_delay: bool = field(repr=False, default=True)
_transport = None def __post__init__(self):
_raw_message = None LOG.warning(f"POST INIT {self}")
def __post__init(self):
if not self.msgNo:
c = counter.PacketCounter()
c.increment()
self.msgNo = c.value
def get(self, key, default=None): def get(self, key, default=None):
"""Emulate a getter on a dict.""" """Emulate a getter on a dict."""
@ -76,7 +86,7 @@ class Packet(metaclass=abc.ABCMeta):
@staticmethod @staticmethod
def factory(raw_packet): def factory(raw_packet):
raw = raw_packet raw = raw_packet
raw["_raw_dict"] = raw.copy() raw["raw_dict"] = raw.copy()
translate_fields = { translate_fields = {
"from": "from_call", "from": "from_call",
"to": "to_call", "to": "to_call",
@ -110,15 +120,16 @@ class Packet(metaclass=abc.ABCMeta):
"""LOG a packet to the logfile.""" """LOG a packet to the logfile."""
asdict(self) asdict(self)
log_list = ["\n"] log_list = ["\n"]
name = self.__class__.__name__
if header: if header:
if isinstance(self, AckPacket): if isinstance(self, AckPacket) and "tx" in header.lower():
log_list.append( log_list.append(
f"{header} ___________" f"{header}____________({name}__"
f"(TX:{self._send_count} of {self._retry_count})", f"TX:{self.send_count} of {self.retry_count})",
) )
else: else:
log_list.append(f"{header} _______________") log_list.append(f"{header}____________({name})")
log_list.append(f" Packet : {self.__class__.__name__}") # log_list.append(f" Packet : {self.__class__.__name__}")
log_list.append(f" Raw : {self.raw}") log_list.append(f" Raw : {self.raw}")
if self.to_call: if self.to_call:
log_list.append(f" To : {self.to_call}") log_list.append(f" To : {self.to_call}")
@ -137,7 +148,7 @@ class Packet(metaclass=abc.ABCMeta):
if self.msgNo: if self.msgNo:
log_list.append(f" Msg # : {self.msgNo}") log_list.append(f" Msg # : {self.msgNo}")
log_list.append(f"{header} _______________ Complete") log_list.append(f"{header}____________({name})")
LOG.info("\n".join(log_list)) LOG.info("\n".join(log_list))
LOG.debug(self) LOG.debug(self)
@ -165,12 +176,12 @@ class Packet(metaclass=abc.ABCMeta):
cl = aprsis_client cl = aprsis_client
else: else:
cl = client.factory.create().client cl = client.factory.create().client
self.log(header="Sending Message Direct") self.log(header="TX Message Direct")
cl.send(self.raw) cl.send(self.raw)
stats.APRSDStats().msgs_tx_inc() stats.APRSDStats().msgs_tx_inc()
@dataclass() @dataclass
class PathPacket(Packet): class PathPacket(Packet):
path: List[str] = field(default_factory=list) path: List[str] = field(default_factory=list)
via: str = None via: str = None
@ -179,10 +190,13 @@ class PathPacket(Packet):
raise NotImplementedError raise NotImplementedError
@dataclass() @dataclass
class AckPacket(PathPacket): class AckPacket(PathPacket):
response: str = None response: str = None
_send_count = 1
def __post__init__(self):
if self.response:
LOG.warning("Response set!")
def _build_raw(self): def _build_raw(self):
"""Build the self.raw which is what is sent over the air.""" """Build the self.raw which is what is sent over the air."""
@ -200,7 +214,7 @@ class AckPacket(PathPacket):
thread.start() thread.start()
@dataclass() @dataclass
class MessagePacket(PathPacket): class MessagePacket(PathPacket):
message_text: str = None message_text: str = None

View File

@ -43,8 +43,9 @@ class NotifySeenPlugin(plugin.APRSDWatchListPluginBase):
message_text=( message_text=(
f"{fromcall} was just seen by type:'{packet_type}'" f"{fromcall} was just seen by type:'{packet_type}'"
), ),
allow_delay=False,
) )
pkt._allow_delay = False # pkt.allow_delay = False
return pkt return pkt
else: else:
LOG.debug("fromcall and notify_callsign are the same, not notifying") LOG.debug("fromcall and notify_callsign are the same, not notifying")

View File

@ -66,7 +66,7 @@ class APRSDPluginRXThread(APRSDRXThread):
def process_packet(self, *args, **kwargs): def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs) packet = self._client.decode_packet(*args, **kwargs)
# LOG.debug(raw) # LOG.debug(raw)
packet.log(header="RX Packet") packet.log(header="RX")
thread = APRSDPluginProcessPacketThread( thread = APRSDPluginProcessPacketThread(
config=self.config, config=self.config,
packet=packet, packet=packet,
@ -92,7 +92,6 @@ class APRSDProcessPacketThread(APRSDThread):
def process_ack_packet(self, packet): def process_ack_packet(self, packet):
ack_num = packet.msgNo ack_num = packet.msgNo
LOG.info(f"Got ack for message {ack_num}") LOG.info(f"Got ack for message {ack_num}")
packet.log("RXACK")
pkt_tracker = packets.PacketTrack() pkt_tracker = packets.PacketTrack()
pkt_tracker.remove(ack_num) pkt_tracker.remove(ack_num)
stats.APRSDStats().ack_rx_inc() stats.APRSDStats().ack_rx_inc()

View File

@ -11,6 +11,8 @@ LOG = logging.getLogger("APRSD")
class SendPacketThread(aprsd_threads.APRSDThread): class SendPacketThread(aprsd_threads.APRSDThread):
loop_count: int = 1
def __init__(self, packet): def __init__(self, packet):
self.packet = packet self.packet = packet
name = self.packet.raw[:5] name = self.packet.raw[:5]
@ -19,7 +21,6 @@ class SendPacketThread(aprsd_threads.APRSDThread):
pkt_tracker.add(packet) pkt_tracker.add(packet)
def loop(self): def loop(self):
LOG.debug("TX Loop")
"""Loop until a message is acked or it gets delayed. """Loop until a message is acked or it gets delayed.
We only sleep for 5 seconds between each loop run, so We only sleep for 5 seconds between each loop run, so
@ -39,20 +40,20 @@ class SendPacketThread(aprsd_threads.APRSDThread):
return False return False
else: else:
send_now = False send_now = False
if packet._last_send_attempt == packet._retry_count: if packet.last_send_attempt == packet.retry_count:
# we reached the send limit, don't send again # we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue? # TODO(hemna) - Need to put this in a delayed queue?
LOG.info("Message Send Complete. Max attempts reached.") LOG.info("Message Send Complete. Max attempts reached.")
if not packet._allow_delay: if not packet.allow_delay:
pkt_tracker.remove(packet.msgNo) pkt_tracker.remove(packet.msgNo)
return False return False
# Message is still outstanding and needs to be acked. # Message is still outstanding and needs to be acked.
if packet._last_send_time: if packet.last_send_time:
# Message has a last send time tracking # Message has a last send time tracking
now = datetime.datetime.now() now = datetime.datetime.now()
sleeptime = (packet._last_send_attempt + 1) * 31 sleeptime = (packet.last_send_attempt + 1) * 31
delta = now - packet._last_send_time delta = now - packet.last_send_time
if delta > datetime.timedelta(seconds=sleeptime): if delta > datetime.timedelta(seconds=sleeptime):
# It's time to try to send it again # It's time to try to send it again
send_now = True send_now = True
@ -62,59 +63,62 @@ class SendPacketThread(aprsd_threads.APRSDThread):
if send_now: if send_now:
# no attempt time, so lets send it, and start # no attempt time, so lets send it, and start
# tracking the time. # tracking the time.
packet.log("Sending Message") packet.log("TX")
cl = client.factory.create().client cl = client.factory.create().client
cl.send(packet.raw) cl.send(packet.raw)
stats.APRSDStats().msgs_tx_inc() stats.APRSDStats().msgs_tx_inc()
packet_list.PacketList().add(packet) packet_list.PacketList().add(packet)
packet._last_send_time = datetime.datetime.now() packet.last_send_time = datetime.datetime.now()
packet._last_send_attempt += 1 packet.last_send_attempt += 1
time.sleep(5) time.sleep(1)
# Make sure we get called again. # Make sure we get called again.
self.loop_count += 1
return True return True
class SendAckThread(aprsd_threads.APRSDThread): class SendAckThread(aprsd_threads.APRSDThread):
loop_count: int = 1
def __init__(self, packet): def __init__(self, packet):
self.packet = packet self.packet = packet
super().__init__(f"SendAck-{self.packet.msgNo}") super().__init__(f"SendAck-{self.packet.msgNo}")
self._loop_cnt = 1
def loop(self): def loop(self):
"""Separate thread to send acks with retries.""" """Separate thread to send acks with retries."""
send_now = False send_now = False
if self.packet._last_send_attempt == self.packet._retry_count: if self.packet.last_send_attempt == self.packet.retry_count:
# we reached the send limit, don't send again # we reached the send limit, don't send again
# TODO(hemna) - Need to put this in a delayed queue? # TODO(hemna) - Need to put this in a delayed queue?
LOG.info("Ack Send Complete. Max attempts reached.") LOG.info("Ack Send Complete. Max attempts reached.")
return False return False
if self.packet._last_send_time: if self.packet.last_send_time:
# Message has a last send time tracking # Message has a last send time tracking
now = datetime.datetime.now() now = datetime.datetime.now()
# aprs duplicate detection is 30 secs? # aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle) # (21 only sends first, 28 skips middle)
sleeptime = 31 sleep_time = 31
delta = now - self.packet._last_send_time delta = now - self.packet.last_send_time
if delta > datetime.timedelta(seconds=sleeptime): if delta > datetime.timedelta(seconds=sleep_time):
# It's time to try to send it again # It's time to try to send it again
send_now = True send_now = True
elif self._loop_cnt % 5 == 0: elif self.loop_count % 10 == 0:
LOG.debug(f"Still wating. {delta}") LOG.debug(f"Still wating. {delta}")
else: else:
send_now = True send_now = True
if send_now: if send_now:
cl = client.factory.create().client cl = client.factory.create().client
self.packet.log("Sending ACK") self.packet.log("TX")
cl.send(self.packet.raw) cl.send(self.packet.raw)
self.packet._send_count += 1 self.packet.send_count += 1
stats.APRSDStats().ack_tx_inc() stats.APRSDStats().ack_tx_inc()
packet_list.PacketList().add(self.packet) packet_list.PacketList().add(self.packet)
self.packet._last_send_attempt += 1 self.packet.last_send_attempt += 1
self.packet._last_send_time = datetime.datetime.now() self.packet.last_send_time = datetime.datetime.now()
time.sleep(1) time.sleep(1)
self._loop_cnt += 1 self.loop_count += 1
return True return True