diff --git a/native/serverconnection/CMakeLists.txt b/native/serverconnection/CMakeLists.txt index 32f3f8c..f60bdee 100644 --- a/native/serverconnection/CMakeLists.txt +++ b/native/serverconnection/CMakeLists.txt @@ -145,6 +145,9 @@ target_compile_definitions(${MODULE_NAME} PUBLIC -DNODEJS_API) add_executable(Audio-Test ${SOURCE_FILES} test/audio/main.cpp) target_link_libraries(Audio-Test ${REQUIRED_LIBRARIES}) +add_executable(Audio-Test-2 ${SOURCE_FILES} test/audio/sio.cpp) +target_link_libraries(Audio-Test-2 ${REQUIRED_LIBRARIES} soundio.a pulse) + add_executable(HW-UID-Test src/hwuid.cpp) target_link_libraries(HW-UID-Test ${REQUIRED_LIBRARIES} diff --git a/native/serverconnection/src/audio/AudioInput.cpp b/native/serverconnection/src/audio/AudioInput.cpp index 247fd33..d593011 100644 --- a/native/serverconnection/src/audio/AudioInput.cpp +++ b/native/serverconnection/src/audio/AudioInput.cpp @@ -69,7 +69,15 @@ bool AudioInput::open_device(std::string& error, PaDeviceIndex index) { parameters.sampleFormat = paFloat32; parameters.suggestedLatency = this->_current_device->defaultLowOutputLatency; - auto err = Pa_OpenStream(&this->input_stream, ¶meters, nullptr, (double) this->_sample_rate, paFramesPerBufferUnspecified, paClipOff, &AudioInput::_audio_callback, this); + auto err = Pa_OpenStream( + &this->input_stream, + ¶meters, + nullptr, + (double) this->_sample_rate, + paFramesPerBufferUnspecified, + paClipOff, + &AudioInput::_audio_callback, + this); if(err != paNoError) { error = "Pa_OpenStream returned " + to_string(err); return false; @@ -82,6 +90,7 @@ bool AudioInput::record() { lock_guard lock(this->input_stream_lock); if(!this->input_stream) return false; + if(Pa_IsStreamActive(this->input_stream)) return true; diff --git a/native/serverconnection/src/audio/js/AudioOutputStream.cpp b/native/serverconnection/src/audio/js/AudioOutputStream.cpp index 35e8d36..04d53b2 100644 --- a/native/serverconnection/src/audio/js/AudioOutputStream.cpp +++ b/native/serverconnection/src/audio/js/AudioOutputStream.cpp @@ -295,10 +295,5 @@ NAN_METHOD(AudioOutputStreamWrapper::_flush_buffer) { return; } - if(info.Length() != 1 || !info[0]->IsNumber()) { - Nan::ThrowError("Invalid arguments"); - return; - } - handle->clear(); } \ No newline at end of file diff --git a/native/serverconnection/src/connection/ProtocolHandler.cpp b/native/serverconnection/src/connection/ProtocolHandler.cpp index b19f95d..7170693 100644 --- a/native/serverconnection/src/connection/ProtocolHandler.cpp +++ b/native/serverconnection/src/connection/ProtocolHandler.cpp @@ -147,8 +147,10 @@ void ProtocolHandler::execute_resend() { } void ProtocolHandler::progress_packet(const pipes::buffer_view &buffer) { - if(this->connection_state >= connection_state::DISCONNECTED) + if(this->connection_state >= connection_state::DISCONNECTED) { + log_warn(category::connection, tr("Dropping received packet. We're already disconnected.")); return; + } if(buffer.length() < ServerPacket::META_SIZE) { log_error(category::connection, tr("Received a packet which is too small. ({})"), buffer.length()); @@ -212,8 +214,10 @@ void ProtocolHandler::progress_packet(const pipes::buffer_view &buffer) { } if(packet->type() == PacketTypeInfo::Command || packet->type() == PacketTypeInfo::CommandLow){ - if(packet->has_flag(PacketFlag::Unencrypted)) + if(packet->has_flag(PacketFlag::Unencrypted)) { + log_warn(category::connection, tr("Received unencrypted command packet! Dropping packet.")); return; + } } if(packet->type() == PacketTypeInfo::Command || packet->type() == PacketTypeInfo::CommandLow) @@ -240,8 +244,10 @@ void ProtocolHandler::progress_packet(const pipes::buffer_view &buffer) { } bool ProtocolHandler::handle_packets() { - if(this->connection_state >= connection_state::DISCONNECTED) + if(this->connection_state >= connection_state::DISCONNECTED) { + log_warn(category::connection, tr("Don't handle received packets because we're already disconnected.")); return false; + } bool reexecute_handle = false; shared_ptr current_packet = nullptr; @@ -256,17 +262,23 @@ bool ProtocolHandler::handle_packets() { auto base_index = this->_packet_buffers_index; auto select_index = base_index; auto max_index = this->_packet_buffers.size(); - for(uint8_t index = 0; index < max_index; index++) { + for(size_t index = 0; index < max_index; index++) { if(!buffer) select_index++; auto& buf = this->_packet_buffers[base_index++ % max_index]; unique_lock ring_lock(buf.buffer_lock, try_to_lock); - if(!ring_lock.owns_lock()) continue; + if(!ring_lock.owns_lock()) { + log_debug(category::connection, tr("Skipping packet type {} for handling"), base_index++ % max_index); + continue; + } if(buf.front_set()) { if(!buffer) { /* lets still test for reexecute */ buffer_execute_lock = unique_lock(buf.execute_lock, try_to_lock); - if(!buffer_execute_lock.owns_lock()) continue; + if(!buffer_execute_lock.owns_lock()) { + log_debug(category::connection, tr("Skipping packet type {} for handling (already executed)"), base_index++ % max_index); + continue; + } buffer_lock = move(ring_lock); buffer = &buf; @@ -360,17 +372,11 @@ bool ProtocolHandler::handle_packets() { } auto end = chrono::system_clock::now(); - if(end - startTime > chrono::milliseconds(10)) { + if(end - startTime > chrono::milliseconds(5)) { if(current_packet->type() != PacketTypeInfo::Command && current_packet->type() != PacketTypeInfo::CommandLow) { - //FIXME! - /* - logError(this->client->getServerId(), - "{} Handling of packet {} needs more than 10ms ({}ms)", - CLIENT_STR_LOG_PREFIX_(this->client), - current_packet->type().name(), - duration_cast(end - startTime).count() - ); - */ + log_warn(category::connection, + tr("Handling of packet {} ({}) needed longer than expected. Handle time {}ms"), + current_packet->packetId(), current_packet->type().name(), duration_cast(end - startTime).count()); } } } diff --git a/native/serverconnection/src/connection/Socket.cpp b/native/serverconnection/src/connection/Socket.cpp index 558c200..1abf705 100644 --- a/native/serverconnection/src/connection/Socket.cpp +++ b/native/serverconnection/src/connection/Socket.cpp @@ -28,12 +28,20 @@ bool UDPSocket::initialize() { if(this->file_descriptor > 0) return false; - this->file_descriptor = socket(this->_remote_address.ss_family, SOCK_DGRAM, 0); + this->file_descriptor = socket(this->_remote_address.ss_family, SOCK_DGRAM | SOCK_NONBLOCK, 0); if(this->file_descriptor < 2) { this->file_descriptor = 0; return false; } +#ifdef WIN32 + u_long enabled = 0; + auto non_block_rs = ioctlsocket(this->file_descriptor, FIONBIO, &enabled); + if (non_block_rs != NO_ERROR) { + log_warn(category::connection, tr("Failed to enable noblock!")) + } +#endif + /* * TODO: Make configurable */ @@ -127,20 +135,26 @@ void UDPSocket::callback_read(evutil_socket_t fd) { sockaddr source_address{}; socklen_t source_address_length = sizeof(sockaddr); - ssize_t buffer_length = 1600; /* IPv6 MTU is ~1.5k */ + ssize_t read_length = -1; + size_t buffer_length = 1600; /* IPv6 MTU is ~1.5k */ char buffer[1600]; - buffer_length = recvfrom(fd, (char*) buffer, buffer_length, 0, &source_address, &source_address_length); - if(buffer_length <= 0) { - if(errno == EAGAIN) - return; + size_t read_count = 0; + while(true) { //TODO: Some kind of timeout + source_address_length = sizeof(sockaddr); + read_length = recvfrom(fd, (char*) buffer, buffer_length, MSG_DONTWAIT, &source_address, &source_address_length); + if(read_length <= 0) { + if(errno == EAGAIN) + break; - logger::warn(category::socket, tr("Failed to receive data: {}/{}"), errno, strerror(errno)); - return; /* this should never happen! */ - } + logger::warn(category::socket, tr("Failed to receive data: {}/{}"), errno, strerror(errno)); + break; /* this should never happen! */ + } - if(this->on_data) - this->on_data(pipes::buffer_view{buffer, (size_t) buffer_length}); + read_count++; + if(this->on_data) + this->on_data(pipes::buffer_view{buffer, (size_t) read_length}); + } } void UDPSocket::callback_write(evutil_socket_t fd) { @@ -152,7 +166,7 @@ void UDPSocket::callback_write(evutil_socket_t fd) { this->write_queue.pop_front(); lock.unlock(); - auto written = sendto(fd, buffer.data_ptr(), buffer.length(), 0, (sockaddr*) &this->_remote_address, sizeof(this->_remote_address)); + auto written = sendto(fd, buffer.data_ptr(), buffer.length(), MSG_DONTWAIT, (sockaddr*) &this->_remote_address, sizeof(this->_remote_address)); if(written != buffer.length()) { if(errno == EAGAIN) { lock.lock(); diff --git a/native/serverconnection/src/connection/audio/VoiceClient.cpp b/native/serverconnection/src/connection/audio/VoiceClient.cpp index 4b403a2..d785966 100644 --- a/native/serverconnection/src/connection/audio/VoiceClient.cpp +++ b/native/serverconnection/src/connection/audio/VoiceClient.cpp @@ -223,11 +223,10 @@ VoiceClient::VoiceClient(const std::shared_ptr&, uint16_t clien log_warn(category::audio, tr("Client {} has a audio buffer underflow and not received any data for one second. Stopping replay."), this->_client_id); } else { if(this->_state != state::buffering) { - log_warn(category::audio, tr("Client {} has a audio buffer underflow. Buffer again and try to replay prematured packets."), this->_client_id); + log_warn(category::audio, tr("Client {} has a audio buffer underflow. Buffer again."), this->_client_id); this->set_state(state::buffering); } - play_premature_packets = true; /* try to replay any premature packets because we assume that the other packets got lost */ audio::decode_event_loop->schedule(static_pointer_cast(this->ref())); } } @@ -248,6 +247,7 @@ VoiceClient::~VoiceClient() { assert(this->_js_handle.IsEmpty()); } + this->cancel_replay(); /* cleanup all buffers */ this->output_source->on_underflow = nullptr; /* to ensure */ global_audio_output->delete_source(this->output_source); } @@ -272,27 +272,97 @@ void VoiceClient::finalize_js_object() { this->_js_handle.Reset(); } -#define target_buffer_length 16384 -void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& buffer, codec::value codec, bool head) { - if(this->_volume == 0) - return; +/** + * @param lower The packet ID which should be lower than the other + * @param upper The packet id which should be higher than the lower one + * @param clip_window The size how long the "overflow" counts + * @return true if lower is less than upper + */ +inline constexpr bool packet_id_less(uint16_t lower, uint16_t upper, uint16_t window) { + constexpr auto bounds = std::numeric_limits::max(); + if(bounds - window <= lower) { + uint16_t max_clip = lower + window; + if(upper <= max_clip) + return true; + else if(upper > lower) + return true; + return false; + } else { + if(lower >= upper) + return false; + + return upper - lower <= window; + } +} + +inline constexpr uint16_t packet_id_diff(uint16_t lower, uint16_t upper) { + if(upper < lower) + return (uint16_t) (((uint32_t) upper | 0x10000U) - (uint32_t) lower); + return upper - lower; +} + +#define MAX_LOST_PACKETS (6) +#define target_buffer_length 16384 +void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& buffer, codec::value codec, bool is_head) { if(codec < 0 || codec > this->codec.size()) { log_warn(category::voice_connection, tr("Received voice packet from client {} with unknown codec ({})"), this->_client_id, codec); return; } - auto encoded_buffer = make_unique(); + auto& codec_data = this->codec[codec]; + if(codec_data.state == AudioCodec::State::UNINITIALIZED) + this->initialize_code(codec); + + if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) { + log_warn(category::voice_connection, tr("Dropping audio packet because audio codec {} hasn't been initialized successfully (state: {})"), codec, (int) codec_data.state); + return; + } + //TODO: short circuit handling if we've muted him (e.g. volume = 0) + + auto encoded_buffer = new EncodedBuffer{}; encoded_buffer->packet_id = packet_id; encoded_buffer->codec = codec; encoded_buffer->receive_timestamp = chrono::system_clock::now(); encoded_buffer->buffer = buffer.own_buffer(); - encoded_buffer->head = head; - + encoded_buffer->head = is_head; this->_last_received_packet = encoded_buffer->receive_timestamp; + + { - lock_guard lock(this->audio_decode_queue_lock); - this->audio_decode_queue.push_back(move(encoded_buffer)); + lock_guard lock{codec_data.pending_lock}; + if(codec_data.stream_timeout() < encoded_buffer->receive_timestamp) { + //Old stream hasn't been terminated successfully. + //TODO: Cleanup packets which are too old? + codec_data.force_replay = encoded_buffer; + } else if(encoded_buffer->buffer.empty()) { + //Flush replay and stop + codec_data.force_replay = encoded_buffer; + } + + if(packet_id_less(encoded_buffer->packet_id, codec_data.last_packet_id, MAX_LOST_PACKETS) || encoded_buffer->packet_id == codec_data.last_packet_id) { + log_debug(category::voice_connection, + tr("Received audio packet which is older than the current index (packet: {}, current: {})"), encoded_buffer->packet_id, codec_data.last_packet_id); + return; + } + + /* insert the new buffer */ + { + EncodedBuffer* prv_head{nullptr}; + auto head{codec_data.pending_buffers}; + while(head && packet_id_less(head->packet_id, encoded_buffer->packet_id, MAX_LOST_PACKETS)) { + prv_head = head; + head = head->next; + } + + encoded_buffer->next = head; + if(prv_head) + prv_head->next = encoded_buffer; + else + codec_data.pending_buffers = encoded_buffer; + } + codec_data.last_packet_timestamp = encoded_buffer->receive_timestamp; + codec_data.process_pending = true; } audio::decode_event_loop->schedule(static_pointer_cast(this->ref())); @@ -300,228 +370,246 @@ void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& b void VoiceClient::cancel_replay() { log_trace(category::voice_connection, tr("Cancel replay for client {}"), this->_client_id); + this->output_source->clear(); this->set_state(state::stopped); + audio::decode_event_loop->cancel(static_pointer_cast(this->ref())); + + auto execute_lock = this->execute_lock(true); + for(auto& codec : this->codec) { + auto head = codec.pending_buffers; + while(head) { + auto tmp = head->next; + delete head; + head = tmp; + } + + codec.pending_buffers = nullptr; + codec.force_replay = nullptr; + } } void VoiceClient::event_execute(const std::chrono::system_clock::time_point &scheduled) { static auto max_time = chrono::milliseconds(10); + auto reschedule{false}; + string error; - bool reschedule = false; - auto now = chrono::system_clock::now(); - while(true) { - unique_lock buffer_lock(this->audio_decode_queue_lock); - if(this->audio_decode_queue.empty()) - break; + auto timeout = chrono::system_clock::now() + max_time; - if(chrono::system_clock::now() - now > max_time) { - reschedule = true; - break; - } + for(auto& audio_codec : this->codec) { + if(!audio_codec.process_pending) + continue; - auto entry = move(this->audio_decode_queue.front()); - this->audio_decode_queue.pop_front(); - buffer_lock.unlock(); + unique_lock lock{audio_codec.pending_lock}; + do { + EncodedBuffer* replay_head{nullptr}; + uint16_t local_last_pid{audio_codec.last_packet_id}; - //TODO: Drop too old buffers! - this->process_encoded_buffer(entry); - } - { + /* nothing to play */ + if(!audio_codec.pending_buffers) { + audio_codec.process_pending = false; + break; + } - unique_lock buffer_lock(this->audio_decode_queue_lock); - if(this->play_premature_packets) { - this->play_premature_packets = false; + if(audio_codec.force_replay) { + replay_head = audio_codec.pending_buffers; + audio_codec.pending_buffers = audio_codec.force_replay->next; - for(auto& codec_data : this->codec) { - if(!codec_data) continue; + audio_codec.force_replay->next = nullptr; + audio_codec.force_replay = nullptr; + } else { + EncodedBuffer* prv_head{nullptr}; + EncodedBuffer* head{nullptr}; - if(!codec_data->premature_packets.empty()) { - size_t play_index = 0; - bool should_replay = false; - for(; play_index < codec_data->premature_packets.size() - 1; play_index++) { - auto& packet = codec_data->premature_packets[play_index]; - auto& next_packet = codec_data->premature_packets[play_index + 1]; - if(codec_data->last_packet_id + 5 < packet.packet_id) { - //No packets which are in a row, but we have stuff so replay it - should_replay = true; - break; - } else if(packet.packet_id + 1 == next_packet.packet_id) { - /* we've good sound! */ - should_replay = true; - break; + //Trying to replay the sequence + head = audio_codec.pending_buffers; + while(head && head->packet_id == audio_codec.last_packet_id + 1) { + if(!replay_head) + replay_head = audio_codec.pending_buffers; + + audio_codec.last_packet_id++; + prv_head = head; + head = head->next; + } + audio_codec.pending_buffers = head; + + if(prv_head) { + prv_head->next = nullptr; /* mark the tail */ + } else { + assert(!replay_head); /* could not be set, else prv_head would be set */ + + //No packet found here, test if we've more than n packets in a row somewhere +#define SKIP_SEQ_LENGTH (1) + EncodedBuffer* skip_ptr[SKIP_SEQ_LENGTH + 1]; + memset(skip_ptr, 0, sizeof(skip_ptr)); + skip_ptr[0] = audio_codec.pending_buffers; + + while(skip_ptr[0]->next) { + for(size_t i = 0; i < SKIP_SEQ_LENGTH; i++) { + if(!skip_ptr[i]->next || skip_ptr[i]->packet_id + 1 != skip_ptr[i]->next->packet_id) + break; + + skip_ptr[i + 1] = skip_ptr[i]->next; } + if(skip_ptr[SKIP_SEQ_LENGTH]) + break; + + skip_ptr[0] = skip_ptr[0]->next; } - if(should_replay) { - for(size_t index = 0; index <= play_index; index++) { - auto& packet = codec_data->premature_packets[index]; + if(skip_ptr[SKIP_SEQ_LENGTH]) { + /* we've tree packets in a row */ + replay_head = audio_codec.pending_buffers; + audio_codec.pending_buffers = skip_ptr[SKIP_SEQ_LENGTH]; + skip_ptr[SKIP_SEQ_LENGTH - 1]->next = nullptr; + log_trace(category::voice_connection, tr("Skipping from {} to {} because of {} packets in a row"), audio_codec.last_packet_id, head->packet_id, SKIP_SEQ_LENGTH); - this->output_source->enqueue_samples(packet.buffer); - codec_data->last_packet_id = packet.packet_id; - codec_data->premature_packets.pop_front(); + /* Do not set process_pending to false, because we're not done + * We're just replaying all loose packets which are not within a sequence until we reach a sequence + * In the next loop the sequence will be played + */ + } else { + head = audio_codec.pending_buffers; + while(head) { + if(packet_id_diff(audio_codec.last_packet_id, head->packet_id) >= 5) { + break; + } + head = head->next; + } + + if(head) { + replay_head = audio_codec.pending_buffers; + audio_codec.pending_buffers = head->next; + head->next = nullptr; + log_trace(category::voice_connection, tr("Skipping from {} to {} because of over 6 packets between"), + audio_codec.last_packet_id, head->packet_id); + /* do not negate process_pending here. Same reason as with the 3 sequence */ + } else { + /* no packets we're willing to replay */ + audio_codec.process_pending = false; } - codec_data->premature_packets.erase(codec_data->premature_packets.begin(), codec_data->premature_packets.begin() + play_index + 1); -#ifdef DEBUG_PREMATURE_PACKETS - if(play_index > 0) - log_debug(category::audio, tr("Replayed (buffer underflow) {} premature packets for client {}"), play_index + 1, this->_client_id); -#endif } - break; } } - } - } + if(!replay_head) { + audio_codec.process_pending = false; + break; + } - if(audio_decode_event_dropped.exchange(false) && !reschedule) { - //Is not really a warning, it happens all the time and isn't really an issue - //log_warn(category::voice_connection, tr("Dropped auto enqueue event execution for client {}. No reschedulling planned, hopefully we processed all buffers."), this->_client_id); + { + auto head = replay_head; + while(head->next) + head = head->next; + + audio_codec.last_packet_id = head->packet_id; + assert(!audio_codec.pending_buffers || packet_id_less(audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id, 10)); + } + lock.unlock(); + while(replay_head) { + if(replay_head->buffer.empty()) { + this->set_state(state::stopping); + log_debug(category::voice_connection, tr("Client {} send a stop signal. Flushing stream and stopping"), this->_client_id); + } else { + auto lost_packets = packet_id_diff(local_last_pid, replay_head->packet_id) - 1; + if(lost_packets > 6) { + log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream ({} to {}). Resetting decoder."), this->_client_id, lost_packets, local_last_pid, replay_head->packet_id); + replay_head->reset_decoder = true; + } else if(lost_packets > 0) { + log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream. Skipping ahead."), this->_client_id, lost_packets); + if(audio_codec.converter->decode_lost(error, lost_packets)) + log_warn(category::audio, tr("Failed to decode lost packets for client {}: {}"), this->_client_id, error); + } + + if(replay_head->reset_decoder) + audio_codec.converter->reset_decoder(); + + auto decoded = this->decode_buffer(audio_codec.codec, replay_head->buffer); + this->output_source->enqueue_samples(decoded); + this->set_state(state::playing); + } + + local_last_pid = replay_head->packet_id; + replay_head = replay_head->next; + } + lock.lock(); //Check for more packets + //TODO: Check for timeout? + } while(audio_codec.process_pending); } if(reschedule) { - log_warn(category::voice_connection, tr("Audio data decode will take longer than {} us. Enqueueing for later"), chrono::duration_cast(max_time).count()); + log_warn(category::voice_connection, tr("Audio data decode will take longer than {} us. Enqueueing for later"), + chrono::duration_cast(max_time).count()); audio::decode_event_loop->schedule(static_pointer_cast(this->ref())); } } -#define MAX_LOST_PACKETS (6) -//Note: This function must be executed single threaded -void VoiceClient::process_encoded_buffer(const std::unique_ptr &buffer) { +void VoiceClient::initialize_code(const codec::value &audio_codec) { string error; - auto& codec_data = this->codec[buffer->codec]; - - if(!codec_data) { - auto info = codec::get_info(buffer->codec); - if(!info || !info->supported) { - log_warn(category::voice_connection, tr("Received voice packet from client {}, but we dont support it ({})"), this->_client_id, buffer->codec); - return; - } - - auto instance = make_unique(); - instance->successfully_initialized = false; - - instance->last_packet_id = (uint16_t) (buffer->packet_id - 1); /* could be 0xFFFF */ - instance->converter = info->new_converter(error); - if(!instance->converter) { - codec_data = move(instance); - - log_warn(category::voice_connection, tr("Failed to initialize new codec {} for client {}: {}"), buffer->codec, this->_client_id, error); - return; - } - - instance->resampler = make_shared(instance->converter->sample_rate(), this->output_source->sample_rate, instance->converter->channels()); - if(!instance->resampler->valid()) { - codec_data = move(instance); - - log_warn(category::voice_connection, tr("Failed to initialize new codec resampler {} for client {}"), buffer->codec, this->_client_id); - return; - } - instance->successfully_initialized = true; - codec_data = move(instance); - - log_trace(category::voice_connection, tr("Initalized autio codec {} for client {}"), buffer->codec, this->_client_id); - } else if(!codec_data->successfully_initialized) { - log_trace(category::voice_connection, tr("Dropping auto packet for failed initialized codec {} for client {}"), buffer->codec, this->_client_id); - return; /* already failed ignore that stuff */ - } - - uint16_t diff; - bool premature = false; - - if(codec_data->last_packet_timestamp + chrono::seconds(1) < buffer->receive_timestamp || this->_state >= state::stopping) { - diff = 0xFFFF; - } else { - if(codec_data->last_packet_id > buffer->packet_id) { - auto local_index = (uint16_t) (codec_data->last_packet_id + MAX_LOST_PACKETS); - if(local_index < buffer->packet_id) - diff = 0xFF; /* we got in a new generation */ - else { - /* - log_warn(category::audio, - tr("Received voice packet for client {} with is older than the last we received (Current index: {}, Packet index: {}). Dropping packet."), - this->_client_id, buffer->packet_id, codec_data->last_packet_id - ); - */ - return; - } - } else { - diff = buffer->packet_id - codec_data->last_packet_id; - } - } - - const auto old_packet_id = codec_data->last_packet_id; - codec_data->last_packet_timestamp = buffer->receive_timestamp; - - if(buffer->buffer.empty()) { - /* lets playback the last samples and we're done */ - this->set_state(state::stopping); - - /* enqueue all premature packets (list should be already ordered!) */ - { - unique_lock buffer_lock(this->audio_decode_queue_lock); - for(const auto& packet : codec_data->premature_packets) - this->output_source->enqueue_samples(packet.buffer); - codec_data->premature_packets.clear(); - } - - log_trace(category::voice_connection, tr("Stopping replay for client {}. Empty buffer!"), this->_client_id); + auto& codec_data = this->codec[audio_codec]; + if(codec_data.state != AudioCodec::State::UNINITIALIZED) { + log_warn(category::voice_connection, tr("Could not initialize codec of type {} because it isn't in uninitialized state anymore!"), (int) codec_data.state); return; } - if(diff == 0) { - //Duplicated packets - log_warn(category::audio, tr("Received voice packet with the same ID then the last one. Dropping packet.")); + codec_data.codec = audio_codec; + + auto info = codec::get_info(audio_codec); + if(!info || !info->supported) { + log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Codec is not supported"), audio_codec, this->_client_id); + codec_data.state = AudioCodec::State::UNSUPPORTED; return; - } else - diff--; /* because the diff is normally 1 (ofc) */ - - if(diff <= MAX_LOST_PACKETS) { - if(diff > 0) { - /* lets first handle packet as "lost", even thou we're enqueueing it as premature */ - //auto status = codec_data->converter->decode_lost(error, diff); - //if(status < 0) - // log_warn(category::voice_connection, tr("Failed to decode (skip) dropped packets. Return code {} => {}"), status, error); - premature = !buffer->head && this->state() != state::stopped; - - log_debug(category::voice_connection, - tr("Client {} dropped one or more audio packets. Old packet id: {}, New packet id: {}, Diff: {}. Head: {}. Flagging chunk as premature: {}"), - this->_client_id, old_packet_id, buffer->packet_id, diff, buffer->head, premature); - } - } else { - log_debug(category::voice_connection, tr("Client {} resetted decoder. Old packet id: {}, New packet id: {}, diff: {}"), this->_client_id, old_packet_id, buffer->packet_id, diff); - codec_data->converter->reset_decoder(); - if(!codec_data->converter) { - log_warn(category::voice_connection, tr("Failed to reset codec decoder {} for client {}: {}"), buffer->codec, this->_client_id, error); - return; - } } - if(!premature) - codec_data->last_packet_id = buffer->packet_id; + codec_data.state = AudioCodec::State::INITIALIZED_FAIL; + codec_data.converter = info->new_converter(error); + if(!codec_data.converter) { + log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize decoder: {}"), audio_codec, this->_client_id, error); + return; + } + + codec_data.resampler = make_shared(codec_data.converter->sample_rate(), this->output_source->sample_rate, codec_data.converter->channels()); + if(!codec_data.resampler->valid()) { + log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize resampler"), audio_codec, this->_client_id); + return; + } + + codec_data.state = AudioCodec::State::INITIALIZED_SUCCESSFULLY; + log_trace(category::voice_connection, tr("Successfully initialized codec {} for client {}."), audio_codec, this->_client_id); +} + +std::shared_ptr VoiceClient::decode_buffer(const codec::value &audio_codec, const pipes::buffer_view &buffer) { + auto& codec_data = this->codec[audio_codec]; + if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) { + log_trace(category::audio, tr("Cant decode auto buffer of codec {} because codec isn't successfully initialized (state: {})"), audio_codec, (int) codec_data.state); + return nullptr; + } + + string error; char target_buffer[target_buffer_length]; - if(target_buffer_length < codec_data->converter->expected_decoded_length(buffer->buffer.data_ptr(), buffer->buffer.length())) { - log_warn(category::voice_connection, tr("Failed to decode audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, codec_data->converter->expected_decoded_length(buffer->buffer.data_ptr(), buffer->buffer.length())); - return; + if(target_buffer_length < codec_data.converter->expected_decoded_length(buffer.data_ptr(), buffer.length())) { + log_warn(category::voice_connection, tr("Failed to decode audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, codec_data.converter->expected_decoded_length(buffer.data_ptr(), buffer.length())); + return nullptr; } - auto samples = codec_data->converter->decode(error, buffer->buffer.data_ptr(), buffer->buffer.length(), target_buffer); + auto samples = codec_data.converter->decode(error, buffer.data_ptr(), buffer.length(), target_buffer); if(samples < 0) { log_warn(category::voice_connection, tr("Failed to decode audio data: {}"), error); - return; + return nullptr; } - if(target_buffer_length < codec_data->resampler->estimated_output_size(samples) * codec_data->resampler->channels() * 4) { - log_warn(category::voice_connection, tr("Failed to resample audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, (codec_data->resampler->estimated_output_size(samples) * codec_data->resampler->channels() * 4)); - return; + if(target_buffer_length < codec_data.resampler->estimated_output_size(samples) * codec_data.resampler->channels() * 4) { + log_warn(category::voice_connection, tr("Failed to resample audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, (codec_data.resampler->estimated_output_size(samples) * codec_data.resampler->channels() * 4)); + return nullptr; } - auto resampled_samples = codec_data->resampler->process(target_buffer, target_buffer, samples); + auto resampled_samples = codec_data.resampler->process(target_buffer, target_buffer, samples); if(resampled_samples <= 0) { log_warn(category::voice_connection, tr("Failed to resample audio data. Resampler resulted in {}"), resampled_samples); - return; + return nullptr; } - if(!audio::merge::merge_channels_interleaved(target_buffer, this->output_source->channel_count, target_buffer, codec_data->resampler->channels(), resampled_samples)) { + if(!audio::merge::merge_channels_interleaved(target_buffer, this->output_source->channel_count, target_buffer, codec_data.resampler->channels(), resampled_samples)) { log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count!")); - return; + return nullptr; } if(this->_volume != 1) { @@ -531,58 +619,11 @@ void VoiceClient::process_encoded_buffer(const std::unique_ptr &b *(buf++) *= this->_volume; } - if(premature) { - auto audio_buffer = audio::SampleBuffer::allocate((uint8_t) this->output_source->channel_count, (uint16_t) resampled_samples); + auto audio_buffer = audio::SampleBuffer::allocate((uint8_t) this->output_source->channel_count, (uint16_t) resampled_samples); - audio_buffer->sample_index = 0; - memcpy(audio_buffer->sample_data, target_buffer, this->output_source->channel_count * resampled_samples * 4); - - { - unique_lock buffer_lock(this->audio_decode_queue_lock); - auto it = codec_data->premature_packets.begin(); - for(; it != codec_data->premature_packets.end(); it++) { - if(it->packet_id > buffer->packet_id) { - break; /* it is set to the right position */ - } - } - codec_data->premature_packets.insert(it, { - buffer->packet_id, - move(audio_buffer) - }); - std::stable_sort(codec_data->premature_packets.begin(), codec_data->premature_packets.end(), [](const PrematureAudioPacket& a, const PrematureAudioPacket& b) { - return a.packet_id < b.packet_id; - }); - } - } else { - auto enqueued = this->output_source->enqueue_samples(target_buffer, resampled_samples); - if(enqueued != resampled_samples) - log_warn(category::voice_connection, tr("Failed to enqueue all samples for client {}. Enqueued {} of {}"), this->_client_id, enqueued, resampled_samples); - this->set_state(state::playing); - this->play_premature_packets = false; - - /* test if any premature got its original place */ - { - unique_lock buffer_lock(this->audio_decode_queue_lock); - - size_t play_count = 0; - while(!codec_data->premature_packets.empty()) { - auto& packet = codec_data->premature_packets[0]; - - //Test if we're able to replay stuff again - if((uint16_t) (codec_data->last_packet_id + 1) < packet.packet_id) - break; //Nothing new - - this->output_source->enqueue_samples(packet.buffer); - codec_data->last_packet_id = packet.packet_id; - codec_data->premature_packets.pop_front(); - play_count++; - } -#ifdef DEBUG_PREMATURE_PACKETS - if(play_count > 0) - log_debug(category::audio, tr("Replayed (id match) {} premature packets for client {}"), play_count, this->_client_id); -#endif - } - } + audio_buffer->sample_index = 0; + memcpy(audio_buffer->sample_data, target_buffer, this->output_source->channel_count * resampled_samples * 4); + return audio_buffer; } void VoiceClient::event_execute_dropped(const std::chrono::system_clock::time_point &point) { diff --git a/native/serverconnection/src/connection/audio/VoiceClient.h b/native/serverconnection/src/connection/audio/VoiceClient.h index 0400e1a..1738ea9 100644 --- a/native/serverconnection/src/connection/audio/VoiceClient.h +++ b/native/serverconnection/src/connection/audio/VoiceClient.h @@ -88,39 +88,60 @@ namespace tc { inline std::shared_ptr output_stream() { return this->output_source; } private: - struct PrematureAudioPacket { - uint16_t packet_id = 0; - std::shared_ptr buffer{}; + struct EncodedBuffer { + bool head{false}; + bool reset_decoder{false}; + + std::chrono::system_clock::time_point receive_timestamp; + + pipes::buffer buffer; + codec::value codec{codec::MIN}; + + uint16_t packet_id{0}; + EncodedBuffer* next{nullptr}; }; struct AudioCodec { - uint16_t last_packet_id = 0; + enum struct State { + UNINITIALIZED, + INITIALIZED_SUCCESSFULLY, + INITIALIZED_FAIL, + UNSUPPORTED, + }; + + codec::value codec{}; + + uint16_t last_packet_id{0xFFFF}; /* the first packet id is 0 so one packet before is 0xFFFF */ std::chrono::system_clock::time_point last_packet_timestamp; - bool successfully_initialized; - std::shared_ptr converter; - std::shared_ptr resampler; + inline std::chrono::system_clock::time_point stream_timeout() { + return this->last_packet_timestamp + std::chrono::milliseconds{1000}; + } - std::deque premature_packets; + State state{State::UNINITIALIZED}; + std::shared_ptr converter{nullptr}; + std::shared_ptr resampler{nullptr}; + + std::mutex pending_lock{}; + EncodedBuffer* pending_buffers{nullptr}; + + /* forces all packets which are within the next chain to replay until (inclusive) force_replay is reached */ + EncodedBuffer* force_replay{nullptr}; + + bool process_pending{false}; }; - std::array, codec::MAX + 1> codec{ - nullptr, - nullptr, - nullptr, - nullptr, - nullptr - }; + std::array codec{}; + void initialize_code(const codec::value& /* codec */); + std::shared_ptr output_source; std::weak_ptr _ref; v8::Persistent _js_handle; - uint16_t _client_id; + uint16_t _client_id{0}; float _volume = 1.f; - bool play_premature_packets = false; - std::chrono::system_clock::time_point _last_received_packet; state::value _state = state::stopped; inline void set_state(state::value value) { @@ -131,20 +152,12 @@ namespace tc { this->on_state_changed(); } - struct EncodedBuffer { - bool head; - uint16_t packet_id; - pipes::buffer buffer; - codec::value codec; - std::chrono::system_clock::time_point receive_timestamp; - }; - std::atomic_bool audio_decode_event_dropped{false}; - std::mutex audio_decode_queue_lock; - std::deque> audio_decode_queue; void event_execute(const std::chrono::system_clock::time_point &point) override; void event_execute_dropped(const std::chrono::system_clock::time_point &point) override; - void process_encoded_buffer(const std::unique_ptr& /* buffer */); + + /* its recommend to call this in correct packet oder */ + std::shared_ptr decode_buffer(const codec::value& /* codec */,const pipes::buffer_view& /* buffer */); }; diff --git a/native/serverconnection/src/connection/ft/FileTransferManager.cpp b/native/serverconnection/src/connection/ft/FileTransferManager.cpp index 9ca385f..606fabe 100644 --- a/native/serverconnection/src/connection/ft/FileTransferManager.cpp +++ b/native/serverconnection/src/connection/ft/FileTransferManager.cpp @@ -3,13 +3,14 @@ #include #include -#include #include #ifndef WIN32 #include #include - #define IPPROTO_TCP (0) + #ifndef IPPROTO_TCP + #define IPPROTO_TCP (0) + #endif #else #include #define SOCK_NONBLOCK (0) diff --git a/native/serverconnection/src/connection/ft/FileTransferManager.h b/native/serverconnection/src/connection/ft/FileTransferManager.h index 150d12b..6e959ef 100644 --- a/native/serverconnection/src/connection/ft/FileTransferManager.h +++ b/native/serverconnection/src/connection/ft/FileTransferManager.h @@ -13,12 +13,11 @@ #include #include #include +#include "../../logger.h" #if NODEJS_API #include #include -#include "../../logger.h" - #endif namespace tc { diff --git a/native/serverconnection/src/hwuid.cpp b/native/serverconnection/src/hwuid.cpp index e413956..90e91bc 100644 --- a/native/serverconnection/src/hwuid.cpp +++ b/native/serverconnection/src/hwuid.cpp @@ -214,12 +214,4 @@ std::string system_uuid() { if(!generate_uuid(_cached_system_uuid, _cached_system_uuid_cksm)) _cached_system_uuid = ""; return _cached_system_uuid; -} - -int main() { - std::cout << "UUID: " << system_uuid() << "\n"; - return 1; -} - -//ADaX2mIRrC1uv83h -//NEg5KzRIOSs0SDkrNEg5Kw== \ No newline at end of file +} \ No newline at end of file diff --git a/native/serverconnection/src/logger.cpp b/native/serverconnection/src/logger.cpp index 7ae6faf..94a130f 100644 --- a/native/serverconnection/src/logger.cpp +++ b/native/serverconnection/src/logger.cpp @@ -1,10 +1,15 @@ -#include #include #include "logger.h" /* Basic */ void(*logger::_force_log)(logger::category::value, logger::level::level_enum /* lvl */, const std::string_view& /* message */); +void force_log_raw(logger::category::value, spdlog::level::level_enum level, const std::string_view &message); +void force_log_node(logger::category::value, spdlog::level::level_enum, const std::string_view &); + +#ifdef NODEJS_API +#include + /* NODE JS */ struct LogMessage { uint8_t level; @@ -19,10 +24,6 @@ LogMessage* log_messages_head = nullptr; LogMessage** log_messages_tail = &log_messages_head; Nan::callback_t<> log_messages_callback; -void force_log_node(logger::category::value, spdlog::level::level_enum, const std::string_view &); - -/* Normal */ -void force_log_raw(logger::category::value, spdlog::level::level_enum level, const std::string_view &message); struct StdExternalStringResourceBase : public v8::String::ExternalOneByteStringResource { public: @@ -88,10 +89,6 @@ void logger::initialize_node() { logger::_force_log = force_log_node; } -void logger::initialize_raw() { - logger::_force_log = force_log_raw; -} - void force_log_node(logger::category::value category, spdlog::level::level_enum level, const std::string_view &message) { auto entry = new LogMessage{}; entry->level = level; @@ -107,6 +104,12 @@ void force_log_node(logger::category::value category, spdlog::level::level_enum log_messages_callback(); } +#endif + +void logger::initialize_raw() { + logger::_force_log = force_log_raw; +} + void force_log_raw(logger::category::value category, spdlog::level::level_enum level, const std::string_view &message) { std::cout << "[" << level << "][" << category << "] " << message << std::endl; } diff --git a/native/serverconnection/test/audio/main.cpp b/native/serverconnection/test/audio/main.cpp index c3a6841..ce95a62 100644 --- a/native/serverconnection/test/audio/main.cpp +++ b/native/serverconnection/test/audio/main.cpp @@ -7,20 +7,14 @@ #include #include -#include "portaudio.h" -#ifndef WIN32 - #include -#endif #include "../../src/audio/AudioOutput.h" #include "../../src/audio/AudioInput.h" #include "../../src/audio/filter/FilterVad.h" #include "../../src/audio/filter/FilterThreshold.h" +#include "../../src/audio/Audio.h" #ifdef WIN32 #include -#if PA_USE_ASIO - #include "pa_asio.h" -#endif #endif using namespace std; @@ -28,31 +22,29 @@ using namespace tc; int main() { string error; - PaError err; - -#ifndef WIN32 - PaJack_SetClientName("TeaClient-Test"); -#endif - err = Pa_Initialize(); - if( err != paNoError ) - { - printf( "ERROR: Pa_Initialize returned 0x%x\n", err ); - return 0; + if(!tc::audio::initialize(error)) { + cerr << "Failed to initialize audio: " << error << endl; + return 1; } - auto playback_manager = audio::AudioOutput(2, 48000); if(!playback_manager.open_device(error, Pa_GetDefaultOutputDevice())) { cerr << "Failed to open output device (" << error << ")" << endl; return 1; } - playback_manager.playback(); + if(!playback_manager.playback()) { + cerr << "failed to start playback" << endl; + return 1; + } auto input = audio::AudioInput(2, 48000); if(!input.open_device(error, Pa_GetDefaultInputDevice())) { cerr << "Failed to open input device (" << error << ")" << endl; return 1; } - input.record(); + if(!input.record()) { + cerr << "failed to start record" << endl; + return 1; + } { auto consumer = input.create_consumer(960); @@ -70,6 +62,8 @@ int main() { } consumer->on_read = [target_stream, vad_handler, threshold_filter](const void* buffer, size_t samples) { + target_stream->enqueue_samples(buffer, samples); + cout << "T: " << threshold_filter->analyze(buffer, 0) << endl; if(vad_handler->process(buffer)) { cout << "Read " << samples << endl; @@ -81,7 +75,7 @@ int main() { cout << "Read started" << endl; } - this_thread::sleep_for(chrono::seconds(10)); + this_thread::sleep_for(chrono::seconds(60)); /* while(true) { @@ -89,5 +83,5 @@ int main() { } */ Pa_Terminate(); - return err; + return 1; } \ No newline at end of file diff --git a/native/serverconnection/test/audio/sio.cpp b/native/serverconnection/test/audio/sio.cpp new file mode 100644 index 0000000..09d38fb --- /dev/null +++ b/native/serverconnection/test/audio/sio.cpp @@ -0,0 +1,136 @@ +#include + +#include +#include +#include +#include +#include +#include + +using namespace std; +static const float PI = 3.1415926535f; +static float seconds_offset = 0.0f; + +auto next_write = chrono::system_clock::now(); +static void write_callback(struct SoundIoOutStream *outstream, + int frame_count_min, int frame_count_max) +{ + const struct SoundIoChannelLayout *layout = &outstream->layout; + float float_sample_rate = outstream->sample_rate; + float seconds_per_frame = 1.0f / float_sample_rate; + struct SoundIoChannelArea *areas; + int frames_left = 960 ; + int err; + + if(next_write > chrono::system_clock::now()) { + return; + } + cout << frame_count_min << "/" << frame_count_max << endl; + next_write = chrono::system_clock::now() + chrono::milliseconds{20}; + + while (frames_left > 0) { + int frame_count = frames_left; + + if ((err = soundio_outstream_begin_write(outstream, &areas, &frame_count))) { + fprintf(stderr, "%s\n", soundio_strerror(err)); + exit(1); + } + + if (!frame_count) + break; + + float pitch = 440.0f; + float radians_per_second = pitch * 2.0f * PI; + for (int frame = 0; frame < frame_count; frame += 1) { + float sample = sinf((seconds_offset + frame * seconds_per_frame) * radians_per_second); + for (int channel = 0; channel < layout->channel_count; channel += 1) { + float *ptr = (float*)(areas[channel].ptr + areas[channel].step * frame); + *ptr = sample; + } + } + seconds_offset = fmodf(seconds_offset + + seconds_per_frame * frame_count, 1.0f); + + if ((err = soundio_outstream_end_write(outstream))) { + fprintf(stderr, "%s\n", soundio_strerror(err)); + exit(1); + } + + frames_left -= frame_count; + } + + cout << "FLeft: " << frames_left << endl; +} + +int main(int argc, char **argv) { + int err; + + struct SoundIo *soundio = soundio_create(); + if (!soundio) { + fprintf(stderr, "out of memory\n"); + return 1; + } + + if ((err = soundio_connect(soundio))) { + fprintf(stderr, "error connecting: %s", soundio_strerror(err)); + return 1; + } + + + soundio_flush_events(soundio); + cout << "BCound: " << soundio_backend_count(soundio) << endl; + for(int i = 0; i < soundio_backend_count(soundio); i++) + cout << i << " => " << soundio_backend_name(soundio_get_backend(soundio, i)) << endl; + + for(int i = 0; i < soundio_input_device_count(soundio); i++) { + auto dev = soundio_get_input_device(soundio, i); + cout << dev->name << " - " << dev->id << endl; + } + + + int default_out_device_index = soundio_default_output_device_index(soundio); + if (default_out_device_index < 0) { + fprintf(stderr, "no output device found"); + return 1; + } + + struct SoundIoDevice *device = soundio_get_output_device(soundio, default_out_device_index); + if (!device) { + fprintf(stderr, "out of memory"); + return 1; + } + fprintf(stderr, "Output device: %s\n", device->name); + + for(int i = 0; i < 1; i++) { + struct SoundIoOutStream *outstream = soundio_outstream_create(device); + outstream->format = SoundIoFormatFloat32LE; + outstream->write_callback = write_callback; + outstream->software_latency = 0.02; + outstream->underflow_callback = [](auto ptr) { + cout << "Underflow" << endl; + }; + outstream->error_callback = [](auto ptr, auto code) { + cout << "Error:" << code << endl; + }; + if ((err = soundio_outstream_open(outstream))) { + fprintf(stderr, "unable to open device: %s", soundio_strerror(err)); + return 1; + } + + if (outstream->layout_error) + fprintf(stderr, "unable to set channel layout: %s\n", soundio_strerror(outstream->layout_error)); + + if ((err = soundio_outstream_start(outstream))) { + fprintf(stderr, "unable to start device: %s", soundio_strerror(err)); + return 1; + } + } + + for (;;) + soundio_wait_events(soundio); + + //soundio_outstream_destroy(outstream); + soundio_device_unref(device); + soundio_destroy(soundio); + return 0; +} \ No newline at end of file