mirror of https://github.com/craigerl/aprsd.git
180 lines
6.3 KiB
Python
180 lines
6.3 KiB
Python
import logging
|
|
import re
|
|
import time
|
|
|
|
from geopy.geocoders import ArcGIS, AzureMaps, Baidu, Bing, GoogleV3
|
|
from geopy.geocoders import HereV7, Nominatim, OpenCage, TomTom, What3WordsV3, Woosmap
|
|
from oslo_config import cfg
|
|
|
|
from aprsd import packets, plugin, plugin_utils
|
|
from aprsd.utils import trace
|
|
|
|
|
|
CONF = cfg.CONF
|
|
LOG = logging.getLogger("APRSD")
|
|
|
|
|
|
class UsLocation:
    """Minimal stand-in for a geopy-style location result.

    It provides the two things this module uses on a reverse-geocode
    result: a ``raw`` mapping and a string conversion.
    """

    # Class-level fallback so ``UsLocation.raw`` still resolves. Each
    # instance gets its own dict in __init__, so a mutation made through
    # one instance is never visible on another.
    raw = {}

    def __init__(self, info):
        """Remember the description of this location.

        :param info: Text describing the location.
        """
        self.info = info
        self.raw = {}

    def __str__(self):
        """Return the description as a string.

        ``__str__`` must return a ``str``; coercing here keeps ``str(loc)``
        from raising ``TypeError`` when ``info`` is some other type.
        """
        return str(self.info)
|
|
|
|
|
|
class USGov:
    """US Government geocoder that uses the geopy API.

    This is a dummy class that implements the geopy reverse API,
    so the factory can return an object that conforms to the API.
    """

    def reverse(self, coordinates):
        """Reverse geocode a coordinate.

        :param coordinates: Either a ``"lat,lon"`` string or an indexable
            ``(lat, lon)`` sequence. Each element is converted with
            ``float()``.
        :returns: A ``UsLocation`` wrapping the ``areaDescription`` found in
            the result of ``plugin_utils.get_weather_gov_for_gps``, or
            ``"Unknown Location"`` when that field is absent or empty.
        :raises ValueError: If a coordinate cannot be parsed as a float.
        :raises IndexError: If fewer than two coordinates are supplied.
        """
        LOG.info(f"USGov reverse geocode {coordinates}")
        if isinstance(coordinates, str):
            coords = coordinates.split(",")
        else:
            # Already a (lat, lon) style sequence.
            coords = coordinates
        lat = float(coords[0])
        lon = float(coords[1])
        result = plugin_utils.get_weather_gov_for_gps(lat, lon)

        # Walk the nested structure defensively: a payload without the
        # expected keys should yield the fallback text, not an exception.
        location = result.get("location") if isinstance(result, dict) else None
        description = (
            location.get("areaDescription") if isinstance(location, dict) else None
        )
        loc = UsLocation(description or "Unknown Location")

        LOG.info(f"USGov reverse geocode LOC {loc}")
        return loc
|
|
|
|
|
|
def geopy_factory():
    """Factory function for geopy geocoders.

    The geocoder is selected by ``CONF.location_plugin.geopy_geocoder`` and
    built with ``CONF.location_plugin.user_agent`` plus whatever credential
    options that particular geocoder needs.

    :returns: A geocoder object exposing ``reverse()``.
    :raises ValueError: If the configured geocoder name is not recognised.
    """
    geocoder = CONF.location_plugin.geopy_geocoder
    LOG.info(f"Using geocoder: {geocoder}")
    user_agent = CONF.location_plugin.user_agent
    LOG.info(f"Using user_agent: {user_agent}")

    opts = CONF.location_plugin

    # Zero-argument builders: credential options are only read for the
    # geocoder that was actually selected, exactly as an if/elif chain would.
    builders = {
        "Nominatim": lambda: Nominatim(user_agent=user_agent),
        "USGov": lambda: USGov(),
        "ArcGIS": lambda: ArcGIS(
            username=opts.arcgis_username,
            password=opts.arcgis_password,
            user_agent=user_agent,
        ),
        "AzureMaps": lambda: AzureMaps(
            user_agent=user_agent,
            subscription_key=opts.azuremaps_subscription_key,
        ),
        "Baidu": lambda: Baidu(user_agent=user_agent, api_key=opts.baidu_api_key),
        "Bing": lambda: Bing(user_agent=user_agent, api_key=opts.bing_api_key),
        "GoogleV3": lambda: GoogleV3(
            user_agent=user_agent,
            api_key=opts.google_api_key,
        ),
        # HereV7 names its credential parameter ``apikey`` (no underscore);
        # passing ``api_key=`` raises TypeError at construction time.
        "HERE": lambda: HereV7(user_agent=user_agent, apikey=opts.here_api_key),
        "OpenCage": lambda: OpenCage(
            user_agent=user_agent,
            api_key=opts.opencage_api_key,
        ),
        "TomTom": lambda: TomTom(user_agent=user_agent, api_key=opts.tomtom_api_key),
        "What3Words": lambda: What3WordsV3(
            user_agent=user_agent,
            api_key=opts.what3words_api_key,
        ),
        "Woosmap": lambda: Woosmap(
            user_agent=user_agent,
            api_key=opts.woosmap_api_key,
        ),
    }

    build = builders.get(geocoder)
    if build is None:
        raise ValueError(f"Unknown geocoder: {geocoder}")
    return build()
