Compare commits

...

2 Commits

Author SHA1 Message Date
windmill-internal-app[bot]
9942bb748c chore: update ee-repo-ref to 592848d59ca2304926fb2bd85d000668a7f46a77
This commit updates the EE repository reference after PR #420 was merged in windmill-ee-private.

Previous ee-repo-ref: 931813b75b8260faa13ddc07f36a11607b7e3bf6

New ee-repo-ref: 592848d59ca2304926fb2bd85d000668a7f46a77

Automated by sync-ee-ref workflow.
2026-02-18 08:53:40 +00:00
Ruben Fiszel
430622261f feat(backend): add sandbox SDK for long-lived nsjail sandbox environments
Add interactive, long-lived sandbox management to Windmill with two
deployment modes (embedded on worker, remote on dedicated hosts).

- Database schema: sandbox, sandbox_exec, sandbox_host tables
- windmill-sandbox crate: core nsjail process manager (create, exec via
  nsenter, suspend/resume via SIGSTOP/SIGCONT, file I/O)
- windmill-api-sandbox crate: REST API for sandbox CRUD and operations
- Embedded sandbox HTTP server for worker-local sandbox access
- Monitor integration for cleanup of expired/idle/orphaned sandboxes
- Python SDK: Sandbox class with exec, suspend, resume, terminate, file ops

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 16:41:00 +00:00
20 changed files with 2079 additions and 3 deletions

35
backend/Cargo.lock generated
View File

