Teaspeak-Server/server/src/server/VoiceIOManager.cpp

330 lines
11 KiB
C++

#include <event.h>
#include <algorithm>
#include <log/LogUtils.h>
#include "src/VirtualServer.h"
#include "VoiceIOManager.h"
#include "VoiceServer.h"
#include "src/client/voice/VoiceClient.h"
using namespace std;
using namespace std::chrono;
using namespace ts;
using namespace ts::io;
using namespace ts::server;
/* Nothing to set up eagerly: event loops are spawned on demand via adjustExecutors(). */
VoiceIOManager::VoiceIOManager() = default;
/* The destructor body performs no cleanup of its own; shutdownGlobally() is the explicit teardown path. */
VoiceIOManager::~VoiceIOManager() = default;
/**
 * Register a virtual server with the voice IO system.
 *
 * Creates the server's IOServerHandler, makes sure enough event loops exist for one
 * more server and then attaches the handler to up to
 * `config::threads::voice::events_per_server` distinct event loops, always picking the
 * loop with the fewest assigned events that has not been picked yet.
 *
 * @param server the virtual server which should receive IO events
 * @return the handler that has been registered for the server
 */
std::shared_ptr<IOServerHandler> VoiceIOManager::enableIo(server::VirtualServer *server) {
    auto handler = std::make_shared<IOServerHandler>(server);

    /* grow the pool first; adjustExecutors() takes the executor lock itself */
    this->adjustExecutors(this->servers.size() + 1);

    std::vector<shared_ptr<IOEventLoop>> claimed_loops;
    claimed_loops.reserve(config::threads::voice::events_per_server);

    lock_guard<mutex> executor_lock(this->executorLock);
    while(claimed_loops.size() < config::threads::voice::events_per_server) {
        /* loops which have already been claimed are passed as blacklist */
        auto candidate = this->less_used_io_loop(claimed_loops);
        if(!candidate) {
            break; //No more loops open
        }

        handler->create_event_loop_events(candidate)->activate();
        claimed_loops.push_back(candidate);
    }

    {
        threads::MutexLock l(this->serverLock);
        this->servers.push_back(handler);
    }

    this->ioExecutorNotify.notify_all();
    return handler;
}
/**
 * Detach a virtual server from the voice IO system.
 *
 * Removes the server's IOServerHandler from the registry, disables and despawns all of
 * its per-loop event sets and finally re-evaluates the executor count.
 * Does nothing if no handler is registered for the given server.
 *
 * @param server the virtual server which should no longer receive IO events
 */
void VoiceIOManager::disableIo(server::VirtualServer* server) {
    std::shared_ptr<IOServerHandler> server_io;
    size_t remaining_servers = 0;
    {
        threads::MutexLock l(this->serverLock);

        const auto found = std::find_if(this->servers.begin(), this->servers.end(), [server](const auto& sio) {
            return sio->server == server;
        });
        if(found == this->servers.end()) return;

        server_io = *found;
        /*
         * Erase exactly this entry. The two-iterator overload erase(found, end()) would
         * also unregister every server that was enabled after this one.
         */
        this->servers.erase(found);

        /* capture the count while the registry is still locked */
        remaining_servers = this->servers.size();
    }

    for(const auto& entry : server_io->event_loop_events) {
        entry->disable();
        entry->despawn();
    }
    server_io->event_loop_events.clear();

    this->adjustExecutors(remaining_servers);
}
/**
 * Stop the whole voice IO system.
 *
 * 1. Disables and despawns the event sets of every registered server.
 * 2. Flags every event loop as shutting down and asks libevent to leave its loop.
 * 3. Waits (at most five seconds in total) until all loops have removed themselves
 *    from `event_loops`, which dispatchBase() does once its loop has returned.
 * Whatever is still registered afterwards gets dropped from the registry.
 */
