clean code from hblink.py

This commit is contained in:
KF7EEL 2021-06-03 09:15:01 -07:00
parent 06cc28fe82
commit d87c67acb0

View File

@ -236,6 +236,7 @@ class HBSYSTEM(DatagramProtocol):
self.maintenance_loop = self.peer_maintenance_loop
self.datagramReceived = self.peer_datagramReceived
self.dereg = self.peer_dereg
def check_user_man(self, _id):
#Change this to a config value
user_man_url = self._CONFIG['USER_MANAGER']['URL']
@ -253,6 +254,36 @@ class HBSYSTEM(DatagramProtocol):
except requests.ConnectionError:
return {'allow':True}
def calc_passphrase(self, peer_id, _salt_str):
try:
if self.ums_response['mode'] == 'legacy':
_calc_hash = bhex(sha256(_salt_str+self._config['PASSPHRASE']).hexdigest())
if self.ums_response['mode'] == 'override':
_calc_hash = bhex(sha256(_salt_str+str.encode(self.ums_response['value'])).hexdigest())
if self.ums_response['mode'] == 'normal':
_new_peer_id = bytes_4(int(str(int_id(peer_id))[:7]))
#print(self._CONFIG['USER_MANAGER']['APPEND_INT'])
calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))
if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == True:
calc_passphrase = calc_passphrase[-8:]
if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == False:
pass
_calc_hash = bhex(sha256(_salt_str+calc_passphrase).hexdigest())
#ums_down = False
#If exception, assume UMS down and default to calculated passphrase
except Exception as e:
# If UMS down, default to base 64 auth
_new_peer_id = bytes_4(int(str(int_id(peer_id))[:7]))
calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))
if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == True:
calc_passphrase = calc_passphrase[-8:]
if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == False:
pass
_calc_hash = bhex(sha256(_salt_str+calc_passphrase).hexdigest())
#ums_down = True
return _calc_hash
def startProtocol(self):
# Set up periodic loop for tracking pings from peers. Run every 'PING_TIME' seconds
self._system_maintenance = task.LoopingCall(self.maintenance_loop)
@ -498,39 +529,14 @@ class HBSYSTEM(DatagramProtocol):
#print(self.ums_response)
if self._config['USE_USER_MAN'] == True:
## print(self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'])
try:
if self.ums_response['mode'] == 'legacy':
_calc_hash = bhex(sha256(_salt_str+self._config['PASSPHRASE']).hexdigest())
if self.ums_response['mode'] == 'override':
_calc_hash = bhex(sha256(_salt_str+str.encode(self.ums_response['value'])).hexdigest())
if self.ums_response['mode'] == 'normal':
_new_peer_id = bytes_4(int(str(int_id(_peer_id))[:7]))
#print(self._CONFIG['USER_MANAGER']['APPEND_INT'])
if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == True:
calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))
#calc_passphrase = base64.b64encode((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))
calc_passphrase = calc_passphrase[-8:]
if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == False:
calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))
_calc_hash = bhex(sha256(_salt_str+calc_passphrase).hexdigest())
ums_down = False
except Exception as e:
## # If UMS down, default to base 64 auth
_new_peer_id = bytes_4(int(str(int_id(_peer_id))[:7]))
if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == True:
calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))
calc_passphrase = calc_passphrase[-8:]
if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == False:
calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))
_calc_hash = bhex(sha256(_salt_str+calc_passphrase).hexdigest())
ums_down = True
print(self.calc_passphrase(_peer_id, _salt_str))
_calc_hash = self.calc_passphrase(_peer_id, _salt_str)
if self._config['USE_USER_MAN'] == False:
_calc_hash = bhex(sha256(_salt_str+self._config['PASSPHRASE']).hexdigest())
## if _sent_hash == _calc_hash or (ums_down == True and _sent_hash == _calc_hash):
if _sent_hash == _calc_hash or (ums_down == True and _sent_hash == _calc_hash) or (ums_down == False and _sent_hash == _ocalc_hash):
# Uncomment below to only accept calculated passphrase
# if _sent_hash == _calc_hash:
# Condition below accepts either calculated passphrase or config passphrase
if _sent_hash == _calc_hash or _sent_hash == _ocalc_hash:
_this_peer['CONNECTION'] = 'WAITING_CONFIG'
self.send_peer(_peer_id, b''.join([RPTACK, _peer_id]))
logger.info('(%s) Peer %s has completed the login exchange successfully', self._system, _this_peer['RADIO_ID'])