From b294771061dea4768410f6a3aac951a5452b99b6 Mon Sep 17 00:00:00 2001 From: zefie Date: Tue, 28 Apr 2026 23:11:06 -0400 Subject: [PATCH] Allow throttling sendToClient() --- zefie_wtvp_minisrv/app.js | 114 +++++++++++++++++++++++++------------- 1 file changed, 74 insertions(+), 40 deletions(-) diff --git a/zefie_wtvp_minisrv/app.js b/zefie_wtvp_minisrv/app.js index 95ea9baa..027cb8b5 100644 --- a/zefie_wtvp_minisrv/app.js +++ b/zefie_wtvp_minisrv/app.js @@ -1256,7 +1256,7 @@ minisrv-no-mail-count: true`; } } -async function sendToClient(socket, headers_obj, data = null) { +async function sendToClient(socket, headers_obj, data = null, throttle = 0) { let headers = ""; let content_length = 0; const eol = "\n"; @@ -1614,64 +1614,98 @@ async function sendToClient(socket, headers_obj, data = null) { let toClient = null; if (typeof data === 'string') { toClient = headers + eol + data; - sendToSocket(socket, Buffer.from(toClient)); + sendToSocket(socket, Buffer.from(toClient), throttle); } else if (typeof data === 'object') { let verbosity_mod = (headers_obj["wtv-encrypted"] === 'true') ? " encrypted response" : ""; if (socket_sessions[socket.id].secure_headers === true) { // encrypt headers if (minisrv_config.config.debug_flags.quiet) verbosity_mod += " with encrypted headers"; const enc_headers = socket_sessions[socket.id].wtvsec.Encrypt(1, headers + eol); - sendToSocket(socket, new Buffer.from(concatArrayBuffer(enc_headers, data))); + sendToSocket(socket, new Buffer.from(concatArrayBuffer(enc_headers, data)), throttle); } else { - sendToSocket(socket, new Buffer.from(concatArrayBuffer(Buffer.from(headers + eol), data))); + sendToSocket(socket, new Buffer.from(concatArrayBuffer(Buffer.from(headers + eol), data)), throttle); } if (minisrv_config.config.debug_flags.quiet) console.debug(" * Sent" + verbosity_mod + " " + headers_obj.Status + " to client (Content-Type:", headers_obj['Content-type'], "~", headers_obj['Content-length'], "bytes)"); } } } -async function sendToSocket(socket, data) { +async function sendToSocket(socket, data, throttle = 0) { const chunk_size = 16384; - let can_write = true; - let close_socket = false; - let expected_data_out = 0; - while ((socket.bytesWritten === 0 || socket.bytesWritten !== expected_data_out) && can_write) { - if (expected_data_out === 0) expected_data_out = data.byteLength + (socket_sessions[socket.id].socket_total_written || 0); - if (socket.bytesWritten === expected_data_out) break; + if (!throttle || throttle <= 0) { + let can_write = true; + let close_socket = false; + let expected_data_out = 0; + while ((socket.bytesWritten === 0 || socket.bytesWritten !== expected_data_out) && can_write) { + if (expected_data_out === 0) expected_data_out = data.byteLength + (socket_sessions[socket.id].socket_total_written || 0); + if (socket.bytesWritten === expected_data_out) break; - const data_left = (expected_data_out - socket.bytesWritten); - // buffer size = lesser of chunk_size or size remaining - const buffer_size = (data_left >= chunk_size) ? chunk_size : data_left; - if (buffer_size < 0) { - socket.destroy(); - close_socket = true; - break; + const data_left = (expected_data_out - socket.bytesWritten); + // buffer size = lesser of chunk_size or size remaining + const buffer_size = (data_left >= chunk_size) ? chunk_size : data_left; + if (buffer_size < 0) { + socket.destroy(); + close_socket = true; + break; + } + const offset = (data.byteLength - data_left); + const chunk = new Buffer.alloc(buffer_size); + data.copy(chunk, 0, offset, (offset + buffer_size)); + can_write = socket.write(chunk); + if (!can_write) { + socket.once('drain', function () { + sendToSocket(socket, data, throttle); + }); + break; + } } - const offset = (data.byteLength - data_left); - const chunk = new Buffer.alloc(buffer_size); - data.copy(chunk, 0, offset, (offset + buffer_size)); - can_write = socket.write(chunk); + if (socket.bytesWritten === expected_data_out || close_socket) { + socket_sessions[socket.id].socket_total_written = socket.bytesWritten; + if (socket_sessions[socket.id].expecting_post_data) delete socket_sessions[socket.id].expecting_post_data; + if (socket_sessions[socket.id].header_buffer) delete socket_sessions[socket.id].header_buffer; + if (socket_sessions[socket.id].secure_buffer) delete socket_sessions[socket.id].secure_buffer; + if (socket_sessions[socket.id].buffer) delete socket_sessions[socket.id].buffer; + if (socket_sessions[socket.id].headers) delete socket_sessions[socket.id].headers; + if (socket_sessions[socket.id].post_data) delete socket_sessions[socket.id].post_data; + if (socket_sessions[socket.id].post_data_length) delete socket_sessions[socket.id].post_data_length; + if (socket_sessions[socket.id].post_data_percents_shown) delete socket_sessions[socket.id].post_data_percents_shown; + socket.setTimeout(minisrv_config.config.socket_timeout * 1000); + if (socket_sessions[socket.id] && socket_sessions[socket.id].close_me) socket.end(); + if (socket_sessions[socket.id].destroy_me) socket.destroy(); + } + return; + } + + let offset = 0; + while (offset < data.byteLength) { + const buffer_size = Math.min(chunk_size, data.byteLength - offset); + const chunk = Buffer.alloc(buffer_size); + data.copy(chunk, 0, offset, offset + buffer_size); + offset += buffer_size; + + const can_write = socket.write(chunk); if (!can_write) { - socket.once('drain', function () { - sendToSocket(socket, data); - }); - break; + await new Promise(resolve => socket.once('drain', resolve)); + } + + if (offset < data.byteLength) { + const delay_ms = Math.max(1, Math.round((buffer_size * 8) / throttle * 1000)); + await new Promise(resolve => setTimeout(resolve, delay_ms)); } } - if (socket.bytesWritten === expected_data_out || close_socket) { - socket_sessions[socket.id].socket_total_written = socket.bytesWritten; - if (socket_sessions[socket.id].expecting_post_data) delete socket_sessions[socket.id].expecting_post_data; - if (socket_sessions[socket.id].header_buffer) delete socket_sessions[socket.id].header_buffer; - if (socket_sessions[socket.id].secure_buffer) delete socket_sessions[socket.id].secure_buffer; - if (socket_sessions[socket.id].buffer) delete socket_sessions[socket.id].buffer; - if (socket_sessions[socket.id].headers) delete socket_sessions[socket.id].headers; - if (socket_sessions[socket.id].post_data) delete socket_sessions[socket.id].post_data; - if (socket_sessions[socket.id].post_data_length) delete socket_sessions[socket.id].post_data_length; - if (socket_sessions[socket.id].post_data_percents_shown) delete socket_sessions[socket.id].post_data_percents_shown; - socket.setTimeout(minisrv_config.config.socket_timeout * 1000); - if (socket_sessions[socket.id] && socket_sessions[socket.id].close_me) socket.end(); - if (socket_sessions[socket.id].destroy_me) socket.destroy(); - } + + socket_sessions[socket.id].socket_total_written = socket.bytesWritten; + if (socket_sessions[socket.id].expecting_post_data) delete socket_sessions[socket.id].expecting_post_data; + if (socket_sessions[socket.id].header_buffer) delete socket_sessions[socket.id].header_buffer; + if (socket_sessions[socket.id].secure_buffer) delete socket_sessions[socket.id].secure_buffer; + if (socket_sessions[socket.id].buffer) delete socket_sessions[socket.id].buffer; + if (socket_sessions[socket.id].headers) delete socket_sessions[socket.id].headers; + if (socket_sessions[socket.id].post_data) delete socket_sessions[socket.id].post_data; + if (socket_sessions[socket.id].post_data_length) delete socket_sessions[socket.id].post_data_length; + if (socket_sessions[socket.id].post_data_percents_shown) delete socket_sessions[socket.id].post_data_percents_shown; + socket.setTimeout(minisrv_config.config.socket_timeout * 1000); + if (socket_sessions[socket.id] && socket_sessions[socket.id].close_me) socket.end(); + if (socket_sessions[socket.id].destroy_me) socket.destroy(); } function concatArrayBuffer(buffer1, buffer2) {