Some file server related things

This commit is contained in:
WolverinDEV 2020-07-30 11:40:03 +02:00
parent 7a974677fb
commit 3f700e79d3
14 changed files with 77 additions and 46 deletions

View File

@ -261,6 +261,7 @@ namespace ts::server::file {
size_t flush_network_buffer();
void flush_disk_buffer();
[[nodiscard]] bool buffers_flushed();
[[nodiscard]] inline std::string log_prefix() const { return "[" + net::to_string(this->networking.address) + "]"; }
};

View File

@ -49,7 +49,6 @@ void LocalFileTransfer::disconnect_client(const std::shared_ptr<FileClient> &cli
del_ev_noblock(client->networking.event_read);
/* max flush 10 seconds */
client->networking.disconnect_timeout = std::chrono::system_clock::now() + network_flush_time;
debugMessage(LOG_FT, "{} Disconnecting client. Flushing pending bytes (max {} seconds)", client->log_prefix(), std::chrono::floor<std::chrono::seconds>(network_flush_time).count());
@ -70,20 +69,16 @@ void LocalFileTransfer::test_disconnecting_state(const std::shared_ptr<FileClien
if(client->state != FileClient::STATE_DISCONNECTING)
return;
{
std::lock_guard db_lock{client->disk_buffer.mutex};
std::lock_guard nb_lock{client->network_buffer.mutex};
if(!client->buffers_flushed())
return;
if(client->disk_buffer.bytes > 0)
return;
if(client->network_buffer.bytes > 0)
return;
if(client->networking.protocol != FileClient::PROTOCOL_TS_V1) {
debugMessage(LOG_FT, "{} Disk and network buffers are flushed. Closing connection.", client->log_prefix());
std::unique_lock s_lock{client->state_mutex};
this->disconnect_client(client, s_lock, false);
} else {
debugMessage(LOG_FT, "{} Disk and network buffers are flushed. Awaiting client disconnect.", client->log_prefix());
}
debugMessage(LOG_FT, "{} Disk and network buffers are flushed.", client->log_prefix());
std::unique_lock s_lock{client->state_mutex};
this->disconnect_client(client, s_lock, false);
}
void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) {
@ -106,9 +101,16 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) {
case FileClient::STATE_TRANSFERRING:
break;
case FileClient::STATE_DISCONNECTING:
if(transfer->transfer && transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD)
break; /* we're still transferring (sending data) */
continue;
if(!transfer->transfer)
continue;
if(transfer->transfer->direction != Transfer::DIRECTION_DOWNLOAD)
continue;
if(transfer->buffers_flushed())
continue;
break; /* we're still transferring (sending data) */
case FileClient::STATE_AWAITING_KEY:
case FileClient::STATE_DISCONNECTED:
default:
@ -183,7 +185,10 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) {
provider->invoke_aborted_callback(client, { TransferError::TRANSFER_TIMEOUT, "" });
break;
case FileClient::STATE_DISCONNECTING:
logMessage(LOG_FT, "{} Failed to flush connection. Dropping client", client->log_prefix());
if(!client->buffers_flushed())
logMessage(LOG_FT, "{} Failed to flush connection. Dropping client", client->log_prefix());
else
; /* we just awaited a client disconnect */
break;
case FileClient::STATE_DISCONNECTED:
default:

View File

@ -140,6 +140,22 @@ void FileClient::flush_disk_buffer() {
}
}
bool FileClient::buffers_flushed() {
{
std::lock_guard db_lock{this->disk_buffer.mutex};
if(this->disk_buffer.bytes > 0)
return false;
}
{
std::lock_guard nb_lock{this->network_buffer.mutex};
if(this->network_buffer.bytes > 0)
return false;
}
return true;
}
FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr<FileClient> &transfer) {
FileInitializeResult result{FileInitializeResult::SUCCESS};
assert(transfer->transfer);

View File

@ -588,6 +588,9 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
break;
}
case FileClient::STATE_DISCONNECTING:
logMessage(LOG_FT, "{} Remote client closed connection. Finalizing disconnect.", transfer->log_prefix());
break;
case FileClient::STATE_DISCONNECTED:
default:
break;

@ -1 +1 @@
Subproject commit 5f0349e5f930b4f0275c43518d01b9e8c57aa73a
Subproject commit e6087512662f5bed71a47b45b1824fc78d27aeda

View File

