TeaSpeak-Client/native/serverconnection/src/connection/audio/VoiceClient.cpp

634 lines
22 KiB
C++

#include "VoiceClient.h"
#include "../../audio/AudioOutput.h"
#include "../../audio/codec/Converter.h"
#include "../../audio/codec/OpusConverter.h"
#include "../../audio/AudioMerger.h"
#include "../../audio/js/AudioOutputStream.h"
#include "AudioEventLoop.h"
#include "../../logger.h"
using namespace std;
using namespace tc;
using namespace tc::audio::codec;
using namespace tc::connection;
extern tc::audio::AudioOutput* global_audio_output;
/* Debug switch; it is not referenced anywhere in the visible part of this file. */
#define DEBUG_PREMATURE_PACKETS
/*
 * _field_(name, value) builds one struct initializer entry:
 *  - with WIN32 defined it expands to the bare value (positional initialization),
 *  - otherwise it expands to a designated initializer (".name = value").
 * Because of the positional variant, entries must be listed in member declaration order.
 */
#ifdef WIN32
#define _field_(name, value) value
#else
#define _field_(name, value) .name = value
#endif
/*
 * Static codec description table (six entries).
 * Each entry carries a "supported" flag, a name and a converter factory.
 * Unsupported entries have no factory (nullptr).
 * NOTE(review): presumably indexed by the codec::value id - confirm against codec::get_info.
 * The lambdas must not contain a comma outside of parentheses, else the _field_ macro breaks.
 */
const codec::condec_info codec::info[6] = {
        { /* unsupported */
                _field_(supported, false),
                _field_(name, "speex_narrowband"),
                _field_(new_converter, nullptr)
        },
        { /* unsupported */
                _field_(supported, false),
                _field_(name, "speex_wideband"),
                _field_(new_converter, nullptr)
        },
        { /* unsupported */
                _field_(supported, false),
                _field_(name, "speex_ultra_wideband"),
                _field_(new_converter, nullptr)
        },
        { /* unsupported */
                _field_(supported, false),
                _field_(name, "celt_mono"),
                _field_(new_converter, nullptr)
        },
        { /* OpusConverter(1, 48000, 960) initialized with OPUS_APPLICATION_VOIP */
                _field_(supported, true),
                _field_(name, "opus_voice"),
                _field_(new_converter, [](string& error) -> shared_ptr<Converter> {
                    auto opus = make_shared<OpusConverter>(1, 48000, 960);
                    if(opus->initialize(error, OPUS_APPLICATION_VOIP))
                        return dynamic_pointer_cast<Converter>(opus);
                    return nullptr;
                })
        },
        { /* OpusConverter(2, 48000, 960) initialized with OPUS_APPLICATION_AUDIO */
                _field_(supported, true),
                _field_(name, "opus_music"),
                _field_(new_converter, [](string& error) -> shared_ptr<Converter> {
                    auto opus = make_shared<OpusConverter>(2, 48000, 960);
                    if(opus->initialize(error, OPUS_APPLICATION_AUDIO))
                        return dynamic_pointer_cast<Converter>(opus);
                    return nullptr;
                })
        }
};
/**
 * Binds this wrapper to the given JS object and hooks it up to the voice client.
 *
 * Throws a JS error ("weak handle") and returns early if the voice client has already been destroyed.
 * On success the JS object receives a "client_id" property and state changes of the
 * client are forwarded to _call_state_changed() via an async callback.
 *
 * @param object The JS object which should wrap this instance
 */
