TeaSpeak-Client/native/serverconnection/src/audio/js/AudioConsumer.cpp

322 lines
10 KiB
C++

#include "AudioConsumer.h"
#include "AudioRecorder.h"
#include "AudioFilter.h"
#include "../AudioInput.h"
#include "../filter/Filter.h"
#include "../filter/FilterVad.h"
#include "../filter/FilterThreshold.h"
#include "../filter/FilterState.h"
#include "../../logger.h"
using namespace std;
using namespace tc::audio;
using namespace tc::audio::recorder;
/*
 * Builds the function template for the JS class "AudioConsumer", attaches all
 * prototype methods and stores both the template and the constructor function
 * in their persistent slots.
 */
NAN_MODULE_INIT(AudioConsumerWrapper::Init) {
    const auto tpl = Nan::New<v8::FunctionTemplate>(AudioConsumerWrapper::NewInstance);
    tpl->SetClassName(Nan::New("AudioConsumer").ToLocalChecked());
    tpl->InstanceTemplate()->SetInternalFieldCount(1);

    /* JS method name -> native implementation (registered in this order) */
    struct PrototypeMethod {
        const char* name;
        Nan::FunctionCallback callback;
    };
    const PrototypeMethod methods[] = {
        {"get_filters",             AudioConsumerWrapper::_get_filters},
        {"unregister_filter",       AudioConsumerWrapper::_unregister_filter},
        {"create_filter_vad",       AudioConsumerWrapper::_create_filter_vad},
        {"create_filter_threshold", AudioConsumerWrapper::_create_filter_threshold},
        {"create_filter_state",     AudioConsumerWrapper::_create_filter_state},
    };
    for(const auto& method : methods)
        Nan::SetPrototypeMethod(tpl, method.name, method.callback);

    constructor_template().Reset(tpl);
    constructor().Reset(Nan::GetFunction(tpl).ToLocalChecked());
}
/*
 * JS constructor callback. Does nothing when invoked via `new`; throws a JS
 * error when called as a plain function.
 */
NAN_METHOD(AudioConsumerWrapper::NewInstance) {
    if(info.IsConstructCall())
        return;

    Nan::ThrowError("invalid invoke!");
}
/*
 * Creates the wrapper around a native consumer.
 *
 * h:      recorder wrapper this consumer belongs to (stored in _recorder).
 * handle: native consumer; its on_read hook gets redirected to process_data().
 */
AudioConsumerWrapper::AudioConsumerWrapper(AudioRecorderWrapper* h, const std::shared_ptr<tc::audio::AudioConsumer> &handle) : _handle(handle), _recorder(h) {
    log_allocate("AudioConsumerWrapper", this);

    {
        /* on_read is replaced while holding on_read_lock */
        std::lock_guard read_guard(handle->on_read_lock);
        handle->on_read = [this](const void* data, size_t sample_count) {
            this->process_data(data, sample_count);
        };
    }

#ifdef DO_DEADLOCK_REF
    /* FML mem leak! (In general the consumer lifetime is tied to the recorder handle, but for nodejs testing we want to keep this reference) */
    this->_recorder->js_ref();
#endif
}
/*
 * Detaches the wrapper from the native consumer.
 *
 * execute_lock is held for the whole teardown (process_data() takes the same
 * lock). The on_read hook is cleared first, then the consumer is removed from
 * its owning input, if it still has one.
 *
 * NOTE(review): this takes execute_lock and then on_read_lock (via unbind()).
 * If the native side invokes on_read while holding on_read_lock, the lock
 * order here is the inverse of process_data()'s - confirm against the caller.
 */
