diff --git a/CMakeLists.txt b/CMakeLists.txt index 7bff075..5fbce4f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -104,6 +104,7 @@ set(SOURCE_FILES src/misc/digest.cpp src/misc/base64.cpp src/misc/net.cpp + src/misc/task_executor.cpp src/lock/rw_mutex.cpp @@ -155,6 +156,7 @@ set(HEADER_FILES src/misc/hex.h src/misc/advanced_mutex.h src/misc/strobf.h + src/misc/task_executor.h src/protocol/buffers.h src/protocol/Packet.h diff --git a/src/Properties.h b/src/Properties.h index 481f8b4..d225e1d 100644 --- a/src/Properties.h +++ b/src/Properties.h @@ -823,6 +823,13 @@ namespace ts { template PropertyWrapper& operator=(const T& value) { + this->update_value(value); + return *this; + } + + + template + bool update_value(const T& value) { static_assert(ts::converter::supported, "type isn't supported for type"); { @@ -830,17 +837,18 @@ namespace ts { auto value_string = ts::converter::to_string(any_value); std::lock_guard lock(this->data_ptr->value_lock); - if(value_string == this->data_ptr->value) - return *this; + if(value_string == this->data_ptr->value) { + return false; + } + this->data_ptr->casted_value = any_value; this->data_ptr->value = value_string; } + this->trigger_update(); - - return *this; + return true; } - PropertyWrapper& operator=(const std::string& value) { this->value(value); return *this; diff --git a/src/misc/task_executor.cpp b/src/misc/task_executor.cpp new file mode 100644 index 0000000..1f1079e --- /dev/null +++ b/src/misc/task_executor.cpp @@ -0,0 +1,353 @@ +// +// Created by WolverinDEV on 21/02/2021. +// + +#include "./task_executor.h" +#include +#include +#include + +using std::chrono::system_clock; +using namespace ts; + +task_executor::task_executor(size_t num_threads, const std::string &thread_prefix) { + this->task_context = std::make_shared(); + + this->executors.reserve(num_threads); + for(size_t index{0}; index < num_threads; index++) { + auto handle = std::make_shared(); + handle->handle = this; + handle->task_context = this->task_context; + handle->thread_handle = std::thread(task_executor::executor, handle); + if(!thread_prefix.empty()) { + threads::name(handle->thread_handle, thread_prefix + std::to_string(index + 1)); + } + this->executors.push_back(std::move(handle)); + } +} + +task_executor::~task_executor() { + for(auto& thread : this->executors) { + if(thread->thread_handle.joinable()) { + /* TODO: Print an error */ + thread->thread_handle.detach(); + } + } + + /* TODO: delete tasks which are still pending */ +} + +void task_executor::set_exception_handler(task_exception_handler handler) { + std::lock_guard task_lock{this->task_context->mutex}; + this->task_context->exception_handler = std::move(handler); +} + +void task_executor::abort_exception_handler(const std::string &task, const std::exception_ptr &exception) { + std::string message{}; + try { + std::rethrow_exception(exception); + } catch (const std::exception& ex) { + message = "std::exception::what() -> " + std::string{ex.what()}; + } catch(...) { + message = "unknown exception"; + } + + std::cerr << "task_executor encountered an exception while executing task " << task << ": " << message << std::endl; + abort(); +} + +bool task_executor::shutdown(const std::chrono::system_clock::time_point &timeout) { + { + std::lock_guard task_lock{this->task_context->mutex}; + this->task_context->shutdown = true; + this->task_context->notify.notify_all(); + } + + for(auto& thread : this->executors) { + if(timeout.time_since_epoch().count() > 0) { + auto now = system_clock::now(); + if(now > timeout) { + /* failed to join all executors */ + return false; + } + + if(!threads::timed_join(thread->thread_handle, timeout - now)) { + /* thread failed to join */ + return false; + } + } else { + threads::save_join(thread->thread_handle, false); + } + } + + return true; +} + +bool task_executor::schedule(task_id &task_id, std::string task_name, std::function callback) { + auto& task_context_ = this->task_context; + + auto task = std::make_unique(); + task->name = std::move(task_name); + task->callback = std::move(callback); + + std::lock_guard task_lock{task_context_->mutex}; + if(task_context_->shutdown) { + return false; + } + + task->id = task_context_->id_index++; + if(!task->id) { + /* the task ids wrapped around I guess */ + task->id = task_context_->id_index++; + } + task_id = task->id; + + auto task_ptr = task.release(); + task_context_->task_count++; + *task_context_->task_tail = task_ptr; + task_context_->task_tail = &task_ptr->next; + task_context_->notify.notify_one(); + return true; +} + +bool task_executor::schedule_repeating(task_id &task_id, std::string task_name, std::chrono::nanoseconds interval, + std::function callback) { + auto& task_context_ = this->task_context; + + auto task = std::make_unique(); + task->name = std::move(task_name); + task->callback = std::move(callback); + task->interval = std::move(interval); + task->scheduled_invoke = std::chrono::system_clock::now(); + + std::lock_guard task_lock{task_context_->mutex}; + if(task_context_->shutdown) { + return false; + } + + task->id = task_context_->id_index++; + if(!task->id) { + /* the task ids wrapped around I guess */ + task->id = task_context_->id_index++; + } + task_id = task->id; + + task_context_->task_recurring_count++; + this->enqueue_recurring_task(task.release()); + task_context_->notify.notify_one(); + return true; +} + +void task_executor::enqueue_recurring_task(task_recurring *task) { + auto& task_context_ = this->task_context; + + if(!task_context_->task_recurring_head) { + /* No tasks pending. We could easily enqueue the task. */ + task->next = nullptr; + task_context_->task_recurring_head = task; + } else { + /* Find the correct insert spot */ + task_recurring* previous_task{nullptr}; + task_recurring* next_task{task_context_->task_recurring_head}; + while(true) { + if(next_task->scheduled_invoke > task->scheduled_invoke) { + break; + } + + previous_task = next_task; + next_task = next_task->next; + + if(!next_task) { + /* We reached the queue end. */ + break; + } + } + + task->next = next_task; + if(!previous_task) { + /* we're inserting the task as head */ + assert(next_task == task_context_->task_recurring_head); + task_context_->task_recurring_head = task; + } else { + previous_task->next = task; + task->next = next_task; + } + } +} + +bool task_executor::cancel_task(task_id task_id) { + auto& task_context_ = this->task_context; + + std::unique_lock task_lock{task_context->mutex}; + /* 1. Search for a pending normal task */ + { + task* previous_task{nullptr}; + task* current_task{task_context_->task_head}; + while(current_task) { + if(current_task->id == task_id) { + /* We found our task. Just remove and delete it. */ + if(previous_task) { + previous_task->next = current_task->next; + } else { + assert(task_context_->task_head == current_task); + if(current_task->next) { + assert(task_context_->task_tail != ¤t_task->next); + task_context_->task_head = current_task->next; + } else { + assert(task_context_->task_tail == ¤t_task->next); + task_context_->task_head = nullptr; + task_context_->task_tail = &task_context_->task_head; + } + } + + assert(task_context_->task_count > 0); + task_context_->task_count--; + task_lock.unlock(); + + delete current_task; + return true; + } + + previous_task = current_task; + current_task = current_task->next; + } + } + + /* 2. Search for a pending recurring task */ + { + task_recurring* previous_task{nullptr}; + task_recurring* current_task{task_context_->task_recurring_head}; + while(current_task) { + if(current_task->id == task_id) { + /* We found our task. Just remove and delete it. */ + if(previous_task) { + previous_task->next = current_task->next; + } else { + assert(task_context_->task_recurring_head == current_task); + task_context_->task_recurring_head = nullptr; + } + + assert(task_context_->task_recurring_count > 0); + task_context_->task_recurring_count--; + task_lock.unlock(); + + delete current_task; + return true; + } + + previous_task = current_task; + current_task = current_task->next; + } + } + + /* 3. Our task does not seem to pend anywhere. May it already gets executed. */ + for(auto& executor : this->executors) { + if(executor->executing_task && executor->executing_task->id == task_id) { + /* + * It gets executed right now. + * The task itself will be deleted by the executor. + * Note: No need to decrease the task count here since it has already been + * decreased when receiving the task by the executor. + */ + return true; + } + + if(executor->executing_recurring_task && executor->executing_recurring_task->id == task_id) { + /* + * It gets executed right now. + * Setting shutdown flag to prevent rescheduling. + * The task will be deleted by the executor itself. + */ + auto task_handle = executor->executing_recurring_task; + task_handle->shutdown = true; + + assert(task_context_->task_recurring_count > 0); + task_context_->task_recurring_count--; + return true; + } + } + + return false; +} + +void task_executor::executor(std::shared_ptr executor_context) { + auto& task_context = executor_context->task_context; + + std::unique_lock task_lock{task_context->mutex}; + while(true) { + assert(task_lock.owns_lock()); + if(task_context->shutdown) { + break; + } + + if(task_context->task_head) { + auto task = task_context->task_head; + if(task->next) { + assert(task_context->task_tail != &task->next); + task_context->task_head = task->next; + } else { + assert(task_context->task_tail == &task->next); + task_context->task_head = nullptr; + task_context->task_tail = &task_context->task_head; + } + + assert(task_context->task_count > 0); + task_context->task_count--; + + executor_context->executing_task = task; + task_lock.unlock(); + + try { + task->callback(); + } catch (...) { + auto exception = std::current_exception(); + + task_lock.lock(); + auto handler = task_context->exception_handler; + task_lock.unlock(); + + handler(task->name, exception); + } + + task_lock.lock(); + executor_context->executing_task = nullptr; + delete task; + continue; + } + + auto execute_timestamp = system_clock::now(); + if(task_context->task_recurring_head && task_context->task_recurring_head->scheduled_invoke <= execute_timestamp) { + auto task = task_context->task_recurring_head; + task_context->task_recurring_head = task->next; + + executor_context->executing_recurring_task = task; + task_lock.unlock(); + + try { + task->callback(task->last_invoked); + } catch (...) { + auto exception = std::current_exception(); + + task_lock.lock(); + auto handler = task_context->exception_handler; + task_lock.unlock(); + + handler(task->name, exception); + } + + task->last_invoked = execute_timestamp; + task->scheduled_invoke = std::min(system_clock::now(), execute_timestamp + task->interval); + + task_lock.lock(); + executor_context->executing_recurring_task = nullptr; + if(task->shutdown) { + delete task; + } else { + executor_context->handle->enqueue_recurring_task(task); + } + continue; + } + + task_context->notify.wait(task_lock); + } +} \ No newline at end of file diff --git a/src/misc/task_executor.h b/src/misc/task_executor.h new file mode 100644 index 0000000..8af4af7 --- /dev/null +++ b/src/misc/task_executor.h @@ -0,0 +1,281 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace ts { + typedef uint32_t task_id; + typedef std::function task_exception_handler; + + /** + * A basic task executor & scheduler for one time and repeating tasks + * Note: All methods are thread save or it's specified otherwise + */ + class task_executor { + public: + task_executor(size_t /* num threads */, const std::string& /* thread prefix */); + ~task_executor(); + + void set_exception_handler(task_exception_handler); + + /** + * Note: This method is not thread save. + * @returns `true` if all actions have been successfully shut down + */ + bool shutdown(const std::chrono::system_clock::time_point & /* timeout */); + + /** + * Cancel a task. If the task is currently executing it will not block. + * @returns `true` if the task has been found and `false` if the task isn't known. + */ + bool cancel_task(task_id /* task id */); + +#if 0 + /** + * Cancel a task with the possibility to wait until it has finished. + */ + void cancel_task_joinable(task_id /* task id */); +#endif + + /** + * @returns `true` if the task has successfully be enqueued for scheduling. + */ + bool schedule(task_id& /* task handle */, std::string /* name */, std::function /* callback */); + + /** + * @returns `true` if the task has successfully be enqueued for repeating scheduling. + */ + bool schedule_repeating(task_id& /* task handle */, + std::string /* name */, + std::chrono::nanoseconds /* interval */, + std::function /* callback */); + private: + struct task { + task_id id{0}; + std::string name{}; + std::function callback{}; + + task* next{nullptr}; + }; + + struct task_recurring { + task_id id{0}; + std::string name{}; + + bool shutdown{false}; + + std::chrono::nanoseconds interval{}; + std::chrono::system_clock::time_point last_invoked{}; + std::chrono::system_clock::time_point scheduled_invoke{}; + + std::function callback{}; + + task_recurring* next{nullptr}; + }; + + struct task_context { + std::mutex mutex{}; + std::condition_variable notify{}; + + bool shutdown{false}; + task_id id_index{1}; + + size_t task_count{}; + task* task_head{nullptr}; + task** task_tail{&this->task_head}; + + size_t task_recurring_count{}; + task_recurring* task_recurring_head{nullptr}; + + task_exception_handler exception_handler{task_executor::abort_exception_handler}; + }; + + struct executor_context { + task_executor* handle; + std::thread thread_handle{}; + + std::shared_ptr task_context{}; + + /** + * Must be accessed while holding the task_context.mutex and shall never be changed except for the executor. + * Lifetime will be granted while holding the lock. + */ + task* executing_task{nullptr}; + task_recurring* executing_recurring_task{nullptr}; + }; + + std::vector> executors{}; + std::shared_ptr task_context; + + /** + * Enqueue the task into the task queue. + * Attention: + * 1. The task context mutex must be hold by the caller + * 2. The task should not be enqueued already + */ + void enqueue_recurring_task(task_recurring* /* task */); + + static void executor(std::shared_ptr /* context shared pointer */); + static void abort_exception_handler(const std::string& /* task name */, const std::exception_ptr& /* exception */); + }; + + /** + * Helper class for tasks which could be executed multiple times. + * It will avoid execution stacking while the task is executing. + * The task will never be executed twice only sequential. + */ + struct multi_shot_task { + public: + explicit multi_shot_task() {} + + multi_shot_task(std::shared_ptr executor, std::string task_name, std::function callback) + : inner{std::make_shared(std::move(executor), std::move(task_name), std::move(callback))} { + this->inner->callback_wrapper = [inner = this->inner]{ + auto result = inner->schedule_kind.exchange(2); + assert(result == 1); + (void) result; + + try { + (inner->callback)(); + execute_finished(&*inner); + } catch (...) { + execute_finished(&*inner); + std::rethrow_exception(std::current_exception()); + } + }; + } + + multi_shot_task(const multi_shot_task&) = default; + multi_shot_task(multi_shot_task&& other) = default; + + inline multi_shot_task& operator=(const multi_shot_task& other) { + this->inner = other.inner; + return *this; + } + + inline multi_shot_task& operator=(multi_shot_task&& other) { + this->inner = std::move(other.inner); + return *this; + } + + /** + * @returns `true` if the task has successfully be enqueued or is already enqueued + * and `false` if the `schedule` call failed or we have no task. + */ + inline bool enqueue() { + auto& inner_ = this->inner; + + if(!inner_) { + return false; + } + + { + //CAS loop: https://preshing.com/20150402/you-can-do-any-kind-of-atomic-read-modify-write-operation/ + + uint8_t current_state = inner_->schedule_kind.load(); + uint8_t new_state; + do { + switch(current_state) { + case 0: + /* no execute has been scheduled */ + new_state = 1; + break; + + case 1: + case 3: + /* an execute is already scheduled */ + return true; + + case 2: + /* we're already executing now but we need a new execute */ + new_state = 3; + break; + + default: + assert(false); + return false; + } + } while(!inner_->schedule_kind.compare_exchange_weak(current_state, new_state)); + } + + task_id task_id_; + auto result = inner_->executor->schedule(task_id_, inner_->task_name, inner->callback_wrapper); + if(!result) { + /* + * Task isn't scheduled any more. We failed to schedule it. + * Note: The task might got rescheduled again so may more than only one schedule attempt fail + * in total. + */ + inner_->schedule_kind = 0; + return false; + } + + return true; + } + private: + struct execute_inner { + explicit execute_inner(std::shared_ptr executor, std::string name, std::function callback) noexcept + : task_name{std::move(name)}, executor{std::move(executor)}, callback{std::move(callback)} {} + + std::string task_name; + std::shared_ptr executor; + + std::function callback; + std::function callback_wrapper; + + /** + * `0` not scheduled + * `1` scheduled + * `2` executing + * `3` executing with reschedule + */ + std::atomic schedule_kind{0}; + }; + + std::shared_ptr inner{}; + + inline static void execute_finished(execute_inner* inner) { + auto current_state = inner->schedule_kind.load(); + uint8_t new_state; + do { + switch(current_state) { + case 0: + case 1: + assert(false); + return; + + case 2: + new_state = 0; + break; + + case 3: + new_state = 1; + break; + + default: + assert(false); + return; + }; + } while(!inner->schedule_kind.compare_exchange_weak(current_state, new_state)); + + if(new_state == 1) { + /* a reschedule was requested */ + + task_id task_id_; + if(!inner->executor->schedule(task_id_, inner->task_name, inner->callback_wrapper)) { + /* + * Task isn't scheduled any more. We failed to schedule it. + * Note: The task might got rescheduled again so may more than only one schedule attempt fail + * in total. + */ + inner->schedule_kind = 0; + } + } + } + }; +} \ No newline at end of file