Update aprsd thread base class to use queue

This patch updates the main aprsd threads class to use
a shared queue to notify all aprsd thread classes they need
to exit.  This ensures any closing down of sockets, etc happens from
inside the context of the thread itself, not the MainThread that
calls stop.
This commit is contained in:
Hemna 2022-11-22 13:19:28 -05:00
parent 967959e7b3
commit 9b0c626b59
1 changed files with 43 additions and 13 deletions

View File

@ -1,7 +1,10 @@
import abc import abc
import logging import logging
from queue import Queue
import threading import threading
import wrapt
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
@ -12,41 +15,64 @@ class APRSDThreadList:
_instance = None _instance = None
threads_list = [] threads_list = []
lock = None lock = threading.Lock()
global_queue = Queue()
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
if cls._instance is None: if cls._instance is None:
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
cls.lock = threading.Lock()
cls.threads_list = [] cls.threads_list = []
return cls._instance return cls._instance
@wrapt.synchronized(lock)
def add(self, thread_obj): def add(self, thread_obj):
with self.lock: thread_obj.set_global_queue(self.global_queue)
self.threads_list.append(thread_obj) self.threads_list.append(thread_obj)
@wrapt.synchronized(lock)
def remove(self, thread_obj): def remove(self, thread_obj):
with self.lock: self.threads_list.remove(thread_obj)
self.threads_list.remove(thread_obj)
@wrapt.synchronized(lock)
def stop_all(self): def stop_all(self):
self.global_queue.put_nowait({"quit": True})
"""Iterate over all threads and call stop on them.""" """Iterate over all threads and call stop on them."""
with self.lock: for th in self.threads_list:
for th in self.threads_list: LOG.info(f"Stopping Thread {th.name}")
LOG.debug(f"Stopping Thread {th.name}") th.stop()
th.stop()
@wrapt.synchronized(lock)
def __len__(self): def __len__(self):
with self.lock: return len(self.threads_list)
return len(self.threads_list)
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
global_queue = None
def __init__(self, name): def __init__(self, name):
super().__init__(name=name) super().__init__(name=name)
self.thread_stop = False self.thread_stop = False
APRSDThreadList().add(self) APRSDThreadList().add(self)
def set_global_queue(self, global_queue):
self.global_queue = global_queue
def _should_quit(self):
""" see if we have a quit message from the global queue."""
if self.thread_stop:
return True
if self.global_queue.empty():
return False
msg = self.global_queue.get(timeout=1)
if not msg:
return False
if "quit" in msg and msg["quit"] is True:
# put the message back on the queue for others
self.global_queue.put_nowait(msg)
self.thread_stop = True
return True
def stop(self): def stop(self):
self.thread_stop = True self.thread_stop = True
@ -54,11 +80,15 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
def loop(self): def loop(self):
pass pass
def _cleanup(self):
"""Add code to subclass to do any cleanup"""
def run(self): def run(self):
LOG.debug("Starting") LOG.debug("Starting")
while not self.thread_stop: while not self._should_quit():
can_loop = self.loop() can_loop = self.loop()
if not can_loop: if not can_loop:
self.stop() self.stop()
self._cleanup()
APRSDThreadList().remove(self) APRSDThreadList().remove(self)
LOG.debug("Exiting") LOG.debug("Exiting")