void VoiceClientWrap::do_wrap(const v8::Local<v8::Object> &object) {
    this->Wrap(object);

    auto handle = this->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    Nan::Set(object, Nan::New<v8::String>("client_id").ToLocalChecked(), Nan::New<v8::Number>(handle->client_id()));

    /*
     * Create the dispatcher BEFORE publishing the state change hook.
     * Otherwise a state change which fires right after the hook has been installed
     * would invoke a not yet assigned call_state_changed.
     */
    this->call_state_changed = Nan::async_callback([&]{
        Nan::HandleScope scope;
        this->_call_state_changed();
    });
    handle->on_state_changed = [&]{ this->call_state_changed(); };
}
void VoiceClientWrap::_call_state_changed() {
auto handle = this->_handle.lock();
if(!handle) {
log_warn(category::voice_connection, tr("State changed on invalid handle!"));
return;
}
auto state = handle->state();
auto call_playback_callback = state == VoiceClient::state::playing && !this->_currently_playing;
auto call_stopped_callback = state == VoiceClient::state::stopped && this->_currently_playing;
if(state == VoiceClient::state::stopped)
this->_currently_playing = false;
if(state == VoiceClient::state::playing)
this->_currently_playing = true;
if(call_playback_callback) {
auto callback = Nan::Get(this->handle(), Nan::New<v8::String>("callback_playback").ToLocalChecked()).ToLocalChecked();
if(callback->IsFunction())
callback.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
}
if(call_stopped_callback) {
auto callback = Nan::Get(this->handle(), Nan::New<v8::String>("callback_stopped").ToLocalChecked()).ToLocalChecked();
if(callback->IsFunction())
callback.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
}
auto callback = Nan::Get(this->handle(), Nan::New<v8::String>("callback_state_changed").ToLocalChecked()).ToLocalChecked();
if(callback->IsFunction()) {
v8::Local<v8::Value> argv[1] = {
Nan::New<v8::Number>(state)
};
callback.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
}
}
/**
 * Creates the JS class template "VoiceConnection", registers all prototype methods
 * and stores the resulting constructor function.
 */
NAN_MODULE_INIT(VoiceClientWrap::Init) {
    struct PrototypeMethod {
        const char* name;
        Nan::FunctionCallback callback;
    };
    /* registered in exactly this order */
    const PrototypeMethod prototype_methods[] = {
            {"get_state", VoiceClientWrap::_get_state},
            {"get_volume", VoiceClientWrap::_get_volume},
            {"set_volume", VoiceClientWrap::_set_volume},
            {"abort_replay", VoiceClientWrap::_abort_replay},
            {"get_stream", VoiceClientWrap::_get_stream}
    };

    auto function_template = Nan::New<v8::FunctionTemplate>(VoiceClientWrap::NewInstance);
    function_template->SetClassName(Nan::New("VoiceConnection").ToLocalChecked());
    function_template->InstanceTemplate()->SetInternalFieldCount(1);

    for(const auto& method : prototype_methods)
        Nan::SetPrototypeMethod(function_template, method.name, method.callback);

    constructor().Reset(Nan::GetFunction(function_template).ToLocalChecked());
}
/* JS constructor: does nothing on a construct call and throws "invalid invoke!" on a plain function call. */
NAN_METHOD(VoiceClientWrap::NewInstance) {
    if(info.IsConstructCall())
        return;

    Nan::ThrowError("invalid invoke!");
}
/* JS: get_volume() - returns the volume of the voice client or throws "weak handle" if the client is gone. */
NAN_METHOD(VoiceClientWrap::_get_volume) {
    auto wrapper = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());

    if(auto voice_client = wrapper->_handle.lock()) {
        info.GetReturnValue().Set(voice_client->get_volume());
    } else {
        Nan::ThrowError("weak handle");
    }
}
/*
 * JS: set_volume(volume: number)
 * Throws "weak handle" if the client is gone and "Invalid arguments" unless exactly one number is given.
 */
NAN_METHOD(VoiceClientWrap::_set_volume) {
    auto wrapper = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());

    auto voice_client = wrapper->_handle.lock();
    if(!voice_client) {
        Nan::ThrowError("weak handle");
        return;
    }

    const bool arguments_valid = info.Length() == 1 && info[0]->IsNumber();
    if(!arguments_valid) {
        Nan::ThrowError("Invalid arguments");
        return;
    }

    /* falls back to 0 if the number conversion yields nothing */
    const auto volume = info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0);
    voice_client->set_volume(volume);
}
/* JS: abort_replay() - cancels the replay of the voice client or throws "weak handle" if the client is gone. */
NAN_METHOD(VoiceClientWrap::_abort_replay) {
    auto wrapper = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());

    if(auto voice_client = wrapper->_handle.lock()) {
        voice_client->cancel_replay();
    } else {
        Nan::ThrowError("weak handle");
    }
}
/* JS: get_state() - returns the current state of the voice client or throws "weak handle" if the client is gone. */
NAN_METHOD(VoiceClientWrap::_get_state) {
    auto wrapper = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());

    if(auto voice_client = wrapper->_handle.lock()) {
        info.GetReturnValue().Set(voice_client->state());
    } else {
        Nan::ThrowError("weak handle");
    }
}
/*
 * JS: get_stream()
 * Returns a new JS AudioOutputStreamWrapper object wrapping the client's output stream.
 * Throws "weak handle" if the client is gone.
 */
