Updated from first repo
This commit is contained in:
parent
c6ffb44c00
commit
874315cdfb
|
@ -1,54 +1,161 @@
|
||||||
|
"""Main module."""
|
||||||
|
import datetime
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import queue
|
||||||
|
|
||||||
from aprsd import messaging, plugin, trace
|
import paho.mqtt.client as mqtt
|
||||||
|
from aprsd import plugin, threads, trace, utils
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger("APRSD")
|
LOG = logging.getLogger("APRSD")
|
||||||
|
|
||||||
|
mqtt_queue = queue.Queue(maxsize=20)
|
||||||
|
|
||||||
|
|
||||||
class WeewxMQTTPlugin(plugin.APRSDRegexCommandPluginBase):
|
class WeewxMQTTPlugin(plugin.APRSDRegexCommandPluginBase):
|
||||||
|
"""Weather
|
||||||
|
|
||||||
|
Syntax of request
|
||||||
|
|
||||||
|
weather
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
version = "1.0"
|
version = "1.0"
|
||||||
# Look for any command that starts with w or W
|
|
||||||
command_regex = "^[wW]"
|
command_regex = "^[wW]"
|
||||||
# the command is for ?
|
|
||||||
command_name = "weather"
|
command_name = "weather"
|
||||||
|
|
||||||
enabled = False
|
enabled = False
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
# Do some checks here?
|
"""Ensure that the plugin has been configured."""
|
||||||
self.enabled = True
|
try:
|
||||||
|
LOG.info("Looking for weewx.mqtt.host config entry")
|
||||||
|
utils.check_config_option(self.config, ["services", "weewx", "mqtt", "host"])
|
||||||
|
self.enabled = True
|
||||||
|
except Exception as ex:
|
||||||
|
LOG.error(f"Failed to find config weewx:mqtt:host {ex}")
|
||||||
|
LOG.info("Disabling the weewx mqtt subsription thread.")
|
||||||
|
self.enabled = False
|
||||||
|
|
||||||
def create_threads(self):
|
def create_threads(self):
|
||||||
"""This allows you to create and return a custom APRSDThread object.
|
|
||||||
|
|
||||||
Create a child of the aprsd.threads.APRSDThread object and return it
|
|
||||||
It will automatically get started.
|
|
||||||
|
|
||||||
You can see an example of one here:
|
|
||||||
https://github.com/craigerl/aprsd/blob/master/aprsd/threads.py#L141
|
|
||||||
"""
|
|
||||||
if self.enabled:
|
if self.enabled:
|
||||||
# You can create a background APRSDThread object here
|
LOG.info("Creating WeewxMQTTThread")
|
||||||
# Just return it for example:
|
return WeewxMQTTThread(
|
||||||
# https://github.com/hemna/aprsd-weewx-plugin/blob/master/aprsd_weewx_plugin/aprsd_weewx_plugin.py#L42-L50
|
msg_queues=mqtt_queue,
|
||||||
#
|
config=self.config,
|
||||||
return []
|
)
|
||||||
|
else:
|
||||||
|
LOG.info("WeewxMQTTPlugin not enabled due to missing config.")
|
||||||
|
|
||||||
@trace.trace
|
@trace.trace
|
||||||
def process(self, packet):
|
def process(self, packet):
|
||||||
|
LOG.info("WeewxMQTT Plugin")
|
||||||
"""This is called when a received packet matches self.command_regex."""
|
|
||||||
|
|
||||||
LOG.info("WeewxMQTTPlugin Plugin")
|
|
||||||
|
|
||||||
packet.get("from")
|
packet.get("from")
|
||||||
packet.get("message_text", None)
|
packet.get("message_text", None)
|
||||||
|
# ack = packet.get("msgNo", "0")
|
||||||
|
|
||||||
if self.enabled:
|
if self.enabled:
|
||||||
# Now we can process
|
# see if there are any weather messages in the queue.
|
||||||
return "some reply message"
|
msg = None
|
||||||
|
LOG.info("Looking for a message")
|
||||||
|
if not mqtt_queue.empty():
|
||||||
|
msg = mqtt_queue.get(timeout=1)
|
||||||
|
else:
|
||||||
|
msg = mqtt_queue.get(timeout=30)
|
||||||
|
|
||||||
|
if not msg:
|
||||||
|
return "No Weewx data"
|
||||||
|
else:
|
||||||
|
LOG.info(f"Got a message {msg}")
|
||||||
|
# Wants format of 71.5F/54.0F Wind 1@49G7 54%
|
||||||
|
if "outTemp_F" in msg:
|
||||||
|
temperature = "{:0.2f}F".format(float(msg["outTemp_F"]))
|
||||||
|
dewpoint = "{:0.2f}F".format(float(msg["dewpoint_F"]))
|
||||||
|
else:
|
||||||
|
temperature = "{:0.2f}C".format(float(msg["outTemp_C"]))
|
||||||
|
dewpoint = "{:0.2f}C".format(float(msg["dewpoint_C"]))
|
||||||
|
|
||||||
|
wind_direction = "{:0.0f}".format(float(msg["windDir"]))
|
||||||
|
if "windSpeed_mps" in msg:
|
||||||
|
wind_speed = "{:0.0f}".format(float(msg["windSpeed_mps"]))
|
||||||
|
wind_gust = "{:0.0f}".format(float(msg["windGust_mps"]))
|
||||||
|
else:
|
||||||
|
wind_speed = "{:0.0f}".format(float(msg["windSpeed_mph"]))
|
||||||
|
wind_gust = "{:0.0f}".format(float(msg["windGust_mph"]))
|
||||||
|
|
||||||
|
wind = "{}@{}G{}".format(
|
||||||
|
wind_speed,
|
||||||
|
wind_direction,
|
||||||
|
wind_gust,
|
||||||
|
)
|
||||||
|
|
||||||
|
humidity = "{:0.0f}%".format(float(msg["outHumidity"]))
|
||||||
|
|
||||||
|
ts = int("{:0.0f}".format(float(msg["dateTime"])))
|
||||||
|
ts = datetime.datetime.fromtimestamp(ts)
|
||||||
|
|
||||||
|
wx = "{} {}/{} Wind {} {}".format(
|
||||||
|
ts,
|
||||||
|
temperature,
|
||||||
|
dewpoint,
|
||||||
|
wind,
|
||||||
|
humidity,
|
||||||
|
)
|
||||||
|
LOG.debug(
|
||||||
|
"Got weather {} -- len {}".format(
|
||||||
|
wx,
|
||||||
|
len(wx),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return wx
|
||||||
|
|
||||||
else:
|
else:
|
||||||
LOG.warning("WeewxMQTTPlugin is disabled.")
|
return "WeewxMQTT Not enabled"
|
||||||
return messaging.NULL
|
|
||||||
|
|
||||||
|
class WeewxMQTTThread(threads.APRSDThread):
|
||||||
|
def __init__(self, msg_queues, config):
|
||||||
|
super().__init__("WeewxMQTTThread")
|
||||||
|
self.msg_queues = msg_queues
|
||||||
|
self.config = config
|
||||||
|
self.setup()
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
LOG.info("Creating mqtt client")
|
||||||
|
self._mqtt_host = self.config["services"]["weewx"]["mqtt"]["host"]
|
||||||
|
self._mqtt_port = self.config["services"]["weewx"]["mqtt"]["port"]
|
||||||
|
self._mqtt_user = self.config["services"]["weewx"]["mqtt"]["user"]
|
||||||
|
self._mqtt_pass = self.config["services"]["weewx"]["mqtt"]["password"]
|
||||||
|
LOG.info(
|
||||||
|
"Connecting to mqtt {}:XXXX@{}:{}".format(
|
||||||
|
self._mqtt_user,
|
||||||
|
self._mqtt_host,
|
||||||
|
self._mqtt_port,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
self.client = mqtt.Client(client_id="WeewxMQTTPlugin")
|
||||||
|
self.client.on_connect = self.on_connect
|
||||||
|
self.client.on_message = self.on_message
|
||||||
|
self.client.connect(self._mqtt_host, self._mqtt_port, 60)
|
||||||
|
self.client.username_pw_set(username="hemna", password="ass")
|
||||||
|
|
||||||
|
def on_connect(self, client, userdata, flags, rc):
|
||||||
|
LOG.info(f"Connected to MQTT {self._mqtt_host} ({rc})")
|
||||||
|
client.subscribe("weather/loop")
|
||||||
|
|
||||||
|
def on_message(self, client, userdata, msg):
|
||||||
|
wx_data = json.loads(msg.payload)
|
||||||
|
LOG.debug(f"Got WX data {wx_data}")
|
||||||
|
mqtt_queue.put(wx_data)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
LOG.info("calling disconnect")
|
||||||
|
self.thread_stop = True
|
||||||
|
self.client.disconnect()
|
||||||
|
|
||||||
|
def loop(self):
|
||||||
|
LOG.info("Looping bitch")
|
||||||
|
self.client.loop_forever()
|
||||||
|
return True
|
||||||
|
|
Loading…
Reference in New Issue