implement subscriber id tracking via SVRD

This commit is contained in:
KF7EEL 2021-09-06 11:20:47 -07:00
parent 9690cdf98c
commit 370c4694ac
3 changed files with 102 additions and 6 deletions

View File

@ -338,6 +338,16 @@ def config_reports(_config, _factory):
return report_server
# Send data to all OBP connections that have an encryption key. Data such as subscribers are sent to other HBNet servers.
def svrd_send_all(_svrd_data):
_svrd_packet = SVRD
for system in CONFIG['SYSTEMS']:
if CONFIG['SYSTEMS'][system]['ENABLED']:
if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE':
if CONFIG['SYSTEMS'][system]['ENCRYPTION_KEY'] != b'':
systems[system].send_system(_svrd_packet + _svrd_data)
## pass
# Import Bridging rules
# Note: A stanza *must* exist for any MASTER or CLIENT configured in the main
@ -405,6 +415,8 @@ def rule_timer_loop(unit_flood_time):
else:
logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']))
for unit in UNIT_MAP:
svrd_send_all(b'UNIT' + unit)
_then = _now - unit_flood_time
remove_list = []
for unit in UNIT_MAP:
@ -423,6 +435,7 @@ def rule_timer_loop(unit_flood_time):
# run this every 10 seconds to trim orphaned stream ids
def stream_trimmer_loop():
print(UNIT_MAP)
ping(CONFIG)
logger.debug('(ROUTER) Trimming inactive stream IDs from system lists')
_now = time()
@ -482,6 +495,12 @@ class routerOBP(OPENBRIDGE):
# list of self._targets for unit (subscriber, private) calls
self._targets = []
def svrd_received(self, _mode, _data):
logger.info('SVRD Received. Mode: ' + str(_mode) + ' Data: ' + str(_data))
if _mode == b'UNIT':
UNIT_MAP[_data] = (self._system, time())
def group_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data):
pkt_time = time()
dmrpkt = _data[20:53]
@ -685,6 +704,9 @@ class routerOBP(OPENBRIDGE):
# Make/update this unit in the UNIT_MAP cache
UNIT_MAP[_rf_src] = (self.name, pkt_time)
# Send update to all OpenBridge connections
## svrd_send_all(b'UNIT' + _rf_src) # + b'TIME' + pkt_time)
# Is this a new call stream?
if (_stream_id not in self.STATUS):
@ -910,6 +932,9 @@ class routerHBP(HBSYSTEM):
# Make/update an entry in the UNIT_MAP for this subscriber
UNIT_MAP[_rf_src] = (self.name, pkt_time)
# Update other servers via OBP
## svrd_send_all(b'UNIT' + _rf_src) # + b'TIME' + pkt_time)
# Is this a new call stream?
if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']):
if (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']):
@ -917,6 +942,10 @@ class routerHBP(HBSYSTEM):
return
# This is a new call stream
# Send subscriber ID over OBP
svrd_send_all(b'UNIT' + _rf_src)
self.STATUS[_slot]['RX_START'] = pkt_time
logger.info('(%s) *GROUP CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot)
@ -1187,6 +1216,9 @@ class routerHBP(HBSYSTEM):
# Make/update this unit in the UNIT_MAP cache
UNIT_MAP[_rf_src] = (self.name, pkt_time)
# Update other servers via OBP
## svrd_send_all(b'UNIT' + _rf_src) # + b'TIME' + pkt_time)
# Is this a new call stream?
if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']):
@ -1208,6 +1240,7 @@ class routerHBP(HBSYSTEM):
self._targets.remove(self._system)
# This is a new call stream, so log & report
svrd_send_all(b'UNIT' + _rf_src)
self.STATUS[_slot]['RX_START'] = pkt_time
logger.info('(%s) *UNIT CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) UNIT: %s (%s), TS: %s, FORWARD: %s', \
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, self._targets)

View File

@ -111,6 +111,7 @@ __email__ = 'n0mjs@me.com'
hdr_type = ''
btf = -1
ssid = ''
UNIT_MAP = {}
# From dmr_utils3, modified to decode entire packet. Works for 1/2 rate coded data.
def decode_full(_data):
@ -889,6 +890,30 @@ def data_que_send():
except Exception as e:
logger.info(e)
# the APRS RX process
def aprs_rx(aprs_rx_login, aprs_passcode, aprs_server, aprs_port, aprs_filter, user_ssid):
global AIS
AIS = aprslib.IS(aprs_rx_login, passwd=int(aprs_passcode), host=aprs_server, port=int(aprs_port))
user_settings = ast.literal_eval(os.popen('cat ' + user_settings_file).read())
AIS.set_filter(aprs_filter)#parser.get('DATA_CONFIG', 'APRS_FILTER'))
try:
if 'N0CALL' in aprs_callsign:
logger.info()
logger.info('APRS callsighn set to N0CALL, not connecting to APRS-IS')
logger.info()
pass
else:
AIS.connect()
print('Connecting to APRS-IS')
if int(CONFIG['DATA_CONFIG']['IGATE_BEACON_TIME']) == 0:
logger.info('APRS beacon disabled')
if int(CONFIG['DATA_CONFIG']['IGATE_BEACON_TIME']) != 0:
aprs_beacon=task.LoopingCall(aprs_beacon_send)
aprs_beacon.start(int(CONFIG['DATA_CONFIG']['IGATE_BEACON_TIME'])*60)
AIS.consumer(aprs_process, raw=True, immortal=False)
except Exception as e:
logger.info(e)
##### DMR data function ####
def data_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
# Capture data headers
@ -1142,7 +1167,21 @@ def data_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _fr
######
##call_type = 'unit'
def rule_timer_loop():
global UNIT_MAP
logger.debug('(ROUTER) routerHBP Rule timer loop started')
_now = time()
_then = _now - 60
remove_list = []
for unit in UNIT_MAP:
if UNIT_MAP[unit][1] < (_then):
remove_list.append(unit)
for unit in remove_list:
del UNIT_MAP[unit]
logger.debug('Removed unit(s) %s from UNIT_MAP', remove_list)
class OBP(OPENBRIDGE):
@ -1151,9 +1190,13 @@ class OBP(OPENBRIDGE):
def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
UNIT_MAP[_rf_src] = (self._system, time())
print('OBP RCVD')
data_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data)
## pass
def svrd_received(self, _mode, _data):
if _mode == b'UNIT':
UNIT_MAP[_data] = (self._system, time())
class HBP(HBSYSTEM):
@ -1162,6 +1205,7 @@ class HBP(HBSYSTEM):
HBSYSTEM.__init__(self, _name, _config, _report)
def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
UNIT_MAP[_rf_src] = (self._system, time())
print('MMDVM RCVD')
data_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data)
## pass
@ -1307,5 +1351,17 @@ if __name__ == '__main__':
logger.error('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop.\n %s', failure)
reactor.stop()
# Initialize the rule timer -- this if for user activated stuff
rule_timer_task = task.LoopingCall(rule_timer_loop)
rule_timer = rule_timer_task.start(60)
rule_timer.addErrback(loopingErrHandle)
if 'N0CALL' in aprs_callsign:
logger.info('APRS callsighn set to N0CALL, packet not sent.')
pass
else:
aprs_thread = threading.Thread(target=aprs_rx, args=(aprs_callsign, aprs_passcode, aprs_server, aprs_port, aprs_filter, user_ssid,))
aprs_thread.daemon = True
aprs_thread.start()
reactor.run()

