1919 lines
79 KiB
JavaScript
1919 lines
79 KiB
JavaScript
// Pure JS implementation of Microsoft MMS (MMST - MMS over TCP) streaming protocol.
|
||
// Used by WebTV Plus (LC2+) to stream WMA and ASF audio content over mms:// URLs.
|
||
// Port 1755 (standard MMS port). Commands over TCP; data over TCP or UDP.
|
||
//
|
||
// Protocol overview:
|
||
// - Command packets use the 48-byte MMST control header that starts with 0xb00bface.
|
||
// - Header/media packets use the 8-byte MMST data preheader:
|
||
// seq(4) + packetId(1) + flags(1) + totalLen(2)
|
||
// - The server replies with command packets for connect/transport/open/header/start,
|
||
// and sends ASF header/media bytes as MMST data packets paced by file bitrate.
|
||
|
||
'use strict';
|
||
|
||
const dgram = require('dgram');
|
||
|
||
const MMS_MAGIC = 0xB00BFACE;
|
||
const MMS_PROTOCOL = 0x20534D4D; // "MMS " little-endian
|
||
const MMS_COMMAND_HEADER_SIZE = 48;
|
||
const MMS_DIRECTION_TO_CLIENT = 0x0004;
|
||
|
||
// MMS command IDs (client -> server)
|
||
const MMS_CLIENT_CONNECT = 0x01;
|
||
const MMS_CLIENT_TRANSPORT = 0x02;
|
||
const MMS_CLIENT_OPEN = 0x05;
|
||
const MMS_CLIENT_START = 0x07;
|
||
const MMS_CLIENT_STOP = 0x09;
|
||
const MMS_CLIENT_CLOSE = 0x0D;
|
||
const MMS_CLIENT_HEADER_REQ = 0x15;
|
||
const MMS_CLIENT_TIMING_REQ = 0x18;
|
||
const MMS_CLIENT_KEEPALIVE = 0x1B;
|
||
const MMS_CLIENT_STREAM_SELECT = 0x33;
|
||
|
||
// MMS command IDs (server -> client)
|
||
const MMS_SERVER_CLIENT_ACCEPTED = 0x01;
|
||
const MMS_SERVER_PROTOCOL_ACCEPTED = 0x02;
|
||
const MMS_SERVER_PROTOCOL_FAILED = 0x03;
|
||
const MMS_SERVER_MEDIA_FOLLOWS = 0x05;
|
||
const MMS_SERVER_FILE_DETAILS = 0x06;
|
||
const MMS_SERVER_HEADER_ACCEPTED = 0x11;
|
||
const MMS_SERVER_TIMING_REPLY = 0x15;
|
||
const MMS_SERVER_KEEPALIVE = 0x1B;
|
||
const MMS_SERVER_STREAM_STOPPED = 0x1E;
|
||
const MMS_SERVER_STREAM_ACCEPTED = 0x21;
|
||
|
||
const MMS_HEADER_PACKET_ID = 0x05;
|
||
const MMS_MEDIA_PACKET_ID = 0x04;
|
||
const MMS_HEADER_FLAG_MORE = 0x04;
|
||
const MMS_HEADER_FLAG_FINAL = 0x0C;
|
||
|
||
const MMS_DISABLE_PACKET_PAIR = 0xf0f0f0ef
|
||
const MMS_USE_PACKET_PAIR = 0xf0f0f0f0
|
||
|
||
const FILE_ATTRIBUTE_MMS_CANSEEK = 0x01000000;
|
||
|
||
// Default ASF data packet size (bytes); real files embed this in the ASF header.
|
||
// 5001 is the standard Windows Media Encoder default.
|
||
const DEFAULT_ASF_PACKET_SIZE = 5001;
|
||
|
||
// Minimum pacing interval (ms) – prevents spinning too fast on low-bitrate files
|
||
const MIN_PACING_INTERVAL_MS = 10;
|
||
|
||
class WTVMMS {
|
||
minisrv_config = null;
|
||
service_name = null;
|
||
service_config = null;
|
||
server_config = null;
|
||
server = null;
|
||
udpServer = null;
|
||
udpServerReady = false;
|
||
wtvshared = null;
|
||
sessions = new Map();
|
||
|
||
constructor(minisrv_config, service_name, wtvshared, sendToClient, net) {
|
||
this.minisrv_config = minisrv_config;
|
||
this.service_name = service_name;
|
||
this.service_config = minisrv_config.services[service_name] || {};
|
||
this.server_config = this.service_config;
|
||
this.wtvshared = wtvshared;
|
||
this.version = require(`${wtvshared.path.dirname(require.main.filename)}/package.json`).version.replace(/^v/, '');
|
||
this.server = net.createServer((socket) => this.handleConnection(socket));
|
||
if (this.service_config.hide_minisrv_version || minisrv_config.config.hide_minisrv_version) {
|
||
this.serverName = "minisrv";
|
||
this.minorVersion = 1;
|
||
this.majorVersion = 0.0;
|
||
} else {
|
||
this.serverName = `minisrv/${this.version}`;
|
||
this.majorVersion = this.version.split('.').slice(0)[0];
|
||
this.minorVersion = this.version.split('.').slice(1).join('.');
|
||
}
|
||
}
|
||
|
||
listen(port, host = '0.0.0.0') {
|
||
this.server.listen(port, host);
|
||
|
||
const configuredUdpPort = Number(this.server_config?.udp_port);
|
||
const udpPort = Number.isFinite(configuredUdpPort) && configuredUdpPort > 0
|
||
? Math.floor(configuredUdpPort)
|
||
: 1755;
|
||
this.udpServer = dgram.createSocket('udp4');
|
||
this.udpServerReady = false;
|
||
this.udpServer.on('error', (err) => {
|
||
console.error('[WTVMMS] udp socket error', err.message);
|
||
});
|
||
this.udpServer.on('message', (msg, rinfo) => {
|
||
this._handleUdpDatagram(msg, rinfo);
|
||
});
|
||
this.udpServer.bind(udpPort, host, () => {
|
||
this.udpServerReady = true;
|
||
this.debugLog('udp listen', host + ':' + udpPort);
|
||
});
|
||
|
||
return this.server;
|
||
}
|
||
|
||
close() {
|
||
if (this.server) this.server.close();
|
||
if (this.udpServer) {
|
||
try { this.udpServer.close(); } catch (_) {}
|
||
this.udpServer = null;
|
||
this.udpServerReady = false;
|
||
}
|
||
}
|
||
|
||
// -------------------------------------------------------------------------
|
||
// Helpers
|
||
// -------------------------------------------------------------------------
|
||
|
||
getDebugEnabled() {
|
||
return this.service_config.debug;
|
||
}
|
||
|
||
debugLog(...args) {
|
||
if (this.getDebugEnabled()) console.log('[WTVMMS]', ...args);
|
||
}
|
||
|
||
// Resolve a requested media path across all configured ServiceVaults.
|
||
resolveMediaPath(requestedMedia) {
|
||
if (!requestedMedia) return null;
|
||
const serviceVaultDir = this.service_config.servicevault_dir || this.service_name;
|
||
const vaults = this.minisrv_config.config?.ServiceVaults || [];
|
||
const variants = this._mediaVariants(requestedMedia);
|
||
|
||
for (const vault of vaults) {
|
||
const base = this.wtvshared.getAbsolutePath(serviceVaultDir, vault);
|
||
for (const variant of variants) {
|
||
const candidate = this.wtvshared.makeSafePath(base, variant);
|
||
if (candidate && this.wtvshared.fs.existsSync(candidate) && this.wtvshared.fs.lstatSync(candidate).isFile()) {
|
||
this.debugLog('media resolved', variant, '->', candidate);
|
||
return candidate;
|
||
}
|
||
}
|
||
}
|
||
return null;
|
||
}
|
||
|
||
_mediaVariants(media) {
|
||
const p = media.replace(/\\/g, '/').replace(/^\/+/, '');
|
||
const ext = this.wtvshared.path.posix.extname(p).toLowerCase();
|
||
const stem = ext ? p.slice(0, -ext.length) : p;
|
||
const list = [p];
|
||
// Accept both .wma and .asf for the same stem
|
||
if (ext === '.wma') list.push(stem + '.asf');
|
||
if (ext === '.asf') list.push(stem + '.wma');
|
||
if (!ext) { list.push(stem + '.wma'); list.push(stem + '.asf'); }
|
||
return [...new Set(list)];
|
||
}
|
||
|
||
// -------------------------------------------------------------------------
|
||
// Connection handling
|
||
// -------------------------------------------------------------------------
|
||
|
||
handleConnection(socket) {
|
||
socket.setNoDelay(true);
|
||
|
||
const session = {
|
||
id: `${socket.remoteAddress}:${socket.remotePort}`,
|
||
socket,
|
||
clientEndpointIp: this._normalizeIp(socket.remoteAddress),
|
||
rxBuf: Buffer.alloc(0),
|
||
mediaPath: null,
|
||
streaming: false,
|
||
paused: false,
|
||
pacingTimer: null,
|
||
asfFd: null,
|
||
requestedTransport: '',
|
||
clientPort: 0,
|
||
clientPortCandidates: [],
|
||
udpPeerLearnedPort: 0,
|
||
tcpDataEnabled: true,
|
||
udpDataEnabled: false,
|
||
requestedUdp: false,
|
||
isCubddTransport: false,
|
||
asfPacketSize: DEFAULT_ASF_PACKET_SIZE,
|
||
asfDataOffset: 0,
|
||
asfFileSize: 0,
|
||
asfPacketCount: 0,
|
||
asfDurationSec: 0,
|
||
bitrateBps: 0,
|
||
commandSeq: 0,
|
||
openFileId: 1,
|
||
clientId: (Math.random() * 0x7FFFFFFF + 1) >>> 0,
|
||
openIncarnation: 0,
|
||
headerIncarnation: MMS_HEADER_PACKET_ID,
|
||
mediaIncarnation: MMS_MEDIA_PACKET_ID,
|
||
headerPacketId: MMS_HEADER_PACKET_ID,
|
||
mediaPacketId: MMS_MEDIA_PACKET_ID,
|
||
readBlockIncarnation: 0,
|
||
dataSequence: 0,
|
||
lastDataSequenceTx: 0,
|
||
dataAfMode: 'auto',
|
||
bytesTx: 0,
|
||
headerPacketsTx: 0,
|
||
mediaPacketsTx: 0,
|
||
burstPacketsRemaining: 0,
|
||
burstMultiplier: 1,
|
||
waitingForDrain: false,
|
||
drainHandler: null,
|
||
packetIntervalMs: MIN_PACING_INTERVAL_MS,
|
||
nextPacketDueAt: 0,
|
||
effectivePacketSize: 0,
|
||
seekGeneration: 0,
|
||
packetQueue: [],
|
||
eosTimer: null,
|
||
eosPingTimer: null,
|
||
lastKeepaliveTxMs: 0,
|
||
resendPacketCache: new Map(),
|
||
resendPacketOrder: [],
|
||
recentPacketSizes: [],
|
||
smoothedPacketSize: 0,
|
||
pacingSamples: 0,
|
||
};
|
||
|
||
this.sessions.set(socket, session);
|
||
this.debugLog('client connected', session.id);
|
||
|
||
socket.on('data', (data) => {
|
||
try {
|
||
session.rxBuf = Buffer.concat([session.rxBuf, data]);
|
||
this._processIncoming(socket, session);
|
||
} catch (e) {
|
||
console.error('[WTVMMS] handleData error', e);
|
||
}
|
||
});
|
||
|
||
socket.on('close', () => {
|
||
this._stopStream(session);
|
||
if (session.asfFd !== null) { try { this.wtvshared.fs.closeSync(session.asfFd); } catch(_){} session.asfFd = null; }
|
||
this.sessions.delete(socket);
|
||
this.debugLog('client disconnected', session.id, 'tx=' + session.bytesTx);
|
||
});
|
||
|
||
socket.on('error', (err) => {
|
||
this._stopStream(session);
|
||
if (session.asfFd !== null) { try { this.wtvshared.fs.closeSync(session.asfFd); } catch(_){} session.asfFd = null; }
|
||
this.sessions.delete(socket);
|
||
this.debugLog('socket error', session.id, err.message);
|
||
});
|
||
}
|
||
|
||
// -------------------------------------------------------------------------
|
||
// Incoming packet parser
|
||
// -------------------------------------------------------------------------
|
||
|
||
_processIncoming(socket, session) {
|
||
while (session.rxBuf.length > 0) {
|
||
const buf = session.rxBuf;
|
||
|
||
if (buf[0] === 0x4E || buf[0] === 0x47) {
|
||
const lineEnd = buf.indexOf('\r\n');
|
||
if (lineEnd < 0 && buf.length < 512) break;
|
||
const line = buf.slice(0, lineEnd > 0 ? lineEnd + 2 : buf.length).toString('latin1');
|
||
this.debugLog('text connect line', session.id, line.trim());
|
||
session.rxBuf = buf.slice(lineEnd > 0 ? lineEnd + 2 : buf.length);
|
||
continue;
|
||
}
|
||
|
||
if (buf.length < 8) break;
|
||
|
||
if (buf.length >= 12 && buf.readUInt32LE(4) === MMS_MAGIC) {
|
||
const totalLength = buf.readUInt32LE(8) + 16;
|
||
if (totalLength < MMS_COMMAND_HEADER_SIZE || totalLength > 1048576) {
|
||
this.debugLog('invalid command length, dropping connection', session.id, totalLength);
|
||
socket.destroy();
|
||
return;
|
||
}
|
||
if (buf.length < totalLength) break;
|
||
|
||
const packet = buf.slice(0, totalLength);
|
||
session.rxBuf = buf.slice(totalLength);
|
||
this._handleCommandPacket(socket, session, packet);
|
||
continue;
|
||
}
|
||
|
||
const packetLength = buf.readUInt16LE(6);
|
||
if (packetLength >= 8 && buf.length >= packetLength) {
|
||
this.debugLog('discarding unexpected client data packet', session.id, 'len=' + packetLength);
|
||
session.rxBuf = buf.slice(packetLength);
|
||
continue;
|
||
}
|
||
|
||
this.debugLog('bad magic, resyncing', session.id, buf.slice(0, 8).toString('hex'));
|
||
session.rxBuf = buf.slice(1);
|
||
}
|
||
}
|
||
|
||
// -------------------------------------------------------------------------
|
||
// Command packet dispatch
|
||
// -------------------------------------------------------------------------
|
||
|
||
_handleCommandPacket(socket, session, packet) {
|
||
if (packet.length < MMS_COMMAND_HEADER_SIZE) return;
|
||
|
||
const cmdId = packet.readUInt16LE(36);
|
||
const direction = packet.readUInt16LE(38);
|
||
const prefix1 = packet.readUInt32LE(40);
|
||
const prefix2 = packet.readUInt32LE(44);
|
||
const payload = packet.slice(MMS_COMMAND_HEADER_SIZE);
|
||
|
||
this.debugLog(
|
||
'rx cmd',
|
||
session.id,
|
||
'0x' + cmdId.toString(16),
|
||
'dir=0x' + direction.toString(16),
|
||
'p1=0x' + prefix1.toString(16),
|
||
'p2=0x' + prefix2.toString(16),
|
||
'len=' + packet.length,
|
||
'from=' + socket.remoteAddress + ':' + socket.remotePort
|
||
);
|
||
|
||
switch (cmdId) {
|
||
case MMS_CLIENT_CONNECT:
|
||
this._handleConnect(socket, session, payload);
|
||
break;
|
||
case MMS_CLIENT_TRANSPORT:
|
||
this._handleTransport(socket, session, payload);
|
||
break;
|
||
case MMS_CLIENT_OPEN:
|
||
session.openIncarnation = prefix1; // playIncarnation from OpenFile
|
||
this._handleOpen(socket, session, payload);
|
||
break;
|
||
case MMS_CLIENT_START:
|
||
this._handleStart(socket, session, prefix1, payload);
|
||
break;
|
||
case MMS_CLIENT_STOP:
|
||
this._handleStop(socket, session, prefix2); // prefix2 = stopPlayIncarnation
|
||
break;
|
||
case MMS_CLIENT_HEADER_REQ:
|
||
this._handleHeaderRequest(socket, session, payload);
|
||
break;
|
||
case MMS_CLIENT_TIMING_REQ:
|
||
this._sendCommand(socket, session, MMS_SERVER_TIMING_REPLY, 0, 0xF0F0F0EF, this._buildFunnelInfoPayload(session));
|
||
break;
|
||
case MMS_CLIENT_KEEPALIVE:
|
||
this._handleClientKeepalive(socket, session, prefix1, prefix2);
|
||
break;
|
||
case MMS_CLIENT_STREAM_SELECT:
|
||
this._sendCommand(socket, session, MMS_SERVER_STREAM_ACCEPTED, 0, 0, Buffer.alloc(0));
|
||
break;
|
||
case MMS_CLIENT_CLOSE:
|
||
socket.end();
|
||
break;
|
||
default:
|
||
this.debugLog('unhandled cmd', session.id, '0x' + cmdId.toString(16));
|
||
}
|
||
}
|
||
|
||
// -------------------------------------------------------------------------
|
||
// Protocol handshake handlers
|
||
// -------------------------------------------------------------------------
|
||
|
||
_handleConnect(socket, session, payload) {
|
||
const clientStr = payload.length > 4
|
||
? this._decodeUtf16CString(payload.slice(4))
|
||
: '';
|
||
session.clientPlayer = clientStr;
|
||
session.isLegacyClient = /^NSPlayer\/6\./i.test(clientStr);
|
||
// Preserve ffplay/VLC/MPC compatibility: keep MMST data flags at 0 unless legacy mode is required.
|
||
session.dataAfMode = this._resolveDataAfMode(session);
|
||
this.debugLog('connect', session.id, clientStr || '(no player string)');
|
||
console.log(' * [MMS]', `[${session.id}]`, 'Client connect:', clientStr || '(unknown player)');
|
||
if (session.isLegacyClient) {
|
||
this._sendCommandBatch(socket, session, [
|
||
{
|
||
cmdId: MMS_SERVER_CLIENT_ACCEPTED,
|
||
prefix1: 0,
|
||
prefix2: 0xF0F0F0EF,
|
||
payloadBuf: this._buildConnectAcceptedPayload(session)
|
||
}
|
||
]);
|
||
return;
|
||
}
|
||
|
||
this._sendCommand(socket, session, MMS_SERVER_CLIENT_ACCEPTED, 0, 0xF0F0F0EF, this._buildConnectAcceptedPayload(session));
|
||
}
|
||
|
||
_handleTransport(socket, session, payload) {
|
||
// Transport payload has an 8-byte prefix before the UTF-16 string.
|
||
// If slice(8) yields a valid UTF-16 string, use it; otherwise try slice(4) as fallback.
|
||
const tryBuf8 = payload.length > 8 ? payload.slice(8) : Buffer.alloc(0);
|
||
const tryBuf4 = payload.length > 4 ? payload.slice(4) : Buffer.alloc(0);
|
||
const probe8 = this._decodeUtf16CString(tryBuf8);
|
||
const transportBuf = probe8.length > 0 ? tryBuf8 : tryBuf4;
|
||
const transportUtf16 = probe8.length > 0 ? probe8 : this._decodeUtf16CString(tryBuf4);
|
||
const transportAnsi = transportBuf.toString('latin1').replace(/\0+/g, '').trim();
|
||
const transportStr = this._selectTransportString(transportUtf16, transportAnsi);
|
||
|
||
const requested = this._parseTransportRequest(transportStr);
|
||
this.debugLog('transport parsed', session.id, JSON.stringify(requested));
|
||
if (requested) {
|
||
session.requestedTransport = requested.protocol || session.requestedTransport;
|
||
session.requestedClientIp = requested.ip;
|
||
session.clientPort = this._sanitizeUdpPort(requested.port);
|
||
session.clientPortCandidates = Array.isArray(requested.ports)
|
||
? requested.ports.map((p) => this._sanitizeUdpPort(p)).filter((p) => p > 0)
|
||
: (session.clientPort > 0 ? [session.clientPort] : []);
|
||
if (session.clientPort > 0 && session.clientPortCandidates.indexOf(session.clientPort) < 0) {
|
||
session.clientPortCandidates.unshift(session.clientPort);
|
||
}
|
||
session.udpPeerLearnedPort = 0;
|
||
// Do not trust IP inside transport request; use connected peer IP.
|
||
session.clientEndpointIp = this._normalizeIp(socket.remoteAddress);
|
||
this.debugLog(
|
||
'transport endpoint',
|
||
session.id,
|
||
'requestedTransport=' + session.requestedTransport,
|
||
'requestedIp=' + requested.ip,
|
||
'peerIp=' + session.clientEndpointIp,
|
||
'port=' + requested.port,
|
||
'candidates=' + session.clientPortCandidates.join(',')
|
||
);
|
||
}
|
||
|
||
const transportToken = String(session.requestedTransport || '').toUpperCase();
|
||
|
||
// CUBDD is WebTV's proprietary UDP transport — treat it as UDP-capable but track separately
|
||
// so CUBDD-specific hacks don't bleed into generic MMSU/UDP handling.
|
||
const isCubdd = transportToken === 'CUBDD'
|
||
|| /\bCUBDD\b/i.test(transportStr)
|
||
|| /\bCUBDD\b/i.test(transportUtf16)
|
||
|| /\bCUBDD\b/i.test(transportAnsi);
|
||
|
||
const requestedUdp = isCubdd
|
||
|| transportToken === 'UDP'
|
||
|| transportToken === 'MMSU'
|
||
|| /\b(?:UDP|MMSU)\b/i.test(transportStr)
|
||
|| /\b(?:UDP|MMSU)\b/i.test(transportUtf16)
|
||
|| /\b(?:UDP|MMSU)\b/i.test(transportAnsi);
|
||
|
||
session.isCubddTransport = isCubdd;
|
||
session.requestedUdp = requestedUdp;
|
||
this._refreshUdpTransportState(session);
|
||
|
||
// If client asked for UDP but we can't enable it (e.g. invalid/out-of-range port),
|
||
// fall back to TCP so we don't leave both transport flags false.
|
||
if (requestedUdp && !session.udpDataEnabled) {
|
||
this.debugLog('transport udp fallback to tcp', session.id,
|
||
'reason=udpDataEnabled=false clientPort=' + session.clientPort);
|
||
session.tcpDataEnabled = true;
|
||
} else {
|
||
session.tcpDataEnabled = !requestedUdp;
|
||
}
|
||
this.debugLog('transport', session.id,
|
||
'token=' + (transportToken || '(unknown)'),
|
||
'tcpDataEnabled=' + session.tcpDataEnabled,
|
||
'udpDataEnabled=' + session.udpDataEnabled,
|
||
'udpPort=' + (session.clientPort || 0),
|
||
'udpPeer=' + (session.clientEndpointIp || '(none)'),
|
||
'udpReady=' + this.udpServerReady);
|
||
this._sendCommand(socket, session, MMS_SERVER_PROTOCOL_ACCEPTED, 0, 0, this._buildTransportAcceptedPayload());
|
||
}
|
||
|
||
_handleOpen(socket, session, payload) {
|
||
let requestedUrl = '';
|
||
if (payload.length > 8) {
|
||
requestedUrl = payload.slice(8).toString('utf16le').split('\0')[0].trim();
|
||
}
|
||
this.debugLog('open file', session.id, requestedUrl);
|
||
|
||
let mediaStem = requestedUrl
|
||
.replace(/^mms:\/\/[^/]+/i, '')
|
||
.replace(/^\/+/, '');
|
||
|
||
session.mediaPath = this.resolveMediaPath(mediaStem);
|
||
console.log(' * [MMS]', `[${session.id}]`, 'Open:', requestedUrl, '->', session.mediaPath || '(not found)');
|
||
|
||
if (!session.mediaPath) {
|
||
this._sendCommand(socket, session, MMS_SERVER_FILE_DETAILS, 0x80070002, session.openIncarnation, Buffer.alloc(0));
|
||
socket.end();
|
||
return;
|
||
}
|
||
|
||
this._parseAsfHeader(session);
|
||
|
||
this._sendCommand(socket, session, MMS_SERVER_FILE_DETAILS, 0, session.openIncarnation, this._buildFileDetailsPayload(session));
|
||
}
|
||
|
||
_handleHeaderRequest(socket, session, payload) {
|
||
if (!session.mediaPath) return;
|
||
let requestedInc = 0;
|
||
// _handleCommandPacket passes the reduced ReadBlock payload that starts at offset,
|
||
// so playIncarnation is 32 bytes into that buffer.
|
||
if (payload.length >= 36) {
|
||
requestedInc = payload.readUInt32LE(32) >>> 0;
|
||
}
|
||
session.readBlockIncarnation = requestedInc;
|
||
// Echo the requested playIncarnation for ReportReadBlock/header data packets.
|
||
// If missing/invalid, retain previous value or fall back to legacy packet id.
|
||
if (requestedInc >= 1 && requestedInc <= 0xFE) {
|
||
session.headerIncarnation = requestedInc;
|
||
} else if (!session.headerIncarnation) {
|
||
session.headerIncarnation = MMS_HEADER_PACKET_ID;
|
||
}
|
||
session.headerPacketId = (session.headerIncarnation & 0xFF) || MMS_HEADER_PACKET_ID;
|
||
session.headerPacketsTx = 0;
|
||
this.debugLog(
|
||
'header request',
|
||
session.id,
|
||
'requestedInc=0x' + requestedInc.toString(16),
|
||
'headerIncarnation=0x' + session.headerIncarnation.toString(16),
|
||
'headerPacketId=0x' + session.headerPacketId.toString(16)
|
||
);
|
||
|
||
// ReportReadBlock: hr=0, playIncarnation=from ReadBlock, playSequence=0
|
||
const readBlockPayload = Buffer.alloc(4);
|
||
readBlockPayload.writeUInt32LE(0, 0); // playSequence
|
||
this._sendCommand(socket, session, MMS_SERVER_HEADER_ACCEPTED, 0, session.headerIncarnation, readBlockPayload);
|
||
|
||
this._refreshUdpTransportState(session);
|
||
|
||
if (!session.tcpDataEnabled && !session.udpDataEnabled) {
|
||
this.debugLog('no data transport available', session.id, 'closing connection');
|
||
socket.end();
|
||
return;
|
||
}
|
||
|
||
const asfHeaderBuf = this._readAsfHeader(session);
|
||
if (asfHeaderBuf && asfHeaderBuf.length > 0) {
|
||
this._sendHeaderPackets(socket, session, asfHeaderBuf);
|
||
}
|
||
}
|
||
|
||
_handleStart(socket, session, openFileId, payload) {
|
||
if (!session.mediaPath) {
|
||
this.debugLog('start requested but no media path', session.id);
|
||
return;
|
||
}
|
||
|
||
const startReq = this._parseStartPlaying(openFileId, payload);
|
||
if (startReq === null) {
|
||
this.debugLog('start payload too short', session.id, 'len=' + payload.length);
|
||
return;
|
||
}
|
||
|
||
this._clearPendingEos(session);
|
||
session.mediaIncarnation = startReq.playIncarnation >>> 0;
|
||
session.mediaPacketId = this._resolveMediaPacketId(session);
|
||
|
||
const seek = this._resolveStartSeek(session, startReq);
|
||
if (seek) {
|
||
session._nextReadPos = seek.readPos;
|
||
session._nextPacketNumber = seek.packetNumber;
|
||
}
|
||
|
||
// Increment generation counter to invalidate pending packets from old seek
|
||
session.seekGeneration = (session.seekGeneration + 1) >>> 0;
|
||
session.packetQueue = [];
|
||
|
||
if (session.streaming) {
|
||
this._stopStream(session);
|
||
}
|
||
|
||
// Reset Data packet AFFlags sequence for each new StartPlaying sequence.
|
||
session.dataSequence = 0;
|
||
|
||
this.debugLog(
|
||
'start stream',
|
||
session.id,
|
||
'mediaIncarnation=0x' + session.mediaIncarnation.toString(16),
|
||
'position=' + (startReq.position === null ? 'unspecified' : startReq.position),
|
||
'asfOffset=' + startReq.asfOffset,
|
||
'locationId=' + startReq.locationId,
|
||
seek ? ('seekPacket=' + seek.packetNumber) : 'seekPacket=0'
|
||
);
|
||
|
||
// ReportStartedPlaying: hr=0, playIncarnation=from StartPlaying,
|
||
// tigerFileId=openFileId, unused1=0, unused2=12 bytes zeros
|
||
session.mediaPacketsTx = seek ? seek.packetNumber : 0;
|
||
const startedPayload = Buffer.alloc(20);
|
||
startedPayload.writeUInt32LE(session.openFileId >>> 0, 0); // tigerFileId
|
||
// unused1 [4] = 0, unused2 [8-19] = 0 (already zeroed)
|
||
this._sendCommand(socket, session, MMS_SERVER_MEDIA_FOLLOWS, 0, session.mediaIncarnation, startedPayload);
|
||
|
||
this._refreshUdpTransportState(session);
|
||
|
||
if (!session.tcpDataEnabled && !session.udpDataEnabled) {
|
||
this.debugLog('no data transport available', session.id, 'cannot start media');
|
||
return;
|
||
}
|
||
|
||
this._startStream(socket, session);
|
||
}
|
||
|
||
_handleStop(socket, session, stopIncarnation) {
|
||
this.debugLog('stop stream', session.id);
|
||
this._stopStream(session);
|
||
// Echo back the playIncarnation from StopPlaying (prefix2 of that message)
|
||
const inc = stopIncarnation >>> 0;
|
||
this._sendCommand(socket, session, MMS_SERVER_STREAM_STOPPED, 0, inc, Buffer.alloc(0));
|
||
}
|
||
|
||
// -------------------------------------------------------------------------
|
||
// ASF header parsing (minimal – just packet size and bitrate)
|
||
// -------------------------------------------------------------------------
|
||
|
||
_parseAsfHeader(session) {
|
||
if (!session.mediaPath) return;
|
||
|
||
try {
|
||
const stat = this.wtvshared.fs.statSync(session.mediaPath);
|
||
session.asfFileSize = stat.size;
|
||
|
||
// Read first 64 KB to find ASF header objects
|
||
const readLen = Math.min(65536, stat.size);
|
||
const buf = Buffer.alloc(readLen);
|
||
const fd = this.wtvshared.fs.openSync(session.mediaPath, 'r');
|
||
this.wtvshared.fs.readSync(fd, buf, 0, readLen, 0);
|
||
this.wtvshared.fs.closeSync(fd);
|
||
|
||
// ASF Header Object GUID: {75B22630-668E-11CF-A6D9-00AA0062CE6C}
|
||
const ASF_HEADER_GUID = Buffer.from([
|
||
0x30, 0x26, 0xB2, 0x75, 0x8E, 0x66, 0xCF, 0x11,
|
||
0xA6, 0xD9, 0x00, 0xAA, 0x00, 0x62, 0xCE, 0x6C
|
||
]);
|
||
|
||
if (!buf.slice(0, 16).equals(ASF_HEADER_GUID)) {
|
||
this.debugLog('not a valid ASF file (bad header GUID)', session.id);
|
||
session.asfDataOffset = 0;
|
||
return;
|
||
}
|
||
|
||
// ASF Header Object: GUID(16) + Size(8) + NumHeaders(4) + Reserved(2)
|
||
const headerSize = Number(buf.readBigUInt64LE(16)); // total header size including GUID+size fields
|
||
|
||
// Walk sub-objects inside the header
|
||
// ASF File Properties Object GUID: {8CABDCA1-A947-11CF-8EE4-00C00C205365}
|
||
const ASF_FILE_PROPS_GUID = Buffer.from([
|
||
0xA1, 0xDC, 0xAB, 0x8C, 0x47, 0xA9, 0xCF, 0x11,
|
||
0x8E, 0xE4, 0x00, 0xC0, 0x0C, 0x20, 0x53, 0x65
|
||
]);
|
||
|
||
// ASF Stream Properties Object GUID: {B7DC0791-A9B7-11CF-8EE6-00C00C205365}
|
||
const ASF_STREAM_PROPS_GUID = Buffer.from([
|
||
0x91, 0x07, 0xDC, 0xB7, 0xB7, 0xA9, 0xCF, 0x11,
|
||
0x8E, 0xE6, 0x00, 0xC0, 0x0C, 0x20, 0x53, 0x65
|
||
]);
|
||
|
||
// ASF Data Object GUID: {75B22636-668E-11CF-A6D9-00AA0062CE6C}
|
||
const ASF_DATA_GUID = Buffer.from([
|
||
0x36, 0x26, 0xB2, 0x75, 0x8E, 0x66, 0xCF, 0x11,
|
||
0xA6, 0xD9, 0x00, 0xAA, 0x00, 0x62, 0xCE, 0x6C
|
||
]);
|
||
|
||
let offset = 30; // skip Header Object's GUID(16)+Size(8)+NumHeaders(4)+Reserved(2)
|
||
let maxBitrate = 0;
|
||
let packetSize = 0;
|
||
let dataPacketCount = 0;
|
||
let durationSec = 0;
|
||
|
||
while (offset + 24 <= readLen) {
|
||
const objGuid = buf.slice(offset, offset + 16);
|
||
const objSize = Number(buf.readBigUInt64LE(offset + 16));
|
||
if (objSize < 24) break;
|
||
|
||
if (objGuid.equals(ASF_FILE_PROPS_GUID) && objSize >= 104) {
|
||
// ASF File Properties layout after the 24-byte object header:
|
||
// FileId(16), FileSize(8), CreationDate(8), DataPacketsCount(8),
|
||
// PlayDuration(8), SendDuration(8), Preroll(8), Flags(4),
|
||
// MinDataPacketSize(4), MaxDataPacketSize(4), MaxBitrate(4)
|
||
const playDuration100ns = Number(buf.readBigUInt64LE(offset + 64));
|
||
const prerollMs = Number(buf.readBigUInt64LE(offset + 80));
|
||
dataPacketCount = Number(buf.readBigUInt64LE(offset + 56));
|
||
packetSize = buf.readUInt32LE(offset + 96);
|
||
maxBitrate = buf.readUInt32LE(offset + 100);
|
||
const rawDurationSec = playDuration100ns > 0 ? (playDuration100ns / 10000000.0) : 0;
|
||
durationSec = Math.max(0, rawDurationSec - (prerollMs / 1000.0));
|
||
}
|
||
|
||
if (objGuid.equals(ASF_DATA_GUID)) {
|
||
session.asfDataOffset = offset;
|
||
break;
|
||
}
|
||
|
||
offset += objSize;
|
||
}
|
||
|
||
// If we didn't hit the Data GUID in the read window, the data starts right after the header
|
||
if (session.asfDataOffset === 0 && headerSize > 0 && headerSize < session.asfFileSize) {
|
||
session.asfDataOffset = headerSize;
|
||
}
|
||
|
||
if (packetSize > 0) session.asfPacketSize = packetSize;
|
||
if (maxBitrate > 0) session.bitrateBps = maxBitrate;
|
||
if (dataPacketCount > 0) session.asfPacketCount = dataPacketCount;
|
||
if (durationSec > 0) session.asfDurationSec = durationSec;
|
||
|
||
this.debugLog('ASF parsed', session.id,
|
||
'packetSize=' + session.asfPacketSize,
|
||
'bitrate=' + session.bitrateBps + 'bps',
|
||
'packetCount=' + session.asfPacketCount,
|
||
'durationSec=' + session.asfDurationSec,
|
||
'dataOffset=0x' + session.asfDataOffset.toString(16));
|
||
|
||
} catch (e) {
|
||
console.error('[WTVMMS] ASF parse error', session.id, e.message);
|
||
session.asfDataOffset = 0;
|
||
}
|
||
}
|
||
|
||
// Read the raw ASF header bytes (everything before the first Data Object)
|
||
_readAsfHeader(session) {
|
||
if (!session.mediaPath || session.asfDataOffset === 0) return null;
|
||
try {
|
||
const headerLength = Math.min(session.asfFileSize, session.asfDataOffset + 50);
|
||
const buf = Buffer.alloc(headerLength);
|
||
const fd = this.wtvshared.fs.openSync(session.mediaPath, 'r');
|
||
this.wtvshared.fs.readSync(fd, buf, 0, headerLength, 0);
|
||
this.wtvshared.fs.closeSync(fd);
|
||
return buf;
|
||
} catch (e) {
|
||
console.error('[WTVMMS] ASF header read error', e.message);
|
||
return null;
|
||
}
|
||
}
|
||
|
||
// -------------------------------------------------------------------------
|
||
// Paced streaming
|
||
// -------------------------------------------------------------------------
|
||
|
||
|
||
_computePacketIntervalMs(session) {
|
||
let intervalMs = MIN_PACING_INTERVAL_MS;
|
||
const packetSizeForPacing = (session.effectivePacketSize > 0)
|
||
? session.effectivePacketSize
|
||
: session.asfPacketSize;
|
||
if (session.bitrateBps > 0 && packetSizeForPacing > 0) {
|
||
intervalMs = Math.max(
|
||
MIN_PACING_INTERVAL_MS,
|
||
(packetSizeForPacing * 8 / session.bitrateBps) * 1000
|
||
);
|
||
}
|
||
|
||
const defaultMultiplier = (session.isLegacyClient && !session.udpDataEnabled) ? 0.94 : 1.0;
|
||
const configuredMultiplier = Number(this.service_config.pacing_multiplier);
|
||
const pacingMultiplier = Number.isFinite(configuredMultiplier) && configuredMultiplier > 0
|
||
? configuredMultiplier
|
||
: defaultMultiplier;
|
||
|
||
return Math.max(MIN_PACING_INTERVAL_MS, Math.floor(intervalMs * pacingMultiplier));
|
||
}
|
||
|
||
_startStream(socket, session) {
|
||
session.streaming = true;
|
||
session.paused = false;
|
||
session.waitingForDrain = false;
|
||
session.effectivePacketSize = 0;
|
||
session.recentPacketSizes = [];
|
||
session.smoothedPacketSize = 0;
|
||
session.pacingSamples = 0;
|
||
session.resendPacketCache.clear();
|
||
session.resendPacketOrder = [];
|
||
session.streamStartWallMs = Date.now();
|
||
session.streamStartPacketIndex = session.mediaPacketsTx || 0;
|
||
|
||
const intervalMs = this._computePacketIntervalMs(session);
|
||
session.packetIntervalMs = intervalMs;
|
||
session.nextPacketDueAt = Date.now();
|
||
|
||
let burstPrestartMs = typeof this.service_config.burst_prestart_ms === 'number'
|
||
? this.service_config.burst_prestart_ms
|
||
: 0;
|
||
let burstMultiplier = typeof this.service_config.burst_multiplier === 'number'
|
||
? this.service_config.burst_multiplier
|
||
: 1;
|
||
|
||
const burstEnabled = this.service_config.enable_burst === true;
|
||
if (!burstEnabled) {
|
||
burstPrestartMs = 0;
|
||
burstMultiplier = 1;
|
||
}
|
||
const burstPacketCount = burstPrestartMs > 0 ? Math.ceil(burstPrestartMs / intervalMs) : 0;
|
||
|
||
session.burstPacketsRemaining = Math.max(0, burstPacketCount - 1);
|
||
session.burstMultiplier = burstMultiplier > 1 ? burstMultiplier : 1;
|
||
|
||
this.debugLog('stream pacing', session.id,
|
||
'interval=' + intervalMs + 'ms',
|
||
'burstPackets=' + session.burstPacketsRemaining,
|
||
'burstMultiplier=' + session.burstMultiplier);
|
||
|
||
try {
|
||
session.asfFd = this.wtvshared.fs.openSync(session.mediaPath, 'r');
|
||
} catch (e) {
|
||
console.error('[WTVMMS] failed to open media for streaming', e.message);
|
||
session.streaming = false;
|
||
return;
|
||
}
|
||
|
||
const dataStart = session.asfDataOffset + 50;
|
||
if (typeof session._nextReadPos === 'number' && Number.isFinite(session._nextReadPos)) {
|
||
session._readPos = Math.max(dataStart, Math.min(session._nextReadPos, session.asfFileSize));
|
||
} else {
|
||
// Position read head at the start of the ASF Data Object payload.
|
||
// The Data Object itself has a 50-byte preamble before the first packet:
|
||
// GUID(16) + Size(8) + FileId(16) + TotalDataPackets(8) + Reserved(2) = 50 bytes
|
||
session._readPos = dataStart;
|
||
}
|
||
delete session._nextReadPos;
|
||
delete session._nextPacketNumber;
|
||
|
||
// Record which audio timestamp (ms) we started streaming from.
|
||
// Used by _estimateEosDelayMs to know how much audio remains.
|
||
const startByteOffset = Math.max(0, session._readPos - dataStart);
|
||
const startPacketIdx = session.asfPacketSize > 0
|
||
? Math.floor(startByteOffset / session.asfPacketSize)
|
||
: 0;
|
||
session.streamStartAudioMs = (session.asfPacketCount > 0 && session.asfDurationSec > 0)
|
||
? Math.floor((startPacketIdx / session.asfPacketCount) * session.asfDurationSec * 1000)
|
||
: 0;
|
||
|
||
this._sendNextPacket(socket, session);
|
||
}
|
||
|
||
_stopStream(session) {
|
||
session.streaming = false;
|
||
if (session.pacingTimer) {
|
||
clearTimeout(session.pacingTimer);
|
||
session.pacingTimer = null;
|
||
}
|
||
if (session.drainHandler && session.socket && !session.socket.destroyed) {
|
||
session.socket.off('drain', session.drainHandler);
|
||
}
|
||
session.drainHandler = null;
|
||
session.waitingForDrain = false;
|
||
if (session.asfFd !== null) {
|
||
try { this.wtvshared.fs.closeSync(session.asfFd); } catch(_){}
|
||
session.asfFd = null;
|
||
}
|
||
session.packetQueue = [];
|
||
session.resendPacketCache.clear();
|
||
session.resendPacketOrder = [];
|
||
this._clearPendingEos(session);
|
||
}
|
||
|
||
_clearPendingEos(session) {
|
||
if (session.eosTimer) {
|
||
clearTimeout(session.eosTimer);
|
||
session.eosTimer = null;
|
||
}
|
||
if (session.eosPingTimer) {
|
||
clearInterval(session.eosPingTimer);
|
||
session.eosPingTimer = null;
|
||
}
|
||
}
|
||
|
||
_estimateEosDelayMs(socket, session) {
|
||
if (!session.isLegacyClient) {
|
||
return 0;
|
||
}
|
||
|
||
const cfgTail = Number(this.service_config.webtv_eos_tail_ms);
|
||
const cfgMin = Number(this.service_config.webtv_eos_min_delay_ms);
|
||
const cfgMax = Number(this.service_config.webtv_eos_max_delay_ms);
|
||
|
||
const tailMs = Number.isFinite(cfgTail) ? Math.max(0, Math.floor(cfgTail)) : 5000;
|
||
const minMs = Number.isFinite(cfgMin) ? Math.max(0, Math.floor(cfgMin)) : 600;
|
||
|
||
const totalDurationMs = Math.floor((session.asfDurationSec || 0) * 1000);
|
||
// Default maxMs to the full file duration + tail + 3s safety headroom.
|
||
// This ensures we never clamp a long file to an arbitrary fixed ceiling.
|
||
const defaultMaxMs = Math.max(10000, totalDurationMs + tailMs + 3000);
|
||
const maxMs = Number.isFinite(cfgMax) ? Math.max(minMs, Math.floor(cfgMax)) : defaultMaxMs;
|
||
|
||
// The client plays audio in real-time at 1x speed starting from wherever
|
||
// the stream began (seeked or from the top). After wallElapsedMs of real
|
||
// time the client has consumed that many ms of audio from the seek point.
|
||
// Remaining = (total duration from seek point) - wallElapsed.
|
||
const seekOffsetMs = session.streamStartAudioMs || 0;
|
||
const streamDurationMs = totalDurationMs - seekOffsetMs; // audio from seek → end
|
||
const wallElapsedMs = session.streamStartWallMs
|
||
? (Date.now() - session.streamStartWallMs)
|
||
: 0;
|
||
const remainingMs = Math.max(0, streamDurationMs - wallElapsedMs);
|
||
|
||
const delayMs = Math.max(minMs, Math.min(maxMs, remainingMs + tailMs));
|
||
|
||
this.debugLog('eos delay calc', session.id,
|
||
'totalDurationMs=' + totalDurationMs,
|
||
'seekOffsetMs=' + seekOffsetMs,
|
||
'wallElapsedMs=' + wallElapsedMs,
|
||
'remainingMs=' + remainingMs,
|
||
'delayMs=' + delayMs);
|
||
|
||
return delayMs;
|
||
}
|
||
|
||
_waitForDrain(socket, session) {
|
||
if (session.waitingForDrain || socket.destroyed) return;
|
||
|
||
session.waitingForDrain = true;
|
||
session.drainHandler = () => {
|
||
session.waitingForDrain = false;
|
||
session.drainHandler = null;
|
||
if (!session.streaming || socket.destroyed) return;
|
||
this.debugLog('socket drain', session.id, 'writableLength=' + socket.writableLength);
|
||
this._scheduleNextPacket(session, socket);
|
||
};
|
||
socket.once('drain', session.drainHandler);
|
||
}
|
||
|
||
_scheduleNextPacket(session, socket) {
|
||
if (!session.streaming || session.paused) return;
|
||
|
||
const writeOk = this._flushQueuedPackets(socket, session);
|
||
if (!writeOk) {
|
||
this._waitForDrain(socket, session);
|
||
return;
|
||
}
|
||
|
||
let intervalMs = session.packetIntervalMs || this._computePacketIntervalMs(session);
|
||
|
||
if (session.burstPacketsRemaining > 0 && session.burstMultiplier > 1) {
|
||
intervalMs = Math.max(
|
||
MIN_PACING_INTERVAL_MS,
|
||
Math.floor(intervalMs / session.burstMultiplier)
|
||
);
|
||
session.burstPacketsRemaining--;
|
||
}
|
||
|
||
const now = Date.now();
|
||
session.nextPacketDueAt = (session.nextPacketDueAt || now) + intervalMs;
|
||
const delayMs = Math.max(MIN_PACING_INTERVAL_MS, session.nextPacketDueAt - now);
|
||
|
||
if (session.mediaPacketsTx < 4 || (session.mediaPacketsTx % 64) === 0) {
|
||
this.debugLog('pace next', session.id,
|
||
'in=' + delayMs + 'ms',
|
||
'packet=' + session.mediaPacketsTx,
|
||
'readPos=0x' + session._readPos.toString(16));
|
||
}
|
||
|
||
session.pacingTimer = setTimeout(() => {
|
||
session.pacingTimer = null;
|
||
this._sendNextPacket(socket, session);
|
||
}, delayMs);
|
||
}
|
||
|
||
_sendNextPacket(socket, session) {
|
||
if (!session.streaming || socket.destroyed) {
|
||
this._stopStream(session);
|
||
return;
|
||
}
|
||
if (session.waitingForDrain) {
|
||
return;
|
||
}
|
||
|
||
const pktSize = session.asfPacketSize;
|
||
const buf = Buffer.alloc(pktSize);
|
||
|
||
let bytesRead = 0;
|
||
try {
|
||
bytesRead = this.wtvshared.fs.readSync(session.asfFd, buf, 0, pktSize, session._readPos);
|
||
} catch (e) {
|
||
this.debugLog('read error', session.id, e.message);
|
||
this._endStream(socket, session);
|
||
return;
|
||
}
|
||
|
||
if (bytesRead === 0) {
|
||
// End of file
|
||
this._endStream(socket, session);
|
||
return;
|
||
}
|
||
|
||
session._readPos += bytesRead;
|
||
|
||
const packetBuf = bytesRead < pktSize ? buf.slice(0, bytesRead) : buf;
|
||
const strippedBuf = this._stripAsfPacketPadding(packetBuf, session);
|
||
|
||
const txSize = strippedBuf.length;
|
||
|
||
// Track transmitted packet sizes for adaptive pacing
|
||
if (txSize > 0) {
|
||
session.recentPacketSizes.push(txSize);
|
||
if (session.recentPacketSizes.length > 20) {
|
||
session.recentPacketSizes.shift();
|
||
}
|
||
|
||
// Compute smoothed (median) packet size from recent samples
|
||
const sorted = [...session.recentPacketSizes].sort((a, b) => a - b);
|
||
session.smoothedPacketSize = sorted[Math.floor(sorted.length / 2)];
|
||
session.pacingSamples = session.recentPacketSizes.length;
|
||
|
||
// Only adjust pacing interval if the transmitted size differs significantly from expected
|
||
// (e.g., due to padding stripping). Use smoothed size to avoid jitter.
|
||
const paceSize = session.effectivePacketSize > 0 ? session.effectivePacketSize : session.asfPacketSize;
|
||
if (Math.abs(txSize - paceSize) > 10) {
|
||
session.effectivePacketSize = txSize;
|
||
const oldInterval = session.packetIntervalMs;
|
||
session.packetIntervalMs = this._computePacketIntervalMs(session);
|
||
if (session.mediaPacketsTx <= 4 || (session.mediaPacketsTx % 64) === 0) {
|
||
this.debugLog('pace adjust', session.id,
|
||
'txSize=' + txSize,
|
||
'old=' + oldInterval + 'ms',
|
||
'new=' + session.packetIntervalMs + 'ms',
|
||
'smoothed=' + session.smoothedPacketSize);
|
||
}
|
||
session.nextPacketDueAt = Date.now();
|
||
}
|
||
}
|
||
|
||
const afFlags = this._nextDataFlags(session);
|
||
const writeOk = this._sendDataPacket(socket, session, session.mediaPacketId, afFlags, strippedBuf, false);
|
||
if (!writeOk) {
|
||
this._waitForDrain(socket, session);
|
||
return;
|
||
}
|
||
this._scheduleNextPacket(session, socket);
|
||
}
|
||
|
||
_endStream(socket, session) {
|
||
const eosDelayMs = this._estimateEosDelayMs(socket, session);
|
||
this.debugLog('end of stream', session.id, 'eosDelayMs=' + eosDelayMs);
|
||
|
||
this._stopStream(session);
|
||
|
||
if (eosDelayMs <= 0) {
|
||
this._sendCommand(socket, session, MMS_SERVER_STREAM_STOPPED, 0, session.mediaIncarnation || 0, Buffer.alloc(0));
|
||
return;
|
||
}
|
||
|
||
const cfgPingIntervalMs = Number(this.service_config.webtv_eos_ping_interval_ms);
|
||
const eosPingIntervalMs = Number.isFinite(cfgPingIntervalMs)
|
||
? Math.max(0, Math.floor(cfgPingIntervalMs))
|
||
: 3000;
|
||
|
||
if (eosPingIntervalMs > 0) {
|
||
session.eosPingTimer = setInterval(() => {
|
||
if (socket.destroyed || session.streaming || !session.eosTimer) {
|
||
if (session.eosPingTimer) {
|
||
clearInterval(session.eosPingTimer);
|
||
session.eosPingTimer = null;
|
||
}
|
||
return;
|
||
}
|
||
// LinkMacToViewerPing (MID=0x0004001B) over MMST command channel.
|
||
this._sendCommand(socket, session, MMS_SERVER_KEEPALIVE, 0, 0, Buffer.alloc(0));
|
||
this.debugLog('eos keepalive ping', session.id, 'intervalMs=' + eosPingIntervalMs);
|
||
}, eosPingIntervalMs);
|
||
}
|
||
|
||
session.eosTimer = setTimeout(() => {
|
||
session.eosTimer = null;
|
||
if (session.eosPingTimer) {
|
||
clearInterval(session.eosPingTimer);
|
||
session.eosPingTimer = null;
|
||
}
|
||
if (!socket.destroyed && !session.streaming) {
|
||
this._sendCommand(socket, session, MMS_SERVER_STREAM_STOPPED, 0, session.mediaIncarnation || 0, Buffer.alloc(0));
|
||
}
|
||
}, eosDelayMs);
|
||
}
|
||
|
||
// Parse one fixed-size ASF data packet, remove trailing padding, and return
|
||
// the compact payload. Per MMS spec §3.2.5.11.1, Padding Data SHOULD be
|
||
// stripped before MMST encapsulation and the Padding Length field MUST be
|
||
// updated to reflect the removal.
|
||
_stripAsfPacketPadding(buf, session) {
|
||
if (!buf || buf.length < 6) return buf;
|
||
|
||
// Locate Length Type Flags byte, skipping optional error correction header.
|
||
let ltFlagsOffset = 0;
|
||
const byte0 = buf[0];
|
||
if (byte0 & 0x80) {
|
||
// Error Correction Present (bit 7 = 1).
|
||
// Bits 5-6 = EC Length Type; 00 means bits 0-3 carry the data length.
|
||
const ecLenType = (byte0 >> 5) & 0x03;
|
||
const ecDataLen = ecLenType === 0 ? (byte0 & 0x0F) : 0;
|
||
ltFlagsOffset = 1 + ecDataLen;
|
||
}
|
||
|
||
if (ltFlagsOffset + 2 > buf.length) return buf;
|
||
|
||
const ltFlags = buf[ltFlagsOffset];
|
||
const padLenType = (ltFlags >> 3) & 0x03; // bits 3-4
|
||
const seqType = (ltFlags >> 1) & 0x03; // bits 1-2
|
||
const pktLenType = (ltFlags >> 5) & 0x03; // bits 5-6
|
||
|
||
if (padLenType === 0) return buf; // no Padding Length field → nothing to strip
|
||
|
||
// Byte widths for each type value (0=absent,1=BYTE,2=WORD,3=DWORD)
|
||
const typeSize = [0, 1, 2, 4];
|
||
|
||
// After LTFlags(1) + PropFlags(1) come the optional variable-width fields.
|
||
let fieldOffset = ltFlagsOffset + 2;
|
||
fieldOffset += typeSize[pktLenType]; // optional Packet Length
|
||
fieldOffset += typeSize[seqType]; // optional Sequence
|
||
|
||
const padLenSize = typeSize[padLenType];
|
||
if (fieldOffset + padLenSize > buf.length) return buf;
|
||
|
||
// Read the declared padding length.
|
||
let paddingLen;
|
||
if (padLenSize === 1) paddingLen = buf.readUInt8(fieldOffset);
|
||
else if (padLenSize === 2) paddingLen = buf.readUInt16LE(fieldOffset);
|
||
else paddingLen = buf.readUInt32LE(fieldOffset);
|
||
|
||
if (paddingLen === 0) return buf; // already no padding
|
||
|
||
const configuredMinStrip = Number(this.service_config.asf_padding_strip_min_bytes);
|
||
const minStripBytes = Number.isFinite(configuredMinStrip) && configuredMinStrip >= 0
|
||
? Math.floor(configuredMinStrip)
|
||
: 8;
|
||
if (paddingLen < minStripBytes) {
|
||
return buf;
|
||
}
|
||
|
||
const newLen = buf.length - paddingLen;
|
||
// Sanity: compacted packet must still hold the header fields we parsed.
|
||
if (newLen < fieldOffset + padLenSize + 6) {
|
||
this.debugLog('asf strip sanity fail', session && session.id,
|
||
'bufLen=' + buf.length, 'paddingLen=' + paddingLen);
|
||
return buf;
|
||
}
|
||
|
||
// Copy the packet, zero the Padding Length field, and trim trailing padding.
|
||
const out = Buffer.from(buf.buffer, buf.byteOffset, buf.length);
|
||
if (padLenSize === 1) out.writeUInt8(0, fieldOffset);
|
||
else if (padLenSize === 2) out.writeUInt16LE(0, fieldOffset);
|
||
else out.writeUInt32LE(0, fieldOffset);
|
||
|
||
return out.slice(0, newLen);
|
||
}
|
||
|
||
// -------------------------------------------------------------------------
|
||
// Packet builders
|
||
// -------------------------------------------------------------------------
|
||
|
||
_buildCommandPacket(session, cmdId, prefix1, prefix2, payloadBuf) {
|
||
const payload = payloadBuf || Buffer.alloc(0);
|
||
const paddedPayloadLength = (payload.length + 7) & ~7;
|
||
const payloadBlocks = paddedPayloadLength >> 3;
|
||
const totalLength = MMS_COMMAND_HEADER_SIZE + paddedPayloadLength;
|
||
const packet = Buffer.alloc(totalLength);
|
||
|
||
packet.writeUInt32LE(1, 0);
|
||
packet.writeUInt32LE(MMS_MAGIC, 4);
|
||
packet.writeUInt32LE(totalLength - 16, 8);
|
||
packet.writeUInt32LE(MMS_PROTOCOL, 12);
|
||
packet.writeUInt32LE(payloadBlocks + 6, 16);
|
||
packet.writeUInt16LE(session.commandSeq++ & 0xFFFF, 20);
|
||
packet.writeUInt16LE(0, 22);
|
||
packet.writeUInt32LE(0, 24);
|
||
packet.writeUInt32LE(0, 28);
|
||
packet.writeUInt32LE(payloadBlocks + 2, 32);
|
||
packet.writeUInt32LE((MMS_DIRECTION_TO_CLIENT << 16) | (cmdId & 0xffff), 36);
|
||
packet.writeUInt32LE(prefix1 >>> 0, 40);
|
||
packet.writeUInt32LE(prefix2 >>> 0, 44);
|
||
payload.copy(packet, MMS_COMMAND_HEADER_SIZE);
|
||
|
||
return packet;
|
||
}
|
||
|
||
_sendCommand(socket, session, cmdId, prefix1, prefix2, payloadBuf) {
|
||
if (socket.destroyed) return;
|
||
|
||
const pkt = this._buildCommandPacket(session, cmdId, prefix1, prefix2, payloadBuf);
|
||
this.debugLog('tx cmd', session.id, '0x' + cmdId.toString(16), 'len=' + pkt.length), 'to='+socket.remoteAddress+":"+socket.remotePort;
|
||
socket.write(pkt);
|
||
session.bytesTx += pkt.length;
|
||
if (cmdId === MMS_SERVER_KEEPALIVE) {
|
||
session.lastKeepaliveTxMs = Date.now();
|
||
}
|
||
}
|
||
|
||
_handleClientKeepalive(socket, session, prefix1, prefix2) {
|
||
// During delayed EOS we already transmit periodic keepalives; if we also
|
||
// answer every client keepalive immediately, both sides can get into a
|
||
// noisy ping-pong loop. Throttle replies while EOS wait is active.
|
||
if (session.eosTimer) {
|
||
const cfgPingIntervalMs = Number(this.service_config.webtv_eos_ping_interval_ms);
|
||
const eosPingIntervalMs = Number.isFinite(cfgPingIntervalMs)
|
||
? Math.max(1000, Math.floor(cfgPingIntervalMs))
|
||
: 3000;
|
||
const minReplyGapMs = Math.max(1000, eosPingIntervalMs - 500);
|
||
const now = Date.now();
|
||
const sinceLastTx = now - (session.lastKeepaliveTxMs || 0);
|
||
|
||
if (sinceLastTx < minReplyGapMs) {
|
||
this.debugLog('keepalive throttled', session.id,
|
||
'sinceLastTxMs=' + sinceLastTx,
|
||
'minReplyGapMs=' + minReplyGapMs,
|
||
'p1=0x' + (prefix1 >>> 0).toString(16),
|
||
'p2=0x' + (prefix2 >>> 0).toString(16));
|
||
return;
|
||
}
|
||
}
|
||
|
||
this._sendCommand(socket, session, MMS_SERVER_KEEPALIVE, 0, 0, Buffer.alloc(0));
|
||
}
|
||
|
||
_sendCommandBatch(socket, session, commands) {
|
||
if (socket.destroyed || !Array.isArray(commands) || commands.length === 0) return;
|
||
|
||
const packets = [];
|
||
for (const command of commands) {
|
||
const pkt = this._buildCommandPacket(
|
||
session,
|
||
command.cmdId,
|
||
command.prefix1,
|
||
command.prefix2,
|
||
command.payloadBuf
|
||
);
|
||
this.debugLog('tx cmd', session.id, '0x' + command.cmdId.toString(16), 'len=' + pkt.length);
|
||
packets.push(pkt);
|
||
session.bytesTx += pkt.length;
|
||
}
|
||
|
||
socket.write(Buffer.concat(packets));
|
||
}
|
||
|
||
_sendHeaderPackets(socket, session, headerBuf) {
|
||
const configuredChunkSize = Number(this.service_config.header_chunk_size);
|
||
const maxPayload = Math.max(
|
||
256,
|
||
Math.min(
|
||
0xFFFF - 8,
|
||
Number.isFinite(configuredChunkSize) && configuredChunkSize > 0 ? Math.floor(configuredChunkSize) : 1400
|
||
)
|
||
);
|
||
let offset = 0;
|
||
let count = 0;
|
||
|
||
while (offset < headerBuf.length && !socket.destroyed) {
|
||
const remaining = headerBuf.length - offset;
|
||
const chunkLength = Math.min(maxPayload, remaining);
|
||
const flags = offset + chunkLength < headerBuf.length ? MMS_HEADER_FLAG_MORE : MMS_HEADER_FLAG_FINAL;
|
||
this._sendDataPacket(socket, session, session.headerPacketId, flags, headerBuf.slice(offset, offset + chunkLength), true);
|
||
offset += chunkLength;
|
||
count++;
|
||
}
|
||
|
||
this.debugLog('header sent', session.id,
|
||
'packets=' + count,
|
||
'bytes=' + headerBuf.length,
|
||
'headerPacketId=0x' + session.headerPacketId.toString(16));
|
||
}
|
||
|
||
_sendDataPacket(socket, session, packetId, flags, payloadBuf, isHeaderPacket = false) {
|
||
if (socket.destroyed) return false;
|
||
|
||
const payload = payloadBuf || Buffer.alloc(0);
|
||
const totalLength = 8 + payload.length;
|
||
const pkt = Buffer.alloc(totalLength);
|
||
|
||
// LocationId: independent per-type sequence starting at 0.
|
||
// For header payloads: 0, 1, 2, ... per header stream.
|
||
// For data payloads: ASF data packet number starting at 0.
|
||
const isHeader = isHeaderPacket;
|
||
const locationId = isHeader ? session.headerPacketsTx : session.mediaPacketsTx;
|
||
|
||
pkt.writeUInt32LE(locationId, 0);
|
||
pkt.writeUInt8(packetId & 0xff, 4);
|
||
pkt.writeUInt8(flags & 0xff, 5);
|
||
pkt.writeUInt16LE(totalLength, 6);
|
||
payload.copy(pkt, 8);
|
||
|
||
if (isHeader) session.headerPacketsTx++;
|
||
else if (packetId === session.mediaPacketId) session.mediaPacketsTx++;
|
||
|
||
if (!isHeader && packetId === session.mediaPacketId) {
|
||
const resendSeq = session.dataAfMode === 'sequence'
|
||
? (session.lastDataSequenceTx >>> 0)
|
||
: (locationId >>> 0);
|
||
this._cacheResendPacket(session, resendSeq, pkt);
|
||
}
|
||
|
||
const shouldLog = isHeader
|
||
? true
|
||
: session.mediaPacketsTx <= 4 || (session.mediaPacketsTx % 64) === 0;
|
||
|
||
if (shouldLog) {
|
||
const previewLength = Math.min(8, payload.length);
|
||
const preview = previewLength > 0 ? payload.subarray(0, previewLength).toString('hex') : '';
|
||
this.debugLog('tx data',
|
||
(session.udpDataEnabled ? 'UDP' : (session.tcpDataEnabled) ? 'TCP' : '???'),
|
||
'locationId=' + locationId,
|
||
'packetId=0x' + packetId.toString(16),
|
||
'flags=0x' + flags.toString(16),
|
||
'packetSizeField=' + totalLength,
|
||
'payload=' + payload.length,
|
||
preview ? 'preview=' + preview : '');
|
||
}
|
||
|
||
if (session.udpDataEnabled) {
|
||
this._sendUdpDataPacket(session, pkt, packetId, locationId, flags, payload.length);
|
||
return true;
|
||
}
|
||
|
||
if (isHeader) {
|
||
const writeOk = socket.write(pkt);
|
||
if (!writeOk && shouldLog) {
|
||
this.debugLog('socket buffered', session.id,
|
||
'packetId=0x' + packetId.toString(16),
|
||
'writableLength=' + socket.writableLength);
|
||
}
|
||
session.bytesTx += pkt.length;
|
||
return writeOk;
|
||
}
|
||
|
||
session.packetQueue.push({
|
||
buffer: pkt,
|
||
generation: session.seekGeneration,
|
||
});
|
||
return true;
|
||
}
|
||
|
||
_sendUdpDataPacket(session, packetBuf, packetId, locationId, flags, payloadLen) {
|
||
if (!this.udpServer || !session || !session.clientEndpointIp || !session.clientPort) {
|
||
this.debugLog('udp send skipped', session && session.id,
|
||
'packetId=0x' + packetId.toString(16),
|
||
'peer=' + ((session && session.clientEndpointIp) || '(none)'),
|
||
'port=' + ((session && session.clientPort) || 0));
|
||
return;
|
||
}
|
||
|
||
const configuredProbePackets = Number(this.service_config.udp_probe_alt_ports_packets);
|
||
const probeAltPortsPackets = Number.isFinite(configuredProbePackets)
|
||
? Math.max(0, Math.floor(configuredProbePackets))
|
||
: 96;
|
||
|
||
const primaryPort = session.clientPort >>> 0;
|
||
const targetPorts = [primaryPort];
|
||
|
||
if (!session.udpPeerLearnedPort && probeAltPortsPackets > 0 && session.mediaPacketsTx <= probeAltPortsPackets) {
|
||
const alternates = Array.isArray(session.clientPortCandidates)
|
||
? session.clientPortCandidates
|
||
: [];
|
||
for (const candidate of alternates) {
|
||
const p = this._sanitizeUdpPort(candidate);
|
||
if (p > 0 && targetPorts.indexOf(p) < 0) {
|
||
targetPorts.push(p);
|
||
}
|
||
}
|
||
}
|
||
|
||
const host = session.clientEndpointIp;
|
||
for (const port of targetPorts) {
|
||
this.udpServer.send(packetBuf, 0, packetBuf.length, port, host, (err) => {
|
||
if (err) {
|
||
this.debugLog('udp send error', session.id,
|
||
'packetId=0x' + packetId.toString(16),
|
||
'port=' + port,
|
||
err.message);
|
||
}
|
||
});
|
||
}
|
||
|
||
session.bytesTx += packetBuf.length;
|
||
}
|
||
|
||
_cacheResendPacket(session, sequenceNumber, packetBuf) {
|
||
if (!session || !Buffer.isBuffer(packetBuf)) return;
|
||
|
||
const configuredCacheSize = Number(this.service_config.udp_retransmit_cache_size);
|
||
const maxCacheSize = Number.isFinite(configuredCacheSize)
|
||
? Math.max(128, Math.floor(configuredCacheSize))
|
||
: 4096;
|
||
|
||
const seq = sequenceNumber >>> 0;
|
||
if (!session.resendPacketCache.has(seq)) {
|
||
session.resendPacketOrder.push(seq);
|
||
}
|
||
|
||
session.resendPacketCache.set(seq, {
|
||
packet: Buffer.from(packetBuf),
|
||
ts: Date.now(),
|
||
});
|
||
|
||
while (session.resendPacketOrder.length > maxCacheSize) {
|
||
const dropSeq = session.resendPacketOrder.shift();
|
||
session.resendPacketCache.delete(dropSeq);
|
||
}
|
||
}
|
||
|
||
_lookupResendPacket(session, sequenceNumber) {
|
||
if (!session) return null;
|
||
|
||
const seq = sequenceNumber >>> 0;
|
||
const direct = session.resendPacketCache.get(seq);
|
||
if (direct && Buffer.isBuffer(direct.packet)) {
|
||
return direct.packet;
|
||
}
|
||
|
||
// Some clients may only preserve the low 8 bits from AFFlags.
|
||
const low8 = seq & 0xFF;
|
||
for (let i = session.resendPacketOrder.length - 1; i >= 0; i--) {
|
||
const candidateSeq = session.resendPacketOrder[i] >>> 0;
|
||
if ((candidateSeq & 0xFF) !== low8) continue;
|
||
const candidate = session.resendPacketCache.get(candidateSeq);
|
||
if (candidate && Buffer.isBuffer(candidate.packet)) {
|
||
return candidate.packet;
|
||
}
|
||
}
|
||
|
||
return null;
|
||
}
|
||
|
||
_handleUdpDatagram(msg, rinfo) {
|
||
if (!Buffer.isBuffer(msg) || msg.length === 0) {
|
||
return;
|
||
}
|
||
|
||
this._learnUdpPeerPort(rinfo);
|
||
|
||
if (msg.length < 12) {
|
||
this.debugLog('udp rx', rinfo.address + ':' + rinfo.port, 'len=' + msg.length);
|
||
return;
|
||
}
|
||
|
||
const resendReq = this._parseRequestPacketListResend(msg);
|
||
if (!resendReq) {
|
||
this.debugLog('udp rx', rinfo.address + ':' + rinfo.port, 'len=' + msg.length);
|
||
return;
|
||
}
|
||
|
||
const peerIp = this._normalizeIp(rinfo.address);
|
||
let targetSession = null;
|
||
|
||
for (const session of this.sessions.values()) {
|
||
if (!session || !session.udpDataEnabled || !session.requestedUdp) continue;
|
||
if (this._normalizeIp(session.clientEndpointIp) !== peerIp) continue;
|
||
if ((session.clientId >>> 0) !== resendReq.clientId) continue;
|
||
if (((session.openFileId || 0) & 0xFFFF) !== resendReq.sourceId) continue;
|
||
|
||
// If we already learned a client media port, require it to match.
|
||
if (session.clientPort > 0 && session.clientPort !== rinfo.port) {
|
||
continue;
|
||
}
|
||
|
||
targetSession = session;
|
||
break;
|
||
}
|
||
|
||
if (!targetSession) {
|
||
this.debugLog('udp resend no session',
|
||
'from=' + peerIp + ':' + rinfo.port,
|
||
'clientId=0x' + resendReq.clientId.toString(16),
|
||
'sourceId=0x' + resendReq.sourceId.toString(16),
|
||
'count=' + resendReq.packetList.length);
|
||
return;
|
||
}
|
||
|
||
let resent = 0;
|
||
let missing = 0;
|
||
for (const sequenceNumber of resendReq.packetList) {
|
||
const packet = this._lookupResendPacket(targetSession, sequenceNumber);
|
||
if (!packet) {
|
||
missing++;
|
||
continue;
|
||
}
|
||
|
||
this.udpServer.send(packet, 0, packet.length, targetSession.clientPort >>> 0, targetSession.clientEndpointIp, (err) => {
|
||
if (err) {
|
||
this.debugLog('udp resend send error', targetSession.id,
|
||
'seq=' + (sequenceNumber >>> 0),
|
||
err.message);
|
||
}
|
||
});
|
||
targetSession.bytesTx += packet.length;
|
||
resent++;
|
||
}
|
||
|
||
this.debugLog('udp resend', targetSession.id,
|
||
'requested=' + resendReq.packetList.length,
|
||
'resent=' + resent,
|
||
'missing=' + missing,
|
||
'from=' + peerIp + ':' + rinfo.port);
|
||
}
|
||
|
||
_parseRequestPacketListResend(msg) {
|
||
if (!Buffer.isBuffer(msg) || msg.length < 12) return null;
|
||
|
||
const MMS_RETRANSMIT_SIGNATURE = 0xBEEFF00D;
|
||
let littleEndian = true;
|
||
if (msg.readUInt32LE(0) !== MMS_RETRANSMIT_SIGNATURE) {
|
||
if (msg.readUInt32BE(0) !== MMS_RETRANSMIT_SIGNATURE) {
|
||
return null;
|
||
}
|
||
littleEndian = false;
|
||
}
|
||
|
||
const readU32 = (offset) => littleEndian ? msg.readUInt32LE(offset) : msg.readUInt32BE(offset);
|
||
const readU16 = (offset) => littleEndian ? msg.readUInt16LE(offset) : msg.readUInt16BE(offset);
|
||
|
||
const clientId = readU32(4) >>> 0;
|
||
const sourceId = readU16(8) >>> 0;
|
||
const numPackets = readU16(10) >>> 0;
|
||
if (numPackets < 1 || numPackets > 32) {
|
||
return null;
|
||
}
|
||
|
||
const expectedLen = 12 + (numPackets * 4);
|
||
if (msg.length < expectedLen) {
|
||
return null;
|
||
}
|
||
|
||
const packetList = [];
|
||
let offset = 12;
|
||
for (let i = 0; i < numPackets; i++) {
|
||
packetList.push(readU32(offset) >>> 0);
|
||
offset += 4;
|
||
}
|
||
|
||
return {
|
||
clientId,
|
||
sourceId,
|
||
packetList,
|
||
};
|
||
}
|
||
|
||
_flushQueuedPackets(socket, session) {
|
||
if (socket.destroyed || session.packetQueue.length === 0) {
|
||
return true;
|
||
}
|
||
|
||
const currentGen = session.seekGeneration;
|
||
let writeOk = true;
|
||
|
||
while (session.packetQueue.length > 0) {
|
||
const queuedPacket = session.packetQueue[0];
|
||
if (queuedPacket.generation !== currentGen) {
|
||
session.packetQueue.shift();
|
||
continue;
|
||
}
|
||
|
||
writeOk = socket.write(queuedPacket.buffer);
|
||
session.bytesTx += queuedPacket.buffer.length;
|
||
session.packetQueue.shift();
|
||
if (!writeOk) return false;
|
||
}
|
||
|
||
return true;
|
||
}
|
||
|
||
// -------------------------------------------------------------------------
|
||
// Individual response payload builders
|
||
// -------------------------------------------------------------------------
|
||
|
||
_utf16leBuffer(str) {
|
||
return Buffer.from((str || '') + '\0', 'utf16le');
|
||
}
|
||
|
||
_decodeUtf16CString(buf) {
|
||
if (!buf || buf.length === 0) return '';
|
||
|
||
let end = buf.length;
|
||
for (let offset = 0; offset + 1 < buf.length; offset += 2) {
|
||
if (buf.readUInt16LE(offset) === 0) {
|
||
end = offset;
|
||
break;
|
||
}
|
||
}
|
||
|
||
return buf.slice(0, end).toString('utf16le').trim();
|
||
}
|
||
|
||
_normalizeIp(remoteAddress) {
|
||
if (!remoteAddress) return '';
|
||
return remoteAddress.startsWith('::ffff:') ? remoteAddress.slice(7) : remoteAddress;
|
||
}
|
||
|
||
_selectTransportString(utf16Str, ansiStr) {
|
||
const a = (utf16Str || '').trim();
|
||
const b = (ansiStr || '').trim();
|
||
const score = (s) => {
|
||
let v = 0;
|
||
if (!s) return v;
|
||
if (s.includes('\\')) v += 2;
|
||
if (/\b(?:CUBDD|MMSU|UDP|TCP)\b/i.test(s)) v += 2;
|
||
if (/\\\d+/.test(s)) v += 1;
|
||
return v;
|
||
};
|
||
return score(a) >= score(b) ? a : b;
|
||
}
|
||
|
||
_parseTransportRequest(transportStr) {
|
||
if (!transportStr) return null;
|
||
const ipMatch = transportStr.match(/\\([^\\]+)/);
|
||
const ip = ipMatch ? ipMatch[1] : '';
|
||
const protocol = this._inferTransportToken(transportStr) || '';
|
||
|
||
let port = 0;
|
||
const candidatePorts = [];
|
||
if (protocol) {
|
||
const tokenPortRegex = new RegExp('\\\\' + protocol + '(?:\\\\(\\d+))+', 'ig');
|
||
let tokenMatch;
|
||
while ((tokenMatch = tokenPortRegex.exec(transportStr)) !== null) {
|
||
const numericParts = tokenMatch[0].match(/\\(\\d+)/g) || [];
|
||
for (const numericPart of numericParts) {
|
||
const parsed = parseInt(numericPart.slice(1), 10);
|
||
if (Number.isInteger(parsed) && parsed > 0 && parsed <= 65535) {
|
||
candidatePorts.push(parsed);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
if (candidatePorts.length === 0) {
|
||
const fallbackPortRegex = /\\(?:CUBDD|MMSU|UDP|TCP)\\(\d+)/ig;
|
||
let fallbackMatch;
|
||
while ((fallbackMatch = fallbackPortRegex.exec(transportStr)) !== null) {
|
||
const parsed = parseInt(fallbackMatch[1], 10);
|
||
if (Number.isInteger(parsed) && parsed > 0 && parsed <= 65535) {
|
||
candidatePorts.push(parsed);
|
||
}
|
||
}
|
||
}
|
||
|
||
// Prefer the first valid candidate as initial target, then probe alternates.
|
||
if (candidatePorts.length > 0) {
|
||
port = candidatePorts[0];
|
||
}
|
||
|
||
if (!ip && !protocol && !port) return null;
|
||
return {
|
||
ip,
|
||
protocol,
|
||
port,
|
||
ports: [...new Set(candidatePorts)],
|
||
};
|
||
}
|
||
|
||
_learnUdpPeerPort(rinfo) {
|
||
if (!rinfo || !Number.isInteger(rinfo.port)) return;
|
||
const peerIp = this._normalizeIp(rinfo.address);
|
||
const learnedPort = this._sanitizeUdpPort(rinfo.port);
|
||
if (!peerIp || learnedPort <= 0) return;
|
||
|
||
const matchingSessions = [];
|
||
for (const session of this.sessions.values()) {
|
||
if (!session || !session.requestedUdp) continue;
|
||
if (this._normalizeIp(session.clientEndpointIp) !== peerIp) continue;
|
||
matchingSessions.push(session);
|
||
}
|
||
|
||
if (matchingSessions.length !== 1) {
|
||
return;
|
||
}
|
||
|
||
const session = matchingSessions[0];
|
||
if (session.udpPeerLearnedPort === learnedPort && session.clientPort === learnedPort) {
|
||
return;
|
||
}
|
||
|
||
const previousPort = session.clientPort;
|
||
session.udpPeerLearnedPort = learnedPort;
|
||
session.clientPort = learnedPort;
|
||
if (!Array.isArray(session.clientPortCandidates)) {
|
||
session.clientPortCandidates = [];
|
||
}
|
||
if (session.clientPortCandidates.indexOf(learnedPort) < 0) {
|
||
session.clientPortCandidates.unshift(learnedPort);
|
||
}
|
||
this._refreshUdpTransportState(session);
|
||
|
||
this.debugLog('udp peer learned', session.id,
|
||
'oldPort=' + (previousPort || 0),
|
||
'newPort=' + learnedPort,
|
||
'ip=' + peerIp);
|
||
}
|
||
|
||
_sanitizeUdpPort(port) {
|
||
const parsed = Number(port);
|
||
return Number.isInteger(parsed) && parsed > 0 && parsed <= 65535 ? parsed : 0;
|
||
}
|
||
|
||
_refreshUdpTransportState(session) {
|
||
if (!session) return;
|
||
const udpEnabledByConfig = this.service_config.enable_udp !== false;
|
||
session.clientPort = this._sanitizeUdpPort(session.clientPort);
|
||
session.udpDataEnabled = !!session.requestedUdp
|
||
&& udpEnabledByConfig
|
||
&& !!this.udpServer
|
||
&& !!this.udpServerReady
|
||
&& session.clientPort > 0
|
||
&& !!session.clientEndpointIp;
|
||
}
|
||
|
||
_inferTransportToken(transportStr) {
|
||
if (!transportStr) return '';
|
||
const upper = String(transportStr).toUpperCase();
|
||
// Prioritize UDP-like transports over TCP when both appear.
|
||
if (upper.includes('\\CUBDD\\')) return 'CUBDD';
|
||
if (upper.includes('\\MMSU\\')) return 'MMSU';
|
||
if (upper.includes('\\UDP\\')) return 'UDP';
|
||
if (upper.includes('\\TCP\\')) return 'TCP';
|
||
return '';
|
||
}
|
||
|
||
_resolveDataAfMode(session) {
|
||
const mode = String(this.service_config?.data_af_mode || 'auto').toLowerCase();
|
||
if (mode === 'zero' || mode === 'sequence') return mode;
|
||
return session.isLegacyClient ? 'sequence' : 'zero';
|
||
}
|
||
|
||
_resolveMediaPacketId(session) {
|
||
const mode = String(this.service_config?.media_packet_id_mode || 'auto').toLowerCase();
|
||
if (mode === 'fixed') return MMS_MEDIA_PACKET_ID;
|
||
if (mode === 'incarnation') return session.mediaIncarnation & 0xFF;
|
||
return session.isLegacyClient
|
||
? (session.mediaIncarnation & 0xFF)
|
||
: MMS_MEDIA_PACKET_ID;
|
||
}
|
||
|
||
_nextDataFlags(session) {
|
||
if (session.dataAfMode === 'sequence') {
|
||
const sequence = session.dataSequence >>> 0;
|
||
const flags = sequence & 0xFF;
|
||
session.lastDataSequenceTx = sequence;
|
||
session.dataSequence = (sequence + 1) >>> 0;
|
||
return flags;
|
||
}
|
||
session.lastDataSequenceTx = session.mediaPacketsTx >>> 0;
|
||
return 0x00;
|
||
}
|
||
|
||
_parseStartPlaying(openFileId, payload) {
|
||
// _handleCommandPacket passes the reduced StartPlaying payload that begins at position(8).
|
||
if (!payload || payload.length < 24) return null;
|
||
|
||
const rawPosition = payload.readDoubleLE(0);
|
||
const asfOffset = payload.readUInt32LE(8) >>> 0;
|
||
const locationId = payload.readUInt32LE(12) >>> 0;
|
||
const frameOffset = payload.readUInt32LE(16) >>> 0;
|
||
const playIncarnation = payload.readUInt32LE(20) >>> 0;
|
||
const position = Number.isFinite(rawPosition) && rawPosition >= 0 && rawPosition < 1e300
|
||
? rawPosition
|
||
: null;
|
||
|
||
return {
|
||
openFileId: openFileId >>> 0,
|
||
position,
|
||
asfOffset,
|
||
locationId,
|
||
frameOffset,
|
||
playIncarnation,
|
||
};
|
||
}
|
||
|
||
_resolveStartSeek(session, startReq) {
|
||
if (!startReq || !session.mediaPath) return null;
|
||
|
||
const dataStart = session.asfDataOffset + 50;
|
||
const maxPacketSize = Math.max(1, session.asfPacketSize >>> 0);
|
||
const maxMediaBytes = Math.max(0, session.asfFileSize - dataStart);
|
||
const maxPacketFromBytes = maxMediaBytes > 0
|
||
? Math.max(0, Math.floor((maxMediaBytes - 1) / maxPacketSize))
|
||
: 0;
|
||
const maxPacketFromHeader = session.asfPacketCount > 0
|
||
? Math.max(0, (session.asfPacketCount >>> 0) - 1)
|
||
: maxPacketFromBytes;
|
||
const maxPacketNumber = Math.max(maxPacketFromBytes, maxPacketFromHeader);
|
||
|
||
const clampPacketNumber = (value) => {
|
||
const n = Number.isFinite(value) ? Math.floor(value) : 0;
|
||
return Math.max(0, Math.min(n, maxPacketNumber));
|
||
};
|
||
|
||
let packetNumber = 0;
|
||
let readPos = dataStart;
|
||
let seekSource = 'default';
|
||
|
||
if (startReq.locationId !== 0 && startReq.locationId !== 0xFFFFFFFF) {
|
||
packetNumber = clampPacketNumber(startReq.locationId >>> 0);
|
||
readPos = dataStart + (packetNumber * maxPacketSize);
|
||
seekSource = 'locationId';
|
||
this.debugLog('seek resolve', session.id, seekSource, 'packet=' + packetNumber, 'readPos=0x' + readPos.toString(16));
|
||
return {
|
||
packetNumber,
|
||
readPos: Math.min(readPos, session.asfFileSize),
|
||
};
|
||
}
|
||
|
||
if (startReq.asfOffset !== 0 && startReq.asfOffset !== 0xFFFFFFFF) {
|
||
const requested = Math.max(dataStart, startReq.asfOffset >>> 0);
|
||
const relative = Math.max(0, requested - dataStart);
|
||
packetNumber = clampPacketNumber(Math.floor(relative / maxPacketSize));
|
||
readPos = dataStart + (packetNumber * maxPacketSize);
|
||
seekSource = 'asfOffset';
|
||
this.debugLog('seek resolve', session.id, seekSource, 'packet=' + packetNumber, 'readPos=0x' + readPos.toString(16));
|
||
return {
|
||
packetNumber,
|
||
readPos: Math.min(readPos, session.asfFileSize),
|
||
};
|
||
}
|
||
|
||
if (Number.isFinite(startReq.position) && startReq.position > 0 && startReq.position < 1e300) {
|
||
if (session.asfPacketCount > 0 && session.asfDurationSec > 0) {
|
||
const ratio = Math.max(0, Math.min(1, startReq.position / session.asfDurationSec));
|
||
packetNumber = clampPacketNumber(Math.floor(ratio * session.asfPacketCount));
|
||
readPos = dataStart + (packetNumber * maxPacketSize);
|
||
seekSource = 'position-duration';
|
||
} else if (session.bitrateBps > 0) {
|
||
const bytePos = Math.floor((startReq.position * session.bitrateBps) / 8);
|
||
const clamped = Math.max(0, Math.min(bytePos, maxMediaBytes));
|
||
packetNumber = clampPacketNumber(Math.floor(clamped / maxPacketSize));
|
||
readPos = dataStart + (packetNumber * maxPacketSize);
|
||
seekSource = 'position-bitrate';
|
||
}
|
||
}
|
||
|
||
this.debugLog('seek resolve', session.id, seekSource, 'packet=' + packetNumber, 'readPos=0x' + readPos.toString(16));
|
||
|
||
return {
|
||
packetNumber,
|
||
readPos: Math.min(readPos, session.asfFileSize),
|
||
};
|
||
}
|
||
|
||
_buildConnectAcceptedPayload(session) {
|
||
// LinkMacToViewerReportConnectedEX
|
||
const isLegacyClient = /^NSPlayer\/6\./i.test(session?.clientPlayer || '');
|
||
const serverVersion = this._utf16leBuffer(this.majorVersion + '.' + this.minorVersion);
|
||
const playerMinVersion = this._utf16leBuffer(''); // no minimum version
|
||
const updatePlayerUrl = this._utf16leBuffer('');
|
||
const AuthenPackage = this._utf16leBuffer('');
|
||
const configUdp = (this.service_config.enable_udp === false) ? false : true;
|
||
const failure = (!configUdp && session.requestedUdp) || (configUdp && session.requestedUdp) || (!session.requestedUdp && !session.requestedTcp);
|
||
|
||
const header = Buffer.alloc(48);
|
||
header.writeUInt32LE(header.length / 8, 0); // chunkLength in 8-byte blocks
|
||
header.writeUInt32LE(0x00040001, 4); // MessageID
|
||
header.writeUInt32LE((failure ? 0xC00D0005 : 0x00000000), 8); // HRESULT (StatusCode) (NS_E_NOCONNECTION if failure = true, 0 = false)
|
||
header.writeUInt32LE(MMS_DISABLE_PACKET_PAIR, 12); // playIncaration (packet-pair support)
|
||
header.writeUInt32LE(0x0004000B, 16); // MacToViewerProtocolRevision
|
||
header.writeUInt32LE(0x0003001C, 20); // ViewerToMacProtocolRevision
|
||
header.writeDoubleLE(1.0, 8); // blockGroupPlayTime = 1.0
|
||
header.writeUInt32LE(0x00000001, 16); // blockGroupBlocks
|
||
header.writeUInt32LE(0x00000001, 20); // nMaxOpenFiles
|
||
header.writeUInt32LE(0x00008000, 24); // nBlockMaxBytes
|
||
header.writeUInt32LE(0x00989680, 28); // maxBitRate
|
||
header.writeUInt32LE(serverVersion.length / 2, 32); // cbServerVersionInfo (Unicode chars)
|
||
header.writeUInt32LE(playerMinVersion.length / 2, 36); // cbVersionInfo
|
||
header.writeUInt32LE(updatePlayerUrl.length / 2, 40); // cbVersionUrl
|
||
header.writeUInt32LE(AuthenPackage.length / 2, 44); // cbAuthenPackage
|
||
|
||
return Buffer.concat([header, serverVersion, playerMinVersion, updatePlayerUrl, AuthenPackage]);
|
||
}
|
||
|
||
_buildTransportAcceptedPayload() {
|
||
// packetPayloadSize(4) + funnelName (UTF-16LE, null-terminated)
|
||
const funnelName = Buffer.from('Funnel Of The Gods\0', 'utf16le');
|
||
const buf = Buffer.alloc(4 + funnelName.length);
|
||
// packetPayloadSize = 0 (already zero from alloc)
|
||
funnelName.copy(buf, 4);
|
||
return buf;
|
||
}
|
||
|
||
_buildFunnelInfoPayload(session) {
|
||
// transportMask(4) + nBlockFragments(4) + fragmentBytes(4) + nCubs(4)
|
||
// + failedCubs(4) + nDisks(4) + decluster(4) + cubddDatagramSize(4)
|
||
const buf = Buffer.alloc(32);
|
||
buf.writeUInt32LE(0x00000008, 0); // transportMask
|
||
buf.writeUInt32LE(0x00000001, 4); // nBlockFragments
|
||
buf.writeUInt32LE(0x00010000, 8); // fragmentBytes
|
||
buf.writeUInt32LE(session.clientId >>> 0, 12); // nCubs (unique client identifier)
|
||
buf.writeUInt32LE(0x00000000, 16); // failedCubs
|
||
buf.writeUInt32LE(0x00000001, 20); // nDisks
|
||
buf.writeUInt32LE(0x00000000, 24); // decluster
|
||
buf.writeUInt32LE((session.isCubddTransport) ? session.asfPacketSize + 8 : 0x00000000, 28); // cubddDatagramSize
|
||
return buf;
|
||
}
|
||
|
||
_buildFileDetailsPayload(session) {
|
||
// Full LinkMacToViewerReportOpenFile payload (100 bytes)
|
||
const buf = Buffer.alloc(100);
|
||
const headerBuf = this._readAsfHeader(session) || Buffer.alloc(0);
|
||
const mediaDataStart = session.asfDataOffset + 50;
|
||
const mediaBytes = Math.max(0, session.asfFileSize - mediaDataStart);
|
||
const packetCount = session.asfPacketCount > 0
|
||
? session.asfPacketCount
|
||
: (session.asfPacketSize > 0 ? Math.ceil(mediaBytes / session.asfPacketSize) : 0);
|
||
const durationSeconds = session.asfDurationSec > 0
|
||
? Math.ceil(session.asfDurationSec)
|
||
: (session.bitrateBps > 0
|
||
? Math.ceil((mediaBytes * 8) / session.bitrateBps)
|
||
: 0);
|
||
|
||
buf.writeUInt32LE(session.openFileId >>> 0, 0); // openFileId
|
||
// padding [4] = 0, fileName [8] = 0 (already zeroed)
|
||
buf.writeUInt32LE(FILE_ATTRIBUTE_MMS_CANSEEK, 12); // fileAttributes = FILE_ATTRIBUTE_MMS_CANSEEK
|
||
buf.writeDoubleLE(durationSeconds, 16); // fileDuration (8-byte double)
|
||
buf.writeUInt32LE(durationSeconds >>> 0, 24); // fileBlocks (integer seconds)
|
||
// unused1 [28-43] = 0 (already zeroed)
|
||
buf.writeUInt32LE(session.asfPacketSize >>> 0, 44); // filePacketSize
|
||
buf.writeUInt32LE(packetCount >>> 0, 48); // filePacketCount low 32 bits
|
||
// filePacketCount high 32 bits [52-55] = 0
|
||
buf.writeUInt32LE(session.bitrateBps >>> 0, 56); // fileBitRate
|
||
buf.writeUInt32LE(headerBuf.length >>> 0, 60); // fileHeaderSize
|
||
// unused2 [64-99] = 0 (already zeroed)
|
||
return buf;
|
||
}
|
||
|
||
_buildOpenFileErrorPayload() {
|
||
return Buffer.alloc(0);
|
||
}
|
||
}
|
||
|
||
module.exports = WTVMMS;
|