From c2442d6b10dab6b896434a6db3c2aae42eea2cdf Mon Sep 17 00:00:00 2001 From: f4exb Date: Sat, 23 Jul 2022 12:25:03 +0200 Subject: [PATCH] SigMF file sink: applied new threading model. Part of #1346 --- .../channelrx/sigmffilesink/sigmffilesink.cpp | 130 +++++++++++++----- .../channelrx/sigmffilesink/sigmffilesink.h | 6 +- .../sigmffilesink/sigmffilesinkbaseband.cpp | 68 +++++---- .../sigmffilesink/sigmffilesinkbaseband.h | 12 +- 4 files changed, 148 insertions(+), 68 deletions(-) diff --git a/plugins/channelrx/sigmffilesink/sigmffilesink.cpp b/plugins/channelrx/sigmffilesink/sigmffilesink.cpp index 6057e16f2..b4d755283 100644 --- a/plugins/channelrx/sigmffilesink/sigmffilesink.cpp +++ b/plugins/channelrx/sigmffilesink/sigmffilesink.cpp @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include "SWGChannelSettings.h" #include "SWGWorkspaceInfo.h" @@ -52,6 +54,8 @@ const char* const SigMFFileSink::m_channelId = "SigMFFileSink"; SigMFFileSink::SigMFFileSink(DeviceAPI *deviceAPI) : ChannelAPI(m_channelIdURI, ChannelAPI::StreamSingleSink), m_deviceAPI(deviceAPI), + m_mutex(QMutex::Recursive), + m_running(false), m_spectrumVis(SDR_RX_SCALEF), m_centerFrequency(0), m_frequencyOffset(0), @@ -59,10 +63,6 @@ SigMFFileSink::SigMFFileSink(DeviceAPI *deviceAPI) : { setObjectName(m_channelId); - m_basebandSink = new SigMFFileSinkBaseband(); - m_basebandSink->setSpectrumSink(&m_spectrumVis); - m_basebandSink->moveToThread(&m_thread); - applySettings(m_settings, true); m_deviceAPI->addChannelSink(this); @@ -81,6 +81,8 @@ SigMFFileSink::SigMFFileSink(DeviceAPI *deviceAPI) : this, &SigMFFileSink::handleIndexInDeviceSetChanged ); + + start(); } SigMFFileSink::~SigMFFileSink() @@ -95,11 +97,7 @@ SigMFFileSink::~SigMFFileSink() m_deviceAPI->removeChannelSinkAPI(this); m_deviceAPI->removeChannelSink(this); - if (m_basebandSink->isRunning()) { - stop(); - } - - delete m_basebandSink; + stop(); } void SigMFFileSink::setDeviceAPI(DeviceAPI *deviceAPI) @@ -117,7 +115,10 @@ void SigMFFileSink::setDeviceAPI(DeviceAPI *deviceAPI) void SigMFFileSink::setMessageQueueToGUI(MessageQueue* queue) { ChannelAPI::setMessageQueueToGUI(queue); - m_basebandSink->setMessageQueueToGUI(queue); + + if (m_running) { + m_basebandSink->setMessageQueueToGUI(queue); + } } uint32_t SigMFFileSink::getNumberOfDeviceStreams() const @@ -128,18 +129,54 @@ uint32_t SigMFFileSink::getNumberOfDeviceStreams() const void SigMFFileSink::feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end, bool firstOfBurst) { (void) firstOfBurst; - m_basebandSink->feed(begin, end); + + if (m_running) { + m_basebandSink->feed(begin, end); + } } void SigMFFileSink::start() { + QMutexLocker m_lock(&m_mutex); + + if (m_running) { + return; + } + qDebug("SigMFFileSink::start"); - m_basebandSink->reset(); + m_thread = new QThread(); + m_basebandSink = new SigMFFileSinkBaseband(); + m_basebandSink->setFifoLabel(QString("%1 [%2:%3]") + .arg(m_channelId) + .arg(m_deviceAPI->getDeviceSetIndex()) + .arg(getIndexInDeviceSet()) + ); + m_basebandSink->setSpectrumSink(&m_spectrumVis); + m_basebandSink->moveToThread(m_thread); + + QObject::connect( + m_thread, + &QThread::started, + m_basebandSink, + &SigMFFileSinkBaseband::startWork + ); + QObject::connect( + m_thread, + &QThread::finished, + m_basebandSink, + &QObject::deleteLater + ); + QObject::connect( + m_thread, + &QThread::finished, + m_thread, + &QThread::deleteLater + ); + m_basebandSink->setMessageQueueToGUI(getMessageQueueToGUI()); m_basebandSink->setDeviceHwId(m_deviceAPI->getHardwareId()); m_basebandSink->setDeviceUId(m_deviceAPI->getDeviceUID()); - m_basebandSink->startWork(); - m_thread.start(); + m_thread->start(); DSPSignalNotification *dspMsg = new DSPSignalNotification(m_basebandSampleRate, m_centerFrequency); m_basebandSink->getInputMessageQueue()->push(dspMsg); @@ -152,14 +189,22 @@ void SigMFFileSink::start() MsgReportStartStop *msg = MsgReportStartStop::create(true); getMessageQueueToGUI()->push(msg); } + + m_running = true; } void SigMFFileSink::stop() { + QMutexLocker m_lock(&m_mutex); + + if (!m_running) { + return; + } + qDebug("SigMFFileSink::stop"); - m_basebandSink->stopWork(); - m_thread.exit(); - m_thread.wait(); + m_running = false; + m_thread->quit(); + m_thread->wait(); if (getMessageQueueToGUI()) { @@ -180,8 +225,10 @@ bool SigMFFileSink::handleMessage(const Message& cmd) m_basebandSampleRate = cfg.getSampleRate(); m_centerFrequency = cfg.getCenterFrequency(); - DSPSignalNotification *notif = new DSPSignalNotification(cfg); - m_basebandSink->getInputMessageQueue()->push(notif); + + if (m_running) { + m_basebandSink->getInputMessageQueue()->push(new DSPSignalNotification(cfg)); + } if (getMessageQueueToGUI()) { getMessageQueueToGUI()->push(new DSPSignalNotification(cfg)); @@ -328,8 +375,11 @@ void SigMFFileSink::applySettings(const SigMFFileSinkSettings& settings, bool fo reverseAPIKeys.append("streamIndex"); } - SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkBaseband *msg = SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkBaseband::create(settings, force); - m_basebandSink->getInputMessageQueue()->push(msg); + if (m_running) + { + SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkBaseband *msg = SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkBaseband::create(settings, force); + m_basebandSink->getInputMessageQueue()->push(msg); + } if ((settings.m_useReverseAPI) && (reverseAPIKeys.size() != 0)) { @@ -353,13 +403,16 @@ void SigMFFileSink::applySettings(const SigMFFileSinkSettings& settings, bool fo void SigMFFileSink::record(bool record) { - SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkWork *msg = SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkWork::create(record); - m_basebandSink->getInputMessageQueue()->push(msg); + if (m_running) + { + SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkWork *msg = SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkWork::create(record); + m_basebandSink->getInputMessageQueue()->push(msg); + } } uint64_t SigMFFileSink::getMsCount() const { - if (m_basebandSink) { + if (m_running) { return m_basebandSink->getMsCount(); } else { return 0; @@ -368,7 +421,7 @@ uint64_t SigMFFileSink::getMsCount() const uint64_t SigMFFileSink::getByteCount() const { - if (m_basebandSink) { + if (m_running) { return m_basebandSink->getByteCount(); } else { return 0; @@ -377,7 +430,7 @@ uint64_t SigMFFileSink::getByteCount() const unsigned int SigMFFileSink::getNbTracks() const { - if (m_basebandSink) { + if (m_running) { return m_basebandSink->getNbTracks(); } else { return 0; @@ -455,8 +508,11 @@ int SigMFFileSink::webapiActionsPost( if (!m_settings.m_squelchRecordingEnable) { - SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkWork *msg = SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkWork::create(record); - m_basebandSink->getInputMessageQueue()->push(msg); + if (m_running) + { + SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkWork *msg = SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkWork::create(record); + m_basebandSink->getInputMessageQueue()->push(msg); + } if (getMessageQueueToGUI()) { @@ -624,14 +680,18 @@ void SigMFFileSink::webapiFormatChannelSettings(SWGSDRangel::SWGChannelSettings& void SigMFFileSink::webapiFormatChannelReport(SWGSDRangel::SWGChannelReport& response) { - response.getSigMfFileSinkReport()->setSpectrumSquelch(m_basebandSink->isSquelchOpen() ? 1 : 0); - response.getSigMfFileSinkReport()->setSpectrumMax(m_basebandSink->getSpecMax()); - response.getSigMfFileSinkReport()->setSinkSampleRate(m_basebandSink->getSinkSampleRate()); + if (m_running) + { + response.getSigMfFileSinkReport()->setSpectrumSquelch(m_basebandSink->isSquelchOpen() ? 1 : 0); + response.getSigMfFileSinkReport()->setSpectrumMax(m_basebandSink->getSpecMax()); + response.getSigMfFileSinkReport()->setSinkSampleRate(m_basebandSink->getSinkSampleRate()); + response.getSigMfFileSinkReport()->setRecording(m_basebandSink->isRecording() ? 1 : 0); + response.getSigMfFileSinkReport()->setChannelSampleRate(m_basebandSink->getChannelSampleRate()); + } + + response.getSigMfFileSinkReport()->setRecordCaptures(getNbTracks()); response.getSigMfFileSinkReport()->setRecordTimeMs(getMsCount()); response.getSigMfFileSinkReport()->setRecordSize(getByteCount()); - response.getSigMfFileSinkReport()->setRecording(m_basebandSink->isRecording() ? 1 : 0); - response.getSigMfFileSinkReport()->setRecordCaptures(getNbTracks()); - response.getSigMfFileSinkReport()->setChannelSampleRate(m_basebandSink->getChannelSampleRate()); } void SigMFFileSink::webapiReverseSendSettings(QList& channelSettingsKeys, const SigMFFileSinkSettings& settings, bool force) @@ -779,7 +839,7 @@ void SigMFFileSink::networkManagerFinished(QNetworkReply *reply) void SigMFFileSink::handleIndexInDeviceSetChanged(int index) { - if (index < 0) { + if (!m_running || (index < 0)) { return; } diff --git a/plugins/channelrx/sigmffilesink/sigmffilesink.h b/plugins/channelrx/sigmffilesink/sigmffilesink.h index 75b1785ee..a1c959484 100644 --- a/plugins/channelrx/sigmffilesink/sigmffilesink.h +++ b/plugins/channelrx/sigmffilesink/sigmffilesink.h @@ -19,7 +19,6 @@ #define INCLUDE_SIGMFFILESINK_H_ #include -#include #include #include @@ -31,6 +30,7 @@ class QNetworkAccessManager; class QNetworkReply; +class QThread; class DeviceAPI; class DeviceSampleSource; @@ -159,8 +159,10 @@ public: private: DeviceAPI *m_deviceAPI; - QThread m_thread; + QThread *m_thread; SigMFFileSinkBaseband *m_basebandSink; + QMutex m_mutex; + bool m_running; SigMFFileSinkSettings m_settings; SpectrumVis m_spectrumVis; diff --git a/plugins/channelrx/sigmffilesink/sigmffilesinkbaseband.cpp b/plugins/channelrx/sigmffilesink/sigmffilesinkbaseband.cpp index 24fe9735c..591674604 100644 --- a/plugins/channelrx/sigmffilesink/sigmffilesinkbaseband.cpp +++ b/plugins/channelrx/sigmffilesink/sigmffilesinkbaseband.cpp @@ -15,9 +15,9 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// +#include #include -#include "dsp/downchannelizer.h" #include "dsp/dspengine.h" #include "dsp/dspcommands.h" #include "dsp/spectrumvis.h" @@ -30,24 +30,20 @@ MESSAGE_CLASS_DEFINITION(SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkBaseban MESSAGE_CLASS_DEFINITION(SigMFFileSinkBaseband::MsgConfigureSigMFFileSinkWork, Message) SigMFFileSinkBaseband::SigMFFileSinkBaseband() : + m_channelizer(&m_sink), m_specMax(0), m_squelchLevel(0), m_squelchOpen(false), - m_running(false), m_mutex(QMutex::Recursive) { - m_sampleFifo.setSize(SampleSinkFifo::getSizePolicy(48000)); - m_channelizer = new DownChannelizer(&m_sink); - qDebug("SigMFFileSinkBaseband::SigMFFileSinkBaseband"); - connect(&m_timer, SIGNAL(timeout()), this, SLOT(tick())); - m_timer.start(200); + m_sampleFifo.setSize(SampleSinkFifo::getSizePolicy(48000)); } SigMFFileSinkBaseband::~SigMFFileSinkBaseband() { m_inputMessageQueue.clear(); - delete m_channelizer; + stopWork(); } void SigMFFileSinkBaseband::reset() @@ -67,22 +63,46 @@ void SigMFFileSinkBaseband::startWork() &SigMFFileSinkBaseband::handleData, Qt::QueuedConnection ); - connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); - m_running = true; + QObject::connect( + &m_inputMessageQueue, + &MessageQueue::messageEnqueued, + this, + &SigMFFileSinkBaseband::handleInputMessages + ); + m_timer = new QTimer(); + QObject::connect( + m_timer, + &QTimer::timeout, + this, + &SigMFFileSinkBaseband::tick + ); + m_timer->start(200); } void SigMFFileSinkBaseband::stopWork() { QMutexLocker mutexLocker(&m_mutex); m_sink.stopRecording(); - disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); + QObject::disconnect( + &m_inputMessageQueue, + &MessageQueue::messageEnqueued, + this, + &SigMFFileSinkBaseband::handleInputMessages + ); QObject::disconnect( &m_sampleFifo, &SampleSinkFifo::dataReady, this, &SigMFFileSinkBaseband::handleData ); - m_running = false; + QObject::disconnect( + m_timer, + &QTimer::timeout, + this, + &SigMFFileSinkBaseband::tick + ); + m_timer->stop(); + delete m_timer; } void SigMFFileSinkBaseband::feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end) @@ -105,12 +125,12 @@ void SigMFFileSinkBaseband::handleData() // first part of FIFO data if (part1begin != part1end) { - m_channelizer->feed(part1begin, part1end); + m_channelizer.feed(part1begin, part1end); } // second part of FIFO data (used when block wraps around) if(part2begin != part2end) { - m_channelizer->feed(part2begin, part2end); + m_channelizer.feed(part2begin, part2end); } m_sampleFifo.readCommit((unsigned int) count); @@ -150,13 +170,13 @@ bool SigMFFileSinkBaseband::handleMessage(const Message& cmd) << " cnterFrequency: " << notif.getCenterFrequency(); m_sampleFifo.setSize(SampleSinkFifo::getSizePolicy(notif.getSampleRate())); m_centerFrequency = notif.getCenterFrequency(); - m_channelizer->setBasebandSampleRate(notif.getSampleRate()); - int desiredSampleRate = m_channelizer->getBasebandSampleRate() / (1<setChannelization(desiredSampleRate, m_settings.m_inputFrequencyOffset); + m_channelizer.setBasebandSampleRate(notif.getSampleRate()); + int desiredSampleRate = m_channelizer.getBasebandSampleRate() / (1<getChannelSampleRate(), + m_channelizer.getChannelSampleRate(), desiredSampleRate, - m_channelizer->getChannelFrequencyOffset(), + m_channelizer.getChannelFrequencyOffset(), m_centerFrequency + m_settings.m_inputFrequencyOffset); return true; @@ -193,12 +213,12 @@ void SigMFFileSinkBaseband::applySettings(const SigMFFileSinkSettings& settings, if ((settings.m_log2Decim != m_settings.m_log2Decim) || (settings.m_inputFrequencyOffset != m_settings.m_inputFrequencyOffset) || force) { - int desiredSampleRate = m_channelizer->getBasebandSampleRate() / (1<setChannelization(desiredSampleRate, settings.m_inputFrequencyOffset); + int desiredSampleRate = m_channelizer.getBasebandSampleRate() / (1<getChannelSampleRate(), + m_channelizer.getChannelSampleRate(), desiredSampleRate, - m_channelizer->getChannelFrequencyOffset(), + m_channelizer.getChannelFrequencyOffset(), m_centerFrequency + settings.m_inputFrequencyOffset); } @@ -218,7 +238,7 @@ void SigMFFileSinkBaseband::applySettings(const SigMFFileSinkSettings& settings, int SigMFFileSinkBaseband::getChannelSampleRate() const { - return m_channelizer->getChannelSampleRate(); + return m_channelizer.getChannelSampleRate(); } void SigMFFileSinkBaseband::tick() diff --git a/plugins/channelrx/sigmffilesink/sigmffilesinkbaseband.h b/plugins/channelrx/sigmffilesink/sigmffilesinkbaseband.h index 246770ba1..791e74ff9 100644 --- a/plugins/channelrx/sigmffilesink/sigmffilesinkbaseband.h +++ b/plugins/channelrx/sigmffilesink/sigmffilesinkbaseband.h @@ -20,16 +20,16 @@ #include #include -#include #include "dsp/samplesinkfifo.h" +#include "dsp/downchannelizer.h" #include "util/message.h" #include "util/messagequeue.h" #include "sigmffilesinksink.h" #include "sigmffilesinksettings.h" -class DownChannelizer; +class QTimer; class SpectrumVis; class SigMFFileSinkBaseband : public QObject @@ -84,12 +84,10 @@ public: void reset(); void startWork(); - void stopWork(); void feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end); MessageQueue *getInputMessageQueue() { return &m_inputMessageQueue; } //!< Get the queue for asynchronous inbound communication int getChannelSampleRate() const; void setBasebandSampleRate(int sampleRate); - bool isRunning() const { return m_running; } void setSpectrumSink(SpectrumVis* spectrumSink) { m_spectrumSink = spectrumSink; m_sink.setSpectrumSink(spectrumSink); } uint64_t getMsCount() const { return m_sink.getMsCount(); } uint64_t getByteCount() const { return m_sink.getByteCount(); } @@ -105,7 +103,7 @@ public: private: SampleSinkFifo m_sampleFifo; - DownChannelizer *m_channelizer; + DownChannelizer m_channelizer; SigMFFileSinkSink m_sink; SpectrumVis *m_spectrumSink; MessageQueue m_inputMessageQueue; //!< Queue for asynchronous inbound communication @@ -115,10 +113,10 @@ private: float m_squelchLevel; bool m_squelchOpen; int64_t m_centerFrequency; - bool m_running; QMutex m_mutex; - QTimer m_timer; + QTimer *m_timer; + void stopWork(); bool handleMessage(const Message& cmd); void applySettings(const SigMFFileSinkSettings& settings, bool force = false);