1
0
mirror of https://github.com/f4exb/sdrangel.git synced 2025-09-04 14:17:50 -04:00

Deep redesign: debug session #2 phase #2: fixed multi-threading of channelizers

This commit is contained in:
f4exb 2015-08-21 08:54:28 +02:00
parent 38bc6563d4
commit bc287a4c33
22 changed files with 129 additions and 101 deletions

View File

@ -25,6 +25,7 @@
class SampleSource; class SampleSource;
class SampleSink; class SampleSink;
class ThreadedSampleSink;
class AudioFifo; class AudioFifo;
class SDRANGELOVE_API DSPPing : public Message { class SDRANGELOVE_API DSPPing : public Message {
@ -105,28 +106,28 @@ private:
SampleSink* m_sampleSink; SampleSink* m_sampleSink;
}; };
class SDRANGELOVE_API DSPAddThreadedSink : public Message { class SDRANGELOVE_API DSPAddThreadedSampleSink : public Message {
MESSAGE_CLASS_DECLARATION MESSAGE_CLASS_DECLARATION
public: public:
DSPAddThreadedSink(SampleSink* sampleSink) : Message(), m_sampleSink(sampleSink) { } DSPAddThreadedSampleSink(ThreadedSampleSink* threadedSampleSink) : Message(), m_threadedSampleSink(threadedSampleSink) { }
SampleSink* getSampleSink() const { return m_sampleSink; } ThreadedSampleSink* getThreadedSampleSink() const { return m_threadedSampleSink; }
private: private:
SampleSink* m_sampleSink; ThreadedSampleSink* m_threadedSampleSink;
}; };
class SDRANGELOVE_API DSPRemoveThreadedSink : public Message { class SDRANGELOVE_API DSPRemoveThreadedSampleSink : public Message {
MESSAGE_CLASS_DECLARATION MESSAGE_CLASS_DECLARATION
public: public:
DSPRemoveThreadedSink(SampleSink* sampleSink) : Message(), m_sampleSink(sampleSink) { } DSPRemoveThreadedSampleSink(ThreadedSampleSink* threadedSampleSink) : Message(), m_threadedSampleSink(threadedSampleSink) { }
SampleSink* getSampleSink() const { return m_sampleSink; } ThreadedSampleSink* getThreadedSampleSink() const { return m_threadedSampleSink; }
private: private:
SampleSink* m_sampleSink; ThreadedSampleSink* m_threadedSampleSink;
}; };
class SDRANGELOVE_API DSPAddAudioSink : public Message { class SDRANGELOVE_API DSPAddAudioSink : public Message {

View File

@ -67,8 +67,8 @@ public:
void addSink(SampleSink* sink); //!< Add a sample sink void addSink(SampleSink* sink); //!< Add a sample sink
void removeSink(SampleSink* sink); //!< Remove a sample sink void removeSink(SampleSink* sink); //!< Remove a sample sink
void addThreadedSink(SampleSink* sink); //!< Add a sample sink that will run on its own thread void addThreadedSink(ThreadedSampleSink* sink); //!< Add a sample sink that will run on its own thread
void removeThreadedSink(SampleSink* sink); //!< Remove a sample sink that runs on its own thread void removeThreadedSink(ThreadedSampleSink* sink); //!< Remove a sample sink that runs on its own thread
void addAudioSink(AudioFifo* audioFifo); //!< Add the audio sink void addAudioSink(AudioFifo* audioFifo); //!< Add the audio sink
void removeAudioSink(AudioFifo* audioFifo); //!< Remove the audio sink void removeAudioSink(AudioFifo* audioFifo); //!< Remove the audio sink

View File

@ -19,7 +19,6 @@
#define INCLUDE_THREADEDSAMPLESINK_H #define INCLUDE_THREADEDSAMPLESINK_H
#include <QMutex> #include <QMutex>
#include <QThread>
#include "samplesink.h" #include "samplesink.h"
#include "dsp/samplefifo.h" #include "dsp/samplefifo.h"
#include "util/messagequeue.h" #include "util/messagequeue.h"
@ -27,15 +26,16 @@
#include "util/syncmessenger.h" #include "util/syncmessenger.h"
class SampleSink; class SampleSink;
class QThread;
/** /**
* This class is a wrapper for SampleSink that runs the SampleSink object in its own thread * This class is a wrapper for SampleSink that runs the SampleSink object in its own thread
*/ */
class SDRANGELOVE_API ThreadedSampleSink : public QThread { class SDRANGELOVE_API ThreadedSampleSink : public QObject {
Q_OBJECT Q_OBJECT
public: public:
ThreadedSampleSink(SampleSink* sampleSink); ThreadedSampleSink(SampleSink* sampleSink, QObject *parent = 0);
~ThreadedSampleSink(); ~ThreadedSampleSink();
const SampleSink *getSink() const { return m_sampleSink; } const SampleSink *getSink() const { return m_sampleSink; }
@ -45,20 +45,19 @@ public:
void start(); //!< this thread start() void start(); //!< this thread start()
void stop(); //!< this thread exit() and wait() void stop(); //!< this thread exit() and wait()
bool sendWaitSink(Message& cmd); //!< Send message to sink synchronously bool handleSinkMessage(Message& cmd); //!< Send message to sink synchronously
void feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool positiveOnly); //!< Feed sink with samples void feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool positiveOnly); //!< Feed sink with samples
QString getSampleSinkObjectName() const; QString getSampleSinkObjectName() const;
protected: protected:
QThread *m_thread; //!< The thead object
SyncMessenger m_syncMessenger; //!< Used to process messages synchronously with the thread SyncMessenger m_syncMessenger; //!< Used to process messages synchronously with the thread
SampleSink* m_sampleSink; SampleSink* m_sampleSink;
SampleFifo m_sampleFifo;
private: protected slots:
void run(); //!< this thread run() method void handleData();
private slots:
void handleSynchronousMessages(); //!< Handle synchronous messages with the thread
}; };
#endif // INCLUDE_THREADEDSAMPLESINK_H #endif // INCLUDE_THREADEDSAMPLESINK_H

View File

@ -124,11 +124,11 @@ void AMDemod::feed(SampleVector::const_iterator begin, SampleVector::const_itera
{ {
uint res = m_audioFifo->write((const quint8*)&m_audioBuffer[0], m_audioBufferFill, 1); uint res = m_audioFifo->write((const quint8*)&m_audioBuffer[0], m_audioBufferFill, 1);
/* FIXME: Not necessarily bad, There is a race between threads but generally it works i.e. samples are not lost // FIXME: Not necessarily bad, There is a race between threads but generally it works i.e. samples are not lost
if (res != m_audioBufferFill) if (res != m_audioBufferFill)
{ {
qDebug("AMDemod::feed: %u/%u audio samples lost", m_audioBufferFill - res, m_audioBufferFill); qDebug("AMDemod::feed: %u/%u audio samples lost", m_audioBufferFill - res, m_audioBufferFill);
}*/ }
m_audioBufferFill = 0; m_audioBufferFill = 0;
} }
@ -141,7 +141,8 @@ void AMDemod::feed(SampleVector::const_iterator begin, SampleVector::const_itera
{ {
uint res = m_audioFifo->write((const quint8*)&m_audioBuffer[0], m_audioBufferFill, 1); uint res = m_audioFifo->write((const quint8*)&m_audioBuffer[0], m_audioBufferFill, 1);
/* SAme remark as above // Same remark as above
/*
if (res != m_audioBufferFill) if (res != m_audioBufferFill)
{ {
qDebug("AMDemod::feed: %u samples written vs %u requested", res, m_audioBufferFill); qDebug("AMDemod::feed: %u samples written vs %u requested", res, m_audioBufferFill);

View File

@ -204,8 +204,9 @@ AMDemodGUI::AMDemodGUI(PluginAPI* pluginAPI, QWidget* parent) :
m_audioFifo = new AudioFifo(4, 48000); m_audioFifo = new AudioFifo(4, 48000);
m_amDemod = new AMDemod(m_audioFifo, 0); m_amDemod = new AMDemod(m_audioFifo, 0);
m_channelizer = new Channelizer(m_amDemod); m_channelizer = new Channelizer(m_amDemod);
m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this);
DSPEngine::instance()->addAudioSink(m_audioFifo); DSPEngine::instance()->addAudioSink(m_audioFifo);
DSPEngine::instance()->addThreadedSink(m_channelizer); DSPEngine::instance()->addThreadedSink(m_threadedChannelizer);
m_channelMarker = new ChannelMarker(this); m_channelMarker = new ChannelMarker(this);
m_channelMarker->setColor(Qt::yellow); m_channelMarker->setColor(Qt::yellow);
@ -222,7 +223,8 @@ AMDemodGUI::~AMDemodGUI()
{ {
m_pluginAPI->removeChannelInstance(this); m_pluginAPI->removeChannelInstance(this);
DSPEngine::instance()->removeAudioSink(m_audioFifo); DSPEngine::instance()->removeAudioSink(m_audioFifo);
DSPEngine::instance()->removeThreadedSink(m_channelizer); DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer);
delete m_threadedChannelizer;
delete m_channelizer; delete m_channelizer;
delete m_amDemod; delete m_amDemod;
delete m_audioFifo; delete m_audioFifo;

View File

@ -8,6 +8,7 @@ class PluginAPI;
class ChannelMarker; class ChannelMarker;
class AudioFifo; class AudioFifo;
class ThreadedSampleSink;
class Channelizer; class Channelizer;
class AMDemod; class AMDemod;
@ -51,6 +52,7 @@ private:
bool m_doApplySettings; bool m_doApplySettings;
AudioFifo* m_audioFifo; AudioFifo* m_audioFifo;
ThreadedSampleSink* m_threadedChannelizer;
Channelizer* m_channelizer; Channelizer* m_channelizer;
AMDemod* m_amDemod; AMDemod* m_amDemod;

View File

@ -1,6 +1,7 @@
#include <QDockWidget> #include <QDockWidget>
#include <QMainWindow> #include <QMainWindow>
#include "ui_chanalyzergui.h" #include "ui_chanalyzergui.h"
#include "dsp/threadedsamplesink.h"
#include "dsp/channelizer.h" #include "dsp/channelizer.h"
#include "dsp/spectrumscopecombovis.h" #include "dsp/spectrumscopecombovis.h"
#include "dsp/spectrumvis.h" #include "dsp/spectrumvis.h"
@ -283,8 +284,9 @@ ChannelAnalyzerGUI::ChannelAnalyzerGUI(PluginAPI* pluginAPI, QWidget* parent) :
m_spectrumScopeComboVis = new SpectrumScopeComboVis(m_spectrumVis, m_scopeVis); m_spectrumScopeComboVis = new SpectrumScopeComboVis(m_spectrumVis, m_scopeVis);
m_channelAnalyzer = new ChannelAnalyzer(m_spectrumScopeComboVis); m_channelAnalyzer = new ChannelAnalyzer(m_spectrumScopeComboVis);
m_channelizer = new Channelizer(m_channelAnalyzer); m_channelizer = new Channelizer(m_channelAnalyzer);
m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this);
connect(m_channelizer, SIGNAL(inputSampleRateChanged()), this, SLOT(channelSampleRateChanged())); connect(m_channelizer, SIGNAL(inputSampleRateChanged()), this, SLOT(channelSampleRateChanged()));
DSPEngine::instance()->addThreadedSink(m_channelizer); DSPEngine::instance()->addThreadedSink(m_threadedChannelizer);
ui->deltaFrequency->setColorMapper(ColorMapper(ColorMapper::ReverseGold)); ui->deltaFrequency->setColorMapper(ColorMapper(ColorMapper::ReverseGold));
@ -315,7 +317,8 @@ ChannelAnalyzerGUI::ChannelAnalyzerGUI(PluginAPI* pluginAPI, QWidget* parent) :
ChannelAnalyzerGUI::~ChannelAnalyzerGUI() ChannelAnalyzerGUI::~ChannelAnalyzerGUI()
{ {
m_pluginAPI->removeChannelInstance(this); m_pluginAPI->removeChannelInstance(this);
DSPEngine::instance()->removeThreadedSink(m_channelizer); DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer);
delete m_threadedChannelizer;
delete m_channelizer; delete m_channelizer;
delete m_channelAnalyzer; delete m_channelAnalyzer;
delete m_spectrumVis; delete m_spectrumVis;

View File

@ -8,6 +8,7 @@ class PluginAPI;
class ChannelMarker; class ChannelMarker;
//class AudioFifo; //class AudioFifo;
class ThreadedSampleSink;
class Channelizer; class Channelizer;
class ChannelAnalyzer; class ChannelAnalyzer;
class SpectrumScopeComboVis; class SpectrumScopeComboVis;
@ -56,6 +57,7 @@ private:
int m_rate; int m_rate;
int m_spanLog2; int m_spanLog2;
ThreadedSampleSink* m_threadedChannelizer;
Channelizer* m_channelizer; Channelizer* m_channelizer;
ChannelAnalyzer* m_channelAnalyzer; ChannelAnalyzer* m_channelAnalyzer;
SpectrumScopeComboVis* m_spectrumScopeComboVis; SpectrumScopeComboVis* m_spectrumScopeComboVis;

View File

@ -155,7 +155,8 @@ LoRaDemodGUI::LoRaDemodGUI(PluginAPI* pluginAPI, QWidget* parent) :
m_spectrumVis = new SpectrumVis(ui->glSpectrum); m_spectrumVis = new SpectrumVis(ui->glSpectrum);
m_LoRaDemod = new LoRaDemod(m_spectrumVis); m_LoRaDemod = new LoRaDemod(m_spectrumVis);
m_channelizer = new Channelizer(m_LoRaDemod); m_channelizer = new Channelizer(m_LoRaDemod);
DSPEngine::instance()->addThreadedSink(m_channelizer); m_threadedChannelizer = new ThreadedSampleSink(m_channelizer);
DSPEngine::instance()->addThreadedSink(m_threadedChannelizer);
ui->glSpectrum->setCenterFrequency(16000); ui->glSpectrum->setCenterFrequency(16000);
ui->glSpectrum->setSampleRate(32000); ui->glSpectrum->setSampleRate(32000);
@ -180,7 +181,8 @@ LoRaDemodGUI::LoRaDemodGUI(PluginAPI* pluginAPI, QWidget* parent) :
LoRaDemodGUI::~LoRaDemodGUI() LoRaDemodGUI::~LoRaDemodGUI()
{ {
m_pluginAPI->removeChannelInstance(this); m_pluginAPI->removeChannelInstance(this);
DSPEngine::instance()->removeThreadedSink(m_channelizer); DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer);
delete m_threadedChannelizer;
delete m_channelizer; delete m_channelizer;
delete m_LoRaDemod; delete m_LoRaDemod;
delete m_spectrumVis; delete m_spectrumVis;

View File

@ -8,6 +8,7 @@
class PluginAPI; class PluginAPI;
class ChannelMarker; class ChannelMarker;
class ThreadedSampleSink;
class Channelizer; class Channelizer;
class LoRaDemod; class LoRaDemod;
class SpectrumVis; class SpectrumVis;
@ -47,6 +48,7 @@ private:
bool m_basicSettingsShown; bool m_basicSettingsShown;
bool m_doApplySettings; bool m_doApplySettings;
ThreadedSampleSink* m_threadedChannelizer;
Channelizer* m_channelizer; Channelizer* m_channelizer;
LoRaDemod* m_LoRaDemod; LoRaDemod* m_LoRaDemod;
SpectrumVis* m_spectrumVis; SpectrumVis* m_spectrumVis;

View File

@ -236,8 +236,9 @@ NFMDemodGUI::NFMDemodGUI(PluginAPI* pluginAPI, QWidget* parent) :
//ui->deltaFrequency->setBold(true); //ui->deltaFrequency->setBold(true);
m_channelizer = new Channelizer(m_nfmDemod); m_channelizer = new Channelizer(m_nfmDemod);
m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this);
DSPEngine::instance()->addAudioSink(m_audioFifo); DSPEngine::instance()->addAudioSink(m_audioFifo);
DSPEngine::instance()->addThreadedSink(m_channelizer); DSPEngine::instance()->addThreadedSink(m_threadedChannelizer);
m_channelMarker = new ChannelMarker(this); m_channelMarker = new ChannelMarker(this);
m_channelMarker->setColor(Qt::red); m_channelMarker->setColor(Qt::red);
@ -254,7 +255,8 @@ NFMDemodGUI::~NFMDemodGUI()
{ {
m_pluginAPI->removeChannelInstance(this); m_pluginAPI->removeChannelInstance(this);
DSPEngine::instance()->removeAudioSink(m_audioFifo); DSPEngine::instance()->removeAudioSink(m_audioFifo);
DSPEngine::instance()->removeThreadedSink(m_channelizer); DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer);
delete m_threadedChannelizer;
delete m_channelizer; delete m_channelizer;
delete m_nfmDemod; delete m_nfmDemod;
delete m_audioFifo; delete m_audioFifo;

View File

@ -9,6 +9,7 @@ class PluginAPI;
class ChannelMarker; class ChannelMarker;
class AudioFifo; class AudioFifo;
class ThreadedSampleSink;
class Channelizer; class Channelizer;
class NFMDemod; class NFMDemod;
@ -54,6 +55,7 @@ private:
bool m_doApplySettings; bool m_doApplySettings;
AudioFifo* m_audioFifo; AudioFifo* m_audioFifo;
ThreadedSampleSink* m_threadedChannelizer;
Channelizer* m_channelizer; Channelizer* m_channelizer;
NFMDemod* m_nfmDemod; NFMDemod* m_nfmDemod;

View File

@ -259,8 +259,9 @@ SSBDemodGUI::SSBDemodGUI(PluginAPI* pluginAPI, QWidget* parent) :
m_spectrumVis = new SpectrumVis(ui->glSpectrum); m_spectrumVis = new SpectrumVis(ui->glSpectrum);
m_ssbDemod = new SSBDemod(m_audioFifo, m_spectrumVis); m_ssbDemod = new SSBDemod(m_audioFifo, m_spectrumVis);
m_channelizer = new Channelizer(m_ssbDemod); m_channelizer = new Channelizer(m_ssbDemod);
m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this);
DSPEngine::instance()->addAudioSink(m_audioFifo); DSPEngine::instance()->addAudioSink(m_audioFifo);
DSPEngine::instance()->addThreadedSink(m_channelizer); DSPEngine::instance()->addThreadedSink(m_threadedChannelizer);
ui->deltaFrequency->setColorMapper(ColorMapper(ColorMapper::ReverseGold)); ui->deltaFrequency->setColorMapper(ColorMapper(ColorMapper::ReverseGold));
@ -289,7 +290,8 @@ SSBDemodGUI::~SSBDemodGUI()
{ {
m_pluginAPI->removeChannelInstance(this); m_pluginAPI->removeChannelInstance(this);
DSPEngine::instance()->removeAudioSink(m_audioFifo); DSPEngine::instance()->removeAudioSink(m_audioFifo);
DSPEngine::instance()->removeThreadedSink(m_channelizer); DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer);
delete m_threadedChannelizer;
delete m_channelizer; delete m_channelizer;
delete m_ssbDemod; delete m_ssbDemod;
delete m_spectrumVis; delete m_spectrumVis;

View File

@ -8,6 +8,7 @@ class PluginAPI;
class ChannelMarker; class ChannelMarker;
class AudioFifo; class AudioFifo;
class ThreadedSampleSink;
class Channelizer; class Channelizer;
class SSBDemod; class SSBDemod;
class SpectrumVis; class SpectrumVis;
@ -54,6 +55,7 @@ private:
int m_spanLog2; int m_spanLog2;
AudioFifo* m_audioFifo; AudioFifo* m_audioFifo;
ThreadedSampleSink* m_threadedChannelizer;
Channelizer* m_channelizer; Channelizer* m_channelizer;
SSBDemod* m_ssbDemod; SSBDemod* m_ssbDemod;
SpectrumVis* m_spectrumVis; SpectrumVis* m_spectrumVis;

View File

@ -1,6 +1,7 @@
#include "tcpsrcgui.h" #include "tcpsrcgui.h"
#include "plugin/pluginapi.h" #include "plugin/pluginapi.h"
#include "tcpsrc.h" #include "tcpsrc.h"
#include "dsp/threadedsamplesink.h"
#include "dsp/channelizer.h" #include "dsp/channelizer.h"
#include "dsp/spectrumvis.h" #include "dsp/spectrumvis.h"
#include "dsp/dspengine.h" #include "dsp/dspengine.h"
@ -176,7 +177,8 @@ TCPSrcGUI::TCPSrcGUI(PluginAPI* pluginAPI, QWidget* parent) :
m_spectrumVis = new SpectrumVis(ui->glSpectrum); m_spectrumVis = new SpectrumVis(ui->glSpectrum);
m_tcpSrc = new TCPSrc(m_pluginAPI->getMainWindowMessageQueue(), this, m_spectrumVis); m_tcpSrc = new TCPSrc(m_pluginAPI->getMainWindowMessageQueue(), this, m_spectrumVis);
m_channelizer = new Channelizer(m_tcpSrc); m_channelizer = new Channelizer(m_tcpSrc);
DSPEngine::instance()->addThreadedSink(m_channelizer); m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this);
DSPEngine::instance()->addThreadedSink(m_threadedChannelizer);
ui->glSpectrum->setCenterFrequency(0); ui->glSpectrum->setCenterFrequency(0);
ui->glSpectrum->setSampleRate(ui->sampleRate->text().toInt()); ui->glSpectrum->setSampleRate(ui->sampleRate->text().toInt());
@ -200,7 +202,8 @@ TCPSrcGUI::TCPSrcGUI(PluginAPI* pluginAPI, QWidget* parent) :
TCPSrcGUI::~TCPSrcGUI() TCPSrcGUI::~TCPSrcGUI()
{ {
m_pluginAPI->removeChannelInstance(this); m_pluginAPI->removeChannelInstance(this);
DSPEngine::instance()->removeThreadedSink(m_channelizer); DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer);
delete m_threadedChannelizer;
delete m_channelizer; delete m_channelizer;
delete m_tcpSrc; delete m_tcpSrc;
delete m_spectrumVis; delete m_spectrumVis;

View File

@ -8,6 +8,7 @@
class PluginAPI; class PluginAPI;
class ChannelMarker; class ChannelMarker;
class ThreadedSampleSink;
class Channelizer; class Channelizer;
class TCPSrc; class TCPSrc;
class SpectrumVis; class SpectrumVis;
@ -59,6 +60,7 @@ private:
bool m_doApplySettings; bool m_doApplySettings;
// RF path // RF path
ThreadedSampleSink* m_threadedChannelizer;
Channelizer* m_channelizer; Channelizer* m_channelizer;
SpectrumVis* m_spectrumVis; SpectrumVis* m_spectrumVis;

View File

@ -214,8 +214,9 @@ WFMDemodGUI::WFMDemodGUI(PluginAPI* pluginAPI, QWidget* parent) :
m_audioFifo = new AudioFifo(4, 250000); // TODO: check. Room for 1s FIFO at max rate m_audioFifo = new AudioFifo(4, 250000); // TODO: check. Room for 1s FIFO at max rate
m_wfmDemod = new WFMDemod(m_audioFifo, 0); m_wfmDemod = new WFMDemod(m_audioFifo, 0);
m_channelizer = new Channelizer(m_wfmDemod); m_channelizer = new Channelizer(m_wfmDemod);
m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this);
DSPEngine::instance()->addAudioSink(m_audioFifo); DSPEngine::instance()->addAudioSink(m_audioFifo);
DSPEngine::instance()->addThreadedSink(m_channelizer); DSPEngine::instance()->addThreadedSink(m_threadedChannelizer);
m_channelMarker = new ChannelMarker(this); m_channelMarker = new ChannelMarker(this);
m_channelMarker->setColor(Qt::blue); m_channelMarker->setColor(Qt::blue);
@ -232,7 +233,8 @@ WFMDemodGUI::~WFMDemodGUI()
{ {
m_pluginAPI->removeChannelInstance(this); m_pluginAPI->removeChannelInstance(this);
DSPEngine::instance()->removeAudioSink(m_audioFifo); DSPEngine::instance()->removeAudioSink(m_audioFifo);
DSPEngine::instance()->removeThreadedSink(m_channelizer); DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer);
delete m_threadedChannelizer;
delete m_channelizer; delete m_channelizer;
delete m_wfmDemod; delete m_wfmDemod;
delete m_audioFifo; delete m_audioFifo;

View File

@ -8,6 +8,7 @@ class PluginAPI;
class ChannelMarker; class ChannelMarker;
class AudioFifo; class AudioFifo;
class ThreadedSampleSink;
class Channelizer; class Channelizer;
class WFMDemod; class WFMDemod;
@ -51,6 +52,7 @@ private:
bool m_doApplySettings; bool m_doApplySettings;
AudioFifo* m_audioFifo; AudioFifo* m_audioFifo;
ThreadedSampleSink* m_threadedChannelizer;
Channelizer* m_channelizer; Channelizer* m_channelizer;
WFMDemod* m_wfmDemod; WFMDemod* m_wfmDemod;

View File

@ -2,6 +2,7 @@
#include "dsp/inthalfbandfilter.h" #include "dsp/inthalfbandfilter.h"
#include "dsp/dspcommands.h" #include "dsp/dspcommands.h"
#include <QString>
#include <QDebug> #include <QDebug>
MESSAGE_CLASS_DEFINITION(Channelizer::MsgChannelizerNotification, Message) MESSAGE_CLASS_DEFINITION(Channelizer::MsgChannelizerNotification, Message)
@ -14,7 +15,8 @@ Channelizer::Channelizer(SampleSink* sampleSink) :
m_currentOutputSampleRate(0), m_currentOutputSampleRate(0),
m_currentCenterFrequency(0) m_currentCenterFrequency(0)
{ {
setObjectName("Channelizer"); QString name = "Channelizer(" + m_sampleSink->objectName() + ")";
setObjectName(name);
} }
Channelizer::~Channelizer() Channelizer::~Channelizer()
@ -55,7 +57,7 @@ void Channelizer::start()
{ {
if(m_sampleSink != NULL) if(m_sampleSink != NULL)
{ {
qDebug() << "Channelizer::start"; qDebug() << "Channelizer::start: thread: " << thread();
m_sampleSink->start(); m_sampleSink->start();
} }
} }

View File

@ -27,8 +27,8 @@ MESSAGE_CLASS_DEFINITION(DSPGetErrorMessage, Message)
MESSAGE_CLASS_DEFINITION(DSPSetSource, Message) MESSAGE_CLASS_DEFINITION(DSPSetSource, Message)
MESSAGE_CLASS_DEFINITION(DSPAddSink, Message) MESSAGE_CLASS_DEFINITION(DSPAddSink, Message)
MESSAGE_CLASS_DEFINITION(DSPRemoveSink, Message) MESSAGE_CLASS_DEFINITION(DSPRemoveSink, Message)
MESSAGE_CLASS_DEFINITION(DSPAddThreadedSink, Message) MESSAGE_CLASS_DEFINITION(DSPAddThreadedSampleSink, Message)
MESSAGE_CLASS_DEFINITION(DSPRemoveThreadedSink, Message) MESSAGE_CLASS_DEFINITION(DSPRemoveThreadedSampleSink, Message)
MESSAGE_CLASS_DEFINITION(DSPAddAudioSink, Message) MESSAGE_CLASS_DEFINITION(DSPAddAudioSink, Message)
MESSAGE_CLASS_DEFINITION(DSPRemoveAudioSink, Message) MESSAGE_CLASS_DEFINITION(DSPRemoveAudioSink, Message)
//MESSAGE_CLASS_DEFINITION(DSPConfigureSpectrumVis, Message) //MESSAGE_CLASS_DEFINITION(DSPConfigureSpectrumVis, Message)

View File

@ -131,17 +131,17 @@ void DSPEngine::removeSink(SampleSink* sink)
m_syncMessenger.sendWait(cmd); m_syncMessenger.sendWait(cmd);
} }
void DSPEngine::addThreadedSink(SampleSink* sink) void DSPEngine::addThreadedSink(ThreadedSampleSink* sink)
{ {
qDebug() << "DSPEngine::addThreadedSink: " << sink->objectName().toStdString().c_str(); qDebug() << "DSPEngine::addThreadedSink: " << sink->objectName().toStdString().c_str();
DSPAddThreadedSink cmd(sink); DSPAddThreadedSampleSink cmd(sink);
m_syncMessenger.sendWait(cmd); m_syncMessenger.sendWait(cmd);
} }
void DSPEngine::removeThreadedSink(SampleSink* sink) void DSPEngine::removeThreadedSink(ThreadedSampleSink* sink)
{ {
qDebug() << "DSPEngine::removeThreadedSink: " << sink->objectName().toStdString().c_str(); qDebug() << "DSPEngine::removeThreadedSink: " << sink->objectName().toStdString().c_str();
DSPRemoveThreadedSink cmd(sink); DSPRemoveThreadedSampleSink cmd(sink);
m_syncMessenger.sendWait(cmd); m_syncMessenger.sendWait(cmd);
} }
@ -418,7 +418,7 @@ DSPEngine::State DSPEngine::gotoInit()
for (ThreadedSampleSinks::const_iterator it = m_threadedSampleSinks.begin(); it != m_threadedSampleSinks.end(); ++it) for (ThreadedSampleSinks::const_iterator it = m_threadedSampleSinks.begin(); it != m_threadedSampleSinks.end(); ++it)
{ {
qDebug() << " - initializing ThreadedSampleSink(" << (*it)->getSampleSinkObjectName().toStdString().c_str() << ")"; qDebug() << " - initializing ThreadedSampleSink(" << (*it)->getSampleSinkObjectName().toStdString().c_str() << ")";
(*it)->sendWaitSink(notif); (*it)->handleSinkMessage(notif);
} }
// pass data to listeners // pass data to listeners
@ -580,32 +580,17 @@ void DSPEngine::handleSynchronousMessages()
m_sampleSinks.remove(sink); m_sampleSinks.remove(sink);
} }
else if (DSPAddThreadedSink::match(*message)) else if (DSPAddThreadedSampleSink::match(*message))
{ {
SampleSink* sink = ((DSPAddThreadedSink*) message)->getSampleSink(); ThreadedSampleSink *threadedSink = ((DSPAddThreadedSampleSink*) message)->getThreadedSampleSink();
ThreadedSampleSink *threadedSink = new ThreadedSampleSink(sink);
m_threadedSampleSinks.push_back(threadedSink); m_threadedSampleSinks.push_back(threadedSink);
threadedSink->start(); threadedSink->start();
} }
else if (DSPRemoveThreadedSink::match(*message)) else if (DSPRemoveThreadedSampleSink::match(*message))
{ {
SampleSink* sink = ((DSPRemoveThreadedSink*) message)->getSampleSink(); ThreadedSampleSink* threadedSink = ((DSPRemoveThreadedSampleSink*) message)->getThreadedSampleSink();
ThreadedSampleSinks::iterator threadedSinkIt = m_threadedSampleSinks.begin(); threadedSink->stop();
m_threadedSampleSinks.remove(threadedSink);
for (; threadedSinkIt != m_threadedSampleSinks.end(); ++threadedSinkIt)
{
if ((*threadedSinkIt)->getSink() == sink)
{
break;
}
}
if (threadedSinkIt != m_threadedSampleSinks.end())
{
(*threadedSinkIt)->stop();
m_threadedSampleSinks.remove(*threadedSinkIt);
delete (*threadedSinkIt);
}
} }
else if (DSPAddAudioSink::match(*message)) else if (DSPAddAudioSink::match(*message))
{ {
@ -684,7 +669,7 @@ void DSPEngine::handleSourceMessages()
for (ThreadedSampleSinks::const_iterator it = m_threadedSampleSinks.begin(); it != m_threadedSampleSinks.end(); ++it) for (ThreadedSampleSinks::const_iterator it = m_threadedSampleSinks.begin(); it != m_threadedSampleSinks.end(); ++it)
{ {
qDebug() << "DSPEngine::handleSourceMessages: forward message to ThreadedSampleSink(" << (*it)->getSampleSinkObjectName().toStdString().c_str() << ")"; qDebug() << "DSPEngine::handleSourceMessages: forward message to ThreadedSampleSink(" << (*it)->getSampleSinkObjectName().toStdString().c_str() << ")";
(*it)->sendWaitSink(*message); (*it)->handleSinkMessage(*message);
} }
// forward changes to listeners on DSP output queue // forward changes to listeners on DSP output queue

View File

@ -1,60 +1,59 @@
#include <QThread> #include <QThread>
#include <QDebug> #include <QDebug>
#include <QApplication>
#include "dsp/threadedsamplesink.h" #include "dsp/threadedsamplesink.h"
#include "dsp/dspcommands.h" #include "dsp/dspcommands.h"
#include "util/message.h" #include "util/message.h"
ThreadedSampleSink::ThreadedSampleSink(SampleSink* sampleSink) : ThreadedSampleSink::ThreadedSampleSink(SampleSink* sampleSink, QObject *parent) :
m_sampleSink(sampleSink) m_sampleSink(sampleSink)
{ {
moveToThread(this); QString name = "ThreadedSampleSink(" + m_sampleSink->objectName() + ")";
setObjectName(name);
qDebug() << "ThreadedSampleSink::ThreadedSampleSink: " << name;
m_thread = new QThread(parent);
moveToThread(m_thread); // FIXME: the intermediate FIFO should be handled within the sink. Define a new type of sink that is compatible with threading
m_sampleSink->moveToThread(m_thread);
m_sampleFifo.moveToThread(m_thread);
connect(&m_sampleFifo, SIGNAL(dataReady()), this, SLOT(handleData()));
m_sampleFifo.setSize(262144);
qDebug() << "ThreadedSampleSink::ThreadedSampleSink: thread: " << thread() << " m_thread: " << m_thread;
} }
ThreadedSampleSink::~ThreadedSampleSink() ThreadedSampleSink::~ThreadedSampleSink()
{ {
wait(); delete m_thread;
} }
void ThreadedSampleSink::start() void ThreadedSampleSink::start()
{ {
qDebug() << "ThreadedSampleSink::start"; qDebug() << "ThreadedSampleSink::start";
DSPPing cmd; m_thread->start();
QThread::start(); m_sampleSink->start();
m_syncMessenger.sendWait(cmd);
} }
void ThreadedSampleSink::stop() void ThreadedSampleSink::stop()
{ {
qDebug() << "ThreadedSampleSink::stop"; qDebug() << "ThreadedSampleSink::stop";
exit(); m_sampleSink->stop();
wait(); m_thread->exit();
} m_thread->wait();
m_sampleFifo.readCommit(m_sampleFifo.fill());
void ThreadedSampleSink::run()
{
qDebug() << "ThreadedSampleSink::run";
connect(&m_syncMessenger, SIGNAL(messageSent()), this, SLOT(handleSynchronousMessages()), Qt::QueuedConnection);
m_syncMessenger.done(); // Release start() that is waiting in calling trhead
exec();
} }
void ThreadedSampleSink::feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool positiveOnly) void ThreadedSampleSink::feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool positiveOnly)
{ {
m_sampleSink->feed(begin, end, positiveOnly); // m_sampleSink->feed(begin, end, positiveOnly);
m_sampleFifo.write(begin, end);
} }
bool ThreadedSampleSink::sendWaitSink(Message& cmd) bool ThreadedSampleSink::handleSinkMessage(Message& cmd)
{ {
m_syncMessenger.sendWait(cmd); return m_sampleSink->handleMessage(cmd);
}
void ThreadedSampleSink::handleSynchronousMessages()
{
qDebug() << "ThreadedSampleSink::handleSynchronousMessages";
Message *message = m_syncMessenger.getMessage();
qDebug() << " - message: " << message->getIdentifier();
m_sampleSink->handleMessage(*message); // just delegate to the sink
m_syncMessenger.done();
} }
QString ThreadedSampleSink::getSampleSinkObjectName() const QString ThreadedSampleSink::getSampleSinkObjectName() const
@ -63,33 +62,44 @@ QString ThreadedSampleSink::getSampleSinkObjectName() const
} }
/* void ThreadedSampleSink::handleData() // FIXME: Move it to the new threadable sink class
void ThreadedSampleSink::handleData()
{ {
bool positiveOnly = false; bool positiveOnly = false;
while((m_sampleFifo.fill() > 0) && (m_messageQueue.countPending() == 0)) { while ((m_sampleFifo.fill() > 0) && (m_sampleSink->getInputMessageQueue()->size() == 0))
{
SampleVector::iterator part1begin; SampleVector::iterator part1begin;
SampleVector::iterator part1end; SampleVector::iterator part1end;
SampleVector::iterator part2begin; SampleVector::iterator part2begin;
SampleVector::iterator part2end; SampleVector::iterator part2end;
size_t count = m_sampleFifo.readBegin(m_sampleFifo.fill(), &part1begin, &part1end, &part2begin, &part2end); std::size_t count = m_sampleFifo.readBegin(m_sampleFifo.fill(), &part1begin, &part1end, &part2begin, &part2end);
// first part of FIFO data // first part of FIFO data
if(count > 0) {
if (count > 0)
{
// handle data // handle data
if(m_sampleSink != NULL) if(m_sampleSink != NULL)
{
m_sampleSink->feed(part1begin, part1end, positiveOnly); m_sampleSink->feed(part1begin, part1end, positiveOnly);
}
m_sampleFifo.readCommit(part1end - part1begin); m_sampleFifo.readCommit(part1end - part1begin);
} }
// second part of FIFO data (used when block wraps around) // second part of FIFO data (used when block wraps around)
if(part2begin != part2end) {
if(part2begin != part2end)
{
// handle data // handle data
if(m_sampleSink != NULL) if(m_sampleSink != NULL)
{
m_sampleSink->feed(part2begin, part2end, positiveOnly); m_sampleSink->feed(part2begin, part2end, positiveOnly);
}
m_sampleFifo.readCommit(part2end - part2begin); m_sampleFifo.readCommit(part2end - part2begin);
} }
} }
}*/ }