1
0
mirror of https://github.com/f4exb/sdrangel.git synced 2024-11-25 01:18:38 -05:00

Remote Sink: reworked threading model. Fixed sigabort at exit time

This commit is contained in:
f4exb 2021-12-23 01:47:38 +01:00
parent 7c8cb7a85a
commit d482471a59
8 changed files with 136 additions and 58 deletions

View File

@ -55,9 +55,8 @@ RemoteSink::RemoteSink(DeviceAPI *deviceAPI) :
{ {
setObjectName(m_channelId); setObjectName(m_channelId);
m_thread = new QThread(this);
m_basebandSink = new RemoteSinkBaseband(); m_basebandSink = new RemoteSinkBaseband();
m_basebandSink->moveToThread(m_thread); m_basebandSink->moveToThread(&m_thread);
applySettings(m_settings, true); applySettings(m_settings, true);
@ -74,8 +73,12 @@ RemoteSink::~RemoteSink()
delete m_networkManager; delete m_networkManager;
m_deviceAPI->removeChannelSinkAPI(this); m_deviceAPI->removeChannelSinkAPI(this);
m_deviceAPI->removeChannelSink(this); m_deviceAPI->removeChannelSink(this);
if (m_basebandSink->isRunning()) {
stop();
}
delete m_basebandSink; delete m_basebandSink;
delete m_thread;
} }
uint32_t RemoteSink::getNumberOfDeviceStreams() const uint32_t RemoteSink::getNumberOfDeviceStreams() const
@ -93,21 +96,20 @@ void RemoteSink::start()
{ {
qDebug("RemoteSink::start: m_basebandSampleRate: %d", m_basebandSampleRate); qDebug("RemoteSink::start: m_basebandSampleRate: %d", m_basebandSampleRate);
m_basebandSink->reset(); m_basebandSink->reset();
m_basebandSink->startWork();
m_thread.start();
if (m_basebandSampleRate != 0) { if (m_basebandSampleRate != 0) {
m_basebandSink->setBasebandSampleRate(m_basebandSampleRate); m_basebandSink->setBasebandSampleRate(m_basebandSampleRate);
} }
m_thread->start();
m_basebandSink->startSender();
} }
void RemoteSink::stop() void RemoteSink::stop()
{ {
qDebug("RemoteSink::stop"); qDebug("RemoteSink::stop");
m_basebandSink->stopSender(); m_basebandSink->stopWork();
m_thread->exit(); m_thread.quit();
m_thread->wait(); m_thread.wait();
} }
bool RemoteSink::handleMessage(const Message& cmd) bool RemoteSink::handleMessage(const Message& cmd)

View File

@ -26,6 +26,7 @@
#include <QObject> #include <QObject>
#include <QNetworkRequest> #include <QNetworkRequest>
#include <QThread>
#include "dsp/basebandsamplesink.h" #include "dsp/basebandsamplesink.h"
#include "channel/channelapi.h" #include "channel/channelapi.h"
@ -115,7 +116,7 @@ public:
private: private:
DeviceAPI *m_deviceAPI; DeviceAPI *m_deviceAPI;
QThread *m_thread; QThread m_thread;
RemoteSinkBaseband *m_basebandSink; RemoteSinkBaseband *m_basebandSink;
RemoteSinkSettings m_settings; RemoteSinkSettings m_settings;

View File

