743 lines
26 KiB
C++
743 lines
26 KiB
C++
#include "VoiceClient.h"
|
|
#include "../../audio/AudioOutput.h"
|
|
#include "../../audio/codec/Converter.h"
|
|
#include "../../audio/codec/OpusConverter.h"
|
|
#include "../../audio/AudioMerger.h"
|
|
#include "../../audio/js/AudioOutputStream.h"
|
|
#include "../../audio/AudioEventLoop.h"
|
|
#include "../../logger.h"
|
|
#include "../../audio/AudioGain.h"
|
|
|
|
using namespace std;
|
|
using namespace tc;
|
|
using namespace tc::audio::codec;
|
|
using namespace tc::connection;
|
|
|
|
extern tc::audio::AudioOutput* global_audio_output;
|
|
|
|
#define DEBUG_PREMATURE_PACKETS
|
|
|
|
#ifdef WIN32
|
|
#define _field_(name, value) value
|
|
#else
|
|
#define _field_(name, value) .name = value
|
|
#endif
|
|
const codec::condec_info codec::info[6] = {
|
|
{
|
|
_field_(supported, false),
|
|
_field_(name, "speex_narrowband"),
|
|
_field_(new_converter, nullptr)
|
|
},
|
|
{
|
|
_field_(supported, false),
|
|
_field_(name, "speex_wideband"),
|
|
_field_(new_converter, nullptr)
|
|
},
|
|
{
|
|
_field_(supported, false),
|
|
_field_(name, "speex_ultra_wideband"),
|
|
_field_(new_converter, nullptr)
|
|
},
|
|
{
|
|
_field_(supported, false),
|
|
_field_(name, "celt_mono"),
|
|
_field_(new_converter, nullptr)
|
|
},
|
|
{
|
|
_field_(supported, true),
|
|
_field_(name, "opus_voice"),
|
|
_field_(new_converter, [](string& error) -> shared_ptr<Converter> {
|
|
auto result = make_shared<OpusConverter>(1, 48000, 960);
|
|
if(!result->initialize(error, OPUS_APPLICATION_VOIP))
|
|
return nullptr;
|
|
return dynamic_pointer_cast<Converter>(result);
|
|
})
|
|
},
|
|
{
|
|
_field_(supported, true),
|
|
_field_(name, "opus_music"),
|
|
_field_(new_converter, [](string& error) -> shared_ptr<Converter> {
|
|
auto result = make_shared<OpusConverter>(2, 48000, 960);
|
|
if(!result->initialize(error, OPUS_APPLICATION_AUDIO))
|
|
return nullptr;
|
|
return dynamic_pointer_cast<Converter>(result);
|
|
})
|
|
}
|
|
};
|
|
|
|
void VoiceClientWrap::do_wrap(const v8::Local<v8::Object> &object) {
|
|
this->Wrap(object);
|
|
|
|
auto handle = this->_handle.lock();
|
|
if(!handle) {
|
|
Nan::ThrowError("weak handle");
|
|
return;
|
|
}
|
|
Nan::Set(object, Nan::New<v8::String>("client_id").ToLocalChecked(), Nan::New<v8::Number>(handle->client_id()));
|
|
handle->on_state_changed = [&]{ this->call_state_changed(); };
|
|
|
|
this->call_state_changed = Nan::async_callback([&]{
|
|
Nan::HandleScope scope{};
|
|
this->call_state_changed_();
|
|
});
|
|
}
|
|
|
|
void VoiceClientWrap::call_state_changed_() {
|
|
auto handle = this->_handle.lock();
|
|
if(!handle) {
|
|
log_warn(category::voice_connection, tr("State changed on invalid handle!"));
|
|
return;
|
|
}
|
|
|
|
auto state = handle->state();
|
|
|
|
const auto was_playing = this->currently_playing_;
|
|
if(state == VoiceClient::state::stopped) {
|
|
this->currently_playing_ = false;
|
|
} else if(state == VoiceClient::state::playing) {
|
|
this->currently_playing_ = true;
|
|
}
|
|
|
|
if(!was_playing && this->currently_playing_) {
|
|
auto callback = Nan::Get(this->handle(), Nan::New<v8::String>("callback_playback").ToLocalChecked()).ToLocalChecked();
|
|
if(callback->IsFunction()) {
|
|
(void) callback.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
|
|
}
|
|
}
|
|
if(was_playing && !this->currently_playing_) {
|
|
auto callback = Nan::Get(this->handle(), Nan::New<v8::String>("callback_stopped").ToLocalChecked()).ToLocalChecked();
|
|
if(callback->IsFunction()) {
|
|
(void) callback.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
|
|
}
|
|
}
|
|
|
|
auto callback = Nan::Get(this->handle(), Nan::New<v8::String>("callback_state_changed").ToLocalChecked()).ToLocalChecked();
|
|
if(callback->IsFunction()) {
|
|
v8::Local<v8::Value> argv[1] = {
|
|
Nan::New<v8::Number>(state)
|
|
};
|
|
(void) callback.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
|
|
}
|
|
}
|
|
|
|
NAN_MODULE_INIT(VoiceClientWrap::Init) {
|
|
auto klass = Nan::New<v8::FunctionTemplate>(VoiceClientWrap::NewInstance);
|
|
klass->SetClassName(Nan::New("VoiceConnection").ToLocalChecked());
|
|
klass->InstanceTemplate()->SetInternalFieldCount(1);
|
|
|
|
Nan::SetPrototypeMethod(klass, "get_state", VoiceClientWrap::_get_state);
|
|
Nan::SetPrototypeMethod(klass, "get_volume", VoiceClientWrap::_get_volume);
|
|
Nan::SetPrototypeMethod(klass, "set_volume", VoiceClientWrap::_set_volume);
|
|
Nan::SetPrototypeMethod(klass, "abort_replay", VoiceClientWrap::_abort_replay);
|
|
Nan::SetPrototypeMethod(klass, "get_stream", VoiceClientWrap::_get_stream);
|
|
|
|
constructor().Reset(Nan::GetFunction(klass).ToLocalChecked());
|
|
}
|
|
|
|
NAN_METHOD(VoiceClientWrap::NewInstance) {
|
|
if(!info.IsConstructCall())
|
|
Nan::ThrowError("invalid invoke!");
|
|
}
|
|
|
|
NAN_METHOD(VoiceClientWrap::_get_volume) {
|
|
auto client = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());
|
|
|
|
auto handle = client->_handle.lock();
|
|
if(!handle) {
|
|
Nan::ThrowError("weak handle");
|
|
return;
|
|
}
|
|
|
|
info.GetReturnValue().Set(handle->get_volume());
|
|
}
|
|
|
|
NAN_METHOD(VoiceClientWrap::_set_volume) {
|
|
auto client = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());
|
|
|
|
auto handle = client->_handle.lock();
|
|
if(!handle) {
|
|
Nan::ThrowError("weak handle");
|
|
return;
|
|
}
|
|
|
|
if(info.Length() != 1 || !info[0]->IsNumber()) {
|
|
Nan::ThrowError("Invalid arguments");
|
|
return;
|
|
}
|
|
|
|
handle->set_volume((float) info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0));
|
|
}
|
|
NAN_METHOD(VoiceClientWrap::_abort_replay) {
|
|
auto client = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());
|
|
|
|
auto handle = client->_handle.lock();
|
|
if(!handle) {
|
|
Nan::ThrowError("weak handle");
|
|
return;
|
|
}
|
|
|
|
handle->cancel_replay();
|
|
}
|
|
|
|
NAN_METHOD(VoiceClientWrap::_get_state) {
|
|
auto client = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());
|
|
|
|
auto handle = client->_handle.lock();
|
|
if(!handle) {
|
|
Nan::ThrowError("weak handle");
|
|
return;
|
|
}
|
|
|
|
info.GetReturnValue().Set(handle->state());
|
|
}
|
|
|
|
|
|
NAN_METHOD(VoiceClientWrap::_get_stream) {
|
|
auto client = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());
|
|
|
|
auto handle = client->_handle.lock();
|
|
if(!handle) {
|
|
Nan::ThrowError("weak handle");
|
|
return;
|
|
}
|
|
|
|
auto wrapper = new audio::AudioOutputStreamWrapper(handle->output_stream(), false);
|
|
auto object = Nan::NewInstance(Nan::New(audio::AudioOutputStreamWrapper::constructor()), 0, nullptr).ToLocalChecked();
|
|
wrapper->do_wrap(object);
|
|
info.GetReturnValue().Set(object);
|
|
}
|
|
|
|
VoiceClientWrap::VoiceClientWrap(const std::shared_ptr<VoiceClient>& client) : _handle(client) { }
|
|
|
|
VoiceClientWrap::~VoiceClientWrap() = default;
|
|
|
|
VoiceClient::VoiceClient(const std::shared_ptr<VoiceConnection>&, uint16_t client_id) : client_id_(client_id) {
|
|
this->execute_lock_timeout = std::chrono::microseconds{500};
|
|
}
|
|
|
|
VoiceClient::~VoiceClient() {
|
|
if(v8::Isolate::GetCurrent()) {
|
|
this->finalize_js_object();
|
|
} else {
|
|
assert(this->js_handle_.IsEmpty());
|
|
}
|
|
|
|
this->cancel_replay(); /* cleanup all buffers */
|
|
if(this->output_source) {
|
|
this->output_source->on_underflow = nullptr; /* to ensure */
|
|
this->output_source = nullptr;
|
|
}
|
|
}
|
|
|
|
void VoiceClient::initialize() {
|
|
auto weak_this = this->ref_;
|
|
|
|
audio::initialize([weak_this]{
|
|
auto client = weak_this.lock();
|
|
if(!client) {
|
|
return;
|
|
}
|
|
|
|
assert(global_audio_output);
|
|
client->output_source = global_audio_output->create_source();
|
|
client->output_source->overflow_strategy = audio::overflow_strategy::ignore;
|
|
client->output_source->set_max_buffered_samples((size_t) ceil(client->output_source->sample_rate * 0.5));
|
|
client->output_source->set_min_buffered_samples((size_t) ceil(client->output_source->sample_rate * 0.04));
|
|
|
|
client->output_source->on_underflow = [weak_this](size_t sample_count){ /* this callback will never be called when the client has been deallocated */
|
|
auto client = weak_this.lock();
|
|
if(!client) {
|
|
return false;
|
|
}
|
|
|
|
if(client->state_ == state::stopping) {
|
|
client->set_state(state::stopped);
|
|
} else if(client->state_ != state::stopped) {
|
|
if(client->_last_received_packet + chrono::seconds{1} < chrono::system_clock::now()) {
|
|
client->set_state(state::stopped);
|
|
log_warn(category::audio, tr("Client {} has a audio buffer underflow for {} samples and not received any data for one second. Stopping replay."), client->client_id_, sample_count);
|
|
} else {
|
|
if(client->state_ != state::buffering) {
|
|
log_warn(category::audio, tr("Client {} has a audio buffer underflow for {} samples. Buffer again."), client->client_id_, sample_count);
|
|
client->set_state(state::buffering);
|
|
}
|
|
|
|
audio::decode_event_loop->schedule(static_pointer_cast<event::EventEntry>(client));
|
|
}
|
|
}
|
|
|
|
return false;
|
|
};
|
|
client->output_source->on_overflow = [weak_this](size_t count){
|
|
auto client = weak_this.lock();
|
|
if(!client) {
|
|
return;
|
|
}
|
|
|
|
log_warn(category::audio, tr("Client {} has a audio buffer overflow of {}."), client->client_id_, count);
|
|
};
|
|
});
|
|
}
|
|
|
|
void VoiceClient::execute_tick() {
|
|
if(this->state_ == state::buffering && this->_last_received_packet + chrono::milliseconds{250} < chrono::system_clock::now()) {
|
|
this->set_state(state::stopped);
|
|
log_debug(category::audio, tr("Audio stop packet for client {} seems to be lost. Stopping playback."), this->client_id_);
|
|
}
|
|
}
|
|
|
|
void VoiceClient::initialize_js_object() {
|
|
if(!this->js_handle_.IsEmpty())
|
|
return;
|
|
|
|
auto object_wrap = new VoiceClientWrap(this->ref());
|
|
auto object = Nan::NewInstance(Nan::New(VoiceClientWrap::constructor()), 0, nullptr).ToLocalChecked();
|
|
Nan::TryCatch tc{};
|
|
object_wrap->do_wrap(object);
|
|
if(tc.HasCaught()) {
|
|
tc.ReThrow();
|
|
return;
|
|
}
|
|
|
|
this->js_handle_.Reset(Nan::GetCurrentContext()->GetIsolate(), object);
|
|
}
|
|
|
|
void VoiceClient::finalize_js_object() {
|
|
this->js_handle_.Reset();
|
|
}
|
|
|
|
/**
|
|
* @param lower The packet ID which should be lower than the other
|
|
* @param upper The packet id which should be higher than the lower one
|
|
* @param clip_window The size how long the "overflow" counts
|
|
* @return true if lower is less than upper
|
|
*/
|
|
#ifdef max
|
|
#undef max
|
|
#endif
|
|
inline constexpr bool packet_id_less(uint16_t lower, uint16_t upper, uint16_t window) {
|
|
constexpr auto bounds = std::numeric_limits<uint16_t>::max();
|
|
|
|
if(bounds - window <= lower) {
|
|
uint16_t max_clip = lower + window;
|
|
if(upper <= max_clip)
|
|
return true;
|
|
else if(upper > lower)
|
|
return true;
|
|
return false;
|
|
} else {
|
|
if(lower >= upper)
|
|
return false;
|
|
|
|
return upper - lower <= window;
|
|
}
|
|
}
|
|
|
|
inline constexpr uint16_t packet_id_diff(uint16_t lower, uint16_t upper) {
|
|
if(upper < lower)
|
|
return (uint16_t) (((uint32_t) upper | 0x10000U) - (uint32_t) lower);
|
|
return upper - lower;
|
|
}
|
|
|
|
#define MAX_LOST_PACKETS (6)
|
|
#define target_buffer_length 16384
|
|
void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& buffer, codec::value codec, bool is_head) {
|
|
#if 0
|
|
if(rand() % 10 == 0) {
|
|
log_info(category::audio, tr("Dropping audio packet id {}"), packet_id);
|
|
return;
|
|
}
|
|
#endif
|
|
if(codec < 0 || codec > this->codec.size()) {
|
|
log_warn(category::voice_connection, tr("Received voice packet from client {} with unknown codec ({})"), this->client_id_, codec);
|
|
return;
|
|
}
|
|
|
|
if(!this->output_source) {
|
|
/* audio hasn't been initialized yet */
|
|
return;
|
|
}
|
|
|
|
auto& codec_data = this->codec[codec];
|
|
if(codec_data.state == AudioCodec::State::UNINITIALIZED)
|
|
this->initialize_code(codec);
|
|
|
|
if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) {
|
|
log_warn(category::voice_connection, tr("Dropping audio packet because audio codec {} hasn't been initialized successfully (state: {})"), codec, (int) codec_data.state);
|
|
return;
|
|
}
|
|
//TODO: short circuit handling if we've muted him (e.g. volume = 0)
|
|
|
|
auto encoded_buffer = new EncodedBuffer{};
|
|
encoded_buffer->packet_id = packet_id;
|
|
encoded_buffer->codec = codec;
|
|
encoded_buffer->receive_timestamp = chrono::system_clock::now();
|
|
encoded_buffer->buffer = buffer.own_buffer();
|
|
encoded_buffer->head = is_head;
|
|
this->_last_received_packet = encoded_buffer->receive_timestamp;
|
|
|
|
|
|
{
|
|
lock_guard lock{codec_data.pending_lock};
|
|
if(codec_data.stream_timeout() < encoded_buffer->receive_timestamp) {
|
|
//Old stream hasn't been terminated successfully.
|
|
//TODO: Cleanup packets which are too old?
|
|
codec_data.force_replay = encoded_buffer;
|
|
} else if(encoded_buffer->buffer.empty()) {
|
|
//Flush replay and stop
|
|
codec_data.force_replay = encoded_buffer;
|
|
}
|
|
|
|
if(packet_id_less(encoded_buffer->packet_id, codec_data.last_packet_id, MAX_LOST_PACKETS) || encoded_buffer->packet_id == codec_data.last_packet_id) {
|
|
log_debug(category::voice_connection,
|
|
tr("Received audio packet which is older than the current index (packet: {}, current: {})"), encoded_buffer->packet_id, codec_data.last_packet_id);
|
|
return;
|
|
}
|
|
|
|
/* insert the new buffer */
|
|
{
|
|
EncodedBuffer* prv_head{nullptr};
|
|
auto head{codec_data.pending_buffers};
|
|
while(head && packet_id_less(head->packet_id, encoded_buffer->packet_id, MAX_LOST_PACKETS)) {
|
|
prv_head = head;
|
|
head = head->next;
|
|
}
|
|
|
|
encoded_buffer->next = head;
|
|
if(prv_head)
|
|
prv_head->next = encoded_buffer;
|
|
else
|
|
codec_data.pending_buffers = encoded_buffer;
|
|
}
|
|
codec_data.last_packet_timestamp = encoded_buffer->receive_timestamp;
|
|
codec_data.process_pending = true;
|
|
}
|
|
|
|
audio::decode_event_loop->schedule(static_pointer_cast<event::EventEntry>(this->ref()));
|
|
}
|
|
|
|
void VoiceClient::cancel_replay() {
|
|
log_trace(category::voice_connection, tr("Cancel replay for client {}"), this->client_id_);
|
|
|
|
auto output = this->output_source;
|
|
if(output) {
|
|
output->clear();
|
|
}
|
|
|
|
this->set_state(state::stopped);
|
|
audio::decode_event_loop->cancel(static_pointer_cast<event::EventEntry>(this->ref()));
|
|
|
|
auto execute_lock = this->execute_lock(true);
|
|
for(auto& codec_entry : this->codec) {
|
|
auto head = codec_entry.pending_buffers;
|
|
while(head) {
|
|
auto tmp = head->next;
|
|
delete head;
|
|
head = tmp;
|
|
}
|
|
|
|
codec_entry.pending_buffers = nullptr;
|
|
codec_entry.force_replay = nullptr;
|
|
}
|
|
}
|
|
|
|
void VoiceClient::event_execute(const std::chrono::system_clock::time_point &scheduled) {
|
|
if(!this->output_source) {
|
|
/* Audio hasn't been initialized yet. This also means there is no audio to be processed */
|
|
return;
|
|
}
|
|
|
|
static auto max_time = chrono::milliseconds(10);
|
|
auto reschedule{false};
|
|
string error;
|
|
|
|
auto timeout = chrono::system_clock::now() + max_time;
|
|
|
|
for(auto& audio_codec : this->codec) {
|
|
if(!audio_codec.process_pending) {
|
|
continue;
|
|
}
|
|
|
|
unique_lock lock{audio_codec.pending_lock};
|
|
do {
|
|
EncodedBuffer* replay_head{nullptr};
|
|
uint16_t local_last_pid{audio_codec.last_packet_id};
|
|
|
|
/* nothing to play */
|
|
if(!audio_codec.pending_buffers) {
|
|
audio_codec.process_pending = false;
|
|
break;
|
|
}
|
|
|
|
if(audio_codec.force_replay) {
|
|
replay_head = audio_codec.pending_buffers;
|
|
audio_codec.pending_buffers = audio_codec.force_replay->next;
|
|
|
|
audio_codec.force_replay->next = nullptr;
|
|
audio_codec.force_replay = nullptr;
|
|
} else {
|
|
EncodedBuffer* prv_head{nullptr};
|
|
EncodedBuffer* head{nullptr};
|
|
|
|
//Trying to replay the sequence
|
|
head = audio_codec.pending_buffers;
|
|
while(head && head->packet_id == audio_codec.last_packet_id + 1) {
|
|
if(!replay_head)
|
|
replay_head = audio_codec.pending_buffers;
|
|
|
|
audio_codec.last_packet_id++;
|
|
prv_head = head;
|
|
head = head->next;
|
|
}
|
|
audio_codec.pending_buffers = head;
|
|
|
|
if(prv_head) {
|
|
prv_head->next = nullptr; /* mark the tail */
|
|
} else {
|
|
assert(!replay_head); /* could not be set, else prv_head would be set */
|
|
|
|
//No packet found here, test if we've more than n packets in a row somewhere
|
|
#define SKIP_SEQ_LENGTH (3)
|
|
EncodedBuffer* skip_ptr[SKIP_SEQ_LENGTH + 1];
|
|
memset(skip_ptr, 0, sizeof(skip_ptr));
|
|
skip_ptr[0] = audio_codec.pending_buffers;
|
|
|
|
while(skip_ptr[0]->next) {
|
|
for(size_t i = 0; i < SKIP_SEQ_LENGTH; i++) {
|
|
if(!skip_ptr[i]->next || skip_ptr[i]->packet_id + 1 != skip_ptr[i]->next->packet_id)
|
|
break;
|
|
|
|
skip_ptr[i + 1] = skip_ptr[i]->next;
|
|
}
|
|
if(skip_ptr[SKIP_SEQ_LENGTH])
|
|
break;
|
|
|
|
skip_ptr[0] = skip_ptr[0]->next;
|
|
}
|
|
|
|
if(skip_ptr[SKIP_SEQ_LENGTH]) {
|
|
/* we've three packets in a row */
|
|
replay_head = audio_codec.pending_buffers;
|
|
audio_codec.pending_buffers = skip_ptr[SKIP_SEQ_LENGTH];
|
|
skip_ptr[SKIP_SEQ_LENGTH - 1]->next = nullptr;
|
|
log_trace(category::voice_connection, tr("Skipping from {} to {} because of {} packets in a row"), audio_codec.last_packet_id, replay_head->packet_id, SKIP_SEQ_LENGTH);
|
|
|
|
/* Do not set process_pending to false, because we're not done
|
|
* We're just replaying all loose packets which are not within a sequence until we reach a sequence
|
|
* In the next loop the sequence will be played
|
|
*/
|
|
} else {
|
|
head = audio_codec.pending_buffers;
|
|
while(head) {
|
|
if(packet_id_diff(audio_codec.last_packet_id, head->packet_id) >= 5) {
|
|
break;
|
|
}
|
|
head = head->next;
|
|
}
|
|
|
|
if(head) {
|
|
replay_head = audio_codec.pending_buffers;
|
|
audio_codec.pending_buffers = head->next;
|
|
head->next = nullptr;
|
|
log_trace(category::voice_connection, tr("Skipping from {} to {} because of over 6 packets between"),
|
|
audio_codec.last_packet_id, replay_head->packet_id);
|
|
/* do not negate process_pending here. Same reason as with the 3 sequence */
|
|
} else {
|
|
/* no packets we're willing to replay */
|
|
audio_codec.process_pending = false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if(!replay_head) {
|
|
audio_codec.process_pending = false;
|
|
break;
|
|
}
|
|
|
|
{
|
|
auto head = replay_head;
|
|
while(head->next)
|
|
head = head->next;
|
|
|
|
audio_codec.last_packet_id = head->packet_id;
|
|
const auto ordered = !audio_codec.pending_buffers || packet_id_less(audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id, 10);
|
|
if(!ordered) {
|
|
log_critical(category::voice_connection, tr("Unordered packet ids. [!audio_codec.pending_buffers: {}; a: {}; b: {}]"),
|
|
!audio_codec.pending_buffers,
|
|
audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id
|
|
);
|
|
//assert(!audio_codec.pending_buffers || packet_id_less(audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id, 10));
|
|
}
|
|
}
|
|
lock.unlock();
|
|
while(replay_head) {
|
|
if(replay_head->buffer.empty()) {
|
|
this->set_state(state::stopping);
|
|
log_debug(category::voice_connection, tr("Client {} send a stop signal. Flushing stream and stopping"), this->client_id_);
|
|
} else {
|
|
auto lost_packets = packet_id_diff(local_last_pid, replay_head->packet_id) - 1;
|
|
if(lost_packets > 10) {
|
|
log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream ({} to {}). Resetting decoder."), this->client_id_, lost_packets, local_last_pid, replay_head->packet_id);
|
|
replay_head->reset_decoder = true;
|
|
} else if(lost_packets > 0) {
|
|
log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream. FEC decoding it."), this->client_id_, lost_packets);
|
|
/*
|
|
if(audio_codec.converter->decode_lost(error, lost_packets))
|
|
log_warn(category::audio, tr("Failed to decode lost packets for client {}: {}"), this->_client_id, error);
|
|
*/
|
|
auto decoded = this->decode_buffer(audio_codec.codec, replay_head->buffer, true);
|
|
if(decoded) {
|
|
this->output_source->enqueue_samples(decoded->sample_data, decoded->sample_size);
|
|
}
|
|
}
|
|
|
|
const auto is_new_audio_stream = this->state_ != state::buffering && this->state_ != state::playing;
|
|
if(replay_head->reset_decoder || is_new_audio_stream) {
|
|
audio_codec.converter->reset_decoder();
|
|
replay_head->reset_decoder = false;
|
|
|
|
#if 1 /* Better approch */
|
|
/* initialize with last packet */
|
|
char target_buffer[target_buffer_length];
|
|
if(target_buffer_length > audio_codec.converter->expected_decoded_length(replay_head->buffer.data_ptr(), replay_head->buffer.length())) {
|
|
audio_codec.converter->decode(error, replay_head->buffer.data_ptr(), replay_head->buffer.length(), target_buffer, 1);
|
|
} else {
|
|
//TODO: May a small warning here?
|
|
}
|
|
#endif
|
|
}
|
|
|
|
#if 0 /* (maybe) TS3 approch */
|
|
if(replay_head->head) {
|
|
/* initialize with last packet */
|
|
char target_buffer[target_buffer_length];
|
|
if(target_buffer_length > audio_codec.converter->expected_decoded_length(replay_head->buffer.data_ptr(), replay_head->buffer.length())) {
|
|
audio_codec.converter->decode(error, replay_head->buffer.data_ptr(), replay_head->buffer.length(), target_buffer, 1);
|
|
} else {
|
|
//TODO: May a small warning here?
|
|
}
|
|
}
|
|
#endif
|
|
|
|
//TODO: Use statically allocated buffer?
|
|
auto decoded = this->decode_buffer(audio_codec.codec, replay_head->buffer, false);
|
|
if(!decoded) {
|
|
log_warn(category::audio, tr("Failed to decode buffer for client {} (nullptr). Dropping buffer."), this->client_id_, error);
|
|
} else {
|
|
if(is_new_audio_stream) {
|
|
log_warn(category::audio, tr("New audio chunk for client {}"), this->client_id_);
|
|
|
|
//this->output_source->enqueue_silence((size_t) ceil(0.0075f * (float) this->output_source->sample_rate)); /* enqueue 7.5ms silence so we give the next packet a chance to be send */
|
|
}
|
|
this->output_source->enqueue_samples(decoded->sample_data, decoded->sample_size);
|
|
this->set_state(state::playing);
|
|
}
|
|
}
|
|
|
|
local_last_pid = replay_head->packet_id;
|
|
|
|
auto last_head = replay_head;
|
|
replay_head = replay_head->next;
|
|
delete last_head;
|
|
}
|
|
lock.lock(); //Check for more packets
|
|
//TODO: Check for timeout?
|
|
} while(audio_codec.process_pending);
|
|
}
|
|
|
|
if(reschedule) {
|
|
log_warn(category::voice_connection, tr("Audio data decode will take longer than {} us. Enqueueing for later"),
|
|
chrono::duration_cast<chrono::microseconds>(max_time).count());
|
|
audio::decode_event_loop->schedule(static_pointer_cast<event::EventEntry>(this->ref()));
|
|
}
|
|
}
|
|
|
|
void VoiceClient::initialize_code(const codec::value &audio_codec) {
|
|
assert(this->output_source);
|
|
|
|
string error;
|
|
|
|
auto& codec_data = this->codec[audio_codec];
|
|
if(codec_data.state != AudioCodec::State::UNINITIALIZED) {
|
|
log_warn(category::voice_connection, tr("Could not initialize codec of type {} because it isn't in uninitialized state anymore!"), (int) codec_data.state);
|
|
return;
|
|
}
|
|
|
|
codec_data.codec = audio_codec;
|
|
|
|
auto info = codec::get_info(audio_codec);
|
|
if(!info || !info->supported) {
|
|
log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Codec is not supported"), audio_codec, this->client_id_);
|
|
codec_data.state = AudioCodec::State::UNSUPPORTED;
|
|
return;
|
|
}
|
|
|
|
codec_data.state = AudioCodec::State::INITIALIZED_FAIL;
|
|
codec_data.converter = info->new_converter(error);
|
|
if(!codec_data.converter) {
|
|
log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize decoder: {}"), audio_codec, this->client_id_, error);
|
|
return;
|
|
}
|
|
|
|
codec_data.resampler = make_shared<audio::AudioResampler>(codec_data.converter->sample_rate(), this->output_source->sample_rate, this->output_source->channel_count);
|
|
if(!codec_data.resampler->valid()) {
|
|
log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize resampler"), audio_codec, this->client_id_);
|
|
return;
|
|
}
|
|
|
|
codec_data.state = AudioCodec::State::INITIALIZED_SUCCESSFULLY;
|
|
log_trace(category::voice_connection, tr("Successfully initialized codec {} for client {}."), audio_codec, this->client_id_);
|
|
}
|
|
|
|
std::shared_ptr<audio::SampleBuffer> VoiceClient::decode_buffer(const codec::value &audio_codec, const pipes::buffer_view &buffer, bool fec) {
|
|
assert(this->output_source);
|
|
|
|
auto& codec_data = this->codec[audio_codec];
|
|
if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) {
|
|
log_trace(category::audio, tr("Cant decode auto buffer of codec {} because codec isn't successfully initialized (state: {})"), audio_codec, (int) codec_data.state);
|
|
return nullptr;
|
|
}
|
|
|
|
string error;
|
|
char target_buffer[target_buffer_length];
|
|
if(target_buffer_length < codec_data.converter->expected_decoded_length(buffer.data_ptr(), buffer.length())) {
|
|
log_warn(category::voice_connection, tr("Failed to decode audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, codec_data.converter->expected_decoded_length(buffer.data_ptr(), buffer.length()));
|
|
return nullptr;
|
|
}
|
|
auto samples = codec_data.converter->decode(error, buffer.data_ptr(), buffer.length(), target_buffer, fec);
|
|
if(samples < 0) {
|
|
log_warn(category::voice_connection, tr("Failed to decode audio data: {}"), error);
|
|
return nullptr;
|
|
}
|
|
|
|
if(!audio::merge::merge_channels_interleaved(target_buffer, this->output_source->channel_count, target_buffer, codec_data.converter->channels(), samples)) {
|
|
log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count!"));
|
|
return nullptr;
|
|
}
|
|
|
|
if(target_buffer_length < codec_data.resampler->estimated_output_size(samples) * this->output_source->channel_count * sizeof(float)) {
|
|
log_warn(category::voice_connection, tr("Failed to resample audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, (codec_data.resampler->estimated_output_size(samples) * this->output_source->channel_count * 4));
|
|
return nullptr;
|
|
}
|
|
|
|
auto resampled_samples = codec_data.resampler->process(target_buffer, target_buffer, samples);
|
|
if(resampled_samples <= 0) {
|
|
log_warn(category::voice_connection, tr("Failed to resample audio data. Resampler resulted in {}"), resampled_samples);
|
|
return nullptr;
|
|
}
|
|
|
|
audio::apply_gain(target_buffer, this->output_source->channel_count, resampled_samples, this->volume_);
|
|
auto audio_buffer = audio::SampleBuffer::allocate((uint8_t) this->output_source->channel_count, (uint16_t) resampled_samples);
|
|
|
|
audio_buffer->sample_index = 0;
|
|
memcpy(audio_buffer->sample_data, target_buffer, this->output_source->channel_count * resampled_samples * 4);
|
|
return audio_buffer;
|
|
}
|
|
|
|
void VoiceClient::event_execute_dropped(const std::chrono::system_clock::time_point &point) {
|
|
if(audio_decode_event_dropped.exchange(true)) {
|
|
//Is not really a warning, it happens all the time and isn't really an issue
|
|
//log_warn(category::voice_connection, tr("Dropped auto enqueue event execution two or more times in a row for client {}"), this->_client_id);
|
|
}
|
|
}
|