Removed the old voice io

This commit is contained in:
WolverinDEV 2021-04-15 17:34:44 +02:00
parent cd5804125d
commit 63eccff578
10 changed files with 33 additions and 581 deletions

View File

@ -117,7 +117,6 @@ set(SERVER_SOURCE_FILES
src/client/query/XMacroEventTypes.h
src/server/VoiceIOManager.cpp
src/client/SpeakingClient.cpp
../shared/src/ssl/SSLManager.cpp

View File

@ -15,7 +15,6 @@ using namespace ts::server;
VirtualServerManager::VirtualServerManager(InstanceHandler* handle) : handle(handle) {
this->puzzles = new udp::PuzzleManager{};
this->_ioManager = new io::VoiceIOManager();
}
VirtualServerManager::~VirtualServerManager() {
@ -35,10 +34,6 @@ VirtualServerManager::~VirtualServerManager() {
delete this->puzzles;
this->puzzles = nullptr;
if(this->_ioManager) this->_ioManager->shutdownGlobally();
delete this->_ioManager;
this->_ioManager = nullptr;
}
bool VirtualServerManager::initialize(bool autostart) {
@ -280,7 +275,7 @@ ts::ServerId VirtualServerManager::next_available_server_id(bool& success) {
ServerReport VirtualServerManager::report() {
ServerReport result{};
for(const auto& sr : this->serverInstances()) {
result.avariable++;
result.available++;
if(sr->running()) {
result.online++;
result.slots += sr->properties()[property::VIRTUALSERVER_MAXCLIENTS].as_or<size_t>(0);

View File

@ -3,7 +3,6 @@
#include <deque>
#include <EventLoop.h>
#include "src/server/PrecomputedPuzzles.h"
#include "server/VoiceIOManager.h"
#include "VirtualServer.h"
#include <query/command3.h>
#include "snapshots/snapshot.h"
@ -12,7 +11,7 @@ namespace ts::server {
class InstanceHandler;
struct ServerReport {
size_t avariable;
size_t available;
size_t online;
size_t slots;
@ -70,8 +69,6 @@ namespace ts::server {
udp::PuzzleManager* rsaPuzzles() { return this->puzzles; }
io::VoiceIOManager* ioManager(){ return this->_ioManager; }
/* This must be recursive */
threads::Mutex server_create_lock;
@ -83,9 +80,6 @@ namespace ts::server {
std::deque<std::shared_ptr<VirtualServer>> instances;
udp::PuzzleManager* puzzles{nullptr};
event::EventExecutor* join_loop = nullptr;
io::VoiceIOManager* _ioManager = nullptr;
struct {
std::thread executor{};
std::condition_variable condition;

View File

@ -71,7 +71,6 @@ namespace ts {
class ConnectedClient : public DataClient {
friend class VirtualServer;
friend class VoiceServer;
friend class VoiceClient;
friend class MusicClient;
friend class WebClient;

View File

@ -19,7 +19,7 @@ constexpr static auto kMaxWhisperClientNameLength{30};
constexpr static auto kWhisperClientUniqueIdLength{28}; /* base64 encoded SHA1 hash */
VoiceClient::VoiceClient(const std::shared_ptr<VoiceServer>& server, const sockaddr_storage* address) :
SpeakingClient{server->server->sql, server->server},
SpeakingClient{server->get_server()->sql, server->get_server()},
voice_server(server) {
assert(address);
memtrack::allocated<VoiceClient>(this);

View File

@ -15,7 +15,6 @@
#include "VoiceClientConnection.h"
#include "src/server/PrecomputedPuzzles.h"
#include "../../lincense/TeamSpeakLicense.h"
#include "src/client/shared/ServerCommandExecutor.h"
//#define LOG_INCOMPING_PACKET_FRAGMENTS
//#define LOG_AUTO_ACK_AUTORESPONSE
@ -38,7 +37,6 @@ namespace ts {
}
namespace server {
namespace server::udp {
class ServerCommandExecutor;
class CryptSetupHandler;
}
@ -51,12 +49,8 @@ namespace ts {
friend class POWHandler;
friend class ts::connection::VoiceClientConnection;
friend class ConnectedClient;
friend class io::IOServerHandler;
friend class server::udp::ServerCommandExecutor;
friend class server::udp::CryptSetupHandler;
friend class VoiceClientCommandHandler;
using ServerCommandExecutor = ts::server::ServerCommandQueue;
public:
VoiceClient(const std::shared_ptr<VoiceServer>& server, const sockaddr_storage*);
~VoiceClient() override;

View File

@ -1,330 +0,0 @@
#include <event.h>
#include <algorithm>
#include <log/LogUtils.h>
#include "src/VirtualServer.h"
#include "VoiceIOManager.h"
#include "VoiceServer.h"
#include "src/client/voice/VoiceClient.h"
using namespace std;
using namespace std::chrono;
using namespace ts;
using namespace ts::io;
using namespace ts::server;
VoiceIOManager::VoiceIOManager(){ }
VoiceIOManager::~VoiceIOManager() { }
std::shared_ptr<IOServerHandler> VoiceIOManager::enableIo(server::VirtualServer *server) {
auto server_io = std::make_shared<IOServerHandler>(server);
this->adjustExecutors(this->servers.size() + 1);
std::vector<shared_ptr<IOEventLoop>> use_list;
use_list.reserve(config::threads::voice::events_per_server);
lock_guard<mutex> executor_lock(this->executorLock);
for(size_t i = 0; i < config::threads::voice::events_per_server; i++){
auto loop = this->less_used_io_loop(use_list);
if(!loop) break; //No more loops open
server_io->create_event_loop_events(loop)->activate();
use_list.push_back(loop);
}
{
threads::MutexLock l(this->serverLock);
this->servers.push_back(server_io);
}
this->ioExecutorNotify.notify_all();
return server_io;
}
void VoiceIOManager::disableIo(server::VirtualServer* server) {
std::shared_ptr<IOServerHandler> server_io;
{
threads::MutexLock l(this->serverLock);
for(const auto& sio : this->servers) {
if(sio->server == server) {
server_io = sio;
break;
}
}
if(!server_io) return;
this->servers.erase(std::find(this->servers.begin(), this->servers.end(), server_io),this->servers.end());
}
for(const auto& entry : server_io->event_loop_events) {
entry->disable();
entry->despawn();
}
server_io->event_loop_events.clear();
this->adjustExecutors(this->servers.size());
}
void VoiceIOManager::shutdownGlobally() {
/* Unregister all servers */
{
lock_guard server_lock(this->serverLock);
for(const auto& server : this->servers)
for(const auto& loop : server->event_loop_events){
loop->disable();
loop->despawn();
}
}
/* shutting down event loops */
{
lock_guard<mutex> executor_lock(this->executorLock);
for(const auto& loop : this->event_loops) {
loop->shutdown = true;
event_base_loopexit(loop->base, nullptr);
}
this->ioExecutorNotify.notify_all();
}
/* keep a ref to all event loops so they dont despawn in their event thread */
unique_lock executor_lock{this->executorLock};
auto wait_end = system_clock::now() + chrono::seconds{5};
while(true) {
if(this->event_loops.empty()) {
break;
}
auto status = this->ioExecutorNotify.wait_until(executor_lock, wait_end);
if(status == std::cv_status::timeout) {
logCritical(LOG_GENERAL,
"Failed to shutdown all event loops successfully. After timeout {} loops are left.",
this->event_loops.size()
);
break;
}
}
/* now delete all loops */
this->event_loops.clear();
}
//TODO also reduce thread pool!
void VoiceIOManager::adjustExecutors(size_t size) {
lock_guard<mutex> l(this->executorLock);
size_t targetThreads = size * config::threads::voice::events_per_server;
if(targetThreads > config::threads::voice::io_limit)
targetThreads = config::threads::voice::io_limit;
if(targetThreads < config::threads::voice::io_min)
targetThreads = config::threads::voice::io_min;
while(this->event_loops.size() < targetThreads) {
spawnEventLoop();
}
}
IOEventLoop::~IOEventLoop() {
assert(this_thread::get_id() != this->executor.get_id());
assert(!this->executor.joinable());
}
/**
* @warning executor lock must be locked!
*/
std::shared_ptr<IOEventLoop> VoiceIOManager::spawnEventLoop() {
std::shared_ptr<IOEventLoop> loop = std::make_shared<IOEventLoop>();
loop->base = event_base_new();
loop->executor = std::thread(&VoiceIOManager::dispatchBase, this, loop);
loop->bound_thread = -1;
{
#ifndef WIN32
const auto name = "IO exec #" + to_string(this->event_loops.size());
pthread_setname_np(loop->executor.native_handle(), name.c_str());
#endif
const auto num_threads = std::thread::hardware_concurrency();
if(num_threads != 0 && config::threads::voice::bind_io_thread_to_kernel_thread) {
auto thread_usage = new uint8_t[num_threads];
memset(thread_usage, 0, num_threads);
for(auto& ev_loop : this->event_loops) {
if(ev_loop->bound_thread < 0 || ev_loop->bound_thread >= num_threads) {
continue;
}
thread_usage[ev_loop->bound_thread]++;
}
int thread_index = 0;
int thread_use_count = 256;
for(int index = 0; index < num_threads; index++)
if(thread_usage[index] < thread_use_count) {
thread_use_count = thread_usage[index];
thread_index = index;
}
debugMessage(0, "Binding event loop '{}' to CPU thread {}. Current bound threads: {}", name, thread_index, thread_use_count);
delete[] thread_usage;
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(thread_index, &cpuset);
auto err = pthread_setaffinity_np(loop->executor.native_handle(), sizeof(cpu_set_t), &cpuset);
if(err != 0) {
logError(0, "Failed to bind IO event loop '{}' to kernel thread. Code: {} => {}/{}", name, err, errno, strerror(errno));
}
loop->bound_thread = thread_index;
}
}
this->event_loops.push_back(loop);
return loop;
}
std::shared_ptr<IOEventLoop> VoiceIOManager::less_used_io_loop(vector<shared_ptr<IOEventLoop>> &blacklist) {
std::shared_ptr<IOEventLoop> current;
for(const auto& loop : this->event_loops)
if(!current || loop->assigned_events.size() < current->assigned_events.size()) {
for(const auto& elm : blacklist) if(elm == loop) goto skipSet;
current = loop;
skipSet:;
}
return current;
}
IOServerHandler::IOServerHandler(server::VirtualServer* server) : server(server) { }
IOServerHandler::~IOServerHandler() {
for(const auto& entry : this->event_loop_events) {
entry->disable();
entry->despawn();
}
this->event_loop_events.clear();
}
std::shared_ptr<IOEventLoopEvents> IOServerHandler::create_event_loop_events(const std::shared_ptr<IOEventLoop> &loop) {
std::shared_ptr<IOEventLoopEvents> entry = std::make_shared<IOEventLoopEvents>();
entry->owner = this;
entry->event_loop = loop;
entry->spawn();
this->event_loop_events.push_back(entry);
return entry;
}
int IOServerHandler::resolve_file_descriptor(const std::shared_ptr<ts::server::VoiceClient> &client) {
if(this->event_loop_events.empty())
return -1;
#if 0
auto socket = client->connection->socket_id();
auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()];
if(socket < 0 || socket > event_loop->events.size())
return -1;
return event_loop->events[socket]->file_descriptor;
#endif
return -1;
}
void IOServerHandler::invoke_write(const std::shared_ptr<ts::server::VoiceClient> &client) {
if(this->event_loop_events.empty())
return; /* TODO any kind of error or warning? */
#if 0
auto socket = client->connection->socket_id();
auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()];
if(socket < 0 || socket > event_loop->events.size())
return; /* TODO any kind of error or warning? */
auto event = event_loop->events[socket];
if(!event->event_write)
return; /* TODO any kind of error or warning? */
event->push_voice_write_queue(client);
event_add(event->event_write, nullptr);
#endif
}
void IOServerHandler::send_datagram(server::udp::DatagramPacket* datagram, int socket) {
if(this->event_loop_events.empty())
return; /* TODO any kind of error or warning? */
auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()];
if(socket < 0 || socket > event_loop->events.size())
return; /* TODO any kind of error or warning? */
auto event = event_loop->events[socket];
if(!event->event_write)
return; /* TODO any kind of error or warning? */
if(datagram)
event->push_dg_write_queue(datagram);
event_add(event->event_write, nullptr);
}
void IOEventLoopEvents::spawn() {
#if 0
for(const auto& binding : this->owner->server->getVoiceServer()->activeBindings()) {
auto entry = make_shared<IOEventLoopEntry>();
entry->file_descriptor = binding->file_descriptor;
entry->handle = this;
entry->server = this->owner->server;
entry->family = binding->address.ss_family;
entry->voice_server = &*this->owner->server->getVoiceServer();
entry->event_read = event_new(this->event_loop->base, binding->file_descriptor, EV_READ | EV_PERSIST, VoiceServer::handleMessageRead, &*entry);
entry->event_write = event_new(this->event_loop->base, binding->file_descriptor, EV_WRITE, VoiceServer::handleMessageWrite, &*entry);
this->events.push_back(entry);
entry->socket_id = (int) this->events.size() - 1;
{
lock_guard lock(event_loop->events_lock);
this->event_loop->assigned_events.push_back(entry);
}
}
#endif
}
void IOEventLoopEvents::despawn() {
{
lock_guard lock(event_loop->events_lock);
for(const auto& event : this->events) {
auto& event_loop_events = this->event_loop->assigned_events;
assert(std::find(event_loop_events.begin(), event_loop_events.end(), event) != event_loop_events.end());
event_loop_events.erase(std::find(event_loop_events.begin(), event_loop_events.end(), event));
}
}
this->events.clear();
}
void VoiceIOManager::dispatchBase(shared_ptr<IOEventLoop> self) {
debugMessage(LOG_INSTANCE, "Dispatching io base {}", (void*) self->base);
event_base_loop(self->base, EVLOOP_NO_EXIT_ON_EMPTY);
debugMessage(LOG_INSTANCE, "Dispatching io base {} finished", (void*) self->base);
{
lock_guard executor_lock(this->executorLock);
auto found = std::find(this->event_loops.begin(), this->event_loops.end(), self);
if(found != this->event_loops.end()) {
this->event_loops.erase(found);
} else {
logCritical(LOG_INSTANCE, "Could not find executor in executor registry ({})!", (void*) self->base);
}
if(!self->assigned_events.empty()) {
logError(LOG_INSTANCE, "Event loop exited, but sill containing some events ({})!", self->assigned_events.size());
}
event_base_free(self->base);
self->base = nullptr;
this->ioExecutorNotify.notify_all(); /* let everybody know we're done */
}
}

View File

@ -1,204 +0,0 @@
#pragma once
#include <deque>
#include <event.h>
#include <ThreadPool/Thread.h>
#include <condition_variable>
#include <pipes/buffer.h>
#include <misc/spin_mutex.h>
#include <ThreadPool/Mutex.h>
#include <src/server/voice/DatagramPacket.h>
namespace ts {
namespace server {
class VirtualServer;
class VoiceServer;
class VoiceClient;
}
namespace io {
class VoiceIOManager;
class IOServerHandler;
struct IOEventLoopEntry;
struct IOEventLoopEvents;
struct IOEventLoop {
IOEventLoop() = default;
~IOEventLoop();
int bound_thread = -1; /* -1 represents that this loop is bound to no thread at all */
bool shutdown = false;
event_base* base = nullptr;
std::thread executor;
std::mutex events_lock;
std::deque<std::shared_ptr<IOEventLoopEntry>> assigned_events;
};
struct IOEventLoopEntry {
IOEventLoopEvents* handle;
int socket_id = 0;
sa_family_t family;
/* keeps these addresses in "hot" memory instead resolving three ptr */
server::VirtualServer* server;
server::VoiceServer* voice_server;
int file_descriptor = 0; /* actual socket */
::event* event_read = nullptr;
::event* event_write = nullptr;
spin_mutex write_queue_lock;
server::udp::DatagramPacket* dg_write_queue_head = nullptr;
server::udp::DatagramPacket* dg_write_queue_tail = nullptr;
std::deque<std::weak_ptr<server::VoiceClient>> voice_write_queue;
inline server::udp::DatagramPacket* pop_dg_write_queue() {
std::lock_guard lock(this->write_queue_lock);
if(!this->dg_write_queue_head)
return nullptr;
auto packet = this->dg_write_queue_head;
if(packet == this->dg_write_queue_tail) {
this->dg_write_queue_tail = nullptr;
this->dg_write_queue_head = nullptr;
} else {
this->dg_write_queue_head = packet->next_packet;
}
return packet;
}
inline void push_dg_write_queue(server::udp::DatagramPacket* packet) {
assert(!packet->next_packet);
packet->next_packet = nullptr;
std::lock_guard lock(this->write_queue_lock);
if(this->dg_write_queue_tail) {
this->dg_write_queue_tail->next_packet = packet;
} else {
this->dg_write_queue_head = packet;
}
this->dg_write_queue_tail = packet;
}
inline void push_voice_write_queue(const std::shared_ptr<server::VoiceClient>& client) {
std::lock_guard lock(this->write_queue_lock);
this->voice_write_queue.push_back(client);
}
/* return 0 on success | 1 on there is more, but success | 2 on empty */
inline int pop_voice_write_queue(std::shared_ptr<server::VoiceClient>& result) {
std::lock_guard lock(this->write_queue_lock);
auto it_begin = this->voice_write_queue.begin();
auto it_end = this->voice_write_queue.end();
auto it = it_begin;
while(it != it_end) {
result = it->lock();
if(result) {
this->voice_write_queue.erase(it_begin, ++it);
return (int) (it != it_end);
}
it++;
}
if(it_begin != it_end) {
this->voice_write_queue.erase(it_begin, it_end);
}
return 2;
}
~IOEventLoopEntry() {
if(this->event_read)
event_free(this->event_read);
if(this->event_write)
event_free(this->event_write);
}
};
struct IOEventLoopEvents {
IOServerHandler* owner = nullptr;
std::shared_ptr<IOEventLoop> event_loop;
std::deque<std::shared_ptr<IOEventLoopEntry>> events;
void spawn();
void despawn();
inline void activate(){
for(const auto& event : events) {
if(event->event_read) {
event_add(event->event_read, nullptr);
}
}
}
inline void disable(bool blocking = true){
for(const auto& event : events) {
if(event->event_read) {
event_del(event->event_read);
event_del_block(event->event_read);
}
if(event->event_write) {
event_del(event->event_write);
event_del_block(event->event_write);
}
}
}
inline IOEventLoopEntry* event(int fd) {
for(const auto& entry : this->events)
if(entry->file_descriptor == fd) return entry.get();
return nullptr;
}
};
class IOServerHandler {
friend class VoiceIOManager;
friend struct IOEventLoopEvents;
public:
explicit IOServerHandler(server::VirtualServer*);
~IOServerHandler();
void invoke_write(const std::shared_ptr<server::VoiceClient>& /* client */);
int resolve_file_descriptor(const std::shared_ptr<server::VoiceClient>& /* client */);
void send_datagram(server::udp::DatagramPacket* /* packet */, int /* socket */);
private:
std::shared_ptr<IOEventLoopEvents> create_event_loop_events(const std::shared_ptr<IOEventLoop> &);
server::VirtualServer* server = nullptr;
std::deque<std::shared_ptr<IOEventLoopEvents>> event_loop_events;
size_t event_loop_index = 0;
};
class VoiceIOManager {
public:
VoiceIOManager();
virtual ~VoiceIOManager();
std::shared_ptr<IOServerHandler> enableIo(server::VirtualServer* server);
void disableIo(server::VirtualServer*);
void shutdownGlobally();
private:
std::shared_ptr<IOEventLoop> less_used_io_loop(std::vector<std::shared_ptr<IOEventLoop>>&);
threads::Mutex serverLock;
std::deque<std::shared_ptr<IOServerHandler>> servers;
std::mutex executorLock;
/* will be called as soon servers have been added or an event loop has been finished */
std::condition_variable ioExecutorNotify;
std::deque<std::shared_ptr<IOEventLoop>> event_loops;
void adjustExecutors(size_t);
std::shared_ptr<IOEventLoop> spawnEventLoop();
void dispatchBase(std::shared_ptr<IOEventLoop>);
};
}
}

View File

@ -9,7 +9,6 @@
#include <misc/net.h>
#include <protocol/ringbuffer.h>
#include <misc/task_executor.h>
#include "VoiceIOManager.h"
#include "./voice/DatagramPacket.h"
#include "Definitions.h"
#include <shared_mutex>
@ -17,12 +16,16 @@
namespace ts {
namespace server {
class VirtualServer;
class ConnectedClient;
class VoiceServer;
class VoiceClient;
class POWHandler;
class VoiceServerSocket : public std::enable_shared_from_this<VoiceServerSocket> {
public:
/**
* Note: Only access event_read or event_write when the socket mutex is acquired or
* or within the event loop!
*/
struct NetworkEvents {
VoiceServerSocket* socket;
struct event* event_read{nullptr};
@ -129,7 +132,7 @@ namespace ts {
/**
* Enqueue a write event.
* Attention: The mutex should be locked!
* Attention: The socket mutex must be locked!
*/
inline void enqueue_network_write() {
assert(!this->network_events.empty());
@ -143,16 +146,13 @@ namespace ts {
class VoiceServer {
friend class VoiceServerSocket;
friend class VoiceClient; /* Not needed any more */
friend class io::VoiceIOManager; /* Not needed any more */
friend struct io::IOEventLoopEvents; /* Not needed any more */
friend class POWHandler; /* TODO: Still needed? May use some kind of callback */
public:
explicit VoiceServer(const std::shared_ptr<VirtualServer>& server);
~VoiceServer();
bool start(const std::deque<sockaddr_storage>&, std::string&);
bool stop(const std::chrono::milliseconds& flushTimeout = std::chrono::milliseconds(1000));
bool stop(const std::chrono::milliseconds& flushTimeout = std::chrono::milliseconds{1000});
[[nodiscard]] std::shared_ptr<VoiceClient> findClient(ClientId);
[[nodiscard]] std::shared_ptr<VoiceClient> findClient(sockaddr_in* addr, bool lock);

View File

@ -22,7 +22,7 @@ using namespace ts;
VoiceServerSocket::NetworkEvents::~NetworkEvents() {
auto event_read_ = std::exchange(this->event_read, nullptr);
auto event_write_ = std::exchange(this->event_read, nullptr);
auto event_write_ = std::exchange(this->event_write, nullptr);
if(event_read_) {
event_free(event_read_);
@ -132,33 +132,38 @@ bool VoiceServerSocket::activate(std::string &error) {
}
void VoiceServerSocket::deactivate() {
for(const auto& binding : this->network_events) {
std::unique_lock write_lock{this->mutex};
auto network_events_ = std::move(this->network_events);
auto file_descriptor_ = std::exchange(this->file_descriptor, 0);
this->write_client_queue.clear();
while(this->write_datagram_head) {
auto datagram = std::exchange(this->write_datagram_head, this->write_datagram_head->next_packet);
udp::DatagramPacket::destroy(datagram);
}
this->write_datagram_tail = &this->write_datagram_head;
write_lock.unlock();
/*
* Finish all active/pending events before we clear them.
* Since we moved these events out of network_events the can't get rescheduled.
*/
for(const auto& binding : network_events_) {
if(binding->event_read) {
event_del_block(binding->event_read);
event_free(binding->event_read);
}
if(binding->event_write) {
event_del_block(binding->event_write);
event_free(binding->event_write);
}
}
{
std::lock_guard write_lock{this->mutex};
this->network_events.clear();
/* Will free all events. */
network_events_.clear();
if(this->file_descriptor > 0) {
::close(this->file_descriptor);
this->file_descriptor = 0;
}
this->write_client_queue.clear();
while(this->write_datagram_head) {
auto datagram = std::exchange(this->write_datagram_head, this->write_datagram_head->next_packet);
udp::DatagramPacket::destroy(datagram);
}
this->write_datagram_tail = &this->write_datagram_head;
/* Close the file descriptor after all network events have been finished*/
if(file_descriptor_ > 0) {
::close(file_descriptor_);
}
}