WTVPNM: implement UDP retransmit

This commit is contained in:
zefie
2026-04-22 00:01:34 -04:00
parent 6c44d6bf6d
commit 9c6493332f
4 changed files with 670 additions and 79 deletions

View File

@@ -34,7 +34,6 @@ if (serviceVaultIdx !== -1) {
subDirPath = '/' + subdirs.join('/');
}
}
console.log("DEBUG: Detected subDirPath =", subDirPath);
const url_path = request_headers.request_url.split('?')[0];
const pathParts = url_path.split('/').filter(p => p);
@@ -42,8 +41,6 @@ const serviceName = pathParts.length > 0 ? pathParts[0] : '';
let remainingPath = '/' + pathParts.slice(1).join('/');
const hadTrailingSlash = request_headers.request_url.endsWith('/');
console.log("DEBUG: Before stripping - subDirPath =", subDirPath, "remainingPath =", remainingPath);
let strippedSubDir = ''; // Store what was stripped for link rebuilding
// Strip the subdirectory structure from the request path
if (subDirPath) {
@@ -58,7 +55,6 @@ if (subDirPath) {
}
}
console.log("DEBUG: After stripping - remainingPath =", remainingPath, "strippedSubDir =", strippedSubDir);
// Restore trailing slash if original URL had one
if (hadTrailingSlash && !remainingPath.endsWith('/')) {
@@ -67,7 +63,6 @@ if (hadTrailingSlash && !remainingPath.endsWith('/')) {
const filename = remainingPath.endsWith('/') ? '' : remainingPath.split('/').pop().replace('.ram', '');
const directory = remainingPath.endsWith('/') ? remainingPath.replace(/\/$/, '') : remainingPath.substring(0, remainingPath.lastIndexOf('/'));
console.log("DEBUG: Request for service", serviceName, "with filename", filename, "and directory", directory, "remainingPath", remainingPath);
let fileFound = false;
const extensions = ['.ra', '.rm'];
@@ -80,7 +75,6 @@ if (!filename || (request_headers.request_url.endsWith('/') && minisrv_config.se
for (const pnmVault of pnmVaults) {
const targetDir = path.join(pnmVault, listingDir);
console.log("DEBUG: Listing files in", targetDir);
if (fs.existsSync(targetDir) && fs.statSync(targetDir).isDirectory()) {
const files = fs.readdirSync(targetDir);
files.forEach(file => {
@@ -118,7 +112,6 @@ Content-type: text/html`;
for (const pnmVault of pnmVaults) {
for (const ext of extensions) {
const filePath = path.join(pnmVault, directory, filename + ext);
console.log("DEBUG: Checking for file", filePath);
if (fs.existsSync(filePath)) {
fileFound = true;
resolvedPath = filePath;
@@ -134,7 +127,6 @@ Content-type: text/html`;
} else {
const filePath = path.join(directory || '/', filename + path.extname(resolvedPath));
const pnmURL = `pnm://${minisrv_config.config.service_ip}:${minisrv_config.services['pnm'].port}${filePath.replace(/\\/g, '/')}`;
console.log("DEBUG: File found at", resolvedPath, "serving as", pnmURL);
headers = `200 OK
Content-type: audio/x-pn-realaudio`
data = pnmURL;

View File

@@ -37,13 +37,9 @@ class WTVPNM {
this.wtvshared = new WTVShared(minisrv_config, true);
this.server = net.createServer((socket) => this.handleConnection(socket));
// Stable 32-bit "server id" byte embedded in the first 0x4F chunk of every
// descriptor. Observed RealServer values all share the same high bytes
// (0x00071a??), so keep the upper 24 bits fixed and randomize the low byte
// per process lifetime. The low byte (= descriptor[5]) is NOT a checksum;
// cross-referenced against multi_auth.pcap it is constant within a single
// server process.
this.serverId = 0x00071a00 | (crypto.randomInt(0, 0x100) & 0xff);
// Auth-sensitive descriptor server id shape observed in captures.
// Keep fixed upper 24 bits 0x00071a and vary only the low byte.
this.serverIdBase = 0x00071a00;
// Per-session counter embedded in bytes 9..12 of the first 0x4F chunk.
// multi_auth.pcap increments this 1,2,3,... across successive sessions.
@@ -77,6 +73,7 @@ class WTVPNM {
remoteIp: (socket.remoteAddress || '').replace('::ffff:', ''),
helloSent: false,
descriptorSent: false,
descriptorInFlight: false,
descriptorTimer: null,
capabilitiesLogged: false,
capabilities: [],
@@ -92,6 +89,8 @@ class WTVPNM {
// receive events or coalesced with other bytes; we accumulate and
// decode them here in handleControlCommands().
ctrlBuf: Buffer.alloc(0),
// 35-byte auth hash can be fragmented/coalesced on TCP.
authBuf: Buffer.alloc(0),
// RDT b5 nibble state. High nibble = 'seek generation' (starts
// at 1, bumped on every 0x53 seek). Low nibble = (seq -
// seekBaseSeq) & 0xf where seekBaseSeq is the wall-seq of the
@@ -107,7 +106,21 @@ class WTVPNM {
// (avg bitrate, etc.). Falls back to global defaults when unset.
rdtDataTypeLo: null,
rdtSyncType: null,
audioChannels: null
audioChannels: null,
mediaUdpPort: null,
serverId: null,
serverUdpPort: null,
_udpSocketHandlersAttached: false,
udpSocket: null,
udpPacketCache: new Map(),
udpPacketOrder: [],
udpFeedbackWindowStart: 0,
udpFeedbackResentInWindow: 0,
udpFeedbackDropped: 0,
udpFeedbackPeerPort: null,
udpPriorityUntil: 0,
udpInboundCount: 0,
udpFeedbackProbeTimer: null
};
this.sessions.set(socket, session);
@@ -170,10 +183,13 @@ class WTVPNM {
const parsedChallenge = this.getClientChallenge(session.pnaFields);
if (parsedChallenge) session.clientChallenge = parsedChallenge;
// Extract UDP port from PNA field ID 1 (2 bytes, big-endian)
// Extract client UDP port from PNA field ID 1 (big-endian u16).
const udpPortField = session.pnaFields.find(f => f.id === 1 && f.len === 2);
if (udpPortField) {
session.clientUdpPort = udpPortField.value.readUInt16BE(0);
session.clientUdpPort = (udpPortField && Buffer.isBuffer(udpPortField.value))
? this.sanitizeUdpPort(udpPortField.value.readUInt16BE(0))
: null;
session.mediaUdpPort = session.clientUdpPort;
if (session.clientUdpPort) {
this.debugLog('client udp port', session.id, session.clientUdpPort);
}
@@ -182,7 +198,6 @@ class WTVPNM {
session.requestedMedia = parsedMedia;
session.mediaPath = this.resolveMediaPath(session.requestedMedia);
}
if (!session.capabilitiesLogged) {
session.capabilitiesLogged = true;
const cap = this.extractCapabilities(data);
@@ -205,14 +220,20 @@ class WTVPNM {
this.debugLog('requested media', session.id, session.requestedMedia);
}
}
console.log('*', `[${session.id}]`, `PNM RealServer Request for media ${session.mediaPath}`);
const pnmHeaders = {
'clientChallenge': session.clientChallenge,
'timestamp': session.pnaFields?.timestamp,
'requestedMedia': session.requestedMedia,
'User-Agent': session.pnaFields?.useragent,
'clientUDPPort': session.clientUdpPort,
};
console.log('*', 'PNM Request Data:', pnmHeaders);
if (session.requestedMedia && !session.mediaPath) {
console.log(' * PNM RealServer Warning: requested media not found', session.requestedMedia);
console.log('*', 'PNM Error:', session.requestedMedia, 'not found in service vault(s)');
this.sendNotFound(socket, session.requestedMedia);
session.notFoundSent = true;
return;
} else {
console.log(' * PNM RealServer Request from', session.id, 'for media', session.mediaPath);
}
}
@@ -229,28 +250,113 @@ class WTVPNM {
}
if (session.helloSent && session.descriptorSent) {
// Client sends hash response: 0x23 0x00 0x20 + 32 hex chars (35 bytes total)
if (data.length === 35 && data[0] === 0x23) {
const hashHex = data.toString('ascii', 3, 35);
this.debugLog('client hash response', session.id, hashHex);
// Verify client response: RNWK_MD5(serverHello_BE, 0x00000000, clientChallenge)
const expectedResp = this.computeClientResponse(session);
if (expectedResp && hashHex === expectedResp) {
this.debugLog('client hash VERIFIED', session.id);
session.hashVerified = true;
} else {
this.debugLog('client hash MISMATCH', session.id, 'expected', expectedResp);
if (!session.hashVerified) {
session.authBuf = session.authBuf && session.authBuf.length
? Buffer.concat([session.authBuf, data])
: Buffer.from(data);
if (session.authBuf.length < 35) {
return;
}
const expectedResp = this.computeClientResponse(session);
let authOffset = -1;
let hashHex = null;
// Find a complete auth frame anywhere in the buffered stream:
// [0x23, 0x00, 0x20, 32 ASCII hex chars]
for (let i = 0; i + 35 <= session.authBuf.length; i++) {
if (session.authBuf[i] !== 0x23 || session.authBuf[i + 1] !== 0x00 || session.authBuf[i + 2] !== 0x20) {
continue;
}
const candidate = session.authBuf.toString('ascii', i + 3, i + 35).toLowerCase();
if (!/^[a-f0-9]{32}$/.test(candidate)) continue;
// Prefer a candidate that matches the expected digest.
if (expectedResp && candidate === expectedResp) {
authOffset = i;
hashHex = candidate;
break;
}
// Keep first syntactically valid candidate as fallback.
if (authOffset < 0) {
authOffset = i;
hashHex = candidate;
}
}
if (authOffset < 0) {
// No complete auth frame yet; spill older bytes to control parser
// while keeping a tail window for fragmented auth frames.
if (session.authBuf.length > 512) {
const keepTail = 96;
const spill = session.authBuf.slice(0, session.authBuf.length - keepTail);
session.authBuf = session.authBuf.slice(session.authBuf.length - keepTail);
if (spill.length > 0) {
this.handleControlCommands(socket, session, spill);
}
}
return;
}
if (expectedResp && hashHex !== expectedResp) {
// Found a syntactically valid auth frame, but not ours yet.
// Keep buffering in case the matching frame is still pending.
if (session.authBuf.length > 768) {
session.authBuf = session.authBuf.slice(-128);
}
return;
}
const preAuth = session.authBuf.slice(0, authOffset);
const remaining = session.authBuf.slice(authOffset + 35);
session.authBuf = Buffer.alloc(0);
if (preAuth.length > 0) {
this.handleControlCommands(socket, session, preAuth);
}
this.debugLog('client hash response', session.id, hashHex);
if (expectedResp && hashHex === expectedResp) {
session.hashVerified = true;
const burstPrestartMs = typeof this.service_config.burst_prestart_ms === 'number'
? this.service_config.burst_prestart_ms
: 3000;
const mediaHeaders = {
'challengeResponse': expectedResp,
'avgBitrate': session.avgBitRate,
'audioChannels': session.audioChannels,
'burstMaxRate': session.avgBitRate * 2,
'burstDurationMs': burstPrestartMs
};
console.log('*', 'PNM Result Data:', mediaHeaders);
} else {
console.log('*', 'PNM Error: client hash response did not match expected value', session.requestedMedia);
socket.close();
return;
}
if (session.clientUdpPort) {
this.startUdpStream(socket, session);
} else {
this.debugLog('hash verified, waiting for UDP peer port', session.id);
this.attachUdpSocketHandlers(socket, session);
}
} else {
// Post-descriptor control byte stream. See handleControlCommands
// for opcode list. Accumulate and decode — RP8 uses seek/
// pause/resume commands here that can arrive coalesced or
// fragmented across TCP segments.
this.handleControlCommands(socket, session, data);
if (remaining.length > 0) {
this.handleControlCommands(socket, session, remaining);
}
return;
}
// Post-descriptor control byte stream. See handleControlCommands
// for opcode list. Accumulate and decode — RP8 uses seek/
// pause/resume commands here that can arrive coalesced or
// fragmented across TCP segments.
this.handleControlCommands(socket, session, data);
return;
}
}
@@ -262,6 +368,7 @@ class WTVPNM {
session.helloSent = false;
session.descriptorSent = false;
session.descriptorInFlight = false;
session.notFoundSent = false;
session.capabilitiesLogged = false;
session.capabilities = [];
@@ -270,10 +377,24 @@ class WTVPNM {
session.mediaPath = null;
session.pnaFields = null;
session.ctrlBuf = Buffer.alloc(0);
session.authBuf = Buffer.alloc(0);
session.paused = false;
session.eosSent = false;
session.hashVerified = false;
session.sessionNumber = undefined;
session.mediaUdpPort = null;
session.udpPacketCache = new Map();
session.udpPacketOrder = [];
session.udpFeedbackWindowStart = 0;
session.udpFeedbackResentInWindow = 0;
session.udpFeedbackDropped = 0;
session.udpFeedbackPeerPort = null;
session.udpPriorityUntil = 0;
session.udpInboundCount = 0;
if (session.udpFeedbackProbeTimer) {
clearTimeout(session.udpFeedbackProbeTimer);
session.udpFeedbackProbeTimer = null;
}
}
// Parse the post-descriptor TCP control stream sent by RealPlayer during
@@ -605,14 +726,30 @@ class WTVPNM {
? this.service_config.descriptor_after_hello_ms
: 100;
session.descriptorTimer = setTimeout(() => {
session.descriptorTimer = setTimeout(async () => {
session.descriptorTimer = null;
if (socket.destroyed || session.descriptorSent) return;
const descriptor = this.buildDescriptorPacket(session);
session.descriptorSent = true;
this.send(socket, descriptor);
this.debugLog('descriptor sent', session.id, `len=${descriptor.length}`, `delay=${descriptorDelay}ms`);
this.prepareMediaData(session);
if (socket.destroyed || session.descriptorSent || session.descriptorInFlight) return;
session.descriptorInFlight = true;
try {
const udpReady = await this.ensureSessionUdpSocket(session);
if (!udpReady) {
this.debugLog('descriptor aborted: failed to reserve UDP socket', session.id);
socket.destroy();
return;
}
if (socket.destroyed || session.descriptorSent) return;
const descriptor = this.buildDescriptorPacket(session);
session.descriptorSent = true;
this.send(socket, descriptor);
this.debugLog('descriptor sent', session.id, `len=${descriptor.length}`, `delay=${descriptorDelay}ms`);
this.prepareMediaData(session);
} finally {
session.descriptorInFlight = false;
}
}, descriptorDelay);
}
@@ -624,19 +761,34 @@ class WTVPNM {
}
}
sendDescriptorAndStartStream(socket, session, reason) {
if (!socket || !session || session.descriptorSent) return;
async sendDescriptorAndStartStream(socket, session, reason) {
if (!socket || !session || session.descriptorSent || session.descriptorInFlight) return;
this.clearDescriptorTimer(session);
if (socket.destroyed) return;
this.send(socket, this.buildDescriptorPacket(session));
session.descriptorSent = true;
this.debugLog('descriptor sent', session.id, reason);
session.descriptorInFlight = true;
this.prepareMediaData(session);
try {
const udpReady = await this.ensureSessionUdpSocket(session);
if (!udpReady) {
this.debugLog('descriptor aborted: failed to reserve UDP socket', session.id);
socket.destroy();
return;
}
// Wait for UDP port response from client before starting stream
this.debugLog('descriptor sent, waiting for client UDP port response on TCP connection', session.id);
if (socket.destroyed || session.descriptorSent) return;
this.send(socket, this.buildDescriptorPacket(session));
session.descriptorSent = true;
this.debugLog('descriptor sent', session.id, reason);
this.prepareMediaData(session);
// Wait for UDP port response from client before starting stream
this.debugLog('descriptor sent, waiting for client UDP port response on TCP connection', session.id);
} finally {
session.descriptorInFlight = false;
}
}
stopUdpStream(session) {
@@ -645,6 +797,10 @@ class WTVPNM {
clearTimeout(session.udpStartTimer);
session.udpStartTimer = null;
}
if (session.udpFeedbackProbeTimer) {
clearTimeout(session.udpFeedbackProbeTimer);
session.udpFeedbackProbeTimer = null;
}
if (session.udpTimer) {
clearInterval(session.udpTimer);
session.udpTimer = null;
@@ -653,6 +809,333 @@ class WTVPNM {
try { session.udpSocket.close(); } catch(e) {}
session.udpSocket = null;
}
session.serverId = null;
session.serverUdpPort = null;
session._udpSocketHandlersAttached = false;
session.udpPacketCache = new Map();
session.udpPacketOrder = [];
session.udpFeedbackWindowStart = 0;
session.udpFeedbackResentInWindow = 0;
session.udpFeedbackDropped = 0;
session.udpFeedbackPeerPort = null;
session.udpInboundCount = 0;
}
sanitizeUdpPort(port) {
const parsed = Number(port);
return Number.isInteger(parsed) && parsed > 0 && parsed <= 65535 ? parsed : null;
}
async ensureSessionUdpSocket(session) {
if (!session) return false;
const existingPort = this.sanitizeUdpPort(session.serverUdpPort);
if (session.udpSocket && existingPort) return true;
const bindIp = this.minisrv_config?.config?.bind_ip || '0.0.0.0';
// Keep source ports in 0x1a00-0x1aff so descriptor serverId can stay
// in the auth-compatible 0x00071a?? shape while still supporting
// per-session unique ports.
const basePort = Number.isInteger(this.service_config.udp_bind_port_base)
? this.service_config.udp_bind_port_base
: 0x1a00;
const span = Number.isInteger(this.service_config.udp_bind_port_span)
? Math.max(1, this.service_config.udp_bind_port_span)
: 0x100;
const startOffset = crypto.randomInt(0, span);
let udpSocket = null;
let boundPort = null;
for (let i = 0; i < span; i++) {
const port = basePort + ((startOffset + i) % span);
if (port <= 0 || port > 65535) continue;
const candidate = dgram.createSocket('udp4');
const didBind = await new Promise((resolve) => {
const onError = () => resolve(false);
candidate.once('error', onError);
candidate.bind(port, bindIp, () => {
candidate.removeListener('error', onError);
resolve(true);
});
});
if (didBind) {
udpSocket = candidate;
boundPort = port;
break;
}
try { candidate.close(); } catch (_) {}
}
if (!udpSocket || !boundPort) {
this.debugLog('udp reserve bind failed', session.id,
`base=${basePort}`, `span=${span}`, 'no free ports');
return false;
}
session.udpSocket = udpSocket;
session._udpSocketHandlersAttached = false;
try {
const addr = udpSocket.address();
session.serverUdpPort = this.sanitizeUdpPort(addr.port);
session.serverId = (this.serverIdBase | (addr.port & 0xff)) >>> 0;
this.debugLog('udp socket reserved', session.id, `${addr.address}:${addr.port}`);
} catch (_) {
session.serverId = null;
session.serverUdpPort = null;
}
return !!session.serverUdpPort;
}
normalizeIpAddress(ip) {
return String(ip || '').replace(/^::ffff:/i, '');
}
attachUdpSocketHandlers(socket, session) {
if (!socket || !session || !session.udpSocket || session._udpSocketHandlersAttached) return;
session.udpSocket.on('error', (err) => {
this.debugLog('udp socket error', session.id, err.message);
this.stopUdpStream(session);
});
// Some clients send UDP resend/feedback before playback is fully
// underway. Keep this listener active as soon as socket is reserved.
session.udpSocket.on('message', (msg, rinfo) => {
session.udpInboundCount = (session.udpInboundCount || 0) + 1;
this.debugLog('udp rx', session.id, `from=${rinfo.address}:${rinfo.port}`,
`len=${msg.length}`, 'hex', msg.slice(0, 32).toString('hex'));
this.handleUdpFeedback(socket, session, msg, rinfo);
});
session._udpSocketHandlersAttached = true;
}
cacheUdpPacketForRetransmit(session, seq16, payload) {
if (!session || !Buffer.isBuffer(payload)) return;
const enabled = this.service_config.udp_retransmit_enabled !== false;
if (!enabled) return;
if (!session.udpPacketCache) session.udpPacketCache = new Map();
if (!Array.isArray(session.udpPacketOrder)) session.udpPacketOrder = [];
const maxCache = Number.isInteger(this.service_config.udp_retransmit_cache_size)
? Math.max(64, this.service_config.udp_retransmit_cache_size)
: 4096;
const now = Date.now();
const key = seq16 & 0xffff;
const existing = session.udpPacketCache.get(key);
if (!existing) {
session.udpPacketOrder.push(key);
}
session.udpPacketCache.set(key, { payload: Buffer.from(payload), ts: now });
while (session.udpPacketOrder.length > maxCache) {
const dropKey = session.udpPacketOrder.shift();
session.udpPacketCache.delete(dropKey);
}
const maxAgeMs = Number.isInteger(this.service_config.udp_retransmit_cache_max_age_ms)
? Math.max(250, this.service_config.udp_retransmit_cache_max_age_ms)
: 30000;
while (session.udpPacketOrder.length > 0) {
const oldestKey = session.udpPacketOrder[0];
const oldestEntry = session.udpPacketCache.get(oldestKey);
if (!oldestEntry || now - oldestEntry.ts > maxAgeMs) {
session.udpPacketOrder.shift();
session.udpPacketCache.delete(oldestKey);
continue;
}
break;
}
}
extractUdpRetransmitSeqs(session, msg) {
if (!session || !session.udpPacketCache || !Buffer.isBuffer(msg) || msg.length === 0) {
return [];
}
const out = new Set();
const maxSeqs = Number.isInteger(this.service_config.udp_retransmit_max_seqs_per_feedback)
? Math.max(1, this.service_config.udp_retransmit_max_seqs_per_feedback)
: 32;
const pushIfCached = (seq) => {
const key = seq & 0xffff;
if (session.udpPacketCache.has(key)) {
out.add(key);
}
};
// ASCII feedback support (test clients/tools):
// NAK 12,13,0x0014
// RETRANS 12 13
const ascii = msg.toString('latin1').replace(/[^\x20-\x7E]/g, ' ').trim();
if (/^(NAK|NACK|RETRANS|RESEND)\b/i.test(ascii)) {
const matches = ascii.match(/0x[0-9a-fA-F]+|\d+/g) || [];
for (const token of matches) {
const parsed = token.toLowerCase().startsWith('0x')
? parseInt(token, 16)
: parseInt(token, 10);
if (Number.isInteger(parsed)) pushIfCached(parsed);
if (out.size >= maxSeqs) break;
}
return Array.from(out);
}
// Binary fallback heuristics.
// Some clients encode seq requests as BE u16, others as LE u16, and
// some prepend an opcode byte.
if (msg.length === 2) {
pushIfCached(msg.readUInt16BE(0));
pushIfCached(msg.readUInt16LE(0));
return Array.from(out);
}
const collectWords = (startOffset, littleEndian = false) => {
for (let i = startOffset; i + 1 < msg.length; i += 2) {
const seq = littleEndian ? msg.readUInt16LE(i) : msg.readUInt16BE(i);
pushIfCached(seq);
if (out.size >= maxSeqs) break;
}
};
const collectDwordsLow16 = (startOffset, littleEndian = false) => {
for (let i = startOffset; i + 3 < msg.length; i += 4) {
const val = littleEndian ? msg.readUInt32LE(i) : msg.readUInt32BE(i);
pushIfCached(val & 0xffff);
if (out.size >= maxSeqs) break;
}
};
if (msg.length % 2 === 0) {
collectWords(0, false);
if (out.size < maxSeqs) collectWords(0, true);
} else {
collectWords(1, false);
if (out.size < maxSeqs) collectWords(1, true);
}
// Try opposite alignment once.
if (out.size === 0 && msg.length >= 4) {
const alt = msg.length % 2 === 0 ? 1 : 0;
collectWords(alt, false);
if (out.size < maxSeqs) collectWords(alt, true);
}
// Some feedback payloads carry 32-bit request entries.
if (out.size === 0 && msg.length >= 8) {
const preferred = msg.length % 2 === 0 ? 0 : 1;
collectDwordsLow16(preferred, false);
if (out.size < maxSeqs) collectDwordsLow16(preferred, true);
if (out.size === 0) {
const alt = preferred === 0 ? 1 : 0;
collectDwordsLow16(alt, false);
if (out.size < maxSeqs) collectDwordsLow16(alt, true);
}
}
return Array.from(out).slice(0, maxSeqs);
}
handleUdpFeedback(socket, session, msg, rinfo) {
if (!socket || !session || !session.udpSocket) return;
if (this.service_config.udp_retransmit_enabled === false) return;
const expectedIp = this.normalizeIpAddress(socket.remoteAddress);
const rxIp = this.normalizeIpAddress(rinfo?.address);
const rxPort = Number.isInteger(rinfo?.port) ? rinfo.port : -1;
const strictPeerPort = this.service_config.udp_retransmit_strict_peer_port === true;
if (rxIp !== expectedIp) {
this.debugLog('udp feedback ignored (endpoint mismatch)', session.id,
`from=${rxIp}:${rxPort}`,
`expected=${expectedIp}:${session.clientUdpPort}`);
return;
}
if (strictPeerPort && session.clientUdpPort && rxPort !== session.clientUdpPort) {
this.debugLog('udp feedback ignored (port mismatch, strict mode)', session.id,
`from=${rxIp}:${rxPort}`,
`expected=${expectedIp}:${session.clientUdpPort}`);
return;
}
if (!strictPeerPort && !session.udpFeedbackPeerPort) {
session.udpFeedbackPeerPort = rxPort;
this.debugLog('udp feedback peer learned', session.id,
`peer=${rxIp}:${rxPort}`,
`mediaTarget=${expectedIp}:${session.mediaUdpPort || session.clientUdpPort}`,
`retransmitTarget=${expectedIp}:${rxPort}`);
// If auth already passed and stream hasn't started yet, begin now.
if (session.hashVerified && !session.udpTimer && !session.udpStartTimer) {
this.debugLog('starting UDP stream after peer learn', session.id,
`target=${expectedIp}:${session.mediaUdpPort || session.clientUdpPort}`);
this.startUdpStream(socket, session);
}
}
if (!session.clientUdpPort) return;
const requestedSeqs = this.extractUdpRetransmitSeqs(session, msg);
if (!requestedSeqs.length) return;
const priorityHoldMs = Number.isInteger(this.service_config.udp_retransmit_priority_hold_ms)
? Math.max(0, this.service_config.udp_retransmit_priority_hold_ms)
: 18;
if (priorityHoldMs > 0) {
session.udpPriorityUntil = Math.max(session.udpPriorityUntil || 0, Date.now() + priorityHoldMs);
}
const now = Date.now();
const windowMs = Number.isInteger(this.service_config.udp_retransmit_window_ms)
? Math.max(250, this.service_config.udp_retransmit_window_ms)
: 1000;
const maxPerWindow = Number.isInteger(this.service_config.udp_retransmit_max_per_window)
? Math.max(1, this.service_config.udp_retransmit_max_per_window)
: 24;
if (!session.udpFeedbackWindowStart || now - session.udpFeedbackWindowStart >= windowMs) {
session.udpFeedbackWindowStart = now;
session.udpFeedbackResentInWindow = 0;
}
let resent = 0;
for (const seq16 of requestedSeqs) {
if (session.udpFeedbackResentInWindow >= maxPerWindow) {
session.udpFeedbackDropped = (session.udpFeedbackDropped || 0) + 1;
this.debugLog('udp retransmit rate-limited', session.id,
`windowMs=${windowMs}`,
`max=${maxPerWindow}`,
`dropped=${session.udpFeedbackDropped}`);
break;
}
const cached = session.udpPacketCache.get(seq16 & 0xffff);
if (!cached || !Buffer.isBuffer(cached.payload)) continue;
const txPort = this.sanitizeUdpPort(session.udpFeedbackPeerPort)
|| session.mediaUdpPort
|| session.clientUdpPort;
session.udpSocket.send(cached.payload, 0, cached.payload.length,
txPort, expectedIp, (err) => {
if (err) this.debugLog('udp retransmit send err', session.id, `seq=${seq16}`, err.message);
});
resent++;
session.udpFeedbackResentInWindow++;
}
if (resent > 0) {
this.debugLog('udp retransmit', session.id,
`count=${resent}`,
`seqs=${requestedSeqs.slice(0, resent).join(',')}`);
}
}
prepareMediaData(session) {
@@ -674,7 +1157,7 @@ class WTVPNM {
session.rdtPacketMode = 'classic-len';
session.syncEvery = Number.isInteger(this.service_config.rdt_sync_every_classic)
? Math.max(1, this.service_config.rdt_sync_every_classic)
: 8;
: 5;
const cfgDataTypeLo = Number.isInteger(this.service_config.rdt_data_type_lo)
? (this.service_config.rdt_data_type_lo & 0xff)
@@ -689,7 +1172,7 @@ class WTVPNM {
} else {
const useLegacyProfile = classicRa.channels === 1 || classicRa.channels === null;
session.rdtDataTypeLo = useLegacyProfile ? 0x64 : 0x50;
session.rdtSyncType = 0x0455;
session.rdtSyncType = useLegacyProfile ? 0x0477 : 0x04ba;
}
const payload = media.subarray(classicRa.dataOffset);
@@ -993,25 +1476,36 @@ class WTVPNM {
const burstFrameCount = burstPrestartMs > 0 ? Math.ceil(burstPrestartMs / intervalMs) : 0;
session.burstFramesSent = 0;
const targetIp = this.normalizeIpAddress(socket.remoteAddress);
const mediaTargetPort = this.sanitizeUdpPort(session.mediaUdpPort) || session.clientUdpPort;
this.debugLog('udp stream start', session.id,
`frames=${session.mediaFrames?.length || 0}`,
`avgBitRate=${session.avgBitRate || 'unknown'}bps`,
`bodyLen=${bodyLen}`,
`interval=${intervalMs.toFixed(2)}ms`,
`burstFrames=${burstFrameCount}`,
`target=${socket.remoteAddress}:${session.clientUdpPort}`);
`target=${targetIp}:${mediaTargetPort}`,
`sourcePort=${session.serverUdpPort || 'unknown'}`);
session.udpSocket = dgram.createSocket('udp4');
session.udpSocket.on('error', (err) => {
this.debugLog('udp socket error', session.id, err.message);
this.stopUdpStream(session);
});
// Some RDT clients also send ACK/resend requests back on the same UDP
// flow. Bind so we can receive (even if we ignore the content).
session.udpSocket.on('message', (msg, rinfo) => {
this.debugLog('udp rx', session.id, `from=${rinfo.address}:${rinfo.port}`,
`len=${msg.length}`, 'hex', msg.slice(0, 32).toString('hex'));
});
if (!session.udpSocket || !this.sanitizeUdpPort(session.serverUdpPort)) {
this.debugLog('udp stream start failed: socket not reserved', session.id);
return;
}
this.attachUdpSocketHandlers(socket, session);
if (session.udpFeedbackProbeTimer) {
clearTimeout(session.udpFeedbackProbeTimer);
}
session.udpFeedbackProbeTimer = setTimeout(() => {
session.udpFeedbackProbeTimer = null;
if (!session.udpSocket || socket.destroyed) return;
if ((session.udpInboundCount || 0) === 0) {
this.debugLog('udp feedback not seen yet', session.id,
`streamTarget=${this.normalizeIpAddress(socket.remoteAddress)}:${mediaTargetPort}`,
'If packet capture shows ICMP port unreachable, client is not listening on requested UDP port.');
}
}, 2500);
// sendPacket wraps buildMediaPayload with the every-5th-sync-frame
// prefix and writes to the UDP socket. Wall-seq and frame are passed
@@ -1024,9 +1518,11 @@ class WTVPNM {
const out = withSync
? Buffer.concat([this.buildSyncFrame(session, seq), dataFrame])
: dataFrame;
this.cacheUdpPacketForRetransmit(session, seq, out);
const txPort = mediaTargetPort;
session.udpSocket.send(out, 0, out.length,
session.clientUdpPort, socket.remoteAddress, (err) => {
if (err) this.debugLog('udp send err', session.id, err.message);
txPort, targetIp, (err) => {
if (err) this.debugLog('udp send err', session.id, `${targetIp}:${txPort}`, err.message);
});
};
@@ -1048,6 +1544,14 @@ class WTVPNM {
}
// Don't re-arm while paused; resumeUdpStream calls _startDataInterval.
if (session.paused) return;
const priorityUntil = session.udpPriorityUntil || 0;
if (priorityUntil > Date.now()) {
const waitMs = Math.max(1, Math.min(priorityUntil - Date.now(), 10));
session.udpTimer = setTimeout(tick, waitMs);
return;
}
const frames = session.mediaFrames;
if (!frames || session.mediaFrameIdx >= frames.length) {
// End of media: stop sending once all RA frames are out.
@@ -1520,11 +2024,12 @@ class WTVPNM {
const out = Buffer.concat(outChunks);
// The first 0x4F/0x08 chunk carries [serverId_u32_BE][sessionCounter_u32_BE].
// These are NOT a checksum of serverChallenge — verified against
// multi_auth.pcap (6 sessions, constant serverId, incrementing counter).
// Descriptor layout: [0x4F, 0x08, serverId(4), sessionCounter(4), ...]
// so serverId occupies out[2..6] and sessionCounter occupies out[6..10].
out.writeUInt32BE(this.serverId >>> 0, 2);
// Keep auth-compatible 0x00071a?? serverId shape and encode per-session
// UDP port in the low byte (port range 0x1a00-0x1aff).
const serverId = Number.isInteger(session?.serverId)
? session.serverId
: ((this.serverIdBase | 0x27) >>> 0);
out.writeUInt32BE(serverId, 2);
const sessionNumber = (session && typeof session.sessionNumber === 'number')
? session.sessionNumber
: ++this.sessionCounter;
@@ -1896,6 +2401,100 @@ class WTVPNM {
return Array.from(out).slice(0, 20);
}
getPnaFieldAliases(field) {
if (!field) return [];
switch (field.id) {
case 0:
return ['maskedclienttime', 'maskedtime'];
case 1:
return ['udpport', 'clientudpport'];
case 4:
return ['challenge', 'clientchallenge'];
case 23:
return ['timestamp', 'clienttimestamp'];
case 0x42:
return ['bitrate'];
case 0x52:
return ['requestedmedia', 'resourcepath', 'filename'];
case 0x63:
return ['useragent'];
default:
return [];
}
}
decodePnaFieldValue(field, alias = null) {
if (!field || !Buffer.isBuffer(field.value)) return null;
if (alias === 'udpport' || alias === 'clientudpport') {
return field.len >= 2 ? field.value.readUInt16BE(0) : null;
}
if (alias === 'maskedclienttime' || alias === 'maskedtime') {
return field.len >= 4 ? field.value.readUInt32BE(0) : null;
}
if (alias === 'bitrate') {
if (field.len >= 4) {
return field.value.readUInt32BE(0);
}
const bitrateText = field.value.toString('latin1').replace(/\x00+$/g, '').trim();
const bitrateMatch = bitrateText.match(/(?:bitrate|avg[_ -]?bitrate|max[_ -]?bitrate)\D+(\d{3,})/i);
if (bitrateMatch) {
return parseInt(bitrateMatch[1], 10);
}
return bitrateText || null;
}
const textValue = field.value.toString('latin1').replace(/\x00+$/g, '').trim();
if (textValue.length > 0) {
return textValue;
}
return Buffer.from(field.value);
}
attachPnaFieldAlias(fields, alias, field) {
if (!alias || !fields || !field) return;
const decodedValue = this.decodePnaFieldValue(field, alias);
const fieldKey = `${alias}Field`;
if (!(alias in fields)) {
fields[alias] = decodedValue;
fields[fieldKey] = field;
return;
}
if (!Array.isArray(fields[alias])) {
fields[alias] = [fields[alias]];
}
fields[alias].push(decodedValue);
if (!Array.isArray(fields[fieldKey])) {
fields[fieldKey] = [fields[fieldKey]];
}
fields[fieldKey].push(field);
}
decoratePnaFields(fields) {
if (!Array.isArray(fields)) return fields;
for (const field of fields) {
if (!field) continue;
this.attachPnaFieldAlias(fields, `field_${field.id}`, field);
for (const alias of this.getPnaFieldAliases(field)) {
this.attachPnaFieldAlias(fields, alias, field);
}
}
return fields;
}
parsePnaMessage(data) {
const pnaOffset = data.indexOf(Buffer.from('PNA\x00\x0a', 'latin1'));
if (pnaOffset < 0) return null;
@@ -1988,7 +2587,7 @@ class WTVPNM {
if (dbg) this.debugLog('pna phase 2 complete', `start_offset=${phase2Start}`, `end_offset=${offset}`, `phase2_markers=${phase2Count}`, `total_fields=${fields.length}`);
return fields;
return this.decoratePnaFields(fields);
}
}

View File

@@ -401,7 +401,7 @@
"protocol_handler": "pnm",
"descriptor_after_hello_ms": 85,
"burst_prestart_ms": 5000,
"debug": false,
"debug": true,
"allow_indexing": true
}
},

Binary file not shown.