mirror of
				https://github.com/f4exb/sdrangel.git
				synced 2025-10-23 00:50:21 -04:00 
			
		
		
		
	
		
			
	
	
		
			305 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			305 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|  | #!/usr/bin/env python3 | ||
|  | """
 | ||
|  | Control PlutoDVB2 enabled Pluto Tx switchover watching UDP status of a DATV demodulator channel | ||
|  | This is a way to implement a DATV repeater system | ||
|  | Uses PlutoDVB2 MQTT "mute" command to switch on/off the Tx | ||
|  | """
 | ||
|  | from optparse import OptionParser # pylint: disable=deprecated-module | ||
|  | 
 | ||
|  | import json | ||
|  | import os | ||
|  | import socket | ||
|  | import signal | ||
|  | import traceback | ||
|  | import sys | ||
|  | import time | ||
|  | import requests | ||
|  | import paho.mqtt.client as mqttclient | ||
|  | import logging | ||
|  | 
 | ||
|  | logger = logging.getLogger(__name__) | ||
|  | mqtt_client = None | ||
|  | pluto_state = {} | ||
|  | 
 | ||
|  | FEC_TABLE = { | ||
|  |     0: "1/2", | ||
|  |     1: "2/3", | ||
|  |     2: "4/6", | ||
|  |     3: "3/4", | ||
|  |     4: "5/6", | ||
|  |     5: "7/8", | ||
|  |     6: "4/5", | ||
|  |     7: "8/9", | ||
|  |     8: "9/10", | ||
|  |     9: "1/4", | ||
|  |     10: "1/3", | ||
|  |     11: "2/5", | ||
|  |     12: "3/5" | ||
|  | } | ||
|  | 
 | ||
|  | CONSTEL_TABLE = { | ||
|  |     0: "BPSK", | ||
|  |     1: "QPSK", | ||
|  |     2: "PSK8", | ||
|  |     3: "APSK16", | ||
|  |     4: "APSK32", | ||
|  |     5: "APSK64E", | ||
|  |     6: "QAM16", | ||
|  |     7: "QAM64", | ||
|  |     8: "QAM256" | ||
|  | } | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def signal_handler(signum, frame): | ||
|  |     """ Signal handler """ | ||
|  |     logger.info("Signal handler called with signal %d", signum) | ||
|  |     # Clean up and exit | ||
|  |     # Close MQTT connection | ||
|  |     if mqtt_client: | ||
|  |         mqtt_client.disconnect() | ||
|  |     sys.exit(0) | ||
|  | 
 | ||
|  | signal.signal(signal.SIGINT, signal_handler) | ||
|  | signal.signal(signal.SIGTERM, signal_handler) | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def get_input_options(): | ||
|  |     """ Parse options """ | ||
|  | # ---------------------------------------------------------------------- | ||
|  |     parser = OptionParser(usage="usage: %%prog [-t]\n") | ||
|  |     parser.add_option("-a", "--sdr_address", dest="sdr_address", help="SDRangel address and port. Default: 127.0.0.1:8091", metavar="ADDRESS:PORT", type="string") | ||
|  |     parser.add_option("-d", "--device", dest="device_index", help="Index of device set. Default 0", metavar="INT", type="int") | ||
|  |     parser.add_option("-c", "--channel", dest="channel_index", help="Index of DATV demod channel. Default 0", metavar="INT", type="int") | ||
|  |     parser.add_option("-A", "--pluto_address", dest="pluto_address", help="Pluto MQTT address and port. Mandatory", metavar="ADDRESS", type="string") | ||
|  |     parser.add_option("-P", "--pluto_port", dest="pluto_port", help="Pluto MQTT port. Default 1883", metavar="INT", type="int") | ||
|  |     parser.add_option("-C", "--callsign", dest="callsign", help="Amateur Radio callsign", metavar="CALLSIGN", type="string") | ||
|  |     parser.add_option("-l", "--clone", dest="clone", help="Clone symbol rate, constellation and fec to Pluto", metavar="BOOL", action="store_true", default=False) | ||
|  | 
 | ||
