Refactored threads.py

This patch creates a threads directory and separates out
the contents of threads.py into separate files in the
threads directory to make it easier to find and maintain.
This commit is contained in:
Hemna 2022-07-06 19:31:59 -04:00
parent bed060f1c5
commit 347a6d69f7
4 changed files with 166 additions and 149 deletions

13
aprsd/threads/__init__.py Normal file
View File

@ -0,0 +1,13 @@
import queue
# Make these available to anyone importing
# aprsd.threads
from .aprsd import APRSDThread, APRSDThreadList
from .keep_alive import KeepAliveThread
from .rx import APRSDRXThread
rx_msg_queue = queue.Queue(maxsize=20)
msg_queues = {
"rx": rx_msg_queue,
}

64
aprsd/threads/aprsd.py Normal file
View File

@ -0,0 +1,64 @@
import abc
import logging
import threading
LOG = logging.getLogger("APRSD")
class APRSDThreadList:
"""Singleton class that keeps track of application wide threads."""
_instance = None
threads_list = []
lock = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls.lock = threading.Lock()
cls.threads_list = []
return cls._instance
def add(self, thread_obj):
with self.lock:
self.threads_list.append(thread_obj)
def remove(self, thread_obj):
with self.lock:
self.threads_list.remove(thread_obj)
def stop_all(self):
"""Iterate over all threads and call stop on them."""
with self.lock:
for th in self.threads_list:
LOG.debug(f"Stopping Thread {th.name}")
th.stop()
def __len__(self):
with self.lock:
return len(self.threads_list)
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
def __init__(self, name):
super().__init__(name=name)
self.thread_stop = False
APRSDThreadList().add(self)
def stop(self):
self.thread_stop = True
@abc.abstractmethod
def loop(self):
pass
def run(self):
LOG.debug("Starting")
while not self.thread_stop:
can_loop = self.loop()
if not can_loop:
self.stop()
APRSDThreadList().remove(self)
LOG.debug("Exiting")

View File

@ -0,0 +1,87 @@
import datetime
import logging
import time
import tracemalloc
from aprsd import client, messaging, packets, stats, utils
from aprsd.threads import APRSDThread, APRSDThreadList
LOG = logging.getLogger("APRSD")
class KeepAliveThread(APRSDThread):
cntr = 0
checker_time = datetime.datetime.now()
def __init__(self, config):
tracemalloc.start()
super().__init__("KeepAlive")
self.config = config
max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0}
self.max_delta = datetime.timedelta(**max_timeout)
def loop(self):
if self.cntr % 60 == 0:
tracker = messaging.MsgTrack()
stats_obj = stats.APRSDStats()
pl = packets.PacketList()
thread_list = APRSDThreadList()
now = datetime.datetime.now()
last_email = stats_obj.email_thread_time
if last_email:
email_thread_time = utils.strfdelta(now - last_email)
else:
email_thread_time = "N/A"
last_msg_time = utils.strfdelta(now - stats_obj.aprsis_keepalive)
current, peak = tracemalloc.get_traced_memory()
stats_obj.set_memory(current)
stats_obj.set_memory_peak(peak)
try:
login = self.config["aprs"]["login"]
except KeyError:
login = self.config["ham"]["callsign"]
keepalive = (
"{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} "
"Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}"
).format(
login,
utils.strfdelta(stats_obj.uptime),
pl.total_recv,
pl.total_tx,
len(tracker),
stats_obj.msgs_tx,
stats_obj.msgs_rx,
last_msg_time,
email_thread_time,
utils.human_size(current),
utils.human_size(peak),
len(thread_list),
)
LOG.info(keepalive)
# See if we should reset the aprs-is client
# Due to losing a keepalive from them
delta_dict = utils.parse_delta_str(last_msg_time)
delta = datetime.timedelta(**delta_dict)
if delta > self.max_delta:
# We haven't gotten a keepalive from aprs-is in a while
# reset the connection.a
if not client.KISSClient.is_enabled(self.config):
LOG.warning("Resetting connection to APRS-IS.")
client.factory.create().reset()
# Check version every hour
delta = now - self.checker_time
if delta > datetime.timedelta(hours=1):
self.checker_time = now
level, msg = utils._check_version()
if level:
LOG.warning(msg)
self.cntr += 1
time.sleep(1)
return True

