#include "AudioConsumer.h"
#include "AudioRecorder.h"
#include "AudioFilter.h"
#include "../AudioInput.h"
#include "../filter/Filter.h"
#include "../filter/FilterVad.h"
#include "../filter/FilterThreshold.h"
#include "../filter/FilterState.h"
#include "../../logger.h"

using namespace std;
using namespace tc::audio;
using namespace tc::audio::recorder;

/*
 * JavaScript binding for an audio consumer.
 *
 * NOTE(review): several template argument lists (after std::shared_ptr, v8::Local,
 * ObjectWrap::Unwrap, make_shared, make_unique, some Nan::New calls, ...) appear to be
 * missing in this copy of the file. They are reproduced exactly as found; restore them
 * from version control before building.
 */

/* Registers the "AudioConsumer" class template and its prototype methods. */
NAN_MODULE_INIT(AudioConsumerWrapper::Init) {
    auto klass = Nan::New(AudioConsumerWrapper::NewInstance);
    klass->SetClassName(Nan::New("AudioConsumer").ToLocalChecked());
    klass->InstanceTemplate()->SetInternalFieldCount(1);

    Nan::SetPrototypeMethod(klass, "get_filters", AudioConsumerWrapper::_get_filters);
    Nan::SetPrototypeMethod(klass, "unregister_filter", AudioConsumerWrapper::_unregister_filter);
    Nan::SetPrototypeMethod(klass, "create_filter_vad", AudioConsumerWrapper::_create_filter_vad);
    Nan::SetPrototypeMethod(klass, "create_filter_threshold", AudioConsumerWrapper::_create_filter_threshold);
    Nan::SetPrototypeMethod(klass, "create_filter_state", AudioConsumerWrapper::_create_filter_state);

    constructor_template().Reset(klass);
    constructor().Reset(Nan::GetFunction(klass).ToLocalChecked());
}

/*
 * JS constructor. Only rejects plain function calls; no native object is created here.
 * (Presumably the native wrapper is attached afterwards via do_wrap() - verify against the caller.)
 */
NAN_METHOD(AudioConsumerWrapper::NewInstance) {
    if(!info.IsConstructCall()) {
        Nan::ThrowError("invalid invoke!");
        return;
    }
}

/*
 * Binds this wrapper to the native consumer `handle` by installing an on_read callback
 * which forwards every received buffer to process_data().
 *
 * @param h      recorder wrapper this consumer belongs to
 * @param handle native consumer; must not be null (it is dereferenced right away)
 */
AudioConsumerWrapper::AudioConsumerWrapper(AudioRecorderWrapper* h, const std::shared_ptr &handle) : _handle(handle), _recorder(h) {
    log_allocate("AudioConsumerWrapper", this);

    {
        lock_guard read_lock(handle->on_read_lock);
        /* The callback is stored and outlives this scope, so capture `this` explicitly instead of [&]. */
        handle->on_read = [this](const void* buffer, size_t length){ this->process_data(buffer, length); };
    }

#ifdef DO_DEADLOCK_REF
    /*
     * Known memory leak: in general the consumer lifetime is tied to the recorder handle,
     * but for nodejs testing we want to keep this reference.
     */
    this->_recorder->js_ref();
#endif
}

/*
 * Detaches the on_read callback and removes the consumer from its owning native handle.
 *
 * NOTE(review): execute_lock is taken here before unbind() takes on_read_lock, while
 * process_data() takes execute_lock from within on_read. If on_read is invoked with
 * on_read_lock held this is a lock order inversion - confirm against the AudioConsumer implementation.
 */
AudioConsumerWrapper::~AudioConsumerWrapper() {
    log_free("AudioConsumerWrapper", this);

    lock_guard lock(this->execute_lock);
    this->unbind();

    /* unbind() treats _handle as nullable, so guard the dereference here as well */
    if(this->_handle && this->_handle->handle) {
        this->_handle->handle->delete_consumer(this->_handle);
        this->_handle = nullptr;
    }

#ifdef DO_DEADLOCK_REF
    if(this->_recorder)
        this->_recorder->js_unref();
#endif
}

/*
 * Wraps this native object into the JS object `obj`, creates the async callbacks which
 * invoke the JS properties "callback_data", "callback_ended" and "callback_started", and
 * publishes "frame_size", "sample_rate" and "channels" on the JS object.
 */
