From d87c67acb08e177bd334b8d9a8cbfb268a1df68c Mon Sep 17 00:00:00 2001 From: KF7EEL Date: Thu, 3 Jun 2021 09:15:01 -0700 Subject: [PATCH] clean code from hblink.py --- hblink.py | 68 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 37 insertions(+), 31 deletions(-) diff --git a/hblink.py b/hblink.py index 787deec..8126ddd 100755 --- a/hblink.py +++ b/hblink.py @@ -236,6 +236,7 @@ class HBSYSTEM(DatagramProtocol): self.maintenance_loop = self.peer_maintenance_loop self.datagramReceived = self.peer_datagramReceived self.dereg = self.peer_dereg + def check_user_man(self, _id): #Change this to a config value user_man_url = self._CONFIG['USER_MANAGER']['URL'] @@ -253,6 +254,36 @@ class HBSYSTEM(DatagramProtocol): except requests.ConnectionError: return {'allow':True} + def calc_passphrase(self, peer_id, _salt_str): + try: + if self.ums_response['mode'] == 'legacy': + _calc_hash = bhex(sha256(_salt_str+self._config['PASSPHRASE']).hexdigest()) + if self.ums_response['mode'] == 'override': + _calc_hash = bhex(sha256(_salt_str+str.encode(self.ums_response['value'])).hexdigest()) + if self.ums_response['mode'] == 'normal': + _new_peer_id = bytes_4(int(str(int_id(peer_id))[:7])) + #print(self._CONFIG['USER_MANAGER']['APPEND_INT']) + calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8))) + if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == True: + calc_passphrase = calc_passphrase[-8:] + if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == False: + pass + _calc_hash = bhex(sha256(_salt_str+calc_passphrase).hexdigest()) + #ums_down = False + #If exception, assume UMS down and default to calculated passphrase + except Exception as e: + # If UMS down, default to base 64 auth + _new_peer_id = bytes_4(int(str(int_id(peer_id))[:7])) + calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8))) + if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == True: + calc_passphrase = calc_passphrase[-8:] + if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == False: + pass + _calc_hash = bhex(sha256(_salt_str+calc_passphrase).hexdigest()) + #ums_down = True + return _calc_hash + + def startProtocol(self): # Set up periodic loop for tracking pings from peers. Run every 'PING_TIME' seconds self._system_maintenance = task.LoopingCall(self.maintenance_loop) @@ -498,39 +529,14 @@ class HBSYSTEM(DatagramProtocol): #print(self.ums_response) if self._config['USE_USER_MAN'] == True: -## print(self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE']) - try: - if self.ums_response['mode'] == 'legacy': - _calc_hash = bhex(sha256(_salt_str+self._config['PASSPHRASE']).hexdigest()) - if self.ums_response['mode'] == 'override': - _calc_hash = bhex(sha256(_salt_str+str.encode(self.ums_response['value'])).hexdigest()) - if self.ums_response['mode'] == 'normal': - _new_peer_id = bytes_4(int(str(int_id(_peer_id))[:7])) - #print(self._CONFIG['USER_MANAGER']['APPEND_INT']) - if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == True: - calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8))) - #calc_passphrase = base64.b64encode((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big')) - calc_passphrase = calc_passphrase[-8:] - if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == False: - calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8))) - _calc_hash = bhex(sha256(_salt_str+calc_passphrase).hexdigest()) - ums_down = False - except Exception as e: -## # If UMS down, default to base 64 auth - _new_peer_id = bytes_4(int(str(int_id(_peer_id))[:7])) - if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == True: - calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8))) - calc_passphrase = calc_passphrase[-8:] - if self._CONFIG['USER_MANAGER']['SHORTEN_PASSPHRASE'] == False: - calc_passphrase = base64.b64encode(bytes.fromhex(str(hex(libscrc.ccitt((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8)))))[2:].zfill(4)) + (_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big') + bytes.fromhex(str(hex(libscrc.posix((_new_peer_id) + self._CONFIG['USER_MANAGER']['APPEND_INT'].to_bytes(2, 'big'))))[2:].zfill(8))) - _calc_hash = bhex(sha256(_salt_str+calc_passphrase).hexdigest()) - ums_down = True + print(self.calc_passphrase(_peer_id, _salt_str)) + _calc_hash = self.calc_passphrase(_peer_id, _salt_str) if self._config['USE_USER_MAN'] == False: _calc_hash = bhex(sha256(_salt_str+self._config['PASSPHRASE']).hexdigest()) - -## if _sent_hash == _calc_hash or (ums_down == True and _sent_hash == _calc_hash): - - if _sent_hash == _calc_hash or (ums_down == True and _sent_hash == _calc_hash) or (ums_down == False and _sent_hash == _ocalc_hash): + # Uncomment below to only accept calculated passphrase +# if _sent_hash == _calc_hash: + # Condition below accepts either calculated passphrase or config passphrase + if _sent_hash == _calc_hash or _sent_hash == _ocalc_hash: _this_peer['CONNECTION'] = 'WAITING_CONFIG' self.send_peer(_peer_id, b''.join([RPTACK, _peer_id])) logger.info('(%s) Peer %s has completed the login exchange successfully', self._system, _this_peer['RADIO_ID'])