|  |     (options, args) = parser.parse_args() | ||
|  |     if options.sdr_address is None: | ||
|  |         options.sdr_address = "127.0.0.1:8091" | ||
|  |     if options.device_index is None: | ||
|  |         options.device_index = 0 | ||
|  |     if options.channel_index is None: | ||
|  |         options.channel_index = 0 | ||
|  |     if options.pluto_port is None: | ||
|  |         options.pluto_port = 1883 | ||
|  |     if options.pluto_address is None: | ||
|  |         raise RuntimeError("Pluto address (-A or --pluto_address) is mandatory") | ||
|  |     if options.callsign is None: | ||
|  |         raise RuntimeError("Callsign (-C or --callsign) is mandatory") | ||
|  | 
 | ||
|  |     return options | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def pluto_state_change(state, value): | ||
|  |     """ Change Pluto state """ | ||
|  | # ---------------------------------------------------------------------- | ||
|  |     global pluto_state | ||
|  |     if state not in pluto_state: | ||
|  |         pluto_state[state] = value | ||
|  |     else: | ||
|  |         pluto_state[state] = value | ||
|  |     logger.debug("Pluto state changed: %s = %s", state, value) | ||
|  |     return | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def connect_mqtt(options): | ||
|  |     """ Connect to Pluto MQTT broker """ | ||
|  | # ---------------------------------------------------------------------- | ||
|  |     global mqtt_client | ||
|  |     mqtt_client = mqttclient.Client() | ||
|  |     mqtt_client.enable_logger(logger) | ||
|  |     mqtt_client.on_connect = on_connect | ||
|  |     mqtt_client.on_message = on_message | ||
|  |     mqtt_client.on_subscribe = on_subscribe | ||
|  | 
 | ||
