mirror of https://github.com/craigerl/aprsd.git
171 lines
4.4 KiB
Python
171 lines
4.4 KiB
Python
"""Utilities and helper functions."""
|
|
|
|
import collections
|
|
import errno
|
|
import functools
|
|
import os
|
|
import re
|
|
import threading
|
|
|
|
import update_checker
|
|
|
|
import aprsd
|
|
|
|
|
|
def synchronized(wrapped):
|
|
lock = threading.Lock()
|
|
|
|
@functools.wraps(wrapped)
|
|
def _wrap(*args, **kwargs):
|
|
with lock:
|
|
return wrapped(*args, **kwargs)
|
|
|
|
return _wrap
|
|
|
|
|
|
def env(*vars, **kwargs):
|
|
"""This returns the first environment variable set.
|
|
if none are non-empty, defaults to '' or keyword arg default
|
|
"""
|
|
for v in vars:
|
|
value = os.environ.get(v, None)
|
|
if value:
|
|
return value
|
|
return kwargs.get("default", "")
|
|
|
|
|
|
def mkdir_p(path):
|
|
"""Make directory and have it work in py2 and py3."""
|
|
try:
|
|
os.makedirs(path)
|
|
except OSError as exc: # Python >= 2.5
|
|
if exc.errno == errno.EEXIST and os.path.isdir(path):
|
|
pass
|
|
else:
|
|
raise
|
|
|
|
|
|
def insert_str(string, str_to_insert, index):
|
|
return string[:index] + str_to_insert + string[index:]
|
|
|
|
|
|
def end_substr(original, substr):
|
|
"""Get the index of the end of the <substr>.
|
|
|
|
So you can insert a string after <substr>
|
|
"""
|
|
idx = original.find(substr)
|
|
if idx != -1:
|
|
idx += len(substr)
|
|
return idx
|
|
|
|
|
|
def rgb_from_name(name):
|
|
"""Create an rgb tuple from a string."""
|
|
hash = 0
|
|
for char in name:
|
|
hash = ord(char) + ((hash << 5) - hash)
|
|
red = hash & 255
|
|
green = (hash >> 8) & 255
|
|
blue = (hash >> 16) & 255
|
|
return red, green, blue
|
|
|
|
|
|
def human_size(bytes, units=None):
|
|
"""Returns a human readable string representation of bytes"""
|
|
if not units:
|
|
units = [" bytes", "KB", "MB", "GB", "TB", "PB", "EB"]
|
|
return str(bytes) + units[0] if bytes < 1024 else human_size(bytes >> 10, units[1:])
|
|
|
|
|
|
def strfdelta(tdelta, fmt="{hours:{width}}:{minutes:{width}}:{seconds:{width}}"):
|
|
d = {
|
|
"days": tdelta.days,
|
|
"width": "02",
|
|
}
|
|
if tdelta.days > 0:
|
|
fmt = "{days} days " + fmt
|
|
|
|
d["hours"], rem = divmod(tdelta.seconds, 3600)
|
|
d["minutes"], d["seconds"] = divmod(rem, 60)
|
|
return fmt.format(**d)
|
|
|
|
|
|
def _check_version():
|
|
# check for a newer version
|
|
try:
|
|
check = update_checker.UpdateChecker()
|
|
result = check.check("aprsd", aprsd.__version__)
|
|
if result:
|
|
# Looks like there is an updated version.
|
|
return 1, result
|
|
else:
|
|
return 0, "APRSD is up to date"
|
|
except Exception:
|
|
# probably can't get in touch with pypi for some reason
|
|
# Lets put up an error and move on. We might not
|
|
# have internet in this aprsd deployment.
|
|
return 1, "Couldn't check for new version of APRSD"
|
|
|
|
|
|
def flatten_dict(d, parent_key="", sep="."):
|
|
"""Flatten a dict to key.key.key = value."""
|
|
items = []
|
|
for k, v in d.items():
|
|
new_key = parent_key + sep + k if parent_key else k
|
|
if isinstance(v, collections.MutableMapping):
|
|
items.extend(flatten_dict(v, new_key, sep=sep).items())
|
|
else:
|
|
items.append((new_key, v))
|
|
return dict(items)
|
|
|
|
|
|
def parse_delta_str(s):
|
|
if "day" in s:
|
|
m = re.match(
|
|
r"(?P<days>[-\d]+) day[s]*, (?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d[\.\d+]*)",
|
|
s,
|
|
)
|
|
else:
|
|
m = re.match(r"(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d[\.\d+]*)", s)
|
|
return {key: float(val) for key, val in m.groupdict().items()}
|
|
|
|
|
|
class RingBuffer:
|
|
"""class that implements a not-yet-full buffer"""
|
|
|
|
def __init__(self, size_max):
|
|
self.max = size_max
|
|
self.data = []
|
|
|
|
class __Full:
|
|
"""class that implements a full buffer"""
|
|
|
|
def append(self, x):
|
|
"""Append an element overwriting the oldest one."""
|
|
self.data[self.cur] = x
|
|
self.cur = (self.cur + 1) % self.max
|
|
|
|
def get(self):
|
|
"""return list of elements in correct order"""
|
|
return self.data[self.cur :] + self.data[: self.cur]
|
|
|
|
def __len__(self):
|
|
return len(self.data)
|
|
|
|
def append(self, x):
|
|
"""append an element at the end of the buffer"""
|
|
|
|
self.data.append(x)
|
|
if len(self.data) == self.max:
|
|
self.cur = 0
|
|
# Permanently change self's class from non-full to full
|
|
self.__class__ = self.__Full
|
|
|
|
def get(self):
|
|
"""Return a list of elements from the oldest to the newest."""
|
|
return self.data
|
|
|
|
def __len__(self):
|
|
return len(self.data)
|