From 874315cdfb32fa28af16fb61d8759a262b80dcf5 Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 14 Sep 2021 14:22:16 -0400 Subject: [PATCH] Updated from first repo --- AUTHORS | 1 + ChangeLog | 7 + aprsd_weewx_plugin/aprsd_weewx_plugin.py | 161 +++++++++++++++++++---- 3 files changed, 142 insertions(+), 27 deletions(-) create mode 100644 AUTHORS create mode 100644 ChangeLog diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..cc71ac7 --- /dev/null +++ b/AUTHORS @@ -0,0 +1 @@ +Hemna diff --git a/ChangeLog b/ChangeLog new file mode 100644 index 0000000..5ebd906 --- /dev/null +++ b/ChangeLog @@ -0,0 +1,7 @@ +CHANGES +======= + +v0.1.2 +------ + +* Initial commit diff --git a/aprsd_weewx_plugin/aprsd_weewx_plugin.py b/aprsd_weewx_plugin/aprsd_weewx_plugin.py index 3ea2e69..159222f 100644 --- a/aprsd_weewx_plugin/aprsd_weewx_plugin.py +++ b/aprsd_weewx_plugin/aprsd_weewx_plugin.py @@ -1,54 +1,161 @@ +"""Main module.""" +import datetime +import json import logging +import queue -from aprsd import messaging, plugin, trace +import paho.mqtt.client as mqtt +from aprsd import plugin, threads, trace, utils LOG = logging.getLogger("APRSD") +mqtt_queue = queue.Queue(maxsize=20) + class WeewxMQTTPlugin(plugin.APRSDRegexCommandPluginBase): + """Weather + + Syntax of request + + weather + + """ version = "1.0" - # Look for any command that starts with w or W command_regex = "^[wW]" - # the command is for ? command_name = "weather" enabled = False def setup(self): - # Do some checks here? - self.enabled = True + """Ensure that the plugin has been configured.""" + try: + LOG.info("Looking for weewx.mqtt.host config entry") + utils.check_config_option(self.config, ["services", "weewx", "mqtt", "host"]) + self.enabled = True + except Exception as ex: + LOG.error(f"Failed to find config weewx:mqtt:host {ex}") + LOG.info("Disabling the weewx mqtt subsription thread.") + self.enabled = False def create_threads(self): - """This allows you to create and return a custom APRSDThread object. - - Create a child of the aprsd.threads.APRSDThread object and return it - It will automatically get started. - - You can see an example of one here: - https://github.com/craigerl/aprsd/blob/master/aprsd/threads.py#L141 - """ if self.enabled: - # You can create a background APRSDThread object here - # Just return it for example: - # https://github.com/hemna/aprsd-weewx-plugin/blob/master/aprsd_weewx_plugin/aprsd_weewx_plugin.py#L42-L50 - # - return [] + LOG.info("Creating WeewxMQTTThread") + return WeewxMQTTThread( + msg_queues=mqtt_queue, + config=self.config, + ) + else: + LOG.info("WeewxMQTTPlugin not enabled due to missing config.") @trace.trace def process(self, packet): - - """This is called when a received packet matches self.command_regex.""" - - LOG.info("WeewxMQTTPlugin Plugin") - + LOG.info("WeewxMQTT Plugin") packet.get("from") packet.get("message_text", None) + # ack = packet.get("msgNo", "0") if self.enabled: - # Now we can process - return "some reply message" + # see if there are any weather messages in the queue. + msg = None + LOG.info("Looking for a message") + if not mqtt_queue.empty(): + msg = mqtt_queue.get(timeout=1) + else: + msg = mqtt_queue.get(timeout=30) + + if not msg: + return "No Weewx data" + else: + LOG.info(f"Got a message {msg}") + # Wants format of 71.5F/54.0F Wind 1@49G7 54% + if "outTemp_F" in msg: + temperature = "{:0.2f}F".format(float(msg["outTemp_F"])) + dewpoint = "{:0.2f}F".format(float(msg["dewpoint_F"])) + else: + temperature = "{:0.2f}C".format(float(msg["outTemp_C"])) + dewpoint = "{:0.2f}C".format(float(msg["dewpoint_C"])) + + wind_direction = "{:0.0f}".format(float(msg["windDir"])) + if "windSpeed_mps" in msg: + wind_speed = "{:0.0f}".format(float(msg["windSpeed_mps"])) + wind_gust = "{:0.0f}".format(float(msg["windGust_mps"])) + else: + wind_speed = "{:0.0f}".format(float(msg["windSpeed_mph"])) + wind_gust = "{:0.0f}".format(float(msg["windGust_mph"])) + + wind = "{}@{}G{}".format( + wind_speed, + wind_direction, + wind_gust, + ) + + humidity = "{:0.0f}%".format(float(msg["outHumidity"])) + + ts = int("{:0.0f}".format(float(msg["dateTime"]))) + ts = datetime.datetime.fromtimestamp(ts) + + wx = "{} {}/{} Wind {} {}".format( + ts, + temperature, + dewpoint, + wind, + humidity, + ) + LOG.debug( + "Got weather {} -- len {}".format( + wx, + len(wx), + ), + ) + return wx + else: - LOG.warning("WeewxMQTTPlugin is disabled.") - return messaging.NULL + return "WeewxMQTT Not enabled" + + +class WeewxMQTTThread(threads.APRSDThread): + def __init__(self, msg_queues, config): + super().__init__("WeewxMQTTThread") + self.msg_queues = msg_queues + self.config = config + self.setup() + + def setup(self): + LOG.info("Creating mqtt client") + self._mqtt_host = self.config["services"]["weewx"]["mqtt"]["host"] + self._mqtt_port = self.config["services"]["weewx"]["mqtt"]["port"] + self._mqtt_user = self.config["services"]["weewx"]["mqtt"]["user"] + self._mqtt_pass = self.config["services"]["weewx"]["mqtt"]["password"] + LOG.info( + "Connecting to mqtt {}:XXXX@{}:{}".format( + self._mqtt_user, + self._mqtt_host, + self._mqtt_port, + ), + ) + self.client = mqtt.Client(client_id="WeewxMQTTPlugin") + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + self.client.connect(self._mqtt_host, self._mqtt_port, 60) + self.client.username_pw_set(username="hemna", password="ass") + + def on_connect(self, client, userdata, flags, rc): + LOG.info(f"Connected to MQTT {self._mqtt_host} ({rc})") + client.subscribe("weather/loop") + + def on_message(self, client, userdata, msg): + wx_data = json.loads(msg.payload) + LOG.debug(f"Got WX data {wx_data}") + mqtt_queue.put(wx_data) + + def stop(self): + LOG.info("calling disconnect") + self.thread_stop = True + self.client.disconnect() + + def loop(self): + LOG.info("Looping bitch") + self.client.loop_forever() + return True