diff --git a/server/src/ShutdownHelper.cpp b/server/src/ShutdownHelper.cpp index 062d15b..85b13ac 100644 --- a/server/src/ShutdownHelper.cpp +++ b/server/src/ShutdownHelper.cpp @@ -30,7 +30,8 @@ void ts::server::shutdownInstance(const std::string& message) { threads::name(force_kill, "force stopper"); force_kill.detach(); - kill(0, SIGKILL); + exit(2); + //kill(0, SIGKILL); }); threads::name(hangup_controller, "stop controller"); hangup_controller.detach(); diff --git a/server/src/client/ConnectedClient.cpp b/server/src/client/ConnectedClient.cpp index db7bd4b..3a67619 100644 --- a/server/src/client/ConnectedClient.cpp +++ b/server/src/client/ConnectedClient.cpp @@ -633,12 +633,7 @@ inline void send_channels(ConnectedClient* client, ChannelIT begin, const Channe break; } - if(dynamic_cast(client)) { - auto vc = dynamic_cast(client); - vc->sendCommand0(builder.build(), false, true); /* we need to process this command directly so it will be processed before the channellistfinished stuff */ - } else { - client->sendCommand(builder); - } + client->sendCommand(builder); if(begin != end) send_channels(client, begin, end, override_orderid); } @@ -779,11 +774,7 @@ void ConnectedClient::sendServerInit() { command["pv"] = 6; //Protocol version command["acn"] = this->getDisplayName(); command["aclid"] = this->getClientId(); - if(dynamic_cast(this)) { - dynamic_cast(this)->sendCommand0(command.build(), false, true); /* process it directly so the order for the channellist entries is ensured. (First serverinit then everything else) */ - } else { - this->sendCommand(command); - } + this->sendCommand(command); } bool ConnectedClient::handleCommandFull(Command& cmd, bool disconnectOnFail) { diff --git a/server/src/client/SpeakingClient.cpp b/server/src/client/SpeakingClient.cpp index 32e81ab..c79e798 100644 --- a/server/src/client/SpeakingClient.cpp +++ b/server/src/client/SpeakingClient.cpp @@ -10,7 +10,6 @@ #include "SpeakingClient.h" #include "src/InstanceHandler.h" #include "StringVariable.h" -#include "src/music/MusicBotManager.h" #include "misc/timer.h" using namespace std::chrono; @@ -99,9 +98,10 @@ void SpeakingClient::handlePacketVoice(const pipes::buffer_view& data, bool head memcpy(&buffer[5], &data[3], data.length() - 3); } + auto bview = pipes::buffer_view{buffer, data.length() + 2}; for (const auto& client : target_clients) { auto speaking_client = static_pointer_cast(client); - speaking_client->send_voice_packet(pipes::buffer_view{buffer, data.length() + 2}, flags); + speaking_client->send_voice_packet(bview, flags); } } diff --git a/server/src/client/command_handler/client.cpp b/server/src/client/command_handler/client.cpp index 5b96225..239951c 100644 --- a/server/src/client/command_handler/client.cpp +++ b/server/src/client/command_handler/client.cpp @@ -275,7 +275,7 @@ command_result ConnectedClient::handleCommandClientPoke(Command &cmd) { CMD_CHK_AND_INC_FLOOD_POINTS(25); std::vector> clients{}; - clients.resize(cmd.bulkCount()); + clients.reserve(cmd.bulkCount()); bool is_single_poke = cmd.bulkCount() == 1; for(size_t index{0}; index < cmd.bulkCount(); index++) { diff --git a/server/src/client/command_handler/file.cpp b/server/src/client/command_handler/file.cpp index cb08cbf..91cbf15 100644 --- a/server/src/client/command_handler/file.cpp +++ b/server/src/client/command_handler/file.cpp @@ -118,14 +118,9 @@ command_result ConnectedClient::handleCommandFTGetFileList(Command &cmd) { } if (fileList[0].has("name")) { - if(dynamic_cast(this)) { - dynamic_cast(this)->sendCommand0(fileList.build(), false, true); /* We need to process this directly else the order could get shuffled up! */ + this->sendCommand(fileList); + if(this->getType() != CLIENT_QUERY) this->sendCommand(fileListFinished); - } else { - this->sendCommand(fileList); - if(this->getType() != CLIENT_QUERY) - this->sendCommand(fileListFinished); - } return command_result{error::ok}; } else { return command_result{error::database_empty_result}; diff --git a/server/src/client/voice/PacketStatistics.h b/server/src/client/voice/PacketStatistics.h index 5a93fd3..a8b5ca7 100644 --- a/server/src/client/voice/PacketStatistics.h +++ b/server/src/client/voice/PacketStatistics.h @@ -2,7 +2,7 @@ #include #include -#include +#include namespace ts::server::client { class PacketStatistics { @@ -53,7 +53,7 @@ namespace ts::server::client { private: std::chrono::system_clock::time_point last_short{}; - spin_lock data_mutex{}; + spin_mutex data_mutex{}; protocol::UnorderedPacketLossCalculator calculator_voice_whisper{}; protocol::UnorderedPacketLossCalculator calculator_voice{}; diff --git a/server/src/client/voice/PrecomputedPuzzles.h b/server/src/client/voice/PrecomputedPuzzles.h index 4822af0..e1ade86 100644 --- a/server/src/client/voice/PrecomputedPuzzles.h +++ b/server/src/client/voice/PrecomputedPuzzles.h @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include namespace ts::server::udp { @@ -33,7 +33,7 @@ namespace ts::server::udp { void generate_puzzle(std::mt19937&); size_t cache_index{0}; - spin_lock cache_lock{}; + spin_mutex cache_lock{}; std::vector> cached_puzzles{}; }; } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClient.cpp b/server/src/client/voice/VoiceClient.cpp index eaf57e0..9d3b469 100644 --- a/server/src/client/voice/VoiceClient.cpp +++ b/server/src/client/voice/VoiceClient.cpp @@ -47,21 +47,8 @@ VoiceClient::~VoiceClient() { memtrack::freed(this); } -void VoiceClient::sendCommand0(const std::string_view& cmd, bool low, bool direct, std::unique_ptr> listener) { - if(cmd.empty()) { - logCritical(this->getServerId(), "{} Attempted to send an empty command!", CLIENT_STR_LOG_PREFIX); - return; - } - - auto packet = make_shared( - low ? protocol::PacketTypeInfo::CommandLow : protocol::PacketTypeInfo::Command, - pipes::buffer_view{(void*) cmd.data(), cmd.length()} - ); - if(low) { - packet->enable_flag(protocol::PacketFlag::NewProtocol); - } - packet->setListener(std::move(listener)); - this->connection->sendPacket(packet, false, direct); +void VoiceClient::sendCommand0(const std::string_view& cmd, bool low, std::unique_ptr> listener) { + this->connection->send_command(cmd, low, std::move(listener)); #ifdef PKT_LOG_CMD logTrace(this->getServerId(), "{}[Command][Server -> Client] Sending command {}. Command low: {}. Full command: {}", CLIENT_STR_LOG_PREFIX, cmd.substr(0, cmd.find(' ')), low, cmd); @@ -71,10 +58,8 @@ void VoiceClient::sendAcknowledge(uint16_t packetId, bool low) { char buffer[2]; le2be16(packetId, buffer); - auto packet = make_shared(low ? protocol::PacketTypeInfo::AckLow : protocol::PacketTypeInfo::Ack, pipes::buffer_view{buffer, 2}); - packet->enable_flag(PacketFlag::Unencrypted); - if(!low) packet->enable_flag(protocol::PacketFlag::NewProtocol); - this->connection->sendPacket(packet); + auto pflags = PacketFlag::Unencrypted | PacketFlag::NewProtocol; + this->connection->send_packet(low ? protocol::PacketType::ACK_LOW : protocol::PacketType::ACK, (PacketFlag::PacketFlag) pflags, buffer, 2); #ifdef PKT_LOG_ACK logTrace(this->getServerId(), "{}[Acknowledge][Server -> Client] Sending acknowledge for {}", CLIENT_STR_LOG_PREFIX, packetId); #endif @@ -186,7 +171,7 @@ bool VoiceClient::disconnect(ts::ViewReasonId reason_id, const std::string &reas self->close_connection(chrono::system_clock::time_point{}); /* we received the ack, we do not need to flush anything */ }, system_clock::now() + seconds(5)); - this->sendCommand0(cmd.build(), false, false, std::move(listener)); + this->sendCommand0(cmd.build(), false, std::move(listener)); } else { //TODO: Extra case for INIT_HIGH? this->close_connection(chrono::system_clock::now() + chrono::seconds{5}); @@ -284,33 +269,23 @@ void VoiceClient::execute_handle_packet(const std::chrono::system_clock::time_po } void VoiceClient::send_voice_packet(const pipes::buffer_view &voice_buffer, const SpeakingClient::VoicePacketFlags &flags) { - auto packet = make_shared(PacketTypeInfo::Voice, voice_buffer.length()); - { - PacketFlag::PacketFlags packet_flags = PacketFlag::None; - packet_flags |= flags.encrypted ? 0 : PacketFlag::Unencrypted; - packet_flags |= flags.head ? PacketFlag::Compressed : 0; - packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0; - packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0; - packet->set_flags(packet_flags); - } + PacketFlag::PacketFlags packet_flags{PacketFlag::None}; + packet_flags |= flags.encrypted ? 0U : PacketFlag::Unencrypted; + packet_flags |= flags.head ? PacketFlag::Compressed : 0U; + packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0U; + packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0U; - memcpy(packet->data().data_ptr(), voice_buffer.data_ptr(), voice_buffer.length()); - this->connection->sendPacket(packet, false, false); + this->connection->send_packet(PacketType::VOICE, packet_flags, voice_buffer.data_ptr(), voice_buffer.length()); } void VoiceClient::send_voice_whisper_packet(const pipes::buffer_view &voice_buffer, const SpeakingClient::VoicePacketFlags &flags) { - auto packet = make_shared(PacketTypeInfo::VoiceWhisper, voice_buffer.length()); - { - PacketFlag::PacketFlags packet_flags = PacketFlag::None; - packet_flags |= flags.encrypted ? 0 : PacketFlag::Unencrypted; - packet_flags |= flags.head ? PacketFlag::Compressed : 0; - packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0; - packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0; - packet->set_flags(packet_flags); - } + PacketFlag::PacketFlags packet_flags{PacketFlag::None}; + packet_flags |= flags.encrypted ? 0U : PacketFlag::Unencrypted; + packet_flags |= flags.head ? PacketFlag::Compressed : 0U; + packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0U; + packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0U; - memcpy(packet->data().data_ptr(), voice_buffer.data_ptr(), voice_buffer.length()); - this->connection->sendPacket(packet, false, false); + this->connection->send_packet(PacketType::VOICE_WHISPER, packet_flags, voice_buffer.data_ptr(), voice_buffer.length()); } float VoiceClient::current_ping_deviation() { diff --git a/server/src/client/voice/VoiceClient.h b/server/src/client/voice/VoiceClient.h index 55d953c..310d926 100644 --- a/server/src/client/voice/VoiceClient.h +++ b/server/src/client/voice/VoiceClient.h @@ -58,7 +58,7 @@ namespace ts { virtual void sendCommand(const ts::command_builder &command, bool low) { return this->sendCommand0(command.build(), low); } /* Note: Order is only guaranteed if progressDirectly is on! */ - virtual void sendCommand0(const std::string_view& /* data */, bool low = false, bool progressDirectly = false, std::unique_ptr> listener = nullptr); + virtual void sendCommand0(const std::string_view& /* data */, bool low = false, std::unique_ptr> listener = nullptr); virtual void sendAcknowledge(uint16_t packetId, bool low = false); connection::VoiceClientConnection* getConnection(){ return connection; } @@ -80,6 +80,7 @@ namespace ts { void handlePacketCommand(const pipes::buffer_view&); void handlePacketAck(const protocol::ClientPacketParser&); void handlePacketVoice(const protocol::ClientPacketParser&); + void handlePacketVoiceWhisper(const protocol::ClientPacketParser&); void handlePacketPing(const protocol::ClientPacketParser&); void handlePacketInit(const protocol::ClientPacketParser&); diff --git a/server/src/client/voice/VoiceClientConnection.cpp b/server/src/client/voice/VoiceClientConnection.cpp index 0549e40..7c8f9b2 100644 --- a/server/src/client/voice/VoiceClientConnection.cpp +++ b/server/src/client/voice/VoiceClientConnection.cpp @@ -32,23 +32,16 @@ using namespace ts::server; VoiceClientConnection::VoiceClientConnection(VoiceClient* client) : client(client) { memtrack::allocated(this); + this->acknowledge_handler.destroy_packet = [](void* packet) { + reinterpret_cast(packet)->unref(); + }; + this->crypt_handler.reset(); debugMessage(client->getServer()->getServerId(), "Allocated new voice client connection at {}", (void*) this); } VoiceClientConnection::~VoiceClientConnection() { - /* locking here should be useless, but just to ensure! */ - { - lock_guard write_queue_lock(this->write_queue_lock); - this->write_queue.clear(); - } - - for(auto& category : this->write_preprocess_queues) { - lock_guard work_lock{category.work_lock}; - lock_guard queue_lock{category.queue_lock}; - - category.queue.clear(); - } + this->reset(); this->client = nullptr; memtrack::freed(this); } @@ -96,6 +89,12 @@ void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& b #endif auto is_command = packet_parser.type() == protocol::COMMAND || packet_parser.type() == protocol::COMMAND_LOW; + + /* in previous versions we checked if the arrived packet is "worth decoding". + * But since in general a command buffer underflow is much more unlikely, especially because most packets are not even command packets, + * it's better we just skip that step and decode it anyways */ + +#if 0 /* pretest if the packet is worth the effort of decoding it */ if(is_command) { /* handle the order stuff */ @@ -120,6 +119,7 @@ void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& b return; } } +#endif //NOTICE I found out that the Compressed flag is set if the packet contains an audio header @@ -194,7 +194,19 @@ void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& b unique_lock queue_lock(fragment_buffer.buffer_lock); if(!fragment_buffer.insert_index(packet_parser.packet_id(), std::move(fragment_entry))) { - logTrace(this->client->getServerId(), "{} Failed to insert command packet into command packet buffer.", CLIENT_STR_LOG_PREFIX_(this->client)); + auto ignore_type = fragment_buffer.accept_index(packet_parser.packet_id()); + debugMessage(this->client->getServerId(), "{} Dropping command packet because command assembly buffer has an {} ({}|{}|{})", + CLIENT_STR_LOG_PREFIX_(this->client), + ignore_type == -1 ? "underflow" : "overflow", + fragment_buffer.capacity(), + fragment_buffer.current_index(), + packet_parser.packet_id() + ); + + if(ignore_type == -1) { /* underflow */ + /* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */ + this->client->sendAcknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW); + } return; } } @@ -204,8 +216,10 @@ void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& b if(voice_server) voice_server->schedule_command_handling(this->client); } else { - if(packet_parser.type() == protocol::VOICE || packet_parser.type() == protocol::VOICE_WHISPER) + if(packet_parser.type() == protocol::VOICE) this->client->handlePacketVoice(packet_parser); + else if(packet_parser.type() == protocol::VOICE_WHISPER) + this->client->handlePacketVoiceWhisper(packet_parser); else if(packet_parser.type() == protocol::ACK || packet_parser.type() == protocol::ACK_LOW) this->client->handlePacketAck(packet_parser); else if(packet_parser.type() == protocol::PING || packet_parser.type() == protocol::PONG) @@ -376,273 +390,155 @@ bool VoiceClientConnection::next_reassembled_command(unique_lockclient->getServerId(), "{} Command packet has a too large compressed size. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client)); return false; } - auto buffer = buffer::allocate_buffer(decompressed_size); - if(!compression::qlz_decompress_payload(payload.data_ptr(), buffer.data_ptr(), &decompressed_size)) { + auto decompress_buffer = buffer::allocate_buffer(decompressed_size); + if(!compression::qlz_decompress_payload(payload.data_ptr(), decompress_buffer.data_ptr(), &decompressed_size)) { logTrace(this->client->getServerId(), "{} Failed to decompress received command. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client)); return false; } - payload = buffer.range(0, decompressed_size); + payload = decompress_buffer.range(0, decompressed_size); } result = std::move(payload); return have_more; } - -void VoiceClientConnection::sendPacket(const shared_ptr& original_packet, bool copy, bool prepare_directly) { - if(this->client->state == ConnectionState::DISCONNECTED) - return; - - shared_ptr packet; - if(copy) { - packet = protocol::ServerPacket::from_buffer(original_packet->buffer().dup(buffer::allocate_buffer(original_packet->buffer().length()))); - if(original_packet->getListener()) - packet->setListener(std::move(original_packet->getListener())); - packet->memory_state.flags = original_packet->memory_state.flags; +bool VoiceClientConnection::prepare_outgoing_packet(ts::protocol::OutgoingServerPacket *packet) { + if(packet->type_and_flags & PacketFlag::Unencrypted) { + this->crypt_handler.write_default_mac(packet->mac); } else { - packet = original_packet; - } + CryptHandler::key_t crypt_key{}; + CryptHandler::nonce_t crypt_nonce{}; + std::string error{}; - auto type = WritePreprocessCategory::from_type(packet->type().type()); - auto& queue = this->write_preprocess_queues[type]; - if(prepare_directly) { - vector buffers; - this->prepare_process_count++; - - { - unique_lock work_lock{queue.work_lock}; - if(!this->prepare_packet_for_write(buffers, packet, work_lock)) { - logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client)); - this->prepare_process_count--; - return; - } - } - - /* enqueue buffers for write */ - { - lock_guard write_queue_lock(this->write_queue_lock); - this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end()); - } - this->prepare_process_count--; /* we're now done preparing */ - } else { - lock_guard queue_lock{queue.queue_lock}; - queue.queue.push_back(packet); - queue.has_work = true; - } - this->triggerWrite(); -} - -bool VoiceClientConnection::prepare_packet_for_write(vector &result, const shared_ptr &packet, std::unique_lock& work_lock) { - assert(work_lock.owns_lock()); - - string error = "success"; - - if(packet->type().compressable() && !packet->memory_state.fragment_entry) { - packet->enable_flag(PacketFlag::Compressed); - if(!this->compress_handler.progressPacketOut(packet.get(), error)) { - logError(this->getClient()->getServerId(), "{} Could not compress outgoing packet.\nThis could cause fatal failed for the client.\nError: {}", error); - return false; - } - } - - std::vector> fragments; - fragments.reserve((size_t) (packet->data().length() / packet->type().max_length()) + 1); - - if(packet->data().length() > packet->type().max_length()) { - if(!packet->type().fragmentable()) { - logError(this->client->getServerId(), "{} We've tried to send a too long, not fragmentable, packet. Dropping packet of type {} with length {}", CLIENT_STR_LOG_PREFIX_(this->client), packet->type().name(), packet->data().length()); - return false; - } - - { //Split packets - auto buffer = packet->data(); - - const auto max_length = packet->type().max_length(); - while(buffer.length() > max_length * 2) { - fragments.push_back(make_shared(packet->type(), buffer.view(0, max_length).dup(buffer::allocate_buffer(max_length)))); - buffer = buffer.range((size_t) max_length); - } - - if(buffer.length() > max_length) { //Divide rest by 2 - fragments.push_back(make_shared(packet->type(), buffer.view(0, buffer.length() / 2).dup(buffer::allocate_buffer(buffer.length() / 2)))); - buffer = buffer.range(buffer.length() / 2); - } - fragments.push_back(make_shared(packet->type(), buffer)); - - for(const auto& frag : fragments) { - frag->setFragmentedEntry(true); - frag->enable_flag(PacketFlag::NewProtocol); - } - } - - assert(fragments.size() >= 2); - fragments.front()->enable_flag(PacketFlag::Fragmented); - if(packet->has_flag(PacketFlag::Compressed)) - fragments.front()->enable_flag(PacketFlag::Compressed); - - fragments.back()->enable_flag(PacketFlag::Fragmented); - - if(packet->getListener()) - fragments.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :) - } else { - fragments.push_back(packet); - } - - result.reserve(fragments.size()); - - /* apply packet ids */ - for(const auto& fragment : fragments) { - if(!fragment->memory_state.id_branded) - fragment->applyPacketId(this->packet_id_manager); - - if(fragment->type().type() == protocol::PacketType::COMMAND_LOW || fragment->type().type() == protocol::PacketType::COMMAND) - this->packet_statistics().send_command(fragment->type().type(), fragment->packetId() | fragment->generationId() << 16U); - } - - work_lock.unlock(); /* the rest could be unordered */ - - - CryptHandler::key_t crypt_key{}; - CryptHandler::nonce_t crypt_nonce{}; - auto statistics = this->client ? this->client->connectionStatistics : nullptr; - for(const auto& fragment : fragments) { - if(fragment->has_flag(PacketFlag::Unencrypted)) { - this->crypt_handler.write_default_mac(fragment->mac().data_ptr()); + if(!this->client->crypto.protocol_encrypted) { + crypt_key = CryptHandler::default_key; + crypt_nonce = CryptHandler::default_nonce; } else { - if(!this->client->crypto.protocol_encrypted) { - crypt_key = CryptHandler::default_key; - crypt_nonce = CryptHandler::default_nonce; - } else { - if(!this->crypt_handler.generate_key_nonce(false, fragment->type().type(), fragment->packetId(), fragment->generationId(), crypt_key, crypt_nonce)) { - logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce for sending a packet. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client)); - return false; - } - } - - auto crypt_result = this->crypt_handler.encrypt(fragment->header().data_ptr(), fragment->header().length(), - fragment->data().data_ptr(), fragment->data().length(), - fragment->mac().data_ptr(), - crypt_key, crypt_nonce, error); - if(!crypt_result){ - logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error); + if(!this->crypt_handler.generate_key_nonce(false, (uint8_t) packet->packet_type(), packet->packet_id(), packet->generation, crypt_key, crypt_nonce)) { + logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce for sending a packet. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client)); return false; } } -#ifndef CONNECTION_NO_STATISTICS - if(statistics) { - auto category = stats::ConnectionStatistics::category::from_type(fragment->type()); - statistics->logOutgoingPacket(category, fragment->length() + 96); /* 96 for the UDP packet overhead */ + auto crypt_result = this->crypt_handler.encrypt((char*) packet->packet_data() + ServerPacketP::kHeaderOffset, ServerPacketP::kHeaderLength, + packet->payload, packet->payload_size, + packet->mac, + crypt_key, crypt_nonce, error); + if(!crypt_result){ + logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error); + return false; } -#endif - this->acknowledge_handler.process_packet(*fragment); - result.push_back(fragment->buffer()); } - return true; } -bool VoiceClientConnection::preprocess_write_packets() { - std::shared_ptr packet{nullptr}; - vector buffers{}; - bool flag_more{false}; +VoiceClientConnection::WBufferPopResult VoiceClientConnection::pop_write_buffer(protocol::OutgoingServerPacket *&result) { + if(this->client->state == ConnectionState::DISCONNECTED) + return WBufferPopResult::DRAINED; - prepare_process_count++; /* we're not preparing a packet */ - for(auto& category : this->write_preprocess_queues) { - if(!category.has_work) continue; - else if(packet) { - flag_more = true; - break; - } - - unique_lock work_lock{category.work_lock, try_to_lock}; - if(!work_lock) continue; /* This particular category will already be processed */ - - { - lock_guard buffer_lock{category.queue_lock}; - if(category.queue.empty()) { - category.has_work = false; - continue; + bool need_prepare_packet{false}, more_packets{false}; + { + std::lock_guard wlock{this->write_queue_mutex}; + if(this->resend_queue_head) { + result = this->resend_queue_head; + if(result->next) { + assert(this->resend_queue_tail != &result->next); + this->resend_queue_head = result->next; + } else { + assert(this->resend_queue_tail == &result->next); + this->resend_queue_head = nullptr; + this->resend_queue_tail = &this->resend_queue_head; } - - packet = std::move(category.queue.front()); - category.queue.pop_front(); - category.has_work = !category.queue.empty(); - flag_more = category.has_work; + } else if(this->write_queue_head) { + result = this->write_queue_head; + if(result->next) { + assert(this->write_queue_tail != &result->next); + this->write_queue_head = result->next; + } else { + assert(this->write_queue_tail == &result->next); + this->write_queue_head = nullptr; + this->write_queue_tail = &this->write_queue_head; + } + need_prepare_packet = true; + } else { + return WBufferPopResult::DRAINED; } - - if(!this->prepare_packet_for_write(buffers, packet, work_lock)) { - logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client)); - if(flag_more) - break; - else - continue; /* find out if we have more */ - } - - if(flag_more) - break; - else - continue; /* find out if we have more */ + result->next = nullptr; + more_packets = this->resend_queue_head != nullptr || this->write_queue_head != nullptr; } - /* enqueue buffers for write */ - if(!buffers.empty()) { - lock_guard write_queue_lock(this->write_queue_lock); - this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end()); - } - this->prepare_process_count--; /* we're now done preparing */ - - return flag_more; + if(need_prepare_packet) + this->prepare_outgoing_packet(result); + return more_packets ? WBufferPopResult::MORE_AVAILABLE : WBufferPopResult::DRAINED; } -int VoiceClientConnection::pop_write_buffer(pipes::buffer& target) { - if(this->client->state == DISCONNECTED) - return 2; +void VoiceClientConnection::execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next) { + std::deque> buffers{}; + std::string error{}; - lock_guard wqlock{this->write_queue_lock}; - size_t size = this->write_queue.size(); - if(size == 0) - return 2; + if (this->acknowledge_handler.execute_resend(now, next, buffers, error) < 0) { + debugMessage(client->getServerId(), "{} Failed to execute packet resend: {}", CLIENT_STR_LOG_PREFIX_(this->client), error); - target = std::move(this->write_queue.front()); - this->write_queue.pop_front(); - -#ifdef FUZZING_TESTING_OUTGOING - #ifdef FIZZING_TESTING_DISABLE_HANDSHAKE - if (this->client->state == ConnectionState::CONNECTED) { - #endif - if ((rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP) { - debugMessage(this->client->getServerId(), "{}[FUZZING] Dropping outgoing packet", CLIENT_STR_LOG_PREFIX_(this->client)); - return 0; + if(this->client->state == ConnectionState::CONNECTED) { + this->client->disconnect(ViewReasonId::VREASON_TIMEOUT, config::messages::timeout::packet_resend_failed, nullptr, true); + } else { + this->client->close_connection(system_clock::now() + seconds(1)); } - #ifdef FIZZING_TESTING_DISABLE_HANDSHAKE + } else if(!buffers.empty()) { + size_t send_count{0}; + { + lock_guard wlock{this->write_queue_mutex}; + for(auto& buffer : buffers) { + auto packet = (protocol::OutgoingServerPacket*) buffer->packet_ptr; + if(packet->next) continue; /* still in write queue (this shall not happen very often) */ + if(&packet->next == this->write_queue_tail || &packet->next == this->resend_queue_tail) continue; + + packet->ref(); /* for the write queue again */ + *this->resend_queue_tail = packet; + this->resend_queue_tail = &packet->next; + + send_count++; + this->packet_statistics().send_command((protocol::PacketType) buffer->packet_type, buffer->packet_full_id); + } + } + logTrace(client->getServerId(), "{} Resending {} packets. Send actually {} packets.", CLIENT_STR_LOG_PREFIX_(client), buffers.size(), send_count); + this->triggerWrite(); + } +} + +void VoiceClientConnection::encrypt_write_queue() { + OutgoingServerPacket* packets_head, *packets_tail; + { + std::lock_guard wlock{this->write_queue_mutex}; + packets_head = this->write_queue_head; + this->write_queue_head = nullptr; + this->write_queue_tail = &this->write_queue_head; + } + if(!packets_head) return; + + auto packet = packets_head; + while(packet) { + this->prepare_outgoing_packet(packet); + packets_tail = packet; + packet = packet->next; + } + + { + std::lock_guard wlock{this->write_queue_mutex}; + *this->resend_queue_tail = packets_head; + this->resend_queue_tail = &packets_tail->next; } - #endif -#endif - return size > 1; } bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point until) { while(true) { - for(auto& queue : this->write_preprocess_queues) { - { - lock_guard lock{queue.queue_lock}; - if(!queue.queue.empty()) - goto _wait; - } - - { - unique_lock lock{queue.work_lock, try_to_lock}; - if(!lock.owns_lock()) - goto _wait; - } - } { - lock_guard buffer_lock{this->write_queue_lock}; - if(!this->write_queue.empty()) + std::lock_guard wlock{this->write_queue_mutex}; + if(this->write_queue_head) goto _wait; - if(this->prepare_process_count != 0) + + if(this->resend_queue_head) goto _wait; } break; @@ -657,11 +553,25 @@ bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_poin } void VoiceClientConnection::reset() { - for(auto& queue : this->write_preprocess_queues) { - { - lock_guard lock{queue.queue_lock}; - queue.queue.clear(); + { + std::lock_guard wlock{this->write_queue_mutex}; + auto head = this->write_queue_head; + while(head) { + auto next = head->next; + head->unref(); + head = next; } + this->write_queue_head = nullptr; + this->write_queue_tail = &this->write_queue_head; + + head = this->resend_queue_head; + while(head) { + auto next = head->next; + head->unref(); + head = next; + } + this->resend_queue_head = nullptr; + this->resend_queue_tail = &this->resend_queue_head; } this->acknowledge_handler.reset(); @@ -701,4 +611,165 @@ void VoiceClientConnection::register_initiv_packet() { auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)]; unique_lock buffer_lock(fragment_buffer.buffer_lock); fragment_buffer.set_full_index_to(1); /* the first packet (0) is already the clientinitiv packet */ +} + +void VoiceClientConnection::send_packet(protocol::OutgoingServerPacket *packet) { + uint32_t full_id; + { + std::lock_guard id_lock{this->packet_id_mutex}; + full_id = this->packet_id_manager.generate_full_id(packet->packet_type()); + } + packet->set_packet_id(full_id & 0xFFFFU); + packet->generation = full_id >> 16U; + + { + std::lock_guard qlock{this->write_queue_mutex}; + *this->write_queue_tail = packet; + this->write_queue_tail = &packet->next; + } + + auto statistics = this->client ? this->client->connectionStatistics : nullptr; + if(statistics) { + auto category = stats::ConnectionStatistics::category::from_type(packet->packet_type()); + statistics->logOutgoingPacket(category, packet->packet_length() + 96); /* 96 for the UDP packet overhead */ + } + + this->triggerWrite(); +} + +void VoiceClientConnection::send_packet(protocol::PacketType type, protocol::PacketFlag::PacketFlags flag, const void *payload, size_t payload_size) { + auto packet = protocol::allocate_outgoing_packet(payload_size); + + packet->type_and_flags = (uint8_t) type | (uint8_t) flag; + memcpy(packet->payload, payload, payload_size); + + this->send_packet(packet); +} + +#define MAX_COMMAND_PACKET_PAYLOAD_LENGTH (487) +void VoiceClientConnection::send_command(const std::string_view &command, bool low, std::unique_ptr> ack_listener) { + bool own_data_buffer{false}; + void* own_data_buffer_ptr; /* imutable! */ + + const char* data_buffer{command.data()}; + size_t data_length{command.length()}; + + uint8_t head_pflags{0}; + PacketType ptype{low ? PacketType::COMMAND_LOW : PacketType::COMMAND}; + protocol::OutgoingServerPacket *packets_head{nullptr}; + protocol::OutgoingServerPacket *packets_tail{nullptr}; + /* only compress "long" commands */ + if(command.size() > 100) { + size_t max_compressed_payload_size = compression::qlz_compressed_size(command.data(), command.length()); + auto compressed_buffer = ::malloc(max_compressed_payload_size); + + size_t compressed_size{max_compressed_payload_size}; + if(!compression::qlz_compress_payload(command.data(), command.length(), compressed_buffer, &compressed_size)) { + logCritical(0, "Failed to compress command packet. Dropping packet"); + ::free(compressed_buffer); + return; + } + + /* we don't need to make the command longer than it is */ + if(compressed_size < command.length()) { + own_data_buffer = true; + data_buffer = (char*) compressed_buffer; + own_data_buffer_ptr = compressed_buffer; + data_length = compressed_size; + head_pflags |= PacketFlag::Compressed; + } else { + ::free(compressed_buffer); + } + } + + uint8_t ptype_and_flags{(uint8_t) ((uint8_t) ptype | (uint8_t) PacketFlag::NewProtocol)}; + if(data_length > MAX_COMMAND_PACKET_PAYLOAD_LENGTH) { + auto chunk_count = (size_t) ceil((float) data_length / (float) MAX_COMMAND_PACKET_PAYLOAD_LENGTH); + auto chunk_size = (size_t) ceil((float) data_length / (float) chunk_count); + auto packet = protocol::allocate_outgoing_packet(chunk_size); + packets_head = packet; + while(true) { + packet->type_and_flags = ptype_and_flags; + + auto bytes = min(chunk_size, data_length); + memcpy(packet->payload, data_buffer, bytes); + + data_length -= bytes; + if(data_length == 0) + break; + data_buffer += bytes; + + packet->next = protocol::allocate_outgoing_packet(bytes); + packet = packet->next; + } + packets_tail = packet; + } else { + auto packet = protocol::allocate_outgoing_packet(data_length); + packet->type_and_flags = ptype_and_flags; + + memcpy(packet->payload, data_buffer, data_length); + + packets_head = packet; + packets_tail = packet; + } + + + { + std::lock_guard id_lock{this->packet_id_mutex}; + uint32_t full_id; + auto head = packets_head; + while(head) { + full_id = this->packet_id_manager.generate_full_id(ptype); + + head->set_packet_id(full_id & 0xFFFFU); + head->generation = full_id >> 16U; + head = head->next; + } + } + + /* if head = tail, fragmented will not be enabled (2x xored) */ + packets_head->type_and_flags |= head_pflags; + packets_head->type_and_flags ^= PacketFlag::Fragmented; + packets_tail->type_and_flags ^= PacketFlag::Fragmented; + + /* do this before the next ptr might get modified due to the write queue */ + auto statistics = this->client ? this->client->connectionStatistics : nullptr; + /* general stats */ + if(statistics) { + auto head = packets_head; + while(head) { + statistics->logOutgoingPacket(stats::ConnectionStatistics::category::COMMAND, head->packet_length() + 96); /* 96 for the UDP overhead */ + head = head->next; + } + } + + /* loss stats */ + { + auto head = packets_head; + while(head) { + auto full_packet_id = (uint32_t) (head->generation << 16U) | head->packet_id(); + this->packet_statistics_.send_command(head->packet_type(), full_packet_id); + + /* increase a reference for the ack handler */ + head->ref(); + + /* Even thou the packet is yet unencrypted, it will be encrypted with the next write. The next write will be before the next resend because the next ptr must be null in order to resend a packet */ + if(head == packets_tail) + this->acknowledge_handler.process_packet(ptype, full_packet_id, head, std::move(ack_listener)); + else + this->acknowledge_handler.process_packet(ptype, full_packet_id, head, nullptr); + + head = head->next; + } + } + + { + std::lock_guard qlock{this->write_queue_mutex}; + *this->write_queue_tail = packets_head; + this->write_queue_tail = &packets_tail->next; + } + this->triggerWrite(); + + if(own_data_buffer) + ::free(own_data_buffer_ptr); } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClientConnection.h b/server/src/client/voice/VoiceClientConnection.h index 7607f1a..abab5af 100644 --- a/server/src/client/voice/VoiceClientConnection.h +++ b/server/src/client/voice/VoiceClientConnection.h @@ -39,6 +39,11 @@ namespace ts { friend class server::VoiceClient; friend class server::POWHandler; public: + enum struct WBufferPopResult { + DRAINED, + MORE_AVAILABLE + }; + struct CommandFragment { uint16_t packet_id{0}; uint16_t packet_generation{0}; @@ -63,7 +68,10 @@ namespace ts { explicit VoiceClientConnection(server::VoiceClient*); virtual ~VoiceClientConnection(); - void sendPacket(const std::shared_ptr& original_packet, bool copy = false, bool prepare_directly = false); + /* Do not send command packets via send_packet! The send_packet will take ownership of the packet! */ + void send_packet(protocol::OutgoingServerPacket* /* packet */); + void send_packet(protocol::PacketType /* type */, protocol::PacketFlag::PacketFlags /* flags */, const void* /* payload */, size_t /* payload length */); + void send_command(const std::string_view& /* build command command */, bool /* command low */, std::unique_ptr> /* acknowledge listener */); CryptHandler* getCryptHandler(){ return &crypt_handler; } @@ -72,14 +80,11 @@ namespace ts { #ifdef VC_USE_READ_QUEUE bool handleNextDatagram(); #endif - /* - * Split packets waiting in write_process_queue and moves the final buffers to writeQueue. - * @returns true when there are more packets to prepare - */ - bool preprocess_write_packets(); - /* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */ - int pop_write_buffer(pipes::buffer& /* buffer */); + /* if the result is true, ownership has been transferred */ + WBufferPopResult pop_write_buffer(protocol::OutgoingServerPacket*& /* packet */); + void execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next); + void encrypt_write_queue(); bool wait_empty_write_and_prepare_queue(std::chrono::time_point until = std::chrono::time_point()); protocol::PacketIdManager& getPacketIdManager() { return this->packet_id_manager; } @@ -112,64 +117,22 @@ namespace ts { bool next_reassembled_command(std::unique_lock &buffer_execute_lock /* packet channel execute lock */, pipes::buffer & /* buffer*/, uint16_t& /* packet id */); - /* ---------- Write declarations ---------- */ - spin_lock write_queue_lock{}; /* queue access isn't for long in general */ - std::deque write_queue{}; + /* ---------- Write ---------- */ + spin_mutex write_queue_mutex{}; + protocol::OutgoingServerPacket* resend_queue_head{nullptr}; + protocol::OutgoingServerPacket** resend_queue_tail{&resend_queue_head}; - struct WritePreprocessCategory { - enum value { - PING_PONG = 0, //Ping/Pongs - ACK = 2, - VOICE_WHISPER = 1, //Voice/Whisper - COMMAND = 3, - INIT = 4, - - MAX = INIT - }; - - inline static value from_type(protocol::PacketType type) { - switch(type) { - case protocol::PING: - case protocol::PONG: - return value::PING_PONG; - - case protocol::VOICE: - case protocol::VOICE_WHISPER: - return value::VOICE_WHISPER; - - case protocol::ACK: - case protocol::ACK_LOW: - return value::ACK; - - case protocol::COMMAND: - case protocol::COMMAND_LOW: - return value::COMMAND; - - default: - return value::INIT; - } - } - }; - - struct WritePreprocessQueue { - int _zero1{0}; - bool has_work{false}; - std::mutex work_lock{}; - - spin_lock queue_lock{}; - std::deque> queue{}; - - int _zero{0}; - }; - std::array write_preprocess_queues{}; + protocol::OutgoingServerPacket* write_queue_head{nullptr}; + protocol::OutgoingServerPacket** write_queue_tail{&write_queue_head}; /* ---------- Processing ---------- */ /* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */ protocol::PacketIdManager packet_id_manager; + spin_mutex packet_id_mutex{}; /* this function is thread save :) */ std::atomic prepare_process_count{0}; /* current thread count preparing a packet */ - bool prepare_packet_for_write(std::vector &/* buffers which need to be transferred */, const std::shared_ptr &/* the packet */, std::unique_lock& /* work lock */); + bool prepare_outgoing_packet(protocol::OutgoingServerPacket* /* packet */); std::array incoming_generation_estimators{}; /* implementation is thread save */ std::recursive_mutex packet_buffer_lock; diff --git a/server/src/client/voice/VoiceClientHandschake.cpp b/server/src/client/voice/VoiceClientHandschake.cpp index bb74be5..2f557be 100644 --- a/server/src/client/voice/VoiceClientHandschake.cpp +++ b/server/src/client/voice/VoiceClientHandschake.cpp @@ -167,7 +167,8 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) { } else { this->handshake.state = HandshakeState::SUCCEEDED; /* we're doing the verify via TeamSpeak */ } - this->sendCommand0(initivexpand.build(), false, true); //If we setup the encryption now + this->sendCommand0(initivexpand.build()); //If we setup the encryption now + this->connection->encrypt_write_queue(); } { diff --git a/server/src/client/voice/VoiceClientPacketHandler.cpp b/server/src/client/voice/VoiceClientPacketHandler.cpp index 1047a92..1030cb6 100644 --- a/server/src/client/voice/VoiceClientPacketHandler.cpp +++ b/server/src/client/voice/VoiceClientPacketHandler.cpp @@ -67,17 +67,15 @@ void VoiceClient::handlePacketPing(const protocol::ClientPacketParser& packet) { #endif char buffer[2]; le2be16(packet.packet_id(), buffer); - auto pkt = make_shared(PacketTypeInfo::Pong, pipes::buffer_view{buffer, 2}); - pkt->enable_flag(PacketFlag::Unencrypted); - this->connection->sendPacket(pkt); + this->connection->send_packet(PacketType::PONG, PacketFlag::Unencrypted, buffer, 2); } void VoiceClient::handlePacketVoice(const protocol::ClientPacketParser& packet) { - if (packet.type() == protocol::VOICE) { - SpeakingClient::handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0); - } else if(packet.type() == protocol::VOICE_WHISPER) { - SpeakingClient::handlePacketVoiceWhisper(packet.payload(), (packet.flags() & PacketFlag::NewProtocol) > 0); - } + SpeakingClient::handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0); +} + +void VoiceClient::handlePacketVoiceWhisper(const ts::protocol::ClientPacketParser &packet) { + SpeakingClient::handlePacketVoiceWhisper(packet.payload(), (packet.flags() & PacketFlag::NewProtocol) > 0); } void VoiceClient::handlePacketAck(const protocol::ClientPacketParser& packet) { diff --git a/server/src/client/voice/VoiceClientView.cpp b/server/src/client/voice/VoiceClientView.cpp index c1b6176..6f4c36c 100644 --- a/server/src/client/voice/VoiceClientView.cpp +++ b/server/src/client/voice/VoiceClientView.cpp @@ -7,16 +7,17 @@ using namespace std; using namespace ts::server; using namespace ts::protocol; -extern InstanceHandler* serverInstance; - void VoiceClient::sendPingRequest() { this->lastPingRequest = std::chrono::system_clock::now(); - auto packet = make_shared(PacketTypeInfo::Ping, pipes::buffer_view{}); - packet->enable_flag(PacketFlag::Unencrypted); - this->connection->sendPacket(packet, false, true); /* prepare directly so the packet gets a packet id */ + auto packet = protocol::allocate_outgoing_packet(0); + packet->ref(); /* extra ref for ourself */ - this->lastPingId = packet->packetId(); + packet->type_and_flags = (uint8_t) PacketType::PING | (uint8_t) PacketFlag::Unencrypted; + this->connection->send_packet(packet); + + this->lastPingId = packet->packet_id(); + packet->unref(); #ifdef PKT_LOG_PING logMessage(this->getServerId(), "{}[Ping] Sending a ping request with it {}", CLIENT_STR_LOG_PREFIX, this->lastPingId); diff --git a/server/src/manager/SqlDataManager.cpp b/server/src/manager/SqlDataManager.cpp index 6236000..580627a 100644 --- a/server/src/manager/SqlDataManager.cpp +++ b/server/src/manager/SqlDataManager.cpp @@ -45,7 +45,7 @@ if(!result && result.msg().find(ignore) == string::npos){ #define RESIZE_COLUMN(tblName, rowName, size) up vote EXECUTE("Could not change column size", "ALTER TABLE " tblName " ALTER COLUMN " rowName " varchar(" size ")"); #define CURRENT_DATABASE_VERSION 11 -#define CURRENT_PERMISSION_VERSION 3 +#define CURRENT_PERMISSION_VERSION 4 #define CLIENT_UID_LENGTH "64" #define CLIENT_NAME_LENGTH "128" @@ -631,6 +631,16 @@ bool SqlDataManager::update_permissions(std::string &error) { return false; perm_version(3); + + case 3: + if(!auto_update(permission::update::SERVER_ADMIN, "i_client_poke_max_clients", {20, true}, false, false, {75, true})) + return false; + if(!auto_update(permission::update::QUERY_ADMIN, "i_client_poke_max_clients", {50, true}, false, false, {100, true})) + return false; + if(!auto_update(permission::update::SERVER_NORMAL, "i_client_poke_max_clients", {5, true}, false, false, {0, false})) + return false; + + perm_version(4); default: break; } diff --git a/server/src/server/VoiceIOManager.h b/server/src/server/VoiceIOManager.h index 4a4aeec..9f8abb6 100644 --- a/server/src/server/VoiceIOManager.h +++ b/server/src/server/VoiceIOManager.h @@ -5,7 +5,7 @@ #include #include #include -#include +#include #include namespace ts { @@ -80,7 +80,7 @@ namespace ts { ::event* event_read = nullptr; ::event* event_write = nullptr; - spin_lock write_queue_lock; + spin_mutex write_queue_lock; datagram_packet_t dg_write_queue_head = nullptr; datagram_packet_t dg_write_queue_tail = nullptr; diff --git a/server/src/server/VoiceServer.cpp b/server/src/server/VoiceServer.cpp index 5bab5d2..0c876f7 100644 --- a/server/src/server/VoiceServer.cpp +++ b/server/src/server/VoiceServer.cpp @@ -148,33 +148,11 @@ void VoiceServer::execute_resend(const std::chrono::system_clock::time_point &no lock_guard lock(this->connectionLock); connections = this->activeConnections; } - deque> buffers; string error; for(const auto& client : connections) { auto connection = client->getConnection(); sassert(connection); /* its not possible that a client hasn't a connection! */ - - if (connection->acknowledge_handler.execute_resend(now, next, buffers, error) < 0) { - debugMessage(client->getServerId(), "{} Failed to execute packet resend: {}", CLIENT_STR_LOG_PREFIX_(client), error); - - if(client->state == ConnectionState::CONNECTED) { - client->disconnect(ViewReasonId::VREASON_TIMEOUT, config::messages::timeout::packet_resend_failed, nullptr, true); - } else { - client->close_connection(system_clock::now() + seconds(1)); - } - } else if(!buffers.empty()) { - { - lock_guard client_write_lock(connection->write_queue_lock); - for(auto& buf : buffers) - connection->write_queue.push_back(buf->buffer); - } - for(auto& entry : buffers) - connection->packet_statistics().send_command((protocol::PacketType) entry->packet_type, entry->packet_id | entry->generation_id << 16U); - //if(buffers.size() > 0) - // logTrace(client->getServerId(), "{} Resending {} packets.", CLIENT_STR_LOG_PREFIX_(client), buffers.size()); - connection->triggerWrite(); - } - buffers.clear(); + connection->execute_resend(now, next); } } @@ -380,7 +358,7 @@ struct IOData { int file_descriptor = 0; iovec vector{}; struct msghdr message{}; - char message_headers[MHS]; + char message_headers[MHS]{}; IOData() { /* Speed is key here, we dont need to zero paddings! @@ -404,13 +382,13 @@ struct IOData { }; template -inline ssize_t write_datagram(IOData& io, const sockaddr_storage& address, io::pktinfo_storage* info, size_t length, void* buffer) { +inline ssize_t write_datagram(IOData& io, const sockaddr_storage& address, io::pktinfo_storage* info, size_t length, const void* buffer) { io.message.msg_flags = 0; io.message.msg_name = (void*) &address; io.message.msg_namelen = address.ss_family == AF_INET ? sizeof(sockaddr_in) : sizeof(sockaddr_in6); io.vector.iov_len = length; - io.vector.iov_base = buffer; + io.vector.iov_base = (void*) buffer; if(info) { auto cmsg = CMSG_FIRSTHDR(&io.message); @@ -447,21 +425,20 @@ inline ssize_t write_datagram(IOData& io, const sockaddr_storage& address, } void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) { + using WBufferPopResult = connection::VoiceClientConnection::WBufferPopResult; auto event_handle = (io::IOEventLoopEntry*) _event_handle; auto voice_server = event_handle->voice_server; bool retrigger = false; - int buffer_state; IOData<0x100> io{}; io.file_descriptor = fd; - TIMING_START(timings); - TIMING_STEP(timings, "client"); { /* write and process clients */ shared_ptr client; - pipes::buffer buffer; - bool more_clients, more_to_prepare = false, more_to_write = false; + protocol::OutgoingServerPacket* packet; + WBufferPopResult client_wbuffer_state; + bool more_clients; auto write_timeout = system_clock::now() + microseconds(2500); /* read 2.5ms long at a time or 'till nothing more is there */ while(system_clock::now() <= write_timeout){ @@ -469,88 +446,71 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) auto client_queue_state = event_handle->pop_voice_write_queue(client); /* we need a new client, the old client has nothing more to do */ if(client_queue_state == 2) break; + + assert(client); more_clients = (bool) client_queue_state; } - auto connection = client->connection; - auto client_ptr = &*client; - - TIMING_STEP(timings, "client get"); - more_to_prepare = connection->preprocess_write_packets(); - TIMING_STEP(timings, "client prepare"); - while(system_clock::now() <= write_timeout) { - buffer_state = connection->pop_write_buffer(buffer); - more_to_write = buffer_state == 1; - - TIMING_STEP(timings, "buffer pop"); - if(buffer_state != 2) { - ssize_t res = write_datagram(io, client_ptr->remote_address, &client_ptr->address_info, buffer.length(), buffer.data_ptr()); - TIMING_STEP(timings, "buffer write"); - if(res != buffer.length()){ - if(errno == EAGAIN) { - logCritical(voice_server->server->getServerId(), "Failed to write datagram packet for client {} (EAGAIN).", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort())); - return; - } else if(errno == EINVAL || res == -0xFEB) { - /* needs more debug */ - auto voice_client = dynamic_pointer_cast(client); - logCritical( - voice_server->server->getServerId(), - "Failed to write datagram packet ({} @ {}) for client {} ({}) {}. Dropping packet! Extra data: [fd: {}/{}, supposed socket: {}/{} => {}, client family: {}, socket family: {}]", - buffer.length(), buffer.data_ptr(), - client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()), - strerror(errno), - res, - fd, - event_handle->file_descriptor, - voice_client->socket, - event_handle->socket_id, - voice_server->io->resolve_file_descriptor(voice_client), - voice_client->isAddressV4() ? "v4" : voice_client->isAddressV6() ? "v6" : "v?", - event_handle->family == AF_INET ? "v4" : "v6" - ); - } else { - logCritical( - voice_server->server->getServerId(), - "Failed to write datagram packet for client {} (errno: {} message: {}). Dropping packet!", - client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()), - errno, - strerror(errno) - ); - } - break; - } + packet = nullptr; + client_wbuffer_state = client->connection->pop_write_buffer(packet); + if(!packet) { + assert(client_wbuffer_state == WBufferPopResult::DRAINED); + break; } - if(!more_to_write) - break; /* client has no more datagram packets to write */ + + ssize_t res = write_datagram(io, client->remote_address, &client->address_info, packet->packet_length(), packet->packet_data()); + if(res != packet->packet_length()) { + if(errno == EAGAIN) { + logCritical(voice_server->server->getServerId(), "Failed to write datagram packet for client {} (EAGAIN).", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort())); + packet->unref(); + return; + } else if(errno == EINVAL || res == -0xFEB) { + /* needs more debug */ + auto voice_client = dynamic_pointer_cast(client); + logCritical( + voice_server->server->getServerId(), + "Failed to write datagram packet ({} @ {}) for client {} ({}) {}. Dropping packet! Extra data: [fd: {}/{}, supposed socket: {}/{} => {}, client family: {}, socket family: {}]", + packet->packet_length(), packet->packet_data(), + client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()), + strerror(errno), + res, + fd, + event_handle->file_descriptor, + voice_client->socket, + event_handle->socket_id, + voice_server->io->resolve_file_descriptor(voice_client), + voice_client->isAddressV4() ? "v4" : voice_client->isAddressV6() ? "v6" : "v?", + event_handle->family == AF_INET ? "v4" : "v6" + ); + } else { + logCritical( + voice_server->server->getServerId(), + "Failed to write datagram packet for client {} (errno: {} message: {}). Dropping packet!", + client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()), + errno, + strerror(errno) + ); + } + packet->unref(); + break; + } + packet->unref(); + if(client_wbuffer_state == WBufferPopResult::DRAINED) + break; } - if(more_to_write) { + if(client_wbuffer_state == WBufferPopResult::MORE_AVAILABLE) { /* we exceeded the max write time, rescheduling write */ - more_to_prepare = false; /* we'll call this with the next write */ voice_server->triggerWrite(client); - client.reset(); - TIMING_STEP(timings, "retrigger client"); - } - - if(more_clients) { - /* allow other clients to write as well */ - if(more_to_write) - event_handle->push_voice_write_queue(client); - client.reset(); - continue; - } - - if(!more_to_prepare) { - /* we're done with this client. Nothing more to prepare */ - client.reset(); } + client.reset(); } - retrigger |= more_to_prepare || more_to_write; + retrigger |= more_clients; } - TIMING_STEP(timings, "client-full-end"); + /* write all manually specified datagram packets */ { auto write_timeout = system_clock::now() + microseconds(2500); /* read 2.5ms long at a time or 'till nothing more is there */ @@ -573,12 +533,8 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) retrigger |= packet != nullptr; /* memory stored at packet is not accessible anymore. But anyways pop_dg_write_queue returns 0 if there is nothing more */ } - TIMING_STEP(timings, "dgram-full-end"); if(retrigger) event_add(event_handle->event_write, nullptr); - TIMING_REPORT({ - debugMessage(0, "Write timings: {}", TIMING_FINISH_U(timings, microseconds, "mu")); - }); } void VoiceServer::send_datagram(int socket, io::datagram_packet_t packet) { diff --git a/shared b/shared index 2725c57..5842bbe 160000 --- a/shared +++ b/shared @@ -1 +1 @@ -Subproject commit 2725c57f2e8c75a2bd8eae1ff477de3a33241cc9 +Subproject commit 5842bbe0676cb06c3947943fb5dd422aff0bef16