PROGRESS - DOES NOT WORK
this commit is to get the test code moved to the testing server. It is not intended for people to try and download to use
This commit is contained in:
parent
9b8edd2e1c
commit
bccf8d93dd
162
hb_confbridge.py
162
hb_confbridge.py
|
@ -179,6 +179,13 @@ def rule_timer_loop():
|
||||||
|
|
||||||
if CONFIG['REPORTS']['REPORT']:
|
if CONFIG['REPORTS']['REPORT']:
|
||||||
report_server.send_clients('bridge updated')
|
report_server.send_clients('bridge updated')
|
||||||
|
|
||||||
|
for system in CONFIG['SYSTEMS']:
|
||||||
|
if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE':
|
||||||
|
for stream_id in system['STATUS']:
|
||||||
|
if system['STATUS'][stream_id]['STREAM_START'] < _now + 1:
|
||||||
|
removed = system['STATUS'].pop(stream_id)
|
||||||
|
logger.warning('STALE OPENBRIDGE STREAM ID REMOVED FROM SYSTEM: %s, STREAM ID %s', system, int_id(stream_id))
|
||||||
|
|
||||||
class routerOBP(OPENBRIDGE):
|
class routerOBP(OPENBRIDGE):
|
||||||
|
|
||||||
|
@ -198,10 +205,14 @@ class routerOBP(OPENBRIDGE):
|
||||||
if allow_sub(_rf_src) == False:
|
if allow_sub(_rf_src) == False:
|
||||||
self._logger.warning('(%s) Group Voice Packet ***REJECTED BY ACL*** From: %s, HBP Peer %s, Destination TGID %s', self._system, int_id(_rf_src), int_id(_peer_id), int_id(_dst_id))
|
self._logger.warning('(%s) Group Voice Packet ***REJECTED BY ACL*** From: %s, HBP Peer %s, Destination TGID %s', self._system, int_id(_rf_src), int_id(_peer_id), int_id(_dst_id))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Is this a new call stream?
|
||||||
|
if (_stream_id not in self.STATUS):
|
||||||
|
|
||||||
if _stream_id not in self.STATUS:
|
|
||||||
# This is a new call stream
|
# This is a new call stream
|
||||||
self.STATUS[_stream_id] = pkt_time
|
self.STATUS[_stream_id]['STREAM_START'] = pkt_time
|
||||||
|
self.STATUS[_stream_id]['PKT_COUNT'] = 0
|
||||||
|
self.STATUS[_stream_id]['CONTENTION'] = False
|
||||||
self._logger.info('(%s) *CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \
|
self._logger.info('(%s) *CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \
|
||||||
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot)
|
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot)
|
||||||
if CONFIG['REPORTS']['REPORT']:
|
if CONFIG['REPORTS']['REPORT']:
|
||||||
|
@ -210,12 +221,12 @@ class routerOBP(OPENBRIDGE):
|
||||||
# If we can, use the LC from the voice header as to keep all options intact
|
# If we can, use the LC from the voice header as to keep all options intact
|
||||||
if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD:
|
if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD:
|
||||||
decoded = decode.voice_head_term(dmrpkt)
|
decoded = decode.voice_head_term(dmrpkt)
|
||||||
self.STATUS[_slot]['RX_LC'] = decoded['LC']
|
self.STATUS[_stream_id]['LC'] = decoded['LC']
|
||||||
|
|
||||||
# If we don't have a voice header then don't wait to decode it from the Embedded LC
|
# If we don't have a voice header then don't wait to decode it from the Embedded LC
|
||||||
# just make a new one from the HBP header. This is good enough, and it saves lots of time
|
# just make a new one from the HBP header. This is good enough, and it saves lots of time
|
||||||
else:
|
else:
|
||||||
self.STATUS[_slot]['RX_LC'] = const.LC_OPT + _dst_id + _rf_src
|
self.STATUS[_stream_id] = const.LC_OPT + _dst_id + _rf_src
|
||||||
|
|
||||||
|
|
||||||
for _bridge in BRIDGES:
|
for _bridge in BRIDGES:
|
||||||
|
@ -224,73 +235,98 @@ class routerOBP(OPENBRIDGE):
|
||||||
if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True):
|
if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True):
|
||||||
|
|
||||||
for _target in BRIDGES[_bridge]:
|
for _target in BRIDGES[_bridge]:
|
||||||
if _target['SYSTEM'] != self._system:
|
if (_target['SYSTEM'] != self._system) and (_target['ACTIVE']) and (_target['MODE'] != 'OPENBRIDGE'):
|
||||||
if _target['ACTIVE']:
|
_target_status = systems[_target['SYSTEM']].STATUS
|
||||||
_target_status = systems[_target['SYSTEM']].STATUS
|
_target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']]
|
||||||
_target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']]
|
|
||||||
|
# BEGIN CONTENTION HANDLING
|
||||||
if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']):
|
#
|
||||||
# Record the DST TGID and Stream ID
|
# The rules for each of the 4 "ifs" below are listed here for readability. The Frame To Send is:
|
||||||
_target_status[_target['TS']]['TX_TGID'] = _target['TGID']
|
# From a different group than last RX from this HBSystem, but it has been less than Group Hangtime
|
||||||
_target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id
|
# From a different group than last TX to this HBSystem, but it has been less than Group Hangtime
|
||||||
_target_status[_target['TS']]['TX_RFS'] = _rf_src
|
# From the same group as the last RX from this HBSystem, but from a different subscriber, and it has been less than stream timeout
|
||||||
# Generate LCs (full and EMB) for the TX stream
|
# From the same group as the last TX to this HBSystem, but from a different subscriber, and it has been less than stream timeout
|
||||||
dst_lc = self.STATUS[_slot]['RX_LC'][0:3] + _target['TGID'] + _rf_src
|
# The "continue" at the end of each means the next iteration of the for loop that tests for matching rules
|
||||||
_target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc)
|
#
|
||||||
_target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc)
|
if ((_target['TGID'] != _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < _target_system['GROUP_HANGTIME'])):
|
||||||
_target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc)
|
if self.STATUS[_stream_id]['CONTENTION'] == False:
|
||||||
self._logger.debug('(%s) Generating TX FULL and EMB LCs for destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
self.STATUS[_stream_id]['CONTENTION'] = True
|
||||||
self._logger.info('(%s) Conference Bridge: %s, Call Bridged to: System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
self._logger.info('(%s) Call not routed to TGID %s, target active or in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID']))
|
||||||
|
continue
|
||||||
# Handle any necessary re-writes for the destination
|
if ((_target['TGID'] != _target_status[_target['TS']]['TX_TGID']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < _target_system['GROUP_HANGTIME'])):
|
||||||
if _system['TS'] != _target['TS']:
|
if self.STATUS[_stream_id]['CONTENTION'] == False:
|
||||||
_tmp_bits = _bits ^ 1 << 7
|
self.STATUS[_stream_id]['CONTENTION'] = True
|
||||||
else:
|
self._logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']))
|
||||||
_tmp_bits = _bits
|
continue
|
||||||
|
if (_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < hb_const.STREAM_TO):
|
||||||
# Assemble transmit HBP packet header
|
if self.STATUS[_stream_id]['CONTENTION'] == False:
|
||||||
_tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + chr(_tmp_bits) + _data[16:20]
|
self.STATUS[_stream_id]['CONTENTION'] = True
|
||||||
|
self._logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID']))
|
||||||
# MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET
|
continue
|
||||||
# MUST RE-WRITE DESTINATION TGID IF DIFFERENT
|
if (_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < hb_const.STREAM_TO):
|
||||||
# if _dst_id != rule['DST_GROUP']:
|
if self.STATUS[_stream_id]['CONTENTION'] == False:
|
||||||
dmrbits = bitarray(endian='big')
|
self.STATUS[_stream_id]['CONTENTION'] = True
|
||||||
dmrbits.frombytes(dmrpkt)
|
self._logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS']))
|
||||||
# Create a voice header packet (FULL LC)
|
continue
|
||||||
if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD:
|
|
||||||
dmrbits = _target_status[_target['TS']]['TX_H_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_H_LC'][98:197]
|
# Set values for the contention handler to test next time there is a frame to forward
|
||||||
# Create a voice terminator packet (FULL LC)
|
_target_status[_target['TS']]['TX_TIME'] = pkt_time
|
||||||
elif _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VTERM:
|
|
||||||
dmrbits = _target_status[_target['TS']]['TX_T_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_T_LC'][98:197]
|
if (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']):
|
||||||
# Create a Burst B-E packet (Embedded LC)
|
# Record the DST TGID and Stream ID
|
||||||
elif _dtype_vseq in [1,2,3,4]:
|
_target_status[_target['TS']]['TX_TGID'] = _target['TGID']
|
||||||
dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
|
_target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id
|
||||||
dmrpkt = dmrbits.tobytes()
|
_target_status[_target['TS']]['TX_RFS'] = _rf_src
|
||||||
_tmp_data = _tmp_data + dmrpkt + _data[53:55]
|
# Generate LCs (full and EMB) for the TX stream
|
||||||
|
dst_lc = self.STATUS[_stream_id]['LC'][0:3] + _target['TGID'] + _rf_src
|
||||||
# Transmit the packet to the destination system
|
_target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc)
|
||||||
systems[_target['SYSTEM']].send_system(_tmp_data)
|
_target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc)
|
||||||
#self._logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
_target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc)
|
||||||
|
self._logger.debug('(%s) Generating TX FULL and EMB LCs for destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||||
|
self._logger.info('(%s) Conference Bridge: %s, Call Bridged to: System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||||
|
|
||||||
|
# Handle any necessary re-writes for the destination
|
||||||
|
if _system['TS'] != _target['TS']:
|
||||||
|
_tmp_bits = _bits ^ 1 << 7
|
||||||
|
else:
|
||||||
|
_tmp_bits = _bits
|
||||||
|
|
||||||
|
# Assemble transmit HBP packet header
|
||||||
|
_tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + chr(_tmp_bits) + _data[16:20]
|
||||||
|
|
||||||
|
# MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET
|
||||||
|
# MUST RE-WRITE DESTINATION TGID IF DIFFERENT
|
||||||
|
# if _dst_id != rule['DST_GROUP']:
|
||||||
|
dmrbits = bitarray(endian='big')
|
||||||
|
dmrbits.frombytes(dmrpkt)
|
||||||
|
# Create a voice header packet (FULL LC)
|
||||||
|
if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD:
|
||||||
|
dmrbits = _target_status[_target['TS']]['TX_H_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_H_LC'][98:197]
|
||||||
|
# Create a voice terminator packet (FULL LC)
|
||||||
|
elif _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VTERM:
|
||||||
|
dmrbits = _target_status[_target['TS']]['TX_T_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_T_LC'][98:197]
|
||||||
|
# Create a Burst B-E packet (Embedded LC)
|
||||||
|
elif _dtype_vseq in [1,2,3,4]:
|
||||||
|
dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
|
||||||
|
dmrpkt = dmrbits.tobytes()
|
||||||
|
_tmp_data = _tmp_data + dmrpkt + _data[53:55]
|
||||||
|
|
||||||
|
# Transmit the packet to the destination system
|
||||||
|
systems[_target['SYSTEM']].send_system(_tmp_data)
|
||||||
|
#self._logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Final actions - Is this a voice terminator?
|
# Final actions - Is this a voice terminator?
|
||||||
if (_frame_type == hb_const.HBPF_DATA_SYNC) and (_dtype_vseq == hb_const.HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != hb_const.HBPF_SLT_VTERM):
|
if (_frame_type == hb_const.HBPF_DATA_SYNC) and (_dtype_vseq == hb_const.HBPF_SLT_VTERM):
|
||||||
call_duration = pkt_time - self.STATUS['RX_START']
|
call_duration = pkt_time - self.STATUS['STREAM_START']
|
||||||
self._logger.info('(%s) *CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, Duration: %s', \
|
self._logger.info('(%s) *CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, Duration: %s', \
|
||||||
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration)
|
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration)
|
||||||
if CONFIG['REPORTS']['REPORT']:
|
if CONFIG['REPORTS']['REPORT']:
|
||||||
self._report.send_bridgeEvent('GROUP VOICE,END,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration))
|
self._report.send_bridgeEvent('GROUP VOICE,END,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration))
|
||||||
|
removed = self.STATUS.pop(_stream_id)
|
||||||
|
if not removed:
|
||||||
# Mark status variables for use later
|
self_logger.error('(%s) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM', self._system, int_id(_stream_id))
|
||||||
self.STATUS[_slot]['RX_SEQ'] = _seq
|
|
||||||
self.STATUS[_slot]['RX_RFS'] = _rf_src
|
|
||||||
self.STATUS[_slot]['RX_TYPE'] = _dtype_vseq
|
|
||||||
self.STATUS[_slot]['RX_TGID'] = _dst_id
|
|
||||||
self.STATUS[_slot]['RX_TIME'] = pkt_time
|
|
||||||
self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id
|
|
||||||
|
|
||||||
|
|
||||||
class routerHBP(HBSYSTEM):
|
class routerHBP(HBSYSTEM):
|
||||||
|
|
||||||
|
|
|
@ -201,6 +201,7 @@ class OPENBRIDGE(DatagramProtocol):
|
||||||
def send_system(self, _packet):
|
def send_system(self, _packet):
|
||||||
if _packet[:4] == 'DMRD':
|
if _packet[:4] == 'DMRD':
|
||||||
_packet = _packet[:11] + self._config['NETWORK_ID'] + _packet[15:]
|
_packet = _packet[:11] + self._config['NETWORK_ID'] + _packet[15:]
|
||||||
|
_packet += hmac_new(self._config['PASSPHRASE'],_packet,sha1).digest()
|
||||||
self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT']))
|
self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT']))
|
||||||
# KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!!
|
# KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!!
|
||||||
# self._logger.debug('(%s) TX Packet to OpenBridge %s:%s -- %s', self._system, self._config['TARGET_IP'], self._config['TARGET_PORT'], ahex(_packet))
|
# self._logger.debug('(%s) TX Packet to OpenBridge %s:%s -- %s', self._system, self._config['TARGET_IP'], self._config['TARGET_PORT'], ahex(_packet))
|
||||||
|
|
Loading…
Reference in New Issue