diff --git a/zefie_wtvp_minisrv/includes/ServiceVault/wtv-music/ragen/catchall.js b/zefie_wtvp_minisrv/includes/ServiceVault/wtv-music/ragen/catchall.js index 5be42660..5cf16b2f 100644 --- a/zefie_wtvp_minisrv/includes/ServiceVault/wtv-music/ragen/catchall.js +++ b/zefie_wtvp_minisrv/includes/ServiceVault/wtv-music/ragen/catchall.js @@ -34,7 +34,6 @@ if (serviceVaultIdx !== -1) { subDirPath = '/' + subdirs.join('/'); } } -console.log("DEBUG: Detected subDirPath =", subDirPath); const url_path = request_headers.request_url.split('?')[0]; const pathParts = url_path.split('/').filter(p => p); @@ -42,8 +41,6 @@ const serviceName = pathParts.length > 0 ? pathParts[0] : ''; let remainingPath = '/' + pathParts.slice(1).join('/'); const hadTrailingSlash = request_headers.request_url.endsWith('/'); -console.log("DEBUG: Before stripping - subDirPath =", subDirPath, "remainingPath =", remainingPath); - let strippedSubDir = ''; // Store what was stripped for link rebuilding // Strip the subdirectory structure from the request path if (subDirPath) { @@ -58,7 +55,6 @@ if (subDirPath) { } } -console.log("DEBUG: After stripping - remainingPath =", remainingPath, "strippedSubDir =", strippedSubDir); // Restore trailing slash if original URL had one if (hadTrailingSlash && !remainingPath.endsWith('/')) { @@ -67,7 +63,6 @@ if (hadTrailingSlash && !remainingPath.endsWith('/')) { const filename = remainingPath.endsWith('/') ? '' : remainingPath.split('/').pop().replace('.ram', ''); const directory = remainingPath.endsWith('/') ? remainingPath.replace(/\/$/, '') : remainingPath.substring(0, remainingPath.lastIndexOf('/')); -console.log("DEBUG: Request for service", serviceName, "with filename", filename, "and directory", directory, "remainingPath", remainingPath); let fileFound = false; const extensions = ['.ra', '.rm']; @@ -80,7 +75,6 @@ if (!filename || (request_headers.request_url.endsWith('/') && minisrv_config.se for (const pnmVault of pnmVaults) { const targetDir = path.join(pnmVault, listingDir); - console.log("DEBUG: Listing files in", targetDir); if (fs.existsSync(targetDir) && fs.statSync(targetDir).isDirectory()) { const files = fs.readdirSync(targetDir); files.forEach(file => { @@ -118,7 +112,6 @@ Content-type: text/html`; for (const pnmVault of pnmVaults) { for (const ext of extensions) { const filePath = path.join(pnmVault, directory, filename + ext); - console.log("DEBUG: Checking for file", filePath); if (fs.existsSync(filePath)) { fileFound = true; resolvedPath = filePath; @@ -134,7 +127,6 @@ Content-type: text/html`; } else { const filePath = path.join(directory || '/', filename + path.extname(resolvedPath)); const pnmURL = `pnm://${minisrv_config.config.service_ip}:${minisrv_config.services['pnm'].port}${filePath.replace(/\\/g, '/')}`; - console.log("DEBUG: File found at", resolvedPath, "serving as", pnmURL); headers = `200 OK Content-type: audio/x-pn-realaudio` data = pnmURL; diff --git a/zefie_wtvp_minisrv/includes/classes/WTVPNM.js b/zefie_wtvp_minisrv/includes/classes/WTVPNM.js index 76cab7f5..6e00ec9b 100644 --- a/zefie_wtvp_minisrv/includes/classes/WTVPNM.js +++ b/zefie_wtvp_minisrv/includes/classes/WTVPNM.js @@ -37,13 +37,9 @@ class WTVPNM { this.wtvshared = new WTVShared(minisrv_config, true); this.server = net.createServer((socket) => this.handleConnection(socket)); - // Stable 32-bit "server id" byte embedded in the first 0x4F chunk of every - // descriptor. Observed RealServer values all share the same high bytes - // (0x00071a??), so keep the upper 24 bits fixed and randomize the low byte - // per process lifetime. The low byte (= descriptor[5]) is NOT a checksum; - // cross-referenced against multi_auth.pcap it is constant within a single - // server process. - this.serverId = 0x00071a00 | (crypto.randomInt(0, 0x100) & 0xff); + // Auth-sensitive descriptor server id shape observed in captures. + // Keep fixed upper 24 bits 0x00071a and vary only the low byte. + this.serverIdBase = 0x00071a00; // Per-session counter embedded in bytes 9..12 of the first 0x4F chunk. // multi_auth.pcap increments this 1,2,3,... across successive sessions. @@ -77,6 +73,7 @@ class WTVPNM { remoteIp: (socket.remoteAddress || '').replace('::ffff:', ''), helloSent: false, descriptorSent: false, + descriptorInFlight: false, descriptorTimer: null, capabilitiesLogged: false, capabilities: [], @@ -92,6 +89,8 @@ class WTVPNM { // receive events or coalesced with other bytes; we accumulate and // decode them here in handleControlCommands(). ctrlBuf: Buffer.alloc(0), + // 35-byte auth hash can be fragmented/coalesced on TCP. + authBuf: Buffer.alloc(0), // RDT b5 nibble state. High nibble = 'seek generation' (starts // at 1, bumped on every 0x53 seek). Low nibble = (seq - // seekBaseSeq) & 0xf where seekBaseSeq is the wall-seq of the @@ -107,7 +106,21 @@ class WTVPNM { // (avg bitrate, etc.). Falls back to global defaults when unset. rdtDataTypeLo: null, rdtSyncType: null, - audioChannels: null + audioChannels: null, + mediaUdpPort: null, + serverId: null, + serverUdpPort: null, + _udpSocketHandlersAttached: false, + udpSocket: null, + udpPacketCache: new Map(), + udpPacketOrder: [], + udpFeedbackWindowStart: 0, + udpFeedbackResentInWindow: 0, + udpFeedbackDropped: 0, + udpFeedbackPeerPort: null, + udpPriorityUntil: 0, + udpInboundCount: 0, + udpFeedbackProbeTimer: null }; this.sessions.set(socket, session); @@ -170,10 +183,13 @@ class WTVPNM { const parsedChallenge = this.getClientChallenge(session.pnaFields); if (parsedChallenge) session.clientChallenge = parsedChallenge; - // Extract UDP port from PNA field ID 1 (2 bytes, big-endian) + // Extract client UDP port from PNA field ID 1 (big-endian u16). const udpPortField = session.pnaFields.find(f => f.id === 1 && f.len === 2); - if (udpPortField) { - session.clientUdpPort = udpPortField.value.readUInt16BE(0); + session.clientUdpPort = (udpPortField && Buffer.isBuffer(udpPortField.value)) + ? this.sanitizeUdpPort(udpPortField.value.readUInt16BE(0)) + : null; + session.mediaUdpPort = session.clientUdpPort; + if (session.clientUdpPort) { this.debugLog('client udp port', session.id, session.clientUdpPort); } @@ -182,7 +198,6 @@ class WTVPNM { session.requestedMedia = parsedMedia; session.mediaPath = this.resolveMediaPath(session.requestedMedia); } - if (!session.capabilitiesLogged) { session.capabilitiesLogged = true; const cap = this.extractCapabilities(data); @@ -205,14 +220,20 @@ class WTVPNM { this.debugLog('requested media', session.id, session.requestedMedia); } } - + console.log('*', `[${session.id}]`, `PNM RealServer Request for media ${session.mediaPath}`); + const pnmHeaders = { + 'clientChallenge': session.clientChallenge, + 'timestamp': session.pnaFields?.timestamp, + 'requestedMedia': session.requestedMedia, + 'User-Agent': session.pnaFields?.useragent, + 'clientUDPPort': session.clientUdpPort, + }; + console.log('*', 'PNM Request Data:', pnmHeaders); if (session.requestedMedia && !session.mediaPath) { - console.log(' * PNM RealServer Warning: requested media not found', session.requestedMedia); + console.log('*', 'PNM Error:', session.requestedMedia, 'not found in service vault(s)'); this.sendNotFound(socket, session.requestedMedia); session.notFoundSent = true; return; - } else { - console.log(' * PNM RealServer Request from', session.id, 'for media', session.mediaPath); } } @@ -229,28 +250,113 @@ class WTVPNM { } if (session.helloSent && session.descriptorSent) { - // Client sends hash response: 0x23 0x00 0x20 + 32 hex chars (35 bytes total) - if (data.length === 35 && data[0] === 0x23) { - const hashHex = data.toString('ascii', 3, 35); - this.debugLog('client hash response', session.id, hashHex); - // Verify client response: RNWK_MD5(serverHello_BE, 0x00000000, clientChallenge) - const expectedResp = this.computeClientResponse(session); - if (expectedResp && hashHex === expectedResp) { - this.debugLog('client hash VERIFIED', session.id); - session.hashVerified = true; - } else { - this.debugLog('client hash MISMATCH', session.id, 'expected', expectedResp); + if (!session.hashVerified) { + session.authBuf = session.authBuf && session.authBuf.length + ? Buffer.concat([session.authBuf, data]) + : Buffer.from(data); + + if (session.authBuf.length < 35) { + return; } + + const expectedResp = this.computeClientResponse(session); + let authOffset = -1; + let hashHex = null; + + // Find a complete auth frame anywhere in the buffered stream: + // [0x23, 0x00, 0x20, 32 ASCII hex chars] + for (let i = 0; i + 35 <= session.authBuf.length; i++) { + if (session.authBuf[i] !== 0x23 || session.authBuf[i + 1] !== 0x00 || session.authBuf[i + 2] !== 0x20) { + continue; + } + + const candidate = session.authBuf.toString('ascii', i + 3, i + 35).toLowerCase(); + if (!/^[a-f0-9]{32}$/.test(candidate)) continue; + + // Prefer a candidate that matches the expected digest. + if (expectedResp && candidate === expectedResp) { + authOffset = i; + hashHex = candidate; + break; + } + + // Keep first syntactically valid candidate as fallback. + if (authOffset < 0) { + authOffset = i; + hashHex = candidate; + } + } + + if (authOffset < 0) { + // No complete auth frame yet; spill older bytes to control parser + // while keeping a tail window for fragmented auth frames. + if (session.authBuf.length > 512) { + const keepTail = 96; + const spill = session.authBuf.slice(0, session.authBuf.length - keepTail); + session.authBuf = session.authBuf.slice(session.authBuf.length - keepTail); + if (spill.length > 0) { + this.handleControlCommands(socket, session, spill); + } + } + return; + } + + if (expectedResp && hashHex !== expectedResp) { + // Found a syntactically valid auth frame, but not ours yet. + // Keep buffering in case the matching frame is still pending. + if (session.authBuf.length > 768) { + session.authBuf = session.authBuf.slice(-128); + } + return; + } + + const preAuth = session.authBuf.slice(0, authOffset); + const remaining = session.authBuf.slice(authOffset + 35); + session.authBuf = Buffer.alloc(0); + + if (preAuth.length > 0) { + this.handleControlCommands(socket, session, preAuth); + } + + this.debugLog('client hash response', session.id, hashHex); + + if (expectedResp && hashHex === expectedResp) { + session.hashVerified = true; + const burstPrestartMs = typeof this.service_config.burst_prestart_ms === 'number' + ? this.service_config.burst_prestart_ms + : 3000; + const mediaHeaders = { + 'challengeResponse': expectedResp, + 'avgBitrate': session.avgBitRate, + 'audioChannels': session.audioChannels, + 'burstMaxRate': session.avgBitRate * 2, + 'burstDurationMs': burstPrestartMs + }; + console.log('*', 'PNM Result Data:', mediaHeaders); + } else { + console.log('*', 'PNM Error: client hash response did not match expected value', session.requestedMedia); + socket.close(); + return; + } + if (session.clientUdpPort) { this.startUdpStream(socket, session); + } else { + this.debugLog('hash verified, waiting for UDP peer port', session.id); + this.attachUdpSocketHandlers(socket, session); } - } else { - // Post-descriptor control byte stream. See handleControlCommands - // for opcode list. Accumulate and decode — RP8 uses seek/ - // pause/resume commands here that can arrive coalesced or - // fragmented across TCP segments. - this.handleControlCommands(socket, session, data); + + if (remaining.length > 0) { + this.handleControlCommands(socket, session, remaining); + } + return; } + + // Post-descriptor control byte stream. See handleControlCommands + // for opcode list. Accumulate and decode — RP8 uses seek/ + // pause/resume commands here that can arrive coalesced or + // fragmented across TCP segments. + this.handleControlCommands(socket, session, data); return; } } @@ -262,6 +368,7 @@ class WTVPNM { session.helloSent = false; session.descriptorSent = false; + session.descriptorInFlight = false; session.notFoundSent = false; session.capabilitiesLogged = false; session.capabilities = []; @@ -270,10 +377,24 @@ class WTVPNM { session.mediaPath = null; session.pnaFields = null; session.ctrlBuf = Buffer.alloc(0); + session.authBuf = Buffer.alloc(0); session.paused = false; session.eosSent = false; session.hashVerified = false; session.sessionNumber = undefined; + session.mediaUdpPort = null; + session.udpPacketCache = new Map(); + session.udpPacketOrder = []; + session.udpFeedbackWindowStart = 0; + session.udpFeedbackResentInWindow = 0; + session.udpFeedbackDropped = 0; + session.udpFeedbackPeerPort = null; + session.udpPriorityUntil = 0; + session.udpInboundCount = 0; + if (session.udpFeedbackProbeTimer) { + clearTimeout(session.udpFeedbackProbeTimer); + session.udpFeedbackProbeTimer = null; + } } // Parse the post-descriptor TCP control stream sent by RealPlayer during @@ -605,14 +726,30 @@ class WTVPNM { ? this.service_config.descriptor_after_hello_ms : 100; - session.descriptorTimer = setTimeout(() => { + session.descriptorTimer = setTimeout(async () => { session.descriptorTimer = null; - if (socket.destroyed || session.descriptorSent) return; - const descriptor = this.buildDescriptorPacket(session); - session.descriptorSent = true; - this.send(socket, descriptor); - this.debugLog('descriptor sent', session.id, `len=${descriptor.length}`, `delay=${descriptorDelay}ms`); - this.prepareMediaData(session); + if (socket.destroyed || session.descriptorSent || session.descriptorInFlight) return; + + session.descriptorInFlight = true; + + try { + const udpReady = await this.ensureSessionUdpSocket(session); + if (!udpReady) { + this.debugLog('descriptor aborted: failed to reserve UDP socket', session.id); + socket.destroy(); + return; + } + + if (socket.destroyed || session.descriptorSent) return; + + const descriptor = this.buildDescriptorPacket(session); + session.descriptorSent = true; + this.send(socket, descriptor); + this.debugLog('descriptor sent', session.id, `len=${descriptor.length}`, `delay=${descriptorDelay}ms`); + this.prepareMediaData(session); + } finally { + session.descriptorInFlight = false; + } }, descriptorDelay); } @@ -624,19 +761,34 @@ class WTVPNM { } } - sendDescriptorAndStartStream(socket, session, reason) { - if (!socket || !session || session.descriptorSent) return; + async sendDescriptorAndStartStream(socket, session, reason) { + if (!socket || !session || session.descriptorSent || session.descriptorInFlight) return; this.clearDescriptorTimer(session); if (socket.destroyed) return; - this.send(socket, this.buildDescriptorPacket(session)); - session.descriptorSent = true; - this.debugLog('descriptor sent', session.id, reason); + session.descriptorInFlight = true; - this.prepareMediaData(session); + try { + const udpReady = await this.ensureSessionUdpSocket(session); + if (!udpReady) { + this.debugLog('descriptor aborted: failed to reserve UDP socket', session.id); + socket.destroy(); + return; + } - // Wait for UDP port response from client before starting stream - this.debugLog('descriptor sent, waiting for client UDP port response on TCP connection', session.id); + if (socket.destroyed || session.descriptorSent) return; + + this.send(socket, this.buildDescriptorPacket(session)); + session.descriptorSent = true; + this.debugLog('descriptor sent', session.id, reason); + + this.prepareMediaData(session); + + // Wait for UDP port response from client before starting stream + this.debugLog('descriptor sent, waiting for client UDP port response on TCP connection', session.id); + } finally { + session.descriptorInFlight = false; + } } stopUdpStream(session) { @@ -645,6 +797,10 @@ class WTVPNM { clearTimeout(session.udpStartTimer); session.udpStartTimer = null; } + if (session.udpFeedbackProbeTimer) { + clearTimeout(session.udpFeedbackProbeTimer); + session.udpFeedbackProbeTimer = null; + } if (session.udpTimer) { clearInterval(session.udpTimer); session.udpTimer = null; @@ -653,6 +809,333 @@ class WTVPNM { try { session.udpSocket.close(); } catch(e) {} session.udpSocket = null; } + session.serverId = null; + session.serverUdpPort = null; + session._udpSocketHandlersAttached = false; + session.udpPacketCache = new Map(); + session.udpPacketOrder = []; + session.udpFeedbackWindowStart = 0; + session.udpFeedbackResentInWindow = 0; + session.udpFeedbackDropped = 0; + session.udpFeedbackPeerPort = null; + session.udpInboundCount = 0; + } + + sanitizeUdpPort(port) { + const parsed = Number(port); + return Number.isInteger(parsed) && parsed > 0 && parsed <= 65535 ? parsed : null; + } + + async ensureSessionUdpSocket(session) { + if (!session) return false; + + const existingPort = this.sanitizeUdpPort(session.serverUdpPort); + if (session.udpSocket && existingPort) return true; + + const bindIp = this.minisrv_config?.config?.bind_ip || '0.0.0.0'; + // Keep source ports in 0x1a00-0x1aff so descriptor serverId can stay + // in the auth-compatible 0x00071a?? shape while still supporting + // per-session unique ports. + const basePort = Number.isInteger(this.service_config.udp_bind_port_base) + ? this.service_config.udp_bind_port_base + : 0x1a00; + const span = Number.isInteger(this.service_config.udp_bind_port_span) + ? Math.max(1, this.service_config.udp_bind_port_span) + : 0x100; + + const startOffset = crypto.randomInt(0, span); + let udpSocket = null; + let boundPort = null; + + for (let i = 0; i < span; i++) { + const port = basePort + ((startOffset + i) % span); + if (port <= 0 || port > 65535) continue; + + const candidate = dgram.createSocket('udp4'); + const didBind = await new Promise((resolve) => { + const onError = () => resolve(false); + candidate.once('error', onError); + candidate.bind(port, bindIp, () => { + candidate.removeListener('error', onError); + resolve(true); + }); + }); + + if (didBind) { + udpSocket = candidate; + boundPort = port; + break; + } + + try { candidate.close(); } catch (_) {} + } + + if (!udpSocket || !boundPort) { + this.debugLog('udp reserve bind failed', session.id, + `base=${basePort}`, `span=${span}`, 'no free ports'); + return false; + } + + session.udpSocket = udpSocket; + session._udpSocketHandlersAttached = false; + + try { + const addr = udpSocket.address(); + session.serverUdpPort = this.sanitizeUdpPort(addr.port); + session.serverId = (this.serverIdBase | (addr.port & 0xff)) >>> 0; + this.debugLog('udp socket reserved', session.id, `${addr.address}:${addr.port}`); + } catch (_) { + session.serverId = null; + session.serverUdpPort = null; + } + + return !!session.serverUdpPort; + } + + normalizeIpAddress(ip) { + return String(ip || '').replace(/^::ffff:/i, ''); + } + + attachUdpSocketHandlers(socket, session) { + if (!socket || !session || !session.udpSocket || session._udpSocketHandlersAttached) return; + + session.udpSocket.on('error', (err) => { + this.debugLog('udp socket error', session.id, err.message); + this.stopUdpStream(session); + }); + + // Some clients send UDP resend/feedback before playback is fully + // underway. Keep this listener active as soon as socket is reserved. + session.udpSocket.on('message', (msg, rinfo) => { + session.udpInboundCount = (session.udpInboundCount || 0) + 1; + this.debugLog('udp rx', session.id, `from=${rinfo.address}:${rinfo.port}`, + `len=${msg.length}`, 'hex', msg.slice(0, 32).toString('hex')); + this.handleUdpFeedback(socket, session, msg, rinfo); + }); + + session._udpSocketHandlersAttached = true; + } + + cacheUdpPacketForRetransmit(session, seq16, payload) { + if (!session || !Buffer.isBuffer(payload)) return; + const enabled = this.service_config.udp_retransmit_enabled !== false; + if (!enabled) return; + + if (!session.udpPacketCache) session.udpPacketCache = new Map(); + if (!Array.isArray(session.udpPacketOrder)) session.udpPacketOrder = []; + + const maxCache = Number.isInteger(this.service_config.udp_retransmit_cache_size) + ? Math.max(64, this.service_config.udp_retransmit_cache_size) + : 4096; + const now = Date.now(); + const key = seq16 & 0xffff; + const existing = session.udpPacketCache.get(key); + if (!existing) { + session.udpPacketOrder.push(key); + } + session.udpPacketCache.set(key, { payload: Buffer.from(payload), ts: now }); + + while (session.udpPacketOrder.length > maxCache) { + const dropKey = session.udpPacketOrder.shift(); + session.udpPacketCache.delete(dropKey); + } + + const maxAgeMs = Number.isInteger(this.service_config.udp_retransmit_cache_max_age_ms) + ? Math.max(250, this.service_config.udp_retransmit_cache_max_age_ms) + : 30000; + while (session.udpPacketOrder.length > 0) { + const oldestKey = session.udpPacketOrder[0]; + const oldestEntry = session.udpPacketCache.get(oldestKey); + if (!oldestEntry || now - oldestEntry.ts > maxAgeMs) { + session.udpPacketOrder.shift(); + session.udpPacketCache.delete(oldestKey); + continue; + } + break; + } + } + + extractUdpRetransmitSeqs(session, msg) { + if (!session || !session.udpPacketCache || !Buffer.isBuffer(msg) || msg.length === 0) { + return []; + } + + const out = new Set(); + const maxSeqs = Number.isInteger(this.service_config.udp_retransmit_max_seqs_per_feedback) + ? Math.max(1, this.service_config.udp_retransmit_max_seqs_per_feedback) + : 32; + + const pushIfCached = (seq) => { + const key = seq & 0xffff; + if (session.udpPacketCache.has(key)) { + out.add(key); + } + }; + + // ASCII feedback support (test clients/tools): + // NAK 12,13,0x0014 + // RETRANS 12 13 + const ascii = msg.toString('latin1').replace(/[^\x20-\x7E]/g, ' ').trim(); + if (/^(NAK|NACK|RETRANS|RESEND)\b/i.test(ascii)) { + const matches = ascii.match(/0x[0-9a-fA-F]+|\d+/g) || []; + for (const token of matches) { + const parsed = token.toLowerCase().startsWith('0x') + ? parseInt(token, 16) + : parseInt(token, 10); + if (Number.isInteger(parsed)) pushIfCached(parsed); + if (out.size >= maxSeqs) break; + } + return Array.from(out); + } + + // Binary fallback heuristics. + // Some clients encode seq requests as BE u16, others as LE u16, and + // some prepend an opcode byte. + if (msg.length === 2) { + pushIfCached(msg.readUInt16BE(0)); + pushIfCached(msg.readUInt16LE(0)); + return Array.from(out); + } + + const collectWords = (startOffset, littleEndian = false) => { + for (let i = startOffset; i + 1 < msg.length; i += 2) { + const seq = littleEndian ? msg.readUInt16LE(i) : msg.readUInt16BE(i); + pushIfCached(seq); + if (out.size >= maxSeqs) break; + } + }; + + const collectDwordsLow16 = (startOffset, littleEndian = false) => { + for (let i = startOffset; i + 3 < msg.length; i += 4) { + const val = littleEndian ? msg.readUInt32LE(i) : msg.readUInt32BE(i); + pushIfCached(val & 0xffff); + if (out.size >= maxSeqs) break; + } + }; + + if (msg.length % 2 === 0) { + collectWords(0, false); + if (out.size < maxSeqs) collectWords(0, true); + } else { + collectWords(1, false); + if (out.size < maxSeqs) collectWords(1, true); + } + + // Try opposite alignment once. + if (out.size === 0 && msg.length >= 4) { + const alt = msg.length % 2 === 0 ? 1 : 0; + collectWords(alt, false); + if (out.size < maxSeqs) collectWords(alt, true); + } + + // Some feedback payloads carry 32-bit request entries. + if (out.size === 0 && msg.length >= 8) { + const preferred = msg.length % 2 === 0 ? 0 : 1; + collectDwordsLow16(preferred, false); + if (out.size < maxSeqs) collectDwordsLow16(preferred, true); + if (out.size === 0) { + const alt = preferred === 0 ? 1 : 0; + collectDwordsLow16(alt, false); + if (out.size < maxSeqs) collectDwordsLow16(alt, true); + } + } + + return Array.from(out).slice(0, maxSeqs); + } + + handleUdpFeedback(socket, session, msg, rinfo) { + if (!socket || !session || !session.udpSocket) return; + if (this.service_config.udp_retransmit_enabled === false) return; + + const expectedIp = this.normalizeIpAddress(socket.remoteAddress); + const rxIp = this.normalizeIpAddress(rinfo?.address); + const rxPort = Number.isInteger(rinfo?.port) ? rinfo.port : -1; + const strictPeerPort = this.service_config.udp_retransmit_strict_peer_port === true; + + if (rxIp !== expectedIp) { + this.debugLog('udp feedback ignored (endpoint mismatch)', session.id, + `from=${rxIp}:${rxPort}`, + `expected=${expectedIp}:${session.clientUdpPort}`); + return; + } + + if (strictPeerPort && session.clientUdpPort && rxPort !== session.clientUdpPort) { + this.debugLog('udp feedback ignored (port mismatch, strict mode)', session.id, + `from=${rxIp}:${rxPort}`, + `expected=${expectedIp}:${session.clientUdpPort}`); + return; + } + + if (!strictPeerPort && !session.udpFeedbackPeerPort) { + session.udpFeedbackPeerPort = rxPort; + this.debugLog('udp feedback peer learned', session.id, + `peer=${rxIp}:${rxPort}`, + `mediaTarget=${expectedIp}:${session.mediaUdpPort || session.clientUdpPort}`, + `retransmitTarget=${expectedIp}:${rxPort}`); + + // If auth already passed and stream hasn't started yet, begin now. + if (session.hashVerified && !session.udpTimer && !session.udpStartTimer) { + this.debugLog('starting UDP stream after peer learn', session.id, + `target=${expectedIp}:${session.mediaUdpPort || session.clientUdpPort}`); + this.startUdpStream(socket, session); + } + } + + if (!session.clientUdpPort) return; + + const requestedSeqs = this.extractUdpRetransmitSeqs(session, msg); + if (!requestedSeqs.length) return; + + const priorityHoldMs = Number.isInteger(this.service_config.udp_retransmit_priority_hold_ms) + ? Math.max(0, this.service_config.udp_retransmit_priority_hold_ms) + : 18; + if (priorityHoldMs > 0) { + session.udpPriorityUntil = Math.max(session.udpPriorityUntil || 0, Date.now() + priorityHoldMs); + } + + const now = Date.now(); + const windowMs = Number.isInteger(this.service_config.udp_retransmit_window_ms) + ? Math.max(250, this.service_config.udp_retransmit_window_ms) + : 1000; + const maxPerWindow = Number.isInteger(this.service_config.udp_retransmit_max_per_window) + ? Math.max(1, this.service_config.udp_retransmit_max_per_window) + : 24; + + if (!session.udpFeedbackWindowStart || now - session.udpFeedbackWindowStart >= windowMs) { + session.udpFeedbackWindowStart = now; + session.udpFeedbackResentInWindow = 0; + } + + let resent = 0; + for (const seq16 of requestedSeqs) { + if (session.udpFeedbackResentInWindow >= maxPerWindow) { + session.udpFeedbackDropped = (session.udpFeedbackDropped || 0) + 1; + this.debugLog('udp retransmit rate-limited', session.id, + `windowMs=${windowMs}`, + `max=${maxPerWindow}`, + `dropped=${session.udpFeedbackDropped}`); + break; + } + + const cached = session.udpPacketCache.get(seq16 & 0xffff); + if (!cached || !Buffer.isBuffer(cached.payload)) continue; + + const txPort = this.sanitizeUdpPort(session.udpFeedbackPeerPort) + || session.mediaUdpPort + || session.clientUdpPort; + session.udpSocket.send(cached.payload, 0, cached.payload.length, + txPort, expectedIp, (err) => { + if (err) this.debugLog('udp retransmit send err', session.id, `seq=${seq16}`, err.message); + }); + resent++; + session.udpFeedbackResentInWindow++; + } + + if (resent > 0) { + this.debugLog('udp retransmit', session.id, + `count=${resent}`, + `seqs=${requestedSeqs.slice(0, resent).join(',')}`); + } } prepareMediaData(session) { @@ -674,7 +1157,7 @@ class WTVPNM { session.rdtPacketMode = 'classic-len'; session.syncEvery = Number.isInteger(this.service_config.rdt_sync_every_classic) ? Math.max(1, this.service_config.rdt_sync_every_classic) - : 8; + : 5; const cfgDataTypeLo = Number.isInteger(this.service_config.rdt_data_type_lo) ? (this.service_config.rdt_data_type_lo & 0xff) @@ -689,7 +1172,7 @@ class WTVPNM { } else { const useLegacyProfile = classicRa.channels === 1 || classicRa.channels === null; session.rdtDataTypeLo = useLegacyProfile ? 0x64 : 0x50; - session.rdtSyncType = 0x0455; + session.rdtSyncType = useLegacyProfile ? 0x0477 : 0x04ba; } const payload = media.subarray(classicRa.dataOffset); @@ -993,25 +1476,36 @@ class WTVPNM { const burstFrameCount = burstPrestartMs > 0 ? Math.ceil(burstPrestartMs / intervalMs) : 0; session.burstFramesSent = 0; + const targetIp = this.normalizeIpAddress(socket.remoteAddress); + const mediaTargetPort = this.sanitizeUdpPort(session.mediaUdpPort) || session.clientUdpPort; this.debugLog('udp stream start', session.id, `frames=${session.mediaFrames?.length || 0}`, `avgBitRate=${session.avgBitRate || 'unknown'}bps`, `bodyLen=${bodyLen}`, `interval=${intervalMs.toFixed(2)}ms`, `burstFrames=${burstFrameCount}`, - `target=${socket.remoteAddress}:${session.clientUdpPort}`); + `target=${targetIp}:${mediaTargetPort}`, + `sourcePort=${session.serverUdpPort || 'unknown'}`); - session.udpSocket = dgram.createSocket('udp4'); - session.udpSocket.on('error', (err) => { - this.debugLog('udp socket error', session.id, err.message); - this.stopUdpStream(session); - }); - // Some RDT clients also send ACK/resend requests back on the same UDP - // flow. Bind so we can receive (even if we ignore the content). - session.udpSocket.on('message', (msg, rinfo) => { - this.debugLog('udp rx', session.id, `from=${rinfo.address}:${rinfo.port}`, - `len=${msg.length}`, 'hex', msg.slice(0, 32).toString('hex')); - }); + if (!session.udpSocket || !this.sanitizeUdpPort(session.serverUdpPort)) { + this.debugLog('udp stream start failed: socket not reserved', session.id); + return; + } + + this.attachUdpSocketHandlers(socket, session); + + if (session.udpFeedbackProbeTimer) { + clearTimeout(session.udpFeedbackProbeTimer); + } + session.udpFeedbackProbeTimer = setTimeout(() => { + session.udpFeedbackProbeTimer = null; + if (!session.udpSocket || socket.destroyed) return; + if ((session.udpInboundCount || 0) === 0) { + this.debugLog('udp feedback not seen yet', session.id, + `streamTarget=${this.normalizeIpAddress(socket.remoteAddress)}:${mediaTargetPort}`, + 'If packet capture shows ICMP port unreachable, client is not listening on requested UDP port.'); + } + }, 2500); // sendPacket wraps buildMediaPayload with the every-5th-sync-frame // prefix and writes to the UDP socket. Wall-seq and frame are passed @@ -1024,9 +1518,11 @@ class WTVPNM { const out = withSync ? Buffer.concat([this.buildSyncFrame(session, seq), dataFrame]) : dataFrame; + this.cacheUdpPacketForRetransmit(session, seq, out); + const txPort = mediaTargetPort; session.udpSocket.send(out, 0, out.length, - session.clientUdpPort, socket.remoteAddress, (err) => { - if (err) this.debugLog('udp send err', session.id, err.message); + txPort, targetIp, (err) => { + if (err) this.debugLog('udp send err', session.id, `${targetIp}:${txPort}`, err.message); }); }; @@ -1048,6 +1544,14 @@ class WTVPNM { } // Don't re-arm while paused; resumeUdpStream calls _startDataInterval. if (session.paused) return; + + const priorityUntil = session.udpPriorityUntil || 0; + if (priorityUntil > Date.now()) { + const waitMs = Math.max(1, Math.min(priorityUntil - Date.now(), 10)); + session.udpTimer = setTimeout(tick, waitMs); + return; + } + const frames = session.mediaFrames; if (!frames || session.mediaFrameIdx >= frames.length) { // End of media: stop sending once all RA frames are out. @@ -1520,11 +2024,12 @@ class WTVPNM { const out = Buffer.concat(outChunks); // The first 0x4F/0x08 chunk carries [serverId_u32_BE][sessionCounter_u32_BE]. - // These are NOT a checksum of serverChallenge — verified against - // multi_auth.pcap (6 sessions, constant serverId, incrementing counter). - // Descriptor layout: [0x4F, 0x08, serverId(4), sessionCounter(4), ...] - // so serverId occupies out[2..6] and sessionCounter occupies out[6..10]. - out.writeUInt32BE(this.serverId >>> 0, 2); + // Keep auth-compatible 0x00071a?? serverId shape and encode per-session + // UDP port in the low byte (port range 0x1a00-0x1aff). + const serverId = Number.isInteger(session?.serverId) + ? session.serverId + : ((this.serverIdBase | 0x27) >>> 0); + out.writeUInt32BE(serverId, 2); const sessionNumber = (session && typeof session.sessionNumber === 'number') ? session.sessionNumber : ++this.sessionCounter; @@ -1896,6 +2401,100 @@ class WTVPNM { return Array.from(out).slice(0, 20); } + getPnaFieldAliases(field) { + if (!field) return []; + + switch (field.id) { + case 0: + return ['maskedclienttime', 'maskedtime']; + case 1: + return ['udpport', 'clientudpport']; + case 4: + return ['challenge', 'clientchallenge']; + case 23: + return ['timestamp', 'clienttimestamp']; + case 0x42: + return ['bitrate']; + case 0x52: + return ['requestedmedia', 'resourcepath', 'filename']; + case 0x63: + return ['useragent']; + default: + return []; + } + } + + decodePnaFieldValue(field, alias = null) { + if (!field || !Buffer.isBuffer(field.value)) return null; + + if (alias === 'udpport' || alias === 'clientudpport') { + return field.len >= 2 ? field.value.readUInt16BE(0) : null; + } + + if (alias === 'maskedclienttime' || alias === 'maskedtime') { + return field.len >= 4 ? field.value.readUInt32BE(0) : null; + } + + if (alias === 'bitrate') { + if (field.len >= 4) { + return field.value.readUInt32BE(0); + } + + const bitrateText = field.value.toString('latin1').replace(/\x00+$/g, '').trim(); + const bitrateMatch = bitrateText.match(/(?:bitrate|avg[_ -]?bitrate|max[_ -]?bitrate)\D+(\d{3,})/i); + if (bitrateMatch) { + return parseInt(bitrateMatch[1], 10); + } + return bitrateText || null; + } + + const textValue = field.value.toString('latin1').replace(/\x00+$/g, '').trim(); + if (textValue.length > 0) { + return textValue; + } + + return Buffer.from(field.value); + } + + attachPnaFieldAlias(fields, alias, field) { + if (!alias || !fields || !field) return; + + const decodedValue = this.decodePnaFieldValue(field, alias); + const fieldKey = `${alias}Field`; + + if (!(alias in fields)) { + fields[alias] = decodedValue; + fields[fieldKey] = field; + return; + } + + if (!Array.isArray(fields[alias])) { + fields[alias] = [fields[alias]]; + } + fields[alias].push(decodedValue); + + if (!Array.isArray(fields[fieldKey])) { + fields[fieldKey] = [fields[fieldKey]]; + } + fields[fieldKey].push(field); + } + + decoratePnaFields(fields) { + if (!Array.isArray(fields)) return fields; + + for (const field of fields) { + if (!field) continue; + + this.attachPnaFieldAlias(fields, `field_${field.id}`, field); + + for (const alias of this.getPnaFieldAliases(field)) { + this.attachPnaFieldAlias(fields, alias, field); + } + } + + return fields; + } + parsePnaMessage(data) { const pnaOffset = data.indexOf(Buffer.from('PNA\x00\x0a', 'latin1')); if (pnaOffset < 0) return null; @@ -1988,7 +2587,7 @@ class WTVPNM { if (dbg) this.debugLog('pna phase 2 complete', `start_offset=${phase2Start}`, `end_offset=${offset}`, `phase2_markers=${phase2Count}`, `total_fields=${fields.length}`); - return fields; + return this.decoratePnaFields(fields); } } diff --git a/zefie_wtvp_minisrv/includes/config.json b/zefie_wtvp_minisrv/includes/config.json index 9fb48db5..89447d9f 100644 --- a/zefie_wtvp_minisrv/includes/config.json +++ b/zefie_wtvp_minisrv/includes/config.json @@ -401,7 +401,7 @@ "protocol_handler": "pnm", "descriptor_after_hello_ms": 85, "burst_prestart_ms": 5000, - "debug": false, + "debug": true, "allow_indexing": true } }, diff --git a/zefie_wtvp_minisrv/realaudio3.pcap b/zefie_wtvp_minisrv/realaudio3.pcap deleted file mode 100644 index bdb05005..00000000 Binary files a/zefie_wtvp_minisrv/realaudio3.pcap and /dev/null differ