@@ -15899,6 +15899,7 @@ dependencies = [
"windmill-api-jobs",
"windmill-api-npm-proxy",
"windmill-api-openapi",
"windmill-api-sandbox",
"windmill-api-schedule",
"windmill-api-scripts",
"windmill-api-settings",
@@ -16229,6 +16230,23 @@ dependencies = [
"windmill-trigger-http",
]
[[package]]
name = "windmill-api-sandbox"
version = "1.638.2"
dependencies = [
"axum 0.7.9",
"chrono",
"reqwest 0.13.1",
"serde",
"serde_json",
"sqlx",
"tracing",
"uuid",
"windmill-api-auth",
"windmill-common",
"windmill-sandbox",
]
[[package]]
name = "windmill-api-schedule"
version = "1.638.2"
@@ -16979,6 +16997,22 @@ dependencies = [
"windmill-queue",
]
[[package]]
name = "windmill-sandbox"
version = "1.638.2"
dependencies = [
"axum 0.7.9",
"chrono",
"nix 0.27.1",
"serde",
"serde_json",
"sqlx",
"tokio",
"tracing",
"uuid",
"windmill-common",
]
[[package]]
name = "windmill-sql-datatype-parser-wasm"
version = "1.638.2"
@@ -17437,6 +17471,7 @@ dependencies = [
"windmill-parser-yaml",
"windmill-queue",
"windmill-runtime-nativets",
"windmill-sandbox",
"yaml-rust",
]

View File

@@ -37,9 +37,11 @@ members = [
"./windmill-api-inputs",
"./windmill-api-npm-proxy",
"./windmill-api-openapi",
"./windmill-api-sandbox",
"./windmill-api-schedule",
"./windmill-api-settings",
"./windmill-api-workers",
"./windmill-sandbox",
"./windmill-store",
"./windmill-queue",
"./windmill-worker",
@@ -305,9 +307,11 @@ windmill-api-flow-conversations = { path = "./windmill-api-flow-conversations" }
windmill-api-inputs = { path = "./windmill-api-inputs" }
windmill-api-npm-proxy = { path = "./windmill-api-npm-proxy" }
windmill-api-openapi = { path = "./windmill-api-openapi" }
windmill-api-sandbox = { path = "./windmill-api-sandbox" }
windmill-api-schedule = { path = "./windmill-api-schedule" }
windmill-api-settings = { path = "./windmill-api-settings" }
windmill-api-workers = { path = "./windmill-api-workers" }
windmill-sandbox = { path = "./windmill-sandbox" }
windmill-store = { path = "./windmill-store" }
windmill-parser = { path = "./parsers/windmill-parser" }
windmill-parser-ts = { path = "./parsers/windmill-parser-ts" }

View File

@@ -1 +1 @@
931813b75b8260faa13ddc07f36a11607b7e3bf6
592848d59ca2304926fb2bd85d000668a7f46a77

View File

@@ -0,0 +1,4 @@
DROP TABLE IF EXISTS sandbox_exec;
DROP TABLE IF EXISTS sandbox;
DROP TABLE IF EXISTS sandbox_host;
DROP TYPE IF EXISTS sandbox_status;

View File

@@ -0,0 +1,69 @@
CREATE TYPE sandbox_status AS ENUM ('creating', 'running', 'suspended', 'stopped', 'error');
CREATE TABLE sandbox_host (
id VARCHAR(255) PRIMARY KEY,
base_url TEXT NOT NULL,
last_ping TIMESTAMPTZ NOT NULL DEFAULT now(),
capacity INT NOT NULL DEFAULT 10,
active_count INT NOT NULL DEFAULT 0,
labels JSONB NOT NULL DEFAULT '{}'
);
CREATE TABLE sandbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id VARCHAR(50) NOT NULL REFERENCES workspace(id),
created_by VARCHAR(55) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
image TEXT,
timeout_secs INT,
idle_timeout_secs INT,
cpu_limit INT NOT NULL DEFAULT 1,
memory_limit_mb INT NOT NULL DEFAULT 512,
disk_limit_mb INT NOT NULL DEFAULT 1024,
env_vars JSONB NOT NULL DEFAULT '{}',
labels JSONB NOT NULL DEFAULT '{}',
mounts JSONB NOT NULL DEFAULT '[]',
network_enabled BOOLEAN NOT NULL DEFAULT false,
mode VARCHAR(20) NOT NULL DEFAULT 'embedded',
parent_job_id UUID,
host_id VARCHAR(255) REFERENCES sandbox_host(id),
status sandbox_status NOT NULL DEFAULT 'creating',
pid INT,
started_at TIMESTAMPTZ,
last_activity_at TIMESTAMPTZ,
suspended_at TIMESTAMPTZ,
stopped_at TIMESTAMPTZ,
error_message TEXT,
ephemeral BOOLEAN NOT NULL DEFAULT false,
auto_stop_after_secs INT,
expires_at TIMESTAMPTZ
);
CREATE INDEX idx_sandbox_workspace_status ON sandbox(workspace_id, status);
CREATE INDEX idx_sandbox_host ON sandbox(host_id) WHERE status IN ('running', 'suspended');
CREATE INDEX idx_sandbox_expires ON sandbox(expires_at) WHERE expires_at IS NOT NULL;
CREATE TABLE sandbox_exec (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sandbox_id UUID NOT NULL REFERENCES sandbox(id) ON DELETE CASCADE,
workspace_id VARCHAR(50) NOT NULL,
command TEXT NOT NULL,
cwd TEXT,
env_vars JSONB,
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
completed_at TIMESTAMPTZ,
exit_code INT,
stdout TEXT,
stderr TEXT,
duration_ms BIGINT,
created_by VARCHAR(55) NOT NULL
);
CREATE INDEX idx_sandbox_exec_sandbox ON sandbox_exec(sandbox_id, started_at DESC);

View File

@@ -1004,6 +1004,77 @@ pub async fn delete_expired_items(db: &DB) -> () {
),
}
// Sandbox cleanup: terminate expired sandboxes, stop idle ones, delete ephemeral stopped sandboxes
let sandbox_expired = sqlx::query_scalar!(
"UPDATE sandbox SET status = 'stopped'::sandbox_status, stopped_at = now()
WHERE status IN ('running', 'suspended', 'creating')
AND expires_at IS NOT NULL AND expires_at <= now()
RETURNING id"
)
.fetch_all(db)
.await;
match sandbox_expired {
Ok(ids) if !ids.is_empty() => {
tracing::info!("stopped {} expired sandboxes", ids.len());
}
Err(e) => tracing::error!("Error stopping expired sandboxes: {e}"),
_ => {}
}
let sandbox_idle_stopped = sqlx::query_scalar!(
"UPDATE sandbox SET status = 'stopped'::sandbox_status, stopped_at = now()
WHERE status = 'running'
AND idle_timeout_secs IS NOT NULL
AND last_activity_at IS NOT NULL
AND last_activity_at + (idle_timeout_secs::text || ' seconds')::interval <= now()
RETURNING id"
)
.fetch_all(db)
.await;
match sandbox_idle_stopped {
Ok(ids) if !ids.is_empty() => {
tracing::info!("stopped {} idle sandboxes", ids.len());
}
Err(e) => tracing::error!("Error stopping idle sandboxes: {e}"),
_ => {}
}
let sandbox_ephemeral_deleted = sqlx::query_scalar!(
"DELETE FROM sandbox WHERE ephemeral = true AND status IN ('stopped', 'error')
RETURNING id"
)
.fetch_all(db)
.await;
match sandbox_ephemeral_deleted {
Ok(ids) if !ids.is_empty() => {
tracing::info!("deleted {} ephemeral stopped sandboxes", ids.len());
}
Err(e) => tracing::error!("Error deleting ephemeral sandboxes: {e}"),
_ => {}
}
// Mark sandboxes on dead hosts as errored
let sandbox_orphaned = sqlx::query_scalar!(
"UPDATE sandbox SET status = 'error'::sandbox_status,
error_message = 'Sandbox host became unreachable'
WHERE status IN ('running', 'suspended')
AND mode = 'remote'
AND host_id IS NOT NULL
AND host_id NOT IN (
SELECT id FROM sandbox_host WHERE last_ping > now() - interval '2 minutes'
)
RETURNING id"
)
.fetch_all(db)
.await;
match sandbox_orphaned {
Ok(ids) if !ids.is_empty() => {
tracing::info!("marked {} orphaned sandboxes as error", ids.len());
}
Err(e) => tracing::error!("Error marking orphaned sandboxes: {e}"),
_ => {}
}
let job_retention_secs = *JOB_RETENTION_SECS.read().await;
if job_retention_secs > 0 {
let batch_size = *JOB_CLEANUP_BATCH_SIZE;

View File

@@ -0,0 +1,22 @@
[package]
name = "windmill-api-sandbox"
version.workspace = true
authors.workspace = true
edition.workspace = true
[lib]
name = "windmill_api_sandbox"
path = "src/lib.rs"
[dependencies]
windmill-api-auth.workspace = true
windmill-common = { workspace = true, default-features = false }
windmill-sandbox.workspace = true
axum.workspace = true
chrono.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
sqlx.workspace = true
tracing.workspace = true
uuid.workspace = true

View File

@@ -0,0 +1,703 @@
/*
* Author: Windmill Labs, Inc
* Copyright: Windmill Labs, Inc 2024
* This file and its contents are licensed under the AGPLv3 License.
* Please see the included NOTICE for copyright information and
* LICENSE-AGPL for a copy of the license.
*/
use axum::{
extract::{Path, Query},
routing::{delete, get, post},
Extension, Json, Router,
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use windmill_common::{
db::UserDB,
error::{Error, JsonResult},
};
use windmill_sandbox::{ExecRequest, ExecResult, SandboxConfig, SandboxInfo, SandboxStatus};
use windmill_api_auth::ApiAuthed;
pub fn workspaced_service() -> Router {
Router::new()
.route("/create", post(create_sandbox))
.route("/list", get(list_sandboxes))
.route("/:sandbox_id", get(get_sandbox))
.route("/:sandbox_id", delete(delete_sandbox))
.route("/:sandbox_id/exec", post(exec_sandbox))
.route("/:sandbox_id/suspend", post(suspend_sandbox))
.route("/:sandbox_id/resume", post(resume_sandbox))
.route("/:sandbox_id/terminate", post(terminate_sandbox))
.route("/:sandbox_id/write_file", post(write_file))
.route("/:sandbox_id/read_file", get(read_file))
.route("/:sandbox_id/execs", get(list_execs))
}
#[derive(sqlx::FromRow)]
struct SandboxRow {
id: Uuid,
workspace_id: String,
status: SandboxStatus,
image: Option<String>,
labels: serde_json::Value,
mode: String,
created_by: String,
created_at: chrono::DateTime<Utc>,
started_at: Option<chrono::DateTime<Utc>>,
last_activity_at: Option<chrono::DateTime<Utc>>,
suspended_at: Option<chrono::DateTime<Utc>>,
stopped_at: Option<chrono::DateTime<Utc>>,
error_message: Option<String>,
ephemeral: bool,
cpu_limit: i32,
memory_limit_mb: i32,
network_enabled: bool,
}
impl From<SandboxRow> for SandboxInfo {
fn from(r: SandboxRow) -> Self {
SandboxInfo {
id: r.id,
workspace_id: r.workspace_id,
status: r.status,
image: r.image,
labels: r.labels,
mode: r.mode,
created_by: r.created_by,
created_at: r.created_at,
started_at: r.started_at,
last_activity_at: r.last_activity_at,
suspended_at: r.suspended_at,
stopped_at: r.stopped_at,
error_message: r.error_message,
ephemeral: r.ephemeral,
cpu_limit: r.cpu_limit,
memory_limit_mb: r.memory_limit_mb,
network_enabled: r.network_enabled,
}
}
}
#[derive(sqlx::FromRow)]
struct SandboxStatusRow {
status: SandboxStatus,
host_id: Option<String>,
mode: String,
}
async fn create_sandbox(
authed: ApiAuthed,
Path(w_id): Path<String>,
Extension(user_db): Extension<UserDB>,
Json(config): Json<SandboxConfig>,
) -> JsonResult<SandboxInfo> {
let mut tx = user_db.begin(&authed).await?;
let mode = config.mode.to_string();
let ephemeral = config.ephemeral || config.mode == windmill_sandbox::SandboxMode::Embedded;
let expires_at = config
.timeout_secs
.map(|t| Utc::now() + chrono::Duration::seconds(t as i64));
let row = sqlx::query_as::<_, (Uuid, chrono::DateTime<Utc>)>(
"INSERT INTO sandbox (
workspace_id, created_by, image, timeout_secs, idle_timeout_secs,
cpu_limit, memory_limit_mb, disk_limit_mb, env_vars, labels,
mounts, network_enabled, mode, status, ephemeral,
auto_stop_after_secs, expires_at, started_at, last_activity_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
$11, $12, $13, 'creating', $14, $15, $16, now(), now()
) RETURNING id, created_at",
)
.bind(&w_id)
.bind(&authed.username)
.bind(config.image.as_deref())
.bind(config.timeout_secs)
.bind(config.idle_timeout_secs)
.bind(config.cpu_limit)
.bind(config.memory_limit_mb)
.bind(config.disk_limit_mb)
.bind(&config.env_vars)
.bind(&config.labels)
.bind(&config.mounts)
.bind(config.network_enabled)
.bind(&mode)
.bind(ephemeral)
.bind(config.auto_stop_after_secs)
.bind(expires_at)
.fetch_one(&mut *tx)
.await?;
tx.commit().await?;
Ok(Json(SandboxInfo {
id: row.0,
workspace_id: w_id,
status: SandboxStatus::Creating,
image: config.image,
labels: config.labels,
mode,
created_by: authed.username,
created_at: row.1,
started_at: None,
last_activity_at: None,
suspended_at: None,
stopped_at: None,
error_message: None,
ephemeral,
cpu_limit: config.cpu_limit,
memory_limit_mb: config.memory_limit_mb,
network_enabled: config.network_enabled,
}))
}
#[derive(Deserialize)]
struct ListSandboxesQuery {
status: Option<String>,
label_key: Option<String>,
label_value: Option<String>,
}
const SANDBOX_SELECT_COLS: &str = "id, workspace_id, status, image, labels, mode, created_by, created_at, started_at, last_activity_at, suspended_at, stopped_at, error_message, ephemeral, cpu_limit, memory_limit_mb, network_enabled";
async fn list_sandboxes(
authed: ApiAuthed,
Path(w_id): Path<String>,
Extension(user_db): Extension<UserDB>,
Query(query): Query<ListSandboxesQuery>,
) -> JsonResult<Vec<SandboxInfo>> {
let mut tx = user_db.begin(&authed).await?;
let statuses: Option<Vec<String>> = query
.status
.map(|s| s.split(',').map(|s| s.trim().to_string()).collect());
let rows = sqlx::query_as::<_, SandboxRow>(&format!(
"SELECT {SANDBOX_SELECT_COLS}
FROM sandbox
WHERE workspace_id = $1
AND ($2::text[] IS NULL OR status::text = ANY($2))
AND ($3::text IS NULL OR $4::text IS NULL OR labels->>$3 = $4)
ORDER BY created_at DESC
LIMIT 100"
))
.bind(&w_id)
.bind(statuses.as_deref())
.bind(query.label_key.as_deref())
.bind(query.label_value.as_deref())
.fetch_all(&mut *tx)
.await?;
let sandboxes = rows.into_iter().map(SandboxInfo::from).collect();
Ok(Json(sandboxes))
}
async fn get_sandbox(
authed: ApiAuthed,
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
Extension(user_db): Extension<UserDB>,
) -> JsonResult<SandboxInfo> {
let mut tx = user_db.begin(&authed).await?;
let r = sqlx::query_as::<_, SandboxRow>(&format!(
"SELECT {SANDBOX_SELECT_COLS} FROM sandbox WHERE id = $1 AND workspace_id = $2"
))
.bind(sandbox_id)
.bind(&w_id)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
Ok(Json(r.into()))
}
async fn delete_sandbox(
authed: ApiAuthed,
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
Extension(user_db): Extension<UserDB>,
) -> JsonResult<String> {
let mut tx = user_db.begin(&authed).await?;
let status = sqlx::query_scalar::<_, SandboxStatus>(
"SELECT status FROM sandbox WHERE id = $1 AND workspace_id = $2",
)
.bind(sandbox_id)
.bind(&w_id)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if status != SandboxStatus::Stopped && status != SandboxStatus::Error {
return Err(Error::BadRequest(
"Sandbox must be stopped or in error state before deletion".to_string(),
));
}
sqlx::query("DELETE FROM sandbox WHERE id = $1")
.bind(sandbox_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(Json(format!("Sandbox {sandbox_id} deleted")))
}
async fn exec_sandbox(
authed: ApiAuthed,
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
Extension(user_db): Extension<UserDB>,
Json(request): Json<ExecRequest>,
) -> JsonResult<ExecResult> {
let mut tx = user_db.begin(&authed).await?;
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
)
.bind(sandbox_id)
.bind(&w_id)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sandbox.status != SandboxStatus::Running {
return Err(Error::BadRequest(format!(
"Sandbox is not running (status: {})",
sandbox.status
)));
}
let host_base_url = if sandbox.mode == "remote" {
let host_id = sandbox.host_id.as_ref().ok_or_else(|| {
Error::InternalErr("Remote sandbox has no host_id".to_string())
})?;
let host = sqlx::query_scalar::<_, String>(
"SELECT base_url FROM sandbox_host WHERE id = $1",
)
.bind(host_id)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| Error::NotFound(format!("Sandbox host {host_id} not found")))?;
Some(host)
} else {
None
};
let started_at = Utc::now();
let result = if let Some(base_url) = host_base_url {
proxy_exec(&base_url, sandbox_id, &request).await?
} else {
return Err(Error::BadRequest(
"Embedded sandbox exec must be performed directly via WM_SANDBOX_URL".to_string(),
));
};
let completed_at = Utc::now();
let duration_ms = (completed_at - started_at).num_milliseconds();
sqlx::query(
"INSERT INTO sandbox_exec (
sandbox_id, workspace_id, command, cwd, env_vars,
started_at, completed_at, exit_code, stdout, stderr,
duration_ms, created_by
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
)
.bind(sandbox_id)
.bind(&w_id)
.bind(&request.command)
.bind(request.cwd.as_deref())
.bind(&request.env)
.bind(started_at)
.bind(completed_at)
.bind(result.exit_code)
.bind(&result.stdout)
.bind(&result.stderr)
.bind(duration_ms)
.bind(&authed.username)
.execute(&mut *tx)
.await?;
sqlx::query("UPDATE sandbox SET last_activity_at = now() WHERE id = $1")
.bind(sandbox_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(Json(result))
}
async fn proxy_exec(
base_url: &str,
sandbox_id: Uuid,
request: &ExecRequest,
) -> windmill_common::error::Result<ExecResult> {
let client = reqwest::Client::new();
let url = format!("{}/sandbox/{}/exec", base_url, sandbox_id);
let resp = client
.post(&url)
.json(request)
.timeout(std::time::Duration::from_secs(
request.timeout_secs.unwrap_or(300) as u64 + 5,
))
.send()
.await
.map_err(|e| Error::InternalErr(format!("Failed to proxy exec to sandbox host: {e}")))?;
if !resp.status().is_success() {
let body = resp.text().await.unwrap_or_default();
return Err(Error::InternalErr(format!(
"Sandbox host returned error: {body}"
)));
}
resp.json::<ExecResult>()
.await
.map_err(|e| Error::InternalErr(format!("Failed to parse exec result: {e}")))
}
async fn proxy_action(
base_url: &str,
sandbox_id: Uuid,
action: &str,
) -> windmill_common::error::Result<SandboxStatus> {
let client = reqwest::Client::new();
let url = format!("{}/sandbox/{}/{}", base_url, sandbox_id, action);
let resp = client
.post(&url)
.timeout(std::time::Duration::from_secs(30))
.send()
.await
.map_err(|e| {
Error::InternalErr(format!("Failed to proxy {action} to sandbox host: {e}"))
})?;
if !resp.status().is_success() {
let body = resp.text().await.unwrap_or_default();
return Err(Error::InternalErr(format!(
"Sandbox host returned error: {body}"
)));
}
#[derive(Deserialize)]
struct StatusResponse {
status: SandboxStatus,
}
let status_resp = resp.json::<StatusResponse>().await.map_err(|e| {
Error::InternalErr(format!("Failed to parse action response: {e}"))
})?;
Ok(status_resp.status)
}
async fn get_host_base_url(
tx: &mut sqlx::PgConnection,
host_id: &str,
) -> windmill_common::error::Result<String> {
sqlx::query_scalar::<_, String>("SELECT base_url FROM sandbox_host WHERE id = $1")
.bind(host_id)
.fetch_one(&mut *tx)
.await
.map_err(|_| Error::NotFound(format!("Sandbox host {host_id} not found")))
}
async fn suspend_sandbox(
authed: ApiAuthed,
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
Extension(user_db): Extension<UserDB>,
) -> JsonResult<SandboxInfo> {
let mut tx = user_db.clone().begin(&authed).await?;
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
)
.bind(sandbox_id)
.bind(&w_id)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sandbox.status != SandboxStatus::Running {
return Err(Error::BadRequest(format!(
"Can only suspend a running sandbox (status: {})",
sandbox.status
)));
}
if sandbox.mode == "remote" {
let host_id = sandbox.host_id.as_ref().ok_or_else(|| {
Error::InternalErr("Remote sandbox has no host_id".to_string())
})?;
let base_url = get_host_base_url(&mut *tx, host_id).await?;
proxy_action(&base_url, sandbox_id, "suspend").await?;
}
sqlx::query(
"UPDATE sandbox SET status = 'suspended', suspended_at = now() WHERE id = $1",
)
.bind(sandbox_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
get_sandbox_info(user_db, &authed, &w_id, sandbox_id).await
}
async fn resume_sandbox(
authed: ApiAuthed,
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
Extension(user_db): Extension<UserDB>,
) -> JsonResult<SandboxInfo> {
let mut tx = user_db.clone().begin(&authed).await?;
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
)
.bind(sandbox_id)
.bind(&w_id)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sandbox.status != SandboxStatus::Suspended {
return Err(Error::BadRequest(format!(
"Can only resume a suspended sandbox (status: {})",
sandbox.status
)));
}
if sandbox.mode == "remote" {
let host_id = sandbox.host_id.as_ref().ok_or_else(|| {
Error::InternalErr("Remote sandbox has no host_id".to_string())
})?;
let base_url = get_host_base_url(&mut *tx, host_id).await?;
proxy_action(&base_url, sandbox_id, "resume").await?;
}
sqlx::query(
"UPDATE sandbox SET status = 'running', suspended_at = NULL, last_activity_at = now() WHERE id = $1",
)
.bind(sandbox_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
get_sandbox_info(user_db, &authed, &w_id, sandbox_id).await
}
async fn terminate_sandbox(
authed: ApiAuthed,
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
Extension(user_db): Extension<UserDB>,
) -> JsonResult<SandboxInfo> {
let mut tx = user_db.clone().begin(&authed).await?;
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
)
.bind(sandbox_id)
.bind(&w_id)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sandbox.status == SandboxStatus::Stopped {
return get_sandbox_info(user_db, &authed, &w_id, sandbox_id).await;
}
if sandbox.mode == "remote" {
if let Some(host_id) = sandbox.host_id.as_ref() {
if let Ok(base_url) = get_host_base_url(&mut *tx, host_id).await {
let _ = proxy_action(&base_url, sandbox_id, "terminate").await;
}
}
}
sqlx::query(
"UPDATE sandbox SET status = 'stopped', stopped_at = now() WHERE id = $1",
)
.bind(sandbox_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
get_sandbox_info(user_db, &authed, &w_id, sandbox_id).await
}
#[derive(Deserialize, Serialize)]
struct WriteFileBody {
path: String,
content: String,
}
async fn write_file(
authed: ApiAuthed,
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
Extension(user_db): Extension<UserDB>,
Json(body): Json<WriteFileBody>,
) -> JsonResult<String> {
let mut tx = user_db.begin(&authed).await?;
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
)
.bind(sandbox_id)
.bind(&w_id)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sandbox.status == SandboxStatus::Stopped {
return Err(Error::BadRequest("Sandbox is stopped".to_string()));
}
if sandbox.mode == "remote" {
let host_id = sandbox.host_id.as_ref().ok_or_else(|| {
Error::InternalErr("Remote sandbox has no host_id".to_string())
})?;
let base_url = get_host_base_url(&mut *tx, host_id).await?;
let client = reqwest::Client::new();
let url = format!("{}/sandbox/{}/write_file", base_url, sandbox_id);
let resp = client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| Error::InternalErr(format!("Failed to proxy write_file: {e}")))?;
if !resp.status().is_success() {
let err = resp.text().await.unwrap_or_default();
return Err(Error::InternalErr(format!("Write file failed: {err}")));
}
}
sqlx::query("UPDATE sandbox SET last_activity_at = now() WHERE id = $1")
.bind(sandbox_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(Json("ok".to_string()))
}
#[derive(Deserialize)]
struct ReadFileQuery {
path: String,
}
async fn read_file(
authed: ApiAuthed,
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
Extension(user_db): Extension<UserDB>,
Query(query): Query<ReadFileQuery>,
) -> JsonResult<String> {
let mut tx = user_db.begin(&authed).await?;
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
)
.bind(sandbox_id)
.bind(&w_id)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sandbox.status == SandboxStatus::Stopped {
return Err(Error::BadRequest("Sandbox is stopped".to_string()));
}
if sandbox.mode == "remote" {
let host_id = sandbox.host_id.as_ref().ok_or_else(|| {
Error::InternalErr("Remote sandbox has no host_id".to_string())
})?;
let base_url = get_host_base_url(&mut *tx, host_id).await?;
let client = reqwest::Client::new();
let url = format!(
"{}/sandbox/{}/read_file?path={}",
base_url, sandbox_id, query.path
);
let resp = client
.get(&url)
.send()
.await
.map_err(|e| Error::InternalErr(format!("Failed to proxy read_file: {e}")))?;
if !resp.status().is_success() {
let err = resp.text().await.unwrap_or_default();
return Err(Error::InternalErr(format!("Read file failed: {err}")));
}
let content = resp.text().await.map_err(|e| {
Error::InternalErr(format!("Failed to read response body: {e}"))
})?;
return Ok(Json(content));
}
Err(Error::BadRequest(
"Embedded sandbox file ops must be performed directly via WM_SANDBOX_URL".to_string(),
))
}
#[derive(Serialize, sqlx::FromRow)]
struct ExecRecord {
id: Uuid,
sandbox_id: Uuid,
command: String,
started_at: chrono::DateTime<Utc>,
completed_at: Option<chrono::DateTime<Utc>>,
exit_code: Option<i32>,
stdout: Option<String>,
stderr: Option<String>,
duration_ms: Option<i64>,
created_by: String,
}
async fn list_execs(
authed: ApiAuthed,
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
Extension(user_db): Extension<UserDB>,
) -> JsonResult<Vec<ExecRecord>> {
let mut tx = user_db.begin(&authed).await?;
let rows = sqlx::query_as::<_, ExecRecord>(
"SELECT id, sandbox_id, command, started_at, completed_at,
exit_code, stdout, stderr, duration_ms, created_by
FROM sandbox_exec
WHERE sandbox_id = $1 AND workspace_id = $2
ORDER BY started_at DESC
LIMIT 100",
)
.bind(sandbox_id)
.bind(&w_id)
.fetch_all(&mut *tx)
.await?;
Ok(Json(rows))
}
async fn get_sandbox_info(
user_db: UserDB,
authed: &ApiAuthed,
w_id: &str,
sandbox_id: Uuid,
) -> JsonResult<SandboxInfo> {
let mut tx = user_db.begin(authed).await?;
let r = sqlx::query_as::<_, SandboxRow>(&format!(
"SELECT {SANDBOX_SELECT_COLS} FROM sandbox WHERE id = $1 AND workspace_id = $2"
))
.bind(sandbox_id)
.bind(w_id)
.fetch_one(&mut *tx)
.await?;
Ok(Json(r.into()))
}

View File

@@ -153,6 +153,7 @@ windmill-api-flow-conversations.workspace = true
windmill-api-inputs.workspace = true
windmill-api-npm-proxy.workspace = true
windmill-api-openapi = { workspace = true, optional = true }
windmill-api-sandbox.workspace = true
windmill-api-schedule.workspace = true
windmill-api-settings = { workspace = true }
windmill-api-workers.workspace = true

View File

@@ -509,6 +509,7 @@ pub async fn run_server(
.nest("/npm_proxy", windmill_api_npm_proxy::workspaced_service())
.nest("/raw_apps", raw_apps::workspaced_service())
.nest("/resources", resources::workspaced_service())
.nest("/sandboxes", windmill_api_sandbox::workspaced_service())
.nest("/schedules", windmill_api_schedule::workspaced_service())
.nest("/scripts", scripts::workspaced_service())
.nest(

View File

@@ -0,0 +1,21 @@
[package]
name = "windmill-sandbox"
version.workspace = true
authors.workspace = true
edition.workspace = true
[lib]
name = "windmill_sandbox"
path = "src/lib.rs"
[dependencies]
axum.workspace = true
chrono.workspace = true
nix.workspace = true
serde.workspace = true
serde_json.workspace = true
sqlx.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
windmill-common = { workspace = true, default-features = false }

View File

@@ -0,0 +1,14 @@
/*
* Author: Windmill Labs, Inc
* Copyright: Windmill Labs, Inc 2024
* This file and its contents are licensed under the AGPLv3 License.
* Please see the included NOTICE for copyright information and
* LICENSE-AGPL for a copy of the license.
*/
pub mod manager;
pub mod nsjail;
pub mod types;
pub use manager::SandboxManager;
pub use types::*;

View File

@@ -0,0 +1,302 @@
use std::collections::HashMap;
use std::sync::Arc;
use chrono::Utc;
use tokio::sync::Mutex;
use uuid::Uuid;
use windmill_common::error::{Error, Result};
use crate::nsjail;
use crate::types::*;
struct SandboxProcess {
_id: Uuid,
status: SandboxStatus,
nsjail_child: Option<tokio::process::Child>,
nsjail_pid: Option<u32>,
sandbox_dir: String,
_config: SandboxConfig,
_started_at: Option<chrono::DateTime<Utc>>,
last_activity_at: Option<chrono::DateTime<Utc>>,
suspended_at: Option<chrono::DateTime<Utc>>,
_config_path: String,
}
pub struct SandboxManager {
sandboxes: Arc<Mutex<HashMap<Uuid, SandboxProcess>>>,
base_dir: String,
}
impl SandboxManager {
pub fn new(base_dir: String) -> Self {
std::fs::create_dir_all(&base_dir).ok();
Self {
sandboxes: Arc::new(Mutex::new(HashMap::new())),
base_dir,
}
}
pub async fn create_sandbox(&self, config: SandboxConfig) -> Result<(Uuid, SandboxStatus)> {
let id = Uuid::new_v4();
let sandbox_dir = format!("{}/{}", self.base_dir, id);
std::fs::create_dir_all(&sandbox_dir)
.map_err(|e| Error::InternalErr(format!("Failed to create sandbox dir: {e}")))?;
let nsjail_config = nsjail::NsjailConfig {
sandbox_dir: sandbox_dir.clone(),
clone_newnet: config.network_enabled,
clone_newuser: !is_nuser_disabled(),
memory_limit_bytes: (config.memory_limit_mb as u64) * 1024 * 1024,
cpu_ms_per_sec: (config.cpu_limit as u32) * 1000,
disk_limit_bytes: (config.disk_limit_mb as u64) * 1024 * 1024,
extra_mounts: vec![],
};
let config_content = nsjail::render_config(&nsjail_config);
let config_path = format!("{}/sandbox.config.proto", sandbox_dir);
std::fs::write(&config_path, &config_content)
.map_err(|e| Error::InternalErr(format!("Failed to write nsjail config: {e}")))?;
let env_vars: Vec<(String, String)> = if let Some(obj) = config.env_vars.as_object() {
obj.iter()
.map(|(k, v)| (k.clone(), v.as_str().unwrap_or_default().to_string()))
.collect()
} else {
vec![]
};
let child = nsjail::spawn_nsjail(&config_path, &env_vars).await?;
let nsjail_pid = child.id();
let process = SandboxProcess {
_id: id,
status: SandboxStatus::Running,
nsjail_child: Some(child),
nsjail_pid,
sandbox_dir,
_config: config,
_started_at: Some(Utc::now()),
last_activity_at: Some(Utc::now()),
suspended_at: None,
_config_path: config_path,
};
self.sandboxes.lock().await.insert(id, process);
Ok((id, SandboxStatus::Running))
}
pub async fn exec(
&self,
sandbox_id: Uuid,
request: ExecRequest,
) -> Result<ExecResult> {
let (child_pid, _sandbox_dir) = {
let mut sandboxes = self.sandboxes.lock().await;
let sb = sandboxes
.get_mut(&sandbox_id)
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sb.status != SandboxStatus::Running {
return Err(Error::BadRequest(format!(
"Sandbox is not running (status: {})",
sb.status
)));
}
sb.last_activity_at = Some(Utc::now());
let pid = sb
.nsjail_pid
.ok_or_else(|| Error::InternalErr("No PID for sandbox".to_string()))?;
(pid, sb.sandbox_dir.clone())
};
let env_vars: Vec<(String, String)> = request
.env
.as_ref()
.and_then(|v| v.as_object())
.map(|obj| {
obj.iter()
.map(|(k, v)| (k.clone(), v.as_str().unwrap_or_default().to_string()))
.collect()
})
.unwrap_or_default();
let started_at = Utc::now();
let (exit_code, stdout, stderr) = nsjail::exec_in_sandbox(
child_pid,
&request.command,
request.cwd.as_deref(),
&env_vars,
request.timeout_secs.map(|s| s as u32),
)
.await?;
let duration_ms = (Utc::now() - started_at).num_milliseconds();
Ok(ExecResult {
exec_id: Uuid::new_v4(),
exit_code,
stdout,
stderr,
duration_ms,
})
}
pub async fn suspend(&self, sandbox_id: Uuid) -> Result<SandboxStatus> {
let mut sandboxes = self.sandboxes.lock().await;
let sb = sandboxes
.get_mut(&sandbox_id)
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sb.status != SandboxStatus::Running {
return Err(Error::BadRequest(format!(
"Can only suspend a running sandbox (status: {})",
sb.status
)));
}
let pid = sb
.nsjail_pid
.ok_or_else(|| Error::InternalErr("No PID for sandbox".to_string()))?;
nsjail::send_signal(pid, nix::sys::signal::Signal::SIGSTOP)?;
sb.status = SandboxStatus::Suspended;
sb.suspended_at = Some(Utc::now());
Ok(SandboxStatus::Suspended)
}
pub async fn resume(&self, sandbox_id: Uuid) -> Result<SandboxStatus> {
let mut sandboxes = self.sandboxes.lock().await;
let sb = sandboxes
.get_mut(&sandbox_id)
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sb.status != SandboxStatus::Suspended {
return Err(Error::BadRequest(format!(
"Can only resume a suspended sandbox (status: {})",
sb.status
)));
}
let pid = sb
.nsjail_pid
.ok_or_else(|| Error::InternalErr("No PID for sandbox".to_string()))?;
nsjail::send_signal(pid, nix::sys::signal::Signal::SIGCONT)?;
sb.status = SandboxStatus::Running;
sb.suspended_at = None;
sb.last_activity_at = Some(Utc::now());
Ok(SandboxStatus::Running)
}
pub async fn terminate(&self, sandbox_id: Uuid) -> Result<SandboxStatus> {
let mut sandboxes = self.sandboxes.lock().await;
let sb = sandboxes
.get_mut(&sandbox_id)
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sb.status == SandboxStatus::Stopped {
return Ok(SandboxStatus::Stopped);
}
if let Some(pid) = sb.nsjail_pid {
let _ = nsjail::send_signal(pid, nix::sys::signal::Signal::SIGTERM);
}
if let Some(ref mut child) = sb.nsjail_child {
let _ = tokio::time::timeout(
std::time::Duration::from_secs(5),
child.wait(),
)
.await;
child.kill().await.ok();
}
sb.status = SandboxStatus::Stopped;
sb.nsjail_child = None;
let _ = std::fs::remove_dir_all(&sb.sandbox_dir);
Ok(SandboxStatus::Stopped)
}
pub async fn status(&self, sandbox_id: Uuid) -> Result<SandboxStatus> {
let sandboxes = self.sandboxes.lock().await;
let sb = sandboxes
.get(&sandbox_id)
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
Ok(sb.status)
}
pub async fn write_file(
&self,
sandbox_id: Uuid,
path: &str,
content: &[u8],
) -> Result<()> {
let sandbox_dir = {
let mut sandboxes = self.sandboxes.lock().await;
let sb = sandboxes
.get_mut(&sandbox_id)
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sb.status == SandboxStatus::Stopped {
return Err(Error::BadRequest("Sandbox is stopped".to_string()));
}
sb.last_activity_at = Some(Utc::now());
sb.sandbox_dir.clone()
};
nsjail::write_to_sandbox(&sandbox_dir, path, content)
}
pub async fn read_file(&self, sandbox_id: Uuid, path: &str) -> Result<String> {
let sandbox_dir = {
let sandboxes = self.sandboxes.lock().await;
let sb = sandboxes
.get(&sandbox_id)
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sb.status == SandboxStatus::Stopped {
return Err(Error::BadRequest("Sandbox is stopped".to_string()));
}
sb.sandbox_dir.clone()
};
nsjail::read_from_sandbox(&sandbox_dir, path)
}
pub async fn list_sandboxes(&self) -> Vec<(Uuid, SandboxStatus)> {
let sandboxes = self.sandboxes.lock().await;
sandboxes
.iter()
.map(|(id, sb)| (*id, sb.status))
.collect()
}
pub async fn cleanup_all(&self) {
let ids: Vec<Uuid> = {
let sandboxes = self.sandboxes.lock().await;
sandboxes.keys().cloned().collect()
};
for id in ids {
let _ = self.terminate(id).await;
}
let mut sandboxes = self.sandboxes.lock().await;
sandboxes.clear();
}
pub async fn remove_stopped(&self, sandbox_id: Uuid) -> Result<()> {
let mut sandboxes = self.sandboxes.lock().await;
let sb = sandboxes
.get(&sandbox_id)
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
if sb.status != SandboxStatus::Stopped {
return Err(Error::BadRequest(
"Can only remove stopped sandboxes".to_string(),
));
}
sandboxes.remove(&sandbox_id);
Ok(())
}
}
fn is_nuser_disabled() -> bool {
std::env::var("DISABLE_NUSER")
.map(|v| v == "true" || v == "1")
.unwrap_or(false)
}

View File

@@ -0,0 +1,155 @@
use std::path::Path;
use std::process::Stdio;
use tokio::process::Command;
use windmill_common::error::{Error, Result};
const NSJAIL_CONFIG_TEMPLATE: &str = include_str!("../../windmill-worker/nsjail/sandbox.config.proto");
pub struct NsjailConfig {
pub sandbox_dir: String,
pub clone_newnet: bool,
pub clone_newuser: bool,
pub memory_limit_bytes: u64,
pub cpu_ms_per_sec: u32,
pub disk_limit_bytes: u64,
pub extra_mounts: Vec<MountConfig>,
}
pub struct MountConfig {
pub src: String,
pub dst: String,
pub rw: bool,
}
fn nsjail_path() -> String {
std::env::var("NSJAIL_PATH").unwrap_or_else(|_| "nsjail".to_string())
}
pub fn render_config(config: &NsjailConfig) -> String {
let extra_mounts = config
.extra_mounts
.iter()
.map(|m| {
format!(
"mount {{\n src: \"{}\"\n dst: \"{}\"\n is_bind: true\n rw: {}\n}}",
m.src, m.dst, m.rw
)
})
.collect::<Vec<_>>()
.join("\n\n");
NSJAIL_CONFIG_TEMPLATE
.replace("{SANDBOX_DIR}", &config.sandbox_dir)
.replace("{CLONE_NEWNET}", &(!config.clone_newnet).to_string())
.replace("{CLONE_NEWUSER}", &config.clone_newuser.to_string())
.replace("{MEMORY_LIMIT}", &config.memory_limit_bytes.to_string())
.replace("{CPU_MS_PER_SEC}", &config.cpu_ms_per_sec.to_string())
.replace("{DISK_LIMIT}", &config.disk_limit_bytes.to_string())
.replace("{IFACE_NO_LO}", &(!config.clone_newnet).to_string())
.replace("{EXTRA_MOUNTS}", &extra_mounts)
}
pub async fn spawn_nsjail(
config_path: &str,
env_vars: &[(String, String)],
) -> Result<tokio::process::Child> {
let mut cmd = Command::new(nsjail_path());
cmd.arg("--config")
.arg(config_path)
.arg("--")
.arg("/bin/bash")
.arg("--login")
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true);
for (k, v) in env_vars {
cmd.env(k, v);
}
cmd.spawn()
.map_err(|e| Error::InternalErr(format!("Failed to spawn nsjail: {e}")))
}
pub async fn exec_in_sandbox(
child_pid: u32,
command: &str,
cwd: Option<&str>,
env_vars: &[(String, String)],
timeout_secs: Option<u32>,
) -> Result<(i32, String, String)> {
let mut cmd = Command::new("nsenter");
cmd.arg("--target")
.arg(child_pid.to_string())
.arg("--mount")
.arg("--pid")
.arg("--")
.arg("/bin/bash")
.arg("-c")
.arg(command)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true);
if let Some(cwd) = cwd {
cmd.current_dir(cwd);
}
for (k, v) in env_vars {
cmd.env(k, v);
}
let timeout = timeout_secs.unwrap_or(300);
let child = cmd
.spawn()
.map_err(|e| Error::InternalErr(format!("Failed to exec nsenter: {e}")))?;
let result = tokio::time::timeout(
std::time::Duration::from_secs(timeout as u64),
child.wait_with_output(),
)
.await
.map_err(|_| Error::ExecutionErr(format!("Command timed out after {timeout}s")))?
.map_err(|e| Error::InternalErr(format!("Failed to wait for nsenter: {e}")))?;
let exit_code = result.status.code().unwrap_or(-1);
let stdout = String::from_utf8_lossy(&result.stdout).to_string();
let stderr = String::from_utf8_lossy(&result.stderr).to_string();
Ok((exit_code, stdout, stderr))
}
pub fn send_signal(pid: u32, signal: nix::sys::signal::Signal) -> Result<()> {
nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid as i32),
signal,
)
.map_err(|e| Error::InternalErr(format!("Failed to send signal {signal} to PID {pid}: {e}")))
}
pub fn write_to_sandbox(sandbox_dir: &str, file_path: &str, content: &[u8]) -> Result<()> {
let full_path = Path::new(sandbox_dir).join(
file_path
.strip_prefix('/')
.unwrap_or(file_path),
);
if let Some(parent) = full_path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| Error::InternalErr(format!("Failed to create directory: {e}")))?;
}
std::fs::write(&full_path, content)
.map_err(|e| Error::InternalErr(format!("Failed to write file: {e}")))
}
pub fn read_from_sandbox(sandbox_dir: &str, file_path: &str) -> Result<String> {
let full_path = Path::new(sandbox_dir).join(
file_path
.strip_prefix('/')
.unwrap_or(file_path),
);
std::fs::read_to_string(&full_path)
.map_err(|e| Error::NotFound(format!("File not found or unreadable: {e}")))
}

