Clean-up / re-factor

thread queue and task handler compiling and program functional
This commit is contained in:
Charles J. Cliffe 2014-11-04 18:39:08 -05:00
parent 8dee90cd63
commit 9896808b22
9 changed files with 168 additions and 148 deletions

View File

@ -78,6 +78,7 @@ SET (cubicsdr_sources
src/IQBufferThread.cpp src/IQBufferThread.cpp
src/PrimaryGLContext.cpp src/PrimaryGLContext.cpp
src/AppFrame.cpp src/AppFrame.cpp
src/SDRThreadQueue.cpp
) )
SET (cubicsdr_headers SET (cubicsdr_headers
@ -87,6 +88,7 @@ SET (cubicsdr_headers
src/PrimaryGLContext.h src/PrimaryGLContext.h
src/AppFrame.h src/AppFrame.h
src/CubicSDRDefs.h src/CubicSDRDefs.h
src/SDRThreadQueue.h
) )
#configure_files(${PROJECT_SOURCE_DIR}/shaders ${PROJECT_BINARY_DIR}/shaders COPYONLY) #configure_files(${PROJECT_SOURCE_DIR}/shaders ${PROJECT_BINARY_DIR}/shaders COPYONLY)
#configure_files(${PROJECT_SOURCE_DIR}/png ${PROJECT_BINARY_DIR}/png COPYONLY) #configure_files(${PROJECT_SOURCE_DIR}/png ${PROJECT_BINARY_DIR}/png COPYONLY)

View File

@ -42,8 +42,9 @@ AppFrame::AppFrame() :
Centre(); Centre();
Show(); Show();
m_pQueue = new SDRThreadQueue(this);
t_SDR = new SDRThread(appframe); t_SDR = new SDRThread(m_pQueue);
if (t_SDR->Run() != wxTHREAD_NO_ERROR) { if (t_SDR->Run() != wxTHREAD_NO_ERROR) {
wxLogError wxLogError
("Can't create the thread!"); ("Can't create the thread!");
@ -51,42 +52,48 @@ AppFrame::AppFrame() :
t_SDR = NULL; t_SDR = NULL;
} }
t_IQBuffer = new IQBufferThread(this); // t_IQBuffer = new IQBufferThread(this);
if (t_IQBuffer->Run() != wxTHREAD_NO_ERROR) { // if (t_IQBuffer->Run() != wxTHREAD_NO_ERROR) {
wxLogError // wxLogError
("Can't create the thread!"); // ("Can't create the thread!");
delete t_IQBuffer; // delete t_IQBuffer;
t_IQBuffer = NULL; t_IQBuffer = NULL;
} // }
// static const int attribs[] = { WX_GL_RGBA, WX_GL_DOUBLEBUFFER, 0 }; // static const int attribs[] = { WX_GL_RGBA, WX_GL_DOUBLEBUFFER, 0 };
// wxLogStatus("Double-buffered display %s supported", wxGLCanvas::IsDisplaySupported(attribs) ? "is" : "not"); // wxLogStatus("Double-buffered display %s supported", wxGLCanvas::IsDisplaySupported(attribs) ? "is" : "not");
// ShowFullScreen(true); // ShowFullScreen(true);
} }
AppFrame::~AppFrame() {
delete t_SDR;
// delete t_IQBuffer;
delete m_pQueue;
}
void AppFrame::OnClose(wxCommandEvent& WXUNUSED(event)) { void AppFrame::OnClose(wxCommandEvent& WXUNUSED(event)) {
{ {
wxCriticalSectionLocker enter(m_pThreadCS); wxCriticalSectionLocker enter(m_pThreadCS);
if (t_SDR) { if (t_SDR) {
wxMessageOutputDebug().Printf("CubicSDR: deleting thread"); wxMessageOutputDebug().Printf("CubicSDR: deleting thread");
if (t_SDR->Delete() != wxTHREAD_NO_ERROR) { if (t_SDR->Delete() != wxTHREAD_NO_ERROR) {
wxLogError wxLogError
("Can't delete the thread!"); ("Can't delete the thread!");
} }
} }
} }
{ {
wxCriticalSectionLocker enter(m_pThreadCS); wxCriticalSectionLocker enter(m_pThreadCS);
if (t_IQBuffer) { if (t_IQBuffer) {
wxMessageOutputDebug().Printf("CubicSDR: deleting thread"); wxMessageOutputDebug().Printf("CubicSDR: deleting thread");
if (t_IQBuffer->Delete() != wxTHREAD_NO_ERROR) { if (t_IQBuffer->Delete() != wxTHREAD_NO_ERROR) {
wxLogError wxLogError
("Can't delete the thread!"); ("Can't delete the thread!");
} }
} }
} }
// true is to force the frame to close // true is to force the frame to close
Close(true); Close(true);
} }

