mirror of
https://github.com/craigerl/aprsd.git
synced 2024-11-21 23:55:17 -05:00
Update aprsd thread base class to use queue
This patch updates the main aprsd threads class to use a shared queue to notify all aprsd thread classes they need to exit. This ensures any closing down of sockets, etc happens from inside the context of the thread itself, not the MainThread that calls stop.
This commit is contained in:
parent
823e83a2a6
commit
41af5ed21f
@ -1,7 +1,10 @@
|
|||||||
import abc
|
import abc
|
||||||
import logging
|
import logging
|
||||||
|
from queue import Queue
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
|
import wrapt
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger("APRSD")
|
LOG = logging.getLogger("APRSD")
|
||||||
|
|
||||||
@ -12,41 +15,64 @@ class APRSDThreadList:
|
|||||||
_instance = None
|
_instance = None
|
||||||
|
|
||||||
threads_list = []
|
threads_list = []
|
||||||
lock = None
|
lock = threading.Lock()
|
||||||
|
global_queue = Queue()
|
||||||
|
|
||||||
def __new__(cls, *args, **kwargs):
|
def __new__(cls, *args, **kwargs):
|
||||||
if cls._instance is None:
|
if cls._instance is None:
|
||||||
cls._instance = super().__new__(cls)
|
cls._instance = super().__new__(cls)
|
||||||
cls.lock = threading.Lock()
|
|
||||||
cls.threads_list = []
|
cls.threads_list = []
|
||||||
return cls._instance
|
return cls._instance
|
||||||
|
|
||||||
|
@wrapt.synchronized(lock)
|
||||||
def add(self, thread_obj):
|
def add(self, thread_obj):
|
||||||
with self.lock:
|
thread_obj.set_global_queue(self.global_queue)
|
||||||
self.threads_list.append(thread_obj)
|
self.threads_list.append(thread_obj)
|
||||||
|
|
||||||
|
@wrapt.synchronized(lock)
|
||||||
def remove(self, thread_obj):
|
def remove(self, thread_obj):
|
||||||
with self.lock:
|
self.threads_list.remove(thread_obj)
|
||||||
self.threads_list.remove(thread_obj)
|
|
||||||
|
|
||||||
|
@wrapt.synchronized(lock)
|
||||||
def stop_all(self):
|
def stop_all(self):
|
||||||
|
self.global_queue.put_nowait({"quit": True})
|
||||||
"""Iterate over all threads and call stop on them."""
|
"""Iterate over all threads and call stop on them."""
|
||||||
with self.lock:
|
for th in self.threads_list:
|
||||||
for th in self.threads_list:
|
LOG.info(f"Stopping Thread {th.name}")
|
||||||
LOG.debug(f"Stopping Thread {th.name}")
|
th.stop()
|
||||||
th.stop()
|
|
||||||
|
|
||||||
|
@wrapt.synchronized(lock)
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
with self.lock:
|
return len(self.threads_list)
|
||||||
return len(self.threads_list)
|
|
||||||
|
|
||||||
|
|
||||||
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
|
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
|
||||||
|
|
||||||
|
global_queue = None
|
||||||
|
|
||||||
def __init__(self, name):
|
def __init__(self, name):
|
||||||
super().__init__(name=name)
|
super().__init__(name=name)
|
||||||
self.thread_stop = False
|
self.thread_stop = False
|
||||||
APRSDThreadList().add(self)
|
APRSDThreadList().add(self)
|
||||||
|
|
||||||
|
def set_global_queue(self, global_queue):
|
||||||
|
self.global_queue = global_queue
|
||||||
|
|
||||||
|
def _should_quit(self):
|
||||||
|
""" see if we have a quit message from the global queue."""
|
||||||
|
if self.thread_stop:
|
||||||
|
return True
|
||||||
|
if self.global_queue.empty():
|
||||||
|
return False
|
||||||
|
msg = self.global_queue.get(timeout=1)
|
||||||
|
if not msg:
|
||||||
|
return False
|
||||||
|
if "quit" in msg and msg["quit"] is True:
|
||||||
|
# put the message back on the queue for others
|
||||||
|
self.global_queue.put_nowait(msg)
|
||||||
|
self.thread_stop = True
|
||||||
|
return True
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.thread_stop = True
|
self.thread_stop = True
|
||||||
|
|
||||||
@ -54,11 +80,15 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
|
|||||||
def loop(self):
|
def loop(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def _cleanup(self):
|
||||||
|
"""Add code to subclass to do any cleanup"""
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
LOG.debug("Starting")
|
LOG.debug("Starting")
|
||||||
while not self.thread_stop:
|
while not self._should_quit():
|
||||||
can_loop = self.loop()
|
can_loop = self.loop()
|
||||||
if not can_loop:
|
if not can_loop:
|
||||||
self.stop()
|
self.stop()
|
||||||
|
self._cleanup()
|
||||||
APRSDThreadList().remove(self)
|
APRSDThreadList().remove(self)
|
||||||
LOG.debug("Exiting")
|
LOG.debug("Exiting")
|
||||||
|
Loading…
Reference in New Issue
Block a user