diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 0ae472b..22b44f7 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -53,6 +53,7 @@ set(SERVER_SOURCE_FILES src/TS3ServerHeartbeat.cpp src/SignalHandler.cpp src/server/VoiceServer.cpp + src/server/VoiceServerSocket.cpp src/server/POWHandler.cpp src/client/voice/VoiceClientConnection.cpp src/client/command_handler/groups.cpp diff --git a/server/src/Configuration.h b/server/src/Configuration.h index 93b55a0..7141fb5 100644 --- a/server/src/Configuration.h +++ b/server/src/Configuration.h @@ -229,12 +229,12 @@ namespace ts::config { } namespace threads { - extern size_t ticking; - extern size_t command_execute; - extern size_t network_events; + extern size_t ticking; /* in use */ + extern size_t command_execute; /* in use */ + extern size_t network_events; /* in use */ namespace voice { - extern size_t events_per_server; + extern size_t events_per_server; /* in use */ extern size_t io_min; extern size_t io_per_server; extern size_t io_limit; diff --git a/server/src/VirtualServer.cpp b/server/src/VirtualServer.cpp index e535c6d..a0ee5af 100644 --- a/server/src/VirtualServer.cpp +++ b/server/src/VirtualServer.cpp @@ -473,45 +473,37 @@ bool VirtualServer::start(std::string& error) { return false; } - deque> bindings; + std::deque bindings{}; for(const auto& address : split_hosts(host, ',')) { - auto entry = make_shared(); + sockaddr_storage binding{}; + memset(&binding, 0, sizeof(binding)); + if(net::is_ipv4(address)) { - sockaddr_in addr{}; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as_or(0)); - if(!evaluateAddress4(address, addr.sin_addr)) { + auto address_v4 = (sockaddr_in*) &binding; + address_v4->sin_family = AF_INET; + address_v4->sin_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as_or(0)); + if(!evaluateAddress4(address, address_v4->sin_addr)) { logError(this->serverId, "Fail to resolve v4 address info for \"{}\"", address); continue; } - - memcpy(&entry->address, &addr, sizeof(addr)); } else if(net::is_ipv6(address)) { - sockaddr_in6 addr{}; - memset(&addr, 0, sizeof(addr)); - addr.sin6_family = AF_INET6; - addr.sin6_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as_or(0)); - if(!evaluateAddress6(address, addr.sin6_addr)) { + auto address_v6 = (sockaddr_in6*) &binding; + address_v6->sin6_family = AF_INET6; + address_v6->sin6_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as_or(0)); + if(!evaluateAddress6(address, address_v6->sin6_addr)) { logError(this->serverId, "Fail to resolve v6 address info for \"{}\"", address); continue; } - - memcpy(&entry->address, &addr, sizeof(addr)); } else { logError(this->serverId, "Failed to determinate address type for \"{}\"", address); continue; } - bindings.push_back(entry); - } - if(bindings.empty()) { - error = "failed to resole any host!"; - this->stop("failed to start", false); - return false; + + bindings.emplace_back(std::move(binding)); } //Setup voice server - udpVoiceServer = make_shared(self.lock()); + udpVoiceServer = std::make_shared(self.lock()); if(!udpVoiceServer->start(bindings, error)) { error = "could not start voice server. Message: " + error; this->stop("failed to start", false); diff --git a/server/src/VirtualServerManager.cpp b/server/src/VirtualServerManager.cpp index af967c9..cbf323d 100644 --- a/server/src/VirtualServerManager.cpp +++ b/server/src/VirtualServerManager.cpp @@ -7,6 +7,7 @@ #include "src/client/ConnectedClient.h" #include #include +#include using namespace std; using namespace std::chrono; @@ -14,11 +15,7 @@ using namespace ts::server; VirtualServerManager::VirtualServerManager(InstanceHandler* handle) : handle(handle) { this->puzzles = new udp::PuzzleManager{}; - this->handshakeTickers = new threads::Scheduler(1, "handshake ticker"); - //this->join_loop = new event::EventExecutor("joiner #"); this->_ioManager = new io::VoiceIOManager(); - - this->handshakeTickers->schedule("ticker", [&](){ this->tickHandshakeClients(); }, seconds(1)); } VirtualServerManager::~VirtualServerManager() { @@ -39,17 +36,6 @@ VirtualServerManager::~VirtualServerManager() { delete this->puzzles; this->puzzles = nullptr; - if(this->join_loop) - this->join_loop->shutdown(); - delete this->join_loop; - this->join_loop = nullptr; - - if(this->handshakeTickers) { - this->handshakeTickers->shutdown(); - } - delete this->handshakeTickers; - this->handshakeTickers = nullptr; - if(this->_ioManager) this->_ioManager->shutdownGlobally(); delete this->_ioManager; this->_ioManager = nullptr; @@ -187,26 +173,24 @@ shared_ptr VirtualServerManager::findServerById(ServerId sid) { shared_ptr VirtualServerManager::findServerByPort(uint16_t port) { for(const auto& server : this->serverInstances()){ - if(server->properties()[property::VIRTUALSERVER_PORT] == port) return server; - if(server->running() && server->getVoiceServer()) - for(const auto& binding : server->getVoiceServer()->activeBindings()) - if(binding->address_port() == port) return server; + if(server->properties()[property::VIRTUALSERVER_PORT] == port) { + return server; + } } return nullptr; } uint16_t VirtualServerManager::next_available_port(const std::string& host_string) { - auto instances = this->serverInstances(); - std::vector unallowed_ports{}; - unallowed_ports.reserve(instances.size()); + auto instances_ = this->serverInstances(); + std::set unallowed_ports{}; - for(const auto& instance : instances) { - unallowed_ports.push_back(instance->properties()[property::VIRTUALSERVER_PORT].as_or(0)); + for(const auto& instance : instances_) { + unallowed_ports.insert(instance->properties()[property::VIRTUALSERVER_PORT].as_or(0)); auto vserver = instance->getVoiceServer(); if(instance->running() && vserver) { - for(const auto& binding : vserver->activeBindings()) { - unallowed_ports.push_back(binding->address_port()); + for(const auto& socket : vserver->getSockets()) { + unallowed_ports.insert(net::port(socket->address())); } } } @@ -217,8 +201,9 @@ uint16_t VirtualServerManager::next_available_port(const std::string& host_strin if(port < 1024) goto next_port; for(auto& p : unallowed_ports) { - if(p == port) + if(p == port) { goto next_port; + } } for(auto& binding : bindings) { @@ -470,14 +455,6 @@ void VirtualServerManager::shutdownAll(const std::string& msg) { } } -void VirtualServerManager::tickHandshakeClients() { - for(const auto& server : this->serverInstances()) { - auto vserver = server->getVoiceServer(); - if(vserver) - vserver->tickHandshakingClients(); - } -} - void VirtualServerManager::delete_server_in_db(ts::ServerId server_id, bool data_only) { #define execute_delete(statement) \ result = sql::command(this->handle->getSql(), statement, variable{":sid", server_id}).execute(); \ diff --git a/server/src/VirtualServerManager.h b/server/src/VirtualServerManager.h index 0330f45..900cdb5 100644 --- a/server/src/VirtualServerManager.h +++ b/server/src/VirtualServerManager.h @@ -70,8 +70,6 @@ namespace ts::server { udp::PuzzleManager* rsaPuzzles() { return this->puzzles; } - event::EventExecutor* get_join_loop() { return this->join_loop; } - io::VoiceIOManager* ioManager(){ return this->_ioManager; } /* This must be recursive */ @@ -86,7 +84,6 @@ namespace ts::server { udp::PuzzleManager* puzzles{nullptr}; event::EventExecutor* join_loop = nullptr; - threads::Scheduler* handshakeTickers = nullptr; io::VoiceIOManager* _ioManager = nullptr; struct { @@ -95,8 +92,6 @@ namespace ts::server { std::mutex lock; } acknowledge; - void tickHandshakeClients(); - void delete_server_in_db(ServerId /* server id */, bool /* data only */); void change_server_id_in_db(ServerId /* old id */, ServerId /* new id */); diff --git a/server/src/client/voice/PacketEncoder.cpp b/server/src/client/voice/PacketEncoder.cpp index f47adb4..24c1ac1 100644 --- a/server/src/client/voice/PacketEncoder.cpp +++ b/server/src/client/voice/PacketEncoder.cpp @@ -32,22 +32,22 @@ PacketEncoder::~PacketEncoder() { void PacketEncoder::reset() { this->acknowledge_manager_.reset(); - protocol::OutgoingServerPacket *whead, *rhead; + protocol::OutgoingServerPacket *write_head, *read_head; { std::lock_guard wlock{this->write_queue_mutex}; - whead = std::exchange(this->encrypt_queue_head, nullptr); - rhead = std::exchange(this->send_queue_head, nullptr); + write_head = std::exchange(this->encrypt_queue_head, nullptr); + read_head = std::exchange(this->send_queue_head, nullptr); this->encrypt_queue_tail = &this->encrypt_queue_head; this->send_queue_tail = &this->send_queue_head; } - while(whead) { - std::exchange(whead, whead->next)->unref(); + while(write_head) { + std::exchange(write_head, write_head->next)->unref(); } - while(rhead) { - std::exchange(rhead, rhead->next)->unref(); + while(read_head) { + std::exchange(read_head, read_head->next)->unref(); } } @@ -277,7 +277,7 @@ bool PacketEncoder::encrypt_outgoing_packet(ts::protocol::OutgoingServerPacket * return true; } -PacketEncoder::BufferPopResult PacketEncoder::pop_write_buffer(protocol::OutgoingServerPacket *&result) { +bool PacketEncoder::pop_write_buffer(protocol::OutgoingServerPacket *&result) { bool need_encrypt{false}, more_packets; { @@ -305,7 +305,8 @@ PacketEncoder::BufferPopResult PacketEncoder::pop_write_buffer(protocol::Outgoin need_encrypt = true; } else { - return BufferPopResult::DRAINED; + result = nullptr; + return false; } result->next = nullptr; @@ -316,7 +317,7 @@ PacketEncoder::BufferPopResult PacketEncoder::pop_write_buffer(protocol::Outgoin this->encrypt_outgoing_packet(result); } - return more_packets ? BufferPopResult::MORE_AVAILABLE : BufferPopResult::DRAINED; + return more_packets; } void PacketEncoder::reenqueue_failed_buffer(protocol::OutgoingServerPacket *packet) { diff --git a/server/src/client/voice/PacketEncoder.h b/server/src/client/voice/PacketEncoder.h index 9000173..1a58667 100644 --- a/server/src/client/voice/PacketEncoder.h +++ b/server/src/client/voice/PacketEncoder.h @@ -19,11 +19,6 @@ namespace ts::server::server::udp { using AcknowledgeEntry = connection::AcknowledgeManager::Entry; using StatisticsCategory = stats::ConnectionStatistics::category; public: - enum struct BufferPopResult { - DRAINED, - MORE_AVAILABLE - }; - enum struct CryptError { KEY_GENERATION_FAILED, ENCRYPT_FAILED /* contains some data */ @@ -53,8 +48,11 @@ namespace ts::server::server::udp { bool wait_empty_write_and_prepare_queue(std::chrono::time_point until = std::chrono::time_point()); - /* if the result is true, ownership has been transferred */ - BufferPopResult pop_write_buffer(protocol::OutgoingServerPacket*& /* packet */); + /** + * Returns true if there is more data to write and false otherwise + * @return + */ + bool pop_write_buffer(protocol::OutgoingServerPacket*& /* packet */); void reenqueue_failed_buffer(protocol::OutgoingServerPacket* /* packet */); [[nodiscard]] inline auto& acknowledge_manager() { return this->acknowledge_manager_; } diff --git a/server/src/client/voice/VoiceClient.cpp b/server/src/client/voice/VoiceClient.cpp index 89cade4..7ba5231 100644 --- a/server/src/client/voice/VoiceClient.cpp +++ b/server/src/client/voice/VoiceClient.cpp @@ -30,6 +30,9 @@ VoiceClient::VoiceClient(const std::shared_ptr& server, const socka void VoiceClient::initialize() { auto ref_self = dynamic_pointer_cast(this->ref()); + assert(ref_self); + this->ref_self_voice = ref_self; + this->server_command_queue_ = std::make_unique( serverInstance->server_command_executor(), std::make_unique(ref_self) diff --git a/server/src/client/voice/VoiceClient.h b/server/src/client/voice/VoiceClient.h index c61e4dc..aa51650 100644 --- a/server/src/client/voice/VoiceClient.h +++ b/server/src/client/voice/VoiceClient.h @@ -64,6 +64,7 @@ namespace ts { bool close_connection(const std::chrono::system_clock::time_point &timeout) override; bool disconnect(const std::string&) override; + [[nodiscard]] inline const auto& get_remote_address() const { return this->remote_address; } /* * TODO: Use a helper class called InvokerDescription containing the invoker properties and not holding a whole connected client reference * 2. May use some kind of class to easily set the disconnect reason? @@ -106,10 +107,13 @@ namespace ts { virtual command_result handleCommand(Command &command) override; private: - void finalDisconnect(); - - /* Used by close_connection to determine if we've successfully flushed the connection */ - [[nodiscard]] bool connection_flushed(); + /* + * Use to schedule a network write. + * If we don't have a proper weak ref we have, + * every time we want to schedule a write, + * to lock the weak_ptr and dynamic ptr cast it + */ + std::weak_ptr ref_self_voice{}; rtc::NativeAudioSourceSupplier rtc_audio_supplier{}; rtc::NativeAudioSourceSupplier rtc_audio_whisper_supplier{}; @@ -117,9 +121,6 @@ namespace ts { uint16_t stop_seq_counter{0}; uint16_t whisper_head_counter{0}; - command_result handleCommandClientInit(Command&) override; - command_result handleCommandClientDisconnect(Command&); - std::mutex flush_mutex{}; task_id flush_task{0}; bool flush_executed{false}; @@ -127,6 +128,14 @@ namespace ts { std::optional disconnect_acknowledged{}; /* locked by flush_mutex */ std::unique_ptr server_command_queue_{}; + + void finalDisconnect(); + + /* Used by close_connection to determine if we've successfully flushed the connection */ + [[nodiscard]] bool connection_flushed(); + + command_result handleCommandClientInit(Command&) override; + command_result handleCommandClientDisconnect(Command&); }; class VoiceClientCommandHandler : public ts::server::ServerCommandHandler { diff --git a/server/src/client/voice/VoiceClientConnection.cpp b/server/src/client/voice/VoiceClientConnection.cpp index 84b5f41..045f526 100644 --- a/server/src/client/voice/VoiceClientConnection.cpp +++ b/server/src/client/voice/VoiceClientConnection.cpp @@ -68,12 +68,6 @@ std::string VoiceClientConnection::log_prefix() { return CLIENT_STR_LOG_PREFIX_(client); } -void VoiceClientConnection::triggerWrite() { - if(this->current_client->voice_server) { - this->current_client->voice_server->triggerWrite(dynamic_pointer_cast(this->current_client->ref())); - } -} - void VoiceClientConnection::handle_incoming_datagram(protocol::ClientPacketParser& packet_parser) { #ifndef CONNECTION_NO_STATISTICS if(this->current_client) { @@ -211,7 +205,7 @@ void VoiceClientConnection::reset() { } void VoiceClientConnection::reset_remote_address() { - memset(&this->remote_address_, 0, sizeof(this->remote_address_)); + memset(&this->current_client->remote_address, 0, sizeof(this->current_client->remote_address)); memset(&this->remote_address_info_, 0, sizeof(this->remote_address_info_)); } @@ -248,7 +242,7 @@ void VoiceClientConnection::callback_encode_crypt_error(void *ptr_this, void VoiceClientConnection::callback_request_write(void *ptr_this) { auto connection = reinterpret_cast(ptr_this); - connection->triggerWrite(); + connection->socket_->enqueue_client_write(connection->current_client->ref_self_voice); } void VoiceClientConnection::callback_resend_failed(void *ptr_this, const shared_ptr &entry) { diff --git a/server/src/client/voice/VoiceClientConnection.h b/server/src/client/voice/VoiceClientConnection.h index facdb1f..9f209e2 100644 --- a/server/src/client/voice/VoiceClientConnection.h +++ b/server/src/client/voice/VoiceClientConnection.h @@ -34,6 +34,7 @@ namespace ts { class VoiceClient; class VoiceServer; class POWHandler; + class VoiceServerSocket; } namespace connection { @@ -71,8 +72,8 @@ namespace ts { [[nodiscard]] inline auto virtual_server_id() const { return this->virtual_server_id_; } - [[nodiscard]] inline const auto& remote_address() const { return this->remote_address_; } - [[nodiscard]] inline const auto& socket_id() const { return this->socket_id_; } + [[nodiscard]] inline const auto& remote_address_info() const { return this->remote_address_info_; } + [[nodiscard]] inline const auto& socket() const { return this->socket_; } [[nodiscard]] inline auto& packet_statistics() { return this->packet_statistics_; } [[nodiscard]] inline auto& packet_decoder() { return this->packet_decoder_; } @@ -80,17 +81,15 @@ namespace ts { [[nodiscard]] inline auto& ping_handler() { return this->ping_handler_; } [[nodiscard]] inline auto& crypt_setup_handler() { return this->crypt_setup_handler_; } - protected: + void handle_incoming_datagram(protocol::ClientPacketParser& /* packet */); bool verify_encryption(const protocol::ClientPacketParser& /* packet */); - - void triggerWrite(); private: ServerId virtual_server_id_; server::VoiceClient* current_client; - int socket_id_{0}; - sockaddr_storage remote_address_{}; + /* The remote address is stored within the client object.... FIXME! */ + std::shared_ptr socket_{}; server::udp::pktinfo_storage remote_address_info_{}; CryptHandler crypt_handler; /* access to CryptHandler is thread save */ diff --git a/server/src/server/POWHandler.cpp b/server/src/server/POWHandler.cpp index 923461b..18bbeb8 100644 --- a/server/src/server/POWHandler.cpp +++ b/server/src/server/POWHandler.cpp @@ -38,18 +38,20 @@ void POWHandler::delete_client(const std::shared_ptrpending_clients.erase(it); } -void POWHandler::handle_datagram(int socket, const sockaddr_storage &address,msghdr &info, const pipes::buffer_view &buffer) { - if(buffer.length() < MAC_SIZE + CLIENT_HEADER_SIZE + 5) +void POWHandler::handle_datagram(const std::shared_ptr& socket, const sockaddr_storage &address,msghdr &info, const pipes::buffer_view &buffer) { + if(buffer.length() < MAC_SIZE + CLIENT_HEADER_SIZE + 5) { return; /* too short packet! */ + } std::shared_ptr client; { lock_guard lock(this->pending_clients_lock); - for(const auto& c : this->pending_clients) - if(c->socket == socket && memcmp(&address, &c->address, sizeof(sockaddr_storage)) == 0) { - client = c; + for(const auto& pending_client : this->pending_clients) { + if(pending_client->socket == socket && memcmp(&address, &pending_client->address, sizeof(sockaddr_storage)) == 0) { + client = pending_client; break; } + } if(!client) { #ifdef POW_DEBUG @@ -139,7 +141,7 @@ void POWHandler::send_data(const std::shared_ptr datagram->data[10] = (uint8_t) (0x08U | 0x80U); memcpy(&datagram->data[11], buffer.data_ptr(), buffer.length()); - this->server->send_datagram(client->socket, datagram); + client->socket->send_datagram(datagram); } void POWHandler::reset_client(const std::shared_ptr &client) { @@ -352,7 +354,7 @@ shared_ptr POWHandler::register_verified_client(const std::shared_p voice_client->initialize_weak_reference(voice_client); voice_client->initialize(); - voice_client->connection->socket_id_ = client->socket; + voice_client->connection->socket_ = client->socket; voice_client->state = ConnectionState::INIT_LOW; memcpy(&voice_client->connection->remote_address_info_, &client->address_info, sizeof(client->address_info)); diff --git a/server/src/server/POWHandler.h b/server/src/server/POWHandler.h index 8d4b72d..4ecc804 100644 --- a/server/src/server/POWHandler.h +++ b/server/src/server/POWHandler.h @@ -10,59 +10,60 @@ namespace ts::server { - class POWHandler { - public: - enum LowHandshakeState : uint8_t { - COOKIE_GET, - COOKIE_SET, - PUZZLE_GET, - PUZZLE_SET, - PUZZLE_SOLVE, - PUZZLE_RESET, - COMPLETED, - COMMAND_RESET = 127, - UNSET = 0xFB - }; + class VoiceServerSocket; + class POWHandler { + public: + enum LowHandshakeState : uint8_t { + COOKIE_GET, + COOKIE_SET, + PUZZLE_GET, + PUZZLE_SET, + PUZZLE_SOLVE, + PUZZLE_RESET, + COMPLETED, + COMMAND_RESET = 127, + UNSET = 0xFB + }; - struct Client { - int socket; - sockaddr_storage address; - udp::pktinfo_storage address_info; + struct Client { + std::shared_ptr socket; + sockaddr_storage address; + udp::pktinfo_storage address_info; - std::timed_mutex handle_lock; - std::chrono::system_clock::time_point last_packet; - LowHandshakeState state = LowHandshakeState::COOKIE_GET; + std::timed_mutex handle_lock; + std::chrono::system_clock::time_point last_packet; + LowHandshakeState state = LowHandshakeState::COOKIE_GET; - uint8_t client_control_data[4]{0}; - uint8_t server_control_data[16]{0}; - uint8_t server_data[100]; + uint8_t client_control_data[4]{0}; + uint8_t server_control_data[16]{0}; + uint8_t server_data[100]; - uint32_t client_version; + uint32_t client_version; - std::shared_ptr rsa_challenge; - }; + std::shared_ptr rsa_challenge; + }; - explicit POWHandler(VoiceServer* /* server */); + explicit POWHandler(VoiceServer* /* server */); - void handle_datagram(int /* socket */, const sockaddr_storage& /* address */, msghdr& /* info */, const pipes::buffer_view& /* buffer */); - void execute_tick(); - private: - inline ServerId get_server_id() { - return this->server->get_server()->getServerId(); - } - VoiceServer* server; + void handle_datagram(const std::shared_ptr& /* socket */, const sockaddr_storage& /* address */, msghdr& /* info */, const pipes::buffer_view& /* buffer */); + void execute_tick(); + private: + inline ServerId get_server_id() { + return this->server->get_server()->getServerId(); + } + VoiceServer* server; - std::mutex pending_clients_lock; - std::deque> pending_clients; + std::mutex pending_clients_lock; + std::deque> pending_clients; - void delete_client(const std::shared_ptr& /* client */); + void delete_client(const std::shared_ptr& /* client */); - void handle_cookie_get(const std::shared_ptr& /* client */, const pipes::buffer_view& /* buffer */); - void handle_puzzle_get(const std::shared_ptr& /* client */, const pipes::buffer_view& /* buffer */); - void handle_puzzle_solve(const std::shared_ptr& /* client */, const pipes::buffer_view& /* buffer */); - std::shared_ptr register_verified_client(const std::shared_ptr& /* client */); + void handle_cookie_get(const std::shared_ptr& /* client */, const pipes::buffer_view& /* buffer */); + void handle_puzzle_get(const std::shared_ptr& /* client */, const pipes::buffer_view& /* buffer */); + void handle_puzzle_solve(const std::shared_ptr& /* client */, const pipes::buffer_view& /* buffer */); + std::shared_ptr register_verified_client(const std::shared_ptr& /* client */); - void send_data(const std::shared_ptr &client /* client */, const pipes::buffer_view &buffer /* buffer */); - void reset_client(const std::shared_ptr &client /* client */); - }; - } \ No newline at end of file + void send_data(const std::shared_ptr &client /* client */, const pipes::buffer_view &buffer /* buffer */); + void reset_client(const std::shared_ptr &client /* client */); + }; +} \ No newline at end of file diff --git a/server/src/server/VoiceIOManager.cpp b/server/src/server/VoiceIOManager.cpp index 56e4c41..d5bf750 100644 --- a/server/src/server/VoiceIOManager.cpp +++ b/server/src/server/VoiceIOManager.cpp @@ -218,18 +218,22 @@ int IOServerHandler::resolve_file_descriptor(const std::shared_ptrevent_loop_events.empty()) return -1; +#if 0 auto socket = client->connection->socket_id(); auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()]; if(socket < 0 || socket > event_loop->events.size()) return -1; return event_loop->events[socket]->file_descriptor; +#endif + return -1; } void IOServerHandler::invoke_write(const std::shared_ptr &client) { if(this->event_loop_events.empty()) return; /* TODO any kind of error or warning? */ +#if 0 auto socket = client->connection->socket_id(); auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()]; if(socket < 0 || socket > event_loop->events.size()) @@ -241,6 +245,7 @@ void IOServerHandler::invoke_write(const std::shared_ptrpush_voice_write_queue(client); event_add(event->event_write, nullptr); +#endif } void IOServerHandler::send_datagram(server::udp::DatagramPacket* datagram, int socket) { @@ -262,6 +267,7 @@ void IOServerHandler::send_datagram(server::udp::DatagramPacket* datagram, int s } void IOEventLoopEvents::spawn() { +#if 0 for(const auto& binding : this->owner->server->getVoiceServer()->activeBindings()) { auto entry = make_shared(); entry->file_descriptor = binding->file_descriptor; @@ -281,6 +287,7 @@ void IOEventLoopEvents::spawn() { this->event_loop->assigned_events.push_back(entry); } } +#endif } void IOEventLoopEvents::despawn() { diff --git a/server/src/server/VoiceIOManager.h b/server/src/server/VoiceIOManager.h index 8350212..c8a30e4 100644 --- a/server/src/server/VoiceIOManager.h +++ b/server/src/server/VoiceIOManager.h @@ -105,8 +105,9 @@ namespace ts { } it++; } - if(it_begin != it_end) + if(it_begin != it_end) { this->voice_write_queue.erase(it_begin, it_end); + } return 2; } diff --git a/server/src/server/VoiceServer.cpp b/server/src/server/VoiceServer.cpp index 8974990..36d9f4f 100644 --- a/server/src/server/VoiceServer.cpp +++ b/server/src/server/VoiceServer.cpp @@ -1,4 +1,3 @@ -#define TIMING_DISABLED #include "POWHandler.h" #include #include @@ -9,7 +8,7 @@ #include #include "src/VirtualServerManager.h" #include "../InstanceHandler.h" -#include +#include "./GlobalNetworkEvents.h" using namespace std; using namespace std::chrono; @@ -17,8 +16,6 @@ using namespace ts::server; using namespace ts::buffer; using namespace ts; -extern InstanceHandler* serverInstance; - VoiceServer::VoiceServer(const std::shared_ptr& server) { this->server = server; this->pow_handler = make_unique(this); @@ -26,92 +23,62 @@ VoiceServer::VoiceServer(const std::shared_ptr& server) { VoiceServer::~VoiceServer() { } -#define SET_OPTION(type, option, flag, error) \ -if(setsockopt(bind->file_descriptor, type, option, &flag, sizeof(flag)) < 0) { \ - error; \ - ::close(bind->file_descriptor); \ - bind->file_descriptor = 0; \ - continue; \ -} - -bool VoiceServer::start(const std::deque>& binding, std::string& error) { - if(this->running) return false; - if(binding.empty()) { - error = "Missing bindings!"; +bool VoiceServer::start(const std::deque& address_list, std::string& error) { + if(this->running) { return false; } this->running = true; - this->bindings = binding; - int enable = 1, disable = 0; - for (auto &bind : binding) { - bind->file_descriptor = socket(bind->address.ss_family, SOCK_DGRAM, 0); - if(!bind->file_descriptor) { - logError(this->server->getServerId(), "Failed to create socket for {}", bind->address_string()); - continue; - } + size_t active_sockets{0}; + for(const auto& address : address_list) { + auto socket = std::make_shared(this, address); + this->sockets.push_back(socket); - if(setsockopt(bind->file_descriptor, SOL_SOCKET, SO_REUSEADDR, &disable, sizeof(int)) < 0) logError(this->server->getServerId(), "Could not disable flag reuse address for bound {}!", bind->address_string()); - //if(setsockopt(bind->file_descriptor, SOL_SOCKET, SO_REUSEPORT, &disable, sizeof(int)) < 0) logError(this->server->getServerId(), "Could not disable flag reuse port for bound {}!", bind->address_string()); - - /* We're never sending over MTU size packets! */ - int pmtu = IP_PMTUDISC_DO; - setsockopt(bind->file_descriptor, IPPROTO_IP, IP_MTU_DISCOVER, &pmtu, sizeof(pmtu)); - - if(fcntl(bind->file_descriptor, F_SETFD, FD_CLOEXEC) < 0) - logError(this->server->getServerId(), "Failed to enable FD_CLOEXEC for {} ({}) (VoiceServer)", bind->file_descriptor, bind->address_string()); - - if(bind->address.ss_family == AF_INET6) { - SET_OPTION(IPPROTO_IPV6, IPV6_RECVPKTINFO, enable, { - logError(this->server->getServerId(), "Failed to enable packet info (v6) for {}", bind->address_string()); - }); - SET_OPTION(IPPROTO_IPV6, IPV6_V6ONLY, enable, { - logError(this->server->getServerId(), "Failed to enable ip v6 only for {}", bind->address_string()); - }); + std::string socket_error{}; + if(socket->activate(socket_error)) { + active_sockets++; } else { - SET_OPTION(IPPROTO_IP, IP_PKTINFO, enable, { - logError(this->server->getServerId(), "Failed to enable packet info for {}", bind->address_string()); - }); + logError(this->server->getServerId(), "Failed to bind UDP socket {}: {}", net::to_string(socket->address()), socket_error); } - - if(::bind(bind->file_descriptor, (const sockaddr*) &bind->address, net::address_size(bind->address)) < 0) { - logError(this->server->getServerId(), "Failed to bind to {} ({} => {})", bind->address_string(), errno, strerror(errno)); - ::close(bind->file_descriptor); - bind->file_descriptor = 0; - continue; - } - - fcntl(bind->file_descriptor, F_SETFL, fcntl(bind->file_descriptor, F_GETFL, 0) | O_NONBLOCK); } + + if(!active_sockets) { + error = "failed to bind to any host"; + goto error_exit; + } + { - auto active_bindings = this->activeBindings(); - if(active_bindings.empty()) { - error = "Failed to bind any address!"; - this->running = false; - return false; - } + auto task_scheduled = serverInstance->general_task_executor()->schedule_repeating( + this->handshake_tick_task, + "voice server tick " + std::to_string(this->get_server()->getServerId()), + std::chrono::seconds{1}, + [&](const auto&) { + this->tickHandshakingClients(); + } + ); - string str; - for(auto it = active_bindings.begin(); it != active_bindings.end(); it++) { - str += net::to_string((*it)->address) + (it + 1 == active_bindings.end() ? "" : " | "); + if(!task_scheduled) { + error = "failed to schedule voice server tick task"; + goto error_exit; } - logMessage(this->server->getServerId(), "Started server on {}.", str); } - - - this->io = serverInstance->getVoiceServerManager()->ioManager()->enableIo(this->server.get()); return true; -} -void VoiceServer::triggerWrite(const std::shared_ptr& client) { - if(!client) { - logError(this->server->getServerId(), "Invalid client for triggerWrite()"); - return; - } + error_exit: - if(auto io_{this->io}; io_) { - io_->invoke_write(client); + this->running = false; + + for(const auto& socket : this->sockets) { + socket->deactivate(); } + this->sockets.clear(); + + if(this->handshake_tick_task > 0) { + auto task = serverInstance->general_task_executor()->cancel_task_joinable(this->handshake_tick_task); + this->handshake_tick_task = 0; + task.wait(); + } + return false; } void VoiceServer::tickHandshakingClients() { @@ -122,9 +89,12 @@ void VoiceServer::tickHandshakingClients() { lock_guard lock(this->connectionLock); connections = this->activeConnections; } - for(const auto& client : connections) - if(client->state == ConnectionState::INIT_HIGH || client->state == ConnectionState::INIT_LOW) + + for(const auto& client : connections) { + if(client->state == ConnectionState::INIT_HIGH || client->state == ConnectionState::INIT_LOW) { client->tick_server(system_clock::now()); + } + } } void VoiceServer::execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next) { @@ -142,36 +112,37 @@ void VoiceServer::execute_resend(const std::chrono::system_clock::time_point &no } bool VoiceServer::stop(const std::chrono::milliseconds& flushTimeout) { - if(!this->running) return false; + if(!this->running) { + return false; + } this->running = false; this->connectionLock.lock(); auto list = this->activeConnections; this->connectionLock.unlock(); - for(const auto &e : list) + for(const auto &e : list) { e->close_connection(system_clock::now() + seconds(1)); + } auto beg = system_clock::now(); - while(!this->activeConnections.empty() && flushTimeout.count() != 0 && system_clock::now() - beg < flushTimeout) + while(!this->activeConnections.empty() && flushTimeout.count() != 0 && system_clock::now() - beg < flushTimeout) { threads::self::sleep_for(milliseconds(10)); + } - for(const auto& connection : this->activeConnections) + for(const auto& connection : this->activeConnections) { connection->voice_server = nullptr; + } this->activeConnections.clear(); - serverInstance->getVoiceServerManager()->ioManager()->disableIo(this->server.get()); - this->io = nullptr; - for(const auto& bind : this->bindings) { - if(bind->file_descriptor > 0){ - if(!shutdown(bind->file_descriptor, SHUT_RDWR)) logError(this->server->getServerId(), "Failed to shutdown socket {} ({}) Reason: {}/{}", bind->file_descriptor, bind->address_string(), errno, strerror(errno)); - if(!close(bind->file_descriptor)) { - if(errno != ENOTCONN) - logError(this->server->getServerId(), "Failed to close socket {} ({}) Reason: {}/{}", bind->file_descriptor, bind->address_string(), errno, strerror(errno)); - } - bind->file_descriptor = 0; - } + auto tick_task_future = serverInstance->general_task_executor()->cancel_task_joinable(this->handshake_tick_task); + if(tick_task_future.wait_for(std::chrono::seconds{5}) != std::future_status::ready) { + logCritical(this->get_server()->getServerId(), "Failed to shutdown tick executor"); } - this->bindings.clear(); + + for(const auto& bind : this->sockets) { + bind->deactivate(); + } + this->sockets.clear(); return true; } @@ -230,321 +201,14 @@ bool VoiceServer::unregisterConnection(std::shared_ptr connection) return true; } -static union { - char literal[8]{'T', 'S', '3', 'I', 'N', 'I', 'T', '1'}; - uint64_t integral; -} TS3INIT; +void VoiceServer::handleClientAddressChange(const std::shared_ptr &client, + const sockaddr_storage &remote_address, + const udp::pktinfo_storage &remote_address_info) { + auto old_address = net::to_string(client->get_remote_address()); + auto new_address = net::to_string(remote_address); -constexpr static auto kRecvBufferSize{1600}; //IPv6 MTU: 1500 | IPv4 MTU: 576 -void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) { - (void) events; - - auto event_handle = (io::IOEventLoopEntry*) _event_handle; - auto voice_server = event_handle->voice_server; - auto ts_server = event_handle->server; - - uint8_t raw_read_buffer[kRecvBufferSize]; //Allocate on stack, so we dont need heap here - - ssize_t bytes_read; - pipes::buffer_view read_buffer{raw_read_buffer, kRecvBufferSize}; /* will not allocate anything, just sets its mode to ptr and that's it :) */ - - sockaddr_storage remote_address{}; - iovec io_vector{}; - io_vector.iov_base = (void*) raw_read_buffer; - io_vector.iov_len = kRecvBufferSize; - - char message_headers[0x100]; - - msghdr message{}; - message.msg_name = &remote_address; - message.msg_namelen = sizeof(remote_address); - message.msg_iov = &io_vector; - message.msg_iovlen = 1; - message.msg_control = message_headers; - message.msg_controllen = 0x100; - - auto read_timeout = system_clock::now() + microseconds{2500}; /* read 2.5ms long at a time or 'till nothing more is there */ - while(system_clock::now() <= read_timeout){ - message.msg_flags = 0; - bytes_read = recvmsg(fd, &message, 0); - - if((message.msg_flags & MSG_TRUNC) > 0) { - static std::chrono::system_clock::time_point last_error_message{}; - auto now = system_clock::now(); - if(last_error_message + std::chrono::seconds{5} < now) { - logError(ts_server->getServerId(), "Received truncated message from {}", net::to_string(remote_address)); - last_error_message = now; - } - continue; - } - - if(bytes_read < 0) { - if(errno == EAGAIN) { - break; - } - - //Nothing more to read - logCritical(ts_server->getServerId(), "Could not receive datagram packet! Code: {} Reason: {}", errno, strerror(errno)); - break; - } else if(bytes_read == 0){ - //This should never happen - break; - } - - if(bytes_read < MAC_SIZE + CLIENT_HEADER_SIZE) { - /* reenable for debug. else short packages could be a dos attach */ - //logError(ts_server->getServerId(), "Received an too short packet!"); - continue; - } - - if(*(uint64_t*) raw_read_buffer == TS3INIT.integral) { - //Handle ddos protection... - voice_server->pow_handler->handle_datagram(event_handle->socket_id, remote_address, message, read_buffer.view(0, bytes_read)); - continue; - } - - protocol::ClientPacketParser packet_parser{read_buffer.view(0, bytes_read)}; - if(!packet_parser.valid()) { - return; - } - - std::shared_ptr client{}; - { - auto client_id = packet_parser.client_id(); - if(client_id > 0) { - client = dynamic_pointer_cast(voice_server->server->find_client_by_id(client_id)); - } else { - client = voice_server->findClient(&remote_address, true); - } - } - - if(!client) { - continue; - } - - if(memcmp(&client->remote_address, &remote_address, sizeof(sockaddr_storage)) != 0) { /* verify the remote address */ - /* only encrypted packets are allowed */ - if(!packet_parser.has_flag(protocol::PacketFlag::Unencrypted) && client->state == ConnectionState::CONNECTED) { - /* the ip had changed */ - if(client->connection->verify_encryption(packet_parser)) { - auto old_address = net::to_string(client->remote_address); - auto new_address = net::to_string(remote_address); - - auto command = "dummy_ipchange old_ip=" + old_address + " new_ip=" + new_address; - client->server_command_queue()->enqueue_command_string(command); - memcpy(&client->remote_address, &remote_address, sizeof(remote_address)); - udp::DatagramPacket::extract_info(message, client->connection->remote_address_info_); - } - } else { - continue; /* we've no clue */ - } - } - - if(client->state != ConnectionState::DISCONNECTED) { - client->connection->handle_incoming_datagram(packet_parser); - client = nullptr; - } - } -} - -#ifndef USE_TIMER - #ifdef ALARM_TIMER - #undef ALARM_TIMER - #endif - #define ALARM_TIMER(...) -#endif - -template -struct IOData { - int file_descriptor = 0; - iovec vector{}; - struct msghdr message{}; - char message_headers[MHS]{}; - - IOData() { - /* Speed is key here, we dont need to zero paddings! - memset(&this->vector, 0, sizeof(this->vector)); - memset(&this->message, 0, sizeof(this->message)); - memset(this->message_headers, 0, sizeof(this->message_headers)); - */ - - this->vector.iov_base = nullptr; - this->vector.iov_len = 0; - - this->message.msg_name = nullptr; - this->message.msg_namelen = 0; - - this->message.msg_iov = &vector; - this->message.msg_iovlen = 1; - - this->message.msg_control = this->message_headers; - this->message.msg_controllen = sizeof(this->message_headers); - } -}; - -template -inline ssize_t write_datagram(IOData& io, const sockaddr_storage& address, udp::pktinfo_storage* info, size_t length, const void* buffer) { - io.message.msg_flags = 0; - io.message.msg_name = (void*) &address; - io.message.msg_namelen = address.ss_family == AF_INET ? sizeof(sockaddr_in) : sizeof(sockaddr_in6); - - io.vector.iov_len = length; - io.vector.iov_base = (void*) buffer; - - if(info) { - auto cmsg = CMSG_FIRSTHDR(&io.message); - if(address.ss_family == AF_INET) { - cmsg->cmsg_level = IPPROTO_IP; - cmsg->cmsg_type = IP_PKTINFO; - cmsg->cmsg_len = CMSG_LEN(sizeof(in_pktinfo)); - - memcpy(CMSG_DATA(cmsg), info, sizeof(in_pktinfo)); - - io.message.msg_controllen = CMSG_SPACE(sizeof(in_pktinfo)); - } else if(address.ss_family == AF_INET6) { - cmsg->cmsg_level = IPPROTO_IPV6; - cmsg->cmsg_type = IPV6_PKTINFO; - cmsg->cmsg_len = CMSG_LEN(sizeof(in6_pktinfo)); - - memcpy(CMSG_DATA(cmsg), info, sizeof(in6_pktinfo)); - - io.message.msg_controllen = CMSG_SPACE(sizeof(in6_pktinfo)); - } else if(address.ss_family == 0) - return length; /* address is unset (testing ip loss i guess) */ - } else { - io.message.msg_controllen = 0; - } - - auto status = sendmsg(io.file_descriptor, &io.message, 0); - if(status< 0 && errno == EINVAL) { - /* may something is wrong here */ - status = send(io.file_descriptor, buffer, length, 0); - if(status < 0) - return -0xFEB; - } - return status; -} - -void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) { - (void) events; - - using WBufferPopResult = server::udp::PacketEncoder::BufferPopResult; - auto event_handle = (io::IOEventLoopEntry*) _event_handle; - auto voice_server = event_handle->voice_server; - - bool retrigger{false}; - - IOData<0x100> io{}; - io.file_descriptor = fd; - - { /* write and process clients */ - shared_ptr client; - protocol::OutgoingServerPacket* packet; - WBufferPopResult client_wbuffer_state; - bool more_clients; - - auto write_timeout = system_clock::now() + microseconds(2500); /* read 2.5ms long at a time or 'till nothing more is there */ - while(system_clock::now() <= write_timeout){ - if(!client) { - auto client_queue_state = event_handle->pop_voice_write_queue(client); /* we need a new client, the old client has nothing more to do */ - if(client_queue_state == 2) { - break; - } - - assert(client); - more_clients = (bool) client_queue_state; - } - - client_wbuffer_state = WBufferPopResult::MORE_AVAILABLE; - while(system_clock::now() <= write_timeout) { - packet = nullptr; - client_wbuffer_state = client->connection->packet_encoder().pop_write_buffer(packet); - if(!packet) { - assert(client_wbuffer_state == WBufferPopResult::DRAINED); - break; - } - - ssize_t res = write_datagram(io, client->remote_address, &client->connection->remote_address_info_, packet->packet_length(), packet->packet_data()); - if(res != packet->packet_length()) { - if(errno == EAGAIN) { - client->connection->packet_encoder().reenqueue_failed_buffer(packet); - logTrace(voice_server->server->getServerId(), "Failed to write datagram packet for client {} (EAGAIN). Rescheduling packet.", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort())); - return; - } else if(errno == EINVAL || res == -0xFEB) { - /* needs more debug */ - auto voice_client = dynamic_pointer_cast(client); - logCritical( - voice_server->server->getServerId(), - "Failed to write datagram packet ({} @ {}) for client {} ({}) {}. Dropping packet! Extra data: [fd: {}/{}, supposed socket: {}/{} => {}, client family: {}, socket family: {}]", - packet->packet_length(), packet->packet_data(), - client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()), - strerror(errno), - res, - fd, - event_handle->file_descriptor, - voice_client->connection->socket_id(), - event_handle->socket_id, - voice_server->io->resolve_file_descriptor(voice_client), - voice_client->isAddressV4() ? "v4" : voice_client->isAddressV6() ? "v6" : "v?", - event_handle->family == AF_INET ? "v4" : "v6" - ); - } else { - logCritical( - voice_server->server->getServerId(), - "Failed to write datagram packet for client {} (errno: {} message: {}). Dropping packet!", - client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()), - errno, - strerror(errno) - ); - } - packet->unref(); - break; - } - packet->unref(); - if(client_wbuffer_state == WBufferPopResult::DRAINED) - break; - } - - if(client_wbuffer_state == WBufferPopResult::MORE_AVAILABLE) { - /* we exceeded the max write time, rescheduling write */ - voice_server->triggerWrite(client); - } - client.reset(); - } - - retrigger |= more_clients; - - } - - /* write all manually specified datagram packets */ - { - auto write_timeout = system_clock::now() + microseconds(2500); /* read 2.5ms long at a time or 'till nothing more is there */ - udp::DatagramPacket* packet; - - while(system_clock::now() <= write_timeout && (packet = event_handle->pop_dg_write_queue())) { - ssize_t res = write_datagram(io, packet->address, &packet->pktinfo, packet->data_length, packet->data); - if(res != packet->data_length) { - if(errno == EAGAIN) { - event_handle->push_dg_write_queue(packet); - } else { - udp::DatagramPacket::destroy(packet); - } - - logError(voice_server->server->getServerId(), "Failed to send datagram. Wrote {} out of {}. {}/{}", res, packet->data_length, errno, strerror(errno)); - retrigger = false; - break; - } - udp::DatagramPacket::destroy(packet); - } - - retrigger |= packet != nullptr; /* memory stored at packet is not accessible anymore. But anyways pop_dg_write_queue returns 0 if there is nothing more */ - } - - if(retrigger) { - event_add(event_handle->event_write, nullptr); - } -} - -void VoiceServer::send_datagram(int socket, udp::DatagramPacket* packet) { - this->io->send_datagram(packet, socket); + auto command = "dummy_ipchange old_ip=" + old_address + " new_ip=" + new_address; + client->server_command_queue()->enqueue_command_string(command); + memcpy(&client->remote_address, &remote_address, sizeof(remote_address)); + memcpy(&client->connection->remote_address_info_, &remote_address_info, sizeof(remote_address_info)); } \ No newline at end of file diff --git a/server/src/server/VoiceServer.h b/server/src/server/VoiceServer.h index 4f8b53e..161d21c 100644 --- a/server/src/server/VoiceServer.h +++ b/server/src/server/VoiceServer.h @@ -8,81 +8,197 @@ #include #include #include +#include #include "VoiceIOManager.h" #include "./voice/DatagramPacket.h" #include "Definitions.h" +#include namespace ts { - namespace protocol { - class PuzzleManager; - } - namespace server { class VirtualServer; class ConnectedClient; class VoiceClient; class POWHandler; - struct VoiceServerBinding { - sockaddr_storage address{}; - int file_descriptor = 0; + class VoiceServerSocket : public std::enable_shared_from_this { + public: + struct NetworkEvents { + VoiceServerSocket* socket; + struct event* event_read{nullptr}; + struct event* event_write{nullptr}; - [[nodiscard]] inline std::string address_string() const { return net::to_string(address); } - [[nodiscard]] inline uint16_t address_port() const { return net::port(address); } + NetworkEvents(VoiceServerSocket* socket) : socket{socket} {}; + NetworkEvents(const NetworkEvents&) = delete; + NetworkEvents(NetworkEvents&&) = delete; + ~NetworkEvents(); + }; + + explicit VoiceServerSocket(VoiceServer* server, sockaddr_storage address); + virtual ~VoiceServerSocket(); + + [[nodiscard]] inline auto is_active() const { return this->file_descriptor > 0; }; + [[nodiscard]] inline const sockaddr_storage& address() const { return this->address_; }; + + /** + * Create a new UDP server socket on the target address. + */ + [[nodiscard]] bool activate(std::string& /* error */); + + /** + * Deactivate the binding if activated. + * Note: This will block until all active network events have been processed. + */ + void deactivate(); + + inline void send_datagram(udp::DatagramPacket* datagram) { + assert(!datagram->next_packet); + datagram->next_packet = nullptr; + + std::lock_guard lock{this->mutex}; + if(!this->file_descriptor) { + udp::DatagramPacket::destroy(datagram); + return; + } + + *this->write_datagram_tail = datagram; + this->write_datagram_tail = &datagram->next_packet; + this->enqueue_network_write(); + } + + inline void enqueue_client_write(std::weak_ptr client) { + std::lock_guard lock{this->mutex}; + if(!this->file_descriptor) { + return; + } + + this->write_client_queue.push_back(std::move(client)); + this->enqueue_network_write(); + } + + private: + ServerId server_id; + VoiceServer* server; + sockaddr_storage address_; + + std::mutex mutex{}; + int file_descriptor{0}; + std::vector> network_events{}; + size_t network_write_index{0}; + + udp::DatagramPacket* write_datagram_head{nullptr}; + udp::DatagramPacket** write_datagram_tail{&this->write_datagram_head}; + std::deque> write_client_queue; + + inline udp::DatagramPacket* pop_dg_write_queue() { + std::lock_guard lock{this->mutex}; + if(!this->write_datagram_head) { + return nullptr; + } + + auto packet = std::exchange(this->write_datagram_head, this->write_datagram_head->next_packet); + if(!this->write_datagram_head) { + assert(this->write_datagram_tail == &packet->next_packet); + this->write_datagram_tail = &this->write_datagram_head; + } + + return packet; + } + + inline bool pop_voice_write_queue(std::shared_ptr& result) { + std::lock_guard lock{this->mutex}; + + auto it_begin = this->write_client_queue.begin(); + auto it_end = this->write_client_queue.end(); + auto it = it_begin; + + while(it != it_end) { + result = it->lock(); + if(result) { + this->write_client_queue.erase(it_begin, ++it); + return it != it_end; + } + it++; + } + + if(it_begin != it_end) { + this->write_client_queue.erase(it_begin, it_end); + } + return false; + } + + /** + * Enqueue a write event. + * Attention: The mutex should be locked! + */ + inline void enqueue_network_write() { + assert(!this->network_events.empty()); + auto write_event = this->network_events[this->network_write_index++ % this->network_events.size()]->event_write; + event_add(write_event, nullptr); + } + + static void network_event_read(int, short, void *); + static void network_event_write(int, short, void *); }; class VoiceServer { - friend class VoiceClient; - friend class io::VoiceIOManager; - friend struct io::IOEventLoopEvents; - friend class POWHandler; + friend class VoiceServerSocket; + friend class VoiceClient; /* Not needed any more */ + friend class io::VoiceIOManager; /* Not needed any more */ + friend struct io::IOEventLoopEvents; /* Not needed any more */ + friend class POWHandler; /* TODO: Still needed? May use some kind of callback */ public: explicit VoiceServer(const std::shared_ptr& server); ~VoiceServer(); - bool start(const std::deque>&, std::string&); + bool start(const std::deque&, std::string&); bool stop(const std::chrono::milliseconds& flushTimeout = std::chrono::milliseconds(1000)); - std::shared_ptr findClient(ClientId); - std::shared_ptr findClient(sockaddr_in* addr, bool lock = true); - std::shared_ptr findClient(sockaddr_in6* addr, bool lock = true); - inline std::shared_ptr findClient(sockaddr_storage* address, bool lock = true) { - return address->ss_family == AF_INET ? - this->findClient((sockaddr_in*) address, lock) : - address->ss_family == AF_INET6 ? - this->findClient((sockaddr_in6*) address, lock) : - nullptr; + [[nodiscard]] std::shared_ptr findClient(ClientId); + [[nodiscard]] std::shared_ptr findClient(sockaddr_in* addr, bool lock); + [[nodiscard]] std::shared_ptr findClient(sockaddr_in6* addr, bool lock); + [[nodiscard]] inline std::shared_ptr findClient(sockaddr_storage* address, bool lock = true) { + switch(address->ss_family) { + case AF_INET: + return this->findClient((sockaddr_in*) address, lock); + + case AF_INET6: + return this->findClient((sockaddr_in6*) address, lock); + + default: + return nullptr; + } } + [[nodiscard]] inline auto getSockets() { + std::lock_guard lock{this->sockets_mutex}; + return this->sockets; + } + + [[nodiscard]] inline std::shared_ptr get_server() { return this->server; } + + void tickHandshakingClients(); + void execute_resend(const std::chrono::system_clock::time_point& /* now */, std::chrono::system_clock::time_point& /* next resend */); bool unregisterConnection(std::shared_ptr); - inline std::deque> activeBindings() { - std::deque> result; - for(const auto& entry : this->bindings) - if(entry->file_descriptor > 0) result.push_back(entry); - return result; - } - - inline std::shared_ptr get_server() { return this->server; } private: std::unique_ptr pow_handler; - std::shared_ptr server = nullptr; + std::shared_ptr server{nullptr}; - bool running = false; - std::deque> bindings; + bool running{false}; + + std::shared_mutex sockets_mutex{}; + std::deque> sockets{}; + + task_id handshake_tick_task{0}; std::recursive_mutex connectionLock; std::deque> activeConnections; - public: - void tickHandshakingClients(); - void triggerWrite(const std::shared_ptr &); - void execute_resend(const std::chrono::system_clock::time_point& /* now */, std::chrono::system_clock::time_point& /* next resend */); - void send_datagram(int /* socket */, udp::DatagramPacket* /* packet */); - - std::shared_ptr io; - private: - static void handleMessageRead(int, short, void *); - static void handleMessageWrite(int, short, void *); + void handleClientAddressChange( + const std::shared_ptr& /* voice client */, + const sockaddr_storage& /* new address */, + const udp::pktinfo_storage& /* remote address info */ + ); }; } } \ No newline at end of file diff --git a/server/src/server/VoiceServerSocket.cpp b/server/src/server/VoiceServerSocket.cpp new file mode 100644 index 0000000..3814970 --- /dev/null +++ b/server/src/server/VoiceServerSocket.cpp @@ -0,0 +1,457 @@ +// +// Created by WolverinDEV on 15/04/2021. +// + +#include "POWHandler.h" +#include +#include +#include +#include +#include "../client/voice/VoiceClient.h" +#include +#include +#include "src/VirtualServerManager.h" +#include "../InstanceHandler.h" +#include "./GlobalNetworkEvents.h" + +using namespace std; +using namespace std::chrono; +using namespace ts::server; +using namespace ts::buffer; +using namespace ts; + +VoiceServerSocket::NetworkEvents::~NetworkEvents() { + auto event_read_ = std::exchange(this->event_read, nullptr); + auto event_write_ = std::exchange(this->event_read, nullptr); + + if(event_read_) { + event_free(event_read_); + } + + if(event_write_) { + event_free(event_write_); + } +} + +VoiceServerSocket::VoiceServerSocket(VoiceServer *server, sockaddr_storage address) : server{server}, server_id{server->get_server()->getServerId()}, address_{address} { } + +VoiceServerSocket::~VoiceServerSocket() { + /* just to ensure and to clean up pending writes */ + this->deactivate(); +} + +bool VoiceServerSocket::activate(std::string &error) { + this->file_descriptor = socket(this->address_.ss_family, SOCK_DGRAM, 0); + if(this->file_descriptor <= 0) { + this->file_descriptor = 0; + error = "failed to allocate new socket"; + return false; + } + + int enable = 1, disable = 0; + if(setsockopt(this->file_descriptor, SOL_SOCKET, SO_REUSEADDR, &disable, sizeof(int)) < 0) { + logError(server_id, "Could not disable flag reuse address for bind {}!", net::to_string(this->address_)); + } + + /* + if(setsockopt(this->file_descriptor, SOL_SOCKET, SO_REUSEPORT, &disable, sizeof(int)) < 0) { + logError(server_id, "Could not disable flag reuse port for bind {}!", net::to_string(this->address_)); + } + */ + + /* We're never sending over MTU size packets! */ + int pmtu{IP_PMTUDISC_DO}; + setsockopt(this->file_descriptor, IPPROTO_IP, IP_MTU_DISCOVER, &pmtu, sizeof(pmtu)); + + if(fcntl(this->file_descriptor, F_SETFD, FD_CLOEXEC) < 0) { + error = "failed to enable FD_CLOEXEC"; + goto bind_failed; + } + + if(this->address_.ss_family == AF_INET6) { + if(setsockopt(this->file_descriptor, IPPROTO_IPV6, IPV6_RECVPKTINFO, &enable, sizeof(enable)) < 0) { + error = "failed to enable IPV6_RECVPKTINFO"; + goto bind_failed; + } + + if(setsockopt(this->file_descriptor, IPPROTO_IPV6, IPV6_V6ONLY, &enable, sizeof(enable)) < 0) { + error = "failed to enable IPV6_V6ONLY"; + goto bind_failed; + } + } else { + if(setsockopt(this->file_descriptor, IPPROTO_IP, IP_PKTINFO, &enable, sizeof(enable)) < 0) { + error = "failed to enable IP_PKTINFO"; + goto bind_failed; + } + } + + if(::bind(this->file_descriptor, (const sockaddr*) &this->address_, net::address_size(this->address_)) < 0) { + error = "bind failed: " + std::string{strerror(errno)} + " (" + std::to_string(errno) + ")"; + goto bind_failed; + } + + fcntl(this->file_descriptor, F_SETFL, fcntl(this->file_descriptor, F_GETFL, 0) | O_NONBLOCK); + + { + const auto& network_loop = serverInstance->network_event_loop(); + const auto network_event_count = std::min(network_loop->loop_count(), ts::config::threads::voice::events_per_server); + + std::lock_guard write_lock{this->mutex}; + NetworkEventLoopUseList* read_use_list{nullptr}; + NetworkEventLoopUseList* write_use_list{nullptr}; + for(size_t index{0}; index < network_event_count; index++) { + auto events = std::make_unique(this); + events->event_read = network_loop->allocate_event(this->file_descriptor, EV_READ | EV_PERSIST, VoiceServerSocket::network_event_read, &*events, &read_use_list); + events->event_write = network_loop->allocate_event(this->file_descriptor, EV_WRITE, VoiceServerSocket::network_event_write, &*events, &write_use_list); + + if(!events->event_read) { + logError(server_id, "Failed to allocate network read event for voice server binding {}", net::to_string(this->address_)); + continue; + } + + if(!events->event_write) { + logError(server_id, "Failed to allocate network write event for voice server binding {}", net::to_string(this->address_)); + continue; + } + + event_add(events->event_read, nullptr); + this->network_events.emplace_back(std::move(events)); + } + + if(this->network_events.empty()) { + error = "failed to register any network events"; + goto bind_failed; + } + } + + return true; + + bind_failed: + this->deactivate(); + return false; +} + +void VoiceServerSocket::deactivate() { + for(const auto& binding : this->network_events) { + if(binding->event_read) { + event_del_block(binding->event_read); + event_free(binding->event_read); + } + + if(binding->event_write) { + event_del_block(binding->event_write); + event_free(binding->event_write); + } + } + + { + std::lock_guard write_lock{this->mutex}; + this->network_events.clear(); + + if(this->file_descriptor > 0) { + ::close(this->file_descriptor); + this->file_descriptor = 0; + } + + this->write_client_queue.clear(); + while(this->write_datagram_head) { + auto datagram = std::exchange(this->write_datagram_head, this->write_datagram_head->next_packet); + udp::DatagramPacket::destroy(datagram); + } + this->write_datagram_tail = &this->write_datagram_head; + } +} + + +template +struct IOData { + int file_descriptor = 0; + iovec vector{}; + struct msghdr message{}; + char message_headers[MHS]{}; + + IOData() { + /* Speed is key here, we dont need to zero paddings! */ +#if 0 + memset(&this->vector, 0, sizeof(this->vector)); + memset(&this->message, 0, sizeof(this->message)); + memset(this->message_headers, 0, sizeof(this->message_headers)); +#endif + + this->vector.iov_base = nullptr; + this->vector.iov_len = 0; + + this->message.msg_name = nullptr; + this->message.msg_namelen = 0; + + this->message.msg_iov = &vector; + this->message.msg_iovlen = 1; + + this->message.msg_control = this->message_headers; + this->message.msg_controllen = sizeof(this->message_headers); + } +}; + +template +inline ssize_t write_datagram(IOData& io, const sockaddr_storage& address, const udp::pktinfo_storage* info, size_t length, const void* buffer) { + io.message.msg_flags = 0; + io.message.msg_name = (void*) &address; + io.message.msg_namelen = address.ss_family == AF_INET ? sizeof(sockaddr_in) : sizeof(sockaddr_in6); + + io.vector.iov_len = length; + io.vector.iov_base = (void*) buffer; + + if(info) { + auto cmsg = CMSG_FIRSTHDR(&io.message); + if(address.ss_family == AF_INET) { + cmsg->cmsg_level = IPPROTO_IP; + cmsg->cmsg_type = IP_PKTINFO; + cmsg->cmsg_len = CMSG_LEN(sizeof(in_pktinfo)); + + memcpy(CMSG_DATA(cmsg), info, sizeof(in_pktinfo)); + + io.message.msg_controllen = CMSG_SPACE(sizeof(in_pktinfo)); + } else if(address.ss_family == AF_INET6) { + cmsg->cmsg_level = IPPROTO_IPV6; + cmsg->cmsg_type = IPV6_PKTINFO; + cmsg->cmsg_len = CMSG_LEN(sizeof(in6_pktinfo)); + + memcpy(CMSG_DATA(cmsg), info, sizeof(in6_pktinfo)); + + io.message.msg_controllen = CMSG_SPACE(sizeof(in6_pktinfo)); + } else if(address.ss_family == 0) { + return length; /* address is unset (testing ip loss i guess) */ + } + } else { + io.message.msg_controllen = 0; + } + + auto status = sendmsg(io.file_descriptor, &io.message, 0); + if(status< 0 && errno == EINVAL) { + /* may something is wrong here */ + status = send(io.file_descriptor, buffer, length, 0); + if(status < 0) { + return -0xFEB; + } + } + return status; +} + +void VoiceServerSocket::network_event_write(int, short, void *ptr_network_events) { + auto network_events = (NetworkEvents*) ptr_network_events; + auto socket = network_events->socket; + bool add_write_event{false}; + + IOData<0x100> io{}; + io.file_descriptor = socket->file_descriptor; + + { /* write and process clients */ + std::shared_ptr client; + protocol::OutgoingServerPacket* packet{nullptr}; + bool more_clients{true}; + + auto write_timeout = system_clock::now() + microseconds(2500); /* read 2.5ms long at a time or 'till nothing more is there */ + while(system_clock::now() <= write_timeout) { + more_clients = socket->pop_voice_write_queue(client); + if(!client) { + /* No pending client writes */ + break; + } + + bool client_data_pending{true}; + while(client_data_pending && std::chrono::system_clock::now() <= write_timeout) { + auto& client_packet_encoder = client->getConnection()->packet_encoder(); + + assert(!packet); + client_data_pending = client_packet_encoder.pop_write_buffer(packet); + if(!packet) { + assert(!client_data_pending); + break; + } + + ssize_t res = write_datagram(io, client->get_remote_address(), &client->getConnection()->remote_address_info(), packet->packet_length(), packet->packet_data()); + if(res <= 0) { + if(errno == EAGAIN) { + client_packet_encoder.reenqueue_failed_buffer(packet); + logTrace(socket->server_id, "Failed to write datagram packet for client {} (EAGAIN). Rescheduling packet.", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort())); + return; + } else if(errno == EINVAL || res == -0xFEB) { + /* needs more debug */ + auto voice_client = dynamic_pointer_cast(client); + logCritical( + socket->server_id, + "Failed to write datagram packet ({} @ {}) for client {} ({}) {}. Dropping packet! Extra data: [fd: {}, client family: {}, socket family: {}]", + packet->packet_length(), packet->packet_data(), + client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()), + strerror(errno), + res, + socket->file_descriptor, + voice_client->isAddressV4() ? "v4" : voice_client->isAddressV6() ? "v6" : "v?", + socket->address_.ss_family == AF_INET ? "v4" : "v6" + ); + } else { + logCritical( + socket->server_id, + "Failed to write datagram packet for client {} (errno: {} message: {}). Dropping packet!", + client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()), + errno, + strerror(errno) + ); + } + packet->unref(); + break; + } else if(res != packet->packet_length()) { + logWarning(socket->server_id, "Datagram write result didn't matches the datagrams size. Expected {}, Received {}", packet->packet_length(), res); + } + + packet->unref(); + packet = nullptr; + } + + if(client_data_pending) { + /* we exceeded the max write time, rescheduling write */ + socket->enqueue_client_write(client); + more_clients = true; + } + + client = nullptr; + } + + add_write_event |= more_clients; + } + + /* write all manually specified datagram packets */ + { + auto write_timeout = system_clock::now() + std::chrono::microseconds{2500}; /* read 2.5ms long at a time or 'till nothing more is there */ + udp::DatagramPacket* packet; + + while(system_clock::now() <= write_timeout && (packet = socket->pop_dg_write_queue())) { + ssize_t res = write_datagram(io, packet->address, &packet->pktinfo, packet->data_length, packet->data); + if(res != packet->data_length) { + if(errno == EAGAIN) { + /* Just resend it */ + socket->send_datagram(packet); + } else { + udp::DatagramPacket::destroy(packet); + } + + logError(socket->server_id, "Failed to send datagram. Wrote {} out of {}. {}/{}", res, packet->data_length, errno, strerror(errno)); + add_write_event = false; + break; + } + udp::DatagramPacket::destroy(packet); + } + + add_write_event |= packet != nullptr; /* memory stored at packet is not accessible anymore. But anyways pop_dg_write_queue returns 0 if there is nothing more */ + } + + if(add_write_event) { + event_add(network_events->event_write, nullptr); + } +} + + +static union { + char literal[8]{'T', 'S', '3', 'I', 'N', 'I', 'T', '1'}; + uint64_t integral; +} TS3INIT; + +constexpr static auto kRecvBufferSize{1600}; //IPv6 MTU: 1500 | IPv4 MTU: 576 +void VoiceServerSocket::network_event_read(int, short, void *ptr_network_events) { + auto network_events = (NetworkEvents*) ptr_network_events; + auto socket = network_events->socket; + + uint8_t raw_read_buffer[kRecvBufferSize]; //Allocate on stack, so we dont need heap here + + ssize_t bytes_read; + pipes::buffer_view read_buffer{raw_read_buffer, kRecvBufferSize}; /* will not allocate anything, just sets its mode to ptr and that's it :) */ + + sockaddr_storage remote_address{}; + iovec io_vector{}; + io_vector.iov_base = (void*) raw_read_buffer; + io_vector.iov_len = kRecvBufferSize; + + char message_headers[0x100]; + + msghdr message{}; + message.msg_name = &remote_address; + message.msg_namelen = sizeof(remote_address); + message.msg_iov = &io_vector; + message.msg_iovlen = 1; + message.msg_control = message_headers; + message.msg_controllen = 0x100; + + auto read_timeout = system_clock::now() + microseconds{2500}; /* read 2.5ms long at a time or 'till nothing more is there */ + while(system_clock::now() <= read_timeout) { + message.msg_flags = 0; + bytes_read = recvmsg(socket->file_descriptor, &message, 0); + + if((message.msg_flags & MSG_TRUNC) > 0) { + static std::chrono::system_clock::time_point last_error_message{}; + auto now = system_clock::now(); + if(last_error_message + std::chrono::seconds{5} < now) { + logError(socket->server_id, "Received truncated message from {}", net::to_string(remote_address)); + last_error_message = now; + } + continue; + } + + if(bytes_read < 0) { + if(errno == EAGAIN) { + break; + } + + //Nothing more to read + logCritical(socket->server_id, "Could not receive datagram packet! Code: {} Reason: {}", errno, strerror(errno)); + break; + } else if(bytes_read == 0){ + //This should never happen + break; + } + + if(*(uint64_t*) raw_read_buffer == TS3INIT.integral) { + //Handle ddos protection... + /* TODO: Don't pass the raw buffer instead pass the protocol::ClientPacketParser and ClientPacketParser mus allow the INIT packet */ + socket->server->pow_handler->handle_datagram(socket->shared_from_this(), remote_address, message, read_buffer.view(0, bytes_read)); + continue; + } + + protocol::ClientPacketParser packet_parser{read_buffer.view(0, bytes_read)}; + if(!packet_parser.valid()) { + return; + } + + std::shared_ptr client{}; + { + auto client_id = packet_parser.client_id(); + if(client_id > 0) { + client = dynamic_pointer_cast(socket->server->server->find_client_by_id(client_id)); + } else { + client = socket->server->findClient(&remote_address, true); + } + } + + if(!client) { + continue; + } + + auto client_connection = client->getConnection(); + if(memcmp(&client->get_remote_address(), &remote_address, sizeof(sockaddr_storage)) != 0) { /* verify the remote address */ + /* only encrypted packets are allowed */ + if(!packet_parser.has_flag(protocol::PacketFlag::Unencrypted) && client->connectionState() == ConnectionState::CONNECTED) { + /* the ip had changed */ + if(client_connection->verify_encryption(packet_parser)) { + udp::pktinfo_storage remote_address_info; + udp::DatagramPacket::extract_info(message, remote_address_info); + socket->server->handleClientAddressChange(client, remote_address, remote_address_info); + } + } else { + continue; + } + } + + if(client->connectionState() != ConnectionState::DISCONNECTED) { + client_connection->handle_incoming_datagram(packet_parser); + } + } +} \ No newline at end of file diff --git a/server/src/server/voice/DatagramPacket.cpp b/server/src/server/voice/DatagramPacket.cpp index 5636729..967fb13 100644 --- a/server/src/server/voice/DatagramPacket.cpp +++ b/server/src/server/voice/DatagramPacket.cpp @@ -34,7 +34,9 @@ void DatagramPacket::destroy(DatagramPacket *packet) { int DatagramPacket::extract_info(msghdr &message, pktinfo_storage &info) { for (cmsghdr* cmsg = CMSG_FIRSTHDR(&message); cmsg != nullptr; cmsg = CMSG_NXTHDR(&message, cmsg)) { // iterate through all the control headers - if(cmsg->cmsg_type != IP_PKTINFO && cmsg->cmsg_type != IPV6_PKTINFO) continue; + if(cmsg->cmsg_type != IP_PKTINFO && cmsg->cmsg_type != IPV6_PKTINFO) { + continue; + } if(cmsg->cmsg_level == IPPROTO_IP) { memcpy(&info, (void*) CMSG_DATA(cmsg), sizeof(in_pktinfo)); diff --git a/shared b/shared index 0726cd6..1eaa9bb 160000 --- a/shared +++ b/shared @@ -1 +1 @@ -Subproject commit 0726cd6c95ff5597bfc20ac2bb560ad03ace7b49 +Subproject commit 1eaa9bb371387aeb17b0d87e8068654b5844e9fe