Simplify SDRPostThread: no longer binding methods, directly use the true list of demodulators

This commit is contained in:
vsonnier 2017-08-27 12:39:28 +02:00
parent a8f8f4a7e2
commit 37712c7a81
9 changed files with 44 additions and 82 deletions

View File

@ -2273,7 +2273,7 @@ bool AppFrame::loadSession(std::string fileName) {
} }
if (demodsLoaded.size()) { if (demodsLoaded.size()) {
wxGetApp().bindDemodulators(demodsLoaded); wxGetApp().notifyDemodulatorsChanged();
} }
} // if l.rootNode()->hasAnother("demodulators") } // if l.rootNode()->hasAnother("demodulators")

View File

@ -828,16 +828,9 @@ SDRThread *CubicSDR::getSDRThread() {
} }
void CubicSDR::bindDemodulator(DemodulatorInstancePtr demod) { void CubicSDR::notifyDemodulatorsChanged() {
if (!demod) {
return;
}
sdrPostThread->bindDemodulator(demod);
}
void CubicSDR::bindDemodulators(const std::vector<DemodulatorInstancePtr>& demods) { sdrPostThread->notifyDemodulatorsChanged();
sdrPostThread->bindDemodulators(demods);
} }
long long CubicSDR::getSampleRate() { long long CubicSDR::getSampleRate() {
@ -849,7 +842,7 @@ void CubicSDR::removeDemodulator(DemodulatorInstancePtr demod) {
return; return;
} }
demod->setActive(false); demod->setActive(false);
sdrPostThread->removeDemodulator(demod); sdrPostThread->notifyDemodulatorsChanged();
wxGetApp().getAppFrame()->notifyUpdateModemProperties(); wxGetApp().getAppFrame()->notifyUpdateModemProperties();
} }

View File

@ -121,8 +121,8 @@ public:
SDRPostThread *getSDRPostThread(); SDRPostThread *getSDRPostThread();
SDRThread *getSDRThread(); SDRThread *getSDRThread();
void bindDemodulator(DemodulatorInstancePtr demod); void notifyDemodulatorsChanged();
void bindDemodulators(const std::vector<DemodulatorInstancePtr>& demods);
void removeDemodulator(DemodulatorInstancePtr demod); void removeDemodulator(DemodulatorInstancePtr demod);
void setFrequencySnap(int snap); void setFrequencySnap(int snap);

View File

@ -85,10 +85,10 @@ DemodulatorInstance::DemodulatorInstance() {
DemodulatorInstance::~DemodulatorInstance() { DemodulatorInstance::~DemodulatorInstance() {
//now that DemodulatorInstance are managed through shared_ptr, we //now that DemodulatorInstance are managed through shared_ptr, we
//should enter here ONLY when it is no longer used by any piece of code, anywahere. //should enter here ONLY when it is no longer used by any piece of code, anywhere.
//so active wait on IsTerminated(), then die. //so active wait on IsTerminated(), then die.
#define TERMINATION_SPIN_WAIT_MS (20) #define TERMINATION_SPIN_WAIT_MS (20)
#define MAX_WAIT_FOR_TERMINATION_MS (1000.0) #define MAX_WAIT_FOR_TERMINATION_MS (3000.0)
//this is a stupid busy plus sleep loop //this is a stupid busy plus sleep loop
int nbCyclesToWait = (MAX_WAIT_FOR_TERMINATION_MS / TERMINATION_SPIN_WAIT_MS) + 1; int nbCyclesToWait = (MAX_WAIT_FOR_TERMINATION_MS / TERMINATION_SPIN_WAIT_MS) + 1;
int currentCycle = 0; int currentCycle = 0;

View File

@ -9,9 +9,6 @@
//50 ms //50 ms
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
//1s
#define MAX_BLOCKING_DURATION_MICROS (1000 * 1000)
DemodulatorWorkerThread::DemodulatorWorkerThread() : IOThread(), DemodulatorWorkerThread::DemodulatorWorkerThread() : IOThread(),
commandQueue(nullptr), resultQueue(nullptr), cModem(nullptr), cModemKit(nullptr) { commandQueue(nullptr), resultQueue(nullptr), cModem(nullptr), cModemKit(nullptr) {
} }
@ -111,7 +108,7 @@ void DemodulatorWorkerThread::run() {
result.modemName = cModemName; result.modemName = cModemName;
//VSO: blocking push //VSO: blocking push
resultQueue->push(result, MAX_BLOCKING_DURATION_MICROS, "resultQueue"); resultQueue->push(result);
} }
} }
// std::cout << "Demodulator worker thread done." << std::endl; // std::cout << "Demodulator worker thread done." << std::endl;

View File

