// Copyright (c) Charles J. Cliffe // SPDX-License-Identifier: GPL-2.0+ #pragma once #include #include #include #include #include #include #include #include #include #include #include "ThreadBlockingQueue.h" #include "Timer.h" #include "SpinMutex.h" struct map_string_less { bool operator()(const std::string& a,const std::string& b) const { return a.compare(b) < 0; } }; template class ReBufferAge { public: ReBufferAge(PtrType p, int a) { ptr = p; age = a; } PtrType ptr; int age; virtual ~ReBufferAge() = default;; }; #define REBUFFER_GC_LIMIT 100 #define REBUFFER_WARNING_THRESHOLD 2000 template class ReBuffer { typedef typename std::shared_ptr ReBufferPtr; public: //Virtual destructor to assure correct freeing of all descendants. virtual ~ReBuffer() = default; //constructor explicit ReBuffer(std::string bufferId) : bufferId(bufferId) { //nothing } /// Return a new ReBuffer_ptr usable by the application. ReBufferPtr getBuffer() { std::lock_guard < SpinMutex > lock(m_mutex); // iterate the ReBufferAge list: if the std::shared_ptr count == 1, it means //it is only referenced in outputBuffers itself, so available for re-use. //else if the std::shared_ptr count <= 1, make it age. //else the ReBufferPtr is in use, don't use it. ReBufferPtr buf = nullptr; outputBuffersI it = outputBuffers.begin(); while (it != outputBuffers.end()) { //careful here: take care of reading the use_count directly //through the iterator, else it's value is wrong if a temp variable //is used. long use = it->ptr.use_count(); //1. If we encounter a ReBufferPtr with a use count of 0, this //is a bug since it is supposed to be at least 1, because it is referenced here. //in this case, purge it from here and trace. if (use == 0) { std::cout << "Warning: in ReBuffer '" << bufferId << "' count '" << outputBuffers.size() << "', found 1 dangling buffer !" << std::endl << std::flush; it = outputBuffers.erase(it); } else if (use == 1) { if (buf == nullptr) { it->age = 1; //select this one. buf = it->ptr; // std::cout << "**" << std::flush; it++; } else { //make the other unused buffers age it->age--; it++; } } else { it++; } } //end while //2.1 Garbage collect the oldest (last element) if it aged too much, and return the buffer if (buf != nullptr) { if (outputBuffers.back().age < -REBUFFER_GC_LIMIT) { //by the nature of the shared_ptr, memory will ne deallocated automatically. outputBuffers.pop_back(); //std::cout << "--" << std::flush; } return buf; } if (outputBuffers.size() > REBUFFER_WARNING_THRESHOLD) { std::cout << "Warning: ReBuffer '" << bufferId << "' count '" << outputBuffers.size() << "' exceeds threshold of '" << REBUFFER_WARNING_THRESHOLD << "'" << std::endl << std::flush; } //3.We need to allocate a new buffer. ReBufferAge < ReBufferPtr > newBuffer(std::make_shared(), 1); outputBuffers.push_back(newBuffer); // std::cout << "++" << std::flush; return newBuffer.ptr; } /// Purge the cache. void purge() { std::lock_guard < SpinMutex > lock(m_mutex); // since outputBuffers are full std::shared_ptr, //purging if will effectively loose the local reference, // so the std::shared_ptr will naturally be deallocated //when their time comes. outputBuffers.clear(); } private: //name of the buffer cache kind std::string bufferId; //the ReBuffer cache: use a std:deque to also release //memory when ReBufferPtr are GCed. std::deque< ReBufferAge < ReBufferPtr > > outputBuffers; typedef typename std::deque< ReBufferAge < ReBufferPtr > >::iterator outputBuffersI; //mutex protecting access to outputBuffers. SpinMutex m_mutex; }; class IOThread { public: IOThread(); virtual ~IOThread(); static void *pthread_helper(void *context); #ifdef __APPLE__ virtual void *threadMain(); #else //the thread Main call back itself virtual void threadMain(); #endif virtual void setup(); virtual void run(); //Request for termination (asynchronous) virtual void terminate(); //Returns true if the thread is indeed terminated, i.e the run() method //has returned. //If wait > 0 ms, the call is blocking at most 'waitMs' milliseconds for the thread to die, then returns. //If wait < 0, the wait in infinite until the thread dies. bool isTerminated(int waitMs = 0); virtual void onBindOutput(std::string name, ThreadQueueBasePtr threadQueue); virtual void onBindInput(std::string name, ThreadQueueBasePtr threadQueue); void setInputQueue(const std::string& qname, const ThreadQueueBasePtr& threadQueue); ThreadQueueBasePtr getInputQueue(const std::string& qname); void setOutputQueue(const std::string& qname, const ThreadQueueBasePtr& threadQueue); ThreadQueueBasePtr getOutputQueue(const std::string& qname); protected: std::map input_queues; std::map output_queues; //this protects against concurrent changes in input/output bindings: get/set/Input/OutPutQueue std::mutex m_queue_bindings_mutex; //true when a termination is ordered std::atomic_bool stopping; Timer gTimer; private: //true when the thread has really ended, i.e run() from threadMain() has returned. std::atomic_bool terminated; };