Make processing queues timed-push, to be able to escape deadlocks. Rollback 'restart device when blocked' that is ineffective in practice.

This commit is contained in:
vsonnier 2017-08-26 23:34:48 +02:00
parent ebca762ea8
commit c112026a2c
5 changed files with 32 additions and 43 deletions

View File

@ -9,6 +9,9 @@
//50 ms //50 ms
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
//1s
#define MAX_BLOCKING_DURATION_MICROS (1000 * 1000)
DemodulatorWorkerThread::DemodulatorWorkerThread() : IOThread(), DemodulatorWorkerThread::DemodulatorWorkerThread() : IOThread(),
commandQueue(nullptr), resultQueue(nullptr), cModem(nullptr), cModemKit(nullptr) { commandQueue(nullptr), resultQueue(nullptr), cModem(nullptr), cModemKit(nullptr) {
} }
@ -108,7 +111,7 @@ void DemodulatorWorkerThread::run() {
result.modemName = cModemName; result.modemName = cModemName;
//VSO: blocking push //VSO: blocking push
resultQueue->push(result); resultQueue->push(result, MAX_BLOCKING_DURATION_MICROS, "resultQueue");
} }
} }
// std::cout << "Demodulator worker thread done." << std::endl; // std::cout << "Demodulator worker thread done." << std::endl;

View File

@ -5,6 +5,10 @@
#include <cstring> #include <cstring>
#include <string> #include <string>
//2s
#define MAX_BLOCKING_DURATION_MICROS (2000 * 1000)
ScopeVisualProcessor::ScopeVisualProcessor(): outputBuffers("ScopeVisualProcessorBuffers") { ScopeVisualProcessor::ScopeVisualProcessor(): outputBuffers("ScopeVisualProcessorBuffers") {
scopeEnabled.store(true); scopeEnabled.store(true);
spectrumEnabled.store(true); spectrumEnabled.store(true);
@ -116,7 +120,7 @@ void ScopeVisualProcessor::process() {
} }
renderData->spectrum = false; renderData->spectrum = false;
distribute(renderData); distribute(renderData, MAX_BLOCKING_DURATION_MICROS, "renderData");
} }
if (spectrumEnabled) { if (spectrumEnabled) {
@ -212,7 +216,7 @@ void ScopeVisualProcessor::process() {
renderData->fft_size = fftSize/2; renderData->fft_size = fftSize/2;
renderData->spectrum = true; renderData->spectrum = true;
distribute(renderData); distribute(renderData, MAX_BLOCKING_DURATION_MICROS, "renderData");
} }
} //end if try_pop() } //end if try_pop()
} }

View File

@ -7,6 +7,10 @@
//50 ms //50 ms
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
//2s
#define MAX_BLOCKING_DURATION_MICROS (2000 * 1000)
SpectrumVisualProcessor::SpectrumVisualProcessor() : outputBuffers("SpectrumVisualProcessorBuffers") { SpectrumVisualProcessor::SpectrumVisualProcessor() : outputBuffers("SpectrumVisualProcessorBuffers") {
lastInputBandwidth = 0; lastInputBandwidth = 0;
lastBandwidth = 0; lastBandwidth = 0;
@ -592,7 +596,7 @@ void SpectrumVisualProcessor::process() {
output->centerFreq = centerFreq; output->centerFreq = centerFreq;
output->bandwidth = bandwidth; output->bandwidth = bandwidth;
distribute(output); distribute(output, MAX_BLOCKING_DURATION_MICROS, "output");
} }
} }

View File

