OpenBridge Improvements, bug fixes, logging
This commit is contained in:
parent
8acaded9dc
commit
b639f83057
|
@ -211,27 +211,31 @@ def stream_trimmer_loop():
|
||||||
_now = time()
|
_now = time()
|
||||||
|
|
||||||
for system in systems:
|
for system in systems:
|
||||||
remove_list = []
|
# HBP systems, master and peer
|
||||||
if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE':
|
|
||||||
for stream_id in systems[system].STATUS:
|
|
||||||
if systems[system].STATUS[stream_id]['LAST'] < _now - 5:
|
|
||||||
remove_list.append(stream_id)
|
|
||||||
for stream_id in remove_list:
|
|
||||||
_system = systems[system].STATUS[stream_id]
|
|
||||||
_config = CONFIG['SYSTEMS'][system]
|
|
||||||
logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %s', \
|
|
||||||
system, int_id(stream_id), get_alias(int_id(_system['RFS']), subscriber_ids), get_alias(int_id(_config['NETWORK_ID']), peer_ids), get_alias(int_id(_system['TGID']), talkgroup_ids), _system['LAST'] - _system['START'])
|
|
||||||
# self._report.send_bridgeEvent('GROUP VOICE,END,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration))
|
|
||||||
removed = systems[system].STATUS.pop(stream_id)
|
|
||||||
logger.debug('Inactive OpenBridge Stream ID removed from System: %s, Stream ID %s', system, int_id(stream_id))
|
|
||||||
for system in systems:
|
|
||||||
if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE':
|
if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE':
|
||||||
for slot in range(1,3):
|
for slot in range(1,3):
|
||||||
_slot = systems[system].STATUS[slot]
|
_slot = systems[system].STATUS[slot]
|
||||||
if _slot['RX_TYPE'] != hb_const.HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5:
|
if _slot['RX_TYPE'] != hb_const.HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5:
|
||||||
_slot['RX_TYPE'] = hb_const.HBPF_SLT_VTERM
|
_slot['RX_TYPE'] = hb_const.HBPF_SLT_VTERM
|
||||||
logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s (%s) TGID %s (%s), TS %s, Duration: %s', \
|
logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %s', \
|
||||||
system, int_id(_slot['RX_STREAM_ID']), get_alias(_slot['RX_RFS'], subscriber_ids), int_id(_slot['RX_RFS']), get_alias(_slot['RX_TGID'], talkgroup_ids), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START'])
|
system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START'])
|
||||||
|
# OBP systems
|
||||||
|
# We can't delete items from a dicationry that's being iterated, so we have to make a temporarly list of entrys to remove later
|
||||||
|
if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE':
|
||||||
|
remove_list = []
|
||||||
|
for stream_id in systems[system].STATUS:
|
||||||
|
if systems[system].STATUS[stream_id]['LAST'] < _now - 5:
|
||||||
|
remove_list.append(stream_id)
|
||||||
|
for stream_id in remove_list:
|
||||||
|
if stream_id in systems[system].STATUS:
|
||||||
|
_system = systems[system].STATUS[stream_id]
|
||||||
|
_config = CONFIG['SYSTEMS'][system]
|
||||||
|
logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %s', \
|
||||||
|
system, int_id(stream_id), get_alias(int_id(_system['RFS']), subscriber_ids), get_alias(int_id(_config['NETWORK_ID']), peer_ids), get_alias(int_id(_system['TGID']), talkgroup_ids), _system['LAST'] - _system['START'])
|
||||||
|
# self._report.send_bridgeEvent('GROUP VOICE,END,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration))
|
||||||
|
removed = systems[system].STATUS.pop(stream_id)
|
||||||
|
else:
|
||||||
|
logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS])
|
||||||
|
|
||||||
class routerOBP(OPENBRIDGE):
|
class routerOBP(OPENBRIDGE):
|
||||||
|
|
||||||
|
@ -374,11 +378,11 @@ class routerOBP(OPENBRIDGE):
|
||||||
self._logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS']))
|
self._logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS']))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Set values for the contention handler to test next time there is a frame to forward
|
# Is this a new call stream?
|
||||||
_target_status[_target['TS']]['TX_TIME'] = pkt_time
|
if (_target_status[_target['TS']]['TX_STREAM_ID'] != _stream_id): #(_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']):
|
||||||
|
#if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']):
|
||||||
if (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']):
|
|
||||||
# Record the DST TGID and Stream ID
|
# Record the DST TGID and Stream ID
|
||||||
|
_target_status[_target['TS']]['TX_START'] = pkt_time
|
||||||
_target_status[_target['TS']]['TX_TGID'] = _target['TGID']
|
_target_status[_target['TS']]['TX_TGID'] = _target['TGID']
|
||||||
_target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id
|
_target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id
|
||||||
_target_status[_target['TS']]['TX_RFS'] = _rf_src
|
_target_status[_target['TS']]['TX_RFS'] = _rf_src
|
||||||
|
@ -387,8 +391,11 @@ class routerOBP(OPENBRIDGE):
|
||||||
_target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc)
|
_target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc)
|
||||||
_target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc)
|
_target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc)
|
||||||
_target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc)
|
_target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc)
|
||||||
self._logger.debug('(%s) Generating TX FULL and EMB LCs for destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
self._logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||||
self._logger.info('(%s) Conference Bridge: %s, Call Bridged HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
self._logger.info('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||||
|
|
||||||
|
# Set other values for the contention handler to test next time there is a frame to forward
|
||||||
|
_target_status[_target['TS']]['TX_TIME'] = pkt_time
|
||||||
|
|
||||||
# Handle any necessary re-writes for the destination
|
# Handle any necessary re-writes for the destination
|
||||||
if _system['TS'] != _target['TS']:
|
if _system['TS'] != _target['TS']:
|
||||||
|
@ -444,8 +451,6 @@ class routerHBP(HBSYSTEM):
|
||||||
# In TX_EMB_LC, 2-5 are burst B-E
|
# In TX_EMB_LC, 2-5 are burst B-E
|
||||||
self.STATUS = {
|
self.STATUS = {
|
||||||
1: {
|
1: {
|
||||||
'RX_ACTIVE': False,
|
|
||||||
'TX_ACTIVE': False,
|
|
||||||
'RX_START': time(),
|
'RX_START': time(),
|
||||||
'TX_START': time(),
|
'TX_START': time(),
|
||||||
'RX_SEQ': '\x00',
|
'RX_SEQ': '\x00',
|
||||||
|
@ -469,8 +474,6 @@ class routerHBP(HBSYSTEM):
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
2: {
|
2: {
|
||||||
'RX_ACTIVE': False,
|
|
||||||
'TX_ACTIVE': False,
|
|
||||||
'RX_START': time(),
|
'RX_START': time(),
|
||||||
'TX_START': time(),
|
'TX_START': time(),
|
||||||
'RX_SEQ': '\x00',
|
'RX_SEQ': '\x00',
|
||||||
|
@ -620,21 +623,23 @@ class routerHBP(HBSYSTEM):
|
||||||
self._logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS']))
|
self._logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS']))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Set values for the contention handler to test next time there is a frame to forward
|
# Is this a new call stream?
|
||||||
_target_status[_target['TS']]['TX_TIME'] = pkt_time
|
|
||||||
|
|
||||||
if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']):
|
if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']):
|
||||||
# Record the DST TGID and Stream ID
|
# Record the DST TGID and Stream ID
|
||||||
_target_status[_target['TS']]['TX_TGID'] = _target['TGID']
|
_target_status[_target['TS']]['TX_START'] = pkt_time
|
||||||
_target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id
|
_target_status[_target['TS']]['TX_TGID'] = _target['TGID']
|
||||||
_target_status[_target['TS']]['TX_RFS'] = _rf_src
|
_target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id
|
||||||
# Generate LCs (full and EMB) for the TX stream
|
_target_status[_target['TS']]['TX_RFS'] = _rf_src
|
||||||
dst_lc = self.STATUS[_slot]['RX_LC'][0:3] + _target['TGID'] + _rf_src
|
# Generate LCs (full and EMB) for the TX stream
|
||||||
_target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc)
|
dst_lc = self.STATUS[_slot]['RX_LC'][0:3] + _target['TGID'] + _rf_src
|
||||||
_target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc)
|
_target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc)
|
||||||
_target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc)
|
_target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc)
|
||||||
self._logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
_target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc)
|
||||||
self._logger.info('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
self._logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||||
|
self._logger.info('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||||
|
|
||||||
|
# Set other values for the contention handler to test next time there is a frame to forward
|
||||||
|
_target_status[_target['TS']]['TX_TIME'] = pkt_time
|
||||||
|
|
||||||
# Handle any necessary re-writes for the destination
|
# Handle any necessary re-writes for the destination
|
||||||
if _system['TS'] != _target['TS']:
|
if _system['TS'] != _target['TS']:
|
||||||
|
@ -848,12 +853,18 @@ if __name__ == '__main__':
|
||||||
reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP'])
|
reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP'])
|
||||||
logger.debug('%s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system])
|
logger.debug('%s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system])
|
||||||
|
|
||||||
|
def loopingErrHandle(failure):
|
||||||
|
logger.error('STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop.\n %s', failure)
|
||||||
|
reactor.stop()
|
||||||
|
|
||||||
# Initialize the rule timer -- this if for user activated stuff
|
# Initialize the rule timer -- this if for user activated stuff
|
||||||
rule_timer = task.LoopingCall(rule_timer_loop)
|
rule_timer_task = task.LoopingCall(rule_timer_loop)
|
||||||
rule_timer.start(60)
|
rule_timer = rule_timer_task.start(60)
|
||||||
|
rule_timer.addErrback(loopingErrHandle)
|
||||||
|
|
||||||
# Initialize the stream trimmer
|
# Initialize the stream trimmer
|
||||||
stream_trimmer = task.LoopingCall(stream_trimmer_loop)
|
stream_trimmer_task = task.LoopingCall(stream_trimmer_loop)
|
||||||
stream_trimmer.start(5)
|
stream_trimmer = stream_trimmer_task.start(5)
|
||||||
|
stream_trimmer.addErrback(loopingErrHandle)
|
||||||
|
|
||||||
reactor.run()
|
reactor.run()
|
||||||
|
|
Loading…
Reference in New Issue