Teaspeak-Server/server/src/manager/ConversationManager.cpp

975 lines
38 KiB
C++
Raw Normal View History

#include "./ConversationManager.h"
#include "../InstanceHandler.h"
#include "../TSServer.h"
#include <experimental/filesystem>
#include <log/LogUtils.h>
#include <misc/sassert.h>
/* for the file */
#include <sys/types.h>
#include <unistd.h>
using namespace std;
using namespace std::chrono;
using namespace ts;
using namespace ts::server;
using namespace ts::server::conversation;
namespace fs = std::experimental::filesystem;
/*
 * Remembers the owning manager, the channel this conversation belongs to and the path of the backing file.
 * The file itself is not touched here; opening it is part of initialize().
 */
Conversation::Conversation(
        const std::shared_ptr<ts::server::conversation::ConversationManager> &handle,
        ts::ChannelId channel_id,
        const std::string& file
) :
        _ref_handle(handle),
        _channel_id(channel_id),
        file_name(file)
{}
/* Destruction goes through finalize(), which waits for a running write pass and releases the loaded blocks. */
Conversation::~Conversation() {
    finalize();
}
/*
 * Opens the backing file, loads all known (valid) message blocks from the database and decides
 * whether the newest unfinished block can be reused for further writes.
 *
 * @param error Receives a description when the method returns false.
 * @return false if the server reference is gone, the containing directory could not be created or
 *         the file could not be opened. Problems while verifying the last block never fail the
 *         call; the block gets invalidated and a new one will be appended on the next write.
 */
bool Conversation::initialize(std::string& error) {
    auto ref_self = this->_ref_self.lock();
    assert(ref_self);

    auto handle = this->_ref_handle.lock();
    assert(handle);

    auto ref_server = handle->ref_server();
    if(!ref_server) {
        error = "invalid server ref";
        return false;
    }

    auto file = fs::u8path(this->file_name);
    if(!fs::is_directory(file.parent_path())) {
        debugMessage(ref_server->getServerId(), "[Conversations] Creating conversation containing directory {}", file.parent_path().string());
        try {
            fs::create_directories(file.parent_path());
        } catch(fs::filesystem_error& ex) {
            error = "failed to create data directories (" + to_string(ex.code().value()) + "|" + string(ex.what()) + ")";
            return false;
        }
    }

    /* "r+" keeps existing content, "w+" creates the file if it's missing */
    this->file_handle = fopen(this->file_name.c_str(), fs::exists(file) ? "r+" : "w+");
    if(!this->file_handle) {
        error = "failed to open file";
        return false;
    }
    setbuf(this->file_handle, nullptr); /* we're doing random access (a buffer is useless here) */

    /* load every block registered for this conversation; blocks flagged as invalid are ignored */
    auto sql = ref_server->getSql();
    auto result = sql::command(sql, "SELECT `begin_timestamp`, `end_timestamp`, `block_offset`, `flags` FROM `conversation_blocks` WHERE `server_id` = :sid AND `conversation_id` = :cid",
                               variable{":sid", ref_server->getServerId()}, variable{":cid", this->_channel_id}).query([&](int count, std::string* values, std::string* names) {
        std::chrono::system_clock::time_point begin_timestamp{}, end_timestamp{};
        uint64_t block_offset = 0;
        uint16_t flags = 0;

        try {
            for(int index = 0; index < count; index++) {
                if(names[index] == "begin_timestamp")
                    begin_timestamp += milliseconds(stoll(values[index]));
                else if(names[index] == "end_timestamp")
                    end_timestamp += milliseconds(stoll(values[index]));
                else if(names[index] == "block_offset")
                    block_offset = stoull(values[index]);
                else if(names[index] == "flags")
                    flags = (uint16_t) stoll(values[index]);
            }
        } catch(std::exception& ex) {
            logError(ref_server->getServerId(), "[Conversations] Failed to parse conversation block entry! Exception: {}", ex.what());
            return 0;
        }

        auto block = make_shared<db::MessageBlock>(db::MessageBlock{
                begin_timestamp,
                end_timestamp,
                block_offset,
                {flags},
                nullptr,
                nullptr
        });

        /* we dont load invalid blocks */
        if(block->flag_invalid)
            return 0;

        this->message_blocks.push_back(block);
        return 0;
    });
    LOG_SQL_CMD(result);

    /* lets find the last block */
    if(!this->message_blocks.empty()) {
        debugMessage(ref_server->getServerId(), "[Conversations][{}] Loaded {} blocks. Trying to find last block.", this->_channel_id, this->message_blocks.size());

        deque<shared_ptr<db::MessageBlock>> open_blocks;
        for(auto& block : this->message_blocks)
            if(!block->flag_finished)
                open_blocks.push_back(block);

        logTrace(ref_server->getServerId(), "[Conversations][{}] Found {} unfinished blocks. Searching for the \"latest\" and closing all previous blocks.", this->_channel_id, open_blocks.size());

        /* first pass: pick the newest open block (the first one wins on equal timestamps) */
        shared_ptr<db::MessageBlock> latest_block;
        for(auto& block : open_blocks) {
            if(block->flag_invalid)
                continue;

            if(!latest_block || latest_block->begin_timestamp < block->begin_timestamp)
                latest_block = block;
        }

        /*
         * second pass: close every other open block.
         * This is done independently of the iteration order; the query result is unordered, so
         * closing only the "previous candidate" would leave older blocks open whenever they're
         * visited after the newest one.
         */
        for(auto& block : open_blocks) {
            if(block->flag_invalid || block == latest_block)
                continue;

            block->flag_finished_later = true;
            block->flag_finished = true;
            this->db_save_block(block);
        }

        if(latest_block) {
            logTrace(ref_server->getServerId(), "[Conversations][{}] Found a latest block at index {}. Verify block with file.", this->_channel_id, latest_block->block_offset);

            /*
             * Checks that the file really contains the block header of the latest block.
             * On seek/tell failures `error` is set and the caller invalidates the block.
             * When the block turns out to be unusable it's invalidated right here and latest_block is reset.
             */
            const auto verify_block = [&] {
                auto result = fseek(this->file_handle, 0, SEEK_END);
                if(result != 0) {
                    error = "failed to seek to the end (" + to_string(result) + " | " + to_string(errno) + ")";
                    return;
                }

                auto file_size = ftell(this->file_handle);
                if(file_size < 0) {
                    error = "failed to tell the end position (" + to_string(file_size) + " | " + to_string(errno) + ")";
                    return;
                }

                logTrace(ref_server->getServerId(), "[Conversations][{}] File total size {}, last block index {}", this->_channel_id, file_size, latest_block->block_offset);
                if(file_size < (latest_block->block_offset + sizeof(fio::BlockHeader))) {
                    latest_block->flag_finished_later = true;
                    latest_block->flag_invalid = true;
                    this->finish_block(latest_block, false);

                    logTrace(ref_server->getServerId(), "[Conversations][{}] File total size is less than block requires. Appending a new block at the end of the file.", this->_channel_id);
                    latest_block = nullptr;
                    return;
                }

                if(!this->load_message_block_header(latest_block, error)) {
                    latest_block->flag_finished_later = true;
                    latest_block->flag_invalid = true;
                    this->finish_block(latest_block, false);

                    logTrace(ref_server->getServerId(), "[Conversations][{}] Failed to load latest block at file index {}: {}. Appending an new one.", this->_channel_id, latest_block->block_offset, error);
                    error = "";
                    latest_block = nullptr;
                    return;
                }

                /* We've a valid last block. Now the general write function could decide if we want a new block. */
                this->last_message_block = latest_block;
                logTrace(ref_server->getServerId(), "[Conversations][{}] Last db saved block valid. Reusing it.", this->_channel_id);
            };
            verify_block();

            /* only seek/tell failures leave an error behind; latest_block is still set in that case */
            if(!error.empty()) {
                latest_block->flag_finished_later = true;
                latest_block->flag_invalid = true;
                this->finish_block(latest_block, false);

                logError(ref_server->getServerId(), "[Conversations][{}] Could not verify last block. Appending a new one at the end of the file.", this->_channel_id);
                latest_block = nullptr;
            }
            error = "";
        } else {
            logTrace(ref_server->getServerId(), "[Conversations][{}] Found no open last block. Using a new one.", this->_channel_id);
        }
    } else {
        debugMessage(ref_server->getServerId(), "[Conversations][{}] Found no blocks. Creating new at the end of the file.", this->_channel_id);
    }

    /* keep the blocks ordered by their begin timestamp (oldest first) */
    std::stable_sort(this->message_blocks.begin(), this->message_blocks.end(), [](const shared_ptr<db::MessageBlock>& a, const shared_ptr<db::MessageBlock>& b) { return a->begin_timestamp < b->begin_timestamp; });

    this->_write_event = make_shared<event::ProxiedEventEntry<Conversation>>(ts::event::ProxiedEventEntry<Conversation>(ref_self, &Conversation::process_write_queue));

    /* set the last message timestamp property */
    {
        auto last_message = this->message_history(1);
        if(!last_message.empty())
            this->_last_message_timestamp = last_message.back()->message_timestamp;
        else
            this->_last_message_timestamp = system_clock::time_point{};
    }
    return true;
}
/*
 * Stops all write activity for this conversation and drops the loaded blocks.
 */
