From 28a2e7ba70c5513a3797a0a63ce7bf225246273b Mon Sep 17 00:00:00 2001 From: KF7EEL Date: Mon, 30 Nov 2020 12:38:50 -0800 Subject: [PATCH] APRS position via SMS for grid square --- gps_data.py | 98 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 90 insertions(+), 8 deletions(-) diff --git a/gps_data.py b/gps_data.py index 94b06c7..c602e11 100644 --- a/gps_data.py +++ b/gps_data.py @@ -136,6 +136,16 @@ def aprs_send(packet): AIS.sendall(packet) AIS.close() +# Thanks for this forum post for this - https://stackoverflow.com/questions/2579535/convert-dd-decimal-degrees-to-dms-degrees-minutes-seconds-in-python + +def decdeg2dms(dd): + is_positive = dd >= 0 + dd = abs(dd) + minutes,seconds = divmod(dd*3600,60) + degrees,minutes = divmod(minutes,60) + degrees = degrees if is_positive else -degrees + return (degrees,minutes,seconds) + def user_setting_write(dmr_id, setting, value): ## try: # Open file and load as dict for modification @@ -163,22 +173,93 @@ def user_setting_write(dmr_id, setting, value): ## #Path('./user_settings.txt').mkdir(parents=True, exist_ok=True) ## Path('./user_settings.txt').touch() +##def retrieve_aprs_settings(_rf_src): +## user_settings = ast.literal_eval(os.popen('cat ./user_settings.txt').read()) +## if int_id(_rf_src) not in user_settings: +## aprs_loc_packet = str(get_alias(int_id(_rf_src), subscriber_ids)) + '-' + str(user_ssid) + '>APRS,TCPIP*:/' + str(datetime.datetime.utcnow().strftime("%H%M%Sh")) + str(loc.lat[0:7]) + str(loc.lat_dir) + '/' + str(loc.lon[0:8]) + str(loc.lon_dir) + '[' + str(round(loc.true_course)).zfill(3) + '/' + str(round(loc.spd_over_grnd)).zfill(3) + '/' + aprs_comment + ' DMR ID: ' + str(int_id(_rf_src)) +## else: +## if user_settings[int_id(_rf_src)][1]['ssid'] == '': +## ssid = user_ssid +## if user_settings[int_id(_rf_src)][3]['comment'] == '': +## comment = aprs_comment + ' DMR ID: ' + str(int_id(_rf_src)) +## if user_settings[int_id(_rf_src)][2]['icon'] == '': +## icon_table = '/' +## icon_icon = '[' +## if user_settings[int_id(_rf_src)][2]['icon'] != '': +## icon_table = user_settings[int_id(_rf_src)][2]['icon'][0] +## icon_icon = user_settings[int_id(_rf_src)][2]['icon'][1] +## if user_settings[int_id(_rf_src)][1]['ssid'] != '': +## ssid = user_settings[int_id(_rf_src)][1]['ssid'] +## if user_settings[int_id(_rf_src)][3]['comment'] != '': +## comment = user_settings[int_id(_rf_src)][3]['comment'] +## return ssid, icon, comment +## # Process SMS, do something bases on message -def process_sms(from_id, sms): +def process_sms(_rf_src, sms): if sms == 'ID': logger.info(str(get_alias(int_id(from_id), subscriber_ids)) + ' - ' + str(int_id(from_id))) if sms == 'TEST': logger.info('It works!') if '@ICON' in sms: - user_setting_write(int_id(from_id), re.sub(' .*|@','',sms), re.sub('@ICON| ','',sms)) + user_setting_write(int_id(_rf_src), re.sub(' .*|@','',sms), re.sub('@ICON| ','',sms)) if '@SSID' in sms: - user_setting_write(int_id(from_id), re.sub(' .*|@','',sms), re.sub('@SSID| ','',sms)) + user_setting_write(int_id(_rf_src), re.sub(' .*|@','',sms), re.sub('@SSID| ','',sms)) if '@COM' in sms: - user_setting_write(int_id(from_id), re.sub(' .*|@','',sms), re.sub('@COM |@COM','',sms)) + user_setting_write(int_id(_rf_src), re.sub(' .*|@','',sms), re.sub('@COM |@COM','',sms)) if '@MH' in sms: - pass + grid_square = re.sub('@MH ', '', sms) + if len(grid_square) < 6: + pass + else: + lat = decdeg2dms(mh.to_location(grid_square)[0]) + lon = decdeg2dms(mh.to_location(grid_square)[1]) + + if lon[0] < 0: + lon_dir = 'W' + if lon[0] > 0: + lon_dir = 'E' + if lat[0] < 0: + lat_dir = 'S' + if lat[0] > 0: + lat_dir = 'N' + #logger.info(lat) + #logger.info(lat_dir) + aprs_lat = str(str(re.sub('\..*|-', '', str(lat[0]))) + str(re.sub('\..*', '', str(lat[1])) + '.').ljust(5) + lat_dir) + aprs_lon = str(str(re.sub('\..*|-', '', str(lon[0]))) + str(re.sub('\..*', '', str(lon[1])) + '.').ljust(5) + lon_dir) + #logger.info(mh.to_location(grid_square)) + #logger.info(str(lat) + ', ' + str(lon)) + logger.info('Latitude: ' + str(aprs_lat)) + logger.info('Longitude: ' + str(aprs_lon)) + user_settings = ast.literal_eval(os.popen('cat ./user_settings.txt').read()) + if int_id(_rf_src) not in user_settings: + aprs_loc_packet = str(get_alias(int_id(_rf_src), subscriber_ids)) + '-' + str(user_ssid) + '>APRS,TCPIP*:/' + str(datetime.datetime.utcnow().strftime("%H%M%Sh")) + str(aprs_lat) + '/' + str(aprs_lon) + '[/' + aprs_comment + ' DMR ID: ' + str(int_id(_rf_src)) + else: + if user_settings[int_id(_rf_src)][1]['ssid'] == '': + ssid = user_ssid + if user_settings[int_id(_rf_src)][3]['comment'] == '': + comment = aprs_comment + ' DMR ID: ' + str(int_id(_rf_src)) + if user_settings[int_id(_rf_src)][2]['icon'] == '': + icon_table = '/' + icon_icon = '[' + if user_settings[int_id(_rf_src)][2]['icon'] != '': + icon_table = user_settings[int_id(_rf_src)][2]['icon'][0] + icon_icon = user_settings[int_id(_rf_src)][2]['icon'][1] + if user_settings[int_id(_rf_src)][1]['ssid'] != '': + ssid = user_settings[int_id(_rf_src)][1]['ssid'] + if user_settings[int_id(_rf_src)][3]['comment'] != '': + comment = user_settings[int_id(_rf_src)][3]['comment'] + aprs_loc_packet = str(get_alias(int_id(_rf_src), subscriber_ids)) + '-' + ssid + '>APRS,TCPIP*:/' + str(datetime.datetime.utcnow().strftime("%H%M%Sh")) + str(aprs_lat) + icon_table + str(aprs_lon) + icon_icon + '/' + str(comment) + logger.info(aprs_loc_packet) + try: + aprslib.parse(aprs_loc_packet) + aprs_send(aprs_loc_packet) + except: + logger.info('Exception. Not uploaded') + packet_assembly = '' + + try: if sms in cmd_list: logger.info('Executing command/script.') @@ -250,8 +331,8 @@ class DATA_SYSTEM(HBSYSTEM): ## aprs_loc_packet = str(get_alias(int_id(_rf_src), subscriber_ids)) + '-' + str(user_ssid) + '>APRS,TCPIP*:/' + str(datetime.datetime.utcnow().strftime("%H%M%Sh")) + str(final_packet[29:36]) + str(final_packet[39]) + '/' + str(re.sub(',', '', final_packet[41:49])) + str(final_packet[52]) + '[/' + aprs_comment + ' DMR ID: ' + str(int_id(_rf_src)) try: # Disable opening file for reading to reduce "collision" or reading and writing at same time. -## with open("./user_settings.txt", 'r') as f: -## user_settings = ast.literal_eval(f.read()) + with open("./user_settings.txt", 'r') as f: + user_settings = ast.literal_eval(f.read()) user_settings = ast.literal_eval(os.popen('cat ./user_settings.txt').read()) if int_id(_rf_src) not in user_settings: aprs_loc_packet = str(get_alias(int_id(_rf_src), subscriber_ids)) + '-' + str(user_ssid) + '>APRS,TCPIP*:/' + str(datetime.datetime.utcnow().strftime("%H%M%Sh")) + str(loc.lat[0:7]) + str(loc.lat_dir) + '/' + str(loc.lon[0:8]) + str(loc.lon_dir) + '[' + str(round(loc.true_course)).zfill(3) + '/' + str(round(loc.spd_over_grnd)).zfill(3) + '/' + aprs_comment + ' DMR ID: ' + str(int_id(_rf_src)) @@ -270,7 +351,8 @@ class DATA_SYSTEM(HBSYSTEM): ssid = user_settings[int_id(_rf_src)][1]['ssid'] if user_settings[int_id(_rf_src)][3]['comment'] != '': comment = user_settings[int_id(_rf_src)][3]['comment'] - aprs_loc_packet = str(get_alias(int_id(_rf_src), subscriber_ids)) + '-' + ssid + '>APRS,TCPIP*:/' + str(datetime.datetime.utcnow().strftime("%H%M%Sh")) + str(loc.lat[0:7]) + str(loc.lat_dir) + icon_table + str(loc.lon[0:8]) + str(loc.lon_dir) + icon_icon + str(round(loc.true_course)).zfill(3) + '/' + str(round(loc.spd_over_grnd)).zfill(3) + '/' + str(comment) + #logger.info(retrieve_aprs_settings(_rf_src)) + aprs_loc_packet = str(get_alias(int_id(_rf_src), subscriber_ids)) + '-' + ssid + '>APRS,TCPIP*:/' + str(datetime.datetime.utcnow().strftime("%H%M%Sh")) + str(loc.lat[0:7]) + str(loc.lat_dir) + icon_table + str(loc.lon[0:8]) + str(loc.lon_dir) + icon_icon + str(round(loc.true_course)).zfill(3) + '/' + str(round(loc.spd_over_grnd)).zfill(3) + '/' + str(comment) logger.info(aprs_loc_packet) logger.info('User comment: ' + comment) logger.info('User SSID: ' + ssid)