AudioConsumerWrapper::~AudioConsumerWrapper() {
    log_free("AudioConsumerWrapper", this);

    lock_guard lock(this->execute_lock);
    this->unbind();

    /* unbind() tolerates a null _handle, so do the same here instead of dereferencing blindly */
    if(this->_handle && this->_handle->handle) {
        this->_handle->handle->delete_consumer(this->_handle);
        this->_handle = nullptr;
    }

#ifdef DO_DEADLOCK_REF
    if(this->_recorder)
        this->_recorder->js_unref();
#endif
}
/*
 * Binds this native instance to its JS object and sets up the three async
 * callbacks used to notify JS:
 *   _call_data    -> drains _data_entries into the JS property "callback_data"
 *   _call_ended   -> invokes the JS property "callback_ended"
 *   _call_started -> invokes the JS property "callback_started"
 * The JS properties are looked up on every invocation.
 * Finally the consumer's frame_size / sample_rate / channels are published as
 * properties on the JS object.
 *
 * obj: the JS object this wrapper gets attached to.
 */
void AudioConsumerWrapper::do_wrap(const v8::Local<v8::Object> &obj) {
    this->Wrap(obj);

    this->_call_data = Nan::async_callback([this] {
        Nan::HandleScope scope;

        auto handle = this->handle();
        v8::Local<v8::Value> callback_function = Nan::Get(handle, Nan::New<v8::String>("callback_data").ToLocalChecked()).FromMaybe(v8::Local<v8::Value>{});
        if(callback_function.IsEmpty() || callback_function->IsNullOrUndefined() || !callback_function->IsFunction()) {
            /* nobody is listening: drop everything that is queued */
            lock_guard lock(this->_data_lock);
            this->_data_entries.clear();
            /*
             * Must not fall through into the drain loop: process_data() may queue a
             * new entry right after the clear, which would then be dispatched
             * through a value that is not a function.
             */
            return;
        }
        const auto js_callback = callback_function.As<v8::Function>();

        std::unique_ptr<DataEntry> buffer;
        while(true) {
            {
                /* hold _data_lock only while popping, not while calling into JS */
                lock_guard lock(this->_data_lock);
                if(this->_data_entries.empty())
                    break;

                buffer = move(this->_data_entries.front());
                this->_data_entries.pop_front();
            }

            /* 4 bytes per sample: the data is handed to JS as a Float32Array */
            const auto byte_length = buffer->sample_count * this->_handle->channel_count * 4;
            auto js_buffer = v8::ArrayBuffer::New(Nan::GetCurrentContext()->GetIsolate(), byte_length);
            auto js_fbuffer = v8::Float32Array::New(js_buffer, 0, byte_length / 4);
            memcpy(js_buffer->GetContents().Data(), buffer->buffer, byte_length);

            v8::Local<v8::Value> argv[1];
            argv[0] = js_fbuffer;
            js_callback->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
        }
    });

    this->_call_ended = Nan::async_callback([this] {
        Nan::HandleScope scope;

        auto handle = this->handle();
        v8::Local<v8::Value> callback_function = Nan::Get(handle, Nan::New<v8::String>("callback_ended").ToLocalChecked()).FromMaybe(v8::Local<v8::Value>{});
        if(callback_function.IsEmpty() || callback_function->IsNullOrUndefined() || !callback_function->IsFunction())
            return;

        callback_function.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
    });

    this->_call_started = Nan::async_callback([this] {
        Nan::HandleScope scope;

        auto handle = this->handle();
        v8::Local<v8::Value> callback_function = Nan::Get(handle, Nan::New<v8::String>("callback_started").ToLocalChecked()).FromMaybe(v8::Local<v8::Value>{});
        if(callback_function.IsEmpty() || callback_function->IsNullOrUndefined() || !callback_function->IsFunction())
            return;

        callback_function.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
    });

    /* expose the stream format to JS */
    Nan::Set(this->handle(), Nan::New<v8::String>("frame_size").ToLocalChecked(), Nan::New<v8::Number>((uint32_t) this->_handle->frame_size));
    Nan::Set(this->handle(), Nan::New<v8::String>("sample_rate").ToLocalChecked(), Nan::New<v8::Number>((uint32_t) this->_handle->sample_rate));
    Nan::Set(this->handle(), Nan::New<v8::String>("channels").ToLocalChecked(), Nan::New<v8::Number>((uint32_t) this->_handle->channel_count));
}
/*
 * Clears the native consumer's on_read hook (under on_read_lock).
 * Does nothing when no native consumer is attached.
 */
