From b639f830578d32aa1845a0b9797e14e5eb508ff3 Mon Sep 17 00:00:00 2001 From: n0mjs710 Date: Mon, 12 Nov 2018 16:39:33 -0600 Subject: [PATCH] OpenBridge Improvements, bug fixes, logging --- hb_confbridge.py | 99 +++++++++++++++++++++++++++--------------------- 1 file changed, 55 insertions(+), 44 deletions(-) diff --git a/hb_confbridge.py b/hb_confbridge.py index 1fffb59..ef0007e 100755 --- a/hb_confbridge.py +++ b/hb_confbridge.py @@ -211,27 +211,31 @@ def stream_trimmer_loop(): _now = time() for system in systems: - remove_list = [] - if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': - for stream_id in systems[system].STATUS: - if systems[system].STATUS[stream_id]['LAST'] < _now - 5: - remove_list.append(stream_id) - for stream_id in remove_list: - _system = systems[system].STATUS[stream_id] - _config = CONFIG['SYSTEMS'][system] - logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %s', \ - system, int_id(stream_id), get_alias(int_id(_system['RFS']), subscriber_ids), get_alias(int_id(_config['NETWORK_ID']), peer_ids), get_alias(int_id(_system['TGID']), talkgroup_ids), _system['LAST'] - _system['START']) - # self._report.send_bridgeEvent('GROUP VOICE,END,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration)) - removed = systems[system].STATUS.pop(stream_id) - logger.debug('Inactive OpenBridge Stream ID removed from System: %s, Stream ID %s', system, int_id(stream_id)) - for system in systems: + # HBP systems, master and peer if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': for slot in range(1,3): _slot = systems[system].STATUS[slot] if _slot['RX_TYPE'] != hb_const.HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5: _slot['RX_TYPE'] = hb_const.HBPF_SLT_VTERM - logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s (%s) TGID %s (%s), TS %s, Duration: %s', \ - system, int_id(_slot['RX_STREAM_ID']), get_alias(_slot['RX_RFS'], subscriber_ids), int_id(_slot['RX_RFS']), get_alias(_slot['RX_TGID'], talkgroup_ids), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) + logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %s', \ + system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) + # OBP systems + # We can't delete items from a dicationry that's being iterated, so we have to make a temporarly list of entrys to remove later + if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': + remove_list = [] + for stream_id in systems[system].STATUS: + if systems[system].STATUS[stream_id]['LAST'] < _now - 5: + remove_list.append(stream_id) + for stream_id in remove_list: + if stream_id in systems[system].STATUS: + _system = systems[system].STATUS[stream_id] + _config = CONFIG['SYSTEMS'][system] + logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %s', \ + system, int_id(stream_id), get_alias(int_id(_system['RFS']), subscriber_ids), get_alias(int_id(_config['NETWORK_ID']), peer_ids), get_alias(int_id(_system['TGID']), talkgroup_ids), _system['LAST'] - _system['START']) + # self._report.send_bridgeEvent('GROUP VOICE,END,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration)) + removed = systems[system].STATUS.pop(stream_id) + else: + logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) class routerOBP(OPENBRIDGE): @@ -374,11 +378,11 @@ class routerOBP(OPENBRIDGE): self._logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue - # Set values for the contention handler to test next time there is a frame to forward - _target_status[_target['TS']]['TX_TIME'] = pkt_time - - if (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']): + # Is this a new call stream? + if (_target_status[_target['TS']]['TX_STREAM_ID'] != _stream_id): #(_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']): + #if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']): # Record the DST TGID and Stream ID + _target_status[_target['TS']]['TX_START'] = pkt_time _target_status[_target['TS']]['TX_TGID'] = _target['TGID'] _target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id _target_status[_target['TS']]['TX_RFS'] = _rf_src @@ -387,8 +391,11 @@ class routerOBP(OPENBRIDGE): _target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc) _target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc) - self._logger.debug('(%s) Generating TX FULL and EMB LCs for destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) - self._logger.info('(%s) Conference Bridge: %s, Call Bridged HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + self._logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + self._logger.info('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + + # Set other values for the contention handler to test next time there is a frame to forward + _target_status[_target['TS']]['TX_TIME'] = pkt_time # Handle any necessary re-writes for the destination if _system['TS'] != _target['TS']: @@ -444,8 +451,6 @@ class routerHBP(HBSYSTEM): # In TX_EMB_LC, 2-5 are burst B-E self.STATUS = { 1: { - 'RX_ACTIVE': False, - 'TX_ACTIVE': False, 'RX_START': time(), 'TX_START': time(), 'RX_SEQ': '\x00', @@ -469,8 +474,6 @@ class routerHBP(HBSYSTEM): } }, 2: { - 'RX_ACTIVE': False, - 'TX_ACTIVE': False, 'RX_START': time(), 'TX_START': time(), 'RX_SEQ': '\x00', @@ -620,21 +623,23 @@ class routerHBP(HBSYSTEM): self._logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue - # Set values for the contention handler to test next time there is a frame to forward - _target_status[_target['TS']]['TX_TIME'] = pkt_time - + # Is this a new call stream? if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']): - # Record the DST TGID and Stream ID - _target_status[_target['TS']]['TX_TGID'] = _target['TGID'] - _target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id - _target_status[_target['TS']]['TX_RFS'] = _rf_src - # Generate LCs (full and EMB) for the TX stream - dst_lc = self.STATUS[_slot]['RX_LC'][0:3] + _target['TGID'] + _rf_src - _target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc) - _target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc) - _target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc) - self._logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) - self._logger.info('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + # Record the DST TGID and Stream ID + _target_status[_target['TS']]['TX_START'] = pkt_time + _target_status[_target['TS']]['TX_TGID'] = _target['TGID'] + _target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id + _target_status[_target['TS']]['TX_RFS'] = _rf_src + # Generate LCs (full and EMB) for the TX stream + dst_lc = self.STATUS[_slot]['RX_LC'][0:3] + _target['TGID'] + _rf_src + _target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc) + _target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc) + _target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc) + self._logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + self._logger.info('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + + # Set other values for the contention handler to test next time there is a frame to forward + _target_status[_target['TS']]['TX_TIME'] = pkt_time # Handle any necessary re-writes for the destination if _system['TS'] != _target['TS']: @@ -848,12 +853,18 @@ if __name__ == '__main__': reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP']) logger.debug('%s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) + def loopingErrHandle(failure): + logger.error('STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop.\n %s', failure) + reactor.stop() + # Initialize the rule timer -- this if for user activated stuff - rule_timer = task.LoopingCall(rule_timer_loop) - rule_timer.start(60) + rule_timer_task = task.LoopingCall(rule_timer_loop) + rule_timer = rule_timer_task.start(60) + rule_timer.addErrback(loopingErrHandle) # Initialize the stream trimmer - stream_trimmer = task.LoopingCall(stream_trimmer_loop) - stream_trimmer.start(5) + stream_trimmer_task = task.LoopingCall(stream_trimmer_loop) + stream_trimmer = stream_trimmer_task.start(5) + stream_trimmer.addErrback(loopingErrHandle) reactor.run()