NAN_METHOD(VoiceClientWrap::_get_stream) {
    auto wrapper = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());

    auto voice_client = wrapper->_handle.lock();
    if(!voice_client) {
        Nan::ThrowError("weak handle");
        return;
    }

    /* the native wrapper gets created first, the JS instance afterwards */
    auto stream_wrapper = new audio::AudioOutputStreamWrapper(voice_client->output_stream(), false);

    auto stream_constructor = Nan::New(audio::AudioOutputStreamWrapper::constructor());
    auto js_stream = Nan::NewInstance(stream_constructor, 0, nullptr).ToLocalChecked();
    stream_wrapper->do_wrap(js_stream);

    info.GetReturnValue().Set(js_stream);
}
/* Stores the given voice client in _handle. */
VoiceClientWrap::VoiceClientWrap(const std::shared_ptr<VoiceClient>& client)
        : _handle(client)
{
}
/* Nothing to clean up manually. */
VoiceClientWrap::~VoiceClientWrap() = default;
/**
 * Creates the voice client and its audio output source.
 *
 * The source ignores overflows, has a max latency of half a second worth of samples
 * and a min buffer of 40ms worth of samples (both derived from the source's sample rate).
 *
 * @param client_id The id of the client this instance represents
 */
VoiceClient::VoiceClient(const std::shared_ptr<VoiceConnection>&, uint16_t client_id) : _client_id(client_id) {
    this->execute_lock_timeout = std::chrono::microseconds{500};

    this->output_source = global_audio_output->create_source();
    this->output_source->overflow_strategy = audio::overflow_strategy::ignore;
    this->output_source->max_latency = (size_t) ceil(this->output_source->sample_rate * 0.5);
    this->output_source->min_buffer = (size_t) ceil(this->output_source->sample_rate * 0.04);

    /* the underflow handler always returns false */
    this->output_source->on_underflow = [&]{
        if(this->_state == state::stopping) {
            /* the stream has been flushed completely */
            this->set_state(state::stopped);
            return false;
        }

        if(this->_state == state::stopped)
            return false;

        if(this->_last_received_packet + chrono::seconds(1) < chrono::system_clock::now()) {
            this->set_state(state::stopped);
            log_warn(category::audio, tr("Client {} has a audio buffer underflow and not received any data for one second. Stopping replay."), this->_client_id);
            return false;
        }

        if(this->_state != state::buffering) {
            log_warn(category::audio, tr("Client {} has a audio buffer underflow. Buffer again."), this->_client_id);
            this->set_state(state::buffering);
        }
        audio::decode_event_loop->schedule(static_pointer_cast<event::EventEntry>(this->ref()));
        return false;
    };

    this->output_source->on_overflow = [&](size_t count){
        log_warn(category::audio, tr("Client {} has a audio buffer overflow of {}."), this->_client_id, count);
    };
}
/**
 * Releases the JS handle (only if a V8 isolate is current), drops all pending buffers
 * and removes the output source from the global audio output.
 */
VoiceClient::~VoiceClient() {
    if(v8::Isolate::GetCurrent())
        this->finalize_js_object();
    else {
        assert(this->_js_handle.IsEmpty());
    }

    this->cancel_replay(); /* cleanup all buffers */

    /*
     * Both source callbacks capture `this`.
     * Detach both of them so the source can never call into the destroyed client.
     */
    this->output_source->on_underflow = nullptr; /* to ensure */
    this->output_source->on_overflow = nullptr;
    global_audio_output->delete_source(this->output_source);
}
void VoiceClient::initialize_js_object() {
if(!this->_js_handle.IsEmpty())
return;
auto object_wrap = new VoiceClientWrap(this->ref());
auto object = Nan::NewInstance(Nan::New(VoiceClientWrap::constructor()), 0, nullptr).ToLocalChecked();
Nan::TryCatch tc;
object_wrap->do_wrap(object);
if(tc.HasCaught()) {
tc.ReThrow();
return;
}
this->_js_handle.Reset(Nan::GetCurrentContext()->GetIsolate(), object);
}
/* Drops the reference to the JS representation of this client. */
void VoiceClient::finalize_js_object()
{
    this->_js_handle.Reset();
}
/**
 * Tests whether `upper` follows `lower` within `window` packets, respecting the
 * wrap around of the 16 bit packet id (65535 -> 0).
 *
 * The forward distance is computed modulo 2^16, so the result is consistent for every
 * `lower`. The previous branch based implementation returned true for any `upper`
 * (even `upper == lower`) when `lower == 65535 - window`.
 *
 * @param lower The packet id which should be lower than the other
 * @param upper The packet id which should be higher than the lower one
 * @param window The maximal forward distance (in packets) which still counts as "less"
 * @return true if `upper` is 1 to `window` packets ahead of `lower`
 */
