#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include "VoiceClient.h" #include "protocol/AcknowledgeManager.h" #include #include "./PacketStatistics.h" //#define LOG_ACK_SYSTEM #ifdef LOG_ACK_SYSTEM #define LOG_AUTO_ACK_REQUEST #define LOG_AUTO_ACK_RESPONSE #define LOG_PKT_RESEND #endif //#define PKT_LOG_PING namespace ts { namespace server { class VoiceClient; class VoiceServer; class POWHandler; } namespace connection { class VoiceClientConnection { friend class AcknowledgeManager; friend class server::VoiceServer; friend class server::VoiceClient; friend class server::POWHandler; public: enum struct WBufferPopResult { DRAINED, MORE_AVAILABLE }; struct CommandFragment { uint16_t packet_id{0}; uint16_t packet_generation{0}; uint8_t packet_flags{0}; uint32_t payload_length : 24; pipes::buffer payload{}; CommandFragment() { this->payload_length = 0; } CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload) : packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags}, payload_length{payloadLength}, payload{std::move(payload)} {} CommandFragment& operator=(const CommandFragment&) = default; CommandFragment(const CommandFragment& other) = default; CommandFragment(CommandFragment&&) = default; }; static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer)); typedef protocol::PacketRingBuffer command_fragment_buffer_t; typedef std::array command_packet_reassembler; explicit VoiceClientConnection(server::VoiceClient*); virtual ~VoiceClientConnection(); /* Do not send command packets via send_packet! The send_packet will take ownership of the packet! */ void send_packet(protocol::OutgoingServerPacket* /* packet */); void send_packet(protocol::PacketType /* type */, protocol::PacketFlag::PacketFlags /* flags */, const void* /* payload */, size_t /* payload length */); void send_command(const std::string_view& /* build command command */, bool /* command low */, std::unique_ptr> /* acknowledge listener */); CryptHandler* getCryptHandler(){ return &crypt_handler; } server::VoiceClient* getClient(){ return client; } #ifdef VC_USE_READ_QUEUE bool handleNextDatagram(); #endif /* if the result is true, ownership has been transferred */ WBufferPopResult pop_write_buffer(protocol::OutgoingServerPacket*& /* packet */); void execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next); void encrypt_write_queue(); bool wait_empty_write_and_prepare_queue(std::chrono::time_point until = std::chrono::time_point()); protocol::PacketIdManager& getPacketIdManager() { return this->packet_id_manager; } AcknowledgeManager& getAcknowledgeManager() { return this->acknowledge_handler; } inline auto& get_incoming_generation_estimators() { return this->incoming_generation_estimators; } void reset(); void force_insert_command(const pipes::buffer_view& /* payload */); void register_initiv_packet(); [[nodiscard]] inline auto& packet_statistics() { return this->packet_statistics_; } //buffer::SortedBufferQueue** getReadQueue() { return this->readTypedQueue; } protected: void handle_incoming_datagram(const pipes::buffer_view &buffer); bool verify_encryption(const pipes::buffer_view& /* full packet */); void triggerWrite(); private: server::VoiceClient* client = nullptr; //Decryption / encryption stuff CryptHandler crypt_handler; /* access to CryptHandler is thread save */ CompressionHandler compress_handler; AcknowledgeManager acknowledge_handler; std::atomic_bool should_reassembled_reschedule{}; /* this get checked as soon the command handle lock has been released so trylock will succeed */ //Handle stuff void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */); bool next_reassembled_command(std::unique_lock &buffer_execute_lock /* packet channel execute lock */, pipes::buffer & /* buffer*/, uint16_t& /* packet id */); /* ---------- Write ---------- */ spin_mutex write_queue_mutex{}; protocol::OutgoingServerPacket* resend_queue_head{nullptr}; protocol::OutgoingServerPacket** resend_queue_tail{&resend_queue_head}; protocol::OutgoingServerPacket* write_queue_head{nullptr}; protocol::OutgoingServerPacket** write_queue_tail{&write_queue_head}; /* ---------- Processing ---------- */ /* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */ protocol::PacketIdManager packet_id_manager; spin_mutex packet_id_mutex{}; /* this function is thread save :) */ std::atomic prepare_process_count{0}; /* current thread count preparing a packet */ bool prepare_outgoing_packet(protocol::OutgoingServerPacket* /* packet */); std::array incoming_generation_estimators{}; /* implementation is thread save */ std::recursive_mutex packet_buffer_lock; command_packet_reassembler _command_fragment_buffers; static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) { return packet_index & 0x1U; /* use 0 for command and 1 for command low */ } server::client::PacketStatistics packet_statistics_{}; }; } }