Threading crash fixes, reusable IQ buffer queue

This commit is contained in:
Charles J. Cliffe 2014-12-23 23:37:18 -05:00
parent ef78ffc6f3
commit b7375ce09f
3 changed files with 37 additions and 6 deletions

View File

@ -5,6 +5,7 @@
#include "liquid/liquid.h"
#include <atomic>
#include <mutex>
enum DemodulatorType {
DEMOD_TYPE_NULL,
@ -60,6 +61,7 @@ public:
unsigned int frequency;
unsigned int bandwidth;
std::vector<signed char> data;
mutable std::mutex m_mutex;
DemodulatorThreadIQData() :
frequency(0), bandwidth(0), refCount(0) {

View File

@ -157,6 +157,7 @@ void DemodulatorPreThread::threadMain() {
continue;
}
std::lock_guard < std::mutex > lock(inp->m_mutex);
std::vector<signed char> *data = &inp->data;
if (data->size()) {
int bufSize = data->size() / 2;
@ -173,6 +174,8 @@ void DemodulatorPreThread::threadMain() {
in_buf[i].imag = (float) (*data)[i * 2 + 1] / 127.0f;
}
inp->decRefCount();
if (shift_freq != 0) {
if (shift_freq < 0) {
nco_crcf_mix_block_up(nco_shift, in_buf, out_buf, bufSize);
@ -195,10 +198,8 @@ void DemodulatorPreThread::threadMain() {
resamp->resampler = resampler;
postInputQueue->push(resamp);
} else {
inp->decRefCount();
if (inp->getRefCount()<=0) {
delete inp;
}
}
if (!workerResults->empty()) {

View File

@ -1,8 +1,10 @@
#include "SDRPostThread.h"
#include "CubicSDRDefs.h"
#include <vector>
#include "CubicSDR.h"
#include <vector>
#include <deque>
SDRPostThread::SDRPostThread() :
sample_rate(SRATE), iqDataOutQueue(NULL), iqDataInQueue(NULL), iqVisualQueue(NULL), terminated(false), dcFilter(NULL) {
}
@ -49,6 +51,9 @@ void SDRPostThread::threadMain() {
std::cout << "SDR post-processing thread started.." << std::endl;
std::deque<DemodulatorThreadIQData *> buffers;
std::deque<DemodulatorThreadIQData *>::iterator buffers_i;
while (!terminated) {
SDRThreadIQData *data_in;
@ -117,7 +122,22 @@ void SDRPostThread::threadMain() {
}
if (demodulators.size()) {
DemodulatorThreadIQData *demodDataOut = new DemodulatorThreadIQData;
DemodulatorThreadIQData *demodDataOut = NULL;
for (buffers_i = buffers.begin(); buffers_i != buffers.end(); buffers_i++) {
if ((*buffers_i)->getRefCount() <= 0) {
demodDataOut = (*buffers_i);
break;
}
}
if (demodDataOut == NULL) {
demodDataOut = new DemodulatorThreadIQData;
buffers.push_back(demodDataOut);
}
std::lock_guard < std::mutex > lock(demodDataOut->m_mutex);
demodDataOut->frequency = data_in->frequency;
demodDataOut->bandwidth = data_in->bandwidth;
demodDataOut->setRefCount(activeDemods);
@ -150,7 +170,7 @@ void SDRPostThread::threadMain() {
}
if (!pushedData) {
delete demodDataOut;
demodDataOut->setRefCount(0);
}
}
}
@ -160,6 +180,14 @@ void SDRPostThread::threadMain() {
delete data_in;
}
}
while (!buffers.empty()) {
DemodulatorThreadIQData *demodDataDel = buffers.front();
buffers.pop_front();
std::lock_guard < std::mutex > lock(demodDataDel->m_mutex);
delete demodDataDel;
}
std::cout << "SDR post-processing thread done." << std::endl;
}