inline constexpr bool packet_id_less(uint16_t lower, uint16_t upper, uint16_t window) {
    /* the operands get promoted to int, the cast brings the result back into the 16 bit ring */
    const auto distance = static_cast<uint16_t>(upper - lower);
    return distance != 0 && distance <= window;
}
/**
 * Forward distance from `lower` to `upper` on the 16 bit packet id ring.
 * If `upper` is numerically smaller than `lower`, a wrap around is assumed.
 */
inline constexpr uint16_t packet_id_diff(uint16_t lower, uint16_t upper) {
    /* unsigned arithmetic wraps, the low 16 bits are the distance on the ring */
    return static_cast<uint16_t>(static_cast<uint32_t>(upper) - static_cast<uint32_t>(lower));
}
/* Window passed to packet_id_less() when ordering packets and detecting old packets in process_packet(). */
#define MAX_LOST_PACKETS (6)
/* Size in bytes of the stack buffer decode_buffer() decodes, resamples and merges into. */
#define target_buffer_length 16384
/**
 * Queues a received voice packet for decoding.
 *
 * The packet is inserted into the per codec pending list (ordered by packet id)
 * and the decode event loop gets scheduled. Packets with an unknown codec, with a codec
 * which failed to initialize, or which are not newer than the last replayed packet are dropped.
 *
 * @param packet_id The 16 bit id of the packet
 * @param buffer The encoded audio data; an empty buffer marks the end of the stream
 * @param codec The codec the data has been encoded with
 * @param is_head Stored on the queued buffer as `head`
 */
