Allow throttling sendToClient()
This commit is contained in:
@@ -1256,7 +1256,7 @@ minisrv-no-mail-count: true`;
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function sendToClient(socket, headers_obj, data = null) {
|
async function sendToClient(socket, headers_obj, data = null, throttle = 0) {
|
||||||
let headers = "";
|
let headers = "";
|
||||||
let content_length = 0;
|
let content_length = 0;
|
||||||
const eol = "\n";
|
const eol = "\n";
|
||||||
@@ -1614,64 +1614,98 @@ async function sendToClient(socket, headers_obj, data = null) {
|
|||||||
let toClient = null;
|
let toClient = null;
|
||||||
if (typeof data === 'string') {
|
if (typeof data === 'string') {
|
||||||
toClient = headers + eol + data;
|
toClient = headers + eol + data;
|
||||||
sendToSocket(socket, Buffer.from(toClient));
|
sendToSocket(socket, Buffer.from(toClient), throttle);
|
||||||
} else if (typeof data === 'object') {
|
} else if (typeof data === 'object') {
|
||||||
let verbosity_mod = (headers_obj["wtv-encrypted"] === 'true') ? " encrypted response" : "";
|
let verbosity_mod = (headers_obj["wtv-encrypted"] === 'true') ? " encrypted response" : "";
|
||||||
if (socket_sessions[socket.id].secure_headers === true) {
|
if (socket_sessions[socket.id].secure_headers === true) {
|
||||||
// encrypt headers
|
// encrypt headers
|
||||||
if (minisrv_config.config.debug_flags.quiet) verbosity_mod += " with encrypted headers";
|
if (minisrv_config.config.debug_flags.quiet) verbosity_mod += " with encrypted headers";
|
||||||
const enc_headers = socket_sessions[socket.id].wtvsec.Encrypt(1, headers + eol);
|
const enc_headers = socket_sessions[socket.id].wtvsec.Encrypt(1, headers + eol);
|
||||||
sendToSocket(socket, new Buffer.from(concatArrayBuffer(enc_headers, data)));
|
sendToSocket(socket, new Buffer.from(concatArrayBuffer(enc_headers, data)), throttle);
|
||||||
} else {
|
} else {
|
||||||
sendToSocket(socket, new Buffer.from(concatArrayBuffer(Buffer.from(headers + eol), data)));
|
sendToSocket(socket, new Buffer.from(concatArrayBuffer(Buffer.from(headers + eol), data)), throttle);
|
||||||
}
|
}
|
||||||
if (minisrv_config.config.debug_flags.quiet) console.debug(" * Sent" + verbosity_mod + " " + headers_obj.Status + " to client (Content-Type:", headers_obj['Content-type'], "~", headers_obj['Content-length'], "bytes)");
|
if (minisrv_config.config.debug_flags.quiet) console.debug(" * Sent" + verbosity_mod + " " + headers_obj.Status + " to client (Content-Type:", headers_obj['Content-type'], "~", headers_obj['Content-length'], "bytes)");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function sendToSocket(socket, data) {
|
async function sendToSocket(socket, data, throttle = 0) {
|
||||||
const chunk_size = 16384;
|
const chunk_size = 16384;
|
||||||
let can_write = true;
|
if (!throttle || throttle <= 0) {
|
||||||
let close_socket = false;
|
let can_write = true;
|
||||||
let expected_data_out = 0;
|
let close_socket = false;
|
||||||
while ((socket.bytesWritten === 0 || socket.bytesWritten !== expected_data_out) && can_write) {
|
let expected_data_out = 0;
|
||||||
if (expected_data_out === 0) expected_data_out = data.byteLength + (socket_sessions[socket.id].socket_total_written || 0);
|
while ((socket.bytesWritten === 0 || socket.bytesWritten !== expected_data_out) && can_write) {
|
||||||
if (socket.bytesWritten === expected_data_out) break;
|
if (expected_data_out === 0) expected_data_out = data.byteLength + (socket_sessions[socket.id].socket_total_written || 0);
|
||||||
|
if (socket.bytesWritten === expected_data_out) break;
|
||||||
|
|
||||||
const data_left = (expected_data_out - socket.bytesWritten);
|
const data_left = (expected_data_out - socket.bytesWritten);
|
||||||
// buffer size = lesser of chunk_size or size remaining
|
// buffer size = lesser of chunk_size or size remaining
|
||||||
const buffer_size = (data_left >= chunk_size) ? chunk_size : data_left;
|
const buffer_size = (data_left >= chunk_size) ? chunk_size : data_left;
|
||||||
if (buffer_size < 0) {
|
if (buffer_size < 0) {
|
||||||
socket.destroy();
|
socket.destroy();
|
||||||
close_socket = true;
|
close_socket = true;
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
const offset = (data.byteLength - data_left);
|
||||||
|
const chunk = new Buffer.alloc(buffer_size);
|
||||||
|
data.copy(chunk, 0, offset, (offset + buffer_size));
|
||||||
|
can_write = socket.write(chunk);
|
||||||
|
if (!can_write) {
|
||||||
|
socket.once('drain', function () {
|
||||||
|
sendToSocket(socket, data, throttle);
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
const offset = (data.byteLength - data_left);
|
if (socket.bytesWritten === expected_data_out || close_socket) {
|
||||||
const chunk = new Buffer.alloc(buffer_size);
|
socket_sessions[socket.id].socket_total_written = socket.bytesWritten;
|
||||||
data.copy(chunk, 0, offset, (offset + buffer_size));
|
if (socket_sessions[socket.id].expecting_post_data) delete socket_sessions[socket.id].expecting_post_data;
|
||||||
can_write = socket.write(chunk);
|
if (socket_sessions[socket.id].header_buffer) delete socket_sessions[socket.id].header_buffer;
|
||||||
|
if (socket_sessions[socket.id].secure_buffer) delete socket_sessions[socket.id].secure_buffer;
|
||||||
|
if (socket_sessions[socket.id].buffer) delete socket_sessions[socket.id].buffer;
|
||||||
|
if (socket_sessions[socket.id].headers) delete socket_sessions[socket.id].headers;
|
||||||
|
if (socket_sessions[socket.id].post_data) delete socket_sessions[socket.id].post_data;
|
||||||
|
if (socket_sessions[socket.id].post_data_length) delete socket_sessions[socket.id].post_data_length;
|
||||||
|
if (socket_sessions[socket.id].post_data_percents_shown) delete socket_sessions[socket.id].post_data_percents_shown;
|
||||||
|
socket.setTimeout(minisrv_config.config.socket_timeout * 1000);
|
||||||
|
if (socket_sessions[socket.id] && socket_sessions[socket.id].close_me) socket.end();
|
||||||
|
if (socket_sessions[socket.id].destroy_me) socket.destroy();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let offset = 0;
|
||||||
|
while (offset < data.byteLength) {
|
||||||
|
const buffer_size = Math.min(chunk_size, data.byteLength - offset);
|
||||||
|
const chunk = Buffer.alloc(buffer_size);
|
||||||
|
data.copy(chunk, 0, offset, offset + buffer_size);
|
||||||
|
offset += buffer_size;
|
||||||
|
|
||||||
|
const can_write = socket.write(chunk);
|
||||||
if (!can_write) {
|
if (!can_write) {
|
||||||
socket.once('drain', function () {
|
await new Promise(resolve => socket.once('drain', resolve));
|
||||||
sendToSocket(socket, data);
|
}
|
||||||
});
|
|
||||||
break;
|
if (offset < data.byteLength) {
|
||||||
|
const delay_ms = Math.max(1, Math.round((buffer_size * 8) / throttle * 1000));
|
||||||
|
await new Promise(resolve => setTimeout(resolve, delay_ms));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (socket.bytesWritten === expected_data_out || close_socket) {
|
|
||||||
socket_sessions[socket.id].socket_total_written = socket.bytesWritten;
|
socket_sessions[socket.id].socket_total_written = socket.bytesWritten;
|
||||||
if (socket_sessions[socket.id].expecting_post_data) delete socket_sessions[socket.id].expecting_post_data;
|
if (socket_sessions[socket.id].expecting_post_data) delete socket_sessions[socket.id].expecting_post_data;
|
||||||
if (socket_sessions[socket.id].header_buffer) delete socket_sessions[socket.id].header_buffer;
|
if (socket_sessions[socket.id].header_buffer) delete socket_sessions[socket.id].header_buffer;
|
||||||
if (socket_sessions[socket.id].secure_buffer) delete socket_sessions[socket.id].secure_buffer;
|
if (socket_sessions[socket.id].secure_buffer) delete socket_sessions[socket.id].secure_buffer;
|
||||||
if (socket_sessions[socket.id].buffer) delete socket_sessions[socket.id].buffer;
|
if (socket_sessions[socket.id].buffer) delete socket_sessions[socket.id].buffer;
|
||||||
if (socket_sessions[socket.id].headers) delete socket_sessions[socket.id].headers;
|
if (socket_sessions[socket.id].headers) delete socket_sessions[socket.id].headers;
|
||||||
if (socket_sessions[socket.id].post_data) delete socket_sessions[socket.id].post_data;
|
if (socket_sessions[socket.id].post_data) delete socket_sessions[socket.id].post_data;
|
||||||
if (socket_sessions[socket.id].post_data_length) delete socket_sessions[socket.id].post_data_length;
|
if (socket_sessions[socket.id].post_data_length) delete socket_sessions[socket.id].post_data_length;
|
||||||
if (socket_sessions[socket.id].post_data_percents_shown) delete socket_sessions[socket.id].post_data_percents_shown;
|
if (socket_sessions[socket.id].post_data_percents_shown) delete socket_sessions[socket.id].post_data_percents_shown;
|
||||||
socket.setTimeout(minisrv_config.config.socket_timeout * 1000);
|
socket.setTimeout(minisrv_config.config.socket_timeout * 1000);
|
||||||
if (socket_sessions[socket.id] && socket_sessions[socket.id].close_me) socket.end();
|
if (socket_sessions[socket.id] && socket_sessions[socket.id].close_me) socket.end();
|
||||||
if (socket_sessions[socket.id].destroy_me) socket.destroy();
|
if (socket_sessions[socket.id].destroy_me) socket.destroy();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function concatArrayBuffer(buffer1, buffer2) {
|
function concatArrayBuffer(buffer1, buffer2) {
|
||||||
|
|||||||
Reference in New Issue
Block a user