Added logging, and fixed instance STATUS dictionary for HBP systems - Start time was incorrectly recorded.
This commit is contained in:
parent
fe8b2e3595
commit
8acaded9dc
231
hb_confbridge.py
231
hb_confbridge.py
@ -57,40 +57,39 @@ import cPickle as pickle
|
||||
|
||||
# Does anybody read this stuff? There's a PEP somewhere that says I should do this.
|
||||
__author__ = 'Cortney T. Buffington, N0MJS'
|
||||
__copyright__ = 'Copyright (c) 2016-2018PEER Cortney T. Buffington, N0MJS and the K0USY Group'
|
||||
__copyright__ = 'Copyright (c) 2016-2018 Cortney T. Buffington, N0MJS and the K0USY Group'
|
||||
__credits__ = 'Colin Durbridge, G4EML, Steve Zingman, N4IRS; Mike Zingman, N4IRR; Jonathan Naylor, G4KLX; Hans Barthen, DL5DI; Torsten Shultze, DG1HT'
|
||||
__license__ = 'GNU GPLv3'
|
||||
__maintainer__ = 'Cort Buffington, N0MJS'
|
||||
__email__ = 'n0mjs@me.com'
|
||||
__status__ = 'pre-alpha'
|
||||
|
||||
# Module gobal varaibles
|
||||
|
||||
# Timed loop used for reporting HBP status
|
||||
#
|
||||
# REPORT BASED ON THE TYPE SELECTED IN THE MAIN CONFIG FILE
|
||||
def config_reports(_config, _logger, _factory):
|
||||
def config_reports(_config, _logger, _factory):
|
||||
if True: #_config['REPORTS']['REPORT']:
|
||||
def reporting_loop(_logger, _server):
|
||||
_logger.debug('Periodic reporting loop started')
|
||||
_server.send_config()
|
||||
_server.send_bridge()
|
||||
|
||||
|
||||
_logger.info('HBlink TCP reporting server configured')
|
||||
|
||||
|
||||
report_server = _factory(_config, _logger)
|
||||
report_server.clients = []
|
||||
reactor.listenTCP(_config['REPORTS']['REPORT_PORT'], report_server)
|
||||
|
||||
|
||||
reporting = task.LoopingCall(reporting_loop, _logger, report_server)
|
||||
reporting.start(_config['REPORTS']['REPORT_INTERVAL'])
|
||||
|
||||
|
||||
return report_server
|
||||
|
||||
|
||||
# Import Bridging rules
|
||||
# Note: A stanza *must* exist for any MASTER or CLIENT configured in the main
|
||||
# configuration file and listed as "active". It can be empty,
|
||||
# configuration file and listed as "active". It can be empty,
|
||||
# but it has to exist.
|
||||
def make_bridges(_hb_confbridge_bridges):
|
||||
try:
|
||||
@ -98,14 +97,14 @@ def make_bridges(_hb_confbridge_bridges):
|
||||
logger.info('Routing bridges file found and bridges imported')
|
||||
except ImportError:
|
||||
sys.exit('Routing bridges file not found or invalid')
|
||||
|
||||
|
||||
# Convert integer GROUP ID numbers from the config into hex strings
|
||||
# we need to send in the actual data packets.
|
||||
for _bridge in bridge_file.BRIDGES:
|
||||
for _system in bridge_file.BRIDGES[_bridge]:
|
||||
if _system['SYSTEM'] not in CONFIG['SYSTEMS']:
|
||||
sys.exit('ERROR: Conference bridges found for system not configured main configuration')
|
||||
|
||||
|
||||
_system['TGID'] = hex_str_3(_system['TGID'])
|
||||
for i, e in enumerate(_system['ON']):
|
||||
_system['ON'][i] = hex_str_3(_system['ON'][i])
|
||||
@ -133,7 +132,7 @@ def build_acl(_sub_acl):
|
||||
ACL_ACTION = sections[0]
|
||||
entries_str = sections[1]
|
||||
|
||||
|
||||
|
||||
for entry in entries_str.split(','):
|
||||
if '-' in entry:
|
||||
start,end = entry.split('-')
|
||||
@ -143,9 +142,9 @@ def build_acl(_sub_acl):
|
||||
else:
|
||||
id = int(entry)
|
||||
ACL.add(hex_str_3(id))
|
||||
|
||||
|
||||
logger.info('ACL loaded: action "{}" for {:,} radio IDs'.format(ACL_ACTION, len(ACL)))
|
||||
|
||||
|
||||
except ImportError:
|
||||
logger.info('ACL file not found or invalid - all subscriber IDs are valid')
|
||||
ACL_ACTION = 'NONE'
|
||||
@ -168,13 +167,13 @@ def build_acl(_sub_acl):
|
||||
else:
|
||||
def allow_sub(_sub):
|
||||
return True
|
||||
|
||||
|
||||
return ACL
|
||||
|
||||
|
||||
# Run this every minute for rule timer updates
|
||||
def rule_timer_loop():
|
||||
logger.info('(ALL HBSYSTEMS) Rule timer loop started')
|
||||
logger.debug('(ALL HBSYSTEMS) Rule timer loop started')
|
||||
_now = time()
|
||||
|
||||
for _bridge in BRIDGES:
|
||||
@ -210,26 +209,36 @@ def rule_timer_loop():
|
||||
def stream_trimmer_loop():
|
||||
logger.debug('(ALL OPENBRIDGE SYSTEMS) Trimming inactive stream IDs from system lists')
|
||||
_now = time()
|
||||
|
||||
|
||||
for system in systems:
|
||||
remove_list = []
|
||||
if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE':
|
||||
for stream_id in systems[system].STATUS:
|
||||
if systems[system].STATUS[stream_id]['LAST'] < _now - 5:
|
||||
|
||||
remove_list.append(stream_id)
|
||||
|
||||
for stream_id in remove_list:
|
||||
_system = systems[system].STATUS[stream_id]
|
||||
_config = CONFIG['SYSTEMS'][system]
|
||||
logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %s', \
|
||||
system, int_id(stream_id), get_alias(int_id(_system['RFS']), subscriber_ids), get_alias(int_id(_config['NETWORK_ID']), peer_ids), get_alias(int_id(_system['TGID']), talkgroup_ids), _system['LAST'] - _system['START'])
|
||||
# self._report.send_bridgeEvent('GROUP VOICE,END,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration))
|
||||
removed = systems[system].STATUS.pop(stream_id)
|
||||
logger.debug('Inactive OpenBridge Stream ID removed from System: %s, Stream ID %s', system, int_id(stream_id))
|
||||
for system in systems:
|
||||
if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE':
|
||||
for slot in range(1,3):
|
||||
_slot = systems[system].STATUS[slot]
|
||||
if _slot['RX_TYPE'] != hb_const.HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5:
|
||||
_slot['RX_TYPE'] = hb_const.HBPF_SLT_VTERM
|
||||
logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s (%s) TGID %s (%s), TS %s, Duration: %s', \
|
||||
system, int_id(_slot['RX_STREAM_ID']), get_alias(_slot['RX_RFS'], subscriber_ids), int_id(_slot['RX_RFS']), get_alias(_slot['RX_TGID'], talkgroup_ids), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START'])
|
||||
|
||||
|
||||
class routerOBP(OPENBRIDGE):
|
||||
|
||||
|
||||
def __init__(self, _name, _config, _logger, _report):
|
||||
OPENBRIDGE.__init__(self, _name, _config, _logger, _report)
|
||||
self.STATUS = {}
|
||||
|
||||
|
||||
|
||||
def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
|
||||
pkt_time = time()
|
||||
@ -237,14 +246,14 @@ class routerOBP(OPENBRIDGE):
|
||||
_bits = int_id(_data[15])
|
||||
|
||||
if _call_type == 'group':
|
||||
|
||||
|
||||
# Check for ACL match, and return if the subscriber is not allowed
|
||||
if allow_sub(_rf_src) == False:
|
||||
self._logger.warning('(%s) Group Voice Packet ***REJECTED BY ACL*** From: %s, HBP Peer %s, Destination TGID %s', self._system, int_id(_rf_src), int_id(_peer_id), int_id(_dst_id))
|
||||
return
|
||||
|
||||
# Is this a new call stream?
|
||||
if (_stream_id not in self.STATUS):
|
||||
|
||||
# Is this a new call stream?
|
||||
if (_stream_id not in self.STATUS):
|
||||
# This is a new call stream
|
||||
self.STATUS[_stream_id] = {
|
||||
'START': pkt_time,
|
||||
@ -257,33 +266,33 @@ class routerOBP(OPENBRIDGE):
|
||||
if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD:
|
||||
decoded = decode.voice_head_term(dmrpkt)
|
||||
self.STATUS[_stream_id]['LC'] = decoded['LC']
|
||||
|
||||
|
||||
# If we don't have a voice header then don't wait to decode the Embedded LC
|
||||
# just make a new one from the HBP header. This is good enough, and it saves lots of time
|
||||
else:
|
||||
self.STATUS[_stream_id]['LC'] = const.LC_OPT + _dst_id + _rf_src
|
||||
|
||||
|
||||
|
||||
|
||||
self._logger.info('(%s) *CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \
|
||||
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot)
|
||||
if CONFIG['REPORTS']['REPORT']:
|
||||
self._report.send_bridgeEvent('GROUP VOICE,START,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)))
|
||||
|
||||
|
||||
|
||||
|
||||
self.STATUS[_stream_id]['LAST'] = pkt_time
|
||||
|
||||
|
||||
for _bridge in BRIDGES:
|
||||
for _system in BRIDGES[_bridge]:
|
||||
|
||||
|
||||
if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True):
|
||||
|
||||
|
||||
for _target in BRIDGES[_bridge]:
|
||||
if (_target['SYSTEM'] != self._system) and (_target['ACTIVE']):
|
||||
_target_status = systems[_target['SYSTEM']].STATUS
|
||||
_target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']]
|
||||
if _target_system['MODE'] == 'OPENBRIDGE':
|
||||
# Is this a new call stream on the target?
|
||||
# Is this a new call stream on the target?
|
||||
if (_stream_id not in _target_status):
|
||||
# This is a new call stream on the target
|
||||
_target_status[_stream_id] = {
|
||||
@ -297,26 +306,26 @@ class routerOBP(OPENBRIDGE):
|
||||
decoded = decode.voice_head_term(dmrpkt)
|
||||
_target_status[_stream_id]['LC'] = decoded['LC']
|
||||
self._logger.debug('(%s) Created LC for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID']))
|
||||
|
||||
|
||||
# If we don't have a voice header then don't wait to decode the Embedded LC
|
||||
# just make a new one from the HBP header. This is good enough, and it saves lots of time
|
||||
else:
|
||||
_target_status[_stream_id]['LC'] = const.LC_OPT + _dst_id + _rf_src
|
||||
self._logger.info('(%s) Created LC with *LATE ENTRY* for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID']))
|
||||
|
||||
|
||||
_target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(_target_status[_stream_id]['LC'])
|
||||
_target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(_target_status[_stream_id]['LC'])
|
||||
_target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(_target_status[_stream_id]['LC'])
|
||||
self._logger.info('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||
|
||||
# Record the time of this packet so we can later identify a stale stream
|
||||
# Record the time of this packet so we can later identify a stale stream
|
||||
_target_status[_stream_id]['LAST'] = pkt_time
|
||||
# Clear the TS bit -- all OpenBridge streams are effectively on TS1
|
||||
_tmp_bits = _bits & ~(1 << 7)
|
||||
|
||||
|
||||
# Assemble transmit HBP packet header
|
||||
_tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + chr(_tmp_bits) + _data[16:20]
|
||||
|
||||
|
||||
# MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET
|
||||
# MUST RE-WRITE DESTINATION TGID IF DIFFERENT
|
||||
# if _dst_id != rule['DST_GROUP']:
|
||||
@ -333,7 +342,7 @@ class routerOBP(OPENBRIDGE):
|
||||
dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264]
|
||||
dmrpkt = dmrbits.tobytes()
|
||||
_tmp_data = _tmp_data + dmrpkt #+ _data[53:55]
|
||||
|
||||
|
||||
else:
|
||||
# BEGIN CONTENTION HANDLING
|
||||
#
|
||||
@ -364,11 +373,11 @@ class routerOBP(OPENBRIDGE):
|
||||
self.STATUS[_stream_id]['CONTENTION'] = True
|
||||
self._logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS']))
|
||||
continue
|
||||
|
||||
|
||||
# Set values for the contention handler to test next time there is a frame to forward
|
||||
_target_status[_target['TS']]['TX_TIME'] = pkt_time
|
||||
|
||||
if (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']):
|
||||
|
||||
if (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']):
|
||||
# Record the DST TGID and Stream ID
|
||||
_target_status[_target['TS']]['TX_TGID'] = _target['TGID']
|
||||
_target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id
|
||||
@ -380,16 +389,16 @@ class routerOBP(OPENBRIDGE):
|
||||
_target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc)
|
||||
self._logger.debug('(%s) Generating TX FULL and EMB LCs for destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||
self._logger.info('(%s) Conference Bridge: %s, Call Bridged HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||
|
||||
|
||||
# Handle any necessary re-writes for the destination
|
||||
if _system['TS'] != _target['TS']:
|
||||
_tmp_bits = _bits ^ 1 << 7
|
||||
else:
|
||||
_tmp_bits = _bits
|
||||
|
||||
|
||||
# Assemble transmit HBP packet header
|
||||
_tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + chr(_tmp_bits) + _data[16:20]
|
||||
|
||||
|
||||
# MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET
|
||||
# MUST RE-WRITE DESTINATION TGID IF DIFFERENT
|
||||
# if _dst_id != rule['DST_GROUP']:
|
||||
@ -406,13 +415,13 @@ class routerOBP(OPENBRIDGE):
|
||||
dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
|
||||
dmrpkt = dmrbits.tobytes()
|
||||
_tmp_data = _tmp_data + dmrpkt + _data[53:55]
|
||||
|
||||
|
||||
# Transmit the packet to the destination system
|
||||
systems[_target['SYSTEM']].send_system(_tmp_data)
|
||||
#self._logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# Final actions - Is this a voice terminator?
|
||||
if (_frame_type == hb_const.HBPF_DATA_SYNC) and (_dtype_vseq == hb_const.HBPF_SLT_VTERM):
|
||||
call_duration = pkt_time - self.STATUS[_stream_id]['START']
|
||||
@ -426,16 +435,19 @@ class routerOBP(OPENBRIDGE):
|
||||
self_logger.error('(%s) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM', self._system, int_id(_stream_id))
|
||||
|
||||
class routerHBP(HBSYSTEM):
|
||||
|
||||
|
||||
def __init__(self, _name, _config, _logger, _report):
|
||||
HBSYSTEM.__init__(self, _name, _config, _logger, _report)
|
||||
|
||||
|
||||
# Status information for the system, TS1 & TS2
|
||||
# 1 & 2 are "timeslot"
|
||||
# In TX_EMB_LC, 2-5 are burst B-E
|
||||
self.STATUS = {
|
||||
1: {
|
||||
'RX_ACTIVE': False,
|
||||
'TX_ACTIVE': False,
|
||||
'RX_START': time(),
|
||||
'TX_START': time(),
|
||||
'RX_SEQ': '\x00',
|
||||
'RX_RFS': '\x00',
|
||||
'TX_RFS': '\x00',
|
||||
@ -457,7 +469,10 @@ class routerHBP(HBSYSTEM):
|
||||
}
|
||||
},
|
||||
2: {
|
||||
'RX_ACTIVE': False,
|
||||
'TX_ACTIVE': False,
|
||||
'RX_START': time(),
|
||||
'TX_START': time(),
|
||||
'RX_SEQ': '\x00',
|
||||
'RX_RFS': '\x00',
|
||||
'TX_RFS': '\x00',
|
||||
@ -486,30 +501,30 @@ class routerHBP(HBSYSTEM):
|
||||
_bits = int_id(_data[15])
|
||||
|
||||
if _call_type == 'group':
|
||||
|
||||
|
||||
# Check for ACL match, and return if the subscriber is not allowed
|
||||
if allow_sub(_rf_src) == False:
|
||||
self._logger.warning('(%s) Group Voice Packet ***REJECTED BY ACL*** From: %s, HBP Peer %s, Destination TGID %s', self._system, int_id(_rf_src), int_id(_peer_id), int_id(_dst_id))
|
||||
return
|
||||
|
||||
# Is this a new call stream?
|
||||
|
||||
# Is this a new call stream?
|
||||
if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']):
|
||||
if (self.STATUS[_slot]['RX_TYPE'] != hb_const.HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + hb_const.STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']):
|
||||
self._logger.warning('(%s) Packet received with STREAM ID: %s <FROM> SUB: %s PEER: %s <TO> TGID %s, SLOT %s collided with existing call', self._system, int_id(_stream_id), int_id(_rf_src), int_id(_peer_id), int_id(_dst_id), _slot)
|
||||
return
|
||||
|
||||
|
||||
# This is a new call stream
|
||||
self.STATUS['RX_START'] = pkt_time
|
||||
self.STATUS[_slot]['RX_START'] = pkt_time
|
||||
self._logger.info('(%s) *CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \
|
||||
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot)
|
||||
if CONFIG['REPORTS']['REPORT']:
|
||||
self._report.send_bridgeEvent('GROUP VOICE,START,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)))
|
||||
|
||||
|
||||
# If we can, use the LC from the voice header as to keep all options intact
|
||||
if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD:
|
||||
decoded = decode.voice_head_term(dmrpkt)
|
||||
self.STATUS[_slot]['RX_LC'] = decoded['LC']
|
||||
|
||||
|
||||
# If we don't have a voice header then don't wait to decode it from the Embedded LC
|
||||
# just make a new one from the HBP header. This is good enough, and it saves lots of time
|
||||
else:
|
||||
@ -517,17 +532,17 @@ class routerHBP(HBSYSTEM):
|
||||
|
||||
for _bridge in BRIDGES:
|
||||
for _system in BRIDGES[_bridge]:
|
||||
|
||||
|
||||
if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True):
|
||||
|
||||
|
||||
for _target in BRIDGES[_bridge]:
|
||||
if _target['SYSTEM'] != self._system:
|
||||
if _target['ACTIVE']:
|
||||
_target_status = systems[_target['SYSTEM']].STATUS
|
||||
_target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']]
|
||||
|
||||
|
||||
if _target_system['MODE'] == 'OPENBRIDGE':
|
||||
# Is this a new call stream on the target?
|
||||
# Is this a new call stream on the target?
|
||||
if (_stream_id not in _target_status):
|
||||
# This is a new call stream on the target
|
||||
_target_status[_stream_id] = {
|
||||
@ -541,26 +556,26 @@ class routerHBP(HBSYSTEM):
|
||||
decoded = decode.voice_head_term(dmrpkt)
|
||||
_target_status[_stream_id]['LC'] = decoded['LC']
|
||||
self._logger.debug('(%s) Created LC for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID']))
|
||||
|
||||
|
||||
# If we don't have a voice header then don't wait to decode the Embedded LC
|
||||
# just make a new one from the HBP header. This is good enough, and it saves lots of time
|
||||
else:
|
||||
_target_status[_stream_id]['LC'] = const.LC_OPT + _dst_id + _rf_src
|
||||
self._logger.info('(%s) Created LC with *LATE ENTRY* for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID']))
|
||||
|
||||
|
||||
_target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(_target_status[_stream_id]['LC'])
|
||||
_target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(_target_status[_stream_id]['LC'])
|
||||
_target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(_target_status[_stream_id]['LC'])
|
||||
self._logger.info('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||
|
||||
# Record the time of this packet so we can later identify a stale stream
|
||||
|
||||
# Record the time of this packet so we can later identify a stale stream
|
||||
_target_status[_stream_id]['LAST'] = pkt_time
|
||||
# Clear the TS bit -- all OpenBridge streams are effectively on TS1
|
||||
_tmp_bits = _bits & ~(1 << 7)
|
||||
|
||||
|
||||
# Assemble transmit HBP packet header
|
||||
_tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + chr(_tmp_bits) + _data[16:20]
|
||||
|
||||
|
||||
# MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET
|
||||
# MUST RE-WRITE DESTINATION TGID IF DIFFERENT
|
||||
# if _dst_id != rule['DST_GROUP']:
|
||||
@ -604,11 +619,11 @@ class routerHBP(HBSYSTEM):
|
||||
if _frame_type == hb_const.HBPF_DATA_SYNC and _dtype_vseq == hb_const.HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _seq:
|
||||
self._logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS']))
|
||||
continue
|
||||
|
||||
|
||||
# Set values for the contention handler to test next time there is a frame to forward
|
||||
_target_status[_target['TS']]['TX_TIME'] = pkt_time
|
||||
|
||||
if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']):
|
||||
|
||||
if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']):
|
||||
# Record the DST TGID and Stream ID
|
||||
_target_status[_target['TS']]['TX_TGID'] = _target['TGID']
|
||||
_target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id
|
||||
@ -620,16 +635,16 @@ class routerHBP(HBSYSTEM):
|
||||
_target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc)
|
||||
self._logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||
self._logger.info('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||
|
||||
|
||||
# Handle any necessary re-writes for the destination
|
||||
if _system['TS'] != _target['TS']:
|
||||
_tmp_bits = _bits ^ 1 << 7
|
||||
else:
|
||||
_tmp_bits = _bits
|
||||
|
||||
|
||||
# Assemble transmit HBP packet header
|
||||
_tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + chr(_tmp_bits) + _data[16:20]
|
||||
|
||||
|
||||
# MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET
|
||||
# MUST RE-WRITE DESTINATION TGID IF DIFFERENT
|
||||
# if _dst_id != rule['DST_GROUP']:
|
||||
@ -646,36 +661,36 @@ class routerHBP(HBSYSTEM):
|
||||
dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
|
||||
dmrpkt = dmrbits.tobytes()
|
||||
_tmp_data = _tmp_data + dmrpkt + _data[53:55]
|
||||
|
||||
|
||||
# Transmit the packet to the destination system
|
||||
systems[_target['SYSTEM']].send_system(_tmp_data)
|
||||
#self._logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# Final actions - Is this a voice terminator?
|
||||
if (_frame_type == hb_const.HBPF_DATA_SYNC) and (_dtype_vseq == hb_const.HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != hb_const.HBPF_SLT_VTERM):
|
||||
call_duration = pkt_time - self.STATUS['RX_START']
|
||||
call_duration = pkt_time - self.STATUS[_slot]['RX_START']
|
||||
self._logger.info('(%s) *CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, Duration: %s', \
|
||||
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration)
|
||||
if CONFIG['REPORTS']['REPORT']:
|
||||
self._report.send_bridgeEvent('GROUP VOICE,END,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration))
|
||||
|
||||
|
||||
#
|
||||
# Begin in-band signalling for call end. This has nothign to do with routing traffic directly.
|
||||
#
|
||||
|
||||
|
||||
# Iterate the rules dictionary
|
||||
|
||||
|
||||
for _bridge in BRIDGES:
|
||||
for _system in BRIDGES[_bridge]:
|
||||
if _system['SYSTEM'] == self._system:
|
||||
|
||||
|
||||
# TGID matches a rule source, reset its timer
|
||||
if _slot == _system['TS'] and _dst_id == _system['TGID'] and ((_system['TO_TYPE'] == 'ON' and (_system['ACTIVE'] == True)) or (_system['TO_TYPE'] == 'OFF' and _system['ACTIVE'] == False)):
|
||||
_system['TIMER'] = pkt_time + _system['TIMEOUT']
|
||||
self._logger.info('(%s) Transmission match for Bridge: %s. Reset timeout to %s', self._system, _bridge, _system['TIMER'])
|
||||
|
||||
|
||||
# TGID matches an ACTIVATION trigger
|
||||
if (_dst_id in _system['ON'] or _dst_id in _system['RESET']) and _slot == _system['TS']:
|
||||
# Set the matching rule as ACTIVE
|
||||
@ -692,7 +707,7 @@ class routerHBP(HBSYSTEM):
|
||||
if _system['ACTIVE'] == True and _system['TO_TYPE'] == 'ON':
|
||||
_system['TIMER'] = pkt_time + _system['TIMEOUT']
|
||||
self._logger.info('(%s) Bridge: %s, timeout timer reset to: %s', self._system, _bridge, _system['TIMER'] - pkt_time)
|
||||
|
||||
|
||||
# TGID matches an DE-ACTIVATION trigger
|
||||
if (_dst_id in _system['OFF'] or _dst_id in _system['RESET']) and _slot == _system['TS']:
|
||||
# Set the matching rule as ACTIVE
|
||||
@ -713,11 +728,11 @@ class routerHBP(HBSYSTEM):
|
||||
_system['TIMER'] = pkt_time
|
||||
self._logger.info('(%s) Bridge: %s set to ON with and "OFF" timer rule: timeout timer cancelled', self._system, _bridge)
|
||||
|
||||
#
|
||||
#
|
||||
# END IN-BAND SIGNALLING
|
||||
#
|
||||
|
||||
|
||||
|
||||
|
||||
# Mark status variables for use later
|
||||
self.STATUS[_slot]['RX_SEQ'] = _seq
|
||||
self.STATUS[_slot]['RX_RFS'] = _rf_src
|
||||
@ -725,16 +740,16 @@ class routerHBP(HBSYSTEM):
|
||||
self.STATUS[_slot]['RX_TGID'] = _dst_id
|
||||
self.STATUS[_slot]['RX_TIME'] = pkt_time
|
||||
self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id
|
||||
|
||||
|
||||
#
|
||||
# Socket-based reporting section
|
||||
#
|
||||
class confbridgeReportFactory(reportFactory):
|
||||
|
||||
|
||||
def send_bridge(self):
|
||||
serialized = pickle.dumps(BRIDGES, protocol=pickle.HIGHEST_PROTOCOL)
|
||||
self.send_clients(REPORT_OPCODES['BRIDGE_SND']+serialized)
|
||||
|
||||
|
||||
def send_bridgeEvent(self, _data):
|
||||
self.send_clients(REPORT_OPCODES['BRDG_EVENT']+_data)
|
||||
|
||||
@ -744,13 +759,13 @@ class confbridgeReportFactory(reportFactory):
|
||||
#************************************************
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import os
|
||||
import signal
|
||||
from dmr_utils.utils import try_download, mk_id_dict
|
||||
|
||||
|
||||
# Change the current directory to the location of the application
|
||||
os.chdir(os.path.dirname(os.path.realpath(sys.argv[0])))
|
||||
|
||||
@ -766,27 +781,27 @@ if __name__ == '__main__':
|
||||
|
||||
# Call the external routine to build the configuration dictionary
|
||||
CONFIG = hb_config.build_config(cli_args.CONFIG_FILE)
|
||||
|
||||
|
||||
# Start the system logger
|
||||
if cli_args.LOG_LEVEL:
|
||||
CONFIG['LOGGER']['LOG_LEVEL'] = cli_args.LOG_LEVEL
|
||||
logger = hb_log.config_logging(CONFIG['LOGGER'])
|
||||
logger.debug('Logging system started, anything from here on gets logged')
|
||||
|
||||
|
||||
# Set up the signal handler
|
||||
def sig_handler(_signal, _frame):
|
||||
logger.info('SHUTDOWN: HBROUTER IS TERMINATING WITH SIGNAL %s', str(_signal))
|
||||
hblink_handler(_signal, _frame, logger)
|
||||
logger.info('SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING REACTOR')
|
||||
reactor.stop()
|
||||
|
||||
|
||||
# Set signal handers so that we can gracefully exit if need be
|
||||
for sig in [signal.SIGTERM, signal.SIGINT]:
|
||||
signal.signal(sig, sig_handler)
|
||||
|
||||
|
||||
# Build the Access Control List
|
||||
REG_ACL = build_reg_acl('reg_acl', logger)
|
||||
|
||||
|
||||
# ID ALIAS CREATION
|
||||
# Download
|
||||
if CONFIG['ALIASES']['TRY_DOWNLOAD'] == True:
|
||||
@ -796,32 +811,32 @@ if __name__ == '__main__':
|
||||
# Try updating subscriber aliases file
|
||||
result = try_download(CONFIG['ALIASES']['PATH'], CONFIG['ALIASES']['SUBSCRIBER_FILE'], CONFIG['ALIASES']['SUBSCRIBER_URL'], CONFIG['ALIASES']['STALE_TIME'])
|
||||
logger.info(result)
|
||||
|
||||
|
||||
# Make Dictionaries
|
||||
peer_ids = mk_id_dict(CONFIG['ALIASES']['PATH'], CONFIG['ALIASES']['PEER_FILE'])
|
||||
if peer_ids:
|
||||
logger.info('ID ALIAS MAPPER: peer_ids dictionary is available')
|
||||
|
||||
|
||||
subscriber_ids = mk_id_dict(CONFIG['ALIASES']['PATH'], CONFIG['ALIASES']['SUBSCRIBER_FILE'])
|
||||
if subscriber_ids:
|
||||
logger.info('ID ALIAS MAPPER: subscriber_ids dictionary is available')
|
||||
|
||||
|
||||
talkgroup_ids = mk_id_dict(CONFIG['ALIASES']['PATH'], CONFIG['ALIASES']['TGID_FILE'])
|
||||
if talkgroup_ids:
|
||||
logger.info('ID ALIAS MAPPER: talkgroup_ids dictionary is available')
|
||||
|
||||
|
||||
# Build the routing rules file
|
||||
BRIDGES = make_bridges('hb_confbridge_rules')
|
||||
|
||||
|
||||
# Build the Access Control List
|
||||
ACL = build_acl('sub_acl')
|
||||
|
||||
|
||||
# Build the Registration Access Control List
|
||||
REG_ACL = build_reg_acl('reg_acl', logger)
|
||||
|
||||
|
||||
# INITIALIZE THE REPORTING LOOP
|
||||
report_server = config_reports(CONFIG, logger, confbridgeReportFactory)
|
||||
|
||||
|
||||
# HBlink instance creation
|
||||
logger.info('HBlink \'hb_router.py\' (c) 2016 N0MJS & the K0USY Group - SYSTEM STARTING...')
|
||||
for system in CONFIG['SYSTEMS']:
|
||||
@ -832,11 +847,11 @@ if __name__ == '__main__':
|
||||
systems[system] = routerHBP(system, CONFIG, logger, report_server)
|
||||
reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP'])
|
||||
logger.debug('%s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system])
|
||||
|
||||
|
||||
# Initialize the rule timer -- this if for user activated stuff
|
||||
rule_timer = task.LoopingCall(rule_timer_loop)
|
||||
rule_timer.start(60)
|
||||
|
||||
|
||||
# Initialize the stream trimmer
|
||||
stream_trimmer = task.LoopingCall(stream_trimmer_loop)
|
||||
stream_trimmer.start(5)
|
||||
|
Loading…
Reference in New Issue
Block a user