mirror of
https://github.com/cjcliffe/CubicSDR.git
synced 2024-09-28 16:16:58 -04:00
Use the nuclear option to solve the hung problem:
Besides deadlocks, a thread can in theory get stuck in a blocking pop(), not seeing the stopping flag in particular. So assure liveness by making all pop() timed pop.
This commit is contained in:
parent
77a82e1617
commit
9b0ce69e8f
@ -127,7 +127,7 @@ bool IOThread::isTerminated(int waitMs) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "ERROR: thread '" << typeid(*this).name() << "' has not terminated in time ! (> " << waitMs << " ms)" << std::endl;
|
std::cout << "ERROR: thread '" << typeid(*this).name() << "' has not terminated in time ! (> " << waitMs << " ms)" << std::endl << std::flush;
|
||||||
|
|
||||||
return terminated.load();
|
return terminated.load();
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (outputBuffers.size() > REBUFFER_WARNING_THRESHOLD) {
|
if (outputBuffers.size() > REBUFFER_WARNING_THRESHOLD) {
|
||||||
std::cout << "Warning: ReBuffer '" << bufferId << "' count '" << outputBuffers.size() << "' exceeds threshold of '" << REBUFFER_WARNING_THRESHOLD << "'" << std::endl;
|
std::cout << "Warning: ReBuffer '" << bufferId << "' count '" << outputBuffers.size() << "' exceeds threshold of '" << REBUFFER_WARNING_THRESHOLD << "'" << std::endl << std::flush;
|
||||||
}
|
}
|
||||||
|
|
||||||
//3.We need to allocate a new buffer.
|
//3.We need to allocate a new buffer.
|
||||||
|
@ -11,6 +11,8 @@
|
|||||||
#include <memory.h>
|
#include <memory.h>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
|
||||||
|
//50 ms
|
||||||
|
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
|
||||||
|
|
||||||
std::map<int, AudioThread *> AudioThread::deviceController;
|
std::map<int, AudioThread *> AudioThread::deviceController;
|
||||||
std::map<int, int> AudioThread::deviceSampleRate;
|
std::map<int, int> AudioThread::deviceSampleRate;
|
||||||
@ -429,7 +431,9 @@ void AudioThread::run() {
|
|||||||
while (!stopping) {
|
while (!stopping) {
|
||||||
AudioThreadCommand command;
|
AudioThreadCommand command;
|
||||||
|
|
||||||
cmdQueue.pop(command);
|
if (!cmdQueue.pop(command, HEARTBEAT_CHECK_PERIOD_MICROS)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE) {
|
if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE) {
|
||||||
setupDevice(command.int_value);
|
setupDevice(command.int_value);
|
||||||
|
@ -12,6 +12,9 @@
|
|||||||
#include "CubicSDR.h"
|
#include "CubicSDR.h"
|
||||||
#include "DemodulatorInstance.h"
|
#include "DemodulatorInstance.h"
|
||||||
|
|
||||||
|
//50 ms
|
||||||
|
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
|
||||||
|
|
||||||
DemodulatorPreThread::DemodulatorPreThread(DemodulatorInstance *parent) : IOThread(), iqResampler(NULL), iqResampleRatio(1), cModem(nullptr), cModemKit(nullptr), iqInputQueue(NULL), iqOutputQueue(NULL)
|
DemodulatorPreThread::DemodulatorPreThread(DemodulatorInstance *parent) : IOThread(), iqResampler(NULL), iqResampleRatio(1), cModem(nullptr), cModemKit(nullptr), iqInputQueue(NULL), iqOutputQueue(NULL)
|
||||||
{
|
{
|
||||||
initialized.store(false);
|
initialized.store(false);
|
||||||
@ -73,7 +76,9 @@ void DemodulatorPreThread::run() {
|
|||||||
while (!stopping) {
|
while (!stopping) {
|
||||||
DemodulatorThreadIQDataPtr inp;
|
DemodulatorThreadIQDataPtr inp;
|
||||||
|
|
||||||
iqInputQueue->pop(inp);
|
if (!iqInputQueue->pop(inp, HEARTBEAT_CHECK_PERIOD_MICROS)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (frequencyChanged.load()) {
|
if (frequencyChanged.load()) {
|
||||||
currentFrequency.store(newFrequency);
|
currentFrequency.store(newFrequency);
|
||||||
|
@ -12,6 +12,9 @@
|
|||||||
#define M_PI 3.14159265358979323846
|
#define M_PI 3.14159265358979323846
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
//50 ms
|
||||||
|
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
|
||||||
|
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#endif
|
#endif
|
||||||
@ -81,7 +84,9 @@ void DemodulatorThread::run() {
|
|||||||
while (!stopping) {
|
while (!stopping) {
|
||||||
DemodulatorThreadPostIQDataPtr inp;
|
DemodulatorThreadPostIQDataPtr inp;
|
||||||
|
|
||||||
iqInputQueue->pop(inp);
|
if (!iqInputQueue->pop(inp, HEARTBEAT_CHECK_PERIOD_MICROS)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
size_t bufSize = inp->data.size();
|
size_t bufSize = inp->data.size();
|
||||||
|
|
||||||
|
@ -6,6 +6,9 @@
|
|||||||
#include "CubicSDR.h"
|
#include "CubicSDR.h"
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
//50 ms
|
||||||
|
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
|
||||||
|
|
||||||
DemodulatorWorkerThread::DemodulatorWorkerThread() : IOThread(),
|
DemodulatorWorkerThread::DemodulatorWorkerThread() : IOThread(),
|
||||||
commandQueue(NULL), resultQueue(NULL), cModem(nullptr), cModemKit(nullptr) {
|
commandQueue(NULL), resultQueue(NULL), cModem(nullptr), cModemKit(nullptr) {
|
||||||
}
|
}
|
||||||
@ -31,7 +34,9 @@ void DemodulatorWorkerThread::run() {
|
|||||||
//we are waiting for the first command to show up (blocking!)
|
//we are waiting for the first command to show up (blocking!)
|
||||||
//then consuming the commands until done.
|
//then consuming the commands until done.
|
||||||
while (!done) {
|
while (!done) {
|
||||||
commandQueue->pop(command);
|
if (!commandQueue->pop(command, HEARTBEAT_CHECK_PERIOD_MICROS)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
switch (command.cmd) {
|
switch (command.cmd) {
|
||||||
case DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS:
|
case DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS:
|
||||||
|
@ -1411,7 +1411,7 @@ void BookmarkView::onEnterWindow( wxMouseEvent& event ) {
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
setStatusText("You can mouse-drag a bookmark entry from one category to the next..etc. TODO: add more Bookmarks descriptions");
|
setStatusText("Drag & Drop to create / move bookmarks, Group and arrange bookmarks, quick Search by keywords.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -5,6 +5,9 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <ThreadBlockingQueue.h>
|
#include <ThreadBlockingQueue.h>
|
||||||
|
|
||||||
|
//50 ms
|
||||||
|
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
|
||||||
|
|
||||||
FFTDataDistributor::FFTDataDistributor() : outputBuffers("FFTDataDistributorBuffers"), fftSize(DEFAULT_FFT_SIZE), linesPerSecond(DEFAULT_WATERFALL_LPS), lineRateAccum(0.0) {
|
FFTDataDistributor::FFTDataDistributor() : outputBuffers("FFTDataDistributorBuffers"), fftSize(DEFAULT_FFT_SIZE), linesPerSecond(DEFAULT_WATERFALL_LPS), lineRateAccum(0.0) {
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -29,7 +32,10 @@ void FFTDataDistributor::process() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
DemodulatorThreadIQDataPtr inp;
|
DemodulatorThreadIQDataPtr inp;
|
||||||
input->pop(inp);
|
|
||||||
|
if (!input->pop(inp, HEARTBEAT_CHECK_PERIOD_MICROS)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (inp) {
|
if (inp) {
|
||||||
//Settings have changed, set new values and dump all previous samples stored in inputBuffer:
|
//Settings have changed, set new values and dump all previous samples stored in inputBuffer:
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
#include "SpectrumVisualProcessor.h"
|
#include "SpectrumVisualProcessor.h"
|
||||||
#include "CubicSDR.h"
|
#include "CubicSDR.h"
|
||||||
|
|
||||||
|
//50 ms
|
||||||
|
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
|
||||||
|
|
||||||
SpectrumVisualProcessor::SpectrumVisualProcessor() : outputBuffers("SpectrumVisualProcessorBuffers") {
|
SpectrumVisualProcessor::SpectrumVisualProcessor() : outputBuffers("SpectrumVisualProcessorBuffers") {
|
||||||
lastInputBandwidth = 0;
|
lastInputBandwidth = 0;
|
||||||
@ -194,7 +196,9 @@ void SpectrumVisualProcessor::process() {
|
|||||||
|
|
||||||
DemodulatorThreadIQDataPtr iqData;
|
DemodulatorThreadIQDataPtr iqData;
|
||||||
|
|
||||||
input->pop(iqData);
|
if (!input->pop(iqData, HEARTBEAT_CHECK_PERIOD_MICROS)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!iqData) {
|
if (!iqData) {
|
||||||
return;
|
return;
|
||||||
|
@ -101,7 +101,7 @@ protected:
|
|||||||
//available outputs, previously set by attachOutput().
|
//available outputs, previously set by attachOutput().
|
||||||
//* \param[in] timeout The number of microseconds to wait to push an item in each one of the outputs, 0(default) means indefinite wait.
|
//* \param[in] timeout The number of microseconds to wait to push an item in each one of the outputs, 0(default) means indefinite wait.
|
||||||
//* \param[in] errorMessage an error message written on std::cout in case pf push timeout.
|
//* \param[in] errorMessage an error message written on std::cout in case pf push timeout.
|
||||||
void distribute(OutputDataTypePtr item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") {
|
void distribute(OutputDataTypePtr item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) {
|
||||||
|
|
||||||
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
|
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
|
||||||
//We will try to distribute 'output' among all 'outputs',
|
//We will try to distribute 'output' among all 'outputs',
|
||||||
|
@ -8,6 +8,9 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
|
|
||||||
|
//50 ms
|
||||||
|
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
|
||||||
|
|
||||||
SDRPostThread::SDRPostThread() : IOThread(), buffers("SDRPostThreadBuffers"), visualDataBuffers("SDRPostThreadVisualDataBuffers"), frequency(0) {
|
SDRPostThread::SDRPostThread() : IOThread(), buffers("SDRPostThreadBuffers"), visualDataBuffers("SDRPostThreadVisualDataBuffers"), frequency(0) {
|
||||||
iqDataInQueue = NULL;
|
iqDataInQueue = NULL;
|
||||||
iqDataOutQueue = NULL;
|
iqDataOutQueue = NULL;
|
||||||
@ -185,7 +188,9 @@ void SDRPostThread::run() {
|
|||||||
while (!stopping) {
|
while (!stopping) {
|
||||||
SDRThreadIQDataPtr data_in;
|
SDRThreadIQDataPtr data_in;
|
||||||
|
|
||||||
iqDataInQueue->pop(data_in);
|
if (!iqDataInQueue->pop(data_in, HEARTBEAT_CHECK_PERIOD_MICROS)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
// std::lock_guard < std::mutex > lock(data_in->m_mutex);
|
// std::lock_guard < std::mutex > lock(data_in->m_mutex);
|
||||||
|
|
||||||
std::lock_guard < std::mutex > lock(busy_demod);
|
std::lock_guard < std::mutex > lock(busy_demod);
|
||||||
|
@ -74,10 +74,10 @@ public:
|
|||||||
* \param[in] item An item.
|
* \param[in] item An item.
|
||||||
* \param[in] timeout a max waiting timeout in microseconds for an item to be pushed.
|
* \param[in] timeout a max waiting timeout in microseconds for an item to be pushed.
|
||||||
* by default, = 0 means indefinite wait.
|
* by default, = 0 means indefinite wait.
|
||||||
* \param[in] errorMessage an error message written on std::cout in case of the timeout wait
|
* \param[in] errorMessage if != nullptr (is nullptr by default) an error message written on std::cout in case of the timeout wait
|
||||||
* \return true if an item was pushed into the queue, else a timeout has occured.
|
* \return true if an item was pushed into the queue, else a timeout has occured.
|
||||||
*/
|
*/
|
||||||
bool push(const value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT,const char* errorMessage = "") {
|
bool push(const value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT,const char* errorMessage = nullptr) {
|
||||||
std::unique_lock < std::mutex > lock(m_mutex);
|
std::unique_lock < std::mutex > lock(m_mutex);
|
||||||
|
|
||||||
if (timeout == BLOCKING_INFINITE_TIMEOUT) {
|
if (timeout == BLOCKING_INFINITE_TIMEOUT) {
|
||||||
@ -91,10 +91,13 @@ public:
|
|||||||
}
|
}
|
||||||
else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout),
|
else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout),
|
||||||
[this]() { return m_queue.size() < m_max_num_items; })) {
|
[this]() { return m_queue.size() < m_max_num_items; })) {
|
||||||
|
|
||||||
|
if (errorMessage != nullptr) {
|
||||||
std::thread::id currentThreadId = std::this_thread::get_id();
|
std::thread::id currentThreadId = std::this_thread::get_id();
|
||||||
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
|
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
|
||||||
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " <<
|
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " <<
|
||||||
(timeout * 0.001) << " ms, message: " << errorMessage << std::endl;
|
(timeout * 0.001) << " ms, message: " << errorMessage << std::endl << std::flush;
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -123,10 +126,10 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available.
|
* Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available.
|
||||||
* \param[in] timeout The number of microseconds to wait. O (default) means indefinite wait.
|
* \param[in] timeout The number of microseconds to wait. O (default) means indefinite wait.
|
||||||
* \param[in] errorMessage an error message written on std::cout in case of the timeout wait
|
* \param[in] errorMessage if != nullptr (is nullptr by default) an error message written on std::cout in case of the timeout wait
|
||||||
* \return true if get an item from the queue, false if no item is received before the timeout.
|
* \return true if get an item from the queue, false if no item is received before the timeout.
|
||||||
*/
|
*/
|
||||||
bool pop(value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") {
|
bool pop(value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) {
|
||||||
std::unique_lock < std::mutex > lock(m_mutex);
|
std::unique_lock < std::mutex > lock(m_mutex);
|
||||||
|
|
||||||
if (timeout == BLOCKING_INFINITE_TIMEOUT) {
|
if (timeout == BLOCKING_INFINITE_TIMEOUT) {
|
||||||
@ -140,10 +143,13 @@ public:
|
|||||||
}
|
}
|
||||||
else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout),
|
else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout),
|
||||||
[this]() { return !m_queue.empty(); })) {
|
[this]() { return !m_queue.empty(); })) {
|
||||||
|
|
||||||
|
if (errorMessage != nullptr) {
|
||||||
std::thread::id currentThreadId = std::this_thread::get_id();
|
std::thread::id currentThreadId = std::this_thread::get_id();
|
||||||
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
|
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
|
||||||
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " <<
|
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " <<
|
||||||
(timeout * 0.001) << " ms, message: " << errorMessage << std::endl;
|
(timeout * 0.001) << " ms, message: " << errorMessage << std::endl << std::flush;
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user