|  |     # Connect to the MQTT broker | ||
|  |     logger.info("Connecting to Pluto MQTT broker at %s:%d", options.pluto_address, options.pluto_port) | ||
|  |     try: | ||
|  |         mqtt_client.connect(options.pluto_address, options.pluto_port, 60) | ||
|  |         mqtt_client.loop_start() | ||
|  |     except Exception as ex: | ||
|  |         raise RuntimeError(f"Failed to connect to Pluto MQTT broker: {ex}") | ||
|  |     logger.info("Connected to Pluto MQTT broker at %s:%d", options.pluto_address, options.pluto_port) | ||
|  |     return mqtt_client | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def on_connect(client, userdata, flags, rc): | ||
|  |     """ Callback for MQTT connection """ | ||
|  | # ---------------------------------------------------------------------- | ||
|  |     logger.info("Connected to MQTT broker with result code %d", rc) | ||
|  |     return | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def on_message(client, userdata, msg): | ||
|  |     """ Callback for MQTT message """ | ||
|  | # ---------------------------------------------------------------------- | ||
|  |     logger.debug("Received message on topic %s: %s", msg.topic, msg.payload) | ||
|  |     # Parse the message | ||
|  |     try: | ||
|  |         if msg.topic.endswith("/mute"): | ||
|  |             logger.debug("Tx mute message received") | ||
|  |             pluto_state_change("muted", msg.payload == b'1') | ||
|  |         elif msg.topic.endswith("/sr"): | ||
|  |             logger.debug("Tx symbol rate message received: %d", int(msg.payload)) | ||
|  |             pluto_state_change("symbol_rate", int(msg.payload)) | ||
|  |         elif msg.topic.endswith("/constel"): | ||
|  |             logger.debug("Tx constellation message received: %s", msg.payload.decode('utf-8')) | ||
|  |             pluto_state_change("constellation", msg.payload.decode('utf-8')) | ||
|  |         elif msg.topic.endswith("/fec"): | ||
|  |             logger.debug("Tx fec message received: %s", msg.payload.decode('utf-8')) | ||
|  |             pluto_state_change("fec", msg.payload.decode('utf-8')) | ||
|  |         else: | ||
|  |             message = json.loads(msg.payload) | ||
|  |             logger.debug("Message payload is JSON: %s", message) | ||
|  |     except json.JSONDecodeError as ex: | ||
|  |         logger.error("Failed to decode JSON message: %s", ex) | ||
|  |     except Exception as ex: | ||
|  |         logger.error("Failed to handle message: %s", ex) | ||
|  |     return | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def on_subscribe(client, userdata, mid, granted_qos): | ||
|  |     """ Callback for MQTT subscription """ | ||
|  | # ---------------------------------------------------------------------- | ||
|  |     logger.info("Subscribed to topic with mid %d and granted QoS %s", mid, granted_qos) | ||
|  |     return | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def subscribe_to_pluto(options, mqtt_client): | ||
|  |     """ Subscribe to Pluto MQTT broker """ | ||
|  | # ---------------------------------------------------------------------- | ||
|  |     if mqtt_client is None: | ||
|  |         raise RuntimeError("MQTT client is not connected") | ||
|  |     tx_base_topic = f"dt/pluto/{options.callsign.upper()}/tx" | ||
|  |     mute_dt_topic = f"{tx_base_topic}/mute" | ||
|  |     logger.info("Subscribing to topic %s", mute_dt_topic) | ||
|  |     mqtt_client.subscribe(mute_dt_topic) | ||
|  |     logger.info("Subscribed to topic %s", mute_dt_topic) | ||
|  |     if options.clone: | ||
|  |         # Subscribe to Pluto Tx symbol rate | ||
|  |         symbol_rate_topic = f"{tx_base_topic}/dvbs2/sr" | ||
|  |         logger.info("Subscribing to topic %s", symbol_rate_topic) | ||
|  |         mqtt_client.subscribe(symbol_rate_topic) | ||
|  |         # Subscribe to Pluto Tx constellation | ||
|  |         constellation_topic = f"{tx_base_topic}/dvbs2/constel" | ||
|  |         logger.info("Subscribing to topic %s", constellation_topic) | ||
|  |         mqtt_client.subscribe(constellation_topic) | ||
|  |         # Subscribe to Pluto Tx fec | ||
|  |         fec_topic = f"{tx_base_topic}/dvbs2/fec" | ||
|  |         logger.info("Subscribing to topic %s", fec_topic) | ||
|  |         mqtt_client.subscribe(fec_topic) | ||
|  |     return | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def mute_pluto_tx(options, mute): | ||
|  |     """ Mute or unmute Pluto Tx """ | ||
|  | # ---------------------------------------------------------------------- | ||
|  |     global mqtt_client | ||
|  |     if mqtt_client is None: | ||
|  |         raise RuntimeError("MQTT client is not connected") | ||
|  |     topic = f"cmd/pluto/{options.callsign.upper()}/tx/mute" | ||
|  |     message = b'1' if mute else b'0' | ||
|  |     logger.info("Publishing message to topic %s: %s", topic, message) | ||
|  |     mqtt_client.publish(topic, message) | ||
|  |     logger.info("Published message to topic %s", topic) | ||
|  |     # Update Pluto state | ||
|  |     pluto_state_change("muted", mute) | ||
|  |     return | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def set_pluto_tx_dvbs2(options, symbol_rate, constellation, fec): | ||
|  |     """ Set Pluto Tx DVBS2 parameters """ | ||
|  | # ---------------------------------------------------------------------- | ||
|  |     global mqtt_client | ||
|  |     if mqtt_client is None: | ||
|  |         raise RuntimeError("MQTT client is not connected") | ||
|  |     topic_dvbs2 = f"cmd/pluto/{options.callsign.upper()}/tx/dvbs2" | ||
|  |     topic_sr = f"{topic_dvbs2}/sr" | ||
|  |     topic_constel = f"{topic_dvbs2}/constel" | ||
|  |     topic_fec = f"{topic_dvbs2}/fec" | ||
|  |     logger.info("Publishing message to topic %s: %d", topic_sr, symbol_rate) | ||
|  |     mqtt_client.publish(topic_sr, symbol_rate) | ||
|  |     logger.info("Published message to topic %s", topic_sr) | ||
|  |     logger.info("Publishing message to topic %s: %s", topic_constel, constellation) | ||
|  |     mqtt_client.publish(topic_constel, constellation) | ||
|  |     logger.info("Published message to topic %s", topic_constel) | ||
|  |     logger.info("Publishing message to topic %s: %s", topic_fec, fec) | ||
|  |     mqtt_client.publish(topic_fec, fec) | ||
|  |     logger.info("Published message to topic %s", topic_fec) | ||
|  |     # Update Pluto state | ||
|  |     pluto_state_change("symbol_rate", symbol_rate) | ||
|  |     pluto_state_change("constellation", constellation) | ||
|  |     pluto_state_change("fec", fec) | ||
|  |     return | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def monitor_datv_demod(options): | ||
|  |     """ Monitor DATV demodulator channel and control Pluto Tx """ | ||
|  | # ---------------------------------------------------------------------- | ||
|  |     # Check DATV demodulator channel status | ||
|  |     sdrangel_url = f"http://{options.sdr_address}/sdrangel" | ||
|  |     report_url = f"{sdrangel_url}/deviceset/{options.device_index}/channel/{options.channel_index}/report" | ||
|  |     response = requests.get(report_url) | ||
|  |     if response.status_code != 200: | ||
|  |         raise RuntimeError(f"Failed to read report at {report_url}") | ||
|  |     datv_channel_report = response.json().get("DATVDemodReport", None) | ||
|  |     if not datv_channel_report: | ||
|  |         raise RuntimeError(f"Failed to read DATV demodulator report at {report_url}") | ||
|  |     udp_running = datv_channel_report.get("udpRunning", None) | ||
|  |     if udp_running is None: | ||
|  |         raise RuntimeError(f"Failed to read udpRunning in {datv_channel_report}") | ||
|  |     logger.debug("DATV UDP: %d", udp_running) | ||
|  |     if "muted" in pluto_state and pluto_state["muted"] == udp_running or "muted" not in pluto_state: | ||
|  |         logger.info("Pluto Tx %s", "muted" if not udp_running else "unmuted") | ||
|  |         mute_pluto_tx(options, not udp_running) | ||
|  |         logger.info("Pluto state: %s", pluto_state) | ||
|  |     if options.clone and datv_channel_report.get("setByModcod", None): | ||
|  |         mod = datv_channel_report.get("modcodModulation", -1) | ||
|  |         logger.debug("DATV Modulation: %s", CONSTEL_TABLE.get(mod, "Unknown")) | ||
|  |         fec = datv_channel_report.get("modcodCodeRate", -1) | ||
|  |         logger.debug("DATV FEC: %s", FEC_TABLE.get(fec, "Unknown")) | ||
|  |         symbol_rate = datv_channel_report.get("symbolRate", 0) | ||
|  |         logger.debug("DATV Symbol Rate: %d", symbol_rate) | ||
|  |         if "symbol_rate" in pluto_state and pluto_state["symbol_rate"] == symbol_rate and \ | ||
|  |            "constellation" in pluto_state and pluto_state["constellation"].upper() == CONSTEL_TABLE.get(mod, "Unknown") and \ | ||
|  |            "fec" in pluto_state and pluto_state["fec"] == FEC_TABLE.get(fec, "Unknown"): | ||
|  |             logger.debug("Pluto Tx parameters unchanged") | ||
|  |         else: | ||
|  |             logger.info("Pluto Tx parameters changed") | ||
|  |             set_pluto_tx_dvbs2(options, symbol_rate, CONSTEL_TABLE.get(mod, "Unknown").lower(), FEC_TABLE.get(fec, "Unknown")) | ||
|  |             logger.info("Pluto state: %s", pluto_state) | ||
|  |     return | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | def main(): | ||
|  |     """ Main program """ | ||
|  | # ---------------------------------------------------------------------- | ||
|  |     try: | ||
|  |         FORMAT = '%(asctime)s %(levelname)s %(message)s' | ||
|  |         logging.basicConfig(format=FORMAT, level=logging.INFO) | ||
|  |         options = get_input_options() | ||
|  | 
 | ||
|  |         connect_mqtt(options) | ||
|  |         subscribe_to_pluto(options, mqtt_client) | ||
|  | 
 | ||
|  |         # Run forever | ||
|  |         logger.info("Start monitoring SDRangel channel %d:%d at %s", options.device_index, options.channel_index, options.sdr_address) | ||
|  |         while True: | ||
|  |             # Monitor DATV demodulator channel | ||
|  |             monitor_datv_demod(options) | ||
|  | 
 | ||
|  |             # Sleep for a while before checking again | ||
|  |             time.sleep(1) | ||
|  | 
 | ||
|  |     except Exception as ex: # pylint: disable=broad-except | ||
|  |         tb = traceback.format_exc() | ||
|  |         print(f"Exception caught {ex}") | ||
|  |         print(tb, file=sys.stderr) | ||
|  |         sys.exit(1) | ||
|  | 
 | ||
|  | # ====================================================================== | ||
|  | if __name__ == "__main__": | ||
|  |     main() |