diff --git a/bridge_master.py b/bridge_master.py index 7041cba..070f07f 100755 --- a/bridge_master.py +++ b/bridge_master.py @@ -43,8 +43,8 @@ from twisted.internet.protocol import Factory, Protocol from twisted.protocols.basic import NetstringReceiver from twisted.internet import reactor, task #We're going to *try* and be thread safe -from twisted.python import threadable -threadable.init(1) +#from twisted.python import threadable +#threadable.init(1) from threading import Semaphore @@ -160,7 +160,6 @@ def make_bridges(_rules): #Make a single bridge - used for on-the-fly UA bridges def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): - BRIDGE_SEMA.acquire(blocking = True) _tgid_s = str(int_id(_tgid)) BRIDGES[_tgid_s] = [] for _system in CONFIG['SYSTEMS']: @@ -181,11 +180,8 @@ def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): if _system[0:3] == 'OBP': BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) - BRIDGE_SEMA.release() - #Make static bridge - used for on-the-fly relay bridges def make_stat_bridge(_tgid): - BRIDGE_SEMA.acquire(blocking = True) _tgid_s = str(int_id(_tgid)) BRIDGES[_tgid_s] = [] for _system in CONFIG['SYSTEMS']: @@ -198,8 +194,6 @@ def make_stat_bridge(_tgid): if _system[0:3] == 'OBP': BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'STAT','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) - BRIDGE_SEMA.release() - def make_default_reflector(reflector,_tmout,system): bridge = '#'+str(reflector) @@ -208,7 +202,6 @@ def make_default_reflector(reflector,_tmout,system): BRIDGES[bridge] = [] make_single_reflector(bytes_3(reflector),_tmout, system) bridgetemp = [] - BRIDGE_SEMA.acquire(blocking = True) for bridgesystem in BRIDGES[bridge]: if bridgesystem['SYSTEM'] == system and bridgesystem['TS'] == 2: bridgetemp.append({'SYSTEM': system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': True,'TIMEOUT': _tmout * 60,'TO_TYPE': 'OFF','OFF': [],'ON': [bytes_3(reflector),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) @@ -216,14 +209,12 @@ def make_default_reflector(reflector,_tmout,system): bridgetemp.append(bridgesystem) BRIDGES[bridge] = bridgetemp - BRIDGE_SEMA.release() def make_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] if str(tg) not in BRIDGES: make_single_bridge(bytes_3(tg),system,ts,_tmout) bridgetemp = [] - BRIDGE_SEMA.acquire(blocking = True) for bridgesystem in BRIDGES[str(tg)]: if bridgesystem['SYSTEM'] == system and bridgesystem['TS'] == ts: bridgetemp.append({'SYSTEM': system, 'TS': ts, 'TGID': bytes_3(tg),'ACTIVE': True,'TIMEOUT': _tmout * 60,'TO_TYPE': 'OFF','OFF': [],'ON': [bytes_3(tg),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) @@ -232,11 +223,8 @@ def make_static_tg(tg,ts,_tmout,system): BRIDGES[str(tg)] = bridgetemp - BRIDGE_SEMA.release() - def reset_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] - BRIDGE_SEMA.acquire(blocking = True) bridgetemp = [] for bridgesystem in BRIDGES[str(tg)]: if bridgesystem['SYSTEM'] == system and bridgesystem['TS'] == ts: @@ -245,8 +233,6 @@ def reset_static_tg(tg,ts,_tmout,system): bridgetemp.append(bridgesystem) BRIDGES[str(tg)] = bridgetemp - - BRIDGE_SEMA.release() def reset_default_reflector(reflector,_tmout,system): bridge = '#'+str(reflector) @@ -255,19 +241,16 @@ def reset_default_reflector(reflector,_tmout,system): BRIDGES[bridge] = [] make_single_reflector(bytes_3(reflector),_tmout, system) bridgetemp = [] - BRIDGE_SEMA.acquire(blocking = True) for bridgesystem in BRIDGES[bridge]: if bridgesystem['SYSTEM'] == system and bridgesystem['TS'] == 2: bridgetemp.append({'SYSTEM': system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [bytes_3(reflector),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) else: bridgetemp.append(bridgesystem) BRIDGES[bridge] = bridgetemp - BRIDGE_SEMA.release() def make_single_reflector(_tgid,_tmout,_sourcesystem): _tgid_s = str(int_id(_tgid)) _bridge = '#' + _tgid_s - BRIDGE_SEMA.acquire(blocking = True) BRIDGES[_bridge] = [] for _system in CONFIG['SYSTEMS']: if _system[0:3] != 'OBP': @@ -279,10 +262,8 @@ def make_single_reflector(_tgid,_tmout,_sourcesystem): BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': CONFIG['SYSTEMS'][_system]['DEFAULT_UA_TIMER'] * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) if _system[0:3] == 'OBP': BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) - BRIDGE_SEMA.release() def remove_bridge_system(system): - BRIDGE_SEMA.acquire(blocking = True) _bridgestemp = {} _bridgetemp = {} for _bridge in BRIDGES: @@ -292,14 +273,12 @@ def remove_bridge_system(system): _bridgestemp[_bridge] = [] _bridgestemp[_bridge].append(_bridgesystem) BRIDGES.update(_bridgestemp) - BRIDGE_SEMA.release() # Run this every minute for rule timer updates def rule_timer_loop(): logger.debug('(ROUTER) routerHBP Rule timer loop started') _now = time() - BRIDGE_SEMA.acquire(blocking = True) _remove_bridges = [] for _bridge in BRIDGES: _bridge_used = False @@ -345,13 +324,11 @@ def rule_timer_loop(): del BRIDGES[_bridgerem] logger.debug('(ROUTER) Unused conference bridge %s removed',_bridgerem) - BRIDGE_SEMA.release() if CONFIG['REPORTS']['REPORT']: report_server.send_clients(b'bridge updated') def statTrimmer(): logger.debug('(ROUTER) STAT trimmer loop started') - BRIDGE_SEMA.acquire(blocking = True) _remove_bridges = [] for _bridge in BRIDGES: _bridge_stat = False @@ -368,7 +345,6 @@ def statTrimmer(): for _bridgerem in _remove_bridges: del BRIDGES[_bridgerem] logger.debug('(ROUTER) STAT bridge %s removed',_bridgerem) - BRIDGE_SEMA.release() if CONFIG['REPORTS']['REPORT']: report_server.send_clients(b'bridge updated') @@ -520,7 +496,7 @@ def threadedMysql(): if not mysql_sema.acquire(blocking = False): logger.debug('(MYSQL) Previous thread is still running (can\'t acquire semaphore). Try next iteration') return - reactor.callInThread(mysql_config_check) + reactor.callInThread(mysqlGetConfig) mysql_sema.release() def ident(): @@ -675,17 +651,13 @@ def options_config(): tg = int(tg) make_static_tg(tg,2,_tmout,_system) - # systems[_system]._peer_sema.acquire(blocking=True) - # _peerstmp = CONFIG['SYSTEMS'][_system]['PEERS'] CONFIG['SYSTEMS'][_system]['TS1_STATIC'] = _options['TS1_STATIC'] CONFIG['SYSTEMS'][_system]['TS2_STATIC'] = _options['TS2_STATIC'] CONFIG['SYSTEMS'][_system]['DEFAULT_REFLECTOR'] = int(_options['DEFAULT_REFLECTOR']) CONFIG['SYSTEMS'][_system]['DEFAULT_UA_TIMER'] = int(_options['DEFAULT_UA_TIMER']) - -def mysql_config_check(): +def mysqlGetConfig(): logger.debug('(MYSQL) Periodic config check') - SQLCONFIG = {} SQLGETCONFIG = {} if sql.con(): logger.debug('(MYSQL) reading config from database') @@ -698,8 +670,12 @@ def mysql_config_check(): logger.debug('(MYSQL) problem connecting to SQL server, aborting') return - SQLCONFIG = SQLGETCONFIG + reactor.callFromThread(mysql_config_check,SQLGETCONFIG) + +def mysql_config_check(SQLGETCONFIG): + + SQLCONFIG = SQLGETCONFIG for system in SQLGETCONFIG: if system not in CONFIG['SYSTEMS']: if SQLCONFIG[system]['ENABLED']: @@ -721,7 +697,6 @@ def mysql_config_check(): #Add system to bridges if SQLCONFIG[system]['ENABLED']: logger.debug('(MYSQL) adding new system to static bridges') - BRIDGE_SEMA.acquire(blocking = True) for _bridge in BRIDGES: ts1 = False ts2 = False @@ -738,7 +713,6 @@ def mysql_config_check(): else: if ts2 == False: BRIDGES[_bridge].append({'SYSTEM': system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [bytes_3(4000)],'ON': [],'RESET': [], 'TIMER': time()}) - BRIDGE_SEMA.release() if SQLCONFIG[system]['DEFAULT_REFLECTOR'] > 0: logger.debug('(MYSQL) %s setting default reflector',system) @@ -791,7 +765,6 @@ def mysql_config_check(): listeningPorts[system] = reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP']) logger.debug('(GLOBAL) %s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) logger.debug('(MYSQL) adding new system to static bridges') - BRIDGE_SEMA.acquire(blocking = True) _tmout = SQLCONFIG[system]['DEFAULT_UA_TIMER'] for _bridge in BRIDGES: ts1 = False @@ -809,7 +782,7 @@ def mysql_config_check(): else: if ts2 == False: BRIDGES[_bridge].append({'SYSTEM': system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [bytes_3(4000)],'ON': [],'RESET': [], 'TIMER': time()}) - BRIDGE_SEMA.release() + if SQLCONFIG[system]['DEFAULT_REFLECTOR'] > 0: if 'OPTIONS' not in SQLCONFIG[system]: @@ -947,10 +920,8 @@ def mysql_config_check(): #Preserve peers list if system in CONFIG['SYSTEMS'] and CONFIG['SYSTEMS'][system]['ENABLED'] and 'PEERS' in CONFIG['SYSTEMS'][system] : - systems[system]._peer_sema.acquire(blocking=True) SQLCONFIG[system]['PEERS'] = CONFIG['SYSTEMS'][system]['PEERS'] CONFIG['SYSTEMS'][system].update(SQLCONFIG[system]) - systems[system]._peer_sema.release() else: CONFIG['SYSTEMS'][system].update(SQLCONFIG[system]) @@ -1436,7 +1407,6 @@ class routerHBP(HBSYSTEM): logger.info('(%s) [A] Reflector for TG %s does not exist. Creating as User Activated. Timeout: %s',self._system, _int_dst_id,CONFIG['SYSTEMS'][self._system]['DEFAULT_UA_TIMER']) make_single_reflector(_dst_id,CONFIG['SYSTEMS'][self._system]['DEFAULT_UA_TIMER'],self._system) - BRIDGE_SEMA.acquire(blocking = True) if _int_dst_id > 5 and _int_dst_id != 9 and _int_dst_id != 5000: for _bridge in BRIDGES: if _bridge[0:1] != '#': @@ -1487,8 +1457,7 @@ class routerHBP(HBSYSTEM): if _system['ACTIVE'] == True and _system['TO_TYPE'] == 'ON' and _dst_id in _system['OFF']: _system['TIMER'] = pkt_time logger.info('(%s) [I] Reflector: %s has ON timer and set to "OFF": timeout timer cancelled', self._system, _bridge) - - BRIDGE_SEMA.release() + if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM): @@ -1623,7 +1592,6 @@ class routerHBP(HBSYSTEM): # # Iterate the rules dictionary - BRIDGE_SEMA.acquire(blocking = True) for _bridge in BRIDGES: if (_bridge[0:1] == '#') and (_int_dst_id != 9): continue @@ -1698,8 +1666,7 @@ class routerHBP(HBSYSTEM): if _system['ACTIVE'] == True and _system['TO_TYPE'] == 'ON' and _dst_id in _system['OFF']: _system['TIMER'] = pkt_time logger.info('(%s) [12] Bridge: %s set to ON with and "OFF" timer rule: timeout timer cancelled', self._system, _bridge) - - BRIDGE_SEMA.release() + # # END IN-BAND SIGNALLING # @@ -1824,7 +1791,6 @@ if __name__ == '__main__': sys.exit('(ROUTER) TERMINATING: Routing bridges file not found or invalid: {}'.format(cli_args.RULES_FILE)) # Build the routing rules file - BRIDGE_SEMA = Semaphore(value=1) BRIDGES = make_bridges(rules_module.BRIDGES) # Default reflector @@ -1932,6 +1898,6 @@ if __name__ == '__main__': stat_trimmer.addErrback(loopingErrHandle) #more threads - reactor.suggestThreadPoolSize(50) + reactor.suggestThreadPoolSize(500) reactor.run() diff --git a/hblink.py b/hblink.py index 6b553ce..aba51cf 100755 --- a/hblink.py +++ b/hblink.py @@ -209,7 +209,6 @@ class HBSYSTEM(DatagramProtocol): self._config = self._CONFIG['SYSTEMS'][self._system] self._laststrid = {1: b'', 2: b''} - self._peer_sema = Semaphore(value=1) # Define shortcuts and generic function names based on the type of system we are if self._config['MODE'] == 'MASTER': @@ -247,12 +246,10 @@ class HBSYSTEM(DatagramProtocol): # Check to see if any of the peers have been quiet (no ping) longer than allowed if _this_peer['LAST_PING']+(self._CONFIG['GLOBAL']['PING_TIME']*self._CONFIG['GLOBAL']['MAX_MISSED']) < time(): remove_list.append(peer) - self._peer_sema.acquire(blocking=True) for peer in remove_list: logger.info('(%s) Peer %s (%s) has timed out and is being removed', self._system, self._peers[peer]['CALLSIGN'], self._peers[peer]['RADIO_ID']) # Remove any timed out peers from the configuration del self._CONFIG['SYSTEMS'][self._system]['PEERS'][peer] - self._peer_sema.release() if 'PEERS' not in self._CONFIG['SYSTEMS'][self._system] and 'OPTIONS' in self._CONFIG['SYSTEMS'][self._system]: logger.info('(%s) Deleting HBP Options',self._system) del self._CONFIG['SYSTEMS'][self._system]['OPTIONS'] @@ -415,7 +412,6 @@ class HBSYSTEM(DatagramProtocol): # Check for valid Radio ID if acl_check(_peer_id, self._CONFIG['GLOBAL']['REG_ACL']) and acl_check(_peer_id, self._config['REG_ACL']): # Build the configuration data strcuture for the peer - self._peer_sema.acquire(blocking=True) self._peers.update({_peer_id: { 'CONNECTION': 'RPTL-RECEIVED', 'CONNECTED': time(), @@ -441,7 +437,6 @@ class HBSYSTEM(DatagramProtocol): 'SOFTWARE_ID': '', 'PACKAGE_ID': '', }}) - self._peer_sema.release() logger.info('(%s) Repeater Logging in with Radio ID: %s, %s:%s', self._system, int_id(_peer_id), _sockaddr[0], _sockaddr[1]) _salt_str = bytes_4(self._peers[_peer_id]['SALT']) self.send_peer(_peer_id, b''.join([RPTACK, _salt_str])) @@ -459,10 +454,8 @@ class HBSYSTEM(DatagramProtocol): if _peer_id in self._peers \ and self._peers[_peer_id]['CONNECTION'] == 'CHALLENGE_SENT' \ and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: - self._peer_sema.acquire(blocking=True) _this_peer = self._peers[_peer_id] _this_peer['LAST_PING'] = time() - self._peer_sema.release() _sent_hash = _data[8:] _salt_str = bytes_4(_this_peer['SALT']) if self._CONFIG['GLOBAL']['ALLOW_NULL_PASSPHRASE'] and len(self._config['PASSPHRASE']) == 0: @@ -478,9 +471,7 @@ class HBSYSTEM(DatagramProtocol): else: logger.info('(%s) Peer %s has FAILED the login exchange successfully', self._system, _this_peer['RADIO_ID']) self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) - self._peer_sema.acquire(blocking=True) del self._peers[_peer_id] - self._peer_sema.release() else: self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Login challenge from Radio ID that has not logged in: %s', self._system, int_id(_peer_id)) @@ -493,9 +484,7 @@ class HBSYSTEM(DatagramProtocol): and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: logger.info('(%s) Peer is closing down: %s (%s)', self._system, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id)) self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) - self._peer_sema.acquire(blocking=True) del self._peers[_peer_id] - self._peer_sema.release() if 'OPTIONS' in self._CONFIG['SYSTEMS'][self._system]: logger.info('(%s) Deleting HBP Options',self._system) del self._CONFIG['SYSTEMS'][self._system]['OPTIONS'] @@ -505,7 +494,6 @@ class HBSYSTEM(DatagramProtocol): if _peer_id in self._peers \ and self._peers[_peer_id]['CONNECTION'] == 'WAITING_CONFIG' \ and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: - self._peer_sema.acquire(blocking=True) _this_peer = self._peers[_peer_id] _this_peer['CONNECTION'] = 'YES' _this_peer['CONNECTED'] = time() @@ -524,7 +512,6 @@ class HBSYSTEM(DatagramProtocol): _this_peer['URL'] = _data[98:222] _this_peer['SOFTWARE_ID'] = _data[222:262] _this_peer['PACKAGE_ID'] = _data[262:302] - self._peer_sema.release() self.send_peer(_peer_id, b''.join([RPTACK, _peer_id])) logger.info('(%s) Peer %s (%s) has sent repeater configuration', self._system, _this_peer['CALLSIGN'], _this_peer['RADIO_ID']) @@ -536,9 +523,7 @@ class HBSYSTEM(DatagramProtocol): _peer_id = _data[4:8] if _peer_id in self._peers and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: _this_peer = self._peers[_peer_id] - self._peer_sema.acquire(blocking=True) _this_peer['OPTIONS'] = _data[8:] - self._peer_sema.release() self.send_peer(_peer_id, b''.join([RPTACK, _peer_id])) logger.info('(%s) Peer %s has sent options %s', self._system, _this_peer['CALLSIGN'], _this_peer['OPTIONS']) self._CONFIG['SYSTEMS'][self._system]['OPTIONS'] = _this_peer['OPTIONS'].decode() @@ -548,10 +533,8 @@ class HBSYSTEM(DatagramProtocol): if _peer_id in self._peers \ and self._peers[_peer_id]['CONNECTION'] == "YES" \ and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: - self._peer_sema.acquire(blocking=True) self._peers[_peer_id]['PINGS_RECEIVED'] += 1 self._peers[_peer_id]['LAST_PING'] = time() - self._peer_sema.release() self.send_peer(_peer_id, b''.join([MSTPONG, _peer_id])) #logger.debug('(%s) Received and answered RPTPING from peer %s (%s)', self._system, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id)) else: