Dangerous, but it works
Needs code optomization and cleanup, and as of now, has no method to purge abandoned stream_ids, and thus will have the effect of a memory leak for calls wothout voice terminators
This commit is contained in:
		
							parent
							
								
									5fbf8f0a11
								
							
						
					
					
						commit
						ed12a8effa
					
				
							
								
								
									
										118
									
								
								hb_confbridge.py
									
									
									
									
									
								
							
							
						
						
									
										118
									
								
								hb_confbridge.py
									
									
									
									
									
								
							| @ -180,12 +180,24 @@ def rule_timer_loop(): | ||||
|     if CONFIG['REPORTS']['REPORT']: | ||||
|         report_server.send_clients('bridge updated') | ||||
| 
 | ||||
|     for system in CONFIG['SYSTEMS']: | ||||
| 
 | ||||
| # run this every 10 seconds to trim orphaned stream ids | ||||
| def stream_trimmer_loop(): | ||||
|     return | ||||
|     logger.info('(ALL OPENBRIDGE SYSTEMS) Trimming orphaned stream IDs from system lists') | ||||
|     _now = time() | ||||
|      | ||||
|     for system in systems: | ||||
|         remove_list = [] | ||||
|         if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': | ||||
|             for stream_id in system['STATUS']: | ||||
|                 if system['STATUS'][stream_id]['STREAM_START'] < _now + 1: | ||||
|                     removed = system['STATUS'].pop(stream_id) | ||||
|                     logger.warning('STALE OPENBRIDGE STREAM ID REMOVED FROM SYSTEM: %s, STREAM ID %s', system, int_id(stream_id)) | ||||
|             for stream_id in systems[system].STATUS: | ||||
|                 if systems[system].STATUS[stream_id]['LAST'] < _now + 1: | ||||
|                     remove_list.append(stream_id) | ||||
|          | ||||
|         for stream in remove_list: | ||||
|             removed = systems[system].STATUS.pop(stream_id) | ||||
|             logger.warning('STALE OPENBRIDGE STREAM ID REMOVED FROM SYSTEM: %s, STREAM ID %s', system, int_id(stream)) | ||||
| 
 | ||||
