From 63a1e17f8d1403fa43591ad42b975e267fcc8961 Mon Sep 17 00:00:00 2001 From: f4exb Date: Fri, 14 Oct 2022 00:03:57 +0200 Subject: [PATCH] Local sink: updated threading model. Part of #1346 --- plugins/channelrx/localsink/localsink.cpp | 76 ++++++++++++++++------- plugins/channelrx/localsink/localsink.h | 1 + 2 files changed, 56 insertions(+), 21 deletions(-) diff --git a/plugins/channelrx/localsink/localsink.cpp b/plugins/channelrx/localsink/localsink.cpp index 3eb87911b..93bfeb64e 100644 --- a/plugins/channelrx/localsink/localsink.cpp +++ b/plugins/channelrx/localsink/localsink.cpp @@ -49,16 +49,14 @@ const char* const LocalSink::m_channelId = "LocalSink"; LocalSink::LocalSink(DeviceAPI *deviceAPI) : ChannelAPI(m_channelIdURI, ChannelAPI::StreamSingleSink), m_deviceAPI(deviceAPI), + m_thread(nullptr), + m_basebandSink(nullptr), + m_running(false), m_centerFrequency(0), m_frequencyOffset(0), m_basebandSampleRate(48000) { setObjectName(m_channelId); - - m_thread = new QThread(this); - m_basebandSink = new LocalSinkBaseband(); - m_basebandSink->moveToThread(m_thread); - applySettings(m_settings, true); m_deviceAPI->addChannelSink(this); @@ -77,6 +75,7 @@ LocalSink::LocalSink(DeviceAPI *deviceAPI) : this, &LocalSink::handleIndexInDeviceSetChanged ); + start(); } LocalSink::~LocalSink() @@ -90,8 +89,7 @@ LocalSink::~LocalSink() delete m_networkManager; m_deviceAPI->removeChannelSinkAPI(this); m_deviceAPI->removeChannelSink(this); - delete m_basebandSink; - delete m_thread; + stop(); } void LocalSink::setDeviceAPI(DeviceAPI *deviceAPI) @@ -114,19 +112,43 @@ uint32_t LocalSink::getNumberOfDeviceStreams() const void LocalSink::feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end, bool firstOfBurst) { (void) firstOfBurst; - m_basebandSink->feed(begin, end); + + if (m_running) { + m_basebandSink->feed(begin, end); + } } void LocalSink::start() { + if (m_running) { + return; + } + qDebug("LocalSink::start"); + m_thread = new QThread(this); + m_basebandSink = new LocalSinkBaseband(); + m_basebandSink->moveToThread(m_thread); + + QObject::connect(m_thread, &QThread::finished, m_basebandSink, &QObject::deleteLater); + QObject::connect(m_thread, &QThread::finished, m_thread, &QThread::deleteLater); + m_basebandSink->reset(); m_thread->start(); + + LocalSinkBaseband::MsgConfigureLocalSinkBaseband *msg = LocalSinkBaseband::MsgConfigureLocalSinkBaseband::create(m_settings, true); + m_basebandSink->getInputMessageQueue()->push(msg); + + m_running = true; } void LocalSink::stop() { + if (!m_running) { + return; + } + qDebug("LocalSink::stop"); + m_running = false; m_thread->exit(); m_thread->wait(); } @@ -147,8 +169,11 @@ bool LocalSink::handleMessage(const Message& cmd) calculateFrequencyOffset(m_settings.m_log2Decim, m_settings.m_filterChainHash); // This is when device sample rate changes propagateSampleRateAndFrequency(m_settings.m_localDeviceIndex, m_settings.m_log2Decim); - DSPSignalNotification *msg = new DSPSignalNotification(notif.getSampleRate(), notif.getCenterFrequency()); - m_basebandSink->getInputMessageQueue()->push(msg); + if (m_running) + { + DSPSignalNotification *msg = new DSPSignalNotification(notif.getSampleRate(), notif.getCenterFrequency()); + m_basebandSink->getInputMessageQueue()->push(msg); + } if (getMessageQueueToGUI()) { getMessageQueueToGUI()->push(new DSPSignalNotification(notif)); @@ -283,10 +308,14 @@ void LocalSink::applySettings(const LocalSinkSettings& settings, bool force) { reverseAPIKeys.append("localDeviceIndex"); propagateSampleRateAndFrequency(settings.m_localDeviceIndex, settings.m_log2Decim); - DeviceSampleSource *deviceSource = getLocalDevice(settings.m_localDeviceIndex); - LocalSinkBaseband::MsgConfigureLocalDeviceSampleSource *msg = - LocalSinkBaseband::MsgConfigureLocalDeviceSampleSource::create(deviceSource); - m_basebandSink->getInputMessageQueue()->push(msg); + + if (m_running) + { + DeviceSampleSource *deviceSource = getLocalDevice(settings.m_localDeviceIndex); + LocalSinkBaseband::MsgConfigureLocalDeviceSampleSource *msg = + LocalSinkBaseband::MsgConfigureLocalDeviceSampleSource::create(deviceSource); + m_basebandSink->getInputMessageQueue()->push(msg); + } } if ((settings.m_log2Decim != m_settings.m_log2Decim) @@ -299,10 +328,12 @@ void LocalSink::applySettings(const LocalSinkSettings& settings, bool force) if ((settings.m_play != m_settings.m_play) || force) { reverseAPIKeys.append("play"); - LocalSinkBaseband::MsgConfigureLocalSinkWork *msg = LocalSinkBaseband::MsgConfigureLocalSinkWork::create( - settings.m_play - ); - m_basebandSink->getInputMessageQueue()->push(msg); + + if (m_running) + { + LocalSinkBaseband::MsgConfigureLocalSinkWork *msg = LocalSinkBaseband::MsgConfigureLocalSinkWork::create(settings.m_play); + m_basebandSink->getInputMessageQueue()->push(msg); + } } if (m_settings.m_streamIndex != settings.m_streamIndex) @@ -318,8 +349,11 @@ void LocalSink::applySettings(const LocalSinkSettings& settings, bool force) reverseAPIKeys.append("streamIndex"); } - LocalSinkBaseband::MsgConfigureLocalSinkBaseband *msg = LocalSinkBaseband::MsgConfigureLocalSinkBaseband::create(settings, force); - m_basebandSink->getInputMessageQueue()->push(msg); + if (m_running) + { + LocalSinkBaseband::MsgConfigureLocalSinkBaseband *msg = LocalSinkBaseband::MsgConfigureLocalSinkBaseband::create(settings, force); + m_basebandSink->getInputMessageQueue()->push(msg); + } if ((settings.m_useReverseAPI) && (reverseAPIKeys.size() != 0)) { @@ -638,7 +672,7 @@ void LocalSink::networkManagerFinished(QNetworkReply *reply) void LocalSink::handleIndexInDeviceSetChanged(int index) { - if (index < 0) { + if (!m_running || (index < 0)) { return; } diff --git a/plugins/channelrx/localsink/localsink.h b/plugins/channelrx/localsink/localsink.h index d5ac97f47..cc21ed061 100644 --- a/plugins/channelrx/localsink/localsink.h +++ b/plugins/channelrx/localsink/localsink.h @@ -126,6 +126,7 @@ private: DeviceAPI *m_deviceAPI; QThread *m_thread; LocalSinkBaseband *m_basebandSink; + bool m_running; LocalSinkSettings m_settings; uint64_t m_centerFrequency;