@ -845,7 +845,7 @@ void BookmarkView::activateBookmark(BookmarkEntryPtr bmEnt) {
matchingDemod = wxGetApp().getDemodMgr().loadInstance(bmEnt->node); matchingDemod = wxGetApp().getDemodMgr().loadInstance(bmEnt->node);
matchingDemod->run(); matchingDemod->run();
wxGetApp().bindDemodulator(matchingDemod); wxGetApp().notifyDemodulatorsChanged();
} }
matchingDemod->setActive(true); matchingDemod->setActive(true);

View File

@ -36,41 +36,12 @@ SDRPostThread::SDRPostThread() : IOThread(), buffers("SDRPostThreadBuffers"), vi
SDRPostThread::~SDRPostThread() { SDRPostThread::~SDRPostThread() {
} }
void SDRPostThread::bindDemodulator(DemodulatorInstancePtr demod) { void SDRPostThread::notifyDemodulatorsChanged() {
std::lock_guard < std::mutex > lock(busy_demod);
demodulators.push_back(demod);
doRefresh.store(true); doRefresh.store(true);
} }
void SDRPostThread::bindDemodulators(const std::vector<DemodulatorInstancePtr >& demods) {
std::lock_guard < std::mutex > lock(busy_demod);
for (auto di : demods) {
demodulators.push_back(di);
doRefresh.store(true);
}
}
void SDRPostThread::removeDemodulator(DemodulatorInstancePtr demod) {
if (!demod) {
return;
}
std::lock_guard < std::mutex > lock(busy_demod);
auto it = std::find(demodulators.begin(), demodulators.end(), demod);
if (it != demodulators.end()) {
demodulators.erase(it);
doRefresh.store(true);
}
}
void SDRPostThread::initPFBChannelizer() { void SDRPostThread::initPFBChannelizer() {
// std::cout << "Initializing post-process FIR polyphase filterbank channelizer with " << numChannels << " channels." << std::endl; // std::cout << "Initializing post-process FIR polyphase filterbank channelizer with " << numChannels << " channels." << std::endl;
if (channelizer) { if (channelizer) {
@ -93,6 +64,9 @@ void SDRPostThread::updateActiveDemodulators() {
long long centerFreq = wxGetApp().getFrequency(); long long centerFreq = wxGetApp().getFrequency();
//retreive the current list of demodulators:
auto demodulators = wxGetApp().getDemodMgr().getDemodulators();
for (auto demod : demodulators) { for (auto demod : demodulators) {
// not in range? // not in range?
@ -189,9 +163,8 @@ void SDRPostThread::run() {
if (!iqDataInQueue->pop(data_in, HEARTBEAT_CHECK_PERIOD_MICROS)) { if (!iqDataInQueue->pop(data_in, HEARTBEAT_CHECK_PERIOD_MICROS)) {
continue; continue;
} }
// std::lock_guard < std::mutex > lock(data_in->m_mutex);
std::lock_guard < std::mutex > lock(busy_demod); bool doUpdate = false;
if (data_in && data_in->data.size()) { if (data_in && data_in->data.size()) {
if(data_in->numChannels > 1) { if(data_in->numChannels > 1) {
@ -201,7 +174,6 @@ void SDRPostThread::run() {
} }
} }
bool doUpdate = false;
for (size_t j = 0; j < nRunDemods; j++) { for (size_t j = 0; j < nRunDemods; j++) {
DemodulatorInstancePtr demod = runDemods[j]; DemodulatorInstancePtr demod = runDemods[j];
if (abs(frequency - demod->getFrequency()) > (sampleRate / 2)) { if (abs(frequency - demod->getFrequency()) > (sampleRate / 2)) {
@ -210,7 +182,7 @@ void SDRPostThread::run() {
} }
//Only update the list of demodulators here //Only update the list of demodulators here
if (doUpdate) { if (doUpdate || doRefresh) {
updateActiveDemodulators(); updateActiveDemodulators();
} }
} //end while } //end while
@ -259,9 +231,9 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {
} }
size_t refCount = nRunDemods; size_t refCount = nRunDemods;
bool doIQDataOut = (iqDataOutQueue != NULL && !iqDataOutQueue->full()); bool doIQDataOut = (iqDataOutQueue != nullptr && !iqDataOutQueue->full());
bool doDemodVisOut = (nRunDemods && iqActiveDemodVisualQueue != NULL && !iqActiveDemodVisualQueue->full()); bool doDemodVisOut = (nRunDemods && iqActiveDemodVisualQueue != NULL && !iqActiveDemodVisualQueue->full());
bool doVisOut = (iqVisualQueue != NULL && !iqVisualQueue->full()); bool doVisOut = (iqVisualQueue != nullptr && !iqVisualQueue->full());
if (doIQDataOut) { if (doIQDataOut) {
refCount++; refCount++;
@ -290,22 +262,26 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {
if (doDemodVisOut) { if (doDemodVisOut) {
//VSO: blocking push //VSO: blocking push
iqActiveDemodVisualQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runSingleCH() iqActiveDemodVisualQueue"); iqActiveDemodVisualQueue->push(demodDataOut);
} }
if (doIQDataOut) { if (doIQDataOut) {
//VSO: blocking push //VSO: blocking push
iqDataOutQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS,"runSingleCH() iqDataOutQueue"); iqDataOutQueue->push(demodDataOut);
} }
if (doVisOut) { if (doVisOut) {
//VSO: blocking push //VSO: blocking push
iqVisualQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runSingleCH() iqVisualQueue"); iqVisualQueue->push(demodDataOut);
} }
for (size_t i = 0; i < nRunDemods; i++) { for (size_t i = 0; i < nRunDemods; i++) {
//VSO: blocking push //VSO: timed-push
runDemods[i]->getIQInputDataPipe()->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runSingleCH() runDemods[i]->getIQInputDataPipe()"); if (!runDemods[i]->getIQInputDataPipe()->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runSingleCH() runDemods[i]->getIQInputDataPipe()")) {
//some runDemods are no longer there, bail out from runSingleCH() entirely.
doRefresh = true;
return;
}
} }
} }
} }
@ -342,11 +318,11 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize); iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize);
//VSO: blocking push //VSO: blocking push
iqDataOutQueue->push(iqDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() iqDataOutQueue"); iqDataOutQueue->push(iqDataOut);
if (doVis) { if (doVis) {
//VSO: blocking push //VSO: blocking push
iqVisualQueue->push(iqDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() iqVisualQueue"); iqVisualQueue->push(iqDataOut);
} }
} }
@ -442,16 +418,20 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
if (doDemodVis) { if (doDemodVis) {
//VSO: blocking push //VSO: blocking push
iqActiveDemodVisualQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() iqActiveDemodVisualQueue"); iqActiveDemodVisualQueue->push(demodDataOut);
} }
for (size_t j = 0; j < nRunDemods; j++) { for (size_t j = 0; j < nRunDemods; j++) {
if (demodChannel[j] == i) { if (demodChannel[j] == i) {
//VSO: blocking push //VSO: timed- push
runDemods[j]->getIQInputDataPipe()->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() demod->getIQInputDataPipe()"); if (!runDemods[j]->getIQInputDataPipe()->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() runDemods[j]->getIQInputDataPipe()")) {
//Some runDemods are no longer there, bail out from runPFBCH() entirely.
doRefresh = true;
return;
}
} }
} } //end for
} }
} }
} }

View File

@ -11,9 +11,7 @@ public:
SDRPostThread(); SDRPostThread();
~SDRPostThread(); ~SDRPostThread();
void bindDemodulator(DemodulatorInstancePtr demod); void notifyDemodulatorsChanged();
void bindDemodulators(const std::vector<DemodulatorInstancePtr>& demods);
void removeDemodulator(DemodulatorInstancePtr demod);
virtual void run(); virtual void run();
virtual void terminate(); virtual void terminate();
@ -28,12 +26,6 @@ protected:
DemodulatorThreadInputQueuePtr iqVisualQueue; DemodulatorThreadInputQueuePtr iqVisualQueue;
DemodulatorThreadInputQueuePtr iqActiveDemodVisualQueue; DemodulatorThreadInputQueuePtr iqActiveDemodVisualQueue;
//protects access to demodulators lists and such
std::mutex busy_demod;
std::vector<DemodulatorInstancePtr> demodulators;
private: private:
void initPFBChannelizer(); void initPFBChannelizer();

View File

@ -729,7 +729,7 @@ void WaterfallCanvas::OnMouseReleased(wxMouseEvent& event) {
demod->writeModemSettings(mgr->getLastModemSettings(mgr->getLastDemodulatorType())); demod->writeModemSettings(mgr->getLastModemSettings(mgr->getLastDemodulatorType()));
demod->run(); demod->run();
wxGetApp().bindDemodulator(demod); wxGetApp().notifyDemodulatorsChanged();
DemodulatorThread::releaseSquelchLock(nullptr); DemodulatorThread::releaseSquelchLock(nullptr);
} }
@ -829,7 +829,7 @@ void WaterfallCanvas::OnMouseReleased(wxMouseEvent& event) {
demod->run(); demod->run();
wxGetApp().bindDemodulator(demod); wxGetApp().notifyDemodulatorsChanged();
} }
if (demod == NULL) { if (demod == NULL) {