void Conversation::finalize() {
    /* without an event nothing new can be scheduled/executed */
    this->_write_event = nullptr;

    /* taking (and instantly releasing) the write loop lock blocks until a running write pass has finished */
    {
        lock_guard write_barrier(this->_write_loop_lock);
    }

    //TODO: May flush?
    {
        lock_guard block_guard(this->message_block_lock);
        this->message_blocks.clear();
    }
}
/* Placeholder: currently does nothing, no cache eviction is implemented yet. */
void Conversation::cleanup_cache() {
    //FIXME: Cache cleanup still needs to be implemented
}
/*
 * Reads exactly `length` bytes from the conversation file into `target` while holding the file lock.
 *
 * @param index Absolute file position to read from; a negative value reads from the current position.
 * @return `length` on success (0 for an empty request), -2 if seeking failed, -1 if the data could not be read completely.
 */
ssize_t Conversation::fread(void *target, size_t length, ssize_t index) {
    if(length == 0)
        return 0;

    lock_guard file_lock(this->file_handle_lock);
    if(index >= 0 && fseek(this->file_handle, index, SEEK_SET) < 0)
        return -2;

    auto cursor = (char*) target;
    size_t bytes_left = length;
    while(bytes_left > 0) {
        const auto chunk = ::fread_unlocked(cursor, 1, bytes_left, this->file_handle);
        if(chunk <= 0)
            return -1; /* EOF or read error before everything has been received */

        cursor += chunk;
        bytes_left -= chunk;
    }
    return length;
}
/*
 * Writes exactly `length` bytes from `target` into the conversation file while holding the file lock.
 *
 * @param index       Absolute file position to write to; a negative value writes at the current position.
 * @param extend_file Accepted for interface compatibility only. The value has always been overridden
 *                    with false (fseek handles positions behind the end of the file as well), so the
 *                    former lseek code path was unreachable and has been removed.
 * @return `length` on success (0 for an empty request), -2 if seeking failed, -1 if the data could not be written completely.
 */
