2019-07-17 13:37:18 -04:00
# include <algorithm>
# include <log/LogUtils.h>
# include "ServerManager.h"
# include "src/server/VoiceServer.h"
# include "InstanceHandler.h"
# include "src/server/file/FileServer.h"
# include "src/client/ConnectedClient.h"
using namespace std ;
using namespace std : : chrono ;
using namespace ts : : server ;
ServerManager : : ServerManager ( InstanceHandler * handle ) : handle ( handle ) {
this - > puzzles = new protocol : : PuzzleManager ( ) ;
this - > handshakeTickers = new threads : : Scheduler ( 1 , " handshake ticker " ) ;
this - > execute_loop = new event : : EventExecutor ( " executor # " ) ;
2019-09-22 06:36:30 -04:00
//this->join_loop = new event::EventExecutor("joiner #");
2019-07-17 13:37:18 -04:00
this - > _ioManager = new io : : VoiceIOManager ( ) ;
this - > handshakeTickers - > schedule ( " ticker " , [ & ] ( ) { this - > tickHandshakeClients ( ) ; } , seconds ( 1 ) ) ;
}
ServerManager : : ~ ServerManager ( ) {
this - > state = State : : STOPPED ;
{
threads : : MutexLock lock ( this - > instanceLock ) ;
this - > instances . clear ( ) ;
}
{
this - > acknowledge . condition . notify_all ( ) ;
if ( this - > acknowledge . thread )
this - > acknowledge . thread - > join ( ) ;
delete this - > acknowledge . thread ;
}
delete this - > puzzles ;
this - > puzzles = nullptr ;
if ( this - > execute_loop )
this - > execute_loop - > shutdown ( ) ;
delete this - > execute_loop ;
this - > execute_loop = nullptr ;
2019-09-22 06:36:30 -04:00
if ( this - > join_loop )
this - > join_loop - > shutdown ( ) ;
delete this - > join_loop ;
this - > join_loop = nullptr ;
2019-07-17 13:37:18 -04:00
if ( this - > handshakeTickers ) {
this - > handshakeTickers - > shutdown ( ) ;
}
delete this - > handshakeTickers ;
this - > handshakeTickers = nullptr ;
if ( this - > _ioManager ) this - > _ioManager - > shutdownGlobally ( ) ;
delete this - > _ioManager ;
this - > _ioManager = nullptr ;
}
bool ServerManager : : initialize ( bool autostart ) {
this - > execute_loop - > initialize ( 1 ) ;
this - > state = State : : STARTING ;
logMessage ( " Generating server puzzles... " ) ;
auto start = system_clock : : now ( ) ;
this - > puzzles - > precomputePuzzles ( config : : voice : : DefaultPuzzlePrecomputeSize ) ;
logMessage ( " Puzzles generated! Time required: " + to_string ( duration_cast < milliseconds > ( system_clock : : now ( ) - start ) . count ( ) ) + " ms " ) ;
size_t serverCount = 0 ;
sql : : command ( this - > handle - > getSql ( ) , " SELECT COUNT(`serverId`) FROM `servers` " ) . query ( [ ] ( size_t & ptr , int , char * * v , char * * ) { ptr = stoll ( v [ 0 ] ) ; return 0 ; } , serverCount ) ;
{
logMessage ( LOG_INSTANCE , " Loading startup cache (This may take a while) " ) ;
auto beg = system_clock : : now ( ) ;
this - > handle - > databaseHelper ( ) - > loadStartupCache ( ) ;
auto end = system_clock : : now ( ) ;
logMessage ( LOG_INSTANCE , " Required {}ms to preload the startup cache. Cache needs {}mb " ,
duration_cast < milliseconds > ( end - beg ) . count ( ) ,
this - > handle - > databaseHelper ( ) - > cacheBinarySize ( ) / 1024 / 1024
) ;
}
auto beg = system_clock : : now ( ) ;
size_t server_count = 0 ;
2019-10-15 12:04:34 -04:00
sql : : command ( this - > handle - > getSql ( ) , " SELECT `serverId`, `host`, `port` FROM `servers` " ) . query ( [ & ] ( ServerManager * mgr , int length , std : : string * values , std : : string * columns ) {
2019-07-17 13:37:18 -04:00
ServerId id = 0 ;
std : : string host ;
uint16_t port = 0 ;
2019-10-15 12:04:34 -04:00
for ( int index = 0 ; index < length ; index + + ) {
try {
if ( columns [ index ] = = " serverId " )
id = static_cast < ServerId > ( stoll ( values [ index ] ) ) ;
else if ( columns [ index ] = = " host " )
host = values [ index ] ;
else if ( columns [ index ] = = " port " )
port = static_cast < uint16_t > ( stoul ( values [ index ] ) ) ;
} catch ( std : : exception & ex ) {
logError ( LOG_INSTANCE , " Failed to parse virtual server from database. Failed to parse field {} with value {}: {} " , columns [ index ] , values [ index ] , ex . what ( ) ) ;
return 0 ;
}
}
if ( id = = 0 ) {
logError ( LOG_INSTANCE , " Failed to load virtual server from database. Server id is zero! " ) ;
return 0 ;
}
if ( host . empty ( ) ) {
logWarning ( id , " The loaded host is empty. Using default one (from the config.yml) " ) ;
host = config : : binding : : DefaultVoiceHost ;
}
if ( port = = 0 ) {
logError ( LOG_INSTANCE , " Failed to load virtual server from database. Server port is zero! " ) ;
return 0 ;
}
2019-07-17 13:37:18 -04:00
auto server = make_shared < TSServer > ( id , this - > handle - > getSql ( ) ) ;
server - > self = server ;
if ( ! server - > initialize ( true ) ) {
//FIXME error handling
}
server - > properties ( ) [ property : : VIRTUALSERVER_HOST ] = host ;
server - > properties ( ) [ property : : VIRTUALSERVER_PORT ] = port ;
{
threads : : MutexLock l ( this - > instanceLock ) ;
this - > instances . push_back ( server ) ;
}
if ( autostart & & server - > properties ( ) [ property : : VIRTUALSERVER_AUTOSTART ] . as < bool > ( ) ) {
logMessage ( server - > getServerId ( ) , " Starting server " ) ;
string msg ;
try {
if ( ! server - > start ( msg ) )
logError ( server - > getServerId ( ) , " Failed to start server. \n Message: " + msg ) ;
} catch ( const std : : exception & ex ) {
logError ( string ( ) + " Could not start server! Got an active exception. Message " + ex . what ( ) ) ;
}
}
if ( id > 0 )
this - > handle - > databaseHelper ( ) - > clearStartupCache ( id ) ;
server_count + + ;
return 0 ;
} , this ) ;
auto time = duration_cast < milliseconds > ( system_clock : : now ( ) - beg ) . count ( ) ;
logMessage ( LOG_INSTANCE , " Loaded {} servers within {}ms. Server/sec: {:2f} " ,
server_count ,
time ,
( float ) server_count / ( time / 1024 = = 0 ? 1 : time / 1024 )
) ;
this - > handle - > databaseHelper ( ) - > clearStartupCache ( 0 ) ;
this - > adjust_executor_threads ( ) ;
{
this - > acknowledge . thread = new threads : : Thread ( THREAD_SAVE_OPERATIONS , [ & ] {
system_clock : : time_point next_execute = system_clock : : now ( ) + milliseconds ( 500 ) ;
while ( this - > state = = State : : STARTED | | this - > state = = State : : STARTING ) {
unique_lock < mutex > lock ( this - > acknowledge . lock ) ;
this - > acknowledge . condition . wait_until ( lock , next_execute , [ & ] ( ) { return this - > state ! = State : : STARTED & & this - > state ! = State : : STARTING ; } ) ;
auto now = system_clock : : now ( ) ;
next_execute = now + milliseconds ( 500 ) ;
for ( const auto & server : this - > serverInstances ( ) ) {
auto vserver = server - > getVoiceServer ( ) ; //Read this only once
if ( vserver )
vserver - > execute_resend ( now , next_execute ) ;
}
}
return 0 ;
} ) ;
}
this - > state = State : : STARTED ;
return true ;
}
shared_ptr < TSServer > ServerManager : : findServerById ( ServerId sid ) {
for ( auto server : this - > serverInstances ( ) )
if ( server - > getServerId ( ) = = sid )
return server ;
return nullptr ;
}
shared_ptr < TSServer > ServerManager : : findServerByPort ( uint16_t port ) {
for ( const auto & server : this - > serverInstances ( ) ) {
if ( server - > properties ( ) [ property : : VIRTUALSERVER_PORT ] = = port ) return server ;
if ( server - > running ( ) & & server - > getVoiceServer ( ) )
for ( const auto & binding : server - > getVoiceServer ( ) - > activeBindings ( ) )
if ( binding - > address_port ( ) = = port ) return server ;
}
return nullptr ;
}
uint16_t ServerManager : : next_available_port ( ) {
auto instances = this - > serverInstances ( ) ;
deque < uint16_t > unallowed_ports ;
for ( const auto & instance : instances ) {
unallowed_ports . push_back ( instance - > properties ( ) [ property : : VIRTUALSERVER_PORT ] . as < uint16_t > ( ) ) ;
auto vserver = instance - > getVoiceServer ( ) ;
if ( instance - > running ( ) & & vserver ) {
for ( const auto & binding : vserver - > activeBindings ( ) ) {
unallowed_ports . push_back ( binding - > address_port ( ) ) ;
}
}
}
uint16_t port = config : : voice : : default_voice_port ;
while ( true ) {
if ( port < 1024 ) goto c ;
for ( auto & p : unallowed_ports ) {
if ( p = = port )
goto c ;
}
break ;
c :
port + + ;
}
return port ;
}
ts : : ServerId ServerManager : : next_available_server_id ( bool & success ) {
auto server_id_base = this - > handle - > properties ( ) [ property : : SERVERINSTANCE_VIRTUAL_SERVER_ID_INDEX ] . as < ServerId > ( ) ;
if ( server_id_base > 65530 ) {
success = false ;
return 0 ;
}
ServerId serverId = server_id_base ! = 0 ? server_id_base : ( ServerId ) 1 ;
auto instances = this - > serverInstances ( ) ;
vector < ServerId > used_ids ;
used_ids . reserve ( instances . size ( ) ) ;
for ( const auto & server : instances )
used_ids . push_back ( server - > getServerId ( ) ) ;
std : : stable_sort ( used_ids . begin ( ) , used_ids . end ( ) , [ ] ( ServerId a , ServerId b ) { return b > a ; } ) ;
while ( true ) {
auto it = used_ids . begin ( ) ;
while ( it ! = used_ids . end ( ) & & * it < serverId )
it + + ;
if ( it = = used_ids . end ( ) | | * it ! = serverId ) {
break ;
} else {
used_ids . erase ( used_ids . begin ( ) , it + 1 ) ;
serverId + + ;
}
}
/* increase counter */
if ( server_id_base ! = 0 )
this - > handle - > properties ( ) [ property : : SERVERINSTANCE_VIRTUAL_SERVER_ID_INDEX ] = serverId ;
success = true ;
return serverId ;
}
ServerReport ServerManager : : report ( ) {
ServerReport result { } ;
for ( const auto & sr : this - > serverInstances ( ) ) {
result . avariable + + ;
if ( sr - > running ( ) ) {
result . online + + ;
result . slots + = sr - > properties ( ) [ property : : VIRTUALSERVER_MAXCLIENTS ] . as < size_t > ( ) ;
result . onlineClients + = sr - > onlineClients ( ) ;
result . onlineChannels + = sr - > onlineChannels ( ) ;
}
}
return result ;
}
OnlineClientReport ServerManager : : clientReport ( ) {
OnlineClientReport result { } ;
for ( const auto & server : this - > serverInstances ( ) ) {
if ( ! server - > running ( ) ) continue ;
auto sr = server - > onlineStats ( ) ;
result . bots + = sr . bots ;
result . queries + = sr . queries ;
result . clients_web + = sr . clients_web ;
result . clients_ts + = sr . clients_ts ;
}
return result ;
}
size_t ServerManager : : runningServers ( ) {
size_t res = 0 ;
for ( const auto & sr : this - > serverInstances ( ) )
if ( sr - > running ( ) ) res + + ;
return res ;
}
size_t ServerManager : : usedSlots ( ) {
size_t res = 0 ;
for ( const auto & sr : this - > serverInstances ( ) )
res + = sr - > properties ( ) [ property : : VIRTUALSERVER_MAXCLIENTS ] . as < size_t > ( ) ;
return res ;
}
shared_ptr < TSServer > ServerManager : : createServer ( std : : string hosts , uint16_t port ) {
bool sid_success = false ;
ServerId serverId = this - > next_available_server_id ( sid_success ) ;
if ( ! sid_success )
return nullptr ;
sql : : command ( this - > handle - > getSql ( ) , " INSERT INTO `servers` (`serverId`, `host`, `port`) VALUES (:sid, :host, :port) " , variable { " :sid " , serverId } , variable { " :host " , hosts } , variable { " :port " , port } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
//`serverId` INTEGER DEFAULT -1, `type` INTEGER, `id` INTEGER, `key` VARCHAR(" UNKNOWN_KEY_LENGTH "), `value` TEXT
auto prop_copy = sql : : command ( this - > handle - > getSql ( ) , " INSERT INTO `properties` (`serverId`, `type`, `id`, `key`, `value`) SELECT :target_sid AS `serverId`, `type`, `id`, `key`, `value` FROM `properties` WHERE `type` = :type AND `id` = 0 AND `serverId` = 0; " ,
variable { " :target_sid " , serverId } ,
variable { " :type " , property : : PROP_TYPE_SERVER } ) . execute ( ) ;
if ( ! prop_copy . success )
logCritical ( LOG_GENERAL , " Failed to copy default server properties: {} " , prop_copy . fmtStr ( ) ) ;
auto server = make_shared < TSServer > ( serverId , this - > handle - > getSql ( ) ) ;
server - > self = server ;
if ( ! server - > initialize ( true ) ) {
//FIXME error handling
}
server - > properties ( ) [ property : : VIRTUALSERVER_HOST ] = hosts ;
server - > properties ( ) [ property : : VIRTUALSERVER_PORT ] = port ;
{
threads : : MutexLock l ( this - > instanceLock ) ;
this - > instances . push_back ( server ) ;
}
this - > adjust_executor_threads ( ) ;
return server ;
}
bool ServerManager : : deleteServer ( shared_ptr < TSServer > server ) {
{
threads : : MutexLock l ( this - > instanceLock ) ;
bool found = false ;
for ( const auto & s : this - > instances )
if ( s = = server ) {
found = true ;
break ;
}
if ( ! found ) return false ;
this - > instances . erase ( std : : remove_if ( this - > instances . begin ( ) , this - > instances . end ( ) , [ & ] ( const shared_ptr < TSServer > & s ) {
return s = = server ;
} ) , this - > instances . end ( ) ) ;
}
this - > adjust_executor_threads ( ) ;
if ( server - > getState ( ) ! = ServerState : : OFFLINE )
server - > stop ( " server deleted " ) ;
{
for ( const shared_ptr < ConnectedClient > & client : server - > getClients ( ) ) {
if ( client & & client - > getType ( ) = = ClientType : : CLIENT_QUERY ) {
lock_guard lock ( client - > command_lock ) ;
client - > server = nullptr ;
client - > loadDataForCurrentServer ( ) ;
}
}
}
{
threads : : MutexLock locK ( server - > stateLock ) ;
server - > state = ServerState : : DELETING ;
}
this - > handle - > properties ( ) [ property : : SERVERINSTANCE_SPOKEN_TIME_DELETED ] + = server - > properties ( ) [ property : : VIRTUALSERVER_SPOKEN_TIME ] . as < uint64_t > ( ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `tokens` WHERE `serverId` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `properties` WHERE `serverId` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `permissions` WHERE `serverId` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `groups` WHERE `serverId` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `clients` WHERE `serverId` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `channels` WHERE `serverId` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `bannedClients` WHERE `serverId` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `assignedGroups` WHERE `serverId` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `servers` WHERE `serverId` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `musicbots` WHERE `serverId` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `bannedClients` WHERE `serverId` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
sql : : command ( this - > handle - > getSql ( ) , " DELETE FROM `ban_trigger` WHERE `server_id` = :sid " , variable { " :sid " , server - > getServerId ( ) } ) . executeLater ( ) . waitAndGetLater ( LOG_SQL_CMD , { 1 , " future failed " } ) ;
this - > handle - > getFileServer ( ) - > deleteServer ( server ) ;
return true ;
}
void ServerManager : : executeAutostart ( ) {
threads : : MutexLock l ( this - > instanceLock ) ;
auto lastStart = system_clock : : time_point ( ) ;
for ( const auto & server : this - > instances ) {
if ( ! server - > running ( ) & & server - > properties ( ) [ property : : VIRTUALSERVER_AUTOSTART ] . as < bool > ( ) ) {
threads : : self : : sleep_until ( lastStart + milliseconds ( 10 ) ) ; //Don't start all server at the same point (otherwise all servers would tick at the same moment)
lastStart = system_clock : : now ( ) ;
logMessage ( server - > getServerId ( ) , " Starting server " ) ;
string msg ;
try {
if ( ! server - > start ( msg ) )
logError ( server - > getServerId ( ) , " Failed to start server. \n Message: " + msg ) ;
} catch ( const std : : exception & ex ) {
logError ( string ( ) + " Could not start server! Got an active exception. Message " + ex . what ( ) ) ;
}
}
}
}
void ServerManager : : shutdownAll ( const std : : string & msg ) {
for ( const auto & server : this - > serverInstances ( ) )
server - > preStop ( msg ) ;
for ( const auto & server : this - > serverInstances ( ) ) {
if ( server - > running ( ) ) server - > stop ( msg ) ;
}
this - > execute_loop - > shutdown ( ) ;
}
void ServerManager : : tickHandshakeClients ( ) {
for ( const auto & server : this - > serverInstances ( ) ) {
auto vserver = server - > getVoiceServer ( ) ;
if ( vserver )
vserver - > tickHandshakingClients ( ) ;
}
}