From 347a6d69f7825a408553a4ee08002917c5a6ab73 Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 6 Jul 2022 19:31:59 -0400 Subject: [PATCH] Refactored threads.py This patch creates a threads directory and separates out the contents of threads.py into separate files in the threads directory to make it easier to find and maintain. --- aprsd/threads/__init__.py | 13 +++ aprsd/threads/aprsd.py | 64 ++++++++++++ aprsd/threads/keep_alive.py | 87 ++++++++++++++++ aprsd/{threads.py => threads/rx.py} | 151 +--------------------------- 4 files changed, 166 insertions(+), 149 deletions(-) create mode 100644 aprsd/threads/__init__.py create mode 100644 aprsd/threads/aprsd.py create mode 100644 aprsd/threads/keep_alive.py rename aprsd/{threads.py => threads/rx.py} (60%) diff --git a/aprsd/threads/__init__.py b/aprsd/threads/__init__.py new file mode 100644 index 0000000..0927235 --- /dev/null +++ b/aprsd/threads/__init__.py @@ -0,0 +1,13 @@ +import queue + +# Make these available to anyone importing +# aprsd.threads +from .aprsd import APRSDThread, APRSDThreadList +from .keep_alive import KeepAliveThread +from .rx import APRSDRXThread + + +rx_msg_queue = queue.Queue(maxsize=20) +msg_queues = { + "rx": rx_msg_queue, +} diff --git a/aprsd/threads/aprsd.py b/aprsd/threads/aprsd.py new file mode 100644 index 0000000..d981252 --- /dev/null +++ b/aprsd/threads/aprsd.py @@ -0,0 +1,64 @@ +import abc +import logging +import threading + + +LOG = logging.getLogger("APRSD") + + +class APRSDThreadList: + """Singleton class that keeps track of application wide threads.""" + + _instance = None + + threads_list = [] + lock = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls.lock = threading.Lock() + cls.threads_list = [] + return cls._instance + + def add(self, thread_obj): + with self.lock: + self.threads_list.append(thread_obj) + + def remove(self, thread_obj): + with self.lock: + self.threads_list.remove(thread_obj) + + def stop_all(self): + """Iterate over all threads and call stop on them.""" + with self.lock: + for th in self.threads_list: + LOG.debug(f"Stopping Thread {th.name}") + th.stop() + + def __len__(self): + with self.lock: + return len(self.threads_list) + + +class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): + def __init__(self, name): + super().__init__(name=name) + self.thread_stop = False + APRSDThreadList().add(self) + + def stop(self): + self.thread_stop = True + + @abc.abstractmethod + def loop(self): + pass + + def run(self): + LOG.debug("Starting") + while not self.thread_stop: + can_loop = self.loop() + if not can_loop: + self.stop() + APRSDThreadList().remove(self) + LOG.debug("Exiting") diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keep_alive.py new file mode 100644 index 0000000..b838665 --- /dev/null +++ b/aprsd/threads/keep_alive.py @@ -0,0 +1,87 @@ +import datetime +import logging +import time +import tracemalloc + +from aprsd import client, messaging, packets, stats, utils +from aprsd.threads import APRSDThread, APRSDThreadList + + +LOG = logging.getLogger("APRSD") + +class KeepAliveThread(APRSDThread): + cntr = 0 + checker_time = datetime.datetime.now() + + def __init__(self, config): + tracemalloc.start() + super().__init__("KeepAlive") + self.config = config + max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0} + self.max_delta = datetime.timedelta(**max_timeout) + + def loop(self): + if self.cntr % 60 == 0: + tracker = messaging.MsgTrack() + stats_obj = stats.APRSDStats() + pl = packets.PacketList() + thread_list = APRSDThreadList() + now = datetime.datetime.now() + last_email = stats_obj.email_thread_time + if last_email: + email_thread_time = utils.strfdelta(now - last_email) + else: + email_thread_time = "N/A" + + last_msg_time = utils.strfdelta(now - stats_obj.aprsis_keepalive) + + current, peak = tracemalloc.get_traced_memory() + stats_obj.set_memory(current) + stats_obj.set_memory_peak(peak) + + try: + login = self.config["aprs"]["login"] + except KeyError: + login = self.config["ham"]["callsign"] + + keepalive = ( + "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " + "Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}" + ).format( + login, + utils.strfdelta(stats_obj.uptime), + pl.total_recv, + pl.total_tx, + len(tracker), + stats_obj.msgs_tx, + stats_obj.msgs_rx, + last_msg_time, + email_thread_time, + utils.human_size(current), + utils.human_size(peak), + len(thread_list), + ) + LOG.info(keepalive) + + # See if we should reset the aprs-is client + # Due to losing a keepalive from them + delta_dict = utils.parse_delta_str(last_msg_time) + delta = datetime.timedelta(**delta_dict) + + if delta > self.max_delta: + # We haven't gotten a keepalive from aprs-is in a while + # reset the connection.a + if not client.KISSClient.is_enabled(self.config): + LOG.warning("Resetting connection to APRS-IS.") + client.factory.create().reset() + + # Check version every hour + delta = now - self.checker_time + if delta > datetime.timedelta(hours=1): + self.checker_time = now + level, msg = utils._check_version() + if level: + LOG.warning(msg) + self.cntr += 1 + time.sleep(1) + return True diff --git a/aprsd/threads.py b/aprsd/threads/rx.py similarity index 60% rename from aprsd/threads.py rename to aprsd/threads/rx.py index c81275e..f9c1cb5 100644 --- a/aprsd/threads.py +++ b/aprsd/threads/rx.py @@ -1,161 +1,14 @@ -import abc -import datetime import logging -import queue -import threading import time -import tracemalloc import aprslib -from aprsd import client, messaging, packets, plugin, stats, utils +from aprsd import client, messaging, packets, plugin, stats +from aprsd.threads import APRSDThread LOG = logging.getLogger("APRSD") -RX_THREAD = "RX" -EMAIL_THREAD = "Email" - -rx_msg_queue = queue.Queue(maxsize=20) -msg_queues = { - "rx": rx_msg_queue, -} - - -class APRSDThreadList: - """Singleton class that keeps track of application wide threads.""" - - _instance = None - - threads_list = [] - lock = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls.lock = threading.Lock() - cls.threads_list = [] - return cls._instance - - def add(self, thread_obj): - with self.lock: - self.threads_list.append(thread_obj) - - def remove(self, thread_obj): - with self.lock: - self.threads_list.remove(thread_obj) - - def stop_all(self): - """Iterate over all threads and call stop on them.""" - with self.lock: - for th in self.threads_list: - LOG.debug(f"Stopping Thread {th.name}") - th.stop() - - def __len__(self): - with self.lock: - return len(self.threads_list) - - -class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): - def __init__(self, name): - super().__init__(name=name) - self.thread_stop = False - APRSDThreadList().add(self) - - def stop(self): - self.thread_stop = True - - @abc.abstractmethod - def loop(self): - pass - - def run(self): - LOG.debug("Starting") - while not self.thread_stop: - can_loop = self.loop() - if not can_loop: - self.stop() - APRSDThreadList().remove(self) - LOG.debug("Exiting") - - -class KeepAliveThread(APRSDThread): - cntr = 0 - checker_time = datetime.datetime.now() - - def __init__(self, config): - tracemalloc.start() - super().__init__("KeepAlive") - self.config = config - max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0} - self.max_delta = datetime.timedelta(**max_timeout) - - def loop(self): - if self.cntr % 60 == 0: - tracker = messaging.MsgTrack() - stats_obj = stats.APRSDStats() - pl = packets.PacketList() - thread_list = APRSDThreadList() - now = datetime.datetime.now() - last_email = stats_obj.email_thread_time - if last_email: - email_thread_time = utils.strfdelta(now - last_email) - else: - email_thread_time = "N/A" - - last_msg_time = utils.strfdelta(now - stats_obj.aprsis_keepalive) - - current, peak = tracemalloc.get_traced_memory() - stats_obj.set_memory(current) - stats_obj.set_memory_peak(peak) - - try: - login = self.config["aprs"]["login"] - except KeyError: - login = self.config["ham"]["callsign"] - - keepalive = ( - "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " - "Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}" - ).format( - login, - utils.strfdelta(stats_obj.uptime), - pl.total_recv, - pl.total_tx, - len(tracker), - stats_obj.msgs_tx, - stats_obj.msgs_rx, - last_msg_time, - email_thread_time, - utils.human_size(current), - utils.human_size(peak), - len(thread_list), - ) - LOG.info(keepalive) - - # See if we should reset the aprs-is client - # Due to losing a keepalive from them - delta_dict = utils.parse_delta_str(last_msg_time) - delta = datetime.timedelta(**delta_dict) - - if delta > self.max_delta: - # We haven't gotten a keepalive from aprs-is in a while - # reset the connection.a - if not client.KISSClient.is_enabled(self.config): - LOG.warning("Resetting connection to APRS-IS.") - client.factory.create().reset() - - # Check version every hour - delta = now - self.checker_time - if delta > datetime.timedelta(hours=1): - self.checker_time = now - level, msg = utils._check_version() - if level: - LOG.warning(msg) - self.cntr += 1 - time.sleep(1) - return True class APRSDRXThread(APRSDThread):