ssize_t Conversation::fwrite(void *target, size_t length, ssize_t index, bool extend_file) {
    (void) extend_file; /* intentionally unused, see above */

    if(length == 0)
        return 0;

    lock_guard file_lock(this->file_handle_lock);
    if(index >= 0) {
        auto result = fseek(this->file_handle, index, SEEK_SET);
        if(result < 0)
            return -2;
    }

    size_t total_written = 0;
    while(total_written < length) {
        auto written = ::fwrite_unlocked((char*) target + total_written, 1, length - total_written, this->file_handle);
        if(written <= 0)
            return -1;
        total_written += written;
    }
    return total_written;
}
/*
 * Ensures that `block->block_header` is available.
 * If the header has not been loaded yet it's read from the file at the blocks offset and only
 * attached to the block after its version and cookie have been verified.
 *
 * @return true if the header is available, false (with `error` set) otherwise.
 */
bool Conversation::load_message_block_header(const std::shared_ptr<ts::server::conversation::db::MessageBlock> &block, std::string &error) {
    if(block->block_header)
        return true; /* already loaded */

    auto loaded_header = make_unique<fio::BlockHeader>();
    const auto bytes_read = this->fread(loaded_header.get(), sizeof(fio::BlockHeader), block->block_offset);
    if(bytes_read != sizeof(fio::BlockHeader)) {
        error = "failed to read block header";
        return false;
    }

    if(loaded_header->version != 1) {
        error = "block version missmatch (block version: " + to_string(loaded_header->version) + ", current version: 1)";
        return false;
    }

    if(loaded_header->cookie != fio::BlockHeader::HEADER_COOKIE) {
        error = "block cookie missmatch";
        return false;
    }

    block->block_header = move(loaded_header);
    return true;
}
/*
 * Ensures that `block->indexed_block` is available.
 * The index is built by walking over all message headers within the written part of the block and
 * recording, for each message, its offset relative to the block start and its timestamp. The
 * message payload is not loaded here.
 *
 * @return true if the index is available, false (with `error` set) otherwise. On failure the block stays unindexed.
 */
bool Conversation::load_message_block_index(const std::shared_ptr<ts::server::conversation::db::MessageBlock> &block, std::string& error) {
    if(block->indexed_block)
        return true;

    auto index = make_shared<fio::IndexedBlock>();
    index->successfully = false;

    {
        if(!this->load_message_block_header(block, error)) {
            error = "failed to load block header: " + error;
            return false;
        }

        auto block_header = block->block_header;
        if(!block_header) {
            error = "failed to reference block header ";
            return false;
        }

        if(block_header->block_size > fio::BlockHeader::MAX_BLOCK_SIZE) {
            error = "block contains too many messages (" + to_string(block_header->block_size) + ")";
            return false;
        }

        size_t offset = block->block_offset;
        offset += sizeof(fio::BlockHeader);
        size_t max_offset = block->block_offset + block_header->block_size; /* block_size := Written size, must be smaller or equal to the max size, except max size is 0 */

        fio::MessageHeader header{};
        while(offset < max_offset) {
            if(this->fread(&header, sizeof(header), offset) != sizeof(header)) {
                error = "failed to read message header at index" + to_string(offset);
                return false;
            }

            if(header.cookie != fio::MessageHeader::HEADER_COOKIE) {
                error = "failed to verify message header cookie at index " + to_string(offset);
                return false;
            }

            /*
             * A message is at least as long as its own header. A smaller value (for example 0 within a
             * damaged file) would never advance the offset and this loop would run forever while
             * growing the index.
             */
            if(header.total_length < sizeof(header)) {
                error = "invalid message length at index " + to_string(offset);
                return false;
            }

            index->message_index.emplace_back(fio::IndexedMessage{(uint32_t) (offset - block->block_offset), system_clock::time_point{} + milliseconds{header.message_timestamp}, std::shared_ptr<fio::IndexedMessageData>{nullptr}});
            offset += header.total_length;
        }
    }

    block->indexed_block = index;
    return true;
}
/*
 * Loads the payload (header, sender unique id, sender name, message) of the indexed messages
 * [index, end_index) of `block` into the blocks message index. Messages which already have their
 * data attached are left untouched.
 *
 * Every read uses an explicit absolute file position. The position is advanced message by message
 * (using the stored total length, exactly like the indexer does), which
 *  - keeps the position correct when already loaded messages are skipped, and
 *  - makes the reads independent of the shared file cursor, which the write thread moves as well.
 *
 * @return true on success (also when `index` is behind the last indexed message),
 *         false (with `error` set) if indexing or reading failed.
 */
