From fa7a390fe3d5e8c255af5e1e6831fdce05f9859b Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Mon, 3 Aug 2020 13:51:47 +0200 Subject: [PATCH] Adding a basic skillet to the new voice server - fixed some connection issues --- server/CMakeLists.txt | 3 + server/src/client/command_handler/server.cpp | 4 +- server/src/client/voice/CryptSetupHandler.cpp | 2 +- .../src/client/voice/VoiceClientConnection.h | 2 +- server/src/music/MusicBotManager.cpp | 6 +- server/src/server/POWHandler.cpp | 4 +- server/src/server/POWHandler.h | 2 +- server/src/server/VoiceIOManager.cpp | 28 +------- server/src/server/VoiceIOManager.h | 43 ++--------- server/src/server/VoiceServer.cpp | 29 ++------ server/src/server/VoiceServer.h | 16 ++--- server/src/server/voice/DatagramPacket.cpp | 48 +++++++++++++ server/src/server/voice/DatagramPacket.h | 32 +++++++++ server/src/server/voice/UDPVoiceServer.cpp | 5 ++ server/src/server/voice/UDPVoiceServer.h | 71 +++++++++++++++++++ shared | 2 +- 16 files changed, 190 insertions(+), 107 deletions(-) create mode 100644 server/src/server/voice/DatagramPacket.cpp create mode 100644 server/src/server/voice/DatagramPacket.h create mode 100644 server/src/server/voice/UDPVoiceServer.cpp create mode 100644 server/src/server/voice/UDPVoiceServer.h diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 5e42aee..b408186 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -157,7 +157,10 @@ set(SERVER_SOURCE_FILES src/terminal/PipedTerminal.cpp + src/server/voice/UDPVoiceServer.cpp + src/server/voice/DatagramPacket.cpp ) + if (COMPILE_WEB_CLIENT) add_definitions(-DCOMPILE_WEB_CLIENT) diff --git a/server/src/client/command_handler/server.cpp b/server/src/client/command_handler/server.cpp index ad567eb..b166ba9 100644 --- a/server/src/client/command_handler/server.cpp +++ b/server/src/client/command_handler/server.cpp @@ -583,8 +583,10 @@ command_result ConnectedClient::handleCommandServerGroupClientList(Command &cmd) notify[index]["client_unique_identifier"] = clientEntry.uid; index++; } - if(index == 0) + + if(index == 0 && this->getType() != ClientType::CLIENT_TEAMSPEAK) { return ts::command_result{error::database_empty_result}; + } this->sendCommand(notify); return command_result{error::ok}; diff --git a/server/src/client/voice/CryptSetupHandler.cpp b/server/src/client/voice/CryptSetupHandler.cpp index 925bcf5..add0a34 100644 --- a/server/src/client/voice/CryptSetupHandler.cpp +++ b/server/src/client/voice/CryptSetupHandler.cpp @@ -107,7 +107,7 @@ CryptSetupHandler::CommandResult CryptSetupHandler::handleCommandClientInitIv(co client->state = ConnectionState::INIT_HIGH; } else if(client->state == ConnectionState::INIT_HIGH) { logTrace(client->getServerId(), "{} Received a duplicated initiv. It seems like our initivexpand2 hasn't yet reached the client. The acknowledge handler should handle this issue for us.", CLIENT_STR_LOG_PREFIX_(client)); - return command_result{error::ok}; + return CommandHandleResult::CONSUME_COMMAND; /* we don't want to send an error id=0 msg=ok */ } else { client->state = ConnectionState::INIT_HIGH; } diff --git a/server/src/client/voice/VoiceClientConnection.h b/server/src/client/voice/VoiceClientConnection.h index 901abff..e5c2e5b 100644 --- a/server/src/client/voice/VoiceClientConnection.h +++ b/server/src/client/voice/VoiceClientConnection.h @@ -91,7 +91,7 @@ namespace ts { int socket_id_{0}; sockaddr_storage remote_address_{}; - io::pktinfo_storage remote_address_info_{}; + server::udp::pktinfo_storage remote_address_info_{}; CryptHandler crypt_handler; /* access to CryptHandler is thread save */ server::client::PacketStatistics packet_statistics_{}; diff --git a/server/src/music/MusicBotManager.cpp b/server/src/music/MusicBotManager.cpp index 9dc4bf1..454d103 100644 --- a/server/src/music/MusicBotManager.cpp +++ b/server/src/music/MusicBotManager.cpp @@ -47,19 +47,19 @@ void MusicBotManager::cleanup_client_bots(ts::ClientDbId clientid) { this->deleteBot(bot); } -std::deque> MusicBotManager::available_bots() { +std::deque> MusicBotManager::available_bots() { lock_guard lock(music_bots_lock); return this->music_bots; } -std::shared_ptr MusicBotManager::find_bot_by_playlist(const std::shared_ptr &playlist) { +std::shared_ptr MusicBotManager::find_bot_by_playlist(const std::shared_ptr &playlist) { for(const auto& bot : this->available_bots()) if(bot->playlist() == playlist) return bot; return nullptr; } -std::deque> MusicBotManager::listBots(ClientDbId clid) { +std::deque> MusicBotManager::listBots(ClientDbId clid) { lock_guard lock(music_bots_lock); std::deque> res; for(const auto& bot : this->music_bots) diff --git a/server/src/server/POWHandler.cpp b/server/src/server/POWHandler.cpp index 43f65ef..c63cd32 100644 --- a/server/src/server/POWHandler.cpp +++ b/server/src/server/POWHandler.cpp @@ -72,7 +72,7 @@ void POWHandler::handle_datagram(int socket, const sockaddr_storage &address,msg client->socket = socket; client->client_version = be2le32(&buffer[MAC_SIZE + CLIENT_HEADER_SIZE]); memcpy(&client->address, &address, sizeof(client->address)); - io::DatagramPacket::extract_info(info, client->address_info); + udp::DatagramPacket::extract_info(info, client->address_info); client->state = LowHandshakeState::COOKIE_GET; } @@ -126,7 +126,7 @@ void POWHandler::handle_datagram(int socket, const sockaddr_storage &address,msg } void POWHandler::send_data(const std::shared_ptr &client, const pipes::buffer_view &buffer) { - auto datagram = io::DatagramPacket::create(client->address, client->address_info, buffer.length() + MAC_SIZE + SERVER_HEADER_SIZE, nullptr); + auto datagram = udp::DatagramPacket::create(client->address, client->address_info, buffer.length() + MAC_SIZE + SERVER_HEADER_SIZE, nullptr); if(!datagram) return; //Should never happen /* first 8 bytes mac */ diff --git a/server/src/server/POWHandler.h b/server/src/server/POWHandler.h index 4a70ebf..8d4b72d 100644 --- a/server/src/server/POWHandler.h +++ b/server/src/server/POWHandler.h @@ -27,7 +27,7 @@ namespace ts::server { struct Client { int socket; sockaddr_storage address; - io::pktinfo_storage address_info; + udp::pktinfo_storage address_info; std::timed_mutex handle_lock; std::chrono::system_clock::time_point last_packet; diff --git a/server/src/server/VoiceIOManager.cpp b/server/src/server/VoiceIOManager.cpp index ca0abe4..7fbc73a 100644 --- a/server/src/server/VoiceIOManager.cpp +++ b/server/src/server/VoiceIOManager.cpp @@ -242,7 +242,7 @@ void IOServerHandler::invoke_write(const std::shared_ptrevent_write, nullptr); } -void IOServerHandler::send_datagram(ts::io::datagram_packet_t datagram, int socket) { +void IOServerHandler::send_datagram(server::udp::DatagramPacket* datagram, int socket) { if(this->event_loop_events.empty()) return; /* TODO any kind of error or warning? */ @@ -330,30 +330,4 @@ void VoiceIOManager::dispatchBase(shared_ptr self) { this->ioExecutorNotify.notify_all(); /* let everybody know we're done */ } -} - - -DatagramPacket* DatagramPacket::create(const sockaddr_storage &address, const pktinfo_storage& address_info, size_t length, const uint8_t *data) { - auto membuf = buffer::allocate_buffer(sizeof(DatagramPacket) + length); - auto instance = membuf.data_ptr(); - new (&instance->self_buffer) pipes::buffer; - - instance->next_packet = nullptr; - instance->self_buffer = membuf; - memcpy(&instance->address, &address, sizeof(address)); - if(address.ss_family == AF_INET6) { - memcpy(&instance->address_info, &address_info, sizeof(in6_pktinfo)); - } else { - memcpy(&instance->address_info, &address_info, sizeof(in_pktinfo)); - } - - instance->data_length = length; - if(data) - memcpy(&instance->data, data, length); - - return instance; -} - -void DatagramPacket::destory(ts::io::datagram_packet_t packet) { - packet->self_buffer.~buffer(); } \ No newline at end of file diff --git a/server/src/server/VoiceIOManager.h b/server/src/server/VoiceIOManager.h index 9f8abb6..16bf96d 100644 --- a/server/src/server/VoiceIOManager.h +++ b/server/src/server/VoiceIOManager.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace ts { namespace server { @@ -15,38 +16,6 @@ namespace ts { class VoiceClient; } namespace io { - union pktinfo_storage { - in_pktinfo v4; - in6_pktinfo v6; - }; - - struct DatagramPacket; - - typedef DatagramPacket* datagram_packet_t; - struct DatagramPacket { - private: - pipes::buffer self_buffer; - - public: - datagram_packet_t next_packet; - - sockaddr_storage address; - pktinfo_storage address_info; - - size_t data_length; - uint8_t data[0]; - - static void destory(datagram_packet_t); - static datagram_packet_t create(const sockaddr_storage& address, const pktinfo_storage& address_info, size_t length, const uint8_t* data); - static int extract_info(msghdr& /* header */, pktinfo_storage& /* info */); - - DatagramPacket() = delete; - DatagramPacket(const DatagramPacket&) = delete; - DatagramPacket(DatagramPacket&&) = delete; - - ~DatagramPacket() = delete; - }; - class VoiceIOManager; class IOServerHandler; struct IOEventLoopEntry; @@ -81,12 +50,12 @@ namespace ts { ::event* event_write = nullptr; spin_mutex write_queue_lock; - datagram_packet_t dg_write_queue_head = nullptr; - datagram_packet_t dg_write_queue_tail = nullptr; + server::udp::DatagramPacket* dg_write_queue_head = nullptr; + server::udp::DatagramPacket* dg_write_queue_tail = nullptr; std::deque> voice_write_queue; - inline datagram_packet_t pop_dg_write_queue() { + inline server::udp::DatagramPacket* pop_dg_write_queue() { std::lock_guard lock(this->write_queue_lock); if(!this->dg_write_queue_head) return nullptr; @@ -102,7 +71,7 @@ namespace ts { return packet; } - inline void push_dg_write_queue(datagram_packet_t packet) { + inline void push_dg_write_queue(server::udp::DatagramPacket* packet) { std::lock_guard lock(this->write_queue_lock); if(this->dg_write_queue_tail) { this->dg_write_queue_tail->next_packet = packet; @@ -193,7 +162,7 @@ namespace ts { void invoke_write(const std::shared_ptr& /* client */); int resolve_file_descriptor(const std::shared_ptr& /* client */); - void send_datagram(datagram_packet_t /* packet */, int /* socket */); + void send_datagram(server::udp::DatagramPacket* /* packet */, int /* socket */); private: std::shared_ptr create_event_loop_events(const std::shared_ptr &); diff --git a/server/src/server/VoiceServer.cpp b/server/src/server/VoiceServer.cpp index f5b1ddf..ec730d2 100644 --- a/server/src/server/VoiceServer.cpp +++ b/server/src/server/VoiceServer.cpp @@ -239,21 +239,6 @@ static union { uint64_t integral; } TS3INIT; -int io::DatagramPacket::extract_info(msghdr &message, pktinfo_storage &info) { - for (cmsghdr* cmsg = CMSG_FIRSTHDR(&message); cmsg != nullptr; cmsg = CMSG_NXTHDR(&message, cmsg)) { // iterate through all the control headers - if(cmsg->cmsg_type != IP_PKTINFO && cmsg->cmsg_type != IPV6_PKTINFO) continue; - - if(cmsg->cmsg_level == IPPROTO_IP) { - memcpy(&info, (void*) CMSG_DATA(cmsg), sizeof(in_pktinfo)); - return 4; - } else if(cmsg->cmsg_level == IPPROTO_IPV6) { - memcpy(&info, (void*) CMSG_DATA(cmsg), sizeof(in6_pktinfo)); - return 6; - } - } - return 0; -} - void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) { auto event_handle = (io::IOEventLoopEntry*) _event_handle; auto voice_server = event_handle->voice_server; @@ -332,7 +317,7 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) { auto command = "dummy_ipchange old_ip=" + old_address + " new_ip=" + new_address; client->server_command_executor().force_insert_command(pipes::buffer_view{command.data(), command.length()}); memcpy(&client->remote_address, &remote_address, sizeof(remote_address)); - io::DatagramPacket::extract_info(message, client->connection->remote_address_info_); + udp::DatagramPacket::extract_info(message, client->connection->remote_address_info_); } } else { continue; /* we've no clue */ @@ -382,7 +367,7 @@ struct IOData { }; template -inline ssize_t write_datagram(IOData& io, const sockaddr_storage& address, io::pktinfo_storage* info, size_t length, const void* buffer) { +inline ssize_t write_datagram(IOData& io, const sockaddr_storage& address, udp::pktinfo_storage* info, size_t length, const void* buffer) { io.message.msg_flags = 0; io.message.msg_name = (void*) &address; io.message.msg_namelen = address.ss_family == AF_INET ? sizeof(sockaddr_in) : sizeof(sockaddr_in6); @@ -514,21 +499,21 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) /* write all manually specified datagram packets */ { auto write_timeout = system_clock::now() + microseconds(2500); /* read 2.5ms long at a time or 'till nothing more is there */ - io::datagram_packet_t packet; + udp::DatagramPacket* packet; while(system_clock::now() <= write_timeout && (packet = event_handle->pop_dg_write_queue())) { - ssize_t res = write_datagram(io, packet->address, &packet->address_info, packet->data_length, packet->data); + ssize_t res = write_datagram(io, packet->address, &packet->pktinfo, packet->data_length, packet->data); if(res != packet->data_length) { if(errno == EAGAIN) { event_handle->push_dg_write_queue(packet); } else - io::DatagramPacket::destory(packet); + udp::DatagramPacket::destroy(packet); logError(voice_server->server->getServerId(), "Failed to send datagram. Wrote {} out of {}. {}/{}", res, packet->data_length, errno, strerror(errno)); retrigger = false; break; } - io::DatagramPacket::destory(packet); + udp::DatagramPacket::destroy(packet); } retrigger |= packet != nullptr; /* memory stored at packet is not accessible anymore. But anyways pop_dg_write_queue returns 0 if there is nothing more */ @@ -537,6 +522,6 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) event_add(event_handle->event_write, nullptr); } -void VoiceServer::send_datagram(int socket, io::datagram_packet_t packet) { +void VoiceServer::send_datagram(int socket, udp::DatagramPacket* packet) { this->io->send_datagram(packet, socket); } \ No newline at end of file diff --git a/server/src/server/VoiceServer.h b/server/src/server/VoiceServer.h index ee78955..5732855 100644 --- a/server/src/server/VoiceServer.h +++ b/server/src/server/VoiceServer.h @@ -9,6 +9,7 @@ #include #include #include "VoiceIOManager.h" +#include "./voice/DatagramPacket.h" namespace ts { namespace protocol { @@ -25,8 +26,8 @@ namespace ts { sockaddr_storage address{}; int file_descriptor = 0; - inline std::string address_string() { return net::to_string(address); } - inline uint16_t address_port() { return net::port(address); } + [[nodiscard]] inline std::string address_string() const { return net::to_string(address); } + [[nodiscard]] inline uint16_t address_port() const { return net::port(address); } }; class VoiceServer { @@ -70,25 +71,18 @@ namespace ts { std::recursive_mutex connectionLock; std::deque> activeConnections; - public: //lib event + public: void triggerWrite(const std::shared_ptr &); void schedule_command_handling(VoiceClient const *client); void tickHandshakingClients(); void execute_resend(const std::chrono::system_clock::time_point& /* now */, std::chrono::system_clock::time_point& /* next resend */); - void send_datagram(int /* socket */, io::datagram_packet_t /* packet */); + void send_datagram(int /* socket */, udp::DatagramPacket* /* packet */); std::shared_ptr io; private: static void handleMessageRead(int, short, void *); static void handleMessageWrite(int, short, void *); - - /* execute loop */ - /* TODO - std::mutex execute_list_lock; - protocol::RingBuffer execute_list; - void run_execute_clients(); - */ }; } } \ No newline at end of file diff --git a/server/src/server/voice/DatagramPacket.cpp b/server/src/server/voice/DatagramPacket.cpp new file mode 100644 index 0000000..5636729 --- /dev/null +++ b/server/src/server/voice/DatagramPacket.cpp @@ -0,0 +1,48 @@ +// +// Created by WolverinDEV on 02/08/2020. +// + +#include +#include +#include "DatagramPacket.h" + +using namespace ts::server::udp; + +DatagramPacket* DatagramPacket::create(const sockaddr_storage &address, const pktinfo_storage& address_info, size_t length, const uint8_t *data) { + auto membuf = malloc(sizeof(DatagramPacket) + length); + auto instance = (DatagramPacket*) membuf; + + instance->next_packet = nullptr; + memcpy(&instance->address, &address, sizeof(address)); + if(address.ss_family == AF_INET6) { + memcpy(&instance->pktinfo, &address_info, sizeof(in6_pktinfo)); + } else { + memcpy(&instance->pktinfo, &address_info, sizeof(in_pktinfo)); + } + + instance->data_length = length; + if(data) { + memcpy(&instance->data, data, length); + } + + return instance; +} + +void DatagramPacket::destroy(DatagramPacket *packet) { + free(packet); +} + +int DatagramPacket::extract_info(msghdr &message, pktinfo_storage &info) { + for (cmsghdr* cmsg = CMSG_FIRSTHDR(&message); cmsg != nullptr; cmsg = CMSG_NXTHDR(&message, cmsg)) { // iterate through all the control headers + if(cmsg->cmsg_type != IP_PKTINFO && cmsg->cmsg_type != IPV6_PKTINFO) continue; + + if(cmsg->cmsg_level == IPPROTO_IP) { + memcpy(&info, (void*) CMSG_DATA(cmsg), sizeof(in_pktinfo)); + return 4; + } else if(cmsg->cmsg_level == IPPROTO_IPV6) { + memcpy(&info, (void*) CMSG_DATA(cmsg), sizeof(in6_pktinfo)); + return 6; + } + } + return 0; +} \ No newline at end of file diff --git a/server/src/server/voice/DatagramPacket.h b/server/src/server/voice/DatagramPacket.h new file mode 100644 index 0000000..953bf9d --- /dev/null +++ b/server/src/server/voice/DatagramPacket.h @@ -0,0 +1,32 @@ +#pragma once + +#include + +namespace ts::server::udp { + union pktinfo_storage { + in_pktinfo v4; + in6_pktinfo v6; + }; + + struct DatagramPacket { + public: + DatagramPacket* next_packet; + + sockaddr_storage address; + pktinfo_storage pktinfo; + + size_t data_length; + uint8_t data[0]; + + static void destroy(DatagramPacket*); + static DatagramPacket* create(const sockaddr_storage& address, const pktinfo_storage& address_info, size_t length, const uint8_t* data); + + static int extract_info(msghdr& /* header */, pktinfo_storage& /* info */); + + DatagramPacket() = delete; + DatagramPacket(const DatagramPacket&) = delete; + DatagramPacket(DatagramPacket&&) = delete; + + ~DatagramPacket() = delete; + }; +} \ No newline at end of file diff --git a/server/src/server/voice/UDPVoiceServer.cpp b/server/src/server/voice/UDPVoiceServer.cpp new file mode 100644 index 0000000..3632623 --- /dev/null +++ b/server/src/server/voice/UDPVoiceServer.cpp @@ -0,0 +1,5 @@ +// +// Created by WolverinDEV on 02/08/2020. +// + +#include "UDPVoiceServer.h" diff --git a/server/src/server/voice/UDPVoiceServer.h b/server/src/server/voice/UDPVoiceServer.h new file mode 100644 index 0000000..358c879 --- /dev/null +++ b/server/src/server/voice/UDPVoiceServer.h @@ -0,0 +1,71 @@ +#pragma once + +#include +#include +#include +#include + +#include + +#include +#include + +namespace ts::connection { + class VoiceClientConnection; +} + +namespace ts::server::server::udp { + class Server; + class Socket { + public: + Socket(Server* /* server handle */, ServerId /* server id */, const sockaddr_storage& /* address */); + + void start(); + void stop(); + + [[nodiscard]] inline bool is_active() const { return this->file_descriptor > 0; } + [[nodiscard]] inline std::string address_string() const { return net::to_string(address); } + [[nodiscard]] inline uint16_t address_port() const { return net::port(address); } + + private: + struct EVLoopEntry { + Socket* socket; + + event* event_read{nullptr}; + event* event_write{nullptr}; + }; + + Server* server; + + ServerId server_id; + sockaddr_storage address; + + int file_descriptor{0}; + + std::deque event_loop_entries{}; + + std::mutex clients_lock{}; + std::deque> clients{}; + std::vector> client_map_by_id{}; + lookup::ip_v4 clients_by_ipv4{}; + lookup::ip_v4 clients_by_ipv6{}; + + spin_mutex client_write_lock{}; + connection::VoiceClientConnection* client_write_head{nullptr}; + connection::VoiceClientConnection** client_write_tail{&client_write_head}; + + static void callback_event_read(int, short, void*); + static void callback_event_write(int, short, void*); + }; + + struct ServerEventLoops { + event_base* event_base{nullptr}; + std::thread dispatch_thread{}; + }; + + class Server { + public: + + private: + }; +} \ No newline at end of file diff --git a/shared b/shared index c3188cd..59ec412 160000 --- a/shared +++ b/shared @@ -1 +1 @@ -Subproject commit c3188cd9e5f499f4b5813953d2bc62f56800222f +Subproject commit 59ec412fea21d6342311135a697696624fc0e628