Merge pull request #713 from cjcliffe/vso_use_spin_locks

Let's go to production and ask for forgiveness later.
This commit is contained in:
Vincent Sonnier 2019-03-03 13:08:44 +01:00 committed by GitHub
commit 8b0d4c2449
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 87 additions and 51 deletions

View File

@ -466,6 +466,7 @@ SET (cubicsdr_headers
src/util/GLExt.h src/util/GLExt.h
src/util/GLFont.h src/util/GLFont.h
src/util/DataTree.h src/util/DataTree.h
src/util/SpinMutex.h
src/panel/ScopePanel.h src/panel/ScopePanel.h
src/panel/SpectrumPanel.h src/panel/SpectrumPanel.h
src/panel/WaterfallPanel.h src/panel/WaterfallPanel.h

View File

@ -15,6 +15,7 @@
#include <climits> #include <climits>
#include "ThreadBlockingQueue.h" #include "ThreadBlockingQueue.h"
#include "Timer.h" #include "Timer.h"
#include "SpinMutex.h"
struct map_string_less : public std::binary_function<std::string,std::string,bool> struct map_string_less : public std::binary_function<std::string,std::string,bool>
{ {
@ -62,7 +63,7 @@ public:
/// Return a new ReBuffer_ptr usable by the application. /// Return a new ReBuffer_ptr usable by the application.
ReBufferPtr getBuffer() { ReBufferPtr getBuffer() {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < SpinMutex > lock(m_mutex);
// iterate the ReBufferAge list: if the std::shared_ptr count == 1, it means // iterate the ReBufferAge list: if the std::shared_ptr count == 1, it means
//it is only referenced in outputBuffers itself, so available for re-use. //it is only referenced in outputBuffers itself, so available for re-use.
@ -131,7 +132,7 @@ public:
/// Purge the cache. /// Purge the cache.
void purge() { void purge() {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < SpinMutex > lock(m_mutex);
// since outputBuffers are full std::shared_ptr, // since outputBuffers are full std::shared_ptr,
//purging if will effectively loose the local reference, //purging if will effectively loose the local reference,
@ -152,7 +153,7 @@ private:
typedef typename std::deque< ReBufferAge < ReBufferPtr > >::iterator outputBuffersI; typedef typename std::deque< ReBufferAge < ReBufferPtr > >::iterator outputBuffersI;
//mutex protecting access to outputBuffers. //mutex protecting access to outputBuffers.
std::mutex m_mutex; SpinMutex m_mutex;
}; };

View File

@ -39,13 +39,13 @@ void DemodulatorThread::onBindOutput(std::string name, ThreadQueueBasePtr thread
if (name == "AudioVisualOutput") { if (name == "AudioVisualOutput") {
//protects because it may be changed at runtime //protects because it may be changed at runtime
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue); std::lock_guard < SpinMutex > lock(m_mutexAudioVisOutputQueue);
audioVisOutputQueue = std::static_pointer_cast<DemodulatorThreadOutputQueue>(threadQueue); audioVisOutputQueue = std::static_pointer_cast<DemodulatorThreadOutputQueue>(threadQueue);
} }
if (name == "AudioSink") { if (name == "AudioSink") {
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue); std::lock_guard < SpinMutex > lock(m_mutexAudioVisOutputQueue);
audioSinkOutputQueue = std::static_pointer_cast<AudioThreadInputQueue>(threadQueue); audioSinkOutputQueue = std::static_pointer_cast<AudioThreadInputQueue>(threadQueue);
} }
@ -247,7 +247,7 @@ void DemodulatorThread::run() {
//variable, and works with it with now on until the next while-turn. //variable, and works with it with now on until the next while-turn.
DemodulatorThreadOutputQueuePtr localAudioVisOutputQueue = nullptr; DemodulatorThreadOutputQueuePtr localAudioVisOutputQueue = nullptr;
{ {
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue); std::lock_guard < SpinMutex > lock(m_mutexAudioVisOutputQueue);
localAudioVisOutputQueue = audioVisOutputQueue; localAudioVisOutputQueue = audioVisOutputQueue;
} }
@ -336,7 +336,7 @@ void DemodulatorThread::run() {
// Capture audioSinkOutputQueue state in a local variable // Capture audioSinkOutputQueue state in a local variable
DemodulatorThreadOutputQueuePtr localAudioSinkOutputQueue = nullptr; DemodulatorThreadOutputQueuePtr localAudioSinkOutputQueue = nullptr;
{ {
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue); std::lock_guard < SpinMutex > lock(m_mutexAudioVisOutputQueue);
localAudioSinkOutputQueue = audioSinkOutputQueue; localAudioSinkOutputQueue = audioSinkOutputQueue;
} }

View File

@ -10,6 +10,7 @@
#include "DemodDefs.h" #include "DemodDefs.h"
#include "AudioThread.h" #include "AudioThread.h"
#include "Modem.h" #include "Modem.h"
#include "SpinMutex.h"
#define DEMOD_VIS_SIZE 2048 #define DEMOD_VIS_SIZE 2048
#define DEMOD_SIGNAL_MIN -30 #define DEMOD_SIGNAL_MIN -30
@ -70,5 +71,5 @@ protected:
DemodulatorThreadOutputQueuePtr audioSinkOutputQueue = nullptr; DemodulatorThreadOutputQueuePtr audioSinkOutputQueue = nullptr;
//protects the audioVisOutputQueue dynamic binding change at runtime (in DemodulatorMgr) //protects the audioVisOutputQueue dynamic binding change at runtime (in DemodulatorMgr)
std::mutex m_mutexAudioVisOutputQueue; SpinMutex m_mutexAudioVisOutputQueue;
}; };

View File

@ -49,21 +49,21 @@ SpectrumVisualProcessor::~SpectrumVisualProcessor() {
bool SpectrumVisualProcessor::isView() { bool SpectrumVisualProcessor::isView() {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
return is_view; return is_view;
} }
void SpectrumVisualProcessor::setView(bool bView) { void SpectrumVisualProcessor::setView(bool bView) {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
is_view = bView; is_view = bView;
} }
void SpectrumVisualProcessor::setView(bool bView, long long centerFreq_in, long bandwidth_in) { void SpectrumVisualProcessor::setView(bool bView, long long centerFreq_in, long bandwidth_in) {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
is_view = bView; is_view = bView;
bandwidth = bandwidth_in; bandwidth = bandwidth_in;
centerFreq = centerFreq_in; centerFreq = centerFreq_in;
@ -72,49 +72,49 @@ void SpectrumVisualProcessor::setView(bool bView, long long centerFreq_in, long
void SpectrumVisualProcessor::setFFTAverageRate(float fftAverageRate) { void SpectrumVisualProcessor::setFFTAverageRate(float fftAverageRate) {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
this->fft_average_rate = fftAverageRate; this->fft_average_rate = fftAverageRate;
} }
float SpectrumVisualProcessor::getFFTAverageRate() { float SpectrumVisualProcessor::getFFTAverageRate() {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
return this->fft_average_rate; return this->fft_average_rate;
} }
void SpectrumVisualProcessor::setCenterFrequency(long long centerFreq_in) { void SpectrumVisualProcessor::setCenterFrequency(long long centerFreq_in) {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
centerFreq = centerFreq_in; centerFreq = centerFreq_in;
} }
long long SpectrumVisualProcessor::getCenterFrequency() { long long SpectrumVisualProcessor::getCenterFrequency() {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
return centerFreq; return centerFreq;
} }
void SpectrumVisualProcessor::setBandwidth(long bandwidth_in) { void SpectrumVisualProcessor::setBandwidth(long bandwidth_in) {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
bandwidth = bandwidth_in; bandwidth = bandwidth_in;
} }
long SpectrumVisualProcessor::getBandwidth() { long SpectrumVisualProcessor::getBandwidth() {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
return bandwidth; return bandwidth;
} }
void SpectrumVisualProcessor::setPeakHold(bool peakHold_in) { void SpectrumVisualProcessor::setPeakHold(bool peakHold_in) {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
if (peakHold && peakHold_in) { if (peakHold && peakHold_in) {
peakReset = PEAK_RESET_COUNT; peakReset = PEAK_RESET_COUNT;
@ -126,20 +126,20 @@ void SpectrumVisualProcessor::setPeakHold(bool peakHold_in) {
bool SpectrumVisualProcessor::getPeakHold() { bool SpectrumVisualProcessor::getPeakHold() {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
return peakHold; return peakHold;
} }
int SpectrumVisualProcessor::getDesiredInputSize() { int SpectrumVisualProcessor::getDesiredInputSize() {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
return desiredInputSize; return desiredInputSize;
} }
void SpectrumVisualProcessor::setup(unsigned int fftSize_in) { void SpectrumVisualProcessor::setup(unsigned int fftSize_in) {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
fftSize = fftSize_in; fftSize = fftSize_in;
fftSizeInternal = fftSize_in * SPECTRUM_VZM; fftSizeInternal = fftSize_in * SPECTRUM_VZM;
@ -180,7 +180,7 @@ void SpectrumVisualProcessor::setup(unsigned int fftSize_in) {
void SpectrumVisualProcessor::setFFTSize(unsigned int fftSize_in) { void SpectrumVisualProcessor::setFFTSize(unsigned int fftSize_in) {
//then get the busy_lock //then get the busy_lock
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
if (fftSize_in == fftSize) { if (fftSize_in == fftSize) {
return; return;
@ -192,7 +192,7 @@ void SpectrumVisualProcessor::setFFTSize(unsigned int fftSize_in) {
unsigned int SpectrumVisualProcessor::getFFTSize() { unsigned int SpectrumVisualProcessor::getFFTSize() {
//then get the busy_lock //then get the busy_lock
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
if (fftSizeChanged) { if (fftSizeChanged) {
return newFFTSize; return newFFTSize;
@ -203,7 +203,7 @@ unsigned int SpectrumVisualProcessor::getFFTSize() {
void SpectrumVisualProcessor::setHideDC(bool hideDC) { void SpectrumVisualProcessor::setHideDC(bool hideDC) {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
this->hideDC = hideDC; this->hideDC = hideDC;
} }
@ -220,7 +220,7 @@ void SpectrumVisualProcessor::process() {
bool executeSetup = false; bool executeSetup = false;
{ // scoped lock here { // scoped lock here
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
if (fftSizeChanged) { if (fftSizeChanged) {
executeSetup = true; executeSetup = true;
fftSizeChanged = false; fftSizeChanged = false;
@ -242,7 +242,7 @@ void SpectrumVisualProcessor::process() {
} }
//then get the busy_lock for the rest of the processing. //then get the busy_lock for the rest of the processing.
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
bool doPeak = peakHold && (peakReset == 0); bool doPeak = peakHold && (peakReset == 0);
@ -638,14 +638,14 @@ void SpectrumVisualProcessor::process() {
void SpectrumVisualProcessor::setScaleFactor(float sf) { void SpectrumVisualProcessor::setScaleFactor(float sf) {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
scaleFactor = sf; scaleFactor = sf;
} }
float SpectrumVisualProcessor::getScaleFactor() { float SpectrumVisualProcessor::getScaleFactor() {
std::lock_guard < std::mutex > busy_lock(busy_run); std::lock_guard < SpinMutex > busy_lock(busy_run);
return scaleFactor; return scaleFactor;
} }

View File

@ -7,6 +7,7 @@
#include "DemodDefs.h" #include "DemodDefs.h"
#include <cmath> #include <cmath>
#include <memory> #include <memory>
#include "SpinMutex.h"
#define SPECTRUM_VZM 2 #define SPECTRUM_VZM 2
#define PEAK_RESET_COUNT 30 #define PEAK_RESET_COUNT 30
@ -65,7 +66,7 @@ protected:
private: private:
//protects all access to fields below //protects all access to fields below
std::mutex busy_run; SpinMutex busy_run;
bool is_view; bool is_view;
size_t fftSize, newFFTSize; size_t fftSize, newFFTSize;

View File

@ -9,6 +9,7 @@
#include <algorithm> #include <algorithm>
#include <vector> #include <vector>
#include <typeinfo> #include <typeinfo>
#include "SpinMutex.h"
template<typename InputDataType, typename OutputDataType> template<typename InputDataType, typename OutputDataType>
class VisualProcessor { class VisualProcessor {
@ -28,7 +29,7 @@ public:
} }
bool isInputEmpty() { bool isInputEmpty() {
std::lock_guard < std::mutex > busy_lock(busy_update); std::lock_guard < SpinMutex > busy_lock(busy_update);
if (input) { if (input) {
return input->empty(); return input->empty();
@ -38,7 +39,7 @@ public:
} }
bool isOutputEmpty() { bool isOutputEmpty() {
std::lock_guard < std::mutex > busy_lock(busy_update); std::lock_guard < SpinMutex > busy_lock(busy_update);
for (VisualOutputQueueTypePtr single_output : outputs) { for (VisualOutputQueueTypePtr single_output : outputs) {
if (single_output->full()) { if (single_output->full()) {
@ -49,7 +50,7 @@ public:
} }
bool isAnyOutputEmpty() { bool isAnyOutputEmpty() {
std::lock_guard < std::mutex > busy_lock(busy_update); std::lock_guard < SpinMutex > busy_lock(busy_update);
for (VisualOutputQueueTypePtr single_output : outputs) { for (VisualOutputQueueTypePtr single_output : outputs) {
if (!(single_output)->full()) { if (!(single_output)->full()) {
@ -61,7 +62,7 @@ public:
//Set a (new) 'input' queue for incoming data. //Set a (new) 'input' queue for incoming data.
void setInput(VisualInputQueueTypePtr vis_in) { void setInput(VisualInputQueueTypePtr vis_in) {
std::lock_guard < std::mutex > busy_lock(busy_update); std::lock_guard < SpinMutex > busy_lock(busy_update);
input = vis_in; input = vis_in;
} }
@ -70,14 +71,14 @@ public:
//dispatched by distribute(). //dispatched by distribute().
void attachOutput(VisualOutputQueueTypePtr vis_out) { void attachOutput(VisualOutputQueueTypePtr vis_out) {
// attach an output queue // attach an output queue
std::lock_guard < std::mutex > busy_lock(busy_update); std::lock_guard < SpinMutex > busy_lock(busy_update);
outputs.push_back(vis_out); outputs.push_back(vis_out);
} }
//reverse of attachOutput(), removed an existing attached vis_out. //reverse of attachOutput(), removed an existing attached vis_out.
void removeOutput(VisualOutputQueueTypePtr vis_out) { void removeOutput(VisualOutputQueueTypePtr vis_out) {
// remove an output queue // remove an output queue
std::lock_guard < std::mutex > busy_lock(busy_update); std::lock_guard < SpinMutex > busy_lock(busy_update);
auto it = std::find(outputs.begin(), outputs.end(), vis_out); auto it = std::find(outputs.begin(), outputs.end(), vis_out);
if (it != outputs.end()) { if (it != outputs.end()) {
@ -98,7 +99,7 @@ public:
//scoped-lock: create a local copy of outputs, and work with it. //scoped-lock: create a local copy of outputs, and work with it.
std::vector<VisualOutputQueueTypePtr> local_outputs; std::vector<VisualOutputQueueTypePtr> local_outputs;
{ {
std::lock_guard < std::mutex > busy_lock(busy_update); std::lock_guard < SpinMutex > busy_lock(busy_update);
local_outputs = outputs; local_outputs = outputs;
} }
@ -132,11 +133,17 @@ protected:
//* \param[in] errorMessage an error message written on std::cout in case pf push timeout. //* \param[in] errorMessage an error message written on std::cout in case pf push timeout.
void distribute(OutputDataTypePtr item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) { void distribute(OutputDataTypePtr item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) {
std::lock_guard < std::mutex > busy_lock(busy_update); //scoped-lock: create a local copy of outputs, and work with it.
//We will try to distribute 'output' among all 'outputs', std::vector<VisualOutputQueueTypePtr> local_outputs;
//so 'output' will a-priori be shared among all 'outputs'. {
std::lock_guard < SpinMutex > busy_lock(busy_update);
local_outputs = outputs;
}
//We will try to distribute 'output' among all 'local_outputs',
//so 'output' will a-priori be shared among all 'local_outputs'.
for (VisualOutputQueueTypePtr single_output : outputs) { for (VisualOutputQueueTypePtr single_output : local_outputs) {
//'output' can fail to be given to an single_output, //'output' can fail to be given to an single_output,
//using a blocking push, with a timeout //using a blocking push, with a timeout
if (!(single_output)->push(item, timeout, errorMessage)) { if (!(single_output)->push(item, timeout, errorMessage)) {
@ -152,7 +159,7 @@ protected:
std::vector<VisualOutputQueueTypePtr> outputs; std::vector<VisualOutputQueueTypePtr> outputs;
//protects input and outputs //protects input and outputs
std::mutex busy_update; SpinMutex busy_update;
}; };
//Specialization much like VisualDataReDistributor, except //Specialization much like VisualDataReDistributor, except

View File

@ -521,7 +521,7 @@ void GLFont::drawString(const std::wstring& str, int pxHeight, float xpos, float
if (cacheable) { if (cacheable) {
gcCounter++; gcCounter++;
std::lock_guard<std::mutex> lock(cache_busy); std::lock_guard<SpinMutex> lock(cache_busy);
if (gcCounter > GC_DRAW_COUNT_PERIOD) { if (gcCounter > GC_DRAW_COUNT_PERIOD) {
@ -793,7 +793,7 @@ void GLFont::doCacheGC() {
void GLFont::clearCache() { void GLFont::clearCache() {
std::lock_guard<std::mutex> lock(cache_busy); std::lock_guard<SpinMutex> lock(cache_busy);
std::map<std::wstring, GLFontStringCache * >::iterator cache_iter; std::map<std::wstring, GLFontStringCache * >::iterator cache_iter;

View File

@ -13,6 +13,8 @@
#include "wx/filename.h" #include "wx/filename.h"
#include "wx/stdpaths.h" #include "wx/stdpaths.h"
#include "SpinMutex.h"
class GLFontStringCache { class GLFontStringCache {
public: public:
GLFontStringCache(); GLFontStringCache();
@ -76,9 +78,6 @@ private:
class GLFont { class GLFont {
public: public:
enum Align { enum Align {
GLFONT_ALIGN_LEFT, GLFONT_ALIGN_RIGHT, GLFONT_ALIGN_CENTER, GLFONT_ALIGN_TOP, GLFONT_ALIGN_BOTTOM GLFONT_ALIGN_LEFT, GLFONT_ALIGN_RIGHT, GLFONT_ALIGN_CENTER, GLFONT_ALIGN_TOP, GLFONT_ALIGN_BOTTOM
}; };
@ -176,7 +175,7 @@ private:
GLuint texId; GLuint texId;
int gcCounter; int gcCounter;
std::mutex cache_busy; SpinMutex cache_busy;
public: public:

25
src/util/SpinMutex.h Normal file
View File

@ -0,0 +1,25 @@
// Copyright (c) Charles J. Cliffe
// SPDX-License-Identifier: GPL-2.0+
#pragma once
#include <atomic>
// A non-recursive Mutex implemented as a spin-lock.
class SpinMutex {
public:
SpinMutex() = default;
SpinMutex(const SpinMutex&) = delete;
SpinMutex& operator=(const SpinMutex&) = delete;
~SpinMutex() { lock_state.clear(std::memory_order_release); }
void lock() { while (lock_state.test_and_set(std::memory_order_acquire)); }
void unlock() { lock_state.clear(std::memory_order_release); }
private:
std::atomic_flag lock_state = ATOMIC_FLAG_INIT;
};

View File

@ -88,7 +88,7 @@ void WaterfallCanvas::attachSpectrumCanvas(SpectrumCanvas *canvas_in) {
} }
void WaterfallCanvas::processInputQueue() { void WaterfallCanvas::processInputQueue() {
std::lock_guard < std::mutex > lock(tex_update); std::lock_guard < SpinMutex > lock(tex_update);
gTimer.update(); gTimer.update();
@ -127,7 +127,7 @@ void WaterfallCanvas::processInputQueue() {
} }
void WaterfallCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) { void WaterfallCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) {
std::lock_guard < std::mutex > lock(tex_update); std::lock_guard < SpinMutex > lock(tex_update);
wxPaintDC dc(this); wxPaintDC dc(this);
const wxSize ClientSize = GetClientSize(); const wxSize ClientSize = GetClientSize();
@ -913,7 +913,7 @@ void WaterfallCanvas::updateCenterFrequency(long long freq) {
} }
void WaterfallCanvas::setLinesPerSecond(int lps) { void WaterfallCanvas::setLinesPerSecond(int lps) {
std::lock_guard < std::mutex > lock(tex_update); std::lock_guard < SpinMutex > lock(tex_update);
linesPerSecond = lps; linesPerSecond = lps;

View File

@ -14,6 +14,7 @@
#include "SpectrumCanvas.h" #include "SpectrumCanvas.h"
#include "WaterfallPanel.h" #include "WaterfallPanel.h"
#include "Timer.h" #include "Timer.h"
#include "SpinMutex.h"
class WaterfallCanvas: public InteractiveCanvas { class WaterfallCanvas: public InteractiveCanvas {
public: public:
@ -93,7 +94,7 @@ private:
Timer gTimer; Timer gTimer;
double lpsIndex; double lpsIndex;
bool preBuf; bool preBuf;
std::mutex tex_update; SpinMutex tex_update;
int minBandwidth; int minBandwidth;
std::atomic_bool fft_size_changed; std::atomic_bool fft_size_changed;
// event table // event table