bool Conversation::load_messages(const std::shared_ptr<db::MessageBlock> &block, size_t index, size_t end_index, std::string &error) {
    if(!this->load_message_block_index(block, error)) {
        error = "failed to index block: " + error;
        return false;
    }

    auto indexed_block = block->indexed_block;
    auto header = block->block_header;
    if(!indexed_block || !header) {
        error = "failed to reference required data";
        return false;
    }

    /* Note: We dont lock message_index_lock here because the write thread only increases the list and dont take stuff away where we could pointing at! */
    if(index >= indexed_block->message_index.size())
        return true;

    /* absolute file position of the message which is currently processed */
    uint64_t position = block->block_offset + indexed_block->message_index[index].offset;

    while(index < end_index && index < indexed_block->message_index.size()) {
        auto& message_data = indexed_block->message_index[index];
        if(message_data.message_data) {
            /* already loaded, but we still have to step over it within the file */
            position += message_data.message_data->header.total_length;
            index++;
            continue;
        }

        auto data = make_shared<fio::IndexedMessageData>();
        uint64_t read_position = position;

        if(this->fread(&data->header, sizeof(data->header), (ssize_t) read_position) != sizeof(data->header)) {
            error = "failed to read message header at index " + to_string(index);
            return false;
        }
        read_position += sizeof(data->header);

        if(data->header.cookie != fio::MessageHeader::HEADER_COOKIE) {
            error = "failed to verify message header at " + to_string(index);
            return false;
        }

        data->sender_unique_id.resize(data->header.sender_unique_id_length);
        data->sender_name.resize(data->header.sender_name_length);
        data->message.resize(data->header.message_length);

        if(this->fread(data->sender_unique_id.data(), data->sender_unique_id.length(), (ssize_t) read_position) != data->sender_unique_id.length()) {
            error = "failed to read message sender unique id at " + to_string(index);
            return false;
        }
        read_position += data->sender_unique_id.length();

        if(this->fread(data->sender_name.data(), data->sender_name.length(), (ssize_t) read_position) != data->sender_name.length()) {
            error = "failed to read message sender name id at " + to_string(index);
            return false;
        }
        read_position += data->sender_name.length();

        if(this->fread(data->message.data(), data->message.length(), (ssize_t) read_position) != data->message.length()) {
            error = "failed to read message id at " + to_string(index);
            return false;
        }

        if(header->message_encrypted) {
            /* XOR stream seeded by block offset and message timestamp; must mirror the write side */
            uint64_t crypt_key = block->block_offset ^ data->header.message_timestamp;
            size_t length_left = data->message.size();
            auto ptr = (char*) data->message.data();

            while(length_left >= 8) {
                crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left;
                *(uint64_t*) ptr ^= crypt_key;
                ptr += 8;
                length_left -= 8;
            }

            while(length_left > 0) {
                crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left;
                *ptr ^= (uint8_t) crypt_key;
                length_left--;
                ptr++;
            }
        }

        message_data.message_data = data;
        position += data->header.total_length;
        index++;
    }
    return true;
}
/*
 * Marks `block` as finished and stores the new state within the database.
 * Nothing happens if the block is already finished or the manager/server references are gone.
 *
 * @param write_file If true the block header within the file gets finalized as well (only if no max
 *                   size has been set yet). If false, or if the header can't be loaded/written, the
 *                   block is flagged as invalid.
 */
void Conversation::finish_block(const std::shared_ptr<ts::server::conversation::db::MessageBlock> &block, bool write_file) {
    if(block->flag_finished)
        return;

    auto handle = this->_ref_handle.lock();
    sassert(handle);
    if(!handle)
        return;

    auto ref_server = handle->ref_server();
    if(!ref_server)
        return;

    block->flag_finished = true;

    if(!write_file) {
        block->flag_invalid = true; /* because we dont write the block we cant ensure a valid block */
        this->db_save_block(block);
        return;
    }

    string error;
    const bool header_loaded = this->load_message_block_header(block, error);
    auto header = block->block_header;

    if(!header_loaded || !header) { /* only success if we really have a header */
        logError(ref_server->getServerId(), "Failed to finish block because block header could not be set: {}", error);
        block->flag_invalid = true; /* because we cant set the block size we've to declare that block as invalid */
    } else if(header->block_max_size == 0) {
        /* freeze the block at its currently written size */
        header->block_max_size = header->block_size;
        header->finished = true;

        if(!this->write_block_header(header, block->block_offset, error)) {
            logError(ref_server->getServerId(), "Failed to finish block because block header could not be written: {}", error);
            block->flag_invalid = true; /* because we cant set the block size we've to declare that block as invalid */
        }
    }

    this->db_save_block(block);
}
/*
 * Writes `header` to the file position `index`.
 *
 * @return true if the whole header has been written; otherwise false with `error` containing the write result.
 */
bool Conversation::write_block_header(const std::shared_ptr<fio::BlockHeader> &header, size_t index, std::string &error) {
    const auto write_result = this->fwrite(&*header, sizeof(fio::BlockHeader), index, false);
    if(write_result != sizeof(fio::BlockHeader)) {
        error = "write returned " + to_string(write_result);
        return false;
    }
    return true;
}
/*
 * Drains the write queue and appends every queued message to the current message block.
 * Only one invocation runs at a time; concurrent calls return immediately.
 *
 * For each message:
 *  1. the last block is checked (header present, not finished, not older than 24h, enough space,
 *     matching message version) and finished if it can't be used any more,
 *  2. if required a new block is created at the end of the file,
 *  3. header, sender unique id, sender name and the message are written,
 *  4. the in-memory block header and (if present) the message index get updated.
 * After the queue is empty the block state is saved within the database and the block header is written.
 *
 * If anything fails the current message is dropped and the method returns.
 *
 * @param scheduled_time Used to decide whether the last block is older than 24 hours.
 */
