From df287e8f29f5658da7072ea17c34aadf209d99f2 Mon Sep 17 00:00:00 2001 From: f4exb Date: Sat, 26 Mar 2022 05:37:54 +0100 Subject: [PATCH] AIS and Radiosonde: implemented new message pipes. Part of #1154 --- plugins/channelrx/demodais/aisdemod.cpp | 15 +-- plugins/channelrx/demodais/aisdemodgui.cpp | 15 +-- .../demodradiosonde/radiosondedemod.cpp | 17 ++- .../demodradiosonde/radiosondedemodgui.cpp | 15 +-- plugins/feature/ais/ais.cpp | 104 ++++++++++++++++-- plugins/feature/ais/ais.h | 10 +- plugins/feature/radiosonde/radiosonde.cpp | 104 ++++++++++++++++-- plugins/feature/radiosonde/radiosonde.h | 10 +- 8 files changed, 221 insertions(+), 69 deletions(-) diff --git a/plugins/channelrx/demodais/aisdemod.cpp b/plugins/channelrx/demodais/aisdemod.cpp index df7c9e779..c24d3d601 100644 --- a/plugins/channelrx/demodais/aisdemod.cpp +++ b/plugins/channelrx/demodais/aisdemod.cpp @@ -163,18 +163,15 @@ bool AISDemod::handleMessage(const Message& cmd) getMessageQueueToGUI()->push(msg); } - MessagePipesLegacy& messagePipes = MainCore::instance()->getMessagePipesLegacy(); + QList aisPipes; + MainCore::instance()->getMessagePipes().getMessagePipes(this, "ais", aisPipes); // Forward to AIS feature - QList *aisMessageQueues = messagePipes.getMessageQueues(this, "ais"); - if (aisMessageQueues) + for (const auto& pipe : aisPipes) { - QList::iterator it = aisMessageQueues->begin(); - for (; it != aisMessageQueues->end(); ++it) - { - MainCore::MsgPacket *msg = MainCore::MsgPacket::create(this, report.getMessage(), report.getDateTime()); - (*it)->push(msg); - } + MessageQueue *messageQueue = qobject_cast(pipe->m_element); + MainCore::MsgPacket *msg = MainCore::MsgPacket::create(this, report.getMessage(), report.getDateTime()); + messageQueue->push(msg); } // Forward via UDP diff --git a/plugins/channelrx/demodais/aisdemodgui.cpp b/plugins/channelrx/demodais/aisdemodgui.cpp index ff2372f53..0f167d629 100644 --- a/plugins/channelrx/demodais/aisdemodgui.cpp +++ b/plugins/channelrx/demodais/aisdemodgui.cpp @@ -709,8 +709,8 @@ void AISDemodGUI::on_logOpen_clicked() bool cancelled = false; QStringList cols; - MessagePipesLegacy& messagePipes = MainCore::instance()->getMessagePipesLegacy(); - QList *aisMessageQueues = messagePipes.getMessageQueues(m_aisDemod, "ais"); + QList aisPipes; + MainCore::instance()->getMessagePipes().getMessagePipes(this, "ais", aisPipes); while (!cancelled && CSV::readRow(in, &cols)) { @@ -725,14 +725,11 @@ void AISDemodGUI::on_logOpen_clicked() messageReceived(bytes, dateTime); // Forward to AIS feature - if (aisMessageQueues) + for (const auto& pipe : aisPipes) { - QList::iterator it = aisMessageQueues->begin(); - for (; it != aisMessageQueues->end(); ++it) - { - MainCore::MsgPacket *msg = MainCore::MsgPacket::create(m_aisDemod, bytes, dateTime); - (*it)->push(msg); - } + MessageQueue *messageQueue = qobject_cast(pipe->m_element); + MainCore::MsgPacket *msg = MainCore::MsgPacket::create(m_aisDemod, bytes, dateTime); + messageQueue->push(msg); } if (count % 1000 == 0) diff --git a/plugins/channelrx/demodradiosonde/radiosondedemod.cpp b/plugins/channelrx/demodradiosonde/radiosondedemod.cpp index 36fe6a1ab..66bbbb146 100644 --- a/plugins/channelrx/demodradiosonde/radiosondedemod.cpp +++ b/plugins/channelrx/demodradiosonde/radiosondedemod.cpp @@ -178,18 +178,15 @@ bool RadiosondeDemod::handleMessage(const Message& cmd) getMessageQueueToGUI()->push(msg); } - MessagePipesLegacy& messagePipes = MainCore::instance()->getMessagePipesLegacy(); - // Forward to Radiosonde feature - QList *radiosondeMessageQueues = messagePipes.getMessageQueues(this, "radiosonde"); - if (radiosondeMessageQueues) + QList radiosondePipes; + MainCore::instance()->getMessagePipes().getMessagePipes(this, "radiosonde", radiosondePipes); + + for (const auto& pipe : radiosondePipes) { - QList::iterator it = radiosondeMessageQueues->begin(); - for (; it != radiosondeMessageQueues->end(); ++it) - { - MainCore::MsgPacket *msg = MainCore::MsgPacket::create(this, report.getMessage(), report.getDateTime()); - (*it)->push(msg); - } + MessageQueue *messageQueue = qobject_cast(pipe->m_element); + MainCore::MsgPacket *msg = MainCore::MsgPacket::create(this, report.getMessage(), report.getDateTime()); + messageQueue->push(msg); } // Forward via UDP diff --git a/plugins/channelrx/demodradiosonde/radiosondedemodgui.cpp b/plugins/channelrx/demodradiosonde/radiosondedemodgui.cpp index cb7773eca..4481ad3ed 100644 --- a/plugins/channelrx/demodradiosonde/radiosondedemodgui.cpp +++ b/plugins/channelrx/demodradiosonde/radiosondedemodgui.cpp @@ -846,8 +846,8 @@ void RadiosondeDemodGUI::on_logOpen_clicked() bool cancelled = false; QStringList cols; - MessagePipesLegacy& framePipes = MainCore::instance()->getMessagePipesLegacy(); - QList *radiosondeMessageQueues = framePipes.getMessageQueues(m_radiosondeDemod, "radiosonde"); + QList radiosondePipes; + MainCore::instance()->getMessagePipes().getMessagePipes(this, "radiosonde", radiosondePipes); while (!cancelled && CSV::readRow(in, &cols)) { @@ -862,14 +862,11 @@ void RadiosondeDemodGUI::on_logOpen_clicked() frameReceived(bytes, dateTime, 0, 0); // Forward to Radiosonde feature - if (radiosondeMessageQueues) + for (const auto& pipe : radiosondePipes) { - QList::iterator it = radiosondeMessageQueues->begin(); - for (; it != radiosondeMessageQueues->end(); ++it) - { - MainCore::MsgPacket *msg = MainCore::MsgPacket::create(m_radiosondeDemod, bytes, dateTime); - (*it)->push(msg); - } + MessageQueue *messageQueue = qobject_cast(pipe->m_element); + MainCore::MsgPacket *msg = MainCore::MsgPacket::create(m_radiosondeDemod, bytes, dateTime); + messageQueue->push(msg); } if (count % 100 == 0) diff --git a/plugins/feature/ais/ais.cpp b/plugins/feature/ais/ais.cpp index 066324f47..d2e033f3b 100644 --- a/plugins/feature/ais/ais.cpp +++ b/plugins/feature/ais/ais.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include "SWGFeatureSettings.h" #include "SWGFeatureReport.h" @@ -48,8 +47,6 @@ AIS::AIS(WebAPIAdapterInterface *webAPIAdapterInterface) : setObjectName(m_featureId); m_state = StIdle; m_errorMessage = "AIS error"; - connect(&m_updatePipesTimer, SIGNAL(timeout()), this, SLOT(updatePipes())); - m_updatePipesTimer.start(1000); m_networkManager = new QNetworkAccessManager(); QObject::connect( m_networkManager, @@ -57,10 +54,13 @@ AIS::AIS(WebAPIAdapterInterface *webAPIAdapterInterface) : this, &AIS::networkManagerFinished ); + scanAvailableChannels(); + connect(MainCore::instance(), SIGNAL(channelAdded(int, ChannelAPI*)), this, SLOT(handleChannelAdded(int, ChannelAPI*))); } AIS::~AIS() { + disconnect(MainCore::instance(), SIGNAL(channelAdded(int, ChannelAPI*)), this, SLOT(handleChannelAdded(int, ChannelAPI*))); QObject::disconnect( m_networkManager, &QNetworkAccessManager::finished, @@ -108,15 +108,6 @@ bool AIS::handleMessage(const Message& cmd) } } -void AIS::updatePipes() -{ - QList availablePipes = updateAvailablePipeSources("ais", AISSettings::m_pipeTypes, AISSettings::m_pipeURIs, this); - - if (availablePipes != m_availablePipes) { - m_availablePipes = availablePipes; - } -} - QByteArray AIS::serialize() const { return m_settings.serialize(); @@ -335,3 +326,92 @@ void AIS::networkManagerFinished(QNetworkReply *reply) reply->deleteLater(); } + +void AIS::scanAvailableChannels() +{ + MainCore *mainCore = MainCore::instance(); + MessagePipes& messagePipes = mainCore->getMessagePipes(); + std::vector& deviceSets = mainCore->getDeviceSets(); + m_availableChannels.clear(); + + for (const auto& deviceSet : deviceSets) + { + DSPDeviceSourceEngine *deviceSourceEngine = deviceSet->m_deviceSourceEngine; + + if (deviceSourceEngine) + { + for (int chi = 0; chi < deviceSet->getNumberOfChannels(); chi++) + { + ChannelAPI *channel = deviceSet->getChannelAt(chi); + + if ((channel->getURI() == "sdrangel.channel.aisdemod") && !m_availableChannels.contains(channel)) + { + qDebug("AIS::scanAvailableChannels: register %d:%d (%p)", deviceSet->getIndex(), chi, channel); + ObjectPipe *pipe = messagePipes.registerProducerToConsumer(channel, this, "ais"); + MessageQueue *messageQueue = qobject_cast(pipe->m_element); + QObject::connect( + messageQueue, + &MessageQueue::messageEnqueued, + this, + [=](){ this->handleChannelMessageQueue(messageQueue); }, + Qt::QueuedConnection + ); + connect(pipe, SIGNAL(toBeDeleted(int, QObject*)), this, SLOT(handleMessagePipeToBeDeleted(int, QObject*))); + m_availableChannels.insert(channel); + } + } + } + } +} + +void AIS::handleChannelAdded(int deviceSetIndex, ChannelAPI *channel) +{ + qDebug("AIS::handleChannelAdded: deviceSetIndex: %d:%d channel: %s (%p)", + deviceSetIndex, channel->getIndexInDeviceSet(), qPrintable(channel->getURI()), channel); + std::vector& deviceSets = MainCore::instance()->getDeviceSets(); + DeviceSet *deviceSet = deviceSets[deviceSetIndex]; + DSPDeviceSourceEngine *deviceSourceEngine = deviceSet->m_deviceSourceEngine; + + if (deviceSourceEngine && (channel->getURI() == "sdrangel.channel.aisdemod")) + { + if (!m_availableChannels.contains(channel)) + { + MessagePipes& messagePipes = MainCore::instance()->getMessagePipes(); + ObjectPipe *pipe = messagePipes.registerProducerToConsumer(channel, this, "ais"); + MessageQueue *messageQueue = qobject_cast(pipe->m_element); + QObject::connect( + messageQueue, + &MessageQueue::messageEnqueued, + this, + [=](){ this->handleChannelMessageQueue(messageQueue); }, + Qt::QueuedConnection + ); + connect(pipe, SIGNAL(toBeDeleted(int, QObject*)), this, SLOT(handleMessagePipeToBeDeleted(int, QObject*))); + m_availableChannels.insert(channel); + } + } +} + +void AIS::handleMessagePipeToBeDeleted(int reason, QObject* object) +{ + if (reason == 0) // producer (channel) + { + if (m_availableChannels.contains((ChannelAPI*) object)) + { + qDebug("AIS::handleMessagePipeToBeDeleted: removing channel at (%p)", object); + m_availableChannels.remove((ChannelAPI*) object); + } + } +} + +void AIS::handleChannelMessageQueue(MessageQueue* messageQueue) +{ + Message* message; + + while ((message = messageQueue->pop()) != nullptr) + { + if (handleMessage(*message)) { + delete message; + } + } +} diff --git a/plugins/feature/ais/ais.h b/plugins/feature/ais/ais.h index 4e83fd970..59ce986e1 100644 --- a/plugins/feature/ais/ais.h +++ b/plugins/feature/ais/ais.h @@ -21,7 +21,7 @@ #include #include -#include +#include #include "feature/feature.h" #include "util/message.h" @@ -97,8 +97,7 @@ public: private: AISSettings m_settings; - QList m_availablePipes; - QTimer m_updatePipesTimer; + QSet m_availableChannels; QNetworkAccessManager *m_networkManager; QNetworkRequest m_networkRequest; @@ -107,10 +106,13 @@ private: void stop(); void applySettings(const AISSettings& settings, bool force = false); void webapiReverseSendSettings(QList& featureSettingsKeys, const AISSettings& settings, bool force); + void scanAvailableChannels(); private slots: - void updatePipes(); void networkManagerFinished(QNetworkReply *reply); + void handleChannelAdded(int deviceSetIndex, ChannelAPI *channel); + void handleMessagePipeToBeDeleted(int reason, QObject* object); + void handleChannelMessageQueue(MessageQueue* messageQueue); }; #endif // INCLUDE_FEATURE_AIS_H_ diff --git a/plugins/feature/radiosonde/radiosonde.cpp b/plugins/feature/radiosonde/radiosonde.cpp index 7216cb039..e7aa7aea1 100644 --- a/plugins/feature/radiosonde/radiosonde.cpp +++ b/plugins/feature/radiosonde/radiosonde.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include "SWGFeatureSettings.h" #include "SWGFeatureReport.h" @@ -48,8 +47,6 @@ Radiosonde::Radiosonde(WebAPIAdapterInterface *webAPIAdapterInterface) : setObjectName(m_featureId); m_state = StIdle; m_errorMessage = "Radiosonde error"; - connect(&m_updatePipesTimer, SIGNAL(timeout()), this, SLOT(updatePipes())); - m_updatePipesTimer.start(1000); m_networkManager = new QNetworkAccessManager(); QObject::connect( m_networkManager, @@ -57,10 +54,13 @@ Radiosonde::Radiosonde(WebAPIAdapterInterface *webAPIAdapterInterface) : this, &Radiosonde::networkManagerFinished ); + scanAvailableChannels(); + connect(MainCore::instance(), SIGNAL(channelAdded(int, ChannelAPI*)), this, SLOT(handleChannelAdded(int, ChannelAPI*))); } Radiosonde::~Radiosonde() { + disconnect(MainCore::instance(), SIGNAL(channelAdded(int, ChannelAPI*)), this, SLOT(handleChannelAdded(int, ChannelAPI*))); QObject::disconnect( m_networkManager, &QNetworkAccessManager::finished, @@ -108,15 +108,6 @@ bool Radiosonde::handleMessage(const Message& cmd) } } -void Radiosonde::updatePipes() -{ - QList availablePipes = updateAvailablePipeSources("radiosonde", RadiosondeSettings::m_pipeTypes, RadiosondeSettings::m_pipeURIs, this); - - if (availablePipes != m_availablePipes) { - m_availablePipes = availablePipes; - } -} - QByteArray Radiosonde::serialize() const { return m_settings.serialize(); @@ -335,3 +326,92 @@ void Radiosonde::networkManagerFinished(QNetworkReply *reply) reply->deleteLater(); } + +void Radiosonde::scanAvailableChannels() +{ + MainCore *mainCore = MainCore::instance(); + MessagePipes& messagePipes = mainCore->getMessagePipes(); + std::vector& deviceSets = mainCore->getDeviceSets(); + m_availableChannels.clear(); + + for (const auto& deviceSet : deviceSets) + { + DSPDeviceSourceEngine *deviceSourceEngine = deviceSet->m_deviceSourceEngine; + + if (deviceSourceEngine) + { + for (int chi = 0; chi < deviceSet->getNumberOfChannels(); chi++) + { + ChannelAPI *channel = deviceSet->getChannelAt(chi); + + if ((channel->getURI() == "sdrangel.channel.radiosondedemod") && !m_availableChannels.contains(channel)) + { + qDebug("Radiosonde::scanAvailableChannels: register %d:%d (%p)", deviceSet->getIndex(), chi, channel); + ObjectPipe *pipe = messagePipes.registerProducerToConsumer(channel, this, "radiosonde"); + MessageQueue *messageQueue = qobject_cast(pipe->m_element); + QObject::connect( + messageQueue, + &MessageQueue::messageEnqueued, + this, + [=](){ this->handleChannelMessageQueue(messageQueue); }, + Qt::QueuedConnection + ); + connect(pipe, SIGNAL(toBeDeleted(int, QObject*)), this, SLOT(handleMessagePipeToBeDeleted(int, QObject*))); + m_availableChannels.insert(channel); + } + } + } + } +} + +void Radiosonde::handleChannelAdded(int deviceSetIndex, ChannelAPI *channel) +{ + qDebug("Radiosonde::handleChannelAdded: deviceSetIndex: %d:%d channel: %s (%p)", + deviceSetIndex, channel->getIndexInDeviceSet(), qPrintable(channel->getURI()), channel); + std::vector& deviceSets = MainCore::instance()->getDeviceSets(); + DeviceSet *deviceSet = deviceSets[deviceSetIndex]; + DSPDeviceSourceEngine *deviceSourceEngine = deviceSet->m_deviceSourceEngine; + + if (deviceSourceEngine && (channel->getURI() == "sdrangel.channel.radiosondedemod")) + { + if (!m_availableChannels.contains(channel)) + { + MessagePipes& messagePipes = MainCore::instance()->getMessagePipes(); + ObjectPipe *pipe = messagePipes.registerProducerToConsumer(channel, this, "radiosonde"); + MessageQueue *messageQueue = qobject_cast(pipe->m_element); + QObject::connect( + messageQueue, + &MessageQueue::messageEnqueued, + this, + [=](){ this->handleChannelMessageQueue(messageQueue); }, + Qt::QueuedConnection + ); + connect(pipe, SIGNAL(toBeDeleted(int, QObject*)), this, SLOT(handleMessagePipeToBeDeleted(int, QObject*))); + m_availableChannels.insert(channel); + } + } +} + +void Radiosonde::handleMessagePipeToBeDeleted(int reason, QObject* object) +{ + if (reason == 0) // producer (channel) + { + if (m_availableChannels.contains((ChannelAPI*) object)) + { + qDebug("Radiosonde::handleMessagePipeToBeDeleted: removing channel at (%p)", object); + m_availableChannels.remove((ChannelAPI*) object); + } + } +} + +void Radiosonde::handleChannelMessageQueue(MessageQueue* messageQueue) +{ + Message* message; + + while ((message = messageQueue->pop()) != nullptr) + { + if (handleMessage(*message)) { + delete message; + } + } +} diff --git a/plugins/feature/radiosonde/radiosonde.h b/plugins/feature/radiosonde/radiosonde.h index efbb344a6..d98521764 100644 --- a/plugins/feature/radiosonde/radiosonde.h +++ b/plugins/feature/radiosonde/radiosonde.h @@ -21,7 +21,7 @@ #include #include -#include +#include #include "feature/feature.h" #include "util/message.h" @@ -97,8 +97,7 @@ public: private: RadiosondeSettings m_settings; - QList m_availablePipes; - QTimer m_updatePipesTimer; + QSet m_availableChannels; QNetworkAccessManager *m_networkManager; QNetworkRequest m_networkRequest; @@ -107,10 +106,13 @@ private: void stop(); void applySettings(const RadiosondeSettings& settings, bool force = false); void webapiReverseSendSettings(QList& featureSettingsKeys, const RadiosondeSettings& settings, bool force); + void scanAvailableChannels(); private slots: - void updatePipes(); void networkManagerFinished(QNetworkReply *reply); + void handleChannelAdded(int deviceSetIndex, ChannelAPI *channel); + void handleMessagePipeToBeDeleted(int reason, QObject* object); + void handleChannelMessageQueue(MessageQueue* messageQueue); }; #endif // INCLUDE_FEATURE_RADIOSONDE_H_