Adds support for new Bridge Control OPCODE - BCSQ

BCSQ is Source Quench or Squelch

When a stream id is received from more than one source on a TG
the system sends a BCSQ to all of the non-first systems to ask them
to stop sending this stream ID. This reduces network and CPU load. Also
packets can't loop of they arent even received!

Squashed commit of the following:

commit e5ba9ece5d84e56096c459139fb39eba16249f96
Author: Simon <simon@gb7fr.org.uk>
Date:   Mon Apr 12 23:16:30 2021 +0100

    Tidy up log handling for streams

commit fc1e4bd91f0e576e8c58e84cf4b96f00ce6ec933
Author: Simon <simon@gb7fr.org.uk>
Date:   Mon Apr 12 22:56:45 2021 +0100

    Fix target port (BCKA)

commit 27e046e5efe744679c7652f65135cf3129481092
Author: Simon <simon@gb7fr.org.uk>
Date:   Mon Apr 12 22:50:35 2021 +0100

    Handle keyerror

commit 99c660fa813a300940b16e55d840218f1868cadb
Author: Simon <simon@gb7fr.org.uk>
Date:   Mon Apr 12 22:45:05 2021 +0100

    Stream trimmer for BCSQ

commit 9d6102be13813b27b752192e29e94d5eafbbb934
Author: Simon <simon@gb7fr.org.uk>
Date:   Mon Apr 12 21:32:54 2021 +0100

    Brack

commit 2f8a8cf620153cb0138e970c0dcd85328a47521c
Merge: f77a68a 17b6968
Author: Simon <simon@gb7fr.org.uk>
Date:   Sun Apr 11 22:16:38 2021 +0100

    Merge branch 'master' into bcsq

commit f77a68a96297fe050609b8ee5fb56b0866ec78f4
Merge: 7f4c6f5 9a3e5fb
Author: Simon <simon@gb7fr.org.uk>
Date:   Fri Apr 9 01:39:54 2021 +0100

    Merge branch 'master' into bcsq

commit 7f4c6f5375b589aa78e74f0002448afe95a625a2
Author: Simon <simon@gb7fr.org.uk>
Date:   Thu Apr 8 18:35:08 2021 +0100

    c

commit c0904242e0f532c26e33bdd9cba8d1bea5329abc
Author: Simon <simon@gb7fr.org.uk>
Date:   Tue Apr 6 19:05:40 2021 +0100

    only try to quench the source once

commit 2748d0cf7c5557522a5b2d0d9e3b37e294711910
Author: Simon <simon@gb7fr.org.uk>
Date:   Tue Apr 6 18:56:05 2021 +0100

    Comment out check for enahnced to tes

commit fa43a09db73862343f0f70a367786c4d77aaf9b9
Author: Simon <simon@gb7fr.org.uk>
Date:   Tue Apr 6 18:51:19 2021 +0100

    Dst id not tgid

commit f02df6edf10cfa936147b0862671ed949a820785
Author: Simon <simon@gb7fr.org.uk>
Date:   Tue Apr 6 18:48:44 2021 +0100

    More BCSQ

commit adf3bb4059f0710848f8e3349722141836889c41
Author: Simon <simon@gb7fr.org.uk>
Date:   Tue Apr 6 18:44:15 2021 +0100

    more BCKA fixes

commit b4dc518d9b74a0d8c83b3b1bb17d7de4d04b9915
Author: Simon <simon@gb7fr.org.uk>
Date:   Tue Apr 6 18:42:13 2021 +0100

    Fix broken BKCA

commit 637d772dbadb346ad49d5b80d8299c13b0fbfc79
Author: Simon <simon@gb7fr.org.uk>
Date:   Tue Apr 6 18:38:17 2021 +0100

    More work on BCSQ
This commit is contained in:
Simon 2021-04-12 23:28:01 +01:00
parent 17b6968930
commit 88334eaaf5
3 changed files with 56 additions and 7 deletions

View File

