diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 22b44f7..f36e154 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -117,7 +117,6 @@ set(SERVER_SOURCE_FILES src/client/query/XMacroEventTypes.h - src/server/VoiceIOManager.cpp src/client/SpeakingClient.cpp ../shared/src/ssl/SSLManager.cpp diff --git a/server/src/VirtualServerManager.cpp b/server/src/VirtualServerManager.cpp index cbf323d..5cf4c44 100644 --- a/server/src/VirtualServerManager.cpp +++ b/server/src/VirtualServerManager.cpp @@ -15,7 +15,6 @@ using namespace ts::server; VirtualServerManager::VirtualServerManager(InstanceHandler* handle) : handle(handle) { this->puzzles = new udp::PuzzleManager{}; - this->_ioManager = new io::VoiceIOManager(); } VirtualServerManager::~VirtualServerManager() { @@ -35,10 +34,6 @@ VirtualServerManager::~VirtualServerManager() { delete this->puzzles; this->puzzles = nullptr; - - if(this->_ioManager) this->_ioManager->shutdownGlobally(); - delete this->_ioManager; - this->_ioManager = nullptr; } bool VirtualServerManager::initialize(bool autostart) { @@ -280,7 +275,7 @@ ts::ServerId VirtualServerManager::next_available_server_id(bool& success) { ServerReport VirtualServerManager::report() { ServerReport result{}; for(const auto& sr : this->serverInstances()) { - result.avariable++; + result.available++; if(sr->running()) { result.online++; result.slots += sr->properties()[property::VIRTUALSERVER_MAXCLIENTS].as_or(0); diff --git a/server/src/VirtualServerManager.h b/server/src/VirtualServerManager.h index 900cdb5..a18f8bb 100644 --- a/server/src/VirtualServerManager.h +++ b/server/src/VirtualServerManager.h @@ -3,7 +3,6 @@ #include #include #include "src/server/PrecomputedPuzzles.h" -#include "server/VoiceIOManager.h" #include "VirtualServer.h" #include #include "snapshots/snapshot.h" @@ -12,7 +11,7 @@ namespace ts::server { class InstanceHandler; struct ServerReport { - size_t avariable; + size_t available; size_t online; size_t slots; @@ -70,8 +69,6 @@ namespace ts::server { udp::PuzzleManager* rsaPuzzles() { return this->puzzles; } - io::VoiceIOManager* ioManager(){ return this->_ioManager; } - /* This must be recursive */ threads::Mutex server_create_lock; @@ -83,9 +80,6 @@ namespace ts::server { std::deque> instances; udp::PuzzleManager* puzzles{nullptr}; - event::EventExecutor* join_loop = nullptr; - io::VoiceIOManager* _ioManager = nullptr; - struct { std::thread executor{}; std::condition_variable condition; diff --git a/server/src/client/ConnectedClient.h b/server/src/client/ConnectedClient.h index f41b4e6..4740c17 100644 --- a/server/src/client/ConnectedClient.h +++ b/server/src/client/ConnectedClient.h @@ -71,7 +71,6 @@ namespace ts { class ConnectedClient : public DataClient { friend class VirtualServer; - friend class VoiceServer; friend class VoiceClient; friend class MusicClient; friend class WebClient; diff --git a/server/src/client/voice/VoiceClient.cpp b/server/src/client/voice/VoiceClient.cpp index 7ba5231..b5c99bf 100644 --- a/server/src/client/voice/VoiceClient.cpp +++ b/server/src/client/voice/VoiceClient.cpp @@ -19,7 +19,7 @@ constexpr static auto kMaxWhisperClientNameLength{30}; constexpr static auto kWhisperClientUniqueIdLength{28}; /* base64 encoded SHA1 hash */ VoiceClient::VoiceClient(const std::shared_ptr& server, const sockaddr_storage* address) : - SpeakingClient{server->server->sql, server->server}, + SpeakingClient{server->get_server()->sql, server->get_server()}, voice_server(server) { assert(address); memtrack::allocated(this); diff --git a/server/src/client/voice/VoiceClient.h b/server/src/client/voice/VoiceClient.h index aa51650..1bcabcf 100644 --- a/server/src/client/voice/VoiceClient.h +++ b/server/src/client/voice/VoiceClient.h @@ -15,7 +15,6 @@ #include "VoiceClientConnection.h" #include "src/server/PrecomputedPuzzles.h" #include "../../lincense/TeamSpeakLicense.h" -#include "src/client/shared/ServerCommandExecutor.h" //#define LOG_INCOMPING_PACKET_FRAGMENTS //#define LOG_AUTO_ACK_AUTORESPONSE @@ -38,7 +37,6 @@ namespace ts { } namespace server { namespace server::udp { - class ServerCommandExecutor; class CryptSetupHandler; } @@ -51,12 +49,8 @@ namespace ts { friend class POWHandler; friend class ts::connection::VoiceClientConnection; friend class ConnectedClient; - friend class io::IOServerHandler; - friend class server::udp::ServerCommandExecutor; friend class server::udp::CryptSetupHandler; friend class VoiceClientCommandHandler; - - using ServerCommandExecutor = ts::server::ServerCommandQueue; public: VoiceClient(const std::shared_ptr& server, const sockaddr_storage*); ~VoiceClient() override; diff --git a/server/src/server/VoiceIOManager.cpp b/server/src/server/VoiceIOManager.cpp deleted file mode 100644 index d5bf750..0000000 --- a/server/src/server/VoiceIOManager.cpp +++ /dev/null @@ -1,330 +0,0 @@ -#include -#include -#include -#include "src/VirtualServer.h" -#include "VoiceIOManager.h" -#include "VoiceServer.h" -#include "src/client/voice/VoiceClient.h" - -using namespace std; -using namespace std::chrono; -using namespace ts; -using namespace ts::io; -using namespace ts::server; - -VoiceIOManager::VoiceIOManager(){ } - -VoiceIOManager::~VoiceIOManager() { } - -std::shared_ptr VoiceIOManager::enableIo(server::VirtualServer *server) { - auto server_io = std::make_shared(server); - - this->adjustExecutors(this->servers.size() + 1); - - std::vector> use_list; - use_list.reserve(config::threads::voice::events_per_server); - - lock_guard executor_lock(this->executorLock); - for(size_t i = 0; i < config::threads::voice::events_per_server; i++){ - auto loop = this->less_used_io_loop(use_list); - if(!loop) break; //No more loops open - - server_io->create_event_loop_events(loop)->activate(); - use_list.push_back(loop); - } - - { - threads::MutexLock l(this->serverLock); - this->servers.push_back(server_io); - } - this->ioExecutorNotify.notify_all(); - - return server_io; -} - -void VoiceIOManager::disableIo(server::VirtualServer* server) { - std::shared_ptr server_io; - { - threads::MutexLock l(this->serverLock); - for(const auto& sio : this->servers) { - if(sio->server == server) { - server_io = sio; - break; - } - } - if(!server_io) return; - this->servers.erase(std::find(this->servers.begin(), this->servers.end(), server_io),this->servers.end()); - } - - for(const auto& entry : server_io->event_loop_events) { - entry->disable(); - entry->despawn(); - } - - server_io->event_loop_events.clear(); - this->adjustExecutors(this->servers.size()); -} - -void VoiceIOManager::shutdownGlobally() { - /* Unregister all servers */ - { - lock_guard server_lock(this->serverLock); - for(const auto& server : this->servers) - for(const auto& loop : server->event_loop_events){ - loop->disable(); - loop->despawn(); - } - } - - /* shutting down event loops */ - { - lock_guard executor_lock(this->executorLock); - for(const auto& loop : this->event_loops) { - loop->shutdown = true; - event_base_loopexit(loop->base, nullptr); - } - this->ioExecutorNotify.notify_all(); - } - - /* keep a ref to all event loops so they dont despawn in their event thread */ - unique_lock executor_lock{this->executorLock}; - auto wait_end = system_clock::now() + chrono::seconds{5}; - - while(true) { - if(this->event_loops.empty()) { - break; - } - - auto status = this->ioExecutorNotify.wait_until(executor_lock, wait_end); - if(status == std::cv_status::timeout) { - logCritical(LOG_GENERAL, - "Failed to shutdown all event loops successfully. After timeout {} loops are left.", - this->event_loops.size() - ); - break; - } - } - - /* now delete all loops */ - this->event_loops.clear(); -} - -//TODO also reduce thread pool! -void VoiceIOManager::adjustExecutors(size_t size) { - lock_guard l(this->executorLock); - size_t targetThreads = size * config::threads::voice::events_per_server; - if(targetThreads > config::threads::voice::io_limit) - targetThreads = config::threads::voice::io_limit; - if(targetThreads < config::threads::voice::io_min) - targetThreads = config::threads::voice::io_min; - - while(this->event_loops.size() < targetThreads) { - spawnEventLoop(); - } -} - -IOEventLoop::~IOEventLoop() { - assert(this_thread::get_id() != this->executor.get_id()); - assert(!this->executor.joinable()); -} - -/** - * @warning executor lock must be locked! - */ -std::shared_ptr VoiceIOManager::spawnEventLoop() { - std::shared_ptr loop = std::make_shared(); - loop->base = event_base_new(); - loop->executor = std::thread(&VoiceIOManager::dispatchBase, this, loop); - loop->bound_thread = -1; - - { -#ifndef WIN32 - const auto name = "IO exec #" + to_string(this->event_loops.size()); - pthread_setname_np(loop->executor.native_handle(), name.c_str()); -#endif - - const auto num_threads = std::thread::hardware_concurrency(); - if(num_threads != 0 && config::threads::voice::bind_io_thread_to_kernel_thread) { - auto thread_usage = new uint8_t[num_threads]; - memset(thread_usage, 0, num_threads); - - for(auto& ev_loop : this->event_loops) { - if(ev_loop->bound_thread < 0 || ev_loop->bound_thread >= num_threads) { - continue; - } - - thread_usage[ev_loop->bound_thread]++; - } - - int thread_index = 0; - int thread_use_count = 256; - for(int index = 0; index < num_threads; index++) - if(thread_usage[index] < thread_use_count) { - thread_use_count = thread_usage[index]; - thread_index = index; - } - - debugMessage(0, "Binding event loop '{}' to CPU thread {}. Current bound threads: {}", name, thread_index, thread_use_count); - delete[] thread_usage; - - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(thread_index, &cpuset); - auto err = pthread_setaffinity_np(loop->executor.native_handle(), sizeof(cpu_set_t), &cpuset); - if(err != 0) { - logError(0, "Failed to bind IO event loop '{}' to kernel thread. Code: {} => {}/{}", name, err, errno, strerror(errno)); - } - loop->bound_thread = thread_index; - } - } - - this->event_loops.push_back(loop); - return loop; -} - -std::shared_ptr VoiceIOManager::less_used_io_loop(vector> &blacklist) { - std::shared_ptr current; - for(const auto& loop : this->event_loops) - if(!current || loop->assigned_events.size() < current->assigned_events.size()) { - for(const auto& elm : blacklist) if(elm == loop) goto skipSet; - current = loop; - skipSet:; - } - return current; -} - -IOServerHandler::IOServerHandler(server::VirtualServer* server) : server(server) { } - -IOServerHandler::~IOServerHandler() { - for(const auto& entry : this->event_loop_events) { - entry->disable(); - entry->despawn(); - } - this->event_loop_events.clear(); -} - -std::shared_ptr IOServerHandler::create_event_loop_events(const std::shared_ptr &loop) { - std::shared_ptr entry = std::make_shared(); - - entry->owner = this; - entry->event_loop = loop; - entry->spawn(); - - this->event_loop_events.push_back(entry); - return entry; -} - -int IOServerHandler::resolve_file_descriptor(const std::shared_ptr &client) { - if(this->event_loop_events.empty()) - return -1; - -#if 0 - auto socket = client->connection->socket_id(); - auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()]; - if(socket < 0 || socket > event_loop->events.size()) - return -1; - - return event_loop->events[socket]->file_descriptor; -#endif - return -1; -} - -void IOServerHandler::invoke_write(const std::shared_ptr &client) { - if(this->event_loop_events.empty()) - return; /* TODO any kind of error or warning? */ - -#if 0 - auto socket = client->connection->socket_id(); - auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()]; - if(socket < 0 || socket > event_loop->events.size()) - return; /* TODO any kind of error or warning? */ - - auto event = event_loop->events[socket]; - if(!event->event_write) - return; /* TODO any kind of error or warning? */ - - event->push_voice_write_queue(client); - event_add(event->event_write, nullptr); -#endif -} - -void IOServerHandler::send_datagram(server::udp::DatagramPacket* datagram, int socket) { - if(this->event_loop_events.empty()) - return; /* TODO any kind of error or warning? */ - - auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()]; - if(socket < 0 || socket > event_loop->events.size()) - return; /* TODO any kind of error or warning? */ - - auto event = event_loop->events[socket]; - if(!event->event_write) - return; /* TODO any kind of error or warning? */ - - if(datagram) - event->push_dg_write_queue(datagram); - - event_add(event->event_write, nullptr); -} - -void IOEventLoopEvents::spawn() { -#if 0 - for(const auto& binding : this->owner->server->getVoiceServer()->activeBindings()) { - auto entry = make_shared(); - entry->file_descriptor = binding->file_descriptor; - entry->handle = this; - entry->server = this->owner->server; - entry->family = binding->address.ss_family; - entry->voice_server = &*this->owner->server->getVoiceServer(); - - entry->event_read = event_new(this->event_loop->base, binding->file_descriptor, EV_READ | EV_PERSIST, VoiceServer::handleMessageRead, &*entry); - entry->event_write = event_new(this->event_loop->base, binding->file_descriptor, EV_WRITE, VoiceServer::handleMessageWrite, &*entry); - this->events.push_back(entry); - - entry->socket_id = (int) this->events.size() - 1; - - { - lock_guard lock(event_loop->events_lock); - this->event_loop->assigned_events.push_back(entry); - } - } -#endif -} - -void IOEventLoopEvents::despawn() { - { - lock_guard lock(event_loop->events_lock); - for(const auto& event : this->events) { - auto& event_loop_events = this->event_loop->assigned_events; - - assert(std::find(event_loop_events.begin(), event_loop_events.end(), event) != event_loop_events.end()); - event_loop_events.erase(std::find(event_loop_events.begin(), event_loop_events.end(), event)); - } - } - - this->events.clear(); -} - -void VoiceIOManager::dispatchBase(shared_ptr self) { - debugMessage(LOG_INSTANCE, "Dispatching io base {}", (void*) self->base); - event_base_loop(self->base, EVLOOP_NO_EXIT_ON_EMPTY); - debugMessage(LOG_INSTANCE, "Dispatching io base {} finished", (void*) self->base); - - { - lock_guard executor_lock(this->executorLock); - auto found = std::find(this->event_loops.begin(), this->event_loops.end(), self); - if(found != this->event_loops.end()) { - this->event_loops.erase(found); - } else { - logCritical(LOG_INSTANCE, "Could not find executor in executor registry ({})!", (void*) self->base); - } - - if(!self->assigned_events.empty()) { - logError(LOG_INSTANCE, "Event loop exited, but sill containing some events ({})!", self->assigned_events.size()); - } - - event_base_free(self->base); - self->base = nullptr; - - this->ioExecutorNotify.notify_all(); /* let everybody know we're done */ - } -} \ No newline at end of file diff --git a/server/src/server/VoiceIOManager.h b/server/src/server/VoiceIOManager.h deleted file mode 100644 index c8a30e4..0000000 --- a/server/src/server/VoiceIOManager.h +++ /dev/null @@ -1,204 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace ts { - namespace server { - class VirtualServer; - class VoiceServer; - class VoiceClient; - } - namespace io { - class VoiceIOManager; - class IOServerHandler; - struct IOEventLoopEntry; - struct IOEventLoopEvents; - - struct IOEventLoop { - IOEventLoop() = default; - ~IOEventLoop(); - - int bound_thread = -1; /* -1 represents that this loop is bound to no thread at all */ - - bool shutdown = false; - event_base* base = nullptr; - std::thread executor; - - std::mutex events_lock; - std::deque> assigned_events; - }; - - struct IOEventLoopEntry { - IOEventLoopEvents* handle; - - int socket_id = 0; - sa_family_t family; - - /* keeps these addresses in "hot" memory instead resolving three ptr */ - server::VirtualServer* server; - server::VoiceServer* voice_server; - - int file_descriptor = 0; /* actual socket */ - ::event* event_read = nullptr; - ::event* event_write = nullptr; - - spin_mutex write_queue_lock; - server::udp::DatagramPacket* dg_write_queue_head = nullptr; - server::udp::DatagramPacket* dg_write_queue_tail = nullptr; - - std::deque> voice_write_queue; - - inline server::udp::DatagramPacket* pop_dg_write_queue() { - std::lock_guard lock(this->write_queue_lock); - if(!this->dg_write_queue_head) - return nullptr; - - auto packet = this->dg_write_queue_head; - if(packet == this->dg_write_queue_tail) { - this->dg_write_queue_tail = nullptr; - this->dg_write_queue_head = nullptr; - } else { - this->dg_write_queue_head = packet->next_packet; - } - - return packet; - } - - inline void push_dg_write_queue(server::udp::DatagramPacket* packet) { - assert(!packet->next_packet); - packet->next_packet = nullptr; - - std::lock_guard lock(this->write_queue_lock); - if(this->dg_write_queue_tail) { - this->dg_write_queue_tail->next_packet = packet; - } else { - this->dg_write_queue_head = packet; - } - this->dg_write_queue_tail = packet; - } - - inline void push_voice_write_queue(const std::shared_ptr& client) { - std::lock_guard lock(this->write_queue_lock); - this->voice_write_queue.push_back(client); - } - - /* return 0 on success | 1 on there is more, but success | 2 on empty */ - inline int pop_voice_write_queue(std::shared_ptr& result) { - std::lock_guard lock(this->write_queue_lock); - - auto it_begin = this->voice_write_queue.begin(); - auto it_end = this->voice_write_queue.end(); - auto it = it_begin; - - while(it != it_end) { - result = it->lock(); - if(result) { - this->voice_write_queue.erase(it_begin, ++it); - return (int) (it != it_end); - } - it++; - } - if(it_begin != it_end) { - this->voice_write_queue.erase(it_begin, it_end); - } - return 2; - } - - ~IOEventLoopEntry() { - if(this->event_read) - event_free(this->event_read); - - if(this->event_write) - event_free(this->event_write); - } - }; - - struct IOEventLoopEvents { - IOServerHandler* owner = nullptr; - std::shared_ptr event_loop; - std::deque> events; - - void spawn(); - void despawn(); - - inline void activate(){ - for(const auto& event : events) { - if(event->event_read) { - event_add(event->event_read, nullptr); - } - } - } - - inline void disable(bool blocking = true){ - for(const auto& event : events) { - if(event->event_read) { - event_del(event->event_read); - event_del_block(event->event_read); - } - - if(event->event_write) { - event_del(event->event_write); - event_del_block(event->event_write); - } - } - } - - inline IOEventLoopEntry* event(int fd) { - for(const auto& entry : this->events) - if(entry->file_descriptor == fd) return entry.get(); - return nullptr; - } - }; - - class IOServerHandler { - friend class VoiceIOManager; - friend struct IOEventLoopEvents; - public: - explicit IOServerHandler(server::VirtualServer*); - ~IOServerHandler(); - - void invoke_write(const std::shared_ptr& /* client */); - int resolve_file_descriptor(const std::shared_ptr& /* client */); - void send_datagram(server::udp::DatagramPacket* /* packet */, int /* socket */); - private: - std::shared_ptr create_event_loop_events(const std::shared_ptr &); - - server::VirtualServer* server = nullptr; - std::deque> event_loop_events; - size_t event_loop_index = 0; - }; - - class VoiceIOManager { - public: - VoiceIOManager(); - virtual ~VoiceIOManager(); - - std::shared_ptr enableIo(server::VirtualServer* server); - void disableIo(server::VirtualServer*); - - void shutdownGlobally(); - private: - std::shared_ptr less_used_io_loop(std::vector>&); - - threads::Mutex serverLock; - std::deque> servers; - - std::mutex executorLock; - /* will be called as soon servers have been added or an event loop has been finished */ - std::condition_variable ioExecutorNotify; - std::deque> event_loops; - - void adjustExecutors(size_t); - std::shared_ptr spawnEventLoop(); - - void dispatchBase(std::shared_ptr); - }; - } -} \ No newline at end of file diff --git a/server/src/server/VoiceServer.h b/server/src/server/VoiceServer.h index 161d21c..d6ef633 100644 --- a/server/src/server/VoiceServer.h +++ b/server/src/server/VoiceServer.h @@ -9,7 +9,6 @@ #include #include #include -#include "VoiceIOManager.h" #include "./voice/DatagramPacket.h" #include "Definitions.h" #include @@ -17,12 +16,16 @@ namespace ts { namespace server { class VirtualServer; - class ConnectedClient; + class VoiceServer; class VoiceClient; class POWHandler; class VoiceServerSocket : public std::enable_shared_from_this { public: + /** + * Note: Only access event_read or event_write when the socket mutex is acquired or + * or within the event loop! + */ struct NetworkEvents { VoiceServerSocket* socket; struct event* event_read{nullptr}; @@ -129,7 +132,7 @@ namespace ts { /** * Enqueue a write event. - * Attention: The mutex should be locked! + * Attention: The socket mutex must be locked! */ inline void enqueue_network_write() { assert(!this->network_events.empty()); @@ -143,16 +146,13 @@ namespace ts { class VoiceServer { friend class VoiceServerSocket; - friend class VoiceClient; /* Not needed any more */ - friend class io::VoiceIOManager; /* Not needed any more */ - friend struct io::IOEventLoopEvents; /* Not needed any more */ friend class POWHandler; /* TODO: Still needed? May use some kind of callback */ public: explicit VoiceServer(const std::shared_ptr& server); ~VoiceServer(); bool start(const std::deque&, std::string&); - bool stop(const std::chrono::milliseconds& flushTimeout = std::chrono::milliseconds(1000)); + bool stop(const std::chrono::milliseconds& flushTimeout = std::chrono::milliseconds{1000}); [[nodiscard]] std::shared_ptr findClient(ClientId); [[nodiscard]] std::shared_ptr findClient(sockaddr_in* addr, bool lock); diff --git a/server/src/server/VoiceServerSocket.cpp b/server/src/server/VoiceServerSocket.cpp index 3814970..5bf1adf 100644 --- a/server/src/server/VoiceServerSocket.cpp +++ b/server/src/server/VoiceServerSocket.cpp @@ -22,7 +22,7 @@ using namespace ts; VoiceServerSocket::NetworkEvents::~NetworkEvents() { auto event_read_ = std::exchange(this->event_read, nullptr); - auto event_write_ = std::exchange(this->event_read, nullptr); + auto event_write_ = std::exchange(this->event_write, nullptr); if(event_read_) { event_free(event_read_); @@ -132,33 +132,38 @@ bool VoiceServerSocket::activate(std::string &error) { } void VoiceServerSocket::deactivate() { - for(const auto& binding : this->network_events) { + std::unique_lock write_lock{this->mutex}; + auto network_events_ = std::move(this->network_events); + auto file_descriptor_ = std::exchange(this->file_descriptor, 0); + + this->write_client_queue.clear(); + while(this->write_datagram_head) { + auto datagram = std::exchange(this->write_datagram_head, this->write_datagram_head->next_packet); + udp::DatagramPacket::destroy(datagram); + } + this->write_datagram_tail = &this->write_datagram_head; + write_lock.unlock(); + + /* + * Finish all active/pending events before we clear them. + * Since we moved these events out of network_events the can't get rescheduled. + */ + for(const auto& binding : network_events_) { if(binding->event_read) { event_del_block(binding->event_read); - event_free(binding->event_read); } if(binding->event_write) { event_del_block(binding->event_write); - event_free(binding->event_write); } } - { - std::lock_guard write_lock{this->mutex}; - this->network_events.clear(); + /* Will free all events. */ + network_events_.clear(); - if(this->file_descriptor > 0) { - ::close(this->file_descriptor); - this->file_descriptor = 0; - } - - this->write_client_queue.clear(); - while(this->write_datagram_head) { - auto datagram = std::exchange(this->write_datagram_head, this->write_datagram_head->next_packet); - udp::DatagramPacket::destroy(datagram); - } - this->write_datagram_tail = &this->write_datagram_head; + /* Close the file descriptor after all network events have been finished*/ + if(file_descriptor_ > 0) { + ::close(file_descriptor_); } }