This commit is contained in:
WolverinDEV 2019-08-16 16:13:14 +02:00
parent f7cb93d768
commit bae6a56ed3
4 changed files with 245 additions and 56 deletions

View File

@ -588,6 +588,7 @@ namespace ts {
CommandResult handleCommandConversationHistory(Command&);
CommandResult handleCommandConversationFetch(Command&);
CommandResult handleCommandConversationMessageDelete(Command&);
CommandResult handleCommandLogView(Command&);
//CMD_TODO handleCommandLogAdd

View File

@ -7550,6 +7550,74 @@ CommandResult ConnectedClient::handleCommandConversationFetch(ts::Command &cmd)
}
CommandResult ConnectedClient::handleCommandConversationMessageDelete(ts::Command &cmd) {
CMD_REF_SERVER(ref_server);
CMD_CHK_AND_INC_FLOOD_POINTS(25);
auto conversation_manager = ref_server->conversation_manager();
std::shared_ptr<conversation::Conversation> current_conversation;
ChannelId current_conversation_id = 0;
for(size_t index = 0; index < cmd.bulkCount(); index++) {
auto &bulk = cmd[index];
if(!bulk.has("cid") || !bulk["cid"].castable<ChannelId>())
continue;
/* test if we have access to the conversation */
if(current_conversation_id != bulk["cid"].as<ChannelId>()) {
current_conversation_id = bulk["cid"].as<ChannelId>();
/* test if we're able to see the channel */
{
shared_lock channel_view_lock(this->channel_lock);
auto channel = this->channel_view()->find_channel(current_conversation_id);
if(!channel)
return findError("conversation_invalid_id");
}
/* test if there is a channel password or join power which denies that we see the conversation */
{
shared_lock channel_view_lock(ref_server->channel_tree_lock);
auto channel = ref_server->getChannelTree()->findChannel(current_conversation_id);
if(!channel)
return findError("conversation_invalid_id");
if(!bulk.has("cpw"))
bulk["cpw"] = "";
if (!channel->passwordMatch(bulk["cpw"], true))
if (!this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_channel_join_ignore_password, 1, channel, true))
return findError("channel_invalid_password");
if (!this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_channel_conversation_message_delete, 1, channel))
return CommandResultPermissionError{permission::b_channel_conversation_message_delete};
if(!this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_channel_ignore_join_power, 1, channel, true)) {
auto permission_granted = this->calculate_permission_value(permission::i_channel_join_power, channel->channelId());
if(!channel->permission_granted(permission::i_channel_needed_join_power, permission_granted, false))
return CommandResultPermissionError{permission::i_channel_needed_join_power};
}
}
}
current_conversation = conversation_manager->get(current_conversation_id);
if(!current_conversation) continue;
auto timestamp_begin = system_clock::time_point{} + milliseconds{bulk["timestamp_begin"]};
auto timestamp_end = system_clock::time_point{} + milliseconds{bulk.has("timestamp_begin") ? bulk["timestamp_begin"].as<uint64_t>() : 0};
auto limit = bulk.has("limit") ? bulk["limit"].as<uint64_t>() : 1;
if(limit > 100)
limit = 100;
auto delete_count = current_conversation->delete_messages(timestamp_begin, limit, timestamp_end, bulk["cldbid"]);
if(delete_count > 0) {
//TODO: Notify
}
}
return CommandResult::Success;
}

View File

