dmr_utils/dmr_utils/utils.py

198 lines
6.9 KiB
Python
Raw Normal View History

2016-12-03 16:16:21 -05:00
#!/usr/bin/env python
#
###############################################################################
# Copyright (C) 2016 Cortney T. Buffington, N0MJS <n0mjs@me.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
###############################################################################
from __future__ import print_function
import json
2016-12-03 16:16:21 -05:00
from os.path import isfile, getmtime
from time import time
from urllib import URLopener
from csv import reader as csv_reader
from binascii import b2a_hex as ahex
# Does anybody read this stuff? There's a PEP somewhere that says I should do this.
__author__ = 'Cortney T. Buffington, N0MJS'
2017-05-19 10:45:56 -04:00
__copyright__ = 'Copyright (c) 2016-2017 Cortney T. Buffington, N0MJS and the K0USY Group'
2016-12-03 16:16:21 -05:00
__credits__ = 'Colin Durbridge, G4EML, Steve Zingman, N4IRS; Mike Zingman'
__license__ = 'GNU GPLv3'
__maintainer__ = 'Cort Buffington, N0MJS'
__email__ = 'n0mjs@me.com'
2017-05-19 10:45:56 -04:00
2016-12-03 16:16:21 -05:00
#************************************************
# STRING UTILITY FUNCTIONS
#************************************************
# Create a 2 byte hex string from an integer
def hex_str_2(_int_id):
try:
return format(_int_id,'x').rjust(4,'0').decode('hex')
except TypeError:
raise
# Create a 3 byte hex string from an integer
def hex_str_3(_int_id):
try:
return format(_int_id,'x').rjust(6,'0').decode('hex')
except TypeError:
raise
# Create a 4 byte hex string from an integer
def hex_str_4(_int_id):
try:
return format(_int_id,'x').rjust(8,'0').decode('hex')
except TypeError:
raise
# Convert a hex string to an int (radio ID, etc.)
def int_id(_hex_string):
return int(ahex(_hex_string), 16)
#************************************************
# RANDOM UTILITY FUNCTIONS
#************************************************
# Ensure all keys and values in a dictionary are ascii
def mk_ascii_dict(input):
if isinstance(input, dict):
return {mk_ascii_dict(key): mk_ascii_dict(value) for key, value in input.iteritems()}
elif isinstance(input, list):
return [mk_ascii_dict(element) for element in input]
elif isinstance(input, unicode):
return input.encode('ascii','ignore')
else:
return input
2016-12-03 16:16:21 -05:00
#************************************************
# ID ALIAS FUNCTIONS
#************************************************
# Download and build dictionaries for mapping number to aliases
# Used by applications. These lookups take time, please do not shove them
# into this file everywhere and send a pull request!!!
# Download a new file if it doesn't exist, or is older than the stale time
def try_download(_path, _file, _url, _stale,):
now = time()
url = URLopener()
file_exists = isfile(_path+_file) == True
if file_exists:
file_old = (getmtime(_path+_file) + _stale) < now
if not file_exists or (file_exists and file_old):
try:
url.retrieve(_url, _path+_file)
result = 'ID ALIAS MAPPER: \'{}\' successfully downloaded'.format(_file)
except IOError:
result = 'ID ALIAS MAPPER: \'{}\' could not be downloaded'.format(_file)
else:
result = 'ID ALIAS MAPPER: \'{}\' is current, not downloaded'.format(_file)
url.close()
return result
2017-05-19 10:45:56 -04:00
# LEGACY VERSION - MAKES A SIMPLE {INTEGER ID: 'CALLSIGN'} DICTIONARY
2016-12-03 16:16:21 -05:00
def mk_id_dict(_path, _file):
dict = {}
if _file.endswith(('.json','.JSON')):
try:
with open(_path+_file, 'rU') as _handle:
ids = json.loads(_handle.read().decode('utf-8', 'ignore'))
if 'repeaters' in ids:
ids = ids['repeaters']
id_type = 'locator'
id_value = 'callsign'
elif 'users' in ids:
ids = ids['users']
id_type = 'radio_id'
id_value = 'callsign'
elif 'tgids' in ids:
ids = ids['tgids']
id_type = 'tgid'
id_value = 'name'
else:
return dict
for row in range(len(ids)):
dict[int(ids[row][id_type])] = ids[row][id_value].encode('ascii','ignore')
_handle.close
return dict
except IOError:
return dict
elif _file.endswith(('.csv','.CSV')):
try:
with open(_path+_file, 'rU') as _handle:
ids = csv_reader(_handle, dialect='excel', delimiter=',')
for row in ids:
dict[int(row[0])] = (row[1])
_handle.close
return dict
except IOError:
2017-05-18 21:33:43 -04:00
return dict
2017-05-19 10:45:56 -04:00
# NEW VERSION - MAKES A FULL DICTIONARY OF INFORMATION BASED ON TYPE OF ALIAS FILE
# BASED ON DOWNLOADS FROM DMR-MARC AND ONLY WORKS FOR DMR-MARC STYLE JSON FILES!!!
# RESULTING DICTIONARY KEYS ARE INTEGER RADIO ID, AND VALURES ARE DICTIONARIES OF
# WHATEVER IS IN EACH ROW OF THE DMR-MARC DATABASE USED, IE.:
# 312345: {u'map': u'0', u'color_code': u'1', u'city': u'Morlon'.....u'map_info': u'', u'trustee': u'HB9HFF'}
def mk_full_id_dict(_path, _file):
2017-05-18 21:33:43 -04:00
dict = {}
try:
with open(_path+_file, 'rU') as _handle:
ids = json.loads(_handle.read().decode('utf-8', 'ignore'))
if 'repeaters' in ids:
ids = ids['repeaters']
id_type = 'locator'
elif 'users' in ids:
ids = ids['users']
id_type = 'radio_id'
else:
return dict
for row in range(len(ids)):
dict[int(ids[row][id_type])] = ids[row]
2016-12-03 16:16:21 -05:00
_handle.close
dict = mk_ascii_dict(dict)
2016-12-03 16:16:21 -05:00
return dict
except IOError:
return dict
# USE THIS TO QUERY THE ID DICTIONARIES WE MAKE WITH THE FUNCTION(S) ABOVE
2017-05-19 10:45:56 -04:00
def get_alias(_id, _dict, *args):
if type(_id) == str:
_id = int_id(_id)
if _id in _dict:
2017-05-19 10:45:56 -04:00
if args:
retValue = []
for _item in args:
try:
retValue.append(_dict[_id][_item])
except TypeError:
return _dict[_id]
return retValue
else:
return _dict[_id]
return _id
2017-05-19 10:45:56 -04:00
# FOR LEGACY PURPOSES
get_info = get_alias