diff --git a/bridge_master.py b/bridge_master.py index 4a50544..03f53a9 100755 --- a/bridge_master.py +++ b/bridge_master.py @@ -897,11 +897,14 @@ def mysql_config_check(): logger.debug('(MYSQL) TG2 ACL changed') #Preserve peers list - if system in CONFIG['SYSTEMS']: + if system in CONFIG['SYSTEMS'] and CONFIG['SYSTEMS'][system]['ENABLED']: + systems[system]._peer_sema.acquire(blocking=True) SQLCONFIG[system]['PEERS'] = CONFIG['SYSTEMS'][system]['PEERS'] - + systems[system]._peer_sema.release() + CONFIG['SYSTEMS'][system].update(SQLCONFIG[system]) + #Add MySQL config data to config dict - CONFIG['SYSTEMS'].update(SQLCONFIG) + #CONFIG['SYSTEMS'].update(SQLCONFIG) SQLCONFIG = {} sql.close() diff --git a/hblink.py b/hblink.py index e7c31e4..6c2ff20 100755 --- a/hblink.py +++ b/hblink.py @@ -35,6 +35,7 @@ from hashlib import sha256, sha1 from hmac import new as hmac_new, compare_digest from time import time from collections import deque +from threading import Semaphore # Twisted is pretty important, so I keep it separate from twisted.internet.protocol import DatagramProtocol, Factory, Protocol @@ -207,6 +208,8 @@ class HBSYSTEM(DatagramProtocol): self._report = _report self._config = self._CONFIG['SYSTEMS'][self._system] self._laststrid = {1: b'', 2: b''} + + self._peer_sema = Semaphore(value=1) # Define shortcuts and generic function names based on the type of system we are if self._config['MODE'] == 'MASTER': @@ -244,10 +247,12 @@ class HBSYSTEM(DatagramProtocol): # Check to see if any of the peers have been quiet (no ping) longer than allowed if _this_peer['LAST_PING']+(self._CONFIG['GLOBAL']['PING_TIME']*self._CONFIG['GLOBAL']['MAX_MISSED']) < time(): remove_list.append(peer) + self._peer_sema.acquire(blocking=True) for peer in remove_list: logger.info('(%s) Peer %s (%s) has timed out and is being removed', self._system, self._peers[peer]['CALLSIGN'], self._peers[peer]['RADIO_ID']) # Remove any timed out peers from the configuration del self._CONFIG['SYSTEMS'][self._system]['PEERS'][peer] + self._peer_sema.release() if not self._peers and self._CONFIG['OPTIONS']: logger.info('(%s) Deleting HBP Options',self._system) del self._CONFIG['OPTIONS'] @@ -410,6 +415,7 @@ class HBSYSTEM(DatagramProtocol): # Check for valid Radio ID if acl_check(_peer_id, self._CONFIG['GLOBAL']['REG_ACL']) and acl_check(_peer_id, self._config['REG_ACL']): # Build the configuration data strcuture for the peer + self._peer_sema.acquire(blocking=True) self._peers.update({_peer_id: { 'CONNECTION': 'RPTL-RECEIVED', 'CONNECTED': time(), @@ -435,6 +441,7 @@ class HBSYSTEM(DatagramProtocol): 'SOFTWARE_ID': '', 'PACKAGE_ID': '', }}) + self._peer_sema.release() logger.info('(%s) Repeater Logging in with Radio ID: %s, %s:%s', self._system, int_id(_peer_id), _sockaddr[0], _sockaddr[1]) _salt_str = bytes_4(self._peers[_peer_id]['SALT']) self.send_peer(_peer_id, b''.join([RPTACK, _salt_str])) @@ -452,8 +459,10 @@ class HBSYSTEM(DatagramProtocol): if _peer_id in self._peers \ and self._peers[_peer_id]['CONNECTION'] == 'CHALLENGE_SENT' \ and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: + self._peer_sema.acquire(blocking=True) _this_peer = self._peers[_peer_id] _this_peer['LAST_PING'] = time() + self._peer_sema.release() _sent_hash = _data[8:] _salt_str = bytes_4(_this_peer['SALT']) _calc_hash = bhex(sha256(_salt_str+self._config['PASSPHRASE']).hexdigest()) @@ -464,7 +473,9 @@ class HBSYSTEM(DatagramProtocol): else: logger.info('(%s) Peer %s has FAILED the login exchange successfully', self._system, _this_peer['RADIO_ID']) self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) + self._peer_sema.acquire(blocking=True) del self._peers[_peer_id] + self._peer_sema.release() else: self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Login challenge from Radio ID that has not logged in: %s', self._system, int_id(_peer_id)) @@ -477,13 +488,16 @@ class HBSYSTEM(DatagramProtocol): and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: logger.info('(%s) Peer is closing down: %s (%s)', self._system, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id)) self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) + self._peer_sema.acquire(blocking=True) del self._peers[_peer_id] + self._peer_sema.release() else: _peer_id = _data[4:8] # Configure Command if _peer_id in self._peers \ and self._peers[_peer_id]['CONNECTION'] == 'WAITING_CONFIG' \ and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: + self._peer_sema.acquire(blocking=True) _this_peer = self._peers[_peer_id] _this_peer['CONNECTION'] = 'YES' _this_peer['CONNECTED'] = time() @@ -502,6 +516,7 @@ class HBSYSTEM(DatagramProtocol): _this_peer['URL'] = _data[98:222] _this_peer['SOFTWARE_ID'] = _data[222:262] _this_peer['PACKAGE_ID'] = _data[262:302] + self._peer_sema.release() self.send_peer(_peer_id, b''.join([RPTACK, _peer_id])) logger.info('(%s) Peer %s (%s) has sent repeater configuration', self._system, _this_peer['CALLSIGN'], _this_peer['RADIO_ID']) @@ -513,7 +528,9 @@ class HBSYSTEM(DatagramProtocol): _peer_id = _data[4:8] if _peer_id in self._peers and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: _this_peer = self._peers[_peer_id] + self._peer_sema.aquire(blocking-True) _this_peer['OPTIONS'] = _data[8:] + self._peer_sema.release() self.send_peer(_peer_id, b''.join([RPTACK, _peer_id])) logger.info('(%s) Peer %s has sent options %s', self._system, _this_peer['CALLSIGN'], _this_peer['OPTIONS']) self._CONFIG['SYSTEMS'][self._system]['OPTIONS'] = _this_peer['OPTIONS'].decode() @@ -523,8 +540,10 @@ class HBSYSTEM(DatagramProtocol): if _peer_id in self._peers \ and self._peers[_peer_id]['CONNECTION'] == "YES" \ and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: + self._peer_sema.acquire(blocking=True) self._peers[_peer_id]['PINGS_RECEIVED'] += 1 self._peers[_peer_id]['LAST_PING'] = time() + self._peer_sema.release() self.send_peer(_peer_id, b''.join([MSTPONG, _peer_id])) #logger.debug('(%s) Received and answered RPTPING from peer %s (%s)', self._system, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id)) else: