From f88a80ac4772398eef6941477ad6c4e69d2b5e37 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Tue, 4 Feb 2025 21:02:11 -0800 Subject: [PATCH] wip: pyodide kernel --- src/lib/pyodide/pyodideKernel.ts | 81 ++++++++++++++++ src/lib/pyodide/pyodideKernel.worker.ts | 123 ++++++++++++++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 src/lib/pyodide/pyodideKernel.ts create mode 100644 src/lib/pyodide/pyodideKernel.worker.ts diff --git a/src/lib/pyodide/pyodideKernel.ts b/src/lib/pyodide/pyodideKernel.ts new file mode 100644 index 000000000..bd3eaeb77 --- /dev/null +++ b/src/lib/pyodide/pyodideKernel.ts @@ -0,0 +1,81 @@ +import PyodideWorker from '$lib/pyodide/pyodideKernel.worker?worker'; + +export type CellState = { + id: string; + status: 'idle' | 'running' | 'completed' | 'error'; + result: any; + stdout: string; + stderr: string; +}; + +export class PyodideKernel { + private worker: Worker; + private listeners: Map void>; + + constructor() { + this.worker = new PyodideWorker(); + this.listeners = new Map(); + + // Listen to messages from the worker + this.worker.onmessage = (event) => { + const { type, id, ...data } = event.data; + + if ((type === 'stdout' || type === 'stderr') && this.listeners.has(id)) { + this.listeners.get(id)?.({ type, id, ...data }); + } else if (type === 'result' && this.listeners.has(id)) { + this.listeners.get(id)?.({ type, id, ...data }); + // Remove the listener once the result is delivered + this.listeners.delete(id); + } else if (type === 'kernelState') { + this.listeners.forEach((listener) => listener({ type, ...data })); + } + }; + + // Initialize the worker + this.worker.postMessage({ type: 'initialize' }); + } + + async execute(id: string, code: string): Promise { + return new Promise((resolve, reject) => { + // Set up the listener for streaming and execution result + const state: CellState = { + id, + status: 'running', + result: null, + stdout: '', + stderr: '' + }; + + this.listeners.set(id, (data) => { + if (data.type === 'stdout') { + state.stdout += data.message; + } else if (data.type === 'stderr') { + state.stderr += data.message; + } else if (data.type === 'result') { + // Final result + const { state: finalState } = data; + resolve(finalState); + } + }); + + // Send execute request to the worker + this.worker.postMessage({ type: 'execute', id, code }); + }); + } + + async getState() { + return new Promise>((resolve) => { + this.worker.postMessage({ type: 'getState' }); + this.listeners.set('kernelState', (data) => { + if (data.type === 'kernelState') { + resolve(data.state); + } + }); + }); + } + + terminate() { + this.worker.postMessage({ type: 'terminate' }); + this.worker.terminate(); + } +} diff --git a/src/lib/pyodide/pyodideKernel.worker.ts b/src/lib/pyodide/pyodideKernel.worker.ts new file mode 100644 index 000000000..4f1eea6da --- /dev/null +++ b/src/lib/pyodide/pyodideKernel.worker.ts @@ -0,0 +1,123 @@ +import { loadPyodide, type PyodideInterface } from 'pyodide'; + +declare global { + interface Window { + stdout: string | null; + stderr: string | null; + pyodide: PyodideInterface; + cells: Record; + indexURL: string; + } +} + +type CellState = { + id: string; + status: 'idle' | 'running' | 'completed' | 'error'; + result: any; + stdout: string; + stderr: string; +}; + +const initializePyodide = async () => { + // Ensure Pyodide is loaded once and cached in the worker's global scope + if (!self.pyodide) { + self.indexURL = '/pyodide/'; + self.stdout = ''; + self.stderr = ''; + self.cells = {}; + + self.pyodide = await loadPyodide({ + indexURL: self.indexURL + }); + } +}; + +const executeCode = async (id: string, code: string) => { + if (!self.pyodide) { + await initializePyodide(); + } + + // Update the cell state to "running" + self.cells[id] = { + id, + status: 'running', + result: null, + stdout: '', + stderr: '' + }; + + // Redirect stdout/stderr to stream updates + self.pyodide.setStdout({ + batched: (msg: string) => { + self.cells[id].stdout += msg; + self.postMessage({ type: 'stdout', id, message: msg }); + } + }); + self.pyodide.setStderr({ + batched: (msg: string) => { + self.cells[id].stderr += msg; + self.postMessage({ type: 'stderr', id, message: msg }); + } + }); + + try { + // Dynamically load required packages based on imports in the Python code + await self.pyodide.loadPackagesFromImports(code, { + messageCallback: (msg: string) => { + self.postMessage({ type: 'stdout', id, package: true, message: `[package] ${msg}` }); + }, + errorCallback: (msg: string) => { + self.postMessage({ type: 'stderr', id, package: true, message: `[package] ${msg}` }); + } + }); + + // Execute the Python code + const result = await self.pyodide.runPythonAsync(code); + self.cells[id].result = result; + self.cells[id].status = 'completed'; + } catch (error) { + self.cells[id].status = 'error'; + self.cells[id].stderr += `\n${error.toString()}`; + } finally { + // Notify parent thread when execution completes + self.postMessage({ + type: 'result', + id, + state: self.cells[id] + }); + } +}; + +// Handle messages from the main thread +self.onmessage = async (event) => { + const { type, id, code, ...args } = event.data; + + switch (type) { + case 'initialize': + await initializePyodide(); + self.postMessage({ type: 'initialized' }); + break; + + case 'execute': + if (id && code) { + await executeCode(id, code); + } + break; + + case 'getState': + self.postMessage({ + type: 'kernelState', + state: self.cells + }); + break; + + case 'terminate': + // Explicitly clear the worker for cleanup + for (const key in self.cells) delete self.cells[key]; + self.close(); + break; + + default: + console.error(`Unknown message type: ${type}`); + } +};