// Implementation of VirtualServerManager, which keeps the list of virtual server instances
// that belong to one InstanceHandler and runs the background jobs shared between them.
//
// Constructor
//   Stores the handle, allocates the puzzle manager, the handshake ticker scheduler, the
//   event executor (execute_loop) and the voice IO manager, and schedules
//   tickHandshakeClients() on the ticker with seconds(1). join_loop is not allocated here;
//   the statement creating it is commented out.
//
// Destructor
//   Sets the state to STOPPED, clears the instance list, notifies the resend thread and
//   waits up to 2 seconds for it (logging and detaching it if the join fails). It then
//   deletes the puzzle manager and shuts down and deletes execute_loop, join_loop, the
//   handshake ticker and the IO manager.
//
// initialize(autostart) -> bool
//   Initializes execute_loop, sets the state to STARTING, precomputes
//   config::voice::DefaultPuzzlePrecomputeSize puzzles and preloads the database startup
//   cache. It then reads serverId, host and port for every row of the servers table.
//   Rows whose fields fail to parse, or whose id or port is 0, are logged and not loaded;
//   an empty host is replaced by config::binding::DefaultVoiceHost. Each loaded server is
//   initialized, gets its host and port properties set and is appended to the instance
//   list under instanceLock. When autostart is true and the server has its
//   VIRTUALSERVER_AUTOSTART property set, it is started; a failed start or an exception is
//   logged. Afterwards the startup cache is cleared, adjust_executor_threads() is called,
//   the resend thread is spawned and the state becomes STARTED. Every path returns true.
//   Resend thread: while the state is STARTING or STARTED it waits on
//   acknowledge.condition until next_execute (500ms ahead) or until the state leaves those
//   values, then calls execute_resend(now, next_execute) on the voice server of every
//   instance; acknowledge.lock is held during those calls.
//   NOTE(review): serverCount is filled by the COUNT query but never read afterwards.
//   NOTE(review): the elapsed time is logged with an ms suffix, yet the servers-per-second
//   figure divides by time / 1024 rather than 1000 - confirm the intended unit.
//   NOTE(review): a failing server->initialize(true) is ignored (empty FIXME branch).
//
// findServerById(sid)
//   Returns the instance whose getServerId() equals sid, or nullptr if there is none.
//
// findServerByPort(port)
//   Returns the first instance whose VIRTUALSERVER_PORT property equals port or which is
//   running with a voice server that has an active binding on that port; nullptr otherwise.
//
// next_available_port() -> uint16_t
//   Collects the VIRTUALSERVER_PORT property of every instance plus the active binding
//   ports of running instances. Starting at config::voice::default_voice_port it counts
//   upwards and returns the first port that is not below 1024 and not in that collection.
//   NOTE(review): the loop has no exit other than finding such a port.
//
// next_available_server_id(success) -> ServerId
//   Reads the SERVERINSTANCE_VIRTUAL_SERVER_ID_INDEX instance property. If it is above
//   65530, success is set to false and 0 is returned. Otherwise the search starts at that
//   index (or at 1 when the index is 0), skips every id already used by a loaded instance,
//   writes the chosen id back to the property when the index was non-zero, sets success to
//   true and returns the id.
//
// report() -> ServerReport
//   avariable counts every instance; online, slots (VIRTUALSERVER_MAXCLIENTS),
//   onlineClients and onlineChannels are accumulated over running instances only.
//
// clientReport() -> OnlineClientReport
//   Sums the bots, queries, clients_web and clients_ts fields of onlineStats() over all
//   running instances.
//
// runningServers() -> size_t
//   Number of instances for which running() is true.
//
// usedSlots() -> size_t
//   Sum of VIRTUALSERVER_MAXCLIENTS over all instances, whether running or not.
//
// createServer(hosts, port)
//   Obtains a server id (returning nullptr if that fails), inserts the row into the
//   servers table, copies the property rows stored under serverId 0 to the new id, then
//   creates and initializes the server object, sets its host and port properties, appends
//   it to the instance list and calls adjust_executor_threads(). Returns the new server.
//   NOTE(review): a failed property copy is only logged and a failing initialize(true) is
//   ignored (empty FIXME branch); the server is returned in both cases.
//
// deleteServer(server) -> bool
//   Returns false if server is not in the instance list. Otherwise it is removed from the
//   list, adjust_executor_threads() is called and the server is stopped unless its state is
//   OFFLINE. Each of its clients of type CLIENT_QUERY gets its server pointer reset to
//   nullptr under the client command_lock and reloads its data. The server state is set to
//   DELETING and its spoken time is added to SERVERINSTANCE_SPOKEN_TIME_DELETED. Its rows
//   are deleted from tokens, properties, permissions, groups, clients, channels,
//   bannedClients, assignedGroups, servers, musicbots and ban_trigger, and finally the file
//   server deleteServer() is called. Returns true.
//   NOTE(review): the DELETE on bannedClients is issued twice; the second one looks
//   redundant.
//
// executeAutostart()
//   Starts every instance that is not running and has VIRTUALSERVER_AUTOSTART set, waiting
//   until 10ms after the previous start before each one. Failed starts and exceptions are
//   logged.
//   NOTE(review): instanceLock is held for the whole loop, including the sleeps and the
//   start() calls - confirm that this is intended.
//
// shutdownAll(msg)
//   Calls preStop(msg) on every instance, then stop(msg) on those that are running, and
//   finally shuts down execute_loop.
//
// tickHandshakeClients()
//   Callback scheduled by the constructor: calls tickHandshakingClients() on the voice
//   server of every instance that has one.
#include #include #include "VirtualServerManager.h" #include "src/server/VoiceServer.h" #include "InstanceHandler.h" #include "src/server/file/FileServer.h" #include "src/client/ConnectedClient.h" #include using namespace std; using namespace std::chrono; using namespace ts::server; VirtualServerManager::VirtualServerManager(InstanceHandler* handle) : handle(handle) { this->puzzles = new protocol::PuzzleManager(); this->handshakeTickers = new threads::Scheduler(1, "handshake ticker"); this->execute_loop = new event::EventExecutor("executor #"); //this->join_loop = new event::EventExecutor("joiner #"); this->_ioManager = new io::VoiceIOManager(); this->handshakeTickers->schedule("ticker", [&](){ this->tickHandshakeClients(); }, seconds(1)); } VirtualServerManager::~VirtualServerManager() { this->state = State::STOPPED; { threads::MutexLock lock(this->instanceLock); this->instances.clear(); } { this->acknowledge.condition.notify_all(); if(!threads::timed_join(this->acknowledge.executor,std::chrono::seconds{2})) { logCritical(LOG_GENERAL, "Failed to shutdown packet resend thread."); this->acknowledge.executor.detach(); } } delete this->puzzles; this->puzzles = nullptr; if(this->execute_loop) this->execute_loop->shutdown(); delete this->execute_loop; this->execute_loop = nullptr; if(this->join_loop) this->join_loop->shutdown(); delete this->join_loop; this->join_loop = nullptr; if(this->handshakeTickers) { this->handshakeTickers->shutdown(); } delete this->handshakeTickers; this->handshakeTickers = nullptr; if(this->_ioManager) this->_ioManager->shutdownGlobally(); delete this->_ioManager; this->_ioManager = nullptr; } bool VirtualServerManager::initialize(bool autostart) { this->execute_loop->initialize(1); this->state = State::STARTING; logMessage(LOG_INSTANCE, "Generating server puzzles..."); auto start = system_clock::now(); this->puzzles->precomputePuzzles(config::voice::DefaultPuzzlePrecomputeSize); logMessage(LOG_INSTANCE, "Puzzles generated! 
Time required: " + to_string(duration_cast(system_clock::now() - start).count()) + "ms"); size_t serverCount = 0; sql::command(this->handle->getSql(), "SELECT COUNT(`serverId`) FROM `servers`").query([](size_t& ptr, int, char** v, char**) { ptr = stoll(v[0]); return 0; }, serverCount); { logMessage(LOG_INSTANCE, "Loading startup cache (This may take a while)"); auto beg = system_clock::now(); this->handle->databaseHelper()->loadStartupCache(); auto end = system_clock::now(); logMessage(LOG_INSTANCE, "Required {}ms to preload the startup cache. Cache needs {}mb", duration_cast(end - beg).count(), this->handle->databaseHelper()->cacheBinarySize() / 1024 / 1024 ); } auto beg = system_clock::now(); size_t server_count = 0; sql::command(this->handle->getSql(), "SELECT `serverId`, `host`, `port` FROM `servers`").query([&](VirtualServerManager* mgr, int length, std::string* values, std::string* columns){ ServerId id = 0; std::string host; uint16_t port = 0; for(int index = 0; index < length; index++) { try { if(columns[index] == "serverId") id = static_cast(stoll(values[index])); else if(columns[index] == "host") host = values[index]; else if(columns[index] == "port") port = static_cast(stoul(values[index])); } catch(std::exception& ex) { logError(LOG_INSTANCE, "Failed to parse virtual server from database. Failed to parse field {} with value {}: {}", columns[index], values[index], ex.what()); return 0; } } if(id == 0) { logError(LOG_INSTANCE, "Failed to load virtual server from database. Server id is zero!"); return 0; } if(host.empty()) { logWarning(id, "The loaded host is empty. Using default one (from the config.yml)"); host = config::binding::DefaultVoiceHost; } if(port == 0) { logError(LOG_INSTANCE, "Failed to load virtual server from database. 
Server port is zero!"); return 0; } auto server = make_shared(id, this->handle->getSql()); server->self = server; if(!server->initialize(true)) { //FIXME error handling } server->properties()[property::VIRTUALSERVER_HOST] = host; server->properties()[property::VIRTUALSERVER_PORT] = port; { threads::MutexLock l(this->instanceLock); this->instances.push_back(server); } if(autostart && server->properties()[property::VIRTUALSERVER_AUTOSTART].as()) { logMessage(server->getServerId(), "Starting server"); string msg; try { if(!server->start(msg)) logError(server->getServerId(), "Failed to start server.\n Message: " + msg); } catch (const std::exception& ex) { logError(server->getServerId(), "Could not start server! Got an active exception. Message {}", ex.what()); } } if(id > 0) this->handle->databaseHelper()->clearStartupCache(id); server_count++; return 0; }, this); auto time = duration_cast(system_clock::now() - beg).count(); logMessage(LOG_INSTANCE, "Loaded {} servers within {}ms. Server/sec: {:2f}", server_count, time, (float) server_count / (time / 1024 == 0 ? 
1 : time / 1024) ); this->handle->databaseHelper()->clearStartupCache(0); this->adjust_executor_threads(); { this->acknowledge.executor = std::thread([&]{ system_clock::time_point next_execute = system_clock::now() + milliseconds(500); while(this->state == State::STARTED || this->state == State::STARTING) { unique_lock lock(this->acknowledge.lock); this->acknowledge.condition.wait_until(lock, next_execute, [&](){ return this->state != State::STARTED && this->state != State::STARTING; }); auto now = system_clock::now(); next_execute = now + milliseconds(500); for(const auto& server : this->serverInstances()) { auto vserver = server->getVoiceServer(); //Read this only once if(vserver) vserver->execute_resend(now, next_execute); } } return 0; }); } this->state = State::STARTED; return true; } shared_ptr VirtualServerManager::findServerById(ServerId sid) { for(auto server : this->serverInstances()) if(server->getServerId() == sid) return server; return nullptr; } shared_ptr VirtualServerManager::findServerByPort(uint16_t port) { for(const auto& server : this->serverInstances()){ if(server->properties()[property::VIRTUALSERVER_PORT] == port) return server; if(server->running() && server->getVoiceServer()) for(const auto& binding : server->getVoiceServer()->activeBindings()) if(binding->address_port() == port) return server; } return nullptr; } uint16_t VirtualServerManager::next_available_port() { auto instances = this->serverInstances(); deque unallowed_ports; for(const auto& instance : instances) { unallowed_ports.push_back(instance->properties()[property::VIRTUALSERVER_PORT].as()); auto vserver = instance->getVoiceServer(); if(instance->running() && vserver) { for(const auto& binding : vserver->activeBindings()) { unallowed_ports.push_back(binding->address_port()); } } } uint16_t port = config::voice::default_voice_port; while(true) { if(port < 1024) goto c; for(auto& p : unallowed_ports) { if(p == port) goto c; } break; c: port++; } return port; } ts::ServerId 
VirtualServerManager::next_available_server_id(bool& success) { auto server_id_base = this->handle->properties()[property::SERVERINSTANCE_VIRTUAL_SERVER_ID_INDEX].as(); if(server_id_base > 65530) { success = false; return 0; } ServerId serverId = server_id_base != 0 ? server_id_base : (ServerId) 1; auto instances = this->serverInstances(); vector used_ids; used_ids.reserve(instances.size()); for(const auto& server : instances) used_ids.push_back(server->getServerId()); std::stable_sort(used_ids.begin(), used_ids.end(), [](ServerId a, ServerId b) { return b > a; }); while(true) { auto it = used_ids.begin(); while(it != used_ids.end() && *it < serverId) it++; if(it == used_ids.end() || *it != serverId) { break; } else { used_ids.erase(used_ids.begin(), it + 1); serverId++; } } /* increase counter */ if(server_id_base != 0) this->handle->properties()[property::SERVERINSTANCE_VIRTUAL_SERVER_ID_INDEX] = serverId; success = true; return serverId; } ServerReport VirtualServerManager::report() { ServerReport result{}; for(const auto& sr : this->serverInstances()) { result.avariable++; if(sr->running()) { result.online++; result.slots += sr->properties()[property::VIRTUALSERVER_MAXCLIENTS].as(); result.onlineClients += sr->onlineClients(); result.onlineChannels += sr->onlineChannels(); } } return result; } OnlineClientReport VirtualServerManager::clientReport() { OnlineClientReport result{}; for(const auto& server : this->serverInstances()) { if(!server->running()) continue; auto sr = server->onlineStats(); result.bots += sr.bots; result.queries += sr.queries; result.clients_web += sr.clients_web; result.clients_ts += sr.clients_ts; } return result; } size_t VirtualServerManager::runningServers() { size_t res = 0; for(const auto& sr : this->serverInstances()) if(sr->running()) res++; return res; } size_t VirtualServerManager::usedSlots() { size_t res = 0; for(const auto& sr : this->serverInstances()) res += sr->properties()[property::VIRTUALSERVER_MAXCLIENTS].as(); return 
res; } shared_ptr VirtualServerManager::createServer(std::string hosts, uint16_t port) { bool sid_success = false; ServerId serverId = this->next_available_server_id(sid_success); if(!sid_success) return nullptr; sql::command(this->handle->getSql(), "INSERT INTO `servers` (`serverId`, `host`, `port`) VALUES (:sid, :host, :port)", variable{":sid", serverId}, variable{":host", hosts}, variable{":port", port}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); //`serverId` INTEGER DEFAULT -1, `type` INTEGER, `id` INTEGER, `key` VARCHAR(" UNKNOWN_KEY_LENGTH "), `value` TEXT auto prop_copy = sql::command(this->handle->getSql(), "INSERT INTO `properties` (`serverId`, `type`, `id`, `key`, `value`) SELECT :target_sid AS `serverId`, `type`, `id`, `key`, `value` FROM `properties` WHERE `type` = :type AND `id` = 0 AND `serverId` = 0;", variable{":target_sid", serverId}, variable{":type", property::PROP_TYPE_SERVER}).execute(); if(!prop_copy.success) logCritical(LOG_GENERAL, "Failed to copy default server properties: {}", prop_copy.fmtStr()); auto server = make_shared(serverId, this->handle->getSql()); server->self = server; if(!server->initialize(true)) { //FIXME error handling } server->properties()[property::VIRTUALSERVER_HOST] = hosts; server->properties()[property::VIRTUALSERVER_PORT] = port; { threads::MutexLock l(this->instanceLock); this->instances.push_back(server); } this->adjust_executor_threads(); return server; } bool VirtualServerManager::deleteServer(shared_ptr server) { { threads::MutexLock l(this->instanceLock); bool found = false; for(const auto& s : this->instances) if(s == server) { found = true; break; } if(!found) return false; this->instances.erase(std::remove_if(this->instances.begin(), this->instances.end(), [&](const shared_ptr& s) { return s == server; }), this->instances.end()); } this->adjust_executor_threads(); if(server->getState() != ServerState::OFFLINE) server->stop("server deleted"); { for(const shared_ptr& client : 
server->getClients()) { if(client && client->getType() == ClientType::CLIENT_QUERY) { lock_guard lock(client->command_lock); client->server = nullptr; client->loadDataForCurrentServer(); } } } { threads::MutexLock locK(server->stateLock); server->state = ServerState::DELETING; } this->handle->properties()[property::SERVERINSTANCE_SPOKEN_TIME_DELETED] += server->properties()[property::VIRTUALSERVER_SPOKEN_TIME].as(); sql::command(this->handle->getSql(), "DELETE FROM `tokens` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); sql::command(this->handle->getSql(), "DELETE FROM `properties` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); sql::command(this->handle->getSql(), "DELETE FROM `permissions` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); sql::command(this->handle->getSql(), "DELETE FROM `groups` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); sql::command(this->handle->getSql(), "DELETE FROM `clients` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); sql::command(this->handle->getSql(), "DELETE FROM `channels` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); sql::command(this->handle->getSql(), "DELETE FROM `bannedClients` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); sql::command(this->handle->getSql(), "DELETE FROM `assignedGroups` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); 
sql::command(this->handle->getSql(), "DELETE FROM `servers` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); sql::command(this->handle->getSql(), "DELETE FROM `musicbots` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); sql::command(this->handle->getSql(), "DELETE FROM `bannedClients` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); sql::command(this->handle->getSql(), "DELETE FROM `ban_trigger` WHERE `server_id` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"}); this->handle->getFileServer()->deleteServer(server); return true; } void VirtualServerManager::executeAutostart() { threads::MutexLock l(this->instanceLock); auto lastStart = system_clock::time_point(); for(const auto& server : this->instances){ if(!server->running() && server->properties()[property::VIRTUALSERVER_AUTOSTART].as()){ threads::self::sleep_until(lastStart + milliseconds(10)); //Don't start all server at the same point (otherwise all servers would tick at the same moment) lastStart = system_clock::now(); logMessage(server->getServerId(), "Starting server"); string msg; try { if(!server->start(msg)) logError(server->getServerId(), "Failed to start server.\n Message:{}", msg); } catch (const std::exception& ex) { logError(server->getServerId(), "Could not start server! Got an active exception. 
Message {}", ex.what()); } } } } void VirtualServerManager::shutdownAll(const std::string& msg) { for(const auto &server : this->serverInstances()) server->preStop(msg); for(const auto &server : this->serverInstances()){ if(server->running()) server->stop(msg); } this->execute_loop->shutdown(); } void VirtualServerManager::tickHandshakeClients() { for(const auto& server : this->serverInstances()) { auto vserver = server->getVoiceServer(); if(vserver) vserver->tickHandshakingClients(); } }