@ -1,3 +1,5 @@
#include <utility>
#include "./ConversationManager.h"
#include "../InstanceHandler.h"
#include "../TSServer.h"
@ -215,7 +217,7 @@ __attribute__((optimize("-O3"), always_inline)) void apply_crypt(void* source, v
dest_ptr++;
}
}
Conversation::Conversation(const std::shared_ptr<ts::server::conversation::ConversationManager> &handle, ts::ChannelId channel_id, const std::string& file) : _ref_handle(handle), _channel_id(channel_id), file_name(file) { }
Conversation::Conversation(const std::shared_ptr<ts::server::conversation::ConversationManager> &handle, ts::ChannelId channel_id, std::string file) : _ref_handle(handle), _channel_id(channel_id), file_name(std::move(file)) { }
Conversation::~Conversation() {
this->finalize();
@ -565,7 +567,7 @@ bool Conversation::load_message_block_index(const std::shared_ptr<ts::server::co
return true;
auto index = make_shared<fio::IndexedBlock>();
index->successfully = false;
index->index_successful = false;
{
if(!this->load_message_block_header(block, error)) {
error = "failed to load block header: " + error;
@ -596,7 +598,7 @@ bool Conversation::load_message_block_index(const std::shared_ptr<ts::server::co
error = "failed to verify message header cookie at index " + to_string(offset);
return false;
}
index->message_index.emplace_back(fio::IndexedMessage{(uint32_t) (offset - block->block_offset), system_clock::time_point{} + milliseconds{header.message_timestamp}, std::shared_ptr<fio::IndexedMessageData>{nullptr}});
index->message_index.emplace_back(fio::IndexedBlockMessage{(uint32_t) (offset - block->block_offset), header, nullptr});
offset += header.total_length;
}
}
@ -627,11 +629,15 @@ bool Conversation::load_messages(const std::shared_ptr<db::MessageBlock> &block,
error = "failed to open file handle";
return false;
}
auto result = fseek(this->file_handle, block->block_offset + indexed_block->message_index[index].offset, SEEK_SET);
auto result = fseek(this->file_handle, block->block_offset + indexed_block->message_index[index].message_offset, SEEK_SET);
if(result == EINVAL) {
error = "failed to seek to begin of an indexed block read";
return false;
}
/*
* We dont need to lock the message_index_lock here because we never delete the messages and we just iterate with index
*/
while(index < end_index && index < indexed_block->message_index.size()) {
auto& message_data = indexed_block->message_index[index];
if(message_data.message_data) {
@ -639,19 +645,10 @@ bool Conversation::load_messages(const std::shared_ptr<db::MessageBlock> &block,
continue;
}
auto data = make_shared<fio::IndexedMessageData>();
if(this->fread(&data->header, sizeof(data->header), -1, false) != sizeof(data->header)) {
error = "failed to read message header at index " + to_string(index);
return false;
}
if(data->header.cookie != fio::MessageHeader::HEADER_COOKIE) {
error = "failed to verify message header at " + to_string(index);
return false;
}
if(header->meta_encrypted) {
auto meta_size = data->header.sender_unique_id_length + data->header.sender_name_length;
auto meta_size = message_data.header.sender_unique_id_length + message_data.header.sender_name_length;
auto meta_buffer = malloc(meta_size);
if(this->fread(meta_buffer, meta_size, -1, false) != meta_size) {
@ -660,14 +657,14 @@ bool Conversation::load_messages(const std::shared_ptr<db::MessageBlock> &block,
return false;
}
apply_crypt(meta_buffer, meta_buffer, meta_size, (block->block_offset ^ data->header.message_timestamp) ^ 0x6675636b20796f75ULL); /* 0x6675636b20796f75 := 'fuck you' */
apply_crypt(meta_buffer, meta_buffer, meta_size, (block->block_offset ^ message_data.header.message_timestamp) ^ 0x6675636b20796f75ULL); /* 0x6675636b20796f75 := 'fuck you' */
data->sender_unique_id.assign((char*) meta_buffer, data->header.sender_unique_id_length);
data->sender_name.assign((char*) meta_buffer + data->header.sender_unique_id_length, data->header.sender_name_length);
data->sender_unique_id.assign((char*) meta_buffer, message_data.header.sender_unique_id_length);
data->sender_name.assign((char*) meta_buffer + message_data.header.sender_unique_id_length, message_data.header.sender_name_length);
free(meta_buffer);
} else {
data->sender_unique_id.resize(data->header.sender_unique_id_length);
data->sender_name.resize(data->header.sender_name_length);
data->sender_unique_id.resize(message_data.header.sender_unique_id_length);
data->sender_name.resize(message_data.header.sender_name_length);
if(this->fread(data->sender_unique_id.data(), data->sender_unique_id.length(), -1, false) != data->sender_unique_id.length()) {
error = "failed to read message sender unique id at " + to_string(index);
@ -680,14 +677,14 @@ bool Conversation::load_messages(const std::shared_ptr<db::MessageBlock> &block,
}
}
data->message.resize(data->header.message_length);
data->message.resize(message_data.header.message_length);
if(this->fread(data->message.data(), data->message.length(), -1, false) != data->message.length()) {
error = "failed to read message id at " + to_string(index);
return false;
}
if(header->message_encrypted)
apply_crypt(data->message.data(), data->message.data(), data->message.size(), block->block_offset ^ data->header.message_timestamp);
apply_crypt(data->message.data(), data->message.data(), data->message.size(), block->block_offset ^ message_data.header.message_timestamp);
message_data.message_data = data;
index++;
@ -907,7 +904,7 @@ void Conversation::process_write_queue(const std::chrono::system_clock::time_poi
auto indexed_block = this->last_message_block->indexed_block;
if(indexed_block) {
lock_guard lock(indexed_block->message_index_lock);
indexed_block->message_index.push_back(fio::IndexedMessage{(uint32_t) (entry_offset - this->last_message_block->block_offset), write_entry->message_timestamp, nullptr});
indexed_block->message_index.push_back(fio::IndexedBlockMessage{(uint32_t) (entry_offset - this->last_message_block->block_offset), {write_header}, nullptr});
}
}
@ -1018,6 +1015,7 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
if(message_count == 0)
return result;
bool count_deleted = false;
/* first try to fillout the result with the cached messages */
{
lock_guard lock(this->_last_messages_lock);
@ -1027,6 +1025,8 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
continue;
if(begin_timestamp.time_since_epoch().count() != 0 && (*it)->message_timestamp < begin_timestamp)
return result;
if((*it)->flag_message_deleted && !count_deleted)
continue;
result.push_back(*it);
if(--message_count == 0)
@ -1043,7 +1043,8 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
if(!ref_server)
return result;
auto timestamp = result.empty() ? end_timestamp : result.back()->message_timestamp;
auto begin_timestamp_ms = chrono::floor<milliseconds>(begin_timestamp.time_since_epoch()).count();
auto timestamp_ms = result.empty() ? chrono::floor<milliseconds>(end_timestamp.time_since_epoch()).count() : chrono::floor<milliseconds>(result.back()->message_timestamp.time_since_epoch()).count();
unique_lock lock(this->message_block_lock);
auto rit = this->message_blocks.end();
@ -1051,7 +1052,7 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
bool found = false;
do {
rit--;
if((*rit)->begin_timestamp < timestamp) {
if(chrono::floor<milliseconds>((*rit)->begin_timestamp.time_since_epoch()).count() < timestamp_ms) {
found = true;
break; /* we found the first block which is created before the point we're searching from */
}
@ -1085,7 +1086,7 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
auto block_found = false;
do {
rmid--;
if((*rmid).timestamp < timestamp) {
if(rmid->header.message_timestamp < timestamp_ms) {
block_found = true;
break; /* we found the first block which is created before the point we're searching from */
}
@ -1099,34 +1100,31 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
continue;
}
do {
if(rmid->header.flag_deleted && !count_deleted)
continue;
if(rmid->header.message_timestamp >= timestamp_ms)
continue; /* for some reason we got a message from the index of before where we are. This could happen for "orphaned" blocks which point to a valid block within the future block */
if(begin_timestamp.time_since_epoch().count() != 0 && rmid->header.message_timestamp < begin_timestamp_ms)
return result;
auto data = rmid->message_data;
if(!data)
continue;
if(begin_timestamp.time_since_epoch().count() != 0 && rmid->timestamp < begin_timestamp)
return result;
if(rmid->timestamp >= timestamp)
continue; /* for some reason we got a message from the index of before where we are. This could happen for "orphaned" blocks which point to a valid block within the future block */
/*
std::chrono::system_clock::time_point message_timestamp;
ClientDbId sender_database_id;
std::string sender_unique_id;
std::string sender_name;
std::string message;
*/
result.push_back(make_shared<ConversationEntry>(ConversationEntry{
rmid->timestamp,
system_clock::time_point{} + milliseconds{rmid->header.message_timestamp},
(ClientDbId) data->header.sender_database_id,
(ClientDbId) rmid->header.sender_database_id,
data->sender_unique_id,
data->sender_name,
data->message
data->message,
rmid->header.flag_deleted
}));
timestamp = rmid->timestamp;
timestamp_ms = rmid->header.message_timestamp;
if(--message_count == 0)
return result;
} while(rmid-- != index->message_index.begin());
@ -1138,6 +1136,124 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
return result;
}
//TODO: May move the IO write part to the write queue?
size_t Conversation::delete_messages(const std::chrono::system_clock::time_point &end_timestamp, size_t message_count, const std::chrono::system_clock::time_point &begin_timestamp, ts::ClientDbId cldbid) {
size_t delete_count_volatile = 0, delete_count = 0;
if(message_count == 0)
return 0;
/* first try to fillout the result with the cached messages */
{
lock_guard lock(this->_last_messages_lock);
for(auto it = this->_last_messages.rbegin(); it != this->_last_messages.rend(); it++) {
if((*it)->message_timestamp > end_timestamp) /* message has been send after the search timestamp */
continue;
if(begin_timestamp.time_since_epoch().count() != 0 && (*it)->message_timestamp < begin_timestamp)
break;
if(cldbid != 0 && (*it)->sender_database_id != cldbid)
continue;
(*it)->flag_message_deleted = false;
if(++delete_count_volatile >= message_count)
break;
}
}
/* TODO: Remove from write queue */
if(!this->volatile_only()) {
auto handle = this->_ref_handle.lock();
if(!handle)
return delete_count_volatile;
auto ref_server = handle->ref_server();
if(!ref_server)
return delete_count_volatile;
auto begin_timestamp_ms = chrono::floor<milliseconds>(begin_timestamp.time_since_epoch()).count();
auto timestamp_ms = chrono::floor<milliseconds>(end_timestamp.time_since_epoch()).count();
unique_lock lock(this->message_block_lock);
auto rit = this->message_blocks.end();
if(rit != this->message_blocks.begin()) {
bool found = false;
do {
rit--;
if(chrono::floor<milliseconds>((*rit)->begin_timestamp.time_since_epoch()).count() < timestamp_ms) {
found = true;
break; /* we found the first block which is created before the point we're searching from */
}
} while(rit != this->message_blocks.begin());
string error;
if(found) {
vector<shared_ptr<db::MessageBlock>> relevant_entries{this->message_blocks.begin(), ++rit};
lock.unlock();
auto _rit = --relevant_entries.end();
do {
auto block = *_rit;
/* lets search for messages */
if(!this->load_message_block_index(block, error)) {
logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to load message block {} for message delete: {}", this->_channel_id, block->block_offset, error);
continue;
}
auto index = (*_rit)->indexed_block;
if(!index) {
logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to reference indexed block within message block.", this->_channel_id);
continue;
}
lock_guard index_lock{index->message_index_lock};
auto rmid = index->message_index.end();
if(rmid == index->message_index.begin())
continue; /* Empty block? Funny */
auto block_found = false;
do {
rmid--;
if(rmid->header.message_timestamp < timestamp_ms) {
block_found = true;
break; /* we found the first block which is created before the point we're searching from */
}
} while(rmid != index->message_index.begin());
if(!block_found)
continue;
do {
if(rmid->header.message_timestamp >= timestamp_ms)
continue; /* for some reason we got a message from the index of before where we are. This could happen for "orphaned" blocks which point to a valid block within the future block */
if(begin_timestamp.time_since_epoch().count() != 0 && rmid->header.message_timestamp < begin_timestamp_ms)
return max(delete_count, delete_count_volatile);
if(cldbid != 0 && rmid->header.sender_database_id != cldbid)
continue;
if(!rmid->header.flag_deleted) {
rmid->header.flag_deleted = true;
auto offset = block->block_offset + rmid->message_offset;
if(this->fwrite(&rmid->header, sizeof(rmid->header), offset, false, true) != sizeof(rmid->header))
logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to save message flags.", this->_channel_id);
}
timestamp_ms = rmid->header.message_timestamp;
if(++delete_count >= message_count)
return max(delete_count, delete_count_volatile);
} while(rmid-- != index->message_index.begin());
} while(_rit-- != relevant_entries.begin());
}
}
}
return max(delete_count, delete_count_volatile);
}
ConversationManager::ConversationManager(const std::shared_ptr<ts::server::TSServer> &server) : _ref_server(server) { }
ConversationManager::~ConversationManager() { }

View File

@ -21,6 +21,7 @@ namespace ts {
std::string sender_name;
std::string message;
bool flag_message_deleted;
};
namespace fio {
@ -64,32 +65,33 @@ namespace ts {
uint8_t sender_unique_id_length; /* directly followed by this header */
uint8_t sender_name_length; /* directly followed after the unique id */
uint16_t message_length; /* directly followed after the name */
union {
uint16_t message_flags; /* could be later something like deleted etc.... */
struct {
uint16_t _flags_padding: 15;
bool flag_deleted: 1;
};
};
};
static_assert(sizeof(MessageHeader) == 26);
#pragma pack(pop)
struct IndexedMessageData {
MessageHeader header;
std::string sender_unique_id;
std::string sender_name;
std::string message;
};
struct IndexedMessage {
uint32_t offset;
std::chrono::system_clock::time_point timestamp;
struct IndexedBlockMessage {
uint32_t message_offset;
MessageHeader header;
std::shared_ptr<IndexedMessageData> message_data;
};
struct IndexedBlock {
bool successfully;
/*
* message_index[0] := index of the message (including the header!)
* message_index[1] := timestamp of the message
*/
std::deque<IndexedMessage> message_index;
bool index_successful;
std::deque<IndexedBlockMessage> message_index;
std::mutex message_index_lock;
};
}
@ -135,7 +137,7 @@ namespace ts {
class ConversationManager;
class Conversation {
public:
Conversation(const std::shared_ptr<ConversationManager>& /* handle */, ChannelId /* channel id */, const std::string& /* file name */);
Conversation(const std::shared_ptr<ConversationManager>& /* handle */, ChannelId /* channel id */, std::string /* file name */);
~Conversation();
bool initialize(std::string& error);
@ -158,6 +160,8 @@ namespace ts {
return this->message_history(std::chrono::system_clock::now(), limit, std::chrono::system_clock::time_point{});
}
size_t delete_messages(const std::chrono::system_clock::time_point& /* end timestamp */, size_t /* limit */, const std::chrono::system_clock::time_point& /* begin timestamp */, ClientDbId /* database id */);
ts_always_inline void set_ref_self(const std::shared_ptr<Conversation>& pointer) {
this->_ref_self = pointer;
}