void AudioConsumerWrapper::unbind() {
    if(!this->_handle)
        return;

    std::lock_guard read_guard(this->_handle->on_read_lock);
    this->_handle->on_read = nullptr;
}
void AudioConsumerWrapper::process_data(const void *buffer, size_t samples) {
lock_guard lock(this->execute_lock);
auto filters = this->filters();
for(const auto& filter : filters) {
auto _filter = filter->filter();
if(!_filter) continue;
if(_filter->frame_size() != samples) {
cerr << "Tried to use a filter, but frame size does not match!" << endl;
continue;
}
if(!_filter->process(buffer)) {
if(!this->last_consumed) {
this->last_consumed = true;
this->_call_ended();
unique_lock native_read_lock(this->native_read_callback_lock);
if(this->native_read_callback) {
auto callback = this->native_read_callback; /* copy */
native_read_lock.unlock();
callback(nullptr, 0); /* notify end */
}
}
return;
}
}
if(this->last_consumed)
this->_call_started();
this->last_consumed = false;
{
unique_lock native_read_lock(this->native_read_callback_lock);
if(this->native_read_callback) {
auto callback = this->native_read_callback; /* copy */
native_read_lock.unlock();
callback(buffer, samples);
return;
}
}
auto byte_length = samples * this->_handle->channel_count * 4;
auto buf = make_unique<DataEntry>();
buf->buffer = malloc(byte_length);
memcpy(buf->buffer, buffer, byte_length);
buf->sample_count = samples;
{
lock_guard lock(this->_data_lock);
this->_data_entries.push_back(move(buf));
}
this->_call_data();
}
std::shared_ptr<AudioFilterWrapper> AudioConsumerWrapper::create_filter(const std::string& name, const std::shared_ptr<filter::Filter> &impl) {
auto result = shared_ptr<AudioFilterWrapper>(new AudioFilterWrapper(name, impl), [](AudioFilterWrapper* ptr) {
assert(v8::Isolate::GetCurrent());
ptr->Unref();
});
/* wrap into object */
{
auto js_object = Nan::NewInstance(Nan::New(AudioFilterWrapper::constructor()), 0, nullptr).ToLocalChecked();
result->do_wrap(js_object);
result->Ref();
}
{
lock_guard lock(this->_filters_lock);
this->_filters.push_back(result);
}
return result;
}
void AudioConsumerWrapper::delete_filter(const AudioFilterWrapper* filter) {
shared_ptr<AudioFilterWrapper> handle; /* need to keep the handle 'till everything has been finished */
{
lock_guard lock(this->_filters_lock);
for(auto& c : this->_filters) {
if(&*c == filter) {
handle = c;
break;
}
}
if(!handle)
return;
{
auto it = find(this->_filters.begin(), this->_filters.end(), handle);
if(it != this->_filters.end())
this->_filters.erase(it);
}
}
{
lock_guard lock(this->execute_lock); /* ensure that the filter isn't used right now */
handle->_filter = nullptr;
}
}
/*
 * JS: get_filters()
 * Returns an array holding the JS objects of all currently registered filters.
 */
NAN_METHOD(AudioConsumerWrapper::_get_filters) {
    const auto wrapper = ObjectWrap::Unwrap<AudioConsumerWrapper>(info.Holder());
    auto registered = wrapper->filters();

    const auto js_list = Nan::New<v8::Array>(static_cast<uint32_t>(registered.size()));
    uint32_t slot = 0;
    for(const auto& entry : registered)
        Nan::Set(js_list, slot++, entry->handle());

    info.GetReturnValue().Set(js_list);
}
/*
 * JS: unregister_filter(filter)
 * Removes the given filter object from this consumer.
 * Throws a JS error unless exactly one argument is passed and it is an
 * instance of the filter wrapper class.
 */