|
|
|
|
|
|
class LocationPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin):
    """Location!

    Replies with a one-line summary of a callsign's most recent aprs.fi
    entry: an area description, altitude in feet, rounded coordinates and
    how many hours ago the entry was last seen.
    """

    command_regex = r"^([l]|[l]\s|location)"
    command_name = "location"
    short_description = "Where in the world is a CALLSIGN's last GPS beacon?"

    def setup(self):
        """Make sure an aprs.fi API key is available before use."""
        self.ensure_aprs_fi_key()

    @trace.trace
    def process(self, packet: packets.MessagePacket):
        """Build the location reply for an incoming message packet.

        :param packet: The message packet. Its ``message_text`` may name a
            callsign to look up; otherwise the sender's callsign is used.
        :returns: The reply string, or a failure message when aprs.fi
            cannot be queried or returns no entries.
        """
        LOG.info("Location Plugin")
        fromcall = packet.from_call
        message = packet.get("message_text", None)

        api_key = CONF.aprs_fi.apiKey

        # The optional trailing word is the callsign to search. Splitting on
        # whitespace tolerates a missing message and trailing spaces, which
        # would otherwise raise or produce an empty callsign.
        words = (message or "").split()
        if len(words) > 1:
            searchcall = words[-1].upper()
        else:
            # if no second argument, search for calling station
            searchcall = fromcall

        try:
            aprs_data = plugin_utils.get_aprs_fi(api_key, searchcall)
        except Exception as ex:
            LOG.error(f"Failed to fetch aprs.fi '{ex}'")
            return "Failed to fetch aprs.fi location"

        LOG.debug(f"LocationPlugin: aprs_data = {aprs_data}")
        # A payload without an "entries" key is treated like an empty result
        # instead of raising KeyError.
        entries = aprs_data.get("entries") if isinstance(aprs_data, dict) else None
        if not entries:
            LOG.error("Didn't get any entries from aprs.fi")
            return "Failed to fetch aprs.fi location"

        entry = entries[0]
        lat = float(entry["lat"])
        lon = float(entry["lng"])

        # Get some information about their location
        area_info = self._lookup_area_info(lat, lon)

        try:  # altitude not always provided
            alt = float(entry["altitude"])
        except (KeyError, TypeError, ValueError):
            alt = 0
        altfeet = int(alt * 3.28084)  # metres -> feet conversion factor

        aprs_lasttime_seconds = entry["lasttime"]
        delta_seconds = time.time() - int(aprs_lasttime_seconds)
        delta_hours = delta_seconds / 60 / 60

        reply = (
            f"{searchcall}: {area_info} {altfeet}' "
            f"{lat:0.2f},{lon:0.2f} {delta_hours:.1f}h ago"
        ).rstrip()

        return reply

    def _lookup_area_info(self, lat, lon):
        """Reverse geocode a position into a short area description.

        Best effort: any failure while geocoding is logged and the text
        ``"Unknown Location"`` is returned instead of raising.

        :param lat: Latitude as a float.
        :param lon: Longitude as a float.
        :returns: ``"<county>, <state>"`` when the address has country code
            ``us``, the country name for other addresses, or ``str()`` of
            the geocoder result when it carries no address.
        """
        try:
            tic = time.perf_counter()
            geolocator = geopy_factory()
            LOG.info(f"Using GEOLOCATOR: {geolocator}")
            coordinates = f"{lat:0.6f}, {lon:0.6f}"
            location = geolocator.reverse(coordinates)
            address = location.raw.get("address")
            LOG.debug(f"GEOLOCATOR address: {address}")
            toc = time.perf_counter()

            if not address:
                return str(location)

            LOG.info(f"Geopy address {address} took {toc - tic:0.4f}")
            if address.get("country_code") == "us":
                return f"{address.get('county')}, {address.get('state')}"

            # Non-US addresses: report the country only. The default belongs
            # inside get(); formatting "(value, 'Unknown')" would render a
            # tuple into the reply.
            return str(address.get("country") or "Unknown")
        except Exception as ex:
            LOG.error(ex)
            LOG.error(f"Failed to fetch Geopy address {ex}")
            return "Unknown Location"
|