|              | ||||
| class routerOBP(OPENBRIDGE): | ||||
|      | ||||
| @ -208,26 +220,32 @@ class routerOBP(OPENBRIDGE): | ||||
|              | ||||
|             # Is this a new call stream?    | ||||
|             if (_stream_id not in self.STATUS):    | ||||
|                  | ||||
|                 # This is a new call stream | ||||
|                 self.STATUS[_stream_id] = {} | ||||
|                 self.STATUS[_stream_id]['STREAM_START'] = pkt_time | ||||
|                 self.STATUS[_stream_id]['PKT_COUNT'] = 0 | ||||
|                 self.STATUS[_stream_id]['CONTENTION'] = False | ||||
|                 self._logger.info('(%s) *CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ | ||||
|                         self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) | ||||
|                 if CONFIG['REPORTS']['REPORT']: | ||||
|                     self._report.send_bridgeEvent('GROUP VOICE,START,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id))) | ||||
|                 self.STATUS[_stream_id] = { | ||||
|                     'START':     pkt_time, | ||||
|                     'CONTENTION':False, | ||||
|                     'RFS':       _rf_src, | ||||
|                     'TGID':      _dst_id, | ||||
|                 } | ||||
| 
 | ||||
|                 # If we can, use the LC from the voice header as to keep all options intact | ||||
|                 if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD: | ||||
|                     decoded = decode.voice_head_term(dmrpkt) | ||||
|                     self.STATUS[_stream_id]['LC'] = decoded['LC'] | ||||
|                  | ||||
|                 # If we don't have a voice header then don't wait to decode it from the Embedded LC | ||||
|                 # If we don't have a voice header then don't wait to decode the Embedded LC | ||||
|                 # just make a new one from the HBP header. This is good enough, and it saves lots of time | ||||
|                 else: | ||||
|                     self.STATUS[_stream_id] = const.LC_OPT + _dst_id + _rf_src | ||||
|                     self.STATUS[_stream_id]['LC'] = const.LC_OPT + _dst_id + _rf_src | ||||
|                     | ||||
|                      | ||||
|                 self._logger.info('(%s) *CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ | ||||
|                         self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) | ||||
|                 if CONFIG['REPORTS']['REPORT']: | ||||
|                     self._report.send_bridgeEvent('GROUP VOICE,START,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id))) | ||||
|              | ||||
|              | ||||
|             self.STATUS[_stream_id]['LAST'] = pkt_time | ||||
| 
 | ||||
| 
 | ||||
|             for _bridge in BRIDGES: | ||||
| @ -236,9 +254,7 @@ class routerOBP(OPENBRIDGE): | ||||
|                     if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True): | ||||
|                          | ||||
|                         for _target in BRIDGES[_bridge]: | ||||
|                             print('got here') | ||||
|                             if (_target['SYSTEM'] != self._system) and (CONFIG['SYSTEMS'][_target['SYSTEM']]['ACTIVE']) and (CONFIG['SYSTEMS'][_target['SYSTEM']]['MODE'] != 'OPENBRIDGE'): | ||||
|                                 print('then here') | ||||
|                             if (_target['SYSTEM'] != self._system) and (_target['ACTIVE']) and (CONFIG['SYSTEMS'][_target['SYSTEM']]['MODE'] != 'OPENBRIDGE'): | ||||
|                                 _target_status = systems[_target['SYSTEM']].STATUS | ||||
|                                 _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] | ||||
|                              | ||||
| @ -322,7 +338,7 @@ class routerOBP(OPENBRIDGE): | ||||
|              | ||||
|             # Final actions - Is this a voice terminator? | ||||
|             if (_frame_type == hb_const.HBPF_DATA_SYNC) and (_dtype_vseq == hb_const.HBPF_SLT_VTERM): | ||||
|                 call_duration = pkt_time - self.STATUS['STREAM_START'] | ||||
|                 call_duration = pkt_time - self.STATUS[_stream_id]['START'] | ||||
|                 self._logger.info('(%s) *CALL END*   STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, Duration: %s', \ | ||||
|                         self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) | ||||
|                 if CONFIG['REPORTS']['REPORT']: | ||||
| @ -421,7 +437,6 @@ class routerHBP(HBSYSTEM): | ||||
|                 else: | ||||
|                     self.STATUS[_slot]['RX_LC'] = const.LC_OPT + _dst_id + _rf_src | ||||
| 
 | ||||
| 
 | ||||
