diff --git a/MusicBot/src/MusicPlayer.cpp b/MusicBot/src/MusicPlayer.cpp index 9df83a9..0945f64 100644 --- a/MusicBot/src/MusicPlayer.cpp +++ b/MusicBot/src/MusicPlayer.cpp @@ -22,7 +22,7 @@ void AbstractMusicPlayer::registerEventHandler(const std::string& key, const std void AbstractMusicPlayer::unregisterEventHandler(const std::string& string) { threads::MutexLock lock(this->eventLock); for(const auto& entry : this->eventHandlers){ - if(entry.first == string){ + if(entry.first == string) { this->eventHandlers.erase(find_if(this->eventHandlers.begin(), this->eventHandlers.end(), [string](const std::pair>& elm){ return elm.first == string; })); break; } diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 7b9e1db..7cabfc9 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -128,6 +128,8 @@ set(SERVER_SOURCE_FILES src/weblist/WebListManager.cpp src/weblist/TeamSpeakWebClient.cpp + + src/manager/ConversationManager.cpp ) if(COMPILE_WEB_CLIENT) add_definitions(-DCOMPILE_WEB_CLIENT) diff --git a/server/src/InstanceHandler.cpp b/server/src/InstanceHandler.cpp index 795154d..8f36bd2 100644 --- a/server/src/InstanceHandler.cpp +++ b/server/src/InstanceHandler.cpp @@ -226,7 +226,8 @@ inline sockaddr_in* resolveAddress(const string& host, uint16_t port) { } bool InstanceHandler::startInstance() { - if (this->active) return false; + if (this->active) + return false; active = true; this->web_list->enabled = ts::config::server::enable_teamspeak_weblist; @@ -236,6 +237,12 @@ bool InstanceHandler::startInstance() { return false; } + this->conversation_io = make_shared("conv io #"); + if(!this->conversation_io->initialize(1)) { //TODO: Make the conversation IO loop thread size configurable + logCritical(LOG_GENERAL, "Failed to initialize conversation io write loop"); + return false; + } + //Startup file server sockaddr_in *fAddr = resolveAddress(this->properties()[property::SERVERINSTANCE_FILETRANSFER_HOST].as(), this->properties()[property::SERVERINSTANCE_FILETRANSFER_PORT].as()); if (!fAddr) { diff --git a/server/src/InstanceHandler.h b/server/src/InstanceHandler.h index 0f63226..04b5ff9 100644 --- a/server/src/InstanceHandler.h +++ b/server/src/InstanceHandler.h @@ -62,6 +62,7 @@ namespace ts { std::shared_ptr getDefaultServerProperties() { return this->default_server_properties; } std::shared_ptr getWebIoLoop() { return this->web_event_loop; } std::shared_ptr getWebList() { return this->web_list; } + std::shared_ptr getConversationIo() { return this->conversation_io; } private: std::mutex activeLock; std::condition_variable activeCon; @@ -86,6 +87,7 @@ namespace ts { ts::Properties* _properties = nullptr; + std::shared_ptr conversation_io = nullptr; std::shared_ptr web_event_loop = nullptr; std::shared_ptr web_list = nullptr; diff --git a/server/src/TS3ServerHeartbeat.cpp b/server/src/TS3ServerHeartbeat.cpp index 790d534..86f9ba4 100644 --- a/server/src/TS3ServerHeartbeat.cpp +++ b/server/src/TS3ServerHeartbeat.cpp @@ -4,6 +4,7 @@ #include #include "InstanceHandler.h" #include "TSServer.h" +#include "./manager/ConversationManager.h" using namespace std; using namespace std::chrono; @@ -52,7 +53,7 @@ void TSServer::executeServerTick() { lastTick = system_clock::now(); system_clock::time_point timing_begin, timing_end; - milliseconds timing_update_states, timing_client_tick, timing_channel, timing_statistic, timing_groups; + milliseconds timing_update_states, timing_client_tick, timing_channel, timing_statistic, timing_groups, timing_ccache; auto client_list = this->getClients(); @@ -255,14 +256,26 @@ void TSServer::executeServerTick() { END_TIMINGS(timing_groups); } + { + BEGIN_TIMINGS(); + if(this->conversation_cache_cleanup_timestamp + minutes(15) < system_clock::now()) { + debugMessage(this->serverId, "Cleaning up conversation cache."); + this->_conversation_manager->cleanup_cache(); + conversation_cache_cleanup_timestamp = system_clock::now(); + } + END_TIMINGS(timing_ccache); + } + if(system_clock::now() - lastTick > milliseconds(100)) { //milliseconds timing_update_states, timing_client_tick, timing_channel, timing_statistic; - logError(this->serverId, "Server tick tooks to long ({}ms => Status updates: {}ms Client tick: {}ms, Channel tick: {}ms, Statistic tick: {}ms)", + logError(this->serverId, "Server tick tooks to long ({}ms => Status updates: {}ms Client tick: {}ms, Channel tick: {}ms, Statistic tick: {}ms, Groups: {}ms, Conversation cache: {}ms)", duration_cast(system_clock::now() - lastTick).count(), timing_update_states.count(), timing_client_tick.count(), timing_channel.count(), - timing_statistic.count() + timing_statistic.count(), + timing_groups.count(), + timing_ccache.count() ); } } catch (std::exception& ex) { diff --git a/server/src/TSServer.cpp b/server/src/TSServer.cpp index 28cbc18..6fd76ff 100644 --- a/server/src/TSServer.cpp +++ b/server/src/TSServer.cpp @@ -21,6 +21,7 @@ #include "InstanceHandler.h" #include "Configuration.h" #include "TSServer.h" +#include "src/manager/ConversationManager.h" #include using namespace std; @@ -113,6 +114,9 @@ bool TSServer::initialize(bool test_properties) { properties()[property::VIRTUALSERVER_UNIQUE_IDENTIFIER] = base64::encode(digest::sha1(base64::encode(buffer, bufferLength))); } + this->_conversation_manager = make_shared(this->ref()); + this->_conversation_manager->initialize(this->_conversation_manager); + channelTree = new ServerChannelTree(self.lock(), this->sql); channelTree->loadChannelsFromDatabase(); @@ -218,9 +222,11 @@ bool TSServer::initialize(bool test_properties) { } } - if(this->properties()[property::VIRTUALSERVER_FILEBASE].value().empty()) { + if(this->properties()[property::VIRTUALSERVER_FILEBASE].value().empty()) this->properties()[property::VIRTUALSERVER_FILEBASE] = serverInstance->getFileServer()->server_file_base(self.lock()); - } + + /* lets cleanup the conversations for not existent channels */ + this->_conversation_manager->cleanup_channels(); return true; } @@ -231,6 +237,7 @@ TSServer::~TSServer() { delete this->channelTree; delete this->letters; delete this->complains; + this->_conversation_manager.reset(); if(this->_serverKey) ecc_free(this->_serverKey); delete this->_serverKey; @@ -860,7 +867,7 @@ vector(data); }), server_group_data.end()); + server_group_data.erase(remove_if(server_group_data.begin(), server_group_data.end(), [](auto data) { return !std::get<2>(data); }), server_group_data.end()); logTrace(this->serverId, "[Permission] Found negate flag within server groups. Groups left: {}", server_group_data.size()); sassert(!server_group_data.empty()); /* this should never happen! */ permission::PermissionValue current_lowest = 0; diff --git a/server/src/TSServer.h b/server/src/TSServer.h index 8fc43ba..14da4ca 100644 --- a/server/src/TSServer.h +++ b/server/src/TSServer.h @@ -64,6 +64,10 @@ namespace ts { class WebControlServer; + namespace conversation { + class ConversationManager; + } + struct ServerState { enum value { OFFLINE, @@ -268,6 +272,7 @@ namespace ts { ); inline int voice_encryption_mode() { return this->_voice_encryption_mode; } + inline std::shared_ptr conversation_manager() { return this->_conversation_manager; } protected: bool registerClient(std::shared_ptr); bool unregisterClient(std::shared_ptr, std::string, std::unique_lock& channel_tree_lock); @@ -289,6 +294,7 @@ namespace ts { letter::LetterManager* letters = nullptr; std::shared_ptr musicManager; std::shared_ptr serverStatistics; + std::shared_ptr _conversation_manager; sql::SqlManager* sql; @@ -296,6 +302,7 @@ namespace ts { std::chrono::system_clock::time_point startTimestamp; std::chrono::system_clock::time_point fileStatisticsTimestamp; + std::chrono::system_clock::time_point conversation_cache_cleanup_timestamp; //The client list struct { diff --git a/server/src/channel/ClientChannelView.h b/server/src/channel/ClientChannelView.h index fdc0f84..ea951bd 100644 --- a/server/src/channel/ClientChannelView.h +++ b/server/src/channel/ClientChannelView.h @@ -32,7 +32,6 @@ namespace ts { ChannelId cached_parent_id = 0; }; - typedef std::shared_lock TreeLock; class ClientChannelView : private TreeView { public: enum ChannelAction { diff --git a/server/src/channel/ServerChannel.cpp b/server/src/channel/ServerChannel.cpp index 5ee17a4..0dd1668 100644 --- a/server/src/channel/ServerChannel.cpp +++ b/server/src/channel/ServerChannel.cpp @@ -7,6 +7,7 @@ #include "src/client/ConnectedClient.h" #include "src/server/file/FileServer.h" #include "src/InstanceHandler.h" +#include "../manager/ConversationManager.h" using namespace std; using namespace ts; @@ -530,9 +531,10 @@ void ServerChannelTree::on_channel_entry_deleted(const shared_ptr BasicChannelTree::on_channel_entry_deleted(channel); auto server = this->server.lock(); - if(server) + if(server) { server->getGroupManager()->handleChannelDeleted(channel); - else + server->conversation_manager()->delete_conversation(channel->channelId()); + } else serverInstance->getGroupManager()->handleChannelDeleted(channel); diff --git a/server/src/client/ConnectedClient.cpp b/server/src/client/ConnectedClient.cpp index 4685bb4..3936660 100644 --- a/server/src/client/ConnectedClient.cpp +++ b/server/src/client/ConnectedClient.cpp @@ -112,12 +112,12 @@ void ConnectedClient::updateChannelClientProperties(bool lock_channel_tree, bool deque notifyList; debugMessage(this->getServerId(), "{} Got a channel talk power of {} Talk power set is {}", CLIENT_STR_LOG_PREFIX, permission_talk_power, this->properties()[property::CLIENT_TALK_POWER].as()); - if(permission_talk_power != this->properties()[property::CLIENT_TALK_POWER].as() && this->currentChannel) { //We do not have to update tp if there's no channel + if(permission_talk_power != this->properties()[property::CLIENT_TALK_POWER].as()) { //We do not have to update tp if there's no channel this->properties()[property::CLIENT_TALK_POWER] = permission_talk_power; notifyList.emplace_back(property::CLIENT_TALK_POWER); auto update = this->properties()[property::CLIENT_IS_TALKER].as() || this->properties()[property::CLIENT_TALK_REQUEST].as() > 0; - if(update) { + if(update && this->currentChannel) { if(this->currentChannel->talk_power_granted({permission_talk_power, permission_talk_power != permNotGranted})) { this->properties()[property::CLIENT_IS_TALKER] = 0; this->properties()[property::CLIENT_TALK_REQUEST] = 0; diff --git a/server/src/client/ConnectedClient.h b/server/src/client/ConnectedClient.h index 010745f..a2ad32e 100644 --- a/server/src/client/ConnectedClient.h +++ b/server/src/client/ConnectedClient.h @@ -583,6 +583,9 @@ namespace ts { CommandResult handleCommandQueryDelete(Command&); CommandResult handleCommandQueryChangePassword(Command&); + CommandResult handleCommandConversationHistory(Command&); + CommandResult handleCommandConversationFetch(Command&); + CommandResult handleCommandLogView(Command&); //CMD_TODO handleCommandLogAdd diff --git a/server/src/client/ConnectedClientCommandHandler.cpp b/server/src/client/ConnectedClientCommandHandler.cpp index 7ee1e55..3329ac3 100644 --- a/server/src/client/ConnectedClientCommandHandler.cpp +++ b/server/src/client/ConnectedClientCommandHandler.cpp @@ -17,6 +17,7 @@ #include "music/MusicClient.h" #include "query/QueryClient.h" #include "../weblist/WebListManager.h" +#include "../manager/ConversationManager.h" #include #include #include @@ -175,7 +176,7 @@ CommandResult ConnectedClient::handleCommand(Command &cmd) { else if (command == "permissionlist") return this->handleCommandPermissionList(cmd); else if (command == "propertylist") return this->handleCommandPropertyList(cmd); - //Server group + //Server group else if (command == "servergrouplist") return this->handleCommandServerGroupList(cmd); else if (command == "servergroupadd") return this->handleCommandServerGroupAdd(cmd); else if (command == "servergroupcopy") return this->handleCommandServerGroupCopy(cmd); @@ -190,19 +191,19 @@ CommandResult ConnectedClient::handleCommand(Command &cmd) { else if (command == "setclientchannelgroup") return this->handleCommandSetClientChannelGroup(cmd); - //Channel basic actions + //Channel basic actions else if (command == "channelcreate") return this->handleCommandChannelCreate(cmd); else if (command == "channelmove") return this->handleCommandChannelMove(cmd); else if (command == "channeledit") return this->handleCommandChannelEdit(cmd); else if (command == "channeldelete") return this->handleCommandChannelDelete(cmd); - //Find a channel and get informations + //Find a channel and get informations else if (command == "channelfind") return this->handleCommandChannelFind(cmd); else if (command == "channelinfo") return this->handleCommandChannelInfo(cmd); - //Channel perm actions + //Channel perm actions else if (command == "channelpermlist") return this->handleCommandChannelPermList(cmd); else if (command == "channeladdperm") return this->handleCommandChannelAddPerm(cmd); else if (command == "channeldelperm") return this->handleCommandChannelDelPerm(cmd); - //Channel group actions + //Channel group actions else if (command == "channelgroupadd") return this->handleCommandChannelGroupAdd(cmd); else if (command == "channelgroupcopy") return this->handleCommandChannelGroupCopy(cmd); else if (command == "channelgrouprename") return this->handleCommandChannelGroupRename(cmd); @@ -212,16 +213,16 @@ CommandResult ConnectedClient::handleCommand(Command &cmd) { else if (command == "channelgrouppermlist") return this->handleCommandChannelGroupPermList(cmd); else if (command == "channelgroupaddperm") return this->handleCommandChannelGroupAddPerm(cmd); else if (command == "channelgroupdelperm") return this->handleCommandChannelGroupDelPerm(cmd); - //Channel sub/unsubscribe + //Channel sub/unsubscribe else if (command == "channelsubscribe") return this->handleCommandChannelSubscribe(cmd); else if (command == "channelsubscribeall") return this->handleCommandChannelSubscribeAll(cmd); else if (command == "channelunsubscribe") return this->handleCommandChannelUnsubscribe(cmd); else if (command == "channelunsubscribeall") return this->handleCommandChannelUnsubscribeAll(cmd); - //manager channel permissions + //manager channel permissions else if (command == "channelclientpermlist") return this->handleCommandChannelClientPermList(cmd); else if (command == "channelclientaddperm") return this->handleCommandChannelClientAddPerm(cmd); else if (command == "channelclientdelperm") return this->handleCommandChannelClientDelPerm(cmd); - //Client actions + //Client actions else if (command == "clientupdate") return this->handleCommandClientUpdate(cmd); else if (command == "clientmove") return this->handleCommandClientMove(cmd); else if (command == "clientgetids") return this->handleCommandClientGetIds(cmd); @@ -237,14 +238,14 @@ CommandResult ConnectedClient::handleCommand(Command &cmd) { else if (command == "clientaddperm") return this->handleCommandClientAddPerm(cmd); else if (command == "clientdelperm") return this->handleCommandClientDelPerm(cmd); else if (command == "clientpermlist") return this->handleCommandClientPermList(cmd); - //File transfare + //File transfare else if (command == "ftgetfilelist") return this->handleCommandFTGetFileList(cmd); else if (command == "ftcreatedir") return this->handleCommandFTCreateDir(cmd); else if (command == "ftdeletefile") return this->handleCommandFTDeleteFile(cmd); else if (command == "ftinitupload") return this->handleCommandFTInitUpload(cmd); else if (command == "ftinitdownload") return this->handleCommandFTInitDownload(cmd); else if (command == "ftgetfileinfo") return this->handleCommandFTGetFileInfo(cmd); - //Banlist + //Banlist else if (command == "banlist") return this->handleCommandBanList(cmd); else if (command == "banadd") return this->handleCommandBanAdd(cmd); else if (command == "banedit") return this->handleCommandBanEdit(cmd); @@ -252,13 +253,13 @@ CommandResult ConnectedClient::handleCommand(Command &cmd) { else if (command == "bandel") return this->handleCommandBanDel(cmd); else if (command == "bandelall") return this->handleCommandBanDelAll(cmd); else if (command == "bantriggerlist") return this->handleCommandBanTriggerList(cmd); - //Tokens + //Tokens else if (command == "tokenlist" || command == "privilegekeylist") return this->handleCommandTokenList(cmd); else if (command == "tokenadd" || command == "privilegekeyadd") return this->handleCommandTokenAdd(cmd); else if (command == "tokenuse" || command == "privilegekeyuse") return this->handleCommandTokenUse(cmd); else if (command == "tokendelete" || command == "privilegekeydelete") return this->handleCommandTokenDelete(cmd); - //DB stuff + //DB stuff else if (command == "clientdblist") return this->handleCommandClientDbList(cmd); else if (command == "clientdbinfo") return this->handleCommandClientDbInfo(cmd); else if (command == "clientdbedit") return this->handleCommandClientDBEdit(cmd); @@ -345,6 +346,8 @@ CommandResult ConnectedClient::handleCommand(Command &cmd) { else if (command == "playlistsongremove") return this->handleCommandPlaylistSongRemove(cmd); else if (command == "dummy_ipchange") return this->handleCommandDummy_IpChange(cmd); + else if (command == "conversationhistory") return this->handleCommandConversationHistory(cmd); + else if (command == "conversationfetch") return this->handleCommandConversationFetch(cmd); if (this->getType() == ClientType::CLIENT_QUERY) return CommandResult::NotImplemented; //Dont log query invalid commands if (this->getType() == ClientType::CLIENT_TEAMSPEAK) @@ -1162,8 +1165,8 @@ CommandResult ConnectedClient::handleCommandChannelGroupAddPerm(Command &cmd) { permission::v2::PermissionUpdateType::set_value, permission::v2::PermissionUpdateType::do_nothing, - cmd[index]["permnegated"].as() ? 1 : 0, - cmd[index]["permskip"].as() ? 1 : 0 + cmd[index]["permskip"].as() ? 1 : 0, + cmd[index]["permnegated"].as() ? 1 : 0 ); updateList |= permission_is_group_property(permType); } @@ -2328,8 +2331,8 @@ CommandResult ConnectedClient::handleCommandChannelAddPerm(Command &cmd) { permission::v2::PermissionUpdateType::set_value, permission::v2::PermissionUpdateType::do_nothing, - cmd[index]["permnegated"].as() ? 1 : 0, - cmd[index]["permskip"].as() ? 1 : 0 + cmd[index]["permskip"].as() ? 1 : 0, + cmd[index]["permnegated"].as() ? 1 : 0 ); updateClients |= permission_is_client_property(permType); update_view |= permType == permission::i_channel_needed_view_power; @@ -2813,8 +2816,8 @@ CommandResult ConnectedClient::handleCommandServerGroupAddPerm(Command &cmd) { permission::v2::PermissionUpdateType::set_value, permission::v2::PermissionUpdateType::do_nothing, - cmd[index]["permnegated"].as() ? 1 : 0, - cmd[index]["permskip"].as() ? 1 : 0 + cmd[index]["permskip"].as() ? 1 : 0, + cmd[index]["permnegated"].as() ? 1 : 0 ); sgroupUpdate |= permission_is_group_property(permType); checkTp |= permission_is_client_property(permType); @@ -3210,6 +3213,10 @@ CommandResult ConnectedClient::handleCommandSendTextMessage(Command &cmd) { if(this->handleTextMessage(ChatMessageMode::TEXTMODE_CHANNEL, cmd["msg"], nullptr)) return CommandResult::Success; for (auto &cl : this->server->getClientsByChannel(this->currentChannel)) cl->notifyTextMessage(ChatMessageMode::TEXTMODE_CHANNEL, _this.lock(), this->getClientId(), cmd["msg"].string()); + + auto conversations = this->server->conversation_manager(); + auto conversation = conversations->get_or_create(this->currentChannel->channelId()); + conversation->register_message(this->getClientDatabaseId(), this->getUid(), this->getDisplayName(), cmd["msg"].string()); } else if (cmd["targetmode"] == ChatMessageMode::TEXTMODE_SERVER) { CACHED_PERM_CHECK(permission::b_client_server_textmessage_send, 1); @@ -7200,7 +7207,189 @@ CommandResult ConnectedClient::handleCommandDummy_IpChange(ts::Command &cmd) { return CommandResult::Success; } - +//conversationhistory cid=1 [cpw=xxx] [timestamp_begin] [timestamp_end (0 := no end)] [message_count (default 25| max 100)] [-merge] +CommandResult ConnectedClient::handleCommandConversationHistory(ts::Command &command) { + CMD_REF_SERVER(ref_server); + CMD_CHK_AND_INC_FLOOD_POINTS(25); + + if(!command[0].has("cid") || !command[0]["cid"].castable()) + return {findError("conversation_invalid_id")}; + + auto conversation_id = command[0]["cid"].as(); + /* test if we have access to the conversation */ + { + /* test if we're able to see the channel */ + { + shared_lock channel_view_lock(this->channel_lock); + auto channel = this->channel_view()->find_channel(conversation_id); + if(!channel) + return {findError("conversation_invalid_id")}; + } + + /* test if there is a channel password or join power which denies that we see the conversation */ + { + shared_lock channel_view_lock(ref_server->channel_tree_lock); + auto channel = ref_server->getChannelTree()->findChannel(conversation_id); + if(!channel) /* should never happen! */ + return {findError("conversation_invalid_id")}; + + if(!command[0].has("cpw")) + command[0]["cpw"] = ""; + + if (!channel->passwordMatch(command["cpw"], true)) + if (!this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_channel_join_ignore_password, 1, channel, true)) + return {findError("channel_invalid_password"), "invalid password"}; + + if(!this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_channel_ignore_join_power, 1, channel, true)) { + CHANNEL_PERMISSION_TEST(permission::i_channel_join_power, permission::i_channel_needed_join_power, channel, false); + } + } + } + + auto conversation_manager = ref_server->conversation_manager(); + auto conversation = conversation_manager->get(conversation_id); + if(!conversation) + return {ErrorType::DBEmpty}; + + system_clock::time_point timestamp_begin = system_clock::now(); + system_clock::time_point timestamp_end; + size_t message_count = 25; + + if(command[0].has("timestamp_begin")) + timestamp_begin = system_clock::time_point{} + milliseconds(command[0]["timestamp_begin"].as()); + + if(command[0].has("timestamp_end")) + timestamp_end = system_clock::time_point{} + milliseconds(command[0]["timestamp_end"].as()); + + if(command[0].has("message_count")) + message_count = command[0]["message_count"].as(); + + if(timestamp_begin < timestamp_end) + return {findError("parameter_invalid")}; + if(message_count > 100) + message_count = 100; + + auto messages = conversation->message_history(timestamp_begin, message_count + 1, timestamp_end); /* query one more to test for more data */ + if(messages.empty()) + return {ErrorType::DBEmpty}; + bool more_data = messages.size() > message_count; + if(more_data) + messages.pop_back(); + + Command notify(this->notify_response_command("notifyconversationhistory")); + size_t index = 0; + size_t length = 0; + bool merge = command.hasParm("merge"); + for(auto& message : messages) { + if(index == 0) + notify[index]["cid"] = conversation_id; + + notify[index]["timestamp"] = duration_cast(message->message_timestamp.time_since_epoch()).count(); + notify[index]["sender_database_id"] = message->sender_database_id; + notify[index]["sender_unique_id"] = message->sender_unique_id; + notify[index]["sender_name"] = message->sender_name; + + notify[index]["msg"] = message->message; + length += message->message.size(); + length += message->sender_name.size(); + length += message->sender_unique_id.size(); + if(length > 1024 * 8 || !merge) { + index = 0; + this->sendCommand(notify); + notify = Command{this->notify_response_command("notifyconversationhistory")}; + } else + index++; + } + if(index > 0) + this->sendCommand(notify); + + if(more_data) + return {findError("conversation_more_data")}; + + return CommandResult::Success; +} + +CommandResult ConnectedClient::handleCommandConversationFetch(ts::Command &cmd) { + CMD_REF_SERVER(ref_server); + CMD_CHK_AND_INC_FLOOD_POINTS(25); + + + Command result(this->notify_response_command("notifyconversationindex")); + size_t result_index = 0; + + auto conversation_manager = ref_server->conversation_manager(); + for(size_t index = 0; index < cmd.bulkCount(); index++) { + auto& bulk = cmd[index]; + + if(!bulk.has("cid") || !bulk["cid"].castable()) + continue; + auto conversation_id = bulk["cid"].as(); + + auto& result_bulk = result[result_index++]; + result_bulk["cid"] = conversation_id; + + /* test if we have access to the conversation */ + { + /* test if we're able to see the channel */ + { + shared_lock channel_view_lock(this->channel_lock); + auto channel = this->channel_view()->find_channel(conversation_id); + if(!channel) { + auto error = findError("conversation_invalid_id"); + result_bulk["error_id"] = error.errorId; + result_bulk["error_msg"] = error.message; + continue; + } + } + + /* test if there is a channel password or join power which denies that we see the conversation */ + { + shared_lock channel_view_lock(ref_server->channel_tree_lock); + auto channel = ref_server->getChannelTree()->findChannel(conversation_id); + if(!channel) { /* should never happen! */ + auto error = findError("conversation_invalid_id"); + result_bulk["error_id"] = error.errorId; + result_bulk["error_msg"] = error.message; + continue; + } + + if(!bulk.has("cpw")) + bulk["cpw"] = ""; + + if (!channel->passwordMatch(bulk["cpw"], true)) + if (!this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_channel_join_ignore_password, 1, channel, true)) { + auto error = findError("channel_invalid_password"); + result_bulk["error_id"] = error.errorId; + result_bulk["error_msg"] = error.message; + continue; + } + + if(!this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_channel_ignore_join_power, 1, channel, true)) { + auto permission_granted = this->calculate_permission_value(permission::i_channel_join_power, channel->channelId()); + if(!channel->permission_granted(permission::i_channel_needed_join_power, permission_granted, false)) { + auto error = findError("server_insufficeient_permissions"); + result_bulk["error_id"] = error.errorId; + result_bulk["error_msg"] = error.message; + result_bulk["failed_permid"] = permission::i_channel_join_power; + continue; + } + } + } + } + + auto conversation = conversation_manager->get(conversation_id); + if(conversation) + result_bulk["timestamp"] = duration_cast(conversation->last_message().time_since_epoch()).count(); + else + result_bulk["timestamp"] = 0; + } + if(result_index == 0) + return {ErrorType::DBEmpty}; + this->sendCommand(result); + + + return CommandResult::Success; +} diff --git a/server/src/client/ConnectedClientTextCommandHandler.cpp b/server/src/client/ConnectedClientTextCommandHandler.cpp index ca68ef2..e60837f 100644 --- a/server/src/client/ConnectedClientTextCommandHandler.cpp +++ b/server/src/client/ConnectedClientTextCommandHandler.cpp @@ -1,11 +1,14 @@ #include -#include -#include -#include -#include +#include #include -#include "ConnectedClient.h" -#include "src/client/voice/VoiceClient.h" +#include + +#include "../InstanceHandler.h" +#include "../manager/ConversationManager.h" +#include "../music/MusicBotManager.h" +#include "../client/music/MusicClient.h" +#include "../client/voice/VoiceClient.h" +#include "./ConnectedClient.h" using namespace ts; using namespace ts::server; @@ -82,6 +85,8 @@ inline std::string bot_volume(float vol) { bool ConnectedClient::handleTextMessage(ChatMessageMode mode, std::string text, const std::shared_ptr& target) { if (text.length() < ts::config::music::command_prefix.length()) return false; if (text.find(ts::config::music::command_prefix) != 0) return false; + if(!this->currentChannel) + return false; std::string command = text.substr(ts::config::music::command_prefix.length()); auto arguments = command.find(' ') != -1 ? split(command.substr(command.find(' ') + 1), " ") : deque{}; @@ -669,6 +674,40 @@ bool ConnectedClient::handle_text_command( threads::self::sleep_until(end); send_message(_this.lock(), "Done!"); return true; + } else if(command == "conversation") { + if(TARG(0, "history")) { + system_clock::time_point timestamp_begin = system_clock::now(); + system_clock::time_point timestamp_end; + size_t message_count = 100; + + if(arguments.size() > 1) { + timestamp_begin -= seconds(stoll(arguments[1])); + } + if(arguments.size() > 2) { + timestamp_end = system_clock::now() - seconds(stoll(arguments[2])); + } + if(arguments.size() > 3) { + message_count = stoll(arguments[3]); + } + + auto time_str = [](const system_clock::time_point& tp) { + using system_clock_duration = std::chrono::system_clock::duration; + auto converted_timep = std::chrono::time_point_cast(tp); + auto seconds_since_epoch = std::chrono::system_clock::to_time_t(tp); + + ostringstream os; + os << std::put_time(std::localtime(&seconds_since_epoch), "%Y %b %d %H:%M:%S"); + return os.str(); + }; + send_message(_this.lock(), "Looking up history from " + time_str(timestamp_end) + " to " + time_str(timestamp_begin) + ". Max messages: " + to_string(message_count)); + auto conversation = this->server->conversation_manager()->get_or_create(this->currentChannel->channelId()); + auto data = conversation->message_history(timestamp_begin, message_count, timestamp_end); + send_message(_this.lock(), "Entries: " + to_string(data.size())); + for(auto& entry : data) { + send_message(_this.lock(), "<" + time_str(entry->message_timestamp) + ">" + entry->sender_name + ": " + entry->message); + } + return true; + } } send_message(serverInstance->musicRoot(), "Invalid channel command."); diff --git a/server/src/client/query/QueryClient.h b/server/src/client/query/QueryClient.h index d215232..d430c7c 100644 --- a/server/src/client/query/QueryClient.h +++ b/server/src/client/query/QueryClient.h @@ -38,8 +38,6 @@ namespace ts { bool ignoresFlood() override; inline std::shared_ptr getQueryAccount() { return this->query_account; } - - std::shared_recursive_mutex server_lock; protected: void preInitialize(); void postInitialize(); diff --git a/server/src/client/query/QueryClientCommands.cpp b/server/src/client/query/QueryClientCommands.cpp index cc661b4..69efa71 100644 --- a/server/src/client/query/QueryClientCommands.cpp +++ b/server/src/client/query/QueryClientCommands.cpp @@ -318,7 +318,6 @@ CommandResult QueryClient::handleCommandServerSelect(Command &cmd) { //register at current server { - unique_lock server_lock(this->server_lock); this->server = target; } diff --git a/server/src/manager/ConversationManager.cpp b/server/src/manager/ConversationManager.cpp new file mode 100644 index 0000000..e4e6e6f --- /dev/null +++ b/server/src/manager/ConversationManager.cpp @@ -0,0 +1,975 @@ +#include "./ConversationManager.h" +#include "../InstanceHandler.h" +#include "../TSServer.h" + +#include +#include +#include + +/* for the file */ +#include +#include + +using namespace std; +using namespace std::chrono; +using namespace ts; +using namespace ts::server; +using namespace ts::server::conversation; + +namespace fs = std::experimental::filesystem; + +Conversation::Conversation(const std::shared_ptr &handle, ts::ChannelId channel_id, const std::string& file) : _ref_handle(handle), _channel_id(channel_id), file_name(file) { } + +Conversation::~Conversation() { + this->finalize(); +} + +bool Conversation::initialize(std::string& error) { + auto ref_self = this->_ref_self.lock(); + assert(ref_self); + + auto handle = this->_ref_handle.lock(); + assert(handle); + + auto ref_server = handle->ref_server(); + if(!ref_server) { + error = "invalid server ref"; + return false; + } + + auto file = fs::u8path(this->file_name); + if(!fs::is_directory(file.parent_path())) { + debugMessage(ref_server->getServerId(), "[Conversations] Creating conversation containing directory {}", file.parent_path().string()); + try { + fs::create_directories(file.parent_path()); + } catch(fs::filesystem_error& ex) { + error = "failed to create data directories (" + to_string(ex.code().value()) + "|" + string(ex.what()) + ")"; + return false; + } + } + + this->file_handle = fopen(this->file_name.c_str(), fs::exists(file) ? "r+" : "w+"); + if(!this->file_handle) { + error = "failed to open file"; + return false; + } + setbuf(this->file_handle, nullptr); /* we're doing random access (a buffer is useless here) */ + + auto sql = ref_server->getSql(); + + auto result = sql::command(sql, "SELECT `begin_timestamp`, `end_timestamp`, `block_offset`, `flags` FROM `conversation_blocks` WHERE `server_id` = :sid AND `conversation_id` = :cid", + variable{":sid", ref_server->getServerId()}, variable{":cid", this->_channel_id}).query([&](int count, std::string* values, std::string* names) { + std::chrono::system_clock::time_point begin_timestamp{}, end_timestamp{}; + uint64_t block_offset = 0; + uint16_t flags = 0; + + try { + for(int index = 0; index < count; index++) { + if(names[index] == "begin_timestamp") + begin_timestamp += milliseconds(stoll(values[index])); + else if(names[index] == "end_timestamp") + end_timestamp += milliseconds(stoll(values[index])); + else if(names[index] == "block_offset") + block_offset = stoull(values[index]); + else if(names[index] == "flags") + flags = (uint16_t) stoll(values[index]); + } + } catch(std::exception& ex) { + logError(ref_server->getServerId(), "[Conversations] Failed to parse conversation block entry! Exception: {}", ex.what()); + return 0; + } + + auto block = make_shared(db::MessageBlock{ + begin_timestamp, + end_timestamp, + + block_offset, + {flags}, + + nullptr, + nullptr + }); + + /* we dont load invalid blocks */ + if(block->flag_invalid) + return 0; + + this->message_blocks.push_back(block); + return 0; + }); + LOG_SQL_CMD(result); + + /* lets find the last block */ + if(!this->message_blocks.empty()) { + debugMessage(ref_server->getServerId(), "[Conversations][{}] Loaded {} blocks. Trying to find last block.", this->_channel_id, this->message_blocks.size()); + deque> open_blocks; + for(auto& block : this->message_blocks) + if(!block->flag_finished) + open_blocks.push_back(block); + + logTrace(ref_server->getServerId(), "[Conversations][{}] Found {} unfinished blocks. Searching for the \"latest\" and closing all previous blocks.", this->_channel_id, open_blocks.size()); + shared_ptr latest_block; + const auto calculate_latest_block = [&](bool open_only) { + latest_block = nullptr; + for(auto& block : open_blocks) { + if(block->flag_invalid) + continue; + + if(!latest_block || latest_block->begin_timestamp < block->begin_timestamp) { + if(latest_block) { + latest_block->flag_finished_later = true; + latest_block->flag_finished = true; + this->db_save_block(latest_block); + } + + latest_block = block; + } + } + }; + calculate_latest_block(true); + + if(latest_block) { + logTrace(ref_server->getServerId(), "[Conversations][{}] Found a latest block at index {}. Verify block with file.", this->_channel_id, latest_block->block_offset); + + const auto verify_block = [&] { + auto result = fseek(this->file_handle, 0, SEEK_END); + if(result != 0) { + error = "failed to seek to the end (" + to_string(result) + " | " + to_string(errno) + ")"; + return; + } + + auto file_size = ftell(this->file_handle); + if(file_size < 0) { + error = "failed to tell the end position (" + to_string(file_size) + " | " + to_string(errno) + ")"; + return; + } + logTrace(ref_server->getServerId(), "[Conversations][{}] File total size {}, last block index {}", this->_channel_id, file_size, latest_block->block_offset); + if(file_size < (latest_block->block_offset + sizeof(fio::BlockHeader))) { + latest_block->flag_finished_later = true; + latest_block->flag_invalid = true; + this->finish_block(latest_block, false); + + logTrace(ref_server->getServerId(), "[Conversations][{}] File total size is less than block requires. Appending a new block at the end of the file.", this->_channel_id, latest_block->block_offset); + latest_block = nullptr; + return; + } + + if(!this->load_message_block_header(latest_block, error)) { + latest_block->flag_finished_later = true; + latest_block->flag_invalid = true; + this->finish_block(latest_block, false); + + logTrace(ref_server->getServerId(), "[Conversations][{}] Failed to load latest block at file index {}: {}. Appending an new one.", this->_channel_id, latest_block->block_offset, error); + error = ""; + latest_block = nullptr; + return; + } + + /* We've a valid last block. Now the general write function could decide if we want a new block. */ + this->last_message_block = latest_block; + logTrace(ref_server->getServerId(), "[Conversations][{}] Last db saved block valid. Reusing it.", this->_channel_id, latest_block->block_offset, error); + }; + verify_block(); + if(!error.empty()) { + latest_block->flag_finished_later = true; + latest_block->flag_invalid = true; + this->finish_block(latest_block, false); + + logError(ref_server->getServerId(), "[Conversations][{}] Could not verify last block. Appending a new one at the end of the file.", this->_channel_id); + latest_block = nullptr; + } + error = ""; + } else { + logTrace(ref_server->getServerId(), "[Conversations][{}] Found no open last block. Using a new one.", this->_channel_id); + } + } else { + debugMessage(ref_server->getServerId(), "[Conversations][{}] Found no blocks. Creating new at the end of the file.", this->_channel_id, this->message_blocks.size()); + } + + std::stable_sort(this->message_blocks.begin(), this->message_blocks.end(), [](const shared_ptr& a, const shared_ptr& b) { return a->begin_timestamp < b->begin_timestamp; }); + this->_write_event = make_shared>(ts::event::ProxiedEventEntry(ref_self, &Conversation::process_write_queue)); + + /* set the last message timestamp property */ + { + auto last_message = this->message_history(1); + if(!last_message.empty()) + this->_last_message_timestamp = last_message.back()->message_timestamp; + else + this->_last_message_timestamp = system_clock::time_point{}; + } + return true; +} + +void Conversation::finalize() { + this->_write_event = nullptr; /* we dont want to schedule/execute new events! */ + this->_write_loop_lock.lock(); /* wait until current write proceed */ + this->_write_loop_lock.unlock(); + + //TODO: May flush? + + { + lock_guard lock(this->message_block_lock); + this->message_blocks.clear(); + } +} + +void Conversation::cleanup_cache() { + //FIXME: Implement this shit here! +} + +ssize_t Conversation::fread(void *target, size_t length, ssize_t index) { + if(length == 0) + return 0; + + lock_guard file_lock(this->file_handle_lock); + if(index >= 0) { + auto result = fseek(this->file_handle, index, SEEK_SET); + if(result < 0) + return -2; + } + + size_t total_read = 0; + while(total_read < length) { + auto read = ::fread_unlocked((char*) target + total_read, 1, length - total_read, this->file_handle); + if(read <= 0) + return -1; + total_read += read; + } + return total_read; +} + +ssize_t Conversation::fwrite(void *target, size_t length, ssize_t index, bool extend_file) { + if(length == 0) + return 0; + + extend_file = false; /* fseek does the job good ad well */ + lock_guard file_lock(this->file_handle_lock); + if(index >= 0) { + auto result = extend_file ? lseek(fileno(this->file_handle), index, SEEK_SET) : fseek(this->file_handle, index, SEEK_SET); + if(result < 0) + return -2; + } + + size_t total_written = 0; + while(total_written < length) { + auto written = ::fwrite_unlocked((char*) target + total_written, 1, length - total_written, this->file_handle); + if(written <= 0) + return -1; + total_written += written; + } + return total_written; +} + +bool Conversation::load_message_block_header(const std::shared_ptr &block, std::string &error) { + if(block->block_header) + return true; + + auto block_header = make_unique(); + if(this->fread(&*block_header, sizeof(*block_header), block->block_offset) != sizeof(*block_header)) { + error = "failed to read block header"; + return false; + } + if(block_header->version != 1) { + error = "block version missmatch (block version: " + to_string(block_header->version) + ", current version: 1)"; + return false; + } + + if(block_header->cookie != fio::BlockHeader::HEADER_COOKIE) { + error = "block cookie missmatch"; + return false; + } + + block->block_header = move(block_header); + return true; +} + +bool Conversation::load_message_block_index(const std::shared_ptr &block, std::string& error) { + if(block->indexed_block) + return true; + + auto index = make_shared(); + index->successfully = false; + { + if(!this->load_message_block_header(block, error)) { + error = "failed to load block header: " + error; + return false; + } + auto block_header = block->block_header; + if(!block_header) { + error = "failed to reference block header "; + return false; + } + + if(block_header->block_size > fio::BlockHeader::MAX_BLOCK_SIZE) { + error = "block contains too many messages (" + to_string(block_header->block_size) + ")"; + return false; + } + + size_t offset = block->block_offset; + offset += sizeof(fio::BlockHeader); + size_t max_offset = block->block_offset + block_header->block_size; /* block_size := Written size, must be smaller or equal to the max size, except max size is 0 */ + + fio::MessageHeader header{}; + while(offset < max_offset) { + if(this->fread(&header, sizeof(header), offset) != sizeof(header)) { + error = "failed to read message header at index" + to_string(offset); + return false; + } + if(header.cookie != fio::MessageHeader::HEADER_COOKIE) { + error = "failed to verify message header cookie at index " + to_string(offset); + return false; + } + index->message_index.emplace_back(fio::IndexedMessage{(uint32_t) (offset - block->block_offset), system_clock::time_point{} + milliseconds{header.message_timestamp}, std::shared_ptr{nullptr}}); + offset += header.total_length; + } + } + block->indexed_block = index; + return true; +} + +bool Conversation::load_messages(const std::shared_ptr &block, size_t index, size_t end_index, std::string &error) { + if(!this->load_message_block_index(block, error)) { + error = "failed to index block: " + error; + return false; + } + + auto indexed_block = block->indexed_block; + auto header = block->block_header; + if(!indexed_block || !header) { + error = "failed to reference required data"; + return false; + } + + /* Note: We dont lock message_index_lock here because the write thread only increases the list and dont take stuff away where we could pointing at! */ + if(index >= indexed_block->message_index.size()) + return true; + + auto result = fseek(this->file_handle, block->block_offset + indexed_block->message_index[index].offset, SEEK_SET); + if(result == EINVAL) { + error = "failed to seek to begin of an indexed block read"; + return false; + } + while(index < end_index && index < indexed_block->message_index.size()) { + auto& message_data = indexed_block->message_index[index]; + if(message_data.message_data) { + index++; + continue; + } + + auto data = make_shared(); + if(this->fread(&data->header, sizeof(data->header), -1) != sizeof(data->header)) { + error = "failed to read message header at index " + to_string(index); + return false; + } + + if(data->header.cookie != fio::MessageHeader::HEADER_COOKIE) { + error = "failed to verify message header at " + to_string(index); + return false; + } + + data->sender_unique_id.resize(data->header.sender_unique_id_length); + data->sender_name.resize(data->header.sender_name_length); + data->message.resize(data->header.message_length); + + if(this->fread(data->sender_unique_id.data(), data->sender_unique_id.length(), -1) != data->sender_unique_id.length()) { + error = "failed to read message sender unique id at " + to_string(index); + return false; + } + + if(this->fread(data->sender_name.data(), data->sender_name.length(), -1) != data->sender_name.length()) { + error = "failed to read message sender name id at " + to_string(index); + return false; + } + + if(this->fread(data->message.data(), data->message.length(), -1) != data->message.length()) { + error = "failed to read message id at " + to_string(index); + return false; + } + + if(header->message_encrypted) { + uint64_t crypt_key = block->block_offset ^ data->header.message_timestamp; + size_t length_left = data->message.size(); + auto ptr = (char*) data->message.data(); + while(length_left >= 8) { + crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left; + *(uint64_t*) ptr ^= crypt_key; + ptr += 8; + length_left -= 8; + } + while(length_left > 0) { + crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left; + *ptr ^= (uint8_t) crypt_key; + length_left--; + ptr++; + } + } + + message_data.message_data = data; + index++; + } + + return true; +} + +void Conversation::finish_block(const std::shared_ptr &block, bool write_file) { + if(block->flag_finished) + return; + + auto handle = this->_ref_handle.lock(); + sassert(handle); + if(!handle) + return; + + auto ref_server = handle->ref_server(); + if(!ref_server) + return; + + block->flag_finished = true; + + if(write_file) { + string error; + auto result = this->load_message_block_header(block, error); + auto header = block->block_header; + result &= !!header; /* only success if we really have a header */ + if(result) { + if(header->block_max_size == 0) { + header->block_max_size = header->block_size; + header->finished = true; + if(!this->write_block_header(header, block->block_offset, error)) { + logError(ref_server->getServerId(), "Failed to finish block because block header could not be written: {}", error); + block->flag_invalid = true; /* because we cant set the block size we've to declare that block as invalid */ + } + } + } else { + logError(ref_server->getServerId(), "Failed to finish block because block header could not be set: {}", error); + block->flag_invalid = true; /* because we cant set the block size we've to declare that block as invalid */ + } + } else { + block->flag_invalid = true; /* because we dont write the block we cant ensure a valid block */ + } + + this->db_save_block(block); +} + +bool Conversation::write_block_header(const std::shared_ptr &header, size_t index, std::string &error) { + auto code = this->fwrite(&*header, sizeof(fio::BlockHeader), index, false); + if(code == sizeof(fio::BlockHeader)) + return true; + error = "write returned " + to_string(code); + return false; +} + +void Conversation::process_write_queue(const std::chrono::system_clock::time_point &scheduled_time) { + unique_lock write_lock(this->_write_loop_lock, try_to_lock); + if(!write_lock.owns_lock()) /* we're already writing if this lock fails */ + return; + + unique_lock write_queue_lock(this->_write_queue_lock, defer_lock); + std::shared_ptr write_entry; + fio::MessageHeader write_header{}; + std::shared_ptr block_header; + auto handle = this->_ref_handle.lock(); + sassert(handle); + if(!handle) + return; + + auto ref_server = handle->ref_server(); + if(!ref_server) + return; + + write_header.cookie = fio::MessageHeader::HEADER_COOKIE; + write_header.message_flags = 0; + + while(true) { + write_queue_lock.lock(); + if(this->_write_queue.empty()) + break; + write_entry = this->_write_queue.front(); + this->_write_queue.pop_front(); + write_queue_lock.unlock(); + + /* calculate the write message total length */ + write_header.message_length = (uint16_t) min(write_entry->message.size(), (size_t) (65536 - 1)); + write_header.sender_name_length = (uint8_t) min(write_entry->sender_name.size(), (size_t) 255); + write_header.sender_unique_id_length = (uint8_t) min(write_entry->sender_unique_id.size(), (size_t) 255); + write_header.total_length = sizeof(write_header) + write_header.sender_unique_id_length + write_header.sender_name_length + write_header.message_length; + + /* verify last block */ + if(this->last_message_block) { + block_header = this->last_message_block->block_header; + + if(!block_header) { + logError(ref_server->getServerId(), "[Conversations][{}] Current last block contains no header! Try to finish it and creating a new one.", this->_channel_id); + this->finish_block(this->last_message_block, true); + this->last_message_block = nullptr; + } else if(this->last_message_block->flag_finished) + this->last_message_block = nullptr; + else { + if(this->last_message_block->begin_timestamp + hours(24) < scheduled_time) { + debugMessage(ref_server->getServerId(), "[Conversations][{}] Beginning new block. Old block is older than 24hrs. ({})", this->_channel_id, this->last_message_block->begin_timestamp.time_since_epoch().count()); + this->finish_block(this->last_message_block, true); + this->last_message_block = nullptr; + } else if((block_header->block_max_size != 0 && block_header->block_size + write_header.total_length >= block_header->block_max_size) || block_header->block_size > fio::BlockHeader::MAX_BLOCK_SIZE){ + debugMessage(ref_server->getServerId(), "[Conversations][{}] Beginning new block. Old block would exceed his space (Current index: {}, Max index: {}, Soft cap: {}, Message size: {}).", + this->_channel_id, block_header->block_size, block_header->block_max_size, fio::BlockHeader::MAX_BLOCK_SIZE, write_header.total_length); + this->finish_block(this->last_message_block, true); + this->last_message_block = nullptr; + } else if(block_header->message_version != 1){ + debugMessage(ref_server->getServerId(), "[Conversations][{}] Beginning new block. Old block had another message version (Current version: {}, Block version: {}).", this->_channel_id, 1, block_header->message_version); + this->finish_block(this->last_message_block, true); + this->last_message_block = nullptr; + } + } + } + /* test if we have a block or create a new one at the begin of the file */ + if(!this->last_message_block) { + //Note: If we reuse blocks we've to reorder them within message_blocks (newest blocks are at the end) + //TODO: Find "free" blocks and use them! (But do not use indirectly finished blocks, their max size could be invalid) + + unique_lock file_lock(this->file_handle_lock); + auto result = fseek(this->file_handle, 0, SEEK_END); + if(result != 0) { + logError(ref_server->getServerId(), "[Conversations][{}] failed to seek to the end (" + to_string(result) + " " + to_string(errno) + "). Could not create new block. Dropping message!", this->_channel_id); + return; + } + + auto file_size = ftell(this->file_handle); + if(file_size < 0) { + logError(ref_server->getServerId(), "[Conversations][{}] failed to tell the end position (" + to_string(result) + " " + to_string(errno) + "). Could not create new block. Dropping message!", this->_channel_id); + return; + } + file_lock.unlock(); + this->last_message_block = this->db_create_block((uint64_t) file_size); + if(!this->last_message_block) { + logError(ref_server->getServerId(), "[Conversations][{}] Failed to create a new block within database. Dropping message!", this->_channel_id); + return; + } + block_header = make_shared(); + memset(&*block_header, 0, sizeof(fio::BlockHeader)); + + block_header->version = 1; + block_header->message_version = 1; + block_header->cookie = fio::BlockHeader::HEADER_COOKIE; + block_header->first_message_timestamp = (uint64_t) duration_cast(write_entry->message_timestamp.time_since_epoch()).count(); + block_header->block_size = sizeof(fio::BlockHeader); + + //block_header->message_encrypted = true; /* May add some kind of hidden debug option? */ + this->last_message_block->block_header = block_header; + } + + auto entry_offset = this->last_message_block->block_offset + sizeof(fio::BlockHeader) + block_header->last_message_offset; + write_header.sender_database_id = write_entry->sender_database_id; + write_header.message_timestamp = (uint64_t) duration_cast(write_entry->message_timestamp.time_since_epoch()).count(); + block_header->last_message_timestamp = write_header.message_timestamp; + + /* first write the header */ + if(this->fwrite(&write_header, sizeof(write_header), entry_offset, true) != sizeof(write_header)) { + logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message header. Dropping message!", this->_channel_id); + return; + } + entry_offset += sizeof(write_header); + + /* then write the sender unique id */ + if(this->fwrite(write_entry->sender_unique_id.data(), write_header.sender_unique_id_length, entry_offset, true) != write_header.sender_unique_id_length) { + logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message sender unique id. Dropping message!", this->_channel_id); + return; + } + entry_offset += write_header.sender_unique_id_length; + + /* then write the sender name */ + if(this->fwrite(write_entry->sender_name.data(), write_header.sender_name_length, entry_offset, true) != write_header.sender_name_length) { + logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message sender name. Dropping message!", this->_channel_id); + return; + } + entry_offset += write_header.sender_name_length; + + /* then write the message */ + bool message_result; + if(block_header->message_encrypted) { + uint64_t crypt_key = this->last_message_block->block_offset ^ write_header.message_timestamp; + size_t length_left = write_entry->message.size(); + auto ptr = (char*) write_entry->message.data(); + char* target_buffer = (char*) malloc(length_left); + char* target_buffer_ptr = target_buffer; + assert(target_buffer); + + while(length_left >= 8) { + crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left; + *(uint64_t*) target_buffer_ptr = crypt_key; + ptr += 8; + target_buffer_ptr += 8; + length_left -= 8; + } + while(length_left > 0) { + crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left; + *target_buffer_ptr = *ptr ^ (uint8_t) crypt_key; + length_left--; + ptr++; + target_buffer_ptr++; + } + + message_result = this->fwrite(target_buffer, write_header.message_length, entry_offset, true) == write_header.message_length; + free(target_buffer); + } else { + message_result = this->fwrite(write_entry->message.data(), write_header.message_length, entry_offset, true) == write_header.message_length; + } + if(!message_result) { + logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message. Dropping message!", this->_channel_id); + return; + } + entry_offset += write_header.message_length; + + block_header->last_message_offset = (uint32_t) (entry_offset - this->last_message_block->block_offset - sizeof(fio::BlockHeader)); + block_header->block_size += write_header.total_length; + block_header->message_count += 1; + + auto indexed_block = this->last_message_block->indexed_block; + if(indexed_block) { + lock_guard lock(indexed_block->message_index_lock); + indexed_block->message_index.push_back(fio::IndexedMessage{(uint32_t) (entry_offset - this->last_message_block->block_offset), write_entry->message_timestamp, nullptr}); + } + } + + if(write_header.total_length != 0) {/* will be set when at least one message has been written */ + this->db_save_block(this->last_message_block); + + string error; + if(!this->write_block_header(block_header, this->last_message_block->block_offset, error)) { + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to write block header after message write ({}). This could cause data loss!", this->_channel_id, error); + } + } +} + +std::shared_ptr Conversation::db_create_block(uint64_t offset) { + auto result = make_shared(); + result->block_offset = offset; + result->begin_timestamp = system_clock::now(); + result->end_timestamp = system_clock::now(); + result->flags = 0; + result->flag_used = true; + result->flag_invalid = true; /* first set it to invalid for the database so it becomes active as soon somebody uses it */ + + auto handle = this->_ref_handle.lock(); + assert(handle); + + auto ref_server = handle->ref_server(); + if(!ref_server) + return nullptr; + + auto sql = ref_server->getSql(); + if(!sql) + return nullptr; + + //`server_id` INT, `conversation_id` INT, `begin_timestamp` INT, `end_timestamp` INT, `block_offset` INT, `flags` INT + auto sql_result = sql::command(sql, "INSERT INTO `conversation_blocks` (`server_id`, `conversation_id`, `begin_timestamp`, `end_timestamp`, `block_offset`, `flags`) VALUES (:sid, :cid, :bt, :et, :bo, :f);", + variable{":sid", ref_server->getServerId()}, + variable{":cid", this->_channel_id}, + variable{":bt", duration_cast(result->begin_timestamp.time_since_epoch()).count()}, + variable{":et", duration_cast(result->end_timestamp.time_since_epoch()).count()}, + variable{":bo", offset}, + variable{":f", result->flags} + ).executeLater(); + sql_result.waitAndGetLater(LOG_SQL_CMD, {-1, "future error"}); + + { + lock_guard lock(this->message_block_lock); + this->message_blocks.push_back(result); + } + + result->flag_invalid = false; + return result; +} + +void Conversation::db_save_block(const std::shared_ptr &block) { + auto handle = this->_ref_handle.lock(); + assert(handle); + + auto ref_server = handle->ref_server(); + if(!ref_server) { + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to update block db info (server expired)", this->_channel_id); + return; + } + + auto sql = ref_server->getSql(); + if(!sql) { + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to update block db info (sql expired)", this->_channel_id); + return; + } + + auto sql_result = sql::command(sql, "UPDATE `conversation_blocks` SET `end_timestamp` = :et, `flags` = :f WHERE `server_id` = :sid AND `conversation_id` = :cid AND `begin_timestamp` = :bt AND `block_offset` = :bo", + variable{":sid", ref_server->getServerId()}, + variable{":cid", this->_channel_id}, + variable{":bt", duration_cast(block->begin_timestamp.time_since_epoch()).count()}, + variable{":et", duration_cast(block->end_timestamp.time_since_epoch()).count()}, + variable{":bo", block->block_offset}, + variable{":f", block->flags} + ).executeLater(); + sql_result.waitAndGetLater(LOG_SQL_CMD, {-1, "future error"}); +} + +void Conversation::register_message(ts::ClientDbId sender_database_id, const std::string &sender_unique_id, const std::string &sender_name, const std::string &message) { + auto entry = make_shared(); + entry->message_timestamp = system_clock::now(); + this->_last_message_timestamp = entry->message_timestamp; + entry->message = message; + entry->sender_name = sender_name; + entry->sender_unique_id = sender_unique_id; + entry->sender_database_id = sender_database_id; + + { + lock_guard lock(this->_last_messages_lock); + this->_last_messages.push_back(entry); + while(this->_last_messages.size() > this->_last_messages_limit) + this->_last_messages.pop_front(); /* TODO: Use a iterator for delete to improve performance */ + } + if(!this->volatile_only()) { + { + lock_guard lock(this->_write_queue_lock); + this->_write_queue.push_back(entry); + } + auto executor = serverInstance->getConversationIo(); + executor->schedule(this->_write_event); + } +} + +std::deque> Conversation::message_history(const std::chrono::system_clock::time_point &end_timestamp, size_t message_count, const std::chrono::system_clock::time_point &begin_timestamp) { + std::deque> result; + if(message_count == 0) + return result; + + /* first try to fillout the result with the cached messages */ + { + lock_guard lock(this->_last_messages_lock); + //TODO: May just insert the rest of the iterator instead of looping? + for(auto it = this->_last_messages.rbegin(); it != this->_last_messages.rend(); it++) { + if((*it)->message_timestamp > end_timestamp) /* message has been send after the search timestamp */ + continue; + result.push_back(*it); + if(--message_count == 0) + return result; + } + } + + if(!this->volatile_only()) { + auto timestamp = result.empty() ? end_timestamp : result.back()->message_timestamp; + + unique_lock lock(this->message_block_lock); + auto rit = this->message_blocks.end(); + if(rit != this->message_blocks.begin()) { + bool found = false; + do { + rit--; + if((*rit)->begin_timestamp < timestamp) { + found = true; + break; /* we found the first block which is created before the point we're searching from */ + } + } while(rit != this->message_blocks.begin()); + + + string error; + if(found) { + vector> relevant_entries{this->message_blocks.begin(), ++rit}; + lock.unlock(); + + auto _rit = --relevant_entries.end(); + do { + auto block = *_rit; + /* lets search for messages */ + if(!this->load_message_block_index(block, error)) { + //TODO: Log error + continue; + } + auto index = (*_rit)->indexed_block; + if(!index) { + //TODO Log error + continue; + } + + lock_guard index_lock{index->message_index_lock}; + auto rmid = index->message_index.end(); + if(rmid == index->message_index.begin()) + continue; /* Empty block? Funny */ + + auto block_found = false; + do { + rmid--; + if((*rmid).timestamp < timestamp) { + block_found = true; + break; /* we found the first block which is created before the point we're searching from */ + } + } while(rmid != index->message_index.begin()); + if(!block_found) + continue; + + + if(!this->load_messages(block, 0, std::distance(index->message_index.begin(), rmid) + 1, error)) { + //TODO: Log error + continue; + } + do { + auto data = rmid->message_data; + if(!data) + continue; + + if(begin_timestamp.time_since_epoch().count() != 0 && rmid->timestamp < begin_timestamp) + return result; + /* + std::chrono::system_clock::time_point message_timestamp; + + ClientDbId sender_database_id; + std::string sender_unique_id; + std::string sender_name; + + std::string message; + */ + result.push_back(make_shared(ConversationEntry{ + rmid->timestamp, + + (ClientDbId) data->header.sender_database_id, + data->sender_unique_id, + data->sender_name, + + data->message + })); + if(--message_count == 0) + return result; + } while(rmid-- != index->message_index.begin()); + } while(_rit-- != relevant_entries.begin()); + } + } + } + + return result; +} + +ConversationManager::ConversationManager(const std::shared_ptr &server) : _ref_server(server) { } + +ConversationManager::~ConversationManager() { } + +void ConversationManager::initialize(const std::shared_ptr &_this) { + assert(&*_this == this); + this->_ref_this = _this; + + auto ref_server = this->ref_server(); + assert(ref_server); + auto sql = ref_server->getSql(); + assert(sql); + + auto result = sql::command(sql, "SELECT `conversation_id`, `file_path` FROM `conversations` WHERE `server_id` = :sid", variable{":sid", ref_server->getServerId()}).query([&](int count, std::string* values, std::string* names) { + ChannelId conversation_id = 0; + std::string file_path; + + try { + for(int index = 0; index < count; index++) { + if(names[index] == "conversation_id") + conversation_id += stoll(values[index]); + else if(names[index] == "file_path") + file_path += values[index]; + } + } catch(std::exception& ex) { + logError(ref_server->getServerId(), "[Conversations] Failed to parse conversation entry! Exception: {}", ex.what()); + return 0; + } + + auto conversation = make_shared(_this, conversation_id, file_path); + conversation->set_ref_self(conversation); + string error; + if(!conversation->initialize(error)) { + logError(ref_server->getServerId(), "[Conversations] Failed to load conversation for channel {}: {}. Conversation is in volatile mode", conversation_id, error); + } + this->_conversations.push_back(conversation); + return 0; + }); + LOG_SQL_CMD(result); + debugMessage(ref_server->getServerId(), "[Conversations] Loaded {} conversations", this->_conversations.size()); +} + +bool ConversationManager::conversation_exists(ts::ChannelId channel_id) { + lock_guard lock(this->_conversations_lock); + return find_if(this->_conversations.begin(), this->_conversations.end(), [&](const shared_ptr& conv){ return conv->channel_id() == channel_id; })!= this->_conversations.end(); +} + +std::shared_ptr ConversationManager::get(ts::ChannelId channel_id) { + unique_lock lock(this->_conversations_lock); + auto found = find_if(this->_conversations.begin(), this->_conversations.end(), [&](const shared_ptr& conv){ return conv->channel_id() == channel_id; }); + if(found != this->_conversations.end()) + return *found; + return nullptr; +} + +std::shared_ptr ConversationManager::get_or_create(ts::ChannelId channel_id) { + unique_lock lock(this->_conversations_lock); + auto found = find_if(this->_conversations.begin(), this->_conversations.end(), [&](const shared_ptr& conv){ return conv->channel_id() == channel_id; }); + if(found != this->_conversations.end()) + return *found; + + auto ref_server = this->ref_server(); + assert(ref_server); + + //TODO: More configurable! + auto file_path = "files/server_" + to_string(ref_server->getServerId()) + "/conversations/conversation_" + to_string(channel_id) + ".cvs"; + auto conversation = make_shared(this->_ref_this.lock(), channel_id, file_path); + conversation->set_ref_self(conversation); + this->_conversations.push_back(conversation); + lock.unlock(); + + { + auto sql_result = sql::command(ref_server->getSql(), "INSERT INTO `conversations` (`server_id`, `channel_id`, `conversation_id`, `file_path`) VALUES (:sid, :cid, :cid, :fpath)", + variable{":sid", ref_server->getServerId()}, variable{":cid", channel_id}, variable{":fpath", file_path}).executeLater(); + sql_result.waitAndGetLater(LOG_SQL_CMD, {-1, "future error"}); + } + + string error; + if(!conversation->initialize(error)) + logError(ref_server->getServerId(), "[Conversations] Failed to load conversation for channel {}: {}. Conversation is in volatile mode", channel_id, error); + return conversation; +} + +void ConversationManager::delete_conversation(ts::ChannelId channel_id) { + { + lock_guard lock(this->_conversations_lock); + this->_conversations.erase(remove_if(this->_conversations.begin(), this->_conversations.end(), [&](const shared_ptr& conv){ return conv->channel_id() == channel_id; }), this->_conversations.end()); + } + + auto ref_server = this->ref_server(); + if(!ref_server) { + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to delete conversation (server expired)", channel_id); + return; + } + + auto sql = ref_server->getSql(); + if(!sql) { + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to delete conversation (sql expired)", channel_id); + return; + } + + { + auto sql_result = sql::command(sql, "DELETE FROM `conversations` WHERE `server_id` = :sid AND `channel_id` = :cid AND `conversation_id` = :cid", variable{":sid", ref_server->getServerId()}, variable{":cid", channel_id}).executeLater(); + sql_result.waitAndGetLater(LOG_SQL_CMD, {-1, "future error"}); + } + { + auto sql_result = sql::command(sql, "DELETE FROM `conversation_blocks` WHERE `server_id` = :sid AND `conversation_id` = :cid", variable{":sid", ref_server->getServerId()}, variable{":cid", channel_id}).executeLater(); + sql_result.waitAndGetLater(LOG_SQL_CMD, {-1, "future error"}); + } + + //TODO: More configurable! + auto file_path = "files/server_" + to_string(ref_server->getServerId()) + "/conversations/conversation_" + to_string(channel_id) + ".cvs"; + try { + fs::remove(fs::u8path(file_path)); + } catch(fs::filesystem_error& error) { + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to delete data file ({}): {}|{}", channel_id, file_path, error.code().value(), error.what()); + } +} + +void ConversationManager::cleanup_channels() { + //TODO: Check if the channel for the conversation still exists! +} + +void ConversationManager::cleanup_cache() { + unique_lock lock(this->_conversations_lock); + std::vector> conversations{this->_conversations.begin(), this->_conversations.end()}; + lock.unlock(); + + for(auto& conversation : conversations) + conversation->cleanup_cache(); +} \ No newline at end of file diff --git a/server/src/manager/ConversationManager.h b/server/src/manager/ConversationManager.h new file mode 100644 index 0000000..d056ad1 --- /dev/null +++ b/server/src/manager/ConversationManager.h @@ -0,0 +1,246 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ts { + namespace server { + class TSServer; + namespace conversation { + struct ConversationEntry { + std::chrono::system_clock::time_point message_timestamp; + + ClientDbId sender_database_id; + std::string sender_unique_id; + std::string sender_name; + + std::string message; + }; + + namespace fio { + #pragma pack(push, 1) + struct BlockHeader { + static constexpr uint64_t HEADER_COOKIE = 0xC0FFEEBABE; + static constexpr uint64_t MAX_BLOCK_SIZE = 0xFF00; + + uint8_t version; /* every time 1 */ + uint64_t cookie /* const 0xC0FFEEBABE */; + uint8_t message_version; /* every time 1; Version for the containing messages */ + + uint32_t block_size; /* size of the full block (with data) incl. header! */ + uint32_t block_max_size; /* size of the full block incl. header! 0 if the block is located at the end and could be extended */ + + uint32_t message_count; /* message count */ + uint32_t last_message_offset; /* offset to the last message. Offset begins after header (first message has offset of 0) */ + union { + uint8_t flags; + struct { + uint8_t _padding : 5; + + bool message_encrypted: 1; /* 0x04 */ + bool meta_encrypted: 1; /* 0x02 */ /* Not implemented */ + bool finished: 1; /* 0x01 */ /* if this block is finally finished; Most the time a next block follows directly */ + }; + }; + + uint64_t first_message_timestamp; + uint64_t last_message_timestamp; + }; + static_assert(__BYTE_ORDER == __LITTLE_ENDIAN); + static_assert(sizeof(BlockHeader) == 43); + + struct MessageHeader { + static constexpr uint16_t HEADER_COOKIE = 0xAFFE; + uint16_t cookie; /* const 0xAFFE */ + uint16_t total_length; /* Total length of the full message data. Includes this header! */ + uint64_t message_timestamp; /* milliseconds since epoch */ + uint64_t sender_database_id; + uint8_t sender_unique_id_length; /* directly followed by this header */ + uint8_t sender_name_length; /* directly followed after the unique id */ + uint16_t message_length; /* directly followed after the name */ + uint16_t message_flags; /* could be later something like deleted etc.... */ + }; + static_assert(sizeof(MessageHeader) == 26); + #pragma pack(pop) + + struct IndexedMessageData { + MessageHeader header; + std::string sender_unique_id; + std::string sender_name; + std::string message; + }; + + struct IndexedMessage { + uint32_t offset; + std::chrono::system_clock::time_point timestamp; + + std::shared_ptr message_data; + }; + + struct IndexedBlock { + bool successfully; + /* + * message_index[0] := index of the message (including the header!) + * message_index[1] := timestamp of the message + */ + std::deque message_index; + std::mutex message_index_lock; + }; + } + + namespace db { + struct MessageBlock { + std::chrono::system_clock::time_point begin_timestamp; + std::chrono::system_clock::time_point end_timestamp; + + uint64_t block_offset; + + union { + uint16_t flags; + struct { + //Attention: Order matters! + bool flag__unused_0 : 1; + bool flag__unused_1 : 1; + bool flag__unused_2 : 1; + bool flag__unused_3 : 1; + bool flag__unused_4 : 1; + bool flag__unused_5 : 1; + bool flag__unused_6 : 1; + bool flag__unused_7 : 1; + bool flag__unused_8 : 1; + bool flag__unused_9 : 1; + bool flag__unused_10 : 1; + bool flag__unused_11 : 1; + + bool flag_finished : 1; + bool flag_finished_later : 1; /* if true the block has been closed because we've a newer block. */ + + bool flag_invalid : 1; /* this block is considered as invalid and will be ignored */ + bool flag_used : 1; + }; + }; + + std::shared_ptr block_header; + std::shared_ptr indexed_block; + }; + static_assert(__BYTE_ORDER == __LITTLE_ENDIAN); + } + + class ConversationManager; + class Conversation { + public: + Conversation(const std::shared_ptr& /* handle */, ChannelId /* channel id */, const std::string& /* file name */); + ~Conversation(); + + bool initialize(std::string& error); + void finalize(); + + inline ChannelId channel_id() { return this->_channel_id; } + /* if for some reason we're not able to open the file then we're in volatile mode */ + inline bool volatile_only() { return !this->file_handle; } + void cleanup_cache(); + + //void set_history_length(ssize_t /* save length */); + //ssize_t history_length(); + + inline std::chrono::system_clock::time_point last_message() { return this->_last_message_timestamp; } + void register_message(ClientDbId sender_database_id, const std::string& sender_unique_id, const std::string& sender_name, const std::string& message); + /* Lookup n messages since end timestamp. Upper time limit is begin timestamp */ + std::deque> message_history(const std::chrono::system_clock::time_point& /* end timestamp */, size_t /* limit */, const std::chrono::system_clock::time_point& /* begin timestamp */); + + std::deque> message_history(size_t limit) { + return this->message_history(std::chrono::system_clock::now(), limit, std::chrono::system_clock::time_point{}); + } + + ts_always_inline void set_ref_self(const std::shared_ptr& pointer) { + this->_ref_self = pointer; + } + private: + std::weak_ptr _ref_self; + std::weak_ptr _ref_handle; + ts_always_inline std::shared_ptr ref_handle() { + return this->_ref_handle.lock(); + } + inline ssize_t fread(void* target, size_t length, ssize_t index); + inline ssize_t fwrite(void* target, size_t length, ssize_t index, bool extend_file); + + /* block db functions */ + void db_save_block(const std::shared_ptr& /* block */); + std::shared_ptr db_create_block(uint64_t /* block offset */); + + /* message blocks */ + std::mutex message_block_lock; + /* blocks sorted desc (newest blocks last in list (push_back)) */ + std::deque> message_blocks; + /* Access last_message_block only within the write queue or while initializing! */ + std::shared_ptr last_message_block; /* is registered within message_blocks,but musnt be the last! */ + bool load_message_block_header(const std::shared_ptr& /* block */, std::string& /* error */); + bool load_message_block_index(const std::shared_ptr& /* block */, std::string& /* error */); + bool load_messages(const std::shared_ptr& /* block */, size_t /* begin index */, size_t /* end index */, std::string& /* error */); + + /* message blocks write stuff */ + std::shared_ptr create_new_block(std::string& /* error */); + void finish_block(const std::shared_ptr& /* block */, bool write_file); + bool write_block_header(const std::shared_ptr& /* header */, size_t /* header index */, std::string& /* error */); + + /* cached messages */ + std::mutex _last_messages_lock; + std::deque> _last_messages; + size_t _last_messages_limit = 100; /* cache max 100 messages */ + + /* write handler */ + std::mutex _write_loop_lock; + std::mutex _write_queue_lock; + std::deque> _write_queue; + std::shared_ptr> _write_event; + void process_write_queue(const std::chrono::system_clock::time_point&); + + /* basic file stuff */ + std::string file_name; + std::mutex file_handle_lock; + FILE* file_handle = nullptr; + ChannelId _channel_id; + + std::chrono::system_clock::time_point _last_message_timestamp; + }; + + class ConversationManager { + public: + ConversationManager(const std::shared_ptr& /* server */); + virtual ~ConversationManager(); + + void initialize(const std::shared_ptr& _this); + void cleanup_channels(); + void cleanup_cache(); + + bool conversation_exists(ChannelId /* channel */); + std::shared_ptr get(ChannelId /* channel */); + std::shared_ptr get_or_create(ChannelId /* channel */); + void delete_conversation(ChannelId /* channel */); + + inline const std::deque> conversations() { + std::lock_guard lock(this->_conversations_lock); + return this->_conversations; + } + + ts_always_inline std::shared_ptr ref_server() { + return this->_ref_server.lock(); + } + private: + std::weak_ptr _ref_this; + std::weak_ptr _ref_server; + + std::mutex _conversations_lock; + std::deque> _conversations; + + std::string file_path; + }; + } + } +} \ No newline at end of file diff --git a/server/src/manager/SqlDataManager.cpp b/server/src/manager/SqlDataManager.cpp index ee4358e..6d40c74 100644 --- a/server/src/manager/SqlDataManager.cpp +++ b/server/src/manager/SqlDataManager.cpp @@ -44,7 +44,7 @@ if(!result && result.msg().find(ignore) == string::npos){ #define RESIZE_COLUMN(tblName, rowName, size) up vote EXECUTE("Could not change column size", "ALTER TABLE " tblName " ALTER COLUMN " rowName " varchar(" size ")"); -#define CURRENT_VERSION 10 +#define CURRENT_VERSION 11 #define CLIENT_UID_LENGTH "64" #define CLIENT_NAME_LENGTH "128" @@ -345,6 +345,12 @@ ROLLBACK; } } this->changeVersion(10); + case 10: + CREATE_TABLE("conversations", "`server_id` INT, `channel_id` INT, `conversation_id` INT, `file_path` TEXT", command_append_utf8); + CREATE_TABLE("conversation_blocks", "`server_id` INT, `conversation_id` INT, `begin_timestamp` INT, `end_timestamp` INT, `block_offset` INT, `flags` INT", command_append_utf8); + CREATE_INDEX("conversations", "server_id"); + CREATE_INDEX2R("conversation_blocks", "server_id", "conversation_id"); + this->changeVersion(11); default: if(manager->getType() == sql::TYPE_SQLITE) { result = sql::command(this->sql(), "COMMIT;").execute(); diff --git a/server/src/server/VoiceServer.cpp b/server/src/server/VoiceServer.cpp index 600ef21..bed0721 100644 --- a/server/src/server/VoiceServer.cpp +++ b/server/src/server/VoiceServer.cpp @@ -121,7 +121,14 @@ void VoiceServer::triggerWrite(const std::shared_ptr& client) { } void VoiceServer::schedule_execute(const ts::server::VoiceClient *client) { - serverInstance->getVoiceServerManager()->get_executor_loop()->schedule(client->event_handle_packet); + auto vmanager = serverInstance->getVoiceServerManager(); + if(!vmanager) + return; + auto evloop = vmanager->get_executor_loop(); + if(!evloop) + return; + + evloop->schedule(client->event_handle_packet); } void VoiceServer::tickHandshakingClients() { diff --git a/shared b/shared index 6ee207a..d9ddc2c 160000 --- a/shared +++ b/shared @@ -1 +1 @@ -Subproject commit 6ee207aaadeb9a57a9e1fc4fd19ebeb1be81f1fd +Subproject commit d9ddc2c06d7731b14cae47f67a3414ce47e34bae