From b44eb18df615baccd06a077668d13808e21ffd89 Mon Sep 17 00:00:00 2001 From: f4exb Date: Sat, 23 Jul 2022 05:46:06 +0200 Subject: [PATCH] File Sink: applied new threadning method. Part of #1346 --- plugins/channelrx/filesink/filesink.cpp | 131 +++++++++++++----- plugins/channelrx/filesink/filesink.h | 6 +- .../channelrx/filesink/filesinkbaseband.cpp | 66 +++++---- plugins/channelrx/filesink/filesinkbaseband.h | 12 +- 4 files changed, 148 insertions(+), 67 deletions(-) diff --git a/plugins/channelrx/filesink/filesink.cpp b/plugins/channelrx/filesink/filesink.cpp index ed91febae..9ead19672 100644 --- a/plugins/channelrx/filesink/filesink.cpp +++ b/plugins/channelrx/filesink/filesink.cpp @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include "SWGChannelSettings.h" #include "SWGWorkspaceInfo.h" @@ -52,6 +54,8 @@ const char* const FileSink::m_channelId = "FileSink"; FileSink::FileSink(DeviceAPI *deviceAPI) : ChannelAPI(m_channelIdURI, ChannelAPI::StreamSingleSink), m_deviceAPI(deviceAPI), + m_mutex(QMutex::Recursive), + m_running(false), m_spectrumVis(SDR_RX_SCALEF), m_centerFrequency(0), m_frequencyOffset(0), @@ -59,10 +63,6 @@ FileSink::FileSink(DeviceAPI *deviceAPI) : { setObjectName(m_channelId); - m_basebandSink = new FileSinkBaseband(); - m_basebandSink->setSpectrumSink(&m_spectrumVis); - m_basebandSink->moveToThread(&m_thread); - applySettings(m_settings, true); m_deviceAPI->addChannelSink(this); @@ -81,10 +81,13 @@ FileSink::FileSink(DeviceAPI *deviceAPI) : this, &FileSink::handleIndexInDeviceSetChanged ); + + start(); } FileSink::~FileSink() { + qDebug("FileSink::~FileSink"); QObject::disconnect( m_networkManager, &QNetworkAccessManager::finished, @@ -95,11 +98,7 @@ FileSink::~FileSink() m_deviceAPI->removeChannelSinkAPI(this); m_deviceAPI->removeChannelSink(this); - if (m_basebandSink->isRunning()) { - stop(); - } - - delete m_basebandSink; + stop(); } void FileSink::setDeviceAPI(DeviceAPI *deviceAPI) @@ -117,7 +116,10 @@ void FileSink::setDeviceAPI(DeviceAPI *deviceAPI) void FileSink::setMessageQueueToGUI(MessageQueue* queue) { ChannelAPI::setMessageQueueToGUI(queue); - m_basebandSink->setMessageQueueToGUI(queue); + + if (m_running) { + m_basebandSink->setMessageQueueToGUI(queue); + } } uint32_t FileSink::getNumberOfDeviceStreams() const @@ -128,18 +130,55 @@ uint32_t FileSink::getNumberOfDeviceStreams() const void FileSink::feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end, bool firstOfBurst) { (void) firstOfBurst; - m_basebandSink->feed(begin, end); + + if (m_running) { + m_basebandSink->feed(begin, end); + } } void FileSink::start() { + QMutexLocker m_lock(&m_mutex); + + if (m_running) { + return; + } + qDebug("FileSink::start"); - m_basebandSink->reset(); + m_thread = new QThread(); + m_basebandSink = new FileSinkBaseband(); + m_basebandSink->setFifoLabel(QString("%1 [%2:%3]") + .arg(m_channelId) + .arg(m_deviceAPI->getDeviceSetIndex()) + .arg(getIndexInDeviceSet()) + ); + m_basebandSink->setSpectrumSink(&m_spectrumVis); + m_basebandSink->moveToThread(m_thread); + + QObject::connect( + m_thread, + &QThread::started, + m_basebandSink, + &FileSinkBaseband::startWork + ); + QObject::connect( + m_thread, + &QThread::finished, + m_basebandSink, + &QObject::deleteLater + ); + QObject::connect( + m_thread, + &QThread::finished, + m_thread, + &QThread::deleteLater + ); + m_basebandSink->setMessageQueueToGUI(getMessageQueueToGUI()); m_basebandSink->setDeviceHwId(m_deviceAPI->getHardwareId()); m_basebandSink->setDeviceUId(m_deviceAPI->getDeviceUID()); m_basebandSink->startWork(); - m_thread.start(); + m_thread->start(); DSPSignalNotification *dspMsg = new DSPSignalNotification(m_basebandSampleRate, m_centerFrequency); m_basebandSink->getInputMessageQueue()->push(dspMsg); @@ -152,14 +191,23 @@ void FileSink::start() MsgReportStartStop *msg = MsgReportStartStop::create(true); getMessageQueueToGUI()->push(msg); } + + m_running = true; } void FileSink::stop() { - qDebug("FileSink::stop"); - m_basebandSink->stopWork(); - m_thread.exit(); - m_thread.wait(); + QMutexLocker m_lock(&m_mutex); + + if (!m_running) { + return; + } + + qDebug("FileSink::stop:"); + m_running = false; + + m_thread->quit(); + m_thread->wait(); if (getMessageQueueToGUI()) { @@ -180,8 +228,10 @@ bool FileSink::handleMessage(const Message& cmd) m_basebandSampleRate = cfg.getSampleRate(); m_centerFrequency = cfg.getCenterFrequency(); - DSPSignalNotification *notif = new DSPSignalNotification(cfg); - m_basebandSink->getInputMessageQueue()->push(notif); + + if (m_running) { + m_basebandSink->getInputMessageQueue()->push(new DSPSignalNotification(cfg)); + } if (getMessageQueueToGUI()) { getMessageQueueToGUI()->push(new DSPSignalNotification(cfg)); @@ -328,8 +378,11 @@ void FileSink::applySettings(const FileSinkSettings& settings, bool force) reverseAPIKeys.append("streamIndex"); } - FileSinkBaseband::MsgConfigureFileSinkBaseband *msg = FileSinkBaseband::MsgConfigureFileSinkBaseband::create(settings, force); - m_basebandSink->getInputMessageQueue()->push(msg); + if (m_running) + { + FileSinkBaseband::MsgConfigureFileSinkBaseband *msg = FileSinkBaseband::MsgConfigureFileSinkBaseband::create(settings, force); + m_basebandSink->getInputMessageQueue()->push(msg); + } if ((settings.m_useReverseAPI) && (reverseAPIKeys.size() != 0)) { @@ -353,13 +406,16 @@ void FileSink::applySettings(const FileSinkSettings& settings, bool force) void FileSink::record(bool record) { - FileSinkBaseband::MsgConfigureFileSinkWork *msg = FileSinkBaseband::MsgConfigureFileSinkWork::create(record); - m_basebandSink->getInputMessageQueue()->push(msg); + if (m_running) + { + FileSinkBaseband::MsgConfigureFileSinkWork *msg = FileSinkBaseband::MsgConfigureFileSinkWork::create(record); + m_basebandSink->getInputMessageQueue()->push(msg); + } } uint64_t FileSink::getMsCount() const { - if (m_basebandSink) { + if (m_running) { return m_basebandSink->getMsCount(); } else { return 0; @@ -368,7 +424,7 @@ uint64_t FileSink::getMsCount() const uint64_t FileSink::getByteCount() const { - if (m_basebandSink) { + if (m_running) { return m_basebandSink->getByteCount(); } else { return 0; @@ -377,7 +433,7 @@ uint64_t FileSink::getByteCount() const unsigned int FileSink::getNbTracks() const { - if (m_basebandSink) { + if (m_running) { return m_basebandSink->getNbTracks(); } else { return 0; @@ -455,8 +511,11 @@ int FileSink::webapiActionsPost( if (!m_settings.m_squelchRecordingEnable) { - FileSinkBaseband::MsgConfigureFileSinkWork *msg = FileSinkBaseband::MsgConfigureFileSinkWork::create(record); - m_basebandSink->getInputMessageQueue()->push(msg); + if (m_running) + { + FileSinkBaseband::MsgConfigureFileSinkWork *msg = FileSinkBaseband::MsgConfigureFileSinkWork::create(record); + m_basebandSink->getInputMessageQueue()->push(msg); + } if (getMessageQueueToGUI()) { @@ -624,14 +683,18 @@ void FileSink::webapiFormatChannelSettings(SWGSDRangel::SWGChannelSettings& resp void FileSink::webapiFormatChannelReport(SWGSDRangel::SWGChannelReport& response) { - response.getFileSinkReport()->setSpectrumSquelch(m_basebandSink->isSquelchOpen() ? 1 : 0); - response.getFileSinkReport()->setSpectrumMax(m_basebandSink->getSpecMax()); - response.getFileSinkReport()->setSinkSampleRate(m_basebandSink->getSinkSampleRate()); response.getFileSinkReport()->setRecordTimeMs(getMsCount()); response.getFileSinkReport()->setRecordSize(getByteCount()); - response.getFileSinkReport()->setRecording(m_basebandSink->isRecording() ? 1 : 0); response.getFileSinkReport()->setRecordCaptures(getNbTracks()); - response.getFileSinkReport()->setChannelSampleRate(m_basebandSink->getChannelSampleRate()); + + if (m_running) + { + response.getFileSinkReport()->setSpectrumSquelch(m_basebandSink->isSquelchOpen() ? 1 : 0); + response.getFileSinkReport()->setSpectrumMax(m_basebandSink->getSpecMax()); + response.getFileSinkReport()->setSinkSampleRate(m_basebandSink->getSinkSampleRate()); + response.getFileSinkReport()->setRecording(m_basebandSink->isRecording() ? 1 : 0); + response.getFileSinkReport()->setChannelSampleRate(m_basebandSink->getChannelSampleRate()); + } } void FileSink::webapiReverseSendSettings(QList& channelSettingsKeys, const FileSinkSettings& settings, bool force) @@ -779,7 +842,7 @@ void FileSink::networkManagerFinished(QNetworkReply *reply) void FileSink::handleIndexInDeviceSetChanged(int index) { - if (index < 0) { + if (!m_running || (index < 0)) { return; } diff --git a/plugins/channelrx/filesink/filesink.h b/plugins/channelrx/filesink/filesink.h index 95ec66ffb..5468564ff 100644 --- a/plugins/channelrx/filesink/filesink.h +++ b/plugins/channelrx/filesink/filesink.h @@ -19,7 +19,6 @@ #define INCLUDE_FILESINK_H_ #include -#include #include #include @@ -31,6 +30,7 @@ class QNetworkAccessManager; class QNetworkReply; +class QThread; class DeviceAPI; class DeviceSampleSource; @@ -159,8 +159,10 @@ public: private: DeviceAPI *m_deviceAPI; - QThread m_thread; + QThread *m_thread; FileSinkBaseband *m_basebandSink; + QMutex m_mutex; + bool m_running; FileSinkSettings m_settings; SpectrumVis m_spectrumVis; diff --git a/plugins/channelrx/filesink/filesinkbaseband.cpp b/plugins/channelrx/filesink/filesinkbaseband.cpp index c53c5f4ae..284a37b9b 100644 --- a/plugins/channelrx/filesink/filesinkbaseband.cpp +++ b/plugins/channelrx/filesink/filesinkbaseband.cpp @@ -15,9 +15,9 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// +#include #include -#include "dsp/downchannelizer.h" #include "dsp/dspengine.h" #include "dsp/dspcommands.h" #include "dsp/spectrumvis.h" @@ -30,24 +30,22 @@ MESSAGE_CLASS_DEFINITION(FileSinkBaseband::MsgConfigureFileSinkBaseband, Message MESSAGE_CLASS_DEFINITION(FileSinkBaseband::MsgConfigureFileSinkWork, Message) FileSinkBaseband::FileSinkBaseband() : + m_channelizer(&m_sink), m_specMax(0), m_squelchLevel(0), m_squelchOpen(false), - m_running(false), m_mutex(QMutex::Recursive) { - m_sampleFifo.setSize(SampleSinkFifo::getSizePolicy(48000)); - m_channelizer = new DownChannelizer(&m_sink); - qDebug("FileSinkBaseband::FileSinkBaseband"); - connect(&m_timer, SIGNAL(timeout()), this, SLOT(tick())); - m_timer.start(200); + m_sampleFifo.setSize(SampleSinkFifo::getSizePolicy(48000)); } FileSinkBaseband::~FileSinkBaseband() { + qDebug("FileSinkBaseband::~FileSinkBaseband"); m_inputMessageQueue.clear(); - delete m_channelizer; + stopWork(); + qDebug("FileSinkBaseband::~FileSinkBaseband: done"); } void FileSinkBaseband::reset() @@ -60,6 +58,15 @@ void FileSinkBaseband::reset() void FileSinkBaseband::startWork() { QMutexLocker mutexLocker(&m_mutex); + qDebug("FileSinkBaseband::startWork"); + m_timer = new QTimer(); + connect( + m_timer, + &QTimer::timeout, + this, + &FileSinkBaseband::tick + ); + m_timer->start(200); QObject::connect( &m_sampleFifo, &SampleSinkFifo::dataReady, @@ -67,22 +74,33 @@ void FileSinkBaseband::startWork() &FileSinkBaseband::handleData, Qt::QueuedConnection ); - connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); - m_running = true; + QObject::connect( + &m_inputMessageQueue, + &MessageQueue::messageEnqueued, + this, + &FileSinkBaseband::handleInputMessages + ); } void FileSinkBaseband::stopWork() { QMutexLocker mutexLocker(&m_mutex); + qDebug("FileSinkBaseband::stopWork"); + m_timer->stop(); m_sink.stopRecording(); - disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); + QObject::disconnect( + &m_inputMessageQueue, + &MessageQueue::messageEnqueued, + this, + &FileSinkBaseband::handleInputMessages + ); QObject::disconnect( &m_sampleFifo, &SampleSinkFifo::dataReady, this, &FileSinkBaseband::handleData ); - m_running = false; + delete m_timer; } void FileSinkBaseband::feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end) @@ -105,12 +123,12 @@ void FileSinkBaseband::handleData() // first part of FIFO data if (part1begin != part1end) { - m_channelizer->feed(part1begin, part1end); + m_channelizer.feed(part1begin, part1end); } // second part of FIFO data (used when block wraps around) if(part2begin != part2end) { - m_channelizer->feed(part2begin, part2end); + m_channelizer.feed(part2begin, part2end); } m_sampleFifo.readCommit((unsigned int) count); @@ -150,13 +168,13 @@ bool FileSinkBaseband::handleMessage(const Message& cmd) << " cnterFrequency: " << notif.getCenterFrequency(); m_sampleFifo.setSize(SampleSinkFifo::getSizePolicy(notif.getSampleRate())); m_centerFrequency = notif.getCenterFrequency(); - m_channelizer->setBasebandSampleRate(notif.getSampleRate()); - int desiredSampleRate = m_channelizer->getBasebandSampleRate() / (1<setChannelization(desiredSampleRate, m_settings.m_inputFrequencyOffset); + m_channelizer.setBasebandSampleRate(notif.getSampleRate()); + int desiredSampleRate = m_channelizer.getBasebandSampleRate() / (1<getChannelSampleRate(), + m_channelizer.getChannelSampleRate(), desiredSampleRate, - m_channelizer->getChannelFrequencyOffset(), + m_channelizer.getChannelFrequencyOffset(), m_centerFrequency + m_settings.m_inputFrequencyOffset); return true; @@ -193,12 +211,12 @@ void FileSinkBaseband::applySettings(const FileSinkSettings& settings, bool forc if ((settings.m_log2Decim != m_settings.m_log2Decim) || (settings.m_inputFrequencyOffset != m_settings.m_inputFrequencyOffset) || force) { - int desiredSampleRate = m_channelizer->getBasebandSampleRate() / (1<setChannelization(desiredSampleRate, settings.m_inputFrequencyOffset); + int desiredSampleRate = m_channelizer.getBasebandSampleRate() / (1<getChannelSampleRate(), + m_channelizer.getChannelSampleRate(), desiredSampleRate, - m_channelizer->getChannelFrequencyOffset(), + m_channelizer.getChannelFrequencyOffset(), m_centerFrequency + settings.m_inputFrequencyOffset); } @@ -218,7 +236,7 @@ void FileSinkBaseband::applySettings(const FileSinkSettings& settings, bool forc int FileSinkBaseband::getChannelSampleRate() const { - return m_channelizer->getChannelSampleRate(); + return m_channelizer.getChannelSampleRate(); } void FileSinkBaseband::tick() diff --git a/plugins/channelrx/filesink/filesinkbaseband.h b/plugins/channelrx/filesink/filesinkbaseband.h index cc006096e..62005c4dd 100644 --- a/plugins/channelrx/filesink/filesinkbaseband.h +++ b/plugins/channelrx/filesink/filesinkbaseband.h @@ -20,17 +20,17 @@ #include #include -#include #include "dsp/samplesinkfifo.h" +#include "dsp/downchannelizer.h" #include "util/message.h" #include "util/messagequeue.h" #include "filesinksink.h" #include "filesinksettings.h" -class DownChannelizer; class SpectrumVis; +class QTimer; class FileSinkBaseband : public QObject { @@ -84,12 +84,10 @@ public: void reset(); void startWork(); - void stopWork(); void feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end); MessageQueue *getInputMessageQueue() { return &m_inputMessageQueue; } //!< Get the queue for asynchronous inbound communication int getChannelSampleRate() const; void setBasebandSampleRate(int sampleRate); - bool isRunning() const { return m_running; } void setSpectrumSink(SpectrumVis* spectrumSink) { m_spectrumSink = spectrumSink; m_sink.setSpectrumSink(spectrumSink); } uint64_t getMsCount() const { return m_sink.getMsCount(); } uint64_t getByteCount() const { return m_sink.getByteCount(); } @@ -105,7 +103,7 @@ public: private: SampleSinkFifo m_sampleFifo; - DownChannelizer *m_channelizer; + DownChannelizer m_channelizer; FileSinkSink m_sink; SpectrumVis *m_spectrumSink; MessageQueue m_inputMessageQueue; //!< Queue for asynchronous inbound communication @@ -115,10 +113,10 @@ private: float m_squelchLevel; bool m_squelchOpen; int64_t m_centerFrequency; - bool m_running; QMutex m_mutex; - QTimer m_timer; + QTimer *m_timer; + void stopWork(); bool handleMessage(const Message& cmd); void applySettings(const FileSinkSettings& settings, bool force = false);