@ -28,18 +28,9 @@ MESSAGE_CLASS_DEFINITION(RemoteSinkBaseband::MsgConfigureRemoteSinkBaseband, Mes
RemoteSinkBaseband::RemoteSinkBaseband() : RemoteSinkBaseband::RemoteSinkBaseband() :
m_mutex(QMutex::Recursive) m_mutex(QMutex::Recursive)
{ {
qDebug("RemoteSinkBaseband::RemoteSinkBaseband");
m_sampleFifo.setSize(SampleSinkFifo::getSizePolicy(48000)); m_sampleFifo.setSize(SampleSinkFifo::getSizePolicy(48000));
m_channelizer = new DownChannelizer(&m_sink); m_channelizer = new DownChannelizer(&m_sink);
qDebug("RemoteSinkBaseband::RemoteSinkBaseband");
QObject::connect(
&m_sampleFifo,
&SampleSinkFifo::dataReady,
this,
&RemoteSinkBaseband::handleData,
Qt::QueuedConnection
);
connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()));
} }
@ -55,6 +46,35 @@ void RemoteSinkBaseband::reset()
m_sink.init(); m_sink.init();
} }
void RemoteSinkBaseband::startWork()
{
QMutexLocker mutexLocker(&m_mutex);
QObject::connect(
&m_sampleFifo,
&SampleSinkFifo::dataReady,
this,
&RemoteSinkBaseband::handleData,
Qt::QueuedConnection
);
connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()));
m_sink.start();
m_running = true;
}
void RemoteSinkBaseband::stopWork()
{
QMutexLocker mutexLocker(&m_mutex);
m_sink.stop();
disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()));
QObject::disconnect(
&m_sampleFifo,
&SampleSinkFifo::dataReady,
this,
&RemoteSinkBaseband::handleData
);
m_running = false;
}
void RemoteSinkBaseband::feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end) void RemoteSinkBaseband::feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end)
{ {
m_sampleFifo.write(begin, end); m_sampleFifo.write(begin, end);

View File

@ -62,8 +62,9 @@ public:
void reset(); void reset();
void feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end); void feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end);
void startSender() { m_sink.startSender(); } void startWork();
void stopSender() { m_sink.stopSender(); } void stopWork();
bool isRunning() const { return m_running; }
void setNbTxBytes(uint32_t nbTxBytes) { m_sink.setNbTxBytes(nbTxBytes); } void setNbTxBytes(uint32_t nbTxBytes) { m_sink.setNbTxBytes(nbTxBytes); }
MessageQueue *getInputMessageQueue() { return &m_inputMessageQueue; } //!< Get the queue for asynchronous inbound communication MessageQueue *getInputMessageQueue() { return &m_inputMessageQueue; } //!< Get the queue for asynchronous inbound communication
@ -71,6 +72,7 @@ public:
void setBasebandSampleRate(int sampleRate); void setBasebandSampleRate(int sampleRate);
private: private:
bool m_running;
SampleSinkFifo m_sampleFifo; SampleSinkFifo m_sampleFifo;
DownChannelizer *m_channelizer; DownChannelizer *m_channelizer;
int m_basebandSampleRate; int m_basebandSampleRate;

View File

