update: compression system: make asynchronous
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
var EventEmitter = require('events').EventEmitter;
|
||||
|
||||
/**
|
||||
* Pure-JS implementation of WebTV's LZPF compression
|
||||
*
|
||||
@@ -7,12 +9,14 @@
|
||||
* By: Eric MacDonald (eMac)
|
||||
*/
|
||||
|
||||
class WTVLzpf {
|
||||
// Note: currentlty doesn't offer streaming support but this is good enough to meet perf demands at the scale we're at.
|
||||
class WTVLzpf extends EventEmitter {
|
||||
// Note: currentlty doesn't offer optimal streaming support but this is good enough to meet perf demands at the scale we're at.
|
||||
|
||||
current_length = 0
|
||||
current_literal = 0
|
||||
compressed_offset = 0
|
||||
total_compressed = 0;
|
||||
chunk_size = 65535;
|
||||
flag_table = new Uint16Array(0x1000)
|
||||
compressed_data = null;
|
||||
|
||||
@@ -261,23 +265,28 @@ class WTVLzpf {
|
||||
/**
|
||||
* Initialize the Lzpf class.
|
||||
*
|
||||
* @param chunk_size {Number} Set the size of the compressed data chunks to fire off with 'data' event. (suggested betweet 2048 and 65535);
|
||||
* @returns {undefined}
|
||||
*/
|
||||
constructor() {
|
||||
constructor(chunk_size = 0) {
|
||||
super(); // for extended class
|
||||
if (chunk_size > 0) this.chunk_size = chunk_size;
|
||||
this.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets starting values for the compression algorithm.
|
||||
*
|
||||
* @param length {Number} size of compressed data Buffer to allocate
|
||||
* @returns {undefined}
|
||||
*/
|
||||
clear() {
|
||||
clear(length = 0) {
|
||||
this.current_length = 0;
|
||||
this.current_literal = 0;
|
||||
this.total_compressed = 0;
|
||||
this.compressed_offset = 0;
|
||||
this.flag_table.fill(0xFFFF, 0, 0x1000);
|
||||
this.compressed_data = null;
|
||||
this.compressed_data = (length > 0) ? null : Buffer.alloc(length);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -288,7 +297,7 @@ class WTVLzpf {
|
||||
*
|
||||
* @returns {undefined}
|
||||
*/
|
||||
EncodeLiteral(code_length, code) {
|
||||
async EncodeLiteral(code_length, code) {
|
||||
// Using >>> to stick with unsigned integers without making a mess with casting.
|
||||
|
||||
this.current_literal |= code >>> (this.current_length & 0x1F);
|
||||
@@ -310,9 +319,8 @@ class WTVLzpf {
|
||||
*
|
||||
* @returns {Buffer} Lzpf compression data
|
||||
*/
|
||||
Compress(uncompressed_data) {
|
||||
async Compress(uncompressed_data) {
|
||||
this.clear();
|
||||
|
||||
if (uncompressed_data.words) {
|
||||
// if its a wordArray convert it to a Buffer
|
||||
|
||||
@@ -326,7 +334,8 @@ class WTVLzpf {
|
||||
}
|
||||
|
||||
var uncompressed_len = uncompressed_data.byteLength;
|
||||
this.compressed_data = new Buffer.alloc(uncompressed_len);
|
||||
this.clear(uncompressed_len);
|
||||
this.compressed_data = new Buffer.alloc((uncompressed_len >= this.chunk_size) ? this.chunk_size : uncompressed_len);
|
||||
|
||||
var i = 0;
|
||||
var sum = 0;
|
||||
@@ -385,6 +394,11 @@ class WTVLzpf {
|
||||
if (code_length > 0) {
|
||||
this.EncodeLiteral(code_length, code);
|
||||
}
|
||||
if (this.compressed_offset == this.chunk_size) {
|
||||
this.total_compressed += this.compressed_offset;
|
||||
this.emit('data', this.compressed_data, this.compressed_offset, (this.total_compressed - this.compressed_offset), false);
|
||||
this.compressed_offset = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Finish up. This would normally be in an Lzpf_Finish method.
|
||||
@@ -415,11 +429,9 @@ class WTVLzpf {
|
||||
this.compressed_offset = this.compressed_data.writeUInt8(this.current_literal >>> 0x18, this.compressed_offset);
|
||||
this.compressed_offset = this.compressed_data.writeUInt8(0x20, this.compressed_offset);
|
||||
|
||||
var output_buffer = new Buffer.alloc(this.compressed_offset);
|
||||
this.compressed_data.copy(output_buffer, 0, 0, this.compressed_offset)
|
||||
this.compressed_data = null;
|
||||
this.total_compressed += this.compressed_offset;
|
||||
|
||||
return output_buffer;
|
||||
this.emit('data', this.compressed_data, this.compressed_offset, (this.total_compressed - this.compressed_offset), this.total_compressed);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -463,6 +463,7 @@ async function sendToClient(socket, headers_obj, data, compress_data = false) {
|
||||
clen = data.byteLength;
|
||||
}
|
||||
|
||||
|
||||
// If wtv-lzpf is in the header then force compression
|
||||
if (headers_obj["wtv-lzpf"]) {
|
||||
compress_data = true;
|
||||
@@ -483,14 +484,28 @@ async function sendToClient(socket, headers_obj, data, compress_data = false) {
|
||||
}
|
||||
|
||||
// compress if needed
|
||||
if (compress_data && clen > 0) {
|
||||
if (compress_data && clen > 0 && !headers_obj['minisrv-already-compressed']) {
|
||||
if (zdebug) console.log(" # Uncompressed data length:", clen);
|
||||
headers_obj["wtv-lzpf"] = 0;
|
||||
var wtvcomp = new WTVLzpf();
|
||||
var uncomp_data = data;
|
||||
data = wtvcomp.Compress(uncomp_data);
|
||||
uncomp_data, wtvcomp = null;
|
||||
var compressed_data = new Buffer.alloc(clen);
|
||||
wtvcomp.on('data', (data, length, offset, complete) => {
|
||||
data.copy(compressed_data, offset, 0, length);
|
||||
if (complete !== false) {
|
||||
data = new Buffer.alloc(complete);
|
||||
compressed_data.copy(data, 0, 0, compressed_data.byteLength);
|
||||
compressed_data, wtvcomp = null;
|
||||
headers_obj['minisrv-already-compressed'] = true;
|
||||
sendToClient(socket, headers_obj, data);
|
||||
}
|
||||
});
|
||||
wtvcomp.Compress(data);
|
||||
return;
|
||||
}
|
||||
|
||||
if (headers_obj['minisrv-already-compressed']) delete headers_obj['minisrv-already-compressed'];
|
||||
|
||||
|
||||
// encrypt if needed
|
||||
if (socket_sessions[socket.id].secure == true) {
|
||||
headers_obj["wtv-encrypted"] = 'true';
|
||||
|
||||
Reference in New Issue
Block a user