#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include "VoiceClient.h" #include "protocol/AcknowledgeManager.h" #include #include "./PacketStatistics.h" //#define LOG_ACK_SYSTEM #ifdef LOG_ACK_SYSTEM #define LOG_AUTO_ACK_REQUEST #define LOG_AUTO_ACK_RESPONSE #define LOG_PKT_RESEND #endif //#define PKT_LOG_PING namespace ts { namespace server { class VoiceClient; class VoiceServer; class POWHandler; } namespace connection { class VoiceClientConnection { friend class AcknowledgeManager; friend class server::VoiceServer; friend class server::VoiceClient; friend class server::POWHandler; public: struct CommandFragment { uint16_t packet_id{0}; uint16_t packet_generation{0}; uint8_t packet_flags{0}; uint32_t payload_length : 24; pipes::buffer payload{}; CommandFragment() { this->payload_length = 0; } CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload) : packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags}, payload_length{payloadLength}, payload{std::move(payload)} {} CommandFragment& operator=(const CommandFragment&) = default; CommandFragment(const CommandFragment& other) = default; CommandFragment(CommandFragment&&) = default; }; static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer)); typedef protocol::PacketRingBuffer command_fragment_buffer_t; typedef std::array command_packet_reassembler; explicit VoiceClientConnection(server::VoiceClient*); virtual ~VoiceClientConnection(); void sendPacket(const std::shared_ptr& original_packet, bool copy = false, bool prepare_directly = false); CryptHandler* getCryptHandler(){ return &crypt_handler; } server::VoiceClient* getClient(){ return client; } #ifdef VC_USE_READ_QUEUE bool handleNextDatagram(); #endif /* * Split packets waiting in write_process_queue and moves the final buffers to writeQueue. * @returns true when there are more packets to prepare */ bool preprocess_write_packets(); /* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */ int pop_write_buffer(pipes::buffer& /* buffer */); bool wait_empty_write_and_prepare_queue(std::chrono::time_point until = std::chrono::time_point()); protocol::PacketIdManager& getPacketIdManager() { return this->packet_id_manager; } AcknowledgeManager& getAcknowledgeManager() { return this->acknowledge_handler; } inline auto& get_incoming_generation_estimators() { return this->incoming_generation_estimators; } void reset(); void force_insert_command(const pipes::buffer_view& /* payload */); void register_initiv_packet(); [[nodiscard]] inline auto& packet_statistics() { return this->packet_statistics_; } //buffer::SortedBufferQueue** getReadQueue() { return this->readTypedQueue; } protected: void handle_incoming_datagram(const pipes::buffer_view &buffer); bool verify_encryption(const pipes::buffer_view& /* full packet */); void triggerWrite(); private: server::VoiceClient* client = nullptr; //Decryption / encryption stuff CryptHandler crypt_handler; /* access to CryptHandler is thread save */ CompressionHandler compress_handler; AcknowledgeManager acknowledge_handler; std::atomic_bool should_reassembled_reschedule{}; /* this get checked as soon the command handle lock has been released so trylock will succeed */ //Handle stuff void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */); bool next_reassembled_command(std::unique_lock &buffer_execute_lock /* packet channel execute lock */, pipes::buffer & /* buffer*/, uint16_t& /* packet id */); /* ---------- Write declarations ---------- */ spin_lock write_queue_lock{}; /* queue access isn't for long in general */ std::deque write_queue{}; struct WritePreprocessCategory { enum value { PING_PONG = 0, //Ping/Pongs ACK = 2, VOICE_WHISPER = 1, //Voice/Whisper COMMAND = 3, INIT = 4, MAX = INIT }; inline static value from_type(protocol::PacketType type) { switch(type) { case protocol::PING: case protocol::PONG: return value::PING_PONG; case protocol::VOICE: case protocol::VOICE_WHISPER: return value::VOICE_WHISPER; case protocol::ACK: case protocol::ACK_LOW: return value::ACK; case protocol::COMMAND: case protocol::COMMAND_LOW: return value::COMMAND; default: return value::INIT; } } }; struct WritePreprocessQueue { int _zero1{0}; bool has_work{false}; std::mutex work_lock{}; spin_lock queue_lock{}; std::deque> queue{}; int _zero{0}; }; std::array write_preprocess_queues{}; /* ---------- Processing ---------- */ /* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */ protocol::PacketIdManager packet_id_manager; /* this function is thread save :) */ std::atomic prepare_process_count{0}; /* current thread count preparing a packet */ bool prepare_packet_for_write(std::vector &/* buffers which need to be transferred */, const std::shared_ptr &/* the packet */, std::unique_lock& /* work lock */); std::array incoming_generation_estimators{}; /* implementation is thread save */ std::recursive_mutex packet_buffer_lock; command_packet_reassembler _command_fragment_buffers; static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) { return packet_index & 0x1U; /* use 0 for command and 1 for command low */ } server::client::PacketStatistics packet_statistics_{}; }; } }