From 05cd99bbf10ef5f4e97a4f62fcadc81b332aaaf6 Mon Sep 17 00:00:00 2001 From: "Charles J. Cliffe" Date: Wed, 29 Jul 2015 20:57:02 -0400 Subject: [PATCH] IOThread all the things! --- src/IOThread.cpp | 67 +++++++++++++++++++++++- src/IOThread.h | 74 +++++++++------------------ src/audio/AudioThread.cpp | 27 +++++----- src/audio/AudioThread.h | 5 +- src/demod/DemodulatorPreThread.cpp | 15 +----- src/demod/DemodulatorPreThread.h | 9 +--- src/demod/DemodulatorThread.cpp | 11 +--- src/demod/DemodulatorThread.h | 9 +--- src/demod/DemodulatorWorkerThread.cpp | 5 +- src/demod/DemodulatorWorkerThread.h | 6 +-- src/sdr/SDRPostThread.cpp | 8 +-- src/sdr/SDRPostThread.h | 5 +- src/sdr/SDRThread.cpp | 10 +--- src/sdr/SDRThread.h | 7 +-- src/util/ThreadQueue.h | 6 ++- 15 files changed, 128 insertions(+), 136 deletions(-) diff --git a/src/IOThread.cpp b/src/IOThread.cpp index 5bde1fe..3a93e95 100644 --- a/src/IOThread.cpp +++ b/src/IOThread.cpp @@ -1 +1,66 @@ -#include "IOThread.h" \ No newline at end of file +#include "IOThread.h" + +IOThread::IOThread() { + terminated.store(false); +} + +IOThread::~IOThread() { + +} + +#ifdef __APPLE__ +void *IOThread::threadMain() { + run(); + return this; +}; + +void *IOThread::pthread_helper(void *context) { + return ((IOThread *) context)->threadMain(); +}; +#else +void IOThread::threadMain() { + run(); +}; +#endif + +void IOThread::setup() { + +}; + +void IOThread::run() { + +}; + +void IOThread::terminate() { + terminated.store(true); +}; + +void IOThread::onBindOutput(std::string name, ThreadQueueBase* threadQueue) { + +}; + +void IOThread::onBindInput(std::string name, ThreadQueueBase* threadQueue) { + +}; + +void IOThread::setInputQueue(std::string qname, ThreadQueueBase *threadQueue) { + input_queues[qname] = threadQueue; + this->onBindInput(qname, threadQueue); +}; + +void *IOThread::getInputQueue(std::string qname) { + return input_queues[qname]; +}; + +void IOThread::setOutputQueue(std::string qname, ThreadQueueBase *threadQueue) { + output_queues[qname] = threadQueue; + this->onBindOutput(qname, threadQueue); +}; + +void *IOThread::getOutputQueue(std::string qname) { + return output_queues[qname]; +}; + +bool IOThread::isTerminated() { + return terminated.load(); +} \ No newline at end of file diff --git a/src/IOThread.h b/src/IOThread.h index 8adb726..dc76267 100644 --- a/src/IOThread.h +++ b/src/IOThread.h @@ -6,6 +6,7 @@ #include #include +#include "ThreadQueue.h" struct map_string_less : public std::binary_function { @@ -69,61 +70,32 @@ private: class IOThread { public: - virtual void setup() { - - }; - - virtual void init() { - - }; - - virtual void onBindOutput(std::string name, void* threadQueue) { - - }; - - virtual void onBindInput(std::string name, void* threadQueue) { - - }; - + IOThread(); + ~IOThread(); + + static void *pthread_helper(void *context); + #ifdef __APPLE__ - virtual void *threadMain() { - return 0; - }; - - static void *pthread_helper(void *context) { - return ((IOThread *) context)->threadMain(); - }; - + virtual void *threadMain(); #else - virtual void threadMain() { - - }; + virtual void threadMain(); #endif - - virtual void terminate() { - - }; - - void setInputQueue(std::string qname, void *threadQueue) { - input_queues[qname] = threadQueue; - this->onBindInput(qname, threadQueue); - }; - - void *getInputQueue(std::string qname) { - return input_queues[qname]; - }; - - void setOutputQueue(std::string qname, void *threadQueue) { - output_queues[qname] = threadQueue; - this->onBindOutput(qname, threadQueue); - }; - - void *getOutputQueue(std::string qname) { - return output_queues[qname]; - }; + + virtual void setup(); + virtual void run(); + virtual void terminate(); + bool isTerminated(); + virtual void onBindOutput(std::string name, ThreadQueueBase* threadQueue); + virtual void onBindInput(std::string name, ThreadQueueBase* threadQueue); + + void setInputQueue(std::string qname, ThreadQueueBase *threadQueue); + void *getInputQueue(std::string qname); + void setOutputQueue(std::string qname, ThreadQueueBase *threadQueue); + void *getOutputQueue(std::string qname); protected: - std::map input_queues; - std::map output_queues; + std::map input_queues; + std::map output_queues; + std::atomic_bool terminated; }; diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index 384b28d..458d70c 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -11,13 +11,12 @@ std::map AudioThread::deviceController; std::map AudioThread::deviceSampleRate; std::map AudioThread::deviceThread; -AudioThread::AudioThread(AudioThreadInputQueue *inputQueue, DemodulatorThreadCommandQueue* threadQueueNotify) : +AudioThread::AudioThread(AudioThreadInputQueue *inputQueue, DemodulatorThreadCommandQueue* threadQueueNotify) : IOThread(), currentInput(NULL), inputQueue(inputQueue), gain( 1.0), threadQueueNotify(threadQueueNotify), sampleRate(0), nBufferFrames(1024) { audioQueuePtr.store(0); underflowCount.store(0); - terminated.store(false); active.store(false); outputDevice.store(-1); @@ -56,7 +55,7 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu float *out = (float*) outputBuffer; memset(out, 0, nBufferFrames * 2 * sizeof(float)); - if (src->terminated) { + if (src->isTerminated()) { return 1; } @@ -72,17 +71,17 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu for (int j = 0; j < src->boundThreads.load()->size(); j++) { AudioThread *srcmix = (*(src->boundThreads.load()))[j]; - if (srcmix->terminated || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) { + if (srcmix->isTerminated() || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) { continue; } if (!srcmix->currentInput) { srcmix->audioQueuePtr = 0; - if (srcmix->terminated || srcmix->inputQueue->empty()) { + if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { continue; } srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->terminated) { + if (srcmix->isTerminated()) { continue; } continue; @@ -117,11 +116,11 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu srcmix->currentInput->decRefCount(); srcmix->currentInput = NULL; } - if (srcmix->terminated || srcmix->inputQueue->empty()) { + if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { continue; } srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->terminated) { + if (srcmix->isTerminated()) { continue; } } @@ -138,11 +137,11 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu srcmix->currentInput->decRefCount(); srcmix->currentInput = NULL; } - if (srcmix->terminated || srcmix->inputQueue->empty()) { + if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { break; } srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->terminated) { + if (srcmix->isTerminated()) { break; } float srcPeak = srcmix->currentInput->peak * srcmix->gain; @@ -165,11 +164,11 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu srcmix->currentInput->decRefCount(); srcmix->currentInput = NULL; } - if (srcmix->terminated || srcmix->inputQueue->empty()) { + if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { break; } srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->terminated) { + if (srcmix->isTerminated()) { break; } float srcPeak = srcmix->currentInput->peak * srcmix->gain; @@ -359,7 +358,7 @@ void AudioThread::setInitOutputDevice(int deviceId, int sampleRate) { this->sampleRate = sampleRate; } -void AudioThread::threadMain() { +void AudioThread::run() { #ifdef __APPLE__ pthread_t tID = pthread_self(); // ID of this thread int priority = sched_get_priority_max( SCHED_RR) - 1; @@ -378,8 +377,6 @@ void AudioThread::threadMain() { std::cout << "Audio thread started." << std::endl; - terminated = false; - while (!terminated) { AudioThreadCommand command; cmdQueue.pop(command); diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index 2e2adbf..1409c49 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -47,14 +47,13 @@ public: typedef ThreadQueue AudioThreadInputQueue; typedef ThreadQueue AudioThreadCommandQueue; -class AudioThread { +class AudioThread : public IOThread { public: AudioThreadInput *currentInput; AudioThreadInputQueue *inputQueue; std::atomic_uint audioQueuePtr; std::atomic_uint underflowCount; - std::atomic_bool terminated; std::atomic_bool initialized; std::atomic_bool active; std::atomic_int outputDevice; @@ -70,7 +69,7 @@ public: int getOutputDevice(); void setSampleRate(int sampleRate); int getSampleRate(); - void threadMain(); + void run(); void terminate(); bool isActive(); diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index a449699..eb9958c 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -9,11 +9,10 @@ #include "CubicSDR.h" DemodulatorPreThread::DemodulatorPreThread(DemodulatorThreadInputQueue* iqInputQueue, DemodulatorThreadPostInputQueue* iqOutputQueue, - DemodulatorThreadControlCommandQueue *threadQueueControl, DemodulatorThreadCommandQueue* threadQueueNotify) : + DemodulatorThreadControlCommandQueue *threadQueueControl, DemodulatorThreadCommandQueue* threadQueueNotify) : IOThread(), iqInputQueue(iqInputQueue), iqOutputQueue(iqOutputQueue), audioResampler(NULL), stereoResampler(NULL), iqResampleRatio( 1), audioResampleRatio(1), firStereoRight(NULL), firStereoLeft(NULL), iirStereoPilot(NULL), iqResampler(NULL), commandQueue(NULL), threadQueueNotify(threadQueueNotify), threadQueueControl( threadQueueControl) { - terminated.store(false); initialized.store(false); freqShifter = nco_crcf_create(LIQUID_VCO); @@ -80,11 +79,7 @@ DemodulatorPreThread::~DemodulatorPreThread() { delete workerResults; } -#ifdef __APPLE__ -void *DemodulatorPreThread::threadMain() { -#else -void DemodulatorPreThread::threadMain() { -#endif +void DemodulatorPreThread::run() { #ifdef __APPLE__ pthread_t tID = pthread_self(); // ID of this thread int priority = sched_get_priority_max( SCHED_FIFO) - 1; @@ -105,8 +100,6 @@ void DemodulatorPreThread::threadMain() { // liquid_float_complex carrySample; // Keep the stream count even to simplify some demod operations // bool carrySampleFlag = false; - terminated = false; - while (!terminated) { DemodulatorThreadIQData *inp; iqInputQueue->pop(inp); @@ -319,10 +312,6 @@ void DemodulatorPreThread::threadMain() { tCmd.context = this; threadQueueNotify->push(tCmd); std::cout << "Demodulator preprocessor thread done." << std::endl; - -#ifdef __APPLE__ - return this; -#endif } void DemodulatorPreThread::terminate() { diff --git a/src/demod/DemodulatorPreThread.h b/src/demod/DemodulatorPreThread.h index b5e5ec2..6e3585a 100644 --- a/src/demod/DemodulatorPreThread.h +++ b/src/demod/DemodulatorPreThread.h @@ -7,18 +7,14 @@ #include "DemodDefs.h" #include "DemodulatorWorkerThread.h" -class DemodulatorPreThread { +class DemodulatorPreThread : public IOThread { public: DemodulatorPreThread(DemodulatorThreadInputQueue* iqInputQueue, DemodulatorThreadPostInputQueue* iqOutputQueue, DemodulatorThreadControlCommandQueue *threadQueueControl, DemodulatorThreadCommandQueue* threadQueueNotify); ~DemodulatorPreThread(); -#ifdef __APPLE__ - void *threadMain(); -#else - void threadMain(); -#endif + void run(); void setCommandQueue(DemodulatorThreadCommandQueue *tQueue) { commandQueue = tQueue; @@ -68,7 +64,6 @@ protected: nco_crcf freqShifter; int shiftFrequency; - std::atomic_bool terminated; std::atomic_bool initialized; DemodulatorWorkerThread *workerThread; diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 0194adb..a52d64e 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -12,14 +12,13 @@ #endif DemodulatorThread::DemodulatorThread(DemodulatorThreadPostInputQueue* iqInputQueue, DemodulatorThreadControlCommandQueue *threadQueueControl, - DemodulatorThreadCommandQueue* threadQueueNotify) : + DemodulatorThreadCommandQueue* threadQueueNotify) : IOThread(), iqInputQueue(iqInputQueue), audioVisOutputQueue(NULL), audioOutputQueue(NULL), iqAutoGain(NULL), amOutputCeil(1), amOutputCeilMA(1), amOutputCeilMAA( 1), threadQueueNotify(threadQueueNotify), threadQueueControl(threadQueueControl), squelchLevel(0), signalLevel( 0), squelchEnabled(false), audioSampleRate(0) { stereo.store(false); agcEnabled.store(false); - terminated.store(false); demodulatorType.store(DEMOD_TYPE_FM); demodFM = freqdem_create(0.5); @@ -33,11 +32,7 @@ DemodulatorThread::DemodulatorThread(DemodulatorThreadPostInputQueue* iqInputQue DemodulatorThread::~DemodulatorThread() { } -#ifdef __APPLE__ -void *DemodulatorThread::threadMain() { -#else -void DemodulatorThread::threadMain() { -#endif +void DemodulatorThread::run() { #ifdef __APPLE__ pthread_t tID = pthread_self(); // ID of this thread int priority = sched_get_priority_max( SCHED_FIFO )-1; @@ -89,8 +84,6 @@ void DemodulatorThread::threadMain() { break; } - terminated = false; - while (!terminated) { DemodulatorThreadPostIQData *inp; iqInputQueue->pop(inp); diff --git a/src/demod/DemodulatorThread.h b/src/demod/DemodulatorThread.h index bad7050..55335b0 100644 --- a/src/demod/DemodulatorThread.h +++ b/src/demod/DemodulatorThread.h @@ -10,18 +10,14 @@ typedef ThreadQueue DemodulatorThreadOutputQueue; #define DEMOD_VIS_SIZE 1024 -class DemodulatorThread { +class DemodulatorThread : public IOThread { public: DemodulatorThread(DemodulatorThreadPostInputQueue* iqInputQueue, DemodulatorThreadControlCommandQueue *threadQueueControl, DemodulatorThreadCommandQueue* threadQueueNotify); ~DemodulatorThread(); -#ifdef __APPLE__ - void *threadMain(); -#else - void threadMain(); -#endif + void run(); void setVisualOutputQueue(DemodulatorThreadOutputQueue *tQueue); void setAudioOutputQueue(AudioThreadInputQueue *tQueue); @@ -76,7 +72,6 @@ protected: std::atomic_bool stereo; std::atomic_bool agcEnabled; - std::atomic_bool terminated; std::atomic_int demodulatorType; int audioSampleRate; diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index a3d13cf..d7df90b 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -2,15 +2,14 @@ #include "CubicSDRDefs.h" #include -DemodulatorWorkerThread::DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out) : +DemodulatorWorkerThread::DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out) : IOThread(), commandQueue(in), resultQueue(out) { - terminated.store(false); } DemodulatorWorkerThread::~DemodulatorWorkerThread() { } -void DemodulatorWorkerThread::threadMain() { +void DemodulatorWorkerThread::run() { std::cout << "Demodulator worker thread started.." << std::endl; diff --git a/src/demod/DemodulatorWorkerThread.h b/src/demod/DemodulatorWorkerThread.h index 86c5baf..00afbcc 100644 --- a/src/demod/DemodulatorWorkerThread.h +++ b/src/demod/DemodulatorWorkerThread.h @@ -70,13 +70,13 @@ public: typedef ThreadQueue DemodulatorThreadWorkerCommandQueue; typedef ThreadQueue DemodulatorThreadWorkerResultQueue; -class DemodulatorWorkerThread { +class DemodulatorWorkerThread : public IOThread { public: DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out); ~DemodulatorWorkerThread(); - void threadMain(); + void run(); void setCommandQueue(DemodulatorThreadWorkerCommandQueue *tQueue) { commandQueue = tQueue; @@ -92,6 +92,4 @@ protected: DemodulatorThreadWorkerCommandQueue *commandQueue; DemodulatorThreadWorkerResultQueue *resultQueue; - - std::atomic_bool terminated; }; diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index de155d5..6a52046 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -5,10 +5,9 @@ #include #include -SDRPostThread::SDRPostThread() : +SDRPostThread::SDRPostThread() : IOThread(), iqDataInQueue(NULL), iqDataOutQueue(NULL), iqVisualQueue(NULL), dcFilter(NULL), num_vis_samples(16384*2) { - terminated.store(false); swapIQ.store(false); // create a lookup table @@ -77,10 +76,7 @@ bool SDRPostThread::getSwapIQ() { return this->swapIQ.load(); } -void SDRPostThread::threadMain() { - int n_read; - double seconds = 0.0; - +void SDRPostThread::run() { #ifdef __APPLE__ pthread_t tID = pthread_self(); // ID of this thread int priority = sched_get_priority_max( SCHED_FIFO) - 1; diff --git a/src/sdr/SDRPostThread.h b/src/sdr/SDRPostThread.h index a1aed7d..d95e2a6 100644 --- a/src/sdr/SDRPostThread.h +++ b/src/sdr/SDRPostThread.h @@ -3,7 +3,7 @@ #include "SDRThread.h" #include -class SDRPostThread { +class SDRPostThread : public IOThread { public: SDRPostThread(); ~SDRPostThread(); @@ -21,7 +21,7 @@ public: void setSwapIQ(bool swapIQ); bool getSwapIQ(); - void threadMain(); + void run(); void terminate(); protected: @@ -31,7 +31,6 @@ protected: std::mutex busy_demod; std::vector demodulators; - std::atomic_bool terminated; iirfilt_crcf dcFilter; int num_vis_samples; std::atomic_bool swapIQ; diff --git a/src/sdr/SDRThread.cpp b/src/sdr/SDRThread.cpp index b5eb73c..99202bb 100644 --- a/src/sdr/SDRThread.cpp +++ b/src/sdr/SDRThread.cpp @@ -3,9 +3,8 @@ #include #include "CubicSDR.h" -SDRThread::SDRThread(SDRThreadCommandQueue* pQueue) : +SDRThread::SDRThread(SDRThreadCommandQueue* pQueue) : IOThread(), commandQueue(pQueue), iqDataOutQueue(NULL) { - terminated.store(false); offset.store(0); deviceId.store(-1); dev = NULL; @@ -114,7 +113,7 @@ int SDRThread::enumerate_rtl(std::vector *devs) { } -void SDRThread::threadMain() { +void SDRThread::run() { #ifdef __APPLE__ pthread_t tID = pthread_self(); // ID of this thread int priority = sched_get_priority_max( SCHED_FIFO) - 1; @@ -124,8 +123,6 @@ void SDRThread::threadMain() { std::cout << "SDR thread initializing.." << std::endl; - int devCount = rtlsdr_get_device_count(); - std::vector devs; if (deviceId == -1) { deviceId = enumerate_rtl(&devs); @@ -303,6 +300,3 @@ void SDRThread::threadMain() { std::cout << "SDR thread done." << std::endl; } -void SDRThread::terminate() { - terminated = true; -} diff --git a/src/sdr/SDRThread.h b/src/sdr/SDRThread.h index 67318a9..1ddaec0 100644 --- a/src/sdr/SDRThread.h +++ b/src/sdr/SDRThread.h @@ -122,7 +122,7 @@ public: typedef ThreadQueue SDRThreadCommandQueue; typedef ThreadQueue SDRThreadIQDataQueue; -class SDRThread { +class SDRThread : public IOThread { public: rtlsdr_dev_t *dev; @@ -131,14 +131,12 @@ public: static int enumerate_rtl(std::vector *devs); - void threadMain(); + void run(); void setIQDataOutQueue(SDRThreadIQDataQueue* iqDataQueue) { iqDataOutQueue = iqDataQueue; } - void terminate(); - int getDeviceId() const { return deviceId.load(); } @@ -153,6 +151,5 @@ protected: std::atomic commandQueue; std::atomic iqDataOutQueue; - std::atomic_bool terminated; std::atomic_int deviceId; }; diff --git a/src/util/ThreadQueue.h b/src/util/ThreadQueue.h index 66dff2a..6ef108a 100644 --- a/src/util/ThreadQueue.h +++ b/src/util/ThreadQueue.h @@ -15,9 +15,13 @@ #include #include +class ThreadQueueBase { + +}; + /** A thread-safe asynchronous queue */ template> -class ThreadQueue { +class ThreadQueue : public ThreadQueueBase { typedef typename Container::value_type value_type; typedef typename Container::size_type size_type;