diff --git a/aprsd/threads/aprsd.py b/aprsd/threads/aprsd.py index d981252..819ee7f 100644 --- a/aprsd/threads/aprsd.py +++ b/aprsd/threads/aprsd.py @@ -1,7 +1,10 @@ import abc import logging +from queue import Queue import threading +import wrapt + LOG = logging.getLogger("APRSD") @@ -12,41 +15,64 @@ class APRSDThreadList: _instance = None threads_list = [] - lock = None + lock = threading.Lock() + global_queue = Queue() def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - cls.lock = threading.Lock() cls.threads_list = [] return cls._instance + @wrapt.synchronized(lock) def add(self, thread_obj): - with self.lock: - self.threads_list.append(thread_obj) + thread_obj.set_global_queue(self.global_queue) + self.threads_list.append(thread_obj) + @wrapt.synchronized(lock) def remove(self, thread_obj): - with self.lock: - self.threads_list.remove(thread_obj) + self.threads_list.remove(thread_obj) + @wrapt.synchronized(lock) def stop_all(self): + self.global_queue.put_nowait({"quit": True}) """Iterate over all threads and call stop on them.""" - with self.lock: - for th in self.threads_list: - LOG.debug(f"Stopping Thread {th.name}") - th.stop() + for th in self.threads_list: + LOG.info(f"Stopping Thread {th.name}") + th.stop() + @wrapt.synchronized(lock) def __len__(self): - with self.lock: - return len(self.threads_list) + return len(self.threads_list) class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): + + global_queue = None + def __init__(self, name): super().__init__(name=name) self.thread_stop = False APRSDThreadList().add(self) + def set_global_queue(self, global_queue): + self.global_queue = global_queue + + def _should_quit(self): + """ see if we have a quit message from the global queue.""" + if self.thread_stop: + return True + if self.global_queue.empty(): + return False + msg = self.global_queue.get(timeout=1) + if not msg: + return False + if "quit" in msg and msg["quit"] is True: + # put the message back on the queue for others + self.global_queue.put_nowait(msg) + self.thread_stop = True + return True + def stop(self): self.thread_stop = True @@ -54,11 +80,15 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): def loop(self): pass + def _cleanup(self): + """Add code to subclass to do any cleanup""" + def run(self): LOG.debug("Starting") - while not self.thread_stop: + while not self._should_quit(): can_loop = self.loop() if not can_loop: self.stop() + self._cleanup() APRSDThreadList().remove(self) LOG.debug("Exiting")