Teaspeak-Server/server/src/VirtualServerManager.cpp
2020-02-20 13:12:14 +01:00

434 lines
18 KiB
C++

#include <algorithm>
#include <log/LogUtils.h>
#include "VirtualServerManager.h"
#include "src/server/VoiceServer.h"
#include "InstanceHandler.h"
#include "src/server/file/FileServer.h"
#include "src/client/ConnectedClient.h"
#include <ThreadPool/ThreadHelper.h>
using namespace std;
using namespace std::chrono;
using namespace ts::server;
VirtualServerManager::VirtualServerManager(InstanceHandler* handle) : handle(handle) {
this->puzzles = new protocol::PuzzleManager();
this->handshakeTickers = new threads::Scheduler(1, "handshake ticker");
this->execute_loop = new event::EventExecutor("executor #");
//this->join_loop = new event::EventExecutor("joiner #");
this->_ioManager = new io::VoiceIOManager();
this->handshakeTickers->schedule("ticker", [&](){ this->tickHandshakeClients(); }, seconds(1));
}
VirtualServerManager::~VirtualServerManager() {
this->state = State::STOPPED;
{
threads::MutexLock lock(this->instanceLock);
this->instances.clear();
}
{
this->acknowledge.condition.notify_all();
if(!threads::timed_join(this->acknowledge.executor,std::chrono::seconds{2})) {
logCritical(LOG_GENERAL, "Failed to shutdown packet resend thread.");
this->acknowledge.executor.detach();
}
}
delete this->puzzles;
this->puzzles = nullptr;
if(this->execute_loop)
this->execute_loop->shutdown();
delete this->execute_loop;
this->execute_loop = nullptr;
if(this->join_loop)
this->join_loop->shutdown();
delete this->join_loop;
this->join_loop = nullptr;
if(this->handshakeTickers) {
this->handshakeTickers->shutdown();
}
delete this->handshakeTickers;
this->handshakeTickers = nullptr;
if(this->_ioManager) this->_ioManager->shutdownGlobally();
delete this->_ioManager;
this->_ioManager = nullptr;
}
bool VirtualServerManager::initialize(bool autostart) {
this->execute_loop->initialize(1);
this->state = State::STARTING;
logMessage(LOG_INSTANCE, "Generating server puzzles...");
auto start = system_clock::now();
this->puzzles->precomputePuzzles(config::voice::DefaultPuzzlePrecomputeSize);
logMessage(LOG_INSTANCE, "Puzzles generated! Time required: " + to_string(duration_cast<milliseconds>(system_clock::now() - start).count()) + "ms");
size_t serverCount = 0;
sql::command(this->handle->getSql(), "SELECT COUNT(`serverId`) FROM `servers`").query([](size_t& ptr, int, char** v, char**) { ptr = stoll(v[0]); return 0; }, serverCount);
{
logMessage(LOG_INSTANCE, "Loading startup cache (This may take a while)");
auto beg = system_clock::now();
this->handle->databaseHelper()->loadStartupCache();
auto end = system_clock::now();
logMessage(LOG_INSTANCE, "Required {}ms to preload the startup cache. Cache needs {}mb",
duration_cast<milliseconds>(end - beg).count(),
this->handle->databaseHelper()->cacheBinarySize() / 1024 / 1024
);
}
auto beg = system_clock::now();
size_t server_count = 0;
sql::command(this->handle->getSql(), "SELECT `serverId`, `host`, `port` FROM `servers`").query([&](VirtualServerManager* mgr, int length, std::string* values, std::string* columns){
ServerId id = 0;
std::string host;
uint16_t port = 0;
for(int index = 0; index < length; index++) {
try {
if(columns[index] == "serverId")
id = static_cast<ServerId>(stoll(values[index]));
else if(columns[index] == "host")
host = values[index];
else if(columns[index] == "port")
port = static_cast<uint16_t>(stoul(values[index]));
} catch(std::exception& ex) {
logError(LOG_INSTANCE, "Failed to parse virtual server from database. Failed to parse field {} with value {}: {}", columns[index], values[index], ex.what());
return 0;
}
}
if(id == 0) {
logError(LOG_INSTANCE, "Failed to load virtual server from database. Server id is zero!");
return 0;
}
if(host.empty()) {
logWarning(id, "The loaded host is empty. Using default one (from the config.yml)");
host = config::binding::DefaultVoiceHost;
}
if(port == 0) {
logError(LOG_INSTANCE, "Failed to load virtual server from database. Server port is zero!");
return 0;
}
auto server = make_shared<VirtualServer>(id, this->handle->getSql());
server->self = server;
if(!server->initialize(true)) {
//FIXME error handling
}
server->properties()[property::VIRTUALSERVER_HOST] = host;
server->properties()[property::VIRTUALSERVER_PORT] = port;
{
threads::MutexLock l(this->instanceLock);
this->instances.push_back(server);
}
if(autostart && server->properties()[property::VIRTUALSERVER_AUTOSTART].as<bool>()) {
logMessage(server->getServerId(), "Starting server");
string msg;
try {
if(!server->start(msg))
logError(server->getServerId(), "Failed to start server.\n Message: " + msg);
} catch (const std::exception& ex) {
logError(server->getServerId(), "Could not start server! Got an active exception. Message {}", ex.what());
}
}
if(id > 0)
this->handle->databaseHelper()->clearStartupCache(id);
server_count++;
return 0;
}, this);
auto time = duration_cast<milliseconds>(system_clock::now() - beg).count();
logMessage(LOG_INSTANCE, "Loaded {} servers within {}ms. Server/sec: {:2f}",
server_count,
time,
(float) server_count / (time / 1024 == 0 ? 1 : time / 1024)
);
this->handle->databaseHelper()->clearStartupCache(0);
this->adjust_executor_threads();
{
this->acknowledge.executor = std::thread([&]{
system_clock::time_point next_execute = system_clock::now() + milliseconds(500);
while(this->state == State::STARTED || this->state == State::STARTING) {
unique_lock<mutex> lock(this->acknowledge.lock);
this->acknowledge.condition.wait_until(lock, next_execute, [&](){ return this->state != State::STARTED && this->state != State::STARTING; });
auto now = system_clock::now();
next_execute = now + milliseconds(500);
for(const auto& server : this->serverInstances()) {
auto vserver = server->getVoiceServer(); //Read this only once
if(vserver)
vserver->execute_resend(now, next_execute);
}
}
return 0;
});
}
this->state = State::STARTED;
return true;
}
shared_ptr<VirtualServer> VirtualServerManager::findServerById(ServerId sid) {
for(auto server : this->serverInstances())
if(server->getServerId() == sid)
return server;
return nullptr;
}
shared_ptr<VirtualServer> VirtualServerManager::findServerByPort(uint16_t port) {
for(const auto& server : this->serverInstances()){
if(server->properties()[property::VIRTUALSERVER_PORT] == port) return server;
if(server->running() && server->getVoiceServer())
for(const auto& binding : server->getVoiceServer()->activeBindings())
if(binding->address_port() == port) return server;
}
return nullptr;
}
uint16_t VirtualServerManager::next_available_port() {
auto instances = this->serverInstances();
deque<uint16_t> unallowed_ports;
for(const auto& instance : instances) {
unallowed_ports.push_back(instance->properties()[property::VIRTUALSERVER_PORT].as<uint16_t>());
auto vserver = instance->getVoiceServer();
if(instance->running() && vserver) {
for(const auto& binding : vserver->activeBindings()) {
unallowed_ports.push_back(binding->address_port());
}
}
}
uint16_t port = config::voice::default_voice_port;
while(true) {
if(port < 1024) goto c;
for(auto& p : unallowed_ports) {
if(p == port)
goto c;
}
break;
c:
port++;
}
return port;
}
ts::ServerId VirtualServerManager::next_available_server_id(bool& success) {
auto server_id_base = this->handle->properties()[property::SERVERINSTANCE_VIRTUAL_SERVER_ID_INDEX].as<ServerId>();
if(server_id_base > 65530) {
success = false;
return 0;
}
ServerId serverId = server_id_base != 0 ? server_id_base : (ServerId) 1;
auto instances = this->serverInstances();
vector<ServerId> used_ids;
used_ids.reserve(instances.size());
for(const auto& server : instances)
used_ids.push_back(server->getServerId());
std::stable_sort(used_ids.begin(), used_ids.end(), [](ServerId a, ServerId b) { return b > a; });
while(true) {
auto it = used_ids.begin();
while(it != used_ids.end() && *it < serverId)
it++;
if(it == used_ids.end() || *it != serverId) {
break;
} else {
used_ids.erase(used_ids.begin(), it + 1);
serverId++;
}
}
/* increase counter */
if(server_id_base != 0)
this->handle->properties()[property::SERVERINSTANCE_VIRTUAL_SERVER_ID_INDEX] = serverId;
success = true;
return serverId;
}
ServerReport VirtualServerManager::report() {
ServerReport result{};
for(const auto& sr : this->serverInstances()) {
result.avariable++;
if(sr->running()) {
result.online++;
result.slots += sr->properties()[property::VIRTUALSERVER_MAXCLIENTS].as<size_t>();
result.onlineClients += sr->onlineClients();
result.onlineChannels += sr->onlineChannels();
}
}
return result;
}
OnlineClientReport VirtualServerManager::clientReport() {
OnlineClientReport result{};
for(const auto& server : this->serverInstances()) {
if(!server->running()) continue;
auto sr = server->onlineStats();
result.bots += sr.bots;
result.queries += sr.queries;
result.clients_web += sr.clients_web;
result.clients_ts += sr.clients_ts;
}
return result;
}
size_t VirtualServerManager::runningServers() {
size_t res = 0;
for(const auto& sr : this->serverInstances())
if(sr->running()) res++;
return res;
}
size_t VirtualServerManager::usedSlots() {
size_t res = 0;
for(const auto& sr : this->serverInstances())
res += sr->properties()[property::VIRTUALSERVER_MAXCLIENTS].as<size_t>();
return res;
}
shared_ptr<VirtualServer> VirtualServerManager::create_server(std::string hosts, uint16_t port) {
bool sid_success = false;
ServerId serverId = this->next_available_server_id(sid_success);
if(!sid_success)
return nullptr;
sql::command(this->handle->getSql(), "INSERT INTO `servers` (`serverId`, `host`, `port`) VALUES (:sid, :host, :port)", variable{":sid", serverId}, variable{":host", hosts}, variable{":port", port}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
//`serverId` INTEGER DEFAULT -1, `type` INTEGER, `id` INTEGER, `key` VARCHAR(" UNKNOWN_KEY_LENGTH "), `value` TEXT
auto prop_copy = sql::command(this->handle->getSql(), "INSERT INTO `properties` (`serverId`, `type`, `id`, `key`, `value`) SELECT :target_sid AS `serverId`, `type`, `id`, `key`, `value` FROM `properties` WHERE `type` = :type AND `id` = 0 AND `serverId` = 0;",
variable{":target_sid", serverId},
variable{":type", property::PROP_TYPE_SERVER}).execute();
if(!prop_copy)
logCritical(LOG_GENERAL, "Failed to copy default server properties: {}", prop_copy.fmtStr());
auto server = make_shared<VirtualServer>(serverId, this->handle->getSql());
server->self = server;
if(!server->initialize(true)) {
//FIXME error handling
}
server->properties()[property::VIRTUALSERVER_HOST] = hosts;
server->properties()[property::VIRTUALSERVER_PORT] = port;
if(!config::server::default_music_bot) {
auto bot = server->musicManager->createBot(0);
if(!bot) {
logCritical(server->getServerId(), "Failed to create default music bot!");
}
}
{
threads::MutexLock l(this->instanceLock);
this->instances.push_back(server);
}
this->adjust_executor_threads();
return server;
}
bool VirtualServerManager::deleteServer(shared_ptr<VirtualServer> server) {
{
threads::MutexLock l(this->instanceLock);
bool found = false;
for(const auto& s : this->instances)
if(s == server) {
found = true;
break;
}
if(!found) return false;
this->instances.erase(std::remove_if(this->instances.begin(), this->instances.end(), [&](const shared_ptr<VirtualServer>& s) {
return s == server;
}), this->instances.end());
}
this->adjust_executor_threads();
if(server->getState() != ServerState::OFFLINE)
server->stop("server deleted");
{
for(const shared_ptr<ConnectedClient>& client : server->getClients()) {
if(client && client->getType() == ClientType::CLIENT_QUERY) {
lock_guard lock(client->command_lock);
client->server = nullptr;
client->loadDataForCurrentServer();
}
}
}
{
threads::MutexLock locK(server->stateLock);
server->state = ServerState::DELETING;
}
this->handle->properties()[property::SERVERINSTANCE_SPOKEN_TIME_DELETED] += server->properties()[property::VIRTUALSERVER_SPOKEN_TIME].as<uint64_t>();
sql::command(this->handle->getSql(), "DELETE FROM `tokens` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
sql::command(this->handle->getSql(), "DELETE FROM `properties` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
sql::command(this->handle->getSql(), "DELETE FROM `permissions` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
sql::command(this->handle->getSql(), "DELETE FROM `groups` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
sql::command(this->handle->getSql(), "DELETE FROM `clients` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
sql::command(this->handle->getSql(), "DELETE FROM `channels` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
sql::command(this->handle->getSql(), "DELETE FROM `bannedClients` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
sql::command(this->handle->getSql(), "DELETE FROM `assignedGroups` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
sql::command(this->handle->getSql(), "DELETE FROM `servers` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
sql::command(this->handle->getSql(), "DELETE FROM `musicbots` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
sql::command(this->handle->getSql(), "DELETE FROM `bannedClients` WHERE `serverId` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
sql::command(this->handle->getSql(), "DELETE FROM `ban_trigger` WHERE `server_id` = :sid", variable{":sid", server->getServerId()}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});
this->handle->getFileServer()->deleteServer(server);
return true;
}
void VirtualServerManager::executeAutostart() {
threads::MutexLock l(this->instanceLock);
auto lastStart = system_clock::time_point();
for(const auto& server : this->instances){
if(!server->running() && server->properties()[property::VIRTUALSERVER_AUTOSTART].as<bool>()){
threads::self::sleep_until(lastStart + milliseconds(10)); //Don't start all server at the same point (otherwise all servers would tick at the same moment)
lastStart = system_clock::now();
logMessage(server->getServerId(), "Starting server");
string msg;
try {
if(!server->start(msg))
logError(server->getServerId(), "Failed to start server.\n Message:{}", msg);
} catch (const std::exception& ex) {
logError(server->getServerId(), "Could not start server! Got an active exception. Message {}", ex.what());
}
}
}
}
void VirtualServerManager::shutdownAll(const std::string& msg) {
for(const auto &server : this->serverInstances())
server->preStop(msg);
for(const auto &server : this->serverInstances()){
if(server->running()) server->stop(msg);
}
this->execute_loop->shutdown();
}
void VirtualServerManager::tickHandshakeClients() {
for(const auto& server : this->serverInstances()) {
auto vserver = server->getVoiceServer();
if(vserver)
vserver->tickHandshakingClients();
}
}