Optimized VisualProcessor by using a non-recursive mutex and making some ops non-blocking, small AudioThread opt as well

This commit is contained in:
vsonnier 2017-10-14 10:55:48 +02:00
parent 2c1be22c51
commit f4107c1541
2 changed files with 41 additions and 37 deletions

View File

@ -440,19 +440,19 @@ void AudioThread::run() {
setSampleRate(command.int_value); setSampleRate(command.int_value);
} }
} }
//Thread termination, prevent fancy things to happen, lock the whole thing:
//This way audioThreadCallback is rightly protected from thread termination
std::lock_guard<std::recursive_mutex> lock(m_mutex);
// Drain any remaining inputs, with a non-blocking pop // Drain any remaining inputs, with a non-blocking pop
if (inputQueue != nullptr) { if (inputQueue != nullptr) {
inputQueue->flush(); inputQueue->flush();
} }
//Nullify currentInput... //Thread termination, prevent fancy things to happen, lock the whole thing:
currentInput = nullptr; //This way audioThreadCallback is rightly protected from thread termination
std::lock_guard<std::recursive_mutex> lock(m_mutex);
//Nullify currentInput...
currentInput = nullptr;
//Stop //Stop
if (deviceController[parameters.deviceId] != this) { if (deviceController[parameters.deviceId] != this) {
deviceController[parameters.deviceId]->removeThread(this); deviceController[parameters.deviceId]->removeThread(this);

View File

@ -24,19 +24,17 @@ public:
typedef std::shared_ptr<VisualInputQueueType> VisualInputQueueTypePtr; typedef std::shared_ptr<VisualInputQueueType> VisualInputQueueTypePtr;
typedef std::shared_ptr<VisualOutputQueueType> VisualOutputQueueTypePtr; typedef std::shared_ptr<VisualOutputQueueType> VisualOutputQueueTypePtr;
typedef typename std::vector< VisualOutputQueueTypePtr >::iterator outputs_i;
virtual ~VisualProcessor() { virtual ~VisualProcessor() {
} }
bool isInputEmpty() { bool isInputEmpty() {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::mutex > busy_lock(busy_update);
return input->empty(); return input->empty();
} }
bool isOutputEmpty() { bool isOutputEmpty() {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::mutex > busy_lock(busy_update);
for (VisualOutputQueueTypePtr single_output : outputs) { for (VisualOutputQueueTypePtr single_output : outputs) {
if (single_output->full()) { if (single_output->full()) {
@ -47,7 +45,7 @@ public:
} }
bool isAnyOutputEmpty() { bool isAnyOutputEmpty() {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::mutex > busy_lock(busy_update);
for (VisualOutputQueueTypePtr single_output : outputs) { for (VisualOutputQueueTypePtr single_output : outputs) {
if (!(single_output)->full()) { if (!(single_output)->full()) {
@ -59,7 +57,7 @@ public:
//Set a (new) 'input' queue for incoming data. //Set a (new) 'input' queue for incoming data.
void setInput(VisualInputQueueTypePtr vis_in) { void setInput(VisualInputQueueTypePtr vis_in) {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::mutex > busy_lock(busy_update);
input = vis_in; input = vis_in;
} }
@ -68,27 +66,34 @@ public:
//dispatched by distribute(). //dispatched by distribute().
void attachOutput(VisualOutputQueueTypePtr vis_out) { void attachOutput(VisualOutputQueueTypePtr vis_out) {
// attach an output queue // attach an output queue
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::mutex > busy_lock(busy_update);
outputs.push_back(vis_out); outputs.push_back(vis_out);
} }
//reverse of attachOutput(), removed an existing attached vis_out. //reverse of attachOutput(), removed an existing attached vis_out.
void removeOutput(VisualOutputQueueTypePtr vis_out) { void removeOutput(VisualOutputQueueTypePtr vis_out) {
// remove an output queue // remove an output queue
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::mutex > busy_lock(busy_update);
outputs_i i = std::find(outputs.begin(), outputs.end(), vis_out); auto it = std::find(outputs.begin(), outputs.end(), vis_out);
if (i != outputs.end()) { if (it != outputs.end()) {
outputs.erase(i); outputs.erase(it);
} }
} }
//Flush all queues, either input or outputs clearing their accumulated messages. //Flush all queues, either input or outputs clearing their accumulated messages.
//this is purposefully non-blocking call. //this is purposefully (almost) non-blocking call.
void flushQueues() { void flushQueues() {
//DO NOT take the busy_update, we want a never blocking op how imperfect it could be.
input->flush(); input->flush();
for (auto single_output : outputs) { //scoped-lock: create a local copy of outputs, and work with it.
std::vector<VisualOutputQueueTypePtr> local_outputs;
{
std::lock_guard < std::mutex > busy_lock(busy_update);
local_outputs = outputs;
}
for (auto single_output : local_outputs) {
single_output->flush(); single_output->flush();
} }
@ -96,17 +101,18 @@ public:
//Call process() repeateadly until all available 'input' data is consumed. //Call process() repeateadly until all available 'input' data is consumed.
void run() { void run() {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); //capture a local copy atomically, so we don't need to protect input.
VisualInputQueueTypePtr localInput = input;
if (input && !input->empty()) { if (localInput && !localInput->empty()) {
process(); process();
} }
} }
protected: protected:
// derived class must implement a process() interface // derived class must implement a process() interface
//where typically 'input' data is consummed, procerssed, and then dispatched //where typically 'input' data is consummed, processed, and then dispatched
//with distribute() to all 'outputs'. //with distribute() to all 'outputs'.
virtual void process() = 0; virtual void process() = 0;
@ -117,15 +123,15 @@ protected:
//* \param[in] errorMessage an error message written on std::cout in case pf push timeout. //* \param[in] errorMessage an error message written on std::cout in case pf push timeout.
void distribute(OutputDataTypePtr item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) { void distribute(OutputDataTypePtr item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::mutex > busy_lock(busy_update);
//We will try to distribute 'output' among all 'outputs', //We will try to distribute 'output' among all 'outputs',
//so 'output' will a-priori be shared among all 'outputs'. //so 'output' will a-priori be shared among all 'outputs'.
for (VisualOutputQueueTypePtr single_output : outputs) { for (VisualOutputQueueTypePtr single_output : outputs) {
//'output' can fail to be given to an outputs_i, //'output' can fail to be given to an single_output,
//using a blocking push, with a timeout //using a blocking push, with a timeout
if (!(single_output)->push(item, timeout, errorMessage)) { if (!(single_output)->push(item, timeout, errorMessage)) {
//TODO : trace ? //trace will be std::output if timeout != 0 is set and errorMessage != null.
} }
} }
} }
@ -136,8 +142,8 @@ protected:
//the n-outputs where to process()-ed data is distribute()-ed. //the n-outputs where to process()-ed data is distribute()-ed.
std::vector<VisualOutputQueueTypePtr> outputs; std::vector<VisualOutputQueueTypePtr> outputs;
//protects input and outputs, must be recursive because of re-entrance //protects input and outputs
std::recursive_mutex busy_update; std::mutex busy_update;
}; };
//Specialization much like VisualDataReDistributor, except //Specialization much like VisualDataReDistributor, except
@ -153,10 +159,9 @@ protected:
while (VisualProcessor<OutputDataType, OutputDataType>::input->try_pop(inp)) { while (VisualProcessor<OutputDataType, OutputDataType>::input->try_pop(inp)) {
//do not try to distribute if all outputs are already full.
if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) { if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) {
if (inp) {
//nothing
}
return; return;
} }
@ -187,10 +192,9 @@ protected:
while (VisualProcessor<OutputDataType, OutputDataType>::input->try_pop(inp)) { while (VisualProcessor<OutputDataType, OutputDataType>::input->try_pop(inp)) {
//do not try to distribute if all outputs are already full.
if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) { if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) {
if (inp) {
//nothing
}
return; return;
} }