From 0f384b0e856403611dfb00dd6bce63bbcfb665a9 Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 24 Aug 2021 13:31:33 -0400 Subject: [PATCH] Updated select timeouts This patch updates the select timeouts for threads. This allows threads to exit quicker when user hits CTRL-C. Updates the KeepAlive Thread to include total packets. --- aprsd/client.py | 2 +- aprsd/listen.py | 2 +- aprsd/main.py | 11 ++++++----- aprsd/messaging.py | 11 ++--------- aprsd/plugin.py | 23 ++++++++++++++++++++++- aprsd/threads.py | 43 ++++++++++++++----------------------------- aprsd/utils.py | 10 ++++++++-- 7 files changed, 54 insertions(+), 48 deletions(-) diff --git a/aprsd/client.py b/aprsd/client.py index 87faf5b..40c9d40 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -84,7 +84,7 @@ class Aprsdis(aprslib.IS): thread_stop = False # timeout in seconds - select_timeout = 10 + select_timeout = 1 def stop(self): self.thread_stop = True diff --git a/aprsd/listen.py b/aprsd/listen.py index 8d8a9c0..4342447 100644 --- a/aprsd/listen.py +++ b/aprsd/listen.py @@ -294,7 +294,7 @@ def listen( # TODO(walt) - manually edit this list # prior to running aprsd-listen listen - watch_list = [] + watch_list = ["k*"] # build last seen list last_seen = {} diff --git a/aprsd/main.py b/aprsd/main.py index 1e1c3c6..a3cbd9e 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -157,7 +157,7 @@ def signal_handler(sig, frame): datetime.datetime.now(), ), ) - time.sleep(5) + time.sleep(1.5) tracker = messaging.MsgTrack() tracker.save() LOG.info(stats.APRSDStats()) @@ -458,15 +458,16 @@ def server( trace.setup_tracing(["method", "api"]) stats.APRSDStats(config) - # Create the initial PM singleton and Register plugins - plugin_manager = plugin.PluginManager(config) - plugin_manager.setup_plugins() try: cl = client.Client(config) cl.client except LoginError: sys.exit(-1) + # Create the initial PM singleton and Register plugins + plugin_manager = plugin.PluginManager(config) + plugin_manager.setup_plugins() + # Now load the msgTrack from disk if any if flush: LOG.debug("Deleting saved MsgTrack.") @@ -498,7 +499,7 @@ def server( messaging.MsgTrack().restart() - keepalive = threads.KeepAliveThread() + keepalive = threads.KeepAliveThread(config=config) keepalive.start() try: diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 00ed7f5..53809e0 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -545,15 +545,8 @@ def log_packet(packet): ack = packet.get("ack", None) log_message( - "Packet", - packet["raw"], - msg, - fromcall=fromcall, - tocall=tocall, - ack=ack, - packet_type=response_type, - msg_num=msg_num, - ) + "Packet", packet["raw"], msg, fromcall=fromcall, tocall=tocall, + ack=ack, packet_type=response_type, msg_num=msg_num, ) def log_message( diff --git a/aprsd/plugin.py b/aprsd/plugin.py index b67898d..ebf160c 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -11,7 +11,7 @@ import threading import pluggy from thesmuggler import smuggle -from aprsd import messaging, packets, threads +from aprsd import client, messaging, packets, threads # setup the global logger @@ -137,6 +137,27 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): by a particular HAM callsign, write a plugin based off of this class. """ + enabled = False + + def setup(self): + # if we have a watch list enabled, we need to add filtering + # to enable seeing packets from the watch list. + if "watch_list" in self.config["aprsd"] and self.config["aprsd"][ + "watch_list" + ].get("enabled", False): + # watch list is enabled + self.enabled = True + watch_list = self.config["aprsd"]["watch_list"].get( + "callsigns", + [], + ) + # make sure the timeout is set or this doesn't work + if watch_list: + aprs_client = client.get_client() + filter_str = "b/{}".format("/".join(watch_list)) + aprs_client.set_filter(filter_str) + else: + LOG.warning("Watch list enabled, but no callsigns set.") def filter(self, packet): wl = packets.WatchList() diff --git a/aprsd/threads.py b/aprsd/threads.py index 4ad6be4..aa9f1e5 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -52,6 +52,7 @@ class APRSDThreadList: """Iterate over all threads and call stop on them.""" with self.lock: for th in self.threads_list: + LOG.debug(f"Stopping Thread {th.name}") th.stop() @@ -82,15 +83,16 @@ class KeepAliveThread(APRSDThread): cntr = 0 checker_time = datetime.datetime.now() - def __init__(self): + def __init__(self, config): tracemalloc.start() super().__init__("KeepAlive") + self.config = config def loop(self): if self.cntr % 6 == 0: tracker = messaging.MsgTrack() stats_obj = stats.APRSDStats() - packets_list = packets.PacketList().packet_list + pl = packets.PacketList() now = datetime.datetime.now() last_email = stats_obj.email_thread_time if last_email: @@ -104,21 +106,22 @@ class KeepAliveThread(APRSDThread): stats_obj.set_memory(current) stats_obj.set_memory_peak(peak) keepalive = ( - "Uptime {} Tracker {} Msgs TX:{} RX:{} " - "Last:{} Email:{} Packets:{} RAM Current:{} " - "Peak:{}" + "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " + "Last:{} Email: {} - RAM Current:{} Peak:{}" ).format( + self.config["aprs"]["login"], utils.strfdelta(stats_obj.uptime), + pl.total_recv, + pl.total_tx, len(tracker), stats_obj.msgs_tx, stats_obj.msgs_rx, last_msg_time, email_thread_time, - len(packets_list), utils.human_size(current), utils.human_size(peak), ) - LOG.debug(keepalive) + LOG.info(keepalive) # Check version every hour delta = now - self.checker_time if delta > datetime.timedelta(hours=1): @@ -127,7 +130,7 @@ class KeepAliveThread(APRSDThread): if level: LOG.warning(msg) self.cntr += 1 - time.sleep(10) + time.sleep(1) return True @@ -144,23 +147,6 @@ class APRSDRXThread(APRSDThread): def loop(self): aprs_client = client.get_client() - # if we have a watch list enabled, we need to add filtering - # to enable seeing packets from the watch list. - if "watch_list" in self.config["aprsd"] and self.config["aprsd"][ - "watch_list" - ].get("enabled", False): - # watch list is enabled - watch_list = self.config["aprsd"]["watch_list"].get( - "callsigns", - [], - ) - # make sure the timeout is set or this doesn't work - if watch_list: - filter_str = "p/{}".format("/".join(watch_list)) - aprs_client.set_filter(filter_str) - else: - LOG.warning("Watch list enabled, but no callsigns set.") - # setup the consumer of messages and block until a messages try: # This will register a packet consumer with aprslib @@ -202,14 +188,13 @@ class APRSDRXThread(APRSDThread): def process_packet(self, packet): """Process a packet recieved from aprs-is server.""" packets.PacketList().add(packet) - stats.APRSDStats().msgs_rx_inc() fromcall = packet["from"] tocall = packet.get("addresse", None) msg = packet.get("message_text", None) msg_id = packet.get("msgNo", "0") msg_response = packet.get("response", None) - LOG.debug(f"Got packet from '{fromcall}' - {packet}") + # LOG.debug(f"Got packet from '{fromcall}' - {packet}") # We don't put ack packets destined for us through the # plugins. @@ -228,6 +213,7 @@ class APRSDRXThread(APRSDThread): # Only ack messages that were sent directly to us if tocall == self.config["aprs"]["login"]: + stats.APRSDStats().msgs_rx_inc() # let any threads do their thing, then ack # send an ack last ack = messaging.AckMessage( @@ -240,7 +226,6 @@ class APRSDRXThread(APRSDThread): pm = plugin.PluginManager() try: results = pm.run(packet) - LOG.debug(f"RESULTS {results}") replied = False for reply in results: if isinstance(reply, list): @@ -306,7 +291,7 @@ class APRSDTXThread(APRSDThread): def loop(self): try: - msg = self.msg_queues["tx"].get(timeout=5) + msg = self.msg_queues["tx"].get(timeout=1) msg.send() except queue.Empty: pass diff --git a/aprsd/utils.py b/aprsd/utils.py index 3793b90..295de97 100644 --- a/aprsd/utils.py +++ b/aprsd/utils.py @@ -394,8 +394,11 @@ def human_size(bytes, units=None): return str(bytes) + units[0] if bytes < 1024 else human_size(bytes >> 10, units[1:]) -def strfdelta(tdelta, fmt="{hours}:{minutes}:{seconds}"): - d = {"days": tdelta.days} +def strfdelta(tdelta, fmt="{hours:{width}}:{minutes:{width}}:{seconds:{width}}"): + d = { + "days": tdelta.days, + "width": "02", + } d["hours"], rem = divmod(tdelta.seconds, 3600) d["minutes"], d["seconds"] = divmod(rem, 60) return fmt.format(**d) @@ -460,6 +463,9 @@ class RingBuffer: """return list of elements in correct order""" return self.data[self.cur :] + self.data[: self.cur] + def __len__(self): + return len(self.data) + def append(self, x): """append an element at the end of the buffer"""