View File

@@ -0,0 +1,136 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "sandbox_status", rename_all = "lowercase")]
#[serde(rename_all = "lowercase")]
pub enum SandboxStatus {
Creating,
Running,
Suspended,
Stopped,
Error,
}
impl std::fmt::Display for SandboxStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SandboxStatus::Creating => write!(f, "creating"),
SandboxStatus::Running => write!(f, "running"),
SandboxStatus::Suspended => write!(f, "suspended"),
SandboxStatus::Stopped => write!(f, "stopped"),
SandboxStatus::Error => write!(f, "error"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SandboxConfig {
#[serde(default)]
pub mode: SandboxMode,
#[serde(skip_serializing_if = "Option::is_none")]
pub image: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout_secs: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub idle_timeout_secs: Option<i32>,
#[serde(default = "default_cpu_limit")]
pub cpu_limit: i32,
#[serde(default = "default_memory_limit_mb")]
pub memory_limit_mb: i32,
#[serde(default = "default_disk_limit_mb")]
pub disk_limit_mb: i32,
#[serde(default)]
pub env_vars: serde_json::Value,
#[serde(default)]
pub labels: serde_json::Value,
#[serde(default)]
pub mounts: serde_json::Value,
#[serde(default)]
pub network_enabled: bool,
#[serde(default)]
pub ephemeral: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub auto_stop_after_secs: Option<i32>,
}
fn default_cpu_limit() -> i32 {
1
}
fn default_memory_limit_mb() -> i32 {
512
}
fn default_disk_limit_mb() -> i32 {
1024
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum SandboxMode {
#[default]
Embedded,
Remote,
}
impl std::fmt::Display for SandboxMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SandboxMode::Embedded => write!(f, "embedded"),
SandboxMode::Remote => write!(f, "remote"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SandboxInfo {
pub id: Uuid,
pub workspace_id: String,
pub status: SandboxStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub image: Option<String>,
pub labels: serde_json::Value,
pub mode: String,
pub created_by: String,
pub created_at: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
pub started_at: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_activity_at: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub suspended_at: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stopped_at: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_message: Option<String>,
pub ephemeral: bool,
pub cpu_limit: i32,
pub memory_limit_mb: i32,
pub network_enabled: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecRequest {
pub command: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout_secs: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cwd: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub env: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecResult {
pub exec_id: Uuid,
pub exit_code: i32,
pub stdout: String,
pub stderr: String,
pub duration_ms: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WriteFileRequest {
pub path: String,
pub content: String,
}

View File

@@ -42,6 +42,7 @@ bedrock = ["dep:aws-sdk-bedrockruntime", "windmill-common/bedrock", "dep:aws-con
[dependencies]
windmill-queue.workspace = true
windmill-sandbox.workspace = true
windmill-dep-map.workspace = true
windmill-audit.workspace = true # there isn't really a reason for audit-worth actions to happen in the worker.
windmill-common = { workspace = true, default-features = false }

View File

@@ -0,0 +1,95 @@
name: "windmill sandbox"
mode: LISTEN
hostname: "sandbox"
log_level: ERROR
disable_rl: true
cwd: "/workspace"
clone_newnet: {CLONE_NEWNET}
clone_newuser: {CLONE_NEWUSER}
skip_setsid: true
keep_caps: false
keep_env: true
mount_proc: true
mount {
src: "/bin"
dst: "/bin"
is_bind: true
}
mount {
src: "/proc/self/fd"
dst: "/dev/fd"
is_symlink: true
mandatory: false
}
mount {
src: "/lib"
dst: "/lib"
is_bind: true
}
mount {
src: "/lib64"
dst: "/lib64"
is_bind: true
mandatory: false
}
mount {
src: "/usr"
dst: "/usr"
is_bind: true
}
mount {
src: "/etc"
dst: "/etc"
is_bind: true
}
mount {
src: "/dev/null"
dst: "/dev/null"
is_bind: true
rw: true
}
mount {
src: "/dev/random"
dst: "/dev/random"
is_bind: true
}
mount {
src: "/dev/urandom"
dst: "/dev/urandom"
is_bind: true
}
mount {
src: "{SANDBOX_DIR}"
dst: "/workspace"
is_bind: true
rw: true
}
mount {
dst: "/tmp"
fstype: "tmpfs"
rw: true
options: "size={DISK_LIMIT}"
}
iface_no_lo: {IFACE_NO_LO}
cgroup_mem_max: {MEMORY_LIMIT}
cgroup_cpu_ms_per_sec: {CPU_MS_PER_SEC}
{EXTRA_MOUNTS}

View File

@@ -66,6 +66,8 @@ mod python_versions;
pub mod result_processor;
#[cfg(feature = "rust")]
mod rust_executor;
#[allow(dead_code)]
mod sandbox_server;
mod sanitized_sql_params;
mod schema;
pub mod sql_utils;

View File

@@ -0,0 +1,221 @@
use std::future::IntoFuture;
use std::sync::Arc;
use axum::{
extract::{Path, Query},
routing::{get, post},
Extension, Json, Router,
};
use serde::Deserialize;
use tokio::net::TcpListener;
use uuid::Uuid;
use windmill_sandbox::{ExecRequest, ExecResult, SandboxConfig, SandboxManager};
struct EmbeddedSandboxState {
manager: SandboxManager,
}
pub struct EmbeddedSandboxHandle {
port: u16,
state: Arc<EmbeddedSandboxState>,
shutdown_tx: tokio::sync::oneshot::Sender<()>,
}
impl EmbeddedSandboxHandle {
pub fn url(&self) -> String {
format!("http://127.0.0.1:{}", self.port)
}
pub async fn shutdown(self) {
self.state.manager.cleanup_all().await;
let _ = self.shutdown_tx.send(());
}
}
pub async fn start_embedded_sandbox_server(
base_dir: String,
) -> windmill_common::error::Result<EmbeddedSandboxHandle> {
let manager = SandboxManager::new(base_dir);
let state = Arc::new(EmbeddedSandboxState { manager });
let app = Router::new()
.route("/sandbox/create", post(handle_create))
.route("/sandbox/:id/exec", post(handle_exec))
.route("/sandbox/:id/suspend", post(handle_suspend))
.route("/sandbox/:id/resume", post(handle_resume))
.route("/sandbox/:id/terminate", post(handle_terminate))
.route("/sandbox/:id/status", get(handle_status))
.route("/sandbox/:id/write_file", post(handle_write_file))
.route("/sandbox/:id/read_file", get(handle_read_file))
.route("/sandboxes", get(handle_list))
.layer(Extension(state.clone()));
let listener = TcpListener::bind("127.0.0.1:0")
.await
.map_err(|e| {
windmill_common::error::Error::InternalErr(format!(
"Failed to bind embedded sandbox server: {e}"
))
})?;
let port = listener.local_addr().unwrap().port();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
let server = axum::serve(listener, app);
tokio::select! {
result = server.into_future() => {
if let Err(e) = result {
tracing::error!("Embedded sandbox server error: {e}");
}
}
_ = shutdown_rx => {
tracing::debug!("Embedded sandbox server shutting down");
}
}
});
tracing::info!("Embedded sandbox server started on port {port}");
Ok(EmbeddedSandboxHandle {
port,
state,
shutdown_tx,
})
}
async fn handle_create(
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
Json(config): Json<SandboxConfig>,
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
match state.manager.create_sandbox(config).await {
Ok((id, status)) => Ok(Json(serde_json::json!({
"id": id,
"status": status,
}))),
Err(e) => {
tracing::error!("Failed to create sandbox: {e}");
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn handle_exec(
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
Path(id): Path<Uuid>,
Json(request): Json<ExecRequest>,
) -> Result<Json<ExecResult>, axum::http::StatusCode> {
match state.manager.exec(id, request).await {
Ok(result) => Ok(Json(result)),
Err(e) => {
tracing::error!("Failed to exec in sandbox {id}: {e}");
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn handle_suspend(
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
Path(id): Path<Uuid>,
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
match state.manager.suspend(id).await {
Ok(status) => Ok(Json(serde_json::json!({ "status": status }))),
Err(e) => {
tracing::error!("Failed to suspend sandbox {id}: {e}");
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn handle_resume(
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
Path(id): Path<Uuid>,
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
match state.manager.resume(id).await {
Ok(status) => Ok(Json(serde_json::json!({ "status": status }))),
Err(e) => {
tracing::error!("Failed to resume sandbox {id}: {e}");
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn handle_terminate(
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
Path(id): Path<Uuid>,
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
match state.manager.terminate(id).await {
Ok(status) => Ok(Json(serde_json::json!({ "status": status }))),
Err(e) => {
tracing::error!("Failed to terminate sandbox {id}: {e}");
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn handle_status(
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
Path(id): Path<Uuid>,
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
match state.manager.status(id).await {
Ok(status) => Ok(Json(serde_json::json!({ "id": id, "status": status }))),
Err(e) => {
tracing::error!("Failed to get sandbox {id} status: {e}");
Err(axum::http::StatusCode::NOT_FOUND)
}
}
}
#[derive(Deserialize)]
struct WriteFileBody {
path: String,
content: String,
}
async fn handle_write_file(
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
Path(id): Path<Uuid>,
Json(body): Json<WriteFileBody>,
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
match state
.manager
.write_file(id, &body.path, body.content.as_bytes())
.await
{
Ok(()) => Ok(Json(serde_json::json!({ "ok": true }))),
Err(e) => {
tracing::error!("Failed to write file to sandbox {id}: {e}");
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
#[derive(Deserialize)]
struct ReadFileQuery {
path: String,
}
async fn handle_read_file(
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
Path(id): Path<Uuid>,
Query(query): Query<ReadFileQuery>,
) -> Result<String, axum::http::StatusCode> {
match state.manager.read_file(id, &query.path).await {
Ok(content) => Ok(content),
Err(e) => {
tracing::error!("Failed to read file from sandbox {id}: {e}");
Err(axum::http::StatusCode::NOT_FOUND)
}
}
}
async fn handle_list(
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
) -> Json<serde_json::Value> {
let sandboxes = state.manager.list_sandboxes().await;
let list: Vec<serde_json::Value> = sandboxes
.into_iter()
.map(|(id, status)| serde_json::json!({ "id": id, "status": status }))
.collect();
Json(serde_json::json!(list))
}

View File

@@ -2352,7 +2352,226 @@ def parse_sql_client_name(name: str) -> tuple[str, Optional[str]]:
name = name
schema = None
if ":" in name:
name, schema = name.split(":", 1)
name, schema = name.split(":", 1)
if not name:
name = "main"
return name, schema
return name, schema
# --- Sandbox SDK ---
class SandboxExecResult:
def __init__(self, data: dict):
self.exec_id: str = data.get("exec_id", "")
self.exit_code: int = data.get("exit_code", -1)
self.stdout: str = data.get("stdout", "")
self.stderr: str = data.get("stderr", "")
self.duration_ms: int = data.get("duration_ms", 0)
def __repr__(self):
return f"SandboxExecResult(exit_code={self.exit_code}, duration_ms={self.duration_ms})"
class SandboxInfo:
def __init__(self, data: dict):
self.id: str = data["id"]
self.workspace_id: str = data.get("workspace_id", "")
self.status: str = data["status"]
self.image: Optional[str] = data.get("image")
self.labels: dict = data.get("labels", {})
self.mode: str = data.get("mode", "embedded")
self.created_by: str = data.get("created_by", "")
self.created_at: str = data.get("created_at", "")
self.started_at: Optional[str] = data.get("started_at")
self.last_activity_at: Optional[str] = data.get("last_activity_at")
self.suspended_at: Optional[str] = data.get("suspended_at")
self.stopped_at: Optional[str] = data.get("stopped_at")
self.error_message: Optional[str] = data.get("error_message")
self.ephemeral: bool = data.get("ephemeral", False)
self.cpu_limit: int = data.get("cpu_limit", 1)
self.memory_limit_mb: int = data.get("memory_limit_mb", 512)
self.network_enabled: bool = data.get("network_enabled", False)
def __repr__(self):
return f"SandboxInfo(id={self.id}, status={self.status})"
class Sandbox:
def __init__(self, sandbox_id: str, base_url: str, use_direct_http: bool, token: str = "", workspace: str = ""):
self.id = sandbox_id
self._base_url = base_url
self._use_direct_http = use_direct_http
self._token = token
self._workspace = workspace
@staticmethod
def create(
mode: str = "embedded",
image: Optional[str] = None,
timeout_secs: Optional[int] = None,
idle_timeout_secs: Optional[int] = None,
cpu_limit: int = 1,
memory_limit_mb: int = 512,
disk_limit_mb: int = 1024,
env_vars: Optional[Dict[str, str]] = None,
labels: Optional[Dict[str, str]] = None,
network_enabled: bool = False,
ephemeral: bool = False,
) -> "Sandbox":
config = {
"mode": mode,
"cpu_limit": cpu_limit,
"memory_limit_mb": memory_limit_mb,
"disk_limit_mb": disk_limit_mb,
"env_vars": env_vars or {},
"labels": labels or {},
"mounts": [],
"network_enabled": network_enabled,
"ephemeral": ephemeral,
}
if image is not None:
config["image"] = image
if timeout_secs is not None:
config["timeout_secs"] = timeout_secs
if idle_timeout_secs is not None:
config["idle_timeout_secs"] = idle_timeout_secs
sandbox_url = os.environ.get("WM_SANDBOX_URL")
if mode == "embedded" and sandbox_url:
resp = httpx.post(f"{sandbox_url}/sandbox/create", json=config, timeout=30)
resp.raise_for_status()
data = resp.json()
return Sandbox(data["id"], sandbox_url, True)
wm = _client or Windmill()
data = wm.post(
f"/w/{wm.workspace}/sandboxes/create",
json=config,
).json()
return Sandbox(
data["id"],
wm.base_url,
False,
wm.token,
wm.workspace,
)
@staticmethod
def get(sandbox_id: str) -> "Sandbox":
sandbox_url = os.environ.get("WM_SANDBOX_URL")
if sandbox_url:
return Sandbox(sandbox_id, sandbox_url, True)
wm = _client or Windmill()
return Sandbox(sandbox_id, wm.base_url, False, wm.token, wm.workspace)
@staticmethod
def list(
status: Optional[list[str]] = None,
labels: Optional[Dict[str, str]] = None,
) -> list[SandboxInfo]:
wm = _client or Windmill()
params: Dict[str, str] = {}
if status:
params["status"] = ",".join(status)
if labels:
first_key = next(iter(labels))
params["label_key"] = first_key
params["label_value"] = labels[first_key]
resp = wm.get(f"/w/{wm.workspace}/sandboxes/list", params=params).json()
return [SandboxInfo(s) for s in resp]
def exec(
self,
command: str,
*,
timeout_secs: Optional[int] = None,
cwd: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
) -> SandboxExecResult:
payload: Dict[str, Any] = {"command": command}
if timeout_secs is not None:
payload["timeout_secs"] = timeout_secs
if cwd is not None:
payload["cwd"] = cwd
if env is not None:
payload["env"] = env
if self._use_direct_http:
resp = httpx.post(
f"{self._base_url}/sandbox/{self.id}/exec",
json=payload,
timeout=httpx.Timeout((timeout_secs or 300) + 10, connect=10),
)
resp.raise_for_status()
return SandboxExecResult(resp.json())
wm = _client or Windmill()
data = wm.post(f"/w/{wm.workspace}/sandboxes/{self.id}/exec", json=payload).json()
return SandboxExecResult(data)
def suspend(self) -> None:
self._action("suspend")
def resume(self) -> None:
self._action("resume")
def terminate(self) -> None:
self._action("terminate")
def status(self) -> SandboxInfo:
if self._use_direct_http:
resp = httpx.get(f"{self._base_url}/sandbox/{self.id}/status", timeout=10)
resp.raise_for_status()
return SandboxInfo(resp.json())
wm = _client or Windmill()
data = wm.get(f"/w/{wm.workspace}/sandboxes/{self.id}").json()
return SandboxInfo(data)
def write_file(self, path: str, content: Union[str, bytes]) -> None:
if isinstance(content, bytes):
content = content.decode("utf-8")
if self._use_direct_http:
resp = httpx.post(
f"{self._base_url}/sandbox/{self.id}/write_file",
json={"path": path, "content": content},
timeout=30,
)
resp.raise_for_status()
return
wm = _client or Windmill()
wm.post(f"/w/{wm.workspace}/sandboxes/{self.id}/write_file", json={"path": path, "content": content})
def read_file(self, path: str) -> str:
if self._use_direct_http:
resp = httpx.get(
f"{self._base_url}/sandbox/{self.id}/read_file",
params={"path": path},
timeout=30,
)
resp.raise_for_status()
return resp.text()
wm = _client or Windmill()
return wm.get(f"/w/{wm.workspace}/sandboxes/{self.id}/read_file", params={"path": path}).json()
def list_execs(self) -> list[SandboxExecResult]:
wm = _client or Windmill()
data = wm.get(f"/w/{wm.workspace}/sandboxes/{self.id}/execs").json()
return [SandboxExecResult(e) for e in data]
def _action(self, action: str) -> None:
if self._use_direct_http:
resp = httpx.post(
f"{self._base_url}/sandbox/{self.id}/{action}",
timeout=30,
)
resp.raise_for_status()
return
wm = _client or Windmill()
wm.post(f"/w/{wm.workspace}/sandboxes/{self.id}/{action}", json={})