// Teaspeak-Server/server/src/server/VoiceIOManager.h
//
// 200 lines
// 7.0 KiB
// C++

#pragma once
#include <deque>
#include <event.h>
#include <ThreadPool/Thread.h>
#include <condition_variable>
#include <pipes/buffer.h>
#include <misc/spin_mutex.h>
#include <ThreadPool/Mutex.h>
#include <src/server/voice/DatagramPacket.h>
namespace ts {
namespace server {
class VirtualServer;
class VoiceServer;
class VoiceClient;
}
namespace io {
class VoiceIOManager;
class IOServerHandler;
struct IOEventLoopEntry;
struct IOEventLoopEvents;
/**
 * State of a single libevent event loop.
 *
 * The destructor is declared here and defined outside of this header.
 */
struct IOEventLoop {
    IOEventLoop() = default;
    ~IOEventLoop();

    /* Thread this loop is bound to; -1 means the loop is not bound to any thread. */
    int bound_thread = -1;

    /* Shutdown flag, initially false. NOTE(review): presumably set when the loop should stop - confirm in the .cpp */
    bool shutdown = false;

    /* libevent base of this loop; null until one has been assigned. */
    event_base* base = nullptr;

    /* Thread object belonging to this loop. NOTE(review): presumably the thread dispatching `base` - confirm */
    std::thread executor;

    /* NOTE(review): presumably guards `assigned_events` - verify against the users of this struct */
    std::mutex events_lock;

    /* Event entries which have been assigned to this loop. */
    std::deque<std::shared_ptr<IOEventLoopEntry>> assigned_events;
};
struct IOEventLoopEntry {
IOEventLoopEvents* handle;
int socket_id = 0;
sa_family_t family;
/* keeps these addresses in "hot" memory instead resolving three ptr */
server::VirtualServer* server;
server::VoiceServer* voice_server;
int file_descriptor = 0; /* actual socket */
::event* event_read = nullptr;
::event* event_write = nullptr;
spin_mutex write_queue_lock;
server::udp::DatagramPacket* dg_write_queue_head = nullptr;
server::udp::DatagramPacket* dg_write_queue_tail = nullptr;
std::deque<std::weak_ptr<server::VoiceClient>> voice_write_queue;
inline server::udp::DatagramPacket* pop_dg_write_queue() {
std::lock_guard lock(this->write_queue_lock);
if(!this->dg_write_queue_head)
return nullptr;
auto packet = this->dg_write_queue_head;
if(packet == this->dg_write_queue_tail) {
this->dg_write_queue_tail = nullptr;
this->dg_write_queue_head = nullptr;
} else {
this->dg_write_queue_head = packet->next_packet;
}
return packet;
}
inline void push_dg_write_queue(server::udp::DatagramPacket* packet) {
std::lock_guard lock(this->write_queue_lock);
if(this->dg_write_queue_tail) {
this->dg_write_queue_tail->next_packet = packet;
} else {
this->dg_write_queue_head = packet;
}
this->dg_write_queue_tail = packet;
}
inline void push_voice_write_queue(const std::shared_ptr<server::VoiceClient>& client) {
std::lock_guard lock(this->write_queue_lock);
this->voice_write_queue.push_back(client);
}
/* return 0 on success | 1 on there is more, but success | 2 on empty */
inline int pop_voice_write_queue(std::shared_ptr<server::VoiceClient>& result) {
std::lock_guard lock(this->write_queue_lock);
auto it_begin = this->voice_write_queue.begin();
auto it_end = this->voice_write_queue.end();
auto it = it_begin;
while(it != it_end) {
result = it->lock();
if(result) {
this->voice_write_queue.erase(it_begin, ++it);
return (int) (it != it_end);
}
it++;
}
if(it_begin != it_end)
this->voice_write_queue.erase(it_begin, it_end);
return 2;
}
~IOEventLoopEntry() {
if(this->event_read)
event_free(this->event_read);
if(this->event_write)
event_free(this->event_write);
}
};
struct IOEventLoopEvents {
IOServerHandler* owner = nullptr;
std::shared_ptr<IOEventLoop> event_loop;
std::deque<std::shared_ptr<IOEventLoopEntry>> events;
void spawn();
void despawn();
inline void activate(){
for(const auto& event : events) {
if(event->event_read) {
event_add(event->event_read, nullptr);
}
}
}
inline void disable(bool blocking = true){
for(const auto& event : events) {
if(event->event_read) {
event_del(event->event_read);
event_del_block(event->event_read);
}
if(event->event_write) {
event_del(event->event_write);
event_del_block(event->event_write);
}
}
}
inline IOEventLoopEntry* event(int fd) {
for(const auto& entry : this->events)
if(entry->file_descriptor == fd) return entry.get();
return nullptr;
}
};
/**
 * Per virtual server IO handle.
 *
 * Holds one IOEventLoopEvents instance per event loop entry it has created.
 * All member functions are defined outside of this header.
 */
class IOServerHandler {
        friend class VoiceIOManager;
        friend struct IOEventLoopEvents;

    public:
        explicit IOServerHandler(server::VirtualServer*);
        ~IOServerHandler();

        /* NOTE(review): presumably schedules a write for the given client - confirm in the .cpp */
        void invoke_write(const std::shared_ptr<server::VoiceClient>& /* client */);

        /* Resolves the file descriptor which belongs to the given client. */
        int resolve_file_descriptor(const std::shared_ptr<server::VoiceClient>& /* client */);

        /* Sends the given datagram using the given socket. */
        void send_datagram(server::udp::DatagramPacket* /* packet */, int /* socket */);

    private:
        std::shared_ptr<IOEventLoopEvents> create_event_loop_events(const std::shared_ptr<IOEventLoop>&);

        server::VirtualServer* server = nullptr;
        std::deque<std::shared_ptr<IOEventLoopEvents>> event_loop_events;

        /* NOTE(review): looks like an index into `event_loop_events` - verify its usage */
        size_t event_loop_index = 0;
};
/**
 * Keeps track of the IOServerHandler instances and the IOEventLoop instances.
 * All member functions are defined outside of this header.
 */
class VoiceIOManager {
    public:
        VoiceIOManager();
        virtual ~VoiceIOManager();

        /* Enables IO for the given virtual server and returns the handler for it. */
        std::shared_ptr<IOServerHandler> enableIo(server::VirtualServer* server);

        /* Disables IO for the given virtual server. */
        void disableIo(server::VirtualServer*);

        void shutdownGlobally();

    private:
        /* NOTE(review): presumably picks the loop with the fewest assigned events - confirm in the .cpp */
        std::shared_ptr<IOEventLoop> less_used_io_loop(std::vector<std::shared_ptr<IOEventLoop>>&);

        /* NOTE(review): presumably guards `servers` - confirm */
        threads::Mutex serverLock;
        std::deque<std::shared_ptr<IOServerHandler>> servers;

        std::mutex executorLock;
        /* Will be notified as soon as servers have been added or an event loop has finished. */
        std::condition_variable ioExecutorNotify;
        std::deque<std::shared_ptr<IOEventLoop>> event_loops;

        void adjustExecutors(size_t);
        std::shared_ptr<IOEventLoop> spawnEventLoop();
        void dispatchBase(std::shared_ptr<IOEventLoop>);
};
}
}