IOThread all the things!

This commit is contained in:
Charles J. Cliffe 2015-07-29 20:57:02 -04:00
parent 3ab8669d06
commit 05cd99bbf1
15 changed files with 128 additions and 136 deletions

View File

@ -1 +1,66 @@
#include "IOThread.h"
#include "IOThread.h"
IOThread::IOThread() {
terminated.store(false);
}
IOThread::~IOThread() {
}
#ifdef __APPLE__
void *IOThread::threadMain() {
run();
return this;
};
void *IOThread::pthread_helper(void *context) {
return ((IOThread *) context)->threadMain();
};
#else
void IOThread::threadMain() {
run();
};
#endif
void IOThread::setup() {
};
void IOThread::run() {
};
void IOThread::terminate() {
terminated.store(true);
};
void IOThread::onBindOutput(std::string name, ThreadQueueBase* threadQueue) {
};
void IOThread::onBindInput(std::string name, ThreadQueueBase* threadQueue) {
};
void IOThread::setInputQueue(std::string qname, ThreadQueueBase *threadQueue) {
input_queues[qname] = threadQueue;
this->onBindInput(qname, threadQueue);
};
void *IOThread::getInputQueue(std::string qname) {
return input_queues[qname];
};
void IOThread::setOutputQueue(std::string qname, ThreadQueueBase *threadQueue) {
output_queues[qname] = threadQueue;
this->onBindOutput(qname, threadQueue);
};
void *IOThread::getOutputQueue(std::string qname) {
return output_queues[qname];
};
bool IOThread::isTerminated() {
return terminated.load();
}

View File

