diff --git a/bridge_master.py b/bridge_master.py index 22100fd..5cfc8e2 100755 --- a/bridge_master.py +++ b/bridge_master.py @@ -1061,143 +1061,144 @@ class routerHBP(HBSYSTEM): def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP): for _target in BRIDGES[_bridge]: - if _target['SYSTEM'] != self._system and (_target['ACTIVE']): - _target_status = systems[_target['SYSTEM']].STATUS - _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] + if _target['SYSTEM'] != self._system or (_target['SYSTEM'] == self._system and _target['TS'] != _slot): + if _target['ACTIVE']: + _target_status = systems[_target['SYSTEM']].STATUS + _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] - if _target_system['MODE'] == 'OPENBRIDGE': - if _noOBP == True: - continue - # Is this a new call stream on the target? - if (_stream_id not in _target_status): - # This is a new call stream on the target - _target_status[_stream_id] = { - 'START': pkt_time, - 'CONTENTION':False, - 'RFS': _rf_src, - 'TGID': _dst_id, - } - # Generate LCs (full and EMB) for the TX stream - dst_lc = b''.join([self.STATUS[_slot]['RX_LC'][0:3], _target['TGID'], _rf_src]) - _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) - _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) - _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) - - logger.info('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) - if CONFIG['REPORTS']['REPORT']: - systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) - - # Record the time of this packet so we can later identify a stale stream - _target_status[_stream_id]['LAST'] = pkt_time - # Clear the TS bit -- all OpenBridge streams are effectively on TS1 - _tmp_bits = _bits & ~(1 << 7) - - # Assemble transmit HBP packet header - _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) - - # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET - # MUST RE-WRITE DESTINATION TGID IF DIFFERENT - # if _dst_id != rule['DST_GROUP']: - dmrbits = bitarray(endian='big') - dmrbits.frombytes(dmrpkt) - # Create a voice header packet (FULL LC) - if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: - dmrbits = _target_status[_stream_id]['H_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['H_LC'][98:197] - # Create a voice terminator packet (FULL LC) - elif _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM: - dmrbits = _target_status[_stream_id]['T_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['T_LC'][98:197] - if CONFIG['REPORTS']['REPORT']: - call_duration = pkt_time - _target_status[_stream_id]['START'] - systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) - # Create a Burst B-E packet (Embedded LC) - elif _dtype_vseq in [1,2,3,4]: - dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] - dmrpkt = dmrbits.tobytes() - _tmp_data = b''.join([_tmp_data, dmrpkt]) - - else: - # BEGIN STANDARD CONTENTION HANDLING - # - # The rules for each of the 4 "ifs" below are listed here for readability. The Frame To Send is: - # From a different group than last RX from this HBSystem, but it has been less than Group Hangtime - # From a different group than last TX to this HBSystem, but it has been less than Group Hangtime - # From the same group as the last RX from this HBSystem, but from a different subscriber, and it has been less than stream timeout - # From the same group as the last TX to this HBSystem, but from a different subscriber, and it has been less than stream timeout - # The "continue" at the end of each means the next iteration of the for loop that tests for matching rules - # - if ((_target['TGID'] != _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < _target_system['GROUP_HANGTIME'])): - if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: - logger.info('(%s) Call not routed to TGID %s, target active or in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID'])) - continue - if ((_target['TGID'] != _target_status[_target['TS']]['TX_TGID']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < _target_system['GROUP_HANGTIME'])): - if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: - logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID'])) - continue - if (_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO): - if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: - logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID'])) - continue - if (_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO): - if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: - logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) - continue - - # Is this a new call stream? - if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): - # Record the DST TGID and Stream ID - _target_status[_target['TS']]['TX_START'] = pkt_time - _target_status[_target['TS']]['TX_TGID'] = _target['TGID'] - _target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id - _target_status[_target['TS']]['TX_RFS'] = _rf_src - _target_status[_target['TS']]['TX_PEER'] = _peer_id + if _target_system['MODE'] == 'OPENBRIDGE': + if _noOBP == True: + continue + # Is this a new call stream on the target? + if (_stream_id not in _target_status): + # This is a new call stream on the target + _target_status[_stream_id] = { + 'START': pkt_time, + 'CONTENTION':False, + 'RFS': _rf_src, + 'TGID': _dst_id, + } # Generate LCs (full and EMB) for the TX stream - dst_lc = self.STATUS[_slot]['RX_LC'][0:3] + _target['TGID'] + _rf_src - _target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc) - _target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc) - _target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc) - logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) - logger.info('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + dst_lc = b''.join([self.STATUS[_slot]['RX_LC'][0:3], _target['TGID'], _rf_src]) + _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) + _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) + _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) + + logger.info('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) + + # Record the time of this packet so we can later identify a stale stream + _target_status[_stream_id]['LAST'] = pkt_time + # Clear the TS bit -- all OpenBridge streams are effectively on TS1 + _tmp_bits = _bits & ~(1 << 7) - # Set other values for the contention handler to test next time there is a frame to forward - _target_status[_target['TS']]['TX_TIME'] = pkt_time - _target_status[_target['TS']]['TX_TYPE'] = _dtype_vseq + # Assemble transmit HBP packet header + _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) + + # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET + # MUST RE-WRITE DESTINATION TGID IF DIFFERENT + # if _dst_id != rule['DST_GROUP']: + dmrbits = bitarray(endian='big') + dmrbits.frombytes(dmrpkt) + # Create a voice header packet (FULL LC) + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: + dmrbits = _target_status[_stream_id]['H_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['H_LC'][98:197] + # Create a voice terminator packet (FULL LC) + elif _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM: + dmrbits = _target_status[_stream_id]['T_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['T_LC'][98:197] + if CONFIG['REPORTS']['REPORT']: + call_duration = pkt_time - _target_status[_stream_id]['START'] + systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) + # Create a Burst B-E packet (Embedded LC) + elif _dtype_vseq in [1,2,3,4]: + dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] + dmrpkt = dmrbits.tobytes() + _tmp_data = b''.join([_tmp_data, dmrpkt]) - # Handle any necessary re-writes for the destination - if _system['TS'] != _target['TS']: - _tmp_bits = _bits ^ 1 << 7 else: - _tmp_bits = _bits + # BEGIN STANDARD CONTENTION HANDLING + # + # The rules for each of the 4 "ifs" below are listed here for readability. The Frame To Send is: + # From a different group than last RX from this HBSystem, but it has been less than Group Hangtime + # From a different group than last TX to this HBSystem, but it has been less than Group Hangtime + # From the same group as the last RX from this HBSystem, but from a different subscriber, and it has been less than stream timeout + # From the same group as the last TX to this HBSystem, but from a different subscriber, and it has been less than stream timeout + # The "continue" at the end of each means the next iteration of the for loop that tests for matching rules + # + if ((_target['TGID'] != _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < _target_system['GROUP_HANGTIME'])): + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: + logger.info('(%s) Call not routed to TGID %s, target active or in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID'])) + continue + if ((_target['TGID'] != _target_status[_target['TS']]['TX_TGID']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < _target_system['GROUP_HANGTIME'])): + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: + logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID'])) + continue + if (_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO): + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: + logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID'])) + continue + if (_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO): + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: + logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) + continue - # Assemble transmit HBP packet header - _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) + # Is this a new call stream? + if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): + # Record the DST TGID and Stream ID + _target_status[_target['TS']]['TX_START'] = pkt_time + _target_status[_target['TS']]['TX_TGID'] = _target['TGID'] + _target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id + _target_status[_target['TS']]['TX_RFS'] = _rf_src + _target_status[_target['TS']]['TX_PEER'] = _peer_id + # Generate LCs (full and EMB) for the TX stream + dst_lc = self.STATUS[_slot]['RX_LC'][0:3] + _target['TGID'] + _rf_src + _target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc) + _target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc) + _target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc) + logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + logger.info('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + if CONFIG['REPORTS']['REPORT']: + systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) - # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET - # MUST RE-WRITE DESTINATION TGID IF DIFFERENT - # if _dst_id != rule['DST_GROUP']: - dmrbits = bitarray(endian='big') - dmrbits.frombytes(dmrpkt) - # Create a voice header packet (FULL LC) - if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: - dmrbits = _target_status[_target['TS']]['TX_H_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_H_LC'][98:197] - # Create a voice terminator packet (FULL LC) - elif _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM: - dmrbits = _target_status[_target['TS']]['TX_T_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_T_LC'][98:197] - if CONFIG['REPORTS']['REPORT']: - call_duration = pkt_time - _target_status[_target['TS']]['TX_START'] - systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) - # Create a Burst B-E packet (Embedded LC) - elif _dtype_vseq in [1,2,3,4]: - dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] - dmrpkt = dmrbits.tobytes() - _tmp_data = b''.join([_tmp_data, dmrpkt, _data[53:55]]) + # Set other values for the contention handler to test next time there is a frame to forward + _target_status[_target['TS']]['TX_TIME'] = pkt_time + _target_status[_target['TS']]['TX_TYPE'] = _dtype_vseq - # Transmit the packet to the destination system - systems[_target['SYSTEM']].send_system(_tmp_data) - #logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + # Handle any necessary re-writes for the destination + if _system['TS'] != _target['TS']: + _tmp_bits = _bits ^ 1 << 7 + else: + _tmp_bits = _bits - + # Assemble transmit HBP packet header + _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) + + # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET + # MUST RE-WRITE DESTINATION TGID IF DIFFERENT + # if _dst_id != rule['DST_GROUP']: + dmrbits = bitarray(endian='big') + dmrbits.frombytes(dmrpkt) + # Create a voice header packet (FULL LC) + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: + dmrbits = _target_status[_target['TS']]['TX_H_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_H_LC'][98:197] + # Create a voice terminator packet (FULL LC) + elif _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM: + dmrbits = _target_status[_target['TS']]['TX_T_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_T_LC'][98:197] + if CONFIG['REPORTS']['REPORT']: + call_duration = pkt_time - _target_status[_target['TS']]['TX_START'] + systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) + # Create a Burst B-E packet (Embedded LC) + elif _dtype_vseq in [1,2,3,4]: + dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] + dmrpkt = dmrbits.tobytes() + _tmp_data = b''.join([_tmp_data, dmrpkt, _data[53:55]]) + + # Transmit the packet to the destination system + systems[_target['SYSTEM']].send_system(_tmp_data) + #logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): pkt_time = time() dmrpkt = _data[20:53]