Threads vs. Queues lifetimes, cleanups.

- Ideally Queues must outlive the threads using them, but wasn't done so. Yes, std::shared_ptr them!
- Now queues are always valid in the context of the threads using them.
- No longer need tedious queues deallocation by the original owner.
- Misc cleanups.
This commit is contained in:
vsonnier
2017-08-13 18:49:47 +02:00
parent 98c7c30aee
commit c64baab99d
31 changed files with 162 additions and 153 deletions
+6 -5
View File
@@ -27,11 +27,12 @@ SpectrumVisualProcessor *FFTVisualDataThread::getProcessor() {
}
void FFTVisualDataThread::run() {
DemodulatorThreadInputQueue *pipeIQDataIn = static_cast<DemodulatorThreadInputQueue *>(getInputQueue("IQDataInput"));
SpectrumVisualDataQueue *pipeFFTDataOut = static_cast<SpectrumVisualDataQueue *>(getOutputQueue("FFTDataOutput"));
DemodulatorThreadInputQueuePtr pipeIQDataIn = std::static_pointer_cast<DemodulatorThreadInputQueue>(getInputQueue("IQDataInput"));
SpectrumVisualDataQueuePtr pipeFFTDataOut = std::static_pointer_cast<SpectrumVisualDataQueue>(getOutputQueue("FFTDataOutput"));
fftQueue.set_max_num_items(100);
fftQueue->set_max_num_items(100);
pipeFFTDataOut->set_max_num_items(100);
//FFT distributor plumbing:
@@ -39,10 +40,10 @@ void FFTVisualDataThread::run() {
fftDistrib.setInput(pipeIQDataIn);
//The FFT distributor has actually 1 output only, so it doesn't distribute at all :)
fftDistrib.attachOutput(&fftQueue);
fftDistrib.attachOutput(fftQueue);
//FFT Distributor output is ==> SpectrumVisualProcessor input.
wproc.setInput(&fftQueue);
wproc.setInput(fftQueue);
wproc.attachOutput(pipeFFTDataOut);
wproc.setup(DEFAULT_FFT_SIZE);
+2 -2
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-2.0+
#pragma once
#include <memory>
#include "IOThread.h"
#include "SpectrumVisualProcessor.h"
#include "FFTDataDistributor.h"
@@ -20,7 +20,7 @@ public:
protected:
FFTDataDistributor fftDistrib;
DemodulatorThreadInputQueue fftQueue;
DemodulatorThreadInputQueuePtr fftQueue = std::make_shared<DemodulatorThreadInputQueue>();
SpectrumVisualProcessor wproc;
std::atomic_int linesPerSecond;
+2
View File
@@ -28,6 +28,8 @@ typedef std::shared_ptr<ScopeRenderData> ScopeRenderDataPtr;
typedef ThreadBlockingQueue<ScopeRenderDataPtr> ScopeRenderDataQueue;
typedef std::shared_ptr<ScopeRenderDataQueue> ScopeRenderDataQueuePtr;
class ScopeVisualProcessor : public VisualProcessor<AudioThreadInput, ScopeRenderData> {
public:
ScopeVisualProcessor();
+1
View File
@@ -24,6 +24,7 @@ public:
typedef std::shared_ptr<SpectrumVisualData> SpectrumVisualDataPtr;
typedef ThreadBlockingQueue<SpectrumVisualDataPtr> SpectrumVisualDataQueue;
typedef std::shared_ptr<SpectrumVisualDataQueue> SpectrumVisualDataQueuePtr;
class SpectrumVisualProcessor : public VisualProcessor<DemodulatorThreadIQData, SpectrumVisualData> {
public:
+15 -12
View File
@@ -21,7 +21,10 @@ public:
typedef ThreadBlockingQueue<InputDataTypePtr> VisualInputQueueType;
typedef ThreadBlockingQueue<OutputDataTypePtr> VisualOutputQueueType;
typedef typename std::vector< VisualOutputQueueType *>::iterator outputs_i;
typedef std::shared_ptr<VisualInputQueueType> VisualInputQueueTypePtr;
typedef std::shared_ptr<VisualOutputQueueType> VisualOutputQueueTypePtr;
typedef typename std::vector< VisualOutputQueueTypePtr >::iterator outputs_i;
virtual ~VisualProcessor() {
}
@@ -35,8 +38,8 @@ public:
bool isOutputEmpty() {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
for (outputs_i it = outputs.begin(); it != outputs.end(); it++) {
if ((*it)->full()) {
for (VisualOutputQueueTypePtr single_output : outputs) {
if (single_output->full()) {
return false;
}
}
@@ -46,8 +49,8 @@ public:
bool isAnyOutputEmpty() {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
for (outputs_i it = outputs.begin(); it != outputs.end(); it++) {
if (!(*it)->full()) {
for (VisualOutputQueueTypePtr single_output : outputs) {
if (!(single_output)->full()) {
return true;
}
}
@@ -55,7 +58,7 @@ public:
}
//Set a (new) 'input' queue for incoming data.
void setInput(VisualInputQueueType *vis_in) {
void setInput(VisualInputQueueTypePtr vis_in) {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
input = vis_in;
@@ -63,14 +66,14 @@ public:
//Add a vis_out queue where to consumed 'input' data will be
//dispatched by distribute().
void attachOutput(VisualOutputQueueType *vis_out) {
void attachOutput(VisualOutputQueueTypePtr vis_out) {
// attach an output queue
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
outputs.push_back(vis_out);
}
//reverse of attachOutput(), removed an existing attached vis_out.
void removeOutput(VisualOutputQueueType *vis_out) {
void removeOutput(VisualOutputQueueTypePtr vis_out) {
// remove an output queue
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
@@ -108,20 +111,20 @@ protected:
//We will try to distribute 'output' among all 'outputs',
//so 'output' will a-priori be shared among all 'outputs'.
for (outputs_i it = outputs.begin(); it != outputs.end(); it++) {
for (VisualOutputQueueTypePtr single_output : outputs) {
//'output' can fail to be given to an outputs_i,
//using a blocking push, with a timeout
if (!(*it)->push(item, timeout, errorMessage)) {
if (!(single_output)->push(item, timeout, errorMessage)) {
//TODO : trace ?
}
}
}
//the incoming data queue
VisualInputQueueType *input = nullptr;
VisualInputQueueTypePtr input = nullptr;
//the n-outputs where to process()-ed data is distribute()-ed.
std::vector<VisualOutputQueueType *> outputs;
std::vector<VisualOutputQueueTypePtr> outputs;
//protects input and outputs, must be recursive because of re-entrance
std::recursive_mutex busy_update;