From abeeae4ed5e620fa22adaffaaa95f18727ee7c4c Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Thu, 15 Apr 2021 12:54:52 +0200 Subject: [PATCH] Using one global event loop for the query and web client --- server/CMakeLists.txt | 2 +- server/src/Configuration.cpp | 7 + server/src/Configuration.h | 1 + server/src/InstanceHandler.cpp | 14 +- server/src/InstanceHandler.h | 6 +- server/src/TS3ServerClientManager.cpp | 175 ++++++++---------- server/src/TS3ServerHeartbeat.cpp | 15 +- server/src/VirtualServer.cpp | 133 ++++++------- server/src/VirtualServer.h | 8 +- server/src/absl/btree/btree.h | 3 + server/src/client/SpeakingClient.cpp | 3 +- server/src/client/command_handler/misc.cpp | 8 +- server/src/client/query/QueryClient.cpp | 5 +- .../voice/VoiceClientCommandHandler.cpp | 6 +- .../client/voice/VoiceClientConnection.cpp | 15 +- .../src/client/voice/VoiceClientConnection.h | 4 +- server/src/client/web/WSWebClient.cpp | 60 +++--- server/src/client/web/WebClient.cpp | 23 ++- server/src/client/web/WebClient.h | 4 +- server/src/server/GlobalNetworkEvents.cpp | 119 ++++++++++++ server/src/server/GlobalNetworkEvents.h | 58 ++++++ server/src/server/QueryServer.cpp | 148 +++------------ server/src/server/QueryServer.h | 6 +- server/src/server/VoiceServer.cpp | 89 +++++---- server/src/server/VoiceServer.h | 2 +- server/src/server/WebIoManager.cpp | 53 ------ server/src/server/WebIoManager.h | 31 ---- server/src/server/WebServer.cpp | 78 ++++---- server/src/server/WebServer.h | 2 +- shared | 2 +- 30 files changed, 529 insertions(+), 551 deletions(-) create mode 100644 server/src/server/GlobalNetworkEvents.cpp create mode 100644 server/src/server/GlobalNetworkEvents.h delete mode 100644 server/src/server/WebIoManager.cpp delete mode 100644 server/src/server/WebIoManager.h diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 8f5da73..0ae472b 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -76,6 +76,7 @@ set(SERVER_SOURCE_FILES src/client/query/QueryClientNotify.cpp src/manager/IpListManager.cpp + src/server/GlobalNetworkEvents.cpp src/ConnectionStatistics.cpp @@ -116,7 +117,6 @@ set(SERVER_SOURCE_FILES src/client/query/XMacroEventTypes.h src/server/VoiceIOManager.cpp - src/server/WebIoManager.cpp src/client/SpeakingClient.cpp ../shared/src/ssl/SSLManager.cpp diff --git a/server/src/Configuration.cpp b/server/src/Configuration.cpp index 1488159..d710208 100644 --- a/server/src/Configuration.cpp +++ b/server/src/Configuration.cpp @@ -122,6 +122,7 @@ std::string config::messages::timeout::connection_reinitialized; size_t config::threads::ticking; size_t config::threads::command_execute; +size_t config::threads::network_events; size_t config::threads::voice::events_per_server; size_t config::threads::voice::io_min; size_t config::threads::voice::io_per_server; @@ -1835,6 +1836,12 @@ std::deque> config::create_bindings() { ADD_DESCRIPTION("Command executors"); ADD_SENSITIVE(); } + { + CREATE_BINDING("network_events", 0); + BIND_INTEGRAL(config::threads::network_events, 4, 1, 128); + ADD_DESCRIPTION("Network event loops"); + ADD_SENSITIVE(); + } { BIND_GROUP(voice) { diff --git a/server/src/Configuration.h b/server/src/Configuration.h index f05bacc..93b55a0 100644 --- a/server/src/Configuration.h +++ b/server/src/Configuration.h @@ -231,6 +231,7 @@ namespace ts::config { namespace threads { extern size_t ticking; extern size_t command_execute; + extern size_t network_events; namespace voice { extern size_t events_per_server; diff --git a/server/src/InstanceHandler.cpp b/server/src/InstanceHandler.cpp index 35abaab..a84d6e3 100644 --- a/server/src/InstanceHandler.cpp +++ b/server/src/InstanceHandler.cpp @@ -9,6 +9,7 @@ #include "src/server/QueryServer.h" #include "src/manager/PermissionNameMapper.h" #include "./FileServerHandler.h" +#include "./server/GlobalNetworkEvents.h" #include #include "ShutdownHelper.h" #include @@ -270,6 +271,13 @@ bool InstanceHandler::startInstance() { string errorMessage; this->server_command_executor_ = std::make_shared(ts::config::threads::command_execute); + this->network_event_loop_ = std::make_unique(ts::config::threads::network_events); + if(!this->network_event_loop_->initialize()) { + this->server_command_executor_ = nullptr; + this->network_event_loop_ = nullptr; + logCritical(LOG_INSTANCE, "Failed to initialize network event loop"); + return false; + } this->permission_mapper = make_shared(); if(!this->permission_mapper->initialize(config::permission_mapping_file, errorMessage)) { @@ -376,7 +384,6 @@ FwIDAQAB logCritical(LOG_GENERAL, "Failed to initialize WebClient TeaForum key! ({})", error); return false; } - this->web_event_loop = make_shared(); } #endif @@ -467,10 +474,11 @@ void InstanceHandler::stopInstance() { delete this->sslMgr; this->sslMgr = nullptr; - this->web_event_loop = nullptr; - this->license_service_->shutdown(); this->server_command_executor_ = nullptr; + + this->network_event_loop_->shutdown(); + this->network_event_loop_ = nullptr; } void InstanceHandler::tickInstance() { diff --git a/server/src/InstanceHandler.h b/server/src/InstanceHandler.h index 9974fa8..c5985de 100644 --- a/server/src/InstanceHandler.h +++ b/server/src/InstanceHandler.h @@ -7,7 +7,6 @@ #include #include "manager/SqlDataManager.h" #include "lincense/TeamSpeakLicense.h" -#include "server/WebIoManager.h" #include namespace ts { @@ -32,6 +31,7 @@ namespace ts { class GroupManager; } + class NetworkEventLoop; class ServerCommandExecutor; class InstanceHandler; @@ -80,13 +80,13 @@ namespace ts { bool resetMonthlyStats(); [[nodiscard]] inline const auto& general_task_executor(){ return this->general_task_executor_; } + [[nodiscard]] inline const auto& network_event_loop(){ return this->network_event_loop_; } [[nodiscard]] inline std::shared_ptr getStatistics(){ return statistics; } [[nodiscard]] std::shared_ptr generateLicenseData(); [[nodiscard]] inline std::shared_ptr getTeamSpeakLicense() { return this->teamspeak_license; } [[nodiscard]] inline PropertyWrapper getDefaultServerProperties() { return PropertyWrapper{this->default_server_properties}; } - [[nodiscard]] inline std::shared_ptr getWebIoLoop() { return this->web_event_loop; } [[nodiscard]] inline std::shared_ptr getPermissionMapper() { return this->permission_mapper; } [[nodiscard]] inline std::shared_ptr getConversationIo() { return this->conversation_io; } @@ -118,13 +118,13 @@ namespace ts { ssl::SSLManager* sslMgr = nullptr; file::FileServerHandler* file_server_handler_{nullptr}; std::unique_ptr action_logger_{nullptr}; + std::unique_ptr network_event_loop_{nullptr}; std::shared_ptr _properties{}; std::shared_ptr server_command_executor_{}; std::shared_ptr conversation_io = nullptr; - std::shared_ptr web_event_loop = nullptr; std::shared_ptr default_server_properties = nullptr; std::shared_mutex default_tree_lock; diff --git a/server/src/TS3ServerClientManager.cpp b/server/src/TS3ServerClientManager.cpp index ca207d2..c3dd6e4 100644 --- a/server/src/TS3ServerClientManager.cpp +++ b/server/src/TS3ServerClientManager.cpp @@ -22,100 +22,120 @@ bool VirtualServer::registerClient(shared_ptr client) { sassert(client); { - lock_guard lock(this->clients.lock); + std::lock_guard clients_lock{this->clients_mutex}; if(client->getClientId() > 0) { logCritical(this->getServerId(), "Client {} ({}|{}) has been already registered!", client->getDisplayName(), client->getClientId(), client->getUid()); return false; } - ClientId client_id = 0; - ClientId max_client_id = this->clients.clients.size(); - - while(client_id < max_client_id && this->clients.clients[client_id]) + ClientId client_id{0}; + while(this->clients.count(client_id)) { client_id++; - if(client_id == max_client_id) - this->clients.clients.push_back(client); - else - this->clients.clients[client_id] = client; - this->clients.count++; + } + + this->clients.emplace(client_id, client); client->setClientId(client_id); } { - lock_guard lock(this->client_nickname_lock); + std::lock_guard lock{this->client_nickname_lock}; auto login_name = client->getDisplayName(); - while(login_name.length() < 3) - login_name += "."; - - if(client->getExternalType() == ClientType::CLIENT_TEAMSPEAK) + if(client->getExternalType() == ClientType::CLIENT_TEAMSPEAK) { client->properties()[property::CLIENT_LOGIN_NAME] = login_name; + } - std::shared_ptr found_client = nullptr; + while(login_name.length() < 3) { + login_name += "."; + } - auto client_name = login_name; - size_t counter = 0; + std::shared_ptr found_client{nullptr}; + auto registered_clients = this->getClients(); + + auto client_name{login_name}; + size_t counter{0}; { - lock_guard clients_lock(this->clients.lock); while(true) { - for(auto& _client : this->clients.clients) { - if(!_client) continue; - - if(_client->getDisplayName() == client_name && _client != client) + for(auto& _client : registered_clients) { + if(_client->getDisplayName() == client_name && _client != client) { goto increase_name; + } } goto nickname_valid; increase_name: - client_name = login_name + to_string(++counter); + client_name = login_name + std::to_string(++counter); } } + nickname_valid: client->setDisplayName(client_name); } + switch(client->getType()) { + case ClientType::CLIENT_TEAMSPEAK: + case ClientType::CLIENT_TEASPEAK: + case ClientType::CLIENT_WEB: + this->properties()[property::VIRTUALSERVER_CLIENT_CONNECTIONS].increment_by(1); //increase manager connections + this->properties()[property::VIRTUALSERVER_LAST_CLIENT_CONNECT] = duration_cast(system_clock::now().time_since_epoch()).count(); + break; + + case ClientType::CLIENT_QUERY: + this->properties()[property::VIRTUALSERVER_LAST_QUERY_CONNECT] = duration_cast(system_clock::now().time_since_epoch()).count(); + this->properties()[property::VIRTUALSERVER_QUERY_CLIENT_CONNECTIONS].increment_by(1); //increase manager connections + break; + + case ClientType::CLIENT_MUSIC: + case ClientType::CLIENT_INTERNAL: + break; - if(client->getType() == ClientType::CLIENT_TEAMSPEAK || client->getType() == ClientType::CLIENT_WEB) { - this->properties()[property::VIRTUALSERVER_CLIENT_CONNECTIONS].increment_by(1); //increase manager connections - this->properties()[property::VIRTUALSERVER_LAST_CLIENT_CONNECT] = duration_cast(system_clock::now().time_since_epoch()).count(); - } - else if(client->getType() == ClientType::CLIENT_QUERY) { - this->properties()[property::VIRTUALSERVER_LAST_QUERY_CONNECT] = duration_cast(system_clock::now().time_since_epoch()).count(); - this->properties()[property::VIRTUALSERVER_QUERY_CLIENT_CONNECTIONS].increment_by(1); //increase manager connections + case ClientType::MAX: + default: + assert(false); + break; } return true; } -bool VirtualServer::unregisterClient(shared_ptr cl, std::string reason, std::unique_lock& chan_tree_lock) { - if(cl->getType() == ClientType::CLIENT_TEAMSPEAK || cl->getType() == ClientType::CLIENT_TEASPEAK || cl->getType() == ClientType::CLIENT_WEB) { - sassert(cl->state == ConnectionState::DISCONNECTED); +bool VirtualServer::unregisterClient(shared_ptr client, std::string reason, std::unique_lock& chan_tree_lock) { + if(client->getType() == ClientType::CLIENT_TEAMSPEAK || client->getType() == ClientType::CLIENT_TEASPEAK || client->getType() == ClientType::CLIENT_WEB) { + sassert(client->state == ConnectionState::DISCONNECTED); } - auto client_id = cl->getClientId(); - if(client_id == 0) { - return false; /* not registered */ - } + { - - lock_guard lock(this->clients.lock); - if(client_id >= this->clients.clients.size()) { - logCritical(this->getServerId(), "Client {} ({}|{}) has been registered, but client id exceed client id! Failed to unregister client.", cl->getDisplayName(), client_id, cl->getUid()); - } else { - auto& client_container = this->clients.clients[client_id]; - if(client_container != cl) { - logCritical(this->getServerId(), "Client {} ({}|{}) has been registered, but container hasn't client set! Failed to unregister client.", cl->getDisplayName(), client_id, cl->getUid()); - } else { - client_container.reset(); - this->clients.count--; - } + if(!chan_tree_lock.owns_lock()) { + chan_tree_lock.lock(); } + + if(client->currentChannel) { + //We dont have to make him invisible if he hasnt even a channel + this->client_move(client, nullptr, nullptr, reason, ViewReasonId::VREASON_SERVER_LEFT, false, chan_tree_lock); + } + + chan_tree_lock.unlock(); + } + + { + std::lock_guard clients_lock{this->clients_mutex}; + auto client_id = client->getClientId(); + if(client_id == 0) { + return false; /* not registered */ + } + + if(!this->clients.erase(client_id)) { + client->setClientId(0); + logError(this->getServerId(), "Tried to unregister a not registered client {}/{} ({})", client->getDisplayName(), client->getUid(), client_id); + return false; + } + client->setClientId(0); } auto current_time_seconds = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - switch(cl->getType()) { + switch(client->getType()) { case ClientType::CLIENT_TEAMSPEAK: case ClientType::CLIENT_TEASPEAK: case ClientType::CLIENT_WEB: @@ -134,65 +154,20 @@ bool VirtualServer::unregisterClient(shared_ptr cl, std::string break; } - { - if(!chan_tree_lock.owns_lock()) { - chan_tree_lock.lock(); - } - - if(cl->currentChannel) { - //We dont have to make him invisible if he hasnt even a channel - this->client_move(cl, nullptr, nullptr, reason, ViewReasonId::VREASON_SERVER_LEFT, false, chan_tree_lock); - } - } - - serverInstance->databaseHelper()->saveClientPermissions(this->ref(), cl->getClientDatabaseId(), cl->clientPermissions); - cl->setClientId(0); + serverInstance->databaseHelper()->saveClientPermissions(this->ref(), client->getClientDatabaseId(), client->clientPermissions); return true; } void VirtualServer::registerInternalClient(std::shared_ptr client) { client->state = ConnectionState::CONNECTED; - { - lock_guard lock(this->clients.lock); - if(client->getClientId() > 0) { - logCritical(this->getServerId(), "Internal client {} ({}|{}) has been already registered!", client->getDisplayName(), client->getClientId(), client->getUid()); - return; - } - - ClientId client_id = 0; - ClientId max_client_id = this->clients.clients.size(); - while(client_id < max_client_id && this->clients.clients[client_id]) - client_id++; - if(client_id == max_client_id) - this->clients.clients.push_back(client); - else - this->clients.clients[client_id] = client; - - this->clients.clients[client_id] = client; - this->clients.count++; - client->setClientId(client_id); - } + this->registerClient(client); } void VirtualServer::unregisterInternalClient(std::shared_ptr client) { client->state = ConnectionState::DISCONNECTED; - { - auto client_id = client->getClientId(); - - lock_guard lock(this->clients.lock); - if(client_id >= this->clients.clients.size()) { - logCritical(this->getServerId(), "Client {} ({}|{}) has been registered, but client id exceed client id! Failed to unregister internal client.", client->getDisplayName(), client_id, client->getUid()); - } else { - auto& client_container = this->clients.clients[client_id]; - if(client_container != client) { - logCritical(this->getServerId(), "Client {} ({}|{}) has been registered, but container hasn't client set! Failed to unregister internal client.", client->getDisplayName(), client_id, client->getUid()); - } else { - this->clients.count--; - client_container.reset(); - } - } - } + std::unique_lock tree_lock{this->channel_tree_mutex}; + this->unregisterClient(client, "internal disconnect", tree_lock); } bool VirtualServer::assignDefaultChannel(const shared_ptr& client, bool join) { diff --git a/server/src/TS3ServerHeartbeat.cpp b/server/src/TS3ServerHeartbeat.cpp index 55f6149..d8af669 100644 --- a/server/src/TS3ServerHeartbeat.cpp +++ b/server/src/TS3ServerHeartbeat.cpp @@ -126,18 +126,9 @@ void VirtualServer::executeServerTick() { tick_client_begin = tick_client_end; if(cl->server != this) { logError(this->getServerId(), "Got registered client, but client does not think hes bound to this server!"); - - { - lock_guard lock(this->clients.lock); - for(auto& client : this->clients.clients) { - if(client != cl) continue; - - client.reset(); - this->clients.count--; - break; - } - } - continue; //Fully ha? + std::unique_lock tree_lock{this->channel_tree_mutex}; + this->unregisterClient(cl, "invalid server handle", tree_lock); + continue; } if(cl->floodPoints > flood_block){ diff --git a/server/src/VirtualServer.cpp b/server/src/VirtualServer.cpp index 52fc226..e535c6d 100644 --- a/server/src/VirtualServer.cpp +++ b/server/src/VirtualServer.cpp @@ -458,17 +458,6 @@ bool VirtualServer::start(std::string& error) { this->serverRoot->server = self.lock(); this->serverAdmin->server = self.lock(); - { //Client delete after server stop/start - lock_guard lock(this->clients.lock); - - for(auto& client : this->clients.clients) { - if(!client) continue; - if(client->getType() == ClientType::CLIENT_WEB || client->getType() == ClientType::CLIENT_TEAMSPEAK) { - client.reset(); - } - } - } - auto host = this->properties()[property::VIRTUALSERVER_HOST].value(); if(config::binding::enforce_default_voice_host) host = config::binding::DefaultVoiceHost; @@ -691,45 +680,39 @@ void VirtualServer::stop(const std::string& reason, bool disconnect_query) { } size_t VirtualServer::onlineClients() { - size_t result = 0; + size_t result{0}; - lock_guard lock(this->clients.lock); - for(const auto &cl : this->clients.clients) { - if(!cl) - continue; - if(cl->getType() == CLIENT_TEAMSPEAK || cl->getType() == CLIENT_QUERY) + for(const auto& client : this->getClients()) { + if(client->getType() == CLIENT_TEAMSPEAK || client->getType() == CLIENT_QUERY) { result++; + } } + return result; } OnlineClientReport VirtualServer::onlineStats() { OnlineClientReport response{}; - { - lock_guard lock(this->clients.lock); - for(const auto &cl : this->clients.clients) { - if(!cl) continue; - - switch (cl->getType()) { - case CLIENT_TEAMSPEAK: - case CLIENT_TEASPEAK: - response.clients_ts++; - break; - case CLIENT_WEB: - response.clients_web++; - break; - case CLIENT_QUERY: - response.queries++; - break; - case CLIENT_MUSIC: - response.bots++; - break; - case CLIENT_INTERNAL: - case MAX: - default: - break; - } + for(const auto &client : this->getClients()) { + switch (client->getType()) { + case CLIENT_TEAMSPEAK: + case CLIENT_TEASPEAK: + response.clients_ts++; + break; + case CLIENT_WEB: + response.clients_web++; + break; + case CLIENT_QUERY: + response.queries++; + break; + case CLIENT_MUSIC: + response.bots++; + break; + case CLIENT_INTERNAL: + case MAX: + default: + break; } } @@ -737,37 +720,38 @@ OnlineClientReport VirtualServer::onlineStats() { } std::shared_ptr VirtualServer::find_client_by_id(uint16_t client_id) { - lock_guard lock(this->clients.lock); - if(this->clients.clients.size() > client_id) - return this->clients.clients[client_id]; - else + std::lock_guard lock{this->clients_mutex}; + auto it = this->clients.find(client_id); + if(it == this->clients.end()) { return nullptr; + } else { + return it->second; + } } deque> VirtualServer::findClientsByCldbId(uint64_t cldbId) { - deque> result; + std::deque> result; - lock_guard lock(this->clients.lock); - for(const auto &client : this->clients.clients) { - if(!client) continue; - - if(client->getClientDatabaseId() == cldbId) + std::lock_guard lock{this->clients_mutex}; + for(const auto& [_, client] : this->clients) { + if(client->getClientDatabaseId() == cldbId) { result.push_back(client); + } } + return result; } deque> VirtualServer::findClientsByUid(std::string uid) { - lock_guard lock(this->clients.lock); - - deque> result; - for(const auto &client : this->clients.clients) { - if(!client) continue; + std::deque> result; + std::lock_guard lock{this->clients_mutex}; + for(const auto& [_, client] : this->clients) { if(client->getUid() == uid) { result.push_back(client); } } + return result; } @@ -776,22 +760,18 @@ std::shared_ptr VirtualServer::findClient(std::string name, boo std::transform(name.begin(), name.end(), name.begin(), ::tolower); } - { + std::lock_guard lock{this->clients_mutex}; + for(const auto& [_, client] : this->clients) { + string clName = client->getDisplayName(); + if(ignoreCase) { + std::transform(clName.begin(), clName.end(), clName.begin(), ::tolower); + } - lock_guard lock(this->clients.lock); - - for(const auto& client : this->clients.clients) { - if(!client) continue; - - string clName = client->getDisplayName(); - if(ignoreCase) { - std::transform(clName.begin(), clName.end(), clName.begin(), ::tolower); - } - - if(clName == name) - return client; + if(clName == name) { + return client; } } + return nullptr; } @@ -809,19 +789,16 @@ bool VirtualServer::forEachClient(std::function> VirtualServer::getClients() { - vector> clients; + std::vector> result{}; - { - lock_guard lock(this->clients.lock); - clients.reserve(this->clients.count); + std::lock_guard lock{this->clients_mutex}; + result.reserve(this->clients.size()); - for(auto& client : this->clients.clients) { - if(!client) continue; - clients.push_back(client); - } + for(const auto& [_, client] : this->clients) { + result.push_back(client); } - return clients; + return result; } /* Note: This method **should** not lock the channel tree else we've a lot to do! */ diff --git a/server/src/VirtualServer.h b/server/src/VirtualServer.h index 47ca34a..7e68f38 100644 --- a/server/src/VirtualServer.h +++ b/server/src/VirtualServer.h @@ -21,6 +21,7 @@ #include "manager/LetterManager.h" #include "Configuration.h" #include "protocol/ringbuffer.h" +#include "absl/btree/map.h" #include #include @@ -325,11 +326,8 @@ namespace ts { std::chrono::system_clock::time_point conversation_cache_cleanup_timestamp; //The client list - struct { - size_t count = 0; - std::mutex lock; - std::vector> clients; - } clients; + std::mutex clients_mutex{}; + btree::map> clients{}; std::recursive_mutex client_nickname_lock; diff --git a/server/src/absl/btree/btree.h b/server/src/absl/btree/btree.h index 1a4fc03..a2a787d 100644 --- a/server/src/absl/btree/btree.h +++ b/server/src/absl/btree/btree.h @@ -901,7 +901,10 @@ private: // allocator_type& alloc = allocator(); // allocator_traits::destroy(alloc, v); +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wclass-memaccess" assert(memcpy(v, zero_value, sizeof(value_type))); +#pragma GCC diagnostic pop } void destroy_value(int i) { diff --git a/server/src/client/SpeakingClient.cpp b/server/src/client/SpeakingClient.cpp index 39a0a2d..f8169a7 100644 --- a/server/src/client/SpeakingClient.cpp +++ b/server/src/client/SpeakingClient.cpp @@ -119,7 +119,8 @@ command_result SpeakingClient::applyClientInitParameters(Command &cmd) { this->properties()[property::CLIENT_NICKNAME_PHONETIC] = name; } else if(key == "client_version" || key == "client_platform") { auto value = cmd[key].string(); - if(value.length() > 64) { + if(value.length() > 512) { + /* The web client uses the full browser string which might be a bit longer */ return command_result{error::client_hacked}; } diff --git a/server/src/client/command_handler/misc.cpp b/server/src/client/command_handler/misc.cpp index d64fd3e..0a3a372 100644 --- a/server/src/client/command_handler/misc.cpp +++ b/server/src/client/command_handler/misc.cpp @@ -3353,10 +3353,10 @@ enum struct FeatureSupportMode { DEPRECATED }; -#define REGISTER_FEATURE(name, support, version) \ - notify.put_unchecked(index, "name", name); \ - notify.put_unchecked(index, "support", (int) support); \ - notify.put_unchecked(index, "version", version); \ +#define REGISTER_FEATURE(name, support, version) \ + notify.put_unchecked(index, "name", name); \ + notify.put_unchecked(index, "support", (int) support); \ + notify.put_unchecked(index, "version", version); \ index++ command_result ConnectedClient::handleCommandListFeatureSupport(ts::Command &cmd) { diff --git a/server/src/client/query/QueryClient.cpp b/server/src/client/query/QueryClient.cpp index 722c44c..548b2e8 100644 --- a/server/src/client/query/QueryClient.cpp +++ b/server/src/client/query/QueryClient.cpp @@ -7,6 +7,7 @@ #include #include #include "../../groups/GroupAssignmentManager.h" +#include "../../server/GlobalNetworkEvents.h" using namespace std; using namespace std::chrono; @@ -66,8 +67,8 @@ void QueryClient::initialize_weak_reference(const std::shared_ptr(dynamic_pointer_cast(self)) ); - this->event_read = event_new(this->handle->event_io_loop, this->client_file_descriptor, EV_READ | EV_PERSIST, QueryClient::handle_event_read, this); - this->event_write = event_new(this->handle->event_io_loop, this->client_file_descriptor, EV_WRITE, QueryClient::handle_event_write, this); + this->event_read = serverInstance->network_event_loop()->allocate_event(this->client_file_descriptor, EV_READ | EV_PERSIST, QueryClient::handle_event_read, this, nullptr); + this->event_write = serverInstance->network_event_loop()->allocate_event(this->client_file_descriptor, EV_WRITE, QueryClient::handle_event_write, this, nullptr); } QueryClient::~QueryClient() { diff --git a/server/src/client/voice/VoiceClientCommandHandler.cpp b/server/src/client/voice/VoiceClientCommandHandler.cpp index f8d4710..a1eb4a6 100644 --- a/server/src/client/voice/VoiceClientCommandHandler.cpp +++ b/server/src/client/voice/VoiceClientCommandHandler.cpp @@ -72,10 +72,12 @@ command_result VoiceClient::handleCommand(ts::Command &command) { if(!this->voice_server) return command_result{error::server_unbound}; if(this->state == ConnectionState::INIT_HIGH && this->handshake.state == HandshakeState::SUCCEEDED) { - if(command.command() == "clientinit") + if(command.command() == "clientinit") { return this->handleCommandClientInit(command); - } else if(command.command() == "clientdisconnect") + } + } else if(command.command() == "clientdisconnect") { return this->handleCommandClientDisconnect(command); + } return SpeakingClient::handleCommand(command); } diff --git a/server/src/client/voice/VoiceClientConnection.cpp b/server/src/client/voice/VoiceClientConnection.cpp index 8deab5e..84b5f41 100644 --- a/server/src/client/voice/VoiceClientConnection.cpp +++ b/server/src/client/voice/VoiceClientConnection.cpp @@ -1,4 +1,3 @@ -#include #include #include #include @@ -6,7 +5,6 @@ #include "../../server/VoiceServer.h" #include "./VoiceClientConnection.h" -#include "./VoiceClient.h" //#define LOG_AUTO_ACK_AUTORESPONSE @@ -76,16 +74,11 @@ void VoiceClientConnection::triggerWrite() { } } -void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& buffer) { - ClientPacketParser packet_parser{buffer}; - if(!packet_parser.valid()) { - return; - } - +void VoiceClientConnection::handle_incoming_datagram(protocol::ClientPacketParser& packet_parser) { #ifndef CONNECTION_NO_STATISTICS if(this->current_client) { auto stats = this->current_client->connectionStatistics; - stats->logIncomingPacket(stats::ConnectionStatistics::category::from_type(packet_parser.type()), buffer.length() + 96); /* 96 for the UDP packet overhead */ + stats->logIncomingPacket(stats::ConnectionStatistics::category::from_type(packet_parser.type()), packet_parser.buffer().length() + 96); /* 96 for the UDP packet overhead */ } this->packet_statistics().received_packet((protocol::PacketType) packet_parser.type(), packet_parser.full_packet_id()); #endif @@ -197,8 +190,8 @@ void VoiceClientConnection::callback_command_decoded(void *ptr_this, Reassembled connection->handlePacketCommand(std::exchange(command, nullptr)); } -bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer /* incl. mac etc */) { - return this->packet_decoder_.verify_encryption_client_packet(buffer); +bool VoiceClientConnection::verify_encryption(const protocol::ClientPacketParser& packet) { + return this->packet_decoder_.verify_encryption_client_packet(packet); } std::shared_ptr VoiceClientConnection::getCurrentClient() { diff --git a/server/src/client/voice/VoiceClientConnection.h b/server/src/client/voice/VoiceClientConnection.h index 3e9af7e..facdb1f 100644 --- a/server/src/client/voice/VoiceClientConnection.h +++ b/server/src/client/voice/VoiceClientConnection.h @@ -81,8 +81,8 @@ namespace ts { [[nodiscard]] inline auto& ping_handler() { return this->ping_handler_; } [[nodiscard]] inline auto& crypt_setup_handler() { return this->crypt_setup_handler_; } protected: - void handle_incoming_datagram(const pipes::buffer_view &buffer); - bool verify_encryption(const pipes::buffer_view& /* full packet */); + void handle_incoming_datagram(protocol::ClientPacketParser& /* packet */); + bool verify_encryption(const protocol::ClientPacketParser& /* packet */); void triggerWrite(); private: diff --git a/server/src/client/web/WSWebClient.cpp b/server/src/client/web/WSWebClient.cpp index a478d6a..c4b36af 100644 --- a/server/src/client/web/WSWebClient.cpp +++ b/server/src/client/web/WSWebClient.cpp @@ -10,59 +10,61 @@ using namespace ts; using namespace ts::server; using namespace ts::protocol; -void WebClient::handleMessageWrite(int fd, short, void *) { - auto self_lock = this->ref(); +void WebClient::handleMessageWrite(int fd, short, void *ptr_client) { + auto client = dynamic_pointer_cast(((WebClient*) ptr_client)->ref()); + assert(client); - unique_lock buffer_lock(this->queue_mutex); - if(this->queue_write.empty()) return; + unique_lock buffer_lock(client->queue_mutex); + if(client->queue_write.empty()) return; - auto buffer = this->queue_write[0]; - this->queue_write.pop_front(); + auto buffer = client->queue_write[0]; + client->queue_write.pop_front(); auto written = send(fd, buffer.data_ptr(), buffer.length(), MSG_NOSIGNAL | MSG_DONTWAIT); if(written == -1) { buffer_lock.unlock(); if (errno == EINTR || errno == EAGAIN) { - lock_guard event_lock(this->event_mutex); - if(this->writeEvent) - event_add(this->writeEvent, nullptr); + lock_guard event_lock(client->event_mutex); + if(client->writeEvent) + event_add(client->writeEvent, nullptr); return; } else { //new ServerConnection(globalClient).startConnection({ host: "localhost", port: 9987}, new HandshakeHandler(profiles.default_profile(), "test")) { - std::lock_guard event_lock{this->event_mutex}; - if(this->writeEvent) { - event_del_noblock(this->writeEvent); - event_free(this->writeEvent); - this->writeEvent = nullptr; + std::lock_guard event_lock{client->event_mutex}; + if(client->writeEvent) { + event_del_noblock(client->writeEvent); + event_free(client->writeEvent); + client->writeEvent = nullptr; } } - debugMessage(this->getServerId(), "[{}] Failed to write message (length {}, errno {}, message {}) Disconnecting client.", CLIENT_STR_LOG_PREFIX, written, errno, strerror(errno)); + debugMessage(client->getServerId(), "[{}] Failed to write message (length {}, errno {}, message {}) Disconnecting client.", client->getLoggingPrefix(), written, errno, strerror(errno)); } - this->close_connection(system_clock::now()); /* close connection in a new thread */ + client->close_connection(system_clock::now()); /* close connection in a new thread */ return; } if(written < buffer.length()) { buffer = buffer.range((size_t) written); /* get the overhead */ - this->queue_write.push_front(buffer); + client->queue_write.push_front(buffer); } - if(this->queue_write.empty()) + if(client->queue_write.empty()) return; /* reschedule new write */ buffer_lock.unlock(); - lock_guard event_lock(this->event_mutex); - if(this->writeEvent) - event_add(this->writeEvent, nullptr); + lock_guard event_lock(client->event_mutex); + if(client->writeEvent) + event_add(client->writeEvent, nullptr); } -void WebClient::handleMessageRead(int fd, short, void *) { - auto self_lock = this->ref(); +void WebClient::handleMessageRead(int fd, short, void *ptr_client) { + auto client = dynamic_pointer_cast(((WebClient*) ptr_client)->ref()); + assert(client); size_t buffer_length = 1024 * 4; uint8_t buffer[buffer_length]; @@ -72,14 +74,14 @@ void WebClient::handleMessageRead(int fd, short, void *) { if(errno == EINTR || errno == EAGAIN) ; else { - debugMessage(this->getServerId(), "[{}] Failed to read message (length {}, errno {}, message: {}). Closing connection.", CLIENT_STR_LOG_PREFIX, length, errno, strerror(errno)); + debugMessage(client->getServerId(), "[{}] Failed to read message (length {}, errno {}, message: {}). Closing connection.", client->getLoggingPrefix(), length, errno, strerror(errno)); { - lock_guard lock(this->event_mutex); - if(this->readEvent) - event_del_noblock(this->readEvent); + lock_guard lock(client->event_mutex); + if(client->readEvent) + event_del_noblock(client->readEvent); } - self_lock->close_connection(system_clock::now()); /* direct close, but from another thread */ + client->close_connection(system_clock::now()); /* direct close, but from another thread */ } return; } @@ -87,7 +89,7 @@ void WebClient::handleMessageRead(int fd, short, void *) { auto command = command::ReassembledCommand::allocate((size_t) length); memcpy(command->command(), buffer, (size_t) length); - this->command_queue->enqueue_command_execution(command); + client->command_queue->enqueue_command_execution(command); } void WebClient::enqueue_raw_packet(const pipes::buffer_view &msg) { diff --git a/server/src/client/web/WebClient.cpp b/server/src/client/web/WebClient.cpp index fe63c4d..65c77ec 100644 --- a/server/src/client/web/WebClient.cpp +++ b/server/src/client/web/WebClient.cpp @@ -12,6 +12,7 @@ #include #include #include "../../manager/ActionLogger.h" +#include "../../server/GlobalNetworkEvents.h" #if defined(TCP_CORK) && !defined(TCP_NOPUSH) #define TCP_NOPUSH TCP_CORK @@ -43,14 +44,12 @@ void WebClient::initialize() { int enabled = 1; int disabled = 0; setsockopt(this->file_descriptor, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled)); - if(setsockopt(this->file_descriptor, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) + if(setsockopt(this->file_descriptor, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) { logError(this->getServerId(), "{} Cant disable nopush! system error: {} => {}", CLIENT_STR_LOG_PREFIX, errno, strerror(errno)); + } - - auto event_loop = serverInstance->getWebIoLoop()->next_loop(); - this->readEvent = event_new(event_loop->loop, this->file_descriptor, EV_READ|EV_PERSIST, [](int a, short b, void* c){ ((WebClient*) c)->handleMessageRead(a, b, c); }, this); - this->writeEvent = event_new(event_loop->loop, this->file_descriptor, EV_WRITE, [](int a, short b, void* c){ ((WebClient*) c)->handleMessageWrite(a, b, c); }, this); - + this->readEvent = serverInstance->network_event_loop()->allocate_event(this->file_descriptor, EV_READ | EV_PERSIST, WebClient::handleMessageRead, this, nullptr); + this->writeEvent = serverInstance->network_event_loop()->allocate_event(this->file_descriptor, EV_WRITE, WebClient::handleMessageWrite, this, nullptr); { this->ws_handler.direct_process(pipes::PROCESS_DIRECTION_IN, true); @@ -281,8 +280,9 @@ command_result WebClient::handleCommand(Command &command) { if(this->connectionState() == ConnectionState::INIT_HIGH && this->handshake.state == HandshakeState::SUCCEEDED){ if(command.command() == "clientinit") { auto result = this->handleCommandClientInit(command); - if(result.has_error()) + if(result.has_error()) { this->close_connection(system_clock::now() + seconds(1)); + } return result; } } @@ -400,10 +400,11 @@ void WebClient::disconnectFinal() { lock_guard lock(this->execute_mutex); } - if(this->flush_thread.get_id() == this_thread::get_id()) + if(this->flush_thread.get_id() == this_thread::get_id()) { this->flush_thread.detach(); - else + } else { assert(!this->flush_thread.joinable()); /* shall be already joined via closeConnection(...)*/ + } { ::event *event_read, *event_write; @@ -434,6 +435,7 @@ void WebClient::disconnectFinal() { this->file_descriptor = -1; } + this->state = ConnectionState::DISCONNECTED; this->processLeave(); /* We do not finalize here since we might still try to send some data */ @@ -551,8 +553,9 @@ bool WebClient::disconnect(const std::string &reason) { } command_result WebClient::handleCommandClientInit(Command &command) { - if(!config::server::clients::teaweb) + if(!config::server::clients::teaweb) { return command_result{error::client_type_is_not_allowed, config::server::clients::teaweb_not_allowed_message}; + } return SpeakingClient::handleCommandClientInit(command); } diff --git a/server/src/client/web/WebClient.h b/server/src/client/web/WebClient.h index 0701e05..d1c09b0 100644 --- a/server/src/client/web/WebClient.h +++ b/server/src/client/web/WebClient.h @@ -83,8 +83,8 @@ namespace ts::server { private: void initialize(); - void handleMessageRead(int, short, void*); - void handleMessageWrite(int, short, void*); + static void handleMessageRead(int, short, void*); + static void handleMessageWrite(int, short, void*); void enqueue_raw_packet(const pipes::buffer_view& /* buffer */); /* TODO: Put the message processing part into the IO loop and not into command processing! */ diff --git a/server/src/server/GlobalNetworkEvents.cpp b/server/src/server/GlobalNetworkEvents.cpp new file mode 100644 index 0000000..4f4f36c --- /dev/null +++ b/server/src/server/GlobalNetworkEvents.cpp @@ -0,0 +1,119 @@ +// +// Created by WolverinDEV on 15/04/2021. +// + +#include "GlobalNetworkEvents.h" +#include +#include + +using namespace ts::server; + +namespace ts::server { + struct NetworkEventLoopUseList { + std::vector used_event_loops{}; + }; +} + +NetworkEventLoop::NetworkEventLoop(size_t event_loop_size) : event_loop_size{event_loop_size} { } +NetworkEventLoop::~NetworkEventLoop() { + this->shutdown(); +} + +bool NetworkEventLoop::initialize() { + std::lock_guard lock{this->mutex}; + while(this->event_loops.size() < this->event_loop_size) { + auto event_loop = new EventLoop{this->event_loop_id_index++}; + event_loop->event_base = event_base_new(); + if(!event_loop->event_base) { + logError(LOG_GENERAL, "Failed to allocate new event base."); + delete event_loop; + return false; + } + + event_loop->dispatcher = std::thread{NetworkEventLoop::event_loop_dispatch, event_loop}; + threads::name(event_loop->dispatcher, "network loop #" + std::to_string(event_loop->loop_id)); + this->event_loops.push_back(event_loop); + } + + return true; +} + +void NetworkEventLoop::shutdown() { + std::unique_lock lock{this->mutex}; + auto event_loops_ = std::move(this->event_loops); + lock.unlock(); + + for(const auto& loop : event_loops_) { + event_base_loopexit(loop->event_base, nullptr); + } + + for(const auto& loop : event_loops_) { + if(!threads::timed_join(loop->dispatcher, std::chrono::seconds{15})) { + /* This will cause a memory corruption since the memory we're freeing will still be accessed */ + logCritical(LOG_GENERAL, "Failed to join event loop {}. Detaching thread.", loop->loop_id); + loop->dispatcher.detach(); + } + + event_base_free(loop->event_base); + delete loop; + } +} + +void NetworkEventLoop::free_use_list(NetworkEventLoopUseList *list) { + delete list; +} + +event* NetworkEventLoop::allocate_event(int fd, short events, event_callback_fn callback, void *callback_data, NetworkEventLoopUseList **use_list) { + if(use_list && !*use_list) { + *use_list = new NetworkEventLoopUseList{}; + } + + std::lock_guard lock{this->mutex}; + EventLoop* event_loop; + + size_t try_count{0}; + while(try_count++ < this->event_loops.size()) { + event_loop = this->event_loops[this->event_loop_index % this->event_loops.size()]; + + if(!use_list) { + /* we have our event loop */ + break; + } + + auto& used_loops = (*use_list)->used_event_loops; + if(std::find(used_loops.begin(), used_loops.end(), event_loop->loop_id) == used_loops.end()) { + /* we haven't yet used that event loop */ + break; + } + } + + if(try_count >= this->event_loops.size()) { + /* We've no event loop to put the event in */ + return nullptr; + } + + auto event = event_new(event_loop->event_base, fd, events, callback, callback_data); + if(!event) { + /* failed to allocate the new event */ + return nullptr; + } + + this->event_loop_index++; + if(use_list) { + (*use_list)->used_event_loops.push_back(event_loop->loop_id); + } + + return event; +} + +void NetworkEventLoop::event_loop_dispatch(EventLoop *event_loop) { + debugMessage(LOG_GENERAL, "Network event loop {} started.", event_loop->loop_id); + auto result = event_base_loop(event_loop->event_base, EVLOOP_NO_EXIT_ON_EMPTY); + if(result < 0) { + logError(LOG_GENERAL, "Network event loop exited due to an error."); + } else if(result == 0) { + debugMessage(LOG_GENERAL, "Network event loop {} exited.", event_loop->loop_id); + } else if(result > 0) { + logError(LOG_GENERAL, "Network event loop exited because of no pending events. This should not happen!"); + } +} \ No newline at end of file diff --git a/server/src/server/GlobalNetworkEvents.h b/server/src/server/GlobalNetworkEvents.h new file mode 100644 index 0000000..374b48f --- /dev/null +++ b/server/src/server/GlobalNetworkEvents.h @@ -0,0 +1,58 @@ +// +// Created by WolverinDEV on 15/04/2021. +// + +#pragma once + +#include +#include +#include +#include + +namespace ts::server { + struct NetworkEventLoopUseList; + class NetworkEventLoop { + public: + typedef uint32_t EventLoopId; + explicit NetworkEventLoop(size_t /* thread pool size */); + ~NetworkEventLoop(); + + [[nodiscard]] bool initialize(); + void shutdown(); + + [[nodiscard]] inline size_t loop_count() const { return this->event_loop_size; } + + /** + * Allocate a new event on the network event loop. + * @param fd + * @param events + * @param callback + * @param callback_arg + * @return `nullptr` if an error occurred and an even otherwise + */ + [[nodiscard]] struct event* allocate_event( + evutil_socket_t /* fd */, + short /* events */, + event_callback_fn /* callback */, + void */* callback_arg */, + NetworkEventLoopUseList** /* containing all loops the event has already been bound to */ + ); + + void free_use_list(NetworkEventLoopUseList* /* use list */); + private: + struct EventLoop { + const EventLoopId loop_id; + struct event_base* event_base{nullptr}; + std::thread dispatcher{}; + }; + + size_t event_loop_size; + + std::mutex mutex{}; + EventLoopId event_loop_id_index{1}; + size_t event_loop_index{0}; + std::vector event_loops{}; + + static void event_loop_dispatch(EventLoop*); + }; +} \ No newline at end of file diff --git a/server/src/server/QueryServer.cpp b/server/src/server/QueryServer.cpp index b8d3b7b..939b9fd 100644 --- a/server/src/server/QueryServer.cpp +++ b/server/src/server/QueryServer.cpp @@ -12,6 +12,7 @@ #include #include #include +#include "./GlobalNetworkEvents.h" using namespace std; using namespace std::chrono; @@ -75,25 +76,6 @@ bool QueryServer::start(const deque> &bindings_ } } - /* setup event bases */ - { - this->event_io_loop = event_base_new(); - this->event_io_thread = std::thread{[&]{ - while(this->active) { - debugMessage(LOG_QUERY, "Entering event loop ({})", (void*) this->event_io_loop); - event_base_loop(this->event_io_loop, EVLOOP_NO_EXIT_ON_EMPTY); - if(this->active) { - debugMessage(LOG_QUERY, "Event loop exited ({}). No active events. Sleeping 1 seconds", (void*) this->event_io_loop); - this_thread::sleep_for(seconds(1)); - } else { - debugMessage(LOG_QUERY, "Event loop exited ({})", (void*) this->event_io_loop); - } - } - }}; - - threads::name(this->event_io_thread, "query io"); - } - for(auto& binding : bindings_) { binding->file_descriptor = socket(binding->address.ss_family, (unsigned) SOCK_STREAM | (unsigned) SOCK_NONBLOCK, 0); if(binding->file_descriptor < 0) { @@ -134,7 +116,13 @@ bool QueryServer::start(const deque> &bindings_ continue; } - binding->event_accept = event_new(this->event_io_loop, binding->file_descriptor, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((QueryServer *) c)->on_client_receive(a, b, c); }, this); + binding->event_accept = serverInstance->network_event_loop()->allocate_event(binding->file_descriptor, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((QueryServer *) c)->on_client_receive(a, b, c); }, this, nullptr); + if(!binding->event_accept) { + logError(LOG_QUERY, "Failed to allocate accept event for query binding", binding->as_string()); + close(binding->file_descriptor); + continue; + } + event_add(binding->event_accept, nullptr); this->bindings.push_back(binding); } @@ -227,17 +215,6 @@ void QueryServer::stop() { } } - /* 5. Shutdown the io event loop */ - if(this->event_io_loop) { - event_base_loopexit(this->event_io_loop, nullptr); - } - threads::save_join(this->event_io_thread, false); - - if(this->event_io_loop) { - event_base_free(this->event_io_loop); - this->event_io_loop = nullptr; - } - /* 6. Cleanup the servers reserve file descriptor */ if(this->server_reserve_fd > 0) { if(close(this->server_reserve_fd) < 0) { @@ -319,7 +296,9 @@ inline void send_direct_disconnect(const sockaddr_storage& address, int file_des //dummyfdflood //dummyfdflood clear -void QueryServer::on_client_receive(int server_file_descriptor, short, void *) { +void QueryServer::on_client_receive(int server_file_descriptor, short, void *ptr_server) { + auto query_server = (QueryServer*) ptr_server; + sockaddr_storage remote_address{}; memset(&remote_address, 0, sizeof(sockaddr_in)); socklen_t address_length = sizeof(remote_address); @@ -339,16 +318,16 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) { bool tmp_close_success{false}; { - lock_guard reserve_fd_lock(server_reserve_fd_lock); - if(this->server_reserve_fd > 0) { + lock_guard reserve_fd_lock(query_server->server_reserve_fd_lock); + if(query_server->server_reserve_fd > 0) { debugMessage(LOG_QUERY, "Trying to accept client with the reserved file descriptor to send him a protocol limit reached exception."); auto _ = [&]{ - if(close(this->server_reserve_fd) < 0) { + if(close(query_server->server_reserve_fd) < 0) { debugMessage(LOG_QUERY, "Failed to close reserved file descriptor"); tmp_close_success = false; return; } - this->server_reserve_fd = 0; + query_server->server_reserve_fd = 0; errno = 0; client_file_descriptor = accept(server_file_descriptor, (struct sockaddr *) &remote_address, &address_length); @@ -360,8 +339,8 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) { } else { debugMessage(LOG_QUERY, "[{}] Failed to accept client with reserved file descriptor. ({} | {})", logging_address(remote_address), errno, strerror(errno)); } - this->server_reserve_fd = dup(1); - if(this->server_reserve_fd < 0) { + query_server->server_reserve_fd = dup(1); + if(query_server->server_reserve_fd < 0) { debugMessage(LOG_QUERY, "[{}] Failed to reclaim reserved file descriptor. Future clients cant be accepted!", logging_address(remote_address)); } else { tmp_close_success = true; @@ -373,8 +352,8 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) { static auto resource_limit_error = R"(error id=57344 msg=query\sserver\sresource\slimit\sreached extra_msg=file\sdescriptor\slimit\sexceeded)"; send_direct_disconnect(remote_address, client_file_descriptor, resource_limit_error, strlen(resource_limit_error)); - this->server_reserve_fd = dup(1); - if(this->server_reserve_fd < 0) { + query_server->server_reserve_fd = dup(1); + if(query_server->server_reserve_fd < 0) { debugMessage(LOG_QUERY, "Failed to reclaim reserved file descriptor. Future clients cant be accepted!"); } else { tmp_close_success = true; @@ -387,10 +366,10 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) { if(!tmp_close_success) { debugMessage(LOG_QUERY, "Sleeping two seconds because we're currently having no resources for this user. (Removing the accept event)"); - for(auto& binding : this->bindings) { + for(auto& binding : query_server->bindings) { event_del_noblock(binding->event_accept); } - accept_event_deleted = system_clock::now(); + query_server->accept_event_deleted = system_clock::now(); return; } return; @@ -400,9 +379,9 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) { } { - unique_lock lock{this->connected_clients_mutex}; + unique_lock lock{query_server->connected_clients_mutex}; auto max_connections = serverInstance->properties()[property::SERVERINSTANCE_QUERY_MAX_CONNECTIONS].as_unchecked(); - if(max_connections > 0 && max_connections <= this->connected_clients.size()) { + if(max_connections > 0 && max_connections <= query_server->connected_clients.size()) { lock.unlock(); logMessage(LOG_QUERY, "[{}] Dropping new query connection attempt because of too many connected query clients.", logging_address(remote_address)); static auto query_server_full = R"(error id=4611 msg=max\sclients\sreached)"; @@ -413,7 +392,7 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) { auto max_ip_connections = serverInstance->properties()[property::SERVERINSTANCE_QUERY_MAX_CONNECTIONS_PER_IP].as_unchecked(); if(max_ip_connections > 0) { size_t connection_count = 0; - for(auto& client : this->connected_clients) { + for(auto& client : query_server->connected_clients) { if(net::address_equal(client->remote_address, remote_address)) connection_count++; } @@ -428,13 +407,13 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) { } } - auto client = std::make_shared(this, client_file_descriptor); + auto client = std::make_shared(query_server, client_file_descriptor); memcpy(&client->remote_address, &remote_address, sizeof(remote_address)); client->initialize_weak_reference(client); { - lock_guard lock(this->connected_clients_mutex); - this->connected_clients.push_back(client); + lock_guard lock(query_server->connected_clients_mutex); + query_server->connected_clients.push_back(client); } client->preInitialize(); @@ -444,79 +423,6 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) { logMessage(LOG_QUERY, "Got new client from {}", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort())); } -/* -#define QUERY_PASSWORD_LENGTH 7 -std::string QueryServer::resetQueryPassword(const std::shared_ptr& credits, std::string password) { - if(password.empty()) password = rnd_string(QUERY_PASSWORD_LENGTH); - auto password_copy = password; - auto res = sql::command(this->sql, "UPDATE `queries` SET `password` = :password WHERE `uniqueId` = :uid AND `username` = :name", variable{":name", credits->username}, variable{":uid", credits->uniqueId}, variable{":password", password}).execute(); - LOG_SQL_CMD(res); - return password_copy; //Analize why I have to copy that shit here? -} - -std::string QueryServer::createQueryLogin(const string &name, ClientUid uid, string password) { - bool exists = false; - sql::command(this->sql, "SELECT * FROM `queries` WHERE `username` = :name", variable{":name", name}).query([](bool* flag, int, char**, char**){ - *flag = true; - return 0; - }, &exists); - if(exists) return ""; - - if(password.empty()) - password = rnd_string(QUERY_PASSWORD_LENGTH); - sql::command(this->sql, "INSERT INTO `queries` (`username`, `password`, `uniqueId`) VALUES (:name, :password, :uid)", variable{":name", name}, variable{":uid", uid}, variable{":password", password}).execute(); - - return password; -} - -bool QueryServer::renameQueryLogin(ClientUid uid, const string &name) { - bool exists = false; - sql::command(this->sql, "SELECT * FROM `queries` WHERE `username` = :name", variable{":name", name}).query([](bool* flag, int, char**, char**){ - *flag = true; - return 0; - }, &exists); - if(!exists) return false; - - auto res = sql::command(this->sql, "UPDATE `queries` SET `username` = :name WHERE `uniqueId` = :uid", variable{":name", name}, variable{":uid", uid}).execute(); - LOG_SQL_CMD(res); - - return res; -} - -std::shared_ptr QueryServer::findQueryLoginByName(const string &name) { - std::shared_ptr result = std::make_shared(); - sql::command(this->sql, "SELECT * FROM `queries` WHERE `username` = :name", variable{":name", name}).query([](QueryLoginCredentials* res, int length, char** value, char** columns){ - for(int index = 0; index < length; index++) - if(strcmp(columns[index], "username") == 0) - res->username = value[index]; - else if(strcmp(columns[index], "password") == 0) - res->password = value[index]; - else if(strcmp(columns[index], "uniqueId") == 0) - res->uniqueId = value[index]; - return 0; - }, result.get()); - if(result->username.empty()) return nullptr; - return result; -} - -std::shared_ptr QueryServer::findQueryLoginByUid(const string &uid) { - std::shared_ptr result = std::make_shared(); - sql::command(this->sql, "SELECT * FROM `queries` WHERE `uniqueId` = :name", variable{":name", uid}).query([](QueryLoginCredentials *res, int length, char **value, char **columns) { - for (int index = 0; index < length; index++) - if (strcmp(columns[index], "username") == 0) - res->username = value[index]; - else if (strcmp(columns[index], "password") == 0) - res->password = value[index]; - else if (strcmp(columns[index], "uniqueId") == 0) - res->uniqueId = value[index]; - return 0; - }, result.get()); - if (result->username.empty()) return nullptr; - - return result; -} - */ - /* api */ inline deque> query_accounts(sql::command& command) { deque> result; diff --git a/server/src/server/QueryServer.h b/server/src/server/QueryServer.h index 040e59c..07401d0 100644 --- a/server/src/server/QueryServer.h +++ b/server/src/server/QueryServer.h @@ -84,10 +84,6 @@ namespace ts { std::unique_ptr ip_whitelist; std::unique_ptr ip_blacklist; - //IO stuff - event_base* event_io_loop{nullptr}; - std::thread event_io_thread{}; - std::chrono::system_clock::time_point accept_event_deleted; std::mutex connected_clients_mutex{}; @@ -107,7 +103,7 @@ namespace ts { std::deque> tick_pending_connection_close{}; std::chrono::system_clock::time_point tick_next_client_timestamp{}; - void on_client_receive(int server_file_descriptor, short ev, void *arg); + static void on_client_receive(int, short, void *); void tick_clients(); void tick_executor(); diff --git a/server/src/server/VoiceServer.cpp b/server/src/server/VoiceServer.cpp index d73984d..8974990 100644 --- a/server/src/server/VoiceServer.cpp +++ b/server/src/server/VoiceServer.cpp @@ -4,19 +4,12 @@ #include #include #include -#include "VoiceServer.h" #include "../client/voice/VoiceClient.h" -#include "../Configuration.h" #include -#include "src/VirtualServer.h" #include #include "src/VirtualServerManager.h" #include "../InstanceHandler.h" #include -#include -#include -#include -#include "misc/timer.h" using namespace std; using namespace std::chrono; @@ -87,19 +80,20 @@ bool VoiceServer::start(const std::deque>& b bind->file_descriptor = 0; continue; } + fcntl(bind->file_descriptor, F_SETFL, fcntl(bind->file_descriptor, F_GETFL, 0) | O_NONBLOCK); } { - auto bindings = this->activeBindings(); - if(bindings.empty()) { + auto active_bindings = this->activeBindings(); + if(active_bindings.empty()) { error = "Failed to bind any address!"; this->running = false; return false; } string str; - for(auto it = bindings.begin(); it != bindings.end(); it++) { - str += net::to_string((*it)->address) + (it + 1 == bindings.end() ? "" : " | "); + for(auto it = active_bindings.begin(); it != active_bindings.end(); it++) { + str += net::to_string((*it)->address) + (it + 1 == active_bindings.end() ? "" : " | "); } logMessage(this->server->getServerId(), "Started server on {}.", str); } @@ -115,8 +109,8 @@ void VoiceServer::triggerWrite(const std::shared_ptr& client) { return; } - if(auto io{this->io}; io) { - io->invoke_write(client); + if(auto io_{this->io}; io_) { + io_->invoke_write(client); } } @@ -237,25 +231,27 @@ bool VoiceServer::unregisterConnection(std::shared_ptr connection) } static union { - char literal[8] = {'T', 'S', '3', 'I', 'N', 'I', 'T', '1'}; + char literal[8]{'T', 'S', '3', 'I', 'N', 'I', 'T', '1'}; uint64_t integral; } TS3INIT; +constexpr static auto kRecvBufferSize{1600}; //IPv6 MTU: 1500 | IPv4 MTU: 576 void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) { + (void) events; + auto event_handle = (io::IOEventLoopEntry*) _event_handle; auto voice_server = event_handle->voice_server; auto ts_server = event_handle->server; - size_t raw_read_buffer_length = event_handle->family == AF_INET ? 600 : 1600; //IPv6 MTU: 1500 | IPv4 MTU: 576 - uint8_t raw_read_buffer[raw_read_buffer_length]; //Allocate on stack, so we dont need heap here + uint8_t raw_read_buffer[kRecvBufferSize]; //Allocate on stack, so we dont need heap here - ssize_t bytes_read = 0; - pipes::buffer_view read_buffer{raw_read_buffer, raw_read_buffer_length}; /* will not allocate anything, just sets its mode to ptr and thats it :) */ + ssize_t bytes_read; + pipes::buffer_view read_buffer{raw_read_buffer, kRecvBufferSize}; /* will not allocate anything, just sets its mode to ptr and that's it :) */ sockaddr_storage remote_address{}; iovec io_vector{}; io_vector.iov_base = (void*) raw_read_buffer; - io_vector.iov_len = raw_read_buffer_length; + io_vector.iov_len = kRecvBufferSize; char message_headers[0x100]; @@ -267,13 +263,19 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) { message.msg_control = message_headers; message.msg_controllen = 0x100; - auto read_timeout = system_clock::now() + microseconds(2500); /* read 2.5ms long at a time or 'till nothing more is there */ + auto read_timeout = system_clock::now() + microseconds{2500}; /* read 2.5ms long at a time or 'till nothing more is there */ while(system_clock::now() <= read_timeout){ message.msg_flags = 0; bytes_read = recvmsg(fd, &message, 0); if((message.msg_flags & MSG_TRUNC) > 0) { - logError(ts_server->getServerId(), "Received truncated message from {}", net::to_string(remote_address)); + static std::chrono::system_clock::time_point last_error_message{}; + auto now = system_clock::now(); + if(last_error_message + std::chrono::seconds{5} < now) { + logError(ts_server->getServerId(), "Received truncated message from {}", net::to_string(remote_address)); + last_error_message = now; + } + continue; } if(bytes_read < 0) { @@ -295,27 +297,36 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) { continue; } - std::shared_ptr client; + if(*(uint64_t*) raw_read_buffer == TS3INIT.integral) { + //Handle ddos protection... + voice_server->pow_handler->handle_datagram(event_handle->socket_id, remote_address, message, read_buffer.view(0, bytes_read)); + continue; + } + + protocol::ClientPacketParser packet_parser{read_buffer.view(0, bytes_read)}; + if(!packet_parser.valid()) { + return; + } + + std::shared_ptr client{}; { - if(*(uint64_t*) raw_read_buffer == TS3INIT.integral) { - //Handle ddos protection... - voice_server->pow_handler->handle_datagram(event_handle->socket_id, remote_address, message, read_buffer.view(0, bytes_read)); + auto client_id = packet_parser.client_id(); + if(client_id > 0) { + client = dynamic_pointer_cast(voice_server->server->find_client_by_id(client_id)); } else { - auto client_id = (ClientId) be2le16(&raw_read_buffer[10]); - if(client_id > 0) { - client = dynamic_pointer_cast(voice_server->server->find_client_by_id(client_id)); - } else { - client = voice_server->findClient(&remote_address, true); - } + client = voice_server->findClient(&remote_address, true); } } - if(!client) + if(!client) { continue; + } if(memcmp(&client->remote_address, &remote_address, sizeof(sockaddr_storage)) != 0) { /* verify the remote address */ - if((read_buffer[12] & 0x80) == 0 && client->state == ConnectionState::CONNECTED) { /* only encrypted packets are allowed */ - if(client->connection->verify_encryption(read_buffer.view(0, bytes_read))) { /* the ip had changed */ + /* only encrypted packets are allowed */ + if(!packet_parser.has_flag(protocol::PacketFlag::Unencrypted) && client->state == ConnectionState::CONNECTED) { + /* the ip had changed */ + if(client->connection->verify_encryption(packet_parser)) { auto old_address = net::to_string(client->remote_address); auto new_address = net::to_string(remote_address); @@ -329,8 +340,8 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) { } } - if(client->state != ConnectionState::DISCONNECTED){ - client->connection->handle_incoming_datagram(read_buffer.view(0, bytes_read)); + if(client->state != ConnectionState::DISCONNECTED) { + client->connection->handle_incoming_datagram(packet_parser); client = nullptr; } } @@ -415,6 +426,8 @@ inline ssize_t write_datagram(IOData& io, const sockaddr_storage& address, } void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) { + (void) events; + using WBufferPopResult = server::udp::PacketEncoder::BufferPopResult; auto event_handle = (io::IOEventLoopEntry*) _event_handle; auto voice_server = event_handle->voice_server; @@ -434,8 +447,9 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) while(system_clock::now() <= write_timeout){ if(!client) { auto client_queue_state = event_handle->pop_voice_write_queue(client); /* we need a new client, the old client has nothing more to do */ - if(client_queue_state == 2) + if(client_queue_state == 2) { break; + } assert(client); more_clients = (bool) client_queue_state; @@ -525,6 +539,7 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) retrigger |= packet != nullptr; /* memory stored at packet is not accessible anymore. But anyways pop_dg_write_queue returns 0 if there is nothing more */ } + if(retrigger) { event_add(event_handle->event_write, nullptr); } diff --git a/server/src/server/VoiceServer.h b/server/src/server/VoiceServer.h index f542d4d..4f8b53e 100644 --- a/server/src/server/VoiceServer.h +++ b/server/src/server/VoiceServer.h @@ -73,9 +73,9 @@ namespace ts { std::recursive_mutex connectionLock; std::deque> activeConnections; public: + void tickHandshakingClients(); void triggerWrite(const std::shared_ptr &); - void tickHandshakingClients(); void execute_resend(const std::chrono::system_clock::time_point& /* now */, std::chrono::system_clock::time_point& /* next resend */); void send_datagram(int /* socket */, udp::DatagramPacket* /* packet */); diff --git a/server/src/server/WebIoManager.cpp b/server/src/server/WebIoManager.cpp deleted file mode 100644 index fa7ff20..0000000 --- a/server/src/server/WebIoManager.cpp +++ /dev/null @@ -1,53 +0,0 @@ -#include "log/LogUtils.h" -#include "WebIoManager.h" -#include "src/Configuration.h" - -using namespace std; -using namespace ts::webio; - -void EventLoop::invoker_loop() { - const timeval tick_timeout{10, 0}; - const auto original_base = this->loop; - while(this->loop == original_base) { - event_base_loopexit(original_base, &tick_timeout); - event_base_dispatch(original_base); - } - debugMessage(LOG_GENERAL, "Event base dispatch of {} ended.", (void*) original_base); -} - -LoopManager::LoopManager() { - lock_guard lock(this->loop_lock); - for(int i = 0; i < config::threads::web::io_loops; i++) { - auto loop = make_shared(); - loop->loop = event_base_new(); - loop->invoker = make_unique(&EventLoop::invoker_loop, loop.get()); - - pthread_setname_np(loop->invoker->native_handle(), ("Web IO #" + to_string(i)).c_str()); - - this->loops.push_back(move(loop)); - } -} - -LoopManager::~LoopManager() { - lock_guard lock(this->loop_lock); - for(const auto& loop : this->loops) { - const auto base = loop->loop; - loop->loop = nullptr; - - const timeval timeout_now{0, 1}; - event_base_loopexit(base, &timeout_now); - - if(loop->invoker && loop->invoker->joinable()) - loop->invoker->join(); - - event_base_free(base); - } -} - -std::shared_ptr LoopManager::next_loop() { - lock_guard lock(this->loop_lock); - if(this->loops.empty()) - return nullptr; - - return this->loops[this->loop_index++ % this->loops.size()]; -} \ No newline at end of file diff --git a/server/src/server/WebIoManager.h b/server/src/server/WebIoManager.h deleted file mode 100644 index 2e8ec54..0000000 --- a/server/src/server/WebIoManager.h +++ /dev/null @@ -1,31 +0,0 @@ -#pragma once - -#include -#include -#include - -namespace ts { - namespace webio { - class LoopManager; - struct EventLoop { - friend class LoopManager; - public: - event_base* loop; - std::unique_ptr invoker; - - private: - void invoker_loop(); - }; - class LoopManager { - public: - LoopManager(); - ~LoopManager(); - - std::shared_ptr next_loop(); - private: - std::atomic loop_index{0}; - std::mutex loop_lock; - std::deque> loops; - }; - } -} \ No newline at end of file diff --git a/server/src/server/WebServer.cpp b/server/src/server/WebServer.cpp index eeddc40..820a92c 100644 --- a/server/src/server/WebServer.cpp +++ b/server/src/server/WebServer.cpp @@ -3,8 +3,8 @@ #include "WebServer.h" #include "src/client/web/WebClient.h" #include -#include #include "src/InstanceHandler.h" +#include "./GlobalNetworkEvents.h" using namespace std; using namespace std::chrono; @@ -18,7 +18,7 @@ using namespace ts::server; WebControlServer::WebControlServer(const std::shared_ptr& handle) : handle(handle) {} WebControlServer::~WebControlServer() = default; -bool WebControlServer::start(const std::deque>& bindings, std::string& error) { +bool WebControlServer::start(const std::deque>& target_bindings, std::string& error) { if(this->running()) { error = "server already running"; return false; @@ -33,7 +33,7 @@ bool WebControlServer::start(const std::dequefile_descriptor = socket(binding->address.ss_family, SOCK_STREAM | SOCK_NONBLOCK, 0); if(binding->file_descriptor < 0) { logError(this->handle->getServerId(), "[Web] Failed to bind server to {}. (Failed to create socket: {} | {})", binding->as_string(), errno, strerror(errno)); @@ -67,9 +67,13 @@ bool WebControlServer::start(const std::dequegetWebIoLoop()->next_loop(); - assert(io_base); - binding->event_accept = event_new(io_base->loop, binding->file_descriptor, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((WebControlServer *) c)->on_client_receive(a, b, c); }, this); + binding->event_accept = serverInstance->network_event_loop()->allocate_event(binding->file_descriptor, EV_READ | EV_PERSIST, WebControlServer::on_client_receive, this, nullptr); + if(!binding->event_accept) { + logError(this->handle->getServerId(), "[Web] Failed to allocate network event for binding {}.", binding->as_string()); + close(binding->file_descriptor); + continue; + } + event_add(binding->event_accept, nullptr); this->bindings.push_back(binding); } @@ -97,12 +101,14 @@ inline std::string logging_address(const sockaddr_storage& address) { return net::to_string(address, true); } -void WebControlServer::on_client_receive(int _server_file_descriptor, short ev, void *arg) { +void WebControlServer::on_client_receive(int server_file_descriptor_, short, void *ptr_server) { + auto server = (WebControlServer*) ptr_server; + sockaddr_storage remote_address{}; memset(&remote_address, 0, sizeof(remote_address)); socklen_t address_length = sizeof(remote_address); - int file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length); + int file_descriptor = accept(server_file_descriptor_, (struct sockaddr *) &remote_address, &address_length); if (file_descriptor < 0) { if(errno == EAGAIN) { return; @@ -110,78 +116,78 @@ void WebControlServer::on_client_receive(int _server_file_descriptor, short ev, if(errno == EMFILE || errno == ENFILE) { if(errno == EMFILE) { - logError(this->handle->getServerId(), "[Web] Server ran out file descriptors. Please increase the process file descriptor limit."); + logError(server->handle->getServerId(), "[Web] Server ran out file descriptors. Please increase the process file descriptor limit."); } else { - logError(this->handle->getServerId(), "[Web] Server ran out file descriptors. Please increase the process and system-wide file descriptor limit."); + logError(server->handle->getServerId(), "[Web] Server ran out file descriptors. Please increase the process and system-wide file descriptor limit."); } bool tmp_close_success = false; { - lock_guard reserve_fd_lock(server_reserve_fd_lock); - if(this->server_reserve_fd > 0) { - debugMessage(this->handle->getServerId(), "[Web] Trying to accept client with the reserved file descriptor to close the incoming connection."); + lock_guard reserve_fd_lock(server->server_reserve_fd_lock); + if(server->server_reserve_fd > 0) { + debugMessage(server->handle->getServerId(), "[Web] Trying to accept client with the reserved file descriptor to close the incoming connection."); auto _ = [&]{ - if(close(this->server_reserve_fd) < 0) { - debugMessage(this->handle->getServerId(), "[Web] Failed to close reserved file descriptor"); + if(close(server->server_reserve_fd) < 0) { + debugMessage(server->handle->getServerId(), "[Web] Failed to close reserved file descriptor"); tmp_close_success = false; return; } - this->server_reserve_fd = 0; + server->server_reserve_fd = 0; errno = 0; - file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length); + file_descriptor = accept(server_file_descriptor_, (struct sockaddr *) &remote_address, &address_length); if(file_descriptor < 0) { if(errno == EMFILE || errno == ENFILE) - debugMessage(this->handle->getServerId(), "[Web] [{}] Even with freeing the reserved descriptor accept failed. Attempting to reclaim reserved file descriptor", logging_address(remote_address)); + debugMessage(server->handle->getServerId(), "[Web] [{}] Even with freeing the reserved descriptor accept failed. Attempting to reclaim reserved file descriptor", logging_address(remote_address)); else if(errno == EAGAIN); else { - debugMessage(this->handle->getServerId(), "[Web] [{}] Failed to accept client with reserved file descriptor. ({} | {})", logging_address(remote_address), errno, strerror(errno)); + debugMessage(server->handle->getServerId(), "[Web] [{}] Failed to accept client with reserved file descriptor. ({} | {})", logging_address(remote_address), errno, strerror(errno)); } - this->server_reserve_fd = dup(1); - if(this->server_reserve_fd < 0) - debugMessage(this->handle->getServerId(), "[Web] [{}] Failed to reclaim reserved file descriptor. Future clients cant be accepted!", logging_address(remote_address)); + server->server_reserve_fd = dup(1); + if(server->server_reserve_fd < 0) + debugMessage(server->handle->getServerId(), "[Web] [{}] Failed to reclaim reserved file descriptor. Future clients cant be accepted!", logging_address(remote_address)); else tmp_close_success = true; return; } - debugMessage(this->handle->getServerId(), "[Web] [{}] Successfully accepted client via reserved descriptor (fd: {}). Disconnecting client.", logging_address(remote_address), file_descriptor); + debugMessage(server->handle->getServerId(), "[Web] [{}] Successfully accepted client via reserved descriptor (fd: {}). Disconnecting client.", logging_address(remote_address), file_descriptor); CLOSE_CONNECTION - this->server_reserve_fd = dup(1); - if(this->server_reserve_fd < 0) - debugMessage(this->handle->getServerId(), "[Web] Failed to reclaim reserved file descriptor. Future clients cant be accepted!"); + server->server_reserve_fd = dup(1); + if(server->server_reserve_fd < 0) + debugMessage(server->handle->getServerId(), "[Web] Failed to reclaim reserved file descriptor. Future clients cant be accepted!"); else tmp_close_success = true; - logMessage(this->handle->getServerId(), "[Web] [{}] Dropping file transfer connection attempt because of too many open file descriptors.", logging_address(remote_address)); + logMessage(server->handle->getServerId(), "[Web] [{}] Dropping file transfer connection attempt because of too many open file descriptors.", logging_address(remote_address)); }; _(); } } if(!tmp_close_success) { - debugMessage(this->handle->getServerId(), "[Web] Sleeping two seconds because we're currently having no resources for this user. (Removing the accept event)"); - for(auto& binding : this->bindings) + debugMessage(server->handle->getServerId(), "[Web] Sleeping two seconds because we're currently having no resources for this user. (Removing the accept event)"); + for(auto& binding : server->bindings) event_del_noblock(binding->event_accept); - accept_event_deleted = system_clock::now(); + server->accept_event_deleted = system_clock::now(); return; } return; } - logMessage(this->handle->getServerId(), "[Web] Got an error while accepting a new client. (errno: {}, message: {})", errno, strerror(errno)); + logMessage(server->handle->getServerId(), "[Web] Got an error while accepting a new client. (errno: {}, message: {})", errno, strerror(errno)); return; } - auto client = std::make_shared(this, file_descriptor); + auto client = std::make_shared(server, file_descriptor); memcpy(&client->remote_address, &remote_address, sizeof(remote_address)); client->initialize_weak_reference(client); client->initialize(); - this->clientLock.lock(); - this->clients.push_back(client); - this->clientLock.unlock(); + server->clientLock.lock(); + server->clients.push_back(client); + server->clientLock.unlock(); event_add(client->readEvent, nullptr); - logMessage(this->handle->getServerId(), "[Web] Got new client from {}:{}", client->getLoggingPeerIp(), client->getPeerPort()); + logMessage(server->handle->getServerId(), "[Web] Got new client from {}:{}", client->getLoggingPeerIp(), client->getPeerPort()); } void WebControlServer::stop() { diff --git a/server/src/server/WebServer.h b/server/src/server/WebServer.h index 2729b37..37d93eb 100644 --- a/server/src/server/WebServer.h +++ b/server/src/server/WebServer.h @@ -50,7 +50,7 @@ namespace ts { //IO stuff std::chrono::system_clock::time_point accept_event_deleted; private: - void on_client_receive(int fd, short ev, void *arg); + static void on_client_receive(int, short, void *); void unregisterConnection(const std::shared_ptr&); }; } diff --git a/shared b/shared index 0a73e4c..0726cd6 160000 --- a/shared +++ b/shared @@ -1 +1 @@ -Subproject commit 0a73e4c10c4af3b6f982c0e2986f122e75bd20a8 +Subproject commit 0726cd6c95ff5597bfc20ac2bb560ad03ace7b49