489 lines
18 KiB
C++
489 lines
18 KiB
C++
#include <algorithm>
|
|
#include <log/LogUtils.h>
|
|
#include "VirtualServerManager.h"
|
|
#include "src/server/VoiceServer.h"
|
|
#include "src/client/query/QueryClient.h"
|
|
#include "InstanceHandler.h"
|
|
#include "src/server/file/FileServer.h"
|
|
#include "src/client/ConnectedClient.h"
|
|
#include <ThreadPool/ThreadHelper.h>
|
|
|
|
using namespace std;
|
|
using namespace std::chrono;
|
|
using namespace ts::server;
|
|
|
|
/**
 * Constructs the manager and allocates its helper objects: the puzzle
 * manager, the handshake ticker scheduler, the event executor and the voice
 * IO manager. The join loop is intentionally not created here (see the
 * commented-out line).
 *
 * A "ticker" job is registered on the handshake scheduler which invokes
 * tickHandshakeClients() with a one second interval argument.
 *
 * @param handle The owning instance handler; only the pointer is stored.
 */
VirtualServerManager::VirtualServerManager(InstanceHandler* handle) : handle(handle) {
    this->puzzles = new udp::PuzzleManager{};
    this->handshakeTickers = new threads::Scheduler(1, "handshake ticker");
    this->execute_loop = new event::EventExecutor("executor #");
    //this->join_loop = new event::EventExecutor("joiner #");
    this->_ioManager = new io::VoiceIOManager();

    /* The callback only needs this object, so capture it explicitly. */
    this->handshakeTickers->schedule(
            "ticker",
            [this]() { this->tickHandshakeClients(); },
            seconds(1)
    );
}
|
|
|
|
/**
 * Tears the manager down.
 *
 * Order of operations:
 *  1. Mark the manager as stopped and drop all server instances.
 *  2. Wake the packet resend thread and give it two seconds to exit.
 *  3. Shut down and free the puzzle manager, the event loops, the handshake
 *     scheduler and the voice IO manager.
 */
