General file transfer updates:

- Added a file source object
- Simplified the JS to native interface
- Correctly implemented TransferObject interfaces
- Fixed possible transfer hangup
This commit is contained in:
WolverinDEV 2019-10-25 23:56:18 +02:00
parent 382c478f69
commit b14af3a1ac
8 changed files with 292 additions and 118 deletions

View File

@ -86,22 +86,18 @@ declare module "teaclient_connection" {
export namespace ft {
export interface TransferObject {
upload: boolean;
download: boolean;
name: string;
direction: "upload" | "download";
}
export interface FileTransferSource extends TransferObject {
total_size: number;
uploaded_size: number;
}
export interface FileTransferTarget extends TransferObject {
expected_size: number;
current_size: number;
}
export interface HandledTransferObject extends TransferObject {}
export interface NativeFileTransfer {
handle: TransferObject;
@ -121,11 +117,12 @@ declare module "teaclient_connection" {
client_transfer_id: number;
server_transfer_id: number;
object: HandledTransferObject;
object: TransferObject;
}
export function upload_transfer_object_from_buffer(buffer: ArrayBuffer);
export function download_transfer_object_from_buffer(target_buffer: ArrayBuffer);
export function upload_transfer_object_from_file(path: string, name: string) : FileTransferSource;
export function upload_transfer_object_from_buffer(buffer: ArrayBuffer) : FileTransferSource;
export function download_transfer_object_from_buffer(target_buffer: ArrayBuffer) : TransferObject;
export function destroy_connection(connection: NativeFileTransfer);
export function spawn_connection(transfer: TransferOptions) : NativeFileTransfer;

View File

@ -201,6 +201,10 @@ NAN_MODULE_INIT(init) {
Nan::New<v8::String>("download_transfer_object_from_buffer").ToLocalChecked(),
Nan::GetFunction(Nan::New<v8::FunctionTemplate>(TransferJSBufferTarget::create_from_buffer)).ToLocalChecked()
);
Nan::Set(ft_namespace,
Nan::New<v8::String>("upload_transfer_object_from_file").ToLocalChecked(),
Nan::GetFunction(Nan::New<v8::FunctionTemplate>(TransferFileSource::create)).ToLocalChecked()
);
//spawn_file_connection destroy_file_connection
JSTransfer::Init(ft_namespace);

View File

@ -193,7 +193,7 @@ void Transfer::callback_read(short flags) {
if(this->last_target_write.time_since_epoch().count() == 0)
this->last_target_write = system_clock::now();
else if(system_clock::now() - this->last_target_write > seconds(5)) {
this->call_callback_failed("timeout");
this->call_callback_failed("timeout (write)");
this->finalize(false);
return;
}
@ -201,7 +201,7 @@ void Transfer::callback_read(short flags) {
if(this->last_source_read.time_since_epoch().count() == 0)
this->last_source_read = system_clock::now();
else if(system_clock::now() - this->last_source_read > seconds(5)) {
this->call_callback_failed("timeout");
this->call_callback_failed("timeout (read)");
this->finalize(false);
return;
}
@ -215,11 +215,11 @@ void Transfer::callback_read(short flags) {
}
}
}
if(flags & EV_READ) {
if(this->_state == state::CONNECTING) {
log_debug(category::file_transfer, tr("Connected (read event)"));
this->handle_connected();
this->call_callback_connected();
}
int64_t buffer_length = 1024;
@ -269,10 +269,13 @@ void Transfer::callback_read(short flags) {
log_error(category::file_transfer, tr("invalid local write return code! ({})"), state);
}
if(target->stream_index() >= target->expected_length()) {
auto stream_index = target->stream_index();
auto expected_bytes = target->expected_length();
if(stream_index >= expected_bytes) {
this->call_callback_finished(false);
this->finalize(false);
}
this->call_callback_process(stream_index, expected_bytes);
} else {
log_warn(category::file_transfer, tr("Read {} bytes, but we're not in download mode"), buffer_length);
}
@ -292,21 +295,23 @@ void Transfer::callback_write(short flags) {
}
}
bool readd_write = false;
bool readd_write = false, readd_write_for_read = false;
if(flags & EV_WRITE) {
if(this->_state == state::CONNECTING) {
if(this->_state == state::CONNECTING)
this->handle_connected();
this->call_callback_connected();
}
pipes::buffer buffer;
while(true) {
{
lock_guard lock(this->queue_lock);
if(this->write_queue.empty())
auto size = this->write_queue.size();
if(size == 0)
break;
buffer = this->write_queue.front();
this->write_queue.pop_front();
readd_write = size > 1;
}
auto written = send(this->_socket, buffer.data_ptr<char>(), buffer.length(), MSG_DONTWAIT);
@ -348,6 +353,7 @@ void Transfer::callback_write(short flags) {
if(written < buffer.length()) {
lock_guard lock(this->queue_lock);
this->write_queue.push_front(buffer.range(written));
readd_write = true;
}
}
}
@ -361,17 +367,21 @@ void Transfer::callback_write(short flags) {
queue_length = this->write_queue.size();
}
string error;
auto bytes_to_write = source->byte_length() - source->stream_index();
auto total_bytes = source->byte_length();
auto bytes_to_write = total_bytes - source->stream_index();
while(queue_length < 8 && bytes_to_write > 0) {
uint64_t buffer_size = 1400; /* best TCP packet size (equal to the MTU) */
pipes::buffer buffer{buffer_size};
auto read_status = source->read_bytes(error, buffer.data_ptr<uint8_t>(), buffer_size);
this->last_source_read = system_clock::now();
if(read_status != error::success) {
if(read_status == error::would_block)
if(read_status == error::would_block) {
readd_write_for_read = true;
break;
else if(read_status == error::custom) {
} else if(read_status == error::custom) {
this->call_callback_failed(tr("failed to read from source: ") + error);
this->finalize(false);
return;
@ -392,12 +402,12 @@ void Transfer::callback_write(short flags) {
lock_guard lock(this->queue_lock);
this->write_queue.push_back(buffer.range(0, buffer_size));
queue_length = this->write_queue.size();
readd_write = true;
}
bytes_to_write -= buffer_size;
}
this->call_callback_process(total_bytes - bytes_to_write, total_bytes);
if(queue_length == 0) {
if(source->stream_index() == source->byte_length()) {
this->call_callback_finished(false);
@ -405,17 +415,31 @@ void Transfer::callback_write(short flags) {
return;
}
}
readd_write = queue_length > 0;
}
}
/* we only need write for connect */
if(readd_write) {
if(readd_write || readd_write_for_read) {
lock_guard lock(this->event_lock);
if(this->event_write) {
timeval timeout{1, 0};
timeval timeout{};
if(readd_write) {
/* we should be writeable within the next second or we do a keep alive circle */
timeout.tv_sec = 1;
timeout.tv_usec = 0;
} else if(readd_write_for_read) {
/* Schedule a next read attempt of our source */
timeout.tv_sec = 0;
timeout.tv_usec = 50000;
}
event_add(this->event_write, &timeout);
this->handle()->execute_event_loop();
}
} else {
log_debug(category::general, tr("No readd"));
}
}
@ -456,6 +480,7 @@ void Transfer::handle_connected() {
this->handle()->execute_event_loop();
this->_write_message(pipes::buffer_view{this->_options->transfer_key.data(), this->_options->transfer_key.length()});
this->call_callback_connected();
//We dont have to add a timeout to write for prebuffering because its already done by writing this message
}
@ -474,8 +499,14 @@ void Transfer::call_callback_finished(bool aborted) {
this->callback_finished(aborted);
}
void Transfer::call_callback_process() {
//FIXME Implement me
void Transfer::call_callback_process(size_t current, size_t max) {
auto now = system_clock::now();
if(now - milliseconds{500} > this->last_process_call)
this->last_process_call = now;
else
return;
if(this->callback_process)
this->callback_process(current, max);
}
FileTransferManager::FileTransferManager() {}
@ -657,7 +688,7 @@ JSTransfer::JSTransfer(std::shared_ptr<tc::ft::Transfer> transfer) : _transfer(m
this->_transfer->callback_failed = [&](std::string error) { this->call_failed(std::forward<string>(error)); };
this->_transfer->callback_finished = [&](bool f) { this->call_finished(std::forward<bool>(f)); };
this->_transfer->callback_start = [&] { this->call_start(); };
//this->_transfer->callback_process = [&](uint64_t a, uint64_t b) { this->call_progress(a, b); };
this->_transfer->callback_process = [&](uint64_t a, uint64_t b) { this->call_progress.call_cpy(a, b); };
}
JSTransfer::~JSTransfer() {
@ -670,6 +701,7 @@ JSTransfer::~JSTransfer() {
NAN_METHOD(JSTransfer::destory_transfer) {
//TODO!
Nan::ThrowError("Not implemented!");
}
NAN_METHOD(JSTransfer::_start) {

View File

@ -39,17 +39,11 @@ namespace tc {
virtual std::string name() const = 0;
virtual bool initialize(std::string& /* error */) = 0;
virtual void finalize() = 0;
#ifdef NODEJS_API
virtual v8::Local<v8::Value> js_object() = 0;
#endif
private:
};
class TransferSource : public TransferObject {
public:
virtual uint64_t byte_length() const = 0;
virtual uint64_t stream_index() const = 0;
virtual error::value read_bytes(std::string& /* error */, uint8_t* /* buffer */, uint64_t& /* max length/result length */) = 0;
@ -109,10 +103,10 @@ namespace tc {
std::shared_ptr<TransferObject> transfer_object() { return this->_transfer_object; }
const TransferOptions& options() { return *this->_options; }
callback_start_t callback_start = NULL;
callback_finished_t callback_finished = NULL;
callback_failed_t callback_failed = NULL;
callback_process_t callback_process = NULL;
callback_start_t callback_start{nullptr};
callback_finished_t callback_finished{nullptr};
callback_failed_t callback_failed{nullptr};
callback_process_t callback_process{nullptr};
private:
static void _callback_read(evutil_socket_t, short, void*);
static void _callback_write(evutil_socket_t, short, void*);
@ -137,6 +131,7 @@ namespace tc {
timeval alive_check_timeout{1, 0};
timeval write_timeout{1, 0};
/*
* Upload mode:
* Write the buffers left in write_queue, and if the queue length is less then 12 create new buffers.
@ -148,12 +143,13 @@ namespace tc {
/* called within the write/read callback */
void handle_disconnect();
void handle_connected();
void handle_upload_process(); /* called from the IO thread with either timeout or after a write */
void call_callback_connected();
void call_callback_failed(const std::string& /* reason */);
void call_callback_finished(bool /* aborted */);
void call_callback_process();
void call_callback_process(size_t /* current */, size_t /* max */);
std::chrono::system_clock::time_point last_process_call;
};
class FileTransferManager {

View File

@ -2,12 +2,14 @@
#include "../../logger.h"
#include <iostream>
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
using namespace tc;
using namespace tc::ft;
using namespace std;
#ifdef NODEJS_API
TransferJSBufferTarget::TransferJSBufferTarget() {
log_allocate("TransferJSBufferTarget", this);
@ -27,14 +29,6 @@ bool TransferJSBufferTarget::initialize(std::string &error) {
void TransferJSBufferTarget::finalize() { }
v8::Local<v8::Value> TransferJSBufferTarget::js_object() {
assert(v8::Isolate::GetCurrent());
if(this->_js_object)
return this->_js_object->handle();
else
return Nan::Undefined();
}
uint64_t TransferJSBufferTarget::stream_index() const {
return this->_js_buffer_index;
}
@ -71,7 +65,7 @@ NAN_METHOD(TransferJSBufferTarget::create_from_buffer) {
instance->_js_buffer_index = 0;
auto object_wrap = new TransferObjectWrap(instance, instance);
auto object_wrap = new TransferObjectWrap(instance);
auto object = Nan::NewInstance(Nan::New(TransferObjectWrap::constructor()), 0, nullptr).ToLocalChecked();
object_wrap->do_wrap(object);
info.GetReturnValue().Set(object);
@ -93,14 +87,6 @@ bool TransferJSBufferSource::initialize(std::string &string) { return true; }
void TransferJSBufferSource::finalize() { }
v8::Local<v8::Value> TransferJSBufferSource::js_object() {
assert(v8::Isolate::GetCurrent());
if(this->_js_object)
return this->_js_object->handle();
else
return Nan::Undefined();
}
uint64_t TransferJSBufferSource::stream_index() const {
return this->_js_buffer_index;
}
@ -139,7 +125,7 @@ NAN_METHOD(TransferJSBufferSource::create_from_buffer) {
instance->_js_buffer_index = 0;
auto object_wrap = new TransferObjectWrap(instance, instance);
auto object_wrap = new TransferObjectWrap(instance);
auto object = Nan::NewInstance(Nan::New(TransferObjectWrap::constructor()), 0, nullptr).ToLocalChecked();
object_wrap->do_wrap(object);
info.GetReturnValue().Set(object);
@ -149,7 +135,7 @@ NAN_METHOD(TransferJSBufferSource::create_from_buffer) {
NAN_MODULE_INIT(TransferObjectWrap::Init) {
auto klass = Nan::New<v8::FunctionTemplate>(TransferObjectWrap::NewInstance);
klass->SetClassName(Nan::New("TransferObjectWrap").ToLocalChecked());
klass->InstanceTemplate()->SetInternalFieldCount(2);
klass->InstanceTemplate()->SetInternalFieldCount(1);
constructor().Reset(Nan::GetFunction(klass).ToLocalChecked());
}
@ -157,6 +143,139 @@ NAN_MODULE_INIT(TransferObjectWrap::Init) {
NAN_METHOD(TransferObjectWrap::NewInstance) {
if(!info.IsConstructCall())
Nan::ThrowError("invalid invoke!");
Nan::SetInternalFieldPointer(info.This(), 1, (void*) TransferObjectWrap::INTERNAL_CHECK_POINTER);
}
void TransferObjectWrap::do_wrap(v8::Local<v8::Object> object) {
this->Wrap(object);
auto source = dynamic_pointer_cast<TransferSource>(this->target());
auto target = dynamic_pointer_cast<TransferTarget>(this->target());
auto direction = source ? "upload" : "download";
Nan::Set(object,
Nan::New<v8::String>("direction").ToLocalChecked(),
v8::String::NewFromUtf8(Nan::GetCurrentContext()->GetIsolate(), direction).ToLocalChecked()
);
Nan::Set(object,
Nan::New<v8::String>("name").ToLocalChecked(),
v8::String::NewFromUtf8(Nan::GetCurrentContext()->GetIsolate(), this->target()->name().c_str()).ToLocalChecked()
);
if(source) {
Nan::Set(object,
Nan::New<v8::String>("total_size").ToLocalChecked(),
Nan::New<v8::Number>(source->byte_length())
);
}
if(target) {
Nan::Set(object,
Nan::New<v8::String>("expected_size").ToLocalChecked(),
Nan::New<v8::Number>(target->expected_length())
);
}
}
#endif
TransferFileSource::TransferFileSource(std::string path, std::string name) : _path{std::move(path)}, _name{std::move(name)} { }
#ifdef WIN32
#define u8path path
#endif
uint64_t TransferFileSource::byte_length() const {
if(file_size.has_value())
return file_size.value();
auto file = fs::u8path(this->_path) / fs::u8path(this->_name);
error_code error;
auto size = fs::file_size(file,error);
if(error)
size = 0;
return (this->file_size = std::make_optional<size_t>(size)).value();
}
bool TransferFileSource::initialize(std::string &error) {
auto file = fs::u8path(this->_path) / fs::u8path(this->_name);
error_code errc;
if(!fs::exists(file)) {
error = "file not found";
return false;
}
if(errc) {
error = "failed to test for file existence: " + to_string(errc.value()) + "/" + errc.message();
return false;
}
if(!fs::is_regular_file(file, errc)) {
error = "target file isn't a regular file";
return false;
}
if(errc) {
error = "failed to test for file regularity: " + to_string(errc.value()) + "/" + errc.message();
return false;
}
this->file_stream = std::ifstream{file, std::ifstream::in | std::ifstream::binary};
if(!this->file_stream) {
error = "failed to open file";
return false;
}
this->file_stream.seekg(0, std::ifstream::end);
auto length = this->file_stream.tellg();
if(length != this->byte_length()) {
error = "file length missmatch";
return false;
}
this->file_stream.seekg(0, std::ifstream::beg);
this->position = 0;
return true;
}
void TransferFileSource::finalize() {
if(this->file_stream)
this->file_stream.close();
this->position = 0;
}
error::value TransferFileSource::read_bytes(std::string &error, uint8_t *buffer, uint64_t &length) {
auto result = this->file_stream.readsome((char*) buffer, length);
if(result > 0) {
length = result;
this->position += result;
return error::success;
}
if(!this->file_stream) {
if(this->file_stream.eof())
error = "eof reached";
else
error = "io error. failed to read";
} else {
error = "read returned " + to_string(result) + "/" + to_string(length);
}
return error::custom;
}
uint64_t TransferFileSource::stream_index() const {
return this->position;
}
#ifdef NODEJS_API
NAN_METHOD(TransferFileSource::create) {
if(info.Length() != 2 || !info[0]->IsString() || !info[1]->IsString()) {
Nan::ThrowError("invalid argument");
return;
}
auto instance = make_shared<TransferFileSource>(*Nan::Utf8String{info[0]}, *Nan::Utf8String{info[1]});
auto object_wrap = new TransferObjectWrap(instance);
auto object = Nan::NewInstance(Nan::New(TransferObjectWrap::constructor()), 0, nullptr).ToLocalChecked();
object_wrap->do_wrap(object);
info.GetReturnValue().Set(object);
}
#endif

View File

@ -1,29 +1,48 @@
#pragma once
#pragma once
#include <fstream>
#include "FileTransferManager.h"
namespace tc {
namespace ft {
class TransferFileSource : public TransferSource {
public:
TransferFileSource(std::string /* path */, std::string /* name */);
[[nodiscard]] inline std::string file_path() const { return this->_path; }
[[nodiscard]] inline std::string file_name() const { return this->_name; }
std::string name() const override { return "TransferFileSource"; }
bool initialize(std::string &string) override;
void finalize() override;
uint64_t byte_length() const override;
uint64_t stream_index() const override;
error::value read_bytes(std::string &string, uint8_t *uint8, uint64_t &uint64) override;
#ifdef NODEJS_API
class TransferObjectWrap;
class TransferJSObject {
friend class TransferObjectWrap;
protected:
TransferObjectWrap* _js_object;
static NAN_METHOD(create);
#endif
private:
std::string _path;
std::string _name;
uint64_t position{0};
std::ifstream file_stream{};
mutable std::optional<size_t> file_size;
};
#ifdef NODEJS_API
class TransferObjectWrap : public Nan::ObjectWrap {
public:
static NAN_MODULE_INIT(Init);
static NAN_METHOD(NewInstance);
static constexpr uintptr_t INTERNAL_CHECK_POINTER = 0xC0FEBABE;
static inline bool is_wrap(const v8::Local<v8::Value>& value) {
if(value.As<v8::Object>().IsEmpty())
return false;
return Nan::GetInternalFieldPointer(value.As<v8::Object>(), 1) == (void*) INTERNAL_CHECK_POINTER;
return value->InstanceOf(Nan::GetCurrentContext(), Nan::New<v8::Function>(constructor())).FromMaybe(false);
}
static inline Nan::Persistent<v8::Function> & constructor() {
@ -31,50 +50,19 @@ namespace tc {
return my_constructor;
}
explicit TransferObjectWrap(std::shared_ptr<TransferJSObject> target, std::shared_ptr<TransferObject> object) : _target(std::move(target)), _transfer(std::move(object)) {
assert(this->_target);
_target->_js_object = this;
explicit TransferObjectWrap(std::shared_ptr<TransferObject> object) : _transfer(std::move(object)) {
}
virtual ~TransferObjectWrap() {
_target->_js_object = nullptr;
}
~TransferObjectWrap() = default;
void do_wrap(v8::Local<v8::Object> object) { this->Wrap(object); }
void do_wrap(v8::Local<v8::Object> object);
std::shared_ptr<TransferObject> target() { return this->_transfer; }
private:
std::shared_ptr<TransferJSObject> _target;
std::shared_ptr<TransferObject> _transfer;
};
class TransferJSBufferTarget : public TransferTarget, public TransferJSObject {
public:
TransferJSBufferTarget();
virtual ~TransferJSBufferTarget();
std::string name() const override { return "TransferJSBufferTarget"; }
bool initialize(std::string &string) override;
void finalize() override;
v8::Local<v8::Value> js_object() override;
uint64_t stream_index() const override;
uint64_t expected_length() const override { return this->_js_buffer_length; }
error::value write_bytes(std::string &string, uint8_t *uint8, uint64_t uint64) override;
static NAN_METHOD(create_from_buffer);
private:
v8::Global<v8::ArrayBuffer> _js_buffer;
void* _js_buffer_source;
uint64_t _js_buffer_length;
uint64_t _js_buffer_index;
};
class TransferJSBufferSource : public TransferSource, public TransferJSObject {
class TransferJSBufferSource : public TransferSource {
public:
TransferJSBufferSource();
virtual ~TransferJSBufferSource();
@ -84,8 +72,6 @@ namespace tc {
void finalize() override;
v8::Local<v8::Value> js_object() override;
uint64_t stream_index() const override;
uint64_t byte_length() const override;
@ -100,6 +86,30 @@ namespace tc {
uint64_t _js_buffer_length;
uint64_t _js_buffer_index;
};
class TransferJSBufferTarget : public TransferTarget {
public:
TransferJSBufferTarget();
virtual ~TransferJSBufferTarget();
std::string name() const override { return "TransferJSBufferTarget"; }
bool initialize(std::string &string) override;
void finalize() override;
uint64_t stream_index() const override;
uint64_t expected_length() const override { return this->_js_buffer_length; }
error::value write_bytes(std::string &string, uint8_t *uint8, uint64_t uint64) override;
static NAN_METHOD(create_from_buffer);
private:
v8::Global<v8::ArrayBuffer> _js_buffer;
void* _js_buffer_source;
uint64_t _js_buffer_length;
uint64_t _js_buffer_index;
};
#endif
}
}

View File

@ -4,7 +4,11 @@ module.paths.push("../../build/win32_64");
import * as fs from "fs";
import * as net from "net";
const original_require = require;
require = (module => original_require("/home/wolverindev/TeaSpeak-Client/client/native/build/linux_x64/" + module + ".node")) as any;
import * as handle from "teaclient_connection";
require = original_require;
const buffer_size = 24;
const start_server = async () => {
@ -33,12 +37,11 @@ const start_server = async () => {
}
}
}
console.log("[SERVER] Received data: %s", buffer.toString());
//console.log("[SERVER] Received data: %s", buffer.toString());
});
});
};
function str2ab(str) {
var buf = new ArrayBuffer(str.length); // 2 bytes for each char
var bufView = new Uint8Array(buf);
@ -53,33 +56,45 @@ start_server().catch(error => {
}).then(() => {
const target_buffer = new Uint8Array(buffer_size);
const destination = handle.ft.download_transfer_object_from_buffer(target_buffer.buffer);
const source = handle.ft.upload_transfer_object_from_buffer(str2ab("Hello World"));
//const source = handle.ft.upload_transfer_object_from_buffer(str2ab("Hello World"));
//console.log(source);
//const source = handle.ft.upload_transfer_object_from_file(__dirname, "test_upload.txt");
const source = handle.ft.upload_transfer_object_from_file("/home/wolverindev/Downloads", "xxx.iso");
console.log(source);
const upload = true;
const transfer = handle.ft.spawn_connection({
client_transfer_id: 0,
server_transfer_id: 0,
object: destination,
transfer_key: "ft_download_data",
//object: source,
//transfer_key: "ft_upload_data__",
object: upload ? source : destination,
transfer_key: upload ? "ft_upload_data__" : "ft_download_data",
remote_address: "localhost",
remote_port: 30303
});
transfer.callback_failed = message => {
console.log("FT failed: %o", message);
console.log("[FT] failed: %o", message);
};
transfer.callback_finished = success => {
console.log("FT done (%o)", success);
console.log("Buffer: %o", String.fromCharCode.apply(null, target_buffer));
transfer.callback_finished = aborted => {
console.log("[FT] done (Aborted %o)", aborted);
if(!upload)
console.log("[FT] Buffer: %o", String.fromCharCode.apply(null, target_buffer));
//console.log("A: %o", transfer);
};
let last = 0;
transfer.callback_progress = (current, max) => {
const diff = current - last;
last = current;
console.log("[FT] Progress: %d|%d (%d) %dmb/s", current, max, Math.ceil(current / max * 100), Math.ceil(diff / 1024 / 1024));
};
transfer.callback_start = () => {
console.log("FT start");
console.log("[FT] start");
};
transfer.start();

View File

@ -0,0 +1 @@
THis is a file for upload