|             for _bridge in BRIDGES: | ||||
|                 for _system in BRIDGES[_bridge]: | ||||
|   | ||||
| @ -433,7 +448,59 @@ class routerHBP(HBSYSTEM): | ||||
|                                     _target_status = systems[_target['SYSTEM']].STATUS | ||||
|                                     _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] | ||||
|                                  | ||||
|                                     # BEGIN CONTENTION HANDLING | ||||
|                                     if _target_system['MODE'] == 'OPENBRIDGE': | ||||
|                                         # Is this a new call stream on the target?   | ||||
|                                         if (_stream_id not in _target_status): | ||||
|                                             # This is a new call stream on the target | ||||
|                                             _target_status[_stream_id] = { | ||||
|                                                 'START':     pkt_time, | ||||
|                                                 'CONTENTION':False, | ||||
|                                                 'RFS':       _rf_src, | ||||
|                                                 'TGID':      _dst_id, | ||||
|                                             } | ||||
|                                             # If we can, use the LC from the voice header as to keep all options intact | ||||
|                                             if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD: | ||||
|                                                 decoded = decode.voice_head_term(dmrpkt) | ||||
|                                                 _target_status[_stream_id]['LC'] = decoded['LC'] | ||||
|                                                 self._logger.debug('(%s) Created LC for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID'])) | ||||
|                  | ||||
|                                             # If we don't have a voice header then don't wait to decode the Embedded LC | ||||
|                                             # just make a new one from the HBP header. This is good enough, and it saves lots of time | ||||
|                                             else: | ||||
|                                                 _target_status[_stream_id]['LC'] = const.LC_OPT + _dst_id + _rf_src | ||||
|                                                 self._logger.info('(%s) Created LC with *LATE ENTRY* for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID'])) | ||||
|                                                  | ||||
|                                             _target_status[_stream_id]['H_LC']   = bptc.encode_header_lc(_target_status[_stream_id]['LC']) | ||||
|                                             _target_status[_stream_id]['T_LC']   = bptc.encode_terminator_lc(_target_status[_stream_id]['LC']) | ||||
|                                             _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(_target_status[_stream_id]['LC']) | ||||
|                                          | ||||
|                                         # Record the time of this packet so we can later identify a stale stream    | ||||
|                                         _target_status[_stream_id]['LAST'] = pkt_time | ||||
|                                         # Clear the TS bit -- all OpenBridge streams are effectively on TS1 | ||||
|                                         _tmp_bits = _bits & ~(1 << 7) | ||||
|                                              | ||||
|                                         # Assemble transmit HBP packet header | ||||
|                                         _tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + chr(_tmp_bits) + _data[16:20] | ||||
|                      | ||||
|                                         # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET | ||||
|                                         # MUST RE-WRITE DESTINATION TGID IF DIFFERENT | ||||
|                                         # if _dst_id != rule['DST_GROUP']: | ||||
|                                         dmrbits = bitarray(endian='big') | ||||
|                                         dmrbits.frombytes(dmrpkt) | ||||
|                                         # Create a voice header packet (FULL LC) | ||||
|                                         if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD: | ||||
|                                             dmrbits = _target_status[_stream_id]['H_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['H_LC'][98:197] | ||||
|                                         # Create a voice terminator packet (FULL LC) | ||||
|                                         elif _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VTERM: | ||||
|                                             dmrbits = _target_status[_stream_id]['T_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['T_LC'][98:197] | ||||
|                                         # Create a Burst B-E packet (Embedded LC) | ||||
|                                         elif _dtype_vseq in [1,2,3,4]: | ||||
|                                             dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] | ||||
|                                         dmrpkt = dmrbits.tobytes() | ||||
|                                         _tmp_data = _tmp_data + dmrpkt #+ _data[53:55] | ||||
|                                              | ||||
|                                     else: | ||||
|                                         # BEGIN STANDARD CONTENTION HANDLING | ||||
|                                         # | ||||
|                                         # The rules for each of the 4 "ifs" below are listed here for readability. The Frame To Send is: | ||||
|                                         #   From a different group than last RX from this HBSystem, but it has been less than Group Hangtime | ||||
| @ -472,7 +539,8 @@ class routerHBP(HBSYSTEM): | ||||
|                                             _target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc) | ||||
|                                             _target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc) | ||||
|                                             _target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc) | ||||
|                                         self._logger.debug('(%s) Generating TX FULL and EMB LCs for destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) | ||||
|                                             self._logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) | ||||
|                                              | ||||
|                                         self._logger.info('(%s) Conference Bridge: %s, Call Bridged to: System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) | ||||
|                      | ||||
|                                         # Handle any necessary re-writes for the destination | ||||
| @ -691,4 +759,8 @@ if __name__ == '__main__': | ||||
|     rule_timer = task.LoopingCall(rule_timer_loop) | ||||
|     rule_timer.start(60) | ||||
|      | ||||
|     # Initialize the stream trimmer | ||||
|     stream_trimmer = task.LoopingCall(stream_trimmer_loop) | ||||
|     stream_trimmer.start(10) | ||||
| 
 | ||||
|     reactor.run() | ||||
| @ -156,7 +156,7 @@ def build_config(_config_file): | ||||
|                     CONFIG['SYSTEMS'].update({section: { | ||||
|                         'MODE': config.get(section, 'MODE'), | ||||
|                         'ENABLED': config.getboolean(section, 'ENABLED'), | ||||
|                         'NETWORK_ID': config.getint(section, 'NETWORK_ID'), | ||||
|                         'NETWORK_ID': hex(int(config.get(section, 'NETWORK_ID')))[2:].rjust(8,'0').decode('hex'), | ||||
|                         'IP': gethostbyname(config.get(section, 'IP')), | ||||
|                         'PORT': config.getint(section, 'PORT'), | ||||
|                         'PASSPHRASE': config.get(section, 'PASSPHRASE').ljust(20,'\x00')[:20], | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user