1
0
mirror of https://github.com/f4exb/sdrangel.git synced 2024-11-15 12:51:49 -05:00

Data pipes redesign (2)

This commit is contained in:
f4exb 2022-02-21 02:18:51 +01:00
parent c678b40988
commit adfaac1545
6 changed files with 40 additions and 12 deletions

View File

@ -348,6 +348,7 @@ void DemodAnalyzer::setChannel(ChannelAPI *selectedChannel)
} }
m_dataPipe = mainCore->getDataPipes().registerProducerToConsumer(selectedChannel, this, "demod"); m_dataPipe = mainCore->getDataPipes().registerProducerToConsumer(selectedChannel, this, "demod");
connect(m_dataPipe, SIGNAL(toBeDeleted(int, QObject*)), this, SLOT(handleDataPipeToBeDeleted(int, QObject*)));
DataFifo *fifo = qobject_cast<DataFifo*>(m_dataPipe->m_element); DataFifo *fifo = qobject_cast<DataFifo*>(m_dataPipe->m_element);
if (fifo) if (fifo)
@ -601,3 +602,22 @@ void DemodAnalyzer::handleChannelMessageQueue(MessageQueue* messageQueue)
} }
} }
} }
void DemodAnalyzer::handleDataPipeToBeDeleted(int reason, QObject *object)
{
qDebug("DemodAnalyzer::handleDataPipeToBeDeleted: %d %p", reason, object);
if ((reason == 0) && (m_selectedChannel == object))
{
DataFifo *fifo = qobject_cast<DataFifo*>(m_dataPipe->m_element);
if (fifo && m_worker->isRunning())
{
DemodAnalyzerWorker::MsgConnectFifo *msg = DemodAnalyzerWorker::MsgConnectFifo::create(fifo, false);
m_worker->getInputMessageQueue()->push(msg);
}
updateChannels();
m_selectedChannel = nullptr;
}
}

View File

@ -217,6 +217,7 @@ private:
private slots: private slots:
void networkManagerFinished(QNetworkReply *reply); void networkManagerFinished(QNetworkReply *reply);
void handleChannelMessageQueue(MessageQueue *messageQueues); void handleChannelMessageQueue(MessageQueue *messageQueues);
void handleDataPipeToBeDeleted(int reason, QObject *object);
}; };
#endif // INCLUDE_FEATURE_DEMODANALYZER_H_ #endif // INCLUDE_FEATURE_DEMODANALYZER_H_

View File