@ -366,17 +366,18 @@ def stream_trimmer_loop():
_slot = systems[system].STATUS[slot] _slot = systems[system].STATUS[slot]
# RX slot check # RX slot check
if _slot['RX_TYPE'] != HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 60: if _slot['RX_TYPE'] != HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5:
_slot['RX_TYPE'] = HBPF_SLT_VTERM _slot['RX_TYPE'] = HBPF_SLT_VTERM
logger.info('(%s) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ logger.info('(%s) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \
system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START'])
if CONFIG['REPORTS']['REPORT']: if CONFIG['REPORTS']['REPORT']:
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore'))
#Null stream_id - for loop control #Null stream_id - for loop control
if _slot['RX_TIME'] < _now - 60:
_slot['RX_STREAM_ID'] = b'\x00' _slot['RX_STREAM_ID'] = b'\x00'
# TX slot check # TX slot check
if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 60: if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 5:
_slot['TX_TYPE'] = HBPF_SLT_VTERM _slot['TX_TYPE'] = HBPF_SLT_VTERM
logger.info('(%s) *TIME OUT* TX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ logger.info('(%s) *TIME OUT* TX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \
system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_RFS']), int_id(_slot['TX_TGID']), slot, _slot['TX_TIME'] - _slot['TX_START']) system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_RFS']), int_id(_slot['TX_TGID']), slot, _slot['TX_TIME'] - _slot['TX_START'])
@ -417,6 +418,13 @@ def stream_trimmer_loop():
if CONFIG['REPORTS']['REPORT']: if CONFIG['REPORTS']['REPORT']:
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_sysconfig['NETWORK_ID']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_sysconfig['NETWORK_ID']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore'))
removed = systems[system].STATUS.pop(stream_id) removed = systems[system].STATUS.pop(stream_id)
try:
for tgid in _sysconfig['_bcsq']:
if sysconfig['_bcsq'][tgid] == stream_id:
removed = sysconfig['_bcsq'].pop(tgid)
except KeyError:
pass
else: else:
logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS])
@ -1072,6 +1080,12 @@ class routerOBP(OPENBRIDGE):
#We want to ignore this system and TS combination if it's called again for this packet #We want to ignore this system and TS combination if it's called again for this packet
_sysIgnore.append((_target['SYSTEM'],_target['TS'])) _sysIgnore.append((_target['SYSTEM'],_target['TS']))
if ('_bcsq' in _target_system) and (_dst_id in _target_system['_bcsq']) and (_target_system['_bcsq'][_dst_id] == _stream_id):
#logger.info('(%s) Conference Bridge: %s, is Source Quenched for Stream ID: %s, skipping system: %s TS: %s, TGID: %s', self._system, _bridge, int_id(_stream_id), _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
continue
# Is this a new call stream on the target? # Is this a new call stream on the target?
if (_stream_id not in _target_status): if (_stream_id not in _target_status):
# This is a new call stream on the target # This is a new call stream on the target
@ -1261,8 +1275,9 @@ class routerOBP(OPENBRIDGE):
else: else:
if '_fin' in self.STATUS[_stream_id]: if '_fin' in self.STATUS[_stream_id] and '_finlog' not in self.STATUS[_stream_id]:
logger.warning("(%s) OBP *LoopControl* STREAM ID: %s ALREADY FINISHED FROM THIS SOURCE, IGNORING",self._system, int_id(_stream_id)) logger.warning("(%s) OBP *LoopControl* STREAM ID: %s ALREADY FINISHED FROM THIS SOURCE, IGNORING",self._system, int_id(_stream_id))
self.STATUS[_stream_id]['_finlog'] = True
return return
# Loop Control # Loop Control
@ -1298,6 +1313,13 @@ class routerOBP(OPENBRIDGE):
logger.warning("(%s) OBP *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id)) logger.warning("(%s) OBP *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id))
self.STATUS[_stream_id]['LOOPLOG'] = True self.STATUS[_stream_id]['LOOPLOG'] = True
self.STATUS[_stream_id]['LAST'] = pkt_time self.STATUS[_stream_id]['LAST'] = pkt_time
if CONFIG['SYSTEMS'][self._system]['ENHANCED_OBP'] and '_bcsq' not in self.STATUS[_stream_id]:
systems[self._system].send_bcsq(_dst_id,_stream_id)
logger.warning("(%s) OBP *BridgeControl* This system Sent BCSQ , STREAM ID: %s, TG %s",self._system, int_id(_stream_id), int_id(_dst_id))
self.STATUS[_stream_id]['_bcsq'] = True
return return
@ -1317,6 +1339,7 @@ class routerOBP(OPENBRIDGE):
#if _bridge[0:1] != '#': #if _bridge[0:1] != '#':
#if True: #if True:
for _system in BRIDGES[_bridge]: for _system in BRIDGES[_bridge]:
if _system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True: if _system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True:
_sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False,_sysIgnore) _sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False,_sysIgnore)

View File

@ -72,6 +72,7 @@ DMRA = b'DMRA'
#Bridge Control commands #Bridge Control commands
BC = b'BC' BC = b'BC'
BCKA = b'BCKA' BCKA = b'BCKA'
BCSQ = b'BCSQ'
# Higheset peer ID permitted by HBP # Higheset peer ID permitted by HBP
PEER_MAX = 4294967295 PEER_MAX = 4294967295

View File

