From 1eaa9bb371387aeb17b0d87e8068654b5844e9fe Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Thu, 15 Apr 2021 17:14:46 +0200 Subject: [PATCH] Reworked the voice IP part --- src/misc/task_executor.cpp | 107 +++++++++++++++++++++++++++++++++---- src/misc/task_executor.h | 17 ++++-- 2 files changed, 112 insertions(+), 12 deletions(-) diff --git a/src/misc/task_executor.cpp b/src/misc/task_executor.cpp index 90bc520..05a492a 100644 --- a/src/misc/task_executor.cpp +++ b/src/misc/task_executor.cpp @@ -7,6 +7,7 @@ #include #include #include +#include using std::chrono::system_clock; using namespace ts; @@ -28,14 +29,37 @@ task_executor::task_executor(size_t num_threads, const std::string &thread_prefi } task_executor::~task_executor() { + { + std::lock_guard task_lock{this->task_context->mutex}; + for(auto& thread : this->executors) { + if(!thread->executing_recurring_task) { + continue; + } + + /* TODO: Log error */ + thread->executing_recurring_task->shutdown = true; + } + + while(this->task_context->task_head) { + auto task = std::exchange(this->task_context->task_head, this->task_context->task_head->next); + delete task; + } + this->task_context->task_tail = &this->task_context->task_head; + this->task_context->task_count = 0; + + while(this->task_context->task_recurring_head) { + auto task = std::exchange(this->task_context->task_recurring_head, this->task_context->task_recurring_head->next); + delete task; + } + this->task_context->task_recurring_count = 0; + } + for(auto& thread : this->executors) { if(thread->thread_handle.joinable()) { /* TODO: Print an error */ thread->thread_handle.detach(); } } - - /* TODO: delete tasks which are still pending */ } void task_executor::set_exception_handler(task_exception_handler handler) { @@ -176,7 +200,7 @@ void task_executor::enqueue_recurring_task(task_recurring *task) { } } -bool task_executor::cancel_task(task_id task_id) { +task_executor::task_cancel_result task_executor::internal_cancel_task(std::future *future, task_id task_id) { auto& task_context_ = this->task_context; std::unique_lock task_lock{task_context->mutex}; @@ -206,7 +230,7 @@ bool task_executor::cancel_task(task_id task_id) { task_lock.unlock(); delete current_task; - return true; + return task_cancel_result::pending_canceled; } previous_task = current_task; @@ -233,7 +257,7 @@ bool task_executor::cancel_task(task_id task_id) { task_lock.unlock(); delete current_task; - return true; + return task_cancel_result::pending_canceled; } previous_task = current_task; @@ -244,31 +268,88 @@ bool task_executor::cancel_task(task_id task_id) { /* 3. Our task does not seem to pend anywhere. May it already gets executed. */ for(auto& executor : this->executors) { if(executor->executing_task && executor->executing_task->id == task_id) { + auto task_handle = executor->executing_task; + if(task_handle->canceled) { + /* task has already been canceled */ + return task_cancel_result::not_found; + } + + task_handle->canceled = true; + if(future) { + assert(!task_handle->finish_callback.has_value()); + task_handle->finish_callback = std::make_optional(std::promise{}); + *future = task_handle->finish_callback->get_future(); + } + /* * It gets executed right now. * The task itself will be deleted by the executor. * Note: No need to decrease the task count here since it has already been * decreased when receiving the task by the executor. */ - return true; + return task_cancel_result::running_canceled; } if(executor->executing_recurring_task && executor->executing_recurring_task->id == task_id) { + auto task_handle = executor->executing_recurring_task; + if(task_handle->shutdown) { + /* the task has already been canceled */ + return task_cancel_result::not_found; + } + /* * It gets executed right now. * Setting shutdown flag to prevent rescheduling. * The task will be deleted by the executor itself. */ - auto task_handle = executor->executing_recurring_task; task_handle->shutdown = true; + if(future) { + assert(!task_handle->finish_callback.has_value()); + task_handle->finish_callback = std::make_optional(std::promise{}); + *future = task_handle->finish_callback->get_future(); + } assert(task_context_->task_recurring_count > 0); task_context_->task_recurring_count--; - return true; + return task_cancel_result::running_canceled; } } - return false; + return task_cancel_result::not_found; +} + +bool task_executor::cancel_task(task_id task_id) { + switch (this->internal_cancel_task(nullptr, task_id)) { + case task_cancel_result::pending_canceled: + case task_cancel_result::running_canceled: + return true; + + case task_cancel_result::not_found: + default: + assert(false); + return false; + } +} + +std::future task_executor::cancel_task_joinable(task_id task_id) { + std::future result{}; + switch (this->internal_cancel_task(&result, task_id)) { + case task_cancel_result::pending_canceled: + /* May throw an exception? */ + + case task_cancel_result::not_found: { + std::promise promise{}; + promise.set_value(); + return promise.get_future(); + } + + case task_cancel_result::running_canceled: + return result; + + default: + assert(false); + return result; + } } void task_executor::executor(std::shared_ptr executor_context) { @@ -312,6 +393,10 @@ void task_executor::executor(std::shared_ptr executor_context) task_lock.lock(); executor_context->executing_task = nullptr; + + if(task->finish_callback.has_value()) { + task->finish_callback->set_value(); + } delete task; continue; } @@ -346,6 +431,10 @@ void task_executor::executor(std::shared_ptr executor_context) task_lock.lock(); executor_context->executing_recurring_task = nullptr; if(task->shutdown) { + if(task->finish_callback) { + task->finish_callback->set_value(); + } + delete task; } else { executor_context->handle->enqueue_recurring_task(task); diff --git a/src/misc/task_executor.h b/src/misc/task_executor.h index fcc857a..d81f45b 100644 --- a/src/misc/task_executor.h +++ b/src/misc/task_executor.h @@ -6,6 +6,7 @@ #include #include #include +#include #include namespace ts { @@ -35,12 +36,10 @@ namespace ts { */ bool cancel_task(task_id /* task id */); -#if 0 /** * Cancel a task with the possibility to wait until it has finished. */ - void cancel_task_joinable(task_id /* task id */); -#endif + std::future cancel_task_joinable(task_id /* task id */); /** * @returns `true` if the task has successfully be enqueued for scheduling. @@ -60,6 +59,10 @@ namespace ts { std::string name{}; std::function callback{}; + /* will be set to true if the task has been canceled but is executing right now */ + bool canceled{false}; + std::optional> finish_callback{}; + task* next{nullptr}; }; @@ -68,6 +71,7 @@ namespace ts { std::string name{}; bool shutdown{false}; + std::optional> finish_callback{}; std::chrono::nanoseconds interval{}; std::chrono::system_clock::time_point last_invoked{}; @@ -109,6 +113,12 @@ namespace ts { task_recurring* executing_recurring_task{nullptr}; }; + enum struct task_cancel_result { + not_found, + pending_canceled, + running_canceled, + }; + std::vector> executors{}; std::shared_ptr task_context; @@ -119,6 +129,7 @@ namespace ts { * 2. The task should not be enqueued already */ void enqueue_recurring_task(task_recurring* /* task */); + [[nodiscard]] task_cancel_result internal_cancel_task(std::future* /* future */, task_id /* task id */); static void executor(std::shared_ptr /* context shared pointer */); static void abort_exception_handler(const std::string& /* task name */, const std::exception_ptr& /* exception */);