Updated select timeouts

This patch updates the select timeouts for threads.  This allows
threads to exit quicker when user hits CTRL-C.

Updates the KeepAlive Thread to include total packets.
This commit is contained in:
Hemna 2021-08-24 13:31:33 -04:00
parent 8b5f21eece
commit 0f384b0e85
7 changed files with 54 additions and 48 deletions

View File

@ -84,7 +84,7 @@ class Aprsdis(aprslib.IS):
thread_stop = False
# timeout in seconds
select_timeout = 10
select_timeout = 1
def stop(self):
self.thread_stop = True

View File

@ -294,7 +294,7 @@ def listen(
# TODO(walt) - manually edit this list
# prior to running aprsd-listen listen
watch_list = []
watch_list = ["k*"]
# build last seen list
last_seen = {}

View File

@ -157,7 +157,7 @@ def signal_handler(sig, frame):
datetime.datetime.now(),
),
)
time.sleep(5)
time.sleep(1.5)
tracker = messaging.MsgTrack()
tracker.save()
LOG.info(stats.APRSDStats())
@ -458,15 +458,16 @@ def server(
trace.setup_tracing(["method", "api"])
stats.APRSDStats(config)
# Create the initial PM singleton and Register plugins
plugin_manager = plugin.PluginManager(config)
plugin_manager.setup_plugins()
try:
cl = client.Client(config)
cl.client
except LoginError:
sys.exit(-1)
# Create the initial PM singleton and Register plugins
plugin_manager = plugin.PluginManager(config)
plugin_manager.setup_plugins()
# Now load the msgTrack from disk if any
if flush:
LOG.debug("Deleting saved MsgTrack.")
@ -498,7 +499,7 @@ def server(
messaging.MsgTrack().restart()
keepalive = threads.KeepAliveThread()
keepalive = threads.KeepAliveThread(config=config)
keepalive.start()
try:

View File

@ -545,15 +545,8 @@ def log_packet(packet):
ack = packet.get("ack", None)
log_message(
"Packet",
packet["raw"],
msg,
fromcall=fromcall,
tocall=tocall,
ack=ack,
packet_type=response_type,
msg_num=msg_num,
)
"Packet", packet["raw"], msg, fromcall=fromcall, tocall=tocall,
ack=ack, packet_type=response_type, msg_num=msg_num, )
def log_message(

View File

@ -11,7 +11,7 @@ import threading
import pluggy
from thesmuggler import smuggle
from aprsd import messaging, packets, threads
from aprsd import client, messaging, packets, threads
# setup the global logger
@ -137,6 +137,27 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
by a particular HAM callsign, write a plugin based off of
this class.
"""
enabled = False
def setup(self):
# if we have a watch list enabled, we need to add filtering
# to enable seeing packets from the watch list.
if "watch_list" in self.config["aprsd"] and self.config["aprsd"][
"watch_list"
].get("enabled", False):
# watch list is enabled
self.enabled = True
watch_list = self.config["aprsd"]["watch_list"].get(
"callsigns",
[],
)
# make sure the timeout is set or this doesn't work
if watch_list:
aprs_client = client.get_client()
filter_str = "b/{}".format("/".join(watch_list))
aprs_client.set_filter(filter_str)
else:
LOG.warning("Watch list enabled, but no callsigns set.")
def filter(self, packet):
wl = packets.WatchList()

View File

@ -52,6 +52,7 @@ class APRSDThreadList:
"""Iterate over all threads and call stop on them."""
with self.lock:
for th in self.threads_list:
LOG.debug(f"Stopping Thread {th.name}")
th.stop()
@ -82,15 +83,16 @@ class KeepAliveThread(APRSDThread):
cntr = 0
checker_time = datetime.datetime.now()
def __init__(self):
def __init__(self, config):
tracemalloc.start()
super().__init__("KeepAlive")
self.config = config
def loop(self):
if self.cntr % 6 == 0:
tracker = messaging.MsgTrack()
stats_obj = stats.APRSDStats()
packets_list = packets.PacketList().packet_list
pl = packets.PacketList()
now = datetime.datetime.now()
last_email = stats_obj.email_thread_time
if last_email:
@ -104,21 +106,22 @@ class KeepAliveThread(APRSDThread):
stats_obj.set_memory(current)
stats_obj.set_memory_peak(peak)
keepalive = (
"Uptime {} Tracker {} Msgs TX:{} RX:{} "
"Last:{} Email:{} Packets:{} RAM Current:{} "
"Peak:{}"
"{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} "
"Last:{} Email: {} - RAM Current:{} Peak:{}"
).format(
self.config["aprs"]["login"],
utils.strfdelta(stats_obj.uptime),
pl.total_recv,
pl.total_tx,
len(tracker),
stats_obj.msgs_tx,
stats_obj.msgs_rx,
last_msg_time,
email_thread_time,
len(packets_list),
utils.human_size(current),
utils.human_size(peak),
)
LOG.debug(keepalive)
LOG.info(keepalive)
# Check version every hour
delta = now - self.checker_time
if delta > datetime.timedelta(hours=1):
@ -127,7 +130,7 @@ class KeepAliveThread(APRSDThread):
if level:
LOG.warning(msg)
self.cntr += 1
time.sleep(10)
time.sleep(1)
return True
@ -144,23 +147,6 @@ class APRSDRXThread(APRSDThread):
def loop(self):
aprs_client = client.get_client()
# if we have a watch list enabled, we need to add filtering
# to enable seeing packets from the watch list.
if "watch_list" in self.config["aprsd"] and self.config["aprsd"][
"watch_list"
].get("enabled", False):
# watch list is enabled
watch_list = self.config["aprsd"]["watch_list"].get(
"callsigns",
[],
)
# make sure the timeout is set or this doesn't work
if watch_list:
filter_str = "p/{}".format("/".join(watch_list))
aprs_client.set_filter(filter_str)
else:
LOG.warning("Watch list enabled, but no callsigns set.")
# setup the consumer of messages and block until a messages
try:
# This will register a packet consumer with aprslib
@ -202,14 +188,13 @@ class APRSDRXThread(APRSDThread):
def process_packet(self, packet):
"""Process a packet recieved from aprs-is server."""
packets.PacketList().add(packet)
stats.APRSDStats().msgs_rx_inc()
fromcall = packet["from"]
tocall = packet.get("addresse", None)
msg = packet.get("message_text", None)
msg_id = packet.get("msgNo", "0")
msg_response = packet.get("response", None)
LOG.debug(f"Got packet from '{fromcall}' - {packet}")
# LOG.debug(f"Got packet from '{fromcall}' - {packet}")
# We don't put ack packets destined for us through the
# plugins.
@ -228,6 +213,7 @@ class APRSDRXThread(APRSDThread):
# Only ack messages that were sent directly to us
if tocall == self.config["aprs"]["login"]:
stats.APRSDStats().msgs_rx_inc()
# let any threads do their thing, then ack
# send an ack last
ack = messaging.AckMessage(
@ -240,7 +226,6 @@ class APRSDRXThread(APRSDThread):
pm = plugin.PluginManager()
try:
results = pm.run(packet)
LOG.debug(f"RESULTS {results}")
replied = False
for reply in results:
if isinstance(reply, list):
@ -306,7 +291,7 @@ class APRSDTXThread(APRSDThread):
def loop(self):
try:
msg = self.msg_queues["tx"].get(timeout=5)
msg = self.msg_queues["tx"].get(timeout=1)
msg.send()
except queue.Empty:
pass

View File

@ -394,8 +394,11 @@ def human_size(bytes, units=None):
return str(bytes) + units[0] if bytes < 1024 else human_size(bytes >> 10, units[1:])
def strfdelta(tdelta, fmt="{hours}:{minutes}:{seconds}"):
d = {"days": tdelta.days}
def strfdelta(tdelta, fmt="{hours:{width}}:{minutes:{width}}:{seconds:{width}}"):
d = {
"days": tdelta.days,
"width": "02",
}
d["hours"], rem = divmod(tdelta.seconds, 3600)
d["minutes"], d["seconds"] = divmod(rem, 60)
return fmt.format(**d)
@ -460,6 +463,9 @@ class RingBuffer:
"""return list of elements in correct order"""
return self.data[self.cur :] + self.data[: self.cur]
def __len__(self):
return len(self.data)
def append(self, x):
"""append an element at the end of the buffer"""