diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index ea6043f..b3a29fa 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -440,19 +440,19 @@ void AudioThread::run() { setSampleRate(command.int_value); } } - - //Thread termination, prevent fancy things to happen, lock the whole thing: - //This way audioThreadCallback is rightly protected from thread termination - std::lock_guard lock(m_mutex); - + // Drain any remaining inputs, with a non-blocking pop if (inputQueue != nullptr) { inputQueue->flush(); } - //Nullify currentInput... - currentInput = nullptr; - + //Thread termination, prevent fancy things to happen, lock the whole thing: + //This way audioThreadCallback is rightly protected from thread termination + std::lock_guard lock(m_mutex); + + //Nullify currentInput... + currentInput = nullptr; + //Stop if (deviceController[parameters.deviceId] != this) { deviceController[parameters.deviceId]->removeThread(this); diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index c1f02b3..e5afd6e 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -24,19 +24,17 @@ public: typedef std::shared_ptr VisualInputQueueTypePtr; typedef std::shared_ptr VisualOutputQueueTypePtr; - typedef typename std::vector< VisualOutputQueueTypePtr >::iterator outputs_i; - virtual ~VisualProcessor() { } bool isInputEmpty() { - std::lock_guard < std::recursive_mutex > busy_lock(busy_update); + std::lock_guard < std::mutex > busy_lock(busy_update); return input->empty(); } bool isOutputEmpty() { - std::lock_guard < std::recursive_mutex > busy_lock(busy_update); + std::lock_guard < std::mutex > busy_lock(busy_update); for (VisualOutputQueueTypePtr single_output : outputs) { if (single_output->full()) { @@ -47,7 +45,7 @@ public: } bool isAnyOutputEmpty() { - std::lock_guard < std::recursive_mutex > busy_lock(busy_update); + std::lock_guard < std::mutex > busy_lock(busy_update); for (VisualOutputQueueTypePtr single_output : outputs) { if (!(single_output)->full()) { @@ -59,7 +57,7 @@ public: //Set a (new) 'input' queue for incoming data. void setInput(VisualInputQueueTypePtr vis_in) { - std::lock_guard < std::recursive_mutex > busy_lock(busy_update); + std::lock_guard < std::mutex > busy_lock(busy_update); input = vis_in; } @@ -68,27 +66,34 @@ public: //dispatched by distribute(). void attachOutput(VisualOutputQueueTypePtr vis_out) { // attach an output queue - std::lock_guard < std::recursive_mutex > busy_lock(busy_update); + std::lock_guard < std::mutex > busy_lock(busy_update); outputs.push_back(vis_out); } //reverse of attachOutput(), removed an existing attached vis_out. void removeOutput(VisualOutputQueueTypePtr vis_out) { // remove an output queue - std::lock_guard < std::recursive_mutex > busy_lock(busy_update); + std::lock_guard < std::mutex > busy_lock(busy_update); - outputs_i i = std::find(outputs.begin(), outputs.end(), vis_out); - if (i != outputs.end()) { - outputs.erase(i); + auto it = std::find(outputs.begin(), outputs.end(), vis_out); + if (it != outputs.end()) { + outputs.erase(it); } } //Flush all queues, either input or outputs clearing their accumulated messages. - //this is purposefully non-blocking call. + //this is purposefully (almost) non-blocking call. void flushQueues() { - //DO NOT take the busy_update, we want a never blocking op how imperfect it could be. + input->flush(); - for (auto single_output : outputs) { + //scoped-lock: create a local copy of outputs, and work with it. + std::vector local_outputs; + { + std::lock_guard < std::mutex > busy_lock(busy_update); + local_outputs = outputs; + } + + for (auto single_output : local_outputs) { single_output->flush(); } @@ -96,17 +101,18 @@ public: //Call process() repeateadly until all available 'input' data is consumed. void run() { - - std::lock_guard < std::recursive_mutex > busy_lock(busy_update); + + //capture a local copy atomically, so we don't need to protect input. + VisualInputQueueTypePtr localInput = input; - if (input && !input->empty()) { + if (localInput && !localInput->empty()) { process(); } } protected: // derived class must implement a process() interface - //where typically 'input' data is consummed, procerssed, and then dispatched + //where typically 'input' data is consummed, processed, and then dispatched //with distribute() to all 'outputs'. virtual void process() = 0; @@ -117,15 +123,15 @@ protected: //* \param[in] errorMessage an error message written on std::cout in case pf push timeout. void distribute(OutputDataTypePtr item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) { - std::lock_guard < std::recursive_mutex > busy_lock(busy_update); + std::lock_guard < std::mutex > busy_lock(busy_update); //We will try to distribute 'output' among all 'outputs', //so 'output' will a-priori be shared among all 'outputs'. for (VisualOutputQueueTypePtr single_output : outputs) { - //'output' can fail to be given to an outputs_i, + //'output' can fail to be given to an single_output, //using a blocking push, with a timeout if (!(single_output)->push(item, timeout, errorMessage)) { - //TODO : trace ? + //trace will be std::output if timeout != 0 is set and errorMessage != null. } } } @@ -136,8 +142,8 @@ protected: //the n-outputs where to process()-ed data is distribute()-ed. std::vector outputs; - //protects input and outputs, must be recursive because of re-entrance - std::recursive_mutex busy_update; + //protects input and outputs + std::mutex busy_update; }; //Specialization much like VisualDataReDistributor, except @@ -153,10 +159,9 @@ protected: while (VisualProcessor::input->try_pop(inp)) { + //do not try to distribute if all outputs are already full. if (!VisualProcessor::isAnyOutputEmpty()) { - if (inp) { - //nothing - } + return; } @@ -187,10 +192,9 @@ protected: while (VisualProcessor::input->try_pop(inp)) { + //do not try to distribute if all outputs are already full. if (!VisualProcessor::isAnyOutputEmpty()) { - if (inp) { - //nothing - } + return; }