NAN_METHOD(AudioConsumerWrapper::_unregister_filter) {
    const auto wrapper = ObjectWrap::Unwrap<AudioConsumerWrapper>(info.Holder());

    const bool has_single_object = info.Length() == 1 && info[0]->IsObject();
    if(!has_single_object) {
        Nan::ThrowError("invalid argument");
        return;
    }

    const auto filter_template = Nan::New(AudioFilterWrapper::constructor_template());
    if(!filter_template->HasInstance(info[0])) {
        /* NOTE(review): the message says "consumer" although a filter is expected - confirm before changing */
        Nan::ThrowError("invalid consumer");
        return;
    }

    const auto js_filter = info[0]->ToObject(Nan::GetCurrentContext()).ToLocalChecked();
    wrapper->delete_filter(ObjectWrap::Unwrap<AudioFilterWrapper>(js_filter));
}
/*
 * JS: create_filter_vad(value)
 * Creates a VadFilter matching the consumer's channel count, sample rate and
 * frame size, initializes it with the given number (read as int32) and
 * registers it under the name "vad". Returns the filter's JS object.
 * Throws a JS error on a bad argument or if initialization fails.
 */
NAN_METHOD(AudioConsumerWrapper::_create_filter_vad) {
    const auto wrapper = ObjectWrap::Unwrap<AudioConsumerWrapper>(info.Holder());
    const auto native = wrapper->_handle;
    assert(native); /* should never be null! */

    const bool has_single_number = info.Length() == 1 && info[0]->IsNumber();
    if(!has_single_number) {
        Nan::ThrowError("invalid argument");
        return;
    }

    const auto vad = std::make_shared<filter::VadFilter>(native->channel_count, native->sample_rate, native->frame_size);

    std::string error;
    const auto level = info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0);
    if(!vad->initialize(error, level, 2)) {
        Nan::ThrowError(Nan::New<v8::String>("failed to initialize filter (" + error + ")").ToLocalChecked());
        return;
    }

    const auto registered = wrapper->create_filter("vad", vad);
    info.GetReturnValue().Set(registered->handle());
}
/*
 * JS: create_filter_threshold(value)
 * Creates a ThresholdFilter matching the consumer's channel count, sample rate
 * and frame size, initializes it with the given number and registers it under
 * the name "threshold". Returns the filter's JS object.
 * Throws a JS error on a bad argument or if initialization fails.
 */
NAN_METHOD(AudioConsumerWrapper::_create_filter_threshold) {
    const auto wrapper = ObjectWrap::Unwrap<AudioConsumerWrapper>(info.Holder());
    const auto native = wrapper->_handle;
    assert(native); /* should never be null! */

    const bool has_single_number = info.Length() == 1 && info[0]->IsNumber();
    if(!has_single_number) {
        Nan::ThrowError("invalid argument");
        return;
    }

    const auto threshold_filter = std::make_shared<filter::ThresholdFilter>(native->channel_count, native->sample_rate, native->frame_size);

    std::string error;
    /* NOTE(review): the value is read as int32 first, so any fractional part is lost - confirm this is intended */
    const auto threshold = (float) info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0);
    if(!threshold_filter->initialize(error, threshold, 2)) {
        Nan::ThrowError(Nan::New<v8::String>("failed to initialize filter (" + error + ")").ToLocalChecked());
        return;
    }

    const auto registered = wrapper->create_filter("threshold", threshold_filter);
    info.GetReturnValue().Set(registered->handle());
}
/*
 * JS: create_filter_state()
 * Creates a StateFilter matching the consumer's channel count, sample rate and
 * frame size and registers it under the name "state". Returns the filter's JS
 * object. Throws a JS error if initialization fails.
 */
NAN_METHOD(AudioConsumerWrapper::_create_filter_state) {
    const auto wrapper = ObjectWrap::Unwrap<AudioConsumerWrapper>(info.Holder());
    const auto native = wrapper->_handle;
    assert(native); /* should never be null! */

    const auto state_filter = std::make_shared<filter::StateFilter>(native->channel_count, native->sample_rate, native->frame_size);

    std::string error;
    if(!state_filter->initialize(error)) {
        Nan::ThrowError(Nan::New<v8::String>("failed to initialize filter (" + error + ")").ToLocalChecked());
        return;
    }

    const auto registered = wrapper->create_filter("state", state_filter);
    info.GetReturnValue().Set(registered->handle());
}