From 8acaded9dcee6b3071e3685d3aa042afe56d586d Mon Sep 17 00:00:00 2001 From: n0mjs710 Date: Sun, 11 Nov 2018 15:17:35 -0600 Subject: [PATCH] Added logging, and fixed instance STATUS dictionary for HBP systems - Start time was incorrectly recorded. --- hb_confbridge.py | 231 +++++++++++++++++++++++++---------------------- 1 file changed, 123 insertions(+), 108 deletions(-) diff --git a/hb_confbridge.py b/hb_confbridge.py index 19b6b46..1fffb59 100755 --- a/hb_confbridge.py +++ b/hb_confbridge.py @@ -57,40 +57,39 @@ import cPickle as pickle # Does anybody read this stuff? There's a PEP somewhere that says I should do this. __author__ = 'Cortney T. Buffington, N0MJS' -__copyright__ = 'Copyright (c) 2016-2018PEER Cortney T. Buffington, N0MJS and the K0USY Group' +__copyright__ = 'Copyright (c) 2016-2018 Cortney T. Buffington, N0MJS and the K0USY Group' __credits__ = 'Colin Durbridge, G4EML, Steve Zingman, N4IRS; Mike Zingman, N4IRR; Jonathan Naylor, G4KLX; Hans Barthen, DL5DI; Torsten Shultze, DG1HT' __license__ = 'GNU GPLv3' __maintainer__ = 'Cort Buffington, N0MJS' __email__ = 'n0mjs@me.com' -__status__ = 'pre-alpha' # Module gobal varaibles # Timed loop used for reporting HBP status # # REPORT BASED ON THE TYPE SELECTED IN THE MAIN CONFIG FILE -def config_reports(_config, _logger, _factory): +def config_reports(_config, _logger, _factory): if True: #_config['REPORTS']['REPORT']: def reporting_loop(_logger, _server): _logger.debug('Periodic reporting loop started') _server.send_config() _server.send_bridge() - + _logger.info('HBlink TCP reporting server configured') - + report_server = _factory(_config, _logger) report_server.clients = [] reactor.listenTCP(_config['REPORTS']['REPORT_PORT'], report_server) - + reporting = task.LoopingCall(reporting_loop, _logger, report_server) reporting.start(_config['REPORTS']['REPORT_INTERVAL']) - + return report_server # Import Bridging rules # Note: A stanza *must* exist for any MASTER or CLIENT configured in the main -# configuration file and listed as "active". It can be empty, +# configuration file and listed as "active". It can be empty, # but it has to exist. def make_bridges(_hb_confbridge_bridges): try: @@ -98,14 +97,14 @@ def make_bridges(_hb_confbridge_bridges): logger.info('Routing bridges file found and bridges imported') except ImportError: sys.exit('Routing bridges file not found or invalid') - + # Convert integer GROUP ID numbers from the config into hex strings # we need to send in the actual data packets. for _bridge in bridge_file.BRIDGES: for _system in bridge_file.BRIDGES[_bridge]: if _system['SYSTEM'] not in CONFIG['SYSTEMS']: sys.exit('ERROR: Conference bridges found for system not configured main configuration') - + _system['TGID'] = hex_str_3(_system['TGID']) for i, e in enumerate(_system['ON']): _system['ON'][i] = hex_str_3(_system['ON'][i]) @@ -133,7 +132,7 @@ def build_acl(_sub_acl): ACL_ACTION = sections[0] entries_str = sections[1] - + for entry in entries_str.split(','): if '-' in entry: start,end = entry.split('-') @@ -143,9 +142,9 @@ def build_acl(_sub_acl): else: id = int(entry) ACL.add(hex_str_3(id)) - + logger.info('ACL loaded: action "{}" for {:,} radio IDs'.format(ACL_ACTION, len(ACL))) - + except ImportError: logger.info('ACL file not found or invalid - all subscriber IDs are valid') ACL_ACTION = 'NONE' @@ -168,13 +167,13 @@ def build_acl(_sub_acl): else: def allow_sub(_sub): return True - + return ACL # Run this every minute for rule timer updates def rule_timer_loop(): - logger.info('(ALL HBSYSTEMS) Rule timer loop started') + logger.debug('(ALL HBSYSTEMS) Rule timer loop started') _now = time() for _bridge in BRIDGES: @@ -210,26 +209,36 @@ def rule_timer_loop(): def stream_trimmer_loop(): logger.debug('(ALL OPENBRIDGE SYSTEMS) Trimming inactive stream IDs from system lists') _now = time() - + for system in systems: remove_list = [] if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': for stream_id in systems[system].STATUS: if systems[system].STATUS[stream_id]['LAST'] < _now - 5: - remove_list.append(stream_id) - for stream_id in remove_list: + _system = systems[system].STATUS[stream_id] + _config = CONFIG['SYSTEMS'][system] + logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %s', \ + system, int_id(stream_id), get_alias(int_id(_system['RFS']), subscriber_ids), get_alias(int_id(_config['NETWORK_ID']), peer_ids), get_alias(int_id(_system['TGID']), talkgroup_ids), _system['LAST'] - _system['START']) + # self._report.send_bridgeEvent('GROUP VOICE,END,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration)) removed = systems[system].STATUS.pop(stream_id) logger.debug('Inactive OpenBridge Stream ID removed from System: %s, Stream ID %s', system, int_id(stream_id)) + for system in systems: + if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': + for slot in range(1,3): + _slot = systems[system].STATUS[slot] + if _slot['RX_TYPE'] != hb_const.HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5: + _slot['RX_TYPE'] = hb_const.HBPF_SLT_VTERM + logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s (%s) TGID %s (%s), TS %s, Duration: %s', \ + system, int_id(_slot['RX_STREAM_ID']), get_alias(_slot['RX_RFS'], subscriber_ids), int_id(_slot['RX_RFS']), get_alias(_slot['RX_TGID'], talkgroup_ids), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) - class routerOBP(OPENBRIDGE): - + def __init__(self, _name, _config, _logger, _report): OPENBRIDGE.__init__(self, _name, _config, _logger, _report) self.STATUS = {} - + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): pkt_time = time() @@ -237,14 +246,14 @@ class routerOBP(OPENBRIDGE): _bits = int_id(_data[15]) if _call_type == 'group': - + # Check for ACL match, and return if the subscriber is not allowed if allow_sub(_rf_src) == False: self._logger.warning('(%s) Group Voice Packet ***REJECTED BY ACL*** From: %s, HBP Peer %s, Destination TGID %s', self._system, int_id(_rf_src), int_id(_peer_id), int_id(_dst_id)) return - - # Is this a new call stream? - if (_stream_id not in self.STATUS): + + # Is this a new call stream? + if (_stream_id not in self.STATUS): # This is a new call stream self.STATUS[_stream_id] = { 'START': pkt_time, @@ -257,33 +266,33 @@ class routerOBP(OPENBRIDGE): if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD: decoded = decode.voice_head_term(dmrpkt) self.STATUS[_stream_id]['LC'] = decoded['LC'] - + # If we don't have a voice header then don't wait to decode the Embedded LC # just make a new one from the HBP header. This is good enough, and it saves lots of time else: self.STATUS[_stream_id]['LC'] = const.LC_OPT + _dst_id + _rf_src - - + + self._logger.info('(%s) *CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,START,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id))) - - + + self.STATUS[_stream_id]['LAST'] = pkt_time for _bridge in BRIDGES: for _system in BRIDGES[_bridge]: - + if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True): - + for _target in BRIDGES[_bridge]: if (_target['SYSTEM'] != self._system) and (_target['ACTIVE']): _target_status = systems[_target['SYSTEM']].STATUS _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] if _target_system['MODE'] == 'OPENBRIDGE': - # Is this a new call stream on the target? + # Is this a new call stream on the target? if (_stream_id not in _target_status): # This is a new call stream on the target _target_status[_stream_id] = { @@ -297,26 +306,26 @@ class routerOBP(OPENBRIDGE): decoded = decode.voice_head_term(dmrpkt) _target_status[_stream_id]['LC'] = decoded['LC'] self._logger.debug('(%s) Created LC for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID'])) - + # If we don't have a voice header then don't wait to decode the Embedded LC # just make a new one from the HBP header. This is good enough, and it saves lots of time else: _target_status[_stream_id]['LC'] = const.LC_OPT + _dst_id + _rf_src self._logger.info('(%s) Created LC with *LATE ENTRY* for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID'])) - + _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(_target_status[_stream_id]['LC']) _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(_target_status[_stream_id]['LC']) _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(_target_status[_stream_id]['LC']) self._logger.info('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) - # Record the time of this packet so we can later identify a stale stream + # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time # Clear the TS bit -- all OpenBridge streams are effectively on TS1 _tmp_bits = _bits & ~(1 << 7) - + # Assemble transmit HBP packet header _tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + chr(_tmp_bits) + _data[16:20] - + # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT # if _dst_id != rule['DST_GROUP']: @@ -333,7 +342,7 @@ class routerOBP(OPENBRIDGE): dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() _tmp_data = _tmp_data + dmrpkt #+ _data[53:55] - + else: # BEGIN CONTENTION HANDLING # @@ -364,11 +373,11 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['CONTENTION'] = True self._logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue - + # Set values for the contention handler to test next time there is a frame to forward _target_status[_target['TS']]['TX_TIME'] = pkt_time - - if (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']): + + if (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']): # Record the DST TGID and Stream ID _target_status[_target['TS']]['TX_TGID'] = _target['TGID'] _target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id @@ -380,16 +389,16 @@ class routerOBP(OPENBRIDGE): _target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc) self._logger.debug('(%s) Generating TX FULL and EMB LCs for destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) self._logger.info('(%s) Conference Bridge: %s, Call Bridged HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) - + # Handle any necessary re-writes for the destination if _system['TS'] != _target['TS']: _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits - + # Assemble transmit HBP packet header _tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + chr(_tmp_bits) + _data[16:20] - + # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT # if _dst_id != rule['DST_GROUP']: @@ -406,13 +415,13 @@ class routerOBP(OPENBRIDGE): dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() _tmp_data = _tmp_data + dmrpkt + _data[53:55] - + # Transmit the packet to the destination system systems[_target['SYSTEM']].send_system(_tmp_data) #self._logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) - - - + + + # Final actions - Is this a voice terminator? if (_frame_type == hb_const.HBPF_DATA_SYNC) and (_dtype_vseq == hb_const.HBPF_SLT_VTERM): call_duration = pkt_time - self.STATUS[_stream_id]['START'] @@ -426,16 +435,19 @@ class routerOBP(OPENBRIDGE): self_logger.error('(%s) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM', self._system, int_id(_stream_id)) class routerHBP(HBSYSTEM): - + def __init__(self, _name, _config, _logger, _report): HBSYSTEM.__init__(self, _name, _config, _logger, _report) - + # Status information for the system, TS1 & TS2 # 1 & 2 are "timeslot" # In TX_EMB_LC, 2-5 are burst B-E self.STATUS = { 1: { + 'RX_ACTIVE': False, + 'TX_ACTIVE': False, 'RX_START': time(), + 'TX_START': time(), 'RX_SEQ': '\x00', 'RX_RFS': '\x00', 'TX_RFS': '\x00', @@ -457,7 +469,10 @@ class routerHBP(HBSYSTEM): } }, 2: { + 'RX_ACTIVE': False, + 'TX_ACTIVE': False, 'RX_START': time(), + 'TX_START': time(), 'RX_SEQ': '\x00', 'RX_RFS': '\x00', 'TX_RFS': '\x00', @@ -486,30 +501,30 @@ class routerHBP(HBSYSTEM): _bits = int_id(_data[15]) if _call_type == 'group': - + # Check for ACL match, and return if the subscriber is not allowed if allow_sub(_rf_src) == False: self._logger.warning('(%s) Group Voice Packet ***REJECTED BY ACL*** From: %s, HBP Peer %s, Destination TGID %s', self._system, int_id(_rf_src), int_id(_peer_id), int_id(_dst_id)) return - - # Is this a new call stream? + + # Is this a new call stream? if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): if (self.STATUS[_slot]['RX_TYPE'] != hb_const.HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + hb_const.STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']): self._logger.warning('(%s) Packet received with STREAM ID: %s SUB: %s PEER: %s TGID %s, SLOT %s collided with existing call', self._system, int_id(_stream_id), int_id(_rf_src), int_id(_peer_id), int_id(_dst_id), _slot) return - + # This is a new call stream - self.STATUS['RX_START'] = pkt_time + self.STATUS[_slot]['RX_START'] = pkt_time self._logger.info('(%s) *CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,START,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id))) - + # If we can, use the LC from the voice header as to keep all options intact if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD: decoded = decode.voice_head_term(dmrpkt) self.STATUS[_slot]['RX_LC'] = decoded['LC'] - + # If we don't have a voice header then don't wait to decode it from the Embedded LC # just make a new one from the HBP header. This is good enough, and it saves lots of time else: @@ -517,17 +532,17 @@ class routerHBP(HBSYSTEM): for _bridge in BRIDGES: for _system in BRIDGES[_bridge]: - + if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True): - + for _target in BRIDGES[_bridge]: if _target['SYSTEM'] != self._system: if _target['ACTIVE']: _target_status = systems[_target['SYSTEM']].STATUS _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] - + if _target_system['MODE'] == 'OPENBRIDGE': - # Is this a new call stream on the target? + # Is this a new call stream on the target? if (_stream_id not in _target_status): # This is a new call stream on the target _target_status[_stream_id] = { @@ -541,26 +556,26 @@ class routerHBP(HBSYSTEM): decoded = decode.voice_head_term(dmrpkt) _target_status[_stream_id]['LC'] = decoded['LC'] self._logger.debug('(%s) Created LC for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID'])) - + # If we don't have a voice header then don't wait to decode the Embedded LC # just make a new one from the HBP header. This is good enough, and it saves lots of time else: _target_status[_stream_id]['LC'] = const.LC_OPT + _dst_id + _rf_src self._logger.info('(%s) Created LC with *LATE ENTRY* for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID'])) - + _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(_target_status[_stream_id]['LC']) _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(_target_status[_stream_id]['LC']) _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(_target_status[_stream_id]['LC']) self._logger.info('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) - - # Record the time of this packet so we can later identify a stale stream + + # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time # Clear the TS bit -- all OpenBridge streams are effectively on TS1 _tmp_bits = _bits & ~(1 << 7) - + # Assemble transmit HBP packet header _tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + chr(_tmp_bits) + _data[16:20] - + # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT # if _dst_id != rule['DST_GROUP']: @@ -604,11 +619,11 @@ class routerHBP(HBSYSTEM): if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _seq: self._logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue - + # Set values for the contention handler to test next time there is a frame to forward _target_status[_target['TS']]['TX_TIME'] = pkt_time - - if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']): + + if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']): # Record the DST TGID and Stream ID _target_status[_target['TS']]['TX_TGID'] = _target['TGID'] _target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id @@ -620,16 +635,16 @@ class routerHBP(HBSYSTEM): _target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc) self._logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) self._logger.info('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) - + # Handle any necessary re-writes for the destination if _system['TS'] != _target['TS']: _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits - + # Assemble transmit HBP packet header _tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + chr(_tmp_bits) + _data[16:20] - + # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT # if _dst_id != rule['DST_GROUP']: @@ -646,36 +661,36 @@ class routerHBP(HBSYSTEM): dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() _tmp_data = _tmp_data + dmrpkt + _data[53:55] - + # Transmit the packet to the destination system systems[_target['SYSTEM']].send_system(_tmp_data) #self._logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) - - - + + + # Final actions - Is this a voice terminator? if (_frame_type == hb_const.HBPF_DATA_SYNC) and (_dtype_vseq == hb_const.HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != hb_const.HBPF_SLT_VTERM): - call_duration = pkt_time - self.STATUS['RX_START'] + call_duration = pkt_time - self.STATUS[_slot]['RX_START'] self._logger.info('(%s) *CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, Duration: %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,END,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration)) - + # # Begin in-band signalling for call end. This has nothign to do with routing traffic directly. # - + # Iterate the rules dictionary - + for _bridge in BRIDGES: for _system in BRIDGES[_bridge]: if _system['SYSTEM'] == self._system: - + # TGID matches a rule source, reset its timer if _slot == _system['TS'] and _dst_id == _system['TGID'] and ((_system['TO_TYPE'] == 'ON' and (_system['ACTIVE'] == True)) or (_system['TO_TYPE'] == 'OFF' and _system['ACTIVE'] == False)): _system['TIMER'] = pkt_time + _system['TIMEOUT'] self._logger.info('(%s) Transmission match for Bridge: %s. Reset timeout to %s', self._system, _bridge, _system['TIMER']) - + # TGID matches an ACTIVATION trigger if (_dst_id in _system['ON'] or _dst_id in _system['RESET']) and _slot == _system['TS']: # Set the matching rule as ACTIVE @@ -692,7 +707,7 @@ class routerHBP(HBSYSTEM): if _system['ACTIVE'] == True and _system['TO_TYPE'] == 'ON': _system['TIMER'] = pkt_time + _system['TIMEOUT'] self._logger.info('(%s) Bridge: %s, timeout timer reset to: %s', self._system, _bridge, _system['TIMER'] - pkt_time) - + # TGID matches an DE-ACTIVATION trigger if (_dst_id in _system['OFF'] or _dst_id in _system['RESET']) and _slot == _system['TS']: # Set the matching rule as ACTIVE @@ -713,11 +728,11 @@ class routerHBP(HBSYSTEM): _system['TIMER'] = pkt_time self._logger.info('(%s) Bridge: %s set to ON with and "OFF" timer rule: timeout timer cancelled', self._system, _bridge) - # + # # END IN-BAND SIGNALLING # - - + + # Mark status variables for use later self.STATUS[_slot]['RX_SEQ'] = _seq self.STATUS[_slot]['RX_RFS'] = _rf_src @@ -725,16 +740,16 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_TGID'] = _dst_id self.STATUS[_slot]['RX_TIME'] = pkt_time self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id - + # # Socket-based reporting section # class confbridgeReportFactory(reportFactory): - + def send_bridge(self): serialized = pickle.dumps(BRIDGES, protocol=pickle.HIGHEST_PROTOCOL) self.send_clients(REPORT_OPCODES['BRIDGE_SND']+serialized) - + def send_bridgeEvent(self, _data): self.send_clients(REPORT_OPCODES['BRDG_EVENT']+_data) @@ -744,13 +759,13 @@ class confbridgeReportFactory(reportFactory): #************************************************ if __name__ == '__main__': - + import argparse import sys import os import signal from dmr_utils.utils import try_download, mk_id_dict - + # Change the current directory to the location of the application os.chdir(os.path.dirname(os.path.realpath(sys.argv[0]))) @@ -766,27 +781,27 @@ if __name__ == '__main__': # Call the external routine to build the configuration dictionary CONFIG = hb_config.build_config(cli_args.CONFIG_FILE) - + # Start the system logger if cli_args.LOG_LEVEL: CONFIG['LOGGER']['LOG_LEVEL'] = cli_args.LOG_LEVEL logger = hb_log.config_logging(CONFIG['LOGGER']) logger.debug('Logging system started, anything from here on gets logged') - + # Set up the signal handler def sig_handler(_signal, _frame): logger.info('SHUTDOWN: HBROUTER IS TERMINATING WITH SIGNAL %s', str(_signal)) hblink_handler(_signal, _frame, logger) logger.info('SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING REACTOR') reactor.stop() - + # Set signal handers so that we can gracefully exit if need be for sig in [signal.SIGTERM, signal.SIGINT]: signal.signal(sig, sig_handler) - + # Build the Access Control List REG_ACL = build_reg_acl('reg_acl', logger) - + # ID ALIAS CREATION # Download if CONFIG['ALIASES']['TRY_DOWNLOAD'] == True: @@ -796,32 +811,32 @@ if __name__ == '__main__': # Try updating subscriber aliases file result = try_download(CONFIG['ALIASES']['PATH'], CONFIG['ALIASES']['SUBSCRIBER_FILE'], CONFIG['ALIASES']['SUBSCRIBER_URL'], CONFIG['ALIASES']['STALE_TIME']) logger.info(result) - + # Make Dictionaries peer_ids = mk_id_dict(CONFIG['ALIASES']['PATH'], CONFIG['ALIASES']['PEER_FILE']) if peer_ids: logger.info('ID ALIAS MAPPER: peer_ids dictionary is available') - + subscriber_ids = mk_id_dict(CONFIG['ALIASES']['PATH'], CONFIG['ALIASES']['SUBSCRIBER_FILE']) if subscriber_ids: logger.info('ID ALIAS MAPPER: subscriber_ids dictionary is available') - + talkgroup_ids = mk_id_dict(CONFIG['ALIASES']['PATH'], CONFIG['ALIASES']['TGID_FILE']) if talkgroup_ids: logger.info('ID ALIAS MAPPER: talkgroup_ids dictionary is available') - + # Build the routing rules file BRIDGES = make_bridges('hb_confbridge_rules') - + # Build the Access Control List ACL = build_acl('sub_acl') - + # Build the Registration Access Control List REG_ACL = build_reg_acl('reg_acl', logger) - + # INITIALIZE THE REPORTING LOOP report_server = config_reports(CONFIG, logger, confbridgeReportFactory) - + # HBlink instance creation logger.info('HBlink \'hb_router.py\' (c) 2016 N0MJS & the K0USY Group - SYSTEM STARTING...') for system in CONFIG['SYSTEMS']: @@ -832,11 +847,11 @@ if __name__ == '__main__': systems[system] = routerHBP(system, CONFIG, logger, report_server) reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP']) logger.debug('%s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) - + # Initialize the rule timer -- this if for user activated stuff rule_timer = task.LoopingCall(rule_timer_loop) rule_timer.start(60) - + # Initialize the stream trimmer stream_trimmer = task.LoopingCall(stream_trimmer_loop) stream_trimmer.start(5)