void Conversation::process_write_queue(const std::chrono::system_clock::time_point &scheduled_time) {
    unique_lock write_lock(this->_write_loop_lock, try_to_lock);
    if(!write_lock.owns_lock()) /* we're already writing if this lock fails */
        return;

    unique_lock write_queue_lock(this->_write_queue_lock, defer_lock);
    std::shared_ptr<ConversationEntry> write_entry;
    fio::MessageHeader write_header{};
    std::shared_ptr<fio::BlockHeader> block_header;

    auto handle = this->_ref_handle.lock();
    sassert(handle);
    if(!handle)
        return;

    auto ref_server = handle->ref_server();
    if(!ref_server)
        return;

    write_header.cookie = fio::MessageHeader::HEADER_COOKIE;
    write_header.message_flags = 0;

    while(true) {
        write_queue_lock.lock();
        if(this->_write_queue.empty())
            break; /* the queue lock stays held until the method returns */

        write_entry = this->_write_queue.front();
        this->_write_queue.pop_front();
        write_queue_lock.unlock();

        /* calculate the write message total length */
        write_header.message_length = (uint16_t) min(write_entry->message.size(), (size_t) (65536 - 1));
        write_header.sender_name_length = (uint8_t) min(write_entry->sender_name.size(), (size_t) 255);
        write_header.sender_unique_id_length = (uint8_t) min(write_entry->sender_unique_id.size(), (size_t) 255);
        write_header.total_length = sizeof(write_header) + write_header.sender_unique_id_length + write_header.sender_name_length + write_header.message_length;

        /* verify last block */
        if(this->last_message_block) {
            block_header = this->last_message_block->block_header;
            if(!block_header) {
                logError(ref_server->getServerId(), "[Conversations][{}] Current last block contains no header! Try to finish it and creating a new one.", this->_channel_id);
                this->finish_block(this->last_message_block, true);
                this->last_message_block = nullptr;
            } else if(this->last_message_block->flag_finished)
                this->last_message_block = nullptr;
            else {
                if(this->last_message_block->begin_timestamp + hours(24) < scheduled_time) {
                    debugMessage(ref_server->getServerId(), "[Conversations][{}] Beginning new block. Old block is older than 24hrs. ({})", this->_channel_id, this->last_message_block->begin_timestamp.time_since_epoch().count());
                    this->finish_block(this->last_message_block, true);
                    this->last_message_block = nullptr;
                } else if((block_header->block_max_size != 0 && block_header->block_size + write_header.total_length >= block_header->block_max_size) || block_header->block_size > fio::BlockHeader::MAX_BLOCK_SIZE){
                    debugMessage(ref_server->getServerId(), "[Conversations][{}] Beginning new block. Old block would exceed his space (Current index: {}, Max index: {}, Soft cap: {}, Message size: {}).",
                                 this->_channel_id, block_header->block_size, block_header->block_max_size, fio::BlockHeader::MAX_BLOCK_SIZE, write_header.total_length);
                    this->finish_block(this->last_message_block, true);
                    this->last_message_block = nullptr;
                } else if(block_header->message_version != 1){
                    debugMessage(ref_server->getServerId(), "[Conversations][{}] Beginning new block. Old block had another message version (Current version: {}, Block version: {}).", this->_channel_id, 1, block_header->message_version);
                    this->finish_block(this->last_message_block, true);
                    this->last_message_block = nullptr;
                }
            }
        }

        /* test if we have a block or create a new one at the begin of the file */
        if(!this->last_message_block) {
            //Note: If we reuse blocks we've to reorder them within message_blocks (newest blocks are at the end)
            //TODO: Find "free" blocks and use them! (But do not use indirectly finished blocks, their max size could be invalid)
            unique_lock file_lock(this->file_handle_lock);
            auto result = fseek(this->file_handle, 0, SEEK_END);
            if(result != 0) {
                logError(ref_server->getServerId(), "[Conversations][{}] failed to seek to the end (" + to_string(result) + " " + to_string(errno) + "). Could not create new block. Dropping message!", this->_channel_id);
                return;
            }

            auto file_size = ftell(this->file_handle);
            if(file_size < 0) {
                /* report the ftell result here (the fseek result is always 0 at this point) */
                logError(ref_server->getServerId(), "[Conversations][{}] failed to tell the end position (" + to_string(file_size) + " " + to_string(errno) + "). Could not create new block. Dropping message!", this->_channel_id);
                return;
            }
            file_lock.unlock();

            this->last_message_block = this->db_create_block((uint64_t) file_size);
            if(!this->last_message_block) {
                logError(ref_server->getServerId(), "[Conversations][{}] Failed to create a new block within database. Dropping message!", this->_channel_id);
                return;
            }

            block_header = make_shared<fio::BlockHeader>();
            memset(&*block_header, 0, sizeof(fio::BlockHeader));
            block_header->version = 1;
            block_header->message_version = 1;
            block_header->cookie = fio::BlockHeader::HEADER_COOKIE;
            block_header->first_message_timestamp = (uint64_t) duration_cast<milliseconds>(write_entry->message_timestamp.time_since_epoch()).count();
            block_header->block_size = sizeof(fio::BlockHeader);
            //block_header->message_encrypted = true; /* May add some kind of hidden debug option? */
            this->last_message_block->block_header = block_header;
        }

        auto entry_offset = this->last_message_block->block_offset + sizeof(fio::BlockHeader) + block_header->last_message_offset;
        /* file position of this messages header; the message index has to reference the begin of a message */
        const auto message_offset = entry_offset;

        write_header.sender_database_id = write_entry->sender_database_id;
        write_header.message_timestamp = (uint64_t) duration_cast<milliseconds>(write_entry->message_timestamp.time_since_epoch()).count();
        block_header->last_message_timestamp = write_header.message_timestamp;

        /* first write the header */
        if(this->fwrite(&write_header, sizeof(write_header), entry_offset, true) != sizeof(write_header)) {
            logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message header. Dropping message!", this->_channel_id);
            return;
        }
        entry_offset += sizeof(write_header);

        /* then write the sender unique id */
        if(this->fwrite(write_entry->sender_unique_id.data(), write_header.sender_unique_id_length, entry_offset, true) != write_header.sender_unique_id_length) {
            logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message sender unique id. Dropping message!", this->_channel_id);
            return;
        }
        entry_offset += write_header.sender_unique_id_length;

        /* then write the sender name */
        if(this->fwrite(write_entry->sender_name.data(), write_header.sender_name_length, entry_offset, true) != write_header.sender_name_length) {
            logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message sender name. Dropping message!", this->_channel_id);
            return;
        }
        entry_offset += write_header.sender_name_length;

        /* then write the message */
        bool message_result;
        if(block_header->message_encrypted) {
            /*
             * Encrypt a copy of exactly the bytes which get written. The key stream has to be derived
             * from the stored message length, because that's the only length the reader knows, and every
             * chunk has to be "plain XOR key" so the reader's XOR restores the message.
             */
            std::string encrypted(write_entry->message.data(), write_header.message_length);

            uint64_t crypt_key = this->last_message_block->block_offset ^ write_header.message_timestamp;
            size_t length_left = encrypted.size();
            auto ptr = (char*) encrypted.data();

            while(length_left >= 8) {
                crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left;
                *(uint64_t*) ptr ^= crypt_key;
                ptr += 8;
                length_left -= 8;
            }

            while(length_left > 0) {
                crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left;
                *ptr ^= (uint8_t) crypt_key;
                length_left--;
                ptr++;
            }

            message_result = this->fwrite(encrypted.data(), write_header.message_length, entry_offset, true) == write_header.message_length;
        } else {
            message_result = this->fwrite(write_entry->message.data(), write_header.message_length, entry_offset, true) == write_header.message_length;
        }

        if(!message_result) {
            logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message. Dropping message!", this->_channel_id);
            return;
        }
        entry_offset += write_header.message_length;

        /* last_message_offset is the position (relative to the end of the block header) where the next message starts */
        block_header->last_message_offset = (uint32_t) (entry_offset - this->last_message_block->block_offset - sizeof(fio::BlockHeader));
        block_header->block_size += write_header.total_length;
        block_header->message_count += 1;

        auto indexed_block = this->last_message_block->indexed_block;
        if(indexed_block) {
            lock_guard lock(indexed_block->message_index_lock);
            indexed_block->message_index.push_back(fio::IndexedMessage{(uint32_t) (message_offset - this->last_message_block->block_offset), write_entry->message_timestamp, nullptr});
        }
    }

    if(write_header.total_length != 0) {/* will be set when at least one message has been written */
        this->db_save_block(this->last_message_block);

        string error;
        if(!this->write_block_header(block_header, this->last_message_block->block_offset, error)) {
            logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to write block header after message write ({}). This could cause data loss!", this->_channel_id, error);
        }
    }
}
std::shared_ptr<db::MessageBlock> Conversation::db_create_block(uint64_t offset) {
auto result = make_shared<db::MessageBlock>();
result->block_offset = offset;
result->begin_timestamp = system_clock::now();
result->end_timestamp = system_clock::now();
result->flags = 0;
result->flag_used = true;
result->flag_invalid = true; /* first set it to invalid for the database so it becomes active as soon somebody uses it */
auto handle = this->_ref_handle.lock();
assert(handle);
auto ref_server = handle->ref_server();
if(!ref_server)
return nullptr;
auto sql = ref_server->getSql();
if(!sql)
return nullptr;
//`server_id` INT, `conversation_id` INT, `begin_timestamp` INT, `end_timestamp` INT, `block_offset` INT, `flags` INT
auto sql_result = sql::command(sql, "INSERT INTO `conversation_blocks` (`server_id`, `conversation_id`, `begin_timestamp`, `end_timestamp`, `block_offset`, `flags`) VALUES (:sid, :cid, :bt, :et, :bo, :f);",
variable{":sid", ref_server->getServerId()},
variable{":cid", this->_channel_id},
variable{":bt", duration_cast<milliseconds>(result->begin_timestamp.time_since_epoch()).count()},
variable{":et", duration_cast<milliseconds>(result->end_timestamp.time_since_epoch()).count()},
variable{":bo", offset},
variable{":f", result->flags}
).executeLater();
sql_result.waitAndGetLater(LOG_SQL_CMD, {-1, "future error"});
{
lock_guard lock(this->message_block_lock);
this->message_blocks.push_back(result);
}
result->flag_invalid = false;
return result;
}
/*
 * Stores the end timestamp and the flags of `block` within the database.
 * The entry is addressed by server id, conversation id, begin timestamp and block offset.
 * If the server or its sql connection is gone only a warning is logged.
 */
