From 877e1c6c65e1c6de67fafcce28c88eab901488bd Mon Sep 17 00:00:00 2001 From: Cort Buffington Date: Wed, 4 Dec 2019 17:12:26 -0600 Subject: [PATCH] Adding inbound OBP Processing --- bridge.py | 126 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/bridge.py b/bridge.py index c41876b..3e844f6 100755 --- a/bridge.py +++ b/bridge.py @@ -413,9 +413,135 @@ class routerOBP(OPENBRIDGE): def unit_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data): + global UNIT_MAP pkt_time = time() dmrpkt = _data[20:53] _bits = _data[15] + + # Make/update this unit in the UNIT_MAP cache + UNIT_MAP[_rf_src] = (self.name, pkt_time) + + + # Is this a new call stream? + if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): + + # Collision in progress, bail out! + if (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']): + logger.warning('(%s) Packet received with STREAM ID: %s SUB: %s PEER: %s UNIT %s, SLOT %s collided with existing call', self._system, int_id(_stream_id), int_id(_rf_src), int_id(_peer_id), int_id(_dst_id), _slot) + return + + # Create a destination list for the call: + if _dst_id in UNIT_MAP: + if UNIT_MAP[_dst_id][0] != self._system: + self._targets = [UNIT_MAP[_dst_id][0]] + _target_route = UNIT_MAP[_dst_id][0] + else: + self._targets = [] + logger.debug('UNIT call to a subscriber on the same system, send nothing') + else: + self._targets = list(systems) + self._targets.remove(self._system) + _target_route = 'FLOOD' + + # This is a new call stream, so log & report + self.STATUS[_slot]['RX_START'] = pkt_time + logger.info('(%s) *UNIT CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) UNIT: %s (%s), TS: %s, FORWARD: %s', \ + self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, _target_route) + if CONFIG['REPORTS']['REPORT']: + self._report.send_bridgeEvent('UNIT VOICE,START,RX,{},{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), _target_route).encode(encoding='utf-8', errors='ignore')) + + for _target in self._targets: + _target_status = systems[_target].STATUS + _target_system = self._CONFIG['SYSTEMS'][_target] + + if self._CONFIG['SYSTEMS'][_target]['MODE'] == 'OPENBRIDGE': + if (_stream_id not in _target_status): + # This is a new call stream on the target + _target_status[_stream_id] = { + 'START': pkt_time, + 'CONTENTION':False, + 'RFS': _rf_src, + 'TGID': _dst_id, + } + + logger.info('(%s) Unit call bridged to OBP System: %s TS: %s, TGID: %s', self._system, _target, _slot, int_id(_dst_id)) + if CONFIG['REPORTS']['REPORT']: + systems[_target]._report.send_bridgeEvent('UNIT VOICE,START,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + + # Record the time of this packet so we can later identify a stale stream + _target_status[_stream_id]['LAST'] = pkt_time + # Clear the TS bit -- all OpenBridge streams are effectively on TS1 + _tmp_bits = _bits & ~(1 << 7) + + # Assemble transmit HBP packet + _tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) + _data = b''.join([_tmp_data, dmrpkt]) + + else: + # BEGIN STANDARD CONTENTION HANDLING + # + # The rules for each of the 4 "ifs" below are listed here for readability. The Frame To Send is: + # From a different group than last RX from this HBSystem, but it has been less than Group Hangtime + # From a different group than last TX to this HBSystem, but it has been less than Group Hangtime + # From the same group as the last RX from this HBSystem, but from a different subscriber, and it has been less than stream timeout + # From the same group as the last TX to this HBSystem, but from a different subscriber, and it has been less than stream timeout + # The "continue" at the end of each means the next iteration of the for loop that tests for matching rules + # + if ((_dst_id != _target_status[_slot]['RX_TGID']) and ((pkt_time - _target_status[_slot]['RX_TIME']) < _target_system['GROUP_HANGTIME'])): + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: + logger.info('(%s) Call not routed to destination %s, target active or in group hangtime: HBSystem: %s, TS: %s, DEST: %s', self._system, int_id(_dst_id), _target, _slot, int_id(_target_status[_slot]['RX_TGID'])) + continue + if ((_dst_id != _target_status[_slot]['TX_TGID']) and ((pkt_time - _target_status[_slot]['TX_TIME']) < _target_system['GROUP_HANGTIME'])): + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: + logger.info('(%s) Call not routed to destination %s, target in group hangtime: HBSystem: %s, TS: %s, DEST: %s', self._system, int_id(_dst_id), _target, _slot, int_id(_target_status[_slot]['TX_TGID'])) + continue + if (_dst_id == _target_status[_slot]['RX_TGID']) and ((pkt_time - _target_status[_slot]['RX_TIME']) < STREAM_TO): + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: + logger.info('(%s) Call not routed to destination %s, matching call already active on target: HBSystem: %s, TS: %s, DEST: %s', self._system, int_id(_dst_id), _target, _slot, int_id(_target_status[_slot]['RX_TGID'])) + continue + if (_dst_id == _target_status[_slot]['TX_TGID']) and (_rf_src != _target_status[_slot]['TX_RFS']) and ((pkt_time - _target_status[_slot]['TX_TIME']) < STREAM_TO): + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: + logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, DEST: %s, SUB: %s', self._system, int_id(_rf_src), _target, _slot, int_id(_target_status[_slot]['TX_TGID']), int_id(_target_status[_slot]['TX_RFS'])) + continue + + # Record target information if this is a new call stream? + if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): + # Record the DST TGID and Stream ID + _target_status[_slot]['TX_START'] = pkt_time + _target_status[_slot]['TX_TGID'] = _dst_id + _target_status[_slot]['TX_STREAM_ID'] = _stream_id + _target_status[_slot]['TX_RFS'] = _rf_src + _target_status[_slot]['TX_PEER'] = _peer_id + + logger.info('(%s) Unit call bridged to HBP System: %s TS: %s, UNIT: %s', self._system, _target, _slot, int_id(_dst_id)) + if CONFIG['REPORTS']['REPORT']: + systems[_target]._report.send_bridgeEvent('UNIT VOICE,START,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + + # Set other values for the contention handler to test next time there is a frame to forward + _target_status[_slot]['TX_TIME'] = pkt_time + _target_status[_slot]['TX_TYPE'] = _dtype_vseq + + #send the call: + systems[_target].send_system(_data) + + + # Final actions - Is this a voice terminator? + if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM): + self._targets = [] + call_duration = pkt_time - self.STATUS[_slot]['RX_START'] + logger.info('(%s) *UNIT CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) UNIT %s (%s), TS %s, Duration: %.2f', \ + self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) + if CONFIG['REPORTS']['REPORT']: + self._report.send_bridgeEvent('UNIT VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) + + # Mark status variables for use later + self.STATUS[_slot]['RX_PEER'] = _peer_id + self.STATUS[_slot]['RX_SEQ'] = _seq + self.STATUS[_slot]['RX_RFS'] = _rf_src + self.STATUS[_slot]['RX_TYPE'] = _dtype_vseq + self.STATUS[_slot]['RX_TGID'] = _dst_id + self.STATUS[_slot]['RX_TIME'] = pkt_time + self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):