View File

@ -172,6 +172,7 @@ class OPENBRIDGE(DatagramProtocol):
# logger.debug('(%s) TX Packet to OpenBridge %s:%s -- %s', self._system, self._config['TARGET_IP'], self._config['TARGET_PORT'], ahex(_packet))
# Special Server Data packet, encrypted using frenet, send
elif _packet[:4] == SVRD:
print(_packet)
_enc_pkt = encrypt_packet(self._config['ENCRYPTION_KEY'], _packet)
_packet = b'SVRD' + _enc_pkt
self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT']))
@ -183,6 +184,9 @@ class OPENBRIDGE(DatagramProtocol):
pass
#print(int_id(_peer_id), int_id(_rf_src), int_id(_dst_id), int_id(_seq), _slot, _call_type, _frame_type, repr(_dtype_vseq), int_id(_stream_id))
def svrd_received(self, _mode, _data):
pass
def datagramReceived(self, _packet, _sockaddr):
# Keep This Line Commented Unless HEAVILY Debugging!
## logger.debug('(%s) RX packet from %s -- %s', self._system, _sockaddr, ahex(_packet))
@ -196,10 +200,10 @@ class OPENBRIDGE(DatagramProtocol):
_data = _packet[:53]
_hash = _packet[53:]
_ckhs = hmac_new(self._config['PASSPHRASE'],_data,sha1).digest()
print(ahex(_ckhs))
print(ahex(_hash))
## print(ahex(_ckhs))
## print(ahex(_hash))
print(compare_digest(_hash, _ckhs))
## print(compare_digest(_hash, _ckhs))
if compare_digest(_hash, _ckhs) and _sockaddr == self._config['TARGET_SOCK']:
_peer_id = _data[11:15]
@ -261,6 +265,7 @@ class OPENBRIDGE(DatagramProtocol):
# Server Data packet, decrypt and process it.
elif _packet[:4] == SVRD:
_d_pkt = decrypt_packet(self._config['ENCRYPTION_KEY'], _packet[4:])
## logger.info('SVRD Received: ' + str(_d_pkt))
# DMR Data packet, sent via SVRD
if _d_pkt[:4] == b'DATA':
@ -282,6 +287,8 @@ class OPENBRIDGE(DatagramProtocol):
_dtype_vseq = (_bits & 0xF) # data, 1=voice header, 2=voice terminator; voice, 0=burst A ... 5=burst F
_stream_id = _data[16:20]
self.dmrd_received(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data)
else:
self.svrd_received(_d_pkt[:4], _d_pkt[4:])
#************************************************
# HB MASTER CLASS