A lot of query reworking

This commit is contained in:
WolverinDEV
2021-01-28 20:59:15 +01:00
parent 9a5aa1d42b
commit 902b1f3511
58 changed files with 1615 additions and 1012 deletions
+19
View File
@@ -0,0 +1,19 @@
//
// Created by WolverinDEV on 28/01/2021.
//
#include "RawCommand.h"
using namespace ts::server::command;
ReassembledCommand *ReassembledCommand::allocate(size_t size) {
auto instance = (ReassembledCommand*) malloc(sizeof(ReassembledCommand) + size);
instance->length_ = size;
instance->capacity_ = size;
instance->next_command = nullptr;
return instance;
}
void ReassembledCommand::free(ReassembledCommand *command) {
::free(command);
}
+48
View File
@@ -0,0 +1,48 @@
#pragma once
#include <cstdint>
#include <string_view>
#include <pipes/buffer.h>
namespace ts::server::command {
struct CommandFragment {
uint16_t packet_id{0};
uint16_t packet_generation{0};
uint8_t packet_flags{0};
uint32_t payload_length : 24;
pipes::buffer payload{};
CommandFragment() : payload_length{0} { }
CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload)
: packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags}, payload_length{payloadLength}, payload{std::move(payload)} {}
CommandFragment& operator=(const CommandFragment&) = default;
CommandFragment(const CommandFragment& other) = default;
CommandFragment(CommandFragment&&) = default;
};
static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer));
struct ReassembledCommand {
public:
static ReassembledCommand* allocate(size_t /* command length */);
static void free(ReassembledCommand* /* command */);
[[nodiscard]] inline size_t length() const { return this->length_; }
inline void set_length(size_t length) { assert(this->capacity_ >= length); this->length_ = length; }
[[nodiscard]] inline size_t capacity() const { return this->capacity_; }
[[nodiscard]] inline const char* command() const { return (const char*) this + sizeof(ReassembledCommand); }
[[nodiscard]] inline char* command() { return (char*) this + sizeof(ReassembledCommand); }
[[nodiscard]] inline std::string_view command_view() const { return std::string_view{this->command(), this->length()}; }
mutable ReassembledCommand* next_command; /* nullptr by default */
private:
explicit ReassembledCommand() = default;
size_t capacity_;
size_t length_;
};
}
@@ -0,0 +1,272 @@
//
// Created by WolverinDEV on 29/07/2020.
//
#include <utility>
#include <ThreadPool/ThreadHelper.h>
#include "./ServerCommandExecutor.h"
#include "src/client/voice/PacketDecoder.h"
#include "src/client/voice/VoiceClientConnection.h"
using namespace ts;
using namespace ts::server;
using namespace ts::server::command;
namespace ts::server {
struct ServerCommandQueueInner {
spin_mutex pending_commands_lock{};
command::ReassembledCommand* pending_commands_head{nullptr};
command::ReassembledCommand** pending_commands_tail{&pending_commands_head};
bool has_command_handling_scheduled{false};
~ServerCommandQueueInner() {
auto head = this->pending_commands_head;
while(head) {
auto cmd = head->next_command;
ReassembledCommand::free(head);
head = cmd;
}
}
void reset() {
std::unique_lock pc_lock{this->pending_commands_lock};
auto head = std::exchange(this->pending_commands_head, nullptr);
this->pending_commands_tail = &this->pending_commands_head;
this->has_command_handling_scheduled = false;
pc_lock.unlock();
while(head) {
auto cmd = head->next_command;
ReassembledCommand::free(head);
head = cmd;
}
}
/**
* @param command The target command to enqueue.
* Ownership will be taken.
* @returns `true` if command handling has already been schedules and `false` if not
*/
bool enqueue(ReassembledCommand *command){
std::lock_guard pc_lock{this->pending_commands_lock};
*this->pending_commands_tail = command;
this->pending_commands_tail = &command->next_command;
return std::exchange(this->has_command_handling_scheduled, true);
}
ReassembledCommand* pop_command(bool& more_pending) {
std::lock_guard pc_lock{this->pending_commands_lock};
auto result = this->pending_commands_head;
if(!result) {
more_pending = false;
this->has_command_handling_scheduled = false;
} else if(result->next_command) {
more_pending = true;
this->pending_commands_head = result->next_command;
} else {
/* We assume true here since we might get new commands while the handler itself is still handling our current result. */
more_pending = true;
this->pending_commands_head = nullptr;
this->pending_commands_tail = &this->pending_commands_head;
}
return result;
}
};
}
bool ServerCommandHandler::execute_handling() {
bool more_pending;
std::unique_ptr<ReassembledCommand, void(*)(ReassembledCommand*)> pending_command{nullptr, ReassembledCommand::free};
while(true) {
pending_command.reset(this->inner->pop_command(more_pending));
if(!pending_command) {
break;
}
try {
auto result = this->handle_command(std::string_view{pending_command->command(), pending_command->length()});
if(!result) {
/* flush all commands */
this->inner->reset();
break;
}
} catch (std::exception& ex) {
logCritical(LOG_GENERAL, "Exception reached command execution root! {}",ex.what());
}
break; /* Maybe handle more than one command? Maybe some kind of time limit? */
}
return more_pending;
}
ServerCommandQueue::ServerCommandQueue(std::shared_ptr<ServerCommandExecutor> executor, std::unique_ptr<ServerCommandHandler> command_handler) :
executor{std::move(executor)},
command_handler{command_handler.release()} {
assert(this->command_handler);
this->inner = std::make_shared<ServerCommandQueueInner>();
this->command_handler->inner = this->inner;
}
ServerCommandQueue::~ServerCommandQueue() = default;
void ServerCommandQueue::reset() {
this->inner->reset();
}
void ServerCommandQueue::enqueue_command_string(const std::string_view &buffer) {
auto command = ReassembledCommand::allocate(buffer.length());
memcpy(command->command(), buffer.data(), command->length());
this->enqueue_command_execution(command);
}
void ServerCommandQueue::enqueue_command_execution(ReassembledCommand *command) {
assert(!command->next_command);
bool command_handling_scheduled = this->inner->enqueue(command);
if(!command_handling_scheduled) {
this->executor->enqueue_handler(this->command_handler);
}
}
#if 0
void ServerCommandQueue::execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */) {
if(!this->client->getServer() || this->client->connectionState() >= ConnectionState::DISCONNECTING) {
return;
}
std::unique_ptr<ReassembledCommand, void(*)(ReassembledCommand*)> pending_command{nullptr, ReassembledCommand::free};
while(true) {
{
std::lock_guard pc_lock{this->pending_commands_lock};
pending_command.reset(this->pending_commands_head);
if(!pending_command) {
this->has_command_handling_scheduled = false;
return;
} else if(pending_command->next_command) {
this->pending_commands_head = pending_command->next_command;
} else {
this->pending_commands_head = nullptr;
this->pending_commands_tail = &this->pending_commands_head;
}
}
auto startTime = std::chrono::system_clock::now();
try {
this->client->handlePacketCommand(pipes::buffer_view{pending_command->command(), pending_command->length()});
} catch (std::exception& ex) {
logCritical(this->client->getServerId(), "{} Exception reached command execution root! {}", CLIENT_STR_LOG_PREFIX_(this->client), ex.what());
}
auto end = std::chrono::system_clock::now();
if(end - startTime > std::chrono::milliseconds(10)) {
logError(this->client->getServerId(),
"{} Handling of command packet needs more than 10ms ({}ms)",
CLIENT_STR_LOG_PREFIX_(this->client),
duration_cast<std::chrono::milliseconds>(end - startTime).count()
);
}
break; /* Maybe handle more than one command? Maybe some kind of time limit? */
}
auto voice_server = this->client->getVoiceServer();
if(voice_server) {
voice_server->schedule_command_handling(client);
}
}
#endif
ServerCommandExecutor::ServerCommandExecutor(size_t threads) : thread_count_{threads} {
this->threads_.reserve(threads);
for(size_t index{0}; index < threads; index++) {
auto& thread = this->threads_.emplace_back(&ServerCommandExecutor::thread_entry_point, this);
threads::name(thread, "cmd executor " + std::to_string(index + 1));
}
}
ServerCommandExecutor::~ServerCommandExecutor() {
this->shutdown();
}
void ServerCommandExecutor::shutdown() {
{
std::lock_guard handler_lock{this->handler_mutex};
this->handler_shutdown = true;
this->handler_notify.notify_all();
}
for(auto& thread : this->threads_) {
threads::save_join(thread, true);
}
}
void ServerCommandExecutor::enqueue_handler(const std::shared_ptr<ServerCommandHandler> &handler) {
std::lock_guard handler_lock{this->handler_mutex};
if(handler->next_handler) {
/* handler already scheduled */
return;
}
if(handler->executing) {
/* handler currently gets executed */
return;
}
if(this->handler_tail == &handler->next_handler) {
/* handler already schedules (current head) */
return;
}
*this->handler_tail = handler;
this->handler_tail = &handler->next_handler;
this->handler_notify.notify_one();
}
void ServerCommandExecutor::thread_entry_point(void *ptr_this) {
reinterpret_cast<ServerCommandExecutor*>(ptr_this)->executor();
}
void ServerCommandExecutor::executor() {
std::unique_lock handler_lock{this->handler_mutex};
while(!this->handler_shutdown) {
this->handler_notify.wait(handler_lock, [&]{
return this->handler_shutdown || this->handler_head != nullptr;
});
if(this->handler_shutdown) {
break;
}
if(!this->handler_head) {
continue;
}
auto executor = this->handler_head;
if(executor->next_handler) {
this->handler_head = executor->next_handler;
} else {
this->handler_head = nullptr;
this->handler_tail = &this->handler_head;
}
executor->executing = true;
handler_lock.unlock();
auto reschedule = executor->execute_handling();
handler_lock.lock();
executor->executing = false;
if(reschedule && !executor->next_handler && this->handler_tail != &executor->next_handler) {
*this->handler_tail = executor;
this->handler_tail = &executor->next_handler;
/* No need to notify anybody since we'll be executing him again */
}
}
}
@@ -0,0 +1,89 @@
#pragma once
#include <misc/spin_mutex.h>
#include <pipes/buffer.h>
#include <EventLoop.h>
namespace ts::server {
class VoiceClient;
}
namespace ts::server {
namespace command {
struct ReassembledCommand;
}
class ServerCommandExecutor;
class ServerCommandQueue;
struct ServerCommandQueueInner;
class ServerCommandHandler {
friend class ServerCommandQueue;
friend class ServerCommandExecutor;
public:
ServerCommandHandler() = default;
protected:
/**
* Handle a command.
* @returns `false` if all other commands should be dropped and no further command handling should be done.
* `true` on success.
*/
virtual bool handle_command(const std::string_view& /* raw command */) = 0;
private:
std::shared_ptr<ServerCommandQueueInner> inner{nullptr};
/* locked by ServerCommandExecutor::handler_mutex */
std::shared_ptr<ServerCommandHandler> next_handler{nullptr};
bool executing{false};
/**
* @returns `true` if more commands need to be handled and `false` if all commands have been handled.
*/
bool execute_handling();
};
class ServerCommandQueue {
public:
explicit ServerCommandQueue(std::shared_ptr<ServerCommandExecutor> /* executor */, std::unique_ptr<ServerCommandHandler> /* command handler */);
~ServerCommandQueue();
void reset();
void enqueue_command_string(const std::string_view& /* payload */);
/* Attention: The method will take ownership of the command */
void enqueue_command_execution(command::ReassembledCommand*);
private:
std::shared_ptr<ServerCommandExecutor> executor{};
std::shared_ptr<ServerCommandHandler> command_handler{};
std::shared_ptr<ServerCommandQueueInner> inner;
};
class ServerCommandExecutor {
friend class ServerCommandQueue;
public:
explicit ServerCommandExecutor(size_t /* threads */);
~ServerCommandExecutor();
[[nodiscard]] inline auto thread_count() const { return this->thread_count_; }
void shutdown();
protected:
void enqueue_handler(const std::shared_ptr<ServerCommandHandler>& /* handler */);
private:
size_t thread_count_;
std::vector<std::thread> threads_{};
std::mutex handler_mutex{};
std::condition_variable handler_notify{};
std::shared_ptr<ServerCommandHandler> handler_head{nullptr};
std::shared_ptr<ServerCommandHandler>* handler_tail{&this->handler_head};
bool handler_shutdown{false};
static void thread_entry_point(void* /* this ptr */);
void executor();
};
}