From da55a4ec63677b186f34a7824b6da425bdb4a4a2 Mon Sep 17 00:00:00 2001 From: Simon Date: Tue, 20 Apr 2021 23:18:19 +0100 Subject: [PATCH] Finished stream detection for bridge.py --- bridge.py | 77 +++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 63 insertions(+), 14 deletions(-) diff --git a/bridge.py b/bridge.py index ec7aab7..7b4da59 100755 --- a/bridge.py +++ b/bridge.py @@ -148,7 +148,7 @@ def rule_timer_loop(): report_server.send_clients(b'bridge updated') -# run this every 10 seconds to trim orphaned stream ids +## run this every 10 seconds to trim orphaned stream ids def stream_trimmer_loop(): logger.debug('(ROUTER) Trimming inactive stream IDs from system lists') _now = time() @@ -166,6 +166,9 @@ def stream_trimmer_loop(): system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) + #Null stream_id - for loop control + if _slot['RX_TIME'] < _now - 60: + _slot['RX_STREAM_ID'] = b'\x00' # TX slot check if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 5: @@ -179,23 +182,60 @@ def stream_trimmer_loop(): # We can't delete items from a dicationry that's being iterated, so we have to make a temporarly list of entrys to remove later if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': remove_list = [] + fin_list = [] for stream_id in systems[system].STATUS: - try: - if systems[system].STATUS[stream_id]['LAST'] < _now - 5: - remove_list.append(stream_id) - except: - logger.debug("(%s) Keyerror - stream trimmer Stream ID: %s Start: %s Contention: %s RFS: %s TGID: %s",stream_id,systems[system].STATUS[stream_id]['START'],systems[system].STATUS[stream_id]['CONTENTION'],systems[system].STATUS[stream_id]['RFS'],int_id(systems[system].STATUS[stream_id]['TGID'])) - systems[system].STATUS[stream_id]['LAST'] = _now + + #if stream already marked as finished, just remove it + if '_fin' in systems[system].STATUS[stream_id] and systems[system].STATUS[stream_id]['LAST'] < _now - 180: + logger.info('(%s) *FINISHED STREAM* STREAM ID: %s',system, int_id(stream_id)) + fin_list.append(stream_id) continue - for stream_id in remove_list: - if stream_id in systems[system].STATUS: + + #try: + if '_to' not in systems[system].STATUS[stream_id] and '_fin' not in systems[system].STATUS[stream_id] and systems[system].STATUS[stream_id]['LAST'] < _now - 5: _stream = systems[system].STATUS[stream_id] _sysconfig = CONFIG['SYSTEMS'][system] + #systems[system].STATUS[stream_id]['_fin'] = True logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \ system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_sysconfig['NETWORK_ID']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START']) if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_sysconfig['NETWORK_ID']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) + systems[system].STATUS[stream_id]['_to'] = True + continue + #except: + #logger.warning("(%s) Keyerror - stream trimmer Stream ID: %s",system,stream_id) + #systems[system].STATUS[stream_id]['LAST'] = _now + #continue + + + try: + if systems[system].STATUS[stream_id]['LAST'] < _now - 180: + remove_list.append(stream_id) + except: + logger.warning("(%s) Keyerror - stream trimmer Stream ID: %s",system,stream_id) + systems[system].STATUS[stream_id]['LAST'] = _now + continue + + #remove finished + for stream_id in fin_list: + removed = systems[system].STATUS.pop(stream_id) + + for stream_id in remove_list: + if stream_id in systems[system].STATUS: + _stream = systems[system].STATUS[stream_id] + _sysconfig = CONFIG['SYSTEMS'][system] + removed = systems[system].STATUS.pop(stream_id) + + try: + _bcsq_remove = [] + for tgid in _sysconfig['_bcsq']: + if _sysconfig['_bcsq'][tgid] == stream_id: + _bcsq_remove.append(tgid) + for bcrm in _bcsq_remove: + removed = _sysconfig['_bcsq'].pop(bcrm) + except KeyError: + pass else: logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) @@ -242,7 +282,15 @@ class routerOBP(OPENBRIDGE): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,START,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - else: + else: + + #Finished stream handling# + if '_fin' in self.STATUS[_stream_id]: + if '_finlog' not in self.STATUS[_stream_id]: + logger.warning("(%s) OBP *LoopControl* STREAM ID: %s ALREADY FINISHED FROM THIS SOURCE, IGNORING",self._system, int_id(_stream_id)) + self.STATUS[_stream_id]['_finlog'] = True + return + #Duplicate handling# #Duplicate complete packet if self.STATUS[_stream_id]['lastData'] and self.STATUS[_stream_id]['lastData'] == _data and _seq > 1: @@ -420,10 +468,11 @@ class routerOBP(OPENBRIDGE): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) - removed = self.STATUS.pop(_stream_id) - logger.debug('(%s) OpenBridge sourced call stream end, remove terminated Stream ID: %s', self._system, int_id(_stream_id)) - if not removed: - selflogger.error('(%s) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM', self._system, int_id(_stream_id)) + self.STATUS[_stream_id]['_fin'] = True + #removed = self.STATUS.pop(_stream_id) + #logger.debug('(%s) OpenBridge sourced call stream end, remove terminated Stream ID: %s', self._system, int_id(_stream_id)) + #if not removed: + #selflogger.error('(%s) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM', self._system, int_id(_stream_id)) #Reset sequence number self._lastSeq = False