Merge branch 'decoding-full-dmr'

This commit is contained in:
Cort Buffington 2016-11-19 17:51:03 -06:00
commit 521dccc3ca
15 changed files with 4532 additions and 1245 deletions

207
bptc.py
View File

@ -8,8 +8,7 @@
from __future__ import print_function
from bitarray import bitarray
import hamming
from time import time
import hamming, crc, rs129
# Does anybody read this stuff? There's a PEP somewhere that says I should do this.
__author__ = 'Cortney T. Buffington, N0MJS'
@ -42,46 +41,21 @@ INDEX_181 = (
# BPTC(196,96) Decoding Routings
#------------------------------------------------------------------------------
# Converts a DMR frame using 98-68-98 (info-sync/EMB-info) into 196 bit array
# Applies interleave indecies de-interleave 196 bit array
def deinterleave_19696(_data):
deint = bitarray(196, endian='big')
for index in xrange(196):
deint[index] = _data[INDEX_181[index]] # the real math is slower: deint[index] = _data[(index * 181) % 196]
return deint
# Applies BTPC error detection/correction routines
# This routine, in practice, will not be used in HBlink or DMRlink - it's only usefull for OTA direct data
def error_check_19696(_data):
count = 0
column = bitarray(13, endian='big')
while True:
errors = False
for col in xrange(15):
pos = col + 1
for index in xrange(13):
column[index] = _data[pos]
pos += 15
result_1393 = hamming.dec_1393(column)
if result_1393[1]:
pos = col + 1
for index in xrange(13):
_data[pos] = result_1393[0][index]
pos += 15
errors = True
for index in xrange(9):
pos = (index*15) + 1
result_15113 = hamming.dec_15113(_data[pos:(pos+15)])
if result_15113[1]:
errors = True
_data[pos:(pos+15)] = result_15113[0]
count += 1
if not errors or count > 4: break
return (errors)
def decode_full_lc(_data):
binlc = bitarray(endian='big')
binlc.extend([_data[136],_data[121],_data[106],_data[91], _data[76], _data[61], _data[46], _data[31]])
binlc.extend([_data[152],_data[137],_data[122],_data[107],_data[92], _data[77], _data[62], _data[47], _data[32], _data[17], _data[2] ])
binlc.extend([_data[123],_data[108],_data[93], _data[78], _data[63], _data[48], _data[33], _data[18], _data[3], _data[184],_data[169]])
binlc.extend([_data[94], _data[79], _data[64], _data[49], _data[34], _data[19], _data[4], _data[185],_data[170],_data[155],_data[140]])
binlc.extend([_data[65], _data[50], _data[35], _data[20], _data[5], _data[186],_data[171],_data[156],_data[141],_data[126],_data[111]])
binlc.extend([_data[36], _data[21], _data[6], _data[187],_data[172],_data[157],_data[142],_data[127],_data[112],_data[97], _data[82] ])
binlc.extend([_data[7], _data[188],_data[173],_data[158],_data[143],_data[128],_data[113],_data[98], _data[83]])
'''
This is the rest of the Full LC data -- the RS1293 FEC that we don't need
_data[68],_data[53],_data[174],_data[159],_data[144],_data[129],_data[114],_data[99],_data[84],_data[69],_data[54],_data[39],
_data[24],_data[145],_data[130],_data[115],_data[100],_data[85],_data[70],_data[55],_data[40],_data[25],_data[10],_data[191]
'''
return binlc
#------------------------------------------------------------------------------
# BPTC(196,96) Encoding Routings
@ -133,11 +107,34 @@ def encode_19696(_data):
return _bdata
def encode_header_lc(_lc):
full_lc = _lc + rs129.lc_header_encode(_lc)
full_lc = encode_19696(full_lc)
full_lc = interleave_19696(full_lc)
return full_lc
def encode_terminator_lc(_lc):
full_lc = _lc + rs129.lc_terminator_encode(_lc)
full_lc = encode_19696(full_lc)
full_lc = interleave_19696(full_lc)
return full_lc
#------------------------------------------------------------------------------
# BPTC Embedded LC Decoding Routines
#------------------------------------------------------------------------------
def decode_emblc(_elc):
_binlc = bitarray(endian='big')
_binlc.extend([_elc[0],_elc[8], _elc[16],_elc[24],_elc[32],_elc[40],_elc[48],_elc[56],_elc[64],_elc[72] ,_elc[80]])
_binlc.extend([_elc[1],_elc[9], _elc[17],_elc[25],_elc[33],_elc[41],_elc[49],_elc[57],_elc[65],_elc[73] ,_elc[81]])
_binlc.extend([_elc[2],_elc[10],_elc[18],_elc[26],_elc[34],_elc[42],_elc[50],_elc[58],_elc[66],_elc[74]])
_binlc.extend([_elc[3],_elc[11],_elc[19],_elc[27],_elc[35],_elc[43],_elc[51],_elc[59],_elc[67],_elc[75]])
_binlc.extend([_elc[4],_elc[12],_elc[20],_elc[28],_elc[36],_elc[44],_elc[52],_elc[60],_elc[68],_elc[76]])
_binlc.extend([_elc[5],_elc[13],_elc[21],_elc[29],_elc[37],_elc[45],_elc[53],_elc[61],_elc[69],_elc[77]])
_binlc.extend([_elc[6],_elc[14],_elc[22],_elc[30],_elc[38],_elc[46],_elc[54],_elc[62],_elc[70],_elc[78]])
return(_binlc.tobytes())
#------------------------------------------------------------------------------
# BPTC Embedded LC Encoding Routines
@ -146,96 +143,108 @@ def encode_19696(_data):
# Accepts 12 byte LC header + 5-bit checksum, converts to binary and builts out the BPTC
# encoded result with hamming(16,11,4) and parity.
def encode_emblc(_lc, _csum5):
def encode_emblc(_lc):
# Get the 5-bit checksum for the Embedded LC
_csum = crc.csum5(_lc)
# Create a bitarray from the 4 bytes of LC data (includes 5-bit checksum).
_binlc = bitarray(endian='big')
_binlc.frombytes(_lc)
# Insert the checksum bits at the right location in the matrix (this is actually faster than with a for loop)
_binlc.insert(32,_csum5[0])
_binlc.insert(43,_csum5[1])
_binlc.insert(54,_csum5[2])
_binlc.insert(65,_csum5[3])
_binlc.insert(76,_csum5[4])
_binlc.insert(32,_csum[0])
_binlc.insert(43,_csum[1])
_binlc.insert(54,_csum[2])
_binlc.insert(65,_csum[3])
_binlc.insert(76,_csum[4])
# Insert the hamming bits at the right location in the matrix
for index in xrange(0,112,16):
for hindex,hbit in zip(xrange(index+11,index+16), hamming.enc_16114(_binlc[index:index+11])):
_binlc.insert(hindex,hbit)
# Insert the column parity bits at the right location in the matrix
for index in xrange(0,16):
_binlc.insert(index+112, _binlc[index+0] ^ _binlc[index+16] ^ _binlc[index+32] ^ _binlc[index+48] ^ _binlc[index+64] ^ _binlc[index+80] ^ _binlc[index+96])
# TO DO NEXT:
# INTERLEAVE, RETURN A TUPLE OR LIBRARY OR EACH SEGMENT OF THE LC
# EACH SEGMENT IS 4 COLUMNS, TOP TO BOTTOM, LEFT TO RIGHT (PAGE 124 ETSI)
# Create Embedded LC segments in 48 bit blocks
emblc_b = bitarray(endian='big')
emblc_b.extend([_binlc[0], _binlc[16],_binlc[32],_binlc[48],_binlc[64],_binlc[80],_binlc[96], _binlc[112]])
emblc_b.extend([_binlc[1], _binlc[17],_binlc[33],_binlc[49],_binlc[65],_binlc[81],_binlc[97], _binlc[113]])
emblc_b.extend([_binlc[2], _binlc[18],_binlc[34],_binlc[50],_binlc[66],_binlc[82],_binlc[98], _binlc[114]])
emblc_b.extend([_binlc[3], _binlc[19],_binlc[35],_binlc[51],_binlc[67],_binlc[83],_binlc[99], _binlc[115]])
emblc_c = bitarray(endian='big')
emblc_c.extend([_binlc[4], _binlc[20],_binlc[36],_binlc[52],_binlc[68],_binlc[84],_binlc[100],_binlc[116]])
emblc_c.extend([_binlc[5], _binlc[21],_binlc[37],_binlc[53],_binlc[69],_binlc[85],_binlc[101],_binlc[117]])
emblc_c.extend([_binlc[6], _binlc[22],_binlc[38],_binlc[54],_binlc[70],_binlc[86],_binlc[102],_binlc[118]])
emblc_c.extend([_binlc[7], _binlc[23],_binlc[39],_binlc[55],_binlc[71],_binlc[87],_binlc[103],_binlc[119]])
emblc_d = bitarray(endian='big')
emblc_d.extend([_binlc[8], _binlc[24],_binlc[40],_binlc[56],_binlc[72],_binlc[88],_binlc[104],_binlc[120]])
emblc_d.extend([_binlc[9], _binlc[24],_binlc[41],_binlc[57],_binlc[73],_binlc[89],_binlc[105],_binlc[121]])
emblc_d.extend([_binlc[10],_binlc[26],_binlc[42],_binlc[58],_binlc[74],_binlc[90],_binlc[106],_binlc[122]])
emblc_d.extend([_binlc[11],_binlc[27],_binlc[43],_binlc[59],_binlc[75],_binlc[91],_binlc[107],_binlc[123]])
emblc_e = bitarray(endian='big')
emblc_e.extend([_binlc[12],_binlc[28],_binlc[44],_binlc[60],_binlc[76],_binlc[92],_binlc[108],_binlc[124]])
emblc_e.extend([_binlc[13],_binlc[29],_binlc[45],_binlc[61],_binlc[77],_binlc[93],_binlc[109],_binlc[125]])
emblc_e.extend([_binlc[14],_binlc[30],_binlc[46],_binlc[62],_binlc[78],_binlc[94],_binlc[110],_binlc[126]])
emblc_e.extend([_binlc[15],_binlc[31],_binlc[47],_binlc[63],_binlc[79],_binlc[95],_binlc[111],_binlc[127]])
return({1: emblc_b, 2: emblc_c, 3: emblc_d, 4: emblc_e})
#------------------------------------------------------------------------------
# Used to execute the module directly to run built-in tests
#------------------------------------------------------------------------------
if __name__ == '__main__':
from binascii import b2a_hex as h
from time import time
import crc
def to_bytes(_bits):
#add_bits = 8 - (len(_bits) % 8)
#if add_bits < 8:
# for bit in xrange(add_bits):
# _bits.insert(0,0)
_string = _bits.tobytes()
return _string
# Validation Example
orig_data = '\x00\x10\x20\x00\x0c\x30\x2f\x9b\xe5\xda\xd4\x5a'
voice_h = '\x2b\x60\x04\x10\x1f\x84\x2d\xd0\x0d\xf0\x7d\x41\x04\x6d\xff\x57\xd7\x5d\xf5\xde\x30\x15\x2e\x20\x70\xb2\x0f\x80\x3f\x88\xc6\x95\xe2'
voice_hb = bitarray(endian='big')
voice_hb.frombytes(voice_h)
voice_hb = voice_hb[0:98] + voice_hb[166:264]
# Header LC -- Terminator similar
lc = '\x00\x10\x20\x00\x0c\x30\x2f\x9b\xe5' # \xda\xd4\x5a
t0 = time()
enc_data = encode_19696(orig_data)
inter_data = interleave_19696(enc_data)
full_lc_encode = encode_header_lc(lc)
t1 = time()
encode_time = t1-t0
# Good Data
dec_data = '\x2b\x60\x04\x10\x1f\x84\x2d\xd0\x0d\xf0\x7d\x41\x04\x6d\xff\x57\xd7\x5d\xf5\xde\x30\x15\x2e\x20\x70\xb2\x0f\x80\x3f\x88\xc6\x95\xe2'
# Bad Data
#dec_data = '\x2b\x60\xff\xff\xff\x85\x2d\xd0\x0d\xf0\x7d\x41\x04\x6d\xff\x57\xd7\x5d\xf5\xde\x30\x15\x2e\x20\x70\xb2\x0f\x80\x3f\x88\xc6\x95\xe2'
dec_bits = bitarray(endian='big')
dec_bits.frombytes(dec_data)
dec_bits = dec_bits[0:98] + dec_bits[166:264]
t0 = time()
deint_data = deinterleave_19696(dec_bits)
err_corrected = error_check_19696(deint_data) # This corrects deint_data in place -- it does not return a new array!!!
ext_data = to_bytes(deint_data)
full_lc_dec = decode_full_lc(full_lc_encode)
t1 = time()
decode_time = t1-t0
print('VALIDATION ROUTINE:')
print('VALIDATION ROUTINES:')
print('Orig Data: {}, {} bytes'.format(h(lc), len(lc)))
print('Orig Encoded: {}, {} bytes'.format(h(voice_hb), len(voice_hb.tobytes())))
print()
print('ENCODER TEST:')
print('Original Data: {}, {} bytes'.format(h(orig_data), len(orig_data)))
print('BPTC(196,96):')
print('Encoded data: {}, {} bytes'.format(h(full_lc_encode.tobytes()), len(full_lc_encode.tobytes())))
print('Encoding time: {} seconds'.format(encode_time))
print('Encoded data: {}, {} bits'.format(enc_data, len(enc_data)))
print()
print('DECODER TEST:')
print('Encoded data: {}, {} bytes'.format(h(dec_data), len(dec_data)))
print('Decoding Time: {} seconds'.format(t1-t0))
if err_corrected:
print('WARNING DATA COULD NOT BE CORRECTED')
else:
print('Decoded Data: {}, {} bytes'.format(h(ext_data), len(ext_data)))
print()
print('Decoded data: {}'.format(h(full_lc_dec)))
print('Decode Time: {} seconds'.format(decode_time))
# Embedded LC
t0 = time()
emblc = encode_emblc(lc)
t1 = time()
encode_time = t1 -t0
print('ENCODED vs. DECODED:')
print('enc:', enc_data)
print('dec:', deint_data)
print(enc_data == deint_data)
t0 = time()
decemblc = decode_emblc(emblc[0] + emblc[1] + emblc[2] + emblc[3])
t1 = time()
decode_time = t1 -t0
orig_data = '\x00\x10\x20\x00\x0c\x30\x2f\x9b\xe5'
orig_csum = crc.csum5(orig_data)
emblc = encode_emblc(orig_data, orig_csum)
print('\nEMBEDDED LC:')
print('Encoded Data: Burst B:{} Burst C:{} Burst D:{} Burst E:{}'.format(h(emblc[0].tobytes()), h(emblc[1].tobytes()), h(emblc[2].tobytes()), h(emblc[3].tobytes())))
print('Endoding Time: {}'.format(encode_time))
print('Decoded data: {}'.format(h(decemblc)))
print('Decoding Time: {}'.format(decode_time))

102
constants.py Normal file → Executable file
View File

@ -1,18 +1,108 @@
#!/usr/bin/env python
#
# This work is licensed under the Creative Attribution-NonCommercial-ShareAlike
# 3.0 Unported License.To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc-sa/3.0/ or send a letter to
# Creative Commons, 444 Castro Street, Suite 900, Mountain View,
# California, 94041, USA.
from __future__ import print_function
from bitarray import bitarray
# Does anybody read this stuff? There's a PEP somewhere that says I should do this.
__author__ = 'Cortney T. Buffington, N0MJS'
__copyright__ = 'Copyright (c) 2016 Cortney T. Buffington, N0MJS and the K0USY Group'
__credits__ = ''
__license__ = 'Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported'
__maintainer__ = 'Cort Buffington, N0MJS'
__email__ = 'n0mjs@me.com'
# Timers
STREAM_TO = .360
# HomeBrew Protocol Frame Types
HBPF_VOICE = 0x0
HBPF_VOICE_SYNC = 0x1
HBPF_DATA_SYNC = 0x2
HBPF_SLT_VHEAD = 0x1
HBPF_SLT_VTERM = 0x2
# Slot Type Data types
DMR_SLT_VHEAD = '\x01'
DMR_SLT_VTERM = '\x02'
# Sync patterns used for LC and Voice Burst A packets
BS_VOICE_SYNC = bitarray()
BS_DATA_SYNC = bitarray()
BS_VOICE_SYNC.frombytes(b'\x75\x5F\xD7\xDF\x75\xF7')
BS_DATA_SYNC.frombytes(b'\xDF\xF5\x7D\x75\xDF\x5D')
LCSS_SINGLE_FRAG = bitarray('00')
LCSS_FIRST_FRAG = bitarray('01')
LCSS_LAST_FRAG = bitarray('10')
LCSS_CONT_FRAG = bitarray('11')
SYNC = {
'BS_VOICE': BS_VOICE_SYNC,
'BS_DATA': BS_DATA_SYNC
}
# LC Options - Use for Group Voice
LC_OPT = '\x00\x00\x20'
# Precomputed EMB values, where CC always = 1, and PI always = 0
EMB = {
'BURST_B': bitarray('0001001110010001'),
'BURST_C': bitarray('0001011101110100'),
'BURST_D': bitarray('0001011101110100'),
'BURST_E': bitarray('0001010100000111'),
'BURST_F': bitarray('0001000111100010')
}
# Precomputed Slot Type values where CC always = 1
SLOT_TYPE = {
'PI_HEAD': bitarray('00010000001101100111'),
'VOICE_LC_HEAD': bitarray('00010001101110001100'),
'VOICE_LC_TERM': bitarray('00010010101001011001'),
'CSBK': bitarray('00010011001010110010'),
'MBC_HEAD': bitarray('00010100100111110000'),
'MBC_CONT': bitarray('00010101000100011011'),
'DATA_HEAD': bitarray('00010110000011001110'),
'1/2_DATA': bitarray('00010111100000100101'),
'3/4_DATA': bitarray('00011000111010100001'),
'IDLE': bitarray('00011001011001001010'),
'1/1_DATA': bitarray('00011010011110011111'),
'RES_1': bitarray('00011011111101110100'),
'RES_2': bitarray('00011100010000110110'),
'RES_3': bitarray('00011101110011011101'),
'RES_4': bitarray('00011110110100001000'),
'RES_5': bitarray('00011111010111100011')
}
# LC infor for first 3 Bytes:
# Byte 1: PF (1),Res(1),FLCO(6) -- Byte 2: FID(8) -- Byte 3: Service Options(8)
LC_VOICE = {
'FLCO-GRP': bitarray('00000000'),
'FLCO-USR': bitarray('00000011'),
'FID-GENC': bitarray('00000000'),
'FID-MOTO': bitarray('00010000'),
'SVC-OVCM': bitarray('00100000'),
'SVC-NONE': bitarray('00000000')
}
'''
EMB: CC(4b), PI(1b), LCSS(2b), EMB Parity(9b - QR 16,7,5)
Slot Type: CC(4b), DataType(4), Slot Type Parity(12b - )
'''
'''
#------------------------------------------------------------------------------
# Used to execute the module directly to run built-in tests
#------------------------------------------------------------------------------
if __name__ == '__main__':
from binascii import b2a_hex as h
from time import time
from pprint import pprint
pprint(SYNC)
pprint(EMB)
pprint(SLOT_TYPE)
print(LC_OPT)

View File

@ -10,7 +10,7 @@ from __future__ import print_function
from bitarray import bitarray
import bptc
import constants as const
#import constants as const
# Does anybody read this stuff? There's a PEP somewhere that says I should do this.
__author__ = 'Cortney T. Buffington, N0MJS'
@ -25,52 +25,39 @@ def to_bits(_string):
_bits.frombytes(_string)
return _bits
# Returns useable LC data - 9 bytes info + 3 bytes RS(12,9) ECC
def lc_data_12(_data):
return _data[4:12]+_data[16:27]+_data[31:42]+_data[46:57]+_data[61:72]+_data[76:87]+_data[91:102]+_data[106:117]+_data[121:132]
# Returns useable LC data - 9 bytes info, no ECC
def lc_data_9(_data):
return _data[4:12]+_data[16:27]+_data[31:42]+_data[46:57]+_data[61:72]+_data[76:87]+_data[91:100]
def voice_head_term(_string):
burst = to_bits(_string)
info = burst[0:98] + burst[166:264]
de_int_info = bptc.deinterleave_19696(info)
slot_type = burst[98:108] + burst[156:166]
sync = burst[108:156]
if sync == const.BS_DATA_SYNC:
sync = True
else:
sync = False
lc = to_bytes(lc_data_9(de_int_info))
lc = bptc.decode_full_lc(info).tobytes()
cc = to_bytes(slot_type[0:4])
dtype = to_bytes(slot_type[4:8])
return (lc, cc, dtype, sync)
return {'LC': lc, 'CC': cc, 'DTYPE': dtype, 'SYNC': sync}
def voice_burst(_string):
def voice_sync(_string):
burst = to_bits(_string)
ambe = [0,0,0]
ambe[0] = burst[0:72]
ambe[1] = burst[72:108] + burst[156:192]
ambe[2] = burst[192:264]
sync = burst [108:156]
if sync == const.BS_VOICE_SYNC:
cc = bitarray('00')
lcss = bitarray('00')
sync = True
else:
emb = burst[108:116] + burst[148:156]
embeded = burst[116:148]
cc = (emb[0:4])
# pi = (emb[4:5])
lcss = (emb[5:7])
sync = False
if not sync and lcss == const.LCSS_FIRST_FRAG or lcss == const.LCSS_CONT_FRAG or lcss == const.LCSS_LAST_FRAG:
pass
return (ambe, cc, lcss, sync)
sync = burst[108:156]
return {'AMBE': ambe, 'SYNC': sync}
def voice(_string):
burst = to_bits(_string)
ambe = [0,0,0]
ambe[0] = burst[0:72]
ambe[1] = burst[72:108] + burst[156:192]
ambe[2] = burst[192:264]
emb = burst[108:116] + burst[148:156]
embed = burst[116:148]
cc = (to_bytes(emb[0:4]))
lcss = (to_bytes(emb[5:7]))
return {'AMBE': ambe, 'CC': cc, 'LCSS': lcss, 'EMBED': embed}
def to_bytes(_bits):
@ -102,59 +89,85 @@ if __name__ == '__main__':
voice_f = '\xee\xe7\x81\x75\x74\x61\x4d\xf2\xff\xcc\xf4\xa0\x55\x11\x10\x00\x00\x00\x0e\x24\x30\x59\xe7\xf9\xe9\x08\xa0\x75\x62\x02\xcc\xd6\x22'
voice_term = '\x2b\x0f\x04\xc4\x1f\x34\x2d\xa8\x0d\x80\x7d\xe1\x04\xad\xff\x57\xd7\x5d\xf5\xd9\x65\x01\x2d\x18\x77\xd2\x03\xc0\x37\x88\xdf\x95\xd1'
embed_lc = bitarray()
print('Header Validation:')
print('DMR PACKET DECODER VALIDATION\n')
print('Header:')
t0 = time()
lc = voice_head_term(data_head)
t1 = time()
print(h(lc[0]), h(lc[1]), h(lc[2]), lc[3])
print(t1-t0, '\n')
print('LC: OPT-{} SRC-{} DST-{}, SLOT TYPE: CC-{} DTYPE-{}'.format(h(lc['LC'][0:3]),h(lc['LC'][3:6]),h(lc['LC'][6:9]),h(lc['CC']),h(lc['DTYPE'])))
print('Decode Time: {}\n'.format(t1-t0))
print('Voice Burst A Validation:')
print('Voice Burst A:')
t0 = time()
lc = voice_burst(voice_a)
pkt = voice_sync(voice_a)
t1 = time()
print(lc[0], h(lc[1]), h(lc[2]), lc[3])
print('VOICE SYNC: {}'.format(h(lc['SYNC'])))
print('AMBE 0: {}, {}'.format(pkt['AMBE'][0], len(pkt['AMBE'][0])))
print('AMBE 1: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][1])))
print('AMBE 2: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][2])))
print(t1-t0, '\n')
print('Voice Burst B Validation:')
print('Voice Burst B:')
t0 = time()
lc = voice_burst(voice_b)
pkt = voice(voice_b)
embed_lc += pkt['EMBED']
t1 = time()
print(lc[0], h(lc[1]), h(lc[2]), lc[3])
print('EMB: CC-{} LCSS-{}, EMBEDDED LC: {}'.format(h(pkt['CC']), h(pkt['LCSS']), h(pkt['EMBED'].tobytes())))
print('AMBE 0: {}, {}'.format(pkt['AMBE'][0], len(pkt['AMBE'][0])))
print('AMBE 1: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][1])))
print('AMBE 2: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][2])))
print(t1-t0, '\n')
print('Voice Burst C Validation:')
print('Voice Burst C:')
t0 = time()
lc = voice_burst(voice_c)
pkt = voice(voice_c)
embed_lc += pkt['EMBED']
t1 = time()
print(lc[0], h(lc[1]), h(lc[2]), lc[3])
print('EMB: CC-{} LCSS-{}, EMBEDDED LC: {}'.format(h(pkt['CC']), h(pkt['LCSS']), h(pkt['EMBED'].tobytes())))
print('AMBE 0: {}, {}'.format(pkt['AMBE'][0], len(pkt['AMBE'][0])))
print('AMBE 1: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][1])))
print('AMBE 2: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][2])))
print(t1-t0, '\n')
print('Voice Burst D Validation:')
print('Voice Burst D:')
t0 = time()
lc = voice_burst(voice_d)
pkt = voice(voice_d)
embed_lc += pkt['EMBED']
t1 = time()
print(lc[0], h(lc[1]), h(lc[2]), lc[3])
print('EMB: CC-{} LCSS-{}, EMBEDDED LC: {}'.format(h(pkt['CC']), h(pkt['LCSS']), h(pkt['EMBED'].tobytes())))
print('AMBE 0: {}, {}'.format(pkt['AMBE'][0], len(pkt['AMBE'][0])))
print('AMBE 1: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][1])))
print('AMBE 2: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][2])))
print(t1-t0, '\n')
print('Voice Burst E Validation:')
print('Voice Burst E:')
t0 = time()
lc = voice_burst(voice_e)
pkt = voice(voice_e)
embed_lc += pkt['EMBED']
embed_lc = bptc.decode_emblc(embed_lc)
t1 = time()
print(lc[0], h(lc[1]), h(lc[2]), lc[3])
print('EMB: CC-{} LCSS-{}, EMBEDDED LC: {}'.format(h(pkt['CC']), h(pkt['LCSS']), h(pkt['EMBED'].tobytes())))
print('COMPLETE EMBEDDED LC: {}'.format(h(embed_lc)))
print('AMBE 0: {}, {}'.format(pkt['AMBE'][0], len(pkt['AMBE'][0])))
print('AMBE 1: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][1])))
print('AMBE 2: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][2])))
print(t1-t0, '\n')
print('Voice Burst F Validation:')
print('Voice Burst F:')
t0 = time()
lc = voice_burst(voice_f)
pkt = voice(voice_f)
t1 = time()
print(lc[0], h(lc[1]), h(lc[2]), lc[3])
print('EMB: CC-{} LCSS-{}, EMBEDDED LC: {}'.format(h(pkt['CC']), h(pkt['LCSS']), h(pkt['EMBED'].tobytes())))
print('AMBE 0: {}, {}'.format(pkt['AMBE'][0], len(pkt['AMBE'][0])))
print('AMBE 1: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][1])))
print('AMBE 2: {}, {}'.format(pkt['AMBE'][1], len(pkt['AMBE'][2])))
print(t1-t0, '\n')
print('Terminator Validation:')
print('Terminator:')
t0 = time()
lc = voice_head_term(voice_term)
t1 = time()
print(h(lc[0]), h(lc[1]), h(lc[2]), lc[3])
print(t1-t0)
print('LC: OPT-{} SRC-{} DST-{} SLOT TYPE: CC-{} DTYPE-{}'.format(h(lc['LC'][0:3]),h(lc['LC'][3:6]),h(lc['LC'][6:9]),h(lc['CC']),h(lc['DTYPE'])))
print('Decode Time: {}\n'.format(t1-t0))

32
enc_dmr.py Executable file
View File

@ -0,0 +1,32 @@
#!/usr/bin/env python
#
# This work is licensed under the Creative Attribution-NonCommercial-ShareAlike
# 3.0 Unported License.To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc-sa/3.0/ or send a letter to
# Creative Commons, 444 Castro Street, Suite 900, Mountain View,
# California, 94041, USA.
from __future__ import print_function
from bitarray import bitarray
import constants
# Does anybody read this stuff? There's a PEP somewhere that says I should do this.
__author__ = 'Cortney T. Buffington, N0MJS'
__copyright__ = 'Copyright (c) 2016 Cortney T. Buffington, N0MJS and the K0USY Group'
__credits__ = 'Jonathan Naylor, G4KLX'
__license__ = 'Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported'
__maintainer__ = 'Cort Buffington, N0MJS'
__email__ = 'n0mjs@me.com'
#------------------------------------------------------------------------------
# Used to execute the module directly to run built-in tests
#------------------------------------------------------------------------------
if __name__ == '__main__':
from binascii import b2a_hex as h
from time import time
print(ENC_EMB)

View File

@ -31,40 +31,6 @@ def enc_15113(_data):
csum[3] = _data[0] ^ _data[1] ^ _data[2] ^ _data[4] ^ _data[6] ^ _data[7] ^ _data[10]
return csum
# DECODER - Returns a tuple of (decoded data, True if an error was corrected)
def dec_15113(_data):
chk0 = _data[0] ^ _data[1] ^ _data[2] ^ _data[3] ^ _data[5] ^ _data[7] ^ _data[8]
chk1 = _data[1] ^ _data[2] ^ _data[3] ^ _data[4] ^ _data[6] ^ _data[8] ^ _data[9]
chk2 = _data[2] ^ _data[3] ^ _data[4] ^ _data[5] ^ _data[7] ^ _data[9] ^ _data[10]
chk3 = _data[0] ^ _data[1] ^ _data[2] ^ _data[4] ^ _data[6] ^ _data[7] ^ _data[10]
n = 0
error = False
n |= 0x01 if chk0 != _data[11] else 0x00
n |= 0x02 if chk1 != _data[12] else 0x00
n |= 0x04 if chk2 != _data[13] else 0x00
n |= 0x08 if chk3 != _data[14] else 0x00
if n == 0x01: _data[11] = not _data[11]; return (_data, True)
if n == 0x02: _data[12] = not _data[12]; return (_data, True)
if n == 0x04: _data[13] = not _data[13]; return (_data, True)
if n == 0x08: _data[14] = not _data[14]; return (_data, True)
if n == 0x09: _data[0] = not _data[0]; return (_data, True)
if n == 0x0b: _data[1] = not _data[1]; return (_data, True)
if n == 0x0f: _data[2] = not _data[2]; return (_data, True)
if n == 0x07: _data[3] = not _data[3]; return (_data, True)
if n == 0x0e: _data[4] = not _data[4]; return (_data, True)
if n == 0x05: _data[5] = not _data[5]; return (_data, True)
if n == 0x0a: _data[6] = not _data[6]; return (_data, True)
if n == 0x0d: _data[7] = not _data[7]; return (_data, True)
if n == 0x03: _data[8] = not _data[8]; return (_data, True)
if n == 0x06: _data[9] = not _data[9]; return (_data, True)
if n == 0x0c: _data[10] = not _data[10]; return (_data, True)
return (_data, False)
#------------------------------------------------------------------------------
# Hamming 13,9,3 routines
@ -79,38 +45,7 @@ def enc_1393(_data):
csum[3] = _data[0] ^ _data[2] ^ _data[4] ^ _data[5] ^ _data[8]
return csum
# DECODER - Returns a tuple of (decoded data, True if an error was corrected)
def dec_1393(_data):
chk0 = _data[0] ^ _data[1] ^ _data[3] ^ _data[5] ^ _data[6]
chk1 = _data[0] ^ _data[1] ^ _data[2] ^ _data[4] ^ _data[6] ^ _data[7]
chk2 = _data[0] ^ _data[1] ^ _data[2] ^ _data[3] ^ _data[5] ^ _data[7] ^ _data[8]
chk3 = _data[0] ^ _data[2] ^ _data[4] ^ _data[5] ^ _data[8]
n = 0
error = False
n |= 0x01 if chk0 != _data[9] else 0x00
n |= 0x02 if chk1 != _data[10] else 0x00
n |= 0x04 if chk2 != _data[11] else 0x00
n |= 0x08 if chk3 != _data[12] else 0x00
if n == 0x01: _data[9] = not _data[9]; return (_data, True)
if n == 0x02: _data[10] = not _data[10]; return (_data, True)
if n == 0x04: _data[11] = not _data[11]; return (_data, True)
if n == 0x08: _data[12] = not _data[12]; return (_data, True)
if n == 0x0f: _data[0] = not _data[0]; return (_data, True)
if n == 0x07: _data[1] = not _data[1]; return (_data, True)
if n == 0x0e: _data[2] = not _data[2]; return (_data, True)
if n == 0x05: _data[3] = not _data[3]; return (_data, True)
if n == 0x0a: _data[4] = not _data[4]; return (_data, True)
if n == 0x0d: _data[5] = not _data[5]; return (_data, True)
if n == 0x03: _data[6] = not _data[6]; return (_data, True)
if n == 0x06: _data[7] = not _data[7]; return (_data, True)
if n == 0x0c: _data[8] = not _data[8]; return (_data, True)
return (_data, False)
#------------------------------------------------------------------------------
# Hamming 16,11,4 routines
#------------------------------------------------------------------------------
@ -124,69 +59,4 @@ def enc_16114(_data):
csum[2] = _data[2] ^ _data[3] ^ _data[4] ^ _data[5] ^ _data[7] ^ _data[9] ^ _data[10]
csum[3] = _data[0] ^ _data[1] ^ _data[2] ^ _data[4] ^ _data[6] ^ _data[7] ^ _data[10]
csum[4] = _data[0] ^ _data[2] ^ _data[5] ^ _data[6] ^ _data[8] ^ _data[9] ^ _data[10]
return csum
# DECODER - Returns a tuple of (decoded data, True if an error was corrected)
def dec_16114(_data):
chk0 = _data[0] ^ _data[1] ^ _data[2] ^ _data[3] ^ _data[5] ^ _data[7] ^ _data[8]
chk1 = _data[1] ^ _data[2] ^ _data[3] ^ _data[4] ^ _data[6] ^ _data[8] ^ _data[9]
chk2 = _data[2] ^ _data[3] ^ _data[4] ^ _data[5] ^ _data[7] ^ _data[9] ^ _data[10]
chk3 = _data[0] ^ _data[1] ^ _data[2] ^ _data[4] ^ _data[6] ^ _data[7] ^ _data[10]
chk4 = _data[0] ^ _data[2] ^ _data[5] ^ _data[6] ^ _data[8] ^ _data[9] ^ _data[10]
n = 0
error = False
n |= 0x01 if chk0 != _data[11] else 0x00
n |= 0x02 if chk1 != _data[12] else 0x00
n |= 0x04 if chk2 != _data[13] else 0x00
n |= 0x08 if chk3 != _data[14] else 0x00
n |= 0x10 if chk4 != _data[15] else 0x00
if n == 0x01: _data[11] = not _data[11]; return (_data, True)
if n == 0x02: _data[12] = not _data[12]; return (_data, True)
if n == 0x04: _data[13] = not _data[13]; return (_data, True)
if n == 0x08: _data[14] = not _data[14]; return (_data, True)
if n == 0x10: _data[15] = not _data[15]; return (_data, True)
if n == 0x19: _data[0] = not _data[0]; return (_data, True)
if n == 0x0b: _data[1] = not _data[1]; return (_data, True)
if n == 0x1f: _data[2] = not _data[2]; return (_data, True)
if n == 0x07: _data[3] = not _data[3]; return (_data, True)
if n == 0x0e: _data[4] = not _data[4]; return (_data, True)
if n == 0x15: _data[5] = not _data[5]; return (_data, True)
if n == 0x1a: _data[6] = not _data[6]; return (_data, True)
if n == 0x0d: _data[7] = not _data[7]; return (_data, True)
if n == 0x13: _data[8] = not _data[8]; return (_data, True)
if n == 0x16: _data[9] = not _data[9]; return (_data, True)
if n == 0x1c: _data[10] = not _data[10]; return (_data, True)
return (_data, False)
#------------------------------------------------------------------------------
# Used to execute the module directly to run built-in tests
#------------------------------------------------------------------------------
if __name__ == '__main__':
# Validation Example
good_data_15113 = bitarray('0000000000000000000100000011101000000000000000000000110001110011000000100100111110011010110111100101111001011010110101101100010110100110000110000111100111010011101101000010101001110000100101010100')
bad_data_15113 = bitarray('0000000000000000000110000011101000000000000000000000110001110011000000100100111110011010110111100101111001011010110101101100010110100110000110000111100111010011101101000010101001110000100101010100')
def check_15113(_data):
rows = (_data[1:16],_data[16:31],_data[31:46],_data[46:61],_data[61:76],_data[76:91],_data[91:106],_data[106:121],_data[121:136])
print('Processing New Integrity Check')
for row in rows:
print('original data:', row[0:11], 'original parity:', row[11:15])
hamming_dec = dec_15113(row[0:15])
code = hamming_dec[0]
error = hamming_dec[1]
print('\tDECODE: data: ', code[0:11], 'parity: ', code[11:15], 'error:', error)
print('\tENCODE: calculated parity:', enc_15113(row[0:11]))
print()
check_15113(good_data_15113)
check_15113(bad_data_15113)
return csum

View File

@ -79,14 +79,14 @@ def build_config(_config_file):
'MASTER_IP': gethostbyname(config.get(section, 'MASTER_IP')),
'MASTER_PORT': config.getint(section, 'MASTER_PORT'),
'PASSPHRASE': config.get(section, 'PASSPHRASE'),
'CALLSIGN': config.get(section, 'CALLSIGN').ljust(8),
'CALLSIGN': config.get(section, 'CALLSIGN').ljust(8)[:8],
'RADIO_ID': hex(int(config.get(section, 'RADIO_ID')))[2:].rjust(8,'0').decode('hex'),
'RX_FREQ': config.get(section, 'RX_FREQ').ljust(9),
'TX_FREQ': config.get(section, 'TX_FREQ').ljust(9),
'RX_FREQ': config.get(section, 'RX_FREQ').ljust(9)[:9],
'TX_FREQ': config.get(section, 'TX_FREQ').ljust(9)[:9],
'TX_POWER': config.get(section, 'TX_POWER').rjust(2,'0'),
'COLORCODE': config.get(section, 'COLORCODE').rjust(2,'0'),
'LATITUDE': config.get(section, 'LATITUDE').ljust(9),
'LONGITUDE': config.get(section, 'LONGITUDE').ljust(10),
'LATITUDE': config.get(section, 'LATITUDE').ljust(8)[:8],
'LONGITUDE': config.get(section, 'LONGITUDE').ljust(9)[:9],
'HEIGHT': config.get(section, 'HEIGHT').rjust(3,'0'),
'LOCATION': config.get(section, 'LOCATION').ljust(20)[:20],
'DESCRIPTION': config.get(section, 'DESCRIPTION').ljust(19)[:19],
@ -148,4 +148,4 @@ if __name__ == '__main__':
cli_args.CONFIG_FILE = os.path.dirname(os.path.abspath(__file__))+'/hblink.cfg'
pprint(build_config(cli_args.CONFIG_FILE))
pprint(build_config(cli_args.CONFIG_FILE))

View File

@ -11,9 +11,8 @@ from __future__ import print_function
# Python modules we need
import sys
from binascii import b2a_hex as h
# Debugging functions
from pprint import pprint
from bitarray import bitarray
from time import time
# Twisted is pretty important, so I keep it separate
from twisted.internet.protocol import DatagramProtocol
@ -21,7 +20,10 @@ from twisted.internet import reactor
from twisted.internet import task
# Things we import from the main hblink module
from hblink import CONFIG, HBMASTER, HBCLIENT, logger, systems, hex_str_3, int_id
from hblink import CONFIG, HBSYSTEM, logger, systems, hex_str_3, int_id, sub_alias, peer_alias, tg_alias
import dec_dmr
import bptc
import constants as const
# Import Bridging rules
# Note: A stanza *must* exist for any MASTER or CLIENT configured in the main
@ -45,6 +47,8 @@ for _system in RULES_FILE:
_rule['ON'][i] = hex_str_3(_rule['ON'][i])
for i, e in enumerate(_rule['OFF']):
_rule['OFF'][i] = hex_str_3(_rule['OFF'][i])
_rule['TIMEOUT']= _rule['TIMEOUT']*60
_rule['TIMER'] = time() + _rule['TIMEOUT']
if _system not in CONFIG['SYSTEMS']:
sys.exit('ERROR: Routing rules found for system not configured main configuration')
for _system in CONFIG['SYSTEMS']:
@ -53,8 +57,37 @@ for _system in CONFIG['SYSTEMS']:
RULES = RULES_FILE
# TEMPORARY DEBUGGING LINE -- TO BE REMOVED LATER
#pprint(RULES)
# Import subscriber ACL
# ACL may be a single list of subscriber IDs
# Global action is to allow or deny them. Multiple lists with different actions and ranges
# are not yet implemented.
try:
from sub_acl import ACL_ACTION, ACL
# uses more memory to build hex strings, but processes MUCH faster when checking for matches
for i, e in enumerate(ACL):
ACL[i] = hex_str_3(ACL[i])
logger.info('Subscriber access control file found, subscriber ACL imported')
except ImportError:
logger.critical('\'sub_acl.py\' not found - all subscriber IDs are valid')
ACL_ACTION = 'NONE'
# Depending on which type of ACL is used (PERMIT, DENY... or there isn't one)
# define a differnet function to be used to check the ACL
if ACL_ACTION == 'PERMIT':
def allow_sub(_sub):
if _sub in ACL:
return True
else:
return False
elif ACL_ACTION == 'DENY':
def allow_sub(_sub):
if _sub not in ACL:
return True
else:
return False
else:
def allow_sub(_sub):
return True
# Does anybody read this stuff? There's a PEP somewhere that says I should do this.
__author__ = 'Cortney T. Buffington, N0MJS'
@ -66,52 +99,262 @@ __email__ = 'n0mjs@me.com'
__status__ = 'pre-alpha'
class routerMASTER(HBMASTER):
def dmrd_received(self, _radio_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
_bits = int_id(_data[15])
if _call_type == 'group':
_routed = False
for rule in RULES[self._master]['GROUP_VOICE']:
_target = rule['DST_NET']
if (rule['SRC_GROUP'] == _dst_id and rule['SRC_TS'] == _slot and rule['ACTIVE'] == True):
if rule['SRC_TS'] != rule['DST_TS']:
_tmp_bits = _bits ^ 1 << 7
# Run this every minute for rule timer updates
def rule_timer_loop():
logger.info('(ALL HBSYSTEMS) Rule timer loop started')
_now = time()
for _system in RULES:
for _rule in RULES[_system]['GROUP_VOICE']:
if _rule['TO_TYPE'] == 'ON':
if _rule['ACTIVE'] == True:
if _rule['TIMER'] < _now:
_rule['ACTIVE'] = False
logger.info('(%s) Rule timout DEACTIVATE: Rule name: %s, Target HBSystem: %s, TS: %s, TGID: %s', _system, _rule['NAME'], _rule['DST_NET'], _rule['DST_TS']+1, int_id(_rule['DST_GROUP']))
else:
_tmp_bits = _bits
_tmp_data = _data[:8] + rule['DST_GROUP'] + _data[11:15] + chr(_tmp_bits) + _data[16:]
#print(h(_data))
#print(h(_tmp_data))
systems[_target].send_system(_tmp_data)
_routed = True
logger.debug('(%s) Packet routed to %s system: %s', self._master, CONFIG['SYSTEMS'][_target]['MODE'], _target)
if not _routed:
logger.debug('(%s) Packet router no target TS/TGID %s/%s', self._master, _slot, int_id(_dst_id))
timeout_in = _rule['TIMER'] - _now
logger.info('(%s) Rule ACTIVE with ON timer running: Timeout eligible in: %ds, Rule name: %s, Target HBSystem: %s, TS: %s, TGID: %s', _system, timeout_in, _rule['NAME'], _rule['DST_NET'], _rule['DST_TS']+1, int_id(_rule['DST_GROUP']))
elif _rule['TO_TYPE'] == 'OFF':
if _rule['ACTIVE'] == False:
if _rule['TIMER'] < _now:
_rule['ACTIVE'] = True
logger.info('(%s) Rule timout ACTIVATE: Rule name: %s, Target HBSystem: %s, TS: %s, TGID: %s', _system, _rule['NAME'], _rule['DST_NET'], _rule['DST_TS']+1, int_id(_rule['DST_GROUP']))
else:
timeout_in = _rule['TIMER'] - _now
logger.info('(%s) Rule DEACTIVE with OFF timer running: Timeout eligible in: %ds, Rule name: %s, Target HBSystem: %s, TS: %s, TGID: %s', _system, timeout_in, _rule['NAME'], _rule['DST_NET'], _rule['DST_TS']+1, int_id(_rule['DST_GROUP']))
else:
logger.debug('Rule timer loop made no rule changes')
class routerCLIENT(HBCLIENT):
class routerSYSTEM(HBSYSTEM):
def __init__(self, *args, **kwargs):
HBSYSTEM.__init__(self, *args, **kwargs)
# Status information for the system, TS1 & TS2
# 1 & 2 are "timeslot"
# In TX_EMB_LC, 2-5 are burst B-E
self.STATUS = {
1: {
'RX_START': time(),
'RX_SEQ': '\x00',
'RX_RFS': '\x00',
'TX_RFS': '\x00',
'RX_STREAM_ID': '\x00',
'TX_STREAM_ID': '\x00',
'RX_TGID': '\x00\x00\x00',
'TX_TGID': '\x00\x00\x00',
'RX_TIME': time(),
'TX_TIME': time(),
'RX_TYPE': const.HBPF_SLT_VTERM,
'RX_LC': '\x00',
'TX_H_LC': '\x00',
'TX_T_LC': '\x00',
'TX_EMB_LC': {
1: '\x00',
2: '\x00',
3: '\x00',
4: '\x00',
}
},
2: {
'RX_START': time(),
'RX_SEQ': '\x00',
'RX_RFS': '\x00',
'TX_RFS': '\x00',
'RX_STREAM_ID': '\x00',
'TX_STREAM_ID': '\x00',
'RX_TGID': '\x00\x00\x00',
'TX_TGID': '\x00\x00\x00',
'RX_TIME': time(),
'TX_TIME': time(),
'RX_TYPE': const.HBPF_SLT_VTERM,
'RX_LC': '\x00',
'TX_H_LC': '\x00',
'TX_T_LC': '\x00',
'TX_EMB_LC': {
1: '\x00',
2: '\x00',
3: '\x00',
4: '\x00',
}
}
}
def dmrd_received(self, _radio_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
pkt_time = time()
dmrpkt = _data[20:53]
_bits = int_id(_data[15])
if _call_type == 'group':
_routed = False
for rule in RULES[self._client]['GROUP_VOICE']:
# Check for ACL match, and return if the subscriber is not allowed
if allow_sub(_rf_src) == False:
logger.warning('(%s) Group Voice Packet ***REJECTED BY ACL*** From: %s, HBP Peer %s, Destination TGID %s', self._system, int_id(_rf_src), int_id(_radio_id), int_id(_dst_id))
return
# Is this a new call stream?
if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']):
if (self.STATUS[_slot]['RX_TYPE'] != const.HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + const.STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']):
logger.warning('(%s) Packet received with STREAM ID: %s <FROM> SUB: %s REPEATER: %s <TO> TGID %s, SLOT %s collided with existing call', self._system, int_id(_stream_id), int_id(_rf_src), int_id(_radio_id), int_id(_dst_id), _slot)
return
# This is a new call stream
self.STATUS['RX_START'] = pkt_time
logger.info('(%s) *CALL START* STREAM ID: %s SUB: %s (%s) REPEATER: %s (%s) TGID %s (%s), TS %s', self._system, int_id(_stream_id), sub_alias(_rf_src), int_id(_rf_src), peer_alias(_radio_id), int_id(_radio_id), tg_alias(_dst_id), int_id(_dst_id), _slot)
# If we can, use the LC from the voice header as to keep all options intact
if _frame_type == const.HBPF_DATA_SYNC and _dtype_vseq == const.HBPF_SLT_VHEAD:
decoded = dec_dmr.voice_head_term(dmrpkt)
self.STATUS[_slot]['RX_LC'] = decoded['LC']
# If we don't have a voice header then don't wait to decode it from the Embedded LC
# just make a new one from the HBP header. This is good enough, and it saves lots of time
else:
self.STATUS[_slot]['RX_LC'] = const.LC_OPT + _dst_id + _rf_src
for rule in RULES[self._system]['GROUP_VOICE']:
_target = rule['DST_NET']
_target_status = systems[_target].STATUS
if (rule['SRC_GROUP'] == _dst_id and rule['SRC_TS'] == _slot and rule['ACTIVE'] == True):
# BEGIN CONTENTION HANDLING
#
# The rules for each of the 4 "ifs" below are listed here for readability. The Frame To Send is:
# From a different group than last RX from this HBSystem, but it has been less than Group Hangtime
# From a different group than last TX to this HBSystem, but it has been less than Group Hangtime
# From the same group as the last RX from this HBSystem, but from a different subscriber, and it has been less than stream timeout
# From the same group as the last TX to this HBSystem, but from a different subscriber, and it has been less than stream timeout
# The "continue" at the end of each means the next iteration of the for loop that tests for matching rules
#
if ((rule['DST_GROUP'] != _target_status[rule['DST_TS']]['RX_TGID']) and ((pkt_time - _target_status[rule['DST_TS']]['RX_TIME']) < RULES[_target]['GROUP_HANGTIME'])):
if _frame_type == const.HBPF_DATA_SYNC and _dtype_vseq == const.HBPF_SLT_VHEAD:
logger.info('(%s) Call not routed to TGID%s, target active or in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(rule['DST_GROUP']), _target, rule['DST_TS'], int_id(_target_status[rule['DST_TS']]['RX_TGID']))
continue
if ((rule['DST_GROUP'] != _target_status[rule['DST_TS']]['TX_TGID']) and ((pkt_time - _target_status[rule['DST_TS']]['TX_TIME']) < RULES[_target]['GROUP_HANGTIME'])):
if _frame_type == const.HBPF_DATA_SYNC and _dtype_vseq == const.HBPF_SLT_VHEAD:
logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(rule['DST_GROUP']), _target, rule['DST_TS'], int_id(_target_status[rule['DST_TS']]['TX_TGID']))
continue
if (rule['DST_GROUP'] == _target_status[rule['DST_TS']]['RX_TGID']) and ((pkt_time - _target_status[rule['DST_TS']]['RX_TIME']) < const.STREAM_TO):
if _frame_type == const.HBPF_DATA_SYNC and _dtype_vseq == const.HBPF_SLT_VHEAD:
logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(rule['DST_GROUP']), _target, rule['DST_TS'], int_id(_target_status[rule['DST_TS']]['RX_TGID']))
continue
if (rule['DST_GROUP'] == _target_status[rule['DST_TS']]['TX_TGID']) and (_rf_src != _target_status[rule['DST_TS']]['TX_RFS']) and ((pkt_time - _target_status[rule['DST_TS']]['TX_TIME']) < const.STREAM_TO):
if _frame_type == const.HBPF_DATA_SYNC and _dtype_vseq == const.HBPF_SLT_VHEAD:
logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target, rule['DST_TS'], int_id(_target_status[rule['DST_TS']]['TX_TGID']), _target_status[rule['DST_TS']]['TX_RFS'])
continue
# Set values for the contention handler to test next time there is a frame to forward
_target_status[rule['DST_TS']]['TX_TIME'] = pkt_time
if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']) or (_target_status[rule['DST_TS']]['TX_RFS'] != _rf_src) or (_target_status[rule['DST_TS']]['TX_TGID'] != rule['DST_GROUP']):
# Record the DST TGID and Stream ID
_target_status[rule['DST_TS']]['TX_TGID'] = rule['DST_GROUP']
_target_status[rule['DST_TS']]['TX_STREAM_ID'] = _stream_id
_target_status[rule['DST_TS']]['TX_RFS'] = _rf_src
# Generate LCs (full and EMB) for the TX stream
# if _dst_id != rule['DST_GROUP']:
dst_lc = self.STATUS[_slot]['RX_LC'][0:3] + rule['DST_GROUP'] + _rf_src
_target_status[rule['DST_TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc)
_target_status[rule['DST_TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc)
_target_status[rule['DST_TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc)
logger.debug('(%s) Packet DST TGID (%s) does not match SRC TGID(%s) - Generating FULL and EMB LCs', self._system, int_id(rule['DST_GROUP']), int_id(_dst_id))
# Handle any necessary re-writes for the destination
if rule['SRC_TS'] != rule['DST_TS']:
_tmp_bits = _bits ^ 1 << 7
else:
_tmp_bits = _bits
_tmp_data = _data[:8] + rule['DST_GROUP'] + _data[11:15] + chr(_bits) + _data[16:]
#print(h(_data))
#print(h(_tmp_data))
systems[_target].send_system(_tmp_data)
_routed = True
logger.debug('(%s) Packet routed to %s system: %s', self._client, CONFIG['SYSTEMS'][_target]['MODE'], _target)
# Assemble transmit HBP packet header
_tmp_data = _data[:8] + rule['DST_GROUP'] + _data[11:15] + chr(_tmp_bits) + _data[16:20]
# MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET
# MUST RE-WRITE DESTINATION TGID IF DIFFERENT
# if _dst_id != rule['DST_GROUP']:
dmrbits = bitarray(endian='big')
dmrbits.frombytes(dmrpkt)
# Create a voice header packet (FULL LC)
if _frame_type == const.HBPF_DATA_SYNC and _dtype_vseq == const.HBPF_SLT_VHEAD:
dmrbits = _target_status[rule['DST_TS']]['TX_H_LC'][0:98] + dmrbits[98:166] + _target_status[rule['DST_TS']]['TX_H_LC'][98:197]
# Create a voice terminator packet (FULL LC)
elif _frame_type == const.HBPF_DATA_SYNC and _dtype_vseq == const.HBPF_SLT_VTERM:
dmrbits = _target_status[rule['DST_TS']]['TX_T_LC'][0:98] + dmrbits[98:166] + _target_status[rule['DST_TS']]['TX_T_LC'][98:197]
# Create a Burst B-E packet (Embedded LC)
elif _dtype_vseq in [1,2,3,4]:
dmrbits = dmrbits[0:116] + _target_status[rule['DST_TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
dmrpkt = dmrbits.tobytes()
_tmp_data = _tmp_data + dmrpkt + _data[53:55]
# Transmit the packet to the destination system
systems[_target].send_system(_tmp_data)
logger.debug('(%s) Packet routed by rule: %s to %s system: %s', self._system, rule['NAME'], CONFIG['SYSTEMS'][_target]['MODE'], _target)
# Final actions - Is this a voice terminator?
if (_frame_type == const.HBPF_DATA_SYNC) and (_dtype_vseq == const.HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != const.HBPF_SLT_VTERM):
call_duration = pkt_time - self.STATUS['RX_START']
logger.info('(%s) *CALL END* STREAM ID: %s SUB: %s (%s) REPEATER: %s (%s) TGID %s (%s), TS %s, Duration: %s', self._system, int_id(_stream_id), sub_alias(_rf_src), int_id(_rf_src), peer_alias(_radio_id), int_id(_radio_id), tg_alias(_dst_id), int_id(_dst_id), _slot, call_duration)
#
# Begin in-band signalling for call end. This has nothign to do with routing traffic directly.
#
# Iterate the rules dictionary
for rule in RULES[self._system]['GROUP_VOICE']:
_target = rule['DST_NET']
# TGID matches a rule source, reset its timer
if _slot == rule['SRC_TS'] and _dst_id == rule['SRC_GROUP'] and ((rule['TO_TYPE'] == 'ON' and (rule['ACTIVE'] == True)) or (rule['TO_TYPE'] == 'OFF' and rule['ACTIVE'] == False)):
rule['TIMER'] = pkt_time + rule['TIMEOUT']
logger.info('(%s) Source group transmission match for rule \"%s\". Reset timeout to %s', self._system, rule['NAME'], rule['TIMER'])
# Scan for reciprocal rules and reset their timers as well.
for target_rule in RULES[_target]['GROUP_VOICE']:
if target_rule['NAME'] == rule['NAME']:
target_rule['TIMER'] = pkt_time + target_rule['TIMEOUT']
logger.info('(%s) Reciprocal group transmission match for rule \"%s\" on IPSC \"%s\". Reset timeout to %s', self._system, target_rule['NAME'], _target, rule['TIMER'])
# TGID matches an ACTIVATION trigger
if _dst_id in rule['ON']:
# Set the matching rule as ACTIVE
rule['ACTIVE'] = True
rule['TIMER'] = pkt_time + rule['TIMEOUT']
logger.info('(%s) Primary routing Rule \"%s\" changed to state: %s', self._system, rule['NAME'], rule['ACTIVE'])
# Set reciprocal rules for other IPSCs as ACTIVE
for target_rule in RULES[_target]['GROUP_VOICE']:
if target_rule['NAME'] == rule['NAME']:
target_rule['ACTIVE'] = True
target_rule['TIMER'] = pkt_time + target_rule['TIMEOUT']
logger.info('(%s) Reciprocal routing Rule \"%s\" in IPSC \"%s\" changed to state: %s', self._system, target_rule['NAME'], _target, rule['ACTIVE'])
# TGID matches an DE-ACTIVATION trigger
if _dst_id in rule['OFF']:
# Set the matching rule as ACTIVE
rule['ACTIVE'] = False
logger.info('(%s) Routing Rule \"%s\" changed to state: %s', self._system, rule['NAME'], rule['ACTIVE'])
# Set reciprocal rules for other IPSCs as ACTIVE
_target = rule['DST_NET']
for target_rule in RULES[_target]['GROUP_VOICE']:
if target_rule['NAME'] == rule['NAME']:
target_rule['ACTIVE'] = False
logger.info('(%s) Reciprocal routing Rule \"%s\" in IPSC \"%s\" changed to state: %s', self._system, target_rule['NAME'], _target, rule['ACTIVE'])
#
# END IN-BAND SIGNALLING
#
# Mark status variables for use later
self.STATUS[_slot]['RX_RFS'] = _rf_src
self.STATUS[_slot]['RX_TYPE'] = _dtype_vseq
self.STATUS[_slot]['RX_TGID'] = _dst_id
self.STATUS[_slot]['RX_TIME'] = pkt_time
self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id
if not _routed:
logger.debug('(%s) Packet router no target TS/TGID %s/%s', self._client, _slot, int_id(_dst_id))
#************************************************
# MAIN PROGRAM LOOP STARTS HERE
@ -120,14 +363,16 @@ class routerCLIENT(HBCLIENT):
if __name__ == '__main__':
logger.info('HBlink \'hb_router.py\' (c) 2016 N0MJS & the K0USY Group - SYSTEM STARTING...')
# HBlink instance creation
# HBlink instance creation
for system in CONFIG['SYSTEMS']:
if CONFIG['SYSTEMS'][system]['ENABLED']:
if CONFIG['SYSTEMS'][system]['MODE'] == 'MASTER':
systems[system] = routerMASTER(system)
elif CONFIG['SYSTEMS'][system]['MODE'] == 'CLIENT':
systems[system] = routerCLIENT(system)
systems[system] = routerSYSTEM(system)
reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP'])
logger.debug('%s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system])
# Initialize the rule timer -- this if for user activated stuff
rule_timer = task.LoopingCall(rule_timer_loop)
rule_timer.start(60)
reactor.run()

View File

@ -1,3 +1,27 @@
'''
THIS EXAMPLE WILL NOT WORK AS IT IS - YOU MUST SPECIFY NAMES AND GROUP IDS!!!
NOTES:
* GROUP_HANGTIME should be set to the same value as the repeaters in the IPSC network
* NAME is any name you want, and is used to match reciprocal rules for user-activateion
* ACTIVE should be set to True if you want the rule active by default, False to be inactive
* ON and OFF are LISTS of Talkgroup IDs used to trigger this rule off and on. Even if you
only want one (as shown in the ON example), it has to be in list format. None can be
handled with an empty list, such as " 'ON': [] ".
* TO_TYPE is timeout type. If you want to use timers, ON means when it's turned on, it will
turn off afer the timout period and OFF means it will turn back on after the timout
period. If you don't want to use timers, set it to anything else, but 'NONE' might be
a good value for documentation!
* TIMOUT is a value in minutes for the timout timer. No, I won't make it 'seconds', so don't
ask. Timers are performance "expense".
DO YOU THINK THIS FILE IS TOO COMPLICATED?
Because you guys all want more and more features, this file is getting complicated. I have
dabbled with using a parser to make it easier to build. I'm torn. There is a HUGE benefit
to having it like it is. This is a python file. Simply running it
(i.e. "python hb_routing_rules.py) will tell you if there's a syntax error and where. Think
about that for a few minutes :)
'''
RULES = {
'MASTER-1': {
'GROUP_HANGTIME': 5,
@ -25,4 +49,4 @@ RULES = {
if __name__ == '__main__':
from pprint import pprint
pprint(RULES)
pprint(RULES)

238
hblink.py
View File

@ -23,7 +23,7 @@ from hashlib import sha256
from time import time
from urllib import URLopener
from csv import reader as csv_reader
from bitstring import BitArray
#from bitstring import BitArray
import socket
# Debugging functions
@ -149,7 +149,7 @@ def handler(_signal, _frame):
reactor.stop()
# Set signal handers so that we can gracefully exit if need be
for sig in [signal.SIGTERM, signal.SIGINT, signal.SIGQUIT]:
for sig in [signal.SIGTERM, signal.SIGINT]:
signal.signal(sig, handler)
@ -167,7 +167,8 @@ def hex_str_3(_int_id):
# Create a 4 byte hex string from an integer
def hex_str_4(_int_id):
try:
return hex(_int_id)[2:].rjust(8,'0').decode('hex')
#return hex(_int_id)[2:].rjust(8,'0').decode('hex')
return format(_int_id,'x').rjust(8,'0').decode('hex')
except TypeError:
logger.error('hex_str_4: invalid integer length')
@ -212,42 +213,70 @@ class AMBE:
# HB MASTER CLASS
#************************************************
class HBMASTER(DatagramProtocol):
class HBSYSTEM(DatagramProtocol):
def __init__(self, *args, **kwargs):
if len(args) == 1:
# Define a few shortcuts to make the rest of the class more readable
self._master = args[0]
self._system = self._master
self._config = CONFIG['SYSTEMS'][self._master]
self._clients = CONFIG['SYSTEMS'][self._master]['CLIENTS']
self._system = args[0]
self._config = CONFIG['SYSTEMS'][self._system]
# Define shortcuts and generic function names based on the type of system we are
if self._config['MODE'] == 'MASTER':
self._clients = CONFIG['SYSTEMS'][self._system]['CLIENTS']
self.send_system = self.send_clients
self.maintenance_loop = self.master_maintenance_loop
self.datagramReceived = self.master_datagramReceived
elif self._config['MODE'] == 'CLIENT':
self._stats = self._config['STATS']
self.send_system = self.send_master
self.maintenance_loop = self.client_maintenance_loop
self.datagramReceived = self.client_datagramReceived
# Configure for AMBE audio export if enabled
if self._config['EXPORT_AMBE']:
self._ambe = AMBE()
else:
# If we didn't get called correctly, log it and quit.
logger.error('(%s) HBMASTER was not called with an argument. Terminating', self._master)
logger.error('(%s) HBMASTER was not called with an argument. Terminating', self._system)
sys.exit()
def startProtocol(self):
# Set up periodic loop for tracking pings from clients. Run every 'PING_TIME' seconds
self._master_maintenance = task.LoopingCall(self.master_maintenance_loop)
self._master_maintenance_loop = self._master_maintenance.start(CONFIG['GLOBAL']['PING_TIME'])
self._system_maintenance = task.LoopingCall(self.maintenance_loop)
self._system_maintenance_loop = self._system_maintenance.start(CONFIG['GLOBAL']['PING_TIME'])
# Aliased in __init__ to maintenance_loop if system is a master
def master_maintenance_loop(self):
logger.debug('(%s) Master maintenance loop started', self._master)
logger.debug('(%s) Master maintenance loop started', self._system)
for client in self._clients:
_this_client = self._clients[client]
# Check to see if any of the clients have been quiet (no ping) longer than allowed
if _this_client['LAST_PING']+CONFIG['GLOBAL']['PING_TIME']*CONFIG['GLOBAL']['MAX_MISSED'] < time():
logger.info('(%s) Client %s (%s) has timed out', self._master, _this_client['CALLSIGN'], _this_client['RADIO_ID'])
logger.info('(%s) Client %s (%s) has timed out', self._system, _this_client['CALLSIGN'], _this_client['RADIO_ID'])
# Remove any timed out clients from the configuration
del CONFIG['SYSTEMS'][self._master]['CLIENTS'][client]
del CONFIG['SYSTEMS'][self._system]['CLIENTS'][client]
# Aliased in __init__ to maintenance_loop if system is a client
def client_maintenance_loop(self):
logger.debug('(%s) Client maintenance loop started', self._system)
# If we're not connected, zero out the stats and send a login request RPTL
if self._stats['CONNECTION'] == 'NO' or self._stats['CONNECTION'] == 'RTPL_SENT':
self._stats['PINGS_SENT'] = 0
self._stats['PINGS_ACKD'] = 0
self._stats['CONNECTION'] = 'RTPL_SENT'
self.send_master('RPTL'+self._config['RADIO_ID'])
logger.info('(%s) Sending login request to master %s:%s', self._system, self._config['MASTER_IP'], self._config['MASTER_PORT'])
# If we are connected, sent a ping to the master and increment the counter
if self._stats['CONNECTION'] == 'YES':
self.send_master('RPTPING'+self._config['RADIO_ID'])
self._stats['PINGS_SENT'] += 1
logger.debug('(%s) RPTPING Sent to Master. Pings Since Connected: %s', self._system, self._stats['PINGS_SENT'])
def send_clients(self, _packet):
for _client in self._clients:
self.send_client(_client, _packet)
#logger.debug('(%s) Packet sent to client %s', self._master, self._clients[_client]['RADIO_ID'])
#logger.debug('(%s) Packet sent to client %s', self._system, self._clients[_client]['RADIO_ID'])
def send_client(self, _client, _packet):
_ip = self._clients[_client]['IP']
@ -256,16 +285,18 @@ class HBMASTER(DatagramProtocol):
# KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!!
#logger.debug('(%s) TX Packet to %s on port %s: %s', self._clients[_client]['RADIO_ID'], self._clients[_client]['IP'], self._clients[_client]['PORT'], h(_packet))
# Alias for other programs to use a common name to send a packet
# regardless of the system type (MASTER or CLIENT)
send_system = send_clients
def send_master(self, _packet):
self.transport.write(_packet, (self._config['MASTER_IP'], self._config['MASTER_PORT']))
# KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!!
#logger.debug('(%s) TX Packet to %s:%s -- %s', self._system, self._config['MASTER_IP'], self._config['MASTER_PORT'], h(_packet))
def dmrd_received(self, _radio_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
pass
def datagramReceived(self, _data, (_host, _port)):
# Aliased in __init__ to datagramReceived if system is a master
def master_datagramReceived(self, _data, (_host, _port)):
# Keep This Line Commented Unless HEAVILY Debugging!
#logger.debug('(%s) RX packet from %s:%s -- %s', self._master, _host, _port, h(_data))
#logger.debug('(%s) RX packet from %s:%s -- %s', self._system, _host, _port, h(_data))
# Extract the command, which is various length, all but one 4 significant characters -- RPTCL
_command = _data[:4]
@ -282,29 +313,21 @@ class HBMASTER(DatagramProtocol):
_bits = int_id(_data[15])
_slot = 2 if (_bits & 0x80) else 1
_call_type = 'unit' if (_bits & 0x40) else 'group'
_raw_frame_type = (_bits & 0x30) >> 4
if _raw_frame_type == 0b00:
_frame_type = 'voice'
elif _raw_frame_type == 0b01:
_frame_type = 'voice_sync'
elif _raw_frame_type == 0b10:
_frame_type = 'data_sync'
else:
_frame_type = 'none'
_frame_type = (_bits & 0x30) >> 4
_dtype_vseq = (_bits & 0xF) # data, 1=voice header, 2=voice terminator; voice, 0=burst A ... 5=burst F
_stream_id = _data[16:20]
#logger.debug('(%s) DMRD - Seqence: %s, RF Source: %s, Destination ID: %s', self._master, int_id(_seq), int_id(_rf_src), int_id(_dst_id))
#logger.debug('(%s) DMRD - Seqence: %s, RF Source: %s, Destination ID: %s', self._system, int_id(_seq), int_id(_rf_src), int_id(_dst_id))
# If AMBE audio exporting is configured...
if self._config['EXPORT_AMBE']:
self._ambe.parseAMBE(self._master, _data)
self._ambe.parseAMBE(self._system, _data)
# The basic purpose of a master is to repeat to the clients
if self._config['REPEAT'] == True:
for _client in self._clients:
if _client != _radio_id:
self.send_client(_client, _data)
logger.debug('(%s) Packet on TS%s from %s (%s) for destination ID %s repeated to client: %s (%s) [Stream ID: %s]', self._master, _slot, self._clients[_radio_id]['CALLSIGN'], int_id(_radio_id), int_id(_dst_id), self._clients[_client]['CALLSIGN'], int_id(_client), int_id(_stream_id))
logger.debug('(%s) Packet on TS%s from %s (%s) for destination ID %s repeated to client: %s (%s) [Stream ID: %s]', self._system, _slot, self._clients[_radio_id]['CALLSIGN'], int_id(_radio_id), int_id(_dst_id), self._clients[_client]['CALLSIGN'], int_id(_client), int_id(_stream_id))
# Userland actions -- typically this is the function you subclass for an application
self.dmrd_received(_radio_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data)
@ -335,14 +358,14 @@ class HBMASTER(DatagramProtocol):
'SOFTWARE_ID': '',
'PACKAGE_ID': '',
}})
logger.info('(%s) Repeater Logging in with Radio ID: %s, %s:%s', self._master, int_id(_radio_id), _host, _port)
logger.info('(%s) Repeater Logging in with Radio ID: %s, %s:%s', self._system, int_id(_radio_id), _host, _port)
_salt_str = hex_str_4(self._clients[_radio_id]['SALT'])
self.send_client(_radio_id, 'RPTACK'+_salt_str)
self._clients[_radio_id]['CONNECTION'] = 'CHALLENGE_SENT'
logger.info('(%s) Sent Challenge Response to %s for login: %s', self._master, int_id(_radio_id), self._clients[_radio_id]['SALT'])
logger.info('(%s) Sent Challenge Response to %s for login: %s', self._system, int_id(_radio_id), self._clients[_radio_id]['SALT'])
else:
self.transport.write('MSTNAK'+_radio_id, (_host, _port))
logger.warning('(%s) Invalid Login from Radio ID: %s', self._master, int_id(_radio_id))
logger.warning('(%s) Invalid Login from Radio ID: %s', self._system, int_id(_radio_id))
elif _command == 'RPTK': # Repeater has answered our login challenge
_radio_id = _data[4:8]
@ -358,14 +381,14 @@ class HBMASTER(DatagramProtocol):
if _sent_hash == _calc_hash:
_this_client['CONNECTION'] = 'WAITING_CONFIG'
self.send_client(_radio_id, 'RPTACK'+_radio_id)
logger.info('(%s) Client %s has completed the login exchange successfully', self._master, _this_client['RADIO_ID'])
logger.info('(%s) Client %s has completed the login exchange successfully', self._system, _this_client['RADIO_ID'])
else:
logger.info('(%s) Client %s has FAILED the login exchange successfully', self._master, _this_client['RADIO_ID'])
logger.info('(%s) Client %s has FAILED the login exchange successfully', self._system, _this_client['RADIO_ID'])
self.transport.write('MSTNAK'+_radio_id, (_host, _port))
del self._clients[_radio_id]
else:
self.transport.write('MSTNAK'+_radio_id, (_host, _port))
logger.warning('(%s) Login challenge from Radio ID that has not logged in: %s', self._master, int_id(_radio_id))
logger.warning('(%s) Login challenge from Radio ID that has not logged in: %s', self._system, int_id(_radio_id))
elif _command == 'RPTC': # Repeater is sending it's configuraiton OR disconnecting
if _data[:5] == 'RPTCL': # Disconnect command
@ -374,9 +397,10 @@ class HBMASTER(DatagramProtocol):
and self._clients[_radio_id]['CONNECTION'] == 'YES' \
and self._clients[_radio_id]['IP'] == _host \
and self._clients[_radio_id]['PORT'] == _port:
logger.info('(%s) Client is closing down: %s (%s)', self._master, self._clients[_radio_id]['CALLSIGN'], int_id(_radio_id))
logger.info('(%s) Client is closing down: %s (%s)', self._system, self._clients[_radio_id]['CALLSIGN'], int_id(_radio_id))
self.transport.write('MSTNAK'+_radio_id, (_host, _port))
del self._clients[_radio_id]
else:
_radio_id = _data[4:8] # Configure Command
if _radio_id in self._clients \
@ -391,21 +415,21 @@ class HBMASTER(DatagramProtocol):
_this_client['TX_FREQ'] = _data[25:34]
_this_client['TX_POWER'] = _data[34:36]
_this_client['COLORCODE'] = _data[36:38]
_this_client['LATITUDE'] = _data[38:47]
_this_client['LONGITUDE'] = _data[47:57]
_this_client['HEIGHT'] = _data[57:60]
_this_client['LOCATION'] = _data[60:80]
_this_client['DESCRIPTION'] = _data[80:99]
_this_client['SLOTS'] = _data[99:100]
_this_client['URL'] = _data[100:224]
_this_client['SOFTWARE_ID'] = _data[224:264]
_this_client['PACKAGE_ID'] = _data[264:304]
_this_client['LATITUDE'] = _data[38:46]
_this_client['LONGITUDE'] = _data[46:55]
_this_client['HEIGHT'] = _data[55:58]
_this_client['LOCATION'] = _data[58:78]
_this_client['DESCRIPTION'] = _data[78:97]
_this_client['SLOTS'] = _data[97:98]
_this_client['URL'] = _data[98:222]
_this_client['SOFTWARE_ID'] = _data[222:262]
_this_client['PACKAGE_ID'] = _data[262:302]
self.send_client(_radio_id, 'RPTACK'+_radio_id)
logger.info('(%s) Client %s (%s) has sent repeater configuration', self._master, _this_client['CALLSIGN'], _this_client['RADIO_ID'])
logger.info('(%s) Client %s (%s) has sent repeater configuration', self._system, _this_client['CALLSIGN'], _this_client['RADIO_ID'])
else:
self.transport.write('MSTNAK'+_radio_id, (_host, _port))
logger.warning('(%s) Client info from Radio ID that has not logged in: %s', self._master, int_id(_radio_id))
logger.warning('(%s) Client info from Radio ID that has not logged in: %s', self._system, int_id(_radio_id))
elif _command == 'RPTP': # RPTPing -- client is pinging us
_radio_id = _data[7:11]
@ -415,70 +439,18 @@ class HBMASTER(DatagramProtocol):
and self._clients[_radio_id]['PORT'] == _port:
self._clients[_radio_id]['LAST_PING'] = time()
self.send_client(_radio_id, 'MSTPONG'+_radio_id)
logger.debug('(%s) Received and answered RPTPING from client %s (%s)', self._master, self._clients[_radio_id]['CALLSIGN'], int_id(_radio_id))
logger.debug('(%s) Received and answered RPTPING from client %s (%s)', self._system, self._clients[_radio_id]['CALLSIGN'], int_id(_radio_id))
else:
self.transport.write('MSTNAK'+_radio_id, (_host, _port))
logger.warning('(%s) Client info from Radio ID that has not logged in: %s', self._master, int_id(_radio_id))
logger.warning('(%s) Client info from Radio ID that has not logged in: %s', self._system, int_id(_radio_id))
else:
logger.error('(%s) Unrecognized command from: %s. Packet: %s', self._master, int_id(_radio_id), h(_data))
#************************************************
# HB CLIENT CLASS
#************************************************
class HBCLIENT(DatagramProtocol):
def __init__(self, *args, **kwargs):
if len(args) == 1:
self._client = args[0]
self._system = self._client
self._config = CONFIG['SYSTEMS'][self._client]
self._stats = self._config['STATS']
# Configure for AMBE audio export if enabled
if self._config['EXPORT_AMBE']:
self._ambe = AMBE()
else:
# If we didn't get called correctly, log it!
logger.error('(%s) HBCLIENT was not called with an argument. Terminating', self._client)
sys.exit()
def startProtocol(self):
# Set up periodic loop for sending pings to the master. Run every 'PING_TIME' seconds
self._client_maintenance = task.LoopingCall(self.client_maintenance_loop)
self._client_maintenance_loop = self._client_maintenance.start(CONFIG['GLOBAL']['PING_TIME'])
def client_maintenance_loop(self):
logger.debug('(%s) Client maintenance loop started', self._client)
# If we're not connected, zero out the stats and send a login request RPTL
if self._stats['CONNECTION'] == 'NO' or self._stats['CONNECTION'] == 'RTPL_SENT':
self._stats['PINGS_SENT'] = 0
self._stats['PINGS_ACKD'] = 0
self._stats['CONNECTION'] = 'RTPL_SENT'
self.send_master('RPTL'+self._config['RADIO_ID'])
logger.info('(%s) Sending login request to master %s:%s', self._client, self._config['MASTER_IP'], self._config['MASTER_PORT'])
# If we are connected, sent a ping to the master and increment the counter
if self._stats['CONNECTION'] == 'YES':
self.send_master('RPTPING'+self._config['RADIO_ID'])
self._stats['PINGS_SENT'] += 1
logger.debug('(%s) RPTPING Sent to Master. Pings Since Connected: %s', self._client, self._stats['PINGS_SENT'])
def send_master(self, _packet):
self.transport.write(_packet, (self._config['MASTER_IP'], self._config['MASTER_PORT']))
# KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!!
#logger.debug('(%s) TX Packet to %s:%s -- %s', self._client, self._config['MASTER_IP'], self._config['MASTER_PORT'], h(_packet))
# Alias for other programs to use a common name to send a packet
# regardless of the system type (MASTER or CLIENT)
send_system = send_master
def dmrd_received(self, _radio_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
pass
def datagramReceived(self, _data, (_host, _port)):
logger.error('(%s) Unrecognized command from: %s. Packet: %s', self._system, int_id(_radio_id), h(_data))
# Aliased in __init__ to datagramReceived if system is a client
def client_datagramReceived(self, _data, (_host, _port)):
# Keep This Line Commented Unless HEAVILY Debugging!
# logger.debug('(%s) RX packet from %s:%s -- %s', self._client, _host, _port, h(_data))
# logger.debug('(%s) RX packet from %s:%s -- %s', self._system, _host, _port, h(_data))
# Validate that we receveived this packet from the master - security check!
if self._config['MASTER_IP'] == _host and self._config['MASTER_PORT'] == _port:
@ -493,23 +465,14 @@ class HBCLIENT(DatagramProtocol):
_bits = int_id(_data[15])
_slot = 2 if (_bits & 0x80) else 1
_call_type = 'unit' if (_bits & 0x40) else 'group'
_raw_frame_type = (_bits & 0x30) >> 4
if _raw_frame_type == 0b00:
_frame_type = 'voice'
elif _raw_frame_type == 0b01:
_frame_type = 'voice_sync'
elif _raw_frame_type == 0b10:
_frame_type = 'data_sync'
else:
_frame_type = 'none'
_frame_type = (_bits & 0x30) >> 4
_dtype_vseq = (_bits & 0xF) # data, 1=voice header, 2=voice terminator; voice, 0=burst A ... 5=burst F
_stream_id = _data[16:20]
#logger.debug('(%s) DMRD - Seqence: %s, RF Source: %s, Destination ID: %s', self._client, h(_seq), int_id(_rf_src), int_id(_dst_id))
#logger.debug('(%s) DMRD - Seqence: %s, RF Source: %s, Destination ID: %s', self._system, int_id(_seq), int_id(_rf_src), int_id(_dst_id))
# If AMBE audio exporting is configured...
if self._config['EXPORT_AMBE']:
self._ambe.parseAMBE(self._client, _data)
self._ambe.parseAMBE(self._system, _data)
# Userland actions -- typically this is the function you subclass for an application
self.dmrd_received(_radio_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data)
@ -517,14 +480,14 @@ class HBCLIENT(DatagramProtocol):
elif _command == 'MSTN': # Actually MSTNAK -- a NACK from the master
_radio_id = _data[4:8]
if _radio_id == self._config['RADIO_ID']: # Validate the source and intended target
logger.warning('(%s) MSTNAK Received', self._client)
logger.warning('(%s) MSTNAK Received', self._system)
self._stats['CONNECTION'] = 'NO' # Disconnect ourselves and re-register
elif _command == 'RPTA': # Actually RPTACK -- an ACK from the master
# Depending on the state, an RPTACK means different things, in each clause, we check and/or set the state
if self._stats['CONNECTION'] == 'RTPL_SENT': # If we've sent a login request...
_login_int32 = _data[6:10]
logger.info('(%s) Repeater Login ACK Received with 32bit ID: %s', self._client, int_id(_login_int32))
logger.info('(%s) Repeater Login ACK Received with 32bit ID: %s', self._system, int_id(_login_int32))
_pass_hash = sha256(_login_int32+self._config['PASSPHRASE']).hexdigest()
_pass_hash = a(_pass_hash)
self.send_master('RPTK'+self._config['RADIO_ID']+_pass_hash)
@ -532,7 +495,7 @@ class HBCLIENT(DatagramProtocol):
elif self._stats['CONNECTION'] == 'AUTHENTICATED': # If we've sent the login challenge...
if _data[6:10] == self._config['RADIO_ID']:
logger.info('(%s) Repeater Authentication Accepted', self._client)
logger.info('(%s) Repeater Authentication Accepted', self._system)
_config_packet = self._config['RADIO_ID']+\
self._config['CALLSIGN']+\
self._config['RX_FREQ']+\
@ -551,32 +514,32 @@ class HBCLIENT(DatagramProtocol):
self.send_master('RPTC'+_config_packet)
self._stats['CONNECTION'] = 'CONFIG-SENT'
logger.info('(%s) Repeater Configuration Sent', self._client)
logger.info('(%s) Repeater Configuration Sent', self._system)
else:
self._stats['CONNECTION'] = 'NO'
logger.error('(%s) Master ACK Contained wrong ID - Connection Reset', self._client)
logger.error('(%s) Master ACK Contained wrong ID - Connection Reset', self._system)
elif self._stats['CONNECTION'] == 'CONFIG-SENT': # If we've sent out configuration to the master
if _data[6:10] == self._config['RADIO_ID']:
logger.info('(%s) Repeater Configuration Accepted', self._client)
logger.info('(%s) Repeater Configuration Accepted', self._system)
self._stats['CONNECTION'] = 'YES'
logger.info('(%s) Connection to Master Completed', self._client)
logger.info('(%s) Connection to Master Completed', self._system)
else:
self._stats['CONNECTION'] = 'NO'
logger.error('(%s) Master ACK Contained wrong ID - Connection Reset', self._client)
logger.error('(%s) Master ACK Contained wrong ID - Connection Reset', self._system)
elif _command == 'MSTP': # Actually MSTPONG -- a reply to RPTPING (send by client)
if _data [7:11] == self._config['RADIO_ID']:
self._stats['PINGS_ACKD'] += 1
logger.debug('(%s) MSTPONG Received. Pongs Since Connected: %s', self._client, self._stats['PINGS_ACKD'])
logger.debug('(%s) MSTPONG Received. Pongs Since Connected: %s', self._system, self._stats['PINGS_ACKD'])
elif _command == 'MSTC': # Actually MSTCL -- notify us the master is closing down
if _data[5:9] == self._config['RADIO_ID']:
self._stats['CONNECTION'] = 'NO'
logger.info('(%s) MSTCL Recieved', self._client)
logger.info('(%s) MSTCL Recieved', self._system)
else:
logger.error('(%s) Received an invalid command in packet: %s', self._client, h(_data))
logger.error('(%s) Received an invalid command in packet: %s', self._system, h(_data))
#************************************************
@ -589,10 +552,7 @@ if __name__ == '__main__':
# HBlink instance creation
for system in CONFIG['SYSTEMS']:
if CONFIG['SYSTEMS'][system]['ENABLED']:
if CONFIG['SYSTEMS'][system]['MODE'] == 'MASTER':
systems[system] = HBMASTER(system)
elif CONFIG['SYSTEMS'][system]['MODE'] == 'CLIENT':
systems[system] = HBCLIENT(system)
systems[system] = HBSYSTEM(system)
reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP'])
logger.debug('%s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system])

File diff suppressed because it is too large Load Diff

18
qr.py
View File

@ -91,4 +91,20 @@ def decode(_data):
if __name__ == '__main__':
from binascii import b2a_hex as h
from time import time
from time import time
EMB_bits = [0,0,0,0]
EMB_bits[0] = bitarray('0001000') # 111100010
EMB_bits[1] = bitarray('0001001') # 110010001
EMB_bits[2] = bitarray('0001010') # 100000111
EMB_bits[3] = bitarray('0001011') # 101110100
print(EMB_bits)
for seq in xrange(4):
out = 0
for bit in EMB_bits[seq]:
out = (out << 1) | bit
print(out)
emb = ENCODE_1676[out]
print(bin(emb))

View File

4
sub_acl.py Normal file
View File

@ -0,0 +1,4 @@
ACL_ACTION = "DENY" # May be PERMIT|DENY
ACL = [
1,2,3,4,5,6,7,8,9,10,100
]

File diff suppressed because it is too large Load Diff

View File

@ -1 +1 @@
1,Worldwide 2,Local 3,North America 9,BrandMeister 13,Worldwide English 310,TAC 310 3100,DCI Bridge 2 3160,DCI 1 3169,Midwest 3172,Northeast 3174,Southeast 3112,Flordia 3120,Kansas Statewide 3125,Massachussetts 3129,Missouri 3777215,DCI Comm 1 9998,Echo Server
1,Worldwide 2,Local 3,North America 9,BrandMeister 13,Worldwide English 310,TAC 310 3100,DCI Bridge 2 3160,DCI 1 3169,Midwest 3172,Northeast 3174,Southeast 3112,Flordia 3120,Kansas Statewide 3125,Massachussetts 3129,Missouri 31201,BYRG KC 3777215,DCI Comm 1 9998,Echo Server
1 1 Worldwide 2 Local 3 North America 9 BrandMeister 13 Worldwide English 310 TAC 310 3100 DCI Bridge 2 3160 DCI 1 3169 Midwest 3172 Northeast 3174 Southeast 3112 Flordia 3120 Kansas Statewide 3125 Massachussetts 3129 Missouri 3777215 Missouri 31201 BYRG KC 3777215 DCI Comm 1 9998 Echo Server