change sending que path, generate dictionaries, moved locap_apps and authorized_users to rules.py

This commit is contained in:
KF7EEL 2021-04-25 09:35:58 -07:00
parent aa3ae3d3a1
commit dd4c7d91a1
6 changed files with 71 additions and 33 deletions

View File

@ -178,12 +178,11 @@ USER_SETTINGS_FILE: /path/to/user_settings.txt
# Server name - can be different than dashboard title, used to identify this network/server
USE_API: True
AUTHORIZED_TOKENS_FILE: /tmp/hblink_auth_tokens.txt
AUTHORIZED_USERS_FILE: /path/to/authorized_users.txt
ACCESS_SYSTEMS_FILE: /path/to/access_systems.txt
MY_API_NAME: ABC
MY_SERVER_SHORTCUT: ABC
SERVER_NAME: Test HBLink Network
USE_PUBLIC_APPS: True
PUBLIC_APPS_LIST: https://raw.githubusercontent.com/kf7eel/hblink_sms_external_apps/main/public_systems.txt
RULES_PATH: /path/to/rules.py
# The following options are used for the dashboard. The dashboard is optional.

View File

@ -255,7 +255,7 @@ def send_app_request(url, message, source_id):
auth_token.close()
app_request = {
'mode':'app',
'system_shortcut':CONFIG['GPS_DATA']['MY_API_NAME'],
'system_shortcut':CONFIG['GPS_DATA']['MY_SERVER_SHORTCUT'],
'server_name':CONFIG['GPS_DATA']['SERVER_NAME'],
'response_url':CONFIG['GPS_DATA']['DASHBOARD_URL'] + '/api',
'auth_token':the_token,
@ -276,7 +276,7 @@ def send_msg_xfer(url, user, password, message, source_id, dest_id):
url = url + '/api/msg_xfer'
msg_xfer = {
'mode':'msg_xfer',
'system_shortcut':CONFIG['GPS_DATA']['MY_API_NAME'],
'system_shortcut':CONFIG['GPS_DATA']['MY_SERVER_SHORTCUT'],
'response_url':CONFIG['GPS_DATA']['DASHBOARD_URL'] + '/api',
'auth_type':'private',
'credentials': {
@ -724,7 +724,7 @@ def create_sms_seq(dst_id, src_id, peer_id, _slot, _call_type, dmr_string):
the_mmdvm_pkt = mmdvm_encapsulate(dst_id, src_id, peer_id, cap_in, _slot, _call_type, 7, rand_seq, i)#(bytes.fromhex(re.sub("b'|'", '', str(orig_cap[cap_in][20:-4])))))
mmdvm_send_seq.append(ahex(the_mmdvm_pkt))
cap_in = cap_in + 1
with open('/tmp/.hblink_data_que/' + str(random.randint(1000, 9999)) + '.mmdvm_seq', "w") as packet_write_file:
with open('/tmp/.hblink_data_que_' + str(CONFIG['GPS_DATA']['APRS_LOGIN_CALL']).upper() + '/' + str(random.randint(1000, 9999)) + '.mmdvm_seq', "w") as packet_write_file:
packet_write_file.write(str(mmdvm_send_seq))
return mmdvm_send_seq
@ -827,9 +827,9 @@ def data_que_send():
#logger.info('Check SMS que')
try:
#logger.info(UNIT_MAP)
for packet_file in os.listdir('/tmp/.hblink_data_que/'):
for packet_file in os.listdir('/tmp/.hblink_data_que_' + str(CONFIG['GPS_DATA']['APRS_LOGIN_CALL']).upper() + '/'):
snd_seq = ast.literal_eval(os.popen('cat /tmp/.hblink_data_que/' + packet_file).read())
snd_seq = ast.literal_eval(os.popen('cat /tmp/.hblink_data_que_' + str(CONFIG['GPS_DATA']['APRS_LOGIN_CALL']).upper() + '/' + packet_file).read())
for data in snd_seq:
# Get dest id
@ -856,7 +856,7 @@ def data_que_send():
reactor.callFromThread(systems[data_target].send_system,bytes.fromhex(re.sub("b'|'", '', str(data))))
logger.info('Sending data to ' + str(data[10:16])[2:-1] + ' on system ' + data_target)
os.system('rm /tmp/.hblink_data_que/' + packet_file)
os.system('rm /tmp/.hblink_data_que_' + str(CONFIG['GPS_DATA']['APRS_LOGIN_CALL']).upper() + '/' + packet_file)
#routerHBP.send_peer('MASTER-2', bytes.fromhex(re.sub("b'|'", '', str(data))))
## os.system('rm /tmp/.hblink_data_que/' + packet_file)
@ -2331,7 +2331,7 @@ if __name__ == '__main__':
mailbox_file.write("[]")
mailbox_file.close()
try:
Path('/tmp/.hblink_data_que/').mkdir(parents=True, exist_ok=True)
Path('/tmp/.hblink_data_que_' + str(CONFIG['GPS_DATA']['APRS_LOGIN_CALL']).upper() + '/').mkdir(parents=True, exist_ok=True)
except:
logger.info('Unable to create data que directory')
pass

View File

@ -80,11 +80,11 @@ authorized_users = {
## 'user':'test_name',
## 'password':'passw0rd'
## },
## 'DEF':{
## 'mode':'msg_xfer',
## 'user':'test_name',
## 'password':'passw0rd'
## }
'XYZ':{
'mode':'msg_xfer',
'user':'test_name',
'password':'passw0rd'
}
}
local_systems = {

View File

@ -31,6 +31,8 @@ from datetime import datetime
import argparse
from configparser import ConfigParser
from send_sms import *
import importlib.util
import requests
@ -199,6 +201,41 @@ def user_setting_write(dmr_id, input_ssid, input_icon, input_comment, input_aprs
user_dict_file.close()
def generate_apps():
global access_systems, authorized_users
mod = importlib.util.spec_from_file_location("rules_data", parser.get('GPS_DATA', 'RULES_PATH'))
rules = importlib.util.module_from_spec(mod)
mod.loader.exec_module(rules)
local_apps = rules.local_apps
authorized_users = rules.authorized_users
#rules_data = ast.literal_eval(os.popen('cat ' + parser.get('GPS_DATA', 'RULES_PATH')).read())
#rules_data
public_systems_file = requests.get(parser.get('GPS_DATA', 'PUBLIC_APPS_LIST'))
public_apps = ast.literal_eval(public_systems_file.text)
access_systems = {}
#combined = public_apps.items() + local_acess_systems.items()
print(type(parser.get('GPS_DATA', 'USE_PUBLIC_APPS')))
if parser.get('GPS_DATA', 'USE_PUBLIC_APPS') == 'True':
for i in public_apps.items():
key = str(i[0])
access_systems[key] = i[1]
for i in local_apps.items():
key = str(i[0])
access_systems[key] = i[1]
print(access_systems)
print(authorized_users)
print(local_apps)
#print(rules_data)
#print(type(public_apps))
#print(type(local_acess_systems))
#print()
#print(combined)
#print(local_acess_systems.update(public_apps))
#return access_systems
@app.route('/')
def index():
value = Markup('<strong>The HTML String</strong>')
@ -223,7 +260,7 @@ def about():
@app.route('/external_apps')
def external_apps():
access_systems = ast.literal_eval(os.popen('cat ' + access_systems_file).read())
#access_systems = ast.literal_eval(os.popen('cat ' + access_systems_file).read())
msg_lst = ''
app_lst = ''
for i_msg in access_systems.items():
@ -793,8 +830,8 @@ def api(api_mode=None):
api_content = '<h3 style="text-align: center;"><strong>API Enabled: ' + str(use_api) + '</strong></h3>'
return render_template('generic.html', title = dashboard_title, dashboard_url = dashboard_url, logo = logo, content = Markup(api_content), api = use_api)
if use_api == 'True' or use_api == "true":
access_systems = ast.literal_eval(os.popen('cat ' + access_systems_file).read())
authorized_users = ast.literal_eval(os.popen('cat ' + authorized_users_file).read())
#access_systems = ast.literal_eval(os.popen('cat ' + access_systems_file).read())
#authorized_users = ast.literal_eval(os.popen('cat ' + authorized_users_file).read())
api_data = request.json
#print(type(api_data))
#print((api_data))
@ -822,7 +859,7 @@ def api(api_mode=None):
send_slot = 0
if sms_data['slot'] == 2:
send_slot = 1
send_sms(False, sms_data['destination_id'], sms_data['source_id'], 0000, 'unit', send_slot, sms_data['message'])
send_sms(False, sms_data['destination_id'], sms_data['source_id'], 0000, 'unit', send_slot, sms_data['message'], que_dir)
return jsonify(
mode=api_data['mode'],
status='Generated SMS',
@ -883,7 +920,7 @@ def api(api_mode=None):
send_slot = 0
if sms_data['slot'] == 2:
send_slot = 1
send_sms(False, sms_data['destination_id'], 0000, 0000, 'unit', send_slot, sms_data['message'])
send_sms(False, sms_data['destination_id'], 0000, 0000, 'unit', send_slot, sms_data['message'], que_dir)
new_auth_file = auth_file
with open(auth_token_file, 'w') as auth_token:
auth_token.write(str(auth_file))
@ -973,9 +1010,11 @@ if __name__ == '__main__':
auth_token_file = parser.get('GPS_DATA', 'AUTHORIZED_TOKENS_FILE')
use_api = parser.get('GPS_DATA', 'USE_API')
access_systems_file = parser.get('GPS_DATA', 'ACCESS_SYSTEMS_FILE')
authorized_users_file = parser.get('GPS_DATA', 'AUTHORIZED_USERS_FILE')
#access_systems_file = parser.get('GPS_DATA', 'ACCESS_SYSTEMS_FILE')
#authorized_users_file = parser.get('GPS_DATA', 'AUTHORIZED_USERS_FILE')
que_dir = '/tmp/.hblink_data_que_' + str(parser.get('GPS_DATA', 'APRS_LOGIN_CALL').upper()) + '/'
generate_apps()
#Only create if API enabled
if use_api == True:
if Path(auth_token_file).is_file():
@ -992,8 +1031,9 @@ if __name__ == '__main__':
try:
#global authorized_users, other_systems
#from authorized_apps import authorized_users, access_systems
access_systems = ast.literal_eval(os.popen('cat ' + access_systems_file).read())
authorized_users = ast.literal_eval(os.popen('cat ' + authorized_users_file).read())
#access_systems = ast.literal_eval(os.popen('cat ' + access_systems_file).read())
#authorized_users = ast.literal_eval(os.popen('cat ' + authorized_users_file).read())
print('generaty')
except Exception as e:
print(e)

View File

@ -167,7 +167,7 @@ def dmr_encode(packet_list, _slot):
return send_seq
def create_sms_seq(dst_id, src_id, peer_id, _slot, _call_type, dmr_string):
def create_sms_seq(dst_id, src_id, peer_id, _slot, _call_type, dmr_string, que_dir):
rand_seq = random.randint(1, 999999)
block_seq = block_sequence(dmr_string)
dmr_list = dmr_encode(block_seq, _slot)
@ -197,12 +197,12 @@ def create_sms_seq(dst_id, src_id, peer_id, _slot, _call_type, dmr_string):
the_mmdvm_pkt = mmdvm_encapsulate(dst_id, src_id, peer_id, cap_in, _slot, _call_type, 7, rand_seq, i)#(bytes.fromhex(re.sub("b'|'", '', str(orig_cap[cap_in][20:-4])))))
mmdvm_send_seq.append(ahex(the_mmdvm_pkt))
cap_in = cap_in + 1
with open('/tmp/.hblink_data_que/' + str(random.randint(1000, 9999)) + '.mmdvm_seq', "w") as packet_write_file:
with open(que_dir + str(random.randint(1000, 9999)) + '.mmdvm_seq', "w") as packet_write_file:
packet_write_file.write(str(mmdvm_send_seq))
return mmdvm_send_seq
try:
Path('/tmp/.hblink_data_que/').mkdir(parents=True, exist_ok=True)
Path(que_dir).mkdir(parents=True, exist_ok=True)
except:
pass
@ -271,7 +271,7 @@ def gen_header(to_id, from_id, call_type):
seq_header = '824A' + to_id + from_id + '9550'
return seq_header
def send_sms(csbk, to_id, from_id, peer_id, call_type, slot, msg):
def send_sms(csbk, to_id, from_id, peer_id, call_type, slot, msg, que_dir):
global use_csbk
#to_id = str(hex(to_id))[2:].zfill(6)
#from_id = str(hex(from_id))[2:].zfill(6)
@ -287,10 +287,10 @@ def send_sms(csbk, to_id, from_id, peer_id, call_type, slot, msg):
new_call_type = 0
if csbk == 'yes':
use_csbk = True
create_sms_seq(to_id, from_id, peer_id, int(slot), new_call_type, csbk_gen(to_id, from_id) + create_crc16(gen_header(to_id, from_id, new_call_type)) + create_crc32(format_sms(msg, to_id, from_id)))
create_sms_seq(to_id, from_id, peer_id, int(slot), new_call_type, csbk_gen(to_id, from_id) + create_crc16(gen_header(to_id, from_id, new_call_type)) + create_crc32(format_sms(msg, to_id, from_id)), que_dir)
else:
use_csbk = False
create_sms_seq(to_id, from_id, peer_id, int(slot), new_call_type, create_crc16(gen_header(to_id, from_id, new_call_type)) + create_crc32(format_sms(msg, to_id, from_id)))
create_sms_seq(to_id, from_id, peer_id, int(slot), new_call_type, create_crc16(gen_header(to_id, from_id, new_call_type)) + create_crc32(format_sms(msg, to_id, from_id)), que_dir)
print('Call type: ' + call_type)
print('Destination: ' + str(to_id))
print('Source: ' + str(from_id))

View File

@ -172,13 +172,12 @@ def build_config(_config_file):
'USER_SETTINGS_FILE': config.get(section, 'USER_SETTINGS_FILE'),
'USE_API': config.getboolean(section, 'USE_API'),
'AUTHORIZED_TOKENS_FILE': config.get(section, 'AUTHORIZED_TOKENS_FILE'),
'AUTHORIZED_USERS_FILE': config.get(section, 'AUTHORIZED_USERS_FILE'),
'ACCESS_SYSTEMS_FILE': config.get(section, 'ACCESS_SYSTEMS_FILE'),
'USE_PUBLIC_APPS': config.getboolean(section, 'USE_PUBLIC_APPS'),
'PUBLIC_APPS_LIST': config.get(section, 'PUBLIC_APPS_LIST'),
'MY_API_NAME': config.get(section, 'MY_API_NAME'),
'MY_SERVER_SHORTCUT': config.get(section, 'MY_SERVER_SHORTCUT'),
'DASHBOARD_URL': config.get(section, 'DASHBOARD_URL'),
'SERVER_NAME': config.get(section, 'SERVER_NAME'),
'RULES_PATH': config.get(section, 'RULES_PATH'),
})