void AudioConsumerWrapper::do_wrap(const v8::Local &obj) {
    this->Wrap(obj);

    /* Drains the queued audio buffers and hands each one to "callback_data" as a Float32Array. */
    this->_call_data = Nan::async_callback([this] {
        Nan::HandleScope scope;

        auto handle = this->handle();
        v8::Local callback_function = handle->Get(Nan::New("callback_data").ToLocalChecked());
        if(callback_function.IsEmpty() || callback_function->IsNullOrUndefined() || !callback_function->IsFunction()) {
            lock_guard lock(this->_data_lock);
            this->_data_entries.clear();
            /*
             * Nobody is listening. Without this return, a buffer queued between the clear above
             * and the loop below would be passed to Call() on a value which is not a function.
             */
            return;
        }

        std::unique_ptr buffer;
        while(true) {
            {
                lock_guard lock(this->_data_lock);
                if(this->_data_entries.empty())
                    break;

                buffer = move(this->_data_entries.front());
                this->_data_entries.pop_front();
            }

            /* 4 bytes per sample: the data is exposed as a Float32Array below */
            const auto byte_length = buffer->sample_count * this->_handle->channel_count * 4;
            auto js_buffer = v8::ArrayBuffer::New(Nan::GetCurrentContext()->GetIsolate(), byte_length);
            auto js_fbuffer = v8::Float32Array::New(js_buffer, 0, byte_length / 4);
            memcpy(js_buffer->GetContents().Data(), buffer->buffer, byte_length);

            v8::Local argv[1];
            argv[0] = js_fbuffer;
            callback_function.As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
        }
    });

    /* Invokes "callback_ended" (no arguments) if it is set to a function. */
    this->_call_ended = Nan::async_callback([this]{
        Nan::HandleScope scope;

        auto handle = this->handle();
        v8::Local callback_function = handle->Get(Nan::New("callback_ended").ToLocalChecked());
        if(callback_function.IsEmpty() || callback_function->IsNullOrUndefined() || !callback_function->IsFunction())
            return;

        callback_function.As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
    });

    /* Invokes "callback_started" (no arguments) if it is set to a function. */
    this->_call_started = Nan::async_callback([this]{
        Nan::HandleScope scope;

        auto handle = this->handle();
        v8::Local callback_function = handle->Get(Nan::New("callback_started").ToLocalChecked());
        if(callback_function.IsEmpty() || callback_function->IsNullOrUndefined() || !callback_function->IsFunction())
            return;

        callback_function.As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
    });

    Nan::Set(this->handle(), Nan::New("frame_size").ToLocalChecked(), Nan::New(this->_handle->frame_size));
    Nan::Set(this->handle(), Nan::New("sample_rate").ToLocalChecked(), Nan::New(this->_handle->sample_rate));
    Nan::Set(this->handle(), Nan::New("channels").ToLocalChecked(), Nan::New(this->_handle->channel_count));
}

/* Removes the on_read callback from the native consumer (if there is still one). */
void AudioConsumerWrapper::unbind() {
    if(this->_handle) {
        lock_guard lock(this->_handle->on_read_lock);
        this->_handle->on_read = nullptr;
    }
}

/*
 * Runs one buffer through all registered filters and forwards it if every filter accepts it.
 *
 * A rejected buffer triggers the "ended" notification once (JS callback and native callback
 * with (nullptr, 0)); the first accepted buffer afterwards triggers "started".
 * Accepted data goes to the native read callback if one is set, otherwise a copy is queued
 * for the JS "callback_data" callback.
 *
 * @param buffer  audio data; copied as samples * channel_count * 4 bytes
 * @param samples sample count of `buffer`; filters with a different frame size are skipped
 */
void AudioConsumerWrapper::process_data(const void *buffer, size_t samples) {
    lock_guard lock(this->execute_lock);

    auto filters = this->filters();
    for(const auto& filter : filters) {
        auto _filter = filter->filter();
        if(!_filter)
            continue;

        if(_filter->frame_size() != samples) {
            cerr << "Tried to use a filter, but frame size does not match!" << endl;
            continue;
        }

        if(!_filter->process(buffer)) {
            if(!this->last_consumed) {
                this->last_consumed = true;
                this->_call_ended();

                unique_lock native_read_lock(this->native_read_callback_lock);
                if(this->native_read_callback) {
                    auto callback = this->native_read_callback; /* copy, so it can be called without the lock */
                    native_read_lock.unlock();
                    callback(nullptr, 0); /* notify end */
                }
            }
            return;
        }
    }

    if(this->last_consumed)
        this->_call_started();
    this->last_consumed = false;

    {
        unique_lock native_read_lock(this->native_read_callback_lock);
        if(this->native_read_callback) {
            auto callback = this->native_read_callback; /* copy, so it can be called without the lock */
            native_read_lock.unlock();

            callback(buffer, samples);
            return;
        }
    }

    auto byte_length = samples * this->_handle->channel_count * 4;
    auto buf = make_unique();
    buf->buffer = malloc(byte_length);
    if(!buf->buffer && byte_length > 0) {
        /* memcpy into a null pointer would be undefined behaviour; drop the frame instead */
        cerr << "Failed to allocate audio buffer!" << endl;
        return;
    }
    memcpy(buf->buffer, buffer, byte_length);
    buf->sample_count = samples;

    {
        lock_guard data_lock(this->_data_lock);
        this->_data_entries.push_back(move(buf));
    }
    this->_call_data();
}

/*
 * Creates a filter wrapper around `impl`, wraps it into a new JS object and registers it.
 *
 * The returned shared_ptr holds a Ref() on the JS object; its deleter calls Unref()
 * and asserts that a V8 isolate is current at that point.
 */
