Files
windmill/multiplayer/server.mjs
Ruben Fiszel cb8b264dee add signed request authentication to multiplayer websocket (#8534)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 22:23:47 +00:00

334 lines
10 KiB
JavaScript

#!/usr/bin/env node
/**
* Simple y-websocket server with connection logging and JWT authentication.
* Run with: node server.mjs
*/
import http from 'http'
import crypto from 'node:crypto'
import { WebSocketServer } from 'ws'
import * as Y from 'yjs'
import * as syncProtocol from 'y-protocols/sync'
import * as awarenessProtocol from 'y-protocols/awareness'
import * as encoding from 'lib0/encoding'
import * as decoding from 'lib0/decoding'
const PORT = process.env.PORT || 3002
const HOST = process.env.HOST || '0.0.0.0'
const WINDMILL_BASE_URL = process.env.WINDMILL_BASE_URL || process.env.BASE_INTERNAL_URL
const REQUIRE_SIGNED_REQUESTS = process.env.REQUIRE_SIGNED_MULTIPLAYER_REQUESTS !== 'false'
const messageSync = 0
const messageAwareness = 1
// --- JWT verification ---
let cachedPublicKey = null
let publicKeyFetchPromise = null
function base64urlDecode(str) {
const padding = '='.repeat((4 - str.length % 4) % 4)
const base64 = str.replace(/-/g, '+').replace(/_/g, '/') + padding
const binary = atob(base64)
return Uint8Array.from(binary, c => c.charCodeAt(0))
}
async function getPublicKey() {
if (cachedPublicKey) return cachedPublicKey
if (publicKeyFetchPromise) return publicKeyFetchPromise
if (!WINDMILL_BASE_URL) {
console.warn(`[${new Date().toISOString()}] WINDMILL_BASE_URL not set - cannot fetch public key`)
return null
}
publicKeyFetchPromise = (async () => {
try {
const jwksUrl = `${WINDMILL_BASE_URL.replace(/\/$/, '')}/api/debug/jwks`
console.log(`[${new Date().toISOString()}] Fetching JWKS from ${jwksUrl}`)
const response = await fetch(jwksUrl)
if (!response.ok) {
throw new Error(`Failed to fetch JWKS: ${response.status} ${response.statusText}`)
}
const jwks = await response.json()
if (!jwks.keys || jwks.keys.length === 0) {
throw new Error('No keys in JWKS')
}
const jwk = jwks.keys[0]
if (jwk.kty !== 'OKP' || jwk.crv !== 'Ed25519') {
throw new Error(`Unsupported key type: ${jwk.kty}/${jwk.crv}`)
}
const publicKeyBytes = base64urlDecode(jwk.x)
const key = await crypto.subtle.importKey(
'raw',
publicKeyBytes,
{ name: 'Ed25519' },
true,
['verify']
)
cachedPublicKey = key
console.log(`[${new Date().toISOString()}] Successfully loaded Ed25519 public key from JWKS`)
return key
} catch (error) {
console.error(`[${new Date().toISOString()}] Failed to fetch/parse JWKS: ${error}`)
return null
} finally {
publicKeyFetchPromise = null
}
})()
return publicKeyFetchPromise
}
/**
* Verify a JWT multiplayer token.
* Returns null if valid, or an error message if invalid.
*/
async function verifyToken(token, docName) {
const publicKey = await getPublicKey()
if (!publicKey) {
if (REQUIRE_SIGNED_REQUESTS) {
return 'Public key not available but signed requests are required. Set WINDMILL_BASE_URL.'
}
console.warn(`[${new Date().toISOString()}] Public key not available - signature verification disabled`)
return null
}
const parts = token.split('.')
if (parts.length !== 3) {
return 'Invalid JWT format'
}
const [headerB64, claimsB64, signatureB64] = parts
try {
const message = new TextEncoder().encode(`${headerB64}.${claimsB64}`)
const signature = base64urlDecode(signatureB64)
const isValid = await crypto.subtle.verify(
{ name: 'Ed25519' },
publicKey,
signature,
message
)
if (!isValid) {
return 'Invalid JWT signature'
}
const claimsJson = new TextDecoder().decode(base64urlDecode(claimsB64))
const claims = JSON.parse(claimsJson)
// Check expiration
const now = Math.floor(Date.now() / 1000)
if (now > claims.exp) {
return `Token expired: ${now - claims.exp} seconds ago`
}
// Check purpose
if (claims.purpose !== 'multiplayer') {
return `Invalid token purpose: ${claims.purpose}`
}
// Check workspace matches the doc room (format: "{workspace}/{path}")
const docWorkspace = docName.split('/')[0]
if (docWorkspace && claims.workspace_id !== docWorkspace) {
return `Token workspace "${claims.workspace_id}" does not match room workspace "${docWorkspace}"`
}
return null
} catch (error) {
return `JWT verification error: ${error}`
}
}
// --- Y.js document management ---
// Store docs in memory
const docs = new Map()
const getYDoc = (docname) => {
let doc = docs.get(docname)
if (!doc) {
doc = new Y.Doc()
doc.name = docname
docs.set(docname, doc)
}
return doc
}
const send = (conn, message) => {
if (conn.readyState === 1) { // WebSocket.OPEN
conn.send(message, err => { if (err) console.error(err) })
}
}
const setupWSConnection = (conn, req, docName) => {
const doc = getYDoc(docName)
// Initialize awareness
if (!doc.awareness) {
doc.awareness = new awarenessProtocol.Awareness(doc)
}
const awareness = doc.awareness
// Track connections per doc
if (!doc.conns) doc.conns = new Set()
doc.conns.add(conn)
conn.on('message', (message) => {
const data = new Uint8Array(message)
const decoder = decoding.createDecoder(data)
const messageType = decoding.readVarUint(decoder)
switch (messageType) {
case messageSync:
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.readSyncMessage(decoder, encoder, doc, null)
if (encoding.length(encoder) > 1) {
send(conn, encoding.toUint8Array(encoder))
}
break
case messageAwareness:
awarenessProtocol.applyAwarenessUpdate(awareness, decoding.readVarUint8Array(decoder), conn)
break
}
})
// Send initial sync step 1
{
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeSyncStep1(encoder, doc)
send(conn, encoding.toUint8Array(encoder))
}
// Send awareness states
const awarenessStates = awareness.getStates()
if (awarenessStates.size > 0) {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(awareness, Array.from(awarenessStates.keys())))
send(conn, encoding.toUint8Array(encoder))
}
// Broadcast awareness changes
const awarenessChangeHandler = ({ added, updated, removed }, origin) => {
const changedClients = added.concat(updated).concat(removed)
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients))
const message = encoding.toUint8Array(encoder)
doc.conns.forEach(c => send(c, message))
}
awareness.on('update', awarenessChangeHandler)
// Broadcast doc updates
const updateHandler = (update, origin) => {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeUpdate(encoder, update)
const message = encoding.toUint8Array(encoder)
doc.conns.forEach(c => {
if (c !== origin) send(c, message)
})
}
doc.on('update', updateHandler)
conn.on('close', () => {
doc.conns.delete(conn)
awareness.off('update', awarenessChangeHandler)
doc.off('update', updateHandler)
// Clean up awareness for this connection
awarenessProtocol.removeAwarenessStates(awareness, [doc.clientID], null)
})
}
// --- HTTP + WebSocket server ---
const server = http.createServer((req, res) => {
// Strip /ws_mp/ prefix if present (when accessed without reverse proxy path stripping)
if (req.url?.startsWith('/ws_mp/')) {
req.url = req.url.slice('/ws_mp'.length)
} else if (req.url === '/ws_mp') {
req.url = '/'
}
console.log(`[${new Date().toISOString()}] HTTP ${req.method} ${req.url} from=${req.socket.remoteAddress}`)
if (req.url === '/' || req.url === '/health') {
res.writeHead(200, {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
})
res.end(JSON.stringify({ status: 'ok', service: 'multiplayer' }))
} else {
res.writeHead(404)
res.end('not found')
}
})
const wss = new WebSocketServer({ server })
wss.on('connection', async (ws, req) => {
let docName = req.url?.slice(1).split('?')[0] || 'unknown'
// Strip ws_mp/ prefix if present (when accessed without reverse proxy path stripping)
if (docName.startsWith('ws_mp/')) {
docName = docName.slice('ws_mp/'.length)
}
const clientIp = req.socket.remoteAddress
// Handle ping test — respond and close immediately
if (docName === '__ping__') {
console.log(`[${new Date().toISOString()}] WS ping from=${clientIp}`)
ws.send(JSON.stringify({ type: 'pong', service: 'multiplayer' }))
ws.close()
return
}
// Verify JWT token
const urlParams = new URLSearchParams(req.url?.split('?')[1] || '')
const token = urlParams.get('token')
if (!token) {
if (REQUIRE_SIGNED_REQUESTS) {
console.warn(`[${new Date().toISOString()}] REJECTED: doc="${docName}" from=${clientIp} reason="no token"`)
ws.close(4401, 'Authentication required')
return
}
console.warn(`[${new Date().toISOString()}] WARN: no token for doc="${docName}" from=${clientIp} (signed requests not required)`)
} else {
const error = await verifyToken(token, docName)
if (error) {
console.warn(`[${new Date().toISOString()}] REJECTED: doc="${docName}" from=${clientIp} reason="${error}"`)
ws.close(4403, 'Token verification failed')
return
}
}
console.log(`[${new Date().toISOString()}] CONNECT: doc="${docName}" from=${clientIp}`)
ws.on('close', () => {
console.log(`[${new Date().toISOString()}] DISCONNECT: doc="${docName}" from=${clientIp}`)
})
setupWSConnection(ws, req, docName)
})
server.listen(PORT, HOST, () => {
console.log(`[${new Date().toISOString()}] Multiplayer server running at ${HOST}:${PORT}`)
if (REQUIRE_SIGNED_REQUESTS) {
console.log(`[${new Date().toISOString()}] Signed requests REQUIRED (set REQUIRE_SIGNED_MULTIPLAYER_REQUESTS=false to disable)`)
} else {
console.log(`[${new Date().toISOString()}] Signed requests DISABLED`)
}
})