@ -12,6 +12,9 @@
//50 ms //50 ms
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
//1s
#define MAX_BLOCKING_DURATION_MICROS (1000 * 1000)
SDRPostThread::SDRPostThread() : IOThread(), buffers("SDRPostThreadBuffers"), visualDataBuffers("SDRPostThreadVisualDataBuffers"), frequency(0) { SDRPostThread::SDRPostThread() : IOThread(), buffers("SDRPostThreadBuffers"), visualDataBuffers("SDRPostThreadVisualDataBuffers"), frequency(0) {
iqDataInQueue = NULL; iqDataInQueue = NULL;
iqDataOutQueue = NULL; iqDataOutQueue = NULL;
@ -203,10 +206,6 @@ void SDRPostThread::run() {
} }
} }
if (data_in) {
//nothing
}
bool doUpdate = false; bool doUpdate = false;
for (size_t j = 0; j < nRunDemods; j++) { for (size_t j = 0; j < nRunDemods; j++) {
DemodulatorInstance *demod = runDemods[j]; DemodulatorInstance *demod = runDemods[j];
@ -296,22 +295,22 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {
if (doDemodVisOut) { if (doDemodVisOut) {
//VSO: blocking push //VSO: blocking push
iqActiveDemodVisualQueue->push(demodDataOut); iqActiveDemodVisualQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runSingleCH() iqActiveDemodVisualQueue");
} }
if (doIQDataOut) { if (doIQDataOut) {
//VSO: blocking push //VSO: blocking push
iqDataOutQueue->push(demodDataOut); iqDataOutQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS,"runSingleCH() iqDataOutQueue");
} }
if (doVisOut) { if (doVisOut) {
//VSO: blocking push //VSO: blocking push
iqVisualQueue->push(demodDataOut); iqVisualQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runSingleCH() iqVisualQueue");
} }
for (size_t i = 0; i < nRunDemods; i++) { for (size_t i = 0; i < nRunDemods; i++) {
//VSO: blocking push //VSO: blocking push
runDemods[i]->getIQInputDataPipe()->push(demodDataOut); runDemods[i]->getIQInputDataPipe()->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runSingleCH() runDemods[i]->getIQInputDataPipe()");
} }
} }
} }
@ -334,7 +333,7 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
dataOut.resize(outSize); dataOut.resize(outSize);
} }
if (iqDataOutQueue != NULL && !iqDataOutQueue->full()) { if (iqDataOutQueue != nullptr && !iqDataOutQueue->full()) {
DemodulatorThreadIQDataPtr iqDataOut = visualDataBuffers.getBuffer(); DemodulatorThreadIQDataPtr iqDataOut = visualDataBuffers.getBuffer();
bool doVis = false; bool doVis = false;
@ -348,11 +347,11 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize); iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize);
//VSO: blocking push //VSO: blocking push
iqDataOutQueue->push(iqDataOut); iqDataOutQueue->push(iqDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() iqDataOutQueue");
if (doVis) { if (doVis) {
//VSO: blocking push //VSO: blocking push
iqVisualQueue->push(iqDataOut); iqVisualQueue->push(iqDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() iqVisualQueue");
} }
} }
@ -448,14 +447,14 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
if (doDemodVis) { if (doDemodVis) {
//VSO: blocking push //VSO: blocking push
iqActiveDemodVisualQueue->push(demodDataOut); iqActiveDemodVisualQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() iqActiveDemodVisualQueue");
} }
for (size_t j = 0; j < nRunDemods; j++) { for (size_t j = 0; j < nRunDemods; j++) {
if (demodChannel[j] == i) { if (demodChannel[j] == i) {
DemodulatorInstance *demod = runDemods[j]; DemodulatorInstance *demod = runDemods[j];
//VSO: blocking push //VSO: blocking push
demod->getIQInputDataPipe()->push(demodDataOut); demod->getIQInputDataPipe()->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() demod->getIQInputDataPipe()");
} }
} }
} }

View File

@ -357,6 +357,7 @@ int SDRThread::readStream(SDRThreadIQDataQueuePtr iqDataOutQueue) {
//The rest of the system saturates, //The rest of the system saturates,
//finally the push didn't suceeded. //finally the push didn't suceeded.
readStreamCode = -32;
std::cout << "SDRThread::readStream(): 3.2 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl; std::cout << "SDRThread::readStream(): 3.2 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl;
//saturation, let a chance to the other threads to consume the existing samples //saturation, let a chance to the other threads to consume the existing samples
@ -364,6 +365,7 @@ int SDRThread::readStream(SDRThreadIQDataQueuePtr iqDataOutQueue) {
} }
} }
else { else {
readStreamCode = -31;
std::cout << "SDRThread::readStream(): 3.1 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl; std::cout << "SDRThread::readStream(): 3.1 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl;
//saturation, let a chance to the other threads to consume the existing samples //saturation, let a chance to the other threads to consume the existing samples
std::this_thread::yield(); std::this_thread::yield();
@ -374,9 +376,7 @@ int SDRThread::readStream(SDRThreadIQDataQueuePtr iqDataOutQueue) {
void SDRThread::readLoop() { void SDRThread::readLoop() {
#define STREAM_READ_WATCHDOG_S (2)
SDRThreadIQDataQueuePtr iqDataOutQueue = std::static_pointer_cast<SDRThreadIQDataQueue>( getOutputQueue("IQDataOutput")); SDRThreadIQDataQueuePtr iqDataOutQueue = std::static_pointer_cast<SDRThreadIQDataQueue>( getOutputQueue("IQDataOutput"));
if (iqDataOutQueue == nullptr) { if (iqDataOutQueue == nullptr) {
@ -384,34 +384,13 @@ void SDRThread::readLoop() {
} }
updateGains(); updateGains();
auto streamWatchDog = std::chrono::steady_clock::now();
while (!stopping.load()) { while (!stopping.load()) {
updateSettings(); updateSettings();
if (readStream(iqDataOutQueue) > 0) { readStream(iqDataOutQueue);
// record the date of the last good read.
streamWatchDog = std::chrono::steady_clock::now();
}
auto now = std::chrono::steady_clock::now();
//check watchdog value: if the date is too old, deinit end init the device.
std::chrono::duration<double> diff = now - streamWatchDog;
if (diff.count() > STREAM_READ_WATCHDOG_S) {
std::cout << "SDRThread::readStream(): Restarting stream after too many read erros..." << std::endl << std::flush;
deinit();
init();
streamWatchDog = std::chrono::steady_clock::now();
std::cout << "SDRThread::readStream(): stream restarted." << std::endl << std::flush;
}
} //End while } //End while
iqDataOutQueue->flush(); iqDataOutQueue->flush();