Adjustments for the new client

This commit is contained in:
WolverinDEV
2019-08-21 10:00:27 +02:00
parent ef7e5d5f66
commit ea375bc07e
46 changed files with 2201 additions and 1099 deletions
@@ -21,10 +21,10 @@ AudioConsumer::AudioConsumer(tc::audio::AudioInput *handle, size_t channel_count
void AudioConsumer::handle_framed_data(const void *buffer, size_t samples) {
unique_lock read_callback_lock(this->on_read_lock);
if(!this->on_read)
return;
auto function = this->on_read; /* copy */
read_callback_lock.unlock();
if(!function)
return;
function(buffer, samples);
}
@@ -52,8 +52,10 @@ bool AudioInput::open_device(std::string& error, PaDeviceIndex index) {
this->close_device();
this->_current_device_index = index;
this->_current_device = Pa_GetDeviceInfo(index);
if(index == paNoDevice)
return true;
this->_current_device = Pa_GetDeviceInfo(index);
if(!this->_current_device) {
this->_current_device_index = paNoDevice;
error = "failed to get device info";
@@ -141,9 +143,22 @@ int AudioInput::_audio_callback(const void *a, void *b, unsigned long c, const P
int AudioInput::audio_callback(const void *input, void *output, unsigned long frameCount, const PaStreamCallbackTimeInfo* timeInfo, PaStreamCallbackFlags statusFlags) {
if (!input) /* hmmm.. suspicious */
return 0;
if(this->_volume != 1) {
auto ptr = (float*) input;
auto left = frameCount * this->_channel_count;
while(left-- > 0)
*(ptr++) *= this->_volume;
}
auto begin = chrono::system_clock::now();
for(const auto& consumer : this->consumers()) {
consumer->process_data(input, frameCount);
}
auto end = chrono::system_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end - begin).count();
if(ms > 5) {
log_warn(category::audio, tr("Processing of audio input needed {}ms. This could be an issue!"), chrono::duration_cast<chrono::milliseconds>(end - begin).count());
}
return 0;
}
@@ -6,6 +6,7 @@
#include <iostream>
#include <functional>
#include <portaudio.h>
#include <misc/spin_lock.h>
#include "AudioSamples.h"
namespace tc {
@@ -23,7 +24,7 @@ namespace tc {
size_t const frame_size = 0;
std::mutex on_read_lock; /* locked to access the function */
spin_lock on_read_lock; /* locked to access the function */
std::function<void(const void* /* buffer */, size_t /* samples */)> on_read;
private:
AudioConsumer(AudioInput* handle, size_t channel_count, size_t sample_rate, size_t frame_size);
@@ -86,7 +86,7 @@ ssize_t AudioOutputSource::enqueue_samples(const std::shared_ptr<tc::audio::Samp
this->sample_buffers.clear();
break;
case overflow_strategy::discard_buffer_half:
this->sample_buffers.erase(this->sample_buffers.begin(), this->sample_buffers.begin() + (int) (this->sample_buffers.size() / 2));
this->sample_buffers.erase(this->sample_buffers.begin(), this->sample_buffers.begin() + (int) ceil(this->sample_buffers.size() / 2));
break;
case overflow_strategy::ignore:
break;
@@ -36,8 +36,10 @@ void Reframer::process(const void *source, size_t samples) {
}
}
auto _on_frame = this->on_frame;
while(samples > this->_frame_size) {
this->on_frame(source);
if(_on_frame)
_on_frame(source);
samples -= this->_frame_size;
source = (char*) source + this->_frame_size * this->_channels * 4;
}
@@ -40,7 +40,9 @@ AudioConsumerWrapper::AudioConsumerWrapper(AudioRecorderWrapper* h, const std::s
handle->on_read = [&](const void* buffer, size_t length){ this->process_data(buffer, length); };
}
//this->_recorder->js_ref(); /* FML FIXME: Mem leak! (In general the consumer live is related to the recorder handle) */
#ifdef DO_DEADLOCK_REF
this->_recorder->js_ref(); /* FML Mem leak! (In general the consumer live is related to the recorder handle, but for nodejs testing we want to keep this reference ) */
#endif
}
AudioConsumerWrapper::~AudioConsumerWrapper() {
@@ -53,8 +55,10 @@ AudioConsumerWrapper::~AudioConsumerWrapper() {
this->_handle = nullptr;
}
#ifdef DO_DEADLOCK_REF
if(this->_recorder)
this->_recorder->js_unref();
#endif
}
void AudioConsumerWrapper::do_wrap(const v8::Local<v8::Object> &obj) {
@@ -77,7 +77,7 @@ namespace tc {
void do_wrap(const v8::Local<v8::Object>& /* object */);
void unbind(); /* called with execute_lock locked */
void process_data(const void* /* buffer */, size_t /* samples */); /* TODO: Lock the execute_lock! */
void process_data(const void* /* buffer */, size_t /* samples */);
struct DataEntry {
void* buffer = nullptr;
@@ -96,7 +96,6 @@ namespace tc {
Nan::callback_t<> _call_ended;
Nan::callback_t<> _call_started;
/*
*
callback_data: (buffer: Float32Array) => any;
callback_ended: () => any;
*/
@@ -68,6 +68,8 @@ AudioFilterWrapper::~AudioFilterWrapper() {
auto threshold_filter = dynamic_pointer_cast<filter::ThresholdFilter>(this->_filter);
if(threshold_filter)
threshold_filter->on_analyze = nullptr;
this->_callback_analyzed.Reset();
}
void AudioFilterWrapper::do_wrap(const v8::Local<v8::Object> &obj) {
@@ -34,6 +34,9 @@ NAN_MODULE_INIT(AudioRecorderWrapper::Init) {
Nan::SetPrototypeMethod(klass, "started", AudioRecorderWrapper::_started);
Nan::SetPrototypeMethod(klass, "stop", AudioRecorderWrapper::_stop);
Nan::SetPrototypeMethod(klass, "get_volume", AudioRecorderWrapper::_get_volume);
Nan::SetPrototypeMethod(klass, "set_volume", AudioRecorderWrapper::_set_volume);
Nan::SetPrototypeMethod(klass, "get_consumers", AudioRecorderWrapper::_get_consumers);
Nan::SetPrototypeMethod(klass, "create_consumer", AudioRecorderWrapper::_create_consumer);
Nan::SetPrototypeMethod(klass, "delete_consumer", AudioRecorderWrapper::_delete_consumer);
@@ -126,28 +129,69 @@ NAN_METHOD(AudioRecorderWrapper::_set_device) {
auto handle = ObjectWrap::Unwrap<AudioRecorderWrapper>(info.Holder());
auto input = handle->_input;
if(info.Length() != 1 || !info[0]->IsNumber()) {
Nan::ThrowError("invalid argument");
if(info.Length() != 2 || !info[0]->IsNumber() || !info[1]->IsFunction()) {
Nan::ThrowError("invalid arguments");
return;
}
auto device_id = info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0);
string error;
if(!input->open_device(error, device_id)) {
Nan::ThrowError(Nan::New<v8::String>("failed to open device (" + error + ")").ToLocalChecked());
return;
}
unique_ptr<Nan::Persistent<v8::Function>> _callback = make_unique<Nan::Persistent<v8::Function>>(info[1].As<v8::Function>());
unique_ptr<Nan::Persistent<v8::Object>> _recorder = make_unique<Nan::Persistent<v8::Object>>(info.Holder());
auto _async_callback = Nan::async_callback([call = std::move(_callback), recorder = move(_recorder)](bool result, std::string error) mutable {
Nan::HandleScope scope;
auto callback_function = call->Get(Nan::GetCurrentContext()->GetIsolate());
v8::Local<v8::Value> argv[1];
if(result)
argv[0] = v8::Boolean::New(Nan::GetCurrentContext()->GetIsolate(), result);
else
argv[0] = Nan::NewOneByteString((uint8_t*) error.data(), error.length()).ToLocalChecked();
callback_function->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
recorder->Reset();
call->Reset();
}).option_destroyed_execute(true);
std::thread([_async_callback, input, device_id]{
string error;
auto flag = input->open_device(error, device_id);
_async_callback(std::forward<bool>(flag), std::forward<std::string>(error));
}).detach();
}
NAN_METHOD(AudioRecorderWrapper::_start) {
auto handle = ObjectWrap::Unwrap<AudioRecorderWrapper>(info.Holder());
auto input = handle->_input;
if(!input->record()) {
Nan::ThrowError("failed to start");
if(info.Length() != 1) {
Nan::ThrowError("missing callback");
return;
}
if(!info[0]->IsFunction()) {
Nan::ThrowError("not a function");
return;
}
unique_ptr<Nan::Persistent<v8::Function>> _callback = make_unique<Nan::Persistent<v8::Function>>(info[0].As<v8::Function>());
unique_ptr<Nan::Persistent<v8::Object>> _recorder = make_unique<Nan::Persistent<v8::Object>>(info.Holder());
auto _async_callback = Nan::async_callback([call = std::move(_callback), recorder = move(_recorder)](bool result) mutable {
Nan::HandleScope scope;
auto callback_function = call->Get(Nan::GetCurrentContext()->GetIsolate());
v8::Local<v8::Value> argv[1];
argv[0] = v8::Boolean::New(Nan::GetCurrentContext()->GetIsolate(), result);
callback_function->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
recorder->Reset();
call->Reset();
}).option_destroyed_execute(true);
auto handle = ObjectWrap::Unwrap<AudioRecorderWrapper>(info.Holder());
auto input = handle->_input;
std::thread([_async_callback, input]{
_async_callback(input->record());
}).detach();
}
NAN_METHOD(AudioRecorderWrapper::_started) {
@@ -203,4 +247,20 @@ NAN_METHOD(AudioRecorderWrapper::_delete_consumer) {
auto consumer = ObjectWrap::Unwrap<AudioConsumerWrapper>(info[0]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
handle->delete_consumer(consumer);
}
NAN_METHOD(AudioRecorderWrapper::_set_volume) {
auto handle = ObjectWrap::Unwrap<AudioRecorderWrapper>(info.Holder());
if(info.Length() != 1 || !info[0]->IsNumber()) {
Nan::ThrowError("invalid argument");
return;
}
handle->_input->set_volume(info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0));
}
NAN_METHOD(AudioRecorderWrapper::_get_volume) {
auto handle = ObjectWrap::Unwrap<AudioRecorderWrapper>(info.Holder());
info.GetReturnValue().Set(handle->_input->volume());
}
@@ -38,6 +38,9 @@ namespace tc {
static NAN_METHOD(_get_consumers);
static NAN_METHOD(_delete_consumer);
static NAN_METHOD(_set_volume);
static NAN_METHOD(_get_volume);
std::shared_ptr<AudioConsumerWrapper> create_consumer();
void delete_consumer(const AudioConsumerWrapper*);
@@ -444,7 +444,7 @@ bool ProtocolHandler::create_datagram_packets(std::vector<pipes::buffer> &result
} else {
packet->applyPacketId(this->_packet_id_manager);
}
log_trace(category::connection, tr("Packet {} got packet id {}"), packet->type().name(), packet->packetId());
//log_trace(category::connection, tr("Packet {} got packet id {}"), packet->type().name(), packet->packetId());
}
if(!this->crypt_handler.progressPacketOut(packet.get(), error, false)) {
log_error(category::connection, tr("Failed to encrypt packet: {}"), error);
@@ -75,6 +75,8 @@ namespace tc {
ecc_key& get_identity_key() { return this->crypto.identity; }
inline std::chrono::microseconds current_ping() { return this->ping.value; }
connection_state::value connection_state = connection_state::INITIALIZING;
server_type::value server_type = server_type::TEASPEAK;
private:
@@ -60,29 +60,6 @@ void ProtocolHandler::handlePacketCommand(const std::shared_ptr<ts::protocol::Se
void ProtocolHandler::handlePacketVoice(const std::shared_ptr<ts::protocol::ServerPacket> &packet) {
this->handle->voice_connection->process_packet(packet);
/*
if(packet->type() == PacketTypeInfo::Voice) {
if(packet->data().length() < 5) {
//TODO log invalid voice packet
return;
}
auto container = make_unique<ServerConnection::VoicePacket>();
container->packet_id = be2le16(&packet->data()[0]);
container->client_id = be2le16(&packet->data()[2]);
container->codec_id = (uint8_t) packet->data()[4];
container->flag_head = packet->hasFlag(PacketFlag::Compressed);
container->voice_data = packet->data().length() > 5 ? packet->data().range(5) : pipes::buffer{};
{
lock_guard lock(this->handle->pending_voice_lock);
this->handle->pending_voice.push_back(move(container));
}
this->handle->execute_pending_voice.call(true);
} else {
//TODO implement whisper
}
*/
}
void ProtocolHandler::handlePacketPing(const std::shared_ptr<ts::protocol::ServerPacket> &packet) {
@@ -58,6 +58,7 @@ NAN_MODULE_INIT(ServerConnection::Init) {
Nan::SetPrototypeMethod(klass, "send_command", ServerConnection::_send_command);
Nan::SetPrototypeMethod(klass, "send_voice_data", ServerConnection::_send_voice_data);
Nan::SetPrototypeMethod(klass, "send_voice_data_raw", ServerConnection::_send_voice_data_raw);
Nan::SetPrototypeMethod(klass, "current_ping", ServerConnection::_current_ping);
constructor().Reset(Nan::GetFunction(klass).ToLocalChecked());
}
@@ -641,4 +642,13 @@ void ServerConnection::_execute_callback_disconnect(const std::string &reason) {
arguments[0] = Nan::New<v8::String>(reason).ToLocalChecked();
callback->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, arguments);
}
NAN_METHOD(ServerConnection::_current_ping) {
auto connection = ObjectWrap::Unwrap<ServerConnection>(info.Holder());
auto& phandler = connection->protocol_handler;
if(phandler)
info.GetReturnValue().Set((uint32_t) chrono::floor<microseconds>(phandler->current_ping()).count());
else
info.GetReturnValue().Set(-1);
}
@@ -85,6 +85,7 @@ namespace tc {
static NAN_METHOD(_send_voice_data);
static NAN_METHOD(_send_voice_data_raw);
static NAN_METHOD(_error_message);
static NAN_METHOD(_current_ping);
std::unique_ptr<Nan::Callback> callback_connect;
std::unique_ptr<Nan::Callback> callback_disconnect;
@@ -208,12 +208,27 @@ VoiceClientWrap::~VoiceClientWrap() {}
VoiceClient::VoiceClient(const std::shared_ptr<VoiceConnection>&, uint16_t client_id) : _client_id(client_id) {
this->output_source = global_audio_output->create_source();
this->output_source->overflow_strategy = audio::overflow_strategy::discard_buffer_half;
this->output_source->max_latency = (size_t) ceil(this->output_source->sample_rate * 0.12);
this->output_source->overflow_strategy = audio::overflow_strategy::ignore;
this->output_source->max_latency = (size_t) ceil(this->output_source->sample_rate * 1);
this->output_source->min_buffer = (size_t) ceil(this->output_source->sample_rate * 0.025);
this->output_source->on_underflow = [&]{
this->set_state(state::stopped);
if(this->_state == state::stopping)
this->set_state(state::stopped);
else if(this->_state != state::stopped) {
if(this->_last_received_packet + chrono::seconds(1) < chrono::system_clock::now()) {
this->set_state(state::stopped);
log_warn(category::audio, tr("Client {} has a audio buffer underflow and not received any data for one second. Stopping replay."), this->_client_id);
} else {
if(this->_state != state::buffering) {
log_warn(category::audio, tr("Client {} has a audio buffer underflow. Buffer again."), this->_client_id);
this->set_state(state::buffering);
}
}
}
};
this->output_source->on_overflow = [&](size_t count){
log_warn(category::audio, tr("Client {} has a audio buffer overflow of {}."), this->_client_id, count);
};
}
@@ -265,6 +280,7 @@ void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& b
encoded_buffer->buffer = buffer.own_buffer();
encoded_buffer->head = head;
this->_last_received_packet = encoded_buffer->receive_timestamp;
{
lock_guard lock(this->audio_decode_queue_lock);
this->audio_decode_queue.push_back(move(encoded_buffer));
@@ -356,7 +372,7 @@ void VoiceClient::process_encoded_buffer(const std::unique_ptr<EncodedBuffer> &b
diff = buffer->packet_id - codec_data->last_packet_id;
}
if(codec_data->last_packet_timestamp + chrono::seconds(1) < buffer->receive_timestamp)
if(codec_data->last_packet_timestamp + chrono::seconds(1) < buffer->receive_timestamp || this->_state >= state::stopping)
diff = 0xFFFF;
const auto old_packet_id = codec_data->last_packet_id;
@@ -427,6 +443,7 @@ void VoiceClient::process_encoded_buffer(const std::unique_ptr<EncodedBuffer> &b
}
auto enqueued = this->output_source->enqueue_samples(target_buffer, resampled_samples);
if(enqueued != resampled_samples)
log_warn(category::voice_connection, tr("Failed to enqueue all samples for client {}. Enqueued {} of {}"), this->_client_id, enqueued, resampled_samples);
this->set_state(state::playing);
//cout << "Enqueued " << enqueued << " samples" << endl;
}
@@ -112,6 +112,7 @@ namespace tc {
uint16_t _client_id;
float _volume = 1.f;
std::chrono::system_clock::time_point _last_received_packet;
state::value _state = state::stopped;
inline void set_state(state::value value) {
if(value == this->_state)
@@ -299,6 +299,7 @@ void VoiceConnection::process_packet(const std::shared_ptr<ts::protocol::ServerP
auto flag_head = packet->has_flag(PacketFlag::Compressed);
//container->voice_data = packet->data().length() > 5 ? packet->data().range(5) : pipes::buffer{};
//log_info(category::voice_connection, tr("Received voice packet from {}. Packet ID: {}"), client_id, packet_id);
auto client = this->find_client(client_id);
if(!client) {
log_warn(category::voice_connection, tr("Received voice packet from unknown client {}. Dropping packet!"), client_id);