From 90a889ec3bdb9cead5b28afc320b6cb99ad6e822 Mon Sep 17 00:00:00 2001 From: Cort Buffington Date: Mon, 7 Jan 2019 09:44:36 -0600 Subject: [PATCH] Large updates, mostly contact to join --- bridge.py | 128 +++++++++++++++++++++-------------------------- config.py | 8 +-- hblink.py | 146 +++++++++++++++++++++++++++--------------------------- 3 files changed, 135 insertions(+), 147 deletions(-) diff --git a/bridge.py b/bridge.py index bcaef6c..641535d 100755 --- a/bridge.py +++ b/bridge.py @@ -19,7 +19,7 @@ ############################################################################### ''' -This application, in conjuction with it's rule file (hb_confbridge_rules.py) will +This application, in conjuction with it's rule file (rules.py) will work like a "conference bridge". This is similar to what most hams think of as a reflector. You define conference bridges and any system joined to that conference bridge will both receive traffic from, and send traffic to any other system @@ -50,8 +50,8 @@ import log from const import * # Stuff for socket reporting -import pickle as pickle -from datetime import datetime +import pickle +# REMOVE LATER from datetime import datetime # The module needs logging, but handlers, etc. are controlled by the parent import logging logger = logging.getLogger(__name__) @@ -73,11 +73,11 @@ __email__ = 'n0mjs@me.com' def config_reports(_config, _factory): if True: #_config['REPORTS']['REPORT']: def reporting_loop(logger, _server): - logger.debug('Periodic reporting loop started') + logger.debug('(REPORT) Periodic reporting loop started') _server.send_config() _server.send_bridge() - logger.info('HBlink TCP reporting server configured') + logger.info('(REPORT) HBlink TCP reporting server configured') report_server = _factory(_config) report_server.clients = [] @@ -93,12 +93,12 @@ def config_reports(_config, _factory): # Note: A stanza *must* exist for any MASTER or CLIENT configured in the main # configuration file and listed as "active". It can be empty, # but it has to exist. -def make_bridges(_hb_confbridge_bridges): +def make_bridges(_rules): try: - bridge_file = import_module(_hb_confbridge_bridges) - logger.info('Routing bridges file found and bridges imported') + bridge_file = import_module(_rules) + logger.info('(ROUTER) Routing bridges file found and bridges imported') except ImportError: - sys.exit('Routing bridges file not found or invalid') + sys.exit('(ROUTER) TERMINATING: Routing bridges file not found or invalid') # Convert integer GROUP ID numbers from the config into hex strings # we need to send in the actual data packets. @@ -122,7 +122,7 @@ def make_bridges(_hb_confbridge_bridges): # Run this every minute for rule timer updates def rule_timer_loop(): - logger.debug('(ALL HBSYSTEMS) Rule timer loop started') + logger.debug('(ROUTER) routerHBP Rule timer loop started') _now = time() for _bridge in BRIDGES: @@ -131,24 +131,24 @@ def rule_timer_loop(): if _system['ACTIVE'] == True: if _system['TIMER'] < _now: _system['ACTIVE'] = False - logger.info('Conference Bridge TIMEOUT: DEACTIVATE System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + logger.info('(ROUTER) Conference Bridge TIMEOUT: DEACTIVATE System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: timeout_in = _system['TIMER'] - _now - logger.info('Conference Bridge ACTIVE (ON timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %ss,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in) + logger.info('(ROUTER) Conference Bridge ACTIVE (ON timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %ss,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in) elif _system['ACTIVE'] == False: - logger.debug('Conference Bridge INACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + logger.debug('(ROUTER) Conference Bridge INACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) elif _system['TO_TYPE'] == 'OFF': if _system['ACTIVE'] == False: if _system['TIMER'] < _now: _system['ACTIVE'] = True - logger.info('Conference Bridge TIMEOUT: ACTIVATE System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + logger.info('(ROUTER) Conference Bridge TIMEOUT: ACTIVATE System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: timeout_in = _system['TIMER'] - _now - logger.info('Conference Bridge INACTIVE (OFF timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %ss,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in) + logger.info('(ROUTER) Conference Bridge INACTIVE (OFF timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %ss,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in) elif _system['ACTIVE'] == True: - logger.debug('Conference Bridge ACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: - logger.debug('Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) if CONFIG['REPORTS']['REPORT']: report_server.send_clients(b'bridge updated') @@ -156,7 +156,7 @@ def rule_timer_loop(): # run this every 10 seconds to trim orphaned stream ids def stream_trimmer_loop(): - logger.debug('(ALL OPENBRIDGE SYSTEMS) Trimming inactive stream IDs from system lists') + logger.debug('(ROUTER) Trimming inactive stream IDs from system lists') _now = time() for system in systems: @@ -164,6 +164,8 @@ def stream_trimmer_loop(): if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': for slot in range(1,3): _slot = systems[system].STATUS[slot] + + # RX slot check if _slot['RX_TYPE'] != HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5: _slot['RX_TYPE'] = HBPF_SLT_VTERM logger.info('(%s) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %s', \ @@ -171,8 +173,7 @@ def stream_trimmer_loop(): if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) - for slot in range(1,3): - _slot = systems[system].STATUS[slot] + # TX slot check if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 5: _slot['TX_TYPE'] = HBPF_SLT_VTERM logger.info('(%s) *TIME OUT* TX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %s', \ @@ -189,12 +190,12 @@ def stream_trimmer_loop(): remove_list.append(stream_id) for stream_id in remove_list: if stream_id in systems[system].STATUS: - _system = systems[system].STATUS[stream_id] - _config = CONFIG['SYSTEMS'][system] + _stream = systems[system].STATUS[stream_id] + _sysconfig = CONFIG['SYSTEMS'][system] logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %s', \ - system, int_id(stream_id), get_alias(int_id(_system['RFS']), subscriber_ids), get_alias(int_id(_config['NETWORK_ID']), peer_ids), get_alias(int_id(_system['TGID']), talkgroup_ids), _system['LAST'] - _system['START']) + system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_sysconfig['NETWORK_ID']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START']) if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_config['NETWORK_ID']), int_id(_system['RFS']), 1, int_id(_system['TGID']), _system['LAST'] - _system['START']).encode(encoding='utf-8', errors='ignore')) + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_sysconfig['NETWORK_ID']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) removed = systems[system].STATUS.pop(stream_id) else: logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) @@ -260,21 +261,12 @@ class routerOBP(OPENBRIDGE): 'RFS': _rf_src, 'TGID': _dst_id, } - # If we can, use the LC from the voice header as to keep all options intact - if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: - decoded = decode.voice_head_term(dmrpkt) - _target_status[_stream_id]['LC'] = decoded['LC'] - logger.debug('(%s) Created LC for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID'])) + # Generate LCs (full and EMB) for the TX stream + dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) + _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) + _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) + _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) - # If we don't have a voice header then don't wait to decode the Embedded LC - # just make a new one from the HBP header. This is good enough, and it saves lots of time - else: - _target_status[_stream_id]['LC'] = LC_OPT + _dst_id + _rf_src - logger.info('(%s) Created LC with *LATE ENTRY* for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID'])) - - _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(_target_status[_stream_id]['LC']) - _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(_target_status[_stream_id]['LC']) - _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(_target_status[_stream_id]['LC']) logger.info('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) # Record the time of this packet so we can later identify a stale stream @@ -283,7 +275,7 @@ class routerOBP(OPENBRIDGE): _tmp_bits = _bits & ~(1 << 7) # Assemble transmit HBP packet header - _tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + _tmp_bits.to_bytes(1, 'big') + _data[16:20] + _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT @@ -300,7 +292,7 @@ class routerOBP(OPENBRIDGE): elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() - _tmp_data = _tmp_data + dmrpkt #+ _data[53:55] + _tmp_data = b''.join([_tmp_data, dmrpkt]) else: # BEGIN CONTENTION HANDLING @@ -334,8 +326,7 @@ class routerOBP(OPENBRIDGE): continue # Is this a new call stream? - if (_target_status[_target['TS']]['TX_STREAM_ID'] != _stream_id): #(_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']): - #if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[_target['TS']]['TX_RFS'] != _rf_src) or (_target_status[_target['TS']]['TX_TGID'] != _target['TGID']): + if (_target_status[_target['TS']]['TX_STREAM_ID'] != _stream_id): # Record the DST TGID and Stream ID _target_status[_target['TS']]['TX_START'] = pkt_time _target_status[_target['TS']]['TX_TGID'] = _target['TGID'] @@ -343,7 +334,7 @@ class routerOBP(OPENBRIDGE): _target_status[_target['TS']]['TX_RFS'] = _rf_src _target_status[_target['TS']]['TX_PEER'] = _peer_id # Generate LCs (full and EMB) for the TX stream - dst_lc = self.STATUS[_stream_id]['LC'][0:3] + _target['TGID'] + _rf_src + dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) _target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc) _target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc) @@ -363,7 +354,7 @@ class routerOBP(OPENBRIDGE): _tmp_bits = _bits # Assemble transmit HBP packet header - _tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + _tmp_bits.to_bytes(1, 'big') + _data[16:20] + _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT @@ -382,7 +373,7 @@ class routerOBP(OPENBRIDGE): elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() - _tmp_data = _tmp_data + dmrpkt + b'\x00\x00' # Add two bytes of nothing since OBP doesn't include BER & RSSI bytes #_data[53:55] + _tmp_data = b''.join([_tmp_data, dmrpkt, b'\x00\x00']) # Add two bytes of nothing since OBP doesn't include BER & RSSI bytes #_data[53:55] # Transmit the packet to the destination system systems[_target['SYSTEM']].send_system(_tmp_data) @@ -516,21 +507,12 @@ class routerHBP(HBSYSTEM): 'RFS': _rf_src, 'TGID': _dst_id, } - # If we can, use the LC from the voice header as to keep all options intact - if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: - decoded = decode.voice_head_term(dmrpkt) - _target_status[_stream_id]['LC'] = decoded['LC'] - logger.debug('(%s) Created LC for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID'])) + # Generate LCs (full and EMB) for the TX stream + dst_lc = b''.join([self.STATUS[_slot]['RX_LC'][0:3], _target['TGID'], _rf_src]) + _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) + _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) + _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) - # If we don't have a voice header then don't wait to decode the Embedded LC - # just make a new one from the HBP header. This is good enough, and it saves lots of time - else: - _target_status[_stream_id]['LC'] = LC_OPT + _dst_id + _rf_src - logger.info('(%s) Created LC with *LATE ENTRY* for OpenBridge destination: System: %s, TGID: %s', self._system, _target['SYSTEM'], int_id(_target['TGID'])) - - _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(_target_status[_stream_id]['LC']) - _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(_target_status[_stream_id]['LC']) - _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(_target_status[_stream_id]['LC']) logger.info('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) # Record the time of this packet so we can later identify a stale stream @@ -539,7 +521,7 @@ class routerHBP(HBSYSTEM): _tmp_bits = _bits & ~(1 << 7) # Assemble transmit HBP packet header - _tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + _tmp_bits.to_bytes(1, 'big') + _data[16:20] + _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT @@ -556,7 +538,7 @@ class routerHBP(HBSYSTEM): elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() - _tmp_data = _tmp_data + dmrpkt #+ _data[53:55] + _tmp_data = b''.join([_tmp_data, dmrpkt]) else: # BEGIN STANDARD CONTENTION HANDLING @@ -614,7 +596,7 @@ class routerHBP(HBSYSTEM): _tmp_bits = _bits # Assemble transmit HBP packet header - _tmp_data = _data[:8] + _target['TGID'] + _data[11:15] + _tmp_bits.to_bytes(1, 'big') + _data[16:20] + _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT @@ -633,7 +615,7 @@ class routerHBP(HBSYSTEM): elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() - _tmp_data = _tmp_data + dmrpkt + _data[53:55] + _tmp_data = b''.join([_tmp_data, dmrpkt, _data[53:55]]) # Transmit the packet to the destination system systems[_target['SYSTEM']].send_system(_tmp_data) @@ -718,7 +700,7 @@ class routerHBP(HBSYSTEM): # # Socket-based reporting section # -class confbridgeReportFactory(reportFactory): +class bridgeReportFactory(reportFactory): def send_bridge(self): serialized = pickle.dumps(BRIDGES, protocol=2) #.decode("utf-8", errors='ignore') @@ -763,13 +745,13 @@ if __name__ == '__main__': CONFIG['LOGGER']['LOG_LEVEL'] = cli_args.LOG_LEVEL logger = log.config_logging(CONFIG['LOGGER']) logger.info('\n\nCopyright (c) 2013, 2014, 2015, 2016, 2018\n\tThe Founding Members of the K0USY Group. All rights reserved.\n') - logger.debug('Logging system started, anything from here on gets logged') + logger.debug('(GLOBAL) Logging system started, anything from here on gets logged') # Set up the signal handler def sig_handler(_signal, _frame): - logger.info('SHUTDOWN: CONFBRIDGE IS TERMINATING WITH SIGNAL %s', str(_signal)) + logger.info('(GLOBAL) SHUTDOWN: CONFBRIDGE IS TERMINATING WITH SIGNAL %s', str(_signal)) hblink_handler(_signal, _frame) - logger.info('SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING REACTOR') + logger.info('(GLOBAL) SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING REACTOR') reactor.stop() # Set signal handers so that we can gracefully exit if need be @@ -783,10 +765,14 @@ if __name__ == '__main__': BRIDGES = make_bridges('rules') # INITIALIZE THE REPORTING LOOP - report_server = config_reports(CONFIG, confbridgeReportFactory) + if CONFIG['REPORTS']['REPORT']: + report_server = config_reports(CONFIG, bridgereportFactory) + else: + report_server = None + logger.info('(REPORT) TCP Socket reporting not configured') # HBlink instance creation - logger.info('HBlink \'hb_confbridge.py\' -- SYSTEM STARTING...') + logger.info('(GLOBAL) HBlink \'bridge.py\' -- SYSTEM STARTING...') for system in CONFIG['SYSTEMS']: if CONFIG['SYSTEMS'][system]['ENABLED']: if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': @@ -794,10 +780,10 @@ if __name__ == '__main__': else: systems[system] = routerHBP(system, CONFIG, report_server) reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP']) - logger.debug('%s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) + logger.debug('(GLOBAL) %s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) def loopingErrHandle(failure): - logger.error('STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop.\n %s', failure) + logger.error('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop.\n %s', failure) reactor.stop() # Initialize the rule timer -- this if for user activated stuff diff --git a/config.py b/config.py index 2fe47b8..ae8fbc6 100755 --- a/config.py +++ b/config.py @@ -67,7 +67,7 @@ def acl_build(_acl, _max): if not _acl: return(True, set((const.ID_MIN, _max))) - acl = set() + acl = [] #set() sections = _acl.split(':') if sections[0] == 'PERMIT': @@ -77,20 +77,20 @@ def acl_build(_acl, _max): for entry in sections[1].split(','): if entry == 'ALL': - acl.add((const.ID_MIN, _max)) + acl.append((const.ID_MIN, _max)) break elif '-' in entry: start,end = entry.split('-') start,end = int(start), int(end) if (const.ID_MIN <= start <= _max) or (const.ID_MIN <= end <= _max): - acl.add((start, end)) + acl.append((start, end)) else: sys.exit('ACL CREATION ERROR, VALUE OUT OF RANGE (} - {})IN RANGE-BASED ENTRY: {}'.format(const.ID_MIN, _max, entry)) else: id = int(entry) if (const.ID_MIN <= id <= _max): - acl.add((id, id)) + acl.append((id, id)) else: sys.exit('ACL CREATION ERROR, VALUE OUT OF RANGE ({} - {}) IN SINGLE ID ENTRY: {}'.format(const.ID_MIN, _max, entry)) diff --git a/hblink.py b/hblink.py index 252e2cf..3d3614f 100755 --- a/hblink.py +++ b/hblink.py @@ -27,8 +27,6 @@ works stand-alone before troubleshooting any applications that use it. It has sufficient logging to be used standalone as a troubleshooting application. ''' - - # Specifig functions from modules we need from binascii import b2a_hex as ahex from binascii import a2b_hex as bhex @@ -36,8 +34,6 @@ from random import randint from hashlib import sha256, sha1 from hmac import new as hmac_new, compare_digest from time import time -from bitstring import BitArray -from importlib import import_module from collections import deque # Twisted is pretty important, so I keep it separate @@ -52,7 +48,7 @@ from const import * from dmr_utils3.utils import int_id, bytes_4, try_download, mk_id_dict # Imports for the reporting server -import pickle as pickle +import pickle from reporting_const import * # The module needs logging logging, but handlers, etc. are controlled by the parent @@ -71,22 +67,19 @@ __email__ = 'n0mjs@me.com' systems = {} # Timed loop used for reporting HBP status -# -# REPORT BASED ON THE TYPE SELECTED IN THE MAIN CONFIG FILE def config_reports(_config, _factory): - if True: #_config['REPORTS']['REPORT']: - def reporting_loop(_logger, _server): - _logger.debug('Periodic reporting loop started') - _server.send_config() + def reporting_loop(_logger, _server): + _logger.debug('(GLOBAL) Periodic reporting loop started') + _server.send_config() - logger.info('HBlink TCP reporting server configured') + logger.info('(GLOBAL) HBlink TCP reporting server configured') - report_server = _factory(_config) - report_server.clients = [] - reactor.listenTCP(_config['REPORTS']['REPORT_PORT'], report_server) + report_server = _factory(_config) + report_server.clients = [] + reactor.listenTCP(_config['REPORTS']['REPORT_PORT'], report_server) - reporting = task.LoopingCall(reporting_loop, logger, report_server) - reporting.start(_config['REPORTS']['REPORT_INTERVAL']) + reporting = task.LoopingCall(reporting_loop, logger, report_server) + reporting.start(_config['REPORTS']['REPORT_INTERVAL']) return report_server @@ -94,7 +87,7 @@ def config_reports(_config, _factory): # Shut ourselves down gracefully by disconnecting from the masters and peers. def hblink_handler(_signal, _frame): for system in systems: - logger.info('SHUTDOWN: DE-REGISTER SYSTEM: %s', system) + logger.info('(GLOBAL) SHUTDOWN: DE-REGISTER SYSTEM: %s', system) systems[system].dereg() # Check a supplied ID against the ACL provided. Returns action (True|False) based @@ -125,8 +118,10 @@ class OPENBRIDGE(DatagramProtocol): def send_system(self, _packet): if _packet[:4] == 'DMRD': - _packet = _packet[:11] + self._config['NETWORK_ID'] + _packet[15:] - _packet += hmac_new(self._config['PASSPHRASE'],_packet,sha1).digest() + #_packet = _packet[:11] + self._config['NETWORK_ID'] + _packet[15:] + _packet = b''.join([_packet[:11], self._config['NETWORK_ID'], _packet[15:]]) + #_packet += hmac_new(self._config['PASSPHRASE'],_packet,sha1).digest() + _packet = b''.join([_packet, (hmac_new(self._config['PASSPHRASE'],_packet,sha1).digest())]) self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT'])) # KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!! # logger.debug('(%s) TX Packet to OpenBridge %s:%s -- %s', self._system, self._config['TARGET_IP'], self._config['TARGET_PORT'], ahex(_packet)) @@ -260,11 +255,11 @@ class HBSYSTEM(DatagramProtocol): self._stats['NUM_OUTSTANDING'] = 0 self._stats['PING_OUTSTANDING'] = False self._stats['CONNECTION'] = 'RPTL_SENT' - self.send_master(RPTL + self._config['RADIO_ID']) + self.send_master(b''.join([RPTL, self._config['RADIO_ID']])) logger.info('(%s) Sending login request to master %s:%s', self._system, self._config['MASTER_IP'], self._config['MASTER_PORT']) # If we are connected, sent a ping to the master and increment the counter if self._stats['CONNECTION'] == 'YES': - self.send_master(RPTPING + self._config['RADIO_ID']) + self.send_master(b''.join([RPTPING, self._config['RADIO_ID']])) logger.debug('(%s) RPTPING Sent to Master. Total Sent: %s, Total Missed: %s, Currently Outstanding: %s', self._system, self._stats['PINGS_SENT'], self._stats['PINGS_SENT'] - self._stats['PINGS_ACKD'], self._stats['NUM_OUTSTANDING']) self._stats['PINGS_SENT'] += 1 self._stats['PING_OUTSTANDING'] = True @@ -281,8 +276,9 @@ class HBSYSTEM(DatagramProtocol): #logger.debug('(%s) TX Packet to %s on port %s: %s', self._peers[_peer]['RADIO_ID'], self._peers[_peer]['IP'], self._peers[_peer]['PORT'], ahex(_packet)) def send_master(self, _packet): - if _packet[:4] == b'DMRD': - _packet = _packet[:11] + self._config['RADIO_ID'] + _packet[15:] + #if _packet[:4] == b'DMRD': + # _packet = _packet[:11] + self._config['RADIO_ID'] + _packet[15:] + _packet = b''.join([_packet[:11], self._config['RADIO_ID'], _packet[15:]]) self.transport.write(_packet, self._config['MASTER_SOCKADDR']) # KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!! # logger.debug('(%s) TX Packet to %s:%s -- %s', self._system, self._config['MASTER_IP'], self._config['MASTER_PORT'], ahex(_packet)) @@ -415,14 +411,14 @@ class HBSYSTEM(DatagramProtocol): }}) logger.info('(%s) Repeater Logging in with Radio ID: %s, %s:%s', self._system, int_id(_peer_id), _sockaddr[0], _sockaddr[1]) _salt_str = bytes_4(self._peers[_peer_id]['SALT']) - self.send_peer(_peer_id, RPTACK + _salt_str) + self.send_peer(_peer_id, b''.join([RPTACK, _salt_str])) self._peers[_peer_id]['CONNECTION'] = 'CHALLENGE_SENT' logger.info('(%s) Sent Challenge Response to %s for login: %s', self._system, int_id(_peer_id), self._peers[_peer_id]['SALT']) else: - self.transport.write(MSTNAK + _peer_id, _sockaddr) + self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Invalid Login from Radio ID: %s Denied by Registation ACL', self._system, int_id(_peer_id)) else: - self.transport.write(MSTNAK + _peer_id, _sockaddr) + self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Registration denied from Radio ID: %s Maximum number of peers exceeded', self._system, int_id(_peer_id)) elif _command == RPTK: # Repeater has answered our login challenge @@ -437,14 +433,14 @@ class HBSYSTEM(DatagramProtocol): _calc_hash = bhex(sha256(_salt_str+self._config['PASSPHRASE']).hexdigest()) if _sent_hash == _calc_hash: _this_peer['CONNECTION'] = 'WAITING_CONFIG' - self.send_peer(_peer_id, RPTACK + _peer_id) + self.send_peer(_peer_id, b''.join([RPTACK, _peer_id])) logger.info('(%s) Peer %s has completed the login exchange successfully', self._system, _this_peer['RADIO_ID']) else: logger.info('(%s) Peer %s has FAILED the login exchange successfully', self._system, _this_peer['RADIO_ID']) - self.transport.write(MSTNAK + _peer_id, _sockaddr) + self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) del self._peers[_peer_id] else: - self.transport.write(MSTNAK + _peer_id, _sockaddr) + self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Login challenge from Radio ID that has not logged in: %s', self._system, int_id(_peer_id)) elif _command == RPTC: # Repeater is sending it's configuraiton OR disconnecting @@ -454,7 +450,7 @@ class HBSYSTEM(DatagramProtocol): and self._peers[_peer_id]['CONNECTION'] == 'YES' \ and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: logger.info('(%s) Peer is closing down: %s (%s)', self._system, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id)) - self.transport.write(MSTNAK + _peer_id, _sockaddr) + self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) del self._peers[_peer_id] else: @@ -481,10 +477,10 @@ class HBSYSTEM(DatagramProtocol): _this_peer['SOFTWARE_ID'] = _data[222:262] _this_peer['PACKAGE_ID'] = _data[262:302] - self.send_peer(_peer_id, RPTACK + _peer_id) + self.send_peer(_peer_id, b''.join([RPTACK, _peer_id])) logger.info('(%s) Peer %s (%s) has sent repeater configuration', self._system, _this_peer['CALLSIGN'], _this_peer['RADIO_ID']) else: - self.transport.write(MSTNAK + _peer_id, _sockaddr) + self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Peer info from Radio ID that has not logged in: %s', self._system, int_id(_peer_id)) elif _command == RPTP: # RPTPing -- peer is pinging us @@ -494,10 +490,10 @@ class HBSYSTEM(DatagramProtocol): and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: self._peers[_peer_id]['PINGS_RECEIVED'] += 1 self._peers[_peer_id]['LAST_PING'] = time() - self.send_peer(_peer_id, MSTPONG + _peer_id) + self.send_peer(_peer_id, b''.join([MSTPONG, _peer_id])) logger.debug('(%s) Received and answered RPTPING from peer %s (%s)', self._system, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id)) else: - self.transport.write(MSTNAK + _peer_id, _sockaddr) + self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Ping from Radio ID that is not logged in: %s', self._system, int_id(_peer_id)) else: @@ -589,32 +585,34 @@ class HBSYSTEM(DatagramProtocol): if self._stats['CONNECTION'] == 'RPTL_SENT': # If we've sent a login request... _login_int32 = _data[6:10] logger.info('(%s) Repeater Login ACK Received with 32bit ID: %s', self._system, int_id(_login_int32)) - _pass_hash = sha256(_login_int32+self._config['PASSPHRASE']).hexdigest() + _pass_hash = sha256(b''.join([_login_int32, self._config['PASSPHRASE']])).hexdigest() _pass_hash = bhex(_pass_hash) - self.send_master(RPTK + self._config['RADIO_ID']+_pass_hash) + self.send_master(b''.join([RPTK, self._config['RADIO_ID'], _pass_hash])) self._stats['CONNECTION'] = 'AUTHENTICATED' elif self._stats['CONNECTION'] == 'AUTHENTICATED': # If we've sent the login challenge... _peer_id = _data[6:10] if self._config['LOOSE'] or _peer_id == self._config['RADIO_ID']: # Validate the Radio_ID unless using loose validation logger.info('(%s) Repeater Authentication Accepted', self._system) - _config_packet = self._config['RADIO_ID']+\ - self._config['CALLSIGN']+\ - self._config['RX_FREQ']+\ - self._config['TX_FREQ']+\ - self._config['TX_POWER']+\ - self._config['COLORCODE']+\ - self._config['LATITUDE']+\ - self._config['LONGITUDE']+\ - self._config['HEIGHT']+\ - self._config['LOCATION']+\ - self._config['DESCRIPTION']+\ - self._config['SLOTS']+\ - self._config['URL']+\ - self._config['SOFTWARE_ID']+\ - self._config['PACKAGE_ID'] + _config_packet = b''.join([\ + self._config['RADIO_ID'],\ + self._config['CALLSIGN'],\ + self._config['RX_FREQ'],\ + self._config['TX_FREQ'],\ + self._config['TX_POWER'],\ + self._config['COLORCODE'],\ + self._config['LATITUDE'],\ + self._config['LONGITUDE'],\ + self._config['HEIGHT'],\ + self._config['LOCATION'],\ + self._config['DESCRIPTION'],\ + self._config['SLOTS'],\ + self._config['URL'],\ + self._config['SOFTWARE_ID'],\ + self._config['PACKAGE_ID']\ + ]) - self.send_master(RPTC + _config_packet) + self.send_master(b''.join([RPTC, _config_packet])) self._stats['CONNECTION'] = 'CONFIG-SENT' logger.info('(%s) Repeater Configuration Sent', self._system) else: @@ -626,7 +624,7 @@ class HBSYSTEM(DatagramProtocol): if self._config['LOOSE'] or _peer_id == self._config['RADIO_ID']: # Validate the Radio_ID unless using loose validation logger.info('(%s) Repeater Configuration Accepted', self._system) if self._config['OPTIONS']: - self.send_master(RPTO + self._config['RADIO_ID']+self._config['OPTIONS']) + self.send_master(b''.join([RPTO, self._config['RADIO_ID'], self._config['OPTIONS']])) self._stats['CONNECTION'] = 'OPTIONS-SENT' logger.info('(%s) Sent options: (%s)', self._system, self._config['OPTIONS']) else: @@ -674,10 +672,10 @@ class report(NetstringReceiver): def connectionMade(self): self._factory.clients.append(self) - logger.info('HBlink reporting client connected: %s', self.transport.getPeer()) + logger.info('(REPORT) HBlink reporting client connected: %s', self.transport.getPeer()) def connectionLost(self, reason): - logger.info('HBlink reporting client disconnected: %s', self.transport.getPeer()) + logger.info('(REPORT) HBlink reporting client disconnected: %s', self.transport.getPeer()) self._factory.clients.remove(self) def stringReceived(self, data): @@ -686,10 +684,10 @@ class report(NetstringReceiver): def process_message(self, _message): opcode = _message[:1] if opcode == REPORT_OPCODES['CONFIG_REQ']: - logger.info('HBlink reporting client sent \'CONFIG_REQ\': %s', self.transport.getPeer()) + logger.info('(REPORT) HBlink reporting client sent \'CONFIG_REQ\': %s', self.transport.getPeer()) self.send_config() else: - logger.error('got unknown opcode') + logger.error('(REPORT) got unknown opcode') class reportFactory(Factory): def __init__(self, config): @@ -697,10 +695,10 @@ class reportFactory(Factory): def buildProtocol(self, addr): if (addr.host) in self._config['REPORTS']['REPORT_CLIENTS'] or '*' in self._config['REPORTS']['REPORT_CLIENTS']: - logger.debug('Permitting report server connection attempt from: %s:%s', addr.host, addr.port) + logger.debug('(REPORT) Permitting report server connection attempt from: %s:%s', addr.host, addr.port) return report(self) else: - logger.error('Invalid report server connection attempt from: %s:%s', addr.host, addr.port) + logger.error('(REPORT) Invalid report server connection attempt from: %s:%s', addr.host, addr.port) return None def send_clients(self, _message): @@ -709,7 +707,7 @@ class reportFactory(Factory): def send_config(self): serialized = pickle.dumps(self._config['SYSTEMS'], protocol=2) #.decode('utf-8', errors='ignore') #pickle.HIGHEST_PROTOCOL) - self.send_clients(REPORT_OPCODES['CONFIG_SND']+serialized) + self.send_clients(b''.join([REPORT_OPCODES['CONFIG_SND'], serialized])) # ID ALIAS CREATION @@ -718,23 +716,23 @@ def mk_aliases(_config): if _config['ALIASES']['TRY_DOWNLOAD'] == True: # Try updating peer aliases file result = try_download(_config['ALIASES']['PATH'], _config['ALIASES']['PEER_FILE'], _config['ALIASES']['PEER_URL'], _config['ALIASES']['STALE_TIME']) - logger.info(result) + logger.info('(GLOBAL) %s', result) # Try updating subscriber aliases file result = try_download(_config['ALIASES']['PATH'], _config['ALIASES']['SUBSCRIBER_FILE'], _config['ALIASES']['SUBSCRIBER_URL'], _config['ALIASES']['STALE_TIME']) - logger.info(result) + logger.info('(GLOBAL) %s', result) # Make Dictionaries peer_ids = mk_id_dict(_config['ALIASES']['PATH'], _config['ALIASES']['PEER_FILE']) if peer_ids: - logger.info('ID ALIAS MAPPER: peer_ids dictionary is available') + logger.info('(GLOBAL) ID ALIAS MAPPER: peer_ids dictionary is available') subscriber_ids = mk_id_dict(_config['ALIASES']['PATH'], _config['ALIASES']['SUBSCRIBER_FILE']) if subscriber_ids: - logger.info('ID ALIAS MAPPER: subscriber_ids dictionary is available') + logger.info('(GLOBAL) ID ALIAS MAPPER: subscriber_ids dictionary is available') talkgroup_ids = mk_id_dict(_config['ALIASES']['PATH'], _config['ALIASES']['TGID_FILE']) if talkgroup_ids: - logger.info('ID ALIAS MAPPER: talkgroup_ids dictionary is available') + logger.info('(GLOBAL) ID ALIAS MAPPER: talkgroup_ids dictionary is available') return peer_ids, subscriber_ids, talkgroup_ids @@ -771,13 +769,13 @@ if __name__ == '__main__': CONFIG['LOGGER']['LOG_LEVEL'] = cli_args.LOG_LEVEL logger = log.config_logging(CONFIG['LOGGER']) logger.info('\n\nCopyright (c) 2013, 2014, 2015, 2016, 2018\n\tThe Founding Members of the K0USY Group. All rights reserved.\n') - logger.debug('Logging system started, anything from here on gets logged') + logger.debug('(GLOBAL) Logging system started, anything from here on gets logged') # Set up the signal handler def sig_handler(_signal, _frame): - logger.info('SHUTDOWN: HBLINK IS TERMINATING WITH SIGNAL %s', str(_signal)) + logger.info('(GLOBAL) SHUTDOWN: HBLINK IS TERMINATING WITH SIGNAL %s', str(_signal)) hblink_handler(_signal, _frame) - logger.info('SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING REACTOR') + logger.info('(GLOBAL) SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING REACTOR') reactor.stop() # Set signal handers so that we can gracefully exit if need be @@ -787,10 +785,14 @@ if __name__ == '__main__': peer_ids, subscriber_ids, talkgroup_ids = mk_aliases(CONFIG) # INITIALIZE THE REPORTING LOOP - report_server = config_reports(CONFIG, reportFactory) + if CONFIG['REPORTS']['REPORT']: + report_server = config_reports(CONFIG, reportFactory) + else: + report_server = None + logger.info('(REPORT) TCP Socket reporting not configured') # HBlink instance creation - logger.info('HBlink \'HBlink.py\' -- SYSTEM STARTING...') + logger.info('(GLOBAL) HBlink \'HBlink.py\' -- SYSTEM STARTING...') for system in CONFIG['SYSTEMS']: if CONFIG['SYSTEMS'][system]['ENABLED']: if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': @@ -798,6 +800,6 @@ if __name__ == '__main__': else: systems[system] = HBSYSTEM(system, CONFIG, report_server) reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP']) - logger.debug('%s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) + logger.debug('(GLOBAL) %s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) reactor.run()