diff --git a/sdrbase/CMakeLists.txt b/sdrbase/CMakeLists.txt index d9cc7cf6d..8bdeffc42 100644 --- a/sdrbase/CMakeLists.txt +++ b/sdrbase/CMakeLists.txt @@ -171,8 +171,11 @@ set(sdrbase_SOURCES pipes/datapipes.cpp pipes/datapipesgcworker.cpp pipes/messagepipes.cpp + pipes/messagepipes2.cpp pipes/messagepipescommon.cpp pipes/messagepipesgcworker.cpp + pipes/messagepipes2gcworker.cpp + pipes/messagequeuestore.cpp pipes/pipeendpoint.cpp pipes/objectpipe.cpp pipes/objectpipesregistrations.cpp @@ -379,8 +382,11 @@ set(sdrbase_HEADERS pipes/elementpipescommon.h pipes/elementpipesgc.h pipes/messagepipes.h + pipes/messagepipes2.h pipes/messagepipescommon.h pipes/messagepipesgcworker.h + pipes/messagepipes2gcworker.h + pipes/messagequeuestore.h pipes/pipeendpoint.h pipes/objectpipe.h pipes/objectpipesregistrations.h diff --git a/sdrbase/maincore.h b/sdrbase/maincore.h index 6cbc0d0d0..d333dc402 100644 --- a/sdrbase/maincore.h +++ b/sdrbase/maincore.h @@ -28,6 +28,7 @@ #include "settings/mainsettings.h" #include "util/message.h" #include "pipes/messagepipes.h" +#include "pipes/messagepipes2.h" #include "pipes/datapipes.h" #include "channel/channelapi.h" @@ -731,6 +732,7 @@ public: void clearFeatures(FeatureSet *featureSet); // pipes MessagePipes& getMessagePipes() { return m_messagePipes; } + MessagePipes2& getMessagePipes2() { return m_messagePipes2; } DataPipes& getDataPipes() { return m_dataPipes; } friend class MainServer; @@ -751,6 +753,7 @@ private: QMap m_featuresMap; //!< Feature to feature set map PluginManager* m_pluginManager; MessagePipes m_messagePipes; + MessagePipes2 m_messagePipes2; DataPipes m_dataPipes; void debugMaps(); diff --git a/sdrbase/pipes/datapipes.cpp b/sdrbase/pipes/datapipes.cpp index f99e87a6d..fc947c4db 100644 --- a/sdrbase/pipes/datapipes.cpp +++ b/sdrbase/pipes/datapipes.cpp @@ -15,7 +15,6 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// -#include "dsp/datafifo.h" #include "datapipes.h" #include "datapipesgcworker.h" diff --git a/sdrbase/pipes/datapipes.h b/sdrbase/pipes/datapipes.h index 079bbf8e8..df5a7d0fd 100644 --- a/sdrbase/pipes/datapipes.h +++ b/sdrbase/pipes/datapipes.h @@ -25,7 +25,6 @@ #include "objectpipesregistrations.h" #include "datafifostore.h" -class DataFifo; class DataPipesGCWorker; class SDRBASE_API DataPipes : public QObject diff --git a/sdrbase/pipes/datapipesgcworker.cpp b/sdrbase/pipes/datapipesgcworker.cpp index 5907727d5..28b690850 100644 --- a/sdrbase/pipes/datapipesgcworker.cpp +++ b/sdrbase/pipes/datapipesgcworker.cpp @@ -15,7 +15,6 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// -#include "dsp/datafifo.h" #include "datapipesgcworker.h" DataPipesGCWorker::DataPipesGCWorker(ObjectPipesRegistrations& objectPipesRegistrations) : diff --git a/sdrbase/pipes/datapipesgcworker.h b/sdrbase/pipes/datapipesgcworker.h index 618b506f7..bf918e68d 100644 --- a/sdrbase/pipes/datapipesgcworker.h +++ b/sdrbase/pipes/datapipesgcworker.h @@ -24,8 +24,6 @@ #include "export.h" #include "objectpipesregistrations.h" -class DataFifo; - class SDRBASE_API DataPipesGCWorker : public QObject { Q_OBJECT diff --git a/sdrbase/pipes/messagepipes2.cpp b/sdrbase/pipes/messagepipes2.cpp new file mode 100644 index 000000000..7f4202015 --- /dev/null +++ b/sdrbase/pipes/messagepipes2.cpp @@ -0,0 +1,67 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "messagepipes2.h" +#include "messagepipes2gcworker.h" + +MessagePipes2::MessagePipes2() : + m_registrations(&m_messageQueueStore) +{ + m_gcWorker = new MessagePipes2GCWorker(m_registrations); + m_gcWorker->moveToThread(&m_gcThread); + startGC(); +} + +MessagePipes2::~MessagePipes2() +{ + if (m_gcWorker->isRunning()) { + stopGC(); + } + + m_gcWorker->deleteLater(); +} + +ObjectPipe *MessagePipes2::registerProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type) +{ + return m_registrations.registerProducerToConsumer(producer, consumer, type); +} + +ObjectPipe *MessagePipes2::unregisterProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type) +{ + return m_registrations.unregisterProducerToConsumer(producer, consumer, type); +} + +void MessagePipes2::getMessagePipes(const QObject *producer, const QString& type, QList& pipes) +{ + return m_registrations.getPipes(producer, type, pipes); +} + +void MessagePipes2::startGC() +{ + qDebug("MessagePipes2::startGC"); + + m_gcWorker->startWork(); + m_gcThread.start(); +} + +void MessagePipes2::stopGC() +{ + qDebug("MessagePipes2::stopGC"); + m_gcWorker->stopWork(); + m_gcThread.quit(); + m_gcThread.wait(); +} diff --git a/sdrbase/pipes/messagepipes2.h b/sdrbase/pipes/messagepipes2.h new file mode 100644 index 000000000..fe51edfd0 --- /dev/null +++ b/sdrbase/pipes/messagepipes2.h @@ -0,0 +1,54 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_MESSAGEPIPES2_H_ +#define SDRBASE_PIPES_MESSAGEPIPES2_H_ + +#include +#include + +#include "export.h" +#include "objectpipesregistrations.h" +#include "messagequeuestore.h" + +class MessagePipes2GCWorker; + +class SDRBASE_API MessagePipes2 : public QObject +{ + Q_OBJECT +public: + MessagePipes2(); + MessagePipes2(const MessagePipes2&) = delete; + MessagePipes2& operator=(const MessagePipes2&) = delete; + ~MessagePipes2(); + + ObjectPipe *registerProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type); + ObjectPipe *unregisterProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type); + void getMessagePipes(const QObject *producer, const QString& type, QList& pipes); + +private: + MessageQueueStore m_messageQueueStore; + ObjectPipesRegistrations m_registrations; + QThread m_gcThread; //!< Garbage collector thread + MessagePipes2GCWorker *m_gcWorker; //!< Garbage collector + + void startGC(); //!< Start garbage collector + void stopGC(); //!< Stop garbage collector +}; + + +#endif // SDRBASE_PIPES_MESSAGEPIPES2_H_ diff --git a/sdrbase/pipes/messagepipes2gcworker.cpp b/sdrbase/pipes/messagepipes2gcworker.cpp new file mode 100644 index 000000000..6591f060e --- /dev/null +++ b/sdrbase/pipes/messagepipes2gcworker.cpp @@ -0,0 +1,45 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "messagepipes2gcworker.h" + +MessagePipes2GCWorker::MessagePipes2GCWorker(ObjectPipesRegistrations& objectPipesRegistrations) : + m_running(false), + m_objectPipesRegistrations(objectPipesRegistrations) +{} + +MessagePipes2GCWorker::~MessagePipes2GCWorker() +{} + +void MessagePipes2GCWorker::startWork() +{ + connect(&m_gcTimer, SIGNAL(timeout()), this, SLOT(processGC())); + m_gcTimer.start(10000); // collect garbage every 10s + m_running = true; +} + +void MessagePipes2GCWorker::stopWork() +{ + m_running = false; + m_gcTimer.stop(); + disconnect(&m_gcTimer, SIGNAL(timeout()), this, SLOT(processGC())); +} + +void MessagePipes2GCWorker::processGC() +{ + m_objectPipesRegistrations.processGC(); +} diff --git a/sdrbase/pipes/messagepipes2gcworker.h b/sdrbase/pipes/messagepipes2gcworker.h new file mode 100644 index 000000000..a34097315 --- /dev/null +++ b/sdrbase/pipes/messagepipes2gcworker.h @@ -0,0 +1,47 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_MESSAGEPIPES2GCWORKER_H_ +#define SDRBASE_PIPES_MESSAGEPIPES2GCWORKER_H_ + +#include +#include + +#include "export.h" +#include "objectpipesregistrations.h" + +class SDRBASE_API MessagePipes2GCWorker : public QObject +{ + Q_OBJECT +public: + MessagePipes2GCWorker(ObjectPipesRegistrations& objectPipesRegistrations); + ~MessagePipes2GCWorker(); + + void startWork(); + void stopWork(); + bool isRunning() const { return m_running; } + +private: + bool m_running; + QTimer m_gcTimer; + ObjectPipesRegistrations& m_objectPipesRegistrations; + +private slots: + void processGC(); //!< Collect garbage +}; + +#endif // SDRBASE_PIPES_MESSAGEPIPES2GCWORKER_H_ diff --git a/sdrbase/pipes/messagepipesgcworker.cpp b/sdrbase/pipes/messagepipesgcworker.cpp index 24b75ff60..4a3ee0e5b 100644 --- a/sdrbase/pipes/messagepipesgcworker.cpp +++ b/sdrbase/pipes/messagepipesgcworker.cpp @@ -35,22 +35,10 @@ bool MessagePipesGCWorker::MessagePipesGC::existsConsumer(const PipeEndPoint *pi } void MessagePipesGCWorker::MessagePipesGC::sendMessageToConsumer( - const MessageQueue *messageQueue, - MessagePipesCommon::ChannelRegistrationKey channelKey, - PipeEndPoint *pipeEndPoint) + const MessageQueue *, + MessagePipesCommon::ChannelRegistrationKey, + PipeEndPoint *) { - MessagePipesCommon::MsgReportChannelDeleted *msg = MessagePipesCommon::MsgReportChannelDeleted::create( - messageQueue, channelKey); - if (MainCore::instance()->existsFeature((const Feature *)pipeEndPoint)) // Use RTTI instead? - { - Feature *feature = (Feature *)pipeEndPoint; - feature->getInputMessageQueue()->push(msg); - } - else - { - ChannelAPI *channel = (ChannelAPI *)pipeEndPoint; - channel->getChannelMessageQueue()->push(msg); - } } MessagePipesGCWorker::MessagePipesGCWorker() : diff --git a/sdrbase/pipes/messagequeuestore.cpp b/sdrbase/pipes/messagequeuestore.cpp new file mode 100644 index 000000000..0fea3f60c --- /dev/null +++ b/sdrbase/pipes/messagequeuestore.cpp @@ -0,0 +1,56 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "util/messagequeue.h" +#include "messagequeuestore.h" + +MessageQueueStore::MessageQueueStore() +{} + +MessageQueueStore::~MessageQueueStore() +{ + deleteAllElements(); +} + +QObject *MessageQueueStore::createElement() +{ + MessageQueue *messageQueue = new MessageQueue(); + m_messageQueues.push_back(messageQueue); + qDebug("MessageQueueStore::createElement: %d added", m_messageQueues.size() - 1); + return messageQueue; +} + +void MessageQueueStore::deleteElement(QObject *element) +{ + int i = m_messageQueues.indexOf((MessageQueue*) element); + + if (i >= 0) + { + qDebug("MessageQueueStore::deleteElement: delte element at %d", i); + delete m_messageQueues[i]; + m_messageQueues.removeAt(i); + } +} + +void MessageQueueStore::deleteAllElements() +{ + for (auto& messageQueue : m_messageQueues) { + delete messageQueue; + } + + m_messageQueues.clear(); +} diff --git a/sdrbase/pipes/messagequeuestore.h b/sdrbase/pipes/messagequeuestore.h new file mode 100644 index 000000000..3eeedd958 --- /dev/null +++ b/sdrbase/pipes/messagequeuestore.h @@ -0,0 +1,43 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2022 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef SDRBASE_PIPES_MESSAGEQUEUESTORE_H_ +#define SDRBASE_PIPES_MESSAGEQUEUESTORE_H_ + +#include + +#include "export.h" +#include "objectpipeelementsstore.h" + +class MessageQueue; + +class SDRBASE_API MessageQueueStore : public ObjectPipeElementsStore +{ +public: + MessageQueueStore(); + virtual ~MessageQueueStore(); + + virtual QObject *createElement(); + virtual void deleteElement(QObject*); + +private: + void deleteAllElements(); + QList m_messageQueues; +}; + + +#endif // SDRBASE_PIPES_MESSAGEQUEUESTORE_H_ diff --git a/sdrbase/pipes/objectpipesregistrations.cpp b/sdrbase/pipes/objectpipesregistrations.cpp index 58c758923..87084ff44 100644 --- a/sdrbase/pipes/objectpipesregistrations.cpp +++ b/sdrbase/pipes/objectpipesregistrations.cpp @@ -73,7 +73,7 @@ ObjectPipe *ObjectPipesRegistrations::registerProducerToConsumer(const QObject * ObjectPipe *ObjectPipesRegistrations::unregisterProducerToConsumer(const QObject *producer, const QObject *consumer, const QString& type) { - qDebug("ObjectPipesRegistrations::unregisterProducerToConsumer: %p %p %s", producer, consumer, qPrintable("type")); + qDebug("ObjectPipesRegistrations::unregisterProducerToConsumer: %p %p %s", producer, consumer, qPrintable(type)); ObjectPipe *pipe = nullptr; if (m_typeIds.contains(type))