View File

@ -2,11 +2,13 @@
#include "wx/frame.h" #include "wx/frame.h"
#include "PrimaryGLContext.h" #include "PrimaryGLContext.h"
#include "SDRThread.h"
// Define a new frame type // Define a new frame type
class AppFrame: public wxFrame { class AppFrame: public wxFrame {
public: public:
AppFrame(); AppFrame();
~AppFrame();
void OnEventInput(wxThreadEvent& event); void OnEventInput(wxThreadEvent& event);
private: private:
@ -18,6 +20,7 @@ private:
SDRThread *t_SDR; SDRThread *t_SDR;
IQBufferThread *t_IQBuffer; IQBufferThread *t_IQBuffer;
wxCriticalSection m_pThreadCS; wxCriticalSection m_pThreadCS;
SDRThreadQueue* m_pQueue;
wxDECLARE_EVENT_TABLE(); wxDECLARE_EVENT_TABLE();
}; };

View File

@ -4,8 +4,6 @@
//WX_GL_MAJOR_VERSION 3 //WX_GL_MAJOR_VERSION 3
//WX_GL_MINOR_VERSION 2 //WX_GL_MINOR_VERSION 2
#include "SDRThread.h"
#include "IQBufferThread.h"
#include "wx/glcanvas.h" #include "wx/glcanvas.h"
#include "PrimaryGLContext.h" #include "PrimaryGLContext.h"
@ -13,7 +11,6 @@ class CubicSDR: public wxApp {
public: public:
CubicSDR() { CubicSDR() {
m_glContext = NULL; m_glContext = NULL;
t_SDR = NULL;
} }
PrimaryGLContext &GetContext(wxGLCanvas *canvas); PrimaryGLContext &GetContext(wxGLCanvas *canvas);

View File

@ -67,6 +67,8 @@ void PrimaryGLContext::Plot(std::vector<float> &points) {
glMatrixMode(GL_MODELVIEW); glMatrixMode(GL_MODELVIEW);
glLoadIdentity(); glLoadIdentity();
// glEnable(GL_LINE_SMOOTH);
glPushMatrix(); glPushMatrix();
glTranslatef(-1.0f, -0.9f, 0.0f); glTranslatef(-1.0f, -0.9f, 0.0f);
glScalef(2.0f, 1.8f, 1.0f); glScalef(2.0f, 1.8f, 1.0f);
@ -106,6 +108,8 @@ TestGLCanvas::TestGLCanvas(wxWindow *parent, int *attribList) :
out[1] = (fftw_complex*) fftw_malloc(sizeof(fftw_complex) * out_block_size); out[1] = (fftw_complex*) fftw_malloc(sizeof(fftw_complex) * out_block_size);
plan[0] = fftw_plan_dft_1d(out_block_size, in, out[0], FFTW_BACKWARD, FFTW_MEASURE); plan[0] = fftw_plan_dft_1d(out_block_size, in, out[0], FFTW_BACKWARD, FFTW_MEASURE);
plan[1] = fftw_plan_dft_1d(out_block_size, in, out[1], FFTW_FORWARD, FFTW_MEASURE); plan[1] = fftw_plan_dft_1d(out_block_size, in, out[1], FFTW_FORWARD, FFTW_MEASURE);
fft_ceil_ma=fft_ceil_maa=1.0;
} }
void TestGLCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) { void TestGLCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) {
@ -159,45 +163,48 @@ void TestGLCanvas::setData(std::vector<signed char> *data) {
double fft_ceil = 0; double fft_ceil = 0;
// fft_floor, // fft_floor,
if (fft_result.size()<FFT_SIZE) { if (fft_result.size() < FFT_SIZE) {
fft_result.resize(FFT_SIZE); fft_result.resize(FFT_SIZE);
fft_result_ma.resize(FFT_SIZE); fft_result_ma.resize(FFT_SIZE);
fft_result_maa.resize(FFT_SIZE); fft_result_maa.resize(FFT_SIZE);
} }
for (int j = 0; j < 2; j++) { for (int j = 0; j < 2; j++) {
for (int i = 0, iMax = FFT_SIZE / 2; i < iMax; i++) { for (int i = 0, iMax = FFT_SIZE / 2; i < iMax; i++) {
double a = out[j][j?i:((iMax-1)-i)][0]; double a = out[j][j ? i : ((iMax - 1) - i)][0];
double b = out[j][j?i:((iMax-1)-i)][1]; double b = out[j][j ? i : ((iMax - 1) - i)][1];
double c = sqrt(a * a + b * b); double c = sqrt(a * a + b * b);
double x = out[j?0:1][j?((FFT_SIZE-1)-i):((FFT_SIZE/2)+i)][0]; double x = out[j ? 0 : 1][j ? ((FFT_SIZE - 1) - i) : ((FFT_SIZE / 2) + i)][0];
double y = out[j?0:1][j?((FFT_SIZE-1)-i):((FFT_SIZE/2)+i)][1]; double y = out[j ? 0 : 1][j ? ((FFT_SIZE - 1) - i) : ((FFT_SIZE / 2) + i)][1];
double z = sqrt(x * x + y * y); double z = sqrt(x * x + y * y);
double r = (c<z)?c:z; double r = (c < z) ? c : z;
if (!j) { if (!j) {
fft_result[i] = r; fft_result[i] = r;
} else { } else {
fft_result[(FFT_SIZE/2) + i] = r; fft_result[(FFT_SIZE / 2) + i] = r;
} }
} }
} }
float time_slice = (float)SRATE/(float)(BUF_SIZE/2); float time_slice = (float) SRATE / (float) (BUF_SIZE / 2);
for (int i = 0, iMax = FFT_SIZE; i < iMax; i++) { for (int i = 0, iMax = FFT_SIZE; i < iMax; i++) {
fft_result_maa[i] += (fft_result_ma[i] - fft_result_maa[i])*0.65; fft_result_maa[i] += (fft_result_ma[i] - fft_result_maa[i]) * 0.65;
fft_result_ma[i] += (fft_result[i] - fft_result_ma[i])*0.65; fft_result_ma[i] += (fft_result[i] - fft_result_ma[i]) * 0.65;
if (fft_result_maa[i] > fft_ceil) { if (fft_result_maa[i] > fft_ceil) {
fft_ceil = fft_result_maa[i]; fft_ceil = fft_result_maa[i];
} }
} }
fft_ceil_ma = fft_ceil_ma + (fft_ceil - fft_ceil_ma)*0.05;
fft_ceil_maa = fft_ceil_maa + (fft_ceil - fft_ceil_maa)*0.05;
for (int i = 0, iMax = FFT_SIZE; i < iMax; i++) { for (int i = 0, iMax = FFT_SIZE; i < iMax; i++) {
points[i * 2 + 1] = fft_result_maa[i] / fft_ceil; points[i * 2 + 1] = fft_result_maa[i] / fft_ceil_maa;
points[i * 2] = ((double) i / (double) iMax); points[i * 2] = ((double) i / (double) iMax);
} }
} }

