diff --git a/native/serverconnection/src/connection/ServerConnection.cpp b/native/serverconnection/src/connection/ServerConnection.cpp index 2681ce8..a2a044d 100644 --- a/native/serverconnection/src/connection/ServerConnection.cpp +++ b/native/serverconnection/src/connection/ServerConnection.cpp @@ -236,7 +236,7 @@ NAN_METHOD(ServerConnection::connect) { return; } - if(!callback->IsFunction() ) { + if(!callback->IsFunction()) { Nan::ThrowError(tr("Invalid callback")); return; } @@ -304,6 +304,8 @@ NAN_METHOD(ServerConnection::connect) { } this->socket->on_data = [&](const pipes::buffer_view& buffer) { this->protocol_handler->progress_packet(buffer); }; + this->socket->on_fatal_error = [&](int, int) { this->close_connection(); }; + if(teamspeak->IsBoolean() && teamspeak->BooleanValue(info.GetIsolate())) this->protocol_handler->server_type = server_type::TEAMSPEAK; this->protocol_handler->connect(); @@ -603,6 +605,7 @@ void ServerConnection::close_connection() { if(this->protocol_handler) this->protocol_handler->do_close_connection(); this->socket = nullptr; + this->call_disconnect_result.call(0, true); } diff --git a/native/serverconnection/src/connection/Socket.cpp b/native/serverconnection/src/connection/Socket.cpp index 3c06b77..4eed2d1 100644 --- a/native/serverconnection/src/connection/Socket.cpp +++ b/native/serverconnection/src/connection/Socket.cpp @@ -83,15 +83,8 @@ void UDPSocket::finalize() { lock.unlock(); assert(this_thread::get_id() != this->_io_thread.get_id()); - if(event_read) - event_del_block(event_read); - if(event_write) - event_del_block(event_write); - - if(event_write) - event_del(event_write); - if(event_read) - event_del(event_read); + if(event_read) event_del_block(event_read); + if(event_write) event_del_block(event_write); if(io_base) { timeval seconds{1, 0}; @@ -136,7 +129,7 @@ void UDPSocket::io_execute() { } void UDPSocket::callback_read(evutil_socket_t fd) { sockaddr source_address{}; - socklen_t source_address_length = sizeof(sockaddr); + socklen_t source_address_length; ssize_t read_length = -1; size_t buffer_length = 1600; /* IPv6 MTU is ~1.5k */ @@ -160,8 +153,15 @@ void UDPSocket::callback_read(evutil_socket_t fd) { if(errno == EAGAIN) break; #endif + logger::warn(category::socket, tr("Failed to receive data: {}"), error); + if(auto callback{this->on_fatal_error}; callback) + callback(1, error); - logger::warn(category::socket, tr("Failed to receive data: {}"), error); + { + std::lock_guard lock{this->io_lock}; + if(this->event_read) + event_del_noblock(this->event_read); + } break; /* this should never happen! */ } @@ -183,7 +183,12 @@ void UDPSocket::callback_write(evutil_socket_t fd) { auto written = sendto(fd, buffer.data_ptr(), (int) buffer.length(), MSG_DONTWAIT, (sockaddr*) &this->_remote_address, sizeof(this->_remote_address)); if(written != buffer.length()) { - if(errno == EAGAIN) { + int error; +#ifdef WIN32 + if(error = WSAGetLastError(); error == WSAEWOULDBLOCK) { +#else + if(error = errno; errno == EAGAIN) { +#endif lock.lock(); this->write_queue.push_front(buffer); if(this->event_write) @@ -191,6 +196,9 @@ void UDPSocket::callback_write(evutil_socket_t fd) { return; } + logger::warn(category::socket, tr("Failed to send data: {}"), error); + if(auto callback{this->on_fatal_error}; callback) + callback(2, error); return; /* this should never happen! */ } else { //logger::trace(category::socket, tr("Wrote {} bytes"), buffer.length()); diff --git a/native/serverconnection/src/connection/Socket.h b/native/serverconnection/src/connection/Socket.h index 605cb0b..fb12bd7 100644 --- a/native/serverconnection/src/connection/Socket.h +++ b/native/serverconnection/src/connection/Socket.h @@ -15,44 +15,44 @@ #include #endif -namespace tc { - namespace connection { - class UDPSocket { - public: - UDPSocket(const sockaddr_storage& /* target */); - ~UDPSocket(); +namespace tc::connection { + class UDPSocket { + public: + explicit UDPSocket(const sockaddr_storage& /* target */); + ~UDPSocket(); - const sockaddr_storage& remote_address() { return this->_remote_address; } + const sockaddr_storage& remote_address() { return this->_remote_address; } - bool initialize(); - void finalize(); + bool initialize(); + void finalize(); - void send_message(const pipes::buffer_view& /* message */); + void send_message(const pipes::buffer_view& /* message */); - std::function on_data; + /* Callbacks are called within the IO loop. Do not call any other functions except send_message! */ + std::function on_data; + std::function on_fatal_error; - const std::thread& io_thread() { return this->_io_thread; } - private: - static void _io_execute(void *_ptr_socket); - static void _callback_read(evutil_socket_t, short, void*); - static void _callback_write(evutil_socket_t, short, void*); + const std::thread& io_thread() { return this->_io_thread; } + private: + static void _io_execute(void *_ptr_socket); + static void _callback_read(evutil_socket_t, short, void*); + static void _callback_write(evutil_socket_t, short, void*); - void io_execute(); - void callback_read(evutil_socket_t); - void callback_write(evutil_socket_t); + void io_execute(); + void callback_read(evutil_socket_t); + void callback_write(evutil_socket_t); - sockaddr_storage _remote_address; + sockaddr_storage _remote_address; - int file_descriptor = 0; + int file_descriptor = 0; - std::recursive_mutex io_lock; - std::thread _io_thread; - event_base* io_base = nullptr; + std::recursive_mutex io_lock; + std::thread _io_thread; + event_base* io_base = nullptr; - event* event_read = nullptr; - event* event_write = nullptr; + event* event_read = nullptr; + event* event_write = nullptr; - std::deque write_queue; - }; - } + std::deque write_queue; + }; } \ No newline at end of file diff --git a/native/serverconnection/src/connection/audio/VoiceClient.cpp b/native/serverconnection/src/connection/audio/VoiceClient.cpp index 09dc4bc..99b72a2 100644 --- a/native/serverconnection/src/connection/audio/VoiceClient.cpp +++ b/native/serverconnection/src/connection/audio/VoiceClient.cpp @@ -558,8 +558,12 @@ void VoiceClient::event_execute(const std::chrono::system_clock::time_point &sch //TODO: Use statically allocated buffer? auto decoded = this->decode_buffer(audio_codec.codec, replay_head->buffer); - this->output_source->enqueue_samples(decoded->sample_data, decoded->sample_size); - this->set_state(state::playing); + if(!decoded) { + log_warn(category::audio, tr("Failed to decode buffer for client {} (nullptr). Dropping buffer."), this->_client_id, error); + } else { + this->output_source->enqueue_samples(decoded->sample_data, decoded->sample_size); + this->set_state(state::playing); + } } local_last_pid = replay_head->packet_id; diff --git a/native/serverconnection/test/js/main.ts b/native/serverconnection/test/js/main.ts index 7c65fa5..e387862 100644 --- a/native/serverconnection/test/js/main.ts +++ b/native/serverconnection/test/js/main.ts @@ -59,11 +59,9 @@ connection.callback_disconnect = reason => { const do_connect = () => { connection.connect({ timeout: 5000, - remote_host: "51.255.107.151", - remote_port: 31515, - //remote_port: 9987, + remote_port: 9987, //remote_host: "188.40.240.20", /* twerion */ - //remote_host: "localhost", + remote_host: "127.0.0.1", //remote_host: "ts.teaspeak.de", //remote_host: "51.68.181.92", //remote_host: "94.130.236.135", @@ -100,19 +98,11 @@ const do_connect = () => { return_code:91 } ], []); - - /* - consumer.callback_data = buffer => { - console.log("Sending voice data"); - connection.send_voice_data_raw(buffer, consumer.channels, consumer.sample_rate, true); - //stream.write_data_rated(buffer.buffer, true, consumer.sample_rate); - }; - */ } }, identity_key: "MG4DAgeAAgEgAiBC9JsqB1am6vowj2obomMyxm1GLk8qyRoxpBkAdiVYxwIgWksaSk7eyVQovZwPZBuiYHARz/xQD5zBUBK6e63V7hICIQCZ2glHe3kV62iIRKpkV2lzZGZtfBPRMbwIcU9aE1EVsg==", - teamspeak: true /* used here to speed up the handsahke process :) */ + teamspeak: true /* used here to speed up the handshake process :) */ }); connection.callback_voice_data = (buffer, client_id, codec_id, flag_head, packet_id) => { @@ -120,9 +110,19 @@ const do_connect = () => { connection.send_voice_data(buffer, codec_id, flag_head); }; + connection.callback_disconnect = error => { + console.log("Disconnect: %s", error); + }; + connection.callback_command = (command, arguments1, switches) => { console.log("Command: %s: %0", command, arguments1); - } + if(command === "channellistfinished") { + setInterval(() => { + console.log("XXX"); + connection.send_command("channelcreate", [{"channel_name": Math.random() + " :D"}], []); + }, 10); + } + }; connection._voice_connection.register_client(7); };