@ -694,8 +694,8 @@ bool ConnectedClient::handle_text_command(
if(!vc) return false;
send_message(_this.lock(), "I lost your IP address. I'm so dump :)");
vc->connection->reset_remote_address();
memset(&vc->remote_address, 0, sizeof(vc->remote_address));
memset(&vc->address_info, 0, sizeof(vc->address_info));
send_message(_this.lock(), "Hey, we got the address back");
return true;
} else if(TARG(0, "fb")) {

View File

@ -2127,11 +2127,11 @@ command_result ConnectedClient::handleCommandLogView(ts::Command& cmd) {
this->sendCommand(result);
#else
constexpr static std::array<std::string_view, 5> log_output{
"The command 'logview' is not supported anymore.",
"In order to lookup the server actions use 'logquery'.",
"",
"located at your TeaSpeak installation folder. All logs could be found there.",
"If you need to lookup the TeaSpeak - Server logs, please visit the 'logs/' folder,",
"located at your TeaSpeak installation folder. All logs could be found there."
"",
"In order to lookup the server actions use 'logquery'.",
"The command 'logview' is not supported anymore."
};
command_builder result{this->getExternalType() == ClientType::CLIENT_TEAMSPEAK ? "notifyserverlog" : ""};
@ -2141,13 +2141,13 @@ command_result ConnectedClient::handleCommandLogView(ts::Command& cmd) {
size_t index{0};
if(lagacy) {
for(const auto& message : log_output) {
std::string line{"2020-06-27 00:00.0000|CRITICAL|Server Instance | |"};
std::string line{"2020-06-27 00:00.000" + std::to_string(index) + "|CRITICAL|Server Instance | |"};
line += message;
result.put_unchecked(index++, "l", line);
}
} else {
for(const auto& message : log_output) {
std::string line{"[2020-06-27 00:00:00][ERROR] "};
std::string line{"[2020-06-27 00:00:0" + std::to_string(index) + "][ERROR] "};
line += message;
result.put_unchecked(index++, "l", line);
}

View File

@ -65,7 +65,6 @@ void VoiceClient::tick(const std::chrono::system_clock::time_point &time) {
this->connection->ping_handler().tick(time);
this->connection->packet_statistics().tick();
} else if(this->state == ConnectionState::INIT_LOW || this->state == ConnectionState::INIT_HIGH) {
/* FIXME: Handshake timeout */
auto last_command = this->connection->crypt_setup_handler().last_handled_command();
if(last_command.time_since_epoch().count() != 0) {
if(time - last_command > seconds(5)) {

View File

@ -55,7 +55,7 @@ namespace ts {
friend class server::udp::CryptSetupHandler;
using ServerCommandExecutor = ts::server::server::udp::ServerCommandExecutor;
public:
VoiceClient(const std::shared_ptr<VoiceServer>& server,const sockaddr_storage*);
VoiceClient(const std::shared_ptr<VoiceServer>& server, const sockaddr_storage*);
~VoiceClient() override;
bool close_connection(const std::chrono::system_clock::time_point &timeout) override;
@ -95,9 +95,6 @@ namespace ts {
protected:
virtual command_result handleCommand(Command &command) override;
private:
int socket = 0;
io::pktinfo_storage address_info;
void finalDisconnect();
bool final_disconnected = false;

View File

@ -1,11 +1,12 @@
#include <algorithm>
#include <log/LogUtils.h>
#include "../../server/VoiceServer.h"
#include <misc/memtracker.h>
#include <protocol/Packet.h>
#include <ThreadPool/Timer.h>
#include "VoiceClientConnection.h"
#include "VoiceClient.h"
#include "../../server/VoiceServer.h"
#include "./VoiceClientConnection.h"
#include "./VoiceClient.h"
//#define LOG_AUTO_ACK_AUTORESPONSE
@ -74,12 +75,6 @@ void VoiceClientConnection::triggerWrite() {
this->current_client->voice_server->triggerWrite(dynamic_pointer_cast<VoiceClient>(this->current_client->_this.lock()));
}
#ifdef CLIENT_LOG_PREFIX
#undef CLIENT_LOG_PREFIX
#endif
#define CLIENT_LOG_PREFIX "[" << this->client->getPeerIp() << ":" << this->client->getPeerPort() << " | " << this->client->getDisplayName() << "]"
//Message handle methods
void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& buffer) {
ClientPacketParser packet_parser{buffer};
if(!packet_parser.valid())
@ -216,6 +211,11 @@ void VoiceClientConnection::reset() {
this->packet_encoder_.reset();
}
void VoiceClientConnection::reset_remote_address() {
memset(&this->remote_address_, 0, sizeof(this->remote_address_));
memset(&this->remote_address_info_, 0, sizeof(this->remote_address_info_));
}
void VoiceClientConnection::send_packet(protocol::PacketType type, protocol::PacketFlag::PacketFlags flag, const void *payload, size_t payload_size) {
this->packet_encoder_.send_packet(type, flag, payload, payload_size);
}
@ -300,5 +300,6 @@ void VoiceClientConnection::callback_ping_send_recovery(void *ptr_this) {
}
void VoiceClientConnection::callback_ping_timeout(void *ptr_this) {
(void) ptr_this;
/* doing nothing a packet resend failed will cause the client to disconnect */
}

View File

@ -65,10 +65,15 @@ namespace ts {
bool wait_empty_write_and_prepare_queue(std::chrono::time_point<std::chrono::system_clock> until = std::chrono::time_point<std::chrono::system_clock>());
void reset();
void reset_remote_address();
[[nodiscard]] std::string log_prefix();
[[nodiscard]] inline auto virtual_server_id() const { return this->virtual_server_id_; }
[[nodiscard]] inline const auto& remote_address() const { return this->remote_address_; }
[[nodiscard]] inline const auto& socket_id() const { return this->socket_id_; }
[[nodiscard]] inline auto& packet_statistics() { return this->packet_statistics_; }
[[nodiscard]] inline auto& packet_decoder() { return this->packet_decoder_; }
[[nodiscard]] inline auto& packet_encoder() { return this->packet_encoder_; }
@ -84,6 +89,10 @@ namespace ts {
ServerId virtual_server_id_;
server::VoiceClient* current_client;
int socket_id_{0};
sockaddr_storage remote_address_{};
io::pktinfo_storage remote_address_info_{};
CryptHandler crypt_handler; /* access to CryptHandler is thread save */
server::client::PacketStatistics packet_statistics_{};

View File

@ -298,9 +298,9 @@ shared_ptr<VoiceClient> POWHandler::register_verified_client(const std::shared_p
voice_client->_this = voice_client;
voice_client->initialize();
voice_client->socket = client->socket;
voice_client->connection->socket_id_ = client->socket;
voice_client->state = ConnectionState::INIT_LOW;
memcpy(&voice_client->address_info, &client->address_info, sizeof(client->address_info));
memcpy(&voice_client->connection->remote_address_info_, &client->address_info, sizeof(client->address_info));
{
lock_guard lock(this->server->connectionLock);

View File

@ -217,7 +217,7 @@ int IOServerHandler::resolve_file_descriptor(const std::shared_ptr<ts::server::V
if(this->event_loop_events.empty())
return -1;
auto socket = client->socket;
auto socket = client->connection->socket_id();
auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()];
if(socket < 0 || socket > event_loop->events.size())
return -1;
@ -229,7 +229,7 @@ void IOServerHandler::invoke_write(const std::shared_ptr<ts::server::VoiceClient
if(this->event_loop_events.empty())
return; /* TODO any kind of error or warning? */
auto socket = client->socket;
auto socket = client->connection->socket_id();
auto event_loop = this->event_loop_events[this->event_loop_index++ % this->event_loop_events.size()];
if(socket < 0 || socket > event_loop->events.size())
return; /* TODO any kind of error or warning? */

View File

@ -332,7 +332,7 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) {
auto command = "dummy_ipchange old_ip=" + old_address + " new_ip=" + new_address;
client->server_command_executor().force_insert_command(pipes::buffer_view{command.data(), command.length()});
memcpy(&client->remote_address, &remote_address, sizeof(remote_address));
io::DatagramPacket::extract_info(message, client->address_info);
io::DatagramPacket::extract_info(message, client->connection->remote_address_info_);
}
} else {
continue; /* we've no clue */
@ -459,7 +459,7 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle)
break;
}
ssize_t res = write_datagram(io, client->remote_address, &client->address_info, packet->packet_length(), packet->packet_data());
ssize_t res = write_datagram(io, client->remote_address, &client->connection->remote_address_info_, packet->packet_length(), packet->packet_data());
if(res != packet->packet_length()) {
if(errno == EAGAIN) {
logCritical(voice_server->server->getServerId(), "Failed to write datagram packet for client {} (EAGAIN).", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()));
@ -477,7 +477,7 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle)
res,
fd,
event_handle->file_descriptor,
voice_client->socket,
voice_client->connection->socket_id(),
event_handle->socket_id,
voice_server->io->resolve_file_descriptor(voice_client),
voice_client->isAddressV4() ? "v4" : voice_client->isAddressV6() ? "v6" : "v?",