void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& buffer, codec::value codec, bool is_head) {
    /* valid indexes are [0, size()); `codec == size()` must be rejected as well */
    if(codec < 0 || codec >= this->codec.size()) {
        log_warn(category::voice_connection, tr("Received voice packet from client {} with unknown codec ({})"), this->_client_id, codec);
        return;
    }

    auto& codec_data = this->codec[codec];
    if(codec_data.state == AudioCodec::State::UNINITIALIZED)
        this->initialize_code(codec);

    if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) {
        log_warn(category::voice_connection, tr("Dropping audio packet because audio codec {} hasn't been initialized successfully (state: {})"), codec, (int) codec_data.state);
        return;
    }

    //TODO: short circuit handling if we've muted him (e.g. volume = 0)
    auto encoded_buffer = new EncodedBuffer{};
    encoded_buffer->packet_id = packet_id;
    encoded_buffer->codec = codec;
    encoded_buffer->receive_timestamp = chrono::system_clock::now();
    encoded_buffer->buffer = buffer.own_buffer();
    encoded_buffer->head = is_head;

    this->_last_received_packet = encoded_buffer->receive_timestamp;

    {
        lock_guard lock{codec_data.pending_lock};

        /*
         * Drop old packets BEFORE touching force_replay.
         * A dropped buffer never gets linked into the pending list, so it must be freed here
         * and force_replay must never point to it.
         */
        if(packet_id_less(encoded_buffer->packet_id, codec_data.last_packet_id, MAX_LOST_PACKETS) || encoded_buffer->packet_id == codec_data.last_packet_id) {
            log_debug(category::voice_connection,
                      tr("Received audio packet which is older than the current index (packet: {}, current: {})"), encoded_buffer->packet_id, codec_data.last_packet_id);
            delete encoded_buffer;
            return;
        }

        if(codec_data.stream_timeout() < encoded_buffer->receive_timestamp) {
            //Old stream hasn't been terminated successfully.
            //TODO: Cleanup packets which are too old?
            codec_data.force_replay = encoded_buffer;
        } else if(encoded_buffer->buffer.empty()) {
            //Flush replay and stop
            codec_data.force_replay = encoded_buffer;
        }

        /* insert the new buffer (the list is kept ordered by packet id) */
        {
            EncodedBuffer* prv_head{nullptr};
            auto head{codec_data.pending_buffers};
            while(head && packet_id_less(head->packet_id, encoded_buffer->packet_id, MAX_LOST_PACKETS)) {
                prv_head = head;
                head = head->next;
            }

            encoded_buffer->next = head;
            if(prv_head)
                prv_head->next = encoded_buffer;
            else
                codec_data.pending_buffers = encoded_buffer;
        }

        codec_data.last_packet_timestamp = encoded_buffer->receive_timestamp;
        codec_data.process_pending = true;
    }

    audio::decode_event_loop->schedule(static_pointer_cast<event::EventEntry>(this->ref()));
}
void VoiceClient::cancel_replay() {
log_trace(category::voice_connection, tr("Cancel replay for client {}"), this->_client_id);
this->output_source->clear();
this->set_state(state::stopped);
audio::decode_event_loop->cancel(static_pointer_cast<event::EventEntry>(this->ref()));
auto execute_lock = this->execute_lock(true);
for(auto& codec : this->codec) {
auto head = codec.pending_buffers;
while(head) {
auto tmp = head->next;
delete head;
head = tmp;
}
codec.pending_buffers = nullptr;
codec.force_replay = nullptr;
}
}
void VoiceClient::event_execute(const std::chrono::system_clock::time_point &scheduled) {
static auto max_time = chrono::milliseconds(10);
auto reschedule{false};
string error;
auto timeout = chrono::system_clock::now() + max_time;
for(auto& audio_codec : this->codec) {
if(!audio_codec.process_pending)
continue;
unique_lock lock{audio_codec.pending_lock};
do {
EncodedBuffer* replay_head{nullptr};
uint16_t local_last_pid{audio_codec.last_packet_id};
/* nothing to play */
if(!audio_codec.pending_buffers) {
audio_codec.process_pending = false;
break;
}
if(audio_codec.force_replay) {
replay_head = audio_codec.pending_buffers;
audio_codec.pending_buffers = audio_codec.force_replay->next;
audio_codec.force_replay->next = nullptr;
audio_codec.force_replay = nullptr;
} else {
EncodedBuffer* prv_head{nullptr};
EncodedBuffer* head{nullptr};
//Trying to replay the sequence
head = audio_codec.pending_buffers;
while(head && head->packet_id == audio_codec.last_packet_id + 1) {
if(!replay_head)
replay_head = audio_codec.pending_buffers;
audio_codec.last_packet_id++;
prv_head = head;
head = head->next;
}
audio_codec.pending_buffers = head;
if(prv_head) {
prv_head->next = nullptr; /* mark the tail */
} else {
assert(!replay_head); /* could not be set, else prv_head would be set */
//No packet found here, test if we've more than n packets in a row somewhere
#define SKIP_SEQ_LENGTH (1)
EncodedBuffer* skip_ptr[SKIP_SEQ_LENGTH + 1];
memset(skip_ptr, 0, sizeof(skip_ptr));
skip_ptr[0] = audio_codec.pending_buffers;
while(skip_ptr[0]->next) {
for(size_t i = 0; i < SKIP_SEQ_LENGTH; i++) {
if(!skip_ptr[i]->next || skip_ptr[i]->packet_id + 1 != skip_ptr[i]->next->packet_id)
break;
skip_ptr[i + 1] = skip_ptr[i]->next;
}
if(skip_ptr[SKIP_SEQ_LENGTH])
break;
skip_ptr[0] = skip_ptr[0]->next;
}
if(skip_ptr[SKIP_SEQ_LENGTH]) {
/* we've tree packets in a row */
replay_head = audio_codec.pending_buffers;
audio_codec.pending_buffers = skip_ptr[SKIP_SEQ_LENGTH];
skip_ptr[SKIP_SEQ_LENGTH - 1]->next = nullptr;
log_trace(category::voice_connection, tr("Skipping from {} to {} because of {} packets in a row"), audio_codec.last_packet_id, head->packet_id, SKIP_SEQ_LENGTH);
/* Do not set process_pending to false, because we're not done
* We're just replaying all loose packets which are not within a sequence until we reach a sequence
* In the next loop the sequence will be played
*/
} else {
head = audio_codec.pending_buffers;
while(head) {
if(packet_id_diff(audio_codec.last_packet_id, head->packet_id) >= 5) {
break;
}
head = head->next;
}
if(head) {
replay_head = audio_codec.pending_buffers;
audio_codec.pending_buffers = head->next;
head->next = nullptr;
log_trace(category::voice_connection, tr("Skipping from {} to {} because of over 6 packets between"),
audio_codec.last_packet_id, head->packet_id);
/* do not negate process_pending here. Same reason as with the 3 sequence */
} else {
/* no packets we're willing to replay */
audio_codec.process_pending = false;
}
}
}
}
if(!replay_head) {
audio_codec.process_pending = false;
break;
}
{
auto head = replay_head;
while(head->next)
head = head->next;
audio_codec.last_packet_id = head->packet_id;
assert(!audio_codec.pending_buffers || packet_id_less(audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id, 10));
}
lock.unlock();
while(replay_head) {
if(replay_head->buffer.empty()) {
this->set_state(state::stopping);
log_debug(category::voice_connection, tr("Client {} send a stop signal. Flushing stream and stopping"), this->_client_id);
} else {
auto lost_packets = packet_id_diff(local_last_pid, replay_head->packet_id) - 1;
if(lost_packets > 6) {
log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream ({} to {}). Resetting decoder."), this->_client_id, lost_packets, local_last_pid, replay_head->packet_id);
replay_head->reset_decoder = true;
} else if(lost_packets > 0) {
log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream. Skipping ahead."), this->_client_id, lost_packets);
if(audio_codec.converter->decode_lost(error, lost_packets))
log_warn(category::audio, tr("Failed to decode lost packets for client {}: {}"), this->_client_id, error);
}
if(replay_head->reset_decoder)
audio_codec.converter->reset_decoder();
auto decoded = this->decode_buffer(audio_codec.codec, replay_head->buffer);
this->output_source->enqueue_samples(decoded);
this->set_state(state::playing);
}
local_last_pid = replay_head->packet_id;
replay_head = replay_head->next;
}
lock.lock(); //Check for more packets
//TODO: Check for timeout?
} while(audio_codec.process_pending);
}
if(reschedule) {
log_warn(category::voice_connection, tr("Audio data decode will take longer than {} us. Enqueueing for later"),
chrono::duration_cast<chrono::microseconds>(max_time).count());
audio::decode_event_loop->schedule(static_pointer_cast<event::EventEntry>(this->ref()));
}
}
/**
 * Initializes converter and resampler for the given codec.
 *
 * The resulting state is stored in the codec entry:
 * UNSUPPORTED if there is no (supported) codec info, INITIALIZED_FAIL if the converter or
 * the resampler could not be created, INITIALIZED_SUCCESSFULLY otherwise.
 * Does nothing (besides logging) if the entry is not in UNINITIALIZED state.
 *
 * @param audio_codec The codec which should be initialized
 */
