streaming output data with drain detection support

- other stuff x.x
This commit is contained in:
zefie
2021-08-11 19:48:27 -04:00
parent fecbd032da
commit 0f0cc623c0
4 changed files with 100 additions and 35 deletions

View File

@@ -57,7 +57,7 @@ if (request_headers['wtv-request-type'] == 'download') {
break; break;
case "GET": case "GET":
wtvdl.get(update_list[k].file.replace(diskmap_data.base, ""), update_list[k].file, service_name + ":/" + update_list[k].location, diskmap_group_data, update_list[k].checksum) wtvdl.get(update_list[k].file.replace(diskmap_data.base, ""), update_list[k].file, service_name + ":/" + update_list[k].location, diskmap_group_data, update_list[k].checksum, update_list[k].uncompressed_size || null, update_list[k].original_filename)
break; break;
} }
}); });
@@ -188,6 +188,8 @@ if (request_headers['wtv-request-type'] == 'download') {
if (!fs.existsSync(post_match_file)) post_match_file = null; if (!fs.existsSync(post_match_file)) post_match_file = null;
}); });
var post_match_file_lstat = fs.lstatSync(post_match_file); var post_match_file_lstat = fs.lstatSync(post_match_file);
var post_match_file_data = new Buffer.from(fs.readFileSync(post_match_file, { var post_match_file_data = new Buffer.from(fs.readFileSync(post_match_file, {
encoding: null, encoding: null,
@@ -196,11 +198,23 @@ if (request_headers['wtv-request-type'] == 'download') {
diskmap_group_data.files[k].base = diskmap_group_data.base; diskmap_group_data.files[k].base = diskmap_group_data.base;
diskmap_group_data.files[k].last_modified = (new Date(new Date(post_match_file_lstat.mtime).toUTCString()) / 1000); diskmap_group_data.files[k].last_modified = (new Date(new Date(post_match_file_lstat.mtime).toUTCString()) / 1000);
diskmap_group_data.files[k].content_length = post_match_file_lstat.size; diskmap_group_data.files[k].content_length = post_match_file_lstat.size;
diskmap_group_data.files[k].checksum = CryptoJS.MD5(CryptoJS.lib.WordArray.create(post_match_file_data)).toString(CryptoJS.enc.Hex).toLowerCase();
diskmap_group_data.files[k].action = (diskmap_group_data.files[k].action) ? diskmap_group_data.files[k].action.toUpperCase() : "GET"; diskmap_group_data.files[k].action = (diskmap_group_data.files[k].action) ? diskmap_group_data.files[k].action.toUpperCase() : "GET";
if (wtvshared.getFileExt(post_match_file).toLowerCase() == "gz") {
// we need the checksum of the uncompressed data
var gunzipped = zlib.gunzipSync(post_match_file_data);
diskmap_group_data.files[k].checksum = CryptoJS.MD5(CryptoJS.lib.WordArray.create(gunzipped)).toString(CryptoJS.enc.Hex).toLowerCase();
var gzip_fn_end = post_match_file_data.indexOf("\0", 10);
if (!diskmap_group_data.files[k].dont_extract_filename) {
diskmap_group_data.files[k].original_filename = post_match_file_data.toString('utf8', 10, gzip_fn_end);
}
//diskmap_group_data.files[k].uncompressed_size = gunzipped.byteLength;
gunzipped = null;
} else {
diskmap_group_data.files[k].checksum = CryptoJS.MD5(CryptoJS.lib.WordArray.create(post_match_file_data)).toString(CryptoJS.enc.Hex).toLowerCase();
}
if (parseInt(diskmap_group_data.files[k].last_modified) > newest_file_epoch) newest_file_epoch = parseInt(diskmap_group_data.files[k].last_modified); if (parseInt(diskmap_group_data.files[k].last_modified) > newest_file_epoch) newest_file_epoch = parseInt(diskmap_group_data.files[k].last_modified);
//if (!diskmap_group_data.files[k].display) diskmap_group_data.files[k].display = diskmap_group_data.display;
diskmap_group_data.files[k].invalid = true; diskmap_group_data.files[k].invalid = true;
wtv_download_list.push(diskmap_group_data.files[k]); wtv_download_list.push(diskmap_group_data.files[k]);
@@ -271,6 +285,6 @@ if (request_headers['wtv-request-type'] == 'download') {
} else if (request_headers.query.group && request_headers.query.diskmap) { } else if (request_headers.query.group && request_headers.query.diskmap) {
var message = request_headers.query.message || "Retrieving files..."; var message = request_headers.query.message || "Retrieving files...";
var main_message = request_headers.query.main_message || "Your receiver is downloading files."; var main_message = request_headers.query.main_message || "Your receiver is downloading files.";
headers = "200 OK\nContent-Type: text/html"; headers = "200 OK\nwtv-connection-close: close\nConnection: close\nContent-Type: text/html";
data = wtvdl.getSyncPage(message, request_headers.query.group, request_headers.query.diskmap, main_message, message, force_update) data = wtvdl.getSyncPage(message, request_headers.query.group, request_headers.query.diskmap, main_message, message, force_update)
} }

View File

@@ -146,16 +146,21 @@ class WTVDownloadList {
* @param {string} checksum md5sum of the file * @param {string} checksum md5sum of the file
* @param {string} file_permission File permissions * @param {string} file_permission File permissions
*/ */
get(file, path, source, group, checksum = null, uncompressed_size = null, file_permission = 'r') { get(file, path, source, group, checksum = null, uncompressed_size = null, original_filename = null, file_permission = 'r') {
file = this.wtvshared.stripGzipFromPath(file); if (original_filename) this.download_list += "GET " + original_filename + "\n";
this.download_list += "GET " + file + "\n"; else this.download_list += "GET " + file + "\n";
this.download_list += "group: " + group + "-UPDATE\n"; this.download_list += "group: " + group + "-UPDATE\n";
this.download_list += "location: " + source + "\n"; this.download_list += "location: " + source + "\n";
this.download_list += "file-permission: " + file_permission + "\n"; this.download_list += "file-permission: " + file_permission + "\n";
if (checksum != null) this.download_list += "wtv-checksum: " + checksum + "\n"; if (checksum != null) this.download_list += "wtv-checksum: " + checksum + "\n";
if (uncompressed_size != null) this.download_list += "wtv-uncompressed-filesize: " + uncompressed_size + "\n"; if (uncompressed_size != null) this.download_list += "wtv-uncompressed-filesize: " + uncompressed_size + "\n";
this.download_list += "service-source-location: /webtv/content/" + source.substr(source.indexOf('-') + 1, source.indexOf(':/') - source.indexOf('-') - 1) + "d/" + source.substr(source.indexOf(':/') + 2) + "\n"; this.download_list += "service-source-location: /webtv/content/" + source.substr(source.indexOf('-') + 1, source.indexOf(':/') - source.indexOf('-') - 1) + "d/" + source.substr(source.indexOf(':/') + 2) + "\n";
path = this.wtvshared.stripGzipFromPath(path); if (original_filename) {
file = file.split('/');
file = file[file.length - 1];
path = path.replace(file, original_filename);
}
this.download_list += "client-dest-location: " + path + "\n\n"; this.download_list += "client-dest-location: " + path + "\n\n";
} }
@@ -241,7 +246,7 @@ class WTVDownloadList {
if (fail_url === null) fail_url = new this.clientShowAlert({ if (fail_url === null) fail_url = new this.clientShowAlert({
'image': this.minisrv_config.config.service_logo, 'image': this.minisrv_config.config.service_logo,
'message': "Download failed...", 'message': "Download failed...",
'buttonlabel1': "Okay...", 'buttonlabel1': "Fuck!",
'buttonaction1': "client:goback", 'buttonaction1': "client:goback",
'noback': true, 'noback': true,
}).getURL(); }).getURL();

View File

@@ -94,6 +94,16 @@ class WTVShared {
return path; return path;
} }
/**
* Returns a percentage
* @param {number} partialValue
* @param {number} totalValue
* @returns {number} percentage
*/
getPercentage = function (partialValue, totalValue) {
return Math.floor((100 * partialValue) / totalValue);
}
/** /**
* If the file ends with .gz, remove it * If the file ends with .gz, remove it
* @param {string} path * @param {string} path

View File

@@ -584,26 +584,26 @@ async function sendToClient(socket, headers_obj, data) {
} }
} }
// if box can do compression, see if its worth enabling if (content_length > 0) {
// small files actually get larger, so don't compress them if (socket_sessions[socket.id].wtv_request_type == "download") {
var compression_type = 0; if (headers_obj['Content-Type'] != "wtv/download-list") {
if (content_length >= 256) compression_type = wtvmime.shouldWeCompress(ssid_sessions[socket.ssid], headers_obj); if (wtvshared.getFileExt(socket_sessions[socket.id].request_headers.request_url).toLowerCase() == "gz") {
// we need the checksum of the uncompressed data
// disk service hack before further processing :)
if (socket_sessions[socket.id].wtv_request_type == "download" && content_length > 0 && headers_obj['Content-Type'] != "wtv/download-list") {
if (minisrv_config.config.debug_flags.debug) console.log(" * Calculating uncompressed size and checksum...");
if (headers_obj['Content-Type'] == "application/gzip") {
var gunzipped = zlib.gunzipSync(data); var gunzipped = zlib.gunzipSync(data);
headers_obj['wtv-checksum'] = CryptoJS.MD5(CryptoJS.lib.WordArray.create(gunzipped)).toString(CryptoJS.enc.Hex).toLowerCase(); headers_obj['wtv-checksum'] = CryptoJS.MD5(CryptoJS.lib.WordArray.create(gunzipped)).toString(CryptoJS.enc.Hex).toLowerCase();
headers_obj['wtv-uncompressed-filesize'] = gunzipped.byteLength; headers_obj['wtv-uncompressed-size'] = gunzipped.byteLength;
headers_obj['Content-Type'] = wtvmime.getSimpleContentType(wtvshared.stripGzipFromPath(socket_sessions[socket.id].request_headers.request_url));
gunzipped = null; gunzipped = null;
} else { } else {
headers_obj['wtv-checksum'] = CryptoJS.MD5(CryptoJS.lib.WordArray.create(data)).toString(CryptoJS.enc.Hex).toLowerCase(); headers_obj['wtv-checksum'] = CryptoJS.MD5(CryptoJS.lib.WordArray.create(data)).toString(CryptoJS.enc.Hex).toLowerCase();
} }
} }
delete socket_sessions[socket.id].wtv_request_type; }
delete socket_sessions[socket.id].request_headers; }
// if box can do compression, see if its worth enabling
// small files actually get larger, so don't compress them
var compression_type = 0;
if (content_length >= 256) compression_type = wtvmime.shouldWeCompress(ssid_sessions[socket.ssid], headers_obj);
// compress if needed // compress if needed
if (compression_type > 0 && content_length > 0 && headers_obj['http_response'].substring(0, 3) == "200") { if (compression_type > 0 && content_length > 0 && headers_obj['http_response'].substring(0, 3) == "200") {
@@ -641,7 +641,7 @@ async function sendToClient(socket, headers_obj, data) {
} }
// encrypt if needed // encrypt if needed
if (socket_sessions[socket.id].secure == true) { if (socket_sessions[socket.id].secure == true && !socket_sessions[socket.id].do_not_encrypt) {
headers_obj["wtv-encrypted"] = 'true'; headers_obj["wtv-encrypted"] = 'true';
headers_obj = moveObjectElement('wtv-encrypted', 'Connection', headers_obj); headers_obj = moveObjectElement('wtv-encrypted', 'Connection', headers_obj);
if (content_length > 0 && socket_sessions[socket.id].wtvsec) { if (content_length > 0 && socket_sessions[socket.id].wtvsec) {
@@ -651,6 +651,11 @@ async function sendToClient(socket, headers_obj, data) {
} }
} }
if (socket_sessions[socket.id].do_not_encrypt) {
if (headers_obj["wtv-encrypted"]) delete headers_obj["wtv-encrypted"];
if (headers_obj["secure"]) delete headers_obj["secure"];
}
// calculate content length // calculate content length
// make sure we are using our Content-length and not one set in a script. // make sure we are using our Content-length and not one set in a script.
if (headers_obj["Content-Length"]) delete headers_obj["Content-Length"]; if (headers_obj["Content-Length"]) delete headers_obj["Content-Length"];
@@ -676,6 +681,21 @@ async function sendToClient(socket, headers_obj, data) {
headers_obj['http_response'] = "HTTP/1.0 " + headers_obj['http_response']; headers_obj['http_response'] = "HTTP/1.0 " + headers_obj['http_response'];
} }
/* // wtv-request-type download wants minimal headers?
if (data.byteLength > 0) {
if (socket_sessions[socket.id].wtv_request_type == "download") {
if (headers_obj['Content-Type'] != "wtv/download-list") {
// minimalize headers
var new_headers = { "http_response": headers_obj['http_response'].split(" ")[0] + " " }
if (headers_obj['wtv-encrypted']) new_headers['wtv-encrypted'] = headers_obj['wtv-encrypted'];
new_headers["content-type"] = headers_obj['Content-Type'];
new_headers["content-length"] = headers_obj['Content-length'];
headers_obj = new_headers;
}
}
}
*/
// header object to string // header object to string
if (minisrv_config.config.debug_flags.show_headers) console.log(" * Outgoing headers on socket ID", socket.id, (await wtvshared.filterSSID(headers_obj))); if (minisrv_config.config.debug_flags.show_headers) console.log(" * Outgoing headers on socket ID", socket.id, (await wtvshared.filterSSID(headers_obj)));
Object.keys(headers_obj).forEach(function (k) { Object.keys(headers_obj).forEach(function (k) {
@@ -696,16 +716,16 @@ async function sendToClient(socket, headers_obj, data) {
var toClient = null; var toClient = null;
if (typeof data == 'string') { if (typeof data == 'string') {
toClient = headers + end_of_line + data; toClient = headers + end_of_line + data;
socket.write(toClient); sendToSocket(socket, Buffer.from(toClient));
} else if (typeof data == 'object') { } else if (typeof data == 'object') {
if (minisrv_config.config.debug_flags.quiet) var verbosity_mod = (headers_obj["wtv-encrypted"] == 'true') ? " encrypted response" : ""; if (minisrv_config.config.debug_flags.quiet) var verbosity_mod = (headers_obj["wtv-encrypted"] == 'true') ? " encrypted response" : "";
if (socket_sessions[socket.id].secure_headers == true) { if (socket_sessions[socket.id].secure_headers == true) {
// encrypt headers // encrypt headers
if (minisrv_config.config.debug_flags.quiet) verbosity_mod += " with encrypted headers"; if (minisrv_config.config.debug_flags.quiet) verbosity_mod += " with encrypted headers";
var enc_headers = socket_sessions[socket.id].wtvsec.Encrypt(1, headers + end_of_line); var enc_headers = socket_sessions[socket.id].wtvsec.Encrypt(1, headers + end_of_line);
socket.write(new Uint8Array(concatArrayBuffer(enc_headers, data))); sendToSocket(socket, new Buffer.from(concatArrayBuffer(enc_headers, data)));
} else { } else {
socket.write(new Uint8Array(concatArrayBuffer(Buffer.from(headers + end_of_line), data))); sendToSocket(socket, new Buffer.from(concatArrayBuffer(Buffer.from(headers + end_of_line), data)));
} }
if (minisrv_config.config.debug_flags.quiet) console.log(" * Sent" + verbosity_mod + " " + headers_obj.http_response + " to client (Content-Type:", headers_obj['Content-Type'], "~", headers_obj['Content-length'], "bytes)"); if (minisrv_config.config.debug_flags.quiet) console.log(" * Sent" + verbosity_mod + " " + headers_obj.http_response + " to client (Content-Type:", headers_obj['Content-Type'], "~", headers_obj['Content-length'], "bytes)");
} }
@@ -727,6 +747,25 @@ async function sendToClient(socket, headers_obj, data) {
} }
} }
async function sendToSocket(socket, data, chunk_offset = 0) {
socket.setNoDelay(true);
// buffer size = lesser of minisrv_config.config.chunk_size or size remaining
var chunk_size = 16384;
var buffer_size = ((data.byteLength - chunk_offset) >= chunk_size) ? chunk_size : (data.byteLength - chunk_offset);
var chunk = new Buffer.alloc(buffer_size);
data.copy(chunk, 0, chunk_offset, (chunk_offset + buffer_size));
if (!socket.write(chunk)) {
socket.once('drain', function () {
sendToSocket(socket, data, socket.bytesWritten);
});
} else {
if (socket.bytesWritten == data.byteLength) socket.end();
else setTimeout(function () {
sendToSocket(socket, data, socket.bytesWritten);
}, 10);
}
}
function concatArrayBuffer(buffer1, buffer2) { function concatArrayBuffer(buffer1, buffer2) {
var tmp = new Uint8Array(buffer1.byteLength + buffer2.byteLength); var tmp = new Uint8Array(buffer1.byteLength + buffer2.byteLength);
tmp.set(new Uint8Array(buffer1), 0); tmp.set(new Uint8Array(buffer1), 0);
@@ -1136,10 +1175,7 @@ async function processRequest(socket, data_hex, skipSecure = false, encryptedReq
console.log(" * ", Math.floor(new Date().getTime() / 1000), "Receiving", post_string, "data on", socket.id, "[", socket_sessions[socket.id].post_data.length / 2, "of", socket_sessions[socket.id].post_data_length, "bytes ]"); console.log(" * ", Math.floor(new Date().getTime() / 1000), "Receiving", post_string, "data on", socket.id, "[", socket_sessions[socket.id].post_data.length / 2, "of", socket_sessions[socket.id].post_data_length, "bytes ]");
} else { } else {
// calculate and display percentage of data received // calculate and display percentage of data received
var getPercentage = function (partialValue, totalValue) { var postPercent = wtvshared.getPercentage(socket_sessions[socket.id].post_data.length, (socket_sessions[socket.id].post_data_length * 2));
return Math.floor((100 * partialValue) / totalValue);
}
var postPercent = getPercentage(socket_sessions[socket.id].post_data.length, (socket_sessions[socket.id].post_data_length * 2));
if (minisrv_config.config.post_percentages) { if (minisrv_config.config.post_percentages) {
if (minisrv_config.config.post_percentages.includes(postPercent)) { if (minisrv_config.config.post_percentages.includes(postPercent)) {
if (!socket_sessions[socket.id].post_data_percents_shown) socket_sessions[socket.id].post_data_percents_shown = new Array(); if (!socket_sessions[socket.id].post_data_percents_shown) socket_sessions[socket.id].post_data_percents_shown = new Array();