void VoiceIOManager::shutdownGlobally() {
    /* Unregister all servers */
    {
        lock_guard server_lock(this->serverLock);
        for(const auto& handler : this->servers) {
            for(const auto& events : handler->event_loop_events) {
                events->disable();
                events->despawn();
            }
        }
    }

    /* Request every event loop to exit */
    {
        lock_guard<mutex> executor_lock(this->executorLock);
        for(const auto& loop : this->event_loops) {
            loop->shutdown = true;
            event_base_loopexit(loop->base, nullptr);
        }
        this->ioExecutorNotify.notify_all();
    }

    /* Wait for the loops to deregister themselves; the deadline is shared by all wakeups */
    unique_lock executor_lock{this->executorLock};
    const auto deadline = system_clock::now() + chrono::seconds{5};
    while(!this->event_loops.empty()) {
        if(this->ioExecutorNotify.wait_until(executor_lock, deadline) != std::cv_status::timeout) {
            /* woken up before the deadline: re-check the registry */
            continue;
        }

        logCritical(LOG_GENERAL,
                    "Failed to shutdown all event loops successfully. After timeout {} loops are left.",
                    this->event_loops.size()
        );
        break;
    }

    /* now delete all loops */
    this->event_loops.clear();
}
//TODO also reduce thread pool!
/**
 * Make sure enough event loops exist for the given number of servers.
 *
 * The wanted amount is `size * events_per_server`, capped at `io_limit` and then raised
 * to at least `io_min` (so `io_min` wins if both bounds conflict). Missing loops are
 * spawned; surplus loops are never removed here.
 *
 * Acquires the executor lock, so the caller must not hold it.
 *
 * @param size number of servers which should be served
 */
void VoiceIOManager::adjustExecutors(size_t size) {
    lock_guard<mutex> l(this->executorLock);

    size_t wanted_loops = size * config::threads::voice::events_per_server;
    if(wanted_loops > config::threads::voice::io_limit) {
        wanted_loops = config::threads::voice::io_limit;
    }
    if(wanted_loops < config::threads::voice::io_min) {
        wanted_loops = config::threads::voice::io_min;
    }

    /* spawnEventLoop() appends to event_loops, so re-read the size after every spawn */
    for(auto alive = this->event_loops.size(); alive < wanted_loops; alive = this->event_loops.size()) {
        this->spawnEventLoop();
    }
}
/**
 * Debug-build sanity checks only: a loop must not be destroyed from within its own
 * executor thread, and the executor must no longer be joinable at this point.
 */
IOEventLoop::~IOEventLoop() {
    assert(this->executor.get_id() != std::this_thread::get_id());
    assert(!this->executor.joinable());
}
/**
* @warning executor lock must be locked!
*/
std::shared_ptr<IOEventLoop> VoiceIOManager::spawnEventLoop() {
std::shared_ptr<IOEventLoop> loop = std::make_shared<IOEventLoop>();
loop->base = event_base_new();
loop->executor = std::thread(&VoiceIOManager::dispatchBase, this, loop);
loop->bound_thread = -1;
{
#ifndef WIN32
const auto name = "IO exec #" + to_string(this->event_loops.size());
pthread_setname_np(loop->executor.native_handle(), name.c_str());
#endif
const auto num_threads = std::thread::hardware_concurrency();
if(num_threads != 0 && config::threads::voice::bind_io_thread_to_kernel_thread) {
auto thread_usage = new uint8_t[num_threads];
memset(thread_usage, 0, num_threads);
for(auto& ev_loop : this->event_loops) {
if(ev_loop->bound_thread < 0 || ev_loop->bound_thread >= num_threads) {
continue;
}
thread_usage[ev_loop->bound_thread]++;
}
int thread_index = 0;
int thread_use_count = 256;
for(int index = 0; index < num_threads; index++)
if(thread_usage[index] < thread_use_count) {
thread_use_count = thread_usage[index];
thread_index = index;
}
debugMessage(0, "Binding event loop '{}' to CPU thread {}. Current bound threads: {}", name, thread_index, thread_use_count);
delete[] thread_usage;
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(thread_index, &cpuset);
auto err = pthread_setaffinity_np(loop->executor.native_handle(), sizeof(cpu_set_t), &cpuset);
if(err != 0) {
logError(0, "Failed to bind IO event loop '{}' to kernel thread. Code: {} => {}/{}", name, err, errno, strerror(errno));
}
loop->bound_thread = thread_index;
}
}
this->event_loops.push_back(loop);
return loop;
}
/**
 * Find the event loop with the fewest assigned events which is not blacklisted.
 * On equal load the loop which comes first in `event_loops` is kept.
 *
 * @param blacklist loops which must not be returned
 * @return the selected loop, or an empty pointer if every loop is blacklisted
 */