void VoiceClient::initialize_code(const codec::value &audio_codec) {
    auto& entry = this->codec[audio_codec];
    if(entry.state != AudioCodec::State::UNINITIALIZED) {
        log_warn(category::voice_connection, tr("Could not initialize codec of type {} because it isn't in uninitialized state anymore!"), (int) entry.state);
        return;
    }
    entry.codec = audio_codec;

    const auto codec_info = codec::get_info(audio_codec);
    const bool codec_usable = codec_info && codec_info->supported;
    if(!codec_usable) {
        log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Codec is not supported"), audio_codec, this->_client_id);
        entry.state = AudioCodec::State::UNSUPPORTED;
        return;
    }

    /* assume a failure until every step succeeded */
    entry.state = AudioCodec::State::INITIALIZED_FAIL;

    string error;
    entry.converter = codec_info->new_converter(error);
    if(!entry.converter) {
        log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize decoder: {}"), audio_codec, this->_client_id, error);
        return;
    }

    entry.resampler = make_shared<audio::AudioResampler>(entry.converter->sample_rate(), this->output_source->sample_rate, entry.converter->channels());
    if(!entry.resampler->valid()) {
        log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize resampler"), audio_codec, this->_client_id);
        return;
    }

    entry.state = AudioCodec::State::INITIALIZED_SUCCESSFULLY;
    log_trace(category::voice_connection, tr("Successfully initialized codec {} for client {}."), audio_codec, this->_client_id);
}
/**
 * Decodes an encoded audio packet into a sample buffer for the output source.
 *
 * Steps: decode -> resample to the output sample rate -> merge to the output channel count
 * -> apply the client volume -> copy into a newly allocated SampleBuffer.
 * All steps work in place on one stack buffer of target_buffer_length bytes.
 *
 * @param audio_codec The codec the data has been encoded with
 * @param buffer The encoded audio data
 * @return The decoded samples or nullptr if the codec isn't initialized or any step failed
 */
