TH_CLEAN_3: Use of non-blocking try_pop() when possible,

AudioThread concurrent access hardening and simplified,
and misc.
This commit is contained in:
vsonnier
2016-07-05 21:43:45 +02:00
parent 3bf17d0f40
commit b495b388c9
16 changed files with 223 additions and 149 deletions
+87 -46
View File
@@ -6,13 +6,14 @@
#include "DemodulatorThread.h"
#include "DemodulatorInstance.h"
#include <memory.h>
#include <mutex>
std::map<int, AudioThread *> AudioThread::deviceController;
std::map<int, int> AudioThread::deviceSampleRate;
std::map<int, std::thread *> AudioThread::deviceThread;
AudioThread::AudioThread() : IOThread(),
currentInput(NULL), inputQueue(NULL), nBufferFrames(1024), sampleRate(0) {
currentInput(nullptr), inputQueue(nullptr), nBufferFrames(1024), sampleRate(0) {
audioQueuePtr.store(0);
underflowCount.store(0);
@@ -29,13 +30,24 @@ AudioThread::~AudioThread() {
delete vBoundThreads;
}
std::recursive_mutex & AudioThread::getMutex()
{
return m_mutex;
}
void AudioThread::bindThread(AudioThread *other) {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
if (std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other) == boundThreads.load()->end()) {
boundThreads.load()->push_back(other);
}
}
void AudioThread::removeThread(AudioThread *other) {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
std::vector<AudioThread *>::iterator i;
i = std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other);
if (i != boundThreads.load()->end()) {
@@ -44,6 +56,7 @@ void AudioThread::removeThread(AudioThread *other) {
}
void AudioThread::deviceCleanup() {
std::map<int, AudioThread *>::iterator i;
for (i = deviceController.begin(); i != deviceController.end(); i++) {
@@ -53,53 +66,67 @@ void AudioThread::deviceCleanup() {
static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned int nBufferFrames, double /* streamTime */, RtAudioStreamStatus status,
void *userData) {
AudioThread *src = (AudioThread *) userData;
float *out = (float*) outputBuffer;
float *out = (float*)outputBuffer;
//Zero output buffer in all cases: this allow to mute audio if no AudioThread data is
//actually active.
memset(out, 0, nBufferFrames * 2 * sizeof(float));
AudioThread *src = (AudioThread *) userData;
std::lock_guard<std::recursive_mutex> lock(src->getMutex());
if (src->isTerminated()) {
return 1;
}
if (status) {
std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl;
std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl;
}
if (src->boundThreads.load()->empty()) {
return 0;
return 0;
}
float peak = 0.0;
double peak = 0.0;
//for all boundThreads
for (size_t j = 0; j < src->boundThreads.load()->size(); j++) {
AudioThread *srcmix = (*(src->boundThreads.load()))[j];
//lock every single boundThread srcmix in succession the time we process
//its audio samples.
std::lock_guard<std::recursive_mutex> lock(srcmix->getMutex());
if (srcmix->isTerminated() || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) {
continue;
}
if (!srcmix->currentInput) {
srcmix->audioQueuePtr = 0;
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
continue;
}
srcmix->inputQueue->pop(srcmix->currentInput);
if (srcmix->isTerminated()) {
if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
continue;
}
continue;
}
if (srcmix->currentInput->sampleRate != src->getSampleRate()) {
while (srcmix->inputQueue->size()) {
srcmix->inputQueue->pop(srcmix->currentInput);
while (srcmix->inputQueue->try_pop(srcmix->currentInput)) {
if (srcmix->currentInput) {
if (srcmix->currentInput->sampleRate == src->getSampleRate()) {
break;
}
srcmix->currentInput->decRefCount();
}
srcmix->currentInput = NULL;
}
srcmix->currentInput = nullptr;
} //end while
srcmix->audioQueuePtr = 0;
@@ -114,37 +141,35 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned
srcmix->audioQueuePtr = 0;
if (srcmix->currentInput) {
srcmix->currentInput->decRefCount();
srcmix->currentInput = NULL;
srcmix->currentInput = nullptr;
}
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
continue;
}
srcmix->inputQueue->pop(srcmix->currentInput);
if (srcmix->isTerminated()) {
continue;
}
}
}
continue;
}
float mixPeak = srcmix->currentInput->peak * srcmix->gain;
double mixPeak = srcmix->currentInput->peak * srcmix->gain;
if (srcmix->currentInput->channels == 1) {
for (unsigned int i = 0; i < nBufferFrames; i++) {
if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) {
srcmix->audioQueuePtr = 0;
if (srcmix->currentInput) {
srcmix->currentInput->decRefCount();
srcmix->currentInput = NULL;
srcmix->currentInput = nullptr;
}
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
break;
}
srcmix->inputQueue->pop(srcmix->currentInput);
if (srcmix->isTerminated()) {
break;
}
float srcPeak = srcmix->currentInput->peak * srcmix->gain;
double srcPeak = srcmix->currentInput->peak * srcmix->gain;
if (mixPeak < srcPeak) {
mixPeak = srcPeak;
}
@@ -158,25 +183,25 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned
}
} else {
for (int i = 0, iMax = srcmix->currentInput->channels * nBufferFrames; i < iMax; i++) {
if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) {
srcmix->audioQueuePtr = 0;
if (srcmix->currentInput) {
srcmix->currentInput->decRefCount();
srcmix->currentInput = NULL;
srcmix->currentInput = nullptr;
}
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
break;
}
srcmix->inputQueue->pop(srcmix->currentInput);
if (srcmix->isTerminated()) {
break;
}
float srcPeak = srcmix->currentInput->peak * srcmix->gain;
double srcPeak = srcmix->currentInput->peak * srcmix->gain;
if (mixPeak < srcPeak) {
mixPeak = srcPeak;
}
}
if (srcmix->currentInput && srcmix->currentInput->data.size()) {
out[i] = out[i] + srcmix->currentInput->data[srcmix->audioQueuePtr] * srcmix->gain;
}
srcmix->audioQueuePtr++;
@@ -186,11 +211,15 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned
peak += mixPeak;
}
//normalize volume
if (peak > 1.0) {
float invPeak = (float)(1.0 / peak);
for (unsigned int i = 0; i < nBufferFrames * 2; i++) {
out[i] /= peak;
out[i] *= invPeak;
}
}
return 0;
}
@@ -247,6 +276,7 @@ void AudioThread::enumerateDevices(std::vector<RtAudio::DeviceInfo> &devs) {
}
void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) {
if (deviceController.find(deviceId) != deviceController.end()) {
AudioThreadCommand refreshDevice;
refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE;
@@ -261,6 +291,8 @@ void AudioThread::setSampleRate(int sampleRate) {
dac.stopStream();
dac.closeStream();
std::lock_guard<std::recursive_mutex> lock(m_mutex);
for (size_t j = 0; j < boundThreads.load()->size(); j++) {
AudioThread *srcmix = (*(boundThreads.load()))[j];
@@ -323,6 +355,7 @@ void AudioThread::setupDevice(int deviceId) {
deviceThread[parameters.deviceId] = new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId]);
} else if (deviceController[parameters.deviceId] == this) {
//Attach callback
dac.openStream(&parameters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *) this, &opts);
dac.startStream();
} else {
@@ -379,6 +412,7 @@ void AudioThread::run() {
inputQueue = static_cast<AudioThreadInputQueue *>(getInputQueue("AudioDataInput"));
//Infinite loop, witing for commands or for termination
while (!stopping) {
AudioThreadCommand command;
cmdQueue.pop(command);
@@ -391,20 +425,26 @@ void AudioThread::run() {
}
}
// Drain any remaining inputs
if (inputQueue) while (!inputQueue->empty()) {
AudioThreadInput *ref;
inputQueue->pop(ref);
//Thread termination, prevent fancy things to happen, lock the whole thing:
//This way audioThreadCallback is rightly protected from thread termination
std::lock_guard<std::recursive_mutex> lock(m_mutex);
// Drain any remaining inputs, with a non-blocking pop
AudioThreadInput *ref;
while (inputQueue && inputQueue->try_pop(ref)) {
if (ref) {
ref->decRefCount();
}
}
} //end while
//Nullify currentInput...
if (currentInput) {
currentInput->setRefCount(0);
currentInput = nullptr;
}
//Stop
if (deviceController[parameters.deviceId] != this) {
deviceController[parameters.deviceId]->removeThread(this);
} else {
@@ -444,8 +484,9 @@ void AudioThread::setActive(bool state) {
// Activity state changing, clear any inputs
if(inputQueue) {
while (!inputQueue->empty()) { // flush queue
inputQueue->pop(dummy);
while (inputQueue->try_pop(dummy)) { // flush queue, non-blocking pop
if (dummy) {
dummy->decRefCount();
}
+6
View File
@@ -88,7 +88,13 @@ private:
AudioThreadCommandQueue cmdQueue;
int sampleRate;
//The own m_mutex protecting this AudioThread
std::recursive_mutex m_mutex;
public:
//give access to the this AudioThread lock
std::recursive_mutex& getMutex();
void bindThread(AudioThread *other);
void removeThread(AudioThread *other);