A lot of query reworking

This commit is contained in:
WolverinDEV 2021-01-28 20:59:14 +01:00
parent eb77a7fefb
commit 0cd49a6a2b
4 changed files with 73 additions and 79 deletions

View File

@ -7,84 +7,83 @@
#include <thread>
#include <condition_variable>
namespace ts {
namespace event {
class EventExecutor;
namespace ts::event {
class EventExecutor;
class EventEntry {
friend class EventExecutor;
public:
virtual void event_execute(const std::chrono::system_clock::time_point& /* scheduled timestamp */) = 0;
class EventEntry {
friend class EventExecutor;
public:
virtual void event_execute(const std::chrono::system_clock::time_point& /* scheduled timestamp */) = 0;
private:
void* _event_ptr = nullptr;
};
private:
void* _event_ptr = nullptr;
};
template <typename class_t>
class ProxiedEventEntry : public event::EventEntry {
public:
using callback_t = void(class_t::*)(const std::chrono::system_clock::time_point &);
using static_callback_t = void(*)(class_t *, const std::chrono::system_clock::time_point &);
template <typename class_t>
class ProxiedEventEntry : public event::EventEntry {
public:
using callback_t = void(class_t::*)(const std::chrono::system_clock::time_point &);
using static_callback_t = void(*)(class_t *, const std::chrono::system_clock::time_point &);
ProxiedEventEntry(const std::shared_ptr<class_t>& _instance, callback_t callback) : instance(_instance), callback(callback) { }
ProxiedEventEntry(const std::shared_ptr<class_t>& _instance, callback_t callback) : instance(_instance), callback(callback) { }
std::weak_ptr<class_t> instance;
callback_t callback;
std::weak_ptr<class_t> instance;
callback_t callback;
void event_execute(const std::chrono::system_clock::time_point &point) override {
auto _instance = this->instance.lock();
if(!_instance)
return;
auto callback_ptr = (void**) &this->callback;
(*(static_callback_t*) callback_ptr)(&*_instance, point);
void event_execute(const std::chrono::system_clock::time_point &point) override {
auto _instance = this->instance.lock();
if(!_instance) {
return;
}
};
class EventExecutor {
public:
explicit EventExecutor(std::string /* thread prefix */);
virtual ~EventExecutor();
auto callback_ptr = (void**) &this->callback;
(*(static_callback_t*) callback_ptr)(&*_instance, point);
}
};
bool initialize(int /* num threads */);
bool schedule(const std::shared_ptr<EventEntry>& /* entry */);
bool cancel(const std::shared_ptr<EventEntry>& /* entry */); /* Note: Will not cancel already running executes */
void shutdown();
class EventExecutor {
public:
explicit EventExecutor(std::string /* thread prefix */);
virtual ~EventExecutor();
inline const std::string& thread_prefix() const { return this->_thread_prefix; }
bool initialize(int /* num threads */);
bool schedule(const std::shared_ptr<EventEntry>& /* entry */);
bool cancel(const std::shared_ptr<EventEntry>& /* entry */); /* Note: Will not cancel already running executes */
void shutdown();
void threads(int /* num threads */);
inline int threads() const { return this->target_threads; }
private:
struct LinkedEntry {
LinkedEntry* previous;
LinkedEntry* next;
inline const std::string& thread_prefix() const { return this->_thread_prefix; }
std::chrono::system_clock::time_point scheduled;
std::weak_ptr<EventEntry> entry;
};
void threads(int /* num threads */);
inline int threads() const { return this->target_threads; }
private:
struct LinkedEntry {
LinkedEntry* previous;
LinkedEntry* next;
static void _executor(EventExecutor*);
void _spawn_executor(std::unique_lock<std::mutex>&);
void _shutdown(std::unique_lock<std::mutex>&);
void _reset_events(std::unique_lock<std::mutex>&);
std::chrono::system_clock::time_point scheduled;
std::weak_ptr<EventEntry> entry;
};
static void _executor(EventExecutor*);
void _spawn_executor(std::unique_lock<std::mutex>&);
void _shutdown(std::unique_lock<std::mutex>&);
void _reset_events(std::unique_lock<std::mutex>&);
#ifndef WIN32
void _reassign_thread_names(std::unique_lock<std::mutex>&);
void _reassign_thread_names(std::unique_lock<std::mutex>&);
#endif
bool should_shutdown = true;
bool should_adjust = false; /* thread adjustments */
int target_threads = 0;
bool should_shutdown = true;
bool should_adjust = false; /* thread adjustments */
int target_threads = 0;
std::vector<std::thread> _threads;
std::mutex lock;
std::condition_variable condition;
std::vector<std::thread> _threads;
std::mutex lock;
std::condition_variable condition;
LinkedEntry* head = nullptr;
LinkedEntry* tail = nullptr;
LinkedEntry* head = nullptr;
LinkedEntry* tail = nullptr;
std::string _thread_prefix;
};
}
std::string _thread_prefix;
};
}

View File

@ -111,9 +111,6 @@ namespace ts {
this->memory_state.id_branded = true;
this->setPacketId(packetId, generationId);
}
Command BasicPacket::asCommand() {
return Command::parse(this->data());
}
/**
* @param buffer -> [mac][Header [uint16 BE packetId | [uint8](4bit flags | 4bit type)]][Data]

View File

@ -78,39 +78,37 @@ namespace ts {
}
*/
Command Command::parse(const pipes::buffer_view &buffer, bool expect_type, bool drop_non_utf8) {
string_view data{buffer.data_ptr<const char>(), buffer.length()};
Command Command::parse(const std::string_view& command_data, bool expect_type, bool drop_non_utf8) {
Command result;
size_t current_index = std::string::npos, end_index;
if(expect_type) {
current_index = data.find(' ', 0);
current_index = command_data.find(' ', 0);
if(current_index == std::string::npos){
result._command = std::string(data);
result._command = std::string{command_data};
return result;
} else {
result._command = std::string(data.substr(0, current_index));
result._command = std::string{command_data.substr(0, current_index)};
}
}
size_t bulk_index = 0;
while(++current_index > 0 || (current_index == 0 && !expect_type && (expect_type = true))) {
end_index = data.find_first_of(" |", current_index);
end_index = command_data.find_first_of(" |", current_index);
if(end_index != current_index && current_index < data.length()) { /* else we've found another space or a pipe */
if(data[current_index] == '-') {
string trigger(data.substr(current_index + 1, end_index - current_index - 1));
if(end_index != current_index && current_index < command_data.length()) { /* else we've found another space or a pipe */
if(command_data[current_index] == '-') {
string trigger(command_data.substr(current_index + 1, end_index - current_index - 1));
result.paramethers.push_back(trigger);
} else {
auto index_assign = data.find_first_of('=', current_index);
auto index_assign = command_data.find_first_of('=', current_index);
string key, value;
if(index_assign == string::npos || index_assign > end_index) {
key = data.substr(current_index, end_index - current_index);
key = command_data.substr(current_index, end_index - current_index);
} else {
key = data.substr(current_index, index_assign - current_index);
key = command_data.substr(current_index, index_assign - current_index);
try {
value = query::unescape(string(data.substr(index_assign + 1, end_index - index_assign - 1)), true);
value = query::unescape(string(command_data.substr(index_assign + 1, end_index - index_assign - 1)), true);
} catch(const std::invalid_argument& ex) {
(void) ex;
@ -134,7 +132,7 @@ namespace ts {
}
}
if(end_index < data.length() && data[end_index] == '|')
if(end_index < command_data.length() && command_data[end_index] == '|')
bulk_index++;
current_index = end_index;
}

View File

@ -161,7 +161,7 @@ operator type(){ \
class Command {
public:
static Command parse(const pipes::buffer_view& buffer, bool expect_command = true, bool drop_non_utf8 = false);
static Command parse(const std::string_view& command_data, bool expect_command = true, bool drop_non_utf8 = false);
explicit Command(const std::string& command);
explicit Command(const std::string& command, std::initializer_list<variable>);