diff --git a/plugins/feature/afc/afc.cpp b/plugins/feature/afc/afc.cpp index 395549167..7e3fbaf05 100644 --- a/plugins/feature/afc/afc.cpp +++ b/plugins/feature/afc/afc.cpp @@ -135,7 +135,7 @@ bool AFC::handleMessage(const Message& cmd) qDebug() << "AFC::handleMessage: MessagePipesCommon::MsgReportChannelDeleted"; MessagePipesCommon::MsgReportChannelDeleted& report = (MessagePipesCommon::MsgReportChannelDeleted&) cmd; const MessagePipesCommon::ChannelRegistrationKey& channelKey = report.getChannelRegistrationKey(); - const ChannelAPI *channel = channelKey.m_channel; + const ChannelAPI *channel = channelKey.m_key; MainCore::instance()->getMessagePipes().unregisterChannelToFeature(channel, this, "settings"); return true; diff --git a/plugins/feature/vorlocalizer/vorlocalizer.cpp b/plugins/feature/vorlocalizer/vorlocalizer.cpp index 90ce38fcc..5ba65fe99 100644 --- a/plugins/feature/vorlocalizer/vorlocalizer.cpp +++ b/plugins/feature/vorlocalizer/vorlocalizer.cpp @@ -244,7 +244,7 @@ bool VORLocalizer::handleMessage(const Message& cmd) qDebug() << "VORLocalizer::handleMessage: MsgReportChannelDeleted"; MessagePipesCommon::MsgReportChannelDeleted& report = (MessagePipesCommon::MsgReportChannelDeleted&) cmd; const MessagePipesCommon::ChannelRegistrationKey& channelKey = report.getChannelRegistrationKey(); - const ChannelAPI *channel = channelKey.m_channel; + const ChannelAPI *channel = channelKey.m_key; m_availableChannels.remove(const_cast(channel)); updateChannels(); MainCore::instance()->getMessagePipes().unregisterChannelToFeature(channel, this, "report"); diff --git a/sdrbase/CMakeLists.txt b/sdrbase/CMakeLists.txt index 82465b34f..1b163b135 100644 --- a/sdrbase/CMakeLists.txt +++ b/sdrbase/CMakeLists.txt @@ -99,6 +99,7 @@ set(sdrbase_SOURCES dsp/channelsamplesource.cpp dsp/cwkeyer.cpp dsp/cwkeyersettings.cpp + dsp/datafifo.cpp dsp/decimatorsff.cpp dsp/decimatorsfi.cpp dsp/decimatorc.cpp @@ -160,6 +161,9 @@ set(sdrbase_SOURCES limerfe/limerfeusbcalib.cpp + pipes/datapipes.cpp + pipes/datapipescommon.cpp + pipes/datapipesgcworker.cpp pipes/messagepipes.cpp pipes/messagepipescommon.cpp pipes/messagepipesgcworker.cpp @@ -248,6 +252,7 @@ set(sdrbase_HEADERS dsp/ctcssfrequencies.h dsp/cwkeyer.h dsp/cwkeyersettings.h + dsp/datafifo.h dsp/decimators.h dsp/decimatorsif.h dsp/decimatorsff.h @@ -336,6 +341,11 @@ set(sdrbase_HEADERS limerfe/limerfeusbcalib.h + pipes/datapipes.h + pipes/datapipescommon.h + pipes/datapipesgcworker.h + pipes/elementpipescommon.h + pipes/elementpipesgc.h pipes/messagepipes.h pipes/messagepipescommon.h pipes/messagepipesgcworker.h diff --git a/sdrbase/channel/channelapi.h b/sdrbase/channel/channelapi.h index c9bcf4862..6be93b38a 100644 --- a/sdrbase/channel/channelapi.h +++ b/sdrbase/channel/channelapi.h @@ -29,7 +29,6 @@ #include "export.h" class DeviceAPI; -class Feature; namespace SWGSDRangel { diff --git a/sdrbase/dsp/datafifo.cpp b/sdrbase/dsp/datafifo.cpp new file mode 100644 index 000000000..50cd2ab30 --- /dev/null +++ b/sdrbase/dsp/datafifo.cpp @@ -0,0 +1,287 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2020 F4EXB // +// written by Edouard Griffiths // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "datafifo.h" + +void DataFifo::create(unsigned int s) +{ + m_size = 0; + m_fill = 0; + m_head = 0; + m_tail = 0; + + m_data.resize(s); + m_size = m_data.size(); +} + +void DataFifo::reset() +{ + m_suppressed = -1; + m_fill = 0; + m_head = 0; + m_tail = 0; +} + +DataFifo::DataFifo(QObject* parent) : + QObject(parent), + m_data() +{ + m_suppressed = -1; + m_size = 0; + m_fill = 0; + m_head = 0; + m_tail = 0; +} + +DataFifo::DataFifo(int size, QObject* parent) : + QObject(parent), + m_data() +{ + m_suppressed = -1; + create(size); +} + +DataFifo::DataFifo(const DataFifo& other) : + QObject(other.parent()), + m_data(other.m_data) +{ + m_suppressed = -1; + m_size = m_data.size(); + m_fill = 0; + m_head = 0; + m_tail = 0; +} + +DataFifo::~DataFifo() +{ + QMutexLocker mutexLocker(&m_mutex); + m_size = 0; +} + +bool DataFifo::setSize(int size) +{ + create(size); + + return m_data.size() == (unsigned int)size; +} + +unsigned int DataFifo::write(const quint8* data, unsigned int count) +{ + QMutexLocker mutexLocker(&m_mutex); + unsigned int total; + unsigned int remaining; + unsigned int len; + const quint8* begin = (const quint8*) data; + //count /= sizeof(Sample); + + total = std::min(count, m_size - m_fill); + + if (total < count) + { + if (m_suppressed < 0) + { + m_suppressed = 0; + m_msgRateTimer.start(); + qCritical("DataFifo::write: overflow - dropping %u samples", count - total); + } + else + { + if (m_msgRateTimer.elapsed() > 2500) + { + qCritical("DataFifo::write: %u messages dropped", m_suppressed); + qCritical("DataFifo::write: overflow - dropping %u samples", count - total); + m_suppressed = -1; + } + else + { + m_suppressed++; + } + } + } + + remaining = total; + + while (remaining > 0) + { + len = std::min(remaining, m_size - m_tail); + std::copy(begin, begin + len, m_data.begin() + m_tail); + m_tail += len; + m_tail %= m_size; + m_fill += len; + begin += len; + remaining -= len; + } + + if (m_fill > 0) { + emit dataReady(); + } + + return total; +} + +unsigned int DataFifo::write(QByteArray::const_iterator begin, QByteArray::const_iterator end) +{ + QMutexLocker mutexLocker(&m_mutex); + unsigned int count = end - begin; + unsigned int total; + unsigned int remaining; + unsigned int len; + + total = std::min(count, m_size - m_fill); + + if (total < count) + { + if (m_suppressed < 0) + { + m_suppressed = 0; + m_msgRateTimer.start(); + qCritical("DataFifo::write: overflow - dropping %u samples", count - total); + } + else + { + if (m_msgRateTimer.elapsed() > 2500) + { + qCritical("DataFifo::write: %u messages dropped", m_suppressed); + qCritical("DataFifo::write: overflow - dropping %u samples", count - total); + m_suppressed = -1; + } + else + { + m_suppressed++; + } + } + } + + remaining = total; + + while (remaining > 0) + { + len = std::min(remaining, m_size - m_tail); + std::copy(begin, begin + len, m_data.begin() + m_tail); + m_tail += len; + m_tail %= m_size; + m_fill += len; + begin += len; + remaining -= len; + } + + if (m_fill > 0) { + emit dataReady(); + } + + return total; +} + +unsigned int DataFifo::read(QByteArray::iterator begin, QByteArray::iterator end) +{ + QMutexLocker mutexLocker(&m_mutex); + unsigned int count = end - begin; + unsigned int total; + unsigned int remaining; + unsigned int len; + + total = std::min(count, m_fill); + + if (total < count) { + qCritical("DataFifo::read: underflow - missing %u samples", count - total); + } + + remaining = total; + + while (remaining > 0) + { + len = std::min(remaining, m_size - m_head); + std::copy(m_data.begin() + m_head, m_data.begin() + m_head + len, begin); + m_head += len; + m_head %= m_size; + m_fill -= len; + begin += len; + remaining -= len; + } + + return total; +} + +unsigned int DataFifo::readBegin(unsigned int count, + QByteArray::iterator* part1Begin, QByteArray::iterator* part1End, + QByteArray::iterator* part2Begin, QByteArray::iterator* part2End) +{ + QMutexLocker mutexLocker(&m_mutex); + unsigned int total; + unsigned int remaining; + unsigned int len; + unsigned int head = m_head; + + total = std::min(count, m_fill); + + if (total < count) { + qCritical("DataFifo::readBegin: underflow - missing %u samples", count - total); + } + + remaining = total; + + if (remaining > 0) + { + len = std::min(remaining, m_size - head); + *part1Begin = m_data.begin() + head; + *part1End = m_data.begin() + head + len; + head += len; + head %= m_size; + remaining -= len; + } + else + { + *part1Begin = m_data.end(); + *part1End = m_data.end(); + } + + if (remaining > 0) + { + len = std::min(remaining, m_size - head); + *part2Begin = m_data.begin() + head; + *part2End = m_data.begin() + head + len; + } + else + { + *part2Begin = m_data.end(); + *part2End = m_data.end(); + } + + return total; +} + +unsigned int DataFifo::readCommit(unsigned int count) +{ + QMutexLocker mutexLocker(&m_mutex); + + if (count > m_fill) + { + qCritical("DataFifo::readCommit: cannot commit more than available samples"); + count = m_fill; + } + + m_head = (m_head + count) % m_size; + m_fill -= count; + + return count; +} + +unsigned int DataFifo::getSizePolicy(unsigned int sampleRate) +{ + return (sampleRate/100)*64; // .64s +} diff --git a/sdrbase/dsp/datafifo.h b/sdrbase/dsp/datafifo.h new file mode 100644 index 000000000..4adf4c3c5 --- /dev/null +++ b/sdrbase/dsp/datafifo.h @@ -0,0 +1,73 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2020 F4EXB // +// written by Edouard Griffiths // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef INCLUDE_DATAFIFO_H +#define INCLUDE_DATAFIFO_H + +#include +#include +#include +#include + +#include "dsp/dsptypes.h" +#include "export.h" + +class SDRBASE_API DataFifo : public QObject { + Q_OBJECT + +private: + QMutex m_mutex; + QElapsedTimer m_msgRateTimer; + int m_suppressed; + + QByteArray m_data; + + unsigned int m_size; + unsigned int m_fill; + unsigned int m_head; + unsigned int m_tail; + + void create(unsigned int s); + +public: + DataFifo(QObject* parent = nullptr); + DataFifo(int size, QObject* parent = nullptr); + DataFifo(const DataFifo& other); + ~DataFifo(); + + bool setSize(int size); + void reset(); + inline unsigned int size() const { return m_size; } + inline unsigned int fill() { QMutexLocker mutexLocker(&m_mutex); unsigned int fill = m_fill; return fill; } + + unsigned int write(const quint8* data, unsigned int count); + unsigned int write(QByteArray::const_iterator begin, QByteArray::const_iterator end); + + unsigned int read(QByteArray::iterator begin, QByteArray::iterator end); + + unsigned int readBegin(unsigned int count, + QByteArray::iterator* part1Begin, QByteArray::iterator* part1End, + QByteArray::iterator* part2Begin, QByteArray::iterator* part2End); + unsigned int readCommit(unsigned int count); + static unsigned int getSizePolicy(unsigned int sampleRate); + +signals: + void dataReady(); +}; + +#endif // INCLUDE_DATAFIFO_H diff --git a/sdrbase/pipes/datapipes.cpp b/sdrbase/pipes/datapipes.cpp new file mode 100644 index 000000000..e9f6ac0ac --- /dev/null +++ b/sdrbase/pipes/datapipes.cpp @@ -0,0 +1,71 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2020 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "dsp/datafifo.h" + +#include "datapipesgcworker.h" +#include "datapipes.h" + +DataPipes::DataPipes() +{ + m_gcWorker = new DataPipesGCWorker(); + m_gcWorker->setC2FRegistrations( + m_registrations.getMutex(), + m_registrations.getElements(), + m_registrations.getConsumers() + ); + m_gcWorker->moveToThread(&m_gcThread); + startGC(); +} + +DataPipes::~DataPipes() +{ + if (m_gcWorker->isRunning()) { + stopGC(); + } +} + +DataFifo *DataPipes::registerChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type) +{ + return m_registrations.registerProducerToConsumer(source, feature, type); +} + +void DataPipes::unregisterChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type) +{ + m_registrations.unregisterProducerToConsumer(source, feature, type); +} + +QList* DataPipes::getFifos(const ChannelAPI *source, const QString& type) +{ + return m_registrations.getElements(source, type); +} + +void DataPipes::startGC() +{ + qDebug("DataPipes::startGC"); + + m_gcWorker->startWork(); + m_gcThread.start(); +} + +void DataPipes::stopGC() +{ + qDebug("DataPipes::stopGC"); + m_gcWorker->stopWork(); + m_gcThread.quit(); + m_gcThread.wait(); +} diff --git a/sdrbase/pipes/datapipes.h b/sdrbase/pipes/datapipes.h new file mode 100644 index 000000000..30941b22b --- /dev/null +++ b/sdrbase/pipes/datapipes.h @@ -0,0 +1,59 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2020 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_DATAPIPES_H_ +#define SDRBASE_PIPES_DATAPIPES_H_ + +#include +#include +#include +#include +#include + +#include "export.h" + +#include "datapipescommon.h" +#include "elementpipesregistrations.h" + +class ChannelAPI; +class Feature; +class DataPipesGCWorker; +class DataFifo; + +class SDRBASE_API DataPipes : public QObject +{ + Q_OBJECT +public: + DataPipes(); + DataPipes(const DataPipes&) = delete; + DataPipes& operator=(const DataPipes&) = delete; + ~DataPipes(); + + DataFifo *registerChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type); + void unregisterChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type); + QList* getFifos(const ChannelAPI *source, const QString& type); + +private: + ElementPipesRegistrations m_registrations; + QThread m_gcThread; //!< Garbage collector thread + DataPipesGCWorker *m_gcWorker; //!< Garbage collector + + void startGC(); //!< Start garbage collector + void stopGC(); //!< Stop garbage collector +}; + +#endif // SDRBASE_PIPES_DATAPIPES_H_ diff --git a/sdrbase/pipes/datapipescommon.cpp b/sdrbase/pipes/datapipescommon.cpp new file mode 100644 index 000000000..28f8fd0a6 --- /dev/null +++ b/sdrbase/pipes/datapipescommon.cpp @@ -0,0 +1,20 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2020 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "datapipescommon.h" + +MESSAGE_CLASS_DEFINITION(DataPipesCommon::MsgReportChannelDeleted, Message) diff --git a/sdrbase/pipes/datapipescommon.h b/sdrbase/pipes/datapipescommon.h new file mode 100644 index 000000000..e6682067a --- /dev/null +++ b/sdrbase/pipes/datapipescommon.h @@ -0,0 +1,63 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2020 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_DATAPIPESCOMON_H_ +#define SDRBASE_PIPES_DATAPIPESCOMON_H_ + +#include +#include +#include + +#include "export.h" +#include "util/message.h" + +#include "elementpipescommon.h" + +class ChannelAPI; +class Feature; +class DataFifo; + +class SDRBASE_API DataPipesCommon +{ +public: + typedef ElementPipesCommon::RegistrationKey ChannelRegistrationKey; + + /** Send this message to stakeholders when the garbage collector finds that a channel was deleted */ + class SDRBASE_API MsgReportChannelDeleted : public Message { + MESSAGE_CLASS_DECLARATION + + public: + const DataFifo *getFifo() const { return m_fifo; } + const ChannelRegistrationKey& getChannelRegistrationKey() const { return m_channelRegistrationKey; } + + static MsgReportChannelDeleted* create(const DataFifo *fifo, const ChannelRegistrationKey& channelRegistrationKey) { + return new MsgReportChannelDeleted(fifo, channelRegistrationKey); + } + + private: + const DataFifo *m_fifo; + ChannelRegistrationKey m_channelRegistrationKey; + + MsgReportChannelDeleted(const DataFifo *fifo, const ChannelRegistrationKey& channelRegistrationKey) : + Message(), + m_fifo(fifo), + m_channelRegistrationKey(channelRegistrationKey) + { } + }; +}; + +#endif // SDRBASE_PIPES_DATAPIPESCOMON_H_ diff --git a/sdrbase/pipes/datapipesgcworker.cpp b/sdrbase/pipes/datapipesgcworker.cpp new file mode 100644 index 000000000..28abc6b1e --- /dev/null +++ b/sdrbase/pipes/datapipesgcworker.cpp @@ -0,0 +1,66 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2020 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "feature/feature.h" +#include "dsp/datafifo.h" +#include "maincore.h" +#include "datapipescommon.h" +#include "datapipesgcworker.h" + +bool DataPipesGCWorker::DataPipesGC::existsProducer(const ChannelAPI *channel) +{ + return MainCore::instance()->existsChannel(channel); +} + +bool DataPipesGCWorker::DataPipesGC::existsConsumer(const Feature *feature) +{ + return MainCore::instance()->existsFeature(feature); +} + +void DataPipesGCWorker::DataPipesGC::sendMessageToConsumer(const DataFifo *fifo, DataPipesCommon::ChannelRegistrationKey channelKey, Feature *feature) +{ + DataPipesCommon::MsgReportChannelDeleted *msg = DataPipesCommon::MsgReportChannelDeleted::create( + fifo, channelKey); + feature->getInputMessageQueue()->push(msg); +} + +DataPipesGCWorker::DataPipesGCWorker() : + m_running(false) +{} + +DataPipesGCWorker::~DataPipesGCWorker() +{} + +void DataPipesGCWorker::startWork() +{ + connect(&m_gcTimer, SIGNAL(timeout()), this, SLOT(processGC())); + m_gcTimer.start(10000); // collect garbage every 10s + m_running = true; +} + +void DataPipesGCWorker::stopWork() +{ + m_running = false; + m_gcTimer.stop(); + disconnect(&m_gcTimer, SIGNAL(timeout()), this, SLOT(processGC())); +} + +void DataPipesGCWorker::processGC() +{ + // qDebug("MessagePipesGCWorker::processGC"); + m_dataPipesGC.processGC(); +} diff --git a/sdrbase/pipes/datapipesgcworker.h b/sdrbase/pipes/datapipesgcworker.h new file mode 100644 index 000000000..714aeae72 --- /dev/null +++ b/sdrbase/pipes/datapipesgcworker.h @@ -0,0 +1,69 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2020 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_DATAPIPESGCWORKER_H_ +#define SDRBASE_PIPES_DATAPIPESGCWORKER_H_ + +#include +#include + +#include "export.h" + +#include "elementpipesgc.h" +#include "datapipescommon.h" + +class QMutex; +class DataFifo; + +class SDRBASE_API DataPipesGCWorker : public QObject +{ + Q_OBJECT +public: + DataPipesGCWorker(); + ~DataPipesGCWorker(); + + void setC2FRegistrations( + QMutex *c2fMutex, + QMap> *c2fFifos, + QMap> *c2fFeatures + ) + { + m_dataPipesGC.setRegistrations(c2fMutex, c2fFifos, c2fFeatures); + } + + void startWork(); + void stopWork(); + bool isRunning() const { return m_running; } + +private: + class DataPipesGC : public ElementPipesGC + { + private: + virtual bool existsProducer(const ChannelAPI *channelAPI); + virtual bool existsConsumer(const Feature *feature); + virtual void sendMessageToConsumer(const DataFifo *fifo, DataPipesCommon::ChannelRegistrationKey key, Feature *feature); + }; + + DataPipesGC m_dataPipesGC; + bool m_running; + QTimer m_gcTimer; + +private slots: + void processGC(); //!< Collect garbage +}; + +#endif // SDRBASE_PIPES_DATAPIPESGCWORKER_H_ diff --git a/sdrbase/pipes/elementpipescommon.h b/sdrbase/pipes/elementpipescommon.h new file mode 100644 index 000000000..c2f38e374 --- /dev/null +++ b/sdrbase/pipes/elementpipescommon.h @@ -0,0 +1,45 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2020 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_ELEMNTPIPESCOMMON_H_ +#define SDRBASE_PIPES_ELEMNTPIPESCOMMON_H_ + +namespace ElementPipesCommon { + +template +struct RegistrationKey +{ + const T *m_key; + int m_typeId; + + RegistrationKey() = default; + RegistrationKey(const RegistrationKey&) = default; + RegistrationKey& operator=(const RegistrationKey&) = default; + + bool operator<(const RegistrationKey& other) const + { + if (m_key != other.m_key) { + return m_key < other.m_key; + } else { + return m_typeId < other.m_typeId; + } + } +}; + +} // ElementPipesCommon + +#endif // SDRBASE_PIPES_ELEMNTPIPESCOMMON_H_ diff --git a/sdrbase/pipes/elementpipesgc.h b/sdrbase/pipes/elementpipesgc.h new file mode 100644 index 000000000..7e8948968 --- /dev/null +++ b/sdrbase/pipes/elementpipesgc.h @@ -0,0 +1,139 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2020 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_ELEMNTPIPESGCWORKER_H_ +#define SDRBASE_PIPES_ELEMNTPIPESGCWORKER_H_ + +#include +#include +#include + +#include "elementpipescommon.h" + +template +class ElementPipesGC +{ +public: + ElementPipesGC() : + m_mutex(nullptr), + m_elements(nullptr), + m_consumers(nullptr) + {} + + ~ElementPipesGC() + {} + + void setRegistrations( + QMutex *mutex, + QMap, QList> *elements, + QMap, QList> *consumers + ) + { + m_mutex = mutex; + m_elements = elements; + m_consumers = consumers; + } + + void processGC() + { + if (m_mutex) + { + QMutexLocker mlock(m_mutex); + typename QMap, QList>::iterator cIt = m_consumers->begin(); + + // remove fifos to be deleted from last run + for (typename QList::iterator elIt = m_elementsToDelete.begin(); elIt != m_elementsToDelete.end(); ++elIt) { + delete *elIt; + } + + m_elementsToDelete.clear(); + + // remove keys with empty features + while (cIt != m_consumers->end()) + { + if (cIt.value().size() == 0) { + cIt = m_consumers->erase(cIt); + } else { + ++cIt; + } + } + + // remove keys with empty fifos + typename QMap, QList>::iterator elIt = m_elements->begin(); + + while (elIt != m_elements->end()) + { + if (elIt.value().size() == 0) { + elIt = m_elements->erase(elIt); + } else { + ++elIt; + } + } + + // check deleted channels and features + cIt = m_consumers->begin(); + + for (;cIt != m_consumers->end(); ++cIt) + { + ElementPipesCommon::RegistrationKey producerKey = cIt.key(); + const Producer *producer = producerKey.m_key; + + if (existsProducer(producer)) // look for deleted features + { + QList& consumers = cIt.value(); + int i = 0; + + while (i < consumers.size()) + { + if (existsConsumer(consumers[i])) { + i++; + } + else + { + consumers.removeAt(i); + Element *element = m_elements->operator[](producerKey)[i]; + m_elementsToDelete.append(element); + m_elements->operator[](producerKey).removeAt(i); + } + } + } + else // channel was destroyed + { + QList& consumers = cIt.value(); + + for (int i = 0; i < consumers.size(); i++) { + sendMessageToConsumer(m_elements->operator[](producerKey)[i], producerKey, consumers[i]); + } + } + } + } + } + +protected: + virtual bool existsProducer(const Producer *producer) = 0; + virtual bool existsConsumer(const Consumer *consumer) = 0; + virtual void sendMessageToConsumer(const Element *element, ElementPipesCommon::RegistrationKey producerKey, Consumer *consumer) = 0; + +private: + QMutex *m_mutex; + QMap, QList> *m_elements; + QMap, QList> *m_consumers; + QList m_elementsToDelete; +}; + + +#endif // SDRBASE_PIPES_ELEMNTPIPESGCWORKER_H_ diff --git a/sdrbase/pipes/elementpipesregistrations.h b/sdrbase/pipes/elementpipesregistrations.h new file mode 100644 index 000000000..1b2a9ed3c --- /dev/null +++ b/sdrbase/pipes/elementpipesregistrations.h @@ -0,0 +1,135 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2020 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_ELEMNTPIPESREGISTRATION_H_ +#define SDRBASE_PIPES_ELEMNTPIPESREGISTRATION_H_ + +#include +#include +#include +#include + +#include "elementpipescommon.h" + +template +class ElementPipesRegistrations +{ +public: + ElementPipesRegistrations() : + m_typeCount(0), + m_mutex(QMutex::Recursive) + {} + + ~ElementPipesRegistrations() + { + typename QMap, QList>::iterator mit = m_elements.begin(); + + for (; mit != m_elements.end(); ++mit) + { + typename QList::iterator elIt = mit->begin(); + + for (; elIt != mit->end(); ++elIt) { + delete *elIt; + } + } + } + + Element *registerProducerToConsumer(const Producer *producer, Consumer *consumer, const QString& type) + { + int typeId; + QMutexLocker mlock(&m_mutex); + + if (m_typeIds.contains(type)) + { + typeId = m_typeIds.value(type); + } + else + { + typeId++; + m_typeIds.insert(type, typeId); + } + + const typename ElementPipesCommon::RegistrationKey regKey + = ElementPipesCommon::RegistrationKey{producer, typeId}; + Element *element; + + if (m_consumers[regKey].contains(consumer)) + { + int i = m_consumers[regKey].indexOf(consumer); + element = m_elements[regKey][i]; + } + else + { + element = new Element(); + m_elements[regKey].append(element); + m_consumers[regKey].append(consumer); + } + + return element; + } + + void unregisterProducerToConsumer(const Producer *producer, Consumer *consumer, const QString& type) + { + if (m_typeIds.contains(type)) + { + int typeId = m_typeIds.value(type); + const typename ElementPipesCommon::RegistrationKey regKey + = ElementPipesCommon::RegistrationKey{producer, typeId}; + + if (m_consumers.contains(regKey) && m_consumers[regKey].contains(consumer)) + { + QMutexLocker mlock(&m_mutex); + int i = m_consumers[regKey].indexOf(consumer); + m_consumers[regKey].removeAt(i); + Element *element = m_elements[regKey][i]; + delete element; + m_elements[regKey].removeAt(i); + } + } + } + + QList* getElements(const Producer *producer, const QString& type) + { + if (!m_typeIds.contains(type)) { + return nullptr; + } + + QMutexLocker mlock(&m_mutex); + const typename ElementPipesCommon::RegistrationKey regKey + = ElementPipesCommon::RegistrationKey{producer, m_typeIds.value(type)}; + + if (m_elements.contains(regKey)) { + return &m_elements[regKey]; + } else { + return nullptr; + } + } + + QMap, QList> *getElements() { return &m_elements; } + QMap, QList> *getConsumers() { return &m_consumers; } + QMutex *getMutex() { return &m_mutex; } + + +private: + QHash m_typeIds; + int m_typeCount; + QMap, QList> m_elements; + QMap, QList> m_consumers; + QMutex m_mutex; +}; + +#endif // SDRBASE_PIPES_ELEMNTPIPESREGISTRATION_H_ diff --git a/sdrbase/pipes/messagepipes.cpp b/sdrbase/pipes/messagepipes.cpp index 139e1e6df..377f0831a 100644 --- a/sdrbase/pipes/messagepipes.cpp +++ b/sdrbase/pipes/messagepipes.cpp @@ -17,15 +17,19 @@ #include +#include "util/messagequeue.h" + #include "messagepipesgcworker.h" #include "messagepipes.h" -MessagePipes::MessagePipes() : - m_typeCount(0), - m_c2fMutex(QMutex::Recursive) +MessagePipes::MessagePipes() { m_gcWorker = new MessagePipesGCWorker(); - m_gcWorker->setC2FRegistrations(&m_c2fMutex, &m_c2fQueues, &m_c2fFEatures); + m_gcWorker->setC2FRegistrations( + m_registrations.getMutex(), + m_registrations.getElements(), + m_registrations.getConsumers() + ); m_gcWorker->moveToThread(&m_gcThread); startGC(); } @@ -35,91 +39,26 @@ MessagePipes::~MessagePipes() if (m_gcWorker->isRunning()) { stopGC(); } - - QMap>::iterator mit = m_c2fQueues.begin(); - - for (; mit != m_c2fQueues.end(); ++mit) - { - QList::iterator lit = mit->begin(); - - for (; lit != mit->end(); ++lit) { - delete *lit; - } - } } MessageQueue *MessagePipes::registerChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type) { - int typeId; - QMutexLocker mlock(&m_c2fMutex); - - if (m_typeIds.contains(type)) - { - typeId = m_typeIds.value(type); - } - else - { - typeId++; - m_typeIds.insert(type, typeId); - } - - const MessagePipesCommon::ChannelRegistrationKey regKey = MessagePipesCommon::ChannelRegistrationKey{source, typeId}; - MessageQueue *messageQueue; - - if (m_c2fFEatures[regKey].contains(feature)) - { - int i = m_c2fFEatures[regKey].indexOf(feature); - messageQueue = m_c2fQueues[regKey][i]; - } - else - { - messageQueue = new MessageQueue(); - m_c2fQueues[regKey].append(messageQueue); - m_c2fFEatures[regKey].append(feature); - } - - return messageQueue; + return m_registrations.registerProducerToConsumer(source, feature, type); } void MessagePipes::unregisterChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type) { - if (m_typeIds.contains(type)) - { - int typeId = m_typeIds.value(type); - const MessagePipesCommon::ChannelRegistrationKey regKey = MessagePipesCommon::ChannelRegistrationKey{source, typeId}; - - if (m_c2fFEatures.contains(regKey) && m_c2fFEatures[regKey].contains(feature)) - { - QMutexLocker mlock(&m_c2fMutex); - int i = m_c2fFEatures[regKey].indexOf(feature); - m_c2fFEatures[regKey].removeAt(i); - MessageQueue *messageQueue = m_c2fQueues[regKey][i]; - delete messageQueue; - m_c2fQueues[regKey].removeAt(i); - } - } + m_registrations.unregisterProducerToConsumer(source, feature, type); } QList* MessagePipes::getMessageQueues(const ChannelAPI *source, const QString& type) { - if (!m_typeIds.contains(type)) { - return nullptr; - } - - QMutexLocker mlock(&m_c2fMutex); - const MessagePipesCommon::ChannelRegistrationKey regKey = MessagePipesCommon::ChannelRegistrationKey{source, m_typeIds.value(type)}; - - if (m_c2fQueues.contains(regKey)) { - return &m_c2fQueues[regKey]; - } else { - return nullptr; - } + return m_registrations.getElements(source, type); } void MessagePipes::startGC() { qDebug("MessagePipes::startGC"); - m_gcWorker->startWork(); m_gcThread.start(); } diff --git a/sdrbase/pipes/messagepipes.h b/sdrbase/pipes/messagepipes.h index 8145e1252..5f54d7b6f 100644 --- a/sdrbase/pipes/messagepipes.h +++ b/sdrbase/pipes/messagepipes.h @@ -25,13 +25,14 @@ #include #include "export.h" -#include "util/messagequeue.h" #include "messagepipescommon.h" +#include "elementpipesregistrations.h" class ChannelAPI; class Feature; class MessagePipesGCWorker; +class MessageQueue; class SDRBASE_API MessagePipes : public QObject { @@ -47,11 +48,7 @@ public: QList* getMessageQueues(const ChannelAPI *source, const QString& type); private: - QHash m_typeIds; - int m_typeCount; - QMap> m_c2fQueues; - QMap> m_c2fFEatures; - QMutex m_c2fMutex; + ElementPipesRegistrations m_registrations; QThread m_gcThread; //!< Garbage collector thread MessagePipesGCWorker *m_gcWorker; //!< Garbage collector diff --git a/sdrbase/pipes/messagepipescommon.cpp b/sdrbase/pipes/messagepipescommon.cpp index 1244d1a10..f749fb2e5 100644 --- a/sdrbase/pipes/messagepipescommon.cpp +++ b/sdrbase/pipes/messagepipescommon.cpp @@ -18,12 +18,3 @@ #include "messagepipescommon.h" MESSAGE_CLASS_DEFINITION(MessagePipesCommon::MsgReportChannelDeleted, Message) - -bool MessagePipesCommon::ChannelRegistrationKey::operator<(const ChannelRegistrationKey& other) const -{ - if (m_channel != other.m_channel) { - return m_channel < other.m_channel; - } else { - return m_typeId < other.m_typeId; - } -} diff --git a/sdrbase/pipes/messagepipescommon.h b/sdrbase/pipes/messagepipescommon.h index f5fd362ab..d51b05e50 100644 --- a/sdrbase/pipes/messagepipescommon.h +++ b/sdrbase/pipes/messagepipescommon.h @@ -24,6 +24,7 @@ #include "export.h" #include "util/message.h" +#include "elementpipescommon.h" class ChannelAPI; class Feature; @@ -32,16 +33,7 @@ class MessageQueue; class SDRBASE_API MessagePipesCommon { public: - struct ChannelRegistrationKey - { - const ChannelAPI *m_channel; - int m_typeId; - - ChannelRegistrationKey() = default; - ChannelRegistrationKey(const ChannelRegistrationKey&) = default; - ChannelRegistrationKey& operator=(const ChannelRegistrationKey&) = default; - bool operator<(const ChannelRegistrationKey& other) const; - }; + typedef ElementPipesCommon::RegistrationKey ChannelRegistrationKey; /** Send this message to stakeholders when the garbage collector finds that a channel was deleted */ class SDRBASE_API MsgReportChannelDeleted : public Message { diff --git a/sdrbase/pipes/messagepipesgcworker.cpp b/sdrbase/pipes/messagepipesgcworker.cpp index 05147eb1c..c9dad9f78 100644 --- a/sdrbase/pipes/messagepipesgcworker.cpp +++ b/sdrbase/pipes/messagepipesgcworker.cpp @@ -16,15 +16,33 @@ /////////////////////////////////////////////////////////////////////////////////// #include "feature/feature.h" +#include "util/messagequeue.h" #include "maincore.h" #include "messagepipescommon.h" #include "messagepipesgcworker.h" +bool MessagePipesGCWorker::MessagePipesGC::existsProducer(const ChannelAPI *channel) +{ + return MainCore::instance()->existsChannel(channel); +} + +bool MessagePipesGCWorker::MessagePipesGC::existsConsumer(const Feature *feature) +{ + return MainCore::instance()->existsFeature(feature); +} + +void MessagePipesGCWorker::MessagePipesGC::sendMessageToConsumer( + const MessageQueue *messageQueue, + MessagePipesCommon::ChannelRegistrationKey channelKey, + Feature *feature) +{ + MessagePipesCommon::MsgReportChannelDeleted *msg = MessagePipesCommon::MsgReportChannelDeleted::create( + messageQueue, channelKey); + feature->getInputMessageQueue()->push(msg); +} + MessagePipesGCWorker::MessagePipesGCWorker() : - m_running(false), - m_c2fMutex(nullptr), - m_c2fQueues(nullptr), - m_c2fFeatures(nullptr) + m_running(false) {} MessagePipesGCWorker::~MessagePipesGCWorker() @@ -47,78 +65,5 @@ void MessagePipesGCWorker::stopWork() void MessagePipesGCWorker::processGC() { // qDebug("MessagePipesGCWorker::processGC"); - if (m_c2fMutex) - { - QMutexLocker mlock(m_c2fMutex); - QMap>::iterator fIt = m_c2fFeatures->begin(); - - // remove queues to be deleted from last run - for (QList::iterator dqIt = m_c2fQueuesToDelete.begin(); dqIt != m_c2fQueuesToDelete.end(); ++dqIt) { - delete *dqIt; - } - - m_c2fQueuesToDelete.clear(); - - // remove keys with empty features - fIt = m_c2fFeatures->begin(); - - while (fIt != m_c2fFeatures->end()) - { - if (fIt.value().size() == 0) { - fIt = m_c2fFeatures->erase(fIt); - } else { - ++fIt; - } - } - - // remove keys with empty message queues - QMap>::iterator qIt = m_c2fQueues->begin(); - - while (qIt != m_c2fQueues->end()) - { - if (qIt.value().size() == 0) { - qIt = m_c2fQueues->erase(qIt); - } else { - ++qIt; - } - } - - // check deleted channels and features - for (;fIt != m_c2fFeatures->end(); ++fIt) - { - MessagePipesCommon::ChannelRegistrationKey channelKey = fIt.key(); - const ChannelAPI *channel = channelKey.m_channel; - - if (MainCore::instance()->existsChannel(channel)) // look for deleted features - { - QList& features = fIt.value(); - int i = 0; - - while (i < features.size()) - { - if (MainCore::instance()->existsFeature(features[i])) { - i++; - } - else - { - features.removeAt(i); - MessageQueue *messageQueue = m_c2fQueues->operator[](channelKey)[i]; - m_c2fQueuesToDelete.append(messageQueue); - m_c2fQueues->operator[](channelKey).removeAt(i); - } - } - } - else // channel was destroyed - { - QList& features = fIt.value(); - - for (int i = 0; i < features.size(); i++) - { - MessagePipesCommon::MsgReportChannelDeleted *msg = MessagePipesCommon::MsgReportChannelDeleted::create( - m_c2fQueues->operator[](channelKey)[i], channelKey); - features[i]->getInputMessageQueue()->push(msg); - } - } - } - } + m_messagePipesGC.processGC(); } diff --git a/sdrbase/pipes/messagepipesgcworker.h b/sdrbase/pipes/messagepipesgcworker.h index a57dc6f3e..b692b7af1 100644 --- a/sdrbase/pipes/messagepipesgcworker.h +++ b/sdrbase/pipes/messagepipesgcworker.h @@ -24,6 +24,7 @@ #include "export.h" #include "messagepipescommon.h" +#include "elementpipesgc.h" class QMutex; @@ -40,9 +41,7 @@ public: QMap> *c2fFeatures ) { - m_c2fMutex = c2fMutex; - m_c2fQueues = c2fQueues; - m_c2fFeatures = c2fFeatures; + m_messagePipesGC.setRegistrations(c2fMutex, c2fQueues, c2fFeatures); } void startWork(); @@ -50,11 +49,16 @@ public: bool isRunning() const { return m_running; } private: + class MessagePipesGC : public ElementPipesGC + { + private: + virtual bool existsProducer(const ChannelAPI *channelAPI); + virtual bool existsConsumer(const Feature *feature); + virtual void sendMessageToConsumer(const MessageQueue *messageQueue, MessagePipesCommon::ChannelRegistrationKey key, Feature *feature); + }; + + MessagePipesGC m_messagePipesGC; bool m_running; - QMutex *m_c2fMutex; - QMap> *m_c2fQueues; - QMap> *m_c2fFeatures; - QList m_c2fQueuesToDelete; QTimer m_gcTimer; private slots: