Files
minisrv/zefie_wtvp_minisrv/includes/classes/WTVMMS.js
2026-04-26 16:51:31 -04:00

1922 lines
79 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Pure JS implementation of Microsoft MMS (MMST - MMS over TCP) streaming protocol.
// Used by WebTV Plus (LC2+) to stream WMA and ASF audio content over mms:// URLs.
// Port 1755 (standard MMS port). Commands over TCP; data over TCP or UDP.
//
// Protocol overview:
// - Command packets use the 48-byte MMST control header that starts with 0xb00bface.
// - Header/media packets use the 8-byte MMST data preheader:
// seq(4) + packetId(1) + flags(1) + totalLen(2)
// - The server replies with command packets for connect/transport/open/header/start,
// and sends ASF header/media bytes as MMST data packets paced by file bitrate.
'use strict';
const net = require('net');
const dgram = require('dgram');
const fs = require('fs');
const path = require('path');
const MMS_MAGIC = 0xB00BFACE;
const MMS_PROTOCOL = 0x20534D4D; // "MMS " little-endian
const MMS_COMMAND_HEADER_SIZE = 48;
const MMS_DIRECTION_TO_CLIENT = 0x0004;
// MMS command IDs (client -> server)
const MMS_CLIENT_CONNECT = 0x01;
const MMS_CLIENT_TRANSPORT = 0x02;
const MMS_CLIENT_OPEN = 0x05;
const MMS_CLIENT_START = 0x07;
const MMS_CLIENT_STOP = 0x09;
const MMS_CLIENT_CLOSE = 0x0D;
const MMS_CLIENT_HEADER_REQ = 0x15;
const MMS_CLIENT_TIMING_REQ = 0x18;
const MMS_CLIENT_KEEPALIVE = 0x1B;
const MMS_CLIENT_STREAM_SELECT = 0x33;
// MMS command IDs (server -> client)
const MMS_SERVER_CLIENT_ACCEPTED = 0x01;
const MMS_SERVER_PROTOCOL_ACCEPTED = 0x02;
const MMS_SERVER_PROTOCOL_FAILED = 0x03;
const MMS_SERVER_MEDIA_FOLLOWS = 0x05;
const MMS_SERVER_FILE_DETAILS = 0x06;
const MMS_SERVER_HEADER_ACCEPTED = 0x11;
const MMS_SERVER_TIMING_REPLY = 0x15;
const MMS_SERVER_KEEPALIVE = 0x1B;
const MMS_SERVER_STREAM_STOPPED = 0x1E;
const MMS_SERVER_STREAM_ACCEPTED = 0x21;
const MMS_HEADER_PACKET_ID = 0x05;
const MMS_MEDIA_PACKET_ID = 0x04;
const MMS_HEADER_FLAG_MORE = 0x04;
const MMS_HEADER_FLAG_FINAL = 0x0C;
const MMS_DISABLE_PACKET_PAIR = 0xf0f0f0ef
const MMS_USE_PACKET_PAIR = 0xf0f0f0f0
const FILE_ATTRIBUTE_MMS_CANSEEK = 0x01000000;
// Default ASF data packet size (bytes); real files embed this in the ASF header.
// 5001 is the standard Windows Media Encoder default.
const DEFAULT_ASF_PACKET_SIZE = 5001;
// Minimum pacing interval (ms) prevents spinning too fast on low-bitrate files
const MIN_PACING_INTERVAL_MS = 10;
class WTVMMS {
minisrv_config = null;
service_name = null;
service_config = null;
server_config = null;
server = null;
udpServer = null;
udpServerReady = false;
wtvshared = null;
sessions = new Map();
constructor(minisrv_config, service_name, wtvshared) {
this.minisrv_config = minisrv_config;
this.service_name = service_name;
this.service_config = minisrv_config.services[service_name] || {};
this.server_config = this.service_config;
this.wtvshared = wtvshared;
this.version = require(`${wtvshared.path.dirname(require.main.filename)}/package.json`).version.replace(/^v/, '');
this.server = net.createServer((socket) => this.handleConnection(socket));
if (this.service_config.hide_minisrv_version || minisrv_config.config.hide_minisrv_version) {
this.serverName = "minisrv";
this.minorVersion = 1;
this.majorVersion = 0.0;
} else {
this.serverName = `minisrv/${this.version}`;
this.majorVersion = this.version.split('.').slice(0)[0];
this.minorVersion = this.version.split('.').slice(1).join('.');
}
}
listen(port, host = '0.0.0.0') {
this.server.listen(port, host);
const configuredUdpPort = Number(this.server_config?.udp_port);
const udpPort = Number.isFinite(configuredUdpPort) && configuredUdpPort > 0
? Math.floor(configuredUdpPort)
: 1755;
this.udpServer = dgram.createSocket('udp4');
this.udpServerReady = false;
this.udpServer.on('error', (err) => {
console.error('[WTVMMS] udp socket error', err.message);
});
this.udpServer.on('message', (msg, rinfo) => {
this._handleUdpDatagram(msg, rinfo);
});
this.udpServer.bind(udpPort, host, () => {
this.udpServerReady = true;
this.debugLog('udp listen', host + ':' + udpPort);
});
return this.server;
}
close() {
if (this.server) this.server.close();
if (this.udpServer) {
try { this.udpServer.close(); } catch (_) {}
this.udpServer = null;
this.udpServerReady = false;
}
}
// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------
getDebugEnabled() {
return this.service_config.debug;
}
debugLog(...args) {
if (this.getDebugEnabled()) console.log('[WTVMMS]', ...args);
}
// Resolve a requested media path across all configured ServiceVaults.
resolveMediaPath(requestedMedia) {
if (!requestedMedia) return null;
const serviceVaultDir = this.service_config.servicevault_dir || this.service_name;
const vaults = this.minisrv_config.config?.ServiceVaults || [];
const variants = this._mediaVariants(requestedMedia);
for (const vault of vaults) {
const base = this.wtvshared.getAbsolutePath(serviceVaultDir, vault);
for (const variant of variants) {
const candidate = this.wtvshared.makeSafePath(base, variant);
if (candidate && fs.existsSync(candidate) && fs.lstatSync(candidate).isFile()) {
this.debugLog('media resolved', variant, '->', candidate);
return candidate;
}
}
}
return null;
}
_mediaVariants(media) {
const p = media.replace(/\\/g, '/').replace(/^\/+/, '');
const ext = path.posix.extname(p).toLowerCase();
const stem = ext ? p.slice(0, -ext.length) : p;
const list = [p];
// Accept both .wma and .asf for the same stem
if (ext === '.wma') list.push(stem + '.asf');
if (ext === '.asf') list.push(stem + '.wma');
if (!ext) { list.push(stem + '.wma'); list.push(stem + '.asf'); }
return [...new Set(list)];
}
// -------------------------------------------------------------------------
// Connection handling
// -------------------------------------------------------------------------
handleConnection(socket) {
socket.setNoDelay(true);
const session = {
id: `${socket.remoteAddress}:${socket.remotePort}`,
socket,
clientEndpointIp: this._normalizeIp(socket.remoteAddress),
rxBuf: Buffer.alloc(0),
mediaPath: null,
streaming: false,
paused: false,
pacingTimer: null,
asfFd: null,
requestedTransport: '',
clientPort: 0,
clientPortCandidates: [],
udpPeerLearnedPort: 0,
tcpDataEnabled: true,
udpDataEnabled: false,
requestedUdp: false,
isCubddTransport: false,
asfPacketSize: DEFAULT_ASF_PACKET_SIZE,
asfDataOffset: 0,
asfFileSize: 0,
asfPacketCount: 0,
asfDurationSec: 0,
bitrateBps: 0,
commandSeq: 0,
openFileId: 1,
clientId: (Math.random() * 0x7FFFFFFF + 1) >>> 0,
openIncarnation: 0,
headerIncarnation: MMS_HEADER_PACKET_ID,
mediaIncarnation: MMS_MEDIA_PACKET_ID,
headerPacketId: MMS_HEADER_PACKET_ID,
mediaPacketId: MMS_MEDIA_PACKET_ID,
readBlockIncarnation: 0,
dataSequence: 0,
lastDataSequenceTx: 0,
dataAfMode: 'auto',
bytesTx: 0,
headerPacketsTx: 0,
mediaPacketsTx: 0,
burstPacketsRemaining: 0,
burstMultiplier: 1,
waitingForDrain: false,
drainHandler: null,
packetIntervalMs: MIN_PACING_INTERVAL_MS,
nextPacketDueAt: 0,
effectivePacketSize: 0,
seekGeneration: 0,
packetQueue: [],
eosTimer: null,
eosPingTimer: null,
lastKeepaliveTxMs: 0,
resendPacketCache: new Map(),
resendPacketOrder: [],
recentPacketSizes: [],
smoothedPacketSize: 0,
pacingSamples: 0,
};
this.sessions.set(socket, session);
this.debugLog('client connected', session.id);
socket.on('data', (data) => {
try {
session.rxBuf = Buffer.concat([session.rxBuf, data]);
this._processIncoming(socket, session);
} catch (e) {
console.error('[WTVMMS] handleData error', e);
}
});
socket.on('close', () => {
this._stopStream(session);
if (session.asfFd !== null) { try { fs.closeSync(session.asfFd); } catch(_){} session.asfFd = null; }
this.sessions.delete(socket);
this.debugLog('client disconnected', session.id, 'tx=' + session.bytesTx);
});
socket.on('error', (err) => {
this._stopStream(session);
if (session.asfFd !== null) { try { fs.closeSync(session.asfFd); } catch(_){} session.asfFd = null; }
this.sessions.delete(socket);
this.debugLog('socket error', session.id, err.message);
});
}
// -------------------------------------------------------------------------
// Incoming packet parser
// -------------------------------------------------------------------------
_processIncoming(socket, session) {
while (session.rxBuf.length > 0) {
const buf = session.rxBuf;
if (buf[0] === 0x4E || buf[0] === 0x47) {
const lineEnd = buf.indexOf('\r\n');
if (lineEnd < 0 && buf.length < 512) break;
const line = buf.slice(0, lineEnd > 0 ? lineEnd + 2 : buf.length).toString('latin1');
this.debugLog('text connect line', session.id, line.trim());
session.rxBuf = buf.slice(lineEnd > 0 ? lineEnd + 2 : buf.length);
continue;
}
if (buf.length < 8) break;
if (buf.length >= 12 && buf.readUInt32LE(4) === MMS_MAGIC) {
const totalLength = buf.readUInt32LE(8) + 16;
if (totalLength < MMS_COMMAND_HEADER_SIZE || totalLength > 1048576) {
this.debugLog('invalid command length, dropping connection', session.id, totalLength);
socket.destroy();
return;
}
if (buf.length < totalLength) break;
const packet = buf.slice(0, totalLength);
session.rxBuf = buf.slice(totalLength);
this._handleCommandPacket(socket, session, packet);
continue;
}
const packetLength = buf.readUInt16LE(6);
if (packetLength >= 8 && buf.length >= packetLength) {
this.debugLog('discarding unexpected client data packet', session.id, 'len=' + packetLength);
session.rxBuf = buf.slice(packetLength);
continue;
}
this.debugLog('bad magic, resyncing', session.id, buf.slice(0, 8).toString('hex'));
session.rxBuf = buf.slice(1);
}
}
// -------------------------------------------------------------------------
// Command packet dispatch
// -------------------------------------------------------------------------
_handleCommandPacket(socket, session, packet) {
if (packet.length < MMS_COMMAND_HEADER_SIZE) return;
const cmdId = packet.readUInt16LE(36);
const direction = packet.readUInt16LE(38);
const prefix1 = packet.readUInt32LE(40);
const prefix2 = packet.readUInt32LE(44);
const payload = packet.slice(MMS_COMMAND_HEADER_SIZE);
this.debugLog(
'rx cmd',
session.id,
'0x' + cmdId.toString(16),
'dir=0x' + direction.toString(16),
'p1=0x' + prefix1.toString(16),
'p2=0x' + prefix2.toString(16),
'len=' + packet.length,
'from=' + socket.remoteAddress + ':' + socket.remotePort
);
switch (cmdId) {
case MMS_CLIENT_CONNECT:
this._handleConnect(socket, session, payload);
break;
case MMS_CLIENT_TRANSPORT:
this._handleTransport(socket, session, payload);
break;
case MMS_CLIENT_OPEN:
session.openIncarnation = prefix1; // playIncarnation from OpenFile
this._handleOpen(socket, session, payload);
break;
case MMS_CLIENT_START:
this._handleStart(socket, session, prefix1, payload);
break;
case MMS_CLIENT_STOP:
this._handleStop(socket, session, prefix2); // prefix2 = stopPlayIncarnation
break;
case MMS_CLIENT_HEADER_REQ:
this._handleHeaderRequest(socket, session, payload);
break;
case MMS_CLIENT_TIMING_REQ:
this._sendCommand(socket, session, MMS_SERVER_TIMING_REPLY, 0, 0xF0F0F0EF, this._buildFunnelInfoPayload(session));
break;
case MMS_CLIENT_KEEPALIVE:
this._handleClientKeepalive(socket, session, prefix1, prefix2);
break;
case MMS_CLIENT_STREAM_SELECT:
this._sendCommand(socket, session, MMS_SERVER_STREAM_ACCEPTED, 0, 0, Buffer.alloc(0));
break;
case MMS_CLIENT_CLOSE:
socket.end();
break;
default:
this.debugLog('unhandled cmd', session.id, '0x' + cmdId.toString(16));
}
}
// -------------------------------------------------------------------------
// Protocol handshake handlers
// -------------------------------------------------------------------------
_handleConnect(socket, session, payload) {
const clientStr = payload.length > 4
? this._decodeUtf16CString(payload.slice(4))
: '';
session.clientPlayer = clientStr;
session.isLegacyClient = /^NSPlayer\/6\./i.test(clientStr);
// Preserve ffplay/VLC/MPC compatibility: keep MMST data flags at 0 unless legacy mode is required.
session.dataAfMode = this._resolveDataAfMode(session);
this.debugLog('connect', session.id, clientStr || '(no player string)');
console.log(' * [MMS]', `[${session.id}]`, 'Client connect:', clientStr || '(unknown player)');
if (session.isLegacyClient) {
this._sendCommandBatch(socket, session, [
{
cmdId: MMS_SERVER_CLIENT_ACCEPTED,
prefix1: 0,
prefix2: 0xF0F0F0EF,
payloadBuf: this._buildConnectAcceptedPayload(session)
}
]);
return;
}
this._sendCommand(socket, session, MMS_SERVER_CLIENT_ACCEPTED, 0, 0xF0F0F0EF, this._buildConnectAcceptedPayload(session));
}
_handleTransport(socket, session, payload) {
// Transport payload has an 8-byte prefix before the UTF-16 string.
// If slice(8) yields a valid UTF-16 string, use it; otherwise try slice(4) as fallback.
const tryBuf8 = payload.length > 8 ? payload.slice(8) : Buffer.alloc(0);
const tryBuf4 = payload.length > 4 ? payload.slice(4) : Buffer.alloc(0);
const probe8 = this._decodeUtf16CString(tryBuf8);
const transportBuf = probe8.length > 0 ? tryBuf8 : tryBuf4;
const transportUtf16 = probe8.length > 0 ? probe8 : this._decodeUtf16CString(tryBuf4);
const transportAnsi = transportBuf.toString('latin1').replace(/\0+/g, '').trim();
const transportStr = this._selectTransportString(transportUtf16, transportAnsi);
const requested = this._parseTransportRequest(transportStr);
this.debugLog('transport parsed', session.id, JSON.stringify(requested));
if (requested) {
session.requestedTransport = requested.protocol || session.requestedTransport;
session.requestedClientIp = requested.ip;
session.clientPort = this._sanitizeUdpPort(requested.port);
session.clientPortCandidates = Array.isArray(requested.ports)
? requested.ports.map((p) => this._sanitizeUdpPort(p)).filter((p) => p > 0)
: (session.clientPort > 0 ? [session.clientPort] : []);
if (session.clientPort > 0 && session.clientPortCandidates.indexOf(session.clientPort) < 0) {
session.clientPortCandidates.unshift(session.clientPort);
}
session.udpPeerLearnedPort = 0;
// Do not trust IP inside transport request; use connected peer IP.
session.clientEndpointIp = this._normalizeIp(socket.remoteAddress);
this.debugLog(
'transport endpoint',
session.id,
'requestedTransport=' + session.requestedTransport,
'requestedIp=' + requested.ip,
'peerIp=' + session.clientEndpointIp,
'port=' + requested.port,
'candidates=' + session.clientPortCandidates.join(',')
);
}
const transportToken = String(session.requestedTransport || '').toUpperCase();
// CUBDD is WebTV's proprietary UDP transport — treat it as UDP-capable but track separately
// so CUBDD-specific hacks don't bleed into generic MMSU/UDP handling.
const isCubdd = transportToken === 'CUBDD'
|| /\bCUBDD\b/i.test(transportStr)
|| /\bCUBDD\b/i.test(transportUtf16)
|| /\bCUBDD\b/i.test(transportAnsi);
const requestedUdp = isCubdd
|| transportToken === 'UDP'
|| transportToken === 'MMSU'
|| /\b(?:UDP|MMSU)\b/i.test(transportStr)
|| /\b(?:UDP|MMSU)\b/i.test(transportUtf16)
|| /\b(?:UDP|MMSU)\b/i.test(transportAnsi);
session.isCubddTransport = isCubdd;
session.requestedUdp = requestedUdp;
this._refreshUdpTransportState(session);
// If client asked for UDP but we can't enable it (e.g. invalid/out-of-range port),
// fall back to TCP so we don't leave both transport flags false.
if (requestedUdp && !session.udpDataEnabled) {
this.debugLog('transport udp fallback to tcp', session.id,
'reason=udpDataEnabled=false clientPort=' + session.clientPort);
session.tcpDataEnabled = true;
} else {
session.tcpDataEnabled = !requestedUdp;
}
this.debugLog('transport', session.id,
'token=' + (transportToken || '(unknown)'),
'tcpDataEnabled=' + session.tcpDataEnabled,
'udpDataEnabled=' + session.udpDataEnabled,
'udpPort=' + (session.clientPort || 0),
'udpPeer=' + (session.clientEndpointIp || '(none)'),
'udpReady=' + this.udpServerReady);
this._sendCommand(socket, session, MMS_SERVER_PROTOCOL_ACCEPTED, 0, 0, this._buildTransportAcceptedPayload());
}
_handleOpen(socket, session, payload) {
let requestedUrl = '';
if (payload.length > 8) {
requestedUrl = payload.slice(8).toString('utf16le').split('\0')[0].trim();
}
this.debugLog('open file', session.id, requestedUrl);
let mediaStem = requestedUrl
.replace(/^mms:\/\/[^/]+/i, '')
.replace(/^\/+/, '');
session.mediaPath = this.resolveMediaPath(mediaStem);
console.log(' * [MMS]', `[${session.id}]`, 'Open:', requestedUrl, '->', session.mediaPath || '(not found)');
if (!session.mediaPath) {
this._sendCommand(socket, session, MMS_SERVER_FILE_DETAILS, 0x80070002, session.openIncarnation, Buffer.alloc(0));
socket.end();
return;
}
this._parseAsfHeader(session);
this._sendCommand(socket, session, MMS_SERVER_FILE_DETAILS, 0, session.openIncarnation, this._buildFileDetailsPayload(session));
}
_handleHeaderRequest(socket, session, payload) {
if (!session.mediaPath) return;
let requestedInc = 0;
// _handleCommandPacket passes the reduced ReadBlock payload that starts at offset,
// so playIncarnation is 32 bytes into that buffer.
if (payload.length >= 36) {
requestedInc = payload.readUInt32LE(32) >>> 0;
}
session.readBlockIncarnation = requestedInc;
// Echo the requested playIncarnation for ReportReadBlock/header data packets.
// If missing/invalid, retain previous value or fall back to legacy packet id.
if (requestedInc >= 1 && requestedInc <= 0xFE) {
session.headerIncarnation = requestedInc;
} else if (!session.headerIncarnation) {
session.headerIncarnation = MMS_HEADER_PACKET_ID;
}
session.headerPacketId = (session.headerIncarnation & 0xFF) || MMS_HEADER_PACKET_ID;
session.headerPacketsTx = 0;
this.debugLog(
'header request',
session.id,
'requestedInc=0x' + requestedInc.toString(16),
'headerIncarnation=0x' + session.headerIncarnation.toString(16),
'headerPacketId=0x' + session.headerPacketId.toString(16)
);
// ReportReadBlock: hr=0, playIncarnation=from ReadBlock, playSequence=0
const readBlockPayload = Buffer.alloc(4);
readBlockPayload.writeUInt32LE(0, 0); // playSequence
this._sendCommand(socket, session, MMS_SERVER_HEADER_ACCEPTED, 0, session.headerIncarnation, readBlockPayload);
this._refreshUdpTransportState(session);
if (!session.tcpDataEnabled && !session.udpDataEnabled) {
this.debugLog('no data transport available', session.id, 'closing connection');
socket.end();
return;
}
const asfHeaderBuf = this._readAsfHeader(session);
if (asfHeaderBuf && asfHeaderBuf.length > 0) {
this._sendHeaderPackets(socket, session, asfHeaderBuf);
}
}
_handleStart(socket, session, openFileId, payload) {
if (!session.mediaPath) {
this.debugLog('start requested but no media path', session.id);
return;
}
const startReq = this._parseStartPlaying(openFileId, payload);
if (startReq === null) {
this.debugLog('start payload too short', session.id, 'len=' + payload.length);
return;
}
this._clearPendingEos(session);
session.mediaIncarnation = startReq.playIncarnation >>> 0;
session.mediaPacketId = this._resolveMediaPacketId(session);
const seek = this._resolveStartSeek(session, startReq);
if (seek) {
session._nextReadPos = seek.readPos;
session._nextPacketNumber = seek.packetNumber;
}
// Increment generation counter to invalidate pending packets from old seek
session.seekGeneration = (session.seekGeneration + 1) >>> 0;
session.packetQueue = [];
if (session.streaming) {
this._stopStream(session);
}
// Reset Data packet AFFlags sequence for each new StartPlaying sequence.
session.dataSequence = 0;
this.debugLog(
'start stream',
session.id,
'mediaIncarnation=0x' + session.mediaIncarnation.toString(16),
'position=' + (startReq.position === null ? 'unspecified' : startReq.position),
'asfOffset=' + startReq.asfOffset,
'locationId=' + startReq.locationId,
seek ? ('seekPacket=' + seek.packetNumber) : 'seekPacket=0'
);
// ReportStartedPlaying: hr=0, playIncarnation=from StartPlaying,
// tigerFileId=openFileId, unused1=0, unused2=12 bytes zeros
session.mediaPacketsTx = seek ? seek.packetNumber : 0;
const startedPayload = Buffer.alloc(20);
startedPayload.writeUInt32LE(session.openFileId >>> 0, 0); // tigerFileId
// unused1 [4] = 0, unused2 [8-19] = 0 (already zeroed)
this._sendCommand(socket, session, MMS_SERVER_MEDIA_FOLLOWS, 0, session.mediaIncarnation, startedPayload);
this._refreshUdpTransportState(session);
if (!session.tcpDataEnabled && !session.udpDataEnabled) {
this.debugLog('no data transport available', session.id, 'cannot start media');
return;
}
this._startStream(socket, session);
}
_handleStop(socket, session, stopIncarnation) {
this.debugLog('stop stream', session.id);
this._stopStream(session);
// Echo back the playIncarnation from StopPlaying (prefix2 of that message)
const inc = stopIncarnation >>> 0;
this._sendCommand(socket, session, MMS_SERVER_STREAM_STOPPED, 0, inc, Buffer.alloc(0));
}
// -------------------------------------------------------------------------
// ASF header parsing (minimal just packet size and bitrate)
// -------------------------------------------------------------------------
_parseAsfHeader(session) {
if (!session.mediaPath) return;
try {
const stat = fs.statSync(session.mediaPath);
session.asfFileSize = stat.size;
// Read first 64 KB to find ASF header objects
const readLen = Math.min(65536, stat.size);
const buf = Buffer.alloc(readLen);
const fd = fs.openSync(session.mediaPath, 'r');
fs.readSync(fd, buf, 0, readLen, 0);
fs.closeSync(fd);
// ASF Header Object GUID: {75B22630-668E-11CF-A6D9-00AA0062CE6C}
const ASF_HEADER_GUID = Buffer.from([
0x30, 0x26, 0xB2, 0x75, 0x8E, 0x66, 0xCF, 0x11,
0xA6, 0xD9, 0x00, 0xAA, 0x00, 0x62, 0xCE, 0x6C
]);
if (!buf.slice(0, 16).equals(ASF_HEADER_GUID)) {
this.debugLog('not a valid ASF file (bad header GUID)', session.id);
session.asfDataOffset = 0;
return;
}
// ASF Header Object: GUID(16) + Size(8) + NumHeaders(4) + Reserved(2)
const headerSize = Number(buf.readBigUInt64LE(16)); // total header size including GUID+size fields
// Walk sub-objects inside the header
// ASF File Properties Object GUID: {8CABDCA1-A947-11CF-8EE4-00C00C205365}
const ASF_FILE_PROPS_GUID = Buffer.from([
0xA1, 0xDC, 0xAB, 0x8C, 0x47, 0xA9, 0xCF, 0x11,
0x8E, 0xE4, 0x00, 0xC0, 0x0C, 0x20, 0x53, 0x65
]);
// ASF Stream Properties Object GUID: {B7DC0791-A9B7-11CF-8EE6-00C00C205365}
const ASF_STREAM_PROPS_GUID = Buffer.from([
0x91, 0x07, 0xDC, 0xB7, 0xB7, 0xA9, 0xCF, 0x11,
0x8E, 0xE6, 0x00, 0xC0, 0x0C, 0x20, 0x53, 0x65
]);
// ASF Data Object GUID: {75B22636-668E-11CF-A6D9-00AA0062CE6C}
const ASF_DATA_GUID = Buffer.from([
0x36, 0x26, 0xB2, 0x75, 0x8E, 0x66, 0xCF, 0x11,
0xA6, 0xD9, 0x00, 0xAA, 0x00, 0x62, 0xCE, 0x6C
]);
let offset = 30; // skip Header Object's GUID(16)+Size(8)+NumHeaders(4)+Reserved(2)
let maxBitrate = 0;
let packetSize = 0;
let dataPacketCount = 0;
let durationSec = 0;
while (offset + 24 <= readLen) {
const objGuid = buf.slice(offset, offset + 16);
const objSize = Number(buf.readBigUInt64LE(offset + 16));
if (objSize < 24) break;
if (objGuid.equals(ASF_FILE_PROPS_GUID) && objSize >= 104) {
// ASF File Properties layout after the 24-byte object header:
// FileId(16), FileSize(8), CreationDate(8), DataPacketsCount(8),
// PlayDuration(8), SendDuration(8), Preroll(8), Flags(4),
// MinDataPacketSize(4), MaxDataPacketSize(4), MaxBitrate(4)
const playDuration100ns = Number(buf.readBigUInt64LE(offset + 64));
const prerollMs = Number(buf.readBigUInt64LE(offset + 80));
dataPacketCount = Number(buf.readBigUInt64LE(offset + 56));
packetSize = buf.readUInt32LE(offset + 96);
maxBitrate = buf.readUInt32LE(offset + 100);
const rawDurationSec = playDuration100ns > 0 ? (playDuration100ns / 10000000.0) : 0;
durationSec = Math.max(0, rawDurationSec - (prerollMs / 1000.0));
}
if (objGuid.equals(ASF_DATA_GUID)) {
session.asfDataOffset = offset;
break;
}
offset += objSize;
}
// If we didn't hit the Data GUID in the read window, the data starts right after the header
if (session.asfDataOffset === 0 && headerSize > 0 && headerSize < session.asfFileSize) {
session.asfDataOffset = headerSize;
}
if (packetSize > 0) session.asfPacketSize = packetSize;
if (maxBitrate > 0) session.bitrateBps = maxBitrate;
if (dataPacketCount > 0) session.asfPacketCount = dataPacketCount;
if (durationSec > 0) session.asfDurationSec = durationSec;
this.debugLog('ASF parsed', session.id,
'packetSize=' + session.asfPacketSize,
'bitrate=' + session.bitrateBps + 'bps',
'packetCount=' + session.asfPacketCount,
'durationSec=' + session.asfDurationSec,
'dataOffset=0x' + session.asfDataOffset.toString(16));
} catch (e) {
console.error('[WTVMMS] ASF parse error', session.id, e.message);
session.asfDataOffset = 0;
}
}
// Read the raw ASF header bytes (everything before the first Data Object)
_readAsfHeader(session) {
if (!session.mediaPath || session.asfDataOffset === 0) return null;
try {
const headerLength = Math.min(session.asfFileSize, session.asfDataOffset + 50);
const buf = Buffer.alloc(headerLength);
const fd = fs.openSync(session.mediaPath, 'r');
fs.readSync(fd, buf, 0, headerLength, 0);
fs.closeSync(fd);
return buf;
} catch (e) {
console.error('[WTVMMS] ASF header read error', e.message);
return null;
}
}
// -------------------------------------------------------------------------
// Paced streaming
// -------------------------------------------------------------------------
_computePacketIntervalMs(session) {
let intervalMs = MIN_PACING_INTERVAL_MS;
const packetSizeForPacing = (session.effectivePacketSize > 0)
? session.effectivePacketSize
: session.asfPacketSize;
if (session.bitrateBps > 0 && packetSizeForPacing > 0) {
intervalMs = Math.max(
MIN_PACING_INTERVAL_MS,
(packetSizeForPacing * 8 / session.bitrateBps) * 1000
);
}
const defaultMultiplier = (session.isLegacyClient && !session.udpDataEnabled) ? 0.94 : 1.0;
const configuredMultiplier = Number(this.service_config.pacing_multiplier);
const pacingMultiplier = Number.isFinite(configuredMultiplier) && configuredMultiplier > 0
? configuredMultiplier
: defaultMultiplier;
return Math.max(MIN_PACING_INTERVAL_MS, Math.floor(intervalMs * pacingMultiplier));
}
_startStream(socket, session) {
session.streaming = true;
session.paused = false;
session.waitingForDrain = false;
session.effectivePacketSize = 0;
session.recentPacketSizes = [];
session.smoothedPacketSize = 0;
session.pacingSamples = 0;
session.resendPacketCache.clear();
session.resendPacketOrder = [];
session.streamStartWallMs = Date.now();
session.streamStartPacketIndex = session.mediaPacketsTx || 0;
const intervalMs = this._computePacketIntervalMs(session);
session.packetIntervalMs = intervalMs;
session.nextPacketDueAt = Date.now();
let burstPrestartMs = typeof this.service_config.burst_prestart_ms === 'number'
? this.service_config.burst_prestart_ms
: 0;
let burstMultiplier = typeof this.service_config.burst_multiplier === 'number'
? this.service_config.burst_multiplier
: 1;
const burstEnabled = this.service_config.enable_burst === true;
if (!burstEnabled) {
burstPrestartMs = 0;
burstMultiplier = 1;
}
const burstPacketCount = burstPrestartMs > 0 ? Math.ceil(burstPrestartMs / intervalMs) : 0;
session.burstPacketsRemaining = Math.max(0, burstPacketCount - 1);
session.burstMultiplier = burstMultiplier > 1 ? burstMultiplier : 1;
this.debugLog('stream pacing', session.id,
'interval=' + intervalMs + 'ms',
'burstPackets=' + session.burstPacketsRemaining,
'burstMultiplier=' + session.burstMultiplier);
try {
session.asfFd = fs.openSync(session.mediaPath, 'r');
} catch (e) {
console.error('[WTVMMS] failed to open media for streaming', e.message);
session.streaming = false;
return;
}
const dataStart = session.asfDataOffset + 50;
if (typeof session._nextReadPos === 'number' && Number.isFinite(session._nextReadPos)) {
session._readPos = Math.max(dataStart, Math.min(session._nextReadPos, session.asfFileSize));
} else {
// Position read head at the start of the ASF Data Object payload.
// The Data Object itself has a 50-byte preamble before the first packet:
// GUID(16) + Size(8) + FileId(16) + TotalDataPackets(8) + Reserved(2) = 50 bytes
session._readPos = dataStart;
}
delete session._nextReadPos;
delete session._nextPacketNumber;
// Record which audio timestamp (ms) we started streaming from.
// Used by _estimateEosDelayMs to know how much audio remains.
const startByteOffset = Math.max(0, session._readPos - dataStart);
const startPacketIdx = session.asfPacketSize > 0
? Math.floor(startByteOffset / session.asfPacketSize)
: 0;
session.streamStartAudioMs = (session.asfPacketCount > 0 && session.asfDurationSec > 0)
? Math.floor((startPacketIdx / session.asfPacketCount) * session.asfDurationSec * 1000)
: 0;
this._sendNextPacket(socket, session);
}
_stopStream(session) {
session.streaming = false;
if (session.pacingTimer) {
clearTimeout(session.pacingTimer);
session.pacingTimer = null;
}
if (session.drainHandler && session.socket && !session.socket.destroyed) {
session.socket.off('drain', session.drainHandler);
}
session.drainHandler = null;
session.waitingForDrain = false;
if (session.asfFd !== null) {
try { fs.closeSync(session.asfFd); } catch(_){}
session.asfFd = null;
}
session.packetQueue = [];
session.resendPacketCache.clear();
session.resendPacketOrder = [];
this._clearPendingEos(session);
}
_clearPendingEos(session) {
if (session.eosTimer) {
clearTimeout(session.eosTimer);
session.eosTimer = null;
}
if (session.eosPingTimer) {
clearInterval(session.eosPingTimer);
session.eosPingTimer = null;
}
}
_estimateEosDelayMs(socket, session) {
if (!session.isLegacyClient) {
return 0;
}
const cfgTail = Number(this.service_config.webtv_eos_tail_ms);
const cfgMin = Number(this.service_config.webtv_eos_min_delay_ms);
const cfgMax = Number(this.service_config.webtv_eos_max_delay_ms);
const tailMs = Number.isFinite(cfgTail) ? Math.max(0, Math.floor(cfgTail)) : 5000;
const minMs = Number.isFinite(cfgMin) ? Math.max(0, Math.floor(cfgMin)) : 600;
const totalDurationMs = Math.floor((session.asfDurationSec || 0) * 1000);
// Default maxMs to the full file duration + tail + 3s safety headroom.
// This ensures we never clamp a long file to an arbitrary fixed ceiling.
const defaultMaxMs = Math.max(10000, totalDurationMs + tailMs + 3000);
const maxMs = Number.isFinite(cfgMax) ? Math.max(minMs, Math.floor(cfgMax)) : defaultMaxMs;
// The client plays audio in real-time at 1x speed starting from wherever
// the stream began (seeked or from the top). After wallElapsedMs of real
// time the client has consumed that many ms of audio from the seek point.
// Remaining = (total duration from seek point) - wallElapsed.
const seekOffsetMs = session.streamStartAudioMs || 0;
const streamDurationMs = totalDurationMs - seekOffsetMs; // audio from seek → end
const wallElapsedMs = session.streamStartWallMs
? (Date.now() - session.streamStartWallMs)
: 0;
const remainingMs = Math.max(0, streamDurationMs - wallElapsedMs);
const delayMs = Math.max(minMs, Math.min(maxMs, remainingMs + tailMs));
this.debugLog('eos delay calc', session.id,
'totalDurationMs=' + totalDurationMs,
'seekOffsetMs=' + seekOffsetMs,
'wallElapsedMs=' + wallElapsedMs,
'remainingMs=' + remainingMs,
'delayMs=' + delayMs);
return delayMs;
}
_waitForDrain(socket, session) {
if (session.waitingForDrain || socket.destroyed) return;
session.waitingForDrain = true;
session.drainHandler = () => {
session.waitingForDrain = false;
session.drainHandler = null;
if (!session.streaming || socket.destroyed) return;
this.debugLog('socket drain', session.id, 'writableLength=' + socket.writableLength);
this._scheduleNextPacket(session, socket);
};
socket.once('drain', session.drainHandler);
}
_scheduleNextPacket(session, socket) {
if (!session.streaming || session.paused) return;
const writeOk = this._flushQueuedPackets(socket, session);
if (!writeOk) {
this._waitForDrain(socket, session);
return;
}
let intervalMs = session.packetIntervalMs || this._computePacketIntervalMs(session);
if (session.burstPacketsRemaining > 0 && session.burstMultiplier > 1) {
intervalMs = Math.max(
MIN_PACING_INTERVAL_MS,
Math.floor(intervalMs / session.burstMultiplier)
);
session.burstPacketsRemaining--;
}
const now = Date.now();
session.nextPacketDueAt = (session.nextPacketDueAt || now) + intervalMs;
const delayMs = Math.max(MIN_PACING_INTERVAL_MS, session.nextPacketDueAt - now);
if (session.mediaPacketsTx < 4 || (session.mediaPacketsTx % 64) === 0) {
this.debugLog('pace next', session.id,
'in=' + delayMs + 'ms',
'packet=' + session.mediaPacketsTx,
'readPos=0x' + session._readPos.toString(16));
}
session.pacingTimer = setTimeout(() => {
session.pacingTimer = null;
this._sendNextPacket(socket, session);
}, delayMs);
}
_sendNextPacket(socket, session) {
if (!session.streaming || socket.destroyed) {
this._stopStream(session);
return;
}
if (session.waitingForDrain) {
return;
}
const pktSize = session.asfPacketSize;
const buf = Buffer.alloc(pktSize);
let bytesRead = 0;
try {
bytesRead = fs.readSync(session.asfFd, buf, 0, pktSize, session._readPos);
} catch (e) {
this.debugLog('read error', session.id, e.message);
this._endStream(socket, session);
return;
}
if (bytesRead === 0) {
// End of file
this._endStream(socket, session);
return;
}
session._readPos += bytesRead;
const packetBuf = bytesRead < pktSize ? buf.slice(0, bytesRead) : buf;
const strippedBuf = this._stripAsfPacketPadding(packetBuf, session);
const txSize = strippedBuf.length;
// Track transmitted packet sizes for adaptive pacing
if (txSize > 0) {
session.recentPacketSizes.push(txSize);
if (session.recentPacketSizes.length > 20) {
session.recentPacketSizes.shift();
}
// Compute smoothed (median) packet size from recent samples
const sorted = [...session.recentPacketSizes].sort((a, b) => a - b);
session.smoothedPacketSize = sorted[Math.floor(sorted.length / 2)];
session.pacingSamples = session.recentPacketSizes.length;
// Only adjust pacing interval if the transmitted size differs significantly from expected
// (e.g., due to padding stripping). Use smoothed size to avoid jitter.
const paceSize = session.effectivePacketSize > 0 ? session.effectivePacketSize : session.asfPacketSize;
if (Math.abs(txSize - paceSize) > 10) {
session.effectivePacketSize = txSize;
const oldInterval = session.packetIntervalMs;
session.packetIntervalMs = this._computePacketIntervalMs(session);
if (session.mediaPacketsTx <= 4 || (session.mediaPacketsTx % 64) === 0) {
this.debugLog('pace adjust', session.id,
'txSize=' + txSize,
'old=' + oldInterval + 'ms',
'new=' + session.packetIntervalMs + 'ms',
'smoothed=' + session.smoothedPacketSize);
}
session.nextPacketDueAt = Date.now();
}
}
const afFlags = this._nextDataFlags(session);
const writeOk = this._sendDataPacket(socket, session, session.mediaPacketId, afFlags, strippedBuf, false);
if (!writeOk) {
this._waitForDrain(socket, session);
return;
}
this._scheduleNextPacket(session, socket);
}
_endStream(socket, session) {
const eosDelayMs = this._estimateEosDelayMs(socket, session);
this.debugLog('end of stream', session.id, 'eosDelayMs=' + eosDelayMs);
this._stopStream(session);
if (eosDelayMs <= 0) {
this._sendCommand(socket, session, MMS_SERVER_STREAM_STOPPED, 0, session.mediaIncarnation || 0, Buffer.alloc(0));
return;
}
const cfgPingIntervalMs = Number(this.service_config.webtv_eos_ping_interval_ms);
const eosPingIntervalMs = Number.isFinite(cfgPingIntervalMs)
? Math.max(0, Math.floor(cfgPingIntervalMs))
: 3000;
if (eosPingIntervalMs > 0) {
session.eosPingTimer = setInterval(() => {
if (socket.destroyed || session.streaming || !session.eosTimer) {
if (session.eosPingTimer) {
clearInterval(session.eosPingTimer);
session.eosPingTimer = null;
}
return;
}
// LinkMacToViewerPing (MID=0x0004001B) over MMST command channel.
this._sendCommand(socket, session, MMS_SERVER_KEEPALIVE, 0, 0, Buffer.alloc(0));
this.debugLog('eos keepalive ping', session.id, 'intervalMs=' + eosPingIntervalMs);
}, eosPingIntervalMs);
}
session.eosTimer = setTimeout(() => {
session.eosTimer = null;
if (session.eosPingTimer) {
clearInterval(session.eosPingTimer);
session.eosPingTimer = null;
}
if (!socket.destroyed && !session.streaming) {
this._sendCommand(socket, session, MMS_SERVER_STREAM_STOPPED, 0, session.mediaIncarnation || 0, Buffer.alloc(0));
}
}, eosDelayMs);
}
// Parse one fixed-size ASF data packet, remove trailing padding, and return
// the compact payload. Per MMS spec §3.2.5.11.1, Padding Data SHOULD be
// stripped before MMST encapsulation and the Padding Length field MUST be
// updated to reflect the removal.
_stripAsfPacketPadding(buf, session) {
if (!buf || buf.length < 6) return buf;
// Locate Length Type Flags byte, skipping optional error correction header.
let ltFlagsOffset = 0;
const byte0 = buf[0];
if (byte0 & 0x80) {
// Error Correction Present (bit 7 = 1).
// Bits 5-6 = EC Length Type; 00 means bits 0-3 carry the data length.
const ecLenType = (byte0 >> 5) & 0x03;
const ecDataLen = ecLenType === 0 ? (byte0 & 0x0F) : 0;
ltFlagsOffset = 1 + ecDataLen;
}
if (ltFlagsOffset + 2 > buf.length) return buf;
const ltFlags = buf[ltFlagsOffset];
const padLenType = (ltFlags >> 3) & 0x03; // bits 3-4
const seqType = (ltFlags >> 1) & 0x03; // bits 1-2
const pktLenType = (ltFlags >> 5) & 0x03; // bits 5-6
if (padLenType === 0) return buf; // no Padding Length field → nothing to strip
// Byte widths for each type value (0=absent,1=BYTE,2=WORD,3=DWORD)
const typeSize = [0, 1, 2, 4];
// After LTFlags(1) + PropFlags(1) come the optional variable-width fields.
let fieldOffset = ltFlagsOffset + 2;
fieldOffset += typeSize[pktLenType]; // optional Packet Length
fieldOffset += typeSize[seqType]; // optional Sequence
const padLenSize = typeSize[padLenType];
if (fieldOffset + padLenSize > buf.length) return buf;
// Read the declared padding length.
let paddingLen;
if (padLenSize === 1) paddingLen = buf.readUInt8(fieldOffset);
else if (padLenSize === 2) paddingLen = buf.readUInt16LE(fieldOffset);
else paddingLen = buf.readUInt32LE(fieldOffset);
if (paddingLen === 0) return buf; // already no padding
const configuredMinStrip = Number(this.service_config.asf_padding_strip_min_bytes);
const minStripBytes = Number.isFinite(configuredMinStrip) && configuredMinStrip >= 0
? Math.floor(configuredMinStrip)
: 8;
if (paddingLen < minStripBytes) {
return buf;
}
const newLen = buf.length - paddingLen;
// Sanity: compacted packet must still hold the header fields we parsed.
if (newLen < fieldOffset + padLenSize + 6) {
this.debugLog('asf strip sanity fail', session && session.id,
'bufLen=' + buf.length, 'paddingLen=' + paddingLen);
return buf;
}
// Copy the packet, zero the Padding Length field, and trim trailing padding.
const out = Buffer.from(buf.buffer, buf.byteOffset, buf.length);
if (padLenSize === 1) out.writeUInt8(0, fieldOffset);
else if (padLenSize === 2) out.writeUInt16LE(0, fieldOffset);
else out.writeUInt32LE(0, fieldOffset);
return out.slice(0, newLen);
}
// -------------------------------------------------------------------------
// Packet builders
// -------------------------------------------------------------------------
_buildCommandPacket(session, cmdId, prefix1, prefix2, payloadBuf) {
const payload = payloadBuf || Buffer.alloc(0);
const paddedPayloadLength = (payload.length + 7) & ~7;
const payloadBlocks = paddedPayloadLength >> 3;
const totalLength = MMS_COMMAND_HEADER_SIZE + paddedPayloadLength;
const packet = Buffer.alloc(totalLength);
packet.writeUInt32LE(1, 0);
packet.writeUInt32LE(MMS_MAGIC, 4);
packet.writeUInt32LE(totalLength - 16, 8);
packet.writeUInt32LE(MMS_PROTOCOL, 12);
packet.writeUInt32LE(payloadBlocks + 6, 16);
packet.writeUInt16LE(session.commandSeq++ & 0xFFFF, 20);
packet.writeUInt16LE(0, 22);
packet.writeUInt32LE(0, 24);
packet.writeUInt32LE(0, 28);
packet.writeUInt32LE(payloadBlocks + 2, 32);
packet.writeUInt32LE((MMS_DIRECTION_TO_CLIENT << 16) | (cmdId & 0xffff), 36);
packet.writeUInt32LE(prefix1 >>> 0, 40);
packet.writeUInt32LE(prefix2 >>> 0, 44);
payload.copy(packet, MMS_COMMAND_HEADER_SIZE);
return packet;
}
_sendCommand(socket, session, cmdId, prefix1, prefix2, payloadBuf) {
if (socket.destroyed) return;
const pkt = this._buildCommandPacket(session, cmdId, prefix1, prefix2, payloadBuf);
this.debugLog('tx cmd', session.id, '0x' + cmdId.toString(16), 'len=' + pkt.length), 'to='+socket.remoteAddress+":"+socket.remotePort;
socket.write(pkt);
session.bytesTx += pkt.length;
if (cmdId === MMS_SERVER_KEEPALIVE) {
session.lastKeepaliveTxMs = Date.now();
}
}
_handleClientKeepalive(socket, session, prefix1, prefix2) {
// During delayed EOS we already transmit periodic keepalives; if we also
// answer every client keepalive immediately, both sides can get into a
// noisy ping-pong loop. Throttle replies while EOS wait is active.
if (session.eosTimer) {
const cfgPingIntervalMs = Number(this.service_config.webtv_eos_ping_interval_ms);
const eosPingIntervalMs = Number.isFinite(cfgPingIntervalMs)
? Math.max(1000, Math.floor(cfgPingIntervalMs))
: 3000;
const minReplyGapMs = Math.max(1000, eosPingIntervalMs - 500);
const now = Date.now();
const sinceLastTx = now - (session.lastKeepaliveTxMs || 0);
if (sinceLastTx < minReplyGapMs) {
this.debugLog('keepalive throttled', session.id,
'sinceLastTxMs=' + sinceLastTx,
'minReplyGapMs=' + minReplyGapMs,
'p1=0x' + (prefix1 >>> 0).toString(16),
'p2=0x' + (prefix2 >>> 0).toString(16));
return;
}
}
this._sendCommand(socket, session, MMS_SERVER_KEEPALIVE, 0, 0, Buffer.alloc(0));
}
_sendCommandBatch(socket, session, commands) {
if (socket.destroyed || !Array.isArray(commands) || commands.length === 0) return;
const packets = [];
for (const command of commands) {
const pkt = this._buildCommandPacket(
session,
command.cmdId,
command.prefix1,
command.prefix2,
command.payloadBuf
);
this.debugLog('tx cmd', session.id, '0x' + command.cmdId.toString(16), 'len=' + pkt.length);
packets.push(pkt);
session.bytesTx += pkt.length;
}
socket.write(Buffer.concat(packets));
}
_sendHeaderPackets(socket, session, headerBuf) {
const configuredChunkSize = Number(this.service_config.header_chunk_size);
const maxPayload = Math.max(
256,
Math.min(
0xFFFF - 8,
Number.isFinite(configuredChunkSize) && configuredChunkSize > 0 ? Math.floor(configuredChunkSize) : 1400
)
);
let offset = 0;
let count = 0;
while (offset < headerBuf.length && !socket.destroyed) {
const remaining = headerBuf.length - offset;
const chunkLength = Math.min(maxPayload, remaining);
const flags = offset + chunkLength < headerBuf.length ? MMS_HEADER_FLAG_MORE : MMS_HEADER_FLAG_FINAL;
this._sendDataPacket(socket, session, session.headerPacketId, flags, headerBuf.slice(offset, offset + chunkLength), true);
offset += chunkLength;
count++;
}
this.debugLog('header sent', session.id,
'packets=' + count,
'bytes=' + headerBuf.length,
'headerPacketId=0x' + session.headerPacketId.toString(16));
}
_sendDataPacket(socket, session, packetId, flags, payloadBuf, isHeaderPacket = false) {
if (socket.destroyed) return false;
const payload = payloadBuf || Buffer.alloc(0);
const totalLength = 8 + payload.length;
const pkt = Buffer.alloc(totalLength);
// LocationId: independent per-type sequence starting at 0.
// For header payloads: 0, 1, 2, ... per header stream.
// For data payloads: ASF data packet number starting at 0.
const isHeader = isHeaderPacket;
const locationId = isHeader ? session.headerPacketsTx : session.mediaPacketsTx;
pkt.writeUInt32LE(locationId, 0);
pkt.writeUInt8(packetId & 0xff, 4);
pkt.writeUInt8(flags & 0xff, 5);
pkt.writeUInt16LE(totalLength, 6);
payload.copy(pkt, 8);
if (isHeader) session.headerPacketsTx++;
else if (packetId === session.mediaPacketId) session.mediaPacketsTx++;
if (!isHeader && packetId === session.mediaPacketId) {
const resendSeq = session.dataAfMode === 'sequence'
? (session.lastDataSequenceTx >>> 0)
: (locationId >>> 0);
this._cacheResendPacket(session, resendSeq, pkt);
}
const shouldLog = isHeader
? true
: session.mediaPacketsTx <= 4 || (session.mediaPacketsTx % 64) === 0;
if (shouldLog) {
const previewLength = Math.min(8, payload.length);
const preview = previewLength > 0 ? payload.subarray(0, previewLength).toString('hex') : '';
this.debugLog('tx data',
(session.udpDataEnabled ? 'UDP' : (session.tcpDataEnabled) ? 'TCP' : '???'),
'locationId=' + locationId,
'packetId=0x' + packetId.toString(16),
'flags=0x' + flags.toString(16),
'packetSizeField=' + totalLength,
'payload=' + payload.length,
preview ? 'preview=' + preview : '');
}
if (session.udpDataEnabled) {
this._sendUdpDataPacket(session, pkt, packetId, locationId, flags, payload.length);
return true;
}
if (isHeader) {
const writeOk = socket.write(pkt);
if (!writeOk && shouldLog) {
this.debugLog('socket buffered', session.id,
'packetId=0x' + packetId.toString(16),
'writableLength=' + socket.writableLength);
}
session.bytesTx += pkt.length;
return writeOk;
}
session.packetQueue.push({
buffer: pkt,
generation: session.seekGeneration,
});
return true;
}
_sendUdpDataPacket(session, packetBuf, packetId, locationId, flags, payloadLen) {
if (!this.udpServer || !session || !session.clientEndpointIp || !session.clientPort) {
this.debugLog('udp send skipped', session && session.id,
'packetId=0x' + packetId.toString(16),
'peer=' + ((session && session.clientEndpointIp) || '(none)'),
'port=' + ((session && session.clientPort) || 0));
return;
}
const configuredProbePackets = Number(this.service_config.udp_probe_alt_ports_packets);
const probeAltPortsPackets = Number.isFinite(configuredProbePackets)
? Math.max(0, Math.floor(configuredProbePackets))
: 96;
const primaryPort = session.clientPort >>> 0;
const targetPorts = [primaryPort];
if (!session.udpPeerLearnedPort && probeAltPortsPackets > 0 && session.mediaPacketsTx <= probeAltPortsPackets) {
const alternates = Array.isArray(session.clientPortCandidates)
? session.clientPortCandidates
: [];
for (const candidate of alternates) {
const p = this._sanitizeUdpPort(candidate);
if (p > 0 && targetPorts.indexOf(p) < 0) {
targetPorts.push(p);
}
}
}
const host = session.clientEndpointIp;
for (const port of targetPorts) {
this.udpServer.send(packetBuf, 0, packetBuf.length, port, host, (err) => {
if (err) {
this.debugLog('udp send error', session.id,
'packetId=0x' + packetId.toString(16),
'port=' + port,
err.message);
}
});
}
session.bytesTx += packetBuf.length;
}
_cacheResendPacket(session, sequenceNumber, packetBuf) {
if (!session || !Buffer.isBuffer(packetBuf)) return;
const configuredCacheSize = Number(this.service_config.udp_retransmit_cache_size);
const maxCacheSize = Number.isFinite(configuredCacheSize)
? Math.max(128, Math.floor(configuredCacheSize))
: 4096;
const seq = sequenceNumber >>> 0;
if (!session.resendPacketCache.has(seq)) {
session.resendPacketOrder.push(seq);
}
session.resendPacketCache.set(seq, {
packet: Buffer.from(packetBuf),
ts: Date.now(),
});
while (session.resendPacketOrder.length > maxCacheSize) {
const dropSeq = session.resendPacketOrder.shift();
session.resendPacketCache.delete(dropSeq);
}
}
_lookupResendPacket(session, sequenceNumber) {
if (!session) return null;
const seq = sequenceNumber >>> 0;
const direct = session.resendPacketCache.get(seq);
if (direct && Buffer.isBuffer(direct.packet)) {
return direct.packet;
}
// Some clients may only preserve the low 8 bits from AFFlags.
const low8 = seq & 0xFF;
for (let i = session.resendPacketOrder.length - 1; i >= 0; i--) {
const candidateSeq = session.resendPacketOrder[i] >>> 0;
if ((candidateSeq & 0xFF) !== low8) continue;
const candidate = session.resendPacketCache.get(candidateSeq);
if (candidate && Buffer.isBuffer(candidate.packet)) {
return candidate.packet;
}
}
return null;
}
_handleUdpDatagram(msg, rinfo) {
if (!Buffer.isBuffer(msg) || msg.length === 0) {
return;
}
this._learnUdpPeerPort(rinfo);
if (msg.length < 12) {
this.debugLog('udp rx', rinfo.address + ':' + rinfo.port, 'len=' + msg.length);
return;
}
const resendReq = this._parseRequestPacketListResend(msg);
if (!resendReq) {
this.debugLog('udp rx', rinfo.address + ':' + rinfo.port, 'len=' + msg.length);
return;
}
const peerIp = this._normalizeIp(rinfo.address);
let targetSession = null;
for (const session of this.sessions.values()) {
if (!session || !session.udpDataEnabled || !session.requestedUdp) continue;
if (this._normalizeIp(session.clientEndpointIp) !== peerIp) continue;
if ((session.clientId >>> 0) !== resendReq.clientId) continue;
if (((session.openFileId || 0) & 0xFFFF) !== resendReq.sourceId) continue;
// If we already learned a client media port, require it to match.
if (session.clientPort > 0 && session.clientPort !== rinfo.port) {
continue;
}
targetSession = session;
break;
}
if (!targetSession) {
this.debugLog('udp resend no session',
'from=' + peerIp + ':' + rinfo.port,
'clientId=0x' + resendReq.clientId.toString(16),
'sourceId=0x' + resendReq.sourceId.toString(16),
'count=' + resendReq.packetList.length);
return;
}
let resent = 0;
let missing = 0;
for (const sequenceNumber of resendReq.packetList) {
const packet = this._lookupResendPacket(targetSession, sequenceNumber);
if (!packet) {
missing++;
continue;
}
this.udpServer.send(packet, 0, packet.length, targetSession.clientPort >>> 0, targetSession.clientEndpointIp, (err) => {
if (err) {
this.debugLog('udp resend send error', targetSession.id,
'seq=' + (sequenceNumber >>> 0),
err.message);
}
});
targetSession.bytesTx += packet.length;
resent++;
}
this.debugLog('udp resend', targetSession.id,
'requested=' + resendReq.packetList.length,
'resent=' + resent,
'missing=' + missing,
'from=' + peerIp + ':' + rinfo.port);
}
_parseRequestPacketListResend(msg) {
if (!Buffer.isBuffer(msg) || msg.length < 12) return null;
const MMS_RETRANSMIT_SIGNATURE = 0xBEEFF00D;
let littleEndian = true;
if (msg.readUInt32LE(0) !== MMS_RETRANSMIT_SIGNATURE) {
if (msg.readUInt32BE(0) !== MMS_RETRANSMIT_SIGNATURE) {
return null;
}
littleEndian = false;
}
const readU32 = (offset) => littleEndian ? msg.readUInt32LE(offset) : msg.readUInt32BE(offset);
const readU16 = (offset) => littleEndian ? msg.readUInt16LE(offset) : msg.readUInt16BE(offset);
const clientId = readU32(4) >>> 0;
const sourceId = readU16(8) >>> 0;
const numPackets = readU16(10) >>> 0;
if (numPackets < 1 || numPackets > 32) {
return null;
}
const expectedLen = 12 + (numPackets * 4);
if (msg.length < expectedLen) {
return null;
}
const packetList = [];
let offset = 12;
for (let i = 0; i < numPackets; i++) {
packetList.push(readU32(offset) >>> 0);
offset += 4;
}
return {
clientId,
sourceId,
packetList,
};
}
_flushQueuedPackets(socket, session) {
if (socket.destroyed || session.packetQueue.length === 0) {
return true;
}
const currentGen = session.seekGeneration;
let writeOk = true;
while (session.packetQueue.length > 0) {
const queuedPacket = session.packetQueue[0];
if (queuedPacket.generation !== currentGen) {
session.packetQueue.shift();
continue;
}
writeOk = socket.write(queuedPacket.buffer);
session.bytesTx += queuedPacket.buffer.length;
session.packetQueue.shift();
if (!writeOk) return false;
}
return true;
}
// -------------------------------------------------------------------------
// Individual response payload builders
// -------------------------------------------------------------------------
_utf16leBuffer(str) {
return Buffer.from((str || '') + '\0', 'utf16le');
}
_decodeUtf16CString(buf) {
if (!buf || buf.length === 0) return '';
let end = buf.length;
for (let offset = 0; offset + 1 < buf.length; offset += 2) {
if (buf.readUInt16LE(offset) === 0) {
end = offset;
break;
}
}
return buf.slice(0, end).toString('utf16le').trim();
}
_normalizeIp(remoteAddress) {
if (!remoteAddress) return '';
return remoteAddress.startsWith('::ffff:') ? remoteAddress.slice(7) : remoteAddress;
}
_selectTransportString(utf16Str, ansiStr) {
const a = (utf16Str || '').trim();
const b = (ansiStr || '').trim();
const score = (s) => {
let v = 0;
if (!s) return v;
if (s.includes('\\')) v += 2;
if (/\b(?:CUBDD|MMSU|UDP|TCP)\b/i.test(s)) v += 2;
if (/\\\d+/.test(s)) v += 1;
return v;
};
return score(a) >= score(b) ? a : b;
}
_parseTransportRequest(transportStr) {
if (!transportStr) return null;
const ipMatch = transportStr.match(/\\([^\\]+)/);
const ip = ipMatch ? ipMatch[1] : '';
const protocol = this._inferTransportToken(transportStr) || '';
let port = 0;
const candidatePorts = [];
if (protocol) {
const tokenPortRegex = new RegExp('\\\\' + protocol + '(?:\\\\(\\d+))+', 'ig');
let tokenMatch;
while ((tokenMatch = tokenPortRegex.exec(transportStr)) !== null) {
const numericParts = tokenMatch[0].match(/\\(\\d+)/g) || [];
for (const numericPart of numericParts) {
const parsed = parseInt(numericPart.slice(1), 10);
if (Number.isInteger(parsed) && parsed > 0 && parsed <= 65535) {
candidatePorts.push(parsed);
}
}
}
}
if (candidatePorts.length === 0) {
const fallbackPortRegex = /\\(?:CUBDD|MMSU|UDP|TCP)\\(\d+)/ig;
let fallbackMatch;
while ((fallbackMatch = fallbackPortRegex.exec(transportStr)) !== null) {
const parsed = parseInt(fallbackMatch[1], 10);
if (Number.isInteger(parsed) && parsed > 0 && parsed <= 65535) {
candidatePorts.push(parsed);
}
}
}
// Prefer the first valid candidate as initial target, then probe alternates.
if (candidatePorts.length > 0) {
port = candidatePorts[0];
}
if (!ip && !protocol && !port) return null;
return {
ip,
protocol,
port,
ports: [...new Set(candidatePorts)],
};
}
_learnUdpPeerPort(rinfo) {
if (!rinfo || !Number.isInteger(rinfo.port)) return;
const peerIp = this._normalizeIp(rinfo.address);
const learnedPort = this._sanitizeUdpPort(rinfo.port);
if (!peerIp || learnedPort <= 0) return;
const matchingSessions = [];
for (const session of this.sessions.values()) {
if (!session || !session.requestedUdp) continue;
if (this._normalizeIp(session.clientEndpointIp) !== peerIp) continue;
matchingSessions.push(session);
}
if (matchingSessions.length !== 1) {
return;
}
const session = matchingSessions[0];
if (session.udpPeerLearnedPort === learnedPort && session.clientPort === learnedPort) {
return;
}
const previousPort = session.clientPort;
session.udpPeerLearnedPort = learnedPort;
session.clientPort = learnedPort;
if (!Array.isArray(session.clientPortCandidates)) {
session.clientPortCandidates = [];
}
if (session.clientPortCandidates.indexOf(learnedPort) < 0) {
session.clientPortCandidates.unshift(learnedPort);
}
this._refreshUdpTransportState(session);
this.debugLog('udp peer learned', session.id,
'oldPort=' + (previousPort || 0),
'newPort=' + learnedPort,
'ip=' + peerIp);
}
_sanitizeUdpPort(port) {
const parsed = Number(port);
return Number.isInteger(parsed) && parsed > 0 && parsed <= 65535 ? parsed : 0;
}
_refreshUdpTransportState(session) {
if (!session) return;
const udpEnabledByConfig = this.service_config.enable_udp !== false;
session.clientPort = this._sanitizeUdpPort(session.clientPort);
session.udpDataEnabled = !!session.requestedUdp
&& udpEnabledByConfig
&& !!this.udpServer
&& !!this.udpServerReady
&& session.clientPort > 0
&& !!session.clientEndpointIp;
}
_inferTransportToken(transportStr) {
if (!transportStr) return '';
const upper = String(transportStr).toUpperCase();
// Prioritize UDP-like transports over TCP when both appear.
if (upper.includes('\\CUBDD\\')) return 'CUBDD';
if (upper.includes('\\MMSU\\')) return 'MMSU';
if (upper.includes('\\UDP\\')) return 'UDP';
if (upper.includes('\\TCP\\')) return 'TCP';
return '';
}
_resolveDataAfMode(session) {
const mode = String(this.service_config?.data_af_mode || 'auto').toLowerCase();
if (mode === 'zero' || mode === 'sequence') return mode;
return session.isLegacyClient ? 'sequence' : 'zero';
}
_resolveMediaPacketId(session) {
const mode = String(this.service_config?.media_packet_id_mode || 'auto').toLowerCase();
if (mode === 'fixed') return MMS_MEDIA_PACKET_ID;
if (mode === 'incarnation') return session.mediaIncarnation & 0xFF;
return session.isLegacyClient
? (session.mediaIncarnation & 0xFF)
: MMS_MEDIA_PACKET_ID;
}
_nextDataFlags(session) {
if (session.dataAfMode === 'sequence') {
const sequence = session.dataSequence >>> 0;
const flags = sequence & 0xFF;
session.lastDataSequenceTx = sequence;
session.dataSequence = (sequence + 1) >>> 0;
return flags;
}
session.lastDataSequenceTx = session.mediaPacketsTx >>> 0;
return 0x00;
}
_parseStartPlaying(openFileId, payload) {
// _handleCommandPacket passes the reduced StartPlaying payload that begins at position(8).
if (!payload || payload.length < 24) return null;
const rawPosition = payload.readDoubleLE(0);
const asfOffset = payload.readUInt32LE(8) >>> 0;
const locationId = payload.readUInt32LE(12) >>> 0;
const frameOffset = payload.readUInt32LE(16) >>> 0;
const playIncarnation = payload.readUInt32LE(20) >>> 0;
const position = Number.isFinite(rawPosition) && rawPosition >= 0 && rawPosition < 1e300
? rawPosition
: null;
return {
openFileId: openFileId >>> 0,
position,
asfOffset,
locationId,
frameOffset,
playIncarnation,
};
}
_resolveStartSeek(session, startReq) {
if (!startReq || !session.mediaPath) return null;
const dataStart = session.asfDataOffset + 50;
const maxPacketSize = Math.max(1, session.asfPacketSize >>> 0);
const maxMediaBytes = Math.max(0, session.asfFileSize - dataStart);
const maxPacketFromBytes = maxMediaBytes > 0
? Math.max(0, Math.floor((maxMediaBytes - 1) / maxPacketSize))
: 0;
const maxPacketFromHeader = session.asfPacketCount > 0
? Math.max(0, (session.asfPacketCount >>> 0) - 1)
: maxPacketFromBytes;
const maxPacketNumber = Math.max(maxPacketFromBytes, maxPacketFromHeader);
const clampPacketNumber = (value) => {
const n = Number.isFinite(value) ? Math.floor(value) : 0;
return Math.max(0, Math.min(n, maxPacketNumber));
};
let packetNumber = 0;
let readPos = dataStart;
let seekSource = 'default';
if (startReq.locationId !== 0 && startReq.locationId !== 0xFFFFFFFF) {
packetNumber = clampPacketNumber(startReq.locationId >>> 0);
readPos = dataStart + (packetNumber * maxPacketSize);
seekSource = 'locationId';
this.debugLog('seek resolve', session.id, seekSource, 'packet=' + packetNumber, 'readPos=0x' + readPos.toString(16));
return {
packetNumber,
readPos: Math.min(readPos, session.asfFileSize),
};
}
if (startReq.asfOffset !== 0 && startReq.asfOffset !== 0xFFFFFFFF) {
const requested = Math.max(dataStart, startReq.asfOffset >>> 0);
const relative = Math.max(0, requested - dataStart);
packetNumber = clampPacketNumber(Math.floor(relative / maxPacketSize));
readPos = dataStart + (packetNumber * maxPacketSize);
seekSource = 'asfOffset';
this.debugLog('seek resolve', session.id, seekSource, 'packet=' + packetNumber, 'readPos=0x' + readPos.toString(16));
return {
packetNumber,
readPos: Math.min(readPos, session.asfFileSize),
};
}
if (Number.isFinite(startReq.position) && startReq.position > 0 && startReq.position < 1e300) {
if (session.asfPacketCount > 0 && session.asfDurationSec > 0) {
const ratio = Math.max(0, Math.min(1, startReq.position / session.asfDurationSec));
packetNumber = clampPacketNumber(Math.floor(ratio * session.asfPacketCount));
readPos = dataStart + (packetNumber * maxPacketSize);
seekSource = 'position-duration';
} else if (session.bitrateBps > 0) {
const bytePos = Math.floor((startReq.position * session.bitrateBps) / 8);
const clamped = Math.max(0, Math.min(bytePos, maxMediaBytes));
packetNumber = clampPacketNumber(Math.floor(clamped / maxPacketSize));
readPos = dataStart + (packetNumber * maxPacketSize);
seekSource = 'position-bitrate';
}
}
this.debugLog('seek resolve', session.id, seekSource, 'packet=' + packetNumber, 'readPos=0x' + readPos.toString(16));
return {
packetNumber,
readPos: Math.min(readPos, session.asfFileSize),
};
}
_buildConnectAcceptedPayload(session) {
// LinkMacToViewerReportConnectedEX
const isLegacyClient = /^NSPlayer\/6\./i.test(session?.clientPlayer || '');
const serverVersion = this._utf16leBuffer(this.majorVersion + '.' + this.minorVersion);
const playerMinVersion = this._utf16leBuffer(''); // no minimum version
const updatePlayerUrl = this._utf16leBuffer('');
const AuthenPackage = this._utf16leBuffer('');
const configUdp = (this.service_config.enable_udp === false) ? false : true;
const failure = (!configUdp && session.requestedUdp) || (configUdp && session.requestedUdp) || (!session.requestedUdp && !session.requestedTcp);
const header = Buffer.alloc(48);
header.writeUInt32LE(header.length / 8, 0); // chunkLength in 8-byte blocks
header.writeUInt32LE(0x00040001, 4); // MessageID
header.writeUInt32LE((failure ? 0xC00D0005 : 0x00000000), 8); // HRESULT (StatusCode) (NS_E_NOCONNECTION if failure = true, 0 = false)
header.writeUInt32LE(MMS_DISABLE_PACKET_PAIR, 12); // playIncaration (packet-pair support)
header.writeUInt32LE(0x0004000B, 16); // MacToViewerProtocolRevision
header.writeUInt32LE(0x0003001C, 20); // ViewerToMacProtocolRevision
header.writeDoubleLE(1.0, 8); // blockGroupPlayTime = 1.0
header.writeUInt32LE(0x00000001, 16); // blockGroupBlocks
header.writeUInt32LE(0x00000001, 20); // nMaxOpenFiles
header.writeUInt32LE(0x00008000, 24); // nBlockMaxBytes
header.writeUInt32LE(0x00989680, 28); // maxBitRate
header.writeUInt32LE(serverVersion.length / 2, 32); // cbServerVersionInfo (Unicode chars)
header.writeUInt32LE(playerMinVersion.length / 2, 36); // cbVersionInfo
header.writeUInt32LE(updatePlayerUrl.length / 2, 40); // cbVersionUrl
header.writeUInt32LE(AuthenPackage.length / 2, 44); // cbAuthenPackage
return Buffer.concat([header, serverVersion, playerMinVersion, updatePlayerUrl, AuthenPackage]);
}
_buildTransportAcceptedPayload() {
// packetPayloadSize(4) + funnelName (UTF-16LE, null-terminated)
const funnelName = Buffer.from('Funnel Of The Gods\0', 'utf16le');
const buf = Buffer.alloc(4 + funnelName.length);
// packetPayloadSize = 0 (already zero from alloc)
funnelName.copy(buf, 4);
return buf;
}
_buildFunnelInfoPayload(session) {
// transportMask(4) + nBlockFragments(4) + fragmentBytes(4) + nCubs(4)
// + failedCubs(4) + nDisks(4) + decluster(4) + cubddDatagramSize(4)
const buf = Buffer.alloc(32);
buf.writeUInt32LE(0x00000008, 0); // transportMask
buf.writeUInt32LE(0x00000001, 4); // nBlockFragments
buf.writeUInt32LE(0x00010000, 8); // fragmentBytes
buf.writeUInt32LE(session.clientId >>> 0, 12); // nCubs (unique client identifier)
buf.writeUInt32LE(0x00000000, 16); // failedCubs
buf.writeUInt32LE(0x00000001, 20); // nDisks
buf.writeUInt32LE(0x00000000, 24); // decluster
buf.writeUInt32LE((session.isCubddTransport) ? session.asfPacketSize + 8 : 0x00000000, 28); // cubddDatagramSize
return buf;
}
_buildFileDetailsPayload(session) {
// Full LinkMacToViewerReportOpenFile payload (100 bytes)
const buf = Buffer.alloc(100);
const headerBuf = this._readAsfHeader(session) || Buffer.alloc(0);
const mediaDataStart = session.asfDataOffset + 50;
const mediaBytes = Math.max(0, session.asfFileSize - mediaDataStart);
const packetCount = session.asfPacketCount > 0
? session.asfPacketCount
: (session.asfPacketSize > 0 ? Math.ceil(mediaBytes / session.asfPacketSize) : 0);
const durationSeconds = session.asfDurationSec > 0
? Math.ceil(session.asfDurationSec)
: (session.bitrateBps > 0
? Math.ceil((mediaBytes * 8) / session.bitrateBps)
: 0);
buf.writeUInt32LE(session.openFileId >>> 0, 0); // openFileId
// padding [4] = 0, fileName [8] = 0 (already zeroed)
buf.writeUInt32LE(FILE_ATTRIBUTE_MMS_CANSEEK, 12); // fileAttributes = FILE_ATTRIBUTE_MMS_CANSEEK
buf.writeDoubleLE(durationSeconds, 16); // fileDuration (8-byte double)
buf.writeUInt32LE(durationSeconds >>> 0, 24); // fileBlocks (integer seconds)
// unused1 [28-43] = 0 (already zeroed)
buf.writeUInt32LE(session.asfPacketSize >>> 0, 44); // filePacketSize
buf.writeUInt32LE(packetCount >>> 0, 48); // filePacketCount low 32 bits
// filePacketCount high 32 bits [52-55] = 0
buf.writeUInt32LE(session.bitrateBps >>> 0, 56); // fileBitRate
buf.writeUInt32LE(headerBuf.length >>> 0, 60); // fileHeaderSize
// unused2 [64-99] = 0 (already zeroed)
return buf;
}
_buildOpenFileErrorPayload() {
return Buffer.alloc(0);
}
}
module.exports = WTVMMS;