2015-07-29 18:34:58 -04:00
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#include <mutex>
|
|
|
|
#include <atomic>
|
|
|
|
#include <deque>
|
|
|
|
#include <map>
|
2016-06-07 19:54:36 -04:00
|
|
|
#include <set>
|
2015-07-29 18:34:58 -04:00
|
|
|
#include <string>
|
2015-08-12 21:45:02 -04:00
|
|
|
#include <iostream>
|
2016-06-28 15:04:52 -04:00
|
|
|
#include <thread>
|
2015-07-29 18:34:58 -04:00
|
|
|
|
2015-07-29 20:57:02 -04:00
|
|
|
#include "ThreadQueue.h"
|
2015-09-09 23:29:38 -04:00
|
|
|
#include "Timer.h"
|
2015-07-29 18:34:58 -04:00
|
|
|
|
|
|
|
struct map_string_less : public std::binary_function<std::string,std::string,bool>
|
|
|
|
{
|
|
|
|
bool operator()(const std::string& a,const std::string& b) const
|
|
|
|
{
|
|
|
|
return a.compare(b) < 0;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
class ReferenceCounter {
|
2016-06-01 13:51:01 -04:00
|
|
|
|
2015-07-29 18:34:58 -04:00
|
|
|
public:
|
2016-06-09 13:20:16 -04:00
|
|
|
|
|
|
|
//default constructor, initialized with refcont 1, sounds very natural
|
|
|
|
ReferenceCounter() {
|
|
|
|
refCount = 1;
|
|
|
|
}
|
2015-07-29 18:34:58 -04:00
|
|
|
|
2016-06-08 22:08:14 -04:00
|
|
|
// void setIndex(int idx) {
|
|
|
|
// std::lock_guard < std::recursive_mutex > lock(m_mutex);
|
|
|
|
// index = idx;
|
|
|
|
// }
|
2016-06-08 21:31:52 -04:00
|
|
|
|
2016-06-08 22:08:14 -04:00
|
|
|
// int getIndex() {
|
|
|
|
// std::lock_guard < std::recursive_mutex > lock(m_mutex);
|
|
|
|
// return index;
|
|
|
|
// }
|
2016-06-08 21:31:52 -04:00
|
|
|
|
2015-07-29 18:34:58 -04:00
|
|
|
void setRefCount(int rc) {
|
2016-06-01 13:46:45 -04:00
|
|
|
std::lock_guard < std::recursive_mutex > lock(m_mutex);
|
|
|
|
refCount = rc;
|
2015-07-29 18:34:58 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
void decRefCount() {
|
2016-06-01 13:46:45 -04:00
|
|
|
std::lock_guard < std::recursive_mutex > lock(m_mutex);
|
|
|
|
refCount--;
|
2015-07-29 18:34:58 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
int getRefCount() {
|
2016-06-01 13:46:45 -04:00
|
|
|
std::lock_guard < std::recursive_mutex > lock(m_mutex);
|
|
|
|
return refCount;
|
2015-07-29 18:34:58 -04:00
|
|
|
}
|
2016-06-02 17:56:31 -04:00
|
|
|
|
|
|
|
// Access to the own mutex protecting the ReferenceCounter, i.e the monitor of the class
|
|
|
|
std::recursive_mutex& getMonitor() const {
|
|
|
|
return m_mutex;
|
|
|
|
}
|
|
|
|
|
2015-07-29 18:34:58 -04:00
|
|
|
protected:
|
2016-06-01 13:46:45 -04:00
|
|
|
//this is a basic mutex for all ReferenceCounter derivatives operations INCLUDING the counter itself for consistency !
|
|
|
|
mutable std::recursive_mutex m_mutex;
|
|
|
|
|
|
|
|
private:
|
2016-06-08 22:08:14 -04:00
|
|
|
int refCount;
|
|
|
|
// int index;
|
2015-07-29 18:34:58 -04:00
|
|
|
};
|
|
|
|
|
|
|
|
|
2015-08-11 21:19:55 -04:00
|
|
|
#define REBUFFER_GC_LIMIT 100
|
|
|
|
|
2016-06-07 19:54:36 -04:00
|
|
|
class ReBufferGC {
|
|
|
|
public:
|
|
|
|
static void garbageCollect() {
|
|
|
|
std::lock_guard < std::mutex > lock(g_mutex);
|
|
|
|
|
|
|
|
std::deque<ReferenceCounter *> garbageRemoval;
|
|
|
|
for (typename std::set<ReferenceCounter *>::iterator i = garbage.begin(); i != garbage.end(); i++) {
|
|
|
|
if ((*i)->getRefCount() <= 0) {
|
|
|
|
garbageRemoval.push_back(*i);
|
|
|
|
}
|
|
|
|
else {
|
2016-06-08 22:08:14 -04:00
|
|
|
// std::cout << "Garbage in queue buffer idx #" << (*i)->getIndex() << ", " << (*i)->getRefCount() << " usage(s)" << std::endl;
|
|
|
|
std::cout << "Garbage in queue buffer with " << (*i)->getRefCount() << " usage(s)" << std::endl;
|
2016-06-07 19:54:36 -04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if ( garbageRemoval.size() ) {
|
|
|
|
std::cout << "Garbage collecting " << garbageRemoval.size() << " ReBuffer(s)" << std::endl;
|
|
|
|
while (!garbageRemoval.empty()) {
|
|
|
|
ReferenceCounter *ref = garbageRemoval.back();
|
|
|
|
garbageRemoval.pop_back();
|
|
|
|
garbage.erase(ref);
|
|
|
|
delete ref;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static void addGarbage(ReferenceCounter *ref) {
|
|
|
|
std::lock_guard < std::mutex > lock(g_mutex);
|
|
|
|
garbage.insert(ref);
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
static std::mutex g_mutex;
|
|
|
|
static std::set<ReferenceCounter *> garbage;
|
|
|
|
};
|
|
|
|
|
|
|
|
|
2015-07-29 18:34:58 -04:00
|
|
|
template<class BufferType = ReferenceCounter>
|
|
|
|
class ReBuffer {
|
|
|
|
|
|
|
|
public:
|
2015-12-04 22:10:51 -05:00
|
|
|
ReBuffer(std::string bufferId) : bufferId(bufferId) {
|
2016-06-08 22:08:14 -04:00
|
|
|
// indexCounter.store(0);
|
2015-12-04 22:10:51 -05:00
|
|
|
}
|
|
|
|
|
2015-07-29 18:34:58 -04:00
|
|
|
BufferType *getBuffer() {
|
2016-06-01 13:51:01 -04:00
|
|
|
std::lock_guard < std::mutex > lock(m_mutex);
|
|
|
|
|
|
|
|
BufferType* buf = nullptr;
|
2015-07-29 18:34:58 -04:00
|
|
|
for (outputBuffersI = outputBuffers.begin(); outputBuffersI != outputBuffers.end(); outputBuffersI++) {
|
2016-06-01 13:51:01 -04:00
|
|
|
if (buf == nullptr && (*outputBuffersI)->getRefCount() <= 0) {
|
2015-08-11 21:19:55 -04:00
|
|
|
buf = (*outputBuffersI);
|
2016-06-08 19:48:46 -04:00
|
|
|
buf->setRefCount(1);
|
2015-08-11 21:19:55 -04:00
|
|
|
} else if ((*outputBuffersI)->getRefCount() <= 0) {
|
|
|
|
(*outputBuffersI)->decRefCount();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-06-01 13:51:01 -04:00
|
|
|
if (buf != nullptr) {
|
2015-08-11 21:19:55 -04:00
|
|
|
if (outputBuffers.back()->getRefCount() < -REBUFFER_GC_LIMIT) {
|
|
|
|
BufferType *ref = outputBuffers.back();
|
|
|
|
outputBuffers.pop_back();
|
|
|
|
delete ref;
|
2015-07-29 18:34:58 -04:00
|
|
|
}
|
2016-06-08 22:08:14 -04:00
|
|
|
// buf->setIndex(indexCounter++);
|
2015-08-11 21:19:55 -04:00
|
|
|
return buf;
|
2015-07-29 18:34:58 -04:00
|
|
|
}
|
|
|
|
|
2015-12-04 22:10:51 -05:00
|
|
|
#define REBUFFER_WARNING_THRESHOLD 100
|
|
|
|
if (outputBuffers.size() > REBUFFER_WARNING_THRESHOLD) {
|
|
|
|
std::cout << "Warning: ReBuffer '" << bufferId << "' count '" << outputBuffers.size() << "' exceeds threshold of '" << REBUFFER_WARNING_THRESHOLD << "'" << std::endl;
|
|
|
|
}
|
|
|
|
|
2016-06-09 13:20:16 -04:00
|
|
|
//by default created with refcount = 1
|
2015-07-29 18:34:58 -04:00
|
|
|
buf = new BufferType();
|
2016-06-08 22:08:14 -04:00
|
|
|
// buf->setIndex(indexCounter++);
|
2015-07-29 18:34:58 -04:00
|
|
|
outputBuffers.push_back(buf);
|
|
|
|
|
|
|
|
return buf;
|
|
|
|
}
|
|
|
|
|
|
|
|
void purge() {
|
2016-06-01 13:51:01 -04:00
|
|
|
std::lock_guard < std::mutex > lock(m_mutex);
|
2016-06-08 21:54:02 -04:00
|
|
|
// if (bufferId == "DemodulatorThreadBuffers") {
|
|
|
|
// std::cout << "'" << bufferId << "' purging.. total indexes: " << indexCounter.load() << std::endl;
|
|
|
|
// }
|
2015-07-29 18:34:58 -04:00
|
|
|
while (!outputBuffers.empty()) {
|
|
|
|
BufferType *ref = outputBuffers.front();
|
|
|
|
outputBuffers.pop_front();
|
2016-06-07 19:54:36 -04:00
|
|
|
if (ref->getRefCount() <= 0) {
|
|
|
|
delete ref;
|
|
|
|
} else {
|
2016-06-08 21:54:02 -04:00
|
|
|
// Something isn't done with it yet; throw it on the pile.. keep this as a bug indicator for now..
|
2016-06-08 19:48:46 -04:00
|
|
|
std::cout << "'" << bufferId << "' pushed garbage.." << std::endl;
|
2016-06-07 19:54:36 -04:00
|
|
|
ReBufferGC::addGarbage(ref);
|
|
|
|
}
|
2015-07-29 18:34:58 -04:00
|
|
|
}
|
|
|
|
}
|
2016-06-07 19:54:36 -04:00
|
|
|
|
|
|
|
private:
|
2015-12-04 22:10:51 -05:00
|
|
|
std::string bufferId;
|
2015-07-29 18:34:58 -04:00
|
|
|
std::deque<BufferType*> outputBuffers;
|
|
|
|
typename std::deque<BufferType*>::iterator outputBuffersI;
|
2016-06-01 13:51:01 -04:00
|
|
|
mutable std::mutex m_mutex;
|
2016-06-08 22:08:14 -04:00
|
|
|
// std::atomic_int indexCounter;
|
2015-07-29 18:34:58 -04:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
class IOThread {
|
|
|
|
public:
|
2015-07-29 20:57:02 -04:00
|
|
|
IOThread();
|
2015-07-29 22:52:54 -04:00
|
|
|
virtual ~IOThread();
|
2015-07-29 20:57:02 -04:00
|
|
|
|
|
|
|
static void *pthread_helper(void *context);
|
|
|
|
|
2015-07-29 18:34:58 -04:00
|
|
|
#ifdef __APPLE__
|
2015-07-29 20:57:02 -04:00
|
|
|
virtual void *threadMain();
|
2015-07-29 18:34:58 -04:00
|
|
|
#else
|
2016-06-28 15:04:52 -04:00
|
|
|
|
|
|
|
//the thread Main call back itself
|
2015-07-29 20:57:02 -04:00
|
|
|
virtual void threadMain();
|
2015-07-29 18:34:58 -04:00
|
|
|
#endif
|
2015-07-29 20:57:02 -04:00
|
|
|
|
|
|
|
virtual void setup();
|
|
|
|
virtual void run();
|
2016-06-28 15:04:52 -04:00
|
|
|
|
|
|
|
//Request for termination (asynchronous)
|
2015-07-29 20:57:02 -04:00
|
|
|
virtual void terminate();
|
2016-06-28 15:04:52 -04:00
|
|
|
|
|
|
|
//Returns true if the thread is indeed terminated, i.e the run() method
|
|
|
|
//has returned.
|
|
|
|
//If wait > 0 ms, the call is blocking at most 'waitMs' milliseconds for the thread to die, then returns.
|
|
|
|
//If wait < 0, the wait in infinite until the thread dies.
|
|
|
|
bool isTerminated(int waitMs = 0);
|
|
|
|
|
2015-07-29 20:57:02 -04:00
|
|
|
virtual void onBindOutput(std::string name, ThreadQueueBase* threadQueue);
|
|
|
|
virtual void onBindInput(std::string name, ThreadQueueBase* threadQueue);
|
|
|
|
|
|
|
|
void setInputQueue(std::string qname, ThreadQueueBase *threadQueue);
|
2016-06-01 13:42:11 -04:00
|
|
|
ThreadQueueBase *getInputQueue(std::string qname);
|
2015-07-29 20:57:02 -04:00
|
|
|
void setOutputQueue(std::string qname, ThreadQueueBase *threadQueue);
|
2016-06-01 13:42:11 -04:00
|
|
|
ThreadQueueBase *getOutputQueue(std::string qname);
|
2015-07-29 18:34:58 -04:00
|
|
|
|
|
|
|
protected:
|
2015-07-29 20:57:02 -04:00
|
|
|
std::map<std::string, ThreadQueueBase *, map_string_less> input_queues;
|
|
|
|
std::map<std::string, ThreadQueueBase *, map_string_less> output_queues;
|
2016-06-28 15:04:52 -04:00
|
|
|
|
|
|
|
//true when a termination is ordered
|
|
|
|
std::atomic_bool stopping;
|
2015-09-09 23:29:38 -04:00
|
|
|
Timer gTimer;
|
2016-06-28 15:04:52 -04:00
|
|
|
|
|
|
|
private:
|
|
|
|
//true when the thread has really ended, i.e run() from threadMain() has returned.
|
|
|
|
std::atomic_bool terminated;
|
|
|
|
|
2015-07-29 18:34:58 -04:00
|
|
|
};
|