From 88334eaaf51492b82b7d05b6fccd5349f2d0be2c Mon Sep 17 00:00:00 2001 From: Simon Date: Mon, 12 Apr 2021 23:28:01 +0100 Subject: [PATCH] Adds support for new Bridge Control OPCODE - BCSQ BCSQ is Source Quench or Squelch When a stream id is received from more than one source on a TG the system sends a BCSQ to all of the non-first systems to ask them to stop sending this stream ID. This reduces network and CPU load. Also packets can't loop of they arent even received! Squashed commit of the following: commit e5ba9ece5d84e56096c459139fb39eba16249f96 Author: Simon Date: Mon Apr 12 23:16:30 2021 +0100 Tidy up log handling for streams commit fc1e4bd91f0e576e8c58e84cf4b96f00ce6ec933 Author: Simon Date: Mon Apr 12 22:56:45 2021 +0100 Fix target port (BCKA) commit 27e046e5efe744679c7652f65135cf3129481092 Author: Simon Date: Mon Apr 12 22:50:35 2021 +0100 Handle keyerror commit 99c660fa813a300940b16e55d840218f1868cadb Author: Simon Date: Mon Apr 12 22:45:05 2021 +0100 Stream trimmer for BCSQ commit 9d6102be13813b27b752192e29e94d5eafbbb934 Author: Simon Date: Mon Apr 12 21:32:54 2021 +0100 Brack commit 2f8a8cf620153cb0138e970c0dcd85328a47521c Merge: f77a68a 17b6968 Author: Simon Date: Sun Apr 11 22:16:38 2021 +0100 Merge branch 'master' into bcsq commit f77a68a96297fe050609b8ee5fb56b0866ec78f4 Merge: 7f4c6f5 9a3e5fb Author: Simon Date: Fri Apr 9 01:39:54 2021 +0100 Merge branch 'master' into bcsq commit 7f4c6f5375b589aa78e74f0002448afe95a625a2 Author: Simon Date: Thu Apr 8 18:35:08 2021 +0100 c commit c0904242e0f532c26e33bdd9cba8d1bea5329abc Author: Simon Date: Tue Apr 6 19:05:40 2021 +0100 only try to quench the source once commit 2748d0cf7c5557522a5b2d0d9e3b37e294711910 Author: Simon Date: Tue Apr 6 18:56:05 2021 +0100 Comment out check for enahnced to tes commit fa43a09db73862343f0f70a367786c4d77aaf9b9 Author: Simon Date: Tue Apr 6 18:51:19 2021 +0100 Dst id not tgid commit f02df6edf10cfa936147b0862671ed949a820785 Author: Simon Date: Tue Apr 6 18:48:44 2021 +0100 More BCSQ commit adf3bb4059f0710848f8e3349722141836889c41 Author: Simon Date: Tue Apr 6 18:44:15 2021 +0100 more BCKA fixes commit b4dc518d9b74a0d8c83b3b1bb17d7de4d04b9915 Author: Simon Date: Tue Apr 6 18:42:13 2021 +0100 Fix broken BKCA commit 637d772dbadb346ad49d5b80d8299c13b0fbfc79 Author: Simon Date: Tue Apr 6 18:38:17 2021 +0100 More work on BCSQ --- bridge_master.py | 31 +++++++++++++++++++++++++++---- const.py | 1 + hblink.py | 31 ++++++++++++++++++++++++++++--- 3 files changed, 56 insertions(+), 7 deletions(-) diff --git a/bridge_master.py b/bridge_master.py index ac3dd41..457df45 100755 --- a/bridge_master.py +++ b/bridge_master.py @@ -366,17 +366,18 @@ def stream_trimmer_loop(): _slot = systems[system].STATUS[slot] # RX slot check - if _slot['RX_TYPE'] != HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 60: + if _slot['RX_TYPE'] != HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5: _slot['RX_TYPE'] = HBPF_SLT_VTERM logger.info('(%s) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) - #Null stream_id - for loop control + #Null stream_id - for loop control + if _slot['RX_TIME'] < _now - 60: _slot['RX_STREAM_ID'] = b'\x00' # TX slot check - if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 60: + if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 5: _slot['TX_TYPE'] = HBPF_SLT_VTERM logger.info('(%s) *TIME OUT* TX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_RFS']), int_id(_slot['TX_TGID']), slot, _slot['TX_TIME'] - _slot['TX_START']) @@ -417,6 +418,13 @@ def stream_trimmer_loop(): if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_sysconfig['NETWORK_ID']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) removed = systems[system].STATUS.pop(stream_id) + + try: + for tgid in _sysconfig['_bcsq']: + if sysconfig['_bcsq'][tgid] == stream_id: + removed = sysconfig['_bcsq'].pop(tgid) + except KeyError: + pass else: logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) @@ -1072,6 +1080,12 @@ class routerOBP(OPENBRIDGE): #We want to ignore this system and TS combination if it's called again for this packet _sysIgnore.append((_target['SYSTEM'],_target['TS'])) + if ('_bcsq' in _target_system) and (_dst_id in _target_system['_bcsq']) and (_target_system['_bcsq'][_dst_id] == _stream_id): + #logger.info('(%s) Conference Bridge: %s, is Source Quenched for Stream ID: %s, skipping system: %s TS: %s, TGID: %s', self._system, _bridge, int_id(_stream_id), _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + continue + + + # Is this a new call stream on the target? if (_stream_id not in _target_status): # This is a new call stream on the target @@ -1261,8 +1275,9 @@ class routerOBP(OPENBRIDGE): else: - if '_fin' in self.STATUS[_stream_id]: + if '_fin' in self.STATUS[_stream_id] and '_finlog' not in self.STATUS[_stream_id]: logger.warning("(%s) OBP *LoopControl* STREAM ID: %s ALREADY FINISHED FROM THIS SOURCE, IGNORING",self._system, int_id(_stream_id)) + self.STATUS[_stream_id]['_finlog'] = True return # Loop Control @@ -1298,6 +1313,13 @@ class routerOBP(OPENBRIDGE): logger.warning("(%s) OBP *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id)) self.STATUS[_stream_id]['LOOPLOG'] = True self.STATUS[_stream_id]['LAST'] = pkt_time + + if CONFIG['SYSTEMS'][self._system]['ENHANCED_OBP'] and '_bcsq' not in self.STATUS[_stream_id]: + systems[self._system].send_bcsq(_dst_id,_stream_id) + logger.warning("(%s) OBP *BridgeControl* This system Sent BCSQ , STREAM ID: %s, TG %s",self._system, int_id(_stream_id), int_id(_dst_id)) + self.STATUS[_stream_id]['_bcsq'] = True + + return @@ -1317,6 +1339,7 @@ class routerOBP(OPENBRIDGE): #if _bridge[0:1] != '#': #if True: for _system in BRIDGES[_bridge]: + if _system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True: _sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False,_sysIgnore) diff --git a/const.py b/const.py index 5ff15c0..c491223 100755 --- a/const.py +++ b/const.py @@ -72,6 +72,7 @@ DMRA = b'DMRA' #Bridge Control commands BC = b'BC' BCKA = b'BCKA' +BCSQ = b'BCSQ' # Higheset peer ID permitted by HBP PEER_MAX = 4294967295 diff --git a/hblink.py b/hblink.py index 0eee520..7fe2cb8 100755 --- a/hblink.py +++ b/hblink.py @@ -116,7 +116,7 @@ class OPENBRIDGE(DatagramProtocol): def startProtocol(self): if self._config['ENHANCED_OBP']: - self._bcka = task.LoopingCall(self.send_bcka) + self._bcka = task.LoopingCall(self.send_bcka()) self._bcka = self._bcka.start(10) def dereg(self): @@ -132,13 +132,20 @@ class OPENBRIDGE(DatagramProtocol): # KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!! #logger.debug('(%s) TX Packet to OpenBridge %s:%s -- %s', self._system, self._config['TARGET_IP'], self._config['TARGET_PORT'], ahex(_packet)) else: - logger.error('(%s) OpenBridge system was asked to send non DMRD packet: %s', self._system, _packet) + logger.error('(%s) OpenBridge system was asked to send non DMRD packet with send_system(): %s', self._system, _packet) def send_bcka(self): _packet = BCKA _packet = b''.join([_packet[:4], (hmac_new(self._config['PASSPHRASE'],_packet,sha1).digest())]) self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT'])) logger.debug('(%s) *BridgeControl* sent Keep Alive',self._system) + + def send_bcsq(self,_tgid,_stream_id): + _packet = b''.join([BCSQ, _tgid, _stream_id]) + _packet = b''.join([_packet, (hmac_new(self._config['PASSPHRASE'],_packet,sha1).digest())]) + self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT'])) + logger.debug('(%s) *BridgeControl* sent Source Quench, TG: %s, Stream ID: %s',self._system,int_id(_tgid), int_id(_stream_id)) + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): pass @@ -209,6 +216,7 @@ class OPENBRIDGE(DatagramProtocol): if self._config['ENHANCED_OBP']: if _packet[:2] == BC: # Bridge Control packet (Extended OBP) + #Keep Alive if _packet[:4] == BCKA: #_data = _packet[:53] _hash = _packet[4:] @@ -218,7 +226,7 @@ class OPENBRIDGE(DatagramProtocol): self._config['_bcka'] = time() if _sockaddr != self._config['TARGET_SOCK']: h,p = _sockaddr - logger.info('(%s) *BridgeControl* Source IP and Port has changed for OBP from %s:%s to %s:%s, updating',self._system,self._config['TARGET_IP'],self._config['TARGET_port'],h,p) + logger.info('(%s) *BridgeControl* Source IP and Port has changed for OBP from %s:%s to %s:%s, updating',self._system,self._config['TARGET_IP'],self._config['TARGET_PORT'],h,p) self._config['TARGET_IP'] = h self._config['TARGET_PORT'] = p self._config['TARGET_SOCK'] = (h,p) @@ -226,6 +234,23 @@ class OPENBRIDGE(DatagramProtocol): else: h,p = _sockaddr logger.info('(%s) *BridgeControl* BCKA invalid KeepAlive, packet discarded - OPCODE: %s DATA: %s HMAC LENGTH: %s HMAC: %s SRC IP: %s SRC PORT: %s', self._system, _packet[:4], repr(_packet[:53]), len(_packet[53:]), repr(_packet[53:]),h,p) + #Source Quench + if _packet[:4] == BCSQ: + #_data = _packet[:11] + _hash = _packet[11:] + _tgid = _packet[4:7] + _stream_id = _packet[7:11] + _ckhs = hmac_new(self._config['PASSPHRASE'],_packet[:11],sha1).digest() + if compare_digest(_hash, _ckhs): + logger.info('(%s) *BridgeControl* Source Quench request received for TGID: %s, Stream ID: %s',self._system,int_id(_tgid), int_id(_stream_id)) + if '_bcsq' not in self._config: + self._config['_bcsq'] = {} + self._config['_bcsq'][_tgid] = _stream_id + else: + h,p = _sockaddr + logger.info('(%s) *BridgeControl* BCSQ invalid Source Quench, packet discarded - OPCODE: %s DATA: %s HMAC LENGTH: %s HMAC: %s SRC IP: %s SRC PORT: %s', self._system, _packet[:4], repr(_packet[:53]), len(_packet[53:]), repr(_packet[53:]),h,p) + + #************************************************ # HB MASTER CLASS