std::shared_ptr AudioConsumerWrapper::create_filter(const std::string& name, const std::shared_ptr &impl) {
    auto result = shared_ptr(new AudioFilterWrapper(name, impl), [](AudioFilterWrapper* ptr) {
        assert(v8::Isolate::GetCurrent());
        ptr->Unref();
    });

    /* wrap into object */
    {
        auto js_object = Nan::NewInstance(Nan::New(AudioFilterWrapper::constructor()), 0, nullptr).ToLocalChecked();
        result->do_wrap(js_object);
        result->Ref();
    }

    {
        lock_guard lock(this->_filters_lock);
        this->_filters.push_back(result);
    }

    return result;
}

/*
 * Unregisters `filter` and clears its native filter implementation.
 * Does nothing if the filter is not registered at this consumer.
 */
void AudioConsumerWrapper::delete_filter(const AudioFilterWrapper* filter) {
    shared_ptr handle; /* need to keep the handle until everything has been finished */

    {
        lock_guard lock(this->_filters_lock);

        auto it = find_if(this->_filters.begin(), this->_filters.end(), [&](const auto& entry) {
            return entry.get() == filter;
        });
        if(it == this->_filters.end() || !*it)
            return;

        handle = *it;
        this->_filters.erase(it);
    }

    {
        lock_guard lock(this->execute_lock); /* ensure that the filter isn't used right now */
        handle->_filter = nullptr;
    }
}

/* JS: get_filters() - returns an array containing the JS objects of all registered filters. */
NAN_METHOD(AudioConsumerWrapper::_get_filters) {
    auto handle = ObjectWrap::Unwrap(info.Holder());
    auto filters = handle->filters();

    auto result = Nan::New(filters.size());
    for(size_t index = 0; index < filters.size(); index++)
        result->Set(index, filters[index]->handle());

    info.GetReturnValue().Set(result);
}

/*
 * JS: unregister_filter(filter) - removes a filter previously created by this consumer.
 * Throws if the argument is missing or is not an AudioFilterWrapper instance.
 */
NAN_METHOD(AudioConsumerWrapper::_unregister_filter) {
    auto handle = ObjectWrap::Unwrap(info.Holder());

    if(info.Length() != 1 || !info[0]->IsObject()) {
        Nan::ThrowError("invalid argument");
        return;
    }

    if(!Nan::New(AudioFilterWrapper::constructor_template())->HasInstance(info[0])) {
        /* NOTE(review): the argument is a filter, not a consumer; message kept for compatibility */
        Nan::ThrowError("invalid consumer");
        return;
    }

    auto filter = ObjectWrap::Unwrap(info[0]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
    handle->delete_filter(filter);
}

/*
 * JS: create_filter_vad(value) - creates and registers a "vad" filter.
 * `value` must be a number; it is passed as int32 to the filter's initialize().
 * Throws if the argument is invalid or the filter fails to initialize.
 */
NAN_METHOD(AudioConsumerWrapper::_create_filter_vad) {
    auto handle = ObjectWrap::Unwrap(info.Holder());
    auto consumer = handle->_handle;
    assert(consumer); /* should never be null! */

    if(info.Length() != 1 || !info[0]->IsNumber()) {
        Nan::ThrowError("invalid argument");
        return;
    }

    string error;
    auto filter = make_shared(consumer->channel_count,consumer->sample_rate,consumer->frame_size);
    if(!filter->initialize(error, info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0), 2)) {
        Nan::ThrowError(Nan::New("failed to initialize filter (" + error + ")").ToLocalChecked());
        return;
    }

    auto object = handle->create_filter("vad", filter);
    info.GetReturnValue().Set(object->handle());
}

/*
 * JS: create_filter_threshold(value) - creates and registers a "threshold" filter.
 * `value` must be a number; it is passed as int32 to the filter's initialize().
 * Throws if the argument is invalid or the filter fails to initialize.
 */
NAN_METHOD(AudioConsumerWrapper::_create_filter_threshold) {
    auto handle = ObjectWrap::Unwrap(info.Holder());
    auto consumer = handle->_handle;
    assert(consumer); /* should never be null! */

    if(info.Length() != 1 || !info[0]->IsNumber()) {
        Nan::ThrowError("invalid argument");
        return;
    }

    string error;
    auto filter = make_shared(consumer->channel_count,consumer->sample_rate,consumer->frame_size);
    if(!filter->initialize(error, info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0), 2)) {
        Nan::ThrowError(Nan::New("failed to initialize filter (" + error + ")").ToLocalChecked());
        return;
    }

    auto object = handle->create_filter("threshold", filter);
    info.GetReturnValue().Set(object->handle());
}

/*
 * JS: create_filter_state() - creates and registers a "state" filter.
 * Throws if the filter fails to initialize.
 */
NAN_METHOD(AudioConsumerWrapper::_create_filter_state) {
    auto handle = ObjectWrap::Unwrap(info.Holder());
    auto consumer = handle->_handle;
    assert(consumer); /* should never be null! */

    string error;
    auto filter = make_shared(consumer->channel_count,consumer->sample_rate,consumer->frame_size);
    if(!filter->initialize(error)) {
        Nan::ThrowError(Nan::New("failed to initialize filter (" + error + ")").ToLocalChecked());
        return;
    }

    auto object = handle->create_filter("state", filter);
    info.GetReturnValue().Set(object->handle());
}