Some minor improvements

This commit is contained in:
WolverinDEV 2020-04-16 13:12:00 +02:00
parent 90c9fad28b
commit d7fa67b0aa
14 changed files with 115 additions and 92 deletions

View File

@ -24,8 +24,8 @@ function(setup_nodejs)
set(NODEJS_URL "https://atom.io/download/atom-shell")
set(NODEJS_VERSION "v8.0.0")
# set(NODEJS_URL "https://nodejs.org/download/release/")
# set(NODEJS_VERSION "v12.13.0")
set(NODEJS_URL "https://nodejs.org/download/release/")
set(NODEJS_VERSION "v12.13.0")
find_package(NodeJS REQUIRED)

View File

@ -187,8 +187,9 @@ declare module "tc-native/connection" {
}
export interface MarginedFilter {
get_margin_frames() : number;
set_margin_frames(value: number);
/* in seconds */
get_margin_time() : number;
set_margin_time(value: number);
}
export interface VADConsumeFilter extends ConsumeFilter, MarginedFilter {

View File

@ -13,7 +13,7 @@ ThresholdFilter::~ThresholdFilter() {}
bool ThresholdFilter::initialize(std::string &, float val, size_t margin) {
this->_threshold = val;
this->_margin_frames = margin;
this->_margin_samples = margin;
return true;
}
@ -53,7 +53,7 @@ bool ThresholdFilter::process(const void *_buffer) {
auto last_level = this->_current_level;
float smooth;
if(this->_margin_processed_frames == 0) /* we're in release */
if(this->_margin_processed_samples == 0) /* we're in release */
smooth = this->_release_smooth;
else
smooth = this->_attack_smooth;
@ -65,11 +65,11 @@ bool ThresholdFilter::process(const void *_buffer) {
analyze_callback(this->_current_level);
if(this->_current_level >= this->_threshold) {
this->_margin_processed_frames = 0;
this->_margin_processed_samples = 0;
return true;
}
return this->_margin_processed_frames++ < this->_margin_frames;
return (this->_margin_processed_samples += this->_frame_size) < this->_margin_samples;
}

View File

@ -21,8 +21,9 @@ namespace tc {
inline float threshold() { return this->_threshold; }
inline void set_threshold(float value) { this->_threshold = value; }
inline size_t margin_frames() { return this->_margin_frames; }
inline void set_margin_frames(size_t value) { this->_margin_frames = value; }
/* in seconds */
inline float margin_release_time() { return (float) this->_margin_samples / (float) this->_sample_rate; }
inline void set_margin_release_time(float value) { this->_margin_samples = (size_t) ceil((float) this->_sample_rate * value); }
inline void attack_smooth(float value) { this->_attack_smooth = value; }
inline float attack_smooth() { return this->_attack_smooth; }
@ -38,8 +39,8 @@ namespace tc {
float _threshold;
size_t _margin_frames = 0;
size_t _margin_processed_frames = 0;
size_t _margin_samples = 0;
size_t _margin_processed_samples = 0;
};
}
}

View File

@ -44,7 +44,7 @@ bool VadFilter::initialize(std::string &error, size_t mode, size_t margin) {
}
this->_mode = mode;
this->_margin_frames = margin;
this->_margin_samples = margin;
if(this->_channels > 1) {
this->ensure_buffer(this->_frame_size * this->_channels * 4); /* buffer to merge the channels into one channel */
} else {
@ -99,10 +99,10 @@ bool VadFilter::process(const void *buffer) {
auto flag_vad = result == 1;
if(!flag_vad) {
this->_margin_processed_frames++;
return this->_margin_processed_frames <= this->_margin_frames;
this->_margin_processed_samples += this->_frame_size;
return this->_margin_processed_samples <= this->_margin_samples;
} else {
this->_margin_processed_frames = 0;
this->_margin_processed_samples = 0;
}
return flag_vad;
}

View File

@ -15,16 +15,16 @@ namespace tc {
bool initialize(std::string& /* error */, size_t /* mode */, size_t /* margin frames */);
bool process(const void* /* buffer */) override;
inline size_t margin_frames() { return this->_margin_frames; }
inline void set_margin_frames(size_t value) { this->_margin_frames = value; }
inline float margin_release_time() { return (float) this->_margin_samples / (float) this->_sample_rate; }
inline void set_margin_release_time(float value) { this->_margin_samples = (size_t) ceil((float) this->_sample_rate * value); }
inline size_t mode() { return this->_mode; }
private:
Fvad* _vad_handle = nullptr;
size_t _mode = 0;
size_t _margin_frames = 0;
size_t _margin_processed_frames = 0;
size_t _margin_samples = 0;
size_t _margin_processed_samples = 0;
std::mutex _buffer_lock;
void* _buffer = nullptr;

View File

@ -16,8 +16,8 @@ NAN_MODULE_INIT(AudioFilterWrapper::Init) {
Nan::SetPrototypeMethod(klass, "get_name", AudioFilterWrapper::_get_name);
Nan::SetPrototypeMethod(klass, "get_margin_frames", AudioFilterWrapper::_get_margin_frames);
Nan::SetPrototypeMethod(klass, "set_margin_frames", AudioFilterWrapper::_set_margin_frames);
Nan::SetPrototypeMethod(klass, "get_margin_time", AudioFilterWrapper::_get_margin_time);
Nan::SetPrototypeMethod(klass, "set_margin_time", AudioFilterWrapper::_set_margin_time);
Nan::SetPrototypeMethod(klass, "get_level", AudioFilterWrapper::_get_level);
@ -102,7 +102,7 @@ NAN_METHOD(AudioFilterWrapper::_get_level) {
}
NAN_METHOD(AudioFilterWrapper::_get_margin_frames) {
NAN_METHOD(AudioFilterWrapper::_get_margin_time) {
auto handle = ObjectWrap::Unwrap<AudioFilterWrapper>(info.Holder());
if(!handle->_filter) {
Nan::ThrowError("invalid handle");
@ -112,16 +112,16 @@ NAN_METHOD(AudioFilterWrapper::_get_margin_frames) {
auto vad_filter = dynamic_pointer_cast<filter::VadFilter>(handle->_filter);
auto threshold_filter = dynamic_pointer_cast<filter::ThresholdFilter>(handle->_filter);
if(vad_filter) {
info.GetReturnValue().Set((int) vad_filter->margin_frames());
info.GetReturnValue().Set((float) vad_filter->margin_release_time());
} else if(threshold_filter) {
info.GetReturnValue().Set((int) threshold_filter->margin_frames());
info.GetReturnValue().Set((float) threshold_filter->margin_release_time());
} else {
Nan::ThrowError("invalid handle");
return;
}
}
NAN_METHOD(AudioFilterWrapper::_set_margin_frames) {
NAN_METHOD(AudioFilterWrapper::_set_margin_time) {
auto handle = ObjectWrap::Unwrap<AudioFilterWrapper>(info.Holder());
if(!handle->_filter) {
Nan::ThrowError("invalid handle");
@ -136,9 +136,9 @@ NAN_METHOD(AudioFilterWrapper::_set_margin_frames) {
auto vad_filter = dynamic_pointer_cast<filter::VadFilter>(handle->_filter);
auto threshold_filter = dynamic_pointer_cast<filter::ThresholdFilter>(handle->_filter);
if(vad_filter) {
vad_filter->set_margin_frames(info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0));
vad_filter->set_margin_release_time(info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0));
} else if(threshold_filter) {
threshold_filter->set_margin_frames(info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0));
threshold_filter->set_margin_release_time(info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0));
} else {
Nan::ThrowError("invalid handle");
return;

View File

@ -32,8 +32,8 @@ namespace tc {
static NAN_METHOD(_get_name);
/* VAD and Threshold */
static NAN_METHOD(_get_margin_frames);
static NAN_METHOD(_set_margin_frames);
static NAN_METHOD(_get_margin_time);
static NAN_METHOD(_set_margin_time);
/* VAD relevant */
static NAN_METHOD(_get_level);

View File

@ -36,6 +36,8 @@ void ProtocolHandler::reset() {
{ /* initialize pow handler */
this->pow.state = pow_state::COOKIE_SET;
this->pow.retry_count = 0;
this->pow.last_buffer = pipes::buffer{};
this->pow.last_resend = system_clock::time_point{};
this->pow.last_response = system_clock::time_point{};

View File

@ -98,6 +98,7 @@ namespace tc {
uint8_t disconnect_id = 0;
struct {
size_t retry_count{0};
pow_state::value state;
uint64_t client_ts3_build_timestamp = 173265950 /* TS3 */; /* needs to be lower than 173265950 for old stuff, else new protocol */

View File

@ -49,6 +49,14 @@ void ProtocolHandler::handlePacketInit(const std::shared_ptr<ts::protocol::Serve
if(packet_state == pow_state::COMMAND_RESET) {
log_trace(category::connection, tr("[POW] Received reset"));
this->pow.retry_count++;
if(this->pow.retry_count > 8) {
log_trace(category::connection, tr("[POW] Retied puzzle too many times. Aborting connect."));
this->handle->call_connect_result.call(this->handle->errors.register_error(tr("failed to solve connect puzzle")), true);
this->handle->close_connection();
return;
}
this->pow.state = pow_state::COOKIE_SET; /* next expected packet state */
this->pow_send_cookie_get();
return;
@ -130,7 +138,7 @@ void ProtocolHandler::handlePacketInit(const std::shared_ptr<ts::protocol::Serve
auto offset = 4 + 1 + 2 * 64 + 04 + 100;
memset(&response_buffer[offset], 0, 64);
mp_to_unsigned_bin(&result, (u_char*) &response_buffer[offset]);
mp_to_unsigned_bin(&result, (u_char*) &response_buffer[offset + 64 - mp_unsigned_bin_size(&result)]);
memcpy(&response_buffer[301], command.data(), command.size());

View File

@ -60,7 +60,7 @@ bool UDPSocket::initialize() {
this->event_write = event_new(this->io_base, this->file_descriptor, EV_WRITE, &UDPSocket::_callback_write, this);
event_add(this->event_read, nullptr);
this->_io_thread = thread(&UDPSocket::_io_execute, this);
this->_io_thread = thread(&UDPSocket::_io_execute, this, this->io_base);
#ifdef WIN32
//TODO set thread name
#else
@ -71,38 +71,42 @@ bool UDPSocket::initialize() {
}
void UDPSocket::finalize() {
const auto is_event_thread = this_thread::get_id() == this->_io_thread.get_id();
if(this->file_descriptor == 0)
return;
unique_lock lock(this->io_lock);
auto event_read = this->event_read, event_write = this->event_write;
auto io_base = this->io_base;
this->io_base = nullptr;
this->event_read = nullptr;
this->event_write = nullptr;
auto event_read = std::exchange(this->event_read, nullptr);
auto event_write = std::exchange(this->event_write, nullptr);
auto io_base = std::exchange(this->io_base, nullptr);
lock.unlock();
assert(this_thread::get_id() != this->_io_thread.get_id());
if(event_read) event_del_block(event_read);
if(event_write) event_del_block(event_write);
if(io_base) {
timeval seconds{1, 0};
event_base_loopexit(io_base, &seconds);
event_base_loopexit(io_base, nullptr);
}
if(this->_io_thread.joinable())
this->_io_thread.join();
if(is_event_thread) {
if(event_read) event_del_block(event_read);
if(event_write) event_del_block(event_write);
} else {
if(event_read) event_del_noblock(event_read);
if(event_write) event_del_noblock(event_write);
}
if(io_base)
event_base_free(io_base);
event_base_loopexit(io_base, nullptr);
if(is_event_thread) {
event_base_loopexit(io_base, nullptr);
this->_io_thread.detach();
} else {
event_base_loopexit(io_base, nullptr);
if(this->_io_thread.joinable())
this->_io_thread.join();
}
#ifdef WIN32
if(::closesocket(this->file_descriptor) != 0) {
const auto close_result = ::closesocket(this->file_descriptor);
#else
if(::close(this->file_descriptor) != 0) {
const auto close_result = ::close(this->file_descriptor);
#endif
if(close_result != 0) {
if(errno != EBADF)
logger::warn(category::socket, tr("Failed to close file descriptor ({}/{})"), to_string(errno), strerror(errno));
}
@ -117,15 +121,16 @@ void UDPSocket::_callback_read(evutil_socket_t fd, short, void *_ptr_socket) {
((UDPSocket*) _ptr_socket)->callback_read(fd);
}
void UDPSocket::_io_execute(void *_ptr_socket) {
((UDPSocket*) _ptr_socket)->io_execute();
void UDPSocket::_io_execute(void *_ptr_socket, void* _ptr_event_base) {
((UDPSocket*) _ptr_socket)->io_execute(_ptr_event_base);
}
void UDPSocket::io_execute() {
while(this->io_base) {
event_base_loop(this->io_base, EVLOOP_NO_EXIT_ON_EMPTY);
}
void UDPSocket::io_execute(void* ptr_event_base) {
auto base = (event_base*) ptr_event_base;
event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY);
/* this pointer might be dangling here! */
logger::trace(category::socket, tr("Socket IO loop exited"));
event_base_free(base);
}
void UDPSocket::callback_read(evutil_socket_t fd) {
sockaddr source_address{};
@ -135,41 +140,36 @@ void UDPSocket::callback_read(evutil_socket_t fd) {
size_t buffer_length = 1600; /* IPv6 MTU is ~1.5k */
char buffer[1600];
size_t read_count = 0;
while(true) { //TODO: Some kind of timeout
source_address_length = sizeof(sockaddr);
read_length = recvfrom(fd, (char*) buffer, (int) buffer_length, MSG_DONTWAIT, &source_address, &source_address_length);
if(read_length <= 0) {
if(read_length == 0 && read_count > 0)
break;
int error;
source_address_length = sizeof(sockaddr);
read_length = recvfrom(fd, (char*) buffer, (int) buffer_length, MSG_DONTWAIT, &source_address, &source_address_length);
if(read_length <= 0) {
int error;
#ifdef WIN32
error = WSAGetLastError();
if(error == WSAEWOULDBLOCK)
break;
error = WSAGetLastError();
if(error == WSAEWOULDBLOCK)
return;
#else
error = errno;
if(errno == EAGAIN)
break;
error = errno;
if(errno == EAGAIN)
return;
#endif
logger::warn(category::socket, tr("Failed to receive data: {}"), error);
if(auto callback{this->on_fatal_error}; callback)
callback(1, error);
logger::warn(category::socket, tr("Failed to receive data: {}"), error);
{
std::lock_guard lock{this->io_lock};
if(this->event_read)
event_del_noblock(this->event_read);
}
{
std::lock_guard lock{this->io_lock};
if(this->event_read)
event_del_noblock(this->event_read);
}
break; /* this should never happen! */
}
//logger::trace(category::socket, tr("Read {} bytes"), read_length);
read_count++;
if(this->on_data)
this->on_data(pipes::buffer_view{buffer, (size_t) read_length});
if(auto callback{this->on_fatal_error}; callback)
callback(1, error);
/* this pointer might be dangling now because we got deleted while handling this data */
return;
}
//logger::trace(category::socket, tr("Read {} bytes"), read_length);
if(this->on_data)
this->on_data(pipes::buffer_view{buffer, (size_t) read_length});
/* this pointer might be dangling now because we got deleted while handling this data */
}
void UDPSocket::callback_write(evutil_socket_t fd) {

View File

@ -34,11 +34,11 @@ namespace tc::connection {
const std::thread& io_thread() { return this->_io_thread; }
private:
static void _io_execute(void *_ptr_socket);
static void _io_execute(void *_ptr_socket, void *_ptr_event_base);
static void _callback_read(evutil_socket_t, short, void*);
static void _callback_write(evutil_socket_t, short, void*);
void io_execute();
void io_execute(void*);
void callback_read(evutil_socket_t);
void callback_write(evutil_socket_t);

View File

@ -56,7 +56,7 @@ connection.callback_disconnect = reason => {
console.log("Got disconnect: %s", reason);
};
const do_connect = () => {
const do_connect = (connection) => {
connection.connect({
timeout: 5000,
remote_port: 9987,
@ -150,7 +150,16 @@ const do_connect = () => {
connection._voice_connection.register_client(7);
};
do_connect();
do_connect(connection);
let _connections = [];
let i = 0;
let ii = setInterval(() => {
if(i++ > 35)
clearInterval(ii);
const c = handle.spawn_server_connection();
_connections.push(c);
do_connect(c);
}, 500);
connection.callback_voice_data = (buffer, client_id, codec_id, flag_head, packet_id) => {
console.log("Received voice of length %d from client %d in codec %d (Head: %o | ID: %d)", buffer.byteLength, client_id, codec_id, flag_head, packet_id);
@ -167,6 +176,7 @@ setInterval(() => {
/* keep the object alive */
setTimeout(() => {
connection.connected();
_connections.forEach(e => e.current_ping());
}, 1000);
connection_list.push(connection);