diff --git a/bridge_master.py b/bridge_master.py index 6573696..2dc83e2 100755 --- a/bridge_master.py +++ b/bridge_master.py @@ -157,6 +157,7 @@ def make_bridges(_rules): #Make a single bridge - used for on-the-fly UA bridges def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): + BRIDGE_SEMA.acquire(blocking = True) _tgid_s = str(int_id(_tgid)) BRIDGES[_tgid_s] = [] for _system in CONFIG['SYSTEMS']: @@ -177,6 +178,8 @@ def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): if _system == 'OBP-BM': BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) + BRIDGE_SEMA.release() + def make_default_reflector(reflector,_tmout,system): bridge = '#'+str(reflector) @@ -185,6 +188,7 @@ def make_default_reflector(reflector,_tmout,system): BRIDGES[bridge] = [] make_single_reflector(bytes_3(reflector),_tmout, system) bridgetemp = [] + BRIDGE_SEMA.acquire(blocking = True) for bridgesystem in BRIDGES[bridge]: if bridgesystem['SYSTEM'] == system and bridgesystem['TS'] == 2: bridgetemp.append({'SYSTEM': system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': True,'TIMEOUT': _tmout * 60,'TO_TYPE': 'OFF','OFF': [],'ON': [bytes_3(reflector),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) @@ -192,12 +196,15 @@ def make_default_reflector(reflector,_tmout,system): bridgetemp.append(bridgesystem) BRIDGES[bridge] = bridgetemp + BRIDGE_SEMA.release() def make_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] + BRIDGE_SEMA.acquire(blocking = True) if str(tg) not in BRIDGES: make_single_bridge(bytes_3(tg),system,ts,_tmout) bridgetemp = [] + BRIDGE_SEMA.acquire(blocking = True) for bridgesystem in BRIDGES[str(tg)]: if bridgesystem['SYSTEM'] == system and bridgesystem['TS'] == ts: bridgetemp.append({'SYSTEM': system, 'TS': ts, 'TGID': bytes_3(tg),'ACTIVE': True,'TIMEOUT': _tmout * 60,'TO_TYPE': 'OFF','OFF': [],'ON': [bytes_3(tg),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) @@ -206,8 +213,11 @@ def make_static_tg(tg,ts,_tmout,system): BRIDGES[str(tg)] = bridgetemp + BRIDGE_SEMA.release() + def reset_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] + BRIDGE_SEMA.acquire(blocking = True) bridgetemp = [] for bridgesystem in BRIDGES[str(tg)]: if bridgesystem['SYSTEM'] == system and bridgesystem['TS'] == ts: @@ -216,6 +226,8 @@ def reset_static_tg(tg,ts,_tmout,system): bridgetemp.append(bridgesystem) BRIDGES[str(tg)] = bridgetemp + + BRIDGE_SEMA.release() def reset_default_reflector(reflector,_tmout,system): bridge = '#'+str(reflector) @@ -224,17 +236,19 @@ def reset_default_reflector(reflector,_tmout,system): BRIDGES[bridge] = [] make_single_reflector(bytes_3(reflector),_tmout, system) bridgetemp = [] + BRIDGE_SEMA.acquire(blocking = True) for bridgesystem in BRIDGES[bridge]: if bridgesystem['SYSTEM'] == system and bridgesystem['TS'] == 2: bridgetemp.append({'SYSTEM': system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [bytes_3(reflector),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) else: bridgetemp.append(bridgesystem) - + BRIDGE_SEMA.release() BRIDGES[bridge] = bridgetemp def make_single_reflector(_tgid,_tmout,_sourcesystem): _tgid_s = str(int_id(_tgid)) _bridge = '#' + _tgid_s + BRIDGE_SEMA.acquire(blocking = True) BRIDGES[_bridge] = [] for _system in CONFIG['SYSTEMS']: if _system != 'OBP-BM': @@ -246,12 +260,13 @@ def make_single_reflector(_tgid,_tmout,_sourcesystem): BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) if _system == 'OBP-BM': BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) + BRIDGE_SEMA.release() # Run this every minute for rule timer updates def rule_timer_loop(): logger.debug('(ROUTER) routerHBP Rule timer loop started') _now = time() - + BRIDGE_SEMA.acquire(blocking = True) for _bridge in BRIDGES: for _system in BRIDGES[_bridge]: if _system['TO_TYPE'] == 'ON': @@ -278,7 +293,7 @@ def rule_timer_loop(): logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) - + BRIDGE_SEMA.release() if CONFIG['REPORTS']['REPORT']: report_server.send_clients(b'bridge updated') @@ -331,6 +346,7 @@ def stream_trimmer_loop(): def sendSpeech(self,speech): sleep(1) + _nine = bytes_3(9) while True: try: pkt = next(speech) @@ -338,6 +354,20 @@ def sendSpeech(self,speech): break #Packet every 60ms sleep(0.058) + _stream_id = pkt[16:20] + _pkt_time = time() + if _stream_id not in systems[system].STATUS: + systems[system].STATUS[_stream_id] = { + 'START': _pkt_time, + 'CONTENTION':False, + 'RFS': _nine, + 'TGID': _nine, + } + _slot['TX_TGID'] = _nine + else: + systems[system].STATUS[_stream_id]['LAST'] = _pkt_time + _slot['TX_TIME'] = _pkt_time + #Call the actual packet send in the reactor thread #as it's not thread safe reactor.callFromThread(self.send_system,pkt) @@ -369,6 +399,20 @@ def disconnectedVoice(system): break #Packet every 60ms sleep(0.058) + _stream_id = pkt[16:20] + _pkt_time = time() + if _stream_id not in systems[system].STATUS: + systems[system].STATUS[_stream_id] = { + 'START': _pkt_time, + 'CONTENTION':False, + 'RFS': _nine, + 'TGID': _nine, + } + _slot['TX_TGID'] = _nine + else: + systems[system].STATUS[_stream_id]['LAST'] = _pkt_time + _slot['TX_TIME'] = _pkt_time + #Twisted is not thread safe. We need to call this in the reactor main thread reactor.callFromThread(systems[system].send_system,pkt) #systems[system].send_system(pkt) @@ -394,7 +438,7 @@ def ident(): #We only care about slot 2 - idents go out on slot 2 _slot = systems[system].STATUS[2] #If slot is idle for RX and TX - if (_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_slot['TX_TYPE'] == HBPF_SLT_VTERM): + if (_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _slot['TX_TIME'] < CONFIG['SYSTEMS'][system]['GROUP_HANGTIME']): #_stream_id = hex_str_4(1234567) logger.info('(%s) Sending voice ident',system) _say = [words['silence']] @@ -405,7 +449,8 @@ def ident(): _say.append(words['silence']) #test #_say.append(AMBEobj.readSingleFile('44xx.ambe')) - speech = pkt_gen(bytes_3(16777215), bytes_3(16777215), bytes_4(16777215), 1, _say) + _all_call = bytes_3(16777215) + speech = pkt_gen(_all_call, _all_call, bytes_4(16777215), 1, _say) sleep(1) while True: @@ -415,6 +460,21 @@ def ident(): break #Packet every 60ms sleep(0.058) + + _stream_id = pkt[16:20] + _pkt_time = time() + if _stream_id not in systems[system].STATUS: + systems[system].STATUS[_stream_id] = { + 'START': _pkt_time, + 'CONTENTION':False, + 'RFS': _all_call, + 'TGID': _all_call, + } + _slot['TX_TGID'] = _all_call + else: + systems[system].STATUS[_stream_id]['LAST'] = _pkt_time + _slot['TX_TIME'] = _pkt_time + #Twisted is not thread safe. We need to call this in the reactor main thread reactor.callFromThread(systems[system].send_system,pkt) #systems[system].send_system(pkt) @@ -452,6 +512,7 @@ def mysql_config_check(): SQLCONFIG[system][acl] = acl_build(SQLCONFIG[system][acl], ID_MAX) #Add system to bridges logger.debug('(MYSQL) adding new system to static bridges') + BRIDGE_SEMA = acquire(blocking = True) for _bridge in BRIDGES: ts1 = False ts2 = False @@ -468,6 +529,7 @@ def mysql_config_check(): else: if ts2 == False: BRIDGES[_bridge].append({'SYSTEM': system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [bytes_3(4000)],'ON': [],'RESET': [], 'TIMER': time()}) + BRIDGE_SEMA.release() if SQLCONFIG[system]['DEFAULT_REFLECTOR'] > 0: logger.debug('(MYSQL) %s setting default reflector',system) @@ -856,7 +918,8 @@ class routerHBP(HBSYSTEM): if _bridgename not in BRIDGES and not (_int_dst_id >= 4000 and _int_dst_id <= 5000): logger.info('(%s) [A] Reflector for TG %s does not exist. Creating as User Activated',self._system, _int_dst_id) make_single_reflector(_dst_id,CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'],self._system) - + + BRIDGE_SEMA.acquire(blocking = True) if _int_dst_id > 10 and _int_dst_id != 5000: for _bridge in BRIDGES: if _bridge[0:1] != '#': @@ -907,6 +970,8 @@ class routerHBP(HBSYSTEM): if _system['ACTIVE'] == True and _system['TO_TYPE'] == 'ON' and _dst_id in _system['OFF']: _system['TIMER'] = pkt_time logger.info('(%s) [I] Reflector: %s has ON timer and set to "OFF": timeout timer cancelled', self._system, _bridge) + + BRIDGE_SEMA.release() if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM): @@ -1165,6 +1230,7 @@ class routerHBP(HBSYSTEM): # # Iterate the rules dictionary + BRIDGE_SEMA.acquire(blocking = True) for _bridge in BRIDGES: if _bridge[0:1] == '#': continue @@ -1239,7 +1305,8 @@ class routerHBP(HBSYSTEM): if _system['ACTIVE'] == True and _system['TO_TYPE'] == 'ON' and _dst_id in _system['OFF']: _system['TIMER'] = pkt_time logger.info('(%s) [12] Bridge: %s set to ON with and "OFF" timer rule: timeout timer cancelled', self._system, _bridge) - + + BRIDGE_SEMA.release() # # END IN-BAND SIGNALLING # @@ -1364,6 +1431,7 @@ if __name__ == '__main__': sys.exit('(ROUTER) TERMINATING: Routing bridges file not found or invalid: {}'.format(cli_args.RULES_FILE)) # Build the routing rules file + BRIDGE_SEMA = Semaphore(value=1) BRIDGES = make_bridges(rules_module.BRIDGES) # Default reflector