void Conversation::db_save_block(const std::shared_ptr<db::MessageBlock> &block) {
    auto handle = this->_ref_handle.lock();
    assert(handle);

    auto ref_server = handle->ref_server();
    if(!ref_server) {
        /* there is no server to take the id from any more (ref_server is null here), so log with server id 0 */
        logWarning(0, "[Conversations][{}] Failed to update block db info (server expired)", this->_channel_id);
        return;
    }

    auto sql = ref_server->getSql();
    if(!sql) {
        logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to update block db info (sql expired)", this->_channel_id);
        return;
    }

    auto sql_result = sql::command(sql, "UPDATE `conversation_blocks` SET `end_timestamp` = :et, `flags` = :f WHERE `server_id` = :sid AND `conversation_id` = :cid AND `begin_timestamp` = :bt AND `block_offset` = :bo",
                                   variable{":sid", ref_server->getServerId()},
                                   variable{":cid", this->_channel_id},
                                   variable{":bt", duration_cast<milliseconds>(block->begin_timestamp.time_since_epoch()).count()},
                                   variable{":et", duration_cast<milliseconds>(block->end_timestamp.time_since_epoch()).count()},
                                   variable{":bo", block->block_offset},
                                   variable{":f", block->flags}
    ).executeLater();
    sql_result.waitAndGetLater(LOG_SQL_CMD, {-1, "future error"});
}
/*
 * Records a new message for this conversation.
 * The message is timestamped with the current time, appended to the in-memory list of recent
 * messages (which is trimmed to its limit) and, unless the conversation is volatile only,
 * queued for the write loop which then gets scheduled.
 */
