From 9af0a76ed67ad41c2f4c8021fce9dcc1dc158402 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Mon, 1 Feb 2021 20:59:03 +0100 Subject: [PATCH] Reschedule packet send on `EAGAIN` --- server/src/FileServerHandler.cpp | 22 +++++++++---- server/src/client/voice/PacketEncoder.cpp | 39 ++++++++++++++--------- server/src/client/voice/PacketEncoder.h | 7 ++-- server/src/server/VoiceServer.cpp | 4 +-- 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/server/src/FileServerHandler.cpp b/server/src/FileServerHandler.cpp index 9467003..ad1d21b 100644 --- a/server/src/FileServerHandler.cpp +++ b/server/src/FileServerHandler.cpp @@ -151,8 +151,9 @@ void FileServerHandler::callback_transfer_statistics(const std::shared_ptrgetConnectionStatistics()->logFileTransferOut(statistics.delta_file_bytes_transferred); } - if(client->getType() == ClientType::CLIENT_TEAMSPEAK) + if(client->getType() == ClientType::CLIENT_TEAMSPEAK) { return; /* TS3 does not know this notify */ + } ts::command_builder notify{"notifyfiletransferprogress"}; notify.put_unchecked(0, "clientftfid", transfer->client_transfer_id); @@ -176,7 +177,9 @@ void FileServerHandler::callback_transfer_started(const std::shared_ptrfind_client_by_id(transfer->client_id); - if(!client || client->getUid() != transfer->client_unique_id) return; + if(!client || client->getUid() != transfer->client_unique_id) { + return; + } ts::command_builder notify{"notifyfiletransferstarted"}; @@ -186,14 +189,19 @@ void FileServerHandler::callback_transfer_started(const std::shared_ptr &transfer) { auto server = this->instance_->getVoiceServerManager()->findServerById(transfer->server->server_id()); - if(!server) return; /* well that's bad */ + if(!server) { + return; /* well that's bad */ + } auto client = server->find_client_by_id(transfer->client_id); - if(!client || client->getUid() != transfer->client_unique_id) return; - - - if(client->getType() == ClientType::CLIENT_TEAMSPEAK) + if(!client || client->getUid() != transfer->client_unique_id) { return; + } + + + if(client->getType() == ClientType::CLIENT_TEAMSPEAK) { + return; + } ts::command_builder notify{"notifystatusfiletransfer"}; diff --git a/server/src/client/voice/PacketEncoder.cpp b/server/src/client/voice/PacketEncoder.cpp index 210cc2d..958af42 100644 --- a/server/src/client/voice/PacketEncoder.cpp +++ b/server/src/client/voice/PacketEncoder.cpp @@ -35,10 +35,10 @@ void PacketEncoder::reset() { { std::lock_guard wlock{this->write_queue_mutex}; whead = std::exchange(this->encrypt_queue_head, nullptr); - rhead = std::exchange(this->resend_queue_head, nullptr); + rhead = std::exchange(this->send_queue_head, nullptr); this->encrypt_queue_tail = &this->encrypt_queue_head; - this->send_queue_tail = &this->resend_queue_head; + this->send_queue_tail = &this->send_queue_head; } while(whead) { @@ -238,7 +238,7 @@ void PacketEncoder::encrypt_pending_packets() { auto packet = packets_head; auto packet_tail = packet; while(packet) { - this->prepare_outgoing_packet(packet); + this->encrypt_outgoing_packet(packet); packet = packet->next; if(packet) { @@ -253,7 +253,7 @@ void PacketEncoder::encrypt_pending_packets() { } } -bool PacketEncoder::prepare_outgoing_packet(ts::protocol::OutgoingServerPacket *packet) { +bool PacketEncoder::encrypt_outgoing_packet(ts::protocol::OutgoingServerPacket *packet) { if(packet->type_and_flags & protocol::PacketFlag::Unencrypted) { this->crypt_handler_->write_default_mac(packet->mac); } else { @@ -284,18 +284,18 @@ bool PacketEncoder::prepare_outgoing_packet(ts::protocol::OutgoingServerPacket * } PacketEncoder::BufferPopResult PacketEncoder::pop_write_buffer(protocol::OutgoingServerPacket *&result) { - bool need_prepare_packet{false}, more_packets; + bool need_encrypt{false}, more_packets; { std::lock_guard wlock{this->write_queue_mutex}; - if(this->resend_queue_head) { - result = this->resend_queue_head; + if(this->send_queue_head) { + result = this->send_queue_head; if(result->next) { assert(this->send_queue_tail != &result->next); - this->resend_queue_head = result->next; + this->send_queue_head = result->next; } else { assert(this->send_queue_tail == &result->next); - this->resend_queue_head = nullptr; - this->send_queue_tail = &this->resend_queue_head; + this->send_queue_head = nullptr; + this->send_queue_tail = &this->send_queue_head; } } else if(this->encrypt_queue_head) { result = this->encrypt_queue_head; @@ -307,21 +307,30 @@ PacketEncoder::BufferPopResult PacketEncoder::pop_write_buffer(protocol::Outgoin this->encrypt_queue_head = nullptr; this->encrypt_queue_tail = &this->encrypt_queue_head; } - need_prepare_packet = true; + need_encrypt = true; } else { return BufferPopResult::DRAINED; } result->next = nullptr; - more_packets = this->resend_queue_head != nullptr || this->encrypt_queue_head != nullptr; + more_packets = this->send_queue_head != nullptr || this->encrypt_queue_head != nullptr; } - if(need_prepare_packet) { - this->prepare_outgoing_packet(result); + if(need_encrypt) { + this->encrypt_outgoing_packet(result); } return more_packets ? BufferPopResult::MORE_AVAILABLE : BufferPopResult::DRAINED; } +void PacketEncoder::reenqueue_failed_buffer(protocol::OutgoingServerPacket *packet) { + std::lock_guard wlock{this->write_queue_mutex}; + if(!this->send_queue_head) { + this->send_queue_tail = &packet->next; + } + packet->next = this->send_queue_head; + this->send_queue_head = packet; +} + void PacketEncoder::execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next) { std::deque> buffers{}; std::string error{}; @@ -367,7 +376,7 @@ bool PacketEncoder::wait_empty_write_and_prepare_queue(std::chrono::time_pointencrypt_queue_head) goto _wait; - if(this->resend_queue_head) + if(this->send_queue_head) goto _wait; } break; diff --git a/server/src/client/voice/PacketEncoder.h b/server/src/client/voice/PacketEncoder.h index 112be38..23674f7 100644 --- a/server/src/client/voice/PacketEncoder.h +++ b/server/src/client/voice/PacketEncoder.h @@ -55,6 +55,7 @@ namespace ts::server::server::udp { /* if the result is true, ownership has been transferred */ BufferPopResult pop_write_buffer(protocol::OutgoingServerPacket*& /* packet */); + void reenqueue_failed_buffer(protocol::OutgoingServerPacket* /* packet */); [[nodiscard]] inline auto& acknowledge_manager() { return this->acknowledge_manager_; } @@ -74,8 +75,8 @@ namespace ts::server::server::udp { connection::AcknowledgeManager acknowledge_manager_{}; spin_mutex write_queue_mutex{}; - protocol::OutgoingServerPacket* resend_queue_head{nullptr}; - protocol::OutgoingServerPacket** send_queue_tail{&resend_queue_head}; + protocol::OutgoingServerPacket* send_queue_head{nullptr}; + protocol::OutgoingServerPacket** send_queue_tail{&send_queue_head}; protocol::OutgoingServerPacket* encrypt_queue_head{nullptr}; protocol::OutgoingServerPacket** encrypt_queue_tail{&encrypt_queue_head}; @@ -85,6 +86,6 @@ namespace ts::server::server::udp { /* thread save function */ - bool prepare_outgoing_packet(protocol::OutgoingServerPacket* /* packet */); + bool encrypt_outgoing_packet(protocol::OutgoingServerPacket* /* packet */); }; } diff --git a/server/src/server/VoiceServer.cpp b/server/src/server/VoiceServer.cpp index 32d71e8..f58c945 100644 --- a/server/src/server/VoiceServer.cpp +++ b/server/src/server/VoiceServer.cpp @@ -438,8 +438,8 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) ssize_t res = write_datagram(io, client->remote_address, &client->connection->remote_address_info_, packet->packet_length(), packet->packet_data()); if(res != packet->packet_length()) { if(errno == EAGAIN) { - logCritical(voice_server->server->getServerId(), "Failed to write datagram packet for client {} (EAGAIN).", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort())); - packet->unref(); + client->connection->packet_encoder().reenqueue_failed_buffer(packet); + logTrace(voice_server->server->getServerId(), "Failed to write datagram packet for client {} (EAGAIN). Rescheduling packet.", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort())); return; } else if(errno == EINVAL || res == -0xFEB) { /* needs more debug */