"""Main module.""" import datetime import json import logging import queue import time import paho.mqtt.client as mqtt from aprsd import plugin, threads from aprsd.threads import tx from aprsd.packets import core from oslo_config import cfg from aprsd_weewx_plugin import conf # noqa CONF = cfg.CONF LOG = logging.getLogger("APRSD") class ClearableQueue(queue.Queue): def clear(self): try: while True: self.get_nowait() except queue.Empty: pass class WeewxMQTTPlugin(plugin.APRSDRegexCommandPluginBase): """Weather Syntax of request weather """ version = "1.0" command_regex = "^[wW]" command_name = "weather" enabled = False def setup(self): """Ensure that the plugin has been configured.""" self.enabled = True if not CONF.aprsd_weewx_plugin.mqtt_host: LOG.error("aprsd_weewx_plugin.mqtt_host is not set in config!") self.enabled = False if not CONF.aprsd_weewx_plugin.mqtt_user: LOG.warning("aprsd_weewx_plugin.mqtt_user is not set") if not CONF.aprsd_weewx_plugin.mqtt_password: LOG.warning("aprsd_weewx_plugin.mqtt_password is not set") def create_threads(self): if self.enabled: LOG.info("Creating WeewxMQTTThread") self.queue = ClearableQueue(maxsize=1) self.wx_queue = ClearableQueue(maxsize=1) mqtt_thread = WeewxMQTTThread( wx_queue=self.wx_queue, msg_queue=self.queue, ) threads = [mqtt_thread] # if we have position and a callsign to report # Then we can periodically report weather data # to APRS if ( CONF.aprsd_weewx_plugin.latitude and CONF.aprsd_weewx_plugin.longitude ): LOG.info("Creating WeewxWXAPRSThread") wx_thread = WeewxWXAPRSThread( wx_queue=self.wx_queue, ) threads.append(wx_thread) else: LOG.info( "NOT starting Weewx WX APRS Thread due to missing " "GPS location settings. Please set " "aprsd_weewx_plugin.latitude and " "aprsd_weewx_plugin.longitude to start reporting as an " "aprs weather station.", ) return threads else: LOG.info("WeewxMQTTPlugin not enabled due to missing config.") def process(self, packet): LOG.info("WeewxMQTT Plugin") packet.get("from") packet.get("message_text", None) # ack = packet.get("msgNo", "0") # see if there are any weather messages in the queue. msg = None LOG.info("Looking for a message") if not self.queue.empty(): msg = self.queue.get(timeout=1) else: try: msg = self.queue.get(timeout=30) except Exception: return "No Weewx Data" if not msg: return "No Weewx data" else: LOG.info(f"Got a message {msg}") # Wants format of 71.5F/54.0F Wind 1@49G7 54% if "outTemp_F" in msg: temperature = "{:0.2f}F".format(float(msg["outTemp_F"])) dewpoint = "{:0.2f}F".format(float(msg["dewpoint_F"])) else: temperature = "{:0.2f}C".format(float(msg["outTemp_C"])) dewpoint = "{:0.2f}C".format(float(msg["dewpoint_C"])) wind_direction = "{:0.0f}".format(float(msg.get("windDir", 0))) LOG.info(f"wind direction {wind_direction}") if "windSpeed_mps" in msg: wind_speed = "{:0.0f}".format(float(msg["windSpeed_mps"])) wind_gust = "{:0.0f}".format(float(msg["windGust_mps"])) else: wind_speed = "{:0.0f}".format(float(msg["windSpeed_mph"])) wind_gust = "{:0.0f}".format(float(msg["windGust_mph"])) wind = "{}@{}G{}".format( wind_speed, wind_direction, wind_gust, ) humidity = "{:0.0f}%".format(float(msg["outHumidity"])) ts = int("{:0.0f}".format(float(msg["dateTime"]))) ts = datetime.datetime.fromtimestamp(ts) # do rain in format of last hour/day/month/year rain = "RA {:.2f} {:.2f}/hr".format( float(msg.get("dayRain_in", 0.00)), float(msg.get("rainRate_inch_per_hour", 0.00)), ) wx = "WX: {}/{} Wind {} {} {} {:.2f}inHg".format( temperature, dewpoint, wind, humidity, rain, float(msg.get("pressure_inHg", 0.00)), ) LOG.debug( "Got weather {} -- len {}".format( wx, len(wx), ), ) return wx class WeewxMQTTThread(threads.APRSDThread): _mqtt_host = None _mqtt_port = None client = None def __init__(self, wx_queue, msg_queue): super().__init__("WeewxMQTTThread") self.msg_queue = msg_queue self.wx_queue = wx_queue self.setup() def setup(self): LOG.info("Creating mqtt client") self._mqtt_host = CONF.aprsd_weewx_plugin.mqtt_host self._mqtt_port = CONF.aprsd_weewx_plugin.mqtt_port LOG.info( "Connecting to mqtt {}:{}".format( self._mqtt_host, self._mqtt_port, ), ) self.client = mqtt.Client(client_id="WeewxMQTTPlugin") self.client.on_connect = self.on_connect self.client.on_disconnect = self.on_disconnect self.client.on_message = self.on_message self.client.connect(self._mqtt_host, self._mqtt_port, 60) if CONF.aprsd_weewx_plugin.mqtt_user: username = CONF.aprsd_weewx_plugin.mqtt_user password = CONF.aprsd_weewx_plugin.mqtt_password LOG.info(f"Using MQTT username/password {username}/XXXXXX") self.client.username_pw_set( username=username, password=password, ) else: LOG.info("Not using MQTT username/password") def on_connect(self, client, userdata, flags, rc): LOG.info(f"Connected to MQTT {self._mqtt_host} ({rc})") client.subscribe("weather/loop") def on_disconnect(self, client, userdata, rc): LOG.info(f"Disconnected from MQTT {self._mqtt_host} ({rc})") def on_message(self, client, userdata, msg): wx_data = json.loads(msg.payload) LOG.debug("Got WX data") # Make sure we have only 1 item in the queue if self.msg_queue.qsize() >= 1: self.msg_queue.clear() self.msg_queue.put(wx_data) self.wx_queue.clear() self.wx_queue.put(wx_data) def stop(self): LOG.info(__class__.__name__+" Stop") self.thread_stop = True LOG.info("Stopping loop") self.client.loop_stop() LOG.info("Disconnecting from MQTT") self.client.disconnect() def loop(self): LOG.info("Loop") self.client.loop_forever() # self.client.loop(timeout=10, max_packets=10) return True class WeewxWXAPRSThread(threads.APRSDThread): def __init__(self, wx_queue): super().__init__(self.__class__.__name__) self.wx_queue = wx_queue self.latitude = CONF.aprsd_weewx_plugin.latitude self.longitude = CONF.aprsd_weewx_plugin.longitude self.callsign = CONF.callsign self.report_interval = CONF.aprsd_weewx_plugin.report_interval self.last_send = datetime.datetime.now() if self.latitude and self.longitude: self.position = self.get_latlon( float(self.latitude), float(self.longitude), ) def decdeg2dmm_m(self, degrees_decimal): is_positive = degrees_decimal >= 0 degrees_decimal = abs(degrees_decimal) minutes, seconds = divmod(degrees_decimal * 3600, 60) degrees, minutes = divmod(minutes, 60) degrees = degrees if is_positive else -degrees # degrees = str(int(degrees)).zfill(2).replace("-", "0") degrees = abs(int(degrees)) # minutes = str(round(minutes + (seconds / 60), 2)).zfill(5) minutes = int(round(minutes + (seconds / 60), 2)) hundredths = round(seconds / 60, 2) return { "degrees": degrees, "minutes": minutes, "seconds": seconds, "hundredths": hundredths, } def convert_latitude(self, degrees_decimal): det = self.decdeg2dmm_m(degrees_decimal) if degrees_decimal > 0: direction = "N" else: direction = "S" degrees = str(det.get("degrees")).zfill(2) minutes = str(det.get("minutes")).zfill(2) det.get("seconds") hundredths = str(det.get("hundredths")).split(".")[1] lat = f"{degrees}{str(minutes)}.{hundredths}{direction}" return lat def convert_longitude(self, degrees_decimal): det = self.decdeg2dmm_m(degrees_decimal) if degrees_decimal > 0: direction = "E" else: direction = "W" degrees = str(det.get("degrees")).zfill(3) minutes = str(det.get("minutes")).zfill(2) det.get("seconds") hundredths = str(det.get("hundredths")).split(".")[1] lon = f"{degrees}{str(minutes)}.{hundredths}{direction}" return lon def get_latlon(self, latitude_str, longitude_str): return "{}/{}_".format( self.convert_latitude(float(latitude_str)), self.convert_longitude(float(longitude_str)), ) def str_or_dots(self, number, length): # If parameter is None, fill with dots, otherwise pad with zero # if not number: # retn_value = "." * length # else: format_type = {"int": "d", "float": ".0f"}[type(number).__name__] retn_value = "".join(("%0", str(length), format_type)) % number return retn_value def build_wx_packet(self, wx_data): wind_dir = float(wx_data.get("windDir", 0.00)) wind_speed = float(wx_data.get("windSpeed_mph", 0.00)) wind_gust = float(wx_data.get("windGust_mph", 0.00)) temperature = float(wx_data.get("outTemp_F", 0.00)) rain_last_hr = float(wx_data.get("hourRain_in", 0.00)) rain_last_24_hrs = float(wx_data.get("rain24_in", 0.00)) rain_since_midnight = float(wx_data.get("day_Rain_in", 0.00)) humidity = float(wx_data.get("outHumidity", 0.00)) # * 330.863886667 # inHg * 33.8639 = mBar pressure = float(wx_data.get("pressure_inHg", 0.00)) * 33.8639 * 10 return core.WeatherPacket( from_call=self.callsign, to_call="APRS", latitude=self.convert_latitude(float(self.latitude)), longitude=self.convert_longitude(float(self.longitude)), wind_direction=int(wind_dir), wind_speed=wind_speed, wind_gust=wind_gust, temperature=temperature, rain_1h=rain_last_hr, rain_24h=rain_last_24_hrs, rain_since_midnight=rain_since_midnight, humidity=int(round(humidity)), pressure=pressure, comment="APRSD WX http://pypi.org/project/aprsd", ) def loop(self): now = datetime.datetime.now() delta = now - self.last_send max_timeout = {"seconds": self.report_interval} max_delta = datetime.timedelta(**max_timeout) if delta >= max_delta: if not self.wx_queue.empty(): wx = self.wx_queue.get(timeout=1) else: try: wx = self.wx_queue.get(timeout=5) except Exception: time.sleep(1) return True if not wx: # just keep looping time.sleep(1) return True # we have Weather now, so lets format the data # and then send it out to APRS packet = self.build_wx_packet(wx) packet.retry_count = 1 tx.send(packet) self.last_send = datetime.datetime.now() time.sleep(1) return True else: time.sleep(1) return True