VirtualServerManager::~VirtualServerManager() {
    this->state = State::STOPPED;
    {
        threads::MutexLock lock(this->instanceLock);
        this->instances.clear();
    }

    {
        this->acknowledge.condition.notify_all();
        /*
         * The resend thread is only spawned by initialize(). If that never ran the
         * thread object is not joinable, and calling detach() on it would throw
         * std::system_error from within this destructor.
         */
        if(this->acknowledge.executor.joinable()) {
            if(!threads::timed_join(this->acknowledge.executor, std::chrono::seconds{2})) {
                logCritical(LOG_GENERAL, "Failed to shutdown packet resend thread.");
                /* NOTE(review): a detached thread may still reference this object after destruction. */
                this->acknowledge.executor.detach();
            }
        }
    }

    delete this->puzzles;
    this->puzzles = nullptr;

    if(this->execute_loop)
        this->execute_loop->shutdown();
    delete this->execute_loop;
    this->execute_loop = nullptr;

    if(this->join_loop)
        this->join_loop->shutdown();
    delete this->join_loop;
    this->join_loop = nullptr;

    if(this->handshakeTickers)
        this->handshakeTickers->shutdown();
    delete this->handshakeTickers;
    this->handshakeTickers = nullptr;

    if(this->_ioManager)
        this->_ioManager->shutdownGlobally();
    delete this->_ioManager;
    this->_ioManager = nullptr;
}
|
|
|
|
bool VirtualServerManager::initialize(bool autostart) {
|
|
this->execute_loop->initialize(1);
|
|
|
|
this->state = State::STARTING;
|
|
logMessage(LOG_INSTANCE, "Generating server puzzles...");
|
|
auto start = system_clock::now();
|
|
if(!this->puzzles->precompute_puzzles(config::voice::DefaultPuzzlePrecomputeSize))
|
|
logCritical(LOG_INSTANCE, "Failed to precompute RSA puzzles");
|
|
logMessage(LOG_INSTANCE, "Puzzles generated! Time required: " + to_string(duration_cast<milliseconds>(system_clock::now() - start).count()) + "ms");
|
|
|
|
size_t serverCount = 0;
|
|
sql::command(this->handle->getSql(), "SELECT COUNT(`serverId`) FROM `servers`").query([](size_t& ptr, int, char** v, char**) { ptr = stoll(v[0]); return 0; }, serverCount);
|
|
|
|
{
|
|
logMessage(LOG_INSTANCE, "Loading startup cache (This may take a while)");
|
|
auto beg = system_clock::now();
|
|
this->handle->databaseHelper()->loadStartupCache();
|
|
auto end = system_clock::now();
|
|
logMessage(LOG_INSTANCE, "Required {}ms to preload the startup cache. Cache needs {}mb",
|
|
duration_cast<milliseconds>(end - beg).count(),
|
|
this->handle->databaseHelper()->cacheBinarySize() / 1024 / 1024
|
|
);
|
|
}
|
|
|
|
auto beg = system_clock::now();
|
|
size_t server_count = 0;
|
|
sql::command(this->handle->getSql(), "SELECT `serverId`, `host`, `port` FROM `servers`").query([&](VirtualServerManager* mgr, int length, std::string* values, std::string* columns){
|
|
ServerId id = 0;
|
|
std::string host;
|
|
uint16_t port = 0;
|
|
|
|
for(int index = 0; index < length; index++) {
|
|
try {
|
|
if(columns[index] == "serverId")
|
|
id = static_cast<ServerId>(stoll(values[index]));
|
|
else if(columns[index] == "host")
|
|
host = values[index];
|
|
else if(columns[index] == "port")
|
|
port = static_cast<uint16_t>(stoul(values[index]));
|
|
} catch(std::exception& ex) {
|
|
logError(LOG_INSTANCE, "Failed to parse virtual server from database. Failed to parse field {} with value {}: {}", columns[index], values[index], ex.what());
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
if(id == 0) {
|
|
logError(LOG_INSTANCE, "Failed to load virtual server from database. Server id is zero!");
|
|
return 0;
|
|
}
|
|
|
|
if(host.empty()) {
|
|
logWarning(id, "The loaded host is empty. Using default one (from the config.yml)");
|
|
host = config::binding::DefaultVoiceHost;
|
|
}
|
|
|
|
if(port == 0) {
|
|
logError(LOG_INSTANCE, "Failed to load virtual server from database. Server port is zero!");
|
|
return 0;
|
|
}
|
|
|
|
|
|
auto server = make_shared<VirtualServer>(id, this->handle->getSql());
|
|
server->self = server;
|
|
if(!server->initialize(true)) {
|
|
//FIXME error handling
|
|
}
|
|
server->properties()[property::VIRTUALSERVER_HOST] = host;
|
|
server->properties()[property::VIRTUALSERVER_PORT] = port;
|
|
|
|
{
|
|
threads::MutexLock l(this->instanceLock);
|
|
this->instances.push_back(server);
|
|
}
|
|
|
|
if(autostart && server->properties()[property::VIRTUALSERVER_AUTOSTART].as<bool>()) {
|
|
logMessage(server->getServerId(), "Starting server");
|
|
string msg;
|
|
try {
|
|
if(!server->start(msg))
|
|
logError(server->getServerId(), "Failed to start server.\n Message: " + msg);
|
|
} catch (const std::exception& ex) {
|
|
logError(server->getServerId(), "Could not start server! Got an active exception. Message {}", ex.what());
|
|
}
|
|
}
|
|
if(id > 0)
|
|
this->handle->databaseHelper()->clearStartupCache(id);
|
|
|
|
server_count++;
|
|
return 0;
|
|
}, this);
|
|
auto time = duration_cast<milliseconds>(system_clock::now() - beg).count();
|
|
logMessage(LOG_INSTANCE, "Loaded {} servers within {}ms. Server/sec: {:2f}",
|
|
server_count,
|
|
time,
|
|
(float) server_count / (time / 1024 == 0 ? 1 : time / 1024)
|
|
);
|
|
this->handle->databaseHelper()->clearStartupCache(0);
|
|
this->adjust_executor_threads();
|
|
|
|
{
|
|
this->acknowledge.executor = std::thread([&]{
|
|
system_clock::time_point next_execute = system_clock::now() + milliseconds(500);
|
|
while(this->state == State::STARTED || this->state == State::STARTING) {
|
|
unique_lock<mutex> lock(this->acknowledge.lock);
|
|
this->acknowledge.condition.wait_until(lock, next_execute, [&](){ return this->state != State::STARTED && this->state != State::STARTING; });
|
|
|
|
auto now = system_clock::now();
|
|
next_execute = now + milliseconds(500);
|
|
for(const auto& server : this->serverInstances()) {
|
|
auto vserver = server->getVoiceServer(); //Read this only once
|
|
if(vserver)
|
|
vserver->execute_resend(now, next_execute);
|
|
}
|
|
}
|
|
return 0;
|
|
});
|
|
}
|
|
|
|
this->state = State::STARTED;
|
|
return true;
|
|
}
|
|
|
|
/**
 * Looks up a virtual server by its id.
 *
 * @param sid The server id to search for.
 * @return The first instance whose getServerId() equals sid, or nullptr if
 *         no instance matches.
 */
shared_ptr<VirtualServer> VirtualServerManager::findServerById(ServerId sid) {
    const auto candidates = this->serverInstances();
    const auto match = std::find_if(candidates.begin(), candidates.end(), [sid](const auto& candidate) {
        return candidate->getServerId() == sid;
    });

    if(match == candidates.end())
        return nullptr;
    return *match;
}
|
|
|
|
/**
 * Looks up a virtual server by port.
 *
 * A server matches when either its VIRTUALSERVER_PORT property equals the
 * given port, or when it is running and one of the active bindings of its
 * voice server uses that port.
 *
 * @param port The port to search for.
 * @return The first matching instance or nullptr if none matches.
 */
shared_ptr<VirtualServer> VirtualServerManager::findServerByPort(uint16_t port) {
    for(const auto& server : this->serverInstances()) {
        if(server->properties()[property::VIRTUALSERVER_PORT] == port)
            return server;

        /*
         * Read the voice server only once and keep the handle alive while iterating
         * its bindings. Fetching it twice (check, then use) could dereference a null
         * handle if the voice server changes in between.
         */
        auto vserver = server->getVoiceServer();
        if(!server->running() || !vserver)
            continue;

        for(const auto& binding : vserver->activeBindings()) {
            if(binding->address_port() == port)
                return server;
        }
    }
    return nullptr;
}
|
|
|
|
uint16_t VirtualServerManager::next_available_port(const std::string& host_string) {
|
|
auto instances = this->serverInstances();
|
|
std::vector<uint16_t> unallowed_ports{};
|
|
unallowed_ports.reserve(instances.size());
|
|
|
|
for(const auto& instance : instances) {
|
|
unallowed_ports.push_back(instance->properties()[property::VIRTUALSERVER_PORT].as<uint16_t>());
|
|
|
|
auto vserver = instance->getVoiceServer();
|
|
if(instance->running() && vserver) {
|
|
for(const auto& binding : vserver->activeBindings()) {
|
|
unallowed_ports.push_back(binding->address_port());
|
|
}
|
|
}
|
|
}
|
|
auto bindings = net::resolve_bindings(host_string, 0);
|
|
|
|
uint16_t port = config::voice::default_voice_port;
|
|
while(true) {
|
|
if(port < 1024) goto next_port;
|
|
|
|
for(auto& p : unallowed_ports) {
|
|
if(p == port)
|
|
goto next_port;
|
|
}
|
|
|
|
for(auto& binding : bindings) {
|
|
if(!std::get<2>(binding).empty()) continue; /* error on that */
|
|
auto& baddress = std::get<1>(binding);
|
|
auto& raw_port = baddress.ss_family == AF_INET ? ((sockaddr_in*) &baddress)->sin_port : ((sockaddr_in6*) &baddress)->sin6_port;
|
|
raw_port = htons(port);
|
|
|
|
switch (net::address_available(baddress, net::binding_type::TCP)) {
|
|
case net::binding_result::ADDRESS_USED:
|
|
goto next_port;
|
|
default:
|
|
break; /* if we've an internal error we ignore it */
|
|
}
|
|
switch (net::address_available(baddress, net::binding_type::UDP)) {
|
|
case net::binding_result::ADDRESS_USED:
|
|
goto next_port;
|
|
default:
|
|
break; /* if we've an internal error we ignore it */
|
|
}
|
|
}
|
|
break;
|
|
|
|
next_port:
|
|
port++;
|
|
}
|
|
return port;
|
|
}
|
|
|
|
/**
 * Determines the next free virtual server id.
 *
 * The search starts at the instance property
 * SERVERINSTANCE_VIRTUAL_SERVER_ID_INDEX (or at 1 when that property is zero)
 * and returns the smallest id, greater than or equal to the start value,
 * which is not used by any known server instance. When the property is non
 * zero it is updated to the returned id.
 *
 * @param success Set to false when the configured index exceeds 65530,
 *                otherwise set to true.
 * @return The free server id, or 0 on failure.
 */
ts::ServerId VirtualServerManager::next_available_server_id(bool& success) {
    auto configured_index = this->handle->properties()[property::SERVERINSTANCE_VIRTUAL_SERVER_ID_INDEX].as<ServerId>();
    if(configured_index > 65530) {
        success = false;
        return 0;
    }

    ServerId candidate = configured_index;
    if(candidate == 0)
        candidate = (ServerId) 1;

    auto known_servers = this->serverInstances();
    vector<ServerId> taken;
    taken.reserve(known_servers.size());
    for(const auto& known_server : known_servers)
        taken.push_back(known_server->getServerId());

    /* ascending order, so one forward walk is enough */
    std::sort(taken.begin(), taken.end());

    auto cursor = taken.begin();
    for(;;) {
        /* skip every id below the current candidate (this also skips duplicates) */
        while(cursor != taken.end() && *cursor < candidate)
            ++cursor;

        if(cursor == taken.end() || *cursor != candidate)
            break; /* the candidate is free */

        /* the candidate is taken, try the next one */
        ++cursor;
        candidate++;
    }

    /* increase counter */
    if(configured_index != 0)
        this->handle->properties()[property::SERVERINSTANCE_VIRTUAL_SERVER_ID_INDEX] = candidate;

    success = true;
    return candidate;
}
|
|
|
|
/**
 * Builds an aggregated report over all server instances.
 *
 * Every instance increments `avariable`. Running instances additionally
 * increment `online` and add their VIRTUALSERVER_MAXCLIENTS property, online
 * client count and online channel count to the totals.
 *
 * @return The accumulated report.
 */
ServerReport VirtualServerManager::report() {
    ServerReport summary{};
    for(const auto& instance : this->serverInstances()) {
        summary.avariable++;

        if(!instance->running())
            continue;

        summary.online++;
        summary.slots += instance->properties()[property::VIRTUALSERVER_MAXCLIENTS].as<size_t>();
        summary.onlineClients += instance->onlineClients();
        summary.onlineChannels += instance->onlineChannels();
    }
    return summary;
}
|
|
|
|
/**
 * Sums the online statistics (bots, queries, web clients and TeamSpeak
 * clients) of all running server instances.
 *
 * @return The accumulated client report; servers that are not running do
 *         not contribute.
 */
OnlineClientReport VirtualServerManager::clientReport() {
    OnlineClientReport totals{};
    for(const auto& instance : this->serverInstances()) {
        if(instance->running()) {
            auto stats = instance->onlineStats();
            totals.bots += stats.bots;
            totals.queries += stats.queries;
            totals.clients_web += stats.clients_web;
            totals.clients_ts += stats.clients_ts;
        }
    }
    return totals;
}
|
|
|
|
size_t VirtualServerManager::runningServers() {
|
|
size_t res = 0;
|
|
for(const auto& sr : this->serverInstances())
|
|
if(sr->running()) res++;
|
|
return res;
|
|
}
|
|
|
|
size_t VirtualServerManager::usedSlots() {
|
|
size_t res = 0;
|
|
for(const auto& sr : this->serverInstances())
|
|
res += sr->properties()[property::VIRTUALSERVER_MAXCLIENTS].as<size_t>();
|
|
return res;
|
|
}
|
|
|
|
/**
 * Creates a new virtual server, persists it and registers it in this manager.
 *
 * Steps:
 *  1. Allocate a free server id.
 *  2. Remove any left over database rows for that id and insert the new
 *     `servers` row.
 *  3. Copy the default server properties (serverId 0, id 0) to the new id.
 *  4. Construct and initialize the VirtualServer, set host and port and
 *     optionally create the default music bot.
 *  5. Add the server to the instance list and adjust the executor threads.
 *
 * @param hosts Host string stored as VIRTUALSERVER_HOST.
 * @param port  Port stored as VIRTUALSERVER_PORT.
 * @return The new server, or nullptr if no server id could be allocated.
 */
shared_ptr<VirtualServer> VirtualServerManager::create_server(std::string hosts, uint16_t port) {
    bool sid_success = false;

    ServerId serverId = this->next_available_server_id(sid_success);
    if(!sid_success)
        return nullptr;

    this->delete_server_in_db(serverId); /* just to ensure */
    sql::command(this->handle->getSql(), "INSERT INTO `servers` (`serverId`, `host`, `port`) VALUES (:sid, :host, :port)", variable{":sid", serverId}, variable{":host", hosts}, variable{":port", port}).executeLater().waitAndGetLater(LOG_SQL_CMD, {1, "future failed"});

    auto prop_copy = sql::command(this->handle->getSql(), "INSERT INTO `properties` (`serverId`, `type`, `id`, `key`, `value`) SELECT :target_sid AS `serverId`, `type`, `id`, `key`, `value` FROM `properties` WHERE `type` = :type AND `id` = 0 AND `serverId` = 0;",
                                  variable{":target_sid", serverId},
                                  variable{":type", property::PROP_TYPE_SERVER}).execute();
    if(!prop_copy)
        logCritical(LOG_GENERAL, "Failed to copy default server properties: {}", prop_copy.fmtStr());

    auto server = make_shared<VirtualServer>(serverId, this->handle->getSql());
    server->self = server;
    if(!server->initialize(true)) {
        //FIXME error handling
        /* At least make the failure visible; the server is still registered as before. */
        logError(serverId, "Failed to initialize virtual server.");
    }
    server->properties()[property::VIRTUALSERVER_HOST] = hosts;
    server->properties()[property::VIRTUALSERVER_PORT] = port;

    if(config::server::default_music_bot) {
        auto bot = server->musicManager->createBot(0);
        if(!bot)
            logCritical(server->getServerId(), "Failed to create default music bot!");
    }

    {
        threads::MutexLock l(this->instanceLock);
        this->instances.push_back(server);
    }
    this->adjust_executor_threads();
    return server;
}
|
|
|
|
/**
 * Removes a virtual server from this manager and deletes its persistent data.
 *
 * Steps:
 *  1. Remove the server from the instance list (fails if it is not listed).
 *  2. Stop the server if it is not offline.
 *  3. Disconnect all of its clients; query clients are detached from the
 *     virtual server instead of being closed.
 *  4. Mark the server as DELETING, account its spoken time to the instance
 *     and delete its database rows and file server data.
 *
 * @param server The server to delete.
 * @return false if server is null or not managed by this manager, otherwise true.
 */
bool VirtualServerManager::deleteServer(shared_ptr<VirtualServer> server) {
    if(!server)
        return false;

    {
        threads::MutexLock l(this->instanceLock);
        /* single pass: move all matching entries to the end, then check whether there were any */
        auto removed_begin = std::remove(this->instances.begin(), this->instances.end(), server);
        if(removed_begin == this->instances.end())
            return false;
        this->instances.erase(removed_begin, this->instances.end());
    }
    this->adjust_executor_threads();

    if(server->getState() != ServerState::OFFLINE)
        server->stop("server deleted", true);

    for(const auto& cl : server->getClients()) { //start disconnecting
        if(!cl)
            continue;

        if(cl->getType() == CLIENT_TEAMSPEAK || cl->getType() == CLIENT_TEASPEAK || cl->getType() == CLIENT_WEB) {
            cl->close_connection(chrono::system_clock::now());
        } else if(cl->getType() == CLIENT_QUERY) {
            auto qc = dynamic_pointer_cast<QueryClient>(cl);
            /* The cast yields null if the client is not actually a QueryClient. */
            if(qc)
                qc->disconnect_from_virtual_server();
        } else if(cl->getType() == CLIENT_MUSIC) {
            cl->disconnect("");
            cl->currentChannel = nullptr;
        }
        /* CLIENT_INTERNAL and all other client types need no explicit disconnect */
    }

    for(const shared_ptr<ConnectedClient>& client : server->getClients()) {
        if(client && client->getType() == ClientType::CLIENT_QUERY) {
            lock_guard lock(client->command_lock);
            client->server = nullptr;

            client->loadDataForCurrentServer();
        }
    }

    {
        threads::MutexLock locK(server->stateLock);
        server->state = ServerState::DELETING;
    }

    this->handle->properties()[property::SERVERINSTANCE_SPOKEN_TIME_DELETED] += server->properties()[property::VIRTUALSERVER_SPOKEN_TIME].as<uint64_t>();
    this->delete_server_in_db(server->serverId);
    this->handle->getFileServer()->deleteServer(server);
    return true;
}
|
|
|
|
void VirtualServerManager::executeAutostart() {
|
|
threads::MutexLock l(this->instanceLock);
|
|
auto lastStart = system_clock::time_point();
|
|
for(const auto& server : this->instances){
|
|
if(!server->running() && server->properties()[property::VIRTUALSERVER_AUTOSTART].as<bool>()){
|
|
threads::self::sleep_until(lastStart + milliseconds(10)); //Don't start all server at the same point (otherwise all servers would tick at the same moment)
|
|
lastStart = system_clock::now();
|
|
logMessage(server->getServerId(), "Starting server");
|
|
string msg;
|
|
try {
|
|
if(!server->start(msg))
|
|
logError(server->getServerId(), "Failed to start server.\n Message:{}", msg);
|
|
} catch (const std::exception& ex) {
|
|
logError(server->getServerId(), "Could not start server! Got an active exception. Message {}", ex.what());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
 * Shuts down all virtual servers and the execute loop.
 *
 * First preStop() is invoked on every instance, afterwards every running
 * instance is stopped. Finally the execute loop is shut down.
 *
 * @param msg The message passed to preStop() and stop().
 */
void VirtualServerManager::shutdownAll(const std::string& msg) {
    /* phase one: announce the shutdown to every instance */
    for(const auto& instance : this->serverInstances()) {
        instance->preStop(msg);
    }

    /* phase two: stop whatever is still running */
    for(const auto& instance : this->serverInstances()) {
        if(!instance->running())
            continue;
        instance->stop(msg, true);
    }

    this->execute_loop->shutdown();
}
|
|
|
|
void VirtualServerManager::tickHandshakeClients() {
|
|
for(const auto& server : this->serverInstances()) {
|
|
auto vserver = server->getVoiceServer();
|
|
if(vserver)
|
|
vserver->tickHandshakingClients();
|
|
}
|
|
}
|
|
|
|
/**
 * Deletes all database rows which belong to the given virtual server.
 *
 * One DELETE statement is executed per table, in the order listed below.
 * A failing statement is logged as a warning and does not prevent the
 * remaining statements from being executed.
 *
 * @param server_id The id bound to `:sid` in every statement.
 */
void VirtualServerManager::delete_server_in_db(ts::ServerId server_id) {
    /* A plain table instead of a function-local macro, which would leak into the rest of the file. */
    static constexpr const char* kDeleteStatements[] = {
            "DELETE FROM `tokens` WHERE `serverId` = :sid",
            "DELETE FROM `properties` WHERE `serverId` = :sid",
            "DELETE FROM `permissions` WHERE `serverId` = :sid",
            "DELETE FROM `clients` WHERE `serverId` = :sid",
            "DELETE FROM `channels` WHERE `serverId` = :sid",
            "DELETE FROM `bannedClients` WHERE `serverId` = :sid",
            "DELETE FROM `ban_trigger` WHERE `server_id` = :sid",
            "DELETE FROM `assignedGroups` WHERE `serverId` = :sid",
            "DELETE FROM `servers` WHERE `serverId` = :sid",

            "DELETE FROM `musicbots` WHERE `serverId` = :sid",
            "DELETE FROM `conversations` WHERE `server_id` = :sid",
            "DELETE FROM `conversation_blocks` WHERE `server_id` = :sid",

            "DELETE FROM `playlists` WHERE `serverId` = :sid",
            "DELETE FROM `playlist_songs` WHERE `serverId` = :sid"
    };

    for(const char* statement : kDeleteStatements) {
        sql::result result = sql::command(this->handle->getSql(), statement, variable{":sid", server_id}).execute();
        if(!result)
            logWarning(LOG_INSTANCE, "Failed to execute SQL command {}: {}", statement, result.fmtStr());
    }
}