@ -6,6 +6,7 @@
#include <map>
#include <string>
#include "ThreadQueue.h"
struct map_string_less : public std::binary_function<std::string,std::string,bool>
{
@ -69,61 +70,32 @@ private:
class IOThread {
public:
virtual void setup() {
};
virtual void init() {
};
virtual void onBindOutput(std::string name, void* threadQueue) {
};
virtual void onBindInput(std::string name, void* threadQueue) {
};
IOThread();
~IOThread();
static void *pthread_helper(void *context);
#ifdef __APPLE__
virtual void *threadMain() {
return 0;
};
static void *pthread_helper(void *context) {
return ((IOThread *) context)->threadMain();
};
virtual void *threadMain();
#else
virtual void threadMain() {
};
virtual void threadMain();
#endif
virtual void terminate() {
};
void setInputQueue(std::string qname, void *threadQueue) {
input_queues[qname] = threadQueue;
this->onBindInput(qname, threadQueue);
};
void *getInputQueue(std::string qname) {
return input_queues[qname];
};
void setOutputQueue(std::string qname, void *threadQueue) {
output_queues[qname] = threadQueue;
this->onBindOutput(qname, threadQueue);
};
void *getOutputQueue(std::string qname) {
return output_queues[qname];
};
virtual void setup();
virtual void run();
virtual void terminate();
bool isTerminated();
virtual void onBindOutput(std::string name, ThreadQueueBase* threadQueue);
virtual void onBindInput(std::string name, ThreadQueueBase* threadQueue);
void setInputQueue(std::string qname, ThreadQueueBase *threadQueue);
void *getInputQueue(std::string qname);
void setOutputQueue(std::string qname, ThreadQueueBase *threadQueue);
void *getOutputQueue(std::string qname);
protected:
std::map<std::string, void *, map_string_less> input_queues;
std::map<std::string, void *, map_string_less> output_queues;
std::map<std::string, ThreadQueueBase *, map_string_less> input_queues;
std::map<std::string, ThreadQueueBase *, map_string_less> output_queues;
std::atomic_bool terminated;
};

View File

@ -11,13 +11,12 @@ std::map<int, AudioThread *> AudioThread::deviceController;
std::map<int, int> AudioThread::deviceSampleRate;
std::map<int, std::thread *> AudioThread::deviceThread;
AudioThread::AudioThread(AudioThreadInputQueue *inputQueue, DemodulatorThreadCommandQueue* threadQueueNotify) :
AudioThread::AudioThread(AudioThreadInputQueue *inputQueue, DemodulatorThreadCommandQueue* threadQueueNotify) : IOThread(),
currentInput(NULL), inputQueue(inputQueue), gain(
1.0), threadQueueNotify(threadQueueNotify), sampleRate(0), nBufferFrames(1024) {
audioQueuePtr.store(0);
underflowCount.store(0);
terminated.store(false);
active.store(false);
outputDevice.store(-1);
@ -56,7 +55,7 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu
float *out = (float*) outputBuffer;
memset(out, 0, nBufferFrames * 2 * sizeof(float));
if (src->terminated) {
if (src->isTerminated()) {
return 1;
}
@ -72,17 +71,17 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu
for (int j = 0; j < src->boundThreads.load()->size(); j++) {
AudioThread *srcmix = (*(src->boundThreads.load()))[j];
if (srcmix->terminated || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) {
if (srcmix->isTerminated() || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) {
continue;
}
if (!srcmix->currentInput) {
srcmix->audioQueuePtr = 0;
if (srcmix->terminated || srcmix->inputQueue->empty()) {
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
continue;
}
srcmix->inputQueue->pop(srcmix->currentInput);
if (srcmix->terminated) {
if (srcmix->isTerminated()) {
continue;
}
continue;
@ -117,11 +116,11 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu
srcmix->currentInput->decRefCount();
srcmix->currentInput = NULL;
}
if (srcmix->terminated || srcmix->inputQueue->empty()) {
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
continue;
}
srcmix->inputQueue->pop(srcmix->currentInput);
if (srcmix->terminated) {
if (srcmix->isTerminated()) {
continue;
}
}
@ -138,11 +137,11 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu
srcmix->currentInput->decRefCount();
srcmix->currentInput = NULL;
}
if (srcmix->terminated || srcmix->inputQueue->empty()) {
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
break;
}
srcmix->inputQueue->pop(srcmix->currentInput);
if (srcmix->terminated) {
if (srcmix->isTerminated()) {
break;
}
float srcPeak = srcmix->currentInput->peak * srcmix->gain;
@ -165,11 +164,11 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu
srcmix->currentInput->decRefCount();
srcmix->currentInput = NULL;
}
if (srcmix->terminated || srcmix->inputQueue->empty()) {
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
break;
}
srcmix->inputQueue->pop(srcmix->currentInput);
if (srcmix->terminated) {
if (srcmix->isTerminated()) {
break;
}
float srcPeak = srcmix->currentInput->peak * srcmix->gain;
@ -359,7 +358,7 @@ void AudioThread::setInitOutputDevice(int deviceId, int sampleRate) {
this->sampleRate = sampleRate;
}
void AudioThread::threadMain() {
void AudioThread::run() {
#ifdef __APPLE__
pthread_t tID = pthread_self(); // ID of this thread
int priority = sched_get_priority_max( SCHED_RR) - 1;
@ -378,8 +377,6 @@ void AudioThread::threadMain() {
std::cout << "Audio thread started." << std::endl;
terminated = false;
while (!terminated) {
AudioThreadCommand command;
cmdQueue.pop(command);

View File

@ -47,14 +47,13 @@ public:
typedef ThreadQueue<AudioThreadInput *> AudioThreadInputQueue;
typedef ThreadQueue<AudioThreadCommand> AudioThreadCommandQueue;
class AudioThread {
class AudioThread : public IOThread {
public:
AudioThreadInput *currentInput;
AudioThreadInputQueue *inputQueue;
std::atomic_uint audioQueuePtr;
std::atomic_uint underflowCount;
std::atomic_bool terminated;
std::atomic_bool initialized;
std::atomic_bool active;
std::atomic_int outputDevice;
@ -70,7 +69,7 @@ public:
int getOutputDevice();
void setSampleRate(int sampleRate);
int getSampleRate();
void threadMain();
void run();
void terminate();
bool isActive();

View File

@ -9,11 +9,10 @@
#include "CubicSDR.h"
DemodulatorPreThread::DemodulatorPreThread(DemodulatorThreadInputQueue* iqInputQueue, DemodulatorThreadPostInputQueue* iqOutputQueue,
DemodulatorThreadControlCommandQueue *threadQueueControl, DemodulatorThreadCommandQueue* threadQueueNotify) :
DemodulatorThreadControlCommandQueue *threadQueueControl, DemodulatorThreadCommandQueue* threadQueueNotify) : IOThread(),
iqInputQueue(iqInputQueue), iqOutputQueue(iqOutputQueue), audioResampler(NULL), stereoResampler(NULL), iqResampleRatio(
1), audioResampleRatio(1), firStereoRight(NULL), firStereoLeft(NULL), iirStereoPilot(NULL), iqResampler(NULL), commandQueue(NULL), threadQueueNotify(threadQueueNotify), threadQueueControl(
threadQueueControl) {
terminated.store(false);
initialized.store(false);
freqShifter = nco_crcf_create(LIQUID_VCO);
@ -80,11 +79,7 @@ DemodulatorPreThread::~DemodulatorPreThread() {
delete workerResults;
}
#ifdef __APPLE__
void *DemodulatorPreThread::threadMain() {
#else
void DemodulatorPreThread::threadMain() {
#endif
void DemodulatorPreThread::run() {
#ifdef __APPLE__
pthread_t tID = pthread_self(); // ID of this thread
int priority = sched_get_priority_max( SCHED_FIFO) - 1;
@ -105,8 +100,6 @@ void DemodulatorPreThread::threadMain() {
// liquid_float_complex carrySample; // Keep the stream count even to simplify some demod operations
// bool carrySampleFlag = false;
terminated = false;
while (!terminated) {
DemodulatorThreadIQData *inp;
iqInputQueue->pop(inp);
@ -319,10 +312,6 @@ void DemodulatorPreThread::threadMain() {
tCmd.context = this;
threadQueueNotify->push(tCmd);
std::cout << "Demodulator preprocessor thread done." << std::endl;
#ifdef __APPLE__
return this;
#endif
}
void DemodulatorPreThread::terminate() {

View File

@ -7,18 +7,14 @@
#include "DemodDefs.h"
#include "DemodulatorWorkerThread.h"
class DemodulatorPreThread {
class DemodulatorPreThread : public IOThread {
public:
DemodulatorPreThread(DemodulatorThreadInputQueue* iqInputQueue, DemodulatorThreadPostInputQueue* iqOutputQueue,
DemodulatorThreadControlCommandQueue *threadQueueControl, DemodulatorThreadCommandQueue* threadQueueNotify);
~DemodulatorPreThread();
#ifdef __APPLE__
void *threadMain();
#else
void threadMain();
#endif
void run();
void setCommandQueue(DemodulatorThreadCommandQueue *tQueue) {
commandQueue = tQueue;
@ -68,7 +64,6 @@ protected:
nco_crcf freqShifter;
int shiftFrequency;
std::atomic_bool terminated;
std::atomic_bool initialized;
DemodulatorWorkerThread *workerThread;

View File

@ -12,14 +12,13 @@
#endif
DemodulatorThread::DemodulatorThread(DemodulatorThreadPostInputQueue* iqInputQueue, DemodulatorThreadControlCommandQueue *threadQueueControl,
DemodulatorThreadCommandQueue* threadQueueNotify) :
DemodulatorThreadCommandQueue* threadQueueNotify) : IOThread(),
iqInputQueue(iqInputQueue), audioVisOutputQueue(NULL), audioOutputQueue(NULL), iqAutoGain(NULL), amOutputCeil(1), amOutputCeilMA(1), amOutputCeilMAA(
1), threadQueueNotify(threadQueueNotify), threadQueueControl(threadQueueControl), squelchLevel(0), signalLevel(
0), squelchEnabled(false), audioSampleRate(0) {
stereo.store(false);
agcEnabled.store(false);
terminated.store(false);
demodulatorType.store(DEMOD_TYPE_FM);
demodFM = freqdem_create(0.5);
@ -33,11 +32,7 @@ DemodulatorThread::DemodulatorThread(DemodulatorThreadPostInputQueue* iqInputQue
DemodulatorThread::~DemodulatorThread() {
}
#ifdef __APPLE__
void *DemodulatorThread::threadMain() {
#else
void DemodulatorThread::threadMain() {
#endif
void DemodulatorThread::run() {
#ifdef __APPLE__
pthread_t tID = pthread_self(); // ID of this thread
int priority = sched_get_priority_max( SCHED_FIFO )-1;
@ -89,8 +84,6 @@ void DemodulatorThread::threadMain() {
break;
}
terminated = false;
while (!terminated) {
DemodulatorThreadPostIQData *inp;
iqInputQueue->pop(inp);

View File

@ -10,18 +10,14 @@ typedef ThreadQueue<AudioThreadInput *> DemodulatorThreadOutputQueue;
#define DEMOD_VIS_SIZE 1024
class DemodulatorThread {
class DemodulatorThread : public IOThread {
public:
DemodulatorThread(DemodulatorThreadPostInputQueue* iqInputQueue, DemodulatorThreadControlCommandQueue *threadQueueControl,
DemodulatorThreadCommandQueue* threadQueueNotify);
~DemodulatorThread();
#ifdef __APPLE__
void *threadMain();
#else
void threadMain();
#endif
void run();
void setVisualOutputQueue(DemodulatorThreadOutputQueue *tQueue);
void setAudioOutputQueue(AudioThreadInputQueue *tQueue);
@ -76,7 +72,6 @@ protected:
std::atomic_bool stereo;
std::atomic_bool agcEnabled;
std::atomic_bool terminated;
std::atomic_int demodulatorType;
int audioSampleRate;

View File

@ -2,15 +2,14 @@
#include "CubicSDRDefs.h"
#include <vector>
DemodulatorWorkerThread::DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out) :
DemodulatorWorkerThread::DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out) : IOThread(),
commandQueue(in), resultQueue(out) {
terminated.store(false);
}
DemodulatorWorkerThread::~DemodulatorWorkerThread() {
}
void DemodulatorWorkerThread::threadMain() {
void DemodulatorWorkerThread::run() {
std::cout << "Demodulator worker thread started.." << std::endl;

View File

@ -70,13 +70,13 @@ public:
typedef ThreadQueue<DemodulatorWorkerThreadCommand> DemodulatorThreadWorkerCommandQueue;
typedef ThreadQueue<DemodulatorWorkerThreadResult> DemodulatorThreadWorkerResultQueue;
class DemodulatorWorkerThread {
class DemodulatorWorkerThread : public IOThread {
public:
DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out);
~DemodulatorWorkerThread();
void threadMain();
void run();
void setCommandQueue(DemodulatorThreadWorkerCommandQueue *tQueue) {
commandQueue = tQueue;
@ -92,6 +92,4 @@ protected:
DemodulatorThreadWorkerCommandQueue *commandQueue;
DemodulatorThreadWorkerResultQueue *resultQueue;
std::atomic_bool terminated;
};

View File

@ -5,10 +5,9 @@
#include <vector>
#include <deque>
SDRPostThread::SDRPostThread() :
SDRPostThread::SDRPostThread() : IOThread(),
iqDataInQueue(NULL), iqDataOutQueue(NULL), iqVisualQueue(NULL), dcFilter(NULL), num_vis_samples(16384*2) {
terminated.store(false);
swapIQ.store(false);
// create a lookup table
@ -77,10 +76,7 @@ bool SDRPostThread::getSwapIQ() {
return this->swapIQ.load();
}
void SDRPostThread::threadMain() {
int n_read;
double seconds = 0.0;
void SDRPostThread::run() {
#ifdef __APPLE__
pthread_t tID = pthread_self(); // ID of this thread
int priority = sched_get_priority_max( SCHED_FIFO) - 1;

View File

@ -3,7 +3,7 @@
#include "SDRThread.h"
#include <algorithm>
class SDRPostThread {
class SDRPostThread : public IOThread {
public:
SDRPostThread();
~SDRPostThread();
@ -21,7 +21,7 @@ public:
void setSwapIQ(bool swapIQ);
bool getSwapIQ();
void threadMain();
void run();
void terminate();
protected:
@ -31,7 +31,6 @@ protected:
std::mutex busy_demod;
std::vector<DemodulatorInstance *> demodulators;
std::atomic_bool terminated;
iirfilt_crcf dcFilter;
int num_vis_samples;
std::atomic_bool swapIQ;

View File

@ -3,9 +3,8 @@
#include <vector>
#include "CubicSDR.h"
SDRThread::SDRThread(SDRThreadCommandQueue* pQueue) :
SDRThread::SDRThread(SDRThreadCommandQueue* pQueue) : IOThread(),
commandQueue(pQueue), iqDataOutQueue(NULL) {
terminated.store(false);
offset.store(0);
deviceId.store(-1);
dev = NULL;
@ -114,7 +113,7 @@ int SDRThread::enumerate_rtl(std::vector<SDRDeviceInfo *> *devs) {
}
void SDRThread::threadMain() {
void SDRThread::run() {
#ifdef __APPLE__
pthread_t tID = pthread_self(); // ID of this thread
int priority = sched_get_priority_max( SCHED_FIFO) - 1;
@ -124,8 +123,6 @@ void SDRThread::threadMain() {
std::cout << "SDR thread initializing.." << std::endl;
int devCount = rtlsdr_get_device_count();
std::vector<SDRDeviceInfo *> devs;
if (deviceId == -1) {
deviceId = enumerate_rtl(&devs);
@ -303,6 +300,3 @@ void SDRThread::threadMain() {
std::cout << "SDR thread done." << std::endl;
}
void SDRThread::terminate() {
terminated = true;
}

View File

@ -122,7 +122,7 @@ public:
typedef ThreadQueue<SDRThreadCommand> SDRThreadCommandQueue;
typedef ThreadQueue<SDRThreadIQData *> SDRThreadIQDataQueue;
class SDRThread {
class SDRThread : public IOThread {
public:
rtlsdr_dev_t *dev;
@ -131,14 +131,12 @@ public:
static int enumerate_rtl(std::vector<SDRDeviceInfo *> *devs);
void threadMain();
void run();
void setIQDataOutQueue(SDRThreadIQDataQueue* iqDataQueue) {
iqDataOutQueue = iqDataQueue;
}
void terminate();
int getDeviceId() const {
return deviceId.load();
}
@ -153,6 +151,5 @@ protected:
std::atomic<SDRThreadCommandQueue*> commandQueue;
std::atomic<SDRThreadIQDataQueue*> iqDataOutQueue;
std::atomic_bool terminated;
std::atomic_int deviceId;
};

View File

@ -15,9 +15,13 @@
#include <cstdint>
#include <condition_variable>
class ThreadQueueBase {
};
/** A thread-safe asynchronous queue */
template<class T, class Container = std::list<T>>
class ThreadQueue {
class ThreadQueue : public ThreadQueueBase {
typedef typename Container::value_type value_type;
typedef typename Container::size_type size_type;