std::shared_ptr<IOEventLoop> VoiceIOManager::less_used_io_loop(vector<shared_ptr<IOEventLoop>> &blacklist) {
    std::shared_ptr<IOEventLoop> best;
    for(const auto& candidate : this->event_loops) {
        const bool improves = !best || candidate->assigned_events.size() < best->assigned_events.size();
        if(!improves) {
            continue;
        }

        const bool excluded = std::find(blacklist.begin(), blacklist.end(), candidate) != blacklist.end();
        if(excluded) {
            continue;
        }

        best = candidate;
    }
    return best;
}
IOServerHandler::IOServerHandler(server::VirtualServer* server) : server(server) { }
/* Disables and despawns every event set which is still attached, then drops them. */
IOServerHandler::~IOServerHandler() {
    auto& attached = this->event_loop_events;
    for(auto it = attached.begin(); it != attached.end(); ++it) {
        (*it)->disable();
        (*it)->despawn();
    }
    attached.clear();
}
std::shared_ptr<IOEventLoopEvents> IOServerHandler::create_event_loop_events(const std::shared_ptr<IOEventLoop> &loop) {
std::shared_ptr<IOEventLoopEvents> entry = std::make_shared<IOEventLoopEvents>();
entry->owner = this;
entry->event_loop = loop;
entry->spawn();
this->event_loop_events.push_back(entry);
return entry;
}
/**
 * Resolve the file descriptor which serves the given client.
 *
 * Currently a stub: the lookup is compiled out via `#if 0`, so this always returns -1.
 *
 * @param client the client to resolve the descriptor for (unused while the lookup is disabled)
 * @return always -1 at the moment
 */
int IOServerHandler::resolve_file_descriptor(const std::shared_ptr<ts::server::VoiceClient> &client) {
    if(this->event_loop_events.empty()) {
        return -1;
    }

#if 0
    /* NOTE(review): `socket > size()` lets socket == size() through to the index below; revisit before re-enabling */
    auto socket = client->connection->socket_id();
    auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()];
    if(socket < 0 || socket > event_loop->events.size())
        return -1;
    return event_loop->events[socket]->file_descriptor;
#endif

    return -1;
}
/**
 * Schedule a write for the given client.
 *
 * Currently a stub: the scheduling code is compiled out via `#if 0`, so this has no effect.
 *
 * @param client the client with pending data (unused while the code is disabled)
 */
void IOServerHandler::invoke_write(const std::shared_ptr<ts::server::VoiceClient> &client) {
    if(this->event_loop_events.empty()) {
        return; /* TODO any kind of error or warning? */
    }

#if 0
    /* NOTE(review): `socket > size()` lets socket == size() through to the index below; revisit before re-enabling */
    auto socket = client->connection->socket_id();
    auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()];
    if(socket < 0 || socket > event_loop->events.size())
        return; /* TODO any kind of error or warning? */

    auto event = event_loop->events[socket];
    if(!event->event_write)
        return; /* TODO any kind of error or warning? */

    event->push_voice_write_queue(client);
    event_add(event->event_write, nullptr);
#endif
}
/**
 * Queue a datagram on one of the server's event sets and arm its write event.
 *
 * The event set is chosen by advancing `event_loop_index` and wrapping it around the
 * number of attached sets. The call is silently dropped if no event set is attached,
 * if `socket` is not a valid index into the chosen set, or if that entry has no write event.
 *
 * @param datagram the datagram to enqueue; may be null, in which case only the write
 *                 event gets (re)armed
 * @param socket   index of the target entry within the chosen event set
 */