View File

@ -31,6 +31,8 @@ public:
fftw_complex *in, *out[2]; fftw_complex *in, *out[2];
fftw_plan plan[2]; fftw_plan plan[2];
float fft_ceil_ma, fft_ceil_maa;
std::vector<float> fft_result; std::vector<float> fft_result;
std::vector<float> fft_result_ma; std::vector<float> fft_result_ma;
std::vector<float> fft_result_maa; std::vector<float> fft_result_maa;

View File

@ -2,12 +2,12 @@
#include "CubicSDRDefs.h" #include "CubicSDRDefs.h"
#include <vector> #include <vector>
//wxDEFINE_EVENT(wxEVT_COMMAND_SDRThread_INPUT, wxThreadEvent); //wxDEFINE_EVENT(wxEVT_COMMAND_SDRThread_INPUT, wxThreadEvent);
SDRThread::SDRThread(SDRThreadQueue* pQueue, int id=0) : SDRThread::SDRThread(SDRThreadQueue* pQueue, int id) :
wxThread(wxTHREAD_DETACHED), m_pQueue(pQueue), m_ID(id) { wxThread(wxTHREAD_DETACHED), m_pQueue(pQueue), m_ID(id) {
dev = NULL; dev = NULL;
sample_rate = SRATE;
} }
SDRThread::~SDRThread() { SDRThread::~SDRThread() {
@ -84,11 +84,10 @@ void SDRThread::enumerate_rtl() {
wxThread::ExitCode SDRThread::Entry() { wxThread::ExitCode SDRThread::Entry() {
signed char *buf = (signed char *) malloc(BUF_SIZE); signed char *buf = (signed char *) malloc(BUF_SIZE);
int use_my_dev = 1; int use_my_dev = 1;
int dev_count = rtlsdr_get_device_count(); int dev_count = rtlsdr_get_device_count();
if (use_my_dev > dev_count-1) { if (use_my_dev > dev_count - 1) {
use_my_dev = 0; use_my_dev = 0;
} }
@ -111,28 +110,27 @@ wxThread::ExitCode SDRThread::Entry() {
std::cout << "Sampling.."; std::cout << "Sampling..";
while (!TestDestroy()) { while (!TestDestroy()) {
if (m_pQueue->Stacksize()) { if (m_pQueue->Stacksize()) {
while (m_pQueue->Stacksize()) { while (m_pQueue->Stacksize()) {
SDRThreadTask task=m_pQueue->Pop(); // pop a task from the queue. this will block the worker thread if queue is empty SDRThreadTask task = m_pQueue->Pop(); // pop a task from the queue. this will block the worker thread if queue is empty
switch(task.m_cmd) switch (task.m_cmd) {
{ // case SDRThreadTask::SDR_THREAD_EXIT: // thread should exit
case SDRThreadTask::SDR_THREAD_EXIT: // thread should exit // Sleep(1000); // wait a while
Sleep(1000); // wait a while // throw SDRThreadTask::SDR_THREAD_EXIT; // confirm exit command
throw SDRThreadTask::SDR_THREAD_EXIT; // confirm exit command // case SDRThreadTask::SDR_THREAD_TASK: // process a standard task
case SDRThreadTask::SDR_THREAD_JOB: // process a standard task // Sleep(2000);
Sleep(2000); // m_pQueue->Report(SDRThreadTask::SDR_THREAD_TASK, wxString::Format(wxT("Task #%s done."), task.m_Arg.c_str()), m_ID); // report successful completion
m_pQueue->Report(SDRThreadTask::SDR_THREAD_JOB, wxString::Format(wxT("Task #%s done."), task.m_Arg.c_str()), m_ID); // report successful completion // break;
break; // case SDRThreadTask::SDR_THREAD_JOBERR: // process a task that terminates with an error
case SDRThreadTask::SDR_THREAD_JOBERR: // process a task that terminates with an error // m_pQueue->Report(SDRThreadTask::SDR_THREAD_TASK, wxString::Format(wxT("Task #%s errorneous."), task.m_Arg.c_str()), m_ID);
m_pQueue->Report(SDRThreadTask::SDR_THREAD_JOB, wxString::Format(wxT("Task #%s errorneous."), task.m_Arg.c_str()), m_ID); // Sleep(1000);
Sleep(1000); // throw SDRThreadTask::SDR_THREAD_EXIT; // report exit of worker thread
throw SDRThreadTask::SDR_THREAD_EXIT; // report exit of worker thread // break;
break; // case SDRThreadTask::SDR_THREAD_NULL: // dummy command
case SDRThreadTask::SDR_THREAD_NULL: // dummy command // default: break; // default
default: break; // default }
} // switch(task.m_cmd) }
} }
}
rtlsdr_read_sync(dev, buf, BUF_SIZE, &n_read); rtlsdr_read_sync(dev, buf, BUF_SIZE, &n_read);
// move around // move around
@ -147,17 +145,17 @@ wxThread::ExitCode SDRThread::Entry() {
new_buffer->push_back(buf[i] - 127); new_buffer->push_back(buf[i] - 127);
} }
double time_slice = (double)n_read/(double)sample_rate; double time_slice = (double) n_read / (double) sample_rate;
seconds += time_slice; seconds += time_slice;
// std::cout << "Time Slice: " << time_slice << std::endl; // std::cout << "Time Slice: " << time_slice << std::endl;
// if (!TestDestroy()) { if (!TestDestroy()) {
// wxThreadEvent event(wxEVT_THREAD, EVENT_SDR_INPUT); wxThreadEvent event(wxEVT_THREAD, EVENT_SDR_INPUT);
// event.SetPayload(new_buffer); event.SetPayload(new_buffer);
// wxQueueEvent(frame, event.Clone()); wxQueueEvent(m_pQueue->getHandler(), event.Clone());
// } else { } else {
delete new_buffer; delete new_buffer;
// } }
} }
} }
std::cout << std::endl << "Done." << std::endl << std::endl; std::cout << std::endl << "Done." << std::endl << std::endl;

View File

@ -9,7 +9,8 @@
#include "wx/thread.h" #include "wx/thread.h"
#include "AppFrame.h" #include "SDRThread.h"
#include "IQBufferThread.h"
#include "SDRThreadQueue.h" #include "SDRThreadQueue.h"
// declare a new type of event, to be used by our SDRThread class: // declare a new type of event, to be used by our SDRThread class:
@ -18,21 +19,21 @@
//wxDECLARE_EVENT(wxEVT_COMMAND_SDRThread_INPUT, wxThreadEvent); //wxDECLARE_EVENT(wxEVT_COMMAND_SDRThread_INPUT, wxThreadEvent);
enum { enum {
EVENT_SDR_INPUT = wxID_HIGHEST+1 EVENT_SDR_INPUT = wxID_HIGHEST + 1
}; };
class SDRThread: public wxThread { class SDRThread: public wxThread {
public: public:
rtlsdr_dev_t *dev; rtlsdr_dev_t *dev;
SDRThread(SDRThreadQueue* pQueue, int id=0); SDRThread(SDRThreadQueue* pQueue, int id = 0);
~SDRThread(); ~SDRThread();
void enumerate_rtl(); void enumerate_rtl();
protected: protected:
virtual ExitCode Entry(); virtual ExitCode Entry();
uint32_t sample_rate; uint32_t sample_rate;
SDRThreadQueue* m_pQueue; SDRThreadQueue* m_pQueue;
int m_ID; int m_ID;
}; };

View File

@ -1,60 +1,63 @@
#pragma once #pragma once
class SDRThreadTask #include <map>
{
public:
enum SDR_COMMAND
{
SDR_THREAD_EXIT=wxID_EXIT,
SDR_THREAD_NULL=wxID_HIGHEST+1,
SDR_THREAD_STARTED,
SDR_THREAD_PROCESS,
SDR_THREAD_ERROR,
};
SDRThreadTask() : m_cmd(eID_THREAD_NULL) {} class SDRThreadTask {
SDRThreadTask(SDR_COMMAND cmd, const wxString& arg) : m_cmd(cmd), m_Arg(arg) {} public:
SDR_COMMAND m_cmd; enum SDR_COMMAND {
wxString m_Arg; SDR_THREAD_EXIT = wxID_EXIT, SDR_THREAD_NULL = wxID_HIGHEST + 1, SDR_THREAD_STARTED, SDR_THREAD_PROCESS, SDR_THREAD_ERROR,
}; };
class SDRThreadQueue SDRThreadTask() :
{ m_cmd(SDR_THREAD_NULL) {
public: }
enum SDR_PRIORITY { SDR_PRIORITY_HIGHEST, SDR_PRIORITY_HIGHER, SDR_PRIORITY_NORMAL, SDR_PRIORITY_BELOW_NORMAL, SDR_PRIORITY_LOW, SDR_PRIORITY_IDLE }; SDRThreadTask(SDR_COMMAND cmd, const wxString& arg) :
SDRThreadQueue(wxEvtHandler* pParent) : m_pParent(pParent) {} m_cmd(cmd), m_Arg(arg) {
void AddTask(const SDRThreadTask& task, const SDR_PRIORITY& priority=SDR_PRIORITY_NORMAL) }
{ SDR_COMMAND m_cmd;
wxMutexLocker lock(m_MutexQueue); wxString m_Arg;
m_Tasks.insert(std::make_pair(priority, task)); };
m_QueueCount.Post();
}
SDRThreadTask Pop()
{
SDRThreadTask element;
m_QueueCount.Wait();
m_MutexQueue.Lock();
element=(m_Tasks.begin())->second;
m_Tasks.erase(m_Tasks.begin());
m_MutexQueue.Unlock();
return element;
}
void Report(const SDRThreadTask::SDR_COMMAND& cmd, const wxString& sArg=wxEmptyString, int iArg=0)
{
wxCommandEvent evt(wxEVT_THREAD, cmd);
evt.SetString(sArg);
evt.SetInt(iArg);
m_pParent->AddPendingEvent(evt);
}
size_t Stacksize()
{
wxMutexLocker lock(m_MutexQueue);
return m_Tasks.size();
}
private: class SDRThreadQueue {
wxEvtHandler* m_pParent; public:
std::multimap<tPRIORITY, SDRThreadTask> m_Tasks; enum SDR_PRIORITY {
wxMutex m_MutexQueue; SDR_PRIORITY_HIGHEST, SDR_PRIORITY_HIGHER, SDR_PRIORITY_NORMAL, SDR_PRIORITY_BELOW_NORMAL, SDR_PRIORITY_LOW, SDR_PRIORITY_IDLE
wxSemaphore m_QueueCount; };
}; SDRThreadQueue(wxEvtHandler* pParent) :
m_pParent(pParent) {
}
void AddTask(const SDRThreadTask& task, const SDR_PRIORITY& priority = SDR_PRIORITY_NORMAL) {
wxMutexLocker lock(m_MutexQueue);
m_Tasks.insert(std::make_pair(priority, task));
m_QueueCount.Post();
}
SDRThreadTask Pop() {
SDRThreadTask element;
m_QueueCount.Wait();
m_MutexQueue.Lock();
element = (m_Tasks.begin())->second;
m_Tasks.erase(m_Tasks.begin());
m_MutexQueue.Unlock();
return element;
}
void Report(const SDRThreadTask::SDR_COMMAND& cmd, const wxString& sArg = wxEmptyString, int iArg = 0) {
wxCommandEvent evt(wxEVT_THREAD, cmd);
evt.SetString(sArg);
evt.SetInt(iArg);
m_pParent->AddPendingEvent(evt);
}
size_t Stacksize() {
wxMutexLocker lock(m_MutexQueue);
return m_Tasks.size();
}
wxEvtHandler* getHandler() {
return m_pParent;
}
private:
wxEvtHandler* m_pParent;
std::multimap<SDR_PRIORITY, SDRThreadTask> m_Tasks;
wxMutex m_MutexQueue;
wxSemaphore m_QueueCount;
};