SDR and Audio thread now using std::thread

Next up, demod thread..
This commit is contained in:
Charles J. Cliffe 2014-11-22 20:57:06 -05:00
parent 35830afed0
commit 26168a2713
17 changed files with 291 additions and 500 deletions

View File

@ -90,15 +90,11 @@ SET (cubicsdr_sources
src/CubicSDR.cpp
src/AppFrame.cpp
src/sdr/SDRThread.cpp
src/sdr/SDRThreadQueue.cpp
src/sdr/SDRThreadTask.cpp
src/demod/DemodulatorThread.cpp
src/demod/DemodulatorThreadQueue.cpp
src/demod/DemodulatorThreadTask.cpp
src/demod/DemodulatorMgr.cpp
src/audio/AudioThread.cpp
src/audio/AudioThreadQueue.cpp
src/audio/AudioThreadTask.cpp
src/util/Gradient.cpp
src/util/Timer.cpp
src/visual/PrimaryGLContext.cpp
@ -115,15 +111,11 @@ SET (cubicsdr_headers
src/CubicSDR.h
src/AppFrame.h
src/sdr/SDRThread.h
src/sdr/SDRThreadQueue.h
src/sdr/SDRThreadTask.h
src/demod/DemodulatorThread.h
src/demod/DemodulatorThreadQueue.h
src/demod/DemodulatorThreadTask.h
src/demod/DemodulatorMgr.h
src/audio/AudioThread.h
src/audio/AudioThreadQueue.h
src/audio/AudioThreadTask.h
src/util/Gradient.h
src/util/Timer.h
src/util/ThreadQueue.h

View File

@ -28,11 +28,6 @@ wxEND_EVENT_TABLE()
AppFrame::AppFrame() :
wxFrame(NULL, wxID_ANY, wxT("CubicSDR")), frequency(DEFAULT_FREQ) {
audioThreadQueue = new ThreadQueue<std::string>;
audioThread = new AudioThreadNew(audioThreadQueue);
t1 = new std::thread(&AudioThreadNew::threadMain, audioThread);
wxBoxSizer *vbox = new wxBoxSizer(wxVERTICAL);
scopeCanvas = new ScopeCanvas(this, NULL);
@ -63,51 +58,35 @@ AppFrame::AppFrame() :
Centre();
Show();
threadQueueSDR = new SDRThreadQueue(this);
t_SDR = new SDRThread(threadQueueSDR);
if (t_SDR->Run() != wxTHREAD_NO_ERROR) {
wxLogError
("Can't create the SDR thread!");
delete t_SDR;
t_SDR = NULL;
}
t_SDR->SetPriority(80);
audioInputQueue = new AudioThreadInputQueue;
audioThread = new AudioThread(audioInputQueue);
threadQueueAudio = new AudioThreadQueue(this);
t_Audio = new AudioThread(threadQueueAudio);
if (t_Audio->Run() != wxTHREAD_NO_ERROR) {
wxLogError
("Can't create the Audio thread!");
delete t_Audio;
t_Audio = NULL;
}
t_Audio->SetPriority(80);
t1 = new std::thread(&AudioThread::threadMain, audioThread);
demodulatorTest = demodMgr.newThread(this);
demodulatorTest->params.audioQueue = threadQueueAudio;
demodulatorTest->params.audioInputQueue = audioInputQueue;
demodulatorTest->run();
t_SDR->bindDemodulator(*demodulatorTest);
threadCmdQueueSDR = new SDRThreadCommandQueue;
sdrThread = new SDRThread(threadCmdQueueSDR);
sdrThread->bindDemodulator(*demodulatorTest);
iqVisualQueue = new SDRThreadIQDataQueue;
sdrThread->setIQVisualQueue(iqVisualQueue);
t_SDR = new std::thread(&SDRThread::threadMain, sdrThread);
// static const int attribs[] = { WX_GL_RGBA, WX_GL_DOUBLEBUFFER, 0 };
// wxLogStatus("Double-buffered display %s supported", wxGLCanvas::IsDisplaySupported(attribs) ? "is" : "not");
// ShowFullScreen(true);
}
AppFrame::~AppFrame() {
{
wxCriticalSectionLocker enter(m_pThreadCS);
if (t_SDR) {
wxMessageOutputDebug().Printf("CubicSDR: deleting thread");
if (t_SDR->Delete() != wxTHREAD_NO_ERROR) {
wxLogError
("Can't delete the thread!");
}
}
}
// delete t_SDR;
delete threadQueueAudio;
delete threadQueueSDR;
delete audioInputQueue;
delete audioThread;
delete threadCmdQueueSDR;
}
void AppFrame::OnClose(wxCommandEvent& WXUNUSED(event)) {
@ -131,20 +110,7 @@ void AppFrame::OnThread(wxCommandEvent& event) {
switch (event.GetId()) {
// SDR IQ -> Demodulator
case SDRThreadTask::SDR_THREAD_DATA:
iqData = (SDRThreadIQData *) event.GetClientData();
new_uc_buffer = &(iqData->data);
if (new_uc_buffer->size()) {
spectrumCanvas->setData(new_uc_buffer);
waterfallCanvas->setData(new_uc_buffer);
} else {
std::cout << "Incoming IQ data empty?" << std::endl;
}
delete iqData;
// audioThreadQueue->push(asdf);
break; // thread wants to exit: disable controls and destroy main window
// Demodulator -> Audio
case DemodulatorThreadTask::DEMOD_THREAD_AUDIO_DATA:
demodAudioData = (DemodulatorThreadAudioData *) event.GetClientData();
new_float_buffer = &(demodAudioData->data);
@ -171,15 +137,31 @@ void AppFrame::OnThread(wxCommandEvent& event) {
}
void AppFrame::OnIdle(wxIdleEvent& event) {
bool work_done = false;
event.Skip();
if (!iqVisualQueue->empty()) {
SDRThreadIQData iqData;
iqVisualQueue->pop(iqData);
if (iqData.data.size()) {
spectrumCanvas->setData(&iqData.data);
waterfallCanvas->setData(&iqData.data);
} else {
std::cout << "Incoming IQ data empty?" << std::endl;
}
work_done = true;
}
if (!work_done) {
event.Skip();
}
}
void AppFrame::setFrequency(unsigned int freq) {
frequency = freq;
SDRThreadTask task = SDRThreadTask(SDRThreadTask::SDR_THREAD_TUNING);
task.setUInt(freq);
threadQueueSDR->addTask(task, SDRThreadQueue::SDR_PRIORITY_HIGHEST);
SDRThreadCommand command(SDRThreadCommand::SDR_THREAD_CMD_TUNE);
command.int_value = freq;
threadCmdQueueSDR->push(command);
}
int AppFrame::getFrequency() {

View File

@ -32,10 +32,6 @@ private:
SpectrumCanvas *spectrumCanvas;
WaterfallCanvas *waterfallCanvas;
SDRThread *t_SDR;
SDRThreadQueue* threadQueueSDR;
AudioThread *t_Audio;
AudioThreadQueue* threadQueueAudio;
DemodulatorMgr demodMgr;
wxCriticalSection m_pThreadCS;
@ -43,9 +39,16 @@ private:
DemodulatorInstance *demodulatorTest;
ThreadQueue<std::string> *audioThreadQueue;
AudioThreadNew *audioThread;
AudioThreadInputQueue *audioInputQueue;
AudioThread *audioThread;
SDRThread *sdrThread;
SDRThreadCommandQueue* threadCmdQueueSDR;
SDRThreadIQDataQueue* iqVisualQueue;
std::thread *t1;
std::thread *t_SDR;
// event table
wxDECLARE_EVENT_TABLE();
};

View File

@ -2,45 +2,6 @@
#include "CubicSDRDefs.h"
#include <vector>
#ifdef WIN32
#include <algorithm>
#include <functional>
#include <cctype>
#include <locale>
#include <sstream>
// trim from start
static inline std::wstring &wltrim(std::wstring &s) {
s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::ptr_fun<int, int>(std::isspace))));
return s;
}
// trim from end
static inline std::wstring &wrtrim(std::wstring &s) {
s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::ptr_fun<int, int>(std::isspace))).base(), s.end());
return s;
}
// trim from both ends
static inline std::wstring &wtrim(std::wstring &s) {
return wltrim(wrtrim(s));
}
#endif
//wxDEFINE_EVENT(wxEVT_COMMAND_AudioThread_INPUT, wxThreadEvent);
AudioThread::AudioThread(AudioThreadQueue* pQueue, int id) :
wxThread(wxTHREAD_DETACHED), m_pQueue(pQueue), m_ID(id), audio_queue_ptr(0), stream(NULL) {
}
AudioThread::~AudioThread() {
PaError err;
err = Pa_StopStream(stream);
err = Pa_CloseStream(stream);
Pa_Terminate();
}
static int audioCallback(const void *inputBuffer, void *outputBuffer, unsigned long framesPerBuffer, const PaStreamCallbackTimeInfo* timeInfo,
PaStreamCallbackFlags statusFlags, void *userData) {
@ -83,55 +44,30 @@ static int audioCallback(const void *inputBuffer, void *outputBuffer, unsigned l
return paContinue;
}
wxThread::ExitCode AudioThread::Entry() {
void AudioThread::threadMain() {
PaError err;
err = Pa_Initialize();
if (err != paNoError) {
std::cout << "Error starting :(\n";
return (wxThread::ExitCode) 1;
std::cout << "Error starting portaudio :(\n";
return;
}
int preferred_device = -1;
/*
#ifdef WIN32
wchar_t dev_str[255];
memset(dev_str, 0, sizeof(wchar_t) * 255);
std::wstring env_name(L"PA_RECOMMENDED_OUTPUT_DEVICE");
GetEnvironmentVariable(wtrim(env_name).c_str(), dev_str, 255);
std::wistringstream env_result(dev_str);
if (!env_result.eof()) {
int env_dev = -1;
env_result >> env_dev;
if (env_result.eof()) { // read everything, was all a number
if (env_dev >= 0) {
std::cout << "Using preferred PortAudio device PA_RECOMMENDED_OUTPUT_DEVICE=" << env_dev << std::endl;
preferred_device = env_dev;
} else {
std::cout << "Environment variable PA_RECOMMENDED_OUTPUT_DEVICE not set, using PortAudio defaults." << std::endl;
}
} else {
std::cout << "Environment variable PA_RECOMMENDED_OUTPUT_DEVICE didn't evaluate to a number, using PortAudio defaults." << std::endl;
}
}
#endif
*/
outputParameters.device = (preferred_device != -1) ? preferred_device : Pa_GetDefaultOutputDevice();
if (outputParameters.device == paNoDevice) {
std::cout << "Error: No default output device.\n";
}
outputParameters.channelCount = 2; /* Stereo output, most likely supported. */
outputParameters.sampleFormat = paFloat32; /* 32 bit floating point output. */
outputParameters.channelCount = 2;
outputParameters.sampleFormat = paFloat32;
outputParameters.suggestedLatency = Pa_GetDeviceInfo(outputParameters.device)->defaultLowOutputLatency;
outputParameters.hostApiSpecificStreamInfo = NULL;
stream = NULL;
err = Pa_OpenStream(&stream, NULL, &outputParameters, AUDIO_FREQUENCY, paFramesPerBufferUnspecified, paPrimeOutputBuffersUsingStreamCallback|paClipOff, &audioCallback, this);
err = Pa_OpenStream(&stream, NULL, &outputParameters, AUDIO_FREQUENCY, paFramesPerBufferUnspecified,
paPrimeOutputBuffersUsingStreamCallback | paClipOff, &audioCallback, this);
err = Pa_StartStream(stream);
if (err != paNoError) {
@ -139,28 +75,101 @@ wxThread::ExitCode AudioThread::Entry() {
std::cout << "\tPortAudio error: " << Pa_GetErrorText(err) << std::endl;
}
while (!TestDestroy()) {
if (m_pQueue->stackSize()) {
while (m_pQueue->stackSize()) {
AudioThreadTask task = m_pQueue->pop(); // pop a task from the queue. this will block the worker thread if queue is empty
switch (task.m_cmd) {
case AudioThreadTask::AUDIO_THREAD_DATA:
if (!TestDestroy()) {
audio_queue.push(task.data->data);
}
delete task.data;
break;
}
}
} else {
this->Yield();
this->Sleep(1);
}
while (1) {
AudioThreadInput inp;
inputQueue->pop(inp);
audio_queue.push(inp.data);
}
std::cout << std::endl << "Audio Thread Done." << std::endl << std::endl;
return (wxThread::ExitCode) 0;
}
/*
#ifdef WIN32
#include <algorithm>
#include <functional>
#include <cctype>
#include <locale>
#include <sstream>
// trim from start
static inline std::wstring &wltrim(std::wstring &s) {
s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::ptr_fun<int, int>(std::isspace))));
return s;
}
// trim from end
static inline std::wstring &wrtrim(std::wstring &s) {
s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::ptr_fun<int, int>(std::isspace))).base(), s.end());
return s;
}
// trim from both ends
static inline std::wstring &wtrim(std::wstring &s) {
return wltrim(wrtrim(s));
}
#endif
//wxDEFINE_EVENT(wxEVT_COMMAND_AudioThread_INPUT, wxThreadEvent);
AudioThread::AudioThread(AudioThreadQueue* pQueue, int id) :
wxThread(wxTHREAD_DETACHED), m_pQueue(pQueue), m_ID(id), audio_queue_ptr(0), stream(NULL) {
}
AudioThread::~AudioThread() {
}
wxThread::ExitCode AudioThread::Entry() {
//#ifdef WIN32
// wchar_t dev_str[255];
// memset(dev_str, 0, sizeof(wchar_t) * 255);
// std::wstring env_name(L"PA_RECOMMENDED_OUTPUT_DEVICE");
// GetEnvironmentVariable(wtrim(env_name).c_str(), dev_str, 255);
// std::wistringstream env_result(dev_str);
//
// if (!env_result.eof()) {
// int env_dev = -1;
// env_result >> env_dev;
//
// if (env_result.eof()) { // read everything, was all a number
// if (env_dev >= 0) {
// std::cout << "Using preferred PortAudio device PA_RECOMMENDED_OUTPUT_DEVICE=" << env_dev << std::endl;
// preferred_device = env_dev;
// } else {
// std::cout << "Environment variable PA_RECOMMENDED_OUTPUT_DEVICE not set, using PortAudio defaults." << std::endl;
// }
// } else {
// std::cout << "Environment variable PA_RECOMMENDED_OUTPUT_DEVICE didn't evaluate to a number, using PortAudio defaults." << std::endl;
// }
// }
//#endif
while (!TestDestroy()) {
if (m_pQueue->stackSize()) {
while (m_pQueue->stackSize()) {
AudioThreadTask task = m_pQueue->pop(); // pop a task from the queue. this will block the worker thread if queue is empty
switch (task.m_cmd) {
case AudioThreadTask::AUDIO_THREAD_DATA:
if (!TestDestroy()) {
audio_queue.push(task.data->data);
}
delete task.data;
break;
}
}
} else {
this->Yield();
this->Sleep(1);
}
}
std::cout << std::endl << "Audio Thread Done." << std::endl << std::endl;
return (wxThread::ExitCode) 0;
}
*/

View File

@ -11,46 +11,47 @@
#include "wx/thread.h"
#include "AudioThreadQueue.h"
#include "AudioThread.h"
#include "ThreadQueue.h"
#include "portaudio.h"
static int audioCallback(const void *inputBuffer, void *outputBuffer, unsigned long framesPerBuffer, const PaStreamCallbackTimeInfo* timeInfo,
PaStreamCallbackFlags statusFlags, void *userData);
class AudioThread: public wxThread {
class AudioThreadInput {
public:
int frequency;
int sampleRate;
std::vector<float> data;
};
typedef ThreadQueue<AudioThreadInput> AudioThreadInputQueue;
class AudioThread {
public:
std::queue<std::vector<float> > audio_queue;
unsigned int audio_queue_ptr;
AudioThread(AudioThreadQueue* pQueue, int id = 0);
~AudioThread();
AudioThread(AudioThreadInputQueue *inputQueue) :
inputQueue(inputQueue), stream(NULL), audio_queue_ptr(0) {
protected:
virtual ExitCode Entry();
AudioThreadQueue* m_pQueue;
int m_ID;
}
~AudioThread() {
PaError err;
err = Pa_StopStream(stream);
err = Pa_CloseStream(stream);
Pa_Terminate();
std::cout << std::endl << "Audio Thread Done." << std::endl << std::endl;
}
void threadMain();
private:
AudioThreadInputQueue *inputQueue;
PaStreamParameters outputParameters;
PaStream *stream;
};
class AudioThreadNew {
private:
ThreadQueue<std::string> *threadQueue;
public:
AudioThreadNew(ThreadQueue<std::string> *tq_in) {
threadQueue = tq_in;
}
void threadMain() {
while (1) {
while (!threadQueue->empty()) {
std::string str;
if (threadQueue->try_pop(str)) {
std::cout << str << std::endl;
}
}
}
}
};

View File

@ -1,43 +0,0 @@
#include "AudioThreadQueue.h"
#include "wx/wxprec.h"
#ifndef WX_PRECOMP
#include "wx/wx.h"
#endif
AudioThreadQueue::AudioThreadQueue(wxEvtHandler* pParent) :
m_pParent(pParent) {
}
void AudioThreadQueue::addTask(const AudioThreadTask& task, const AUDIO_PRIORITY& priority) {
wxMutexLocker lock(m_MutexQueue);
m_Tasks.insert(std::make_pair(priority, task));
m_QueueCount.Post();
}
AudioThreadTask AudioThreadQueue::pop() {
AudioThreadTask element;
m_QueueCount.Wait();
m_MutexQueue.Lock();
element = (m_Tasks.begin())->second;
m_Tasks.erase(m_Tasks.begin());
m_MutexQueue.Unlock();
return element;
}
void AudioThreadQueue::report(const AudioThreadTask::AUDIO_THREAD_COMMAND& cmd, const wxString& sArg, int iArg) {
wxCommandEvent evt(wxEVT_THREAD, cmd);
evt.SetString(sArg);
evt.SetInt(iArg);
m_pParent->AddPendingEvent(evt);
}
size_t AudioThreadQueue::stackSize() {
wxMutexLocker lock(m_MutexQueue);
return m_Tasks.size();
}
wxEvtHandler* AudioThreadQueue::getHandler() {
return m_pParent;
}

View File

@ -1,28 +0,0 @@
#pragma once
#include <map>
#include "AudioThreadTask.h"
#include "wx/event.h"
class AudioThreadQueue {
public:
enum AUDIO_PRIORITY {
AUDIO_PRIORITY_HIGHEST, AUDIO_PRIORITY_HIGHER, AUDIO_PRIORITY_NORMAL, AUDIO_PRIORITY_BELOW_NORMAL, AUDIO_PRIORITY_LOW, AUDIO_PRIORITY_IDLE
};
AudioThreadQueue(wxEvtHandler* pParent);
void addTask(const AudioThreadTask& task, const AUDIO_PRIORITY& priority = AUDIO_PRIORITY_NORMAL);
void report(const AudioThreadTask::AUDIO_THREAD_COMMAND& cmd, const wxString& sArg = wxEmptyString, int iArg = 0);
AudioThreadTask pop();
size_t stackSize();
wxEvtHandler* getHandler();
private:
wxEvtHandler* m_pParent;
std::multimap<AUDIO_PRIORITY, AudioThreadTask> m_Tasks;
wxMutex m_MutexQueue;
wxSemaphore m_QueueCount;
};

View File

@ -1,2 +0,0 @@
#include "AudioThreadTask.h"

View File

@ -1,48 +0,0 @@
#pragma once
#include <vector>
#include "wx/defs.h"
#include "wx/string.h"
#include "wx/object.h"
class AudioThreadData: public wxObject {
public:
unsigned int frequency;
unsigned int sampleRate;
unsigned short channels;
std::vector<float> data;
AudioThreadData(unsigned int frequency, unsigned int sampleRate, std::vector<float> data) :
data(data), sampleRate(sampleRate), frequency(frequency) {
channels = 1;
}
~AudioThreadData() {
}
};
class AudioThreadTask {
public:
enum AUDIO_THREAD_COMMAND {
AUDIO_THREAD_EXIT = wxID_EXIT,
AUDIO_THREAD_NULL = wxID_HIGHEST + 200,
AUDIO_THREAD_STARTED,
AUDIO_THREAD_PROCESS,
AUDIO_THREAD_ERROR,
AUDIO_THREAD_DATA
};
AudioThreadTask() :
m_cmd(AUDIO_THREAD_NULL), data(NULL) {
}
AudioThreadTask(AUDIO_THREAD_COMMAND cmd) :
m_cmd(cmd), data(NULL) {
}
AudioThreadData *data;
AUDIO_THREAD_COMMAND m_cmd;
};

View File

@ -140,10 +140,13 @@ wxThread::ExitCode DemodulatorThread::Entry() {
m_pQueue->sendAudioData(DemodulatorThreadTask::DEMOD_THREAD_AUDIO_DATA,audioOut);
if (params.audioQueue != NULL) {
AudioThreadTask audio_task = AudioThreadTask(AudioThreadTask::AUDIO_THREAD_DATA);
audio_task.data = new AudioThreadData(task.data->frequency, params.audioSampleRate, newBuffer);
params.audioQueue->addTask(audio_task, AudioThreadQueue::AUDIO_PRIORITY_HIGHEST);
if (params.audioInputQueue != NULL) {
AudioThreadInput ati;
ati.data = newBuffer;
params.audioInputQueue->push(ati);
// AudioThreadTask audio_task = AudioThreadTask(AudioThreadTask::AUDIO_THREAD_DATA);
// audio_task.data = new AudioThreadData(task.data->frequency, params.audioSampleRate, newBuffer);
// params.audioQueue->addTask(audio_task, AudioThreadQueue::AUDIO_PRIORITY_HIGHEST);
}
}

View File

@ -5,7 +5,7 @@
#include "wx/string.h"
#include "wx/object.h"
#include "CubicSDRDefs.h"
#include "AudioThreadQueue.h"
#include "AudioThread.h"
enum DemodulatorType {
DEMOD_TYPE_NULL, DEMOD_TYPE_AM, DEMOD_TYPE_FM, DEMOD_TYPE_LSB, DEMOD_TYPE_USB, DEMOD_TYPE_WFM
@ -52,12 +52,12 @@ public:
unsigned int demodResampleRate;
unsigned int filterFrequency;
unsigned int audioSampleRate;
AudioThreadQueue *audioQueue;
AudioThreadInputQueue *audioInputQueue;
DemodulatorType demodType;
DemodulatorThreadParameters() :
audioQueue(NULL), inputRate(SRATE), inputResampleRate(200000), demodResampleRate(100000), audioSampleRate(48000), filterFrequency(32000), demodType(DEMOD_TYPE_WFM) {
audioInputQueue(NULL), inputRate(SRATE), inputResampleRate(200000), demodResampleRate(100000), audioSampleRate(48000), filterFrequency(32000), demodType(DEMOD_TYPE_WFM) {
}

View File

@ -3,15 +3,15 @@
#include <vector>
#include "CubicSDR.h"
//wxDEFINE_EVENT(wxEVT_COMMAND_SDRThread_INPUT, wxThreadEvent);
SDRThread::SDRThread(SDRThreadQueue* pQueue, int id) :
wxThread(wxTHREAD_DETACHED), m_pQueue(pQueue), m_ID(id) {
SDRThread::SDRThread(SDRThreadCommandQueue* pQueue) :
m_pQueue(pQueue), iqDataOutQueue(NULL), iqVisualQueue(NULL) {
dev = NULL;
sample_rate = SRATE;
}
SDRThread::~SDRThread() {
SDRThread::~SDRThread() {
std::cout << std::endl << "SDR Thread Done." << std::endl << std::endl;
rtlsdr_close(dev);
}
int SDRThread::enumerate_rtl() {
@ -89,19 +89,19 @@ int SDRThread::enumerate_rtl() {
}
wxThread::ExitCode SDRThread::Entry() {
void SDRThread::threadMain() {
int dev_count = rtlsdr_get_device_count();
int first_available = enumerate_rtl();
if (first_available == -1) {
std::cout << "No devices found.. SDR Thread exiting.." << std::endl;
return (wxThread::ExitCode) 0;
return;
} else {
std::cout << "Using first available RTL-SDR device #" << first_available << std::endl;
}
signed char *buf = (signed char *) malloc(BUF_SIZE);
signed char buf[BUF_SIZE];
unsigned int frequency = DEFAULT_FREQ;
unsigned int bandwidth = SRATE;
@ -121,19 +121,20 @@ wxThread::ExitCode SDRThread::Entry() {
double seconds = 0.0;
std::cout << "Sampling..";
while (!TestDestroy()) {
if (m_pQueue->stackSize()) {
while (1) {
if (!m_pQueue->empty()) {
bool freq_changed = false;
float new_freq;
while (m_pQueue->stackSize()) {
SDRThreadTask task = m_pQueue->pop(); // pop a task from the queue. this will block the worker thread if queue is empty
switch (task.m_cmd) {
case SDRThreadTask::SDR_THREAD_TUNING:
std::cout << "Set frequency: " << task.getUInt() << std::endl;
while (!m_pQueue->empty()) {
SDRThreadCommand command;
m_pQueue->pop(command);
switch (command.cmd) {
case SDRThreadCommand::SDR_THREAD_CMD_TUNE:
std::cout << "Set frequency: " << command.int_value << std::endl;
freq_changed = true;
new_freq = task.getUInt();
new_freq = command.int_value;
break;
}
}
@ -146,40 +147,38 @@ wxThread::ExitCode SDRThread::Entry() {
rtlsdr_read_sync(dev, buf, BUF_SIZE, &n_read);
if (!TestDestroy()) {
std::vector<signed char> new_buffer;
std::vector<signed char> new_buffer;
for (int i = 0; i < n_read; i++) {
new_buffer.push_back(buf[i] - 127);
}
double time_slice = (double) n_read / (double) sample_rate;
seconds += time_slice;
// std::cout << "Time Slice: " << time_slice << std::endl;
if (!TestDestroy()) {
SDRThreadIQData *iqData = new SDRThreadIQData(bandwidth,frequency,new_buffer);
m_pQueue->sendIQData(SDRThreadTask::SDR_THREAD_DATA, iqData);
if (demodulators.size()) {
for (int i = 0, iMax = demodulators.size(); i<iMax; i++) {
DemodulatorThreadQueue *demodQueue = demodulators[i];
DemodulatorThreadTask demod_task = DemodulatorThreadTask(DemodulatorThreadTask::DEMOD_THREAD_DATA);
demod_task.data = new DemodulatorThreadIQData(iqData->bandwidth, iqData->frequency, iqData->data);
demodQueue->addTask(demod_task, DemodulatorThreadQueue::DEMOD_PRIORITY_HIGHEST);
}
}
}
} else {
this->Yield();
this->Sleep(1);
for (int i = 0; i < n_read; i++) {
new_buffer.push_back(buf[i] - 127);
}
double time_slice = (double) n_read / (double) sample_rate;
seconds += time_slice;
SDRThreadIQData dataOut;
dataOut.frequency = frequency;
dataOut.bandwidth = bandwidth;
dataOut.data = new_buffer;
if (iqDataOutQueue != NULL) {
iqDataOutQueue->push(dataOut);
}
if (iqVisualQueue != NULL) {
iqVisualQueue->push(dataOut);
}
if (demodulators.size()) {
for (int i = 0, iMax = demodulators.size(); i < iMax; i++) {
DemodulatorThreadQueue *demodQueue = demodulators[i];
DemodulatorThreadTask demod_task = DemodulatorThreadTask(DemodulatorThreadTask::DEMOD_THREAD_DATA);
demod_task.data = new DemodulatorThreadIQData(bandwidth, frequency, new_buffer);
demodQueue->addTask(demod_task, DemodulatorThreadQueue::DEMOD_PRIORITY_HIGHEST);
}
}
}
std::cout << std::endl << "Done." << std::endl << std::endl;
rtlsdr_close(dev);
free(buf);
return (wxThread::ExitCode) 0;
}

View File

@ -9,15 +9,58 @@
#include "wx/thread.h"
#include "SDRThreadQueue.h"
#include "DemodulatorThreadQueue.h"
#include "DemodulatorMgr.h"
#include "ThreadQueue.h"
class SDRThread: public wxThread {
class SDRThreadCommand {
public:
enum SDRThreadCommandEnum {
SDR_THREAD_CMD_NULL,
SDR_THREAD_CMD_TUNE
};
SDRThreadCommand() : cmd(cmd), int_value(SDR_THREAD_CMD_NULL){
}
SDRThreadCommand(SDRThreadCommandEnum cmd) : cmd(cmd), int_value(0) {
}
SDRThreadCommandEnum cmd;
int int_value;
};
class SDRThreadIQData {
public:
unsigned int frequency;
unsigned int bandwidth;
std::vector<signed char> data;
SDRThreadIQData(): frequency(0), bandwidth(0) {
}
SDRThreadIQData(unsigned int bandwidth, unsigned int frequency, std::vector<signed char> data) :
data(data), frequency(frequency), bandwidth(bandwidth) {
}
~SDRThreadIQData() {
}
};
typedef ThreadQueue<SDRThreadCommand> SDRThreadCommandQueue;
typedef ThreadQueue<SDRThreadIQData> SDRThreadIQDataQueue;
class SDRThread {
public:
rtlsdr_dev_t *dev;
SDRThread(SDRThreadQueue* pQueue, int id = 0);
SDRThread(SDRThreadCommandQueue* pQueue);
~SDRThread();
int enumerate_rtl();
@ -26,11 +69,21 @@ public:
demodulators.push_back(demod.threadQueueDemod);
}
void threadMain();
void setIQDataOutQueue(SDRThreadIQDataQueue* iqDataQueue) {
iqDataOutQueue = iqDataQueue;
}
void setIQVisualQueue(SDRThreadIQDataQueue *iqVisQueue) {
iqVisualQueue = iqVisQueue;
iqVisualQueue->set_max_num_items(1);
}
protected:
virtual ExitCode Entry();
uint32_t sample_rate;
SDRThreadQueue* m_pQueue;
int m_ID;
SDRThreadCommandQueue* m_pQueue;
SDRThreadIQDataQueue* iqDataOutQueue;
SDRThreadIQDataQueue* iqVisualQueue;
std::vector<DemodulatorThreadQueue *> demodulators;
};

View File

@ -1,44 +0,0 @@
#include "SDRThreadQueue.h"
#include "wx/wxprec.h"
#ifndef WX_PRECOMP
#include "wx/wx.h"
#endif
SDRThreadQueue::SDRThreadQueue(wxEvtHandler* pParent) :
m_pParent(pParent) {
}
void SDRThreadQueue::addTask(const SDRThreadTask& task, const SDR_PRIORITY& priority) {
wxMutexLocker lock(m_MutexQueue);
m_Tasks.insert(std::make_pair(priority, task));
m_QueueCount.Post();
}
SDRThreadTask SDRThreadQueue::pop() {
SDRThreadTask element;
m_QueueCount.Wait();
m_MutexQueue.Lock();
element = (m_Tasks.begin())->second;
m_Tasks.erase(m_Tasks.begin());
m_MutexQueue.Unlock();
return element;
}
void SDRThreadQueue::sendIQData(const SDRThreadTask::SDR_COMMAND& cmd, SDRThreadIQData *data) {
wxCommandEvent evt(wxEVT_THREAD, cmd);
evt.SetClientData(data);
m_pParent->AddPendingEvent(evt);
}
size_t SDRThreadQueue::stackSize() {
wxMutexLocker lock(m_MutexQueue);
return m_Tasks.size();
}
wxEvtHandler* SDRThreadQueue::getHandler() {
return m_pParent;
}

View File

@ -1,29 +0,0 @@
#pragma once
#include <map>
#include <vector>
#include "SDRThreadTask.h"
#include "wx/event.h"
class SDRThreadQueue {
public:
enum SDR_PRIORITY {
SDR_PRIORITY_HIGHEST, SDR_PRIORITY_HIGHER, SDR_PRIORITY_NORMAL, SDR_PRIORITY_BELOW_NORMAL, SDR_PRIORITY_LOW, SDR_PRIORITY_IDLE
};
SDRThreadQueue(wxEvtHandler* pParent);
void addTask(const SDRThreadTask& task, const SDR_PRIORITY& priority = SDR_PRIORITY_NORMAL);
void sendIQData(const SDRThreadTask::SDR_COMMAND& cmd, SDRThreadIQData *data);
SDRThreadTask pop();
size_t stackSize();
wxEvtHandler* getHandler();
private:
wxEvtHandler* m_pParent;
std::multimap<SDR_PRIORITY, SDRThreadTask> m_Tasks;
wxMutex m_MutexQueue;
wxSemaphore m_QueueCount;
};

View File

@ -1,9 +0,0 @@
#include "SDRThreadTask.h"
void SDRThreadTask::setUInt(unsigned int i) {
arg_int = i;
}
unsigned int SDRThreadTask::getUInt() {
return arg_int;
}

View File

@ -1,48 +0,0 @@
#pragma once
#include <vector>
#include "wx/defs.h"
#include "wx/string.h"
#include "wx/object.h"
class SDRThreadIQData: public wxObject {
public:
unsigned int frequency;
unsigned int bandwidth;
std::vector<signed char> data;
SDRThreadIQData(unsigned int bandwidth, unsigned int frequency, std::vector<signed char> data) :
data(data), frequency(frequency), bandwidth(bandwidth) {
}
~SDRThreadIQData() {
}
};
class SDRThreadTask {
public:
enum SDR_COMMAND {
SDR_THREAD_EXIT = wxID_EXIT,
SDR_THREAD_NULL = wxID_HIGHEST + 1,
SDR_THREAD_STARTED,
SDR_THREAD_PROCESS,
SDR_THREAD_ERROR,
SDR_THREAD_TUNING,
SDR_THREAD_DATA
};
SDRThreadTask() :
m_cmd(SDR_THREAD_NULL), arg_int(0) {
}
SDRThreadTask(SDR_COMMAND cmd) :
arg_int(0), m_cmd(cmd) {
}
void setUInt(unsigned int i);
unsigned int getUInt();
SDR_COMMAND m_cmd;
unsigned int arg_int;
};