void IOServerHandler::send_datagram(server::udp::DatagramPacket* datagram, int socket) {
    if(this->event_loop_events.empty())
        return; /* TODO any kind of error or warning? */

    auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()];

    /* valid indices are [0, size); socket == size() would already read past the end */
    if(socket < 0 || static_cast<size_t>(socket) >= event_loop->events.size())
        return; /* TODO any kind of error or warning? */

    auto event = event_loop->events[static_cast<size_t>(socket)];
    if(!event->event_write)
        return; /* TODO any kind of error or warning? */

    /* NOTE(review): on the early returns above the datagram is neither queued nor released here — confirm who owns it */
    if(datagram)
        event->push_dg_write_queue(datagram);
    event_add(event->event_write, nullptr);
}
/*
 * Meant to create the IO events of this (server, event loop) pair.
 * The whole implementation is currently compiled out via `#if 0`, so calling this
 * leaves `events` and the loop's `assigned_events` untouched.
 */
void IOEventLoopEvents::spawn() {
#if 0
/* disabled: one persistent read event and one write event per active voice server binding */
for(const auto& binding : this->owner->server->getVoiceServer()->activeBindings()) {
auto entry = make_shared<IOEventLoopEntry>();
entry->file_descriptor = binding->file_descriptor;
entry->handle = this;
entry->server = this->owner->server;
entry->family = binding->address.ss_family;
entry->voice_server = &*this->owner->server->getVoiceServer();
entry->event_read = event_new(this->event_loop->base, binding->file_descriptor, EV_READ | EV_PERSIST, VoiceServer::handleMessageRead, &*entry);
entry->event_write = event_new(this->event_loop->base, binding->file_descriptor, EV_WRITE, VoiceServer::handleMessageWrite, &*entry);
this->events.push_back(entry);
/* the socket id is the entry's index within `events` */
entry->socket_id = (int) this->events.size() - 1;
{
lock_guard lock(event_loop->events_lock);
this->event_loop->assigned_events.push_back(entry);
}
}
#endif
}
/**
 * Remove all events of this set from the owning event loop's `assigned_events`
 * (under the loop's `events_lock`) and then clear the local event list.
 *
 * Every local event is expected to be registered at the loop; a missing one trips an
 * assert in debug builds and is skipped otherwise.
 */
void IOEventLoopEvents::despawn() {
    {
        lock_guard lock(event_loop->events_lock);
        auto& event_loop_events = this->event_loop->assigned_events;
        for(const auto& event : this->events) {
            /* look the entry up once and reuse the iterator for both the check and the erase */
            const auto found = std::find(event_loop_events.begin(), event_loop_events.end(), event);
            assert(found != event_loop_events.end());
            if(found == event_loop_events.end()) {
                continue; /* with NDEBUG the assert is gone and erase(end()) would be undefined behaviour */
            }
            event_loop_events.erase(found);
        }
    }
    this->events.clear();
}
/**
 * Thread entry point of an event loop executor.
 *
 * Runs the libevent loop until it gets terminated. Afterwards, under the executor lock,
 * the loop removes itself from `event_loops`, frees its event base and wakes up
 * everybody waiting on `ioExecutorNotify`.
 *
 * @param self the event loop this thread is executing
 */
void VoiceIOManager::dispatchBase(shared_ptr<IOEventLoop> self) {
    debugMessage(LOG_INSTANCE, "Dispatching io base {}", (void*) self->base);
    event_base_loop(self->base, EVLOOP_NO_EXIT_ON_EMPTY);
    debugMessage(LOG_INSTANCE, "Dispatching io base {} finished", (void*) self->base);

    {
        lock_guard executor_lock(this->executorLock);

        /* take ourselves out of the registry */
        auto& registry = this->event_loops;
        const auto position = std::find(registry.begin(), registry.end(), self);
        if(position == registry.end()) {
            logCritical(LOG_INSTANCE, "Could not find executor in executor registry ({})!", (void*) self->base);
        } else {
            registry.erase(position);
        }

        /* all events should have been despawned before the loop got stopped */
        const auto leftover_events = self->assigned_events.size();
        if(leftover_events != 0) {
            logError(LOG_INSTANCE, "Event loop exited, but sill containing some events ({})!", leftover_events);
        }

        event_base_free(self->base);
        self->base = nullptr;

        this->ioExecutorNotify.notify_all(); /* let everybody know we're done */
    }
}