From 896bba7c4b18eee75ab7f410ef9100b0334cea7b Mon Sep 17 00:00:00 2001 From: zefie Date: Thu, 12 Aug 2021 00:42:54 -0400 Subject: [PATCH] Revert "undo streaming data out" This reverts commit d37c2f60ecc37122216ad81e5c3b11fa8f655a82. --- zefie_wtvp_minisrv/app.js | 40 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/zefie_wtvp_minisrv/app.js b/zefie_wtvp_minisrv/app.js index 6046689c..7a84ef55 100644 --- a/zefie_wtvp_minisrv/app.js +++ b/zefie_wtvp_minisrv/app.js @@ -721,21 +721,55 @@ async function sendToClient(socket, headers_obj, data) { var toClient = null; if (typeof data == 'string') { toClient = headers + end_of_line + data; - socket.write(Buffer.from(toClient)); + sendToSocket(socket, Buffer.from(toClient)); } else if (typeof data == 'object') { if (minisrv_config.config.debug_flags.quiet) var verbosity_mod = (headers_obj["wtv-encrypted"] == 'true') ? " encrypted response" : ""; if (socket_sessions[socket.id].secure_headers == true) { // encrypt headers if (minisrv_config.config.debug_flags.quiet) verbosity_mod += " with encrypted headers"; var enc_headers = socket_sessions[socket.id].wtvsec.Encrypt(1, headers + end_of_line); - socket.write(new Buffer.from(concatArrayBuffer(enc_headers, data))); + sendToSocket(socket, new Buffer.from(concatArrayBuffer(enc_headers, data))); } else { - socket.write(new Buffer.from(concatArrayBuffer(Buffer.from(headers + end_of_line), data))); + sendToSocket(socket, new Buffer.from(concatArrayBuffer(Buffer.from(headers + end_of_line), data))); } if (minisrv_config.config.debug_flags.quiet) console.log(" * Sent" + verbosity_mod + " " + headers_obj.http_response + " to client (Content-Type:", headers_obj['Content-Type'], "~", headers_obj['Content-length'], "bytes)"); } } +async function sendToSocket(socket, data, chunk_offset = 0) { + // buffer size = lesser of minisrv_config.config.chunk_size or size remaining + var chunk_size = 16384; + var can_write = true; + var data_left = 0; + while (data_left != data.byteLength && can_write) { + var data_left = (data.byteLength - chunk_offset); + if (data_left === 0) break; + + var buffer_size = (data_left >= chunk_size) ? chunk_size : data_left; + var chunk = new Buffer.alloc(buffer_size); + chunk_offset += data.copy(chunk, 0, chunk_offset, (chunk_offset + buffer_size)); + can_write = socket.write(chunk); + if (!can_write) { + socket.once('drain', function () { + sendToSocket(socket, data, socket.bytesWritten); + }); + break; + } + } + if (data_left == data.byteLength) { + if (socket_sessions[socket.id].expecting_post_data) delete socket_sessions[socket.id].expecting_post_data; + if (socket_sessions[socket.id].header_buffer) delete socket_sessions[socket.id].header_buffer; + if (socket_sessions[socket.id].secure_buffer) delete socket_sessions[socket.id].secure_buffer; + if (socket_sessions[socket.id].buffer) delete socket_sessions[socket.id].buffer; + if (socket_sessions[socket.id].headers) delete socket_sessions[socket.id].headers; + if (socket_sessions[socket.id].post_data) delete socket_sessions[socket.id].post_data; + if (socket_sessions[socket.id].post_data_length) delete socket_sessions[socket.id].post_data_length; + if (socket_sessions[socket.id].post_data_percents_shown) delete socket_sessions[socket.id].post_data_percents_shown; + socket.setTimeout(minisrv_config.config.socket_timeout * 1000); + if (socket_sessions[socket.id].close_me) socket.end(); + if (socket_sessions[socket.id].destroy_me) socket.destroy(); + } +} function concatArrayBuffer(buffer1, buffer2) { var tmp = new Uint8Array(buffer1.byteLength + buffer2.byteLength);