void Conversation::register_message(ts::ClientDbId sender_database_id, const std::string &sender_unique_id, const std::string &sender_name, const std::string &message) {
    auto new_entry = make_shared<ConversationEntry>();
    new_entry->message_timestamp = system_clock::now();
    this->_last_message_timestamp = new_entry->message_timestamp;

    new_entry->message = message;
    new_entry->sender_name = sender_name;
    new_entry->sender_unique_id = sender_unique_id;
    new_entry->sender_database_id = sender_database_id;

    {
        lock_guard cache_guard(this->_last_messages_lock);
        this->_last_messages.push_back(new_entry);

        /* drop the oldest entries until we're within the limit again */
        while(this->_last_messages.size() > this->_last_messages_limit)
            this->_last_messages.pop_front(); /* TODO: Use a iterator for delete to improve performance */
    }

    if(this->volatile_only())
        return; /* nothing gets persisted */

    {
        lock_guard queue_guard(this->_write_queue_lock);
        this->_write_queue.push_back(new_entry);
    }

    auto io_executor = serverInstance->getConversationIo();
    io_executor->schedule(this->_write_event);
}
/*
 * Collects up to `message_count` messages, newest first.
 *
 * @param end_timestamp   Messages sent after this point are skipped.
 * @param message_count   Maximal number of messages to return.
 * @param begin_timestamp If not the epoch, the search stops at the first message older than this point.
 *
 * The recent messages kept in memory are consulted first. If they don't satisfy the request and the
 * conversation isn't volatile only, the message blocks are searched from the newest to the oldest
 * for messages which are older than the oldest message collected so far (or `end_timestamp` if
 * nothing has been collected). Blocks which can't be indexed or loaded are skipped.
 */
std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(const std::chrono::system_clock::time_point &end_timestamp, size_t message_count, const std::chrono::system_clock::time_point &begin_timestamp) {
    std::deque<std::shared_ptr<ConversationEntry>> result;
    if(message_count == 0)
        return result;

    const bool has_lower_bound = begin_timestamp.time_since_epoch().count() != 0;

    /* first try to fillout the result with the cached messages */
    {
        lock_guard lock(this->_last_messages_lock);
        //TODO: May just insert the rest of the iterator instead of looping?
        for(auto it = this->_last_messages.rbegin(); it != this->_last_messages.rend(); ++it) {
            if((*it)->message_timestamp > end_timestamp) /* message has been send after the search timestamp */
                continue;

            /* same rule as for messages loaded from the file: everything from here on is too old */
            if(has_lower_bound && (*it)->message_timestamp < begin_timestamp)
                return result;

            result.push_back(*it);
            if(--message_count == 0)
                return result;
        }
    }

    if(this->volatile_only())
        return result;

    const auto timestamp = result.empty() ? end_timestamp : result.back()->message_timestamp;

    /* snapshot of all blocks up to (and including) the newest block which has been created before the point we're searching from */
    vector<shared_ptr<db::MessageBlock>> relevant_entries;
    {
        lock_guard lock(this->message_block_lock);

        auto newest = this->message_blocks.end();
        bool found = false;
        while(newest != this->message_blocks.begin()) {
            --newest;
            if((*newest)->begin_timestamp < timestamp) {
                found = true;
                break;
            }
        }
        if(!found)
            return result;

        relevant_entries.assign(this->message_blocks.begin(), std::next(newest));
    }

    string error;
    /* reverse iterators walk newest -> oldest without ever stepping before begin() */
    for(auto block_it = relevant_entries.rbegin(); block_it != relevant_entries.rend(); ++block_it) {
        const auto& block = *block_it;

        /* lets search for messages */
        if(!this->load_message_block_index(block, error)) {
            //TODO: Log error
            continue;
        }

        auto index = block->indexed_block;
        if(!index) {
            //TODO Log error
            continue;
        }

        lock_guard index_lock{index->message_index_lock};
        auto& messages = index->message_index;

        /* `position` ends up one behind the newest message which is older than the search point (0: there is none) */
        size_t position = messages.size();
        while(position > 0 && !(messages[position - 1].timestamp < timestamp))
            --position;
        if(position == 0)
            continue; /* empty block or nothing old enough in here */

        if(!this->load_messages(block, 0, position, error)) {
            //TODO: Log error
            continue;
        }

        while(position > 0) {
            auto& message = messages[--position];

            auto data = message.message_data;
            if(!data)
                continue;

            if(has_lower_bound && message.timestamp < begin_timestamp)
                return result;

            result.push_back(make_shared<ConversationEntry>(ConversationEntry{
                    message.timestamp,
                    (ClientDbId) data->header.sender_database_id,
                    data->sender_unique_id,
                    data->sender_name,
                    data->message
            }));
            if(--message_count == 0)
                return result;
        }
    }
    return result;
}
/* Only stores the reference to the owning server; conversations get loaded within initialize(). */
ConversationManager::ConversationManager(const std::shared_ptr<ts::server::TSServer> &server)
        : _ref_server(server)
{}
/* Nothing to release explicitly. */
ConversationManager::~ConversationManager() = default;
void ConversationManager::initialize(const std::shared_ptr<ConversationManager> &_this) {
assert(&*_this == this);
this->_ref_this = _this;
auto ref_server = this->ref_server();
assert(ref_server);
auto sql = ref_server->getSql();
assert(sql);
auto result = sql::command(sql, "SELECT `conversation_id`, `file_path` FROM `conversations` WHERE `server_id` = :sid", variable{":sid", ref_server->getServerId()}).query([&](int count, std::string* values, std::string* names) {
ChannelId conversation_id = 0;
std::string file_path;
try {
for(int index = 0; index < count; index++) {
if(names[index] == "conversation_id")
conversation_id += stoll(values[index]);
else if(names[index] == "file_path")
file_path += values[index];
}
} catch(std::exception& ex) {
logError(ref_server->getServerId(), "[Conversations] Failed to parse conversation entry! Exception: {}", ex.what());
return 0;
}
auto conversation = make_shared<Conversation>(_this, conversation_id, file_path);
conversation->set_ref_self(conversation);
string error;
if(!conversation->initialize(error)) {
logError(ref_server->getServerId(), "[Conversations] Failed to load conversation for channel {}: {}. Conversation is in volatile mode", conversation_id, error);
}
this->_conversations.push_back(conversation);
return 0;
});
LOG_SQL_CMD(result);
debugMessage(ref_server->getServerId(), "[Conversations] Loaded {} conversations", this->_conversations.size());
}
/* Returns true if a conversation for channel_id is currently registered. The list is scanned under the conversations lock. */
bool ConversationManager::conversation_exists(ts::ChannelId channel_id) {
    lock_guard guard(this->_conversations_lock);

    for(const auto& entry : this->_conversations) {
        if(entry->channel_id() == channel_id)
            return true;
    }
    return false;
}
/* Looks up the registered conversation for channel_id; returns nullptr when there is none. */
std::shared_ptr<Conversation> ConversationManager::get(ts::ChannelId channel_id) {
    unique_lock guard(this->_conversations_lock);

    const auto belongs_to_channel = [&](const shared_ptr<Conversation>& entry) {
        return entry->channel_id() == channel_id;
    };

    auto position = find_if(this->_conversations.begin(), this->_conversations.end(), belongs_to_channel);
    if(position == this->_conversations.end())
        return nullptr;

    return *position;
}
/*
 * Returns the conversation registered for channel_id, creating one if none exists.
 *
 * A new conversation is appended to the list while the lock is held. After the
 * lock has been released it is inserted into the `conversations` table and
 * initialized. A failing initialize() is only logged; the conversation is
 * returned either way.
 */