@ -21,11 +21,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>. // // along with this program. If not, see <http://www.gnu.org/licenses/>. //
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
#include <QThread>
#include <thread>
#include <chrono>
#include <QUdpSocket>
#include "cm256cc/cm256.h" #include "cm256cc/cm256.h"
@ -33,14 +29,24 @@
#include "remotesinksender.h" #include "remotesinksender.h"
RemoteSinkSender::RemoteSinkSender() : RemoteSinkSender::RemoteSinkSender() :
m_running(false),
m_fifo(20, this), m_fifo(20, this),
m_address(QHostAddress::LocalHost), m_address(QHostAddress::LocalHost),
m_socket(nullptr) m_socket(this)
{ {
qDebug("RemoteSinkSender::RemoteSinkSender"); qDebug("RemoteSinkSender::RemoteSinkSender");
m_cm256p = m_cm256.isInitialized() ? &m_cm256 : nullptr; m_cm256p = m_cm256.isInitialized() ? &m_cm256 : nullptr;
m_socket = new QUdpSocket(this); }
RemoteSinkSender::~RemoteSinkSender()
{
qDebug("RemoteSinkSender::~RemoteSinkSender");
m_socket.close();
}
bool RemoteSinkSender::startWork()
{
qDebug("RemoteSinkSender::startWork");
QObject::connect( QObject::connect(
&m_fifo, &m_fifo,
&RemoteSinkFifo::dataBlockServed, &RemoteSinkFifo::dataBlockServed,
@ -48,13 +54,31 @@ RemoteSinkSender::RemoteSinkSender() :
&RemoteSinkSender::handleData, &RemoteSinkSender::handleData,
Qt::QueuedConnection Qt::QueuedConnection
); );
connect(thread(), SIGNAL(started()), this, SLOT(started()));
connect(thread(), SIGNAL(finished()), this, SLOT(finished()));
m_running = true;
return m_running;
} }
RemoteSinkSender::~RemoteSinkSender() void RemoteSinkSender::started()
{ {
qDebug("RemoteSinkSender::~RemoteSinkSender"); disconnect(thread(), SIGNAL(started()), this, SLOT(started()));
m_socket->close(); }
m_socket->deleteLater();
void RemoteSinkSender::stopWork()
{
qDebug("RemoteSinkSender::stopWork");
}
void RemoteSinkSender::finished()
{
// Close any existing connection
if (m_socket.isOpen()) {
m_socket.close();
}
m_running = false;
disconnect(thread(), SIGNAL(finished()), this, SLOT(finished()));
} }
RemoteDataFrame *RemoteSinkSender::getDataFrame() RemoteDataFrame *RemoteSinkSender::getDataFrame()
@ -91,11 +115,8 @@ void RemoteSinkSender::sendDataFrame(RemoteDataFrame *dataFrame)
if ((nbBlocksFEC == 0) || !m_cm256p) // Do not FEC encode if ((nbBlocksFEC == 0) || !m_cm256p) // Do not FEC encode
{ {
if (m_socket) for (int i = 0; i < RemoteNbOrginalBlocks; i++) { // send block via UDP
{ m_socket.writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_address, dataPort);
for (int i = 0; i < RemoteNbOrginalBlocks; i++) { // send block via UDP
m_socket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_address, dataPort);
}
} }
} }
else else
@ -133,11 +154,8 @@ void RemoteSinkSender::sendDataFrame(RemoteDataFrame *dataFrame)
} }
// Transmit all blocks // Transmit all blocks
if (m_socket) for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) { // send block via UDP
{ m_socket.writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_address, dataPort);
for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) { // send block via UDP
m_socket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_address, dataPort);
}
} }
} }

View File

@ -28,6 +28,7 @@
#include <QMutex> #include <QMutex>
#include <QWaitCondition> #include <QWaitCondition>
#include <QHostAddress> #include <QHostAddress>
#include <QUdpSocket>
#include "cm256cc/cm256.h" #include "cm256cc/cm256.h"
@ -47,19 +48,24 @@ public:
RemoteSinkSender(); RemoteSinkSender();
~RemoteSinkSender(); ~RemoteSinkSender();
bool startWork();
void stopWork();
RemoteDataFrame *getDataFrame(); RemoteDataFrame *getDataFrame();
private: private:
volatile bool m_running;
RemoteSinkFifo m_fifo; RemoteSinkFifo m_fifo;
CM256 m_cm256; CM256 m_cm256;
CM256 *m_cm256p; CM256 *m_cm256p;
QHostAddress m_address; QHostAddress m_address;
QUdpSocket *m_socket; QUdpSocket m_socket;
void sendDataFrame(RemoteDataFrame *dataFrame); void sendDataFrame(RemoteDataFrame *dataFrame);
private slots: private slots:
void started();
void finished();
void handleData(); void handleData();
}; };

View File