@ -156,11 +156,11 @@ bool DemodAnalyzerWorker::handleMessage(const Message& cmd)
} }
else if (MsgConnectFifo::match(cmd)) else if (MsgConnectFifo::match(cmd))
{ {
qDebug("DemodAnalyzerWorker::handleMessage: MsgConnectFifo");
QMutexLocker mutexLocker(&m_mutex); QMutexLocker mutexLocker(&m_mutex);
MsgConnectFifo& msg = (MsgConnectFifo&) cmd; MsgConnectFifo& msg = (MsgConnectFifo&) cmd;
m_dataFifo = msg.getFifo(); m_dataFifo = msg.getFifo();
bool doConnect = msg.getConnect(); bool doConnect = msg.getConnect();
qDebug("DemodAnalyzerWorker::handleMessage: MsgConnectFifo: %s", (doConnect ? "connect" : "disconnect"));
if (doConnect) { if (doConnect) {
QObject::connect( QObject::connect(
@ -170,7 +170,9 @@ bool DemodAnalyzerWorker::handleMessage(const Message& cmd)
&DemodAnalyzerWorker::handleData, &DemodAnalyzerWorker::handleData,
Qt::QueuedConnection Qt::QueuedConnection
); );
} else { }
else
{
QObject::disconnect( QObject::disconnect(
m_dataFifo, m_dataFifo,
&DataFifo::dataReady, &DataFifo::dataReady,

View File

@ -26,10 +26,10 @@ ObjectPipe::ObjectPipe() :
m_gcCount(0) m_gcCount(0)
{} {}
void ObjectPipe::setToBeDeleted(int reason) void ObjectPipe::setToBeDeleted(int reason, QObject *object)
{ {
m_gcCount = 2; // will defer actual deletion by one GC pass m_gcCount = 2; // will defer actual deletion by one GC pass
emit toBeDeleted(reason); emit toBeDeleted(reason, object);
} }
int ObjectPipe::getGCCount() const { int ObjectPipe::getGCCount() const {

View File

@ -30,7 +30,7 @@ public:
ObjectPipe(const ObjectPipe&) = default; ObjectPipe(const ObjectPipe&) = default;
ObjectPipe& operator=(const ObjectPipe&) = default; ObjectPipe& operator=(const ObjectPipe&) = default;
void setToBeDeleted(int reason); void setToBeDeleted(int reason, QObject *object);
int getGCCount() const; int getGCCount() const;
int decreaseGCCount(); int decreaseGCCount();
@ -41,7 +41,7 @@ public:
QObject *m_element; QObject *m_element;
signals: signals:
void toBeDeleted(int reason); void toBeDeleted(int reason, QObject *object);
private: private:
int m_gcCount; int m_gcCount;

View File

@ -29,6 +29,7 @@ ObjectPipesRegistrations::~ObjectPipesRegistrations()
ObjectPipe *ObjectPipesRegistrations::registerProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type) ObjectPipe *ObjectPipesRegistrations::registerProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type)
{ {
qDebug("ObjectPipesRegistrations::registerProducerToConsumer: %p %p %s", producer, consumer, qPrintable("type"));
int typeId; int typeId;
QMutexLocker mlock(&m_mutex); QMutexLocker mlock(&m_mutex);
@ -72,6 +73,7 @@ ObjectPipe *ObjectPipesRegistrations::registerProducerToConsumer(const QObject *
ObjectPipe *ObjectPipesRegistrations::unregisterProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type) ObjectPipe *ObjectPipesRegistrations::unregisterProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type)
{ {
qDebug("ObjectPipesRegistrations::unregisterProducerToConsumer: %p %p %s", producer, consumer, qPrintable("type"));
ObjectPipe *pipe = nullptr; ObjectPipe *pipe = nullptr;
if (m_typeIds.contains(type)) if (m_typeIds.contains(type))
@ -102,7 +104,7 @@ ObjectPipe *ObjectPipesRegistrations::unregisterProducerToConsumer(const QObject
m_producerAndTypeIdPipes.remove(std::make_tuple(producer, typeId)); m_producerAndTypeIdPipes.remove(std::make_tuple(producer, typeId));
} }
pipe->setToBeDeleted(PipeDeletionReason::PipeDeleted); pipe->setToBeDeleted(PipeDeletionReason::PipeDeleted, pipe);
} }
} }
@ -123,7 +125,6 @@ void ObjectPipesRegistrations::getPipes(const QObject *producer, const QString&
void ObjectPipesRegistrations::processGC() void ObjectPipesRegistrations::processGC()
{ {
qDebug("ObjectPipesRegistrations::processGC");
QMutexLocker mlock(&m_mutex); QMutexLocker mlock(&m_mutex);
typename QList<ObjectPipe*>::iterator itPipe = m_pipes.begin(); typename QList<ObjectPipe*>::iterator itPipe = m_pipes.begin();
@ -143,12 +144,16 @@ void ObjectPipesRegistrations::processGC()
++itPipe; ++itPipe;
} }
} }
else
{
++itPipe;
}
} }
} }
void ObjectPipesRegistrations::removeProducer(QObject *producer) void ObjectPipesRegistrations::removeProducer(QObject *producer)
{ {
qDebug("ObjectPipesRegistrations::removeProducer"); qDebug("ObjectPipesRegistrations::removeProducer: %p", producer);
QMutexLocker mlock(&m_mutex); QMutexLocker mlock(&m_mutex);
if (m_producerPipes.contains(producer) && (m_producerPipes[producer].size() != 0)) if (m_producerPipes.contains(producer) && (m_producerPipes[producer].size() != 0))
@ -165,7 +170,7 @@ void ObjectPipesRegistrations::removeProducer(QObject *producer)
m_typeIdPipes[typeId].removeAll(pipe); m_typeIdPipes[typeId].removeAll(pipe);
} }
pipe->setToBeDeleted(PipeDeletionReason::PipeProducerDeleted); pipe->setToBeDeleted(PipeDeletionReason::PipeProducerDeleted, producer);
} }
m_producerPipes.remove(producer); m_producerPipes.remove(producer);
@ -196,7 +201,7 @@ void ObjectPipesRegistrations::removeProducer(QObject *producer)
void ObjectPipesRegistrations::removeConsumer(QObject *consumer) void ObjectPipesRegistrations::removeConsumer(QObject *consumer)
{ {
qDebug("ObjectPipesRegistrations::removeConsumer"); qDebug("ObjectPipesRegistrations::removeConsumer: %p", consumer);
QMutexLocker mlock(&m_mutex); QMutexLocker mlock(&m_mutex);
if (m_consumerPipes.contains(consumer) && (m_consumerPipes[consumer].size() != 0)) if (m_consumerPipes.contains(consumer) && (m_consumerPipes[consumer].size() != 0))
@ -217,7 +222,7 @@ void ObjectPipesRegistrations::removeConsumer(QObject *consumer)
m_producerAndTypeIdPipes[producerAndTypeId].removeAll(pipe); m_producerAndTypeIdPipes[producerAndTypeId].removeAll(pipe);
} }
pipe->setToBeDeleted(PipeDeletionReason::PipeConsumerDeleted); pipe->setToBeDeleted(PipeDeletionReason::PipeConsumerDeleted, consumer);
} }
m_consumerPipes.remove(consumer); m_consumerPipes.remove(consumer);