std::shared_ptr<audio::SampleBuffer> VoiceClient::decode_buffer(const codec::value &audio_codec, const pipes::buffer_view &buffer) {
    auto& codec_data = this->codec[audio_codec];
    if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) {
        log_trace(category::audio, tr("Cant decode auto buffer of codec {} because codec isn't successfully initialized (state: {})"), audio_codec, (int) codec_data.state);
        return nullptr;
    }

    string error;
    /* the buffer is accessed as float array below, so it has to be aligned accordingly */
    alignas(float) char target_buffer[target_buffer_length];

    const auto expected_length = codec_data.converter->expected_decoded_length(buffer.data_ptr(), buffer.length());
    if(target_buffer_length < expected_length) {
        log_warn(category::voice_connection, tr("Failed to decode audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, expected_length);
        return nullptr;
    }

    auto samples = codec_data.converter->decode(error, buffer.data_ptr(), buffer.length(), target_buffer);
    if(samples < 0) {
        log_warn(category::voice_connection, tr("Failed to decode audio data: {}"), error);
        return nullptr;
    }

    /* samples are handled as 4 byte values */
    const auto resample_bytes = codec_data.resampler->estimated_output_size(samples) * codec_data.resampler->channels() * 4;
    if(target_buffer_length < resample_bytes) {
        log_warn(category::voice_connection, tr("Failed to resample audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, resample_bytes);
        return nullptr;
    }

    auto resampled_samples = codec_data.resampler->process(target_buffer, target_buffer, samples);
    if(resampled_samples <= 0) {
        log_warn(category::voice_connection, tr("Failed to resample audio data. Resampler resulted in {}"), resampled_samples);
        return nullptr;
    }

    /* merging may increase the channel count, so the merged result has to fit as well */
    const auto output_bytes = this->output_source->channel_count * resampled_samples * 4;
    if(target_buffer_length < output_bytes) {
        log_warn(category::voice_connection, tr("Failed to merge audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, output_bytes);
        return nullptr;
    }

    if(!audio::merge::merge_channels_interleaved(target_buffer, this->output_source->channel_count, target_buffer, codec_data.resampler->channels(), resampled_samples)) {
        log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count!"));
        return nullptr;
    }

    if(this->_volume != 1) {
        auto buf = (float*) target_buffer;
        auto count = this->output_source->channel_count * resampled_samples;

        while(count-- > 0)
            *(buf++) *= this->_volume;
    }

    auto audio_buffer = audio::SampleBuffer::allocate((uint8_t) this->output_source->channel_count, (uint16_t) resampled_samples);
    audio_buffer->sample_index = 0;
    memcpy(audio_buffer->sample_data, target_buffer, output_bytes);
    return audio_buffer;
}
/**
 * Called when a scheduled decode event has been dropped; marks this in audio_decode_event_dropped.
 *
 * @param point Unused
 */
void VoiceClient::event_execute_dropped(const std::chrono::system_clock::time_point &point) {
    /*
     * A repeated drop is not really a warning, it happens all the time and isn't really an issue.
     * Therefore the previous flag value is intentionally ignored. Disabled log:
     * log_warn(category::voice_connection, tr("Dropped auto enqueue event execution two or more times in a row for client {}"), this->_client_id);
     */
    const bool dropped_before = audio_decode_event_dropped.exchange(true);
    (void) dropped_before;
}