From 9b0c626b59406102cdab85b7a9de20cf53563953 Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 22 Nov 2022 13:19:28 -0500 Subject: [PATCH] Update aprsd thread base class to use queue This patch updates the main aprsd threads class to use a shared queue to notify all aprsd thread classes they need to exit. This ensures any closing down of sockets, etc happens from inside the context of the thread itself, not the MainThread that calls stop. --- aprsd/threads/aprsd.py | 56 ++++++++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/aprsd/threads/aprsd.py b/aprsd/threads/aprsd.py index d981252..819ee7f 100644 --- a/aprsd/threads/aprsd.py +++ b/aprsd/threads/aprsd.py @@ -1,7 +1,10 @@ import abc import logging +from queue import Queue import threading +import wrapt + LOG = logging.getLogger("APRSD") @@ -12,41 +15,64 @@ class APRSDThreadList: _instance = None threads_list = [] - lock = None + lock = threading.Lock() + global_queue = Queue() def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - cls.lock = threading.Lock() cls.threads_list = [] return cls._instance + @wrapt.synchronized(lock) def add(self, thread_obj): - with self.lock: - self.threads_list.append(thread_obj) + thread_obj.set_global_queue(self.global_queue) + self.threads_list.append(thread_obj) + @wrapt.synchronized(lock) def remove(self, thread_obj): - with self.lock: - self.threads_list.remove(thread_obj) + self.threads_list.remove(thread_obj) + @wrapt.synchronized(lock) def stop_all(self): + self.global_queue.put_nowait({"quit": True}) """Iterate over all threads and call stop on them.""" - with self.lock: - for th in self.threads_list: - LOG.debug(f"Stopping Thread {th.name}") - th.stop() + for th in self.threads_list: + LOG.info(f"Stopping Thread {th.name}") + th.stop() + @wrapt.synchronized(lock) def __len__(self): - with self.lock: - return len(self.threads_list) + return len(self.threads_list) class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): + + global_queue = None + def __init__(self, name): super().__init__(name=name) self.thread_stop = False APRSDThreadList().add(self) + def set_global_queue(self, global_queue): + self.global_queue = global_queue + + def _should_quit(self): + """ see if we have a quit message from the global queue.""" + if self.thread_stop: + return True + if self.global_queue.empty(): + return False + msg = self.global_queue.get(timeout=1) + if not msg: + return False + if "quit" in msg and msg["quit"] is True: + # put the message back on the queue for others + self.global_queue.put_nowait(msg) + self.thread_stop = True + return True + def stop(self): self.thread_stop = True @@ -54,11 +80,15 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): def loop(self): pass + def _cleanup(self): + """Add code to subclass to do any cleanup""" + def run(self): LOG.debug("Starting") - while not self.thread_stop: + while not self._should_quit(): can_loop = self.loop() if not can_loop: self.stop() + self._cleanup() APRSDThreadList().remove(self) LOG.debug("Exiting")