View File

@ -1,161 +1,14 @@
import abc
import datetime
import logging
import queue
import threading
import time
import tracemalloc
import aprslib
from aprsd import client, messaging, packets, plugin, stats, utils
from aprsd import client, messaging, packets, plugin, stats
from aprsd.threads import APRSDThread
LOG = logging.getLogger("APRSD")
RX_THREAD = "RX"
EMAIL_THREAD = "Email"
rx_msg_queue = queue.Queue(maxsize=20)
msg_queues = {
"rx": rx_msg_queue,
}
class APRSDThreadList:
"""Singleton class that keeps track of application wide threads."""
_instance = None
threads_list = []
lock = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls.lock = threading.Lock()
cls.threads_list = []
return cls._instance
def add(self, thread_obj):
with self.lock:
self.threads_list.append(thread_obj)
def remove(self, thread_obj):
with self.lock:
self.threads_list.remove(thread_obj)
def stop_all(self):
"""Iterate over all threads and call stop on them."""
with self.lock:
for th in self.threads_list:
LOG.debug(f"Stopping Thread {th.name}")
th.stop()
def __len__(self):
with self.lock:
return len(self.threads_list)
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
def __init__(self, name):
super().__init__(name=name)
self.thread_stop = False
APRSDThreadList().add(self)
def stop(self):
self.thread_stop = True
@abc.abstractmethod
def loop(self):
pass
def run(self):
LOG.debug("Starting")
while not self.thread_stop:
can_loop = self.loop()
if not can_loop:
self.stop()
APRSDThreadList().remove(self)
LOG.debug("Exiting")
class KeepAliveThread(APRSDThread):
cntr = 0
checker_time = datetime.datetime.now()
def __init__(self, config):
tracemalloc.start()
super().__init__("KeepAlive")
self.config = config
max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0}
self.max_delta = datetime.timedelta(**max_timeout)
def loop(self):
if self.cntr % 60 == 0:
tracker = messaging.MsgTrack()
stats_obj = stats.APRSDStats()
pl = packets.PacketList()
thread_list = APRSDThreadList()
now = datetime.datetime.now()
last_email = stats_obj.email_thread_time
if last_email:
email_thread_time = utils.strfdelta(now - last_email)
else:
email_thread_time = "N/A"
last_msg_time = utils.strfdelta(now - stats_obj.aprsis_keepalive)
current, peak = tracemalloc.get_traced_memory()
stats_obj.set_memory(current)
stats_obj.set_memory_peak(peak)
try:
login = self.config["aprs"]["login"]
except KeyError:
login = self.config["ham"]["callsign"]
keepalive = (
"{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} "
"Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}"
).format(
login,
utils.strfdelta(stats_obj.uptime),
pl.total_recv,
pl.total_tx,
len(tracker),
stats_obj.msgs_tx,
stats_obj.msgs_rx,
last_msg_time,
email_thread_time,
utils.human_size(current),
utils.human_size(peak),
len(thread_list),
)
LOG.info(keepalive)
# See if we should reset the aprs-is client
# Due to losing a keepalive from them
delta_dict = utils.parse_delta_str(last_msg_time)
delta = datetime.timedelta(**delta_dict)
if delta > self.max_delta:
# We haven't gotten a keepalive from aprs-is in a while
# reset the connection.a
if not client.KISSClient.is_enabled(self.config):
LOG.warning("Resetting connection to APRS-IS.")
client.factory.create().reset()
# Check version every hour
delta = now - self.checker_time
if delta > datetime.timedelta(hours=1):
self.checker_time = now
level, msg = utils._check_version()
if level:
LOG.warning(msg)
self.cntr += 1
time.sleep(1)
return True
class APRSDRXThread(APRSDThread):