@ -28,6 +28,8 @@
#include "remotesinksink.h" #include "remotesinksink.h"
RemoteSinkSink::RemoteSinkSink() : RemoteSinkSink::RemoteSinkSink() :
m_running(false),
m_remoteSinkSender(nullptr),
m_txBlockIndex(0), m_txBlockIndex(0),
m_frameCount(0), m_frameCount(0),
m_sampleIndex(0), m_sampleIndex(0),
@ -41,32 +43,56 @@ RemoteSinkSink::RemoteSinkSink() :
m_dataPort(9090) m_dataPort(9090)
{ {
qDebug("RemoteSinkSink::RemoteSinkSink"); qDebug("RemoteSinkSink::RemoteSinkSink");
m_senderThread = new QThread(this);
m_remoteSinkSender = new RemoteSinkSender();
m_remoteSinkSender->moveToThread(m_senderThread);
applySettings(m_settings, true); applySettings(m_settings, true);
} }
RemoteSinkSink::~RemoteSinkSink() RemoteSinkSink::~RemoteSinkSink()
{ {
qDebug("RemoteSinkSink::~RemoteSinkSink"); qDebug("RemoteSinkSink::~RemoteSinkSink");
delete m_remoteSinkSender; stop();
delete m_senderThread; }
void RemoteSinkSink::start()
{
qDebug("RemoteSinkSink::start");
if (m_running) {
stop();
}
m_remoteSinkSender = new RemoteSinkSender();
m_remoteSinkSender->moveToThread(&m_senderThread);
startSender();
m_running = true;
}
void RemoteSinkSink::stop()
{
qDebug("RemoteSinkSink::stop");
if (m_remoteSinkSender)
{
stopSender();
m_remoteSinkSender->deleteLater();
m_remoteSinkSender = nullptr;
}
m_running = false;
} }
void RemoteSinkSink::startSender() void RemoteSinkSink::startSender()
{ {
qDebug("RemoteSinkSink::startSender"); qDebug("RemoteSinkSink::startSender");
m_senderThread->start(); m_remoteSinkSender->startWork();
m_senderThread.start();
} }
void RemoteSinkSink::stopSender() void RemoteSinkSink::stopSender()
{ {
qDebug("RemoteSinkSink::stopSender"); qDebug("RemoteSinkSink::stopSender");
m_senderThread->exit(); m_remoteSinkSender->stopWork();
m_senderThread->wait(); m_senderThread.quit();
m_senderThread.wait();
} }
void RemoteSinkSink::init() void RemoteSinkSink::init()

View File

@ -19,6 +19,7 @@
#define INCLUDE_REMOTESINKSINK_H_ #define INCLUDE_REMOTESINKSINK_H_
#include <QObject> #include <QObject>
#include <QThread>
#include "dsp/channelsamplesink.h" #include "dsp/channelsamplesink.h"
#include "channel/remotedatablock.h" #include "channel/remotedatablock.h"
@ -28,7 +29,6 @@
class DeviceSampleSource; class DeviceSampleSource;
class RemoteSinkSender; class RemoteSinkSender;
class QThread;
class RemoteSinkSink : public QObject, public ChannelSampleSink { class RemoteSinkSink : public QObject, public ChannelSampleSink {
Q_OBJECT Q_OBJECT
@ -38,8 +38,8 @@ public:
virtual void feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end); virtual void feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end);
void startSender(); void start();
void stopSender(); void stop();
void init(); void init();
void setNbTxBytes(uint32_t nbTxBytes) { m_nbTxBytes = nbTxBytes; } void setNbTxBytes(uint32_t nbTxBytes) { m_nbTxBytes = nbTxBytes; }
@ -49,7 +49,8 @@ public:
private: private:
RemoteSinkSettings m_settings; RemoteSinkSettings m_settings;
QThread *m_senderThread; bool m_running;
QThread m_senderThread;
RemoteSinkSender *m_remoteSinkSender; RemoteSinkSender *m_remoteSinkSender;
int m_txBlockIndex; //!< Current index in blocks to transmit in the Tx row int m_txBlockIndex; //!< Current index in blocks to transmit in the Tx row
@ -67,6 +68,8 @@ private:
QString m_dataAddress; QString m_dataAddress;
uint16_t m_dataPort; uint16_t m_dataPort;
void startSender();
void stopSender();
void setNbBlocksFEC(int nbBlocksFEC); void setNbBlocksFEC(int nbBlocksFEC);
uint32_t getNbSampleBits(); uint32_t getNbSampleBits();