Reworked the voice IP part

This commit is contained in:
WolverinDEV 2021-04-15 17:14:46 +02:00
parent 0726cd6c95
commit 1eaa9bb371
2 changed files with 112 additions and 12 deletions

View File

@ -7,6 +7,7 @@
#include <cassert> #include <cassert>
#include <iostream> #include <iostream>
#include <algorithm> #include <algorithm>
#include <future>
using std::chrono::system_clock; using std::chrono::system_clock;
using namespace ts; using namespace ts;
@ -28,14 +29,37 @@ task_executor::task_executor(size_t num_threads, const std::string &thread_prefi
} }
task_executor::~task_executor() { task_executor::~task_executor() {
{
std::lock_guard task_lock{this->task_context->mutex};
for(auto& thread : this->executors) {
if(!thread->executing_recurring_task) {
continue;
}
/* TODO: Log error */
thread->executing_recurring_task->shutdown = true;
}
while(this->task_context->task_head) {
auto task = std::exchange(this->task_context->task_head, this->task_context->task_head->next);
delete task;
}
this->task_context->task_tail = &this->task_context->task_head;
this->task_context->task_count = 0;
while(this->task_context->task_recurring_head) {
auto task = std::exchange(this->task_context->task_recurring_head, this->task_context->task_recurring_head->next);
delete task;
}
this->task_context->task_recurring_count = 0;
}
for(auto& thread : this->executors) { for(auto& thread : this->executors) {
if(thread->thread_handle.joinable()) { if(thread->thread_handle.joinable()) {
/* TODO: Print an error */ /* TODO: Print an error */
thread->thread_handle.detach(); thread->thread_handle.detach();
} }
} }
/* TODO: delete tasks which are still pending */
} }
void task_executor::set_exception_handler(task_exception_handler handler) { void task_executor::set_exception_handler(task_exception_handler handler) {
@ -176,7 +200,7 @@ void task_executor::enqueue_recurring_task(task_recurring *task) {
} }
} }
bool task_executor::cancel_task(task_id task_id) { task_executor::task_cancel_result task_executor::internal_cancel_task(std::future<void> *future, task_id task_id) {
auto& task_context_ = this->task_context; auto& task_context_ = this->task_context;
std::unique_lock task_lock{task_context->mutex}; std::unique_lock task_lock{task_context->mutex};
@ -206,7 +230,7 @@ bool task_executor::cancel_task(task_id task_id) {
task_lock.unlock(); task_lock.unlock();
delete current_task; delete current_task;
return true; return task_cancel_result::pending_canceled;
} }
previous_task = current_task; previous_task = current_task;
@ -233,7 +257,7 @@ bool task_executor::cancel_task(task_id task_id) {
task_lock.unlock(); task_lock.unlock();
delete current_task; delete current_task;
return true; return task_cancel_result::pending_canceled;
} }
previous_task = current_task; previous_task = current_task;
@ -244,31 +268,88 @@ bool task_executor::cancel_task(task_id task_id) {
/* 3. Our task does not seem to pend anywhere. May it already gets executed. */ /* 3. Our task does not seem to pend anywhere. May it already gets executed. */
for(auto& executor : this->executors) { for(auto& executor : this->executors) {
if(executor->executing_task && executor->executing_task->id == task_id) { if(executor->executing_task && executor->executing_task->id == task_id) {
auto task_handle = executor->executing_task;
if(task_handle->canceled) {
/* task has already been canceled */
return task_cancel_result::not_found;
}
task_handle->canceled = true;
if(future) {
assert(!task_handle->finish_callback.has_value());
task_handle->finish_callback = std::make_optional(std::promise<void>{});
*future = task_handle->finish_callback->get_future();
}
/* /*
* It gets executed right now. * It gets executed right now.
* The task itself will be deleted by the executor. * The task itself will be deleted by the executor.
* Note: No need to decrease the task count here since it has already been * Note: No need to decrease the task count here since it has already been
* decreased when receiving the task by the executor. * decreased when receiving the task by the executor.
*/ */
return true; return task_cancel_result::running_canceled;
} }
if(executor->executing_recurring_task && executor->executing_recurring_task->id == task_id) { if(executor->executing_recurring_task && executor->executing_recurring_task->id == task_id) {
auto task_handle = executor->executing_recurring_task;
if(task_handle->shutdown) {
/* the task has already been canceled */
return task_cancel_result::not_found;
}
/* /*
* It gets executed right now. * It gets executed right now.
* Setting shutdown flag to prevent rescheduling. * Setting shutdown flag to prevent rescheduling.
* The task will be deleted by the executor itself. * The task will be deleted by the executor itself.
*/ */
auto task_handle = executor->executing_recurring_task;
task_handle->shutdown = true; task_handle->shutdown = true;
if(future) {
assert(!task_handle->finish_callback.has_value());
task_handle->finish_callback = std::make_optional(std::promise<void>{});
*future = task_handle->finish_callback->get_future();
}
assert(task_context_->task_recurring_count > 0); assert(task_context_->task_recurring_count > 0);
task_context_->task_recurring_count--; task_context_->task_recurring_count--;
return true; return task_cancel_result::running_canceled;
} }
} }
return false; return task_cancel_result::not_found;
}
bool task_executor::cancel_task(task_id task_id) {
switch (this->internal_cancel_task(nullptr, task_id)) {
case task_cancel_result::pending_canceled:
case task_cancel_result::running_canceled:
return true;
case task_cancel_result::not_found:
default:
assert(false);
return false;
}
}
std::future<void> task_executor::cancel_task_joinable(task_id task_id) {
std::future<void> result{};
switch (this->internal_cancel_task(&result, task_id)) {
case task_cancel_result::pending_canceled:
/* May throw an exception? */
case task_cancel_result::not_found: {
std::promise<void> promise{};
promise.set_value();
return promise.get_future();
}
case task_cancel_result::running_canceled:
return result;
default:
assert(false);
return result;
}
} }
void task_executor::executor(std::shared_ptr<executor_context> executor_context) { void task_executor::executor(std::shared_ptr<executor_context> executor_context) {
@ -312,6 +393,10 @@ void task_executor::executor(std::shared_ptr<executor_context> executor_context)
task_lock.lock(); task_lock.lock();
executor_context->executing_task = nullptr; executor_context->executing_task = nullptr;
if(task->finish_callback.has_value()) {
task->finish_callback->set_value();
}
delete task; delete task;
continue; continue;
} }
@ -346,6 +431,10 @@ void task_executor::executor(std::shared_ptr<executor_context> executor_context)
task_lock.lock(); task_lock.lock();
executor_context->executing_recurring_task = nullptr; executor_context->executing_recurring_task = nullptr;
if(task->shutdown) { if(task->shutdown) {
if(task->finish_callback) {
task->finish_callback->set_value();
}
delete task; delete task;
} else { } else {
executor_context->handle->enqueue_recurring_task(task); executor_context->handle->enqueue_recurring_task(task);

View File

@ -6,6 +6,7 @@
#include <functional> #include <functional>
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <future>
#include <cassert> #include <cassert>
namespace ts { namespace ts {
@ -35,12 +36,10 @@ namespace ts {
*/ */
bool cancel_task(task_id /* task id */); bool cancel_task(task_id /* task id */);
#if 0
/** /**
* Cancel a task with the possibility to wait until it has finished. * Cancel a task with the possibility to wait until it has finished.
*/ */
void cancel_task_joinable(task_id /* task id */); std::future<void> cancel_task_joinable(task_id /* task id */);
#endif
/** /**
* @returns `true` if the task has successfully be enqueued for scheduling. * @returns `true` if the task has successfully be enqueued for scheduling.
@ -60,6 +59,10 @@ namespace ts {
std::string name{}; std::string name{};
std::function<void()> callback{}; std::function<void()> callback{};
/* will be set to true if the task has been canceled but is executing right now */
bool canceled{false};
std::optional<std::promise<void>> finish_callback{};
task* next{nullptr}; task* next{nullptr};
}; };
@ -68,6 +71,7 @@ namespace ts {
std::string name{}; std::string name{};
bool shutdown{false}; bool shutdown{false};
std::optional<std::promise<void>> finish_callback{};
std::chrono::nanoseconds interval{}; std::chrono::nanoseconds interval{};
std::chrono::system_clock::time_point last_invoked{}; std::chrono::system_clock::time_point last_invoked{};
@ -109,6 +113,12 @@ namespace ts {
task_recurring* executing_recurring_task{nullptr}; task_recurring* executing_recurring_task{nullptr};
}; };
enum struct task_cancel_result {
not_found,
pending_canceled,
running_canceled,
};
std::vector<std::shared_ptr<executor_context>> executors{}; std::vector<std::shared_ptr<executor_context>> executors{};
std::shared_ptr<task_context> task_context; std::shared_ptr<task_context> task_context;
@ -119,6 +129,7 @@ namespace ts {
* 2. The task should not be enqueued already * 2. The task should not be enqueued already
*/ */
void enqueue_recurring_task(task_recurring* /* task */); void enqueue_recurring_task(task_recurring* /* task */);
[[nodiscard]] task_cancel_result internal_cancel_task(std::future<void>* /* future */, task_id /* task id */);
static void executor(std::shared_ptr<executor_context> /* context shared pointer */); static void executor(std::shared_ptr<executor_context> /* context shared pointer */);
static void abort_exception_handler(const std::string& /* task name */, const std::exception_ptr& /* exception */); static void abort_exception_handler(const std::string& /* task name */, const std::exception_ptr& /* exception */);