@ -116,7 +116,7 @@ class OPENBRIDGE(DatagramProtocol):
def startProtocol(self): def startProtocol(self):
if self._config['ENHANCED_OBP']: if self._config['ENHANCED_OBP']:
self._bcka = task.LoopingCall(self.send_bcka) self._bcka = task.LoopingCall(self.send_bcka())
self._bcka = self._bcka.start(10) self._bcka = self._bcka.start(10)
def dereg(self): def dereg(self):
@ -132,7 +132,7 @@ class OPENBRIDGE(DatagramProtocol):
# KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!! # KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!!
#logger.debug('(%s) TX Packet to OpenBridge %s:%s -- %s', self._system, self._config['TARGET_IP'], self._config['TARGET_PORT'], ahex(_packet)) #logger.debug('(%s) TX Packet to OpenBridge %s:%s -- %s', self._system, self._config['TARGET_IP'], self._config['TARGET_PORT'], ahex(_packet))
else: else:
logger.error('(%s) OpenBridge system was asked to send non DMRD packet: %s', self._system, _packet) logger.error('(%s) OpenBridge system was asked to send non DMRD packet with send_system(): %s', self._system, _packet)
def send_bcka(self): def send_bcka(self):
_packet = BCKA _packet = BCKA
@ -140,6 +140,13 @@ class OPENBRIDGE(DatagramProtocol):
self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT'])) self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT']))
logger.debug('(%s) *BridgeControl* sent Keep Alive',self._system) logger.debug('(%s) *BridgeControl* sent Keep Alive',self._system)
def send_bcsq(self,_tgid,_stream_id):
_packet = b''.join([BCSQ, _tgid, _stream_id])
_packet = b''.join([_packet, (hmac_new(self._config['PASSPHRASE'],_packet,sha1).digest())])
self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT']))
logger.debug('(%s) *BridgeControl* sent Source Quench, TG: %s, Stream ID: %s',self._system,int_id(_tgid), int_id(_stream_id))
def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
pass pass
#print(int_id(_peer_id), int_id(_rf_src), int_id(_dst_id), int_id(_seq), _slot, _call_type, _frame_type, repr(_dtype_vseq), int_id(_stream_id)) #print(int_id(_peer_id), int_id(_rf_src), int_id(_dst_id), int_id(_seq), _slot, _call_type, _frame_type, repr(_dtype_vseq), int_id(_stream_id))
@ -209,6 +216,7 @@ class OPENBRIDGE(DatagramProtocol):
if self._config['ENHANCED_OBP']: if self._config['ENHANCED_OBP']:
if _packet[:2] == BC: # Bridge Control packet (Extended OBP) if _packet[:2] == BC: # Bridge Control packet (Extended OBP)
#Keep Alive
if _packet[:4] == BCKA: if _packet[:4] == BCKA:
#_data = _packet[:53] #_data = _packet[:53]
_hash = _packet[4:] _hash = _packet[4:]
@ -218,7 +226,7 @@ class OPENBRIDGE(DatagramProtocol):
self._config['_bcka'] = time() self._config['_bcka'] = time()
if _sockaddr != self._config['TARGET_SOCK']: if _sockaddr != self._config['TARGET_SOCK']:
h,p = _sockaddr h,p = _sockaddr
logger.info('(%s) *BridgeControl* Source IP and Port has changed for OBP from %s:%s to %s:%s, updating',self._system,self._config['TARGET_IP'],self._config['TARGET_port'],h,p) logger.info('(%s) *BridgeControl* Source IP and Port has changed for OBP from %s:%s to %s:%s, updating',self._system,self._config['TARGET_IP'],self._config['TARGET_PORT'],h,p)
self._config['TARGET_IP'] = h self._config['TARGET_IP'] = h
self._config['TARGET_PORT'] = p self._config['TARGET_PORT'] = p
self._config['TARGET_SOCK'] = (h,p) self._config['TARGET_SOCK'] = (h,p)
@ -226,6 +234,23 @@ class OPENBRIDGE(DatagramProtocol):
else: else:
h,p = _sockaddr h,p = _sockaddr
logger.info('(%s) *BridgeControl* BCKA invalid KeepAlive, packet discarded - OPCODE: %s DATA: %s HMAC LENGTH: %s HMAC: %s SRC IP: %s SRC PORT: %s', self._system, _packet[:4], repr(_packet[:53]), len(_packet[53:]), repr(_packet[53:]),h,p) logger.info('(%s) *BridgeControl* BCKA invalid KeepAlive, packet discarded - OPCODE: %s DATA: %s HMAC LENGTH: %s HMAC: %s SRC IP: %s SRC PORT: %s', self._system, _packet[:4], repr(_packet[:53]), len(_packet[53:]), repr(_packet[53:]),h,p)
#Source Quench
if _packet[:4] == BCSQ:
#_data = _packet[:11]
_hash = _packet[11:]
_tgid = _packet[4:7]
_stream_id = _packet[7:11]
_ckhs = hmac_new(self._config['PASSPHRASE'],_packet[:11],sha1).digest()
if compare_digest(_hash, _ckhs):
logger.info('(%s) *BridgeControl* Source Quench request received for TGID: %s, Stream ID: %s',self._system,int_id(_tgid), int_id(_stream_id))
if '_bcsq' not in self._config:
self._config['_bcsq'] = {}
self._config['_bcsq'][_tgid] = _stream_id
else:
h,p = _sockaddr
logger.info('(%s) *BridgeControl* BCSQ invalid Source Quench, packet discarded - OPCODE: %s DATA: %s HMAC LENGTH: %s HMAC: %s SRC IP: %s SRC PORT: %s', self._system, _packet[:4], repr(_packet[:53]), len(_packet[53:]), repr(_packet[53:]),h,p)
#************************************************ #************************************************
# HB MASTER CLASS # HB MASTER CLASS