Finished stream detection for bridge.py
This commit is contained in:
parent
35b1092504
commit
da55a4ec63
77
bridge.py
77
bridge.py
@ -148,7 +148,7 @@ def rule_timer_loop():
|
|||||||
report_server.send_clients(b'bridge updated')
|
report_server.send_clients(b'bridge updated')
|
||||||
|
|
||||||
|
|
||||||
# run this every 10 seconds to trim orphaned stream ids
|
## run this every 10 seconds to trim orphaned stream ids
|
||||||
def stream_trimmer_loop():
|
def stream_trimmer_loop():
|
||||||
logger.debug('(ROUTER) Trimming inactive stream IDs from system lists')
|
logger.debug('(ROUTER) Trimming inactive stream IDs from system lists')
|
||||||
_now = time()
|
_now = time()
|
||||||
@ -166,6 +166,9 @@ def stream_trimmer_loop():
|
|||||||
system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START'])
|
system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START'])
|
||||||
if CONFIG['REPORTS']['REPORT']:
|
if CONFIG['REPORTS']['REPORT']:
|
||||||
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore'))
|
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore'))
|
||||||
|
#Null stream_id - for loop control
|
||||||
|
if _slot['RX_TIME'] < _now - 60:
|
||||||
|
_slot['RX_STREAM_ID'] = b'\x00'
|
||||||
|
|
||||||
# TX slot check
|
# TX slot check
|
||||||
if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 5:
|
if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 5:
|
||||||
@ -179,23 +182,60 @@ def stream_trimmer_loop():
|
|||||||
# We can't delete items from a dicationry that's being iterated, so we have to make a temporarly list of entrys to remove later
|
# We can't delete items from a dicationry that's being iterated, so we have to make a temporarly list of entrys to remove later
|
||||||
if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE':
|
if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE':
|
||||||
remove_list = []
|
remove_list = []
|
||||||
|
fin_list = []
|
||||||
for stream_id in systems[system].STATUS:
|
for stream_id in systems[system].STATUS:
|
||||||
try:
|
|
||||||
if systems[system].STATUS[stream_id]['LAST'] < _now - 5:
|
#if stream already marked as finished, just remove it
|
||||||
remove_list.append(stream_id)
|
if '_fin' in systems[system].STATUS[stream_id] and systems[system].STATUS[stream_id]['LAST'] < _now - 180:
|
||||||
except:
|
logger.info('(%s) *FINISHED STREAM* STREAM ID: %s',system, int_id(stream_id))
|
||||||
logger.debug("(%s) Keyerror - stream trimmer Stream ID: %s Start: %s Contention: %s RFS: %s TGID: %s",stream_id,systems[system].STATUS[stream_id]['START'],systems[system].STATUS[stream_id]['CONTENTION'],systems[system].STATUS[stream_id]['RFS'],int_id(systems[system].STATUS[stream_id]['TGID']))
|
fin_list.append(stream_id)
|
||||||
systems[system].STATUS[stream_id]['LAST'] = _now
|
|
||||||
continue
|
continue
|
||||||
for stream_id in remove_list:
|
|
||||||
if stream_id in systems[system].STATUS:
|
#try:
|
||||||
|
if '_to' not in systems[system].STATUS[stream_id] and '_fin' not in systems[system].STATUS[stream_id] and systems[system].STATUS[stream_id]['LAST'] < _now - 5:
|
||||||
_stream = systems[system].STATUS[stream_id]
|
_stream = systems[system].STATUS[stream_id]
|
||||||
_sysconfig = CONFIG['SYSTEMS'][system]
|
_sysconfig = CONFIG['SYSTEMS'][system]
|
||||||
|
#systems[system].STATUS[stream_id]['_fin'] = True
|
||||||
logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \
|
logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \
|
||||||
system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_sysconfig['NETWORK_ID']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START'])
|
system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_sysconfig['NETWORK_ID']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START'])
|
||||||
if CONFIG['REPORTS']['REPORT']:
|
if CONFIG['REPORTS']['REPORT']:
|
||||||
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_sysconfig['NETWORK_ID']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore'))
|
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_sysconfig['NETWORK_ID']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore'))
|
||||||
|
systems[system].STATUS[stream_id]['_to'] = True
|
||||||
|
continue
|
||||||
|
#except:
|
||||||
|
#logger.warning("(%s) Keyerror - stream trimmer Stream ID: %s",system,stream_id)
|
||||||
|
#systems[system].STATUS[stream_id]['LAST'] = _now
|
||||||
|
#continue
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
if systems[system].STATUS[stream_id]['LAST'] < _now - 180:
|
||||||
|
remove_list.append(stream_id)
|
||||||
|
except:
|
||||||
|
logger.warning("(%s) Keyerror - stream trimmer Stream ID: %s",system,stream_id)
|
||||||
|
systems[system].STATUS[stream_id]['LAST'] = _now
|
||||||
|
continue
|
||||||
|
|
||||||
|
#remove finished
|
||||||
|
for stream_id in fin_list:
|
||||||
|
removed = systems[system].STATUS.pop(stream_id)
|
||||||
|
|
||||||
|
for stream_id in remove_list:
|
||||||
|
if stream_id in systems[system].STATUS:
|
||||||
|
_stream = systems[system].STATUS[stream_id]
|
||||||
|
_sysconfig = CONFIG['SYSTEMS'][system]
|
||||||
|
|
||||||
removed = systems[system].STATUS.pop(stream_id)
|
removed = systems[system].STATUS.pop(stream_id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
_bcsq_remove = []
|
||||||
|
for tgid in _sysconfig['_bcsq']:
|
||||||
|
if _sysconfig['_bcsq'][tgid] == stream_id:
|
||||||
|
_bcsq_remove.append(tgid)
|
||||||
|
for bcrm in _bcsq_remove:
|
||||||
|
removed = _sysconfig['_bcsq'].pop(bcrm)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
else:
|
else:
|
||||||
logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS])
|
logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS])
|
||||||
|
|
||||||
@ -242,7 +282,15 @@ class routerOBP(OPENBRIDGE):
|
|||||||
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot)
|
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot)
|
||||||
if CONFIG['REPORTS']['REPORT']:
|
if CONFIG['REPORTS']['REPORT']:
|
||||||
self._report.send_bridgeEvent('GROUP VOICE,START,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore'))
|
self._report.send_bridgeEvent('GROUP VOICE,START,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore'))
|
||||||
else:
|
else:
|
||||||
|
|
||||||
|
#Finished stream handling#
|
||||||
|
if '_fin' in self.STATUS[_stream_id]:
|
||||||
|
if '_finlog' not in self.STATUS[_stream_id]:
|
||||||
|
logger.warning("(%s) OBP *LoopControl* STREAM ID: %s ALREADY FINISHED FROM THIS SOURCE, IGNORING",self._system, int_id(_stream_id))
|
||||||
|
self.STATUS[_stream_id]['_finlog'] = True
|
||||||
|
return
|
||||||
|
|
||||||
#Duplicate handling#
|
#Duplicate handling#
|
||||||
#Duplicate complete packet
|
#Duplicate complete packet
|
||||||
if self.STATUS[_stream_id]['lastData'] and self.STATUS[_stream_id]['lastData'] == _data and _seq > 1:
|
if self.STATUS[_stream_id]['lastData'] and self.STATUS[_stream_id]['lastData'] == _data and _seq > 1:
|
||||||
@ -420,10 +468,11 @@ class routerOBP(OPENBRIDGE):
|
|||||||
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration)
|
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration)
|
||||||
if CONFIG['REPORTS']['REPORT']:
|
if CONFIG['REPORTS']['REPORT']:
|
||||||
self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore'))
|
self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore'))
|
||||||
removed = self.STATUS.pop(_stream_id)
|
self.STATUS[_stream_id]['_fin'] = True
|
||||||
logger.debug('(%s) OpenBridge sourced call stream end, remove terminated Stream ID: %s', self._system, int_id(_stream_id))
|
#removed = self.STATUS.pop(_stream_id)
|
||||||
if not removed:
|
#logger.debug('(%s) OpenBridge sourced call stream end, remove terminated Stream ID: %s', self._system, int_id(_stream_id))
|
||||||
selflogger.error('(%s) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM', self._system, int_id(_stream_id))
|
#if not removed:
|
||||||
|
#selflogger.error('(%s) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM', self._system, int_id(_stream_id))
|
||||||
|
|
||||||
#Reset sequence number
|
#Reset sequence number
|
||||||
self._lastSeq = False
|
self._lastSeq = False
|
||||||
|
Loading…
Reference in New Issue
Block a user