From d5ffbff3be5d536029282cc450e7a131a56c79ff Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Tue, 22 Oct 2019 18:39:52 +0200 Subject: [PATCH] Fixed voice disconnecting --- .../client/voice/VoiceClientConnection.cpp | 162 ++++++++++-------- .../src/client/voice/VoiceClientConnection.h | 60 ++++++- server/src/server/VoiceServer.cpp | 2 +- 3 files changed, 145 insertions(+), 79 deletions(-) diff --git a/server/src/client/voice/VoiceClientConnection.cpp b/server/src/client/voice/VoiceClientConnection.cpp index 1fcbc6c..ed5db31 100644 --- a/server/src/client/voice/VoiceClientConnection.cpp +++ b/server/src/client/voice/VoiceClientConnection.cpp @@ -42,9 +42,12 @@ VoiceClientConnection::~VoiceClientConnection() { lock_guard write_queue_lock(this->write_queue_lock); this->write_queue.clear(); } - { - lock_guard write_queue_lock(this->write_prepare_queue_lock); - this->write_prepare_queue.clear(); + + for(auto& category : this->write_preprocess_queues) { + lock_guard work_lock{category.work_lock}; + lock_guard queue_lock{category.queue_lock}; + + category.queue.clear(); } this->client = nullptr; memtrack::freed(this); @@ -451,7 +454,8 @@ unique_ptr VoiceClientConnection::next_reassembled_packe void VoiceClientConnection::sendPacket(const shared_ptr& original_packet, bool copy, bool prepare_directly) { - if(this->client->state == ConnectionState::DISCONNECTED) return; + if(this->client->state == ConnectionState::DISCONNECTED) + return; shared_ptr packet; if(copy) { @@ -459,16 +463,23 @@ void VoiceClientConnection::sendPacket(const shared_ptr& if(original_packet->getListener()) packet->setListener(std::move(original_packet->getListener())); packet->memory_state.flags = original_packet->memory_state.flags; - } else + } else { packet = original_packet; + } + auto type = WritePreprocessCategory::from_type(packet->type().type()); + auto& queue = this->write_preprocess_queues[type]; if(prepare_directly) { vector buffers; this->prepare_process_count++; - if(!this->prepare_packet_for_write(buffers, packet)) { - logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client)); - this->prepare_process_count--; - return; + + { + unique_lock work_lock{queue.work_lock}; + if(!this->prepare_packet_for_write(buffers, packet, work_lock)) { + logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client)); + this->prepare_process_count--; + return; + } } /* enqueue buffers for write */ @@ -478,13 +489,16 @@ void VoiceClientConnection::sendPacket(const shared_ptr& } this->prepare_process_count--; /* we're now done preparing */ } else { - lock_guard prepare_queue_lock(this->write_prepare_queue_lock); - this->write_prepare_queue.push_back(packet); + lock_guard queue_lock{queue.queue_lock}; + queue.queue.push_back(packet); + queue.has_work = true; } this->triggerWrite(); } -bool VoiceClientConnection::prepare_packet_for_write(vector &result, const shared_ptr &packet) { +bool VoiceClientConnection::prepare_packet_for_write(vector &result, const shared_ptr &packet, std::unique_lock& work_lock) { + assert(work_lock.owns_lock()); + string error = "success"; if(packet->type().compressable() && !packet->memory_state.fragment_entry) { @@ -498,7 +512,7 @@ bool VoiceClientConnection::prepare_packet_for_write(vector &resu std::vector> fragments; fragments.reserve((size_t) (packet->data().length() / packet->type().max_length()) + 1); - if(packet->data().length() > packet->type().max_length()){ + if(packet->data().length() > packet->type().max_length()) { if(!packet->type().fragmentable()) { logError(this->client->getServerId(), "{} We've tried to send a too long, not fragmentable, packet. Dropping packet of type {} with length {}", CLIENT_STR_LOG_PREFIX_(this->client), packet->type().name(), packet->data().length()); return false; @@ -534,21 +548,21 @@ bool VoiceClientConnection::prepare_packet_for_write(vector &resu if(packet->getListener()) fragments.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :) - } else + } else { fragments.push_back(packet); + } + result.reserve(fragments.size()); /* apply packet ids */ - { - lock_guard id_lock(this->packet_id_manager_lock); - for(const auto& fragment : fragments) { - if(!fragment->memory_state.id_branded) - fragment->applyPacketId(this->packet_id_manager); - } + for(const auto& fragment : fragments) { + if(!fragment->memory_state.id_branded) + fragment->applyPacketId(this->packet_id_manager); } + work_lock.unlock(); /* the rest could be unordered */ - auto statistics = this->client && this->client->getServer() ? this->client->connectionStatistics : nullptr; + auto statistics = this->client ? this->client->connectionStatistics : nullptr; for(const auto& fragment : fragments) { if(!this->crypt_handler.progressPacketOut(fragment.get(), error, false)){ logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error); @@ -565,50 +579,50 @@ bool VoiceClientConnection::prepare_packet_for_write(vector &resu return true; } -#ifdef VC_USE_READ_QUEUE -bool VoiceClientConnection::handleNextDatagram() { - - bool flag_empty; - pipes::buffer buffer; - { - lock_guard queue_lock(this->queueLock); - if(this->readQueue.empty()) return false; - buffer = std::move(this->readQueue.front()); - this->readQueue.pop_front(); - flag_empty = this->readQueue.empty(); - }; - - try { - this->handleDatagramReceived(buffer); - } catch (std::exception& e) { - logCritical(this->client->getServerId(), "Handling of raw packet thrown an uncaught exception! (Message: " + string(e.what()) + ")"); - } - return flag_empty; -} -#endif - -bool VoiceClientConnection::prepare_write_packets() { - /* get the next packet to prepare */ - unique_lock prepare_queue_lock(this->write_prepare_queue_lock); - if(this->write_prepare_queue.empty()) - return false; +bool VoiceClientConnection::preprocess_write_packets() { + std::shared_ptr packet{nullptr}; + vector buffers{}; + bool flag_more{false}; prepare_process_count++; /* we're not preparing a packet */ - auto packet = move(this->write_prepare_queue.front()); - this->write_prepare_queue.pop_front(); - auto flag_more = !this->write_prepare_queue.empty(); - prepare_queue_lock.unlock(); + for(auto& category : this->write_preprocess_queues) { + if(!category.has_work) continue; + else if(packet) { + flag_more = true; + break; + } - /* prepare the next packet */ - vector buffers; - if(!this->prepare_packet_for_write(buffers, packet)) { - logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client)); - this->prepare_process_count--; - return flag_more; + unique_lock work_lock{category.work_lock, try_to_lock}; + if(!work_lock) continue; /* This particular category will already be processed */ + + { + lock_guard buffer_lock{category.queue_lock}; + if(category.queue.empty()) { + category.has_work = false; + continue; + } + + packet = std::move(category.queue.front()); + category.queue.pop_front(); + category.has_work = !category.queue.empty(); + } + + if(!this->prepare_packet_for_write(buffers, packet, work_lock)) { + logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client)); + if(flag_more) + break; + else + continue; /* find out if we have more */ + } + + if(flag_more) + break; + else + continue; /* find out if we have more */ } /* enqueue buffers for write */ - { + if(!buffers.empty()) { lock_guard write_queue_lock(this->write_queue_lock); this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end()); } @@ -646,12 +660,18 @@ int VoiceClientConnection::pop_write_buffer(pipes::buffer& target) { bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point until) { while(true) { - { - lock_guard prepare_lock{this->write_prepare_queue_lock}; - if(!this->write_prepare_queue.empty()) - goto _wait; - if(this->prepare_process_count != 0) - goto _wait; + for(auto& queue : this->write_preprocess_queues) { + { + lock_guard lock{queue.queue_lock}; + if(!queue.queue.empty()) + goto _wait; + } + + { + unique_lock lock{queue.work_lock, try_to_lock}; + if(!lock.owns_lock()) + goto _wait; + } } { lock_guard buffer_lock{this->write_queue_lock}; @@ -672,12 +692,16 @@ bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_poin } void VoiceClientConnection::reset() { + for(auto& queue : this->write_preprocess_queues) { + { + lock_guard lock{queue.queue_lock}; + queue.queue.clear(); + } + } + this->acknowledge_handler.reset(); this->crypt_handler.reset(); - { - lock_guard manager_lock(this->packet_id_manager_lock); - this->packet_id_manager.reset(); - } + this->packet_id_manager.reset(); { lock_guard buffer_lock(this->packet_buffer_lock); diff --git a/server/src/client/voice/VoiceClientConnection.h b/server/src/client/voice/VoiceClientConnection.h index 25fa433..e719ca7 100644 --- a/server/src/client/voice/VoiceClientConnection.h +++ b/server/src/client/voice/VoiceClientConnection.h @@ -55,7 +55,7 @@ namespace ts { * Split packets waiting in write_process_queue and moves the final buffers to writeQueue. * @returns true when there are more packets to prepare */ - bool prepare_write_packets(); + bool preprocess_write_packets(); /* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */ int pop_write_buffer(pipes::buffer& /* buffer */); @@ -84,22 +84,64 @@ namespace ts { std::unique_ptr next_reassembled_packet(std::unique_lock& /* packet channel execute lock */, bool& /* have more */); -#ifdef VC_USE_READ_QUEUE - std::deque readQueue; -#endif - + /* ---------- Write declarations ---------- */ spin_lock write_queue_lock; /* queue access isn't for long in general */ std::deque write_queue; - spin_lock write_prepare_queue_lock; /* preprocess queue access isn't for long in general */ - std::deque> write_prepare_queue; + struct WritePreprocessCategory { + enum value { + PING_PONG = 0, //Ping/Pongs + ACK = 2, + VOICE_WHISPER = 1, //Voice/Whisper + COMMAND = 3, + INIT = 4, - spin_lock packet_id_manager_lock; /* packet id's must be generated in order; Calculating the ID should also not be take too much time */ + MAX = INIT + }; + + inline static value from_type(protocol::PacketType type) { + switch(type) { + case protocol::PING: + case protocol::PONG: + return value::PING_PONG; + + case protocol::VOICE: + case protocol::VOICE_WHISPER: + return value::VOICE_WHISPER; + + case protocol::ACK: + case protocol::ACK_LOW: + return value::ACK; + + case protocol::COMMAND: + case protocol::COMMAND_LOW: + return value::COMMAND; + + default: + return value::INIT; + } + } + }; + + struct WritePreprocessQueue { + int _zero1{0}; + bool has_work{false}; + std::mutex work_lock{}; + + spin_lock queue_lock{}; + std::deque> queue{}; + + int _zero{0}; + }; + std::array write_preprocess_queues{}; + + /* ---------- Processing ---------- */ + /* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */ protocol::PacketIdManager packet_id_manager; /* this function is thread save :) */ std::atomic prepare_process_count{0}; /* current thread count preparing a packet */ - bool prepare_packet_for_write(std::vector &/* buffers which need to be transferred */, const std::shared_ptr &/* the packet */); + bool prepare_packet_for_write(std::vector &/* buffers which need to be transferred */, const std::shared_ptr &/* the packet */, std::unique_lock& /* work lock */); std::recursive_mutex packet_buffer_lock; packet_buffers_t _packet_buffers; diff --git a/server/src/server/VoiceServer.cpp b/server/src/server/VoiceServer.cpp index 8ce94f7..7aae25f 100644 --- a/server/src/server/VoiceServer.cpp +++ b/server/src/server/VoiceServer.cpp @@ -482,7 +482,7 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) auto client_ptr = &*client; TIMING_STEP(timings, "client get"); - more_to_prepare = connection->prepare_write_packets(); + more_to_prepare = connection->preprocess_write_packets(); TIMING_STEP(timings, "client prepare"); while(system_clock::now() <= write_timeout) {