From d6f483a01987205404f6f7d121b2883888dc3476 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Tue, 17 Mar 2020 12:08:07 +0100 Subject: [PATCH] totally fucked up --- git-teaspeak | 2 +- server/CMakeLists.txt | 27 +- server/lock_concept | 1 + server/namespaces | 9 + server/src/InstanceHandler.cpp | 19 +- server/src/InstanceHandler.h | 8 + server/src/TS3ServerClientManager.cpp | 6 +- server/src/VirtualServer.cpp | 138 +++--- server/src/VirtualServer.h | 2 - server/src/VirtualServerManager.cpp | 2 +- server/src/client/ConnectedClient.cpp | 6 +- server/src/client/ConnectedClient.h | 36 +- .../ConnectedClientTextCommandHandler.cpp | 5 +- server/src/client/SpeakingClient.cpp | 35 +- server/src/client/command_handler/misc.cpp | 4 +- server/src/client/query/QueryClient.cpp | 419 +++--------------- server/src/client/query/QueryClient.h | 55 +-- .../client/query/QueryClientConnection.cpp | 384 ++++++++++++++++ .../src/client/query/QueryClientConnection.h | 98 ++++ server/src/client/voice/PacketDecoder.cpp | 20 +- server/src/client/voice/PacketEncoder.cpp | 4 +- server/src/client/voice/PacketEncoder.h | 4 +- server/src/client/voice/PingHandler.cpp | 64 ++- server/src/client/voice/PingHandler.h | 21 +- server/src/client/voice/VoiceClient.cpp | 194 ++------ server/src/client/voice/VoiceClient.h | 43 +- .../voice/VoiceClientCommandHandler.cpp | 4 +- .../client/voice/VoiceClientConnection.cpp | 163 +++---- .../src/client/voice/VoiceClientConnection.h | 48 +- .../client/voice/VoiceClientHandschake.cpp | 51 ++- .../client/voice/VoiceClientPacketHandler.cpp | 8 +- server/src/client/web/WSWebClient.cpp | 2 +- server/src/client/web/WebClient.cpp | 18 +- server/src/lincense/LicenseService.cpp | 1 + server/src/server/POWHandler.cpp | 2 +- server/src/server/POWHandler.h | 1 - server/src/server/QueryServer.cpp | 19 +- server/src/server/QueryServer.h | 2 + server/src/server/VoiceIOManager.h | 11 +- server/src/server/VoiceServer.cpp | 16 +- server/src/server/VoiceServer.h | 148 +++---- server/src/server/udp-server/UDPServer.h | 38 +- shared | 2 +- 43 files changed, 1155 insertions(+), 985 deletions(-) create mode 100644 server/namespaces create mode 100644 server/src/client/query/QueryClientConnection.cpp create mode 100644 server/src/client/query/QueryClientConnection.h diff --git a/git-teaspeak b/git-teaspeak index 9a26231..cba03a6 160000 --- a/git-teaspeak +++ b/git-teaspeak @@ -1 +1 @@ -Subproject commit 9a26231c1f6c44f799419f87a3cac37a55425bdb +Subproject commit cba03a6316db76fdf056a960ee509ad20542ea44 diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 405185c..5201575 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -33,8 +33,8 @@ add_definitions(-DUSE_BORINGSSL) #3 = PRIVATE option(BUILD_TYPE "Sets the build type" OFF) option(BUILD_TYPE_NAME "Sets the build type name" OFF) -option(COMPILE_WEB_CLIENT "Enable/Disable the web cleint future" OFF) -#set(COMPILE_WEB_CLIENT "ON") +option(COMPILE_WEB_CLIENT "Enable/Disable the web client future" OFF) +set(COMPILE_WEB_CLIENT "OFF") #FIXME! set(CMAKE_VERBOSE_MAKEFILE ON) set(SERVER_SOURCE_FILES @@ -147,6 +147,20 @@ set(SERVER_SOURCE_FILES src/services/PermissionsService.cpp src/services/ClientChannelService.cpp + + src/music/PlaylistPermissions.cpp + src/lincense/LicenseService.cpp + src/groups/GroupManager.cpp + src/groups/GroupAssignmentManager.cpp + src/groups/Group.cpp + src/services/VirtualServerInformation.cpp + src/vserver/VirtualServerManager.cpp + src/services/VirtualServerBroadcastService.cpp + src/server/udp-server/UDPServer.cpp + src/client/voice/PacketEncoder.cpp + src/client/voice/PacketDecoder.cpp + src/client/voice/PingHandler.cpp + src/client/query/QueryClientConnection.cpp ) if (COMPILE_WEB_CLIENT) add_definitions(-DCOMPILE_WEB_CLIENT) @@ -160,7 +174,7 @@ if (COMPILE_WEB_CLIENT) src/client/web/WSWebClient.cpp src/client/web/SampleHandler.cpp src/client/web/VoiceBridge.cpp - src/client/command_handler/helpers.h src/music/PlaylistPermissions.cpp src/music/PlaylistPermissions.h src/lincense/LicenseService.cpp src/lincense/LicenseService.h src/groups/GroupManager.cpp src/groups/GroupManager.h src/groups/GroupAssignmentManager.cpp src/groups/GroupAssignmentManager.h src/groups/Group.cpp src/groups/Group.h src/services/VirtualServerInformation.cpp src/services/VirtualServerInformation.h src/vserver/VirtualServerManager.cpp src/vserver/VirtualServerManager.h src/services/VirtualServerBroadcastService.cpp src/services/VirtualServerBroadcastService.h src/server/udp-server/UDPServer.cpp src/server/udp-server/UDPServer.h src/client/voice/PacketEncoder.cpp src/client/voice/PacketEncoder.h src/client/voice/PacketDecoder.cpp src/client/voice/PacketDecoder.h src/client/voice/PingHandler.cpp src/client/voice/PingHandler.h) + ) endif () add_executable(PermHelper helpers/permgen.cpp) @@ -276,7 +290,6 @@ target_link_libraries(TeaSpeakServer #Require a so sqlite3 - DataPipes::rtc::shared breakpad::static protobuf::libprotobuf @@ -290,8 +303,10 @@ target_link_libraries(TeaSpeakServer ) if (COMPILE_WEB_CLIENT) - target_link_libraries(TeaSpeakServer ${glib20_DIR}/lib/x86_64-linux-gnu/libffi.so.7 ${nice_DIR}/lib/libnice.so.10) -endif () + target_link_libraries(TeaSpeakServer DataPipes::rtc::shared ${glib20_DIR}/lib/x86_64-linux-gnu/libffi.so.7 ${nice_DIR}/lib/libnice.so.10) +else() + target_link_libraries(TeaSpeakServer DataPipes::core::shared) +endif() # include_directories(${LIBRARY_PATH}/boringssl/include/) target_link_libraries(TeaSpeakServer diff --git a/server/lock_concept b/server/lock_concept index 2492a96..566a66c 100644 --- a/server/lock_concept +++ b/server/lock_concept @@ -7,6 +7,7 @@ General lock order: When executing a command: Lock order: - Client execute lock + - Client state lock - Server state lock (Server should not try to change state while a client is executing something) Notes: The server might be null or the default server. diff --git a/server/namespaces b/server/namespaces new file mode 100644 index 0000000..6bf1fa4 --- /dev/null +++ b/server/namespaces @@ -0,0 +1,9 @@ +The general namespace prefix is ts:: + +TeaSpeak - Server: ts::server + Basic: ts::server + Sub-Server: + Query: ts::server::server::query + Voice: ts::server::server::udp + File: ts::server::server::file + Web: ts::server::server::web \ No newline at end of file diff --git a/server/src/InstanceHandler.cpp b/server/src/InstanceHandler.cpp index a4f5928..10db9f3 100644 --- a/server/src/InstanceHandler.cpp +++ b/server/src/InstanceHandler.cpp @@ -23,6 +23,7 @@ #include #include #include +#include "src/server/udp-server/UDPServer.h" #ifndef _POSIX_SOURCE #define _POSIX_SOURCE @@ -389,6 +390,12 @@ FwIDAQAB } } + this->udpServer = new server::udp::Server{}; + if(std::string error{}; !this->udpServer->initialize(error)) { + logCritical(LOG_INSTANCE, "Failed to allocate UDP server."); + return false; + } + this->voiceServerManager = new VirtualServerManager(this); if (!this->voiceServerManager->initialize(true)) { logCritical(LOG_INSTANCE, "Could not load servers!"); @@ -433,22 +440,22 @@ void InstanceHandler::stopInstance() { debugMessage(LOG_INSTANCE, "Stopping all virtual servers"); if (this->voiceServerManager) this->voiceServerManager->shutdownAll(ts::config::messages::applicationStopped); - delete this->voiceServerManager; - this->voiceServerManager = nullptr; + delete std::exchange(this->voiceServerManager, nullptr); debugMessage(LOG_INSTANCE, "All virtual server stopped"); debugMessage(LOG_QUERY, "Stopping query server"); if (this->queryServer) this->queryServer->stop(); - delete this->queryServer; - this->queryServer = nullptr; + delete std::exchange(this->queryServer, nullptr); debugMessage(LOG_QUERY, "Query server stopped"); debugMessage(LOG_FT, "Stopping file server"); if (this->fileServer) this->fileServer->stop(); - delete this->fileServer; - this->fileServer = nullptr; + delete std::exchange(this->fileServer, nullptr); debugMessage(LOG_FT, "File server stopped"); + if(this->udpServer) this->udpServer->finalize(); + delete std::exchange(this->udpServer, nullptr); + this->save_channel_permissions(); this->save_group_permissions(); diff --git a/server/src/InstanceHandler.h b/server/src/InstanceHandler.h index 823cbd9..991414c 100644 --- a/server/src/InstanceHandler.h +++ b/server/src/InstanceHandler.h @@ -23,6 +23,10 @@ namespace ts { class LicenseService; } + namespace server::udp { + class Server; + } + class InstanceHandler { public: explicit InstanceHandler(SqlDataManager*); @@ -49,6 +53,8 @@ namespace ts { ssl::SSLManager* sslManager(){ return this->sslMgr; } sql::SqlManager* getSql(){ return sql->sql(); } + [[nodiscard]] inline auto udp_server() { return this->udpServer; } + std::chrono::time_point getStartTimestamp(){ return startTimestamp; } void executeTick(VirtualServer*); @@ -112,6 +118,8 @@ namespace ts { FileServer* fileServer = nullptr; QueryServer* queryServer = nullptr; + server::udp::Server* udpServer{nullptr}; + VirtualServerManager* voiceServerManager = nullptr; DatabaseHelper* dbHelper = nullptr; bans::BanManager* banMgr = nullptr; diff --git a/server/src/TS3ServerClientManager.cpp b/server/src/TS3ServerClientManager.cpp index b1a90bb..0b32fa6 100644 --- a/server/src/TS3ServerClientManager.cpp +++ b/server/src/TS3ServerClientManager.cpp @@ -88,7 +88,7 @@ bool VirtualServer::registerClient(shared_ptr client) { bool VirtualServer::unregisterClient(shared_ptr cl, std::string reason, std::unique_lock& chan_tree_lock) { if(cl->getType() == ClientType::CLIENT_TEAMSPEAK && cl->getType() == ClientType::CLIENT_WEB) { - sassert(cl->state == ConnectionState::DISCONNECTED); + sassert(cl->state == ClientState::DISCONNECTED); } auto client_id = cl->getClientId(); @@ -128,7 +128,7 @@ bool VirtualServer::unregisterClient(shared_ptr cl, std::string } void VirtualServer::registerInternalClient(std::shared_ptr client) { - client->state = ConnectionState::CONNECTED; + client->state = ClientState::CONNECTED; { lock_guard lock(this->clients.lock); if(client->getClientId() > 0) { @@ -152,7 +152,7 @@ void VirtualServer::registerInternalClient(std::shared_ptr clie } void VirtualServer::unregisterInternalClient(std::shared_ptr client) { - client->state = ConnectionState::DISCONNECTED; + client->state = ClientState::DISCONNECTED; { auto client_id = client->getClientId(); diff --git a/server/src/VirtualServer.cpp b/server/src/VirtualServer.cpp index 5a01945..90fd21d 100644 --- a/server/src/VirtualServer.cpp +++ b/server/src/VirtualServer.cpp @@ -24,6 +24,7 @@ #include "Configuration.h" #include "VirtualServer.h" #include "src/manager/ConversationManager.h" +#include "src/server/udp-server/UDPServer.h" #include using namespace std; @@ -313,64 +314,65 @@ bool VirtualServer::start(std::string& error) { } } - auto host = this->properties()[property::VIRTUALSERVER_HOST].as(); - if(config::binding::enforce_default_voice_host) - host = config::binding::DefaultVoiceHost; + { + auto host = this->properties()[property::VIRTUALSERVER_HOST].as(); + if(config::binding::enforce_default_voice_host) + host = config::binding::DefaultVoiceHost; - if(host.empty()){ - error = "invalid host (\"" + host + "\")"; - this->stop("failed to start", true); - return false; - } - if(this->properties()[property::VIRTUALSERVER_PORT].as() <= 0){ - error = "invalid port"; - this->stop("failed to start", true); - return false; - } - - deque> bindings; - for(const auto& address : split_hosts(host, ',')) { - auto entry = make_shared(); - if(net::is_ipv4(address)) { - sockaddr_in addr{}; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as()); - if(!evaluateAddress4(address, addr.sin_addr)) { - logError(this->serverId, "Fail to resolve v4 address info for \"{}\"", address); - continue; - } - - memcpy(&entry->address, &addr, sizeof(addr)); - } else if(net::is_ipv6(address)) { - sockaddr_in6 addr{}; - memset(&addr, 0, sizeof(addr)); - addr.sin6_family = AF_INET6; - addr.sin6_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as()); - if(!evaluateAddress6(address, addr.sin6_addr)) { - logError(this->serverId, "Fail to resolve v6 address info for \"{}\"", address); - continue; - } - - memcpy(&entry->address, &addr, sizeof(addr)); - } else { - logError(this->serverId, "Failed to determinate address type for \"{}\"", address); - continue; + if(host.empty()){ + error = "invalid host (\"" + host + "\")"; + this->stop("failed to start", true); + return false; + } + if(this->properties()[property::VIRTUALSERVER_PORT].as() <= 0){ + error = "invalid port"; + this->stop("failed to start", true); + return false; } - bindings.push_back(entry); - } - if(bindings.empty()) { - error = "failed to resole any host!"; - this->stop("failed to start", false); - return false; - } - //Setup voice server - udpVoiceServer = make_shared(self.lock()); - if(!udpVoiceServer->start(bindings, error)) { - error = "could not start voice server. Message: " + error; - this->stop("failed to start", false); - return false; + deque> bindings; + for(const auto& address : split_hosts(host, ',')) { + auto entry = make_shared(); + if(net::is_ipv4(address)) { + sockaddr_in addr{}; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as()); + if(!evaluateAddress4(address, addr.sin_addr)) { + logError(this->serverId, "Fail to resolve v4 address info for \"{}\"", address); + continue; + } + + memcpy(&entry->address, &addr, sizeof(addr)); + } else if(net::is_ipv6(address)) { + sockaddr_in6 addr{}; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + addr.sin6_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as()); + if(!evaluateAddress6(address, addr.sin6_addr)) { + logError(this->serverId, "Fail to resolve v6 address info for \"{}\"", address); + continue; + } + + memcpy(&entry->address, &addr, sizeof(addr)); + } else { + logError(this->serverId, "Failed to determinate address type for \"{}\"", address); + continue; + } + bindings.push_back(entry); + } + if(bindings.empty()) { + error = "failed to resole any host!"; + this->stop("failed to start", false); + return false; + } + + auto result = serverInstance->udp_server()->register_virtual_server(this); + if(result != server::udp::ServerRegisterResult::SUCCESS) { + error = "failed to start udp voice server (" + std::to_string((int) result) + ")"; + this->stop("failed to start", false); + return false; + } } if(ts::config::web::activated && serverInstance->sslManager()->web_ssl_options()) { @@ -501,9 +503,7 @@ void VirtualServer::stop(const std::string& reason, bool disconnect_query) { this->musicManager->disconnectBots(); serverInstance->cancelExecute(this); - - if(this->udpVoiceServer) this->udpVoiceServer->stop(); - this->udpVoiceServer = nullptr; + serverInstance->udp_server()->unregister_virtual_server(this); #ifdef COMPILE_WEB_CLIENT if(this->webControlServer) this->webControlServer->stop(); @@ -637,30 +637,26 @@ std::shared_ptr VirtualServer::findClient(std::string name, boo } bool VirtualServer::forEachClient(std::function)> function) { - for(const auto& elm : this->getClients()) { - shared_lock close_lock(elm->finalDisconnectLock, try_to_lock_t{}); - if(close_lock.owns_lock()) //If not locked than client is on the way to disconnect - if(elm->state == ConnectionState::CONNECTED && elm->getType() != ClientType::CLIENT_INTERNAL) { - function(elm); - } - } + for(const auto& elm : this->getClients()) + if(elm->state == ClientState::CONNECTED && elm->getType() != ClientType::CLIENT_INTERNAL) + function(elm); return true; } std::vector> VirtualServer::getClients() { - vector> clients; + vector> result{}; { lock_guard lock(this->clients.lock); - clients.reserve(this->clients.count); + result.reserve(this->clients.count); for(auto& client : this->clients.clients) { if(!client) continue; - clients.push_back(client); + result.push_back(client); } } - return clients; + return result; } deque> VirtualServer::getClientsByChannel(std::shared_ptr channel) { @@ -677,7 +673,7 @@ deque> VirtualServer::getClientsByChannel(std::share for(const auto& weak_client : weak_clients) { auto client = weak_client.lock(); if(!client) continue; - if(client->connectionState() != ConnectionState::CONNECTED) continue; + if(client->connectionState() != ClientState::CONNECTED) continue; if(client->getChannel() != channel) continue; /* to be sure */ result.push_back(move(client)); @@ -1189,7 +1185,7 @@ void VirtualServer::send_text_message(const std::shared_ptr &chann bool conversation_private = channel->properties()[property::CHANNEL_FLAG_CONVERSATION_PRIVATE].as(); auto flag_password = channel->properties()[property::CHANNEL_FLAG_PASSWORD].as(); for(const auto& client : this->getClients()) { - if(client->connectionState() != ConnectionState::CONNECTED) + if(client->connectionState() != ClientState::CONNECTED) continue; auto type = client->getType(); diff --git a/server/src/VirtualServer.h b/server/src/VirtualServer.h index 24efe3e..bab23bd 100644 --- a/server/src/VirtualServer.h +++ b/server/src/VirtualServer.h @@ -204,7 +204,6 @@ namespace ts { std::shared_ptr getServerStatistics(){ return serverStatistics; } - std::shared_ptr getVoiceServer(){ return this->udpVoiceServer; } WebControlServer* getWebServer(){ return this->webControlServer; } /* calculate permissions for an client in this server */ @@ -288,7 +287,6 @@ namespace ts { std::chrono::system_clock::time_point lastTick; void executeServerTick(); - std::shared_ptr udpVoiceServer = nullptr; WebControlServer* webControlServer = nullptr; ts::server::tokens::TokenManager* tokenManager = nullptr; ComplainManager* complains = nullptr; diff --git a/server/src/VirtualServerManager.cpp b/server/src/VirtualServerManager.cpp index 3d9fa9a..8a2d836 100644 --- a/server/src/VirtualServerManager.cpp +++ b/server/src/VirtualServerManager.cpp @@ -444,6 +444,6 @@ void VirtualServerManager::tickHandshakeClients() { for(const auto& server : this->serverInstances()) { auto vserver = server->getVoiceServer(); if(vserver) - vserver->tickHandshakingClients(); + vserver->tickClients(); } } \ No newline at end of file diff --git a/server/src/client/ConnectedClient.cpp b/server/src/client/ConnectedClient.cpp index 1a404ee..52010e1 100644 --- a/server/src/client/ConnectedClient.cpp +++ b/server/src/client/ConnectedClient.cpp @@ -535,7 +535,7 @@ bool ConnectedClient::notifyClientLeftViewBanned(const std::shared_ptrstate != ConnectionState::CONNECTED) return false; + if(!enforce && this->state != ClientState::CONNECTED) return false; if(!enforce && chrono::system_clock::now() - this->lastNeededNotify < chrono::seconds(5) && this->lastNeededPermissionNotifyChannel == this->currentChannel) { //Dont spam these (hang up ui) this->requireNeededPermissionResend = true; @@ -720,7 +720,7 @@ void ConnectedClient::sendChannelDescription(const std::shared_ptr void ConnectedClient::tick(const std::chrono::system_clock::time_point &time) { ALARM_TIMER(A1, "ConnectedClient::tick", milliseconds(2)); - if(this->state == ConnectionState::CONNECTED) { + if(this->state == ClientState::CONNECTED) { if(this->requireNeededPermissionResend) this->sendNeededPermissions(false); if(this->lastOnlineTimestamp.time_since_epoch().count() == 0) { @@ -833,7 +833,7 @@ bool ConnectedClient::handleCommandFull(Command& cmd, bool disconnectOnFail) { if(generateReturnStatus) this->notifyError(result, cmd["return_code"].size() > 0 ? cmd["return_code"].first().as() : ""); - if(result.error_code() != error::ok && this->state == ConnectionState::INIT_HIGH) + if(result.error_code() != error::ok && this->state == ClientState::INITIALIZING) this->close_connection(system_clock::now()); //Disconnect now for (const auto& handler : postCommandHandler) diff --git a/server/src/client/ConnectedClient.h b/server/src/client/ConnectedClient.h index 561eb31..c455723 100644 --- a/server/src/client/ConnectedClient.h +++ b/server/src/client/ConnectedClient.h @@ -68,14 +68,13 @@ namespace ts { friend class QueryServer; friend class DataClient; friend class SpeakingClient; - friend class connection::VoiceClientConnection; friend class ts::GroupManager; friend class VirtualServerManager; public: explicit ConnectedClient(sql::SqlManager*, const std::shared_ptr& server); ~ConnectedClient() override; - ConnectionState connectionState(){ return this->state; } + ClientState connectionState(){ return this->state; } std::string getLoggingPeerIp() { return config::server::disable_ip_saving || (this->server && this->server->disable_ip_saving()) ? "X.X.X.X" : this->getPeerIp(); } std::string getPeerIp(){ return this->isAddressV4() ? net::to_string(this->getAddressV4()->sin_addr) : this->isAddressV6() ? net::to_string(this->getAddressV6()->sin6_addr) : "localhost"; } uint16_t getPeerPort(){ return ntohs(this->isAddressV4() ? this->getAddressV4()->sin_port : this->isAddressV6() ? this->getAddressV6()->sin6_port : (uint16_t) 0); } @@ -273,29 +272,14 @@ namespace ts { */ bool update_cached_permissions(); - std::shared_lock require_connected_state(bool blocking = false) { - //try_to_lock_t - std::shared_lock disconnect_lock{}; - if(blocking) [[unlikely]] - disconnect_lock = std::shared_lock{this->finalDisconnectLock}; - else - disconnect_lock = std::shared_lock{this->finalDisconnectLock, std::try_to_lock}; - - if(!disconnect_lock) [[unlikely]] - return disconnect_lock; - - { - std::lock_guard state_lock{this->state_lock}; - if(this->state != ConnectionState::CONNECTED) - return {}; - } - return disconnect_lock; - } - inline bool playlist_subscribed(const std::shared_ptr& playlist) const { return this->_subscribed_playlist.lock() == playlist; } + [[nodiscard]] auto require_connected_state() { + return ts::rwshared_lock{this->state_lock}; + } + template > [[nodiscard]] inline auto lock_command_handling() { return T{this->command_lock}; } void increase_join_state() { this->join_state_id++; } @@ -304,13 +288,11 @@ namespace ts { sockaddr_storage remote_address; //General states - std::mutex state_lock; - ConnectionState state{ConnectionState::UNKNWON}; + ts::rw_mutex state_lock{}; + ClientState state{ClientState::UNKNWON}; bool allowedToTalk = false; - std::shared_mutex finalDisconnectLock; /* locked before state lock! */ - std::deque> visibleClients{}; /* variable locked with channel_lock */ std::deque> mutedClients{}; /* variable locked with channel_lock */ std::deque> openChats{}; /* variable locked with channel_lock */ @@ -647,7 +629,7 @@ namespace ts { this->connection_lock = std::move(other.connection_lock); } - inline bool valid() const { return !!this->client && !!this->connection_lock; } + inline bool valid() const { return !!this->client && this->connection_lock.shared_locked() && client->connectionState() == ClientState::CONNECTED; } inline operator bool() const { return this->valid(); } @@ -661,7 +643,7 @@ namespace ts { T &operator*() { return *this->client; } std::shared_ptr client; - std::shared_lock connection_lock{}; + ts::rwshared_lock connection_lock{}; }; } } diff --git a/server/src/client/ConnectedClientTextCommandHandler.cpp b/server/src/client/ConnectedClientTextCommandHandler.cpp index 2760304..189178d 100644 --- a/server/src/client/ConnectedClientTextCommandHandler.cpp +++ b/server/src/client/ConnectedClientTextCommandHandler.cpp @@ -629,6 +629,7 @@ bool ConnectedClient::handle_text_command( if(!vc) return false; send_message(_this.lock(), "Packet generations:"); + auto& id_generator = vc->getConnection()->packet_encoder().id_generator(); for(const auto& type : { protocol::PacketTypeInfo::Command, protocol::PacketTypeInfo::CommandLow, @@ -639,8 +640,8 @@ bool ConnectedClient::handle_text_command( protocol::PacketTypeInfo::Ping, protocol::PacketTypeInfo::Pong}) { - auto id = vc->getConnection()->getPacketIdManager().currentPacketId(type); - auto gen = vc->getConnection()->getPacketIdManager().generationId(type); + auto id = id_generator.currentPacketId(type); + auto gen = id_generator.generationId(type); send_message(_this.lock(), " OUT " + type.name() + " => generation: " + to_string(gen) + " id: " + to_string(id)); //auto& buffer = vc->getConnection()->packet_buffers()[type.type()]; //send_message(_this.lock(), " IN " + type.name() + " => generation: " + to_string(buffer.generation(0)) + " id: " + to_string(buffer.current_index())); diff --git a/server/src/client/SpeakingClient.cpp b/server/src/client/SpeakingClient.cpp index bf16381..b8398e4 100644 --- a/server/src/client/SpeakingClient.cpp +++ b/server/src/client/SpeakingClient.cpp @@ -622,7 +622,7 @@ command_result SpeakingClient::handleCommandClientInit(Command& cmd) { { for(const auto &cl : this->server->getClients()) if((cl->getType() == CLIENT_TEAMSPEAK || cl->getType() == CLIENT_WEB || cl->getType() == CLIENT_TEASPEAK || cl->getType() == CLIENT_MUSIC)) - if(cl->connectionState() <= ConnectionState::CONNECTED && cl->connectionState() >= ConnectionState::INIT_HIGH) + if(cl->connectionState() <= ClientState::CONNECTED && cl->connectionState() >= ClientState::CONNECTED) //TODO: Get "real" state and do not ignore just initializing clients! count++; } @@ -653,19 +653,8 @@ command_result SpeakingClient::handleCommandClientInit(Command& cmd) { } } - this->postCommandHandler.emplace_back([&](){ - auto self = dynamic_pointer_cast(_this.lock()); - std::thread([self](){ - if(self->state != ConnectionState::INIT_HIGH) return; - try { - self->processJoin(); - } catch (std::exception& ex) { - logError(self->getServerId(), "Failed to proceed client join for {}. Got exception with message {}", CLIENT_STR_LOG_PREFIX_(self), ex.what()); - self->close_connection(chrono::system_clock::now() + chrono::seconds{5}); - } - }).detach(); - }); - + /* we're not doing this anymore from a different thread */ + this->processJoin(); debugMessage(this->getServerId(), "{} Client init timings: {}", CLIENT_STR_LOG_PREFIX, TIMING_FINISH(timings)); return command_result{error::ok}; } @@ -679,7 +668,7 @@ void SpeakingClient::processJoin() { this->resetIdleTime(); threads::MutexLock lock(this->command_lock); //Don't process any commands! - if(this->state != ConnectionState::INIT_HIGH) { + if(this->state != ClientState::INITIALIZING) { logError(this->getServerId(), "{} Invalid processJoin() connection state!", CLIENT_STR_LOG_PREFIX); return; } @@ -729,7 +718,7 @@ void SpeakingClient::processJoin() { TIMING_STEP(timings, "assign chan"); this->sendChannelList(true); - this->state = ConnectionState::CONNECTED; + this->state = ClientState::CONNECTED; TIMING_STEP(timings, "send chan t"); /* trick the join method */ @@ -767,7 +756,7 @@ void SpeakingClient::processJoin() { void SpeakingClient::processLeave() { auto ownLock = _this.lock(); auto server = this->getServer(); - if(server){ + if(server) { logMessage(this->getServerId(), "Voice client {}/{} ({}) from {} left.", this->getClientDatabaseId(), this->getUid(), this->getDisplayName(), this->getLoggingPeerIp() + ":" + to_string(this->getPeerPort())); { unique_lock server_channel_lock(this->server->channel_tree_lock); @@ -777,14 +766,6 @@ void SpeakingClient::processLeave() { server->musicManager->cleanup_client_bots(this->getClientDatabaseId()); //ref_server = nullptr; Removed caused nullptr exceptions } - { //Delete own viewing clients - /* - * No need, are only weak references! - threads::MutexLock l(this->viewLock); - this->visibleClients.clear(); - this->mutedClients.clear(); - */ - } } void SpeakingClient::triggerVoiceEnd() { @@ -815,7 +796,7 @@ void SpeakingClient::tick(const std::chrono::system_clock::time_point &time) { ALARM_TIMER(A1, "SpeakingClient::tick", milliseconds(2)); this->updateSpeak(true, time); - if(this->state == ConnectionState::CONNECTED) { + if(this->state == ClientState::CONNECTED) { if(this->max_idle_time.has_value) { auto max_idle = this->max_idle_time.value; if(max_idle > 0 && this->idleTimestamp.time_since_epoch().count() > 0 && duration_cast(time - this->idleTimestamp).count() > max_idle) { @@ -833,7 +814,7 @@ void SpeakingClient::updateChannelClientProperties(bool channel_lock, bool notif } command_result SpeakingClient::handleCommand(Command &command) { - if(this->connectionState() == ConnectionState::INIT_HIGH) { + if(this->connectionState() == ClientState::INITIALIZING) { if(this->handshake.state == HandshakeState::BEGIN || this->handshake.state == HandshakeState::IDENTITY_PROOF) { command_result result; if(command.command() == "handshakebegin") diff --git a/server/src/client/command_handler/misc.cpp b/server/src/client/command_handler/misc.cpp index 24932bf..a03f40a 100644 --- a/server/src/client/command_handler/misc.cpp +++ b/server/src/client/command_handler/misc.cpp @@ -569,7 +569,7 @@ command_result ConnectedClient::handleCommandSendTextMessage(Command &cmd) { if(this->handleTextMessage(ChatMessageMode::TEXTMODE_SERVER, cmd["msg"], nullptr)) return command_result{error::ok}; for(const auto& client : this->server->getClients()) { - if (client->connectionState() != ConnectionState::CONNECTED) + if (client->connectionState() != ClientState::CONNECTED) continue; auto type = client->getType(); @@ -2526,7 +2526,7 @@ command_result ConnectedClient::handleCommandConversationMessageDelete(ts::Comma auto delete_count = current_conversation->delete_messages(timestamp_end, limit, timestamp_begin, bulk["cldbid"]); if(delete_count > 0) { for(const auto& client : ref_server->getClients()) { - if(client->connectionState() != ConnectionState::CONNECTED) + if(client->connectionState() != ClientState::CONNECTED) continue; auto type = client->getType(); diff --git a/server/src/client/query/QueryClient.cpp b/server/src/client/query/QueryClient.cpp index e794abe..a12b90a 100644 --- a/server/src/client/query/QueryClient.cpp +++ b/server/src/client/query/QueryClient.cpp @@ -8,6 +8,7 @@ #include "src/InstanceHandler.h" #include #include +#include "./QueryClientConnection.h" using namespace std; using namespace std::chrono; @@ -20,41 +21,20 @@ using namespace ts::server; //#define DEBUG_TRAFFIC -QueryClient::QueryClient(QueryServer* handle, int sockfd) : ConnectedClient(handle->sql, nullptr), handle(handle), clientFd(sockfd) { +QueryClient::QueryClient(QueryServer* handle, int sockfd) : ConnectedClient(handle->sql, nullptr), handle(handle) { memtrack::allocated(this); - int enabled = 1; - int disabled = 0; - setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled)); - if(setsockopt(sockfd, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) { - logError(this->getServerId(), "[Query] Could not disable nopush for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this), errno, strerror(errno)); - } - if(setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &enabled, sizeof enabled) < 0) { - logError(this->getServerId(), "[Query] Could not disable no delay for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this), errno, strerror(errno)); - } - this->readEvent = event_new(this->handle->eventLoop, this->clientFd, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((QueryClient*) c)->handleMessageRead(a, b, c); }, this); - this->writeEvent = event_new(this->handle->eventLoop, this->clientFd, EV_WRITE, [](int a, short b, void* c){ ((QueryClient*) c)->handleMessageWrite(a, b, c); }, this); + this->connection = new server::query::QueryClientConnection{this, sockfd}; - this->state = ConnectionState::CONNECTED; + this->state = ClientState::INITIALIZING; connectedTimestamp = system_clock::now(); this->resetEventMask(); } -void QueryClient::applySelfLock(const std::shared_ptr &cl) { +bool QueryClient::initialize(std::string& error, const std::shared_ptr &cl) { this->_this = cl; -} -QueryClient::~QueryClient() { - memtrack::freed(this); -// if(this->closeLock.tryLock() != 0) -// logCritical("Query manager deleted, but is still in usage! (closeLock)"); -// if(this->bufferLock.tryLock() != 0) -// logCritical("Query manager deleted, but is still in usage! (bufferLock)"); - this->ssl_handler.finalize(); -} - -void QueryClient::preInitialize() { this->properties()[property::CLIENT_TYPE] = ClientType::CLIENT_QUERY; this->properties()[property::CLIENT_TYPE_EXACT] = ClientType::CLIENT_QUERY; this->properties()[property::CLIENT_UNIQUE_IDENTIFIER] = "UnknownQuery"; @@ -62,19 +42,24 @@ void QueryClient::preInitialize() { DatabaseHelper::assignDatabaseId(this->sql, this->getServerId(), _this.lock()); - - if(ts::config::query::sslMode == 0) { - this->connectionType = ConnectionType::PLAIN; - this->postInitialize(); - } + /* may already calls handle_connection_initialized() */ + if(!this->connection->initialize(error)) + return false; + return true; } -void QueryClient::postInitialize() { - lock_guard lock(this->lock_packet_handle); /* we dont want to handle anything while we're initializing */ +QueryClient::~QueryClient() { + memtrack::freed(this); + + delete this->connection; +} + +void QueryClient::handle_connection_initialized() { + std::lock_guard lock{this->command_lock}; /* we dont want to handle anything while we're initializing */ this->connectTimestamp = system_clock::now(); this->properties()[property::CLIENT_LASTCONNECTED] = duration_cast(this->connectTimestamp.time_since_epoch()).count(); - if(ts::config::query::sslMode == 1 && this->connectionType != ConnectionType::SSL_ENCRIPTED) { + if(ts::config::query::sslMode == 1 && this->connection->connection_type() != server::query::ConnectionType::SSL_ENCRYPTED) { command_result error{error::failed_connection_initialisation, "Please use a SSL encryption!"}; this->notifyError(error); error.release_details(); @@ -104,7 +89,7 @@ void QueryClient::postInitialize() { } if(!this->whitelisted) { - threads::MutexLock lock(this->handle->loginLock); + threads::MutexLock llock(this->handle->loginLock); if(this->handle->queryBann.count(this->getPeerIp()) > 0) { auto ban = this->handle->queryBann[this->getPeerIp()]; Command cmd("error"); @@ -120,12 +105,26 @@ void QueryClient::postInitialize() { this->update_cached_permissions(); } -void QueryClient::writeMessage(const std::string& message) { - if(this->state == ConnectionState::DISCONNECTED || !this->handle) return; +void QueryClient::handle_connection_finalized() { + /* when this has been called there could not be any command executing! */ + //TODO: Is this statement really true? - if(this->connectionType == ConnectionType::PLAIN) this->writeRawMessage(message); - else if(this->connectionType == ConnectionType::SSL_ENCRIPTED) this->ssl_handler.send(pipes::buffer_view{(void*) message.data(), message.length()}); - else logCritical(LOG_GENERAL, "Invalid query connection type to write to!"); + if(this->server) { + { + unique_lock channel_lock(this->server->channel_tree_lock); + this->server->unregisterClient(_this.lock(), "disconnected", channel_lock); + } + this->server->groups->disableCache(this->getClientDatabaseId()); + this->server = nullptr; + } + + if(this->handle) + this->handle->unregisterConnection(dynamic_pointer_cast(_this.lock())); +} + +void QueryClient::writeMessage(const std::string& message) { + if(this->state == ClientState::DISCONNECTED) return; + this->connection->send_data(message); } @@ -142,331 +141,25 @@ bool QueryClient::close_connection(const std::chrono::system_clock::time_point& auto ownLock = dynamic_pointer_cast(_this.lock()); if(!ownLock) return false; - unique_lock handleLock(this->lock_packet_handle); - unique_lock lock(this->closeLock); - - bool flushing = flushTimeout.time_since_epoch().count() != 0; - if(this->state == ConnectionState::DISCONNECTED || (flushing && this->state == ConnectionState::DISCONNECTING)) return false; - this->state = flushing ? ConnectionState::DISCONNECTING : ConnectionState::DISCONNECTED; - - if(this->readEvent) { //Attention dont trigger this within the read thread! - event_del_block(this->readEvent); - event_free(this->readEvent); - this->readEvent = nullptr; - } - - if(this->server){ - { - unique_lock channel_lock(this->server->channel_tree_lock); - this->server->unregisterClient(_this.lock(), "disconnected", channel_lock); - } - this->server->groups->disableCache(this->getClientDatabaseId()); - this->server = nullptr; - } - - if(flushing){ - this->flushThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [ownLock, flushTimeout](){ - while(ownLock->state == ConnectionState::DISCONNECTING && flushTimeout > system_clock::now()){ - { - std::lock_guard buffer_lock(ownLock->buffer_lock); - if(ownLock->readQueue.empty() && ownLock->writeQueue.empty()) break; - } - usleep(10 * 1000); - } - if(ownLock->state == ConnectionState::DISCONNECTING) ownLock->disconnectFinal(); - }); - flushThread->name("Flush thread QC").execute(); - } else { - threads::MutexLock l1(this->flushThreadLock); - handleLock.unlock(); - lock.unlock(); - if(this->flushThread){ - threads::NegatedMutexLock l(this->closeLock); - this->flushThread->join(); - } - disconnectFinal(); - } + this->connection->close_connection(flushTimeout); return true; } -void QueryClient::disconnectFinal() { - lock_guard lock_tick(this->lock_query_tick); - lock_guard lock_handle(this->lock_packet_handle); - threads::MutexLock lock_close(this->closeLock); - std::unique_lock buffer_lock(this->buffer_lock, try_to_lock); +bool QueryClient::process_next_command() { + using CommandAssembleState = server::query::CommandAssembleState; - if(final_disconnected) { - logError(LOG_QUERY, "Tried to disconnect a client twice!"); - return; - } - final_disconnected = true; - this->state = ConnectionState::DISCONNECTED; - { - threads::MutexTryLock l(this->flushThreadLock); - if(!!l) { - if(this->flushThread) { - this->flushThread->detach(); - delete this->flushThread; //Release the captured this lock - this->flushThread = nullptr; - } - } - } + lock_guard clock(this->command_lock); + if(!this->handle || this->state == ClientState::DISCONNECTED) return false; - if(this->writeEvent) { - event_del_block(this->writeEvent); - event_free(this->writeEvent); - this->writeEvent = nullptr; - } - if(this->readEvent) { - event_del_block(this->readEvent); - event_free(this->readEvent); - this->readEvent = nullptr; - } - if(this->clientFd > 0) { - if(shutdown(this->clientFd, SHUT_RDWR) < 0) - debugMessage(LOG_QUERY, "Could not shutdown query client socket! {} ({})", errno, strerror(errno)); - if(close(this->clientFd) < 0) - debugMessage(LOG_QUERY, "Failed to close the query client socket! {} ({})", errno, strerror(errno)); - this->clientFd = -1; - } - - if(this->server) { - { - unique_lock channel_lock(this->server->channel_tree_lock); - this->server->unregisterClient(_this.lock(), "disconnected", channel_lock); - } - this->server->groups->disableCache(this->getClientDatabaseId()); - this->server = nullptr; - } - - this->readQueue.clear(); - this->writeQueue.clear(); - - if(this->handle) - this->handle->unregisterConnection(dynamic_pointer_cast(_this.lock())); -} - -void QueryClient::writeRawMessage(const std::string &message) { - { - std::lock_guard lock(this->buffer_lock); - this->writeQueue.push_back(message); - } - if(this->writeEvent) event_add(this->writeEvent, nullptr); -} - -void QueryClient::handleMessageWrite(int fd, short, void *) { - auto ownLock = _this.lock(); - - std::unique_lock buffer_lock(this->buffer_lock, try_to_lock); - if(this->state == ConnectionState::DISCONNECTED) return; - if(!buffer_lock.owns_lock()) { - if(this->writeEvent) - event_add(this->writeEvent, nullptr); - return; - } - - int writes = 0; - string buffer; - while(writes < 10 && !this->writeQueue.empty()) { - if(buffer.empty()) { - buffer = std::move(this->writeQueue.front()); - this->writeQueue.pop_front(); - } - auto length = send(fd, buffer.data(), buffer.length(), MSG_NOSIGNAL); -#ifdef DEBUG_TRAFFIC - debugMessage("Write " + to_string(buffer.length())); - hexDump((void *) buffer.data(), buffer.length()); -#endif - if(length == -1) { - if (errno == EINTR || errno == EAGAIN) { - if(this->writeEvent) - event_add(this->writeEvent, nullptr); - return; - } - else { - logError(LOG_QUERY, "{} Failed to write message: {} ({} => {})", CLIENT_STR_LOG_PREFIX, length, errno, strerror(errno)); - threads::Thread([=](){ ownLock->close_connection(chrono::system_clock::now() + chrono::seconds{5}); }).detach(); - return; - } - } else { - if(buffer.length() == length) - buffer = ""; - else - buffer = buffer.substr(length); - } - writes++; - } - if(!buffer.empty()) - this->writeQueue.push_front(buffer); - - if(!this->writeQueue.empty() && this->writeEvent) - event_add(this->writeEvent, nullptr); -} - -void QueryClient::handleMessageRead(int fd, short, void *) { - auto ownLock = dynamic_pointer_cast(_this.lock()); - if(!ownLock) { - logCritical(LOG_QUERY, "Could not get own lock!"); - return; - - } - - string buffer(1024, 0); - - auto length = read(fd, (void*) buffer.data(), buffer.length()); - if(length <= 0){ - if(errno == EINTR || errno == EAGAIN) - ;//event_add(this->readEvent, nullptr); - else if(length == 0 && errno == 0) { - logMessage(LOG_QUERY, "{} Connection closed. Client disconnected.", CLIENT_STR_LOG_PREFIX); - event_del_noblock(this->readEvent); - std::thread([ownLock]{ - ownLock->close_connection(); - }).detach(); - } else { - logError(LOG_QUERY, "{} Failed to read! Code: {} errno: {} message: {}", CLIENT_STR_LOG_PREFIX, length, errno, strerror(errno)); - event_del_noblock(this->readEvent); - threads::Thread(THREAD_SAVE_OPERATIONS, [ownLock](){ ownLock->close_connection(); }).detach(); - } - return; - } - - buffer.resize(length); - { - std::lock_guard buffer_lock(this->buffer_lock); - if(this->state == ConnectionState::DISCONNECTED) - return; - this->readQueue.push_back(std::move(buffer)); -#ifdef DEBUG_TRAFFIC - debugMessage("Read " + to_string(buffer.length())); - hexDump((void *) buffer.data(), buffer.length()); -#endif - } - - if(this->handle) - this->handle->executePool()->execute([ownLock]() { - int counter = 0; - while(ownLock->tickIOMessageProgress() && counter++ < 15); - }); -} - -bool QueryClient::tickIOMessageProgress() { - lock_guard lock(this->lock_packet_handle); - if(!this->handle || this->state == ConnectionState::DISCONNECTED || this->state == ConnectionState::DISCONNECTING) return false; - - string message; - bool next = false; - { - std::lock_guard buffer_lock(this->buffer_lock); - if(this->readQueue.empty()) return false; - message = std::move(this->readQueue.front()); - this->readQueue.pop_front(); - next |= this->readQueue.empty(); - } - - - if(this->connectionType == ConnectionType::PLAIN) { - int count = 0; - while(this->handleMessage(pipes::buffer_view{(void*) message.data(), message.length()}) && count++ < 15) message = ""; - next |= count == 15; - } else if(this->connectionType == ConnectionType::SSL_ENCRIPTED) { - this->ssl_handler.process_incoming_data(pipes::buffer_view{(void*) message.data(), message.length()}); - } else if(this->connectionType == ConnectionType::UNKNOWN) { - if(config::query::sslMode != 0 && pipes::SSL::isSSLHeader(message)) { - this->initializeSSL(); - - /* - * - Content - * \x16 - * -Version (1) - * \x03 \x00 - * - length (2) - * \x00 \x04 - * - * - Header - * \x00 -> hello request (3) - * \x05 -> length (4) - */ - - //this->writeRawMessage(string("\x16\x03\x01\x00\x05\x00\x00\x00\x00\x00", 10)); - } else { - this->connectionType = ConnectionType::PLAIN; - this->postInitialize(); - } - next = true; - { - std::lock_guard buffer_lock(this->buffer_lock); - this->readQueue.push_front(std::move(message)); - } - } - return next; -} - -extern InstanceHandler* serverInstance; - -void QueryClient::initializeSSL() { - this->connectionType = ConnectionType::SSL_ENCRIPTED; - - this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_OUT, true); - this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_IN, true); - - this->ssl_handler.callback_data(std::bind(&QueryClient::handleMessage, this, placeholders::_1)); - this->ssl_handler.callback_write(std::bind(&QueryClient::writeRawMessage, this, placeholders::_1)); - this->ssl_handler.callback_initialized = std::bind(&QueryClient::postInitialize, this); - - this->ssl_handler.callback_error([&](int code, const std::string& message) { - if(code == PERROR_SSL_ACCEPT) { - this->disconnect("invalid accept"); - } else if(code == PERROR_SSL_TIMEOUT) - this->disconnect("invalid accept (timeout)"); - else - logError(LOG_QUERY, "Got unknown ssl error ({} | {})", code, message); - }); - - { - auto context = serverInstance->sslManager()->getQueryContext(); - - auto options = make_shared(); - options->type = pipes::SSL::SERVER; - options->context_method = TLS_method(); - options->default_keypair({context->privateKey, context->certificate}); - if(!this->ssl_handler.initialize(options)) { - logError(LOG_QUERY, "[{}] Failed to setup ssl!", CLIENT_STR_LOG_PREFIX); - } - } -} - -bool QueryClient::handleMessage(const pipes::buffer_view& message) { - { - threads::MutexLock l(this->closeLock); - if(this->state == ConnectionState::DISCONNECTED) - return false; - } -#ifdef DEBUG_TRAFFIC - debugMessage("Handling message " + to_string(message.length())); - hexDump((void *) message.data(), message.length()); -#endif - - - string command; - { - this->lineBuffer += message.string(); - int length = 2; - auto pos = this->lineBuffer.find("\r\n"); - if(pos == string::npos) pos = this->lineBuffer.find("\n\r"); - if(pos == string::npos) { - length = 1; - pos = this->lineBuffer.find('\n'); - } - - if(pos != string::npos){ - command = this->lineBuffer.substr(0, pos); - if(this->lineBuffer.size() > pos + length) - this->lineBuffer = this->lineBuffer.substr(pos + length); - else - this->lineBuffer.clear(); - } - if(pos == string::npos) return false; + string command{}; + bool more_pending{false}; + switch (this->connection->next_command(command)) { + case CommandAssembleState::MORE_COMMANDS_PENDING: + more_pending = true; + case CommandAssembleState::SUCCESS: + break; + case CommandAssembleState::NO_COMMAND_PENDING: + return false; /* nothing to do */ } if(command.empty() || command.find_first_not_of(' ') == string::npos) { //Empty command @@ -515,7 +208,7 @@ bool QueryClient::handleMessage(const pipes::buffer_view& message) { error = command_result{error::vs_critical, std::string{ex.what()}}; goto handle_error; } - return true; + return more_pending; handle_error: this->notifyError(error); @@ -539,15 +232,13 @@ void QueryClient::tick(const std::chrono::system_clock::time_point &time) { } void QueryClient::queryTick() { - lock_guard lock_tick(this->lock_query_tick); if(this->idleTimestamp.time_since_epoch().count() > 0 && system_clock::now() - this->idleTimestamp > minutes(5)){ debugMessage(LOG_QUERY, "Dropping client " + this->getLoggingPeerIp() + "|" + this->getDisplayName() + ". (Timeout)"); this->close_connection(system_clock::now() + seconds(1)); } - if(this->connectionType == ConnectionType::UNKNOWN && system_clock::now() - milliseconds(500) > connectedTimestamp) { - this->connectionType = ConnectionType::PLAIN; - this->postInitialize(); + if(this->connection->connection_type() == server::query::ConnectionType::UNKNOWN && system_clock::now() - milliseconds{500} > connectedTimestamp) { + this->connection->enforce_text_connection(); } } diff --git a/server/src/client/query/QueryClient.h b/server/src/client/query/QueryClient.h index 127b61b..8031dd1 100644 --- a/server/src/client/query/QueryClient.h +++ b/server/src/client/query/QueryClient.h @@ -10,19 +10,18 @@ namespace ts::server { class QueryServer; class QueryAccount; + namespace server::query { + class QueryClientConnection; + } + class QueryClient : public ConnectedClient { friend class QueryServer; - - enum ConnectionType { - PLAIN, - SSL_ENCRIPTED, - UNKNOWN - }; + friend class server::query::QueryClientConnection; public: QueryClient(QueryServer*, int sockfd); ~QueryClient() override; - + [[nodiscard]] inline QueryServer* getQueryServer() { return this->handle; } void writeMessage(const std::string&); void sendCommand(const ts::Command &command, bool low = false) override; @@ -30,7 +29,6 @@ namespace ts::server { bool disconnect(const std::string &reason) override; bool close_connection(const std::chrono::system_clock::time_point& timeout = std::chrono::system_clock::time_point()) override; - void disconnectFinal(); bool eventActive(QueryEventGroup, QueryEventSpecifier); void toggleEvent(QueryEventGroup, QueryEventSpecifier, bool); @@ -41,51 +39,26 @@ namespace ts::server { inline std::shared_ptr getQueryAccount() { return this->query_account; } protected: - void preInitialize(); - void postInitialize(); + /* Will be called as soon the connection has been initialized. This means directly within the initialize call or within the IO read callback. */ + void handle_connection_initialized(); + void handle_connection_finalized(); + void tick(const std::chrono::system_clock::time_point &time) override; void queryTick(); protected: - void initializeSSL(); + /* returns true if more commands are pending */ + bool process_next_command(); - bool handleMessage(const pipes::buffer_view&); - bool tickIOMessageProgress(); - - void handleMessageRead(int, short, void*); - void handleMessageWrite(int, short, void*); - void writeRawMessage(const std::string&); - - void applySelfLock(const std::shared_ptr &cl); + [[nodiscard]] bool initialize(std::string& /* error */, const std::shared_ptr &cl); private: QueryServer* handle; - - ConnectionType connectionType = ConnectionType::UNKNOWN; - + server::query::QueryClientConnection* connection{nullptr}; bool whitelisted = false; - int clientFd = -1; - ::event* readEvent = nullptr; - ::event* writeEvent = nullptr; - threads::Mutex closeLock; - - pipes::SSL ssl_handler; - - std::mutex buffer_lock; - std::deque writeQueue; - std::deque readQueue; - - threads::Mutex flushThreadLock; - threads::Thread* flushThread = nullptr; - bool final_disconnected = false; - - std::string lineBuffer; std::chrono::time_point connectedTimestamp; uint16_t eventMask[QueryEventGroup::QEVENTGROUP_MAX]; - std::recursive_mutex lock_packet_handle; - std::recursive_mutex lock_query_tick; - std::shared_ptr query_account; protected: command_result handleCommand(Command &command) override; diff --git a/server/src/client/query/QueryClientConnection.cpp b/server/src/client/query/QueryClientConnection.cpp new file mode 100644 index 0000000..b95c565 --- /dev/null +++ b/server/src/client/query/QueryClientConnection.cpp @@ -0,0 +1,384 @@ +// +// Created by WolverinDEV on 11/03/2020. +// + +#include "./QueryClientConnection.h" + +#include +#include +#include +#include + +#include "./QueryClient.h" +#include "../ConnectedClient.h" + +#include "../../server/QueryServer.h" +#include "QueryClientConnection.h" + +using namespace ts::server::server::query; + +#if defined(TCP_CORK) && !defined(TCP_NOPUSH) + #define TCP_NOPUSH TCP_CORK +#endif + +namespace ts::server::server::query { + /* will be set by the event loop */ + thread_local bool thread_is_event_loop{false}; +} + +QueryClientConnection::QueryClientConnection(ts::server::QueryClient *client, int fd) : client_handle{client}, file_descriptor_{fd} { + TAILQ_INIT(&this->write_queue); +} + +QueryClientConnection::~QueryClientConnection() { + this->finalize(true); +} + +bool QueryClientConnection::initialize(std::string &error) { + assert(this->client_handle); + + int enabled{1}; + int disabled{0}; + setsockopt(this->file_descriptor_, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled)); + if(setsockopt(this->file_descriptor_, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) + logError(LOG_QUERY, "Could not disable nopush for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this->client_handle), errno, strerror(errno)); + + if(setsockopt(this->file_descriptor_, IPPROTO_TCP, TCP_NODELAY, &enabled, sizeof enabled) < 0) + logError(LOG_QUERY, "[Query] Could not disable no delay for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this->client_handle), errno, strerror(errno)); + + auto query_server = this->client_handle->getQueryServer(); + this->readEvent = event_new(query_server->io_event_loop(), this->file_descriptor_, EV_READ | EV_PERSIST, [](int a1, short a2, void* _this) { + reinterpret_cast(_this)->handle_event_read(a1, a2); + }, this); + this->writeEvent = event_new(query_server->io_event_loop(), this->file_descriptor_, EV_WRITE, [](int a1, short a2, void* _this){ + reinterpret_cast(_this)->handle_event_write(a1, a2); + }, this); + + this->connection_state = ConnectionState::INITIALIZING; + + if(ts::config::query::sslMode == 0) { + this->connection_state = ConnectionState::CONNECTED; + this->connection_type_ = ConnectionType::PLAIN_TEXT; + + this->client_handle->handle_connection_initialized(); + } + return true; +} + +void QueryClientConnection::add_read_event() { + std::lock_guard elock{this->event_mutex}; + if(this->readEvent) event_add(this->readEvent, nullptr); +} + +void QueryClientConnection::finalize(bool is_destructor_call) { + auto old_state = this->connection_state; + this->connection_state = ConnectionState::DISCONNECTED; + + /* unregister event handling */ + { + std::unique_lock elock{this->event_mutex}; + auto wevent = std::exchange(this->writeEvent, nullptr); + auto revent = std::exchange(this->readEvent, nullptr); + elock.unlock(); + if(revent) { + if(thread_is_event_loop) + event_del_noblock(revent); + else + event_del_block(revent); /* may calls finalize() while we're waiting. But thats okey. */ + event_free(revent); + } + if(wevent) { + if(thread_is_event_loop) + event_del_noblock(wevent); + else + event_del_block(wevent); /* may calls finalize() while we're waiting. But thats okey. */ + event_free(wevent); + } + } + + { + std::lock_guard block{this->buffer_lock}; + + /* Free the entire tail queue. */ + while (auto buffer = TAILQ_FIRST(&this->write_queue)) { + TAILQ_REMOVE(&this->write_queue, buffer, tq); + free(buffer->original_ptr); + delete buffer; + } + TAILQ_INIT(&this->write_queue); /* just ensures a valid tailq */ + + ::free(this->read_buffer.buffer); + this->read_buffer.buffer = nullptr; + this->read_buffer.length = 0; + this->read_buffer.fill_count = 0; + } + + if(!is_destructor_call && old_state != ConnectionState::DISCONNECTED) + this->client_handle->handle_connection_finalized(); +} + +void QueryClientConnection::handle_event_read(int fd, short events) { + constexpr auto buffer_length{1024 * 4}; + uint8_t buffer[buffer_length]; + + auto length = read(fd, (void *) buffer, buffer_length); + if (length <= 0) { + if (errno == EINTR || errno == EAGAIN) + return; + else if (length == 0) { + logMessage(LOG_QUERY, "{} Connection closed (r). Client disconnected.", + CLIENT_STR_LOG_PREFIX_(this->client_handle)); + } else { + logError(LOG_QUERY, "{} Failed to read! Code: {} errno: {} message: {}", + CLIENT_STR_LOG_PREFIX_(this->client_handle), length, errno, strerror(errno)); + } + event_del_noblock(this->readEvent); + this->close_connection(std::chrono::system_clock::time_point{}); + return; + } + + if (this->connection_type_ == ConnectionType::PLAIN_TEXT) { + plain_text_buffer_insert: + this->handle_decoded_message(buffer, length); + } else if (this->connection_type_ == ConnectionType::SSL_ENCRYPTED) { + ssl_buffer_insert:; + this->ssl_handler.process_incoming_data(pipes::buffer_view{(const char*) buffer, (size_t) length});; + } else { + if (config::query::sslMode != 0 && pipes::SSL::isSSLHeader(std::string{(const char *) buffer, (size_t) length})) { + if(!this->initialize_ssl()) return; + + /* + * - Content + * \x16 + * -Version (1) + * \x03 \x00 + * - length (2) + * \x00 \x04 + * + * - Header + * \x00 -> hello request (3) + * \x05 -> length (4) + */ + + //this->writeRawMessage(string("\x16\x03\x01\x00\x05\x00\x00\x00\x00\x00", 10)); + goto ssl_buffer_insert; + } else { + this->connection_type_ = ConnectionType::PLAIN_TEXT; + this->client_handle->handle_connection_initialized(); + goto plain_text_buffer_insert; + } + } +} + +void QueryClientConnection::handle_event_write(int fd, short events) { + bool readd_write{false}; + if(events & EV_WRITE) { + /* Safe to access, because we're only reading the queue and the head could never change. Only within the IO loop itself. */ + WriteBuffer* wbuffer; + while((wbuffer = TAILQ_FIRST(&this->write_queue))) { + auto written = send(fd, wbuffer->ptr, wbuffer->length, 0); + if(written <= 0) { + if(errno == EAGAIN) { + readd_write = true; + break; + } + if(written == 0) { + logMessage(LOG_QUERY, "{} Connection closed (w). Client disconnected.", CLIENT_STR_LOG_PREFIX_(this->client_handle)); + } else { + logError(LOG_QUERY, "{} Failed to write! Code: {} errno: {} message: {}", CLIENT_STR_LOG_PREFIX_(this->client_handle), written, errno, strerror(errno)); + } + event_del_noblock(this->readEvent); + this->close_connection(std::chrono::system_clock::time_point{}); + return; + } + + wbuffer->length -= written; + if(wbuffer->length == 0) { + std::lock_guard block{this->buffer_lock}; + TAILQ_REMOVE(&this->write_queue, wbuffer, tq); + + ::free(wbuffer->original_ptr); + delete wbuffer; + } else { + wbuffer->ptr += written; + } + } + } + + if(this->connection_state == ConnectionState::DISCONNECTING) { + if(!readd_write || (events & EV_TIMEOUT)) { + /* disconnect timeouted or nothing more to write */ + this->finalize(false); + return; + } else /* if(readd_write) */ { /* check not needed because tested before already */ + auto time_left = this->disconnect_timeout - std::chrono::system_clock::now(); + timeval timeout{0, 1}; + if(time_left.count() > 0) { + timeout.tv_sec = std::chrono::floor(time_left).count(); + timeout.tv_usec = std::chrono::floor(time_left).count() % 1000000ULL; + } + event_add(this->writeEvent, &timeout); + } + } else if(readd_write) { + event_add(this->writeEvent, nullptr); + } +} + + +bool QueryClientConnection::initialize_ssl() { + this->connection_type_ = ConnectionType::SSL_ENCRYPTED; + + this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_OUT, true); + this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_IN, true); + + this->ssl_handler.callback_data([&](const pipes::buffer_view &buffer) { + this->handle_decoded_message(buffer.data_ptr(), buffer.length()); + }); + + this->ssl_handler.callback_write([&](const pipes::buffer_view &buffer) { + this->send_data_raw({buffer.data_ptr(), buffer.length()}); + }); + + this->ssl_handler.callback_initialized = [&] { + this->client_handle->handle_connection_initialized(); + }; + + this->ssl_handler.callback_error([&](int code, const std::string& message) { + if(code == PERROR_SSL_ACCEPT) { + logError(LOG_QUERY, "{} Failed to initialize query ssl session ({})", CLIENT_STR_LOG_PREFIX_(this->client_handle), message); + this->close_connection(std::chrono::system_clock::time_point{}); + } else if(code == PERROR_SSL_TIMEOUT) { + logError(LOG_QUERY, "{} Failed to initialize query ssl session (timeout: {})", CLIENT_STR_LOG_PREFIX_(this->client_handle), message); + this->close_connection(std::chrono::system_clock::time_point{}); + } else + logError(LOG_QUERY, "{} Received SSL error ({} | {})", CLIENT_STR_LOG_PREFIX_(this->client_handle), code, message); + }); + + { + auto context = serverInstance->sslManager()->getQueryContext(); + + auto options = std::make_shared(); + options->type = pipes::SSL::SERVER; + options->context_method = TLS_method(); + options->default_keypair({context->privateKey, context->certificate}); + if(!this->ssl_handler.initialize(options)) { + logError(LOG_QUERY, "[{}] Failed to setup ssl!", CLIENT_STR_LOG_PREFIX_(this->client_handle)); + this->close_connection(std::chrono::system_clock::time_point{}); + return false; + } + } + return true; +} + +void QueryClientConnection::handle_decoded_message(const void *buffer, size_t size) { + { + std::lock_guard block{this->buffer_lock}; + if((this->read_buffer.length - this->read_buffer.fill_count) < size) { /* !this->read_buffer.buffer is already implicitly implemented because by default read_buffer.length will be zero */ + const auto new_size{this->read_buffer.length + size + 128}; + auto new_buffer = ::malloc(new_size); + assert(new_buffer); + + if(this->read_buffer.fill_count) memcpy(new_buffer, this->read_buffer.buffer, this->read_buffer.fill_count); + ::free(this->read_buffer.buffer); + + this->read_buffer.buffer = new_buffer; + this->read_buffer.length = new_size; + } + assert(this->read_buffer.buffer); + assert(this->read_buffer.length - this->read_buffer.fill_count >= size); + + memcpy((char*) this->read_buffer.buffer + this->read_buffer.fill_count, buffer, size); + this->read_buffer.fill_count += size; + } + { + //TODO: Improve this command progress + auto qserver{this->client_handle->handle}; + if(qserver) { + auto wlock{this->client_handle->_this}; + qserver->executePool()->execute([wlock]() { + auto client{std::dynamic_pointer_cast(wlock.lock())}; + if(!client) return; + + int counter = 0; + while(client->process_next_command() && counter++ < 15); + }); + } + } +} + +void QueryClientConnection::send_data(const std::string_view &buffer) { + if(this->connection_type_ == ConnectionType::PLAIN_TEXT) + this->send_data_raw(buffer); + else if(this->connection_type_ == ConnectionType::SSL_ENCRYPTED) + this->ssl_handler.send(pipes::buffer_view{buffer.data(), buffer.length()}); +} + +void QueryClientConnection::send_data_raw(const std::string_view &buffer) { + auto wbuf = new WriteBuffer{}; + wbuf->original_ptr = (char*) malloc(buffer.length()); + wbuf->ptr = wbuf->original_ptr; + + memcpy(wbuf->ptr, buffer.data(), buffer.length()); + wbuf->length = buffer.length(); + + { + std::lock_guard wlock{this->buffer_lock}; + TAILQ_INSERT_TAIL(&this->write_queue, wbuf, tq); + } + + { + std::lock_guard elock{this->event_mutex}; + if(this->writeEvent) + event_add(this->writeEvent, nullptr); + } +} + +void QueryClientConnection::close_connection(const std::chrono::system_clock::time_point &timeout) { + if(timeout.time_since_epoch().count() > 0) { + this->connection_state = ConnectionState::DISCONNECTING; + this->disconnect_timeout = timeout; + + std::lock_guard elock{this->event_mutex}; + if(this->writeEvent) { + event_add(this->writeEvent, nullptr); + return; + } + /* failed to add the write event, so call disconnect */ + } + + if(this->connection_state == ConnectionState::DISCONNECTED) return; + this->finalize(false); +} + +void QueryClientConnection::enforce_text_connection() { + if(this->connection_state != ConnectionState::INITIALIZING) return; + + this->connection_state = ConnectionState::CONNECTED; + this->connection_type_ = ConnectionType::PLAIN_TEXT; + this->client_handle->handle_connection_initialized(); +} + +CommandAssembleState QueryClientConnection::next_command(std::string &result) { + std::lock_guard block{this->buffer_lock}; + + auto new_line_idx = (char*) memchr(this->read_buffer.buffer, '\n', this->read_buffer.fill_count); + if(!new_line_idx) return CommandAssembleState::NO_COMMAND_PENDING; + + const auto length = ((char*) this->read_buffer.buffer - new_line_idx) * sizeof(*new_line_idx); + auto line_length{length}; + if(length > 0 && *(new_line_idx - 1) == '\r') + line_length--; + + result.assign((char*) this->read_buffer.buffer, line_length); + + //Do not copy the \r character + auto copy_bytes{this->read_buffer.fill_count - length}; + if(copy_bytes > 0 && *(new_line_idx + 1) == '\r') { + copy_bytes--; + new_line_idx++; + } + memcpy(this->read_buffer.buffer, new_line_idx + 1, copy_bytes); + this->read_buffer.fill_count = copy_bytes; + + return copy_bytes == 0 ? CommandAssembleState::SUCCESS : CommandAssembleState::MORE_COMMANDS_PENDING; +} \ No newline at end of file diff --git a/server/src/client/query/QueryClientConnection.h b/server/src/client/query/QueryClientConnection.h new file mode 100644 index 0000000..90a4382 --- /dev/null +++ b/server/src/client/query/QueryClientConnection.h @@ -0,0 +1,98 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace ts::server { + class QueryClient; +} + +namespace ts::server::server::query { + enum struct ConnectionType { + UNKNOWN, + + PLAIN_TEXT, + SSL_ENCRYPTED, + + /* SSH */ + }; + + enum struct ConnectionState { + INITIALIZING, + CONNECTED, + DISCONNECTING, + DISCONNECTED + }; + + enum struct CommandAssembleState { + SUCCESS, + MORE_COMMANDS_PENDING, + + NO_COMMAND_PENDING + }; + + class QueryClientConnection { + public: + explicit QueryClientConnection(QueryClient* /* client */, int /* file descriptor */); + ~QueryClientConnection(); + + [[nodiscard]] inline ConnectionType connection_type() const { return this->connection_type_; } + + bool initialize(std::string& /* error */); + void add_read_event(); + + void finalize(bool /* is destructor call */); + + void send_data(const std::string_view& /* payload */); + void send_data_raw(const std::string_view& /* payload */); + + void enforce_text_connection(); + [[nodiscard]] CommandAssembleState next_command(std::string& /* command */); + + /* could be called from every thread (event IO thread) */ + void close_connection(const std::chrono::system_clock::time_point& /* disconnect timeout */); + private: + struct WriteBuffer { + char* original_ptr; + char* ptr; + size_t length; + + TAILQ_ENTRY(WriteBuffer) tq; + }; + + QueryClient* client_handle{nullptr}; + ConnectionState connection_state{ConnectionState::INITIALIZING}; + std::chrono::system_clock::time_point disconnect_timeout{}; + + ConnectionType connection_type_{ConnectionType::UNKNOWN}; + int file_descriptor_{-1}; + + /* only delete the events within the event loop! */ + std::mutex event_mutex{}; + ::event* readEvent{nullptr}; + ::event* writeEvent{nullptr}; + + pipes::SSL ssl_handler{}; + + std::mutex buffer_lock{}; + struct { + void* buffer{nullptr}; + size_t length{0}; + size_t fill_count{0}; + std::chrono::system_clock::time_point last_shrink{}; + } read_buffer; + TAILQ_HEAD(, WriteBuffer) write_queue{}; + + void handle_event_write(int, short); + void handle_event_read(int, short); + + + bool initialize_ssl(); + void handle_decoded_message(const void* /* message */, size_t /* length */); + }; +} \ No newline at end of file diff --git a/server/src/client/voice/PacketDecoder.cpp b/server/src/client/voice/PacketDecoder.cpp index 71e7b51..5dd9873 100644 --- a/server/src/client/voice/PacketDecoder.cpp +++ b/server/src/client/voice/PacketDecoder.cpp @@ -7,11 +7,13 @@ #include #include #include -#include +#include #include "../../ConnectionStatistics.h" using namespace ts; +using namespace ts::protocol; +using namespace ts::connection; using namespace ts::server::server::udp; PacketDecoder::PacketDecoder(ts::connection::CryptHandler *crypt_handler) @@ -69,7 +71,7 @@ PacketDecodeResult PacketDecoder::decode_incoming_data_(std::string& error, bool /* handle the order stuff */ auto& fragment_buffer = this->_command_fragment_buffers[PacketDecoder::command_fragment_buffer_index(packet_parser.type())]; - unique_lock queue_lock(fragment_buffer.buffer_lock); + std::unique_lock queue_lock(fragment_buffer.buffer_lock); auto result = fragment_buffer.accept_index(packet_parser.packet_id()); if(result != 0) { /* packet index is ahead buffer index */ error = "pid: " + std::to_string(packet_parser.packet_id()) + ","; @@ -78,7 +80,7 @@ PacketDecodeResult PacketDecoder::decode_incoming_data_(std::string& error, bool if(result == -1) { /* underflow */ /* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */ - this->callback_send_acknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW); + this->callback_send_acknowledge(this->callback_argument, packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW); return PacketDecodeResult::DUPLICATED_PACKET; } @@ -145,7 +147,7 @@ PacketDecodeResult PacketDecoder::decode_incoming_data_(std::string& error, bool }; { - unique_lock queue_lock(fragment_buffer.buffer_lock); + std::unique_lock queue_lock(fragment_buffer.buffer_lock); if(!fragment_buffer.insert_index(packet_parser.packet_id(), std::move(fragment_entry))) return PacketDecodeResult::COMMAND_INSTERT_FAILED; @@ -156,6 +158,8 @@ PacketDecodeResult PacketDecoder::decode_incoming_data_(std::string& error, bool } else { this->callback_decoded_packet(this->callback_argument, packet_parser); } + + return PacketDecodeResult::SUCCESS; } bool PacketDecoder::verify_encryption(const pipes::buffer_view &buffer) { @@ -169,14 +173,14 @@ bool PacketDecoder::verify_encryption(const pipes::buffer_view &buffer) { CommandReassembleResult PacketDecoder::reassemble_command(pipes::buffer &result, bool &is_command_low) { bool more_commands_pending{false}; command_fragment_buffer_t* buffer{nullptr}; - unique_lock buffer_lock; /* general buffer lock */ + std::unique_lock buffer_lock; /* general buffer lock */ { //FIXME: Currently command low packets cant be handled if there is a command packet stuck in reassemble queue /* handle commands before command low packets */ for(auto& buf : this->_command_fragment_buffers) { - unique_lock ring_lock(buf.buffer_lock, try_to_lock); + std::unique_lock ring_lock(buf.buffer_lock, std::try_to_lock); if(!ring_lock.owns_lock()) continue; if(buf.front_set()) { @@ -284,13 +288,13 @@ void PacketDecoder::force_insert_command(const pipes::buffer_view &buffer) { { auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)]; - unique_lock queue_lock(fragment_buffer.buffer_lock); + std::unique_lock queue_lock(fragment_buffer.buffer_lock); fragment_buffer.push_front(std::move(fragment_entry)); } } void PacketDecoder::register_initiv_packet() { auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)]; - unique_lock buffer_lock(fragment_buffer.buffer_lock); + std::unique_lock buffer_lock(fragment_buffer.buffer_lock); fragment_buffer.set_full_index_to(1); /* the first packet (0) is already the clientinitiv packet */ } \ No newline at end of file diff --git a/server/src/client/voice/PacketEncoder.cpp b/server/src/client/voice/PacketEncoder.cpp index adf2357..6c70143 100644 --- a/server/src/client/voice/PacketEncoder.cpp +++ b/server/src/client/voice/PacketEncoder.cpp @@ -30,7 +30,7 @@ void PacketEncoder::reset() { category.queue.clear(); } - this->id_generator.reset(); + this->id_generator_.reset(); } bool PacketEncoder::encode_packet(const std::shared_ptr &original_packet, EncodeFlags flags) { @@ -191,7 +191,7 @@ PacketEncodeResult PacketEncoder::encode_packet_(std::string& error, /* apply packet ids */ for(const auto& fragment : fragments) { if(!fragment->memory_state.id_branded) - fragment->applyPacketId(this->id_generator); + fragment->applyPacketId(this->id_generator_); } work_lock.unlock(); /* the rest could be unordered */ diff --git a/server/src/client/voice/PacketEncoder.h b/server/src/client/voice/PacketEncoder.h index ca542a4..5a1b1d5 100644 --- a/server/src/client/voice/PacketEncoder.h +++ b/server/src/client/voice/PacketEncoder.h @@ -83,6 +83,8 @@ namespace ts::server::server::udp { bool encode_packet(const std::shared_ptr &/* the packet */, EncodeFlags /* flags */); bool do_encode(); + [[nodiscard]] inline protocol::PacketIdManager& id_generator() { return this->id_generator_; } + [[nodiscard]] inline std::shared_ptr get_statistics() { return this->statistics_; } inline void set_statistics(const std::shared_ptr& stats) { this->statistics_ = stats; } @@ -111,7 +113,7 @@ namespace ts::server::server::udp { /* ---------- Processing ---------- */ /* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */ - protocol::PacketIdManager id_generator{}; + protocol::PacketIdManager id_generator_{}; std::atomic process_count{0}; diff --git a/server/src/client/voice/PingHandler.cpp b/server/src/client/voice/PingHandler.cpp index a41e692..f1f2caf 100644 --- a/server/src/client/voice/PingHandler.cpp +++ b/server/src/client/voice/PingHandler.cpp @@ -4,4 +4,66 @@ #include "PingHandler.h" -using namespace ts::server::server::udp; \ No newline at end of file +using namespace ts::server::server::udp; + +void PingHandler::reset() { + this->last_ping_id = 0; + this->current_ping_ = std::chrono::milliseconds{0}; + + this->last_recovery_command_send = std::chrono::system_clock::time_point{}; + this->last_command_acknowledge_ = std::chrono::system_clock::time_point{}; + + this->last_response_ = std::chrono::system_clock::time_point{}; + this->last_request_ = std::chrono::system_clock::time_point{}; +} + +void PingHandler::received_pong(uint16_t ping_id) { + if(this->last_ping_id != ping_id) return; + + auto now = std::chrono::system_clock::now(); + this->current_ping_ = std::chrono::floor(this->last_request_ - now); + + this->last_response_ = now; + this->last_command_acknowledge_ = now; /* That's here for purpose!*/ +} + +void PingHandler::received_command_acknowledged() { + this->last_command_acknowledge_ = std::chrono::system_clock::now(); +} + +void PingHandler::tick(const std::chrono::system_clock::time_point& now) { + if(this->last_request_ + PingHandler::ping_request_interval < now) + this->send_ping_request(); /* may update last_response_ */ + + if(this->last_response_ + PingHandler::ping_timeout < now) { + if(this->last_recovery_command_send + PingHandler::recovery_request_interval < now) + this->send_recovery_request(); + + if(this->last_command_acknowledge_ + PingHandler::recovery_timeout < now) { + if(auto callback{this->callback_time_outed}; callback) + callback(this->callback_argument); + } + } +} + +void PingHandler::send_ping_request() { + auto now = std::chrono::system_clock::now(); + if(this->last_response_.time_since_epoch().count() == 0) + this->last_response_ = now; + + this->last_request_ = now; + + if(auto callback{this->callback_send_ping}; callback) + callback(this->callback_argument, this->last_ping_id); +} + +void PingHandler::send_recovery_request() { + auto now = std::chrono::system_clock::now(); + if(this->last_command_acknowledge_.time_since_epoch().count() == 0) + this->last_command_acknowledge_ = now; + + this->last_recovery_command_send = now; + + if(auto callback{this->callback_send_recovery_command}; callback) + callback(this->callback_argument); +} \ No newline at end of file diff --git a/server/src/client/voice/PingHandler.h b/server/src/client/voice/PingHandler.h index 34fe173..9b692f1 100644 --- a/server/src/client/voice/PingHandler.h +++ b/server/src/client/voice/PingHandler.h @@ -12,10 +12,9 @@ namespace ts::server::server::udp { void reset(); - void tick(); - void received_ping(uint16_t /* ping id */); - - void command_packet_acknowledged(); + void tick(const std::chrono::system_clock::time_point&); + void received_pong(uint16_t /* ping id */); + void received_command_acknowledged(); [[nodiscard]] inline std::chrono::milliseconds current_ping() const { return this->current_ping_; } @@ -24,12 +23,22 @@ namespace ts::server::server::udp { callback_send_recovery_command_t callback_send_recovery_command{nullptr}; callback_time_outed_t callback_time_outed{nullptr}; private: + constexpr static std::chrono::milliseconds ping_request_interval{2500}; + constexpr static std::chrono::milliseconds ping_timeout{10 * 1000}; + + constexpr static std::chrono::milliseconds recovery_request_interval{1000}; + constexpr static std::chrono::milliseconds recovery_timeout{10 * 1000}; + + std::chrono::milliseconds current_ping_{0}; + uint16_t last_ping_id{0}; - std::chrono::milliseconds current_ping_{0};; std::chrono::system_clock::time_point last_response_{}; std::chrono::system_clock::time_point last_request_{}; - std::chrono::system_clock::time_point last_command_packet_{}; + std::chrono::system_clock::time_point last_command_acknowledge_{}; std::chrono::system_clock::time_point last_recovery_command_send{}; + + void send_ping_request(); + void send_recovery_request(); }; } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClient.cpp b/server/src/client/voice/VoiceClient.cpp index d7fa6e3..a5dbb93 100644 --- a/server/src/client/voice/VoiceClient.cpp +++ b/server/src/client/voice/VoiceClient.cpp @@ -1,22 +1,19 @@ -#include #include #include #include #include -#include #include #include #include "VoiceClient.h" #include "src/VirtualServer.h" -#include "../../server/VoiceServer.h" using namespace std; using namespace std::chrono; using namespace ts::server; using namespace ts::protocol; -VoiceClient::VoiceClient(const std::shared_ptr& server, const sockaddr_storage* address) : SpeakingClient(server->server->sql, server->server), voice_server(server) { +VoiceClient::VoiceClient(const std::shared_ptr& server, const sockaddr_storage* address) : SpeakingClient(server->sql, server) { assert(address); memtrack::allocated(this); memcpy(&this->remote_address, address, sizeof(sockaddr_storage)); @@ -24,27 +21,21 @@ VoiceClient::VoiceClient(const std::shared_ptr& server, const socka debugMessage(this->server->getServerId(), " Creating VoiceClient instance at {}", (void*) this); } -void VoiceClient::initialize() { - this->event_handle_packet = make_shared>(dynamic_pointer_cast(this->ref()), &VoiceClient::execute_handle_packet); +VoiceClient::~VoiceClient() { + debugMessage(this->getServerId(), " Deleting VoiceClient instance at {}", (void*) this); + + this->state = ClientState::DISCONNECTED; + memtrack::freed(this); +} + +void VoiceClient::initialize(const std::shared_ptr &connection) { + assert(connection); + this->connection_ = connection; this->properties()[property::CLIENT_TYPE] = ClientType::CLIENT_TEAMSPEAK; this->properties()[property::CLIENT_TYPE_EXACT] = ClientType::CLIENT_TEAMSPEAK; - this->state = ConnectionState::INIT_HIGH; - this->connection = new connection::VoiceClientConnection(this); -} - -VoiceClient::~VoiceClient() { - debugMessage(this->getServerId(), " Deleting VoiceClient instance at {}", (void*) this); - - this->state = ConnectionState::DISCONNECTED; - delete this->connection; - this->connection = nullptr; - - if(this->flushing_thread) - logCritical(this->getServerId(), "Deleting a VoiceClient which should still be hold within the flush thread!"); - - memtrack::freed(this); + this->state = ClientState::INITIALIZING; } void VoiceClient::sendCommand0(const std::string_view& cmd, bool low, bool direct, std::unique_ptr> listener) { @@ -61,60 +52,26 @@ void VoiceClient::sendCommand0(const std::string_view& cmd, bool low, bool direc packet->enable_flag(protocol::PacketFlag::NewProtocol); } packet->setListener(std::move(listener)); - this->connection->send_packet(packet, false, direct); + + if(auto connection{this->connection_}; connection) + connection->send_packet(packet, false, direct); #ifdef PKT_LOG_CMD logTrace(this->getServerId(), "{}[Command][Server -> Client] Sending command {}. Command low: {}. Full command: {}", CLIENT_STR_LOG_PREFIX, cmd.substr(0, cmd.find(' ')), low, cmd); #endif } -void VoiceClient::sendAcknowledge(uint16_t packetId, bool low) { - char buffer[2]; - le2be16(packetId, buffer); - - auto packet = make_shared(low ? protocol::PacketTypeInfo::AckLow : protocol::PacketTypeInfo::Ack, pipes::buffer_view{buffer, 2}); - packet->enable_flag(PacketFlag::Unencrypted); - if(!low) packet->enable_flag(protocol::PacketFlag::NewProtocol); - this->connection->send_packet(packet); -#ifdef PKT_LOG_ACK - logTrace(this->getServerId(), "{}[Acknowledge][Server -> Client] Sending acknowledge for {}", CLIENT_STR_LOG_PREFIX, packetId); -#endif -} void VoiceClient::tick(const std::chrono::system_clock::time_point &time) { SpeakingClient::tick(time); - this->connection->tick(); + { ALARM_TIMER(A1, "VoiceClient::tick", milliseconds(3)); - if(this->state == ConnectionState::CONNECTED) { - if(this->lastPingRequest > this->lastPingResponse) { //Client is behind :) - if(this->lastPingRequest - this->lastPingResponse > chrono::seconds(20)) { - debugMessage(this->getServerId(), "{} Got a ping timeout. (Last successful ping: {}ms ago. Last request {}ms. Last response {}ms). Trying to recover via command acknowledge.", - CLIENT_STR_LOG_PREFIX, - duration_cast(this->lastPingRequest - this->lastPingResponse).count(), - duration_cast(time - this->lastPingRequest).count(), - duration_cast(time - this->lastPingResponse).count()); - - bool force; - this->request_connection_info(nullptr, force); - this->lastPingResponse = system_clock::now(); - return; - } - } - if(time - this->lastPingRequest >= chrono::milliseconds(1000)) { - //TODO calculate the ping smooth - if(this->lastPingResponse < this->lastPingRequest){ - if(time - this->lastPingRequest >= chrono::milliseconds(1500)) { //Max - this->sendPingRequest(); - } - } else - this->sendPingRequest(); - } - } else if(this->state == ConnectionState::INIT_LOW || this->state == ConnectionState::INIT_HIGH) { + //TODO! + if(this->state == ClientState::INITIALIZING) { if(this->last_packet_handshake.time_since_epoch().count() != 0) { if(time - this->last_packet_handshake > seconds(5)) { - debugMessage(this->getServerId(), "{} Got handshake timeout. {}. State: {} Time: {}", CLIENT_STR_LOG_PREFIX, + debugMessage(this->getServerId(), "{} Got initialize timeout. {}. Time: {}", CLIENT_STR_LOG_PREFIX, this->getLoggingPeerIp() + ":" + to_string(this->getPeerPort()), - this->state == ConnectionState::INIT_HIGH ? "INIT_HIGH" : "INIT_LOW", duration_cast(time - this->last_packet_handshake).count() ); this->close_connection(system_clock::now() + seconds(1)); @@ -134,18 +91,18 @@ bool VoiceClient::disconnect(ts::ViewReasonId reason_id, const std::string &reas * Its only for the clients own flavour and everything which the client receives after will be ignored :) */ - ConnectionState old_state{}; + ClientState old_state{}; { std::lock_guard state_lock{this->state_lock}; - if(this->state == ConnectionState::DISCONNECTING || this->state == ConnectionState::DISCONNECTED) + if(this->state == ClientState::DISCONNECTED) return false; //Already disconnecting/disconnected old_state = this->state; - this->state = ConnectionState::DISCONNECTING; + this->state = ClientState::DISCONNECTED; } - if(old_state == ConnectionState::CONNECTED) { - /* Client has been successflly initialized; Send normal disconnect. */ + if(old_state == ClientState::CONNECTED) { + /* Client has been successfully initialized; Send normal disconnect. */ Command cmd("notifyclientleftview"); cmd["reasonmsg"] = reason; @@ -194,120 +151,67 @@ bool VoiceClient::disconnect(ts::ViewReasonId reason_id, const std::string &reas } bool VoiceClient::close_connection(const system_clock::time_point &timeout) { - auto self_lock = dynamic_pointer_cast(_this.lock()); - assert(self_lock); //Should never happen! - - bool flush = timeout.time_since_epoch().count() > 0; { std::lock_guard state_lock{this->state_lock}; - - if(this->state == ConnectionState::DISCONNECTED) return false; - else if(this->state == ConnectionState::DISCONNECTING) { - /* here is nothing to pay attention for */ - } else if(this->state == ConnectionState::DISCONNECTING_FLUSHING) { - if(!flush) { - this->state = ConnectionState::DISCONNECTED; - return true; /* the flush thread will execute the final disconnect */ - } else { - //TODO: May update the flush timeout if its less then the other one? - return true; - } - } - this->state = flush ? ConnectionState::DISCONNECTING_FLUSHING : ConnectionState::DISCONNECTED; + this->state = ClientState::DISCONNECTED; } - debugMessage(this->getServerId(), "{} Closing voice client connection. (Flush: {})", CLIENT_STR_LOG_PREFIX, flush); - //TODO: Move this out into a thread pool? - this->flushing_thread = std::make_shared(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [this, self_lock, timeout, flush]{ - { - /* Await that all commands have been processed. It does not make sense to unregister the client while command handling. */ - std::lock_guard cmd_lock{this->command_lock}; - } - - if(flush) { - debugMessage(this->getServerId(), "{} Awaiting write prepare, write and acknowledge queue flushed", CLIENT_STR_LOG_PREFIX); - while(this->state == DISCONNECTING_FLUSHING) { - if(system_clock::now() > timeout){ - auto write_queue_flushed = this->connection->wait_empty_write_and_prepare_queue(timeout); - auto acknowledge_received = connection->acknowledge_handler.awaiting_acknowledge() == 0; - - if(write_queue_flushed && acknowledge_received) - break; - - debugMessage(this->getServerId(), "{} Failed to flush pending messages. Acknowledges pending: {} Buffers pending: {}", CLIENT_STR_LOG_PREFIX, acknowledge_received, write_queue_flushed); - break; - } - if(!this->connection->wait_empty_write_and_prepare_queue(timeout)) - continue; - - if(connection->acknowledge_handler.awaiting_acknowledge() > 0) { - usleep(5000); - continue; - } - debugMessage(this->getServerId(), "{} Write and acknowledge queue are flushed", CLIENT_STR_LOG_PREFIX); - break; - } - } - - if(this->state > DISCONNECTING) /* it could happen that the client "reconnects" while flushing this shit */ - this->finalDisconnect(); - }); - flushing_thread->name("Flush thread VC").execute(); + auto connection = std::exchange(this->connection_, nullptr); + if(connection) connection->close_connection(timeout); return true; } -void VoiceClient::finalDisconnect() { +void VoiceClient::finalize() { + /* there could not happen any IO while we're doing finalize()! */ auto ownLock = dynamic_pointer_cast(_this.lock()); assert(ownLock); - lock_guard disconnect_lock_final(this->finalDisconnectLock); +#if 0 if(this->final_disconnected) { logError(this->getServerId(), "Tried to final disconnect {}/{} twice", this->getLoggingPeerIp() + ":" + to_string(this->getPeerPort()), this->getDisplayName()); return; } - this->final_disconnected = true; - this->state = ConnectionState::DISCONNECTED; +#endif + this->state = ClientState::DISCONNECTED; threads::MutexLock command_lock(this->command_lock); //We should not progress any commands while disconnecting //Unload manager cache this->processLeave(); - { - if(this->flushing_thread) this->flushing_thread->detach(); //The thread itself should be already done or executing this method - this->flushing_thread.reset(); - } - if(this->voice_server) this->voice_server->unregisterConnection(ownLock); -} - -void VoiceClient::execute_handle_packet(const std::chrono::system_clock::time_point &time) { - this->connection->execute_handle_command_packets(time); } void VoiceClient::send_voice_packet(const pipes::buffer_view &voice_buffer, const SpeakingClient::VoicePacketFlags &flags) { auto packet = make_shared(PacketTypeInfo::Voice, voice_buffer.length()); { PacketFlag::PacketFlags packet_flags = PacketFlag::None; - packet_flags |= flags.encrypted ? 0 : PacketFlag::Unencrypted; - packet_flags |= flags.head ? PacketFlag::Compressed : 0; - packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0; - packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0; + packet_flags |= flags.encrypted ? 0U : PacketFlag::Unencrypted; + packet_flags |= flags.head ? PacketFlag::Compressed : 0U; + packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0U; + packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0U; packet->set_flags(packet_flags); } memcpy(packet->data().data_ptr(), voice_buffer.data_ptr(), voice_buffer.length()); - this->connection->send_packet(packet, false, false); + if(auto connection{this->connection_}; connection) + connection->send_packet(packet, false, false); } void VoiceClient::send_voice_whisper_packet(const pipes::buffer_view &voice_buffer, const SpeakingClient::VoicePacketFlags &flags) { auto packet = make_shared(PacketTypeInfo::VoiceWhisper, voice_buffer.length()); { PacketFlag::PacketFlags packet_flags = PacketFlag::None; - packet_flags |= flags.encrypted ? 0 : PacketFlag::Unencrypted; - packet_flags |= flags.head ? PacketFlag::Compressed : 0; - packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0; - packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0; + packet_flags |= flags.encrypted ? 0U : PacketFlag::Unencrypted; + packet_flags |= flags.head ? PacketFlag::Compressed : 0U; + packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0U; + packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0U; packet->set_flags(packet_flags); } memcpy(packet->data().data_ptr(), voice_buffer.data_ptr(), voice_buffer.length()); - this->connection->send_packet(packet, false, false); + if(auto connection{this->connection_}; connection) + connection->send_packet(packet, false, false); +} + +std::chrono::milliseconds VoiceClient::calculatePing() { + auto connection = this->connection_; + return connection ? connection->ping_handler_.current_ping() : std::chrono::milliseconds{0}; } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClient.h b/server/src/client/voice/VoiceClient.h index 5a3d49d..c885e28 100644 --- a/server/src/client/voice/VoiceClient.h +++ b/server/src/client/voice/VoiceClient.h @@ -7,7 +7,6 @@ #include #include #include -#include #include #include "../SpeakingClient.h" #include "../ConnectedClient.h" @@ -42,12 +41,10 @@ namespace ts { class VoiceClient : public SpeakingClient { friend class VirtualServer; friend class VoiceServer; - friend class ts::server::server::udp::POWHandler; friend class ts::connection::VoiceClientConnection; friend class ConnectedClient; - friend class io::IOServerHandler; public: - VoiceClient(const std::shared_ptr& server,const sockaddr_storage*); + VoiceClient(const std::shared_ptr& server,const sockaddr_storage*); ~VoiceClient(); bool close_connection(const std::chrono::system_clock::time_point &timeout) override; @@ -60,43 +57,29 @@ namespace ts { /* Note: Order is only guaranteed if progressDirectly is on! */ virtual void sendCommand0(const std::string_view& /* data */, bool low = false, bool progressDirectly = false, std::unique_ptr> listener = nullptr); - connection::VoiceClientConnection* getConnection(){ return connection; } - std::shared_ptr getVoiceServer(){ return voice_server; } - std::chrono::milliseconds calculatePing(){ return ping; } + /* the connection might be null! */ + [[nodiscard]] inline auto connection() { return this->connection_; } + [[nodiscard]] std::chrono::milliseconds calculatePing(); private: - connection::VoiceClientConnection* connection; + std::shared_ptr connection_{nullptr}; protected: - std::shared_ptr voice_server; + void initialize(const std::shared_ptr& /* connection */); + void finalize(); - void initialize(); virtual void tick(const std::chrono::system_clock::time_point &time) override; - void handlePacketCommand(const pipes::buffer_view&); - //Handshake helpers - - public: void send_voice_packet(const pipes::buffer_view &packet, const VoicePacketFlags &flags) override; void send_voice_whisper_packet(const pipes::buffer_view &packet, const VoicePacketFlags &flags) override; protected: + void handlePacketCommand(const pipes::buffer_view&); virtual command_result handleCommand(Command &command) override; - //Ping/pong - uint16_t lastPingId = 0; - std::chrono::milliseconds ping = std::chrono::milliseconds(0); - std::chrono::system_clock::time_point lastPingResponse; - std::chrono::system_clock::time_point lastPingRequest; - std::chrono::system_clock::time_point last_packet_handshake; private: - int socket = 0; - io::pktinfo_storage address_info; - - void finalDisconnect(); - bool final_disconnected = false; //General TS3 manager commands command_result handleCommandClientInitIv(Command&); @@ -104,9 +87,6 @@ namespace ts { command_result handleCommandClientInit(Command&) override; command_result handleCommandClientDisconnect(Command&); - //Locked by finalDisconnect, disconnect and close connection - std::shared_ptr flushing_thread; - struct { bool client_init = false; bool new_protocol = false; @@ -118,8 +98,11 @@ namespace ts { std::shared_ptr remote_key; } crypto; - std::shared_ptr> event_handle_packet; - void execute_handle_packet(const std::chrono::system_clock::time_point& /* scheduled */); + enum struct CryptoHandshakeState { + INITEV, + CLIENT_EK, + DONE + } crypto_handshake_state{CryptoHandshakeState::INITEV}; }; } } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClientCommandHandler.cpp b/server/src/client/voice/VoiceClientCommandHandler.cpp index f4741cf..1dd3556 100644 --- a/server/src/client/voice/VoiceClientCommandHandler.cpp +++ b/server/src/client/voice/VoiceClientCommandHandler.cpp @@ -17,10 +17,10 @@ using namespace ts; command_result VoiceClient::handleCommand(ts::Command &command) { threads::MutexLock l2(this->command_lock); - if(this->state == ConnectionState::DISCONNECTED) return command_result{error::client_not_logged_in}; + if(this->state == ClientState::DISCONNECTED) return command_result{error::client_not_logged_in}; if(!this->voice_server) return command_result{error::server_unbound}; - if(this->state == ConnectionState::INIT_HIGH && this->handshake.state == HandshakeState::SUCCEEDED) { + if(this->state == ClientState::INITIALIZING && this->crypto_handshake_state == CryptoHandshakeState::DONE) { if(command.command() == "clientinit") return this->handleCommandClientInit(command); } else if(command.command() == "clientdisconnect") diff --git a/server/src/client/voice/VoiceClientConnection.cpp b/server/src/client/voice/VoiceClientConnection.cpp index 61d9121..9b5359a 100644 --- a/server/src/client/voice/VoiceClientConnection.cpp +++ b/server/src/client/voice/VoiceClientConnection.cpp @@ -1,12 +1,13 @@ #include #include #include -#include "../../server/VoiceServer.h" #include #include #include #include "./VoiceClientConnection.h" #include "./VoiceClient.h" +#include "src/server/udp-server/UDPServer.h" +#include "src/InstanceHandler.h" using namespace std; using namespace std::chrono; @@ -15,7 +16,9 @@ using namespace ts::connection; using namespace ts::protocol; using namespace ts::server; -VoiceClientConnection::VoiceClientConnection(VoiceClient* client) : +VoiceClientConnection::VoiceClientConnection(server::server::udp::Server* server, const std::shared_ptr& client, int socket) : + socket{socket}, + udp_server{server}, packet_encoder_{&this->crypt_handler, &this->compress_handler, &this->acknowledge_handler}, packet_decoder_{&this->crypt_handler} { memtrack::allocated(this); @@ -43,15 +46,26 @@ VoiceClientConnection::VoiceClientConnection(VoiceClient* client) : this->ping_handler_.callback_send_ping = [](auto _this, auto& a1) { reinterpret_cast(_this)->send_packet_ping(a1); }; - //TODO: The two other callbacks! + this->ping_handler_.callback_send_recovery_command = [](auto _this) { + reinterpret_cast(_this)->send_packet_ping_recovery(); + }; + this->ping_handler_.callback_time_outed = [](auto _this) { + reinterpret_cast(_this)->handle_ping_timeout(); + }; this->server_id = client->getServerId(); - this->client_handle = client; + this->client_handle_ = client; this->crypt_handler.reset(); debugMessage(this->server_id, "Allocated new voice client connection at {}", (void*) this); } +void VoiceClientConnection::initialize(const std::shared_ptr &self) { + assert(&*self == this); + this->weak_this = self; + this->event_handle_packet = make_shared>(self, &VoiceClientConnection::execute_handle_command_packets); +} + VoiceClientConnection::~VoiceClientConnection() { debugMessage(this->server_id, "Deleted voice client connection at {}", (void*) this); @@ -61,16 +75,25 @@ VoiceClientConnection::~VoiceClientConnection() { this->write_queue.clear(); } - this->client_handle = nullptr; + this->client_handle_ = nullptr; memtrack::freed(this); } -void VoiceClientConnection::register_client_for_write() { - std::shared_lock client_lock{this->client_mutex}; - if(!this->client_handle) return; +void VoiceClientConnection::register_for_write() { + auto self = this->weak_this.lock(); + assert(self); + this->udp_server->schedule_client_write(self); +} - if(this->client_handle->voice_server) - this->client_handle->voice_server->triggerWrite(dynamic_pointer_cast(this->client_handle->_this.lock())); +void VoiceClientConnection::register_for_command_handling() { + auto vmanager = serverInstance->getVoiceServerManager(); + if(!vmanager) + return; + auto evloop = vmanager->get_executor_loop(); + if(!evloop) + return; + + evloop->schedule(this->event_handle_packet); } #ifdef CLIENT_LOG_PREFIX @@ -82,14 +105,7 @@ void VoiceClientConnection::register_client_for_write() { void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& buffer) { auto command_pending = this->packet_decoder_.decode_incoming_data(buffer); - if(command_pending) { - std::shared_lock clock{this->client_mutex}; - if(!this->client_handle) return; //TODO: Warn etc? - - auto voice_server = this->client_handle->voice_server; - if(voice_server) - voice_server->schedule_command_handling(this->client_handle); - } + if(command_pending) this->register_for_command_handling(); } bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer) { @@ -98,20 +114,27 @@ bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer) void VoiceClientConnection::handle_decoded_packet(const ts::protocol::ClientPacketParser &packet) { auto packet_type = packet.type(); - if(packet_type == PacketType::VOICE ) { - std::shared_lock clock{this->client_mutex}; - if(!this->client_handle) return; //TODO: Warn etc? + if(packet_type == PacketType::VOICE) { + auto client = this->client_handle_; + if(!client) [[unlikely]] { + logWarning(this->server_id, "Received voice data for client, but we've no client associated with the connection."); + return; + } - this->client_handle->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0); + client->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0); } else if(packet_type == PacketType::VOICE_WHISPER) { - std::shared_lock clock{this->client_mutex}; - if(!this->client_handle) return; //TODO: Warn etc? + auto client = this->client_handle_; + if(!client) [[unlikely]] { + logWarning(this->server_id, "Received voice whisper data for client, but we've no client associated with the connection."); + return; + } - this->client_handle->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0); + client->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0); } else if(packet_type == PacketType::ACK || packet_type == PacketType::ACK_LOW) { string error{}; if(!this->acknowledge_handler.process_acknowledge(packet.type(), packet.payload(), error)) debugMessage(this->server_id, "{} Failed to handle acknowledge: {}", this->client_log_prefix(), error); + this->ping_handler_.received_command_acknowledged(); } else if(packet_type == PacketType::PING) { /* just send a pong response */ char buffer[2]; @@ -122,8 +145,7 @@ void VoiceClientConnection::handle_decoded_packet(const ts::protocol::ClientPack } else if(packet_type == PacketType::PONG) { if(packet.payload_length() < 2) return; - uint16_t ping_id = be2le16((char*) packet.payload().data_ptr()); - //TODO: Ping handler handle ping + this->ping_handler_.received_pong(be2le16((char*) packet.payload().data_ptr())); } else if(packet_type == PacketType::COMMAND || packet_type == PacketType::COMMAND_LOW) { logCritical(this->server_id, "{} Received command packet within handle_decoded_packet callback.", this->client_log_prefix()); } else if(packet_type == PacketType::INIT1) { @@ -188,12 +210,22 @@ void VoiceClientConnection::send_packet_ping(uint16_t& ping_id) { ping_id = packet->packetId(); } +void VoiceClientConnection::send_packet_ping_recovery() { + const char* command = "notifyserverpingrecovery"; + auto packet = make_shared(protocol::PacketTypeInfo::Command, + pipes::buffer_view{(void*) command, strlen(command)} + ); + this->send_packet(packet); +} + void VoiceClientConnection::execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */) { if((int) this->connection_state_ >= (int) ClientConnectionState::DISCONNECTING) return; - std::shared_lock clock{this->client_handle}; - if(!this->client_handle) return; //TODO: Warn etc? + auto client = this->client_handle_; + if(!client) [[unlikely]] + return; + std::lock_guard clock{client->command_lock}; using CommandReassembleResult = ts::server::server::udp::CommandReassembleResult; @@ -218,7 +250,7 @@ void VoiceClientConnection::execute_handle_command_packets(const std::chrono::sy auto startTime = system_clock::now(); try { - this->client_handle->handlePacketCommand(payload); + client->handlePacketCommand(payload); } catch (std::exception& ex) { logCritical(this->server_id, "{} An exception has been thrown within command handling, which reached to root handler. This should not happen! (Message: {})", this->client_log_prefix(), ex.what()); } @@ -237,11 +269,8 @@ void VoiceClientConnection::execute_handle_command_packets(const std::chrono::sy ); } - if(command_status == CommandReassembleResult::MORE_COMMANDS_PENDING) { - auto voice_server = this->client_handle->voice_server; - if(voice_server) - voice_server->schedule_command_handling(this->client_handle); - } + if(command_status == CommandReassembleResult::MORE_COMMANDS_PENDING) + this->register_for_command_handling(); } @@ -257,7 +286,7 @@ void VoiceClientConnection::send_packet(const shared_ptr flags |= (unsigned) EncodeFlags::sync; if(this->packet_encoder_.encode_packet(original_packet, (EncodeFlags) flags)) - this->register_client_for_write(); + this->register_for_write(); } void VoiceClientConnection::handle_encode_error(const shared_ptr &packet, @@ -290,7 +319,7 @@ void VoiceClientConnection::handle_encoded_buffers(const std::vectorwrite_queue_lock}; this->write_queue.insert(this->write_queue.begin(), buffers.begin(), buffers.end()); } - this->register_client_for_write(); + this->register_for_write(); } bool VoiceClientConnection::encode_packets() { @@ -321,41 +350,6 @@ WriteBufferStatus VoiceClientConnection::pop_write_buffer(pipes::buffer& target) return size > 1 ? WriteBufferStatus::BUFFERS_LEFT : WriteBufferStatus::EMPTY; } -bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point until) { - while(true) { -#if 0 - for(auto& queue : this->write_preprocess_queues) { - { - lock_guard lock{queue.queue_lock}; - if(!queue.queue.empty()) - goto _wait; - } - - { - unique_lock lock{queue.work_lock, try_to_lock}; - if(!lock.owns_lock()) - goto _wait; - } - } - { - lock_guard buffer_lock{this->write_queue_lock}; - if(!this->write_queue.empty()) - goto _wait; - if(this->prepare_process_count != 0) - goto _wait; - } -#endif - break; - - _wait: - if(until.time_since_epoch().count() != 0 && system_clock::now() > until) - return false; - - threads::self::sleep_for(milliseconds(5)); - } - return true; -} - void VoiceClientConnection::reset() { this->packet_encoder_.reset(); this->packet_decoder_.reset(); @@ -366,15 +360,28 @@ void VoiceClientConnection::reset() { void VoiceClientConnection::force_insert_command(const pipes::buffer_view &buffer) { this->packet_decoder_.force_insert_command(buffer); + this->register_for_command_handling(); +} - std::shared_lock clock{this->client_handle}; - if(!this->client_handle) return; //TODO: Warn etc? +void VoiceClientConnection::close_connection(const std::chrono::system_clock::time_point &timeout) { + //TODO! + if(timeout.time_since_epoch().count() > 0) { - auto voice_server = this->client_handle->voice_server; - if(voice_server) - voice_server->schedule_command_handling(this->client_handle); + } else { + this->connection_state_ = ClientConnectionState::DISCONNECTED; + + /* Unregister connection from server */ + } } void VoiceClientConnection::tick() { - //TODO: Tick ping handler + auto now = std::chrono::system_clock::now(); + this->ping_handler_.tick(now); + + if(this->connection_state_ == ClientConnectionState::DISCONNECTING) { + //TODO! + if(now > this->disconnect_timeout_) { + //TODO! + } + } } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClientConnection.h b/server/src/client/voice/VoiceClientConnection.h index 43e4f4f..a9f3a59 100644 --- a/server/src/client/voice/VoiceClientConnection.h +++ b/server/src/client/voice/VoiceClientConnection.h @@ -30,7 +30,9 @@ namespace ts { namespace server { class VoiceClient; - class VoiceServer; + namespace server::udp { + class Server; + } } namespace connection { @@ -44,7 +46,7 @@ namespace ts { }; enum struct ClientConnectionState { - INITIALITZING, /* crypto setup */ + INITIALIZING, /* crypto setup */ CONNECTED, /* basic connection has been established */ DISCONNECTING, /* connection is already disconnecting */ DISCONNECTED /* connection has been (maybe successfully) closed */ @@ -54,12 +56,19 @@ namespace ts { friend class server::VoiceServer; friend class server::VoiceClient; public: - explicit VoiceClientConnection(server::VoiceClient*); + explicit VoiceClientConnection(server::server::udp::Server*, const std::shared_ptr&, int /* socket */); virtual ~VoiceClientConnection(); + void initialize(const std::shared_ptr& /* self */); + [[nodiscard]] inline CryptHandler* getCryptHandler(){ return &crypt_handler; } //[[nodiscard]] inline server::VoiceClient* getClient(){ return client; } + [[nodiscard]] inline server::server::udp::PacketEncoder& packet_encoder() { return this->packet_encoder_; } + [[nodiscard]] inline server::server::udp::PacketDecoder& packet_decoder() { return this->packet_decoder_; } + + [[nodiscard]] inline ClientConnectionState connection_state() const { return this->connection_state_; } + void send_packet(const std::shared_ptr& original_packet, bool copy = false, bool prepare_directly = false); /* @@ -68,31 +77,38 @@ namespace ts { */ bool encode_packets(); - [[nodiscard]] inline server::server::udp::PacketEncoder& packet_encoder() { return this->packet_encoder_; } - [[nodiscard]] inline server::server::udp::PacketDecoder& packet_decoder() { return this->packet_decoder_; } - /* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */ [[nodiscard]] WriteBufferStatus pop_write_buffer(pipes::buffer& /* buffer */); - bool wait_empty_write_and_prepare_queue(std::chrono::time_point until = std::chrono::time_point()); + /* a flush timout less than now will cause the client to close the connection instantly */ + void close_connection(const std::chrono::system_clock::time_point& /* flush timeout */); void reset(); - void tick(); + void tick(); /* called via the UDP server tick */ void force_insert_command(const pipes::buffer_view& /* payload */); void send_packet_acknowledge(uint16_t /* packet id */, bool /* is command low */); void send_packet_ping(uint16_t& /* ping id */); + void send_packet_ping_recovery(); protected: void handle_incoming_datagram(const pipes::buffer_view &buffer); bool verify_encryption(const pipes::buffer_view& /* full packet */); - void register_client_for_write(); + void register_for_write(); + void register_for_command_handling(); private: - VirtualServerId server_id{0}; - std::shared_mutex client_mutex{}; - server::VoiceClient* client_handle{nullptr}; + std::weak_ptr weak_this{}; - ClientConnectionState connection_state_{ClientConnectionState::INITIALITZING}; + server::server::udp::Server* udp_server; + int socket{0}; + io::pktinfo_storage address_info{}; + + VirtualServerId server_id{0}; + /* may change at any given time. */ + std::shared_ptr client_handle_{nullptr}; + + ClientConnectionState connection_state_{ClientConnectionState::INITIALIZING}; + std::chrono::system_clock::time_point disconnect_timeout_{}; CryptHandler crypt_handler{}; CompressionHandler compress_handler{}; @@ -105,6 +121,10 @@ namespace ts { server::server::udp::PacketEncoder packet_encoder_; server::server::udp::PacketDecoder packet_decoder_; server::server::udp::PingHandler ping_handler_{}; + + + std::shared_ptr> event_handle_packet; + //Handle stuff [[nodiscard]] std::string client_log_prefix(); void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */); @@ -116,6 +136,8 @@ namespace ts { /* will be called on the IO thread */ void handle_decoded_packet(const protocol::ClientPacketParser&); void handle_decode_error(ts::server::server::udp::PacketDecodeResult /* error */, const std::string& /* custom message */); + + void handle_ping_timeout(); }; } } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClientHandschake.cpp b/server/src/client/voice/VoiceClientHandschake.cpp index cd96d11..b055abe 100644 --- a/server/src/client/voice/VoiceClientHandschake.cpp +++ b/server/src/client/voice/VoiceClientHandschake.cpp @@ -23,10 +23,13 @@ inline void generate_random(uint8_t *destination, size_t length) { } ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) { + auto connection = this->connection_; + if(!connection) return ts::command_result{error::vs_critical}; + this->last_packet_handshake = system_clock::now(); std::unique_lock state_lock{this->state_lock}; - if(this->state == ConnectionState::CONNECTED) { /* we've a reconnect */ + if(this->state == ClientState::CONNECTED) { /* we've a reconnect */ if(system_clock::now() - this->lastPingResponse < seconds(5)) { logMessage(this->getServerId(), "{} Client initialized session reconnect, but last ping response is not older then 5 seconds ({}). Ignoring attempt", CLIENT_STR_LOG_PREFIX, @@ -53,24 +56,24 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) { this->server->client_move(this->ref(), nullptr, nullptr, config::messages::timeout::connection_reinitialized, ViewReasonId::VREASON_TIMEOUT, false, server_channel_lock); } - this->finalDisconnect(); + this->finalize(); state_lock.lock(); - } else if(this->state >= ConnectionState::DISCONNECTING) { - state_lock.unlock(); - std::shared_lock disconnect_finish{this->finalDisconnectLock}; /* await until the last disconnect has been processed */ - state_lock.lock(); - this->state = ConnectionState::INIT_HIGH; - } else if(this->state == ConnectionState::INIT_HIGH) { - logTrace(this->getServerId(), "{} Received a duplicated initiv. It seems like our initivexpand2 hasn't yet reached the client. The acknowledge handler should handle this issue for us.", CLIENT_STR_LOG_PREFIX); - return command_result{error::ok}; + } else if(this->state == ClientState::DISCONNECTED) { + this->state = ClientState::INITIALIZING; + this->crypto_handshake_state = CryptoHandshakeState::INITEV; + connection->reset(); } else { - this->state = ConnectionState::INIT_HIGH; + assert(this->state == ClientState::INITIALIZING); + if(this->crypto_handshake_state != CryptoHandshakeState::INITEV) { + logTrace(this->getServerId(), "{} Received a duplicated initiv. It seems like our initivexpand2 hasn't yet reached the client. The acknowledge handler should handle this issue for us.", CLIENT_STR_LOG_PREFIX); + return command_result{error::ok}; + } } state_lock.unlock(); - this->connection->reset(); - this->connection->packet_decoder().register_initiv_packet(); - this->connection->packet_decoder().set_protocol_encrypted(false); + connection->reset(); + connection->packet_decoder().register_initiv_packet(); + connection->packet_decoder().set_protocol_encrypted(false); bool use_teaspeak = command.hasParm("teaspeak"); if(use_teaspeak ? !config::server::clients::teaspeak : !config::server::clients::teamspeak) @@ -145,6 +148,7 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) { initivexpand2["ot"] = 1; this->sendCommand(initivexpand2); + this->crypto_handshake_state = CryptoHandshakeState::CLIENT_EK; this->handshake.state = HandshakeState::SUCCEEDED; /* we're doing the verify via TeamSpeak */ } else { debugMessage(this->getServerId(), "{} Got non client 3.1 protocol with build timestamp {}", CLIENT_STR_LOG_PREFIX, this->crypto.client_time); @@ -167,34 +171,41 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) { this->handshake.state = HandshakeState::SUCCEEDED; /* we're doing the verify via TeamSpeak */ } this->sendCommand0(initivexpand.build(), false, true); //If we setup the encryption now + this->crypto_handshake_state = CryptoHandshakeState::DONE; } { string error; - if(!this->connection->getCryptHandler()->setupSharedSecret(this->crypto.alpha, this->crypto.beta, this->crypto.remote_key.get(), this->server->serverKey(), error)){ + if(!connection->getCryptHandler()->setupSharedSecret(this->crypto.alpha, this->crypto.beta, this->crypto.remote_key.get(), this->server->serverKey(), error)){ logError(this->server->getServerId(), "Could not setup shared secret! (" + error + ")"); return ts::command_result{error::vs_critical}; } - auto& decoder = this->connection->packet_decoder(); + auto& decoder = connection->packet_decoder(); decoder.set_protocol_encrypted(true); } } + return ts::command_result{error::ok}; } ts::command_result VoiceClient::handleCommandClientEk(Command& cmd) { + auto connection = this->connection_; + if(!connection) return ts::command_result{error::vs_critical}; + if(this->crypto_handshake_state != CryptoHandshakeState::CLIENT_EK) return ts::command_result{error::client_hacked}; + this->last_packet_handshake = system_clock::now(); debugMessage(this->getServerId(), "{} Got client ek!", CLIENT_STR_LOG_PREFIX); auto client_key = base64::decode(cmd["ek"]); auto private_key = this->crypto.chain_data->chain->generatePrivateKey(this->crypto.chain_data->root_key, this->crypto.chain_data->root_index); - this->connection->getCryptHandler()->setupSharedSecretNew(this->crypto.alpha, this->crypto.beta, (char*) private_key.data(), client_key.data()); - this->connection->acknowledge_handler.reset(); + connection->getCryptHandler()->setupSharedSecretNew(this->crypto.alpha, this->crypto.beta, (char*) private_key.data(), client_key.data()); + connection->acknowledge_handler.reset(); + this->crypto_handshake_state = CryptoHandshakeState::DONE; - auto& decoder = this->connection->packet_decoder(); + auto& decoder = connection->packet_decoder(); decoder.set_protocol_encrypted(true); - this->connection->send_packet_acknowledge(1, false); //Send the encrypted acknowledge (most the times the second packet; If not we're going into the resend loop) + connection->send_packet_acknowledge(1, false); //Send the encrypted acknowledge (most the times the second packet; If not we're going into the resend loop) return ts::command_result{error::ok}; } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClientPacketHandler.cpp b/server/src/client/voice/VoiceClientPacketHandler.cpp index 89b8b19..435096d 100644 --- a/server/src/client/voice/VoiceClientPacketHandler.cpp +++ b/server/src/client/voice/VoiceClientPacketHandler.cpp @@ -1,7 +1,6 @@ #include #include -#include "../web/WebClient.h" -#include "VoiceClient.h" +#include "./VoiceClient.h" using namespace std; using namespace std::chrono; @@ -10,6 +9,11 @@ using namespace ts::protocol; //#define PKT_LOG_PING void VoiceClient::handlePacketCommand(const pipes::buffer_view& command_string) { + { + std::lock_guard slock{this->state_lock}; + if(this->state == ClientState::DISCONNECTED) return; + } + std::unique_ptr command; command_result result{}; try { diff --git a/server/src/client/web/WSWebClient.cpp b/server/src/client/web/WSWebClient.cpp index 3cbf0a6..cb58481 100644 --- a/server/src/client/web/WSWebClient.cpp +++ b/server/src/client/web/WSWebClient.cpp @@ -125,7 +125,7 @@ inline bool is_ssl_handshake_header(const pipes::buffer_view& buffer) { void WebClient::processNextMessage(const std::chrono::system_clock::time_point& /* scheduled */) { lock_guard execute_lock(this->execute_lock); - if(this->state != ConnectionState::INIT_HIGH && this->state != ConnectionState::INIT_LOW && this->state != ConnectionState::CONNECTED) + if(this->state != ClientState::INIT_HIGH && this->state != ClientState::INITIALIZING && this->state != ClientState::CONNECTED) return; unique_lock buffer_lock(this->queue_lock); diff --git a/server/src/client/web/WebClient.cpp b/server/src/client/web/WebClient.cpp index 300f09e..79c2dd9 100644 --- a/server/src/client/web/WebClient.cpp +++ b/server/src/client/web/WebClient.cpp @@ -26,7 +26,7 @@ WebClient::WebClient(WebControlServer* server, int fd) : SpeakingClient(server-> memtrack::allocated(this); assert(server->getTS()); - this->state = ConnectionState::INIT_LOW; + this->state = ClientState::INITIALIZING; this->file_descriptor = fd; } @@ -178,9 +178,9 @@ bool WebClient::close_connection(const std::chrono::system_clock::time_point& ti assert(self_lock); unique_lock state_lock(this->state_lock); - if(this->state == ConnectionState::DISCONNECTED) return false; - if(this->state == ConnectionState::DISCONNECTING && flushing) return true; - this->state = flushing ? ConnectionState::DISCONNECTING : ConnectionState::DISCONNECTED; + if(this->state == ClientState::DISCONNECTED) return false; + if(this->state == ClientState::DISCONNECTING && flushing) return true; + this->state = flushing ? ClientState::DISCONNECTING : ClientState::DISCONNECTED; unique_lock close_lock(this->close_lock); state_lock.unlock(); @@ -203,7 +203,7 @@ bool WebClient::close_connection(const std::chrono::system_clock::time_point& ti while(true) { { lock_guard lock(self_lock->state_lock); - if(self_lock->state != ConnectionState::DISCONNECTING) return; /* somebody else had this problem now */ + if(self_lock->state != ClientState::DISCONNECTING) return; /* somebody else had this problem now */ } flag_flushed = true; @@ -226,8 +226,8 @@ bool WebClient::close_connection(const std::chrono::system_clock::time_point& ti { lock_guard lock(self_lock->state_lock); - if(self_lock->state != ConnectionState::DISCONNECTING) return; /* somebody else had this problem now */ - self_lock->state = ConnectionState::DISCONNECTED; + if(self_lock->state != ClientState::DISCONNECTING) return; /* somebody else had this problem now */ + self_lock->state = ClientState::DISCONNECTED; } /* we can lock here again because we've already ensured that we're still disconnecting and updated the status to disconnected. * So no thread will wait for this thread while close_lock had been locked @@ -246,7 +246,7 @@ bool WebClient::close_connection(const std::chrono::system_clock::time_point& ti } command_result WebClient::handleCommand(Command &command) { - if(this->connectionState() == ConnectionState::INIT_HIGH && this->handshake.state == HandshakeState::SUCCEEDED){ + if(this->connectionState() == ClientState::INIT_HIGH && this->handshake.state == HandshakeState::SUCCEEDED){ if(command.command() == "clientinit") { auto result = this->handleCommandClientInit(command); if(result.error_code()) @@ -289,7 +289,7 @@ void WebClient::tick(const std::chrono::system_clock::time_point& point) { } void WebClient::onWSConnected() { - this->state = ConnectionState::INIT_HIGH; + this->state = ClientState::INIT_HIGH; this->handshake.state = HandshakeState::BEGIN; debugMessage(this->getServerId(), "{} WebSocket handshake completed!", CLIENT_STR_LOG_PREFIX); //TODO here! diff --git a/server/src/lincense/LicenseService.cpp b/server/src/lincense/LicenseService.cpp index d4849b5..b5669cc 100644 --- a/server/src/lincense/LicenseService.cpp +++ b/server/src/lincense/LicenseService.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include diff --git a/server/src/server/POWHandler.cpp b/server/src/server/POWHandler.cpp index eee59a4..99302e7 100644 --- a/server/src/server/POWHandler.cpp +++ b/server/src/server/POWHandler.cpp @@ -301,7 +301,7 @@ shared_ptr POWHandler::register_verified_client(const s voice_client->initialize(); voice_client->socket = client->socket; - voice_client->state = ConnectionState::INIT_LOW; + voice_client->state = ClientState::INITIALIZING; memcpy(&voice_client->address_info, &client->address_info, sizeof(client->address_info)); { diff --git a/server/src/server/POWHandler.h b/server/src/server/POWHandler.h index 585ea7b..b822424 100644 --- a/server/src/server/POWHandler.h +++ b/server/src/server/POWHandler.h @@ -4,7 +4,6 @@ #include #include #include -#include "VoiceServer.h" #include "src/VirtualServer.h" #include "./udp-server/PrecomputedPuzzles.h" diff --git a/server/src/server/QueryServer.cpp b/server/src/server/QueryServer.cpp index 94d748b..ec28db0 100644 --- a/server/src/server/QueryServer.cpp +++ b/server/src/server/QueryServer.cpp @@ -13,6 +13,7 @@ #include #include #include +#include "src/client/query/QueryClientConnection.h" using namespace std; using namespace std::chrono; @@ -23,6 +24,10 @@ using namespace ts::server; #define TCP_NOPUSH TCP_CORK #endif +namespace ts::server::server::query { + extern thread_local bool thread_is_event_loop; +} + QueryServer::QueryServer(sql::SqlManager* db) : sql(db) { this->_executePool = new threads::ThreadPool(4, "EXEC Query"); } @@ -79,6 +84,7 @@ bool QueryServer::start(const deque> &bindings, { this->eventLoop = event_base_new(); this->ioThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [&]{ + ts::server::server::query::thread_is_event_loop = true; while(this->active) { debugMessage(LOG_QUERY, "Entering event loop ({})", (void*) this->eventLoop); event_base_loop(this->eventLoop, EVLOOP_NO_EXIT_ON_EMPTY); @@ -386,18 +392,19 @@ void QueryServer::on_client_receive(int _server_file_descriptor, short ev, void } } - shared_ptr client = std::make_shared(this, file_descriptor); - client->applySelfLock(client); + std::string error{}; + auto client = std::make_shared(this, file_descriptor); + if(!client->initialize(error, client)) { + logError(LOG_QUERY, "Failed to initialize newly accepted query client: {}", error); + return; + } memcpy(&client->remote_address, &remote_address, sizeof(remote_address)); { lock_guard lock(this->connected_clients_lock); this->connectedClients.push_back(client); } - client->preInitialize(); - if(client->readEvent) { - event_add(client->readEvent, nullptr); - } + client->connection->add_read_event(); logMessage(LOG_QUERY, "Got new client from {}", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort())); } diff --git a/server/src/server/QueryServer.h b/server/src/server/QueryServer.h index 884df33..388c0f6 100644 --- a/server/src/server/QueryServer.h +++ b/server/src/server/QueryServer.h @@ -80,6 +80,8 @@ namespace ts { bool change_query_password(const std::shared_ptr& /* account */, const std::string& /* new password */); threads::ThreadPool* executePool() { return this->_executePool; } + [[nodiscard]] inline event_base* io_event_loop() { return this->eventLoop; } + [[nodiscard]] inline std::thread::id io_event_loop_id() { return {}; } private: sql::SqlManager* sql; diff --git a/server/src/server/VoiceIOManager.h b/server/src/server/VoiceIOManager.h index 9caa0f4..f8a9813 100644 --- a/server/src/server/VoiceIOManager.h +++ b/server/src/server/VoiceIOManager.h @@ -12,8 +12,11 @@ namespace ts { namespace server { class VirtualServer; class VoiceServer; - class VoiceClient; } + namespace connection { + class VoiceClientConnection; + } + namespace io { union pktinfo_storage { in_pktinfo v4; @@ -84,7 +87,7 @@ namespace ts { datagram_packet_t dg_write_queue_head = nullptr; datagram_packet_t dg_write_queue_tail = nullptr; - std::deque> voice_write_queue; + std::deque> voice_write_queue; inline datagram_packet_t pop_dg_write_queue() { std::lock_guard lock(this->write_queue_lock); @@ -112,13 +115,13 @@ namespace ts { this->dg_write_queue_tail = packet; } - inline void push_voice_write_queue(const std::shared_ptr& client) { + inline void push_voice_write_queue(const std::shared_ptr& client) { std::lock_guard lock(this->write_queue_lock); this->voice_write_queue.push_back(client); } /* return 0 on success | 1 on there is more, but success | 2 on empty */ - inline int pop_voice_write_queue(std::shared_ptr& result) { + inline int pop_voice_write_queue(std::shared_ptr& result) { std::lock_guard lock(this->write_queue_lock); auto it_begin = this->voice_write_queue.begin(); diff --git a/server/src/server/VoiceServer.cpp b/server/src/server/VoiceServer.cpp index cb13cf8..b493186 100644 --- a/server/src/server/VoiceServer.cpp +++ b/server/src/server/VoiceServer.cpp @@ -5,6 +5,7 @@ #include #include #include "VoiceServer.h" +#include "../client/voice/VoiceClientConnection.h" #include "../client/voice/VoiceClient.h" #include "../Configuration.h" #include @@ -109,7 +110,7 @@ bool VoiceServer::start(const std::deque>& b return true; } -void VoiceServer::triggerWrite(const std::shared_ptr& client) { +void VoiceServer::triggerWrite(const std::shared_ptr& client) { if(!client) { logError(this->server->getServerId(), "Invalid client for triggerWrite()"); return; @@ -118,7 +119,7 @@ void VoiceServer::triggerWrite(const std::shared_ptr& client) { this->io->invoke_write(client); } -void VoiceServer::schedule_command_handling(const ts::server::VoiceClient *client) { +void VoiceServer::schedule_command_handling(const connection::VoiceClientConnection *client) { auto vmanager = serverInstance->getVoiceServerManager(); if(!vmanager) return; @@ -129,7 +130,7 @@ void VoiceServer::schedule_command_handling(const ts::server::VoiceClient *clien evloop->schedule(client->event_handle_packet); } -void VoiceServer::tickHandshakingClients() { +void VoiceServer::tickClients() { this->pow_handler->execute_tick(); decltype(this->activeConnections) connections; @@ -138,8 +139,7 @@ void VoiceServer::tickHandshakingClients() { connections = this->activeConnections; } for(const auto& client : connections) - if(client->state == ConnectionState::INIT_HIGH || client->state == ConnectionState::INIT_LOW) - client->tick(system_clock::now()); + client->tick(); } void VoiceServer::execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next) { @@ -157,7 +157,7 @@ void VoiceServer::execute_resend(const std::chrono::system_clock::time_point &no if (connection->acknowledge_handler.execute_resend(now, next, buffers, error) < 0) { debugMessage(client->getServerId(), "{} Failed to execute packet resend: {}", CLIENT_STR_LOG_PREFIX_(client), error); - if(client->state == ConnectionState::CONNECTED) { + if(client->state == ClientState::CONNECTED) { client->disconnect(ViewReasonId::VREASON_TIMEOUT, config::messages::timeout::packet_resend_failed, nullptr, true); } else { client->close_connection(system_clock::now() + seconds(1)); @@ -342,7 +342,7 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) { continue; if(memcmp(&client->remote_address, &remote_address, sizeof(sockaddr_storage)) != 0) { /* verify the remote address */ - if((read_buffer[12] & 0x80) == 0 && client->state == ConnectionState::CONNECTED) { /* only encrypted packets are allowed */ + if((read_buffer[12] & 0x80) == 0 && client->state == ClientState::CONNECTED) { /* only encrypted packets are allowed */ if(client->connection->verify_encryption(read_buffer.view(0, bytes_read))) { /* the ip had changed */ auto old_address = net::to_string(client->remote_address); auto new_address = net::to_string(remote_address); @@ -357,7 +357,7 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) { } } - if(client->state != ConnectionState::DISCONNECTED){ + if(client->state != ClientState::DISCONNECTED){ client->connection->handle_incoming_datagram(read_buffer.view(0, bytes_read)); client = nullptr; } diff --git a/server/src/server/VoiceServer.h b/server/src/server/VoiceServer.h index f607469..264facc 100644 --- a/server/src/server/VoiceServer.h +++ b/server/src/server/VoiceServer.h @@ -10,84 +10,78 @@ #include #include "VoiceIOManager.h" -namespace ts { - namespace server { - namespace server::udp { - class POWHandler; - } - class VirtualServer; - class ConnectedClient; - class VoiceClient; +namespace ts::connection { + class VoiceClientConnection; +} - struct VoiceServerBinding { - sockaddr_storage address{}; - int file_descriptor = 0; - - inline std::string address_string() { return net::to_string(address); } - inline uint16_t address_port() { return net::port(address); } - }; - - class VoiceServer { - friend class VoiceClient; - friend class io::VoiceIOManager; - friend struct io::IOEventLoopEvents; - friend class server::udp::POWHandler; - public: - explicit VoiceServer(const std::shared_ptr& server); - ~VoiceServer(); - - bool start(const std::deque>&, std::string&); - bool stop(const std::chrono::milliseconds& flushTimeout = std::chrono::milliseconds(1000)); - - std::shared_ptr findClient(ClientId); - std::shared_ptr findClient(sockaddr_in* addr, bool lock = true); - std::shared_ptr findClient(sockaddr_in6* addr, bool lock = true); - inline std::shared_ptr findClient(sockaddr_storage* address, bool lock = true) { - return address->ss_family == AF_INET ? - this->findClient((sockaddr_in*) address, lock) : - address->ss_family == AF_INET6 ? - this->findClient((sockaddr_in6*) address, lock) : - nullptr; - } - - bool unregisterConnection(std::shared_ptr); - inline std::deque> activeBindings() { - std::deque> result; - for(const auto& entry : this->bindings) - if(entry->file_descriptor > 0) result.push_back(entry); - return result; - } - - inline std::shared_ptr get_server() { return this->server; } - private: - std::unique_ptr pow_handler; - std::shared_ptr server = nullptr; - - bool running = false; - std::deque> bindings; - - std::recursive_mutex connectionLock; - std::deque> activeConnections; - public: //lib event - void triggerWrite(const std::shared_ptr &); - void schedule_command_handling(VoiceClient const *client); - - void tickHandshakingClients(); - void execute_resend(const std::chrono::system_clock::time_point& /* now */, std::chrono::system_clock::time_point& /* next resend */); - void send_datagram(int /* socket */, io::datagram_packet_t /* packet */); - - std::shared_ptr io; - private: - static void handleMessageRead(int, short, void *); - static void handleMessageWrite(int, short, void *); - - /* execute loop */ - /* TODO - std::mutex execute_list_lock; - protocol::RingBuffer execute_list; - void run_execute_clients(); - */ - }; +namespace ts::server { + namespace server::udp { + class POWHandler; } + + class VirtualServer; + class ConnectedClient; + + struct VoiceServerBinding { + sockaddr_storage address{}; + int file_descriptor = 0; + + inline std::string address_string() { return net::to_string(address); } + inline uint16_t address_port() { return net::port(address); } + }; + + class VoiceServer { + friend class VoiceClient; + friend class io::VoiceIOManager; + friend struct io::IOEventLoopEvents; + friend class server::udp::POWHandler; + public: + explicit VoiceServer(const std::shared_ptr& server); + ~VoiceServer(); + + bool start(const std::deque>&, std::string&); + bool stop(const std::chrono::milliseconds& flushTimeout = std::chrono::milliseconds(1000)); + + std::shared_ptr findClient(sockaddr_in* addr, bool lock = true); + std::shared_ptr findClient(sockaddr_in6* addr, bool lock = true); + inline std::shared_ptr findClient(sockaddr_storage* address, bool lock = true) { + return address->ss_family == AF_INET ? + this->findClient((sockaddr_in*) address, lock) : + address->ss_family == AF_INET6 ? + this->findClient((sockaddr_in6*) address, lock) : + nullptr; + } + + bool unregisterConnection(std::shared_ptr); + inline std::deque> activeBindings() { + std::deque> result; + for(const auto& entry : this->bindings) + if(entry->file_descriptor > 0) result.push_back(entry); + return result; + } + + inline std::shared_ptr get_server() { return this->server; } + private: + std::unique_ptr pow_handler; + std::shared_ptr server = nullptr; + + bool running = false; + std::deque> bindings; + + std::recursive_mutex connectionLock; + std::deque> activeConnections; + public: //lib event + void triggerWrite(const std::shared_ptr &); + void schedule_command_handling(connection::VoiceClientConnection const *client); + + void tickClients(); + void execute_resend(const std::chrono::system_clock::time_point& /* now */, std::chrono::system_clock::time_point& /* next resend */); + void send_datagram(int /* socket */, io::datagram_packet_t /* packet */); + + std::shared_ptr io; + private: + static void handleMessageRead(int, short, void *); + static void handleMessageWrite(int, short, void *); + }; } \ No newline at end of file diff --git a/server/src/server/udp-server/UDPServer.h b/server/src/server/udp-server/UDPServer.h index 4e95ca0..d7d9ac4 100644 --- a/server/src/server/udp-server/UDPServer.h +++ b/server/src/server/udp-server/UDPServer.h @@ -9,11 +9,15 @@ #include namespace ts::server { - class VoiceClient; + class VirtualServer; +} - namespace vserver { - class VirtualServerBase; - } +namespace ts::connection { + class VoiceClientConnection; +} + +namespace ts::server::vserver { + class VirtualServerBase; } namespace ts::server::server::udp { @@ -61,14 +65,14 @@ namespace ts::server::server::udp { int file_descriptor{0}; - event* event_read{}; - event* event_write{}; + struct event* event_read{}; + struct event* event_write{}; spin_lock write_queue_lock{}; datagram_packet* dg_write_queue_head{nullptr}; datagram_packet* dg_write_queue_tail{nullptr}; - write_ring_queue, 1024 * 8> voice_write_queue{}; + write_ring_queue, 1024 * 8> voice_write_queue{}; }; struct io_loop { @@ -80,13 +84,18 @@ namespace ts::server::server::udp { }; struct io_binding { - vserver::VirtualServerBase* virtual_server{nullptr}; + VirtualServer* virtual_server{nullptr}; sockaddr_storage address{}; size_t loop_entry_index{0}; std::vector loop_entries{}; }; + enum struct ServerRegisterResult { + SUCCESS, + FAILED_TO_BIND + }; + class Server { public: Server(); @@ -95,19 +104,22 @@ namespace ts::server::server::udp { bool initialize(std::string& /* error */); void finalize(); - void register_virtual_server(vserver::VirtualServerBase* /* server */); + ServerRegisterResult register_virtual_server(VirtualServer* /* server */); /* this will block until all executions have been finished */ - void unregister_virtual_server(vserver::VirtualServerBase* /* server */); + void unregister_virtual_server(VirtualServer* /* server */); - void schedule_client_write(const std::shared_ptr& /* client */); - - void unregister_client(const std::shared_ptr& /* client */); + void schedule_client_write(const std::shared_ptr& /* client */); + void unregister_client(const std::shared_ptr& /* client */); private: std::mutex io_lock{}; std::vector io_loops{}; std::mutex bindings_lock{}; std::vector io_bindings{}; /* may contains nullptr! */ + + std::thread client_tick_thread{}; + + void execute_client_ticking(); }; } \ No newline at end of file diff --git a/shared b/shared index 62292af..eb503d4 160000 --- a/shared +++ b/shared @@ -1 +1 @@ -Subproject commit 62292af022798db2aba9ae5aa69aebbb849fb75a +Subproject commit eb503d43156fbcb091fee9f9a33110b1be99c2ed