std::shared_ptr<Conversation> ConversationManager::get_or_create(ts::ChannelId channel_id) {
    unique_lock list_guard(this->_conversations_lock);

    for(const auto& existing : this->_conversations) {
        if(existing->channel_id() == channel_id)
            return existing;
    }

    auto server = this->ref_server();
    assert(server);

    //TODO: More configurable!
    auto data_file = "files/server_" + to_string(server->getServerId()) + "/conversations/conversation_" + to_string(channel_id) + ".cvs";

    auto created = make_shared<Conversation>(this->_ref_this.lock(), channel_id, data_file);
    created->set_ref_self(created);
    this->_conversations.push_back(created);
    list_guard.unlock();

    {
        /* waits for the insert to finish before the conversation gets initialized */
        auto insert_result = sql::command(server->getSql(), "INSERT INTO `conversations` (`server_id`, `channel_id`, `conversation_id`, `file_path`) VALUES (:sid, :cid, :cid, :fpath)",
                variable{":sid", server->getServerId()}, variable{":cid", channel_id}, variable{":fpath", data_file}).executeLater();
        insert_result.waitAndGetLater(LOG_SQL_CMD, {-1, "future error"});
    }

    string init_error;
    const bool initialized = created->initialize(init_error);
    if(!initialized) {
        logError(server->getServerId(), "[Conversations] Failed to load conversation for channel {}: {}. Conversation is in volatile mode", channel_id, init_error);
    }

    return created;
}
/*
 * Removes the conversation of channel_id from this manager and deletes its persisted data.
 *
 * Steps:
 *  1. Drop every matching entry from the in-memory list (under the conversations lock).
 *  2. Delete the matching rows from `conversations` and `conversation_blocks`.
 *  3. Remove the conversation data file from disk.
 *
 * When the server or its sql handle is not available, only step 1 is performed
 * and a warning is logged. A failing file removal is logged as a warning as well.
 */
void ConversationManager::delete_conversation(ts::ChannelId channel_id) {
    {
        lock_guard lock(this->_conversations_lock);
        this->_conversations.erase(remove_if(this->_conversations.begin(), this->_conversations.end(), [&](const shared_ptr<Conversation>& conv){ return conv->channel_id() == channel_id; }), this->_conversations.end());
    }

    auto ref_server = this->ref_server();
    if(!ref_server) {
        /*
         * ref_server is null here, so it must not be dereferenced to obtain the server id
         * (the previous code did exactly that). Log with 0 as placeholder id instead.
         * NOTE(review): replace 0 with the project's generic log id constant if there is one.
         */
        logWarning(0, "[Conversations][{}] Failed to delete conversation (server expired)", channel_id);
        return;
    }

    auto sql = ref_server->getSql();
    if(!sql) {
        logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to delete conversation (sql expired)", channel_id);
        return;
    }

    {
        auto sql_result = sql::command(sql, "DELETE FROM `conversations` WHERE `server_id` = :sid AND `channel_id` = :cid AND `conversation_id` = :cid", variable{":sid", ref_server->getServerId()}, variable{":cid", channel_id}).executeLater();
        sql_result.waitAndGetLater(LOG_SQL_CMD, {-1, "future error"});
    }
    {
        auto sql_result = sql::command(sql, "DELETE FROM `conversation_blocks` WHERE `server_id` = :sid AND `conversation_id` = :cid", variable{":sid", ref_server->getServerId()}, variable{":cid", channel_id}).executeLater();
        sql_result.waitAndGetLater(LOG_SQL_CMD, {-1, "future error"});
    }

    //TODO: More configurable!
    /* same path layout as the one used when the conversation gets created */
    auto file_path = "files/server_" + to_string(ref_server->getServerId()) + "/conversations/conversation_" + to_string(channel_id) + ".cvs";
    try {
        fs::remove(fs::u8path(file_path));
    } catch(const fs::filesystem_error& error) {
        logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to delete data file ({}): {}|{}", channel_id, file_path, error.code().value(), error.what());
    }
}
/* Currently a no-op: the check described in the TODO below has not been implemented yet. */
void ConversationManager::cleanup_channels() {
//TODO: Check if the channel for the conversation still exists!
}
void ConversationManager::cleanup_cache() {
unique_lock lock(this->_conversations_lock);
std::vector<std::shared_ptr<Conversation>> conversations{this->_conversations.begin(), this->_conversations.end()